PySpark Structured Streaming for multi format scalable Data Ingestion Workloads

Prithvijit Guha
9 min readJul 24, 2023

--

Introduction

What is (Py)Spark?

Apache Spark is a unified analytics engine for large-scale data processing. PySpark is the Python API for the Spark Engine, there are other high level API’s to the Spark Engine like Java, Scala and R

What is Spark Structured Streaming?

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive

What is Data Ingestion?

Data ingestion refers to the process of collecting, acquiring, and importing data from various sources into a data storage or processing system, such as a database, data lake, or data warehouse. In a lot of case this might be various formats like json, parquet, delta in batches or streaming.

Why Structured Streaming?

  • Checkpoints: Its a feature of Structured Streaming that let’s you read from where you left off, without you having to define that logic yourself
  • Multiple Data Formats: Most people assume that structured streaming mainly is associated with Kafka or some streaming source, however you can actually use Structured Streaming with multiple data formats like delta, parquet, json, orc, text or table from your hive metastore.
  • Triggers: This tells how often or what interval to process your data in, as a continuous stream or trigger “once” and stop once all new files are processed. (This can be useful when you are running in small intervals of 10mins)

Use cases?

There are of course way more use cases, but some I found useful are mentioned below:

  • Batch loading tables incrementally
  • Reading hourly datasets

Required tools for this tutorial

  • Python
  • Spark 3.3.1 and Above
  • Notebook environment IDE

OR

  • Databricks notebook which comes with both Python and Spark. Highly recommend since it requires minimal setup. You can test this with the Free Databricks Community Edition.

OR

Step 1:

Prepare your datasets

I prepared some simple data for upload into 3 separate csv’s.

Sample dataset actual data
Sample dataset sitting in local directory

Download them from here:

https://github.com/prithvijitguha/spark_structured_streaming_example/tree/main/sample_datasets

Upload your first dataset to HDFS/DBFS

Screenshot of uploaded dataset into file store or HDFS

Step 2:

Prepare the schema for our read stream, this only required for json and csv data types. Parquet, orc and delta can be read directly.

from pyspark.sql.types import StructType, StructField, StringType, BooleanType


# The schema specification is only required for json and csv. Parquet, Delta and orc should be fine without it
sample_dataset_schema = StructType([\
StructField("Firstname",StringType(),True),\
StructField("Lastname",StringType(),True),\
StructField("Jedi_Rank", StringType(), True),\
StructField("IsCouncilMember", BooleanType(), True),\
StructField("snapshot_date", StringType(), True)
])

Prepare our Data Stream reader

# Read the dataframe into a stream specifying the schema and load data directory
df_sink = spark.readStream.format("csv")\
.schema(sample_dataset_schema)\
.option("header", True)\
.load("dbfs:/FileStore/source_directory/*.csv")

Let’s break down the code and options:

  • format: source data format, can be json, parquet, delta
  • schema: A schema to read for csv and json formats
  • header: Reads the first row as the Header(only for csv)
  • load: The target directory we want to read from and only the *.csv, as we can see it accepts glob patterns for the directory. This can also be an AWS S3 bucket or Azure Storage Container s3://mybucket/

Prepare some standarizations we want to do on the source data. We convert all the column names to lower case and add input_file_name and a timestamp to the record

from pyspark.sql.functions import input_file_name, col, current_timestamp

# we can add some other parameters like renaming the columns to a more standardized snake_case format
# and adding input file name and record load timestamp
df_standardized = df_sink.select(
col("Firstname").alias("first_name"),
col("Lastname").alias("last_name"),
col("Jedi_Rank").alias("jedi_rank"),
col("IsCouncilMember").alias("is_council_member"),
col("snapshot_date").cast("date"),
input_file_name().alias("source_file_name"),
current_timestamp().alias("record_load_timestamp"),
)

Prepare the target writer

# Now we an write the stream to 
# a target location and specify a checkpoint as well
df_standardized.writeStream\
.format("parquet")\
.outputMode("append")\
.trigger(availableNow=True)\
.partitionBy("snapshot_date")\
.option("checkpointLocation", "dbfs:/sample_dataset_checkpoint/")\
.toTable("hrz_sample_dataset")

Let’s break down this code

Then we initialize a write stream, here are the options we provide and what they mean:

  • format: Input source data type. Others are parquet, delta, json, console
  • outputMode: How the data should be written to target, options are complete(like overwrite), append(appends the new rows to target), update(only updated rows are written)
  • trigger: How often this should run, availableNow=True, will process all the new rows/files and shutdown once down. This results in more cost savings as opposed to a continuous stream processing every 1 sec
  • partitionBy: How to partition your target table. This is good for performance.
  • checkpointLocation: A checkpoint location let’s us start from where we left off once we’re done with a batch or if the stream fails in between.
  • toTable: We save as a hive SQL table and save the data, you can save as external by passing the ‘path’ keyword. But you could also just pass ‘path’, ‘some_hdfs_path

Step 3:

Start your stream and view the dataset

loaded target table with standardized data

Step 4

Drop more datasets into your source directory and run the trigger again. You’ll notice only the new rows are updated and we are not reprocessing the old file again.

No duplicate rows and no need of an overwrite. Structured Streaming only processes the new files and rows.

Command cell in notebook where we ran the write stream query
Querying the loaded table with SQL

Alternative Setups and Configurations

Other optional ways to configure your ingestion workloads with Structured Streaming or Autoloader

Continuous stream options

# Incase you want to use a continuous stream, which is perpetually on
# Default trigger (runs micro-batch as soon as it can)
df_standardized.writeStream\
.format("console") \
.start()

# ProcessingTime trigger with two-seconds micro-batch interval
df_standardized.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()

Reading from table instead

The advantage of this method is that, any time this table is updated, we append rows to the target table

# Alternate forms of reading data, read from table
# In this case we only update the pipeline the table is updated
df_sink_table_format = spark.readStream.table("hrz_sample_dataset")

Using Databricks Autoloader example

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))

# Databricks Page: https://docs.databricks.com/getting-started/etl-quick-start.html

Visualize the pipeline with Flowrunner

Flowrunner is a python package to visualize your data pipelines. Here is an example of using the same visualization tool in your data pipeline. I am the creator of this package so I could not miss an opportunity to extend it with Structured Streaming

from flowrunner import BaseFlow, end, start, step
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
from pyspark.sql.functions import input_file_name, col, current_timestamp


spark = SparkSession.builder.getOrCreate()


class IngestionFlow(BaseFlow):
@start
@step(next=["standardize_data"])
def read_data_stream(self):
"""
This method is used to read the datastream
"""
# The schema specification is only required for json and csv. Parquet, Delta and orc should be fine without it
sample_dataset_schema = StructType([\
StructField("Firstname",StringType(),True),\
StructField("Lastname",StringType(),True),\
StructField("Jedi_Rank", StringType(), True),\
StructField("IsCouncilMember", BooleanType(), True),\
StructField("snapshot_date", StringType(), True)
])

# Read the dataframe into a stream specifying the schema and load data directory
self.df_sink = spark.readStream.format("csv")\
.load("dbfs:/FileStore/source_directory/*.csv", schema=sample_dataset_schema, header=True)



@step(next=["write_datastream"])
def standardize_data(self):
"""
This method we standardize the data a little bit with some column renaming
"""
# we can add some other parameters like renaming the columns to a more standardized snake_case format
# and adding input file name and record load timestamp
self.df_standardized = self.df_sink.select(
col("Firstname").alias("first_name"),
col("Lastname").alias("last_name"),
col("Jedi_Rank").alias("jedi_rank"),
col("IsCouncilMember").alias("is_council_member"),
col("snapshot_date").cast("date"),
input_file_name().alias("source_file_name"),
current_timestamp().alias("record_load_timestamp"),
)

@end
@step
def write_datastream(self):
"""
Here we write the dataframe to final DBFS/HDFS location
"""
# Now we an write the stream to
# a target location and specify a checkpoint as well
self.df_standardized.writeStream\
.format("parquet")\
.outputMode("append")\
.trigger(availableNow=True)\
.partitionBy("snapshot_date")\
.option("checkpointLocation", "dbfs:/sample_flow_checkpoint/")\
.toTable("hrz_sample_dataset_flow")



ingestion_flow = IngestionFlow() # create instance of pipeline


ingestion_flow.display() # display your flow

With Flowrunner we can create a diagram in real time without running the actual pipeline using the display() method

Visualize your ingestion pipeline flow with Flowrunner

Run your ingestion flow with the run() method

Run your structured streaming pipeline with flowrunner

Abstract Ingestion Pipeline Factory

Setup an abstract factory to build all ingestion pipelines.

from dataclasses import dataclass


@dataclass
class IngestionPipeline:
"""Sample Ingeestion Pipeline

This class is used as a reusable abstract factory to build new pipelines

Examples:

.. code-block:: python

ingestion_pipeline = IngestionPipeline()

ingestion_pipeline.read_data_stream(
file_path="dbfs:/FileStore/source_directory/*.csv",
file_type="csv",
schema=sample_dataset_schema,
header=True
)

ingestion_pipeline.write_data_stream(
data_format = "delta",
table_name = "some_target_table"
)
"""

def __post_init__(self):
"""Method invoked post creating instance to get a SparkSession"""
self.spark = SparkSession.builder.getOrCreate()

def read_data_stream(
self,
file_path: str,
file_type: str,
**kwargs
):
"""Method to read data into a stream using Spark Structured Streaming

Kwargs is used so we can pass any others arguments that may be required.
Since this method can be overriden, users
can override this to add any other stream capabilities that may not be covered

Args:
- file_path(str): A path or directory to look for, this argument accepts glob patterns.
eg. "dbfs:/some_directory_name/*.csv".
- file_type(str): A type of data format to read or look
for eg. "json", "parquet", "delta",
"csv"
- kwargs(dict): Any additional arguments to be added. eg. header, schema, etc
"""
self.stream_query = spark.readStream.format(file_type).load(file_path, **kwargs)

def write_data_stream(
self,
data_format: str,
table_name: str,
output_mode: str = "append",
checkpoint_location: str = None,
):
"""Method to write structured streaming dataframe to dbfs/hdfs

We create a reusable method to avoid writing boilerplate code again and again.
This method does not require a checkpoint to be
mentioned, but it instead will create a checkpoint for you.
This method can be overriden, users can override this to add
any writing functionality that might not be covered.

Args:
- data_format(str): Final write format to write in
eg. console, parquet, delta
- table_name(str): A name for SparkSQL/Hive table to save as
- output_mode(str, optional): Output mode, options are append,
update or complete(meaning overwrite)
- checkpoint_location(str, optional): An optional argument for
checkpoint location. If not specified, then this
method will use a default location for checkpoint location.

Note:
Although 'checkpoint_location' is optional, if not specified a default location will be
used, checkpoint is required
for tracking progress
"""
if not checkpoint_location:
checkpoint_location = f"dbfs:/_checkpoints/{table_name}"
self.stream_query.writeStream.format(data_format).outputMode(
output_mode
).trigger(availableNow=True).option(
"checkpointLocation", checkpoint_location
).toTable(
table_name
)

Handling Historical Data Loading

# Handling history data so that it does break your pipeline 
# Read the dataframe into a stream specifying the schema and load data directory
# We also mention the max files per trigger so that we only 20 per trigger pull
df_sink = spark.readStream.format("csv")\
.option("maxFilesPerTrigger", 20)\
.load("dbfs:/FileStore/source_directory/*.csv", schema=sample_dataset_schema, header=True)

Complete notebook with all examples is available here:

https://github.com/prithvijitguha/spark_structured_streaming_example/blob/main/structured_streaming_example.py

FAQ

Spark Structured Streaming vs Databricks Autoloader

Databricks autoloader builds on structured streaming with added scalability, performance and schema evolution

https://docs.databricks.com/ingestion/auto-loader/index.html#benefits-of-auto-loader-over-using-structured-streaming-directly-on-files

Spark Structured Streaming vs Spark Streaming(Legacy)

Spark streaming is the legacy version of streaming prior to Spark 2.0 which uses DStreams underneath. Structured streaming is the modern version of streaming which uses Datasets and Dataframe API which is supported and an evolving API

Further Reading

If you’re interesting in taking this further in your environments, I recommend this further reading

--

--