How we embraced the new release of Apache Kafka

As introduced in our previous posts (link 1, link 2), many applications behind Walmart.com are powered by the highly scalable, distributed streaming platform Apache Kafka. Kafka keeps evolving quickly and has reached a new milestone: release 0.10. With this release, Kafka and its ecosystem have reached a new level of maturity. In this post, I would like to share some interesting recent results from our work with the Kafka 0.10 release. The next post will focus more on the streaming, big data and Hadoop ecosystem around Kafka.

Kafka 0.10 and Its Downstream Consumers

Consumer offset storage: Kafka topic

In earlier Kafka releases (before 0.8.2), consumers committed their offsets to ZooKeeper. During the last holiday season, our ZooKeeper cluster experienced a very high volume of writes because many consumers were committing offsets very frequently. Beyond increasing ZooKeeper capacity (e.g. using SSDs for storage), there is now an alternative: Kafka can store consumer offsets in a special, separate Kafka topic that is replicated and highly available.

Before migrating consumer offset storage away from ZooKeeper, we ran some performance evaluations to understand this new option better.
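For groups still on the older ZooKeeper-based consumer, the Kafka documentation describes a two-phase migration driven by two consumer properties, `offsets.storage` and `dual.commit.enabled` (the Java consumer introduced in 0.9 always stores offsets in Kafka). The minimal sketch below only renders the properties for each phase. The group id, ZooKeeper address and helper names are hypothetical.

```python
# Sketch: consumer properties for the two-phase migration of an old
# (ZooKeeper-based) consumer group's offsets into Kafka.
# "example-group" and "zk1:2181" are hypothetical placeholders.

BASE_PROPS = {
    "group.id": "example-group",
    "zookeeper.connect": "zk1:2181",
}


def migration_props(phase: int) -> dict:
    """Return consumer properties for migration phase 1 or 2."""
    if phase not in (1, 2):
        raise ValueError("phase must be 1 or 2")
    props = dict(BASE_PROPS)
    # From now on, commit offsets to the Kafka offsets topic.
    props["offsets.storage"] = "kafka"
    # Phase 1: also commit to ZooKeeper while the group is only partly migrated.
    # Phase 2: once every instance commits to Kafka, stop writing to ZooKeeper.
    props["dual.commit.enabled"] = "true" if phase == 1 else "false"
    return props


def to_properties(props: dict) -> str:
    """Render a dict as Java .properties lines, sorted for stable output."""
    return "\n".join(f"{key}={value}" for key, value in sorted(props.items()))


if __name__ == "__main__":
    for phase in (1, 2):
        print(f"# phase {phase}: apply, then rolling-bounce all consumers")
        print(to_properties(migration_props(phase)))
```

Each phase ends with a rolling bounce of the consumers and a health check before moving on to the next one.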

We used 8 virtual machines (VMs) on Walmart's OpenStack cloud and deployed Kafka release 0.10.1.0 on them as a single Kafka cluster, using OneOps, Walmart's open-source PaaS. Each VM had the following specs:

  • 8 CPU cores, 24GB memory, 1Gbps NIC, 160GB disk (backed by SSD)

The ZooKeeper cluster ran separately on 3 dedicated VMs and was also deployed by OneOps, using the ZooKeeper cookbook.

We created eight Kafka topics, each with 100 partitions and 2 replicas, and set a reasonable retention policy to keep the disks from overflowing. All other topic configurations were left at their defaults.
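The post does not say which retention policy was used. Assuming a size-based one (`retention.bytes`, which Kafka applies per partition), a back-of-the-envelope bound from the setup above looks like the sketch below. It assumes replicas are spread evenly across brokers, decimal units (1GB = 10^9 bytes) and a hypothetical 20% disk headroom.

```python
# Sketch: upper bound for a per-partition retention.bytes value so that
# replicas cannot fill a broker's disk. Assumes an even replica spread,
# decimal units and a hypothetical headroom percentage.

GB = 10**9


def max_retention_bytes(disk_bytes: int, brokers: int, topics: int,
                        partitions_per_topic: int, replication_factor: int,
                        headroom_pct: int = 20) -> int:
    """Largest per-partition retention.bytes that fits on each broker's disk."""
    total_replicas = topics * partitions_per_topic * replication_factor
    # Ceiling division: the busiest broker hosts at least this many replicas.
    replicas_per_broker = -(-total_replicas // brokers)
    usable = disk_bytes * (100 - headroom_pct) // 100
    return usable // replicas_per_broker


if __name__ == "__main__":
    limit = max_retention_bytes(160 * GB, brokers=8, topics=8,
                                partitions_per_topic=100, replication_factor=2)
    print(f"retention.bytes <= {limit:,} bytes per partition")
```

With 1,600 replicas over 8 brokers (200 per broker), this gives 640MB per partition. Kafka enforces `retention.bytes` by deleting whole log segments and never deletes the active segment, so `segment.bytes` (1GB by default) would need to be well below such a bound for it to hold in practice.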

To generate and consume the workload (10KB messages), we used the producer and consumer performance test tools that ship with every Apache Kafka release. To minimize network latency and fully load the Kafka cluster, we ran 8 producers and 8 consumers on separate VMs in the same data center as Kafka.
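As a rough illustration, one producer run of the bundled `bin/kafka-producer-perf-test.sh` tool can be assembled as below. The topic names, broker address, record count and the one-producer-per-topic mapping are hypothetical, and 10KB is taken as 10,000 bytes; `--throughput -1` disables the tool's throttling.

```python
# Sketch: build the argv for one run of Kafka's producer performance tool.
# Topic names, broker address and record count are hypothetical.
import shlex


def producer_perf_cmd(topic: str, num_records: int, record_size: int,
                      bootstrap_servers: str, throughput: int = -1) -> list:
    """Return argv for one producer perf-test run; -1 disables throttling."""
    return [
        "bin/kafka-producer-perf-test.sh",
        "--topic", topic,
        "--num-records", str(num_records),
        "--record-size", str(record_size),
        "--throughput", str(throughput),
        "--producer-props", f"bootstrap.servers={bootstrap_servers}",
    ]


if __name__ == "__main__":
    # Hypothetical mapping: one producer per topic, 10KB = 10,000-byte records.
    for i in range(8):
        cmd = producer_perf_cmd(f"perf-topic-{i}", 10_000_000, 10_000,
                                "kafka1:9092")
        print(shlex.join(cmd))
```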

All producers and consumers used default configurations, except that we tuned the following setting on the consumer side:

  • auto.commit.interval.ms: the interval, in milliseconds, at which consumer offsets are automatically committed to Kafka or ZooKeeper
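The sketch below shows the two consumer settings we compared, plus a rough upper bound on how many offset records per second they produce. It assumes the Java consumer's property names, that each of the 800 partitions (8 topics x 100) is consumed by exactly one consumer, and that every auto-commit writes one record per assigned partition; the broker address and group id are hypothetical. Under those assumptions the 50 ms setting bounds the rate at 16,000 records/s, the same order of magnitude as the ~12k messages/s we measured.

```python
# Sketch: the consumer settings we varied, plus an upper bound on
# offset-commit records per second. Assumes one record per assigned
# partition per commit; address and group id are hypothetical.


def consumer_props(auto_commit_interval_ms: int) -> dict:
    return {
        "bootstrap.servers": "kafka1:9092",  # hypothetical
        "group.id": "perf-consumer",         # hypothetical
        "enable.auto.commit": "true",
        "auto.commit.interval.ms": str(auto_commit_interval_ms),
    }


def offset_records_per_sec(total_partitions: int, interval_ms: int) -> int:
    """Upper bound: each partition's offset is committed once per interval."""
    return total_partitions * 1000 // interval_ms


if __name__ == "__main__":
    partitions = 8 * 100  # 8 topics x 100 partitions in our test
    for interval in (50, 5000):
        print(interval, offset_records_per_sec(partitions, interval))
```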

Here is one of our observations:

When a Kafka topic is used to store consumer offsets, we observed a small performance penalty (roughly 8% loss). It appeared only when consumers committed offsets very frequently, presumably because the extra writes can overload the disk.

To validate this, we set auto.commit.interval.ms = 50 to simulate very frequent offset commits. The write traffic to the offsets topic (__consumer_offsets) was interesting: 12k messages/s generated only 1.2MB/s, which translates into tiny messages of around 100 bytes each:

Write bandwidth (MB/s) for the Kafka topic (__consumer_offsets) that stores consumer offsets
Number of messages per second for the Kafka topic (__consumer_offsets) that stores consumer offsets
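The ~100-byte figure follows directly from the two measurements; a one-line check, assuming decimal units (1MB = 1,000,000 bytes) and our 10KB test messages taken as 10,000 bytes:

```python
# Sketch: average offset-commit record size from the measured write
# bandwidth and message rate (decimal units assumed).


def avg_record_bytes(bytes_per_sec: int, records_per_sec: int) -> int:
    return bytes_per_sec // records_per_sec


if __name__ == "__main__":
    size = avg_record_bytes(1_200_000, 12_000)  # 1.2MB/s over 12k msg/s
    payload = 10_000                            # one 10KB test message
    print(size, payload // size)                # offset records are ~100x smaller
```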

Since the __consumer_offsets topic had 200 partitions and was replicated 3 times, it is reasonable to infer that the penalty comes from the offsets (tiny messages) having to be written synchronously to multiple replicas (at least twice). When the commit frequency was lower, however, we observed no performance hit. Here are the detailed numbers for auto.commit.interval.ms = 50 vs. 5000 (5000 is the default):

  1. Kafka throughput: 533 MB/s vs. 580 MB/s
  2. Consumer throughput: 53,300 messages/s vs. 58,000 messages/s
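A quick check of the penalty implied by these numbers, assuming decimal units (1MB = 1,000KB) and 10KB messages; the helper names are ours:

```python
# Sketch: relative penalty implied by the measured throughputs, assuming
# decimal units (1MB = 1,000KB) and 10KB messages.


def penalty_pct(baseline: float, degraded: float) -> float:
    """Percentage drop from baseline to degraded, rounded to 0.1."""
    return round((baseline - degraded) / baseline * 100, 1)


def msgs_per_sec(mb_per_sec: int, msg_kb: int = 10) -> int:
    return mb_per_sec * 1000 // msg_kb


if __name__ == "__main__":
    print(penalty_pct(580, 533))                 # Kafka throughput, MB/s
    print(msgs_per_sec(533), msgs_per_sec(580))  # consistent with consumer msg/s
    print(penalty_pct(msgs_per_sec(580), msgs_per_sec(533)))
```

Both views give an 8.1% drop. The same helper gives 20.7 for the SSL comparison later in this post (460 MB/s vs. 580 MB/s).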

So our preliminary takeaway is: when reprocessing of messages must be minimized (which requires consumers to commit offsets frequently), storing consumer offsets in Kafka may carry a small performance penalty. The penalty may be invisible if the disk I/O bandwidth is large enough. Overall, we think keeping consumer offsets in a Kafka topic is still better than overloading and stalling ZooKeeper, which leads to even worse scenarios (e.g. connection timeouts, failures to elect leaders).

SSL (Secure Sockets Layer)

To serve critical business use cases (such as shipping customer transaction data), the communication between clients and Kafka must be secure. To this end, we invested in enabling SSL with the Kafka 0.10 release. Below we briefly share our observations on the performance trade-off of SSL.

There is no doubt that enabling SSL slows Kafka down, because of the additional encryption and authentication work. So, as with offset storage, we ran a performance evaluation comparing SSL and non-SSL to quantify how much of a penalty we pay for SSL.

We enabled SSL between producers and Kafka and between Kafka and consumers, but left inter-broker communication without SSL.
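For readers reproducing this setup, the sketch below shows the standard Kafka broker and client SSL properties involved: clients connect over SSL while brokers keep talking to each other over PLAINTEXT via `security.inter.broker.protocol`. The file paths, passwords and broker address are hypothetical placeholders, not our production values.

```python
# Sketch: broker and client properties for client-side SSL with plaintext
# inter-broker traffic. Paths, passwords and addresses are hypothetical.


def broker_ssl_props(inter_broker_ssl: bool = False) -> dict:
    return {
        # One plaintext and one SSL listener on the same broker.
        "listeners": "PLAINTEXT://:9092,SSL://:9093",
        # PLAINTEXT keeps broker-to-broker replication unencrypted.
        "security.inter.broker.protocol": "SSL" if inter_broker_ssl else "PLAINTEXT",
        "ssl.keystore.location": "/etc/kafka/broker.keystore.jks",  # hypothetical
        "ssl.keystore.password": "changeit",                        # hypothetical
        "ssl.key.password": "changeit",                             # hypothetical
        "ssl.truststore.location": "/etc/kafka/broker.truststore.jks",
        "ssl.truststore.password": "changeit",
    }


def client_ssl_props(bootstrap: str = "kafka1:9093") -> dict:
    # Producers and consumers only need a truststore unless the broker
    # requires client authentication (ssl.client.auth=required).
    return {
        "bootstrap.servers": bootstrap,
        "security.protocol": "SSL",
        "ssl.truststore.location": "/etc/kafka/client.truststore.jks",  # hypothetical
        "ssl.truststore.password": "changeit",                         # hypothetical
    }
```

Calling `broker_ssl_props(inter_broker_ssl=True)` corresponds to the later experiment in which inter-broker communication also used SSL.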

With this setup, our results show a performance drop of around 20% when SSL is enabled: for example, 460 MB/s vs. 580 MB/s.

Kafka cluster throughput comparison between non-SSL and SSL

We also noticed that with SSL enabled, CPU usage on the Kafka brokers was significantly higher than without SSL, so Kafka became CPU bound instead of network bound. In theory, adding more CPU capacity should help improve performance.

Additionally, we enabled SSL for inter-broker communication. As expected, performance degraded further in exchange for the extra level of security.

The penalty stems mostly from the JDK SSL engine that Kafka uses. OpenSSL may be a promising replacement that should run faster, and a ticket exists to track this.

Message Loss Prevention

A small amount of message loss is acceptable in some use cases, such as monitoring, machine learning and prediction. For business-critical cases such as financial reporting and accounting reconciliation, however, message loss should be minimized or prevented.

Since the 0.9 release, Kafka has treated “no message loss” as a first-class citizen. One example is MirrorMaker, whose default configuration is designed to prevent message loss. To verify that MirrorMaker does not lose messages in most scenarios, we did rigorous “chaos” testing:

Beyond bouncing and killing the MirrorMaker process, we took the following actions on the MirrorMaker machine:

  • block the full range of ports
  • bring down the network (/sbin/service network stop)
  • drop network packets (e.g. drop 10% of traffic on eth0: /sbin/tc qdisc add dev eth0 root handle 1000 netem loss 10%)
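One simple way to score such a run is to compare the IDs of the messages written to the source cluster with those read from the target cluster. The sketch below assumes every message carries a unique ID and leaves out how the IDs are collected; the `audit` helper is hypothetical, not the tooling we used.

```python
# Sketch: score a chaos-test run for loss and duplicates by comparing
# message IDs sent to the source cluster with IDs read from the target.
# Assumes each message carries a unique ID; collection is out of scope.
from collections import Counter


def audit(sent_ids, received_ids) -> dict:
    sent = set(sent_ids)
    counts = Counter(received_ids)
    received = set(counts)
    duplicates = sum(n - 1 for n in counts.values() if n > 1)
    return {
        "lost": len(sent - received),        # sent but never mirrored
        "unexpected": len(received - sent),  # mirrored but never sent
        "duplicates": duplicates,            # extra copies delivered
        "duplicate_rate": duplicates / len(sent) if sent else 0.0,
    }


if __name__ == "__main__":
    print(audit(range(10), [0, 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9]))
```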

Across all chaos tests, we saw MirrorMaker deliver 5–10% duplicate messages, due to its resend logic in these failure scenarios. However, we did not see any message loss, which is a good signal that MirrorMaker can be trusted as a reliable way to ship data between Kafka clusters.
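To our understanding, MirrorMaker in 0.9/0.10 achieves this by applying no-data-loss defaults to its embedded producer and consumer unless the user overrides them, and by exiting on a send failure by default (its `--abort.on.send.failure` option) instead of skipping data. The sketch below lists those defaults as we understand them and shows which ones a user-supplied config would override; please verify the exact values against the MirrorMaker source for your Kafka version. The `effective` helper is hypothetical.

```python
# Sketch: MirrorMaker's no-data-loss defaults (0.9/0.10, to our
# understanding) and a helper that reports which ones user settings
# override. Verify against your Kafka version.

JAVA_INT_MAX = 2**31 - 1
JAVA_LONG_MAX = 2**63 - 1

NO_LOSS_PRODUCER_DEFAULTS = {
    "acks": "all",                                 # wait for all in-sync replicas
    "retries": str(JAVA_INT_MAX),                  # retry sends (effectively) forever
    "max.block.ms": str(JAVA_LONG_MAX),            # block rather than fail when the buffer is full
    "max.in.flight.requests.per.connection": "1",  # keep ordering across retries
}
NO_LOSS_CONSUMER_DEFAULTS = {
    "enable.auto.commit": "false",  # MirrorMaker commits offsets itself after flushing
}


def effective(defaults: dict, user_props: dict) -> tuple:
    """Apply defaults only where the user set nothing; report overrides."""
    merged = {**defaults, **user_props}
    overridden = sorted(k for k in defaults
                        if k in user_props and user_props[k] != defaults[k])
    return merged, overridden


if __name__ == "__main__":
    merged, overridden = effective(NO_LOSS_PRODUCER_DEFAULTS, {"acks": "1"})
    print(overridden)  # settings that weaken the no-loss defaults
```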

What is Next?

In the next post, we plan to introduce the streaming, big data and Hadoop ecosystem around Kafka and how it brings business value to Walmart.com.

Twitter: @ningZhang6