Seamless Data Processing: Spark Structured Streaming for AWS Kinesis Message Streams

Harnessing Spark Structured Streaming for efficient AWS Kinesis message processing and analysis

Gaurav Patil
Globant
9 min read · Jun 22, 2023


Photo by Markus Spiske on Unsplash

Organizations increasingly rely on real-time insights to make critical business decisions in today's data-driven world. Streaming data has become a valuable resource, providing a continuous flow of information that can be processed and analyzed in near real-time. AWS Kinesis, a fully managed streaming service, offers a powerful platform for capturing, processing, and storing streaming data. However, efficiently processing and analyzing this massive influx of data can be challenging. Spark Structured Streaming is a high-level API built on Apache Spark that simplifies the development of scalable, fault-tolerant, and real-time data processing applications. By seamlessly integrating with AWS Kinesis, Spark Structured Streaming provides an ideal framework for unlocking the full potential of streaming data.

In this blog post, we will learn how to implement streaming data processing using AWS Kinesis and Spark Structured Streaming. We will explore the benefits of using Spark Structured Streaming for handling AWS Kinesis messages. Whether you are a data engineer, data scientist, or technology enthusiast looking to enhance your real-time data processing capabilities, this blog post will guide you on processing Kinesis stream messages using Spark Structured Streaming.

Understanding AWS Kinesis

AWS Kinesis is a fully managed streaming service offered by Amazon Web Services (AWS) that enables organizations to capture, process, and analyze real-time streaming data at scale. It provides a reliable and scalable platform to ingest and store massive volumes of data from various sources, such as websites, mobile applications, IoT devices, and more. To comprehend the power of Spark Structured Streaming for AWS Kinesis message processing, it’s essential to understand the key components of AWS Kinesis.

AWS Kinesis Service Key Components

Let’s understand the key components of AWS Kinesis one by one.

  • Kinesis Data Streams: This component acts as the foundation of AWS Kinesis. It allows you to create and manage highly scalable and durable data streams. A stream comprises one or more shards, each providing a fixed unit of capacity. Data records are distributed across these shards according to their partition key, which enables parallel processing and fault tolerance.
  • Kinesis Data Firehose: This service simplifies loading streaming data into AWS data stores or analytics services. It automatically scales to match the data ingestion rate and handles data delivery, compression, and transformation. Kinesis Data Firehose can directly integrate with data lakes, Amazon Redshift, Amazon S3, and other AWS services for further processing and analysis.
  • Kinesis Data Analytics: This component allows you to run real-time SQL queries on streaming data. With Kinesis Data Analytics, you can perform transformations, aggregations, filtering, and join operations on the incoming data streams. It provides a powerful and intuitive interface for processing and gaining insights from streaming data without the need for complex infrastructure management.
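To make the role of shards in Kinesis Data Streams more concrete, here is a minimal sketch of how a partition key selects a shard. Kinesis hashes the partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The helper names and the evenly split ranges are assumptions for illustration only; a real stream reports its actual ranges through the API.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # Kinesis hash keys are 128-bit integers


def hash_key(partition_key: str) -> int:
    """Return the MD5 digest of the UTF-8 partition key as a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def even_shard_ranges(shard_count: int):
    """Split the hash key space into contiguous ranges (hypothetical layout)."""
    size = (MAX_HASH_KEY + 1) // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last range absorbs any remainder so the whole space is covered.
        end = MAX_HASH_KEY if i == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def shard_for(partition_key: str, ranges) -> int:
    """Return the index of the range that contains the key's hash."""
    key = hash_key(partition_key)
    for index, (start, end) in enumerate(ranges):
        if start <= key <= end:
            return index
    raise ValueError("hash key outside all ranges")


ranges = even_shard_ranges(4)
for pk in ["1", "2", "3", "4"]:
    print(pk, "-> shard index", shard_for(pk, ranges))
```

One practical consequence: records that share a partition key always land on the same shard, so a constant key concentrates all traffic on one shard.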

Understanding Spark Structured Streaming

Apache Spark is a widely adopted open-source distributed computing framework renowned for its speed, scalability, and ease of use. Spark Structured Streaming is a high-level API built on top of Apache Spark that simplifies the development of real-time data processing applications. It provides a declarative and unified programming model that treats streaming data as continuous tables, enabling developers to apply the same set of operations for both batch and streaming data. Let’s understand how structured streaming works in Spark.

Spark Structured Streaming follows a micro-batch processing model, dividing streaming data into small batches. It leverages the underlying Spark engine’s capabilities to process these micro-batches in parallel and provide low-latency and fault-tolerant processing. The architecture includes a streaming source, transformations and operations, and output sinks.

The programming model of Spark Structured Streaming
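The micro-batch idea can be illustrated without Spark at all. The sketch below is plain Python, not the Spark API: it cuts an input sequence into small batches, applies an aggregation to each batch, and carries the running result forward as state. Batching by record count (rather than by trigger interval) and the function names are simplifying assumptions.

```python
from collections import Counter
from itertools import islice


def micro_batches(records, batch_size):
    """Yield consecutive lists of at most batch_size records."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def run_query(records, batch_size=2):
    """Process records batch by batch, keeping running counts as state."""
    state = Counter()  # plays the role of the continuously updated result table
    snapshots = []
    for batch_id, batch in enumerate(micro_batches(records, batch_size)):
        # Transformation and aggregation applied to the new rows only.
        state.update(record["title"] for record in batch)
        # What a sink would receive after this batch if it got the full result.
        snapshots.append((batch_id, dict(state)))
    return snapshots


events = [
    {"title": "Data Engineer"},
    {"title": "Data Scientist"},
    {"title": "Data Engineer"},
]
for batch_id, counts in run_query(events):
    print(batch_id, counts)
```

Spark does the same thing at scale: each trigger appends new rows to a conceptually unbounded input table, and the engine updates the result incrementally instead of recomputing it from scratch.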

Spark Structured Streaming offers several benefits for processing and analyzing streaming data:

  • Unified Batch and Streaming Processing: Spark Structured Streaming provides a unified programming model seamlessly integrating batch and streaming data processing. It allows you to use the same DataFrame and SQL API for static (batch) and dynamic (streaming) data, simplifying the development and maintenance of data pipelines.
  • High-Level API: The high-level API of Spark Structured Streaming abstracts away the complexities of handling streaming data. It offers intuitive constructs like DataFrames, Datasets, and SQL queries, enabling developers to express their data processing logic in a declarative and SQL-like manner.
  • Fault Tolerance and Reliability: Spark Structured Streaming provides fault tolerance and reliability guarantees out of the box. It maintains end-to-end reliability by checkpointing metadata and saving intermediate results. If failures occur, it can recover and resume processing from the last known consistent state automatically.
  • Scalability and Parallelism: Spark’s distributed processing engine allows you to scale your streaming applications horizontally across a cluster of machines. It automatically parallelizes the processing of data across multiple nodes, enabling high throughput and low-latency processing of large-scale streaming data.
  • Stateful Stream Processing: Spark Structured Streaming supports stateful stream processing, which allows you to maintain and update the state as new data arrives. This is useful for scenarios that require aggregations, event-time processing, windowed computations, or maintaining session-based information.
  • Rich Ecosystem and Integrations: Spark has a vast ecosystem and supports a wide range of data sources and data sinks, making it easy to integrate with various systems and technologies. You can seamlessly connect to popular data storage systems, message queues, databases, and more, enabling flexible data ingestion and output options.
  • Extensibility and Compatibility: Spark Structured Streaming is built on top of the Spark engine, which provides extensive libraries, APIs, and integrations for data processing, machine learning, graph analytics, and more. It is compatible with various programming languages, including Scala, Java, Python, and R, allowing you to leverage your preferred language and existing codebase.

These benefits make Spark Structured Streaming a powerful framework for building real-time data processing applications, enabling developers to handle complex streaming use cases with ease and scalability.

By combining the capabilities of Spark Structured Streaming and AWS Kinesis, organizations can leverage the strengths of both platforms to process, analyze, and gain real-time insights from their streaming data at scale.

A hands-on example of processing Kinesis messages with Spark Structured Streaming

In this section, you will see how to use Spark Structured Streaming to process Kinesis messages. We will set up a new Kinesis data stream, put some messages into it, and then read those messages with Spark Structured Streaming and print them to the console.

To create a Kinesis Data Stream, follow the steps below.

  1. Sign in to the AWS Management Console and open the Kinesis console at https://console.aws.amazon.com/kinesis.
  2. Choose Data Streams in the navigation pane.
  3. In the navigation bar, expand the Region selector and choose a Region.
  4. Choose Create Kinesis stream.
  5. Enter a name for your stream (for example, orders-data-stream).
  6. Enter 1 for the number of shards, but leave Estimate the number of shards you'll need as is.
  7. Choose Create Kinesis stream.
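The console steps above can also be scripted. The following is a minimal sketch using the boto3 Kinesis client's create_stream call and its stream_exists waiter; the helper name create_stream_and_wait is hypothetical, and the stream name and Region are placeholders you would replace.

```python
def create_stream_and_wait(kinesis_client, stream_name, shard_count=1):
    """Create a provisioned stream and block until it is ready to use."""
    kinesis_client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    # The 'stream_exists' waiter polls DescribeStream until the stream is ACTIVE.
    kinesis_client.get_waiter("stream_exists").wait(StreamName=stream_name)
    return stream_name


# Usage (requires boto3 and AWS credentials, so it is left commented out):
# import boto3
# client = boto3.client("kinesis", region_name="your-aws-region")
# create_stream_and_wait(client, "orders-data-stream")
```

Passing the client in as a parameter keeps the helper easy to exercise with a stub before pointing it at a real account.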

The next step is to put some data into the Kinesis data stream we created in the earlier step. We will need the boto3 library to connect to the AWS Kinesis service, the FastAvro library to serialize data into the Avro format, and the PySpark library to work with Spark. Perform all necessary imports in your Python code as shown in the code snippet below.

import boto3
import fastavro
import tempfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro

Define variables for the stream name and the Avro message schema. In the code snippet below, provide the name of your Kinesis stream in the kinesis_stream_name variable and the schema of the message in the avro_schema variable, which is used to write the message in Avro format.

# Kinesis data stream name
kinesis_stream_name = 'your-kinesis-stream-name'

# Avro message schema
avro_schema = {
    'type': 'record',
    'name': 'Message',
    'fields': [
        {'name': 'id', 'type': 'string'},
        {'name': 'title', 'type': 'string'}
    ]
}

Now we will create a new function, generate_avro_record, that takes two parameters: the Avro schema and the dictionary object to be serialized as Avro data. It calls the schemaless_writer function of the FastAvro library to produce the record as bytes.

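def generate_avro_record(avro_schema, json_data):
    # Parse the schema dictionary into FastAvro's internal representation
    parsed_schema = fastavro.parse_schema(avro_schema)
    # Write the record without an Avro file header, then read the bytes back
    with tempfile.TemporaryFile() as fp:
        fastavro.schemaless_writer(fp, parsed_schema, json_data)
        fp.seek(0)
        data = fp.read()
    return data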

Next, we will initialize the Kinesis client. For each record, we will serialize the data with the generate_avro_record function and put the resulting bytes into the Kinesis data stream.

# Initialize Kinesis client
kinesis_client = boto3.client('kinesis', region_name='your-aws-region')

data = [
    {'id': '1', 'title': 'Data Engineer'},
    {'id': '2', 'title': 'Data Scientist'},
    {'id': '3', 'title': 'System Admin'},
    {'id': '4', 'title': 'BI Developer'}
]

# Put data into Kinesis stream
for json_data in data:
    avro_bytes = generate_avro_record(avro_schema, json_data)
    response = kinesis_client.put_record(
        StreamName=kinesis_stream_name,
        Data=avro_bytes,
        PartitionKey='1'
    )
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        print("Data was successfully put into Kinesis stream.")
    else:
        print("Failed to put data into Kinesis stream.")
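The loop above sends one record per request. For larger volumes, the Kinesis PutRecords API accepts up to 500 records in a single call and reports failures per record rather than per request. Below is a minimal sketch of that pattern; the helper names chunked and put_in_batches are hypothetical, and retrying the returned failures is left to the caller.

```python
def chunked(items, size):
    """Yield slices of items containing at most size elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def put_in_batches(kinesis_client, stream_name, keyed_payloads, batch_size=500):
    """Send (partition_key, data_bytes) pairs with PutRecords.

    Returns the request entries that Kinesis rejected, so they can be retried.
    """
    failed = []
    for chunk in chunked(list(keyed_payloads), batch_size):
        entries = [{"Data": data, "PartitionKey": key} for key, data in chunk]
        response = kinesis_client.put_records(StreamName=stream_name, Records=entries)
        if response.get("FailedRecordCount", 0):
            # Result entries come back in request order; failed ones carry an ErrorCode.
            for entry, result in zip(entries, response["Records"]):
                if "ErrorCode" in result:
                    failed.append(entry)
    return failed


# Usage with the objects defined earlier (requires AWS access, so commented out):
# pairs = [(row['id'], generate_avro_record(avro_schema, row)) for row in data]
# leftovers = put_in_batches(kinesis_client, kinesis_stream_name, pairs)
```

Using the record id as the partition key, as in the commented usage, spreads records across shards instead of pinning them all to one.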

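Next, we will initialize a Spark session and use it to read the Kinesis stream as a streaming DataFrame. Note that the kinesis source format is not part of open-source Apache Spark; it is provided by Databricks Runtime or by a separately installed connector, and option names can differ between connectors. The from_avro function likewise requires the spark-avro package to be available.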

# Configure Spark Environment
spark = SparkSession.builder \
    .appName('KinesisSparkStreamingExample') \
    .getOrCreate()

# Read Avro Data from Kinesis Stream in PySpark
streaming_df = spark.readStream \
    .format('kinesis') \
    .option('streamName', kinesis_stream_name) \
    .option('region', 'your-aws-region') \
    .option('awsAccessKey', 'your-aws-access-key') \
    .option('awsSecretKey', 'your-aws-secret-key') \
    .load()
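Hardcoding access keys in source code is risky, so here is a minimal sketch that builds the reader options from environment variables instead. The helper name kinesis_reader_options is hypothetical. The initialPosition option and its trim_horizon value are assumptions based on the Databricks Kinesis connector; other connectors may use a different option name, so check the documentation of the one you use.

```python
import os


def kinesis_reader_options(stream_name, region, initial_position="trim_horizon", env=None):
    """Build the option dictionary for a 'kinesis' streaming source."""
    env = os.environ if env is None else env
    options = {
        "streamName": stream_name,
        "region": region,
        # Assumption: start from the oldest retained record, so messages put
        # into the stream before the query starts are also read.
        "initialPosition": initial_position,
    }
    access_key = env.get("AWS_ACCESS_KEY_ID")
    secret_key = env.get("AWS_SECRET_ACCESS_KEY")
    # Pass explicit keys only when both are present; otherwise leave
    # credential resolution to the connector's defaults.
    if access_key and secret_key:
        options["awsAccessKey"] = access_key
        options["awsSecretKey"] = secret_key
    return options


# Usage with a SparkSession named spark (requires a Kinesis connector):
# streaming_df = spark.readStream.format("kinesis") \
#     .options(**kinesis_reader_options("your-kinesis-stream-name", "your-aws-region")) \
#     .load()
```

The starting position matters in this walkthrough: if a connector begins at the latest record by default, messages written before the query started would not show up in the output.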

You can apply transformations to your data according to your needs, for example removing special characters from a string, tokenizing, or concatenating two or more columns. In the code snippet below, I have used the from_avro function to deserialize the Avro data stored in each Kinesis message, using the avro_schema defined earlier.

import json  # from_avro expects the Avro schema as a JSON string, not a dict

processed_df = streaming_df.select(from_avro('data', json.dumps(avro_schema)).alias('avroData')) \
    .select("avroData.id", "avroData.title")

You can write processed data to a file, Kafka topic, database, or the console. In the code snippet below, I have used the console as the output sink.

# Define the output sink and start the streaming query
output_query = processed_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

# Block until the query is stopped or fails
output_query.awaitTermination()
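The console sink is convenient for experiments, but the recovery behavior described earlier depends on a checkpoint location and a durable sink. The sketch below writes the processed data to Parquet files with a checkpoint directory and a fixed trigger interval. The helper name start_parquet_query and both paths are hypothetical; the writer methods used are the standard DataStreamWriter ones.

```python
def start_parquet_query(processed_df, output_dir, checkpoint_dir, interval="10 seconds"):
    """Start a streaming query that appends micro-batches to Parquet files."""
    return (
        processed_df.writeStream
        .format("parquet")
        .outputMode("append")
        .option("path", output_dir)
        # Offsets and commit logs are stored here so a restarted query can
        # resume from where the previous run stopped.
        .option("checkpointLocation", checkpoint_dir)
        .trigger(processingTime=interval)
        .start()
    )


# Usage with the processed_df defined earlier (paths are placeholders):
# query = start_parquet_query(processed_df, "/tmp/orders/out", "/tmp/orders/checkpoint")
# query.awaitTermination()
```

Restarting the same query with the same checkpoint directory lets Spark pick up at the last committed offsets instead of reprocessing the stream from the beginning.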

This way, we can use Spark to process messages in the Kinesis data stream.

When you want to apply a custom action or operation on each micro-batch of data in a streaming DataFrame, you can use the foreachBatch() function in Spark Structured Streaming. It provides a flexible way to process the data and perform actions not directly supported by built-in streaming operations or sinks.

The example below demonstrates the use of the foreachBatch() function, which takes a user-defined function as its argument and applies it to each micro-batch of data. In this code snippet, I have provided the Kinesis stream name in the kinesis_stream_name variable. The process_micro_batch function handles each micro-batch, and you can write your custom transformation logic inside it. Each micro-batch represents the portion of the streaming data that arrived within a specific time interval. Spark calls process_micro_batch with two arguments: the micro-batch as a DataFrame and batch_id, an integer that can be used for processing or reference purposes within the function. The batch_id identifies a micro-batch within its own streaming query only; it is not a system-wide or universally unique number.

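from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

# Define Kinesis stream name
kinesis_stream_name = 'micro-batch-test-stream'

# Configure Spark environment
spark = SparkSession.builder \
    .appName("CustomMicroBatchExample") \
    .getOrCreate()

# Handle one micro-batch. The body is illustrative (an assumption, replace it
# with your own logic): it decodes the payload as text, stamps each row with
# the processing time, and prints the result.
def process_micro_batch(batch_df, batch_id):
    batch_df \
        .select(col('data').cast('string').alias('message')) \
        .withColumn('processed_at', current_timestamp()) \
        .show(truncate=False)
    print(f"Processed micro-batch {batch_id}")

# Read data
streaming_df = spark.readStream \
    .format('kinesis') \
    .option('streamName', kinesis_stream_name) \
    .option('region', 'your-aws-region') \
    .option('awsAccessKey', 'your-aws-access-key') \
    .option('awsSecretKey', 'your-aws-secret-key') \
    .load()

# Apply process_micro_batch to every micro-batch and block until the query stops
streaming_df.writeStream \
    .foreachBatch(process_micro_batch) \
    .start() \
    .awaitTermination()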
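Because foreachBatch() gives at-least-once guarantees, the same micro-batch can be delivered again after a failure, and batch_id is the handle for detecting that. Below is a minimal sketch of a wrapper that skips batch ids it has already committed. The names make_idempotent_handler and write_batch and the JSON ledger file are hypothetical, and the write followed by the ledger update is not atomic here; a production sink would record the batch id in the same transaction as the data.

```python
import json
import os


def make_idempotent_handler(write_batch, ledger_path):
    """Wrap write_batch so a batch_id that was already committed is skipped."""

    def load_committed():
        if not os.path.exists(ledger_path):
            return set()
        with open(ledger_path) as fh:
            return set(json.load(fh))

    def handler(batch_df, batch_id):
        committed = load_committed()
        if batch_id in committed:
            return False  # replayed micro-batch: output already written
        write_batch(batch_df, batch_id)  # user-supplied side effect
        committed.add(batch_id)
        with open(ledger_path, "w") as fh:
            json.dump(sorted(committed), fh)
        return True

    return handler


# Usage with a streaming DataFrame (my_write is a placeholder for your own function):
# handler = make_idempotent_handler(my_write, "/tmp/committed_batches.json")
# streaming_df.writeStream.foreachBatch(handler).start().awaitTermination()
```

The function passed to foreachBatch() runs on the driver, which is why a driver-local ledger file is enough for a sketch like this one.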

Summary

Spark Structured Streaming, in combination with AWS Kinesis, offers a powerful solution for processing and analyzing streaming data. Kinesis provides scalable, durable, and reliable data ingestion, while Spark contributes advanced processing capabilities, so organizations can build real-time data processing applications with relative ease. This integration enables businesses to extract valuable insights from streaming data sources and make data-driven decisions efficiently.

Thank you!!!
