Modular Data Stack — Build a Data Platform with Prefect, dbt and Snowflake (Part 4)
Scheduling, data ingestion, and backfilling implemented with modular building blocks and simple deployment patterns
This is a continuation of a series of articles about building a data platform with Prefect, dbt, and Snowflake. If you’re new to this series, check out this summary linking to previous posts. This demo is hands-on and dives into scheduled and ad-hoc runs (and the difference between the two), local development with Prefect, data ingestion, and backfilling.
To make the demo easy to follow, you’ll see this 🤖 emoji highlighting sections that prompt you to run or do something (rather than only explaining something). The code for the entire tutorial series is available in the prefect-dataplatform GitHub repository.
Table of contents
· 🤖 Creating a flow run from deployment
· 🤖 Inspecting parent and child flow runs from the UI
· 🤖 Scheduling a deployment
∘ Scheduling is decoupled from execution
∘ 🤖 Scheduling from the UI
∘ 🤖 Scheduling from CLI
· Local development of data platform workflows
∘ 🤖 Getting started with data platform development
∘ 🤖 Coordinate Python with Prefect
· Ingestion flow
∘ What is data ingestion
∘ Example: on a conceptual level
∘ 🤖 Running the ingestion flow locally
· Backfills
∘ 🤖 Backfilling locally from code IDE
∘ 🤖 Backfilling from a terminal
∘ 🤖 Backfilling from the UI
· Adding ingestion logic to a custom Prefect Block
∘ Prefect Block capabilities
∘ Using a custom Prefect Block for data ingestion
∘ Using the same Prefect Block for ML use cases
∘ Benefits of Prefect Blocks
· Next steps
🤖 Creating a flow run from deployment
Assuming that you followed the previous posts, you should now have a development environment with blocks and deployments. Let’s start an agent in a terminal using the following Prefect CLI command:
prefect agent start -q default
Then, to start a run from deployment, you can use the UI:
Or use the following CLI command in a separate terminal window:
prefect deployment run jaffle-shop-ingest-transform/local-process
The output will generate a flow run URL allowing you to navigate to the UI to inspect the logs easily:
🤖 Inspecting parent and child flow runs from the UI
The Subflow Runs UI tab makes it easy to navigate from the parent flow run sassy-tench to the subflow runs:
If you go to the child flow run page for the last child flow run spiritual-pegasus (flow named raw-data-jaffle-shop), you can inspect its task runs:
Also, you can navigate from the parent flow run sassy-tench to the child flow run magenta-cuttlefish, which corresponds to the flow dbt-jaffle-shop.
The task run page for dbt transformations is particularly interesting because the Dbt block from the prefect-dataplatform repository parses the dbt manifest and adds custom emojis for dbt run, dbt test, and other dbt CLI commands.
While a DAG diagram can be helpful to visualize dependencies, a very large dbt DAG loses its usefulness as a visualization tool:
Often, a search function that filters for the relevant tasks is more helpful than a dependency graph. The “Search by name” UI filter can become a great troubleshooting tool once your dbt DAG gets large. Neither visualization option is strictly better than the other; each has its pros and cons:
- use a dbt DAG visualization for dependency tracking
- use the search functionality in the Prefect UI to find the component that you want to inspect
🤖 Scheduling a deployment
The last section covered how you can trigger an ad-hoc flow run from the UI or CLI. Once you validate with a manual run that everything is working as expected, you can attach a schedule to this deployment.
Scheduling is decoupled from execution
It’s worth highlighting that Prefect decouples scheduling from execution. This reflects a common and sensible engineering process:
1) First, you create a deployment
2) Then, you manually trigger an ad-hoc run and validate that:
a) Your dataflow works as designed.
b) Your data looks as expected.
3) Once you have validated that everything is working correctly, you can add a schedule to run the production deployment on a regular cadence.
Many users migrating to Prefect from legacy orchestration tools find this decoupling helpful in managing their deployment processes. They are often positively surprised that Prefect (finally) makes such an engineering workflow possible and gives them more control over execution and scheduling.
Since the first two steps (creating a deployment and starting an ad-hoc run) have been completed in the previous sections, we can move to step three: scheduling.
🤖 Scheduling from the UI
You can add a schedule to any deployment directly from the UI:
Adding a schedule this way is practical as you would typically validate the ad-hoc run (step 2) from the UI anyway. You can use the integrated calendar and dropdown menus to reliably pick the right start_date, timezone, and scheduling interval.
We’ve all “been there, done that” — you certainly have some scars from timezone conversions or timestamp parsing when defining schedules via code or config files. You can tell us your horror story via Slack. Until then, you can enjoy the user-friendly scheduling experience from the Prefect UI as shown below:
🤖 Scheduling from CLI
As an alternative to scheduling your deployments from the UI, you can use the set-schedule CLI command to attach a schedule to a deployment. Here is an example CLI command setting a daily schedule at 2 AM in the America/New_York timezone:
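The following is a sketch of such a command; the exact set-schedule flags can differ between Prefect 2 releases, so check prefect deployment set-schedule --help for your version:
prefect deployment set-schedule jaffle-shop-ingest-transform/local-process --cron "0 2 * * *" --timezone "America/New_York"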
Soon after you add a schedule, you should see new scheduled runs on the flow runs page in the UI.
Local development of data platform workflows
So far, we’ve covered several aspects of building development and production environments, creating blocks and deployments, and creating runs. But all that was focused on the productionization aspect of a data platform. Let’s now look at how you can develop dataflows locally.
🤖 Getting started with data platform development
You can start any new workflow by building your logic first in plain Python (and potentially also SQL and dbt), leveraging existing integrations from Prefect Collections and Blocks. Then, you can coordinate the steps using a functional programming style that keeps the dataflow clean and readable.
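For example, a first iteration might be nothing more than two plain functions (a hypothetical, simplified example):

```python
import pandas as pd


def ingest_orders() -> pd.DataFrame:
    # extract raw data from a source system; a local CSV file stands in for a real source here
    return pd.read_csv("raw_orders.csv")


def transform_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
    # apply business logic, e.g., keep only completed orders
    return raw_orders[raw_orders["status"] == "completed"]


if __name__ == "__main__":
    print(transform_orders(ingest_orders()).head())
```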
🤖 Coordinate Python with Prefect
Once your Python logic is ready, you can add flow (and optionally, also task) decorators and use Prefect states to define:
- the order — task ingest must finish successfully before the task transform can start
- the orchestration — task clean_up should always run at the very end, even if tasks ingest and transform fail; similarly, the subflow concurrently refreshing all dashboards should only run if the ETL part completed without errors
- the concurrency and parallelism — the task refresh can be called at the same time for multiple dashboards that need to be refreshed, so you can put that logic into a subflow, use mapping, and rely on the (default) ConcurrentTaskRunner for concurrent execution; for parallelism, use either DaskTaskRunner or RayTaskRunner — the tasks can then be submitted to that task runner and gain performance benefits.
The above Gist (a simplified sketch of which follows this list) demonstrates that:
- moving from a Python script to a Prefect flow is as simple as adding a flow decorator
- task runners make it transparent which parts of your workflow run sequentially and which run concurrently or in parallel
- returning and operating on states makes it easy to define dynamic, conditional orchestration logic that triggers a subflow or a task only when needed and skips it otherwise; for example, you may want to update your dashboards only if the final ETL task completed without errors
- if you want a certain task to always run regardless of upstream failures, such as the cleanup task (line 58), you can submit it to a task runner without any dependencies; a task with no dependencies will always be executed, and all of this can be accomplished without tedious orchestration logic or manual exception handling
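To make these patterns concrete, here is a minimal sketch along the same lines (simplified compared to the Gist in the repository; the flow, task, and dashboard names are illustrative):

```python
from prefect import flow, task


@task
def ingest():
    ...  # extract raw data and load it into the warehouse


@task
def transform():
    ...  # e.g., trigger dbt transformations


@task
def refresh(dashboard: str):
    ...  # refresh a single dashboard


@task
def clean_up():
    ...  # remove temporary resources


@flow
def refresh_dashboards(dashboards: list):
    # submitted tasks run concurrently on the (default) ConcurrentTaskRunner
    for dashboard in dashboards:
        refresh.submit(dashboard)


@flow
def ingest_transform(dashboards: list = ["sales", "finance"]):
    # the order: transform starts only after ingest finished successfully
    ingested = ingest.submit()
    transformed = transform.submit(wait_for=[ingested])

    # conditional orchestration: refresh dashboards only if the ETL part succeeded
    if transformed.wait().is_completed():
        refresh_dashboards(dashboards)

    # no upstream dependencies: clean_up runs even if ingest or transform failed
    clean_up.submit()


if __name__ == "__main__":
    ingest_transform()
```

Running the script directly executes the flow locally; the same code can later be wrapped in a deployment without changes.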
Ingestion flow
The last section covered a generic workflow for any local development with Prefect. This section will dive into ingestion workflows, and in future posts, we’ll cover data transformation with dbt and ML flows as well.
What is data ingestion
Data ingestion involves extracting data from some source system and loading it into a destination, e.g., your Snowflake data warehouse. Data replication tools, such as Fivetran and Airbyte, provide connectors to help automate that process. However, regardless of whether you leverage a data replication tool or build the ingestion logic yourself, those workflows, syncing data from source to destination, need to be regularly triggered to keep your Snowflake data warehouse up-to-date. This is where Prefect can help.
Example: on a conceptual level
The prefect-dataplatform repository includes a script performing data ingestion. Even though the demo data is randomly generated (using the super fun faker library), the actual script represents a real-life scenario of concurrently running multiple data ingestion steps. Several Prefect features facilitate that process:
1) Prefect blocks make it easy to combine securely stored credentials with capabilities to load raw data. This logic can be defined once and reused across all ingestion processes.
2) The default ConcurrentTaskRunner makes concurrency easy — as long as you add the .submit() method when calling a task, it will be submitted to a task runner for asynchronous, concurrent execution.
Why is this useful for ingestion workflows? Because data ingestion is an IO-bound operation — you are connecting to external APIs and databases to extract and load data, with no need for parallel processing across CPU cores. With the ConcurrentTaskRunner, you can achieve highly performant ingestion without having to deal with distributed compute or multithreading.
💡 TMYK: You can specify a custom task runner as an attribute on the flow decorator — given that the ConcurrentTaskRunner is the default, you don’t have to explicitly define it.
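For instance, switching a flow to parallel execution on Dask is a one-line change, assuming the prefect-dask collection is installed:

```python
from prefect import flow
from prefect_dask import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())
def parallel_ingest():
    ...  # tasks submitted here run in parallel on a temporary Dask cluster
```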
🤖 Running the ingestion flow locally
Here is the full ingestion workflow using the default ConcurrentTaskRunner and a custom Prefect block with the capability to load data (line 76).
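A simplified sketch of that flow's shape, assuming the SnowflakePandas block described later in this post (import path, block document name, method name, table names, and defaults are assumptions, not the repository's exact code):

```python
import pandas as pd
from prefect import flow, task

from blocks.snowflake_pandas import SnowflakePandas  # import path assumed


@task
def load_table(table: str, start_date: str, end_date: str, dataset_size: int) -> None:
    # the repository generates demo data with the faker library; a tiny placeholder frame stands in here
    df = pd.DataFrame({"id": range(dataset_size), "created_at": [start_date] * dataset_size})
    SnowflakePandas.load("default").load_raw_data(df, table)  # block document and method names assumed


@flow
def raw_data_jaffle_shop(
    start_date: str = "2022-11-13",  # defaults illustrative; the real flow loads from yesterday to today
    end_date: str = "2022-11-14",
    dataset_size: int = 1000,
) -> None:
    # .submit() sends each table load to the default ConcurrentTaskRunner, so the raw tables load concurrently
    for table in ["raw_customers", "raw_orders", "raw_payments"]:
        load_table.submit(table, start_date, end_date, dataset_size)


if __name__ == "__main__":
    raw_data_jaffle_shop(dataset_size=1000)
```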
You can run it locally as any other Python script:
python flows/ingestion/ingest_jaffle_shop.py
If you would like to increase the dataset size, you can adjust the parameter value in the last line:
And more importantly, if you want to load data for a different time interval, you can adjust the start_date and end_date parameters:
Reloading data for a given time period is often called backfilling. We’ll cover that next.
Backfills
The example ingestion flow defines a start_date and an end_date to make it easy to backfill data for any date range when needed. Suppose you have a daily ingestion flow that usually loads data for the time range from yesterday to today. Imagine that this flow failed over the weekend. As a result, data generated in source systems during that time period is missing from your data platform.
🤖 Backfilling locally from code IDE
To fix that weekend failure, set the start_date to last Friday when triggering a custom run. This backfill run can be initiated from your code IDE, the Prefect UI, or the CLI. Assuming that today is Monday, November 14th, 2022, here is how you could trigger this backfilling workflow locally from your machine:
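A minimal sketch, assuming the flow function in flows/ingestion/ingest_jaffle_shop.py is named raw_data_jaffle_shop and takes the start_date and end_date parameters shown earlier:

```python
from flows.ingestion.ingest_jaffle_shop import raw_data_jaffle_shop  # flow function name assumed

if __name__ == "__main__":
    # Friday, November 11th through Monday, November 14th covers the missed weekend loads
    raw_data_jaffle_shop(start_date="2022-11-11", end_date="2022-11-14")
```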
🤖 Backfilling from a terminal
To do the same from the CLI, you can leverage the following command:
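For example, the start_date parameter can be overridden with the --param flag when triggering a run from the deployment created earlier (a sketch; the flag syntax may differ slightly between Prefect 2 releases):
prefect deployment run jaffle-shop-ingest-transform/local-process --param start_date=2022-11-11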
This will generate a link to the UI:
The Parameters tab in the UI allows you to inspect which start_date was used for that run:
🤖 Backfilling from the UI
Finally, you can trigger a backfill run directly from the UI by leveraging the custom run option and modifying parameter values for the run:
Use the date picker to select the dates for backfilling and click on Run:
The backfilling flow run is now scheduled for the requested time range:
In the same way, if you want to backfill a longer historical time range, you don’t need to redeploy your code or deal with complicated backfill engines baked into a legacy orchestrator. With Prefect, all you have to do is specify a start_date in the past and trigger a run that will take care of ingesting that historical data.
Adding ingestion logic to a custom Prefect Block
So far, we’ve looked at how you can build ingestion flows locally and from deployments, but we didn’t cover how they are built under the hood. To make ingestion modular, reusable, and easy to change, the prefect-dataplatform repository provides a custom Prefect Block called SnowflakePandas.
You can change this block type name to match your team's needs — here, SnowflakePandas indicates that this block is used to interact with Snowflake using the Pandas read_sql and to_sql operations.
Prefect Block capabilities
This custom block currently does the following:
- it retrieves the credentials from the SnowflakeConnector, which is a Prefect block from the Snowflake collection
- it constructs a connection string from those Snowflake credentials
- it uses this securely generated connection string to read and write Snowflake data using SQL (a simplified sketch of such a block is shown below)
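The real block in the repository is more complete, but a minimal sketch of the idea could look like this, assuming the prefect-snowflake collection and snowflake-sqlalchemy are installed (field and method names here are assumptions, not the repository's exact API):

```python
import pandas as pd
from prefect.blocks.core import Block
from prefect_snowflake.database import SnowflakeConnector
from sqlalchemy import create_engine  # the snowflake:// dialect requires snowflake-sqlalchemy


class SnowflakePandas(Block):
    """Read from and write to Snowflake using pandas read_sql/to_sql (method names illustrative)."""

    snowflake_connector: SnowflakeConnector  # nested Prefect block holding the credentials

    def _connection_string(self) -> str:
        # attribute names follow the prefect-snowflake connector and may differ between versions
        connector = self.snowflake_connector
        credentials = connector.credentials
        return (
            f"snowflake://{credentials.user}:{credentials.password.get_secret_value()}"
            f"@{credentials.account}/{connector.database}?warehouse={connector.warehouse}"
        )

    def read_sql(self, query: str) -> pd.DataFrame:
        return pd.read_sql(query, create_engine(self._connection_string()))

    def load_raw_data(self, df: pd.DataFrame, table: str) -> None:
        df.to_sql(table, create_engine(self._connection_string()), if_exists="append", index=False)
```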
Using a custom Prefect Block for data ingestion
With the secure configuration and modularity of blocks, all our ingestion flows need to do is:
- Load the block
- Call the block method to load raw data
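In code, that amounts to two lines inside a task or flow (import path, block document name, and method name are hypothetical, matching the sketch above):

```python
import pandas as pd
from prefect import task

from blocks.snowflake_pandas import SnowflakePandas  # import path assumed


@task
def ingest_raw_orders(df: pd.DataFrame) -> None:
    block = SnowflakePandas.load("default")  # 1. load the block
    block.load_raw_data(df, "raw_orders")    # 2. call its capability to load raw data
```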
Using the same Prefect Block for ML use cases
A flow that processes data for a machine learning use case can leverage the same block to read data from Snowflake. This way, your credentials are managed in one place, and several teams can reuse business logic defined as a block capability:
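For example, a feature-preparation flow could read training data through the same block (query, block document name, and method name are illustrative):

```python
from prefect import flow

from blocks.snowflake_pandas import SnowflakePandas  # import path assumed


@flow
def prepare_training_data():
    block = SnowflakePandas.load("default")
    # the same credentials and connection logic as the ingestion flows, now used for reads
    return block.read_sql("SELECT * FROM orders WHERE status = 'completed'")
```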
Benefits of Prefect Blocks
This section highlighted the benefits of blocks:
- Blocks are defined by their attributes and capabilities, not by specific purposes. Instead of a separate connector or abstraction for ETL and another one for ML, you can have one block that represents a given system or resource, securely manages the credentials to access it, and exposes various methods that represent what you can do with that system.
- Blocks are extensible — as your system evolves, you may continue adding new methods to represent new capabilities or use cases.
- Instead of duplicating credentials definitions and business logic, blocks can be combined into modular components (as we did by referencing the SnowflakeConnector on the SnowflakePandas block).
- Blocks can be defined once and reused across many flows, teams, and use cases.
Next steps
This post demonstrated how to create scheduled and ad-hoc runs, how to locally develop data platform flows, and how you can approach data ingestion and backfilling using Prefect Blocks and parametrization.
If anything we’ve discussed in this post is unclear, feel free to tag me when asking a question in the Prefect Community Slack.
Thanks for reading, and happy engineering!