Kedro goes streaming

Understanding and unlocking the potential of streaming analytics with Kedro

As a data practitioner, you will encounter scenarios in which working with real-time data streams is essential to delivering timely insights.

Unlike batch analytics, which involves processing large volumes of data in predefined batches, streaming analytics processes real-time data as it arrives, offering immediate insights and enabling real-time decision-making. Streaming data is therefore well-suited for time-sensitive applications that require continuous analysis and fast response to constantly flowing data.

If you already use Kedro, an open-source data pipeline framework, to help you build production-ready data science projects, you’ve probably wondered if you can apply it to projects beyond batch analytics applications. Now you can! Kedro has recently introduced support for a Streaming Dataset, enabling real-time data handling.

The following diagram illustrates where the analysis of streaming data fits into your data pipeline.

Diagram illustrating where analysis of streaming data fits into the data pipeline

This post will illustrate how to use the Kedro Streaming Dataset by building a working proof of concept (PoC) and suggest some tips and tricks for further consideration.

Build an event-processing proof of concept with Kedro

Streaming data is used across industries such as stock trading, social media, gaming, the Internet of Things, and inventory management. In inventory management, real-time data streams make inventory handling more efficient, with benefits such as real-time updates, demand forecasting, automated alerts, and anomaly detection.

The best way to demonstrate how this new Kedro dataset works in practice is to build a PoC: in this case, an inventory management streaming system with two data sources, real-time sales data and real-time inventory data for each product.

Configuring and setting up streaming datasets in Kedro

The first step is to install Kedro and Kedro Datasets:

# Create and activate the virtual environment
python3.8 -m venv kedro_env
source kedro_env/bin/activate

# Upgrade pip
pip install --upgrade pip

# Install Kedro and Kedro Datasets
pip install kedro==0.18.10 kedro-datasets==1.4.1

Next, we will include two Kedro Hooks to enhance the PoC:

  • PySpark Integration Hook — this will enable the integration of PySpark into the Kedro application, allowing us to leverage its powerful data processing capabilities
  • Continuous Streaming Query Hook — this will ensure our Kedro application runs continuously, enabling seamless handling of streaming queries in real time

To add the Hooks, include the following code snippet in your src/$your_kedro_project_name/hooks.py file:

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """

        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context._package_name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")


class SparkStreamsHook:
    @hook_impl
    def after_pipeline_run(self) -> None:
        """Starts a spark streaming await session
        once the pipeline reaches the last node
        """

        # Block until any active streaming query terminates
        spark = SparkSession.builder.getOrCreate()
        spark.streams.awaitAnyTermination()

Once hooks.py has been created, register the Hooks in settings.py as follows:

src/$your_kedro_project_name/settings.py

# "streaming" is the package name used in this PoC; replace it with your own project's package name
from streaming.hooks import SparkHooks, SparkStreamsHook

HOOKS = (SparkHooks(), SparkStreamsHook())

Kedro data catalog definition

To define the sales and inventory datasets, first set up your stream of data using the specified schema. Then declare the streaming sources, along with their schema definitions, in the Data Catalog (conf/base/catalog.yml in a default Kedro project) as follows:

raw.inventory:
  type: spark.SparkStreamingDataSet
  filepath: data/01_raw/stream/inventory/
  file_format: json
  load_args:
    schema:
      filepath: data/01_raw/schema/inventory_schema.json

raw.sales:
  type: spark.SparkStreamingDataSet
  filepath: data/01_raw/stream/sales/
  file_format: json
  load_args:
    schema:
      filepath: data/01_raw/schema/sales_schema.json

Table 1: Metadata information of raw inventory dataset
Table 2: Metadata information of raw sales dataset

inventory_schema.json

{
  "fields": [
    {
      "metadata": {},
      "name": "sku",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "new_stock",
      "nullable": true,
      "type": "long"
    },
    {
      "metadata": {},
      "name": "event_time",
      "nullable": true,
      "type": "string"
    }
  ],
  "type": "struct"
}

sales_schema.json

{
  "fields": [
    {
      "metadata": {},
      "name": "sku",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "quantity_sold",
      "nullable": true,
      "type": "long"
    },
    {
      "metadata": {},
      "name": "event_time",
      "nullable": true,
      "type": "string"
    }
  ],
  "type": "struct"
}
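
To try the catalog entries above without a live source, one option is to drop small JSON Lines files into the stream folder and let the file-based source pick them up. The sketch below is only an illustration and not part of the original PoC: the SKU values, file names, the `_staging` folder, and the function names are all assumptions. It writes each batch to a staging folder first and then moves it into place, because Spark's file source expects files to appear atomically in the watched directory.

```python
import json
import random
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional

# Hypothetical product codes, used only for this simulation.
SKUS = ["sku-001", "sku-002", "sku-003"]


def make_sales_event(rng: random.Random) -> Dict[str, object]:
    """Build one event with the three fields declared in sales_schema.json."""
    return {
        "sku": rng.choice(SKUS),
        "quantity_sold": rng.randint(1, 5),
        "event_time": datetime.now(timezone.utc).isoformat(),
    }


def write_batch(
    target_dir: Path,
    events: List[Dict[str, object]],
    batch_id: int,
    staging_dir: Optional[Path] = None,
) -> Path:
    """Write events as JSON Lines in a staging folder, then move the file
    into the watched folder so it never appears half-written."""
    staging_dir = staging_dir or target_dir.parent / "_staging"
    target_dir.mkdir(parents=True, exist_ok=True)
    staging_dir.mkdir(parents=True, exist_ok=True)

    file_name = f"batch_{batch_id:05d}.json"
    staged = staging_dir / file_name
    with staged.open("w", encoding="utf-8") as handle:
        for event in events:
            handle.write(json.dumps(event) + "\n")  # one JSON object per line

    final = target_dir / file_name
    staged.replace(final)  # a rename on the same filesystem
    return final


def simulate_sales_stream(
    target_dir: Path,
    n_batches: int = 3,
    batch_size: int = 10,
    pause_seconds: float = 1.0,
    seed: Optional[int] = None,
) -> List[Path]:
    """Write n_batches files of random sales events, pausing between them."""
    rng = random.Random(seed)
    written = []
    for batch_id in range(n_batches):
        events = [make_sales_event(rng) for _ in range(batch_size)]
        written.append(write_batch(target_dir, events, batch_id))
        time.sleep(pause_seconds)
    return written
```

Calling `simulate_sales_stream(Path("data/01_raw/stream/sales"))` in a separate terminal while the pipeline is running should produce new files for the `raw.sales` dataset to read. An inventory producer would look the same with `new_stock` in place of `quantity_sold`.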

As shown in the diagram below, our inventory management pipeline reads the raw inventory and sales data in real time from their respective file paths and captures the processing time of each record (“Add Processing Time”). These records are saved to the “New Sales” and “New Inventory” memory datasets, which then feed three streaming processes, each within its own Kedro node, ensuring a smooth flow of data throughout the inventory management process.

Diagram demonstrating the inventory management data pipeline

Tips and tricks for streaming datasets

We’ve seen how the Kedro Streaming Dataset can be used to build a simulated real-time inventory management system. This is, of course, just one example of the dataset in action.

Here are some further tips you might want to consider when using streaming datasets in the future:

  • Logging — When dealing with Spark Streaming, you’ll notice that Kedro logs all the steps as it discovers them before the pipeline begins. This is the opposite to dealing with batch processes, where Kedro logs progress as it happens.
  • Kedro runner — Using the following command, the ThreadRunner executes the pipeline to maximize concurrency:
kedro run --runner=ThreadRunner
  • Kafka connection — Integrating Kafka with Spark currently requires an additional jar, although testing for Structured Streaming integration with Kafka is planned for the future.
  • DataFrame Schemas — Streaming DataFrame Schemas are enforced. Structured Streaming from file-based sources requires you to specify the schema rather than relying on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures.
  • Submitting Kedro pipelines to another cluster — Finally, if you intend to integrate Kedro Streaming Dataset into a project and deploy a Spark job to a cluster, this blog post offers step-by-step guidance.
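
Because the schema has to be specified up front for file-based sources, it can help to generate the schema files from a single definition rather than editing JSON by hand. The sketch below is one possible approach under that assumption, using only the standard library. The helper names are hypothetical; the field names and types are copied from the two schema files shown earlier.

```python
import json
from pathlib import Path
from typing import Dict, List

# Field name -> Spark type, copied from the schema files in this post.
SCHEMAS: Dict[str, Dict[str, str]] = {
    "inventory_schema.json": {
        "sku": "string",
        "new_stock": "long",
        "event_time": "string",
    },
    "sales_schema.json": {
        "sku": "string",
        "quantity_sold": "long",
        "event_time": "string",
    },
}


def struct_schema(fields: Dict[str, str], nullable: bool = True) -> dict:
    """Build a dict in the same struct layout as the schema files above."""
    return {
        "fields": [
            {"metadata": {}, "name": name, "nullable": nullable, "type": spark_type}
            for name, spark_type in fields.items()
        ],
        "type": "struct",
    }


def write_schemas(schema_dir: Path) -> List[Path]:
    """Write one JSON schema file per entry in SCHEMAS."""
    schema_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for file_name, fields in SCHEMAS.items():
        path = schema_dir / file_name
        path.write_text(json.dumps(struct_schema(fields), indent=2), encoding="utf-8")
        written.append(path)
    return written
```

Running `write_schemas(Path("data/01_raw/schema"))` would recreate the two files referenced in the catalog entries.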

In summary

Streaming data is the future of analytics. Businesses across all industries increasingly expect access to real-time insights to inform their decision-making processes. They simply don’t have the luxury of time to wait for the results of batch analytics. The introduction of its support for streaming datasets means Kedro is well placed to ensure your data analytics capabilities are ahead of the curve.

You can read more about Kedro here, and find the Kedro Spark Streaming Dataset plugin here.

Authored by: Haris Michailidis, Principal Data Engineer, Tingting Wan, Data Engineer, Tom Kurian, Data Engineer, Shamama Muhit, Senior Data Engineer

Special contributions from: Evangelos Theodoridis, Senior Principal Data Scientist, Ivan Danov, Senior Principal Machine Learning Engineer, Nok Lam Chan, Engineer

QuantumBlack, AI by McKinsey

We are the AI arm of McKinsey & Company. We are a global community of technical & business experts, and we thrive on using AI to tackle complex problems.