Building Data Pipelines With Kafka
Today our goal is to write a data pipeline using Kafka.
This post requires Kafka to be installed and is aimed at engineers who are building their first data pipeline. Engineers who have already built data pipelines can quickly browse through it.
What to Expect
A proof of concept on how to build a data pipeline. This does not include the effort needed to make the data pipeline operational, resilient, and scalable, i.e., production ready.
Let’s say we have a product that can be accessed only by voice (similar to Google Home or Amazon Alexa). We have permission to listen to users’ conversations for the entire session the product [in the form of a hardware device] is on.
Our objective is to find out the interesting topics being discussed at the current moment in each home (or any location where the device is connected via WiFi). This information would help us serve our customers better.
Note: The interesting topics we want to find are different from Kafka topics. More on Kafka topics in the sections below.
1. Voice conversations should be converted to text. Speech-to-text techniques come to our rescue here. We could either build our own model or buy a third-party service. This module is out of scope for this post.
2. The converted text should be sent to backend systems in realtime. These systems/processes are called producers.
3. The data should be held in a way that lets processes (other than producers) either read the data already present or wait for future data to be generated. We call this kind of system a queue, and it follows First In, First Out semantics.
4. Processes called consumers should be deployed to read the data from queues, hold it in memory for a certain amount of time (usually on the order of minutes), and perform the business logic computations needed.
5. The aggregated data, which is the outcome of the above computations, could either be sent to another queue or persisted to a database, depending on the need.
Note: Some hardware devices are capable of performing business logic computations themselves [such devices are called edge devices]. Building data pipelines using edge devices is out of scope for the current post.
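To make the queue semantics in steps 2 to 4 concrete, here is a minimal in-memory sketch in Python. It is a stand-in for Kafka, not real client code, and the function names are illustrative:

```python
from collections import deque

# A minimal FIFO queue: producers append on one end,
# consumers pop from the other (First In, First Out).
queue = deque()

def produce(text):
    """Producer: push converted text onto the queue."""
    queue.append(text)

def consume():
    """Consumer: read the oldest unread message, or None if the queue is empty."""
    return queue.popleft() if queue else None

produce("turn on the lights")
produce("what is the weather")
first = consume()   # returns "turn on the lights": FIFO order
```

Kafka differs from this toy in one important way: reading a message does not remove it, so many consumer groups can read the same data independently.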
To build a data pipeline for the above objective, the following assumptions are made:
1. Movie descriptions are used to simulate users’ conversations. In the next post we will replace movie descriptions with movie subtitles. Each movie is treated as a home.
Before getting our hands dirty building our first data pipeline, let’s get a crisp view of the following essential building blocks of Kafka.
Brokers: Brokers are the entry point to the ecosystem. They allow producers to write data to a partition of a topic and allow consumers to read data at a particular offset in a partition of a topic.
Topic: A topic is a logical identifier in Kafka storage. It is analogous to a table name in a database. Producers and consumers must specify the topic name to write to or read from Kafka storage, respectively.
Partitions: Partitions are the building blocks of fault tolerance and throughput in the Kafka distributed system.
A partition is the unit of parallelism in Kafka. More partitions allow messages to be consumed in parallel by more consumers. However, this comes at the cost of more file handles needed to write to the partitions and more memory needed on the client side (both producer and consumer). For more information, check out the Confluent blog post about partitions.
When Kafka is spun up as a multi-node cluster, one of the nodes is assigned as the leader for each partition [of a particular topic].
Each partition is replicated to multiple nodes for fault tolerance. The leader node is responsible for sending the data written to it to its replica nodes. In the event the leader node goes out of service, one of the replica nodes is elected as leader for the partition.
While deciding the number of partitions and the partition key within a topic, the following should be given importance:
- Ordering Guarantee — all the events generated should be read in chronological order. Let’s say the consumption pattern is to read all events of a home (from our example) in sequence; then the partition key could be a unique identifier for the home. This would ensure all events of the same home go to the same partition. This does not mean we need to create a partition for each home, which could run into millions. A partition will contain conversations from multiple homes, but no home’s conversations will be present in two partitions.
- Concurrency — messages residing in different partitions can be read simultaneously.
- Logical separation of data — the ability to read data simultaneously is good, but the partition key should be designed such that all the data needed by a consumption pattern resides in a single partition.
Offsets: Each message in a partition is identified by an offset, an ever-increasing number. An offset is analogous to an index in a table or an index in an array. These offsets act as a reference to retrieve data.
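To make the partition-and-offset idea concrete, here is a toy Python sketch. The hash function is illustrative (Kafka’s default partitioner actually murmur2-hashes the serialized key bytes), but the property shown is the same: the same key always lands in the same partition, and offsets within a partition increase monotonically.

```python
NUM_PARTITIONS = 3
# partition number -> list of messages; list index doubles as the offset
partitions = {p: [] for p in range(NUM_PARTITIONS)}

def partition_for(key):
    # Toy stand-in for Kafka's partitioner: deterministic, key-based.
    return sum(key.encode()) % NUM_PARTITIONS

def append(key, value):
    p = partitions[partition_for(key)]
    p.append(value)
    return partition_for(key), len(p) - 1   # (partition, offset of new message)

p1, o1 = append("home-42", "first utterance")
p2, o2 = append("home-42", "second utterance")
# Same home -> same partition; the second offset is one past the first.
```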
Producer: A producer is any client that has made a connection to the broker using the Producer API. It has to mention a topic name to write data to Kafka. If the topic does not exist yet, it will be created automatically (automatic topic creation can be turned off in the broker properties).
Consumer: A consumer is any client that has made a connection to the broker using the Consumer API. It has to mention a topic name to read data from Kafka.
Consumer Groups: A set of consumers [possibly running on the same machine or on different machines] can share a unique consumer group id [a string]. While spinning up a new consumer using the Consumer API, this consumer group id can be supplied. Consumers of a consumer group subscribe to a topic (or multiple topics). Kafka takes up the responsibility of delivering the next message to the set of consumers subscribed to a topic. ZooKeeper (or, in newer Kafka versions, an internal topic) keeps track of the offset read by each consumer group. Once the server gets the acknowledgement that a consumer has read a message, the next message is delivered.
Examples: An email notification group with group id emailnotif, an SMS notification group with group id smsnotif, an app notification group with group id appnotif, etc.
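The per-group offset bookkeeping can be sketched as a toy Python model (in Kafka, ZooKeeper or the internal offsets topic plays this role; the function names are illustrative):

```python
# Committed offset per (group_id, topic, partition).
committed = {}

def commit(group_id, topic, partition, offset):
    """Record that this group has read up to the given offset."""
    committed[(group_id, topic, partition)] = offset

def next_offset(group_id, topic, partition):
    """Each group resumes from its own committed position, independently."""
    return committed.get((group_id, topic, partition), -1) + 1

commit("emailnotif", "voice", 0, 41)
n1 = next_offset("emailnotif", "voice", 0)   # resumes at 42
n2 = next_offset("smsnotif", "voice", 0)     # independent group starts at 0
```

Because each group tracks its own offset, the same topic data can be read at different paces by different groups without interfering with one another.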
The consumer group config in our codebase could contain:
- name of the consumer group
- id of the consumer group
- number of consumers in the consumer group.
This information can be saved in a configuration file or in a database and used to spin up the right number of consumer threads.
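For example, such a configuration could look like the following (the field names are hypothetical, chosen just for illustration):

```python
import json

# Hypothetical consumer-group configuration; in practice this JSON would
# live in a config file or a database row.
CONFIG = json.loads("""
{
  "consumer_groups": [
    {"name": "voice-aggregators", "id": "voiceagg", "num_consumers": 4}
  ]
}
""")

for group in CONFIG["consumer_groups"]:
    # A deployment script would fork group["num_consumers"] consumer
    # threads, each joining the group identified by group["id"].
    print(group["name"], group["id"], group["num_consumers"])
```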
Pubsub Mode: Pubsub is the only way of producing and consuming messages, i.e., a producer will publish a message and a consumer will subscribe to a topic partition to read messages. The Kafka topic is used only as a storage system for the messages. A message stays in the system until its retention period elapses; after that, the message gets purged by Kafka.
Message Queue Mode: The pubsub model can also be used to turn a topic into a message queue using application logic.
This mode is generally used when a message should be treated as deleted from the queue once one of the consumers in every consumer group has read it.
The application logic should delete the message from the topic once all consumer groups have read it.
Let’s say we have a stream of messages in a queue, and for every message in the queue we need to send an email, an app notification, and an SMS [you can assume that the message has all the required details]. In this case we assign a set of consumers to one consumer group id, and Kafka will make sure that each message is delivered to only one of those consumers.
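The delivery rule can be sketched with a toy Python model using round-robin delivery. (Kafka actually assigns partitions to consumers rather than routing individual messages, but the visible effect per group is the same: every group sees every message, and within a group each message goes to exactly one consumer.)

```python
from itertools import cycle

# Each group has its own set of consumers; the names are illustrative.
groups = {"emailnotif": ["e1", "e2"], "smsnotif": ["s1"], "appnotif": ["a1", "a2"]}
# One round-robin pointer per group, mimicking Kafka spreading load.
rr = {g: cycle(members) for g, members in groups.items()}

def deliver(message):
    # Every group receives the message; within a group, only one consumer does.
    return {g: (next(rr[g]), message) for g in groups}

d1 = deliver("order shipped")
d2 = deliver("order delivered")
# d1 and d2 each map every group id to exactly one (consumer, message) pair.
```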
Let’s (Actually) Get Started
Run Zookeeper and Kafka Server
Producer: Let’s write messages to the Kafka server onto the topic voice.
Create an instance of KafkaProducer which is thread-safe. The instance can be shared between multiple threads without any additional overhead.
Why KafkaProducer with key value pair?
The key could be of any datatype (for example, Integer or String in most cases) and is used to designate a partition in the topic. The value could be of any datatype (Avro, JSON, etc.); it is encoded to a byte array by the producer before being sent to the broker, and decoded back to the original datatype on the consumer side after the message is received.
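A minimal sketch of that encode/decode round trip, assuming JSON values (in real clients this is done by configurable serializers and deserializers, not hand-written code):

```python
import json

def serialize(value):
    # Producer side: encode the value to a byte array before it goes on the wire.
    return json.dumps(value).encode("utf-8")

def deserialize(raw):
    # Consumer side: decode the byte array back to the original datatype.
    return json.loads(raw.decode("utf-8"))

msg = {"home_id": "home-42", "text": "what a great movie"}
wire = serialize(msg)          # bytes, as the broker sees them
roundtrip = deserialize(wire)  # equal to the original dict
```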
Simulate text conversations by using a file that contains a movie id and a movie summary as tab-delimited lines.
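Parsing one such line can be sketched as follows (the sample line is illustrative of the file’s movie-id/summary format):

```python
def parse_line(line):
    # Each line of plot_summaries.txt: "<movie_id>\t<summary>".
    # Split only on the first tab so tabs inside the summary survive.
    movie_id, summary = line.rstrip("\n").split("\t", 1)
    return movie_id, summary

mid, text = parse_line("23890098\tShlykov, a hard-working taxi driver...")
```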
For each line in the file plot_summaries.txt, the action below is performed, which involves sending the data to Kafka via the .send method.
.send does not invoke a network call immediately. This is balanced as follows:
- Messages are sent in batches. Batch boundaries are decided by batch.size (in bytes). This setting applies per partition.
- When a client invokes the .send method, the message is buffered, using up to buffer.memory to accumulate messages up to a batch.size. buffer.memory applies to the entire producer.
- The producer client will block a .send invocation if buffer.memory is filled up. If the client can add the message to the buffer before max.block.ms elapses, the call proceeds; once max.block.ms is exceeded, an exception is raised.
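The batching behaviour can be sketched with a toy model (illustrative only; the real producer also flushes on linger.ms and keeps one accumulator per partition):

```python
class TinyBatcher:
    """Toy model of producer batching: accumulate messages and
    flush once the pending bytes reach batch_size."""
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.buffer = []    # pending messages for one partition
        self.flushed = []   # batches actually "sent" over the network

    def send(self, payload: bytes):
        self.buffer.append(payload)
        if sum(len(m) for m in self.buffer) >= self.batch_size:
            self.flushed.append(self.buffer)   # one network call per batch
            self.buffer = []

b = TinyBatcher(batch_size=20)
for _ in range(5):
    b.send(b"0123456789")  # 10 bytes each
# Five sends produce only two "network calls"; one message is still buffered.
```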
Complete code for the producer can be found at ListenProducer
Consumer: Let’s read data from the topic voice.
KafkaConsumer is not thread-safe, so a single instance of KafkaConsumer cannot be shared across multiple threads. Spinning up a new consumer should happen after child threads are created. Complete code for the consumer can be found at ListenConsumer.
There are two ways to determine the number of threads.
No. of Threads = No. of Partitions:
The number of threads will always be equal to the number of partitions.
In this case, we spin up a consumer thread for each partition in a topic, using the assign method of the Consumer API to assign a partition to each consumer thread. This ensures an ordering guarantee within each partition; however, there is no control over the number of threads being created.
Since the consumer job forking the threads has no control over the thread count, the count could either overwhelm the machine or be too small to use all the cores of the machine.
No. of Threads ≤ No. of Partitions:
The number of threads will always be less than or equal to the number of partitions.
In this method, the consumer job is in control of determining the number of threads. Depending on the number of cores in the machine, the consumer threads are decided and the partitions are shared among them. Care should be taken that the number of threads is no more than the number of partitions.
Using the subscribe method gives the consumer job the flexibility to fork an appropriate number of consumer threads. This would be the ideal way to create consumer threads if an ordering guarantee within a partition is not needed.
Case 1: No. of Threads = No. of Partitions
In this case, the system has the capability of one thread reading from one partition, but there is no guarantee that this one-to-one mapping will always hold.
Case 2: No. of Threads < No. of Partitions
In this case, one consumer thread might be reading from one or more partitions.
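Sharing partitions among a smaller number of threads can be sketched as a simple round-robin assignment (illustrative; a real Kafka group rebalance uses its own assignor strategies, but it likewise gives each partition to exactly one consumer):

```python
def assign_partitions(num_partitions, num_threads):
    # Spread partitions across threads round-robin: each partition
    # is owned by exactly one thread, some threads own more than one.
    assignment = {t: [] for t in range(num_threads)}
    for p in range(num_partitions):
        assignment[p % num_threads].append(p)
    return assignment

a = assign_partitions(num_partitions=6, num_threads=4)
# -> {0: [0, 4], 1: [1, 5], 2: [2], 3: [3]}
```

With 6 partitions and 4 threads, two threads end up reading from two partitions each (Case 2 above); with 6 threads the mapping would be one-to-one (Case 1).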
- There are also other ways to determine the number of consumers, but they are in general too complex to code and maintain [in my opinion].
- If a stream processing framework [like Spark or Flink] is used to build the data pipeline, the overhead of creating threads [or allocating cores] is handled by the framework itself.
Let’s apply business logic:
Read stopwords from a file and create a list out of it. This list is used to remove words that carry no significance (words like I, you, a, an, the, etc.).
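A minimal sketch of the stopword removal (the stopword set here is a tiny illustrative subset of what stopwords.txt provides):

```python
# In the real pipeline this set is loaded from stopwords.txt.
stopwords = {"i", "you", "a", "an", "the", "is", "of"}

def significant_words(text):
    # Lowercase, split on whitespace, and drop stopwords.
    return [w for w in text.lower().split() if w not in stopwords]

words = significant_words("The taxi driver is a hero of the movie")
# -> ["taxi", "driver", "hero", "movie"]
```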
Every minute, poll the server for the latest messages, process the records as needed, and persist the aggregated data to Redis.
Data Consumption Layer: Data persisted to Redis via our data pipeline can be consumed by multiple downstream clients. One such client queries Redis every 5 minutes and fetches the latest 10 topics with the highest number of conversations.
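The aggregation and the top-10 query can be sketched as follows (an in-memory stand-in; the real pipeline keeps these counts in Redis and the client queries Redis instead):

```python
from collections import Counter

# Word counts accumulated by the consumer across polled micro-batches.
counts = Counter()
for batch in (["taxi", "driver", "taxi"], ["movie", "taxi", "driver"]):
    counts.update(batch)

# Analogous to the downstream client's 5-minute query for the
# 10 topics with the highest number of conversations.
top = counts.most_common(10)
# -> [("taxi", 3), ("driver", 2), ("movie", 1)]
```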
Though not mentioned in the heading, what we have built is a realtime data pipeline, but without the support of a distributed processing framework like Spark or Flink.
The pipeline we have built is cumbersome to scale for the following reasons:
1. Glue code to spin up multiple consumers and scale them according to load must be written and maintained by the data pipeline developers. This is an additional burden on top of the processing logic that must be written.
2. There are two kinds of realtime data pipelines. One kind reads the data at an offset, applies the required business logic, and writes the result to the destination. The second kind accumulates related data from one or more streams for a certain time, on the order of minutes, before performing the business logic, which could include transformations and aggregations similar to our example. This is called a microbatch. Distributed frameworks [like Spark] have the capability to handle these microbatches. For the first kind too, using a distributed processing framework is the better way to make the pipeline scalable, fault tolerant, and resilient.
So, in the next post, I will write about how to convert the current realtime data pipeline to use a distributed framework like Flink or Spark.
1. plot_summaries.txt is at CMU Movie Summary Corpus.
2. stopwords.txt is at NLTK Stopwords list.