Making Sense of Big Data

Enabling streaming data with Spark Structured Streaming and Kafka

In this article, I’ll share a comprehensive example of how to integrate Spark Structured Streaming with Kafka to create a streaming data visualization.

Thiago Cordon
Data Arena


Photo by Markus Spiske on Unsplash

Introduction

Apache Kafka is widely adopted in modern architectures because it provides a reliable and scalable way to capture and integrate real-time data between systems. Among other features, it offers a scalable, fault-tolerant distributed framework and APIs to publish and consume event streams, and it lets you control how long events are retained.

Important Kafka concepts:

  • Broker: A Kafka cluster consists of one or more servers running Kafka. These servers are called brokers, and the Zookeeper service manages them.
  • Topic: A category or feed name under which records are published and stored. A topic is divided into partitions that are spread across multiple brokers, allowing parallelism.
  • Producers: Applications that push records into Kafka topics within the broker.
  • Consumers: Applications that pull records off a Kafka topic.
Kafka Components — Image by author.

Apache Spark includes an engine called Spark Structured Streaming that processes streams in a fast, scalable, and fault-tolerant way. It uses micro-batches, processing a data stream as a series of small batch jobs with low latency.

Spark Structured Streaming — Image by author.
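To make the micro-batch idea concrete, here is a minimal conceptual sketch in plain Python. It is not how Spark is implemented; it only models the idea that events arriving within the same trigger interval are processed together as one small batch. The function name and the 5-second trigger are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# (arrival time in seconds, payload) - a hypothetical event shape for this sketch
Event = Tuple[float, str]


def group_into_micro_batches(
    events: Iterable[Event], trigger_seconds: float
) -> Dict[int, List[str]]:
    """Group events by the trigger interval in which they arrived."""
    batches: Dict[int, List[str]] = defaultdict(list)
    for arrival_time, payload in events:
        # Every event that arrives inside the same interval lands in the same batch.
        batch_id = int(arrival_time // trigger_seconds)
        batches[batch_id].append(payload)
    return dict(batches)


events = [(0.4, "a"), (1.7, "b"), (5.2, "c"), (9.9, "d"), (10.1, "e")]
for batch_id, payloads in sorted(group_into_micro_batches(events, 5).items()):
    print(batch_id, payloads)
```

Each printed line corresponds to one small batch job that a streaming engine would run over the events collected during that interval.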

In this article, I’ll use Kafka to store the streaming events produced by Wikipedia that describe changes to wiki pages.

Spark Structured Streaming will be used to process and transform the events into a report of the top 10 users with the most edits in a given time window.

Architecture

The following picture shows the high-level architecture used in this article:

High-level architecture — image by author.

A Python application will consume streaming events from a Wikipedia web service and persist them into a Kafka topic. Then, a Spark Streaming application will read this Kafka topic, apply some transformations, and save the streaming events in Parquet format. Another Spark Streaming application will read the Parquet files to generate the report.

Setting up the environment

The following docker-compose file was used to set up the environment.

Components of this docker environment:

Zookeeper: required by Kafka to keep track of cluster nodes, topics, partitions, etc.

Kafka: the Kafka cluster.

  • Set KAFKA_ADVERTISED_HOST_NAME to your Docker host IP.
  • The KAFKA_CREATE_TOPICS configuration creates a topic called wiki-changes with 1 partition and 1 replica, just for test purposes. You can change it if you want to test with more partitions or replicas. More details on how to configure this property here.

Spark: Jupyter service with Spark embedded to run the Spark applications interactively.
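If you prefer to create the topic from code rather than through KAFKA_CREATE_TOPICS, the sketch below shows one possible way with kafka-python. It assumes the property value follows a name:partitions:replicas format (for example wiki-changes:1:1), which is common in Kafka Docker images but is not confirmed here for this project. The helper names and the localhost:9092 address are also assumptions.

```python
from typing import List, NamedTuple


class TopicSpec(NamedTuple):
    name: str
    partitions: int
    replicas: int


def parse_create_topics(value: str) -> List[TopicSpec]:
    """Parse a comma-separated list of 'name:partitions:replicas' entries (assumed format)."""
    specs = []
    for item in value.split(","):
        name, partitions, replicas = item.strip().split(":")[:3]
        specs.append(TopicSpec(name, int(partitions), int(replicas)))
    return specs


def create_topics(specs: List[TopicSpec], bootstrap_servers: str = "localhost:9092") -> None:
    """Create the topics on a running broker (the address is an assumption)."""
    # Imported lazily so the parser above works without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics(
            [
                NewTopic(name=s.name, num_partitions=s.partitions, replication_factor=s.replicas)
                for s in specs
            ]
        )
    finally:
        admin.close()


print(parse_create_topics("wiki-changes:1:1"))
```

Calling create_topics(parse_create_topics("wiki-changes:3:1")) against a running broker would be one way to experiment with more partitions.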

The project is available here. You can clone it from GitHub and start the Docker containers as described below to run the next steps:

git clone https://github.com/cordon-thiago/spark-kafka-consumer
cd spark-kafka-consumer/docker
docker-compose up -d

Producing events from Wikipedia

To simulate an application generating events for Kafka, I created this simple Python producer application that reads events from Wikipedia and sends them to the wiki-changes Kafka topic.

The message is serialized as JSON before being sent to the Kafka topic. The sseclient library is required to read the source events, and kafka-python enables Python to produce Kafka messages.

Python notebook— event producer.
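A producer along these lines could look like the sketch below. It is not the notebook from the repository. It assumes the public Wikimedia recent-changes stream URL, a broker reachable at localhost:9092, and an illustrative subset of event fields; adjust all three to your environment.

```python
import json
from typing import Any, Dict

# Assumptions for this sketch: stream URL, topic name, and the fields worth keeping.
WIKI_STREAM_URL = "https://stream.wikimedia.org/v2/stream/recentchange"
TOPIC = "wiki-changes"
FIELDS = ("id", "type", "title", "user", "bot", "timestamp", "server_name")


def build_message(change: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the selected fields; missing ones become None."""
    return {field: change.get(field) for field in FIELDS}


def serialize(message: Dict[str, Any]) -> bytes:
    """Serialize a message as UTF-8 encoded JSON, as expected by Kafka."""
    return json.dumps(message).encode("utf-8")


def run_producer(bootstrap_servers: str = "localhost:9092") -> None:
    """Read events from the stream and send them to Kafka until interrupted."""
    # Imported lazily so the helpers above work without these libraries installed.
    from kafka import KafkaProducer
    from sseclient import SSEClient

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer=serialize)
    try:
        for event in SSEClient(WIKI_STREAM_URL):
            if event.event != "message" or not event.data:
                continue  # skip keep-alive and non-data events
            try:
                change = json.loads(event.data)
            except ValueError:
                continue  # skip payloads that are not valid JSON
            producer.send(TOPIC, value=build_message(change))
    finally:
        producer.flush()
        producer.close()
```

Calling run_producer() starts the loop; keeping the serialization in a small function makes it easy to check the message format without a broker.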

Kafka event persistence

Once you start the producer application, the Kafka topic wiki-changes starts receiving the events.

To check that, I’ll connect to the Kafka container and run a console consumer to print the events received.

# Connecting to the Kafka container
cd spark-kafka-consumer/docker
docker exec -it docker_kafka_1 bash
# Inside the Kafka container
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wiki-changes
kafka-console-consumer.sh output
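The same check can be done from Python with kafka-python, which is handy inside a notebook. This is a minimal sketch: the bootstrap address is an assumption (it depends on where the code runs relative to the Kafka container), and it assumes the values are UTF-8 encoded JSON as produced above.

```python
import json


def deserialize(raw: bytes) -> dict:
    """Decode a UTF-8 JSON message value."""
    return json.loads(raw.decode("utf-8"))


def print_events(
    topic: str = "wiki-changes",
    bootstrap_servers: str = "localhost:9092",
    limit: int = 5,
) -> None:
    """Print up to `limit` events from the topic, then stop."""
    # Imported lazily so deserialize() works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start from the oldest retained event
        value_deserializer=deserialize,
        consumer_timeout_ms=10000,  # stop iterating after 10 s without messages
    )
    try:
        for i, record in enumerate(consumer):
            print(record.partition, record.offset, record.value)
            if i + 1 >= limit:
                break
    finally:
        consumer.close()
```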

Now that our Kafka cluster is working properly, let’s move on.

Spark Streaming event consumer

The Spark notebook that consumes the events is composed of the following steps:

  • The Kafka topic is read into a streaming dataframe called df.
  • The key and value read from the Kafka topic are converted to strings, then a schema is applied to value, generating df_wiki.
  • Some transformations, such as flattening to a tabular format, renaming columns, and converting column types, are applied to df_wiki, generating df_wiki_formatted.
  • The queryStream persists the df_wiki_formatted dataframe as Parquet files partitioned by change_timestamp_date and server_name.
  • The queryStreamMem reads the Parquet files as a stream and outputs the number of rows every 5 seconds, just for debugging. Note that the memory output sink is not recommended for production.
Spark notebook — event consumer.
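The steps above can be sketched in PySpark roughly as follows. This is not the notebook itself: the schema fields, the paths passed in, and the startingOffsets choice are assumptions, and the Spark session must have the Kafka connector package (spark-sql-kafka) available.

```python
from typing import Dict


def kafka_source_options(bootstrap_servers: str, topic: str) -> Dict[str, str]:
    """Options for Spark's Kafka source; startingOffsets is an assumed choice."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
    }


def start_consumer(spark, bootstrap_servers, topic, output_path, checkpoint_path):
    """Read the topic, parse the JSON value, and write partitioned Parquet files."""
    # Imported lazily so the options helper works without PySpark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import LongType, StringType, StructField, StructType

    # Assumed subset of the event fields; extend it to match your messages.
    schema = StructType(
        [
            StructField("user", StringType()),
            StructField("title", StringType()),
            StructField("server_name", StringType()),
            StructField("timestamp", LongType()),  # assumed to be epoch seconds
        ]
    )

    df = (
        spark.readStream.format("kafka")
        .options(**kafka_source_options(bootstrap_servers, topic))
        .load()
    )

    # Kafka delivers key and value as binary, so cast to string before parsing.
    df_wiki = df.select(
        F.col("key").cast("string").alias("key"),
        F.from_json(F.col("value").cast("string"), schema).alias("value"),
    )

    df_wiki_formatted = df_wiki.select(
        F.col("value.user").alias("user"),
        F.col("value.title").alias("title"),
        F.col("value.server_name").alias("server_name"),
        F.from_unixtime(F.col("value.timestamp")).cast("timestamp").alias("change_timestamp"),
    ).withColumn("change_timestamp_date", F.to_date("change_timestamp"))

    return (
        df_wiki_formatted.writeStream.format("parquet")
        .option("path", output_path)
        .option("checkpointLocation", checkpoint_path)
        .partitionBy("change_timestamp_date", "server_name")
        .outputMode("append")
        .start()
    )
```

The checkpoint location is what lets the query resume from the last processed offsets after a restart, so it should live on storage that survives the container.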

By running this notebook, we ingest the events into a local path that simulates a data lake, to keep things simple. In a real scenario, it’s common to use a persistent cloud storage layer like Amazon S3, Azure Data Lake, or Google Cloud Storage for this purpose.

Visualizing Streaming Data

The following steps compose the visualization notebook, which shows the top 10 users with the most page edits in near real-time:

  • The Parquet files generated by the consumer notebook are read by the streaming dataframe called df_stream.
  • The df_count dataframe counts the changes (rows) in each 10-minute window, based on the event timestamp (the change_timestamp column). This applies window-based aggregation with windows of 10 minutes that slide every 10 minutes, so the windows do not overlap. Another important feature here is watermarking, which excludes from the aggregation events that arrive more than 10 minutes late relative to the event timestamp.
  • The queryStream object outputs df_count to an in-memory table called wiki_changes, which can be queried with Spark SQL to extract the top 10 users with the most page edits.
  • The final output is shown as a graph (using the seaborn library) and a table (converting the Spark dataframe to a Pandas dataframe). If the producer and consumer applications are running, this notebook outputs an updated graph and table every 10 seconds.
Graph — Top 10 users with more page editions.
Table — Top 10 users with more page editions.
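To illustrate the windowing described above, the sketch below first shows in plain Python how a timestamp maps to a non-overlapping 10-minute window, then outlines a possible PySpark version of the aggregation. Grouping by user, the renamed changes column, and the complete output mode are assumptions, not details taken from the notebook.

```python
from typing import Tuple

WINDOW_SECONDS = 10 * 60


def tumbling_window(epoch_seconds: int, window_seconds: int = WINDOW_SECONDS) -> Tuple[int, int]:
    """Return the [start, end) window containing the timestamp, aligned to the epoch."""
    start = epoch_seconds - (epoch_seconds % window_seconds)
    return start, start + window_seconds


def start_report(spark, parquet_path, schema):
    """Count changes per user and 10-minute window into an in-memory table."""
    # Imported lazily so tumbling_window() works without PySpark installed.
    from pyspark.sql import functions as F

    # File-based streaming sources need an explicit schema.
    df_stream = spark.readStream.schema(schema).parquet(parquet_path)

    df_count = (
        df_stream.withWatermark("change_timestamp", "10 minutes")
        .groupBy(F.window("change_timestamp", "10 minutes", "10 minutes"), F.col("user"))
        .count()
        .withColumnRenamed("count", "changes")
    )

    # Assumption: complete mode keeps every window in the table, so the watermark
    # does not discard state here; append mode would emit only finalized windows.
    return (
        df_count.writeStream.format("memory")
        .queryName("wiki_changes")
        .outputMode("complete")
        .start()
    )


def top_users(spark, limit: int = 10):
    """Read the in-memory table and return the users with the most changes."""
    from pyspark.sql import functions as F

    return (
        spark.table("wiki_changes")
        .groupBy("user")
        .agg(F.sum("changes").alias("changes"))
        .orderBy(F.desc("changes"))
        .limit(limit)
    )


print(tumbling_window(1600000123))
```

Calling top_users(spark).toPandas() in a loop would give a Pandas dataframe that can be passed to seaborn for plotting.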

Final thoughts

In this article, I showed how to set up a Docker environment with Spark and Kafka, create a Python event producer and a Spark event consumer, ingest the events into a persistence layer, and consume these events to generate a report in near real-time.

I chose Jupyter to create the Python and Spark applications because it’s interactive and easier for people who are starting with Spark.

I don't recommend using the memory sink for streaming dataframes in production because it uses the driver’s memory.

Regarding data visualization, there are many solutions available, and you should evaluate which of them best fits your needs when planning a production application. In this article, I chose the seaborn library because it generates the graph from code and is simple to use for this use case.

I hope you enjoyed this content and feel free to reach out if you have questions or improvement suggestions. If you liked it, clap and share ;)
