Processing Event Streams with Kafka, Spark and Flink

Armen Shamelian
Sogeti Data | Netherlands
8 min readMar 26, 2021

--

Overview

Developing data analytics algorithms on streaming data can be a time-consuming job. Before you can get to the development part, it often takes some time to get an environment up and running. This example project is meant to help the reader easily set up a minimal containerized environment with Kafka, Zookeeper, Spark and Flink with up-and-running data streams so you can immediately start working on the models or algorithms to process the data.

The project contains the following components:

  • A Kafka cluster, managing topics and partitions.
  • A Kafka producer, publishing data from a CSV file to a Kafka topic at a regular interval.
  • A Zookeeper instance needed for managing the Kafka cluster.
  • A consumer using Spark Structured Stream to process the incoming messages.
  • A consumer using Apache Flink to process the incoming messages.
Basic architecture

In this example, the producer node publishes data of the names and ages of some users, and the consumer nodes each calculate their average ages over a sliding time window, using the Spark and Flink frameworks. Python is used for all implementations.

Source code:

Apache Kafka

Apache Kafka is an open-source distributed streaming platform. Originally it was developed by LinkedIn, these days it’s used by most big tech companies. Kafka allows for large-scale fault-tolerant communication between systems in a distributed way. It’s also compatible with many programming languages and reporting tools. Kafka’s use cases include real-time processing or analyzing of transactions, sensor data or logs, but also to make data available for use across the entire business. However, it takes a high level of in-depth knowledge to configure and manage a large-scale Kafka pipeline optimally. For streaming only a small amount of data, traditional messaging queues such as RabbitMQ are more suitable.

The key components of Kafka are the producers, consumers, and brokers.

Example Kafka architecture

The producer publishes data in the form of records, containing a key and value, to a Kafka topic. A topic is a category of records that is managed by a Kafka broker. These topics are internally partitioned and replicated, which contributes to Kafka’s scalability and fault tolerance. One broker is always designated as the controller node. The controller is responsible for monitoring the other brokers and managing the partitions in case another broker goes down.

For example, a topic could look something like this:

Example topic

Consumers can subscribe to a topic to receive the records for further processing. Consumers can also be organized into consumer groups, so that they each receive a copy of each record.

In our example project we have one producer, which publishes records to a topic called “users”. We also have two consumer groups that subscribe to this topic. Each consumer group receives all records from the topic for further processing.

In the Docker compose file, the example Kafka cluster is configured as follows:

kafka:
image: wurstmeister/kafka:2.13–2.7.0
depends_on:
- zookeeper
ports:
- "9091:9091"
expose:
- "9092"
environment:
KAFKA_ADVERTISED_LISTENERS:
INTERNAL://kafka:9092,EXTERNAL://localhost:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:INTERNAL:
PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9091
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
volumes:
- /var/run/docker.sock:/var/run/docker.sock

This defines a basic minimal Kafka container needed to run the example. A port is exposed for internal communication with the producers and consumers, and the Zookeeper address is specified.

Apache Zookeeper

Kafka needs a Zookeeper service to manage the configuration of its brokers and topics. If the controller node goes down for whatever reason, Zookeeper is a centralized service responsible for electing a new controller node. Zookeeper has its own protocol to elect a new controller from one of the remaining brokers. In addition, it manages some configurations of the Kafka cluster.

To launch the Zookeeper service, we only need to specify the image and expose the port so that it can communicate with the Kafka cluster. For now, we do not need any additional configuration, but in a large-scale distributed application environment, Zookeeper can be configured to help manage and coordinate all the separate components.

zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181"

Apache Spark

Apache Spark is an open-source engine for analyzing and processing large scale distributed datasets. Spark supports both batch and stream processing. In this project we use Spark Structured Streaming, a framework for fault-tolerant large-scale stream processing. The Structured Streaming framework allows us to create a Kafka consumer that processes the received topic. Behind the scenes Spark uses a micro-batching approach, meaning the incoming stream of data is processed in small batches rather than one continuous stream. There is also a newer, experimental Continuous Processing mode. This mode allows for processing the data as one truly continuous stream, with potentially much lower latency. The functionality of this mode is still limited though.

To subscribe to the Kafka topic, we use the following code fragment:

df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", os.environ["KAFKA_HOST"])
.option("subscribe", os.environ["KAFKA_TOPIC"])
.option("startingOffsets", "latest")
.option("groupIdPrefix", os.environ["KAFKA_CONSUMER_GROUP"])
.load()
)

Here we specify the hostname, topic name and consumer group, which are defined in the docker-compose file as environment variables. “df” can now be used to receive the messages in the topic.

# Parse the "value" field as JSON format.
parsed_values = df.select(
"timestamp", from_json(col("value")
.cast("string"),schema)
.alias("parsed_values"))
# We only need the Age column for now.
ages = parsed_values.selectExpr(
"timestamp",
"parsed_values.age AS age"
)
# We set a window size of 10 seconds, sliding every 5 seconds.
averageAge = ages.groupBy(
window(ages.timestamp, "10 seconds", "5 seconds"))
.agg({"age": "avg"}
)

The messages in this example are JSON encoded key-value pairs, which we have to parse first. After parsing the values, the average age is calculated over a 10 second sliding time window. The output is written to the console, and looks something like this:

You can also open the Spark UI (localhost:4040) to monitor your jobs in the Structured Streaming tab:

Apache Flink

Apache Flink is an open-source framework for distributed processing of data streams. Flink offers multiple APIs, such as the DataStream API or Table API . Recently a Python API has also been added, which we are using in this example.

# Define source
st_env.execute_sql(
f"""
CREATE TABLE source (
id INT,
name STRING,
age INT,
ts BIGINT,
rowtime as TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka-0.11',
'topic' = '{os.environ["KAFKA_TOPIC"]}',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '{os.environ["KAFKA_HOST"]}',
'properties.zookeeper.connect' = '{os.environ["ZOOKEEPER_HOST"]}',
'properties.group.id' = '{os.environ["KAFKA_CONSUMER_GROUP"]}',
'format' = 'json'
)
"""
)

In this section, we define the source table and connect it to the Kafka topic. The table should match the schema of the received messages, in order to parse the messages and append them to the table. The “rowtime” attribute is also defined, which equals the timestamp of the received message. This value is then later used to divide the incoming messages into time windows.

Note that we have to explicitly state a consumer group that is different from the Spark consumer group. This ensures that both consumer groups receive a copy of each message in the topic. If multiple consumers are in the same consumer group, the messages will be divided over the two consumers.

Next, we have to define where the data goes after being processed, also known as a sink:

# Define output sink
st_env.execute_sql(
"""
CREATE TABLE sink (
average_age DOUBLE,
window_end TIMESTAMP(3)
) WITH (
'connector' = 'print',
'print-identifier' = 'Average Age, Window: '
)
"""
)

This creates a simple table with two columns, a time window, and the average age we will compute. The “print” connector means that the output is just printed to the console.

st_env.from_path("source").window(
Slide.over("10.seconds")
.every("3.seconds")
.on("rowtime")
.alias("w")
).group_by("w").select(
"AVG(1.0 * age) as average_age, w.end AS window_end"
).insert_into(
"sink"
)

In the actual calculation, we define a sliding time window with a length of 10 seconds, based on the “rowtime” value, and it slides every 3 seconds. Then we take the average age by aggregating over the timestamps and we say that the result should to into the sink we defined earlier.

And finally, in the output we get an average age and a timestamp for each window:

Docker

We use three different Dockerfiles: one for the producer and one for each of the two consumers. The purpose of each Dockerfile is to install only the needed dependencies for that container and run the corresponding Python script.

In the Docker-Compose, the images and configuration for each container is specified. You can add or remove containers here depending on your needs.

How to run

Requirements to run:

  • Docker
  • Docker Compose

Run docker-compose build to build the images, and afterwards docker-compose upto start all containers. To shut down, use docker-compose down -v to shut down all containers and delete mounted volumes.

After running it the first time, every subsequent build and run will be quicker since the build cache is used to re-build the images.

You can rebuild only one specific container by running docker-compose up --build <service_name>. This will rebuild the service, applying any changes to the script, and restart the container.

docker-compose up --build is a shortcut to restart only all services where something has changed.

To scale up a component, you can add the –scale flag to the “up” command. You can experiment with scaling up each component independently, creating multiple instances of those containers, based on your needs.

And you can also use Docker Swarm and Kubernetes for orchestrating resources on a larger scale.

Monitoring

Each service has a web UI available that can be used to monitor the process in an easier way than just looking at the logs or console output. The Kafka cluster can be monitored with CMAK or Kafdrop.

CMAK
Kafdrop

Spark and Flink also have UI’s that can be exposed to the localhost by adding the corresponding ports to the Docker Compose file.

Conclusion

In this article we have covered an example development setup for processing or analyzing streaming data. As a data scientist or machine learning engineer, you can add your own data and get started right away. We have also shed some light on the separate components and how to let them communicate with each other through the messaging broker. Setting up such a pipeline optimally takes a high level of in-depth knowledge, especially on large-scale systems with huge piles of data and hundreds or thousands of consumers. Nevertheless, I hope this article helps you understand some basic concepts around Apache Kafka, Spark and Flink.

--

--