Dr. PySpark: How I Learned to Stop Worrying and Love Data Pipeline Testing
One major challenge in implementing data pipelines is reliably testing the pipeline code. The outcome of the code is tightly coupled with the data and the environment. This prevents developers from following test-driven development, catching bugs early with good unit tests, and releasing code through CICD with confidence.
One way to overcome the reliability challenge is to use immutable data to run and test the pipeline so that the result of ETL functions can be matched against known outputs.
Obviously, this requires a good knowledge of the application and of how well the data matches the business requirements. It also requires some setup so that developers can focus on building the application instead of spending time preparing the environment.
This blog post presents a model of self-contained data pipelines with a CICD implementation.
The idea is to develop and test the pipeline incrementally against locally stored data in immutable files. We use Apache Spark and its Python (PySpark) APIs to develop the data pipelines and Pytest to test them.
The Spark environment, DataFrames, and tables required during testing are provided by a PySpark-based conftest file. That file is driven by configuration stored in a JSON file named test_bed.json.
We will first structure our pipeline into decoupled modules. We then focus on the heavy lifting of setting up the test environment. Once the test cases are ready, we plug them into a Jenkins-based CICD process.
This blog is very detailed and meant to be followed along with the code in https://github.com/soyelherein/pyspark-cicd-template
For demonstration purposes, let’s consider a pipeline written as a single file that consumes a file containing “pageviews” data and merges it into a final target table. The code is tightly coupled with the environment in terms of files and data: it is a single standalone file that takes care of everything, from starting to stopping the SparkSession.
The application can be submitted to Spark with the simple command below:
spark-submit jobs/pipeline_wo_modules.py
If you look closely, there are five major sections in the pipeline: creation of the SparkSession, static configuration variables, Extract, Transform, and Load.
Let’s now deep dive into structuring the project. Each part of the project structure is explained in the following sections.
Decouple Spark Environment
Testing and debugging Spark jobs by sending them to a cluster (_spark-submit_) quickly becomes tedious and impractical. Teams can end up playing Sherlock Holmes, investigating clues in stack traces to work out what could have gone wrong.
pipenv: To avoid these dead ends, we can use pipenv to create an isolated environment in which to start a PySpark session, where:
- all development and production dependencies are described in the Pipfile
- pipenv helps us manage project dependencies and Python environments (i.e. virtual environments)
- dependencies can be installed conveniently on an ad-hoc basis, just with
pip install pipenv
pipenv install --dev
dependencies.job_submitter: A data application can have numerous upstream and downstream pipelines. It therefore makes sense to move Spark environment management and other common tasks into a shared entry point, so that the applications can focus only on their business logic.
This submitter module takes the job name as an argument and executes the functionality defined in that job. The pipeline itself has to expose a run method (discussed in the Decouple Application section) that serves as the entry point for the ETL. With the submitter module, the command changes to:
spark-submit \
--py-files dependencies/job_submitter.py,jobs/pipeline_wo_modules.py \
dependencies/job_submitter.py --job pipeline_wo_modules
It is responsible for the following:
- starting and stopping Spark sessions
- parsing the configuration files containing static variables, along with any dynamic command-line arguments
- executing the requested pipeline

Please head back to the GitHub repo for the details.
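To make the submitter's responsibilities concrete, here is a minimal sketch of what such a module could look like. The --job and --conf-file flags come from the commands in this post. Three details are assumptions: the --job-args flag, its key=value format, and the jobs.<name> import path (which fits a packages.zip containing a jobs package). Check the repo for the real implementation.

```python
import argparse
import importlib
import json
import logging


def parse_args(argv=None):
    """Parse the job name, the config file path and dynamic job arguments."""
    parser = argparse.ArgumentParser(description="Run a PySpark pipeline")
    parser.add_argument("--job", required=True, help="pipeline module name")
    parser.add_argument("--conf-file", help="JSON file with static config")
    # Hypothetical flag: dynamic parameters given as key=value pairs.
    parser.add_argument("--job-args", nargs="*", default=[],
                        help="dynamic parameters, e.g. process_date=2020-01-01")
    return parser.parse_args(argv)


def build_config(conf_file=None, job_args=()):
    """Merge the static JSON config with dynamic key=value job arguments."""
    config = {}
    if conf_file:
        with open(conf_file) as f:
            config.update(json.load(f))
    for pair in job_args:
        key, _, value = pair.partition("=")
        config[key] = value  # dynamic values override static ones
    return config


def main(argv=None):
    args = parse_args(argv)
    config = build_config(args.conf_file, args.job_args)
    logger = logging.getLogger(args.job)

    # Imported here so the helpers above can be exercised without Spark.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName(args.job).getOrCreate()
    try:
        # Assumes each job lives in the jobs package and exposes
        # run(spark, config, logger), as described in this post.
        job = importlib.import_module(f"jobs.{args.job}")
        job.run(spark, config, logger)
    finally:
        spark.stop()
```

In an actual entry-point script, main() would be invoked under the usual `if __name__ == "__main__":` guard so that spark-submit runs it. The guard is left out here so the helpers can be imported on their own. Keeping argument parsing and config merging as plain functions means they can be unit tested without starting Spark.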
jobs: We design our pipelines with Extract and Load functions that handle the I/O operations, and we test those using mocks to avoid side effects. Transform functions are designed to be side-effect free. They take DataFrames as input and return DataFrames as output, which can be compared against the locally stored data. Each pipeline also has an entry-point method named run that integrates the ETL steps. Developers are encouraged to keep separate pipeline files inside the jobs directory, each focusing on a different piece of business logic, instead of one big file.
Extract: Reads the incremental file and the historical data from the table and returns two DataFrames
Transform: Calculates the metrics from the incremental and historical DataFrames and returns a final DataFrame
Load: Writes the data to the final output path
Run: Integrates the ETL steps. It is exposed to the job submitter module and accepts the SparkSession, the job configuration, and a logger object to execute the pipeline.
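The four functions above could be laid out roughly as follows. This is a sketch, not the repo's pipeline. The config keys (input_path, source_table, output_path), the email and page_view_count columns, and the metric itself (page views per email) are assumptions chosen for illustration.

```python
"""Hypothetical jobs/pipeline.py; config keys and columns are assumptions."""


def extract(spark, config):
    """Read the incremental file and the historical table (I/O: mocked in tests)."""
    incremental = spark.read.load(config["input_path"], format="csv", header=True)
    historical = spark.read.table(config["source_table"])
    return incremental, historical


def transform(incremental, historical):
    """Side-effect free: merge today's views into per-email historical counts."""
    daily_counts = (
        incremental.groupBy("email").count()
        .withColumnRenamed("count", "page_view_count")
    )
    return (
        daily_counts.unionByName(historical.select("email", "page_view_count"))
        .groupBy("email")
        .sum("page_view_count")
        .withColumnRenamed("sum(page_view_count)", "page_view_count")
    )


def load(df, config):
    """Write the final DataFrame to the output path (I/O: mocked in tests)."""
    df.write.save(config["output_path"], format="parquet", mode="overwrite")


def run(spark, config, logger):
    """Entry point called by the job submitter."""
    logger.info("Starting pipeline")
    incremental, historical = extract(spark, config)
    load(transform(incremental, historical), config)
    logger.info("Pipeline finished")
```

Because extract and load only touch spark.read and df.write, they can be checked with mocks. transform, in contrast, can be checked with small local DataFrames. The sketch assumes output_path is separate from the location backing source_table.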
configs and ddl: We move the static configuration out of the code into a JSON file (configs/config.json) so that it can be overridden by the test configuration.
As explained for the job_submitter module, this config, along with any dynamic job parameters, is passed to the pipeline methods as a dictionary.
We also move the schema out of the code into the ddl/schema.py file. This helps us create the test data as DataFrames and tables using a helper method during testing.
Now that we have structured our ETL jobs into testable modules, we are all set to focus on the tests.
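A ddl/schema.py along these lines is one way to keep schemas out of the pipeline code. The sketch uses DDL-formatted strings, which DataFrameReader.load accepts through its schema parameter; StructType objects work just as well. The table names, column names, and the read_csv_with_schema helper are hypothetical.

```python
"""Hypothetical ddl/schema.py: schemas kept out of the pipeline code."""

# DDL-formatted schema strings; names and types are illustrative only.
PAGEVIEWS_SCHEMA = "email STRING, page STRING, viewed_at TIMESTAMP"
PAGEVIEW_COUNTS_SCHEMA = "email STRING, page_view_count BIGINT"

SCHEMAS = {
    "pageviews": PAGEVIEWS_SCHEMA,
    "pageview_counts": PAGEVIEW_COUNTS_SCHEMA,
}


def read_csv_with_schema(spark, path, schema_name):
    """Load a local CSV test file with an explicit schema instead of inference."""
    return spark.read.load(
        path, format="csv", header=True, schema=SCHEMAS[schema_name]
    )
```

An explicit schema keeps the test DataFrames' types identical to production, which schema inference on small CSV files does not guarantee.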
conftest: We use Pytest-style tests for our pipeline, along with a few features from unittest (namely mock and patch). This file does the heavy lifting of setting up the jobs for testing. It provides a test SparkSession and mocks, and it creates the tables and DataFrames locally from the CSV files. The mapping is defined in the test_bed.json file.
This config is pretty self-explanatory. The DataFrame and table details are defined under the “data” key. If the job accepts any dynamic parameters as job-args (e.g. process_date), those overrides go under the “config” key and are sent to the job as a dictionary argument. Once the test_bed.json file is configured, the setup_testbed helper method produces the DataFrames and tables (please refer to the GitHub repo for its implementation details). The file format can be configured in the conftest as needed; by default, the CSV files mentioned above are read.
For read and write operations, we encourage teams to use generic methods like “read.load” and “write” instead of “read.csv” or “read.orc”, so that our mocks can be more generic. The conftest file should be adapted to each project's specific needs.
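The benefit of sticking to generic calls is that a single mock covers every input format. The sketch below builds a mock SparkSession whose read.load returns canned objects keyed by path, ignoring format= and other options. make_mock_spark is a hypothetical helper, not part of the repo's conftest. If the pipeline called read.csv and read.orc instead, each method would need its own mock.

```python
from unittest.mock import MagicMock


def make_mock_spark(frames_by_path):
    """Return a mock SparkSession whose read.load serves canned DataFrames.

    The same side effect handles every format (csv, orc, parquet, ...),
    because it only looks at the path argument.
    """
    spark = MagicMock(name="spark")

    def fake_load(path, *args, **kwargs):
        return frames_by_path[path]  # KeyError flags an unexpected input

    spark.read.load.side_effect = fake_load
    return spark
```

Since the mock records every call, a test can afterwards assert on the exact path and options the pipeline used, for example with `spark.read.load.assert_any_call("in.csv", format="csv", header=True)`.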
test_pipeline: We have created a session-level Pytest fixture that wraps all the hard work done in the conftest in a single object. As the following sections show, we perform the entire testing using its member attributes.
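A session-scoped fixture of this kind might look like the conftest fragment below. It also shows roughly what a setup_testbed-style helper has to do. Only the “data” and “config” keys come from this post. The following are assumptions:
- the per-entry fields (path, format, table)
- the tests/test_bed.json location
- the names build_testbed and PipelineTestBed

Registering tables as temporary views is one possible choice; it lets spark.read.table resolve to the local test data.

```python
import json
from dataclasses import dataclass, field
from pathlib import Path

import pytest

TEST_BED_FILE = Path("tests/test_bed.json")  # assumed, relative to the repo root


@dataclass
class PipelineTestBed:
    """Everything a test needs, exposed as attributes of one object."""
    spark: object
    config: dict
    dataframes: dict = field(default_factory=dict)


def build_testbed(spark, spec):
    """Create DataFrames (and temp-view tables) from a parsed test_bed.json."""
    bed = PipelineTestBed(spark=spark, config=spec.get("config", {}))
    for name, entry in spec.get("data", {}).items():
        df = spark.read.load(
            entry["path"], format=entry.get("format", "csv"), header=True
        )
        if "table" in entry:
            # A temp view lets spark.read.table(...) find the local test data.
            df.createOrReplaceTempView(entry["table"])
        bed.dataframes[name] = df
    return bed


@pytest.fixture(scope="session")
def testbed():
    # Imported here so build_testbed can be exercised without Spark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder.master("local[1]")
        .appName("pipeline-tests")
        .getOrCreate()
    )
    spec = json.loads(TEST_BED_FILE.read_text())
    yield build_testbed(spark, spec)
    spark.stop()  # teardown after the whole test session
```

A test simply requests the fixture as an argument (`def test_transform(testbed): ...`) and reads `testbed.dataframes` and `testbed.config`. The SparkSession is created once per test session and stopped after the last test.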
Now let’s test our transform method that takes the incremental and historical DataFrames as input and produces the final DataFrame.
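Comparing the transform output with the expected data requires a DataFrame equality check. The helper below is a hypothetical sketch that compares column names and collected rows, ignoring row order by default. It is only suitable for small local test datasets, because collect() brings every row to the driver. On PySpark 3.5 and later, pyspark.testing.assertDataFrameEqual offers a built-in alternative.

```python
def assert_frames_equal(actual, expected, check_order=False):
    """Assert two DataFrames have the same columns and the same rows.

    Rows are pulled to the driver with collect(), so keep test data small.
    Unless check_order is True, rows are compared irrespective of order.
    """
    assert actual.columns == expected.columns, (
        f"column mismatch: {actual.columns} != {expected.columns}"
    )
    actual_rows = actual.collect()
    expected_rows = expected.collect()
    if not check_order:
        # Sorting by repr avoids TypeError when rows contain None values.
        actual_rows = sorted(actual_rows, key=repr)
        expected_rows = sorted(expected_rows, key=repr)
    assert actual_rows == expected_rows, (
        f"row mismatch:\n{actual_rows}\n!=\n{expected_rows}"
    )
```

In a test, this would be used along the lines of `assert_frames_equal(transform(incremental_df, historical_df), expected_df)`, with all three DataFrames loaded from the local test files.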
Since the I/O operations have already been separated out, we can inspect the calling behavior of extract and load using mocks. These mocks are set up in the conftest file.
Since we have already tested the individual methods, we can use patching for the integration test. We patch the return values of the different functions, which also avoids the side effects of writing to disk.
These tests can be run from an IDE or simply with the `pytest` command.
In a complex production scenario, related pipeline methods can be connected through their inputs and expected outputs, which are immutable. A fair understanding of the application, combined with a clear separation of the different subject areas, can give CICD integration the kind of confidence a regression suite provides.
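The sketch below shows the shape of such a patched integration test using unittest.mock. To keep it runnable on its own, the pipeline functions are compact stand-ins defined in the same file. extract and load raise if they are ever called for real. In the project, you would import run from the jobs package and patch names such as jobs.pipeline.extract instead. The assertions also show how a mock's recorded calls confirm that extract and load received the expected arguments.

```python
import logging
from unittest import mock


# Compact stand-ins for jobs/pipeline.py so this example runs on its own.
# extract and load would touch real files, so they fail loudly if not patched.
def extract(spark, config):
    raise RuntimeError("extract performs real I/O; patch it in tests")


def transform(incremental, historical):
    raise RuntimeError("transform is covered by its own unit test")


def load(df, config):
    raise RuntimeError("load performs real I/O; patch it in tests")


def run(spark, config, logger):
    incremental, historical = extract(spark, config)
    load(transform(incremental, historical), config)


def test_run_wires_extract_transform_load():
    spark, config = mock.MagicMock(), {"process_date": "2020-01-01"}
    incremental, historical, final = object(), object(), object()
    fake_extract = mock.MagicMock(return_value=(incremental, historical))
    fake_transform = mock.MagicMock(return_value=final)
    fake_load = mock.MagicMock()

    # In the real suite this would be mock.patch("jobs.pipeline.extract", ...)
    # and so on; patch.dict on globals() is used only because the stand-ins
    # live in this same file. The originals are restored on exit.
    with mock.patch.dict(globals(), {"extract": fake_extract,
                                     "transform": fake_transform,
                                     "load": fake_load}):
        run(spark, config, logging.getLogger("test"))

    fake_extract.assert_called_once_with(spark, config)
    fake_transform.assert_called_once_with(incremental, historical)
    fake_load.assert_called_once_with(final, config)
```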
Dockerfile: Defines the Docker image, with the virtual environment already set up, that the Jenkins agent runs.
Makefile: This Makefile utility zips all the code, dependencies, and config into the packages.zip file. Jenkins can then create the artifact, and the CD process can upload it to a repository. The final code can be submitted as below:
spark-submit \
--py-files packages.zip \
--files configs/config.json \
dependencies/job_submitter.py --job pipeline --conf-file configs/config.json
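For readers who prefer to see the packaging step in Python, here is a rough equivalent of what the Makefile is described as doing, written with the standard zipfile module. It is an alternative sketch, not the repo's Makefile. The directory list is an assumption based on the names used in this post, and third-party libraries are not bundled. Archive paths are kept relative to the project root so that imports like jobs.pipeline resolve when packages.zip is passed to --py-files.

```python
import zipfile
from pathlib import Path

# Assumed layout: the top-level directories named in this post.
PACKAGE_DIRS = ("jobs", "dependencies", "configs", "ddl")


def build_package(project_root, output="packages.zip", dirs=PACKAGE_DIRS):
    """Zip the given project directories into <project_root>/<output>."""
    root = Path(project_root)
    out_path = root / output
    with zipfile.ZipFile(out_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for name in dirs:
            base = root / name
            if not base.is_dir():
                continue  # skip directories this project does not have
            for path in sorted(base.rglob("*")):
                rel = path.relative_to(root)
                if path.is_file() and "__pycache__" not in rel.parts:
                    # Relative arcnames keep "import jobs.pipeline" working.
                    zf.write(path, arcname=rel.as_posix())
    return out_path
```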
Jenkinsfile: It defines the CICD process, which runs in four steps:
- In the prepare step, the Jenkins agent runs the Docker container defined in the Dockerfile.
- The tests then run in that container.
- Once the tests succeed, the prepare artifact step uses the Makefile to create a zipped artifact.
- The final step publishes the artifact, which is the deployment step.

All you need is a Jenkins setup where you define a pipeline project and point it to the Jenkinsfile.
Source code: https://github.com/soyelherein/pyspark-cicd-template