Optimizing Kafka for the cloud
By Ambud Sharma | Software Engineer, Logging team
- Developing locality aware systems and balancing algorithms can help substantially reduce cost
- Making Kafka Producer and Consumer rack aware helps efficiently route traffic
- Data-driven engineering decision making is critical to continuously improve systems
One of the fundamental principles when operating in the cloud is to ensure applications safeguard against outages. A common way to achieve this is to spread an application’s deployment across multiple failure domains. In a public cloud environment, Availability Zone (AZ) can serve as a failure domain. We can use multiple AZs to provide fault tolerance for an application.
Distributed systems like HDFS are traditionally made rack aware to improve fault tolerance by spreading replicas across multiple racks within a datacenter. However, using AZs as rack information is a common practice when running in cloud environments. This enables spreading data copies across multiple AZs, thereby providing fault tolerance in case one fails. While replicating data across AZs provides fault tolerance, it does come at a premium in form of AZ transfer costs.
At Pinterest, we use Kafka extensively as a scalable, fault tolerant distributed message bus to power several key services like user action counting and change data capture (CDC). Since we have Kafka running at a very large scale, we need to be mindful of AZ transfer costs and run as efficiently as possible, and so we focused on reducing the amount of data transferred across AZ.
When a Kafka cluster has brokers spread across multiple AZs, it results in three types of cross AZ network traffic:
- Inter-broker replication traffic
- Traffic from Producers in different AZs
- Traffic from Consumers in different AZs
Out of the above traffic types we need 1 for fault tolerance. However, 2 and 3 are unwanted side effects that cause additional cost which, in theory, can be eliminated.
There are two potential ways to solve this problem.
We can make our Producers and Consumers write/read data only for partitions whose leaders share the same AZ to make them more cost efficient.
Alternatively we could deploy AZ specific Kafka clusters, but in order to achieve this any other real-time consumers would need to make their business logic AZ aware.
In the interest of simplicity, we chose to go with Approach 1 since it minimized code and stack changes. Producer / Consumer AZ awareness can be achieved by looking up the rack information for the leader broker of the partition we are trying to read/write to, and change the partitioning logic for producers and assignments for consumer.
In Kafka, the broker’s rack information is part of the PartitionInfo metadata object that is shared with Kafka clients (both consumers and producers). Therefore, we deployed rack awareness to our Kafka clusters, where each broker publishes the AZ it’s in as node rack info.
We started this initiative with our biggest producer and consumer applications for Kafka, logging agent and S3 transporter.
Producer AZ Awareness
Our logging agent is responsible for reading data from log files and shipping them to Kafka in microbatches. The agent also lets users configure how logs are partitioned across a topic’s partitions.
One key design of our logging agent is the ability to pre-partition the data before calling Kafka’s producer.send(). This allows us to add more advanced routing. To make it AZ aware, we added ability for the logging agent to look up AZ info for the node it’s running on using the EC2 Metadata API. Next, we enhanced our partitioner to leverage rack information in Kafka’s producer metadata to limit writes to only partitions for which leaders are in the same AZ as the logging agent. (This change was only applicable to topics that didn’t use key based partitioning since ordering couldn’t be guaranteed after AZ awareness change, as a partition switches AZ.)
Consumer AZ Awareness
S3 transporter is responsible for reading logs from Kafka and persisting them to S3 for further processing. We tried something similar to the producer for S3 transporter.
Our S3 transporter doesn’t use Kafka consumer assignments. Rather, it uses its own partition assignment system. This allows us to preserve locality in case of node restarts or has temporary network isolation, thus reducing the amount of data that needs to be replayed for a given batch.
We make each S3 transporter worker look up and publish its AZ info to Zookeeper, which helps the S3 transporter master assign Kafka partitions to the workers based on their rack (AZ). If we are unable to look up the (rack) AZ information of a partition, we degrade to assignment across all available workers.
We rolled out AZ aware S3 transporter to production, which resulted in more than 25% savings in AZ transfer cost for Logging. We’re currently in the process of slowly rolling out AZ aware logging agent to further reduce our AZ transfer costs.
We’re also working to extend this design to standard Kafka Producers and Consumers to help us extend our savings to other applications as well, which may include KIP-392 once it’s implemented.
Acknowledgements: Huge thanks to Eric Lopez, Henry Cai, Heng Zhang, Ping-Min Lin, Vahid Hashemian, Yi Yin, Yinian Qi and Yu Yang who helped with improving Logging at Pinterest. Also, thanks to Kalim Moghul for helping us with this effort.