Apache Kafka micro-batch Consumer in Python

Alex Rosenblatt
Jan 2 · 4 min read

Introduction

Apache Kafka is a great tool. It is an event streaming platform capable of publishing and subscribing to streams of events, storing streams of events, and processing streams of events. Basically, it’s all about “streams of events”, which makes Kafka a natural fit for use cases such as building real-time streaming data pipelines. But what if I need near-real-time processing? What if I would like to work with micro-batches instead of a continuous stream? While Spark Streaming is literally built on micro-batches, with Kafka we have to find a workaround.

Someone may ask: why would I need this? Well, there are plenty of possible use cases. For example, you may want to make your corporate reporting near-real-time while reusing your existing database loads, which work in a batch manner. To achieve this you still read streaming data from Kafka, but instead of pushing incoming messages to the database one by one, you accumulate them for N minutes and then dump them into data files so that the database can load them as a chunk.

Algorithm

So let’s dive into this task. With the Kafka Consumer API you normally consume messages as soon as they arrive: your application code processes each record inside an infinite loop. Here are some simple examples from Confluent.
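For reference, a minimal consumer loop with the Confluent Python client looks roughly like this (the broker address, topic, and group id are placeholders):

from confluent_kafka import Consumer

# Placeholder connection settings; adjust them to your environment.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['my-topic'])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a record
        if msg is None:
            continue
        if msg.error():
            print(f'Consumer error: {msg.error()}')
            continue
        # Process the record as soon as it arrives.
        print(f'Received: {msg.value().decode("utf-8")}')
finally:
    consumer.close()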

In this article, we implement an algorithm that performs the above-mentioned record processing for a certain period (the batch interval) and fires a trigger when the batch is finished. The code is written in Python, and the application consumes records by means of the Kafka Python Client by Confluent.

In order to split the infinite stream into time frames, the obvious choice is to use the timestamp value from the Kafka record’s metadata. This way, each record belongs to a certain time frame. As soon as a new record arrives with a timestamp greater than the upper boundary of the current time frame, the batch is considered finished: a specific piece of code is executed to handle this event, and the time frame boundaries are updated.
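With the Confluent Python client, that timestamp is available directly from the message object; a small helper (a sketch, not part of the original code) illustrates it:

from confluent_kafka import TIMESTAMP_NOT_AVAILABLE

def record_timestamp_ms(msg):
    # Message.timestamp() returns a (timestamp_type, timestamp) tuple,
    # where the timestamp is in milliseconds since the epoch.
    timestamp_type, timestamp_ms = msg.timestamp()
    if timestamp_type == TIMESTAMP_NOT_AVAILABLE:
        return None
    return timestamp_ms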

That is a very brief description of the algorithm; let’s go into more detail with the diagram below.

Algorithm for Kafka Consumer working in a batch manner

With the Kafka Consumer client, we poll for messages with a timeout defined by a parameter, which means a given call may or may not return a record. However, we still need to advance the current boundary (timestamp) on every loop iteration so that the batch can be finished even when no more messages are coming. When the poll returns no record, we simply add the poll timeout value to the current boundary.

On every iteration, we check whether the current boundary has exceeded the upper boundary.

Implementation

Let’s look at the code implementation itself. Below is the Batch Manager class, which controls the current boundary and the upper boundary and is responsible for identifying when the batch should be finished and the next batch started. Please see the inline comments for more details.

Kafka Batch Consumer. Batch Manager class controls boundaries and batch intervals
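A simplified sketch of such a class, with illustrative names rather than the exact ones from the embedded gist, could look like this:

class BatchManager:
    """Tracks the current and upper boundaries of a micro-batch (epoch milliseconds)."""

    def __init__(self, batch_interval_seconds):
        self.batch_interval_ms = batch_interval_seconds * 1000
        self.current_boundary_ms = None
        self.upper_boundary_ms = None

    def start_batch(self, start_ms):
        # A batch covers the time frame [start_ms, start_ms + batch interval).
        self.current_boundary_ms = start_ms
        self.upper_boundary_ms = start_ms + self.batch_interval_ms

    def update_boundary(self, timestamp_ms):
        # A record arrived: its timestamp becomes the new current boundary.
        self.current_boundary_ms = max(self.current_boundary_ms, timestamp_ms)

    def advance(self, delta_ms):
        # No record arrived: add the poll timeout to the current boundary.
        self.current_boundary_ms += delta_ms

    def is_batch_finished(self):
        # The batch is finished once the current boundary passes the upper boundary.
        return self.current_boundary_ms >= self.upper_boundary_ms

    def next_batch(self):
        # The next batch starts where the previous one ended.
        self.start_batch(self.upper_boundary_ms)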

Such a micro-batch approach requires tracking the boundaries for every message consumed, so the Batch Consumer implementation performs additional checks on every iteration.

Kafka Batch Consumer. Batch Consumer class implements Kafka Consumer with micro-batch mode
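Again, the exact code lives in the embedded gist; the sketch below shows one way to wire the Confluent consumer to the BatchManager sketch above (constructor arguments and callback names are my assumptions):

import time
from confluent_kafka import Consumer

class BatchConsumer:
    """Consumes a Kafka topic and groups records into micro-batches."""

    def __init__(self, broker, topic, group_id, batch_interval_seconds,
                 poll_interval_seconds, parse_kafka_message, finish_micro_batch):
        self.consumer = Consumer({
            'bootstrap.servers': broker,
            'group.id': group_id,
            'auto.offset.reset': 'earliest',
        })
        self.topic = topic
        self.poll_interval_seconds = poll_interval_seconds
        self.batch_manager = BatchManager(batch_interval_seconds)
        self.parse_kafka_message = parse_kafka_message
        self.finish_micro_batch = finish_micro_batch
        self.batch = []

    def run(self):
        self.consumer.subscribe([self.topic])
        self.batch_manager.start_batch(int(time.time() * 1000))
        try:
            while True:
                msg = self.consumer.poll(self.poll_interval_seconds)
                if msg is None:
                    # Nothing polled: advance the boundary by the poll timeout.
                    self.batch_manager.advance(self.poll_interval_seconds * 1000)
                elif msg.error():
                    print(f'Consumer error: {msg.error()}')
                    continue
                else:
                    # A record arrived: its timestamp drives the current boundary.
                    _, timestamp_ms = msg.timestamp()
                    self.batch_manager.update_boundary(timestamp_ms)
                # Close the batch once the current boundary passes the upper one.
                if self.batch_manager.is_batch_finished():
                    self.finish_micro_batch(self.batch)
                    self.batch = []
                    self.batch_manager.next_batch()
                # Append the record to the (possibly new) current batch.
                if msg is not None:
                    self.batch.append(self.parse_kafka_message(msg))
        finally:
            self.consumer.close()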

The cool thing here is that you can use these classes without modification; you only need to define your own parse_kafka_message and finish_micro_batch functions. In this particular example, all records are collected in a list and printed to the console when the batch is finished, as shown in the sketch below.
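For that behaviour, the two callbacks can stay very small (again, a sketch under the naming assumptions above):

from datetime import datetime

def parse_kafka_message(msg):
    # Print the record's timestamp in a human-readable format and keep the payload.
    _, timestamp_ms = msg.timestamp()
    readable_ts = datetime.fromtimestamp(timestamp_ms / 1000).isoformat()
    print(f'Consumed record with timestamp {readable_ts}')
    return (readable_ts, msg.value().decode('utf-8'))

def finish_micro_batch(batch):
    # Dump the accumulated records to the console when the batch is finished.
    print(f'Batch finished with {len(batch)} record(s): {batch}')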

Finally, here is the application entry point that makes it all run.

Kafka Batch Consumer. Main entry point
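A possible shape of that entry point, using the argument names from the run command in the next section (a sketch rather than the author's exact file):

import argparse

def main():
    parser = argparse.ArgumentParser(description='Kafka micro-batch consumer')
    parser.add_argument('--broker', required=True)
    parser.add_argument('--topic', required=True)
    parser.add_argument('--group_id', required=True)
    parser.add_argument('--batch_interval_seconds', type=int, default=30)
    parser.add_argument('--poll_interval_seconds', type=int, default=10)
    args = parser.parse_args()

    consumer = BatchConsumer(
        broker=args.broker,
        topic=args.topic,
        group_id=args.group_id,
        batch_interval_seconds=args.batch_interval_seconds,
        poll_interval_seconds=args.poll_interval_seconds,
        parse_kafka_message=parse_kafka_message,
        finish_micro_batch=finish_micro_batch,
    )
    consumer.run()

if __name__ == '__main__':
    main()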

Run Kafka and Batch Consumer in Docker

It’s time to run the application and see how it works. Of course, we also need Kafka itself so we can push some data into the broker. To do this, we only need three files with the following content:

Kafka Batch Consumer. Prerequisites to run application in Docker
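The exact file contents are embedded in the original article; roughly, they are a requirements.txt with the Confluent client, a Dockerfile for the consumer, and a docker-compose.yml wiring it to a single-node Kafka along the lines of the Confluent quickstart. The images, versions, and settings below are illustrative assumptions:

# requirements.txt
confluent-kafka

# Dockerfile
FROM python:3.8-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY main.py .
CMD ["tail", "-f", "/dev/null"]  # keep the container alive so we can exec into it

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:6.1.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka-batch-consumer:
    build: .
    depends_on: [broker]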

When done, start up your application by running docker-compose up -d --build from the project directory. Let’s play around!

  1. Connect to the Kafka broker container from your terminal with docker-compose exec broker bash
  2. Create a topic to push data to and pull data from with kafka-topics --create --bootstrap-server broker:9092 --topic my-topic --partitions 1 --replication-factor 1
  3. From another terminal window, connect to your application container with docker-compose exec kafka-batch-consumer bash
  4. Run the application with the necessary arguments: python main.py --broker broker:29092 --topic my-topic --group_id my-group --batch_interval_seconds 30 --poll_interval_seconds 10
  5. Return to the first terminal and start pushing some dummy data to the Kafka topic with kafka-console-producer --broker-list broker:9092 --topic my-topic

As soon as you push a message, it is polled by the consumer and processed as you defined. In this example, each message’s timestamp is printed to the console in a human-readable format. Then, when the batch is finished, all messages that arrived during the specified batch interval are printed to the console together.

Kafka console producer on the left, Kafka batch consumer on the right

That’s it! We’ve built a custom consumer working in a micro-batch manner. You only need to implement your business logic to process messages.
