Prefect Workflow Automation: Step-by-Step

A Comprehensive Guide to Automating Your Workflows

Lucca Miorelli
Poatek
6 min readAug 23, 2023

--

Introduction

For engineers used to Apache Airflow, Prefect presents a compelling alternative. While Airflow offers robust customization options and intricate DAG structures, Prefect’s intuitive design with features like the @task and @flow decorators provides a smoother workflow management experience. Prefect's emphasis on simplicity and observability can be a refreshing change for those seeking an efficient yet user-friendly workflow solution.

In this post, we will explore the main concepts of Prefect and learn how to deploy a flow and configure a simple infrastructure.

What is Prefect?

Prefect is a Python-based workflow management system, which allows you to build, test, and run workflows with ease. It provides a simple and intuitive way to define and execute complex workflows, making it an ideal tool for data engineering, machine learning, and other data-intensive applications.

Prefect UI’s dashboard by prefect.docs

Main Concepts of Prefect

Prefect introduces essential concepts that lay the foundation for crafting and executing workflows.

Some of the main concepts include:

🧱 Tasks: Building Blocks of Workflows

Tasks are the building blocks of workflows. They represent individual units of work that need to be performed as part of a larger workflow. Tasks can be anything from simple Python functions to complex data transformations or machine learning models. You can simply add the @task decorator to a function for it to become a Prefect task.

@task
def extract(endpoint:str):
# Sends GET request to an endpoint
r = requests.get(endpoint)

return r.json() # returns response as json

🔄 Flows: Structuring Workflow Logic

Flows are the collection of tasks that make up a workflow. They define the dependencies between tasks and the order in which they need to be executed. Flows can be built using a simple and intuitive decorated Python function, which makes it easy to define complex workflows. The same convention goes for flows, so just decorate your function with @flow and you’re good to go.

@flow
def etl_pipeline():
# Calls tasks in desired order
json = extract(ENDPOINT) # Task 1
df = transform(json) # Task 2
response = load(df) # Task 3

if __name__ == "__main__":
etl_pipeline()

In the example above, the etl_pipeline flow orchestrates data extraction, transformation, and loading. The distinct tasks enhance observability throughout the workflow.

Example of Flow Run visualization by prefect.docs

By now, with this simple Python script, you can already run flows locally and take advantage of Prefect’s logging and observability capabilities. However, deploying and automating the execution of these flows requires Flow Deployment.

✅ Flow Deployment: Orchestrating Execution

A deployment is an entry-point to a flow which contains all information about its configuration. By deploying a flow, you’re saying to Prefect’s API what, how and where the flow will be run.

The configuration is all written in a yaml file, which works like a recipe. The process is simplified by using the CLI command prefect deployment build, which generates the configuration automatically.

$ prefect deployment build "./flow_file.py:flow_function"   \    # Indicates where's the flow function
--name "Deployment Name" \ # Deployment's name
--tag "tag-1" \ # You can add tags to it
--tag "tag-2" \ # Even more than one
--pool default-agent-pool \ # Selects the default pool
--work-queue default \ # Selects the default work queue
--infra process \ # Defines how the flow will run
--storage-block github/github-block \ # Prefect GitHub block
--cron "0 */4 * * *" \ # Defines frequency
--output "./deployment_config.yaml" \ # Where the YAML file will be saved
--apply # Automatically applies the deployment

Prefect’s API only stores metadata of your deployment, therefore it only pushes a flow run — an instance of a flow — to a Work Queue.

With numerous flow runs awaiting execution in the Work Queue, the next step is configuring Prefect Workers.

🤖 Workers: Enabling Work Execution

A worker is a lightweight polling service that get scheduled work from a work pool and execute them. They run on a target infrastructure and manage the execution of flows on that infrastructure. By default, Prefect’s Worker will poll for scheduled flow runs every 15 seconds.

Once the infrastructure is set up, initiate the worker with the following command:

$ prefect worker start            \
--pool default-agent-pool \ # Defines Work Pool to poll
--work-queue default # Work Pool can have multiple Queues

☁ Infrastructure: Execution Environment

Essentially, the infrastructure is where your flow runs are executed — your execution environment. It can be either locally, or in a cloud provider’s instance for example.

Different execution types are available:

  • Process: Executes within a new instance process.
  • DockerContainer: Runs within a Docker container.
  • KubernetesJob: Executes as a Kubernetes job.

By now, you can already have your flow scheduled by Prefect API and run it smoothly on the defined infrastructure!

But still, to further optimize your use of Prefect, consider utilizing blocks.

🗃 Blocks: Reusable Configurations

Blocks are Prefect’s way to abstract configuration and make them easily reusable. There’s a wide variety of blocks, just pointing out ones:

  • AWS Credentials: Manages AWS authentication.
  • Azure: Connects to various Azure services.
  • BigQuery: Queries data from BigQuery Warehouse.
  • dbt: Integrates dbt with Prefect.
  • Databricks: Manages Databricks credentials.
  • Slack, Teams, Email, etc.: Facilitates flow notifications.

This way, developers can efficiently reuse code components. In the example, the GitHub Storage Block retrieves the latest flow code from a project repository.

Deploying a Flow and Configuring Infrastructure with Prefect Cloud

Step 1: Execution Environment configuration

Establish an execution environment for running your flow. This could range from your local machine to an alternative environment.

Explore this 🔗 blog post to set up a Free-Tier EC2 instance using Terraform.

Step 2: Environment setup

Configuration depends on your chosen deployment type (Process, DockerContainer).

If you opt for a new process deployment, ensure essential dependencies are manually installed on your infrastructure. One approach could be sending requirements.txt through SSH.

Step3: Configure your Orchestration Environment (Prefect Cloud | Prefect Server)

While you can run your own Prefect Server, consider benefiting from the Personal pricing plan of Prefect Cloud. This plan offers one workspace, up to three users, and full access to core features. If you're using Prefect Cloud, generate an API key to log in to your execution environment (Step 1) using the command:

$ prefect cloud login

Step 4: Deploy Your Flow

At this stage, every script you run in your environment (while logged in) will have its metadata stored in Prefect Cloud, and that’s awesome.

The next step involves deploying your flow to the Orchestration Environment, which schedules flow runs for execution in the Execution Environment.

$ prefect deploy build my-flow.py:my_flow \
-n "my-deployment" \
-p default-agent-pool \
-q default \
-a

Step 5: Start the worker

On your Execution Environment (Step 1), initiate a worker with this straightforward CLI command:

$ prefect worker start

Conclusion

Prefect provides a simple and intuitive way to build and run workflows. Its key concepts, including tasks, flows, workers, and cloud infrastructure, make it easy to build and deploy complex workflows with ease. By following the steps outlined in this post, you can deploy a flow and configure its infrastructure in just a few simple steps, and make the most out of this tool to manage your workflows!

For additional insights and content, explore these other blog posts:

🔗 How to provision a Free-Tier EC2 using Terraform

🔗 Tracking Movie Tweets with Airflow: Data Engineering Project

You can also reach me:

🔗 Linkedin

🔗 GitHub

--

--