Scaling Kafka Consumer for Billions of Events

Archit Agarwal
Nov 18 · 5 min read
Adobe Stock by Alex

Details of the application

Overview

PayPal is in the process of migrating its analytical workloads to Google Cloud Platform (GCP). As part of the migration, our team designed and developed a streaming application which consumes data from Kafka and streams it directly to BigQuery. This application is critical to PayPal because most of the analytical readouts are based on this data and, since it is real-time, it reduces the time for readouts from 12 hours to a few seconds. For context, this application processes approximately 30–35 billion events per day and is built on top of Project Reactor.

Our team wanted to do a Load and Performance (LnP) test of this application before going to production, in order to understand the behavior of our system under different circumstances.

Performance testing is a type of software testing that determines system performance in terms of sensitivity, reactivity and stability under a particular workload. It is very important to understand how our system behaves so we can allot appropriate capacity for our application, and increase or decrease capacity over the application’s lifetime. Many times, performance testing is not given its due importance. This leads to surprises and an unpredictable environment in production.

Performance testing focuses on certain factors of a software program such as:

  • Speed — It checks whether the response of the application is fast.
  • Scalability — It determines the maximum user load.
  • Stability — It checks if the application is stable under varying loads.
  • Throughput — It checks the number of events processed per minute and second.

Sample Producer

We wanted to produce more data than our consumer can consume, so we started our testing in a stage environment where we created a custom topic with five partitions. In LIVE, there are 300 partitions for the same topic.

We wrote a sample producer which can pump a large quantity of data to Kafka (more than production).

Here is the sample code for a producer which was used for LnP Testing:

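import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Beacon, Encoder and Event are application-specific classes, and getBeacon()
// and getEncoder() are application-specific helpers; none of them are shown here.
public class SampleProducer {
    public static final String topicName = "test.platform.lnp";
    static Producer<String, byte[]> producer = null;
    // Number of records acknowledged by Kafka so far
    static AtomicLong counter = new AtomicLong();

    public static void main(String[] args) throws InterruptedException {
        // Fixed pool of 12 sender threads backed by an unbounded queue
        ExecutorService executorService = Executors.newFixedThreadPool(12);
        createProducer();

        // Submit 200,000 send tasks, then pause for 5 seconds, forever.
        // This caps the submission rate at about 40,000 messages per second.
        while (true) {
            for (int j = 0; j < 200000; j++) {
                executorService.submit(new MessageSender(getBeacon(), getEncoder()));
            }
            Thread.sleep(5000);
        }
    }

    public static void createProducer() {
        // Producer configuration
        Properties props = new Properties();
        // Broker address (a local broker in this example)
        props.put("bootstrap.servers", "localhost:9092");

        // retries=0: a failed send is not retried automatically
        props.put("retries", 0);
        // Batch up to 250,000 bytes per partition and wait up to 5 seconds
        // for a batch to fill before sending it
        props.put("batch.size", 250000);
        props.put("linger.ms", 5000);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(props);
    }

    static class MessageSender implements Runnable {
        private final Beacon beacon;
        private final Encoder encoder;

        MessageSender(Beacon beacon, Encoder encoder) {
            this.beacon = beacon;
            this.encoder = encoder;
        }

        @Override
        public void run() {
            // Event is the Java class representing the Avro schema
            Event event = new Event(beacon);
            byte[] byteArray = encoder.toBytes(event);
            final ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, byteArray);
            producer.send(record,
                    (recordMetadata, e) -> {
                        if (e != null) {
                            // The send failed, so do not count this record
                            System.err.println("Send failed: " + e);
                            return;
                        }
                        long count = counter.incrementAndGet();
                        if (count % 100000 == 0)
                            System.out.println(count + " records reached Kafka");
                    });
        }
    }

    public static void closeProducer() {
        producer.close();
    }

}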

Kafka Consumer config

kafka.consumer.topics=test.platform.lnp
kafka.consumer.group.id=test.dev.consumer
kafka.consumer.max.poll.interval.ms=900000
kafka.consumer.max.poll.records=1000
kafka.consumer.fetch.min.bytes=104857
kafka.consumer.max.partition.fetch.bytes=1048576
kafka.consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
kafka.consumer.client.id=test-consumer
kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.consumer.heartbeat.interval.ms=20000
kafka.consumer.session.timeout.ms=60000
kafka.consumer.auto.offset.reset=earliest
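
The keys above carry a `kafka.consumer.` prefix that is not part of Kafka's own configuration names. The sketch below shows one way such entries could be turned into consumer properties, and checks the heartbeat setting against the guideline in Kafka's documentation that `heartbeat.interval.ms` should typically be no higher than one third of `session.timeout.ms`. The class name, the prefix-stripping approach, and the treatment of `topics` as an application-level key are assumptions for illustration, not details taken from our application.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Assumed application-level prefix, as used in the listing above
    static final String PREFIX = "kafka.consumer.";

    // Copies every "kafka.consumer.*" entry into a new Properties object
    // with the prefix removed. "topics" is skipped because it is assumed
    // to be an application setting rather than a Kafka consumer config.
    static Properties toKafkaProperties(Properties app) {
        Properties kafka = new Properties();
        for (String name : app.stringPropertyNames()) {
            if (!name.startsWith(PREFIX)) {
                continue;
            }
            String key = name.substring(PREFIX.length());
            if (!key.equals("topics")) {
                kafka.setProperty(key, app.getProperty(name));
            }
        }
        return kafka;
    }

    // True when heartbeat.interval.ms is at most one third of session.timeout.ms
    static boolean heartbeatWithinGuideline(Properties kafka) {
        long heartbeat = Long.parseLong(kafka.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(kafka.getProperty("session.timeout.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties app = new Properties();
        app.setProperty("kafka.consumer.topics", "test.platform.lnp");
        app.setProperty("kafka.consumer.heartbeat.interval.ms", "20000");
        app.setProperty("kafka.consumer.session.timeout.ms", "60000");

        Properties kafka = toKafkaProperties(app);
        System.out.println("Heartbeat within guideline: " + heartbeatWithinGuideline(kafka));
    }
}
```

With the values listed above (20,000 ms and 60,000 ms), the heartbeat interval sits exactly at the one-third boundary.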

Application Config

Min Heap: 2 GB

Max Heap: 6 GB

GC: Concurrent mark sweep

CPU: 24 core machine

Consumer group: Single machine

When we started pumping data from the producer, we saw very high heap usage. After some time, the consumer hit an out-of-memory error, stopped consuming, and had to rebalance.

After analysis, we determined that the application was consuming as much data as it could. It couldn’t push all the data to BigQuery, so it was holding those messages in memory.

We then changed the application to limit the number of messages it consumes at a time, changed the garbage collection algorithm to G1GC, and ran the test again. This time, memory consumption was fine for the whole run. We switched to G1GC because there were a lot of full GC pauses. During those pauses the application couldn’t send a heartbeat to the Kafka consumer coordinator, so it kept going into a rebalance.
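
To illustrate the idea of limiting how many messages are held in memory, here is a minimal sketch that caps the number of in-flight records with a semaphore. It is not the mechanism our application uses (that is built on Project Reactor and is not shown in this post); the class `BoundedInFlight`, its parameters, and the `sink` callback standing in for the BigQuery write are all hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class BoundedInFlight<T> {
    private final Semaphore permits;        // one permit per record allowed in memory
    private final ExecutorService workers;  // threads that run the sink
    private final Consumer<T> sink;         // hypothetical stand-in for the downstream write

    public BoundedInFlight(int maxInFlight, int workerThreads, Consumer<T> sink) {
        this.permits = new Semaphore(maxInFlight);
        this.workers = Executors.newFixedThreadPool(workerThreads);
        this.sink = sink;
    }

    // Blocks the caller while maxInFlight records are already pending,
    // so the caller cannot pull in data faster than the sink drains it.
    public void submit(T record) throws InterruptedException {
        permits.acquire();
        try {
            workers.execute(() -> {
                try {
                    sink.accept(record);
                } finally {
                    permits.release();  // free the slot even if the sink throws
                }
            });
        } catch (RuntimeException rejected) {
            permits.release();          // the task never ran, so return its permit
            throw rejected;
        }
    }

    // Stops accepting work and waits up to one minute for pending records
    public void shutdown() throws InterruptedException {
        workers.shutdown();
        workers.awaitTermination(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedInFlight<String> limiter =
                new BoundedInFlight<>(2, 4, r -> System.out.println("written " + r));
        for (int i = 0; i < 5; i++) {
            limiter.submit("event-" + i);
        }
        limiter.shutdown();
    }
}
```

One caveat with this design: if the polling thread blocks in `submit` for longer than `max.poll.interval.ms`, the consumer is considered failed and the group rebalances, so the limit and the poll interval have to be chosen together.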

Visual VM

Other Test Cases

Since this application writes behavioral analytics data, which is critical for making data-driven decisions, testing all possible scenarios was very important.

  1. We created a lag in the topic by pumping in more data, and then we started consuming from the topic. We monitored the application and it was able to catch up quickly. This was critical to test. There can be instances when the consumer is not running for some time due to production issues, so we should have a catch up plan and understand how the application can do the catch up.
  2. For checking the endurance of our application, we created a topic with 300 partitions with high volume. We ran only a single consumer consuming from it to make sure the application can handle the load and not crash.
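
For the catch-up scenario in the first test case, a rough estimate of the recovery time is the lag divided by the spare capacity, that is, the consume rate minus the rate at which new events keep arriving. The sketch below works this out. The lag and incoming rate in `main` are hypothetical numbers chosen for illustration; only the 950,000 events per minute figure comes from the throughput we measured in this benchmark.

```java
public class CatchUpEstimate {
    // Minutes needed to drain a backlog while new events keep arriving.
    // Throws if the consumer is not faster than the producer, because
    // in that case the lag never shrinks.
    static double catchUpMinutes(long lagEvents, long consumePerMinute, long incomingPerMinute) {
        long spare = consumePerMinute - incomingPerMinute;
        if (spare <= 0) {
            throw new IllegalArgumentException("consume rate must exceed incoming rate");
        }
        return (double) lagEvents / spare;
    }

    public static void main(String[] args) {
        long lag = 100_000_000L;      // hypothetical backlog
        long consumeRate = 950_000L;  // events per minute, from the benchmark
        long incomingRate = 500_000L; // hypothetical live traffic per minute
        System.out.printf("Estimated catch-up time: %.1f minutes%n",
                catchUpMinutes(lag, consumeRate, incomingRate));
    }
}
```

The estimate assumes both rates stay constant during the recovery, which real traffic rarely does, so it is best read as a lower bound for planning.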

Throughput

We wanted to raise the capacity ticket for this application based on its measured throughput. We could see the throughput was about 950,000 events per minute. Our consumer was able to consume that many events and publish the same amount to BigQuery during the same minute. Here is a line from our application log:

2021-06-11 09:07:35,203 INFO [kafka-producer-network-thread | test-producer] KafkaProducer Successfully sent the data to Kafka {"BQEventPublished":956060,"EventConsumed":957369}
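
Per-minute counters like the two in the log line above can be kept with a pair of thread-safe adders that are read and reset once a minute. This is a minimal sketch of that pattern, not our actual metrics code; the class and method names are hypothetical, and only the two JSON field names are taken from the log line.

```java
import java.util.concurrent.atomic.LongAdder;

public class ThroughputCounters {
    private final LongAdder consumed = new LongAdder();
    private final LongAdder published = new LongAdder();

    // Called by the consuming side for each batch read from Kafka
    public void recordConsumed(long n) {
        consumed.add(n);
    }

    // Called by the publishing side for each batch written downstream
    public void recordPublished(long n) {
        published.add(n);
    }

    // Returns the counts since the previous call as a JSON string and
    // resets both counters. Meant to be called once per reporting interval.
    public String snapshotAndReset() {
        long p = published.sumThenReset();
        long c = consumed.sumThenReset();
        return "{\"BQEventPublished\":" + p + ",\"EventConsumed\":" + c + "}";
    }

    public static void main(String[] args) {
        ThroughputCounters counters = new ThroughputCounters();
        counters.recordConsumed(1_000);
        counters.recordPublished(990);
        System.out.println(counters.snapshotAndReset());
    }
}
```

A small gap between the two numbers in one interval is expected, since events consumed near the end of a minute may be published in the next one.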

Since the consumer performed well, our application could run in LIVE with just 75 machines, each with a two-core CPU and 8 GB of memory, which was a big win for us. Our application could consume 30 billion events and stream all of them to BigQuery in real time. The application could maintain a small virtual machine (VM) footprint and still have bandwidth to catch up in case of any issues. This application can scale as traffic increases. This benchmarking exercise helped us get the correct pool size, which reduces the cost as well.
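
The arithmetic behind a sizing exercise like this can be sketched as follows, using only the figures quoted in this post: 30 to 35 billion events per day, 75 machines, and 950,000 events per minute. It assumes traffic is spread evenly over the day, which ignores peaks, and the class and method names are hypothetical. Note that the 950,000 figure was measured on a 24-core test machine, so it does not transfer directly to a two-core LIVE machine; the last calculation is illustrative only.

```java
public class CapacityMath {
    static final long MINUTES_PER_DAY = 24L * 60;

    // Average events each machine must handle per minute (rounded down)
    static long perMachinePerMinute(long eventsPerDay, int machines) {
        return eventsPerDay / (MINUTES_PER_DAY * machines);
    }

    // Smallest number of instances that covers the average load when one
    // instance sustains ratePerMinute events per minute
    static long instancesNeeded(long eventsPerDay, long ratePerMinute) {
        return (long) Math.ceil(eventsPerDay / (double) (MINUTES_PER_DAY * ratePerMinute));
    }

    public static void main(String[] args) {
        long low = 30_000_000_000L;
        long high = 35_000_000_000L;

        // Average per-machine load with the 75-machine pool
        System.out.println(perMachinePerMinute(low, 75));   // 277777
        System.out.println(perMachinePerMinute(high, 75));  // 324074

        // Instances needed if each one sustained the benchmarked rate
        System.out.println(instancesNeeded(low, 950_000L));  // 22
        System.out.println(instancesNeeded(high, 950_000L)); // 26
    }
}
```

At 75 machines, each one needs to sustain roughly 280,000 to 325,000 events per minute on average, which is where the remaining bandwidth for catching up comes from.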

Key Takeaway

Benchmarking an application and understanding its behavior before going to production is crucial for avoiding surprises and production issues. We tested our consumer intensively and made sure we understood its behavior under all the circumstances we could think of. During this process, we also improved the performance of the application by limiting how many messages it consumes at a time and by switching the garbage collector to G1GC.

Our Team: Shobana Neelakantan, Aride Chettali, Vignesh Raj K

The PayPal Technology Blog
