Apache Kafka is a great tool. It is an event streaming platform capable of publishing and subscribing to streams of events, storing them, and processing them. Basically, it's all about "streams of events", which makes Kafka a great fit for use cases such as building real-time streaming data pipelines. But what if I need near-real-time? What if I would like to work with micro-batches instead of a continuous stream? While Spark Streaming is literally built on micro-batches, with Kafka we have to find a workaround.
Someone may ask: why would I need that? Well, there are plenty of use cases. For example, you may want to make your corporate reporting near-real-time while reusing your existing database loads, which work in a batch manner. To achieve this, you read streaming data from Kafka, but instead of pushing incoming messages to the database one by one, you accumulate them for N minutes and then dump them into data files, so that the database can read them as a chunk.
So let's dive into this task. With the Kafka Consumer API, you normally consume messages as soon as they arrive: your application code processes each record inside an infinite poll loop. Confluent provides some simple examples of this pattern.
In this article, we implement an algorithm that performs the above-mentioned record processing over a fixed period (the batch interval), with a trigger that fires when the batch is finished. The code is written in Python, and the application consumes records using Confluent's Kafka Python client.
In order to split the infinite stream into time frames, the obvious choice is the timestamp field of each Kafka record's metadata. This way, every record belongs to a certain time frame. As soon as a record arrives with a timestamp greater than the upper boundary of the current frame, the batch is considered finished: a dedicated piece of code handles this event, and the time frame boundaries are updated.
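The frame check can be sketched as a couple of small pure functions (the names here are illustrative, not the article's actual code):

```python
def is_batch_finished(record_timestamp_ms: int, upper_boundary_ms: int) -> bool:
    """A record past the upper boundary means the running batch
    should be closed before this record is handled."""
    return record_timestamp_ms > upper_boundary_ms


def next_frame(upper_boundary_ms: int, batch_interval_ms: int) -> tuple[int, int]:
    """Shift the frame forward: the old upper boundary becomes the
    lower boundary of the next batch."""
    return upper_boundary_ms, upper_boundary_ms + batch_interval_ms
```

Because only the record's own timestamp drives the check, the batch boundaries stay deterministic regardless of when the consumer actually polls.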
This is a very brief description of the algorithm. Let’s get into more details with the below diagram.
With the Kafka Consumer client, we poll messages with a periodicity defined by a parameter, which means a given poll call may or may not return a record. However, we still need to advance the current boundary (timestamp) on every loop iteration, so that the batch can be finished even when no more messages are coming. When the record is None, we simply add the poll timeout value to the current boundary.
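That boundary-advancing rule can be expressed as a single function (an illustrative sketch; the names are assumptions):

```python
def advance_boundary(current_boundary_ms: int,
                     record_timestamp_ms,
                     poll_timeout_ms: int) -> int:
    """Track the current boundary on every loop iteration.

    When poll() returned no record (record_timestamp_ms is None), at
    least the poll timeout has elapsed, so move the boundary forward
    by that amount; otherwise jump to the record's own timestamp.
    """
    if record_timestamp_ms is None:
        return current_boundary_ms + poll_timeout_ms
    return max(current_boundary_ms, record_timestamp_ms)
```

The `max` guards against out-of-order records briefly moving the boundary backwards.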
On every iteration, we check whether the current boundary has exceeded the upper limit.
Let's look at the code implementation itself. Below is the Batch Manager class, which tracks the current and upper boundaries and is responsible for deciding when the current batch should be finished and the next one started. Please see the inline comments for more details.
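A class along these lines would do the job (this is a sketch in the spirit of the article's Batch Manager; method and attribute names are my assumptions, not the original code):

```python
class BatchManager:
    """Tracks the current and upper boundaries of the running time
    frame and decides when a batch is finished."""

    def __init__(self, batch_interval_seconds: int):
        self.batch_interval_ms = batch_interval_seconds * 1000
        self.current_boundary_ms = None  # latest timestamp seen so far
        self.upper_boundary_ms = None    # end of the current time frame

    def update(self, timestamp_ms: int) -> None:
        """A record arrived: open the first frame around it, or move
        the current boundary forward to its timestamp."""
        if self.upper_boundary_ms is None:
            self.current_boundary_ms = timestamp_ms
            self.upper_boundary_ms = timestamp_ms + self.batch_interval_ms
        elif timestamp_ms > self.current_boundary_ms:
            self.current_boundary_ms = timestamp_ms

    def advance_by(self, delta_ms: int) -> None:
        """No record was polled: move the boundary by the poll timeout
        so an idle topic can still finish the batch."""
        if self.current_boundary_ms is not None:
            self.current_boundary_ms += delta_ms

    def batch_finished(self) -> bool:
        """True once the tracked boundary has crossed the upper limit."""
        return (self.upper_boundary_ms is not None
                and self.current_boundary_ms >= self.upper_boundary_ms)

    def next_batch(self) -> None:
        """Roll the frame: the old upper boundary opens the next batch."""
        self.upper_boundary_ms += self.batch_interval_ms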
This micro-batch approach requires tracking the boundaries for every message consumed, so the Batch Consumer implementation performs some additional checks on each iteration.
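The consumer loop could look roughly like this (an illustrative sketch, not the article's verbatim code): it accepts any object exposing confluent_kafka's `Consumer.poll()` interface, and a manager object with `update()`, `advance_by()`, `batch_finished()` and `next_batch()` methods in the spirit of the Batch Manager described above. The `max_iterations` parameter is my addition to make the sketch testable; a real consumer would leave it as None and loop forever.

```python
def consume_micro_batches(consumer, batch_manager, poll_timeout_s,
                          parse_kafka_message, finish_micro_batch,
                          max_iterations=None):
    """Poll records and flush them via finish_micro_batch whenever the
    batch interval elapses."""
    batch = []
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        msg = consumer.poll(timeout=poll_timeout_s)
        if msg is None:
            # No record within the timeout: still advance the boundary
            # so an idle topic can close the batch.
            batch_manager.advance_by(int(poll_timeout_s * 1000))
        elif msg.error():
            continue  # skip errored messages in this sketch
        else:
            # confluent_kafka's Message.timestamp() returns a
            # (timestamp_type, timestamp_ms) pair
            _, ts_ms = msg.timestamp()
            batch_manager.update(ts_ms)
        if batch_manager.batch_finished():
            finish_micro_batch(batch)  # e.g. dump records to a file
            batch = []
            batch_manager.next_batch()
        if msg is not None and not msg.error():
            batch.append(parse_kafka_message(msg))
```

Note that a record whose timestamp crosses the boundary first closes the current batch and then lands in the next one, matching the algorithm described earlier.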
The cool thing here is that you can use these classes without modification; you only need to define your own parse_kafka_message and finish_micro_batch functions. In this particular example, all records are collected in a list and printed to the console when the batch is finished.
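For illustration, here is one possible pair of such functions (the exact return shape and field names are my assumptions): the parser keeps the payload together with a human-readable timestamp, and the finish trigger simply prints the accumulated batch, which is where a file dump for the database load would go in the reporting scenario.

```python
from datetime import datetime, timezone


def parse_kafka_message(msg):
    """msg is a confluent_kafka Message; timestamp() returns a
    (timestamp_type, timestamp_ms) pair."""
    _, ts_ms = msg.timestamp()
    return {
        "value": msg.value().decode("utf-8"),
        "timestamp": datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
                             .strftime("%Y-%m-%d %H:%M:%S"),
    }


def finish_micro_batch(records):
    """Fired when the batch interval elapses: print the whole batch."""
    print(f"Batch finished with {len(records)} record(s):")
    for record in records:
        print(record)
```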
Finally, here is the application entry point that makes it all run.
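A minimal sketch of such an entry point, with CLI flags matching the run command used later in this article (the wiring in the `__main__` block is hypothetical, not the article's actual code):

```python
import argparse


def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Kafka micro-batch consumer")
    parser.add_argument("--broker", required=True)
    parser.add_argument("--topic", required=True)
    parser.add_argument("--group_id", required=True)
    parser.add_argument("--batch_interval_seconds", type=int, default=30)
    parser.add_argument("--poll_interval_seconds", type=int, default=10)
    return parser


if __name__ == "__main__":
    args = build_arg_parser().parse_args()
    # Hypothetical wiring: construct the consumer from the parsed
    # arguments and run the infinite poll loop, e.g.:
    # consumer = BatchConsumer(args.broker, args.topic, args.group_id,
    #                          args.batch_interval_seconds,
    #                          args.poll_interval_seconds)
    # consumer.run()
```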
Run Kafka and Batch Consumer in Docker
It's time to run the application and see how it works. Of course, we also need Kafka itself so we can push some data into the broker. For that, we only need three files with the following content:
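The exact files are not reproduced here, but a setup along these lines would work: a docker-compose.yml wiring a Kafka broker and the consumer image, plus a Dockerfile and requirements.txt for the consumer. This sketch is an assumption (image versions and listener settings included); only the service names are taken from the commands below.

```yaml
# docker-compose.yml (illustrative sketch)
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0   # assumed version
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  broker:
    image: confluentinc/cp-kafka:7.5.0       # assumed version
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners so both the tools inside the broker container
      # (broker:9092) and the consumer container (broker:29092) connect
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-batch-consumer:
    build: .        # Dockerfile installing confluent-kafka and the app
    depends_on: [broker]
    tty: true
```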
When done, start up your application by running
docker-compose up -d --build from the project directory. Let’s play around!
- Connect to Kafka Broker machine from your terminal with
docker-compose exec broker bash
- Create a topic to push data to and pull data from with
kafka-topics --create --bootstrap-server broker:9092 --topic my-topic --partitions 1 --replication-factor 1
- From another terminal window connect to your application with
docker-compose exec kafka-batch-consumer bash
- And run the application with some necessary arguments by
python main.py --broker broker:29092 --topic my-topic --group_id my-group --batch_interval_seconds 30 --poll_interval_seconds 10
- Return to the first terminal and start pushing some dummy data to the Kafka topic with
kafka-console-producer --broker-list broker:9092 --topic my-topic
As soon as you push a message, it is polled by the consumer and processed as you defined. In this example, each message's timestamp is printed to the console in a human-readable format; then, when the batch is finished, all messages that arrived during the specified batch interval are printed together.
That's it! We've built a custom consumer that works in a micro-batch manner. You only need to implement your own business logic to process the messages.