PySpark is widely used for large-scale data processing in both batch and real-time streaming scenarios. One of the most common real-world applications involves reading streaming data from Kafka and writing processed data to a Delta table. However, testing PySpark code — especially when dealing with streaming data from Kafka — can be challenging due to the external dependencies and distributed nature of Spark. In this article, we’ll explore how to unit test PySpark applications that read from Kafka streams and write to Delta tables, using mocking techniques.
1. What is PySpark?
PySpark is the Python API for Apache Spark, a distributed data processing framework that enables you to handle large-scale data with ease. Spark supports both batch and streaming processing, and in PySpark we use DataFrames and RDDs (Resilient Distributed Datasets) to operate on distributed data.
In streaming applications, you can continuously process real-time data using Structured Streaming, a micro-batch processing engine built on Spark’s powerful batch engine.
2. What is Apache Kafka?
Kafka is a high-throughput, distributed messaging system designed for handling real-time data streams. Kafka allows you to produce and consume streams of data in real time, making it a popular choice for streaming data pipelines.
For instance, in a PySpark application, Kafka can be used as a source of data where the Spark job consumes a Kafka topic’s messages, processes them, and then writes the result to an external storage, such as a Delta table.
3. What is Delta Lake?
Delta Lake is an open-source storage layer that brings ACID transactions to big data workloads using Apache Spark. Delta tables store data in a fault-tolerant and scalable manner, enabling data engineers to write reliable data pipelines with strong consistency guarantees.
In our example, we’ll show how to write the transformed Kafka data to a Delta table after processing it in PySpark.
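For orientation, here is a minimal batch example of writing to and reading from a Delta table. This is only a sketch: it assumes a Spark session that already has the Delta Lake extensions configured (for example via the delta-spark package), and the path /tmp/delta-demo is purely illustrative.

from pyspark.sql import SparkSession

# Assumes a Spark session configured with the Delta Lake extensions
# (for example, via the delta-spark package).
spark = SparkSession.builder.appName("DeltaDemo").getOrCreate()

# Write a small batch DataFrame to a Delta table (illustrative path).
df = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "amount"])
df.write.format("delta").mode("overwrite").save("/tmp/delta-demo")

# Read it back; Delta serves a consistent snapshot of the table.
spark.read.format("delta").load("/tmp/delta-demo").show()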
4. Unit Testing PySpark Code
Unit testing is essential for ensuring the correctness of your PySpark application. However, testing streaming applications can be tricky due to the following challenges:
- Kafka and Delta tables are external systems that your PySpark code interacts with, so you don’t want to depend on them during tests.
- You want to isolate the logic of your PySpark transformations and verify that the correct data is passed through.
To address these challenges, we can use mocking techniques. Mocking allows us to simulate external systems like Kafka and Delta without actually interacting with them, and focus on testing the behavior of our PySpark application.
5. Mocking in Unit Tests
For our tests, we will use the unittest module along with the unittest.mock package to mock Kafka streams and Delta table writes. Here’s what we’ll mock (a standalone sketch of the patch pattern follows right after this list):
- Kafka stream: We’ll simulate reading from a Kafka stream by creating a static DataFrame.
- Delta table write: We’ll mock the Delta table write operation to verify that it’s called with the correct data, without actually writing anything to storage.
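To make the pattern concrete before wiring it into Spark, here is a minimal, self-contained sketch of how patch works. KafkaClient and consume are made-up stand-ins for this illustration only; they are not part of the pipeline code below.

from unittest.mock import patch

class KafkaClient:
    # Stand-in for an external dependency (illustrative only).
    def read(self):
        raise RuntimeError("would contact a real broker")

def consume(client):
    # Logic under test: transform whatever the client returns.
    return client.read().upper()

# patch.object temporarily replaces KafkaClient.read inside the with block
# and records how it was called, so the code never touches a real broker.
with patch.object(KafkaClient, "read", return_value="test_message") as mock_read:
    assert consume(KafkaClient()) == "TEST_MESSAGE"
    mock_read.assert_called_once()

The same idea drives the tests below: the external pieces (Kafka and Delta) are replaced by mocks, while the transformation logic runs for real.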
6. Writing the PySpark Kafka Stream to Delta Table
Before diving into unit tests, let’s first understand how to write a Kafka stream to a Delta table in PySpark. Below is a simple example of reading messages from a Kafka topic, transforming them, and then writing the data to a Delta table.
PySpark Code Example
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("KafkaToDelta") \
    .getOrCreate()

# Read data from Kafka
kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test_topic") \
    .load()

# Transform the Kafka stream (e.g., extract the value and cast it to a string)
kafka_values = kafka_stream.selectExpr("CAST(value AS STRING)")

# Write the transformed stream to a Delta table
kafka_values.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoints") \
    .start("/tmp/delta-table")
This example shows how to:
- Read from Kafka: Use spark.readStream to read from a Kafka topic.
- Transform data: Extract the value field from Kafka messages and cast it to a string (an optional JSON-parsing variant is sketched right after this list).
- Write to Delta: Write the processed data to a Delta table using Structured Streaming.
7. Setting Up the Unit Test
Now that we understand the core PySpark code, let’s write a unit test for it. We will:
- Mock the Kafka stream using a static DataFrame.
- Mock the Delta table write operation using the patch function from the unittest.mock module.
- Use assertions to verify that the data is transformed correctly and that the Delta write is called.
Unit Test Setup
- Spark Session: We need a local Spark session for testing, which is created in the setUpClass method of the test class.
- Mock Kafka Input: Instead of reading from an actual Kafka topic, we simulate the Kafka input using a static DataFrame.
- Mock Delta Write: We mock the DataFrame’s writeStream entry point so the chained start call lands on a mock and nothing is written to an actual Delta table during the test.
- Assertions: We validate that the data transformation is correct and that the write operation was triggered as expected.
Unit Test Example
import unittest
from unittest.mock import patch

from pyspark.sql import Row, SparkSession


class TestKafkaToDelta(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        # Create a local Spark session for testing
        cls.spark = SparkSession.builder \
            .master("local[2]") \
            .appName("UnitTestKafkaToDelta") \
            .getOrCreate()

    def test_kafka_to_delta(self):
        # Sample Kafka input data (mocked)
        sample_data = [Row(value="test_message_1"), Row(value="test_message_2")]

        # Create a static DataFrame that stands in for the Kafka input
        kafka_df = self.spark.createDataFrame(sample_data)

        # Patch readStream so no real Kafka source can be opened; the static
        # kafka_df above plays the role of the incoming stream in this test.
        with patch('pyspark.sql.SparkSession.readStream'):
            # Apply the same transformation the job performs on the stream
            kafka_values = kafka_df.selectExpr("CAST(value AS STRING)")

            # Mock the writeStream entry point so the chained
            # format/outputMode/start calls are recorded on a mock instead of
            # starting a real streaming query against a Delta table.
            with patch('pyspark.sql.DataFrame.writeStream') as mock_write_stream:
                # Perform the write (all calls below land on the mock)
                kafka_values.writeStream.format("delta").outputMode("append").start("/tmp/delta-table")

                # Assert that the DataFrame was correctly transformed
                transformed_data = [row['value'] for row in kafka_values.collect()]
                self.assertEqual(transformed_data, ["test_message_1", "test_message_2"])

                # Assert that the write chain ended in exactly one start() call
                mock_start = mock_write_stream.format.return_value.outputMode.return_value.start
                mock_start.assert_called_once_with("/tmp/delta-table")

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()


if __name__ == '__main__':
    unittest.main()
8. Explanation of the Unit Test
- setUpClass: Creates a local Spark session for the tests. This session simulates the distributed environment for Spark, allowing you to run PySpark operations locally.
- test_kafka_to_delta:
  - Mocking Kafka Stream: Instead of consuming data from Kafka, we mock the Kafka stream using a static DataFrame containing test data (sample_data).
  - Mocking Delta Write: We mock the DataFrame’s writeStream entry point so that the chained format/outputMode/start calls are recorded on a mock and no actual Delta table is written during the test. This allows us to focus on the transformations and the correctness of the data.
  - Assertions:
    - The first assertion checks that the data was correctly transformed (i.e., the Kafka value field was cast to a string).
    - The second assertion ensures that the Delta write operation (start) was called exactly once, with the expected table path.
- tearDownClass: Stops the Spark session after all the tests have run, cleaning up resources.
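One optional refinement, not shown in the code above: extract the transformation into a small function of its own. The test can then call it directly on a static DataFrame and skip most of the patching; only the streaming read/write plumbing still needs mocks. A minimal sketch, using a hypothetical transform_kafka_values helper:

def transform_kafka_values(df):
    # Pure transformation: cast the Kafka value column to a string.
    return df.selectExpr("CAST(value AS STRING)")

# In a test, the transformation can be verified without any patching:
# result = transform_kafka_values(kafka_df)
# assert [r["value"] for r in result.collect()] == ["test_message_1", "test_message_2"]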
9. Running the Tests
To run your PySpark unit tests, you can use Python’s unittest command:
python -m unittest test_kafka_to_delta.py
This command will execute the test cases defined in your file and provide the output of the assertions.
10. Conclusion
Unit testing PySpark applications that involve Kafka streams and Delta tables is essential for building robust and reliable data pipelines. By using mocking techniques with Python’s unittest framework, you can isolate the logic of your Spark jobs, simulate external systems like Kafka, and test your data transformations without depending on actual external resources.