Prefect 2.7 is out with Pause & Resume and robust Cancellation functionality
And many more enhancements!
Prefect 2.7 ships with robust flow run cancellation, pause and resume functionality allowing manual approval from the UI, logging of print statements, and agent concurrency limits.
We’ll cover the enhancements in this post. As always, check the release notes and our documentation to learn more.
Flow run cancellation
We’re excited to announce a new robust way of canceling remote workflow execution. Flow runs can be canceled from the CLI, UI, REST API, or Python client. For example, to cancel a given flow run from the CLI, use:
prefect flow-run cancel flow_run_id
How does it work?
When cancellation is requested, the flow run is moved to a Cancelling state. The agent monitors the state of flow runs and detects that cancellation has been requested. The agent then sends a signal to the flow run infrastructure, requesting termination of the run. If the run does not terminate after a grace period (default of 30 seconds), the infrastructure will be killed, ensuring the flow run exits.
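The request-then-kill sequence described above can be sketched with the standard library alone. This is a minimal illustration for a local subprocess, not Prefect's agent code: the helper name terminate_with_grace and the sleeping child process in demo are assumptions made for the example.

```python
import subprocess
import sys


def terminate_with_grace(proc: subprocess.Popen, grace_seconds: float = 30.0) -> int:
    """Ask a child process to stop, then force-kill it if it does not exit in time."""
    proc.terminate()  # polite request (SIGTERM on POSIX)
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # hard stop once the grace period has elapsed
        return proc.wait()


def demo() -> int:
    # Hypothetical stand-in for flow run infrastructure: a long-sleeping child process.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    return terminate_with_grace(child, grace_seconds=5.0)
```

Calling demo() returns the child's exit code once it has stopped. The kill branch only runs when the child is still alive after the grace period.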
How does it differ from the Prefect 1 implementation?
Unlike the implementation of cancellation in Prefect 1 — which could fail if the flow run was stuck — this provides a strong guarantee of cancellation.
Note: this process is robust to agent restarts but does require that an agent is running to enforce cancellation.
Cancellation implementation differs across infrastructure blocks
Support for cancellation has been added to all core library infrastructure types:
At the time of writing, cancellation support is in progress for all collection infrastructure types:
- ECS Tasks (PrefectHQ/prefect-aws#163)
- Google Cloud Run Jobs (PrefectHQ/prefect-gcp#76)
- Azure Container Instances (PrefectHQ/prefect-azure#58)
At this time, this feature requires the flow run to be submitted by an agent — flow runs without deployments cannot be canceled yet, but that feature is also coming soon.
Flow run pause and resume
In addition to cancellations, flow runs can also be paused and resumed after manual approval.
This release adds a new utility called pause_flow_run. When you call this utility from within a flow, Prefect will move the flow run to a Paused state and will block further execution. Any tasks that have begun execution before pausing will finish. Infrastructure will keep running and polling to check whether the flow run has been resumed. Paused flow runs can be resumed with the resume_flow_run utility or from the UI.
Additionally, you can provide a timeout to the pause_flow_run utility. This way, if the flow run is not resumed within the specified timeout, the flow will be marked as Failed.
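To picture the blocking pause with a timeout, here is a small standard-library sketch of a polling loop. It is only a model of the behavior described above, not how Prefect implements it: wait_for_resume, is_resumed, and poll_interval are hypothetical names, and the returned strings simply mirror the state names used in this post.

```python
import time
from typing import Callable


def wait_for_resume(
    is_resumed: Callable[[], bool],
    timeout: float,
    poll_interval: float = 0.1,
) -> str:
    """Poll until the run is resumed or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_resumed():
            return "Running"  # resumed in time, execution continues
        time.sleep(poll_interval)  # the process stays alive while waiting
    return "Failed"  # nobody resumed the run before the timeout
```

The loop keeps the process alive while it waits, which matches the note that infrastructure keeps running during a pause.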
Example: manual approval after evaluating the model quality
Here is an example of how you can leverage that functionality to approve post-processing after manually validating the quality of a trained ML model:
from prefect import task, flow, pause_flow_run
from prefect import get_run_logger
from prefect.context import get_run_context
from prefect.blocks.notifications import SlackWebhook
from prefect.settings import PREFECT_UI_URL


def get_ui_flowrun_url() -> str:
    # Build a link to the current flow run in the UI
    id_ = get_run_context().flow_run.dict().get('id')
    ui_url = PREFECT_UI_URL.value() or "http://ephemeral-orion/api"
    return f"{ui_url}/flow-runs/flow-run/{id_}"


def send_alert(message: str):
    # Requires a SlackWebhook block saved under the name "default"
    slack_webhook_block = SlackWebhook.load("default")
    slack_webhook_block.notify(message)


@task
def run_initial_processing():
    logger = get_run_logger()
    logger.info("Processing something important 🤖")
    logger.info("Calculating the answer to life, the universe, and everything...")


@task
def run_something_critical():
    logger = get_run_logger()
    logger.info("We'll reveal the answer to life, the universe, and everything!")
    logger.info("The answer is... 42!")


@flow
def semi_manual_process(
    process: str = "ChatGPT training",
) -> None:
    logger = get_run_logger()
    run_initial_processing()
    url = get_ui_flowrun_url()
    send_alert(f"{process} finished. Please approve to continue processing: {url}")
    logger.info("Waiting for approval...")
    pause_flow_run(timeout=600)  # wait up to 10 minutes for manual approval
    logger.info("Process got approved! 🎉 Moving on to the next task")
    run_something_critical()  # post-processing, ML training process, reporting on KPIs


if __name__ == "__main__":
    semi_manual_process()
The UI will show the flow run in a Paused state.
You can grab the flow run ID from the UI and run:
from prefect import resume_flow_run
resume_flow_run("eca5860f-d94b-4be6-a0b4-6954af3bc6e8")
Once you run the above code or resume the execution from the UI, the run will continue.
Resuming execution from the UI
This example sends a Slack notification when the process is ready for manual approval.
When you click on this link, you’ll see a friendly “Resume” button:
Once you click on it and confirm, the run will move again into a Running state.
And soon, it will reveal the answer to life, the universe, and everything!
This blocking style of pause that keeps infrastructure running is supported for all flow runs, including subflow runs. See #7637 for more details.
Logging of print statements in flows and tasks
Flows or tasks can now opt in to logging print() statements. This works similarly to the log_stdout feature in Prefect 1, but we've improved the scoping so you can enable or disable the feature at the flow or task level.
Flow-level print statements
In the following example, the print() statements will be redirected to the logger with the INFO level for the flow run and task run accordingly:
from prefect import flow


@flow(log_prints=True)
def hi():
    print("Hi from Prefect! 🤗")


if __name__ == "__main__":
    hi()
The output from these prints will appear in the UI.
Capture prints from custom functions
This feature will also capture prints made in functions called by tasks or flows. As long as you’re within the context of the run, the prints will be captured by the Prefect backend as logs.
from prefect import task, flow


def business_logic():
    print("custom non-Prefect code")


@task
def my_task():
    print("world 🌍")
    business_logic()


@flow(log_prints=True)
def my_flow():
    print("hello 👋")
    my_task()


if __name__ == "__main__":
    my_flow()
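One way to see why prints from plain helper functions can be captured too is to redirect standard output for the duration of a call. The sketch below uses only the standard library and is not Prefect's implementation: LoggerWriter and capture_prints are hypothetical names chosen for the illustration.

```python
import contextlib
import io
import logging
from typing import Any, Callable


class LoggerWriter(io.TextIOBase):
    """File-like object that forwards each printed line to a logger."""

    def __init__(self, logger: logging.Logger, level: int = logging.INFO) -> None:
        self.logger = logger
        self.level = level

    def write(self, text: str) -> int:
        # print() sends the message and the trailing newline separately,
        # so blank fragments are skipped.
        for line in text.splitlines():
            if line.strip():
                self.logger.log(self.level, line)
        return len(text)


def capture_prints(func: Callable[[], Any], logger: logging.Logger) -> Any:
    """Run func with every print() inside it redirected to the logger."""
    with contextlib.redirect_stdout(LoggerWriter(logger)):
        return func()
```

Because the redirection wraps the whole call, any function invoked inside it, including code that knows nothing about the logger, has its prints captured.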
Disable capturing logs for sensitive tasks
If you have sensitive log messages, you can opt out of logging the task-level print() statements, even if the flow decorator is configured with log_prints=True:
from prefect import task, flow


def business_logic():
    print("custom sensitive data")


@task(log_prints=False)
def my_task():
    print("print this only locally to the terminal")
    business_logic()


@flow(log_prints=True)
def my_flow():
    print("hello 👋")
    my_task()


if __name__ == "__main__":
    my_flow()
This print statement will appear locally in the terminal:
21:31:37.513 | INFO | prefect.engine - Created flow run 'gigantic-cockatoo' for flow 'my-flow'
21:31:37.593 | INFO | Flow run 'gigantic-cockatoo' - hello 👋
21:31:37.605 | INFO | Flow run 'gigantic-cockatoo' - Created task run 'my_task-20c6ece6-0' for task 'my_task'
21:31:37.606 | INFO | Flow run 'gigantic-cockatoo' - Executing 'my_task-20c6ece6-0' immediately...
print this only locally to the terminal
custom sensitive data
21:31:37.636 | INFO | Task run 'my_task-20c6ece6-0' - Finished in state Completed()
21:31:37.651 | INFO | Flow run 'gigantic-cockatoo' - Finished in state Completed('All states completed.')
…but won’t be sent to the Prefect logger or API.
See the logging documentation and PR #7580 for more details.
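The scoping rule from this section can be summarized as a tiny function. This is a sketch of the rule as described in this post, using a hypothetical helper name, and it assumes that a task which does not set log_prints falls back to the flow's setting.

```python
from typing import Optional


def effective_log_prints(flow_setting: bool, task_setting: Optional[bool] = None) -> bool:
    """Decide whether prints inside a task are logged.

    An explicit task-level setting wins. Otherwise the flow-level
    setting applies (assumed fallback, see the note above).
    """
    if task_setting is None:
        return flow_setting
    return task_setting
```

With flow_setting=True and task_setting=False, the function returns False, which corresponds to the sensitive-task example above.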
Agent flow run concurrency limits
Agents can now limit the number of concurrent flow runs they are managing. For example, you can start an agent with a limit of 10 concurrent runs:
prefect agent start -q default --limit 10
How does it differ from the work-queue concurrency limit?
If your agent is polling for work from a single work queue, this behavior is equivalent to setting the limit on a work queue and starting an agent:
prefect work-queue create default --limit 10
prefect agent start -q default
Therefore, the agent flow run concurrency limit is especially helpful for agents that poll for work from multiple work queues but can’t execute more than X runs concurrently:
prefect agent start -q etl -q ml --limit 10
How does it work?
When the agent submits a flow run, it will track it in a local concurrency slot. Once the agent is managing 10 flow runs (the limit in the example above), it will not accept any more work from its work queues. When the infrastructure for a flow run exits, the agent will release a concurrency slot, and another flow run can be submitted.
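The slot bookkeeping described above can be modeled in a few lines. This is an illustrative sketch rather than the agent's actual code: the class AgentSlots and its method names are assumptions made for the example.

```python
from typing import Set


class AgentSlots:
    """Tracks which flow runs currently occupy a local concurrency slot."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self._active: Set[str] = set()

    def try_submit(self, flow_run_id: str) -> bool:
        """Take a slot for the run, or refuse if the limit is reached."""
        if len(self._active) >= self.limit:
            return False
        self._active.add(flow_run_id)
        return True

    def release(self, flow_run_id: str) -> None:
        """Free the slot once the run's infrastructure has exited."""
        self._active.discard(flow_run_id)
```

Because the slots live in the agent process, the limit applies across every work queue that agent polls.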
When is this useful?
This feature is especially useful for limiting resource consumption when running flows locally. It also provides a way to roughly balance the load across multiple work queues.
Thanks to @eudyptula for contributing! See #7361 for more details.
Further enhancements & fixes
Here is a copy of the enhancements and fixes from the release notes.
Integration enhancements
Agent-related enhancements
- Increase default agent query interval to 10s — #7703
- Add agent reporting of crashed flow run infrastructure — #7670
- Add jitter to the agent query loop — #7652
Cancellation-related enhancements
- The final state of a flow is now Cancelled when any task finishes in a Cancelled state — #7694
- Raise CancelledRun when retrieving a Cancelled state's result — #7699
Database-related enhancements
- Use a new database session to send each flow run notification — #7644
- Increase default database query timeout to 10s — #7717
API-related enhancements
- Include final state logs in logs sent to API — #7647
- Update login to prompt for “API key” instead of “authentication key” — #7649
Other enhancements
- Add tags and idempotency_key to run deployment — #7641
- Disable cache on result retrieval if disabled on creation — #7627
- Add default messages to state exceptions — #7705
- Update run_sync_in_interruptible_worker_thread to use an event — #7704
Fixes
- Prompt workspace selection if the API key is set, but API URL is not set — #7648
- Use PREFECT_UI_URL for flow run notifications — #7698
- Display all parameter values a flow run was triggered with in the UI (defaults and overrides) — #7697
- Fix bug where result event is missing when wait is called before submission completes — #7571
- Fix support for sync-compatible calls in deployment build — #7417
- Fix bug in StateGroup that caused all_final to be wrong — #7678
- Add retry on specified httpx network errors — #7593
- Fix state display bug when state message is empty — #7706
Documentation enhancements & fixes
- Fix heading links in docs — #7665
- Update login and PREFECT_API_URL configuration notes — #7674
- Add documentation about AWS retries configuration — #7691
- Add GitLab storage block to deployment CLI docs — #7686
- Add links to Cloud Run and Container Instance infrastructure — #7690
- Update docs on final state determination to reflect Cancelled state changes — #7700
- Fix link in ‘Agents and Work Queues’ documentation — #7659
Next steps
As always, to upgrade to the latest version, run:
pip install prefect -U
If you have any questions or encounter issues when using this release, you can reach us via our Community Slack and Prefect Discourse.
Massive kudos to Michael Adkins 💙: this entire blog post is essentially a copy-paste version of his excellent release notes.
Happy Engineering!