The Prefect Blog

Prefect 2.7 is out with Pause & Resume and robust Cancellation functionality

And many more enhancements!

Flow run cancellation

We’re excited to announce a new, robust way to cancel flow runs executing on remote infrastructure. Flow runs can be canceled from the CLI, UI, REST API, or Python client. For example, to cancel a given flow run from the CLI, use:

prefect flow-run cancel flow_run_id

How does it work?

When cancellation is requested, the flow run is moved to a Cancelling state. The agent monitors the state of flow runs and detects that cancellation has been requested. It then sends a signal to the flow run's infrastructure, requesting termination of the run. If the run does not terminate within a grace period (30 seconds by default), the infrastructure is killed, ensuring that the flow run exits.
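The escalation described above (ask politely, wait out a grace period, then force-kill) can be illustrated for a local process with Python's standard subprocess module. This is a minimal sketch that assumes POSIX signals (SIGTERM first, then SIGKILL). It is not Prefect's code: each infrastructure block implements termination in its own way, and the function name stop_with_grace_period is hypothetical.

```python
import subprocess
import sys


def stop_with_grace_period(proc: subprocess.Popen, grace_period: float = 30.0) -> str:
    """Politely stop `proc`, escalating to a hard kill after `grace_period` seconds.

    Returns "already exited", "terminated", or "killed".
    """
    if proc.poll() is not None:
        return "already exited"
    proc.terminate()  # SIGTERM on POSIX: a request the process may handle or ignore
    try:
        proc.wait(timeout=grace_period)
        return "terminated"
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX: cannot be caught or ignored
        proc.wait()
        return "killed"


# A child process that ignores SIGTERM, standing in for a stuck flow run
STUBBORN_CHILD = (
    "import signal, time\n"
    "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
    "print('ready', flush=True)\n"
    "time.sleep(60)\n"
)

if __name__ == "__main__":
    child = subprocess.Popen(
        [sys.executable, "-c", STUBBORN_CHILD], stdout=subprocess.PIPE, text=True
    )
    child.stdout.readline()  # wait until the SIGTERM handler is installed
    print(stop_with_grace_period(child, grace_period=1.0))  # -> killed (on POSIX systems)
```

A well-behaved process exits during the grace period and is reported as "terminated". The hard kill is only used when the polite request is ignored. On Windows, Popen.terminate() and Popen.kill() are the same call, so the two-step escalation only matters on POSIX.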

How does it differ from the Prefect 1 implementation?

Unlike cancellation in Prefect 1, which could fail if the flow run was stuck, this implementation provides a strong guarantee that the flow run is canceled.

Cancellation implementation differs across infrastructure blocks

Support for cancellation has been added to all core library infrastructure types: processes, Docker containers, and Kubernetes jobs.

Flow run pause and resume

In addition to cancellation, flow runs can now be paused and then resumed after manual approval.

Example: manual approval after evaluating the model quality

Here is an example of how you can use this functionality to approve post-processing only after manually validating the quality of a trained ML model:

from prefect import task, flow, pause_flow_run
from prefect import get_run_logger
from prefect.context import get_run_context
from prefect.blocks.notifications import SlackWebhook
from prefect.settings import PREFECT_UI_URL


def get_ui_flowrun_url() -> str:
    # Build a link to the current flow run's page in the Prefect UI
    id_ = get_run_context().flow_run.dict().get('id')
    ui_url = PREFECT_UI_URL.value() or "http://ephemeral-orion/api"
    return f"{ui_url}/flow-runs/flow-run/{id_}"


def send_alert(message: str):
    # Assumes a SlackWebhook block named "default" has already been saved
    slack_webhook_block = SlackWebhook.load("default")
    slack_webhook_block.notify(message)


@task
def run_initial_processing():
    logger = get_run_logger()
    logger.info("Processing something important 🤖")
    logger.info("Calculating the answer to life, the universe, and everything...")


@task
def run_something_critical():
    logger = get_run_logger()
    logger.info("We'll reveal the answer to life, the universe, and everything!")
    logger.info("The answer is... 42!")


@flow
def semi_manual_process(
    process: str = "ChatGPT training",
) -> None:
    logger = get_run_logger()
    run_initial_processing()
    url = get_ui_flowrun_url()
    send_alert(f"{process} finished. Please approve to continue processing: {url}")
    logger.info("Waiting for approval...")
    # Pause here and wait up to 600 seconds for someone to resume this run
    pause_flow_run(timeout=600)
    logger.info("Process got approved! 🎉 Moving on to the next task")
    run_something_critical()  # post-processing, ML training process, reporting on KPIs


if __name__ == "__main__":
    semi_manual_process()
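To resume the paused run from Python instead of from the UI, pass its flow run ID to resume_flow_run:

from prefect import resume_flow_run

resume_flow_run("eca5860f-d94b-4be6-a0b4-6954af3bc6e8")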

Resuming execution from the UI

This example sends a Slack notification when the process is ready for manual approval.
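You can think of a pause as a wait loop. The run moves into a Paused state and then checks periodically whether someone has moved it back to Running, giving up when the timeout expires. The toy model below illustrates that pause-and-poll pattern in plain Python. It rests on several assumptions: the class and exception names are hypothetical, the state is held in memory (in Prefect it lives in the API), and resume() stands in for the UI button or resume_flow_run.

```python
import threading
import time


class PauseTimeoutError(Exception):
    """Raised when a paused run is not resumed before its timeout (hypothetical)."""


class ToyRun:
    """In-memory stand-in for a flow run whose state someone else can change."""

    def __init__(self) -> None:
        self._state = "Running"
        self._lock = threading.Lock()

    @property
    def state(self) -> str:
        with self._lock:
            return self._state

    def resume(self) -> None:
        # Plays the role of the UI button or resume_flow_run(): Paused -> Running
        with self._lock:
            if self._state == "Paused":
                self._state = "Running"

    def pause(self, timeout: float, poll_interval: float) -> None:
        """Enter Paused, then poll until resumed or until `timeout` seconds pass."""
        with self._lock:
            self._state = "Paused"
        deadline = time.monotonic() + timeout
        while self.state != "Running":
            if time.monotonic() >= deadline:
                raise PauseTimeoutError(f"not resumed within {timeout} seconds")
            time.sleep(poll_interval)


if __name__ == "__main__":
    run = ToyRun()
    # Simulate a reviewer approving the run half a second from now
    threading.Timer(0.5, run.resume).start()
    run.pause(timeout=5, poll_interval=0.1)
    print("Process got approved! Moving on to the next task")
```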

Logging of print statements in flows and tasks

Flows and tasks can now opt in to logging print() statements. This works similarly to the log_stdout feature in Prefect 1, but with improved scoping: you can enable or disable the feature at the flow or task level.

Flow-level print statements

In the following example, the print() call is redirected to the flow run's logger at the INFO level. print() calls inside tasks are redirected to the task run's logger in the same way:

from prefect import flow


@flow(log_prints=True)
def hi():
    print("Hi from Prefect! 🤗")


if __name__ == "__main__":
    hi()

Capture prints from custom functions

This feature will also capture prints made in functions called by tasks or flows. As long as you’re within the context of the run, the prints will be captured by the Prefect backend as logs.

from prefect import task, flow


def business_logic():
    print("custom non-Prefect code")


@task
def my_task():
    print("world 🌍")
    business_logic()


@flow(log_prints=True)
def my_flow():
    print("hello 👋")
    my_task()


if __name__ == "__main__":
    my_flow()
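One way to capture prints from plain helper functions without touching their code is to redirect print itself for the duration of the run, so it does not matter where the call comes from. Below is a minimal sketch of that idea. It assumes builtins.print is temporarily replaced with a function that emits INFO log records. The names prints_as_logs and print_as_log are hypothetical, and Prefect's actual mechanism may differ in detail.

```python
import builtins
import contextlib
import io
import logging
import sys
from typing import Iterator

run_logger = logging.getLogger("toy-run")  # stand-in for a Prefect run logger


@contextlib.contextmanager
def prints_as_logs(target: logging.Logger) -> Iterator[None]:
    """While active, turn every print() aimed at stdout into an INFO record on `target`."""
    original_print = builtins.print

    def print_as_log(*args, sep=" ", end="\n", file=None, flush=False):
        if file not in (None, sys.stdout):
            # Explicit destinations such as sys.stderr keep normal print behavior
            original_print(*args, sep=sep, end=end, file=file, flush=flush)
            return
        buffer = io.StringIO()
        original_print(*args, sep=sep, end="", file=buffer)  # reuse print's formatting
        target.info(buffer.getvalue())

    builtins.print = print_as_log
    try:
        yield
    finally:
        builtins.print = original_print  # always restore, even if the body raises


def business_logic():
    print("custom non-Prefect code")  # plain helper, no Prefect decorator


def my_task():
    print("world 🌍")
    business_logic()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s")
    with prints_as_logs(run_logger):
        print("hello 👋")
        my_task()
```

The helper's print is captured even though business_logic knows nothing about logging. Its print name is resolved through the builtins module at call time, so it picks up the replacement.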

Disable print capture for sensitive tasks

If a task prints sensitive data, you can opt out of logging its print() statements, even if the flow decorator is configured with log_prints=True:

from prefect import task, flow


def business_logic():
    print("custom sensitive data")


@task(log_prints=False)
def my_task():
    print("print this only locally to the terminal")
    business_logic()


@flow(log_prints=True)
def my_flow():
    print("hello 👋")
    my_task()


if __name__ == "__main__":
    my_flow()
Running this flow shows the flow's print() as a log message, while the task's output goes only to the local terminal:

21:31:37.513 | INFO    | prefect.engine - Created flow run 'gigantic-cockatoo' for flow 'my-flow'
21:31:37.593 | INFO    | Flow run 'gigantic-cockatoo' - hello 👋
21:31:37.605 | INFO    | Flow run 'gigantic-cockatoo' - Created task run 'my_task-20c6ece6-0' for task 'my_task'
21:31:37.606 | INFO    | Flow run 'gigantic-cockatoo' - Executing 'my_task-20c6ece6-0' immediately...
print this only locally to the terminal
custom sensitive data
21:31:37.636 | INFO    | Task run 'my_task-20c6ece6-0' - Finished in state Completed()
21:31:37.651 | INFO    | Flow run 'gigantic-cockatoo' - Finished in state Completed('All states completed.')

Agent flow run concurrency limits

Agents can now limit the number of concurrent flow runs they are managing. For example, you can start an agent with a limit of 10 concurrent runs:

prefect agent start -q default --limit 10

How does it differ from the work-queue concurrency limit?

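If your agent is polling for work from a single work queue, this behavior is equivalent to setting the limit on the work queue itself and starting an agent without one:

prefect work-queue create default --limit 10
prefect agent start -q default

If the agent polls several work queues, however, its limit applies to the flow runs it picks up from all of them combined:

prefect agent start -q etl -q ml --limit 10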

How does it work?

When the agent submits a flow run, it tracks the run in a local concurrency slot. Once the agent is managing 10 flow runs (the limit set above), it stops accepting work from its work queues. When the infrastructure for a flow run exits, the agent releases that run's concurrency slot, and another flow run can be submitted.
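The slot bookkeeping described above can be sketched with a bounded semaphore. The agent takes a slot before submitting a run, skips new work when no slot is free, and gives the slot back when the run's infrastructure exits. This is a toy model with a limit of 2 instead of 10 for brevity. The class and function names are hypothetical, and Prefect's agent may use a different concurrency primitive.

```python
import threading
from typing import Iterable, List


class LocalFlowRunLimiter:
    """Toy model of an agent-side limit on concurrently managed flow runs."""

    def __init__(self, limit: int) -> None:
        self._slots = threading.BoundedSemaphore(limit)
        self._lock = threading.Lock()
        self.running = set()

    def try_submit(self, flow_run_id: str) -> bool:
        # Non-blocking: when every slot is taken, refuse instead of waiting
        if not self._slots.acquire(blocking=False):
            return False
        with self._lock:
            self.running.add(flow_run_id)
        return True

    def on_infrastructure_exit(self, flow_run_id: str) -> None:
        # The run's infrastructure is gone, so its slot can be reused
        with self._lock:
            self.running.remove(flow_run_id)
        self._slots.release()


def poll_once(limiter: LocalFlowRunLimiter, ready: Iterable[str]) -> List[str]:
    """One iteration of the agent loop: submit ready runs until slots run out."""
    submitted = []
    for flow_run_id in ready:
        if not limiter.try_submit(flow_run_id):
            break  # at the limit: leave the rest in the work queue for a later poll
        submitted.append(flow_run_id)
    return submitted


if __name__ == "__main__":
    limiter = LocalFlowRunLimiter(limit=2)
    print(poll_once(limiter, ["run-a", "run-b", "run-c"]))  # ['run-a', 'run-b']
    print(poll_once(limiter, ["run-c"]))  # [] because both slots are busy
    limiter.on_infrastructure_exit("run-a")
    print(poll_once(limiter, ["run-c"]))  # ['run-c']
```

Because the limiter belongs to the agent rather than to any one queue, the same slots are shared by every work queue the agent polls. This is what makes it a rough load balancer across queues.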

When is this useful?

This feature is especially useful for limiting resource consumption when running flows locally. It also provides a way to roughly balance the load across multiple work queues.

Further enhancements & fixes

Here is a copy of the enhancements and fixes from the release notes.

Integration enhancements

  • Add Twilio SMS notification block — #7685
  • Add PagerDuty Webhook notification block — #7534

Agent-related enhancements

  • Increase default agent query interval to 10s — #7703
  • Add agent reporting of crashed flow run infrastructure — #7670
  • Add jitter to the agent query loop — #7652

Cancellation-related enhancements

  • The final state of a flow is now Cancelled when any task finishes in a Cancelled state — #7694
  • Raise CancelledRun when retrieving a Cancelled state's result — #7699

Database-related enhancements

  • Use a new database session to send each flow run notification — #7644
  • Increase default database query timeout to 10s — #7717

API-related enhancements

  • Include final state logs in logs sent to API — #7647
  • Update login to prompt for “API key” instead of “authentication key” — #7649

Other enhancements

  • Add tags and idempotency_key to run deployment — #7641
  • Disable cache on result retrieval if disabled on creation — #7627
  • Add default messages to state exceptions — #7705
  • Update run_sync_in_interruptible_worker_thread to use an event — #7704

Fixes

  • Prompt workspace selection if the API key is set, but API URL is not set — #7648
  • Use PREFECT_UI_URL for flow run notifications — #7698
  • Display all parameter values a flow run was triggered with in the UI (defaults and overrides) — #7697
  • Fix bug where result event is missing when wait is called before submission completes — #7571
  • Fix support for sync-compatible calls in deployment build — #7417
  • Fix bug in StateGroup that caused all_final to be wrong — #7678
  • Add retry on specified httpx network errors — #7593
  • Fix state display bug when state message is empty — #7706

Documentation enhancements & fixes

  • Fix heading links in docs — #7665
  • Update login and PREFECT_API_URL configuration notes — #7674
  • Add documentation about AWS retries configuration — #7691
  • Add GitLab storage block to deployment CLI docs — #7686
  • Add links to Cloud Run and Container Instance infrastructure — #7690
  • Update docs on final state determination to reflect Cancelled state changes — #7700
  • Fix link in ‘Agents and Work Queues’ documentation — #7659

Next steps

As always, to upgrade to the latest version, run:

pip install prefect -U
