How to Make Your Data Pipelines More Dynamic Using Parameters in Prefect
How to pass runtime-specific parameter values to your data pipelines
Parametrization is one of the most critical features of any modern workflow orchestration solution. It allows you to dynamically overwrite parameter values for a given run without having to redeploy your workflow. Most orchestration frameworks provide rather limited functionality in that regard, such as only allowing you to override global variables. Prefect, however, provides a first-class abstraction for handling dynamic parametrized workflows. Let’s look at it in more detail.
Table of contents:
· How to start an ad-hoc parametrized flow run
∘ 1. Start a parametrized local flow run from a Python client
∘ 2. Start a parametrized local flow run from a CLI
∘ 3. Start a parametrized remote flow run from a CLI
∘ 4. Start a parametrized remote flow run from an API call
∘ 5. Start a parametrized remote flow run from the UI
∘ 6. Start a parametrized remote child flow run from a parent flow
∘ Section summary
· How to schedule parametrized flows
∘ Setting Parameter defaults
· How to use Parameter values in a state handler
· Things to watch out for
∘ How to use dynamic values properly
∘ Avoid Zombie-Parameters
∘ Allowed parameter values
∘ Allowed parameter names
· Next steps
How to start an ad-hoc parametrized flow run
The Parameter task is a simple abstraction for running dynamic workflows that adjust their behavior based on the parameter value. To use it, you define your Parameter task within the flow.
Prefect provides a great amount of flexibility with respect to how you want to trigger your flow. Let’s look at six different ways to start a parametrized workflow.
1. Start a parametrized local flow run from a Python client
When running your flow locally, you can pass your parameter values through the parameters keyword argument of the flow’s run method.
The above code will trigger three flow runs, each with a different parameter value:
2. Start a parametrized local flow run from a CLI
If you prefer running your flows from the CLI, here is how you can trigger a flow run with custom parameter values:
prefect run -p parametrized_flow.py --param x=3
The output of this:
You can see that when using the Prefect CLI, you don’t need to use click or similar command-line interface libraries.
3. Start a parametrized remote flow run from a CLI
Let’s say that you want to trigger your flow from a terminal, but you want to run it on your remote agent (e.g. in a Kubernetes cluster). To do that, you need to register your flow:
prefect register --project community -p parametrized_flow.py
To trigger a remote flow run that will be picked up by your corresponding agent, you can reference the flow by its name in the CLI:
prefect run --name parametrized_flow --param x=42 --watch
4. Start a parametrized remote flow run from an API call
But what if you need to start a flow run from a serverless function or some other programming language? You can leverage the GraphQL API. Here is how we can use the same parametrized flow in an API call:
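The sketch below shows one way such a request could be assembled with nothing but the Python standard library. Several details are assumptions rather than facts from this post: the Prefect Cloud endpoint `https://api.prefect.io`, the bearer-token header, the `create_flow_run_input` type name in the mutation, and the hypothetical helper `build_flow_run_request`. The request is only built here; sending it is left as a commented-out step.

```python
import json
import urllib.request

# Assumed Prefect Cloud endpoint; a self-hosted server would use its own URL.
API_URL = "https://api.prefect.io"

# GraphQL mutation that creates a flow run (type name is an assumption).
CREATE_FLOW_RUN = """
mutation($input: create_flow_run_input!) {
  create_flow_run(input: $input) {
    id
  }
}
"""


def build_flow_run_request(flow_id, parameters, api_key):
    # Parameter values travel as a JSON object inside the GraphQL variables.
    payload = {
        "query": CREATE_FLOW_RUN,
        "variables": {"input": {"flow_id": flow_id, "parameters": parameters}},
    }
    return urllib.request.Request(
        API_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


if __name__ == "__main__":
    request = build_flow_run_request("<your-flow-id>", {"x": 42}, "<your-api-key>")
    # To actually send it:
    # with urllib.request.urlopen(request) as response:
    #     print(json.load(response))
```

Because the request is plain JSON over HTTP, the same payload can be sent from a serverless function or from any other language with an HTTP client.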
5. Start a parametrized remote flow run from the UI
The easiest and most accessible way of triggering a parametrized flow run is the Prefect UI. To start your flow, navigate to the “Run” tab of the respective flow page:
From here, you can enter your desired parameter values and click on “Run”.
6. Start a parametrized remote child flow run from a parent flow
Prefect is the only open-source workflow orchestration platform that provides first-class support for the parent-child workflow orchestration pattern. Here is how you can start a parametrized child flow run from a parent flow:
The create_flow_run task lets you specify which flow to trigger and which parameter values to use for that flow run. The additional wait_for_flow_run task ensures that the parent flow is considered successful only if the child flow ran without any issues. Thanks to the stream_logs argument, we can see the child flow run logs directly from the parent flow.
We can use the CLI to register and run the parent flow:
Section summary
Regardless of whether your ad-hoc flow runs are executed locally or on remote infrastructure, Prefect allows you to trigger your parametrized workflows via:
- a Python client,
- a command-line interface (CLI),
- an API call (e.g. using the requests library or even a plain HTTP request),
- the Prefect UI,
- other (parent) flows.
No matter which of the above methods you use, with Parameter tasks you immediately gain the advantage of dynamic runtime-parametrization of your workflows.
How to schedule parametrized flows
So far, we’ve looked at how to overwrite default parameter values at runtime when triggering flows ad-hoc. Let’s explore how to attach parameter values to your schedules. The code block below demonstrates how you can run two workflows simultaneously every minute, each with different default parameter values:
This flow will trigger two flow runs every minute: one with the parameter value 9 and another with 99.
The UI allows you to inspect which parameter values have been used for each run:
Setting Parameter defaults
When you schedule flow runs, parameter defaults are particularly important. If you don’t provide default values either on your Parameter task or on the clock, Prefect will not know which value to use and won’t be able to run such a flow on schedule. Here are two ways of setting parameter defaults:
How to use Parameter values in a state handler
Sometimes you may need to access parameter values within a state handler. For instance, when alerting about a failed task run, you may want to include the parameter value within a Slack message. To do that, you can leverage the Prefect context:
The example below shows how to use a state handler to rename a flow run based on the provided parameter value:
We can register this flow and run it twice: once with a default, and once with a custom parameter value:
By following the flow run URL printed in the CLI output, you should see that both flow runs have been renamed as intended:
Things to watch out for
While parameters provide an extremely convenient abstraction to make your workflows more dynamic, the default parameter values themselves are static: they can be overridden at runtime, but Prefect evaluates them at registration time and stores them as “frozen” values in the backend. There are several implications of this:
- Don’t use dynamic dates such as datetime.datetime.today() as a default value. Since Prefect evaluates those at registration time, your data pipeline would get stuck reliving the same day, much like Phil Connors in the movie Groundhog Day.
- Don’t store any sensitive data as default parameter values unless you are fine with this data being stored in the Prefect Cloud backend. All parameter values are serialized and persisted using PrefectResult.
- Don’t use empty or large values such as large text or JSON documents as parameter values since the backend API allows a payload of up to 5 MB. Large objects can be stored in a resource like an S3 bucket, and you could use the Parameter default to point to its location.
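To make the size limit from the last point concrete, here is a rough pre-flight check in plain Python. It is only a sketch: the helper names are hypothetical, the 5 MB limit is interpreted as 5 * 1024 * 1024 bytes (an assumption), and the real request contains more than just the parameter values.

```python
import json

# The 5 MB limit mentioned above, interpreted here as 5 MiB (assumption).
MAX_PAYLOAD_BYTES = 5 * 1024 * 1024


def payload_size(parameters):
    # Size in bytes of the parameters once serialized to JSON.
    return len(json.dumps(parameters).encode("utf-8"))


def fits_in_payload(parameters, limit=MAX_PAYLOAD_BYTES):
    # True if the serialized parameters stay below the limit.
    return payload_size(parameters) < limit


# Pointing to the data instead of embedding it keeps the payload tiny.
# The bucket and key below are made-up placeholders.
pointer = {"input_location": "s3://example-bucket/large_document.json"}
embedded = {"document": "a" * (6 * 1024 * 1024)}
```

With these two dictionaries, `fits_in_payload(pointer)` passes while `fits_in_payload(embedded)` does not, which is the reason for passing a location rather than the object itself.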
How to use dynamic values properly
Some Prefect users try to use parameters for backfilling workflows. This is a common anti-pattern (don’t do this):
This approach will backfire as long as the default values are used.
To mitigate the issue of default parameter values being frozen at registration time, you can use a separate task to return a dynamic default date. Then, if the parameter value is not None, your flow will use the custom value provided at runtime rather than the default date generated in your task. Here is an example that illustrates this approach (do that instead):
Note that default parameter values are set to None. The default values are therefore generated within the tasks, and custom parameter overrides are used only when they are set explicitly when running the flow from the CLI, the UI, the API, or another flow.
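The core of this pattern fits into one small function, sketched here in plain Python. The name `resolve_start_date` and the `today` argument (there only to make the function easy to test) are hypothetical; in a Prefect 1.x flow the function would be wrapped with the `@task` decorator and receive the Parameter, whose default is None, as its input.

```python
import datetime


def resolve_start_date(param_value=None, today=None):
    """Return the explicit override, or today's date computed at run time."""
    if param_value is not None:
        # A custom value was passed for this run: use it as-is.
        return param_value
    # No override: compute the date now, at run time, not at registration time.
    today = today or datetime.date.today()
    return today.strftime("%Y-%m-%d")
```

Because the date is computed inside the function body, every run that relies on the default gets a fresh value, while a backfill run simply passes the date it needs.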
Here is how we can start this parametrized flow using the CLI:
💡 You don’t have to use Parameters! The easiest way of implementing backfilling flows is not by using Parameter tasks but rather by leveraging the KV Store. The documentation provides an example of how to go about it.
Avoid Zombie-Parameters
Apart from handling the parameter values correctly, make sure to either use the Parameter value in your downstream tasks or add it to the flow explicitly. Here is a common anti-pattern to watch out for:
When you trigger this flow, it will generate a
ValueError: Flow.run received the following unexpected parameters: dummy
The error happens because the dummy_parameter is not used anywhere in the flow, nor is it added to the flow structure manually. Therefore, Prefect does not recognize dummy as a valid parameter. The first solution is to pass the parameter value to some downstream task as a data dependency:
Alternatively, you can explicitly add the Parameter task to your flow as follows:
Allowed parameter values
The only rule regarding parameter values is that they must be JSON serializable. This means that you can’t use arbitrary Python objects such as a NumPy array, but you can use all JSON serializable Python data types such as strings, integers, lists, or dictionaries. For instance, the flow run from the last section had None as a default value, which looks as follows in the UI:
But if you want to set a custom value for an ad-hoc backfilling flow run, you could leverage the date picker built into the UI, which prevents you from accidentally entering wrong values such as invalid dates:
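The JSON-serializability rule can be checked up front with the standard library. This is a plain-Python sketch, not part of Prefect; the helper name `is_json_serializable` is hypothetical.

```python
import datetime
import json


def is_json_serializable(value):
    # json.dumps raises TypeError for unsupported types and ValueError
    # for circular references.
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


# Strings, numbers, lists, dictionaries and None are fine.
valid = {"start_date": "2022-01-15", "retries": 3, "tags": ["a", "b"], "end_date": None}

# A date object is not JSON serializable; its ISO string is.
raw_date = datetime.date(2022, 1, 15)
date_as_text = raw_date.isoformat()
```

Objects such as dates therefore need to be converted to strings before they are passed as parameter values and parsed again inside the task that consumes them.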
Allowed parameter names
You are free to choose any name for your parameter, as long as there is only one parameter with that name in the flow. All Prefect tasks have a “slug”, which is a name that uniquely identifies a task in a flow, including the order in which this task appears in a flow. For each Parameter task, the slug is automatically and immutably set to the parameter name, which ensures that the flow has no other parameters with the same name. This is the only “gotcha” when naming your parameters.
Next steps
In this post, we took a deep dive into parametrized workflows using Prefect. If you don’t want to miss any upcoming Prefect posts, sign up for our newsletter. Also, if anything we’ve discussed in this post is unclear, feel free to ask your questions in our community Slack.
We can’t wait to see what you build. Thanks for reading, and happy engineering!