A modern ETL pattern using Prefect’s S3 Storage and Kubernetes runtime
Since I’m fond of cats, the Great British Baking Show, and dragging out analogies… I’ll say “there are many ways to bake a cake” when it comes to creating modern ETL-like jobs moving data from point A to point B.
However, for data engineers, the challenge is rarely baking a single cake once; it’s baking many types of cake on inter-dependent schedules, in a customizable manner that’s easy to repeat, manage, scale, and understand.
Prefect is an orchestration engine that takes your family-favorite cake recipes and makes them easy to repeat, manage, scale, and understand — erm… well, I mean, just as long as the cake recipes are actually Python jobs.
Below I’ll lay out a Prefect ETL pattern with 3 main considerations in mind:
- rapid deployment — addressing Docker storage’s drawbacks at scale, RE: building / pushing a Docker image to a registry whenever code changes
- scalability — offered by containerized runtime environments like EKS
- modularization — clear categorization of various Python jobs (flows) into Prefect Projects, based on intended outcomes and required dependencies
What do I need in order to use a pattern like this?
This specific example will require a few pieces of supporting infrastructure:
- AWS S3 Bucket for storing our python jobs as scripts (how to provision)
- AWS EKS Cluster for job execution (how to provision using Terraform)
- AWS ECR Repository for custom base-image — optional (how to set up)
I’m using AWS for cloud, but this same pattern works for GCP / Azure.
Why are we using Prefect again?
We’ll use a few of Prefect’s convenient features (* → Prefect Cloud only):
- the S3 Storage object (used to automatically upload flows to an S3 Bucket)
- the KubernetesRun run configuration (used to specify an image & more)
- PrefectSecrets (for accessing sensitive values like credentials, API keys, etc)
- the key-value store* (e.g. for stateful flow input Parameters)
How does all of this piece together? i.e. code plz
Most directly, we can build our flows using Prefect’s Functional API. Put less precisely (but more Pythonically): we throw some `@task`-decorated Python functions into a `Flow` context manager and let Prefect go to work:
Wait, when does it run?
Here’s what this `Flow` context looks like as a DAG.

By running the Python file as is, Prefect will use our `Flow` context definition to build a DAG (Directed Acyclic Graph, i.e. an execution plan). The DAG is inferred by Prefect based on how our `task` objects are called and how `return` values are passed between them.
Let’s break down the four lines inside the `Flow` context, since there’s a fair amount happening here:
- We defined a data sources `Parameter` as a JSON-like `List[Dict]`, in terms of what `requests.get()` would need to `extract` each source.
- We tell Prefect to dynamically map the `extract()` task across each source in data sources, producing `raw_data: List[raw_piece_of_data]`.
- We map our `transform()` task over the `raw_data` from `extract()` (which is inferred to be an upstream task, i.e. `transform` runs iff `extract` succeeds).
- The terminal task will `load()` our squeaky-clean `clean_data` to our destination of choice, storing any relevant flow results in a bucket.
Now that our flow is built, we can use `flow.run()` outside the flow context to trigger flow runs locally:

```python
if __name__ == "__main__":
    flow.run()
```
… or we can also use the CLI or the Prefect Cloud UI to trigger flow runs.
Where?
At this point, we can create a Prefect Cloud project and assign this flow to it:
```shell
prefect register -p flow.py --project DEMO
```
This registration will upload our flow as a script to the bucket, using the `{PROJECT}/flow.py` key defined in the `S3_storage` object, so there’s no need to build and push a whole new Docker image to deploy iterative changes or fixes.

The flow will now be loaded from S3 and executed on the EKS cluster as prompted by our Kubernetes Agent, whether triggered by a Schedule, manually from the UI, or by event-driven `Client` / GraphQL API calls.
To handle dependencies, I built a custom image on top of the Prefect Python 3.9 base image, including Python libraries hypothetically common to this (demo) Prefect project. I pushed that project’s image to an AWS ECR repo, stored the image URL in the Prefect KV store as `{PROJECT}_BASE_IMAGE`, and provided it to `KubernetesRun()` as the image to run on a pod at runtime.
A couple thoughts on scalability
These data sources could easily be defined elsewhere as a JSON-like object, loaded at runtime, and passed to flow runs as Parameters. A good example of ‘elsewhere’ would be the Prefect KV store, where you can store and update stateful values for use in `Flow` runs, like this example.
In order to further parallelize our mapped tasks in production, we could use Dask Kubernetes’ `KubeCluster`, as outlined in a great article by George Coyne.
This pattern was designed to be extensible across projects, teams, and organizations. I chose to store my flows in sub-divisions of my S3 bucket according to Prefect Project, but flows can be stored in whatever logical grouping of S3 blobs is most convenient for the project at hand.
—
Any questions on patterns for Python ETLs leveraging Prefect workflow orchestration? Don’t hesitate to reach out to us in our Slack Community.
Happy engineering!
Nathan Nowack — nate@prefect.io