Apache Spark Structured Streaming in PySpark with Apache Iceberg & Kafka

Thomas Lawless
7 min read · Jul 16, 2024


In modern data architectures, integrating streaming and batch processing with efficient data storage and retrieval is critical. Apache Kafka, Apache Iceberg, and Apache Spark Structured Streaming offer a powerful combination for building such pipelines. This blog will guide you through setting up and using these tools together, implemented in PySpark.

Note: The code examples below use a simple but powerful local development environment outlined in a previous article.

Overview

In this blog post, we’ll explore how to combine the best of Apache Kafka, Apache Spark, and Apache Iceberg in a simple example of Apache Spark Structured Streaming.

Apache Kafka

Apache Kafka is a distributed event streaming platform capable of handling high-throughput, real-time data feeds. It serves as a central hub for data streams, allowing seamless data flow between systems and applications. Kafka’s ability to ingest and process large volumes of data with low latency makes it critical for modern data streaming pipelines. It ensures reliable and scalable message brokering, facilitating real-time analytics, monitoring, and data integration across diverse systems. By providing robust fault tolerance and scalability, Kafka empowers organizations to build efficient, resilient, and responsive data-driven architectures.

Apache Spark

Apache Spark is an open-source unified analytics engine designed for large-scale data processing. It offers an in-memory computing capability, significantly boosting the speed of data processing tasks compared to traditional disk-based processing. Spark’s robust framework supports various workloads, including batch processing, interactive queries, machine learning, and real-time streaming, making it versatile for different data scenarios. Its ability to handle large volumes of data quickly and efficiently makes it a critical tool in modern data streaming pipelines, enabling real-time analytics, stream processing, and complex data transformations. Spark’s ease of integration with other big data tools and its extensive library support further enhance its utility in building comprehensive and responsive data-driven solutions.

Apache Iceberg

Apache Iceberg is an open table format designed for large-scale, high-performance analytics on big data. It enables reliable, high-speed querying and efficient data management by organizing large datasets into manageable, optimized partitions. Iceberg supports schema evolution and concurrent write operations, which are crucial for maintaining data integrity and performance in dynamic, real-time environments. Its compatibility with various big data processing engines, such as Apache Spark, ensures seamless integration into modern data streaming pipelines. By providing robust data versioning and atomic commits, Iceberg enhances the reliability and consistency of data operations, making it a critical tool for efficient and scalable data streaming workflows.

Step-by-Step Guide

The steps below build upon a local development environment for Apache Iceberg, outlined in a previous article, by adding Apache Kafka as a streaming data source. To complete these steps, you must have Podman with podman-compose installed in addition to the tools described in the previous article.

Step 1: Setting Up Apache Kafka

We will use Podman with podman-compose to configure and run a single-node instance of Apache Kafka that acts as the source of streaming data for this example.

Create a Docker Compose File

Create a new file in the root directory of the project named docker-compose.yaml with the following content.

services:
  kafka:
    image: apache/kafka:latest
    ports:
      - '9092:9092'
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'

Run Kafka using Podman

Use the following command to run Kafka using Podman as defined in the docker-compose.yaml file provided above.

podman compose --file ./docker-compose.yaml up

The following log output should be displayed once Kafka has successfully started.

[KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

Create a Kafka Topic

Next, let’s create the Kafka topic to be used for our streaming example.

podman compose exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic metrics
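
If you want to confirm the topic exists, the same kafka-topics.sh utility can describe it (an optional check):

podman compose exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic metrics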

Kafka is now running and configured for our streaming example.

Step 2: Create a Streaming Pipeline using PySpark

Next, let’s create our PySpark job which will read streaming data from Kafka and append the data to an Apache Iceberg table.

Create a Spark Session

The first step is to create a Spark session.

from pyspark.sql import SparkSession

# Initialize Spark session with Iceberg configurations
spark = SparkSession.builder \
    .appName("IcebergLocalDevelopment") \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg") \
    .getOrCreate()

Take note of the additional JAR added to the session’s configuration. The org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 package provides the Kafka data source used by Spark Structured Streaming.

Create an Iceberg Table

Next, let’s create the Iceberg table where the streaming data will be persisted, defining a schema that matches the incoming messages.

# Create the Iceberg table if it does not exist
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.streaming.data_points (
        id INT,
        name STRING,
        value INT
    ) USING iceberg
""")

Create a Streaming Reader

We can now create a Kafka streaming reader in our Spark job.

# Create a streaming reader connected to our local Kafka instance
metrics_stream_df = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'metrics') \
    .option("startingOffsets", "latest") \
    .load()
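
Nothing is consumed at this point; readStream only defines the source. If you are curious about the shape of the raw stream, printing the schema shows the standard columns exposed by Spark’s Kafka source (the commented output below is what that source typically reports):

# Inspect the raw Kafka source schema (this does not consume any data)
metrics_stream_df.printSchema()
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)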

Parsing the Message

Kafka messages are read into a Spark DataFrame with a standard set of columns, and the key and value arrive as binary. The code below casts the value column to a string and parses it into columns that align with our Iceberg table schema.

from pyspark.sql.functions import from_csv

# Convert the binary value column to a string
metrics_csv_df = metrics_stream_df.selectExpr("CAST(value AS STRING) as value")

# Parse the CSV value into columns matching the Iceberg table schema
metrics_exploded_df = metrics_csv_df \
    .select(from_csv('value', 'id INT, name STRING, value INT').alias('value')) \
    .select('value.*')

Create a Streaming Writer

With the value of our message parsed into columns, we can now append the data to the Iceberg table we defined above.

# Append the data to the Iceberg table.
iceberg_df = metrics_exploded_df.writeStream \
    .format('iceberg') \
    .outputMode('append') \
    .trigger(processingTime='1 seconds') \
    .option('checkpointLocation', '.local_checkpoint') \
    .toTable('local.streaming.data_points')

Take note of the trigger and the checkpointLocation option. The trigger instructs Spark to process streaming messages every second, while the checkpointLocation option provides a directory where Spark stores topic offset information. Spark uses the information in the checkpoint directory when the job is restarted so that processing can pick up where it left off.

Run Until Interrupted

Finally, we’ll instruct Spark to run this streaming job until it receives a termination signal.

# Run until a termination signal is received.
iceberg_df.awaitTermination()
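
The object returned by toTable is a StreamingQuery handle, so before calling awaitTermination (or from another thread or notebook cell) it can also be used to monitor and stop the job. A minimal sketch:

# Inspect the current state of the streaming query
print(iceberg_df.status)        # whether the query is actively processing data
print(iceberg_df.lastProgress)  # metrics for the most recent micro-batch (None until one completes)

# Stop the query gracefully instead of waiting for a termination signal
# iceberg_df.stop()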

Step 3: Testing the Pipeline

Let’s perform a simple test on our streaming data pipeline. For this step, we’ll create a small bash script which generates CSV records and produces messages using the Kafka console producer utility.

Create a Data Generator

The bash script below will produce 1,000 test messages to our local metrics topic.

#!/bin/bash

for x in {1..1000};
do
  id=$x
  name="metric_$((1 + RANDOM % 2))"
  value=$((1 + $RANDOM % 10))
  echo "$id,$name,$value";
  sleep .1;
done \
  | podman compose exec kafka \
      /opt/kafka/bin/kafka-console-producer.sh \
      --bootstrap-server localhost:9092 \
      --topic metrics
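
If you would rather generate the test data from Python, a roughly equivalent producer is sketched below. This is an optional alternative and assumes the kafka-python package is installed, which is not part of the environment described above:

# Hypothetical Python alternative to the bash generator (assumes kafka-python is installed)
import random
import time

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(1, 1001):
    name = f"metric_{random.randint(1, 2)}"
    value = random.randint(1, 10)
    # Messages are plain CSV strings, matching the schema parsed by from_csv above
    producer.send('metrics', f"{i},{name},{value}".encode('utf-8'))
    time.sleep(0.1)

producer.flush()
producer.close()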

Test the Pipeline

Next, let’s start the Spark job by executing the Python script containing our streaming data pipeline from the command line, for example:

python ./streaming_data_pipeline.py

With the pipeline running, execute the bash script in a separate terminal window. The script will take a moment or two to complete. Once the script is complete, you can stop the Spark job.

Query the Results

Let’s take a look at the data in our Iceberg table.

from pyspark.sql import SparkSession

# Initialize Spark session with Iceberg configurations
spark = SparkSession.builder \
    .appName("IcebergLocalDevelopment") \
    .master("local[*]") \
    .config('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1') \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg") \
    .getOrCreate()

# Count the rows in the table
spark.sql("SELECT count(*) FROM local.streaming.data_points").show(truncate=False)
+--------+
|count(1)|
+--------+
|1000 |
+--------+

# Show a sample of the data
spark.sql("SELECT * FROM local.streaming.data_points").show(truncate=False)
+---+--------+-----+
|id |name |value|
+---+--------+-----+
|946|metric_1|6 |
|947|metric_1|6 |
|948|metric_2|5 |
|949|metric_2|6 |
|950|metric_1|8 |
|951|metric_2|4 |
|952|metric_2|3 |
|953|metric_1|2 |
|954|metric_1|5 |
|937|metric_1|2 |
|938|metric_2|5 |
|939|metric_1|8 |
|940|metric_1|1 |
|941|metric_1|10 |
|942|metric_1|7 |
|943|metric_2|6 |
|944|metric_2|4 |
|945|metric_1|4 |
|235|metric_1|3 |
|236|metric_2|5 |
+---+--------+-----+
only showing top 20 rows

Success! The Iceberg table contains the data we expected to see.
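
Because the data arrived through many small micro-batch commits, it can also be interesting to look at the table’s snapshot history, which Iceberg exposes through metadata tables. An optional query against the snapshots metadata table:

# Optional: inspect the Iceberg snapshot history created by the streaming commits
spark.sql("""
    SELECT committed_at, snapshot_id, operation
    FROM local.streaming.data_points.snapshots
    ORDER BY committed_at
""").show(truncate=False)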

Apache Iceberg Table Maintenance

Apache Iceberg table maintenance is crucial in data streaming use cases to ensure optimal query performance and system reliability. As data continuously flows into Iceberg-managed tables, maintenance tasks such as compaction, partition management, and metadata pruning become essential to prevent performance degradation. Regular compaction of small files into larger ones minimizes the overhead associated with file handling and improves read efficiency. Effective partition management ensures that data is organized in a way that reduces the amount of data scanned during queries, thereby accelerating query response times. Metadata pruning, on the other hand, helps in maintaining the efficiency of table operations by discarding outdated or unnecessary metadata, reducing the overall storage footprint and enhancing query planning. By performing these maintenance tasks diligently, organizations can sustain high query performance, enabling real-time analytics and decision-making processes to run smoothly and efficiently in data streaming environments.

Note: I’ve provided an overview of Apache Iceberg table maintenance in a previous article.
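
As a rough illustration only, the Iceberg runtime used above ships maintenance procedures that can be called from Spark SQL. Compacting data files and expiring old snapshots on our table could look something like the sketch below (a minimal example, not tuned for production):

# Compact the small data files written by the many streaming micro-batches
spark.sql("CALL local.system.rewrite_data_files(table => 'streaming.data_points')").show(truncate=False)

# Expire old snapshots to prune table metadata and unreferenced files
spark.sql("CALL local.system.expire_snapshots(table => 'streaming.data_points')").show(truncate=False)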

Conclusion

Integrating Apache Kafka, Apache Iceberg, and Apache Spark Structured Streaming provides a robust solution for handling real-time data ingestion, storage, and processing. Kafka ensures reliable message delivery, Iceberg offers a scalable and efficient table format, and Spark Structured Streaming processes the data seamlessly. By following the steps outlined in this guide, you can set up a streaming pipeline that efficiently manages your data flow from ingestion to storage and querying.


Thomas Lawless

Distinguished Engineer, IBM CIO Data, AI, and Automation Platform