Prefect 2.5 Makes Modular and Serverless Dataflows Easier Than Ever
Prefect 2.5 adds new serverless infrastructure capabilities, batch deletion from the UI, and utilities to orchestrate flow runs from deployments
Managing infrastructure and building deployment processes for dataflows can be challenging. Prefect helps solve that problem by providing flexible building blocks that allow you to easily provision and maintain the underlying runtime environment, including several serverless options. We are excited to share that Prefect now supports both Google Cloud Run and Azure Container Instances for deploying dataflows as serverless containers.
As another serverless enhancement, we’ve recently added ECSTask customizations to allow setting (and overriding) custom AWS VPCs, subnets, security groups, and more.
Lastly, Prefect 2.5 ships with utilities to support a commonly requested dataflow design pattern that involves triggering flow runs from deployments, waiting for their completion, and tracking dependencies across deployments, thereby further improving collaboration between teams and code dependency management.
We’ll cover those new features in this post. To learn more about all improvements, check the release notes and our documentation.
Going with the (serverless) flow: Google Cloud Run
If you are not familiar with Prefect Deployments, they allow you to turn any flow into an API-managed entity. The key components of deployments are storage and infrastructure blocks that make it easy to configure (even from the UI) where your code lives and where it should be executed at runtime.
We’ve recently added one more infrastructure block to the mix: CloudRunJob. It allows you to deploy your flows as serverless containers to GCP, as well as to spin up arbitrary containerized tasks within that platform.
What are blocks? They combine stateful information (including secrets) with extra capabilities, giving you flexible building blocks applicable to various scenarios. For example, you can use a single infrastructure block both to configure where your flow runs should be deployed and to run other containerized tasks that may even be written in another programming language.
Demo of the Google Cloud Run infrastructure block
To get started, install the GCP collection: pip install prefect-gcp.
If the GCP block types are not available within your UI, you can register them server-side using the following command:
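A minimal sketch of the registration command; the module path is an assumption based on the layout of the prefect-gcp collection at the time of this release:

```shell
# Register the Cloud Run block types with your Prefect server (or Prefect Cloud)
prefect block register -m prefect_gcp.cloud_run
```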
Now, you can configure the block values either from the code:
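For instance, the block could be created and saved from Python roughly as follows. The project, region, and image values are placeholders, and the field names are assumptions based on the prefect-gcp collection:

```python
from prefect_gcp.cloud_run import CloudRunJob
from prefect_gcp.credentials import GcpCredentials

# Assumes you have already saved a GcpCredentials block named "my-gcp-creds"
credentials = GcpCredentials.load("my-gcp-creds")

block = CloudRunJob(
    credentials=credentials,
    project="my-gcp-project",      # placeholder GCP project ID
    region="us-east1",             # placeholder region
    image="us-docker.pkg.dev/my-gcp-project/prefect/flows:latest",  # placeholder image
)
# Save the block so it can be referenced by name in deployments
block.save("block_name", overwrite=True)
```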
Or from the UI:
Once you have saved your infrastructure block, you can use it within your deployments by leveraging the -ib flag in the format -ib cloud-run-job/block_name.
Here is an example of using it in combination with a GCS storage block (-sb gcs/dev) and a flow named hello located in the script demo.py:
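A sketch of the build command; the deployment name is hypothetical, while the flags follow the blog's own flag notation:

```shell
prefect deployment build demo.py:hello \
    --name hello-cloud-run \
    -sb gcs/dev \
    -ib cloud-run-job/block_name \
    --apply
```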
You can also use this block directly to trigger a command in a serverless container running on GCP:
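Since infrastructure blocks can be executed directly, a one-off containerized command might look like the sketch below; the command attribute and the run() method are assumptions based on how Prefect 2 infrastructure blocks generally work:

```python
from prefect_gcp.cloud_run import CloudRunJob

# Load the previously saved block and run an arbitrary containerized command
job = CloudRunJob.load("block_name")
job.command = ["echo", "hello from Cloud Run"]
job.run()
```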
Serverless on Azure: Azure Container Instances
This infrastructure block works the same way as the above-described Google Cloud Run job:
- Install the Prefect collection for Azure: pip install prefect-azure.
- Run: prefect block register -m prefect_azure.container_instance.
- Configure your blocks from code or from the Prefect UI (as shown in the image below).
- Use the infrastructure block in your deployments or in your flows.
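Configuring the block from code could look like this sketch; the class name and field names are assumptions based on the prefect-azure collection, and all values are placeholders for your own Azure resources:

```python
from prefect_azure.container_instance import AzureContainerInstanceJob

# Placeholder values; credentials configuration is omitted for brevity
aci = AzureContainerInstanceJob(
    subscription_id="my-subscription-id",
    resource_group_name="my-resource-group",
    image="myregistry.azurecr.io/flows:latest",
)
aci.save("dev", overwrite=True)
```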
To read more about that block, check out this blog post:
AWS ECS task customizations
Continuing the serverless enhancements, PR #120 exposes customizations of the payload for AWS ECS tasks, allowing users to:
- set fields not available at the top level
- override any of the options that Prefect sets
We follow the JSON Patch standard for supplying customizations, which supports replacing field values, adding new fields, appending to lists, and more. This common and familiar standard allows full customization of payloads without the complexity of maintaining different merge implementations across platforms (AWS ECS, Kubernetes, etc.).
Use cases this opens up for Prefect users leveraging AWS ECS include:
- running ECS tasks only in private subnets,
- passing existing securityGroups to enable communication between ECS tasks and other network-isolated services in custom VPCs,
- disabling the public IP when assignPublicIp is set to False.
Here is an example of how this works:
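A sketch of such customizations, assuming the task_customizations field on the ECSTask block accepts a list of JSON Patch operations; the paths follow the shape of the AWS ECS runTask API, and all IDs are placeholders:

```python
from prefect_aws.ecs import ECSTask

ecs = ECSTask(
    image="prefecthq/prefect:2-python3.10",
    task_customizations=[
        {
            # Run only in private subnets (placeholder subnet IDs)
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/subnets",
            "value": ["subnet-aaaa", "subnet-bbbb"],
        },
        {
            # Attach an existing security group (placeholder ID)
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": ["sg-zzzz"],
        },
        {
            # Disable the public IP
            "op": "replace",
            "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
            "value": "DISABLED",
        },
    ],
)
ecs.save("dev", overwrite=True)
```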
For more details and examples, check the ecs_customizations section of the dataflow-ops AWS ECS repository template.
Create flow runs from deployments
A common flow design we see among Prefect users is what we often call the orchestrator pattern. In such a setup, users are interested not only in representing dependencies between tasks, subflows, and flows but also in running deployments that depend on each other. Many data (platform) engineers requested support for that pattern to build modular data ingestion and transformation workflows.
Here are common problems that users are trying to solve:
- “I want to trigger a flow run from my custom application via an API call”
- “One step in my workflow depends on a process maintained by another team — I want to trigger a flow run from that team’s deployment and wait for its completion before starting my task or flow”
- “I want to treat each workflow as an independent job that can be triggered both independently and as part of a parent workflow”
- “I want to be able to start my parametrized dbt run independently from the parent flow orchestrating a full data warehouse refresh pipeline”
- “I want to run each subflow on different infrastructure: my transformation flow requires a KubernetesJob with GPU resources, while the extract and load process only requires a CPU.”
PR 7047 makes all the above use cases possible with a utility function called run_deployment. Here is a simple usage example triggering a parametrized run from a deployment:
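A minimal sketch; the deployment name and parameter are hypothetical, while the "flow-name/deployment-name" format matches Prefect's deployment naming convention:

```python
from prefect.deployments import run_deployment

# Trigger a parametrized run of an existing deployment and,
# by default, wait for it to reach a terminal state
flow_run = run_deployment(
    name="hello/prod",              # "<flow name>/<deployment name>" (placeholder)
    parameters={"user": "Marvin"},  # hypothetical flow parameter
)
print(flow_run.state)
```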
Example: a data warehouse refresh pipeline that coordinates work from different teams and runs backfills
Suppose your data team consists of data engineers building ingestion workflows, analytics engineers building data transformations, and ML engineers running training jobs and batch predictions. Prefect deployments let you orchestrate work from multiple teams without stepping on each other’s toes:
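A sketch of such an orchestrator flow; every deployment name below is hypothetical and would be owned and maintained by a different team:

```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def data_warehouse_refresh(start_date: str = "2022-10-01"):
    # Data engineers own the ingestion deployment (hypothetical name)
    run_deployment(name="ingest/prod", parameters={"start_date": start_date})
    # Analytics engineers own the transformation deployment
    run_deployment(name="dbt-transform/prod")
    # ML engineers own the batch prediction deployment
    run_deployment(name="ml-batch-predict/prod")

if __name__ == "__main__":
    data_warehouse_refresh()
```

Because run_deployment waits for completion by default, each step starts only after the previous team's work has finished.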
The UI makes it easy to inspect the logs and execution details of each child flow run:
Each team can maintain its own flow deployment process without affecting other engineers, while still being able to orchestrate processes that depend on other teams’ work.
Example: ML training job that requires a GPU
Imagine that you want to kick off a flow running an ML training process on remote infrastructure (e.g., a Kubernetes job with GPU resources), wait for its completion, and then, once this job is completed, continue with postprocessing, generating batch predictions, and creating a custom report with the final results. Only the step that runs on GPU infrastructure needs to be triggered from a deployment; the other steps could run even on your laptop. Here is how you could do that with the run_deployment feature:
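A sketch of this pattern; "train/gpu-k8s" is a hypothetical deployment assumed to be backed by a KubernetesJob infrastructure block with GPU resources:

```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def postprocess_and_report():
    ...  # runs locally, e.g. on your laptop

@flow
def ml_pipeline():
    # Only this step runs on remote GPU infrastructure; run_deployment
    # blocks until the training flow run finishes
    run_deployment(name="train/gpu-k8s")
    # Continue locally once training is complete
    postprocess_and_report()
```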
Example: trigger a flow run from deployment from a FastAPI app
Another common usage pattern we see is triggering a flow run via an API call from a custom application. While you could technically execute the flow directly (and Prefect would still track that flow run), you may instead prefer to execute it on specific infrastructure.
Suppose you have a flow that runs for an hour, and you don’t want your FastAPI application (which triggers that flow) to execute such a long-running job directly. As long as you specify timeout=0, the flow run will be triggered without waiting for its completion.
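A sketch of such an endpoint; the route, deployment name, and parameter are hypothetical:

```python
from fastapi import FastAPI
from prefect.deployments import run_deployment

app = FastAPI()

@app.post("/trigger/{user}")
async def trigger_flow_run(user: str):
    # timeout=0 returns immediately instead of blocking
    # the request until the flow run finishes
    flow_run = await run_deployment(
        name="hello/prod",              # hypothetical deployment
        parameters={"user": user},
        timeout=0,
    )
    return {"flow_run_id": str(flow_run.id)}
```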
Example: schedule a one-off run later
Let’s say that you are scraping data from an API. Once you hit a certain limit, you want to stop further processing and instead schedule the next parametrized run in five hours (possibly passing the last scraped timestamp as a parameter to the next scheduled run). Here is how you can accomplish that:
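A sketch of this scheduling pattern, assuming run_deployment accepts a scheduled_time argument; the deployment name and parameter are hypothetical:

```python
from datetime import datetime, timedelta, timezone
from prefect.deployments import run_deployment

# Schedule the next parametrized run five hours from now,
# passing the last scraped timestamp along
run_deployment(
    name="scrape/prod",                              # hypothetical deployment
    parameters={"since": "2022-10-05T00:00:00Z"},    # e.g. last scraped timestamp
    scheduled_time=datetime.now(timezone.utc) + timedelta(hours=5),
    timeout=0,  # don't block waiting on a run that starts in five hours
)
```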
Example: running each step of a workflow in a separate container or Kubernetes pod
Some users prefer running each step of a workflow in a separate container or Kubernetes pod to better manage code dependencies, potentially even orchestrating work written in other programming languages. The run_deployment utility makes that easy:
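A sketch of the per-step pattern; each deployment name below is hypothetical and assumed to reference its own infrastructure block (e.g., a DockerContainer or KubernetesJob):

```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def containerized_pipeline():
    # Each step runs in its own container or pod, as configured
    # in that step's deployment
    run_deployment(name="extract/docker")
    run_deployment(name="transform-java/k8s")  # could even be a non-Python step
    run_deployment(name="load/docker")
```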
As soon as Prefect receives the instruction to trigger a run, it will coordinate the execution as configured in the deployment referenced by name.
Batch flow run deletion from the UI
Thanks to PR 7086, the UI now supports batch deletion! If you want to delete multiple scheduled or failed runs, you can now easily do that from the UI. And it looks so pretty! 💙
Other bug fixes, enhancements, and documentation additions
There are many additional enhancements, fixes, and documentation improvements. We’ll list them here so you can dive into the details through links to the corresponding PRs:
Enhancements
- Update put_directory to exclude directories from upload counts — #7054 — big thanks to Stéphan Taljaard for contributing! 🚀
- Always suppress griffe logs — #7059
- Add OOM warning to Process exit code log message — #7070
- Add idempotency key support to OrionClient.create_flow_run_from_deployment — #7074
Fixes
- Fix default start date filter for deployments page in UI — #7025
- Fix sync_compatible handling of wrapped async functions and generators — #7009
- Fix bug where server could error due to an unexpected null in task caching logic — #7031
- Add exception handling to block auto-registration — #6997
- Remove the “sync caller” check from sync_compatible — #7073
Documentation
- Add ECSTask block tutorial to recipes — #7066
- Update documentation for organizations for member management, roles, and permissions — #7058
New collections
- New prefect-soda-core collection for integration with Soda. Huge thanks to Alessandro Lollo for contributing this fantastic and important integration! 💯
Next steps
This post covered a lot! If you have any questions about any of this, you can reach us via our Community Slack and Prefect Discourse.
Happy Engineering!