MirrorMaker Performance Tuning

Tuning Kafka for Cross Data Center Replication

Morgan Galpin
Salesforce Engineering
Sep 26, 2017



Apache Kafka Architecture at Salesforce

Today, Salesforce manages data centers across the globe, delivering customer success with the most trusted, reliable and resilient infrastructure available. Each data center has one or more Kafka clusters, each for servicing producers in a logical subdivision of the data center. The data in each of these leaf Kafka clusters is mirrored to a Kafka cluster in an aggregate data center. Data consumers in an aggregate data center can read from the local Kafka cluster.

Figure 1: Salesforce push model replication.

The primary use cases supported are the transport of metric and log data. These two cases have had significant influence over the system architecture. Overall, throughput is valued more than end-to-end latency because of the non-real-time nature of the data and its high volume. Metrics, for instance, are generally produced once per minute, so a 10–30 second delay in a portion of them would not affect any downstream systems. Logs are similar, but the tolerance for delay is on the order of minutes rather than seconds, since they’re used for archiving and later analysis.

As for data volume, metrics and logs behave differently. Metrics are small, on the order of 100 bytes, but frequent and steady throughout the day. Each leaf cluster receives ~1MB of metrics per second. Logs, on the other hand, are large, roughly 1kB–100kB per log record, and their volume is high during business hours and lower outside them. Each leaf cluster ingests ~10–40MB/s during peak and ~5–20MB/s off peak, depending on the cluster.

While we optimized for throughput rather than latency, latency is still the primary basis of our service’s SLA: no one notices a few seconds of delay, but people do notice when their metrics are 15 minutes late. When the service is under heavy load, inadequate throughput causes high latency as messages build up in the leaf cluster. Optimizing MirrorMaker throughput minimizes these high-latency spikes and keeps the average latency consistently low.

Performance Improvements

Increase throughput over high latency

With our MirrorMaker hosts producing over a high latency (~30–100ms) connection, data volume growth quickly revealed MirrorMaker to be the throughput bottleneck for the service. We experienced three main issues affecting throughput: inadequate compression, under-filled batches, and low parallelism. The changes below would also help over a low latency connection, but their effects are more pronounced over a high latency one.

Enable compression

Smaller messages are faster to send than bigger ones, so making messages smaller will, in general, make them send faster. It also saves bandwidth, which is finite. It is important to note that compression is more effective with larger messages than with smaller ones, since there is more opportunity to find repeated chunks, so enabling compression combines well with ensuring message batches are full.

We changed compression.type from the default of none to gzip. The effect was a substantial decrease in the number of bytes transmitted from MirrorMaker to the aggregate cluster. It also reduced the amount of disk space needed to store the messages as Kafka stores them in their original compressed batches. This allowed us to increase retention for some of the higher volume topics.
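As a concrete sketch of where this setting lives: MirrorMaker reads its producer settings from the file passed to its --producer.config flag, and that file can be generated from java.util.Properties. The broker address below is a placeholder.

import java.io.FileOutputStream;
import java.util.Properties;

// Minimal sketch: generate the producer config file handed to MirrorMaker.
// The bootstrap address is a placeholder; the only change shown here is
// switching compression.type from its default of "none" to "gzip".
public class WriteMirrorMakerProducerConfig {
    public static void main(String[] args) throws Exception {
        Properties producer = new Properties();
        producer.put("bootstrap.servers", "aggregate-kafka:9092"); // placeholder address
        producer.put("compression.type", "gzip");
        try (FileOutputStream out = new FileOutputStream("producer.properties")) {
            producer.store(out, "MirrorMaker producer config");
        }
    }
}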

Fill message batches

Sending compressed data yields higher throughput than sending uncompressed data. The Kafka producer collects messages into a batch, compresses the batch, then sends it to a broker. If a batch gets too old before it fills up, the producer sends it anyway. These smaller batches don’t compress as efficiently, and a larger number of batches must be transmitted for the same total volume of data. The producer setting linger.ms keeps batches open longer, allowing more time for data to fill them. This can result in larger batches, which can increase throughput.

Both metrics and logs were configured to send batches; however, metric volume was low enough that each batch contained only 1–2000 messages (~100–100K bytes). Part of the analysis of this data involved setting linger.ms to 60000 and setting up a consumer that sampled all the data across all the topics over a period of time. It measured how long it took to fill each batch and output a histogram of the fill times. Metric batches of a given size took longer to fill than log batches because metric messages are smaller and less frequent.
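A rough sketch of that sampling consumer, written here against the current Java consumer API rather than the 0.9-era client we used at the time; the broker address, topic names, target batch size, and histogram buckets are all illustrative.

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch: estimate how long it takes to accumulate one producer batch worth of
// data per topic. In practice you would run this for a sampling window and then
// dump the histogram.
public class BatchFillTimer {
    static final long TARGET_BATCH_BYTES = 16384;             // producer batch.size to simulate
    static final long[] BUCKET_MS = {1000, 5000, 15000, 30000, 60000};

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "leaf-kafka:9092");     // placeholder address
        props.put("group.id", "batch-fill-sampler");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Map<String, Long> bytesSoFar = new HashMap<>();        // topic -> bytes in the current "batch"
        Map<String, Long> batchStart = new HashMap<>();        // topic -> wall clock of first record
        Map<String, long[]> histogram = new HashMap<>();       // topic -> counts per fill-time bucket

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("metrics", "logs")); // placeholder topics
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                long now = System.currentTimeMillis();
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    batchStart.putIfAbsent(r.topic(), now);
                    long total = bytesSoFar.merge(r.topic(), (long) r.serializedValueSize(), Long::sum);
                    if (total >= TARGET_BATCH_BYTES) {
                        // One "batch" worth of bytes has arrived; record how long it took.
                        long fillMs = now - batchStart.remove(r.topic());
                        bytesSoFar.put(r.topic(), 0L);
                        long[] counts = histogram.computeIfAbsent(r.topic(), t -> new long[BUCKET_MS.length + 1]);
                        counts[bucketFor(fillMs)]++;
                    }
                }
            }
        }
    }

    static int bucketFor(long fillMs) {
        for (int i = 0; i < BUCKET_MS.length; i++) {
            if (fillMs <= BUCKET_MS[i]) return i;
        }
        return BUCKET_MS.length;
    }
}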

Figure 2: Distribution of time required to fill batches of metrics and logs.

From the data, 95% of metric batches would be filled with linger.ms set to 30000, and 99% of log batches would be filled with the same value. Since metric volume is relatively small compared to log volume and metrics are more latency sensitive, we chose a more conservative 15000 for metrics and went with the full 30000 for logs. This change increased the base latency under lighter load, but had a positive effect on batch sizes and, consequently, on throughput.

Figure 3: The drop in network usage for metrics when linger.ms was applied. Note that compression was already enabled.

Increase parallelism

MirrorMaker in version 0.8 had multiple consumer and producer threads, the number of which could be configured independently. Being able to configure the number of producer threads was valuable over a high latency connection because the sending threads can wait out the round trip time in parallel rather than in series. So for this Kafka version we had 1 MirrorMaker process per MirrorMaker host.

However, with Kafka 0.9, MirrorMaker changed to having a single producer, while the number of consumers remained configurable. This design works better if MirrorMaker sits in the same data center as the aggregate Kafka cluster: it consumes using many threads over the high latency connection and produces over the low latency connection. Our architecture, however, operates under the security constraint that network connections can only be initiated from leaf data centers into the aggregate data centers and not the other way around, so MirrorMaker has to run in the leaf data center and push over the high latency connection with its single producer.

So in order to increase the number of producers, we automated starting multiple MirrorMaker processes per MirrorMaker host. We found maximum throughput when the total number of processes equaled the maximum number of partitions across all the replicated topics. For example, if the biggest topic has 20 partitions, then start 10 MirrorMaker processes on each of 2 MirrorMaker hosts; likewise, if there are 4 MirrorMaker hosts, start 5 processes on each. Adding more processes than the number of partitions in a topic results in idle producers, since a partition is only assigned to one consumer. To achieve this, we configured MirrorMaker to create only one consumer thread per process, so only one partition per topic would be assigned to each process. This allowed us to spread a topic’s partitions evenly across all the MirrorMaker processes and, consequently, the producers. Following the previous example, if the largest topic had 20 partitions, there would be 20 MirrorMaker processes, each with 1 consumer and 1 producer.
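To make the sizing arithmetic concrete, here is a small sketch using the same illustrative numbers (a 20-partition largest topic and 4 MirrorMaker hosts):

// Sketch: size the MirrorMaker fleet so the total process count equals the
// partition count of the largest replicated topic. Numbers are illustrative.
public class MirrorMakerSizing {
    public static void main(String[] args) {
        int maxPartitions = 20;       // largest topic across all replicated topics
        int mirrorMakerHosts = 4;     // hosts available in the leaf data center

        // With one consumer thread (num.streams=1) and one producer per process,
        // totalProcesses == maxPartitions keeps every producer busy without
        // leaving idle processes.
        int totalProcesses = maxPartitions;
        int processesPerHost = (totalProcesses + mirrorMakerHosts - 1) / mirrorMakerHosts; // ceiling division

        System.out.printf("Run %d MirrorMaker processes on each of %d hosts (%d total)%n",
                processesPerHost, mirrorMakerHosts, totalProcesses);
    }
}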

Batch size should be smaller than max message size

Initially we increased the MirrorMaker producer batch.size from the default of 16384 to 1000000 bytes, which matches the default topic max.message.bytes. The problem with this approach is that Kafka fills a batch not based on the total size of the uncompressed messages, but on their estimated compressed size. When the data compresses less well than the producer’s estimate, the compressed batch can end up larger than the topic max.message.bytes, and the broker rejects the batch, logging a kafka.common.MessageSizeTooLargeException. MirrorMaker will either drop the message or terminate, depending on how it is configured.

The first part of the solution was to ensure the MirrorMaker producer’s max.request.size equals the topic max.message.bytes and the broker message.max.bytes. This ensures that regular (non-batched) messages read from the leaf Kafka cluster can be successfully sent to the aggregate Kafka cluster.

The second part of the solution is to ensure the producer’s batch.size < “expected compression ratio” * max.request.size. The Kafka producer dynamically updates its expected compression ratio based on the compression ratio of past messages. To guard against a compressed batch being larger than max.message.bytes, choose a batch.size that is less than max.message.bytes * “maximum compression ratio”. A really conservative value for that ratio would be 20:1, or 0.05. It depends on the input messages, but a theoretical maximum could be 1000:1, at which point you should doubt the value of the data being sent, as it is almost entirely redundant. The Kafka producer reports the average compression ratio, which is a good starting point, but a full histogram over all the batches would be the most accurate. In our case, we used batch.size = 0.05 * max.message.bytes = 50000, since we’re mirroring a variety of topics and don’t have direct control over the message content.
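A worked sketch of that sizing rule, using the 1000000-byte message limit and the conservative 20:1 (0.05) ratio described above:

// Sketch: derive a batch.size that stays under the broker/topic message size
// limit even if the producer's compression estimate is far too optimistic.
public class BatchSizeSizing {
    public static void main(String[] args) {
        long maxMessageBytes = 1_000_000;        // topic max.message.bytes / broker message.max.bytes
        double maxCompressionRatio = 0.05;       // conservative 20:1, expressed as compressed/uncompressed

        // Worst case: the producer packs batch.size / maxCompressionRatio raw bytes
        // into a batch expecting them to shrink, but they barely compress at all.
        long safeBatchSize = (long) (maxMessageBytes * maxCompressionRatio);

        System.out.println("max.request.size = " + maxMessageBytes);  // 1000000
        System.out.println("batch.size       = " + safeBatchSize);    // 50000
    }
}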

Enough buffer memory for all the partitions

The Kafka producer collects messages into batches in a single shared block of memory. The block is divided into chunks of batch.size bytes each. As messages are handed to the producer, they’re added to the open batch for the destination partition, or to a new one if none exists. If the number of partitions on the aggregate cluster side is much greater than the number of batches that can fit in the buffer, the call to producer.send() will block until a chunk becomes available. This can have a significant negative impact on throughput.

Figure 4: Consumer threads share a fixed buffer and write to batches in assigned chunks. The producer’s sender thread reads completed batches and frees the chunk in the buffer. The shared buffer is full of in-progress batches so consumer thread #3 blocks waiting for a chunk to become available. Messages from topic 3 become delayed.

The solution was to set the producer buffer.memory = “total aggregate cluster partitions” * batch.size. This causes the MirrorMaker process to allocate a large static block of memory, but the consumer threads no longer block waiting for space in the producer’s buffer and are able to consume data as fast as it becomes available. The producer is able to compress and send full batches, which maximizes overall throughput.

Figure 5: There is enough space in the buffer for one batch per partition so no consumer threads block.
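One way to apply the buffer sizing rule is to count the aggregate cluster’s partitions at startup; the sketch below does this with the Java AdminClient (which postdates the 0.9-era clients discussed here), using a placeholder address and the 50000-byte batch.size from the previous section.

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

// Sketch: size the producer's buffer.memory so every partition on the aggregate
// cluster can have a full batch in flight at once.
public class BufferMemorySizing {
    public static void main(String[] args) throws Exception {
        Properties admin = new Properties();
        admin.put("bootstrap.servers", "aggregate-kafka:9092");   // placeholder address

        long batchSize = 50_000;                                   // producer batch.size
        long totalPartitions = 0;

        try (AdminClient client = AdminClient.create(admin)) {
            Set<String> topics = client.listTopics().names().get();
            Map<String, TopicDescription> descriptions = client.describeTopics(topics).all().get();
            for (TopicDescription d : descriptions.values()) {
                totalPartitions += d.partitions().size();          // sum partitions across topics
            }
        }

        long bufferMemory = totalPartitions * batchSize;           // one batch slot per partition
        System.out.println("buffer.memory = " + bufferMemory);
    }
}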

Summary

Together, the performance affecting configuration values applied to the MirrorMaker producer are:

compression.type=gzip
linger.ms=15000 for metrics, 30000 for logs
batch.size=50000
max.request.size=1000000 (matching the topic max.message.bytes and broker message.max.bytes)
buffer.memory = “total aggregate cluster partitions” * batch.size

To increase producer parallelism, launch multiple MirrorMaker processes. The default for the num.streams command line argument is 1, so it doesn’t require modification.

MirrorMaker is designed for the pull model, though with some tuning, it can be made to perform well under the push model.
