Upgrading Kafka on a large infra, or: when moving at scale requires careful planning

Martin Richard
May 13, 2019 · 11 min read

At Criteo, Kafka is one of the most critical pieces of infrastructure we maintain. It receives between 4M and 8M records per second. Any downtime or incident can result in data loss that we can’t afford.

Thanks to the good design of Kafka and our automation tools, upgrading Kafka is usually a straightforward process. Yet version 0.11 introduced a major change in its network protocol. The impact on our infrastructure was so large that we decided to postpone the upgrade and focus on other tasks for almost a year.

In this blog post, we’ll tell the story of how we managed to upgrade from Kafka 0.10 to 2.0. It took us around six months of work to finish it. We will describe our Kafka infrastructure, the way we manage it and discuss some SRE principles we followed to make this upgrade a success.

The Kafka setup that powers Criteo

Criteo owns its hardware and doesn’t rely on the cloud for its production. Some of our systems run on Mesos (in containers), others run on bare metal.

Kafka happens to run on bare metal. Our servers have large CPUs, 256 GiB of RAM, a 10 Gbps network link and 14 TiB of hard disk drives. We automate their provisioning and configuration with Chef.

We use Kafka in all our data centers, spread across the globe, for two main use cases.

Kafka as a data sink

All our applications serving internet traffic dump the data they produce in the closest Kafka cluster (usually in the same data center). A single large consumer application will import and organize the data collected on Hadoop.

6 Kafka clusters handling up to 2.2M messages/sec (800 MiBps)

We have 6 Kafka clusters of various sizes, from 18 to 43 servers. Since they use similar hardware and are kept well balanced, every server deals with more or less the same load: 70k messages per second (or 25 MiB/sec). These messages are sent by 2 to 3 thousand clients.

We keep 72 hours of data on disk. This gives us time to cope with a catastrophic failure over a weekend (it has never happened). We also keep around one hour of data in memory: the goal is to leverage the IO cache of the OS to avoid reading fresh data from the disks.
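The retention figures above can be sanity-checked with a little arithmetic. The sketch below assumes that the 25 MiB/sec quoted per server is the rate at which bytes land on that server's disks (replica traffic included), which the text does not state explicitly. The function and variable names are illustrative only.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3
TIB = 1024 ** 4


def bytes_retained(throughput_mib_per_sec: float, hours: float) -> float:
    """Bytes accumulated at a constant ingest rate over the given window."""
    return throughput_mib_per_sec * MIB * hours * 3600


# Figures taken from the text; treating them as constant is an assumption.
per_server_mib_s = 25
messages_per_sec = 70_000

# Average message size implied by the two per-server rates.
avg_message_bytes = per_server_mib_s * MIB / messages_per_sec

# One hour of data that we would like the OS page cache to hold.
hot_set_gib = bytes_retained(per_server_mib_s, 1) / GIB

# 72 hours of data kept on disk.
disk_tib = bytes_retained(per_server_mib_s, 72) / TIB
```

Under these assumptions a message weighs roughly 374 bytes, one hour of data is about 88 GiB (which fits in 256 GiB of RAM), and 72 hours of data is about 6.2 TiB (which fits in 14 TiB of disk).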

Kafka for streaming apps

We also opened 8 smaller clusters. We call them our “general-purpose” clusters. They are used for all the use cases that don’t fall in the first one: that is, our streaming applications. They receive between 100k and 200k messages per second at peak.

These clusters are multi-tenant: it’s significantly harder to predict the usage patterns of these clusters than for our data sink. We believe that running one cluster per use-case (or client) is still too complex and wasteful to be worth it. To build a “Kafka-as-a-service” platform, we need to add another layer of automation and complexity. We want to wait until we know that it will be useful at Criteo.

Clients

Most of the clients of Kafka are applications written in C#. The others use the official Java driver. When we started to use Kafka, there was no C# client we could use, so we built our own. It’s open-source and available on GitHub at https://github.com/criteo/kafka-sharp.

This driver has slightly different trade-offs than the official driver. In particular, it will try to dump its data as fast as possible on any server it can. It does so because the producer can’t afford to pile up messages in memory for too long, and might be required to discard some.

The C# driver was the very first reason why upgrading our infrastructure took so long. We had to implement the protocol changes in the client before we upgraded the servers.

Hope for the best, plan for the worst

Since we can’t pause the whole system for the duration of the upgrade, we operate on a single server at a time. Thus, Kafka must maintain the compatibility between different versions of the clients and other servers.

A Kafka server may need to convert messages to the older protocol to exchange with peers not supporting the new version yet. This extra work incurs more CPU and memory usage. We had to understand how it would impact our SLA.
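To make the cost of this conversion concrete, here is a deliberately simplified model of when a server has to rewrite records. It relies on the fact that Kafka tags each record format with a "magic" value (2 for the format introduced in 0.11, 0 and 1 for the older ones). The `Peer` class, the function names and the traffic numbers used with them are hypothetical, and the real server logic has more cases than this sketch covers.

```python
from dataclasses import dataclass

# Kafka record format "magic" values: 0 and 1 are the pre-0.11 formats,
# 2 is the record-batch format introduced in 0.11.
MAGIC_V1 = 1
MAGIC_V2 = 2


@dataclass(frozen=True)
class Peer:
    name: str
    max_magic: int  # newest record format this peer understands


def needs_conversion(stored_magic: int, peer: Peer) -> bool:
    """A server must rewrite records when the peer cannot read the stored format."""
    return peer.max_magic < stored_magic


def conversion_ratio(stored_magic, peers_with_rates):
    """Fraction of fetch traffic (messages/sec) that requires conversion."""
    total = sum(rate for _, rate in peers_with_rates)
    converted = sum(
        rate for peer, rate in peers_with_rates
        if needs_conversion(stored_magic, peer)
    )
    return converted / total if total else 0.0
```

For example, if data is stored in the new format and a not-yet-upgraded client accounts for 30k of 100k fetched messages per second, `conversion_ratio` returns a value of about 0.3: close to a third of the outgoing traffic pays the conversion cost.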

The SLA (Service Level Agreement) is a form of contract with our users defining how well our Kafka servers must perform, and what measures we put in place (on-call rotations) to ensure it.

Our SLA comes down to three requirements. In a cluster, there is always a server available to receive messages: the producers will not need to drop messages. There is always a server available to send data: if not, the import to Hadoop will start to lag. Once a server has acknowledged a message, that message must be durable (not lost).

How do we guarantee this? A partition of the data is replicated on 3 servers. This allows us to have 2 servers offline and still be able to read data.

We have only 2 Kafka servers of the same cluster per rack. Unless one machine is already down or 2 racks fail at the same time, the previous requirement is satisfied. It allows us to plan short maintenance operations. For instance, we can have a rack down for 15 minutes when the switch of this rack is upgraded.
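The reasoning about replicas and racks can be checked mechanically. The sketch below is a toy model, not our tooling: server names, rack names and both functions are hypothetical. It verifies that a partition with 3 replicas keeps at least one replica online for every single-rack failure and every pair of server failures.

```python
from itertools import combinations


def survives(replicas, rack_of, down_racks=(), down_servers=()):
    """True if at least one replica of the partition is still online."""
    return any(
        server not in down_servers and rack_of[server] not in down_racks
        for server in replicas
    )


def worst_case_ok(replicas, rack_of):
    """Check every single-rack failure and every pair of server failures."""
    racks = set(rack_of.values())
    single_rack = all(
        survives(replicas, rack_of, down_racks={rack}) for rack in racks
    )
    two_servers = all(
        survives(replicas, rack_of, down_servers=set(pair))
        for pair in combinations(rack_of, 2)
    )
    return single_rack and two_servers


# Hypothetical layout: 2 servers of the cluster per rack, as in the text.
rack_of = {"k1": "r1", "k2": "r1", "k3": "r2", "k4": "r2", "k5": "r3", "k6": "r3"}
replicas = ["k1", "k2", "k3"]

layout_ok = worst_case_ok(replicas, rack_of)
# The exception mentioned in the text: one machine already down, then a rack fails.
degraded_ok = survives(replicas, rack_of, down_racks={"r1"}, down_servers={"k3"})
```

With this layout `layout_ok` is true, because a rack can take at most 2 of the 3 replicas with it. `degraded_ok` is false, which matches the caveat above: a rack failure on top of an already failed machine can leave a partition with no replica online.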

If one server is unavailable for a longer period, we have to step in. We need to spread its load across the remaining machines, then fix (or replace) the failing server.

We also keep an eye on the “produce” latency: it’s the amount of time it takes for a server to acknowledge a message to a client. If this becomes too long, the client might think that its message is lost or that the server is failing. We keep this latency low through good capacity planning: the servers are not overloaded and can handle requests in a timely fashion.

Explorations

We had to understand how we would perform the upgrade and see how it would impact a running cluster. We decided to try it on a preprod cluster. There were two goals. We wanted to train ourselves on the operational procedure. We also wanted to test the stability of a cluster while the servers run in “compatibility mode” and once they are all upgraded.

We knew that we would not be confident enough to start in production right after this test. The way we simulate the production pipeline in preprod allows us to catch bugs and test our software (the import to Hadoop). However, the size and load of the preprod can’t guarantee us that we will uncover all major operational issues that can happen in production.

We did uncover some pitfalls, though. Notably, we found a bug in Kafka. It was severe enough to delay the upgrade: in some cases, a server could take hours to synchronize with the others in the cluster. That translates into a long period of downtime for one server, which threatens our SLA.

We spent around two weeks working with Confluent: we did our best to provide accurate data to investigate the bug. Eventually, we successfully tried their patch and were ready to move on. If you are interested, the bug has been reported and fixed here: https://issues.apache.org/jira/browse/KAFKA-7886.

In the meantime, we had to find a way to roll out the upgrade despite the risk of discovering new problems in production. Would we discover a new bug when running in production? If so, how could we cope with it? Would we have enough server power to sustain the extra work of the “compatibility mode”?

We spent several weeks testing various scenarios in which we could expect problems. For instance, the cluster sometimes receives invalid messages from a source we haven’t identified yet. We had to simulate this case and others to check that the server would not crash while trying to convert one of these invalid messages.

We also decided to use canaries that would alert us if we had missed anything else.

A canary consists of one (or a few) servers that will be upgraded first. If they behave well for a while, it tells us that it seems safe to proceed and upgrade more servers.

We thus decided to roll out the upgrade progressively in a cluster: each week we upgraded more servers (1, then 3, then half a cluster, then the remaining servers). We could adjust and speed up the deployment if we thought it was safe enough.
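As an illustration, the weekly waves can be derived from the size of a cluster. This is a minimal sketch, assuming that "half a cluster" means half of the servers upgraded in total once the third wave is done. The text leaves that detail open, and `rollout_waves` is a hypothetical helper, not part of our tooling.

```python
def rollout_waves(servers):
    """Split servers into waves: 1, then 3, then up to half the cluster, then the rest."""
    n = len(servers)
    # Cumulative number of upgraded servers after each wave.
    cumulative_targets = [1, 1 + 3, max(n // 2, 4), n]
    waves, done = [], 0
    for target in cumulative_targets:
        target = min(target, n)
        if target > done:  # skip waves that would be empty on small clusters
            waves.append(servers[done:target])
            done = target
    return waves


# Hypothetical 18-server cluster, the smallest size mentioned in the text.
cluster = [f"kafka{i:02d}" for i in range(1, 19)]
wave_sizes = [len(wave) for wave in rollout_waves(cluster)]
```

For the 18-server cluster above, `wave_sizes` is `[1, 3, 5, 9]`: the first canary, three more servers, then enough to reach half of the cluster, then everything else.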

Using canaries meant that the deployment would span over weeks, not counting the time required to upgrade the clients. We had to be extra sure that our clusters were sufficiently sized to handle the cost of conversion. We decided to run load tests.

Load testing and tuning

The goal of the test was to fine-tune the configuration of a server to ensure it could sustain the required load without breaking our SLA. We set our target to more or less 100k messages per second per server. The reasoning behind this number is a back-of-the-envelope estimate:

  • Assuming 70k messages per second per server,

It was crucial to have both the right tools to observe that everything was going OK and a procedure to manage the case when things would go wrong. This is what we’re going to discuss next.

Preparing for production changes: applying SRE principles

We have an exhaustive monitoring stack in place. While we store a lot of metrics from the Kafka servers, we don’t use them that much in our daily job. We prefer to use higher level metrics giving us a holistic view of our system.

It means that we look at, and set alerts on, metrics matching our use cases. For instance, how many messages a producer failed to send, or how late our import pipeline is. We only look at technical metrics (like latency, CPU load, etc) when we need to investigate an anomaly in the high-level metrics.

However, when performing a large technical change like this one, we believe that it’s worth investing time to build monitoring tools that let us keep track of our progress. We decided to use a small set of relevant technical metrics to keep an eye on the health of our servers. In particular, we tracked:

  • The rate of conversions: compared to the rate of messages flowing through our system, that’s how we know that we indeed upgraded our servers.

We made sure our monitoring was keeping track of relevant metrics that would allow us to investigate issues related to this change. We also built a dashboard to keep an eye on these metrics. We use it to quickly answer the question: is everything all right with the cluster we upgraded?

The data shown on the dashboard is filtered by cluster. We tried to keep it as light and meaningful as possible.

There is a table exposing the list of servers and their configuration settings, telling if a server has been upgraded yet or not. We also have graphs showing an aggregate of the metrics we listed previously: we display the sum, average or 99th percentile of the metrics to keep at most one or two series per graph.
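Collapsing per-server values into one or two series per graph boils down to a handful of aggregates. The sketch below shows one way to compute them. The nearest-rank percentile is a choice made for this example and not necessarily what our monitoring stack does, and the server names and latency values are made up.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def summarize(per_server):
    """Reduce a {server: value} mapping to the aggregates shown on a graph."""
    values = list(per_server.values())
    return {
        "sum": sum(values),
        "avg": sum(values) / len(values),
        "p99": percentile(values, 99),
    }


# Hypothetical produce latencies in milliseconds for three servers.
latency_ms = {"kafka01": 10, "kafka02": 20, "kafka03": 90}
summary = summarize(latency_ms)
```

The average smooths out the slow server while the 99th percentile exposes it, which is why a graph typically shows both.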

Since the board displays stats for only one cluster, we added two tables showing the most loaded servers across all clusters: we highlight in orange or red the ones with values that should be checked. Thanks to this, we always keep an eye on all clusters without switching between boards.

You can see a screenshot of this board during the upgrade of one of our clusters (between 11:15 and 11:55). It was off-peak time, but the impact is nevertheless noticeable.

We then wrote a run-book to help the on-call engineer evaluate whether a page is related to the upgrade process. It contains a guide explaining how to investigate with the dashboard, and a list of mitigation techniques and rollback procedures in order of priority:

  1. A list of configuration settings which can be tweaked. We include recommended values and how they will impact the system,

We documented the procedures step-by-step without assumptions about the familiarity of the on-call engineer with the system. Even if the whole team that shares on-calls for the Kafka architecture is trained, we wanted the run-book to be as straightforward as possible. We know that an excellent engineer can make mistakes at 3am!

Last but not least, we communicated with other parties during the whole process: we detailed the potential impact to users and kept relevant teams in sync when changes were performed in production. We kept all this information well organized in a wiki, along with copies of the emails we sent.

Pushing the big button

Thanks to our Chef automation pipeline, applying changes to the server configuration is a matter of merging a git commit and clicking on a deploy button. Even better, we have a system ensuring that a configuration change that requires a restart of a server can’t be performed unless the whole cluster is healthy: we can submit several changes which will be correctly applied automatically.
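The idea of gating restarts on cluster health can be sketched in a few lines. This is not our actual system: `rolling_apply` and its parameters are hypothetical, and the health check is passed in as a callable (one plausible implementation, assumed here, would verify that no partition is under-replicated).

```python
import time


def rolling_apply(servers, cluster_is_healthy, apply_change,
                  poll_seconds=30.0, max_polls=120, sleep=time.sleep):
    """Apply a change one server at a time, waiting for a healthy cluster before each step."""
    applied = []
    for server in servers:
        for _ in range(max_polls):
            if cluster_is_healthy():
                break
            sleep(poll_seconds)  # give the cluster time to recover
        else:
            # The cluster never became healthy: stop rather than make things worse.
            raise RuntimeError(f"cluster still unhealthy before {server}; stopping")
        apply_change(server)  # e.g. write the new configuration and restart
        applied.append(server)
    return applied
```

Because the health check runs before every server, a restart that leaves the cluster degraded automatically blocks the next one. Several queued changes can then be applied without anyone watching each step.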

So, we prepared all the patches to review, and following our release schedule, we clicked “deploy” regularly.

We were happy to observe each step going smoothly. We observed the CPU and latency increase we expected, but it stayed in the range of acceptable values. We haven’t had to use the run-book yet. One might think that it was a waste of time to plan that much. We prefer to think it’s because we prepared this upgrade carefully that we didn’t need it.

There is still work to do though! This upgrade is a first step. Now it’s time to leverage all the new features of Kafka to build robust streaming systems working at scale. Feeling like it’s a job for you? We’re looking for engineers to grow our team. You don’t need to be an expert in Kafka or Chef: most of us had no prior experience with them when we joined!

Want to be part of it? Join us!

Criteo R&D Blog

Tech stories from the R&D team
