Kafka with Spark Streaming — Different Approaches to Read Data

Explaining the types of streams PySpark provides for reading data from Kafka

Ahmed Uz Zaman
Plumbers Of Data Science
6 min read · May 5, 2023


Image source: Databricks

Intro

Apache Kafka is a distributed streaming platform that lets you publish, subscribe to, store, and process streams of records in real time. It is a popular choice for big data applications because of its high throughput, low latency, and scalability.

Spark Streaming can consume data from Kafka topics. You can use Kafka with PySpark to build real-time data pipelines. For example, you could use Kafka to collect data from sensors, then use PySpark to process and analyze that data.

Here are some of the benefits of using Kafka with PySpark:

  • Scalability: Kafka is highly scalable, so you can easily add more brokers to handle more data.
  • Durability: Kafka stores all data in durable logs, so you can be sure that your data will not be lost.
  • Reliability: Kafka is highly reliable, so you can be sure that your data will be processed even if there are failures.
  • Flexibility: Kafka can be used for a variety of applications, including streaming analytics, real-time data processing, and event sourcing.

Examples — How to Read Data from Kafka

There are different ways to read data from Kafka using PySpark, depending on your use case and requirements. Here are some of the ways to do it:

  1. Direct Stream Approach
  2. Receiver-based Approach
  3. Structured Streaming Approach

Direct Stream Approach

This approach uses the createDirectStream() method from the pyspark.streaming.kafka module to read data directly from Kafka, without a receiver. It creates a direct connection between the Spark Streaming application and the Kafka brokers, lets you consume data from one or more Kafka topics, and provides automatic load balancing and fault tolerance. Note that this is the legacy DStream API: pyspark.streaming.kafka ships with the spark-streaming-kafka-0-8 integration, which was deprecated in Spark 2.3 and removed in Spark 3.0, so this approach only applies to older Spark versions.

Here's an example:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

spark = SparkSession.builder.appName("KafkaDirectStream").getOrCreate()

# Micro-batches of 1 second
ssc = StreamingContext(spark.sparkContext, 1)

kafkaParams = {
    "bootstrap.servers": "kafka:9092",
    "auto.offset.reset": "smallest"  # start from the earliest available offset
}

# Connect directly to the Kafka brokers (no receiver) and subscribe to one topic
directStream = KafkaUtils.createDirectStream(
    ssc,
    topics=["my_topic"],
    kafkaParams=kafkaParams
)

# Print a sample of each micro-batch
directStream.pprint()

ssc.start()
ssc.awaitTermination()
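
Because the direct approach tracks Kafka offsets itself rather than committing them to ZooKeeper, you can inspect the offset ranges covered by each micro-batch, for example to store them externally for recovery. Below is a minimal sketch following the pattern from the Spark Streaming + Kafka 0.8 integration guide, assuming the directStream and ssc objects from the example above; these transformations would be registered before ssc.start().

# Capture the Kafka offset range of every micro-batch so it can be logged
# or saved to an external offset store.
offset_ranges = []

def store_offset_ranges(rdd):
    global offset_ranges
    offset_ranges = rdd.offsetRanges()
    return rdd

def print_offset_ranges(rdd):
    for o in offset_ranges:
        print(o.topic, o.partition, o.fromOffset, o.untilOffset)

directStream \
    .transform(store_offset_ranges) \
    .foreachRDD(print_offset_ranges)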

Receiver-based Approach

This approach uses the createStream() method from the pyspark.streaming.kafka module to read data from Kafka. It creates a receiver on the Spark worker nodes that pulls data from Kafka through ZooKeeper and forwards it to the Spark Streaming application. This approach is useful when you want to consume data from multiple Kafka topics, or when you want to use the Kafka high-level consumer API. Like the direct approach above, it belongs to the legacy spark-streaming-kafka-0-8 integration and is not available in Spark 3.x.

Here's an example:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

spark = SparkSession.builder.appName("KafkaReceiverStream").getOrCreate()

# Micro-batches of 1 second
ssc = StreamingContext(spark.sparkContext, 1)

# A receiver connects through ZooKeeper using the Kafka high-level consumer API.
# Arguments: streaming context, ZooKeeper quorum, consumer group id,
# and a dict mapping each topic to the number of receiver threads.
receiverStream = KafkaUtils.createStream(
    ssc,
    "localhost:2181",
    "my_group",
    {"my_topic": 1}
)

# Print a sample of each micro-batch
receiverStream.pprint()

ssc.start()
ssc.awaitTermination()
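
Receiver throughput can be increased by creating several receivers for the same topic and unioning them into a single DStream, a pattern described in the Spark Streaming programming guide. A minimal sketch, assuming the ssc object from the example above (the number of receivers is illustrative):

# Create several receivers to parallelize ingestion, then union them
# into one DStream for downstream processing.
num_receivers = 3
kafka_streams = [
    KafkaUtils.createStream(ssc, "localhost:2181", "my_group", {"my_topic": 1})
    for _ in range(num_receivers)
]
unified_stream = ssc.union(*kafka_streams)
unified_stream.pprint()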

Structured Streaming Approach

This approach uses the readStream property of the SparkSession, which returns a DataStreamReader (defined in the pyspark.sql.streaming module), to read data from Kafka with Spark's Structured Streaming API. It creates a streaming DataFrame that reads data from Kafka and allows you to perform SQL-like operations on the data. This is the approach supported on current Spark versions.

Here's an example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

spark = SparkSession.builder.appName("KafkaStructuredStreaming").getOrCreate()

# Expected structure of the JSON messages in the topic
schema = StructType() \
    .add("id", IntegerType()) \
    .add("name", StringType()) \
    .add("age", IntegerType())

# Read from Kafka as a streaming DataFrame and parse the JSON payload
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my_topic") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Write the parsed rows to the console as they arrive
df.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()

In this example, we’re using Spark’s structured streaming API to read data from Kafka and perform operations on the data using DataFrames. We’re using the from_json() function to parse the JSON data from Kafka and convert it into a DataFrame with columns that match the schema. Then, we're selecting the columns we want and writing the output to the console.
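
In production you would usually replace the console sink with a durable one and give the query a checkpoint location, which is what enables recovery and the exactly-once guarantees discussed below. A minimal sketch, assuming the parsed df from the example above and writable paths (the paths are placeholders):

# Write the parsed rows to Parquet files; the checkpoint directory lets Spark
# track which Kafka offsets have been processed and recover after a failure.
query = df.writeStream \
    .format("parquet") \
    .option("path", "/tmp/kafka_output") \
    .option("checkpointLocation", "/tmp/kafka_checkpoint") \
    .outputMode("append") \
    .start()

query.awaitTermination()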

Advantages and Limitations

Direct Stream Approach:

Advantages:

  • Provides low latency data ingestion with minimal overhead and delay
  • Supports consuming data from multiple Kafka topics and partitions
  • Provides automatic load balancing and fault tolerance
  • Suitable for high-volume data streaming applications

Limitations:

  • Requires explicit configuration of offsets and partition management
  • Does not commit offsets to ZooKeeper automatically, so tracking or restoring a position may require manual intervention (a sketch of pinning starting offsets follows this list)
  • Requires continuous and stable connectivity between Spark Streaming application and Kafka brokers
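
One concrete consequence: when a job is restarted without checkpointing, the starting position has to be supplied explicitly. A minimal sketch using the legacy API's fromOffsets argument, assuming the ssc object from the earlier example (the offset value is a placeholder that would normally come from your own offset store):

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

# Resume partition 0 of my_topic at offset 42
from_offsets = {TopicAndPartition("my_topic", 0): 42}

directStream = KafkaUtils.createDirectStream(
    ssc,
    topics=["my_topic"],
    kafkaParams={"bootstrap.servers": "kafka:9092"},
    fromOffsets=from_offsets
)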

Receiver-based Approach

Advantages:

  • Provides better throughput than direct stream approach in some cases
  • Allows for consuming data from multiple Kafka topics and partitions
  • Provides automatic load balancing and fault tolerance
  • Can use the Kafka high-level consumer API

Limitations:

  • Can have higher latency compared to direct stream approach
  • Requires continuous and stable connectivity between Spark Streaming application and Kafka brokers
  • May require manual intervention in some cases, such as handling receiver failures or restarting streaming application

Structured Streaming Approach

Advantages:

  • Provides a high-level API for real-time data processing with Spark
  • Supports consuming data from multiple Kafka topics and partitions
  • Provides support for exactly-once semantics through checkpointing (the checkpointLocation option), together with replayable sources and idempotent or transactional sinks
  • Allows for integration with Spark SQL for complex analytics and data transformations on the data stream

Limitations:

  • Can have higher latency compared to direct stream approach or receiver-based approach
  • Requires stable and reliable Kafka cluster with appropriate configuration and tuning
  • Requires proper management of checkpointing and state management for reliable processing of data

In summary, each approach has its own set of advantages and limitations, and the choice of the best approach for your use case will depend on factors such as the volume, velocity, and complexity of the data, as well as the desired level of fault tolerance and reliability of the data processing.

Some Real Life Scenarios — Kafka & Spark Streaming

Direct Stream Approach

Real-time processing of high-volume data streams, such as clickstream data, sensor data, or financial market data, where low latency is critical and a high volume of data needs to be ingested quickly.

  • Example: A financial services company processing a high volume of real-time stock market data for trading strategies and decision-making.

Real-time processing of telemetry data from IoT devices, where low latency is important to detect and respond to anomalies quickly.

  • Example: An energy company processing telemetry data from wind turbines to detect faults and optimize power generation.

Receiver-based Approach

Real-time processing of data streams where moderate-to-high throughput is required, and processing can tolerate slightly higher latency.

  • Example: A social media platform processing user-generated content data in real-time for moderation, sentiment analysis, and targeted advertising.

Real-time processing of data streams where high throughput is required, but lower latency is acceptable, and data sources have limited partitioning.

  • Example: A video streaming platform processing real-time user engagement data for recommendation algorithms and improving user experience.

Structured Streaming Approach

Processing of data streams that require complex analytics, aggregation, or joins, and require a high level of fault tolerance and exactly-once semantics.

  • Example: A transportation logistics company processing real-time sensor data from vehicles, weather data, and traffic data to optimize delivery routes and improve fleet management.
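
To make the aggregation scenario concrete, here is a minimal Structured Streaming sketch of a windowed count over the parsed df from the earlier example, assuming the records also carry an event-time timestamp column named event_time (an illustrative name) and a writable checkpoint path:

from pyspark.sql.functions import window, col

# Count events per name over 5-minute event-time windows, allowing
# data to arrive up to 10 minutes late.
windowed_counts = df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(window(col("event_time"), "5 minutes"), col("name")) \
    .count()

windowed_counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("checkpointLocation", "/tmp/agg_checkpoint") \
    .start()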

Real-time processing of data streams where complex transformations are required, such as feature engineering for machine learning models or anomaly detection using statistical models.

  • Example: A cybersecurity company processing real-time network traffic data to detect and prevent cyber attacks.

Some General Examples

Here are some examples of how Kafka is used with PySpark:

  • Streaming analytics: You can use Kafka to collect data from sensors, then use PySpark to process and analyze that data in real time.
  • Real-time data processing: You can use Kafka to process large amounts of data in real time. For example, you could use Kafka to process financial data or social media data.
  • Event sourcing: You can use Kafka to store event data, which can then be used to reconstruct the state of your system at any point in time.

These are just a few more examples to illustrate the types of use cases where each Kafka read stream technique may be most appropriate. Ultimately, the best approach will depend on a variety of factors, including the volume and velocity of the data, the desired level of fault tolerance and reliability, and the complexity of the processing required.

Conclusion

In conclusion, Kafka and PySpark are a powerful combination for real-time data processing. Kafka is a highly scalable and reliable platform for storing and processing data streams, while PySpark is a powerful and easy-to-use platform for data analysis. Together, they can be used to build real-time data processing pipelines that can handle large volumes of data at high speeds.

My Published Libraries

PySpark — SQL Basics

PySpark — Built-in Functions

PySpark — DataFrame API

