Creating a Spark Streaming ETL pipeline with Delta Lake at Gousto
This is how we reduced our data latency from two hours to 15 seconds with Spark Streaming.
We used to get files from the software that controls Gousto’s factory once a day via an SFTP server: several CSV files containing atomic data for each box that went through the production line on the previous day, such as the timestamps of when a box starts and exits the line. This data was used by Gousto's operations teams to measure our Supply Chain performance and detect issues on production lines.
We had an ingestion pipeline composed of a Lambda function that moved files from the SFTP server to our Data Lake in S3, plus a job triggered by Airflow on EMR. The whole pipeline was ingesting CSVs, applying some simple transformations, saving the tables as Parquet and exposing the data to users with Redshift Spectrum.
The EMR job alone was taking about two hours to complete. But since it was processed overnight, fresh data was available every morning for reporting.
Then everything changed.
Files started arriving in 15-minute chunks instead of once a day. This was an old request from operations that was finally being fulfilled by the factory software. Of course, we immediately got a request to update the reports built on top of this data at the same frequency.
As you can already notice, with a job that takes two hours to execute, the best we could offer was a two-hour refresh latency on reports. With the company shifting towards a Supply Chain Data Strategy, we had to rethink our ETL to deliver production line data in almost real-time.
I will try to explain why we made some choices, how our stack evolved and also provide some code to help out others that are facing similar challenges.
Why was it taking two hours?
The first approach was to understand why that job was taking so long to execute and try to tune it for better performance. We noticed the processing was inefficient and that the bigger our tables grew, the worse it would get, either by requiring more time to complete or more processing power.
Here is a brief description of what our Spark job was doing:
Adding new partitions to raw table
Our Spark job was first running MSCK REPAIR TABLE on Data Lake Raw tables to detect missing partitions. This can be a costly operation because it needs to scan the table's sub-tree in the file system (the S3 bucket). It was already taking a long time to complete, and it would only take longer over time, since we were adding new partitions with multiple small files every day.
Defining dataframes for Raw and Cooked Tables
Spark is lazy, so just creating the dataframes for raw and cooked tables is not a problem. Just remember that raw is composed of the original CSV files and cooked is composed of transformed Parquet files. Neither dataframe was being cached; this will be important to understand the subsequent steps.
Counting rows in both dataframes
The idea here was to stop processing early if there was no data to be moved from raw to cooked: the job would only continue for a table if the count in raw was greater than the count in cooked.
To count rows, Spark will scan all CSV files in all partitions in raw and aggregate the data. For cooked this is cheaper, since the data is already stored as Parquet, so the row count can be read straight from the files' metadata.
Selecting rows to be inserted
Here the job was calculating the maximum value of a timestamp column in cooked and then using it to select the rows in raw with timestamps greater than that max.
Because the dataframes were not cached, Spark had to read the Parquet metadata again to get the max value. Applying the filter on raw is lazy, so no problems here.
Union cooked dataframe to filtered rows
The selected rows from raw were then unioned back to the cooked dataframe, again a lazy operation.
Write the new table to Data Lake Cooked
Finally, it would call write to Data Lake Cooked. Here Spark was reading, once again, all the CSVs in raw and all the Parquet files in cooked to execute the operation. It was also overwriting the cooked table, rewriting all files in all partitions.
Rinse and repeat for multiple tables and data sources
To deliver the data required by operations we had to process eight tables. However, other sources were also using the same job, and it was not possible to decouple them without creating separate clusters for each data source.
Here is a simple diagram of this process to make it easier to understand:
Just from reading the above description, we can see a few easy wins:
- Replace MSCK REPAIR TABLE with ALTER TABLE ADD PARTITION
- Cache the dataframes
- Append to cooked instead of overwriting
But we still had to deal with other issues, such as files arriving out of order or containing unprocessed records from previous hours. Because we were filtering on the max timestamp, those records would never be processed and moved from raw to cooked (remember that we were now receiving data every 15 minutes instead of daily).
We had a few ideas on how to solve it, but once we started working on the implementation, we found challenges within our architecture.
Our architecture was slowing down development
We were depending on Airflow to spin up EMR clusters and submit jobs. Gousto's data processing architecture was basically composed of two pieces:
- Artefacts: A repository that would publish our scripts to an S3 bucket.
- Airflow: The orchestration tool responsible for spinning up a new EMR cluster, submitting the jobs and then terminating the cluster.
We were not allowed to create clusters using AWS CLI or the console. It was also tough to prototype our ideas because we didn't have proper access to EMR notebooks.
Once we were more or less confident about our code, we had to deploy it to a development environment to test with real data. This involved:
- Waiting while CI pipeline publishes scripts to S3.
- Going to Airflow, manually deselecting all tables we didn't want to test and triggering the DAG.
- Waiting on average ten minutes for EMR cluster.
- Waiting for the job to fail or succeed.
- Going to AWS console to find log files.
As you can notice, it was very, very painful. The minimum feedback loop — the time between submitting code and the job failing — was around 20 minutes.
Developing anything in those settings would take forever. This influenced our first architecture decision: to move away from EMR and adopt a platform that would allow us to have complete control of our clusters and prototyping. After some research, we decided to give Databricks a try.
Moving from batch to streaming
After deploying Databricks in a separate AWS account and granting access to our Data Lake and Glue Catalog we were finally ready to work on improvements to our ETL job.
After discussing our challenge with Databricks, we came up together with a new paradigm: move from batch to streaming processing. This was also much more aligned with Gousto's future data plans, so why not start now?
The main advantage was that we wouldn't need to compare raw and cooked anymore to decide which data needs to be processed. The streaming job would only process new incoming files. As a bonus, we wouldn't need Airflow anymore to manage clusters and submit jobs.
A diagram describing the new architecture is as follows:
We would need to create a notification service to detect new files in Data Lake Raw and then use s3-sqs from Qubole to trigger Spark Streaming batches. Here Databricks was very helpful: we just used Autoloader, and it took care of setting up the bucket notification, SNS and SQS for us in the background. Not having to maintain this infrastructure saved us a lot of time.
Here is an example of our code to create a streaming job:
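The original snippet is not reproduced here, so below is a minimal sketch of what such a job looks like, following the Databricks Autoloader pattern (the "cloudFiles" source) from their docs. The function name, paths and options are placeholders, and the code assumes it runs on a Databricks cluster where an active SparkSession and the cloudFiles source are available.

```python
def start_raw_to_cooked_stream(spark, schema, raw_path, cooked_path, checkpoint_path):
    """Append every new CSV file landing in Data Lake Raw to a Delta table in Cooked."""
    stream = (
        spark.readStream
        .format("cloudFiles")                   # Databricks Autoloader source
        .option("cloudFiles.format", "csv")     # the raw files are CSV
        .option("header", "true")
        .schema(schema)                         # schema fetched from Glue beforehand
        .load(raw_path)
    )
    return (
        stream.writeStream
        .format("delta")                        # cooked table is stored as Delta
        .outputMode("append")                   # new files only ever add rows
        .option("checkpointLocation", checkpoint_path)
        .start(cooked_path)
    )
```

Autoloader tracks which files were already ingested through the checkpoint, so each 15-minute batch of CSVs is read exactly once and appended to the destination table.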
This script is very similar to what you'll find in Spark's and Databricks' docs and it works really well for this use case, where each new file contains new data that needs to be appended to the destination table in Data Lake Cooked.
Note that we had to create a raw table definition in Glue Catalog. Spark Streaming (and Autoloader) cannot infer schema at this moment, so before reading the stream, we have to fetch the schema from Glue.
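A sketch of that schema lookup is below. The helper names are ours, not Gousto's; the shape of the boto3 get_table response is the Glue API's, and the DDL string the first helper builds can be passed straight to .schema() on the stream reader.

```python
def glue_columns_to_ddl(columns):
    """Turn Glue column metadata (a list of {"Name": ..., "Type": ...} dicts)
    into a DDL schema string Spark accepts, e.g. "box_id string, ts timestamp"."""
    return ", ".join("{} {}".format(col["Name"], col["Type"]) for col in columns)

def fetch_glue_schema(database, table):
    """Fetch a raw table's schema from the Glue Catalog."""
    import boto3  # imported here so the pure helper above stays dependency-free
    glue = boto3.client("glue")
    response = glue.get_table(DatabaseName=database, Name=table)
    return glue_columns_to_ddl(response["Table"]["StorageDescriptor"]["Columns"])
```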
We prototyped everything in Databricks notebooks. When we were confident that our code was good enough, we moved it to a separate project and implemented the CI/CD automation. I describe this implementation and provide some code at the end of this post.
Note on Writing as Delta
Our target data architecture is completely lake-based, so choosing to write as Delta in the cooked layer was natural. Delta is an open-source storage format based on Parquet, maintained by Databricks, that supports ACID transactions and upserts. Although we didn't use any of those features for this first use case, we have other sources that would benefit from them.
Our data lake implementation is not finished yet. We still have part of our data only available in Redshift. To create reports on top of this factory data, we need to join tables from data lake with tables from Redshift. To overcome that we decided to expose those Delta tables through Redshift Spectrum.
At the time there was no native Redshift connection to Delta, and following the implementation suggested by Databricks proved to be inefficient and very slow. To overcome those performance issues, we temporarily created another streaming job that reads from Delta and writes back as Parquet in the served layer. This is not needed anymore, because Spectrum now supports Delta natively.
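That temporary bridge was just another small streaming job. A sketch with placeholder names and paths, assuming a Databricks cluster with Delta available:

```python
def bridge_delta_to_parquet(spark, delta_path, parquet_path, checkpoint_path):
    """Temporary bridge: stream the cooked Delta table back out as plain Parquet
    so Redshift Spectrum could read it before it supported Delta natively."""
    return (
        spark.readStream
        .format("delta")                        # read the Delta table as a stream
        .load(delta_path)
        .writeStream
        .format("parquet")                      # re-materialise it as plain Parquet
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .start(parquet_path)
    )
```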
Benefits of new architecture
With this approach, we now have complete control of our clusters. We decide the permissions we need to attach to them, how big they are, which Spark configs are set, and so on.
It takes about five minutes to spin up a cluster, and then we can easily attach notebooks to prototype new features and debug issues. The feedback loop went from 20 minutes to 20 seconds because notebooks allow exploring the dataframes and inspecting the traceback and logs interactively.
After deploying the new streaming job to production, the latency to update data in cooked layer went down from two hours to 15 seconds. This was essential to meet the Supply Chain Data Strategy requirements of having reports refreshed at the same rate data is received.
Our cooked data is not exposed to final users. We use DBT to manage our data modelling and create a single source of truth for the business. DBT reads from Data Lake Cooked (using Spectrum) and persists models directly to Redshift, which are then accessed by BI tools and analysts.
The whole implementation, from the first contact with Databricks to having the job running in production, took about two months — which was surprisingly fast given the size of Gousto tech and the governance processes in place.
Costs remained almost the same as before. The previous job was running every two hours — and taking two hours to complete. In practical terms, it was a 24/7 cluster. We were able to use smaller instances with the streaming cluster, which compensated for the higher costs of Databricks (when compared to EMR).
Confidence in Supply Chain data — and reports — also improved a lot since we deployed the new pipeline. The streaming pipeline has been running steadily without any issues since deployment (five months already), while the previous batch job was failing constantly.
A good side effect was the reduction in our codebase complexity. We went from 565 to 317 lines of Python code. From 252 lines of YML configuration to only 23 lines. We also don't have a dependency on Airflow anymore to create clusters or submit jobs, making it easier to manage.
We switched off the old batch pipeline. Now our ETL stack is 100% streaming. Our operations teams have performance dashboards and reports being refreshed near-real-time and we are one step closer to fulfilling our Supply Chain Data Strategy objectives.
For those interested, I'm also describing below how we are applying continuous integration with Databricks for our production pipelines.
Automated Deploy / Continuous Integration
We didn't want to execute production pipelines from notebooks. Our idea was to have a separate repository on GitHub that would contain all our pipeline code and configuration. We also wanted to benefit from continuous integration, making sure that our jobs are automatically deployed to Databricks when code is merged to master branch.
This is how our project is organized:
1. Python package
The contents of this folder will be packaged as a Python egg. These are the actual PySpark scripts that define our pipeline and any associated configuration. This is where we create the stream demonstrated previously.
2. Cluster definition
Everything inside this directory defines a cluster on Databricks. The raw_to_cooked subdirectory is packaged as an egg file as described above. All contents of this folder are uploaded to S3; the package is installed on the cluster and job.py is called when the cluster starts.
MANIFEST.in and setup.py
The files responsible for creating the Python egg from the raw_to_cooked package.
This is the script (job.py) that will be executed when the cluster starts. It imports the raw_to_cooked package and calls a function that starts the streaming jobs.
This is a Jinja template to generate the JSON payload required by the Databricks Jobs API. Values such as the environment and AWS account ID are rendered into the template just before deployment. Here I'm showing a rendered template as an example.
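The original rendered payload isn't reproduced here, so the fragment below is a reconstructed example of the general shape of a Databricks Jobs API payload for a job like this. Every name, version, ARN and path is a placeholder.

```json
{
  "name": "raw-to-cooked-streaming",
  "new_cluster": {
    "spark_version": "6.4.x-scala2.11",
    "node_type_id": "r5.xlarge",
    "num_workers": 2,
    "aws_attributes": {
      "instance_profile_arn": "arn:aws:iam::123456789012:instance-profile/databricks-data-lake"
    }
  },
  "libraries": [
    {"egg": "s3://example-artefacts-bucket/raw_to_cooked/raw_to_cooked-0.1-py3.7.egg"}
  ],
  "spark_python_task": {
    "python_file": "s3://example-artefacts-bucket/raw_to_cooked/job.py"
  },
  "max_retries": -1
}
```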
Note that we reference the egg file inside libraries and also the Python file that will be executed inside spark_python_task.
3. Multiple clusters
Here we see multiple cluster definitions. Each subdirectory maps to one cluster in Databricks.
4. Root directory
This is our project root directory, where we keep the deployment scripts, tests and CircleCI configuration.
Our CI script defines the workflow to deploy new clusters and jobs to Databricks. It calls setup to generate the egg file and uploads everything to S3 using deploy-artefact.sh. Then it calls the Databricks Jobs API with api_request.py. I'm not sharing the implementation for those; they should be quite simple to implement by yourself.