MirrorMaker Performance Tuning
Tuning Kafka for Cross Data Center Replication
Apache Kafka Architecture at Salesforce
Today, Salesforce manages data centers across the globe, delivering customer success with the most trusted, reliable and resilient infrastructure available. Each data center has one or more Kafka clusters, each serving producers in a logical subdivision of the data center. The data in each of these leaf Kafka clusters is mirrored to a Kafka cluster in an aggregate data center, where data consumers can read from the local Kafka cluster.
The primary use cases are transporting metric and log data, and these two cases have had significant influence over the system architecture. Overall, throughput is valued more than end-to-end latency due to the non-real-time nature of the data and its high volume. Metrics, for instance, are generally produced once per minute, so a 10–30 second delay in a portion of them affects no downstream systems. Logs are similar, but the tolerance for delay is on the order of minutes rather than seconds, as they're used for archiving and later analysis.
As for data volume, metrics and logs behave differently. Metrics are small, on the order of 100 bytes, but frequent and steady throughout the day. Each leaf cluster receives ~1MB of metrics per second. Logs, on the other hand, are large, roughly 1–100kB per log record, and have a high volume during business hours and lower volume outside them. Each leaf cluster ingests ~10–40MB/s during peak and ~5–20MB/s off peak, depending on the cluster.
While we optimized for throughput rather than latency, latency is still the primary basis of our service's SLA. The reason is that while no one notices a few seconds of delay, people do notice when their metrics are 15 minutes late. When the service is under heavy load, inadequate throughput causes high latency as messages build up in the leaf cluster. Optimizing MirrorMaker throughput minimizes these high-latency-spike events and provides a consistently low average latency.
Performance Improvements
Increase throughput over high latency
With our MirrorMaker hosts producing over a high latency (~30–100ms) connection, data volume growth quickly revealed MirrorMaker to be the throughput bottleneck for the service. We experienced three main issues affecting throughput: inadequate compression, under-filled batches, and low parallelism. We made changes to address each. The effects on a low latency connection would also be positive, but they are more pronounced on a high latency connection.
Enable compression
Smaller messages are faster to send than bigger messages, so making messages smaller will, in general, make them send faster. It also saves bandwidth, which is finite. It is important to note that compression is more effective with larger messages than with smaller ones, as there is more opportunity to find repeated chunks, so enabling compression combines well with ensuring message batches are full.
We changed compression.type from the default of none to gzip. The effect was a substantial decrease in the number of bytes transmitted from MirrorMaker to the aggregate cluster. It also reduced the amount of disk space needed to store the messages, as Kafka stores them in their original compressed batches. This allowed us to increase retention for some of the higher volume topics.
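To illustrate why compression pairs well with full batches, here's a minimal sketch using Python's gzip module. The metric payload is hypothetical (the field names and sizes are made up), but the effect holds for any repetitive data:

```python
import gzip

# Hypothetical ~100-byte metric record; real payloads vary.
record = b'{"host":"app-042.dc1","metric":"cpu.user","value":12.7,"ts":1500000000}'

single = gzip.compress(record)        # one record: gzip overhead can even make it bigger
batch = gzip.compress(record * 1000)  # a full batch of similar records shrinks dramatically

print("single ratio:", len(single) / len(record))
print("batch ratio:", len(batch) / (len(record) * 1000))
```

Running this shows the batch achieving a far better compression ratio than the lone record, since gzip can exploit the repetition across records.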
Fill message batches
Compressed data has a higher throughput than uncompressed data. The Kafka producer collects messages into a batch, compresses the batch, then sends it to a broker. If a batch gets too old before it fills, the producer sends it partially full. These smaller batches don't compress as efficiently, and a larger number of batches must be transmitted for the same total volume of data. The Kafka producer has a setting called linger.ms which allows a batch to stay open longer, giving data more time to fill it. This can result in larger batches, which can increase throughput.
Both metrics and logs were configured to send batches, however metrics volume was low enough that each batch only contained 1–2000 messages (~100–100K bytes). Part of the analysis of this data involved setting linger.ms to 60000 and setting up a consumer that sampled all the data over a period across all the topics. It measured how long each batch took to fill and output a histogram of the fill times. Metrics took longer than logs to fill a batch of the same size because metric messages are smaller and less frequent.

From the data, 95% of metric batches would be filled with linger.ms set to 30000; with the same value, 99% of log batches would be filled. Since metric volume is relatively small compared to log volume and is more latency sensitive, we changed linger.ms to a more conservative 15000 for metrics and went with the full 30000 for logs. This change increased the base latency when load is lighter, but had a positive effect on batch sizes and consequently on throughput.
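The percentile analysis above can be sketched roughly as follows. The helper and the fill-time samples are hypothetical illustrations, not our actual tooling:

```python
# Given observed batch fill times (ms), find the smallest linger.ms
# that would let the target percentile of batches fill completely.
def linger_for_percentile(fill_times_ms, percentile):
    ordered = sorted(fill_times_ms)
    # index of the first sample at or above the target percentile
    idx = min(len(ordered) - 1, int(len(ordered) * percentile / 100))
    return ordered[idx]

# Synthetic fill times sampled from a hypothetical topic
samples = [1200, 3400, 8000, 15000, 21000, 26000, 29500, 31000, 45000, 60000]

print(linger_for_percentile(samples, 95))  # covers 95% of batches
print(linger_for_percentile(samples, 50))  # covers half of them
```

In practice you would feed in the full histogram of measured fill times per topic and trade the chosen percentile off against the added base latency.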
Increase parallelism
MirrorMaker in version 0.8 had multiple consumer and producer threads, the number of which could be configured independently. Being able to configure the number of producer threads was valuable over a high latency connection because the sending threads can wait out the round trip time in parallel rather than in series. So for this Kafka version we had 1 MirrorMaker process per MirrorMaker host.
However, with Kafka 0.9, MirrorMaker changed to having a single producer while the number of consumers remained configurable. This design works better if MirrorMaker is situated in the same data center as the aggregate Kafka cluster: it consumes using many threads over the high latency connection and produces over the low latency connection. However, our architecture operates under the security constraint that network connections can only be initiated from leaf data centers into the aggregate data centers, not the other way around, so MirrorMaker must remain in the leaf data centers and produce over the high latency connection.
So in order to increase the number of producers, we automated starting multiple MirrorMaker processes per MirrorMaker host. We found maximum throughput when the total number of processes equaled the maximum number of partitions across all the replicated topics. For example, if the biggest topic has 20 partitions, then start 10 MirrorMaker processes on each of 2 MirrorMaker hosts; likewise, if there are 4 MirrorMaker hosts, start 5 processes on each. Adding more processes than the number of partitions in a topic results in idle producers, as a partition is only assigned to one consumer. We also configured MirrorMaker to create only one consumer thread per process, so only one partition per topic would be assigned to each process. This allowed us to evenly smear a topic's partitions across all the MirrorMaker processes and, consequently, the producers. Following the previous example, if the largest topic had 20 partitions, there would be 20 MirrorMaker processes, each with 1 consumer and 1 producer.
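The sizing rule above can be sketched as a small helper (hypothetical, for illustration only):

```python
# Run as many MirrorMaker processes (one consumer + one producer each)
# as the largest topic has partitions, spread evenly across the hosts.
def processes_per_host(max_partitions, num_hosts):
    # More processes than partitions would leave producers idle,
    # since each partition is assigned to exactly one consumer.
    return max_partitions // num_hosts

print(processes_per_host(20, 2))  # 10 processes on each of 2 hosts
print(processes_per_host(20, 4))  # 5 processes on each of 4 hosts
```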
Batch size should be smaller than max message size
Initially we increased the MirrorMaker producer batch.size from the default of 16384 to 1000000 bytes, which is the same size as the default topic max.message.bytes. The problem with this approach is that Kafka fills a batch based not on the total size of the uncompressed messages, but on their estimated compressed size. When the data compresses less well than the estimate predicts, the compressed batch can end up larger than the topic's max.message.bytes, and the broker rejects the batch, logging a kafka.common.MessageSizeTooLargeException. MirrorMaker will then either drop the messages or terminate, depending on how it is configured.
The first part of the solution was to ensure the MirrorMaker producer's max.request.size = topic max.message.bytes, and likewise producer max.request.size = broker message.max.bytes. This ensures that regular (non-batched) messages read from the leaf Kafka cluster can be successfully sent to the aggregate Kafka cluster.
The second part of the solution is to ensure the producer's batch.size < "expected compression ratio" * max.request.size. The Kafka producer dynamically updates its expected compression ratio based on the compression ratio of past messages. So to guard against a compressed batch being larger than max.message.bytes, choose a batch.size that is less than max.message.bytes * "maximum compression ratio". A really conservative value for that ratio is 20:1, or 0.05. It depends on the input messages, but a theoretical maximum could be 1000:1, at which point you should doubt the value of the data being sent, as it is almost entirely redundant. The Kafka producer reports the average compression ratio, which can be a good starting point, but a full histogram over all the batches would be the most accurate. In our case we used batch.size = 0.05 * max.request.size = 50000, since we're mirroring a variety of topics and don't have direct control over the message content.
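The arithmetic behind the chosen value can be sketched as follows (the helper is ours, not a Kafka API):

```python
# Guard against compressed batches exceeding the broker's limit:
# pick batch.size below max.request.size scaled by a conservative
# worst-case compression ratio (20:1, expressed as 0.05).
def safe_batch_size(max_request_size, worst_case_ratio=0.05):
    return int(max_request_size * worst_case_ratio)

print(safe_batch_size(1000000))  # 50000, the value we settled on
```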
Enough buffer memory for all the partitions
The Kafka producer collects messages into batches in a single shared block of memory. The block is divided into chunks that are batch.size bytes each. As messages are sent to the producer, they're added to a new batch, or to an existing one if there is room. If the number of partitions on the aggregate cluster side is much greater than the number of batches that can fit in the buffer, the call to producer.send() will block until a chunk becomes available. This can have a significant negative impact on throughput.
The solution was to set the producer buffer.memory = "total aggregate cluster partitions" * batch.size. This causes the MirrorMaker process to allocate a large static chunk of memory, but the consumer threads never block waiting for space in the producer's buffer and can consume data as fast as it becomes available. The producer can then compress and send full batches, which maximizes overall throughput.
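The sizing rule can be sketched as follows; the partition count is a hypothetical figure chosen to be consistent with the buffer.memory value in the summary below:

```python
# Size the producer buffer so every destination partition can hold
# a full batch without blocking producer.send().
def required_buffer_memory(total_partitions, batch_size):
    return total_partitions * batch_size

# e.g. 40000 aggregate-side partitions (illustrative) with
# 50000-byte batches gives a 2 GB buffer.memory
print(required_buffer_memory(40000, 50000))
```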
Summary
Together, the performance affecting configuration values applied to the MirrorMaker producer are:
batch.size = 50000
buffer.memory = 2000000000
compression.type = gzip
linger.ms = 15000
max.request.size = 1000000
To increase producer parallelism, launch multiple MirrorMaker processes. The default for the num.streams command line argument is 1, so it doesn't require modification.
MirrorMaker is designed for the pull model, though with some tuning, it can be made to perform well under the push model.