Shallow Mirror

Pinterest Engineering Blog · May 10, 2021

Enhancement to Kafka MirrorMaker to reduce CPU/memory pressure

Henry Cai | Software Engineer, Data Engineering

Pinterest uses Kafka as the backbone for data transportation. As part of our infrastructure, Kafka MirrorMaker v1 is used to replicate traffic among different Kafka clusters spread across multiple regions. Kafka MirrorMaker is a powerful tool for replicating and aggregating data between Kafka clusters. Figure-1 shows a typical setup that uses MirrorMaker to replicate traffic between two or more regions, and Figure-2 shows how one MirrorMaker v1 cluster works in a multi-node setup.

While Kafka MirrorMaker satisfied our initial needs, we started seeing several scalability problems.

OOMs caused by traffic increases

During traffic peaks, we often saw CPU spikes and frequent out-of-memory (OOM) errors. Further diagnosis showed that most of the CPU time was spent on message decompression and recompression, and that memory usage was often 2–10 times larger than the number of bytes we were fetching over the network, which explained part of the root cause of the OOMs.

Upon further investigation into the internals of Kafka MirrorMaker, we confirmed that, besides decompression, data (buffer) duplication in multiple places inside the process was also contributing to the increased memory usage. Figure-3 shows the internals of the Kafka MirrorMaker process, broken down by the various stages where data is copied.

There are so many stages of message processing because Kafka MirrorMaker was designed to allow messages to be inspected and transformed by custom plugins before being sent out, so the messages need to be uncompressed and transformed from raw network bytes into Java ConsumerRecord objects. In Pinterest's deployment, however, we merely used the MirrorMaker as a replication engine to transfer bytes between Kafka clusters. The source and destination clusters usually had similar setups, and the messages in the source cluster were already properly compressed and batched.
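To see where the CPU and memory go, consider what a plain consume-then-produce loop looks like with the standard clients. This is a minimal sketch of the pattern (not Pinterest's MirrorMaker code, which has its own handler and threading machinery); the broker addresses, topic name, and group id are placeholders.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NaiveMirrorLoop {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "source-kafka:9092");
        consumerProps.put("group.id", "mirror-demo");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "target-kafka:9092");
        producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("demo-topic"));
            while (true) {
                // poll() decompresses every batch and materializes each record as a
                // ConsumerRecord object ...
                for (ConsumerRecord<byte[], byte[]> record :
                        consumer.poll(Duration.ofMillis(500))) {
                    // ... and send() re-batches and re-compresses them on the way out,
                    // even though the payload is only being passed through.
                    producer.send(new ProducerRecord<>(record.topic(), record.key(),
                            record.value()));
                }
            }
        }
    }
}
```

Every byte fetched from the source cluster is decompressed, copied into per-record objects, and then compressed again, even though nothing in the loop ever looks at the payload.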

The first thought that came to mind was to do a direct copy of raw bytes from the receiver's socket buffer into the sender's socket buffer, since we only cared about message replication. However, we quickly realized that the response packets need to be re-assembled for the payloads to be readable. Figure-5 shows a typical Kafka FetchResponse message on the wire (the message received by the consumer on the MirrorMaker node):

As Figure-5 shows, data from different topics and partitions is usually packaged together in the same network message if those topics/partitions reside on the same source node. Similarly, the Kafka ProduceRequest message sent from the MirrorMaker to the target Kafka cluster packages multiple topic/partitions' data together if those partitions reside on the same target node. The partition-to-node mapping is not guaranteed to be the same between the source cluster and the target cluster, so we knew we needed to break up and repackage the bytes. But could we skip the decompression/recompression, which was the most CPU/memory intensive operation?
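To make the repackaging step concrete before returning to the compression question, here is a minimal sketch of the idea: per-partition payloads pulled from one source broker have to be re-grouped by each partition's leader in the target cluster. The `regroup` helper and its shape are illustrative only (in practice the producer's networking layer does this grouping); `Cluster.leaderFor` is the actual client call for the leader lookup.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

public class RepackageByTargetLeader {
    /**
     * Re-group raw per-partition payloads (e.g. the bytes pulled out of a FetchResponse
     * from one source broker) by the node that leads each partition in the *target*
     * cluster, since the partition-to-node mapping differs between the two clusters.
     */
    static Map<Node, Map<TopicPartition, ByteBuffer>> regroup(
            Map<TopicPartition, ByteBuffer> fetchedPayloads, Cluster targetCluster) {
        Map<Node, Map<TopicPartition, ByteBuffer>> byTargetNode = new HashMap<>();
        for (Map.Entry<TopicPartition, ByteBuffer> entry : fetchedPayloads.entrySet()) {
            Node targetLeader = targetCluster.leaderFor(entry.getKey());
            byTargetNode.computeIfAbsent(targetLeader, n -> new HashMap<>())
                        .put(entry.getKey(), entry.getValue());
        }
        return byTargetNode;
    }
}
```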

Figure-6: MemoryRecords Data Structure

From the MemoryRecords data structure for each partition (Figure-6), we could see that the messages inside a partition are organized as a sequence of RecordBatches; the records inside each batch are compressed, but the batch header is not. Therefore, if we could pass each RecordBatch through as-is, without going deep inside the batch, we could skip the compression stage.
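The two iteration styles map directly onto Kafka's internal MemoryRecords API (these classes live in org.apache.kafka.common.record and are not public API; the sketch below is only meant to illustrate the difference):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;

public class ShallowVsDeepIteration {
    static void inspect(ByteBuffer partitionData) {
        MemoryRecords records = MemoryRecords.readableRecords(partitionData.duplicate());

        // Deep iteration: records() walks into every batch, which forces decompression
        // of the compressed record payloads.
        for (Record record : records.records()) {
            long offset = record.offset(); // each record is materialized individually
        }

        // Shallow iteration: batches() only reads the uncompressed batch headers and
        // hands back each RecordBatch as an opaque unit, so the compressed payload is
        // never touched.
        for (MutableRecordBatch batch : records.batches()) {
            long baseOffset = batch.baseOffset();
            int sizeInBytes = batch.sizeInBytes();
            // the batch bytes can be forwarded as-is
        }
    }
}
```

Skipping the deep iterator is exactly what eliminates the decompression/recompression cost shown in Figure-3.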

This led us to design an improved Kafka MirrorMaker, called Shallow Mirror, with the following logic (Figure-7):

  1. We shallowly iterate the RecordBatches inside the MemoryRecords structure instead of deeply iterating the records inside each RecordBatch.
  2. We shallowly copy (share) the pointers inside the ByteBuffer for each message instead of deep-copying and deserializing the bytes into objects.

Instead of writing a new mirroring product, we decided to go deep into the Kafka producer/consumer library and introduce a raw bytes mode. When raw bytes mode is turned on, the consumer returns the raw bytes corresponding to each RecordBatch to the application client, and the application client passes those raw RecordBatch bytes to the producer to send to the destination; otherwise, the clients behave as before. We chose to enhance the consumer/producer API so that this feature can be adopted by other use cases (e.g. MirrorMaker v2). We also proposed our Shallow Mirror enhancement upstream in Kafka KIP-712.
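At the application level, a shallow mirror loop looks almost identical to the naive loop shown earlier; the difference is that each "record" handed to the client is the raw bytes of an entire RecordBatch. The sketch below is illustrative only: the two configuration keys are hypothetical placeholders, not necessarily the exact names proposed in KIP-712, and the surrounding loop is not the actual MirrorMaker code.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ShallowMirrorLoopSketch {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "source-kafka:9092");
        consumerProps.put("group.id", "shallow-mirror-demo");
        consumerProps.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Hypothetical switch: hand back whole RecordBatches as raw bytes instead of
        // decompressed, individual records.
        consumerProps.put("fetch.raw.bytes", "true");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "target-kafka:9092");
        producerProps.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Hypothetical switch: each value is already a complete, compressed RecordBatch
        // that should be forwarded without re-batching or re-compressing.
        producerProps.put("produce.raw.bytes", "true");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("demo-topic"));
            while (true) {
                // With raw-bytes mode on, each ConsumerRecord value would be the bytes
                // of one RecordBatch rather than one decompressed record.
                for (ConsumerRecord<byte[], byte[]> batch :
                        consumer.poll(Duration.ofMillis(500))) {
                    producer.send(new ProducerRecord<>(batch.topic(), batch.partition(),
                            batch.key(), batch.value()));
                }
            }
        }
    }
}
```

The savings come entirely from what poll() and send() no longer do internally; the application loop itself barely changes.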

In late 2020, we deployed Shallow Mirror in production and saw significant scaling and performance improvements from the savings on CPU, memory, and GC. Figures 8 and 9 show the CPU/memory usage before and after Shallow Mirror was enabled.

Although the strategy sounded good in principle, we encountered several obstacles during implementation. The next section covers the lessons we learned while building Shallow Mirror.

Byte buffer gotchas

We were passing byte buffer pointers between the receiver and the sender to avoid deep copies, but we quickly realized that we still needed to modify the byte buffer. Figure-10 shows the header fields of the RecordBatch byte buffer. Guess which content needs to be changed?

Figure-10: RecordBatch data structure

Yes, the BaseOffset field. The BaseOffset in an incoming message represents the message's offset in the source Kafka cluster, but the message won't have the same offset in the target cluster. Once we started changing the byte buffer, we had to be careful regarding the buffer's read/write mode as well as its offset, position, limit, and mark.
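In the v2 message format, BaseOffset is the first (8-byte) field of the RecordBatch header, and the batch CRC only covers the data from the attributes field onward, which is what makes an in-place rewrite feasible without recomputing the checksum. Below is a minimal sketch of such a patch, assuming that layout; the helper is illustrative, not the actual Shallow Mirror code.

```java
import java.nio.ByteBuffer;

public class RewriteBaseOffset {
    /**
     * Patch the 8-byte BaseOffset at the start of a v2 RecordBatch header in place,
     * without touching the compressed payload that follows it.
     */
    static void rewriteBaseOffset(ByteBuffer batchBuffer, int batchStart, long newBaseOffset) {
        // Use the absolute-index putLong so the buffer's position, limit, and mark are
        // left untouched; since the buffer is shared rather than copied, any cursor
        // movement here would be visible to every other holder of the same bytes.
        batchBuffer.putLong(batchStart, newBaseOffset);
    }
}
```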

First Batch

After the product was deployed, we sometimes noticed that we got more messages than we asked for. Take a look at the following example:

When the source broker only sent messages A, B, and C, the consumer on the receiving end occasionally got several more messages before A, B, and C. We traced this problem back to how the messages are stored on the source broker side.

On the source broker side, messages are stored by RecordBatch. In this example, messages 1, 2, and 3 are in the first batch, and messages 4, 5, and 6 are in the second batch. When the consumer asks for messages starting from offset 3, the broker actually sends back all six messages in two batches (because Kafka brokers use the sendfile system call to avoid copying buffers into user space) and relies on the consumer to do the filtering. Based on this understanding, we knew we had to crop the first batch before returning it to the client.
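The exact cropping logic isn't spelled out above, but one straightforward reading is to drop, at batch granularity, any leading RecordBatches whose records all precede the requested fetch offset. A sketch under that assumption:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;

public class CropLeadingBatches {
    /**
     * Drop the RecordBatches whose records all precede the requested fetch offset.
     * Because the broker ships whole batches straight from disk (sendfile), the first
     * batch in a response may start before the offset that was actually asked for.
     * Sketch only; it assumes cropping at whole-batch granularity is sufficient.
     */
    static List<MutableRecordBatch> cropTo(MemoryRecords fetched, long fetchOffset) {
        List<MutableRecordBatch> kept = new ArrayList<>();
        for (MutableRecordBatch batch : fetched.batches()) {
            if (batch.lastOffset() < fetchOffset) {
                continue; // every record in this batch is older than what was requested
            }
            kept.add(batch);
        }
        return kept;
    }
}
```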

Small Batch

The initial prototype worked for the first topic. However, when we moved to the second Kafka topic, we noticed the outbound message throughput between MirrorMaker and target broker started to drop. What was special about the second Kafka topic? When we profiled the raw bytes for that topic, we noticed the batch size for that topic was very small (averaging a few hundred bytes).

Figure-13: Batch size for the small batch messages

When the batch size was small, we lost a lot of network efficiency (remember, network buffers and packets are usually organized in kilobyte-sized units). We tried to increase outbound parallelism by raising Kafka's max.in.flight.requests.per.connection config and by using multiple TCP connections, but that didn't help, since the target broker processes the requests sequentially to maintain message ordering.

However, we noticed that network performance was only a problem for the outbound flow, not for inbound traffic. By comparing the inbound FetchResponse message with the outbound ProduceRequest message, we noticed an asymmetry. Both inbound and outbound messages use the MemoryRecords data structure to capture the data within one topic/partition (see Figure-6 above), but the inbound MemoryRecords contained multiple RecordBatches, while the outbound one contained only one. We modified the producer library code to relax that constraint, and this solved the throughput problem for that use case.
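Because each RecordBatch header carries its own length, the raw bytes of several batches can simply be concatenated into a single MemoryRecords payload per partition. The sketch below illustrates that idea only; the real change lives inside the producer's (and, as the next section shows, the broker's) request handling.

```java
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.kafka.common.record.MemoryRecords;

public class CombineBatches {
    /**
     * Concatenate the raw bytes of several small RecordBatches into one MemoryRecords
     * payload for a partition, so a single outbound ProduceRequest can carry them all.
     */
    static MemoryRecords concatenate(List<ByteBuffer> rawBatches) {
        int total = rawBatches.stream().mapToInt(ByteBuffer::remaining).sum();
        ByteBuffer combined = ByteBuffer.allocate(total);
        for (ByteBuffer batch : rawBatches) {
            combined.put(batch.duplicate()); // duplicate() keeps the source cursor intact
        }
        combined.flip();
        return MemoryRecords.readableRecords(combined);
    }
}
```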

Message Conversion again … in broker

Everything seemed rosy for a while, until one of our SREs noticed message conversions on the brokers:

Figure-14: Message conversion stats chart

Figure-14 shows that message conversion was happening inside the broker. For brief context, message conversion is the process by which the Kafka broker translates messages from one format to another; it usually happens when the producer and the broker use different message formats or different compression algorithms. Message conversion is one of the key root causes of high CPU usage and poor throughput on Kafka brokers.

If you are still with us, you can see that we spent a lot of effort reducing CPU/memory usage on the MirrorMaker nodes by skipping unnecessary message transformations, and now the kink had simply moved further down the water pipe, to the broker. We inspected the broker code, realized it was reconstructing the whole batch whenever the input contained multiple RecordBatches, and fixed that as well.

Future work

We are hoping to contribute this work back to the Kafka community by submitting a Kafka KIP proposal: KIP-712. Please feel free to contribute to the discussion here or extend this work to support more use cases such as Kafka MirrorMaker v2.

Acknowledgement

We would like to thank the Pinterest Logging team (Ambud Sharma, Vahid Hashemian, et al.) and Apache Kafka community members for the brainstorming, thorough review, and guidance.
