Prefect: A First Look

Prefect runs on Prefect, part 2

Feb 27, 2019

In part 1 of this post, I wrote about one of the ways that “Prefect runs on Prefect”: deploying a Slackbot named Marvin to manage our daily standup ceremony. In this sequel, I’d like to dive into how we did it, not sparing a single detail — you’ve been warned!

Hopefully, you’ll walk away from this post with a very strong sense of what it feels like to write Prefect code (spoiler alert: it’s very lightweight), along with some of the design patterns and customizations that Prefect allows.

Positive Engineering

Prefect is a tool for managing negative engineering. You write whatever code you want to run (the positive engineering), and Prefect helps by managing all the complexity of executing it robustly and managing failure states (the negative engineering).

To that end, let’s begin by considering the code we want to run, without involving Prefect at all. Consider a workflow that posts standup updates at 9:00am EST every weekday. Here’s all the code we need, organized into three functions — and remember, we aren’t using Prefect yet:

The positive code for our workflow
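The gist itself isn't included in this text, so the snippet below is only an illustrative sketch of what those three functions could look like; the update source, the endpoint URLs, and the function names are assumptions rather than the original code.

import datetime

import requests


def get_latest_updates():
    """Pull today's standup updates from wherever they live (a placeholder endpoint here)."""
    resp = requests.get(
        "https://example.com/standup-updates",
        params={"date": datetime.date.today().isoformat()},
    )
    resp.raise_for_status()
    return resp.json()  # e.g. {"alice": "shipped the thing", ...}


def format_message(updates):
    """Combine the raw updates into a single Slack-ready message."""
    lines = ["Standup updates for {}:".format(datetime.date.today())]
    lines += ["- {}: {}".format(user, text) for user, text in sorted(updates.items())]
    return "\n".join(lines)


def post_to_slack(message, channel, token):
    """Post the formatted message to the given Slack channel."""
    resp = requests.post(
        "https://slack.com/api/chat.postMessage",
        json={"channel": channel, "text": message},
        headers={"Authorization": "Bearer {}".format(token)},
    )
    resp.raise_for_status()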

This code describes what we want to do. In order to run it, we just have to specify a few additional things, like how the functions “fit together” and depend on each other.

But in order to deploy it for robust execution, we have a number of outstanding challenges, including:

  • running on a schedule
  • running off-schedule for testing (or any other purpose)
  • providing sensitive configuration options like API tokens
  • monitoring the progress of our execution
  • maintaining a history of runs (to analyze performance over time)
  • collecting logs and making them available
  • automatically retrying in case the Google or Slack connections go bad
  • sending alerts on failure (or any other state transition)
  • automatically versioning updates via CI/CD
  • and quite a few others, as any data engineer can attest…

Adding these features to our code will overwhelm our simple, readable, positive workflow with negative infrastructure. It’s unlikely that the resulting mess will be comprehensible to our colleagues, and modifying the workflow in the future will be difficult. However, any one of these features would greatly improve our confidence that our code can run robustly.

Enter Prefect

Prefect allows us to gain all of the above properties (and more) with very little modification of our positive code. We use a small number of hooks to convert each function to a Prefect task and then combine those tasks into a flow.

Each task represents a discrete unit of work. At a high level, a task is basically a function: it optionally receives inputs, does a thing, and optionally returns outputs. Regardless of whether a task generates data or not, metadata about its state is always passed to downstream tasks. This allows tasks to react to the state of upstream tasks. In this way, Prefect enables many modes of operation, including in-memory data pipelines (tasks directly pass data to each other) and out-of-core data manipulation (tasks work with remote data or services and pass only information about whether they succeeded to each other). In addition, individual tasks have customizable settings for when they should run, whether they should retry, callback hooks, and many other parameters.

Since we organized our original code into three functions, we’ll use those same three functions as tasks. We’ll use Prefect’s functional API to build up a flow that wires these tasks together. Flows have many of the same customizable features as tasks, including callbacks and schedules, but — like everything in Prefect — also have sensible defaults.

Enough talking! Here’s what our production-ready Prefect workflow looks like. This is the exact same code as before, but with Prefect added. I’ve embedded a screenshot so I could highlight the new lines (feature request, Medium Staff!), but you can access the full gist here.

Our full workflow, highlighted to show Prefect-specific additions. Here’s a gist containing this code.
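Since the screenshot and gist aren't reproduced in this text either, here is a sketch of the general shape such a flow could take with the Prefect API of that era. The cron string, retry settings, secret name, and failure handler are assumptions, and the Secret lookup replaces the token argument from the plain version above.

import datetime
from datetime import timedelta

import requests

from prefect import Flow, Parameter, task
from prefect.client import Secret
from prefect.schedules import CronSchedule


def alert_on_failure(obj, state):
    # illustrative on_failure hook: a real one might page someone or post to Slack
    print("{} finished in state {}".format(obj, state))


# 9:00am every weekday (timezone handling omitted for brevity)
schedule = CronSchedule("0 9 * * 1-5")


@task(max_retries=2, retry_delay=timedelta(minutes=1))
def get_latest_updates():
    resp = requests.get(
        "https://example.com/standup-updates",
        params={"date": datetime.date.today().isoformat()},
    )
    resp.raise_for_status()
    return resp.json()


@task
def format_message(updates):
    lines = ["Standup updates for {}:".format(datetime.date.today())]
    lines += ["- {}: {}".format(user, text) for user, text in sorted(updates.items())]
    return "\n".join(lines)


@task(max_retries=2, retry_delay=timedelta(minutes=1), on_failure=alert_on_failure)
def post_to_slack(message, channel):
    token = Secret("SLACK_API_TOKEN").get()  # resolved at runtime, never hard-coded
    resp = requests.post(
        "https://slack.com/api/chat.postMessage",
        json={"channel": channel, "text": message},
        headers={"Authorization": "Bearer {}".format(token)},
    )
    resp.raise_for_status()


with Flow("daily-standup", schedule=schedule) as flow:
    channel = Parameter("channel", default="#standup")
    updates = get_latest_updates()
    message = format_message(updates)
    post_to_slack(message, channel)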

That’s… it.

The first and most important thing to point out is how little code I had to add to implement a full negative engineering solution. Don’t worry, there’s a fully-imperative class-based graph lurking in the background, and if you want to instrument every detail of your Prefect workflow you absolutely can. But we’ve worked hard to make sure you don’t have to!

For so little code, we touch a lot of Prefect features in this example.

Tasks

The @task decorator is a factory for converting Python functions into Prefect Tasks, and is just one of many ways they can be created. Prefect Tasks can also be built as classes with a particular interface, or you can avoid writing your own boilerplate altogether by taking advantage of our growing task library.
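As a minimal, illustrative sketch of the class-based interface: configuration goes through __init__, and the work happens in run.

from prefect import Task


class SayHello(Task):
    """A class-based task: configure it in __init__, do the work in run()."""

    def __init__(self, greeting="Hello", **kwargs):
        self.greeting = greeting
        super().__init__(**kwargs)

    def run(self, name):
        return "{}, {}!".format(self.greeting, name)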

Schedules

Most workflow systems are beholden to schedules to the point that they have difficulty running jobs ad-hoc. Prefect flips that on its head: workflows are designed to be run at any time, for any reason, with any concurrency. A schedule is nothing more than a way to tell our system how to plan ahead, and you can mix scheduled runs with ad-hoc runs at any time.
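For example, the 9:00am-on-weekdays schedule used above is just one object attached to the flow; the cron string here is an assumption.

from prefect.schedules import CronSchedule

# every weekday at 9:00am; attach it with Flow("daily-standup", schedule=schedule)
schedule = CronSchedule("0 9 * * 1-5")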

Functional API

I mentioned our “Functional API” earlier. Practically, this means that we can build a flow by calling tasks just as if they were functions. Prefect monitors those calls and builds up the dependency graph in the background; none of our positive code gets executed. The functional API is most useful when tasks pass information to each other; if you simply want a task to run after another task, we have simple hooks for creating those types of dependencies as well (in our imperative API, we call that a “state dependency”, as opposed to a “data dependency”).
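A tiny illustration with placeholder task names: calling a task inside a Flow context records a data dependency, while the upstream_tasks keyword records a pure state dependency.

from prefect import Flow, task


@task
def extract():
    return [1, 2, 3]


@task
def load(data):
    print("loading {} records".format(len(data)))


@task
def notify():
    print("pipeline finished")


with Flow("functional-api-demo") as flow:
    data = extract()                    # builds the graph; nothing executes yet
    loaded = load(data)                 # data dependency: extract -> load
    notify(upstream_tasks=[loaded])     # state dependency: run after load, no data passed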

Testing

At this point, we could execute our flow by calling flow.run(). This would use the exact same workflow engine as the production version, but keep everything in the local process — allowing us to use whatever debugging tools we prefer to check execution. A simple config setting promotes the engine to its distributed mode.

Note that each task is still a unit-testable function. Even the parts of Prefect that appear to depend on our Cloud platform (like Secrets) have local hooks for unit testing.
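Assuming the flow sketched above is importable, a quick local check might look like this. (Depending on the Prefect version, a flow with a schedule attached may need to be told to skip it for an ad-hoc run.)

# run the whole flow in the local process and inspect the result
state = flow.run(parameters={"channel": "#standup-test"})
print(state)

# each task still wraps a plain Python function, so it can be unit tested directly
assert "alice" in format_message.run({"alice": "shipped the thing"})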

Parameters

We create a Prefect Parameter to represent the standup channel. Parameters are essentially flow “arguments” that can be supplied at runtime.
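A minimal, self-contained illustration: the Parameter's default is used unless a value is supplied when the flow runs.

from prefect import Flow, Parameter, task


@task
def say_hello(to):
    print("hello, {}!".format(to))


with Flow("parameter-demo") as flow:
    name = Parameter("name", default="world")  # a Parameter is itself a task
    say_hello(name)

flow.run()                                # prints "hello, world!"
flow.run(parameters={"name": "Marvin"})   # prints "hello, Marvin!"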

Secrets

Prefect Cloud uses a concept called Secrets to pass… secret… information to tasks at runtime. In this example, an admin could create a secret for the Slack API token, and the value is only available during a production run.
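A sketch of the local testing hook mentioned above: outside of Cloud, a secret's value can be supplied through Prefect's context. The secret name is an assumption, and local secrets must be enabled in the config (typically the default for local runs).

import prefect
from prefect.client import Secret

# in production, the value comes from Prefect Cloud at runtime;
# locally, it can be injected through context for testing
with prefect.context(secrets={"SLACK_API_TOKEN": "fake-token"}):
    assert Secret("SLACK_API_TOKEN").get() == "fake-token"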

Callbacks

Flows and tasks can have optional “state handlers” that allow user-customizable behavior any time they change state. Using state handlers to respond to failure is so common that we’ve exposed it as an optional on_failure keyword argument. Sometimes we use this to fire off Slack messages or emails for high-priority issues.
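A sketch of the general mechanism: a state handler is called on every transition with the old and new states, and on_failure is shorthand for the failure-only case.

from prefect import task


def alert_team(obj, old_state, new_state):
    # called on every state change; may inspect or even replace the new state
    if new_state.is_failed():
        print("{} went from {} to {}".format(obj.name, old_state, new_state))
    return new_state


@task(state_handlers=[alert_team])
def flaky_api_call():
    raise ValueError("the external service timed out")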

After we’ve tested and are confident in our flow, all that’s left is to deploy it to Prefect Cloud!

Flow Deployment

To deploy our flow above, we need to provide one additional piece of data: the flow’s “environment”. An environment is a Prefect object that describes exactly how and where to run a flow. For our purposes, a Docker container environment works well. It allows us to easily describe our Python dependencies, additional files, and environment variables, and automatically takes care of building and pushing the image.

Our environment can be specified as follows (I warned you, no detail will be spared!):
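The gist isn't reproduced here, and Prefect's deployment API has evolved considerably since this was written, so the snippet below is only a rough sketch of the shape described: the DockerEnvironment name, its arguments, and the registry URL are all assumptions.

from prefect.environments import DockerEnvironment

# a container-based environment: the image bakes in our Python dependencies,
# extra files, and environment variables, and is built and pushed on deploy
# (class name and arguments are assumptions based on the description above)
flow.environment = DockerEnvironment(
    base_image="python:3.6",
    registry_url="gcr.io/our-registry",            # illustrative registry
    python_dependencies=["requests"],
    files={"/local/path/to/marvin.py": "/app/marvin.py"},
    env_vars={"TZ": "America/New_York"},
)

# build and push the image, then register the flow in the "marvin" project
flow.deploy(project_name="marvin")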

When we call flow.deploy, Prefect does a few things: the environment is “built” (in this case, a container is built and pushed to our registry), and the flow is registered with the Prefect Cloud platform. We’ve specified that it should live in our “marvin” project. If a previous version of our flow were found in that project, it would automatically be versioned and archived. This deploy process could easily be hooked into a CI/CD pipeline.

Advanced Execution Logic

Our use case above involved rather simple execution logic: each task runs if the previous task succeeds. However, there are many situations in which we want tasks to skip, or run only if certain things fail, or even run no matter what. To illustrate this, let’s walk through pieces of a different workflow that sends standup reminders (heavily featured in my previous post).

Recall the goal of the reminder workflow: 30 minutes prior to standup, updates are pulled (using the exact same logic from get_latest_updates above), and any user who has not already provided an update is sent a Slack reminder to do so. This logic is achieved in Prefect using two concepts: mapping and skipping.

Here is the complete code for our Prefect workflow. As before, you’ll notice it looks an awful lot like plain old Python:
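As before, the gist itself isn't embedded in this text; the sketch below mirrors the structure described in the prose, with illustrative task bodies and data standing in for the real ones.

from prefect import Flow, task, unmapped
from prefect.engine import signals


@task
def get_latest_updates():
    # same idea as the posting flow: {user_name: update_text}
    return {"alice": "shipped the thing"}


@task
def get_team():
    # everyone who should provide an update (illustrative)
    return ["alice", "bob", "chris"]


@task
def is_reminder_needed(user, updates):
    # one copy of this task runs per team member, thanks to mapping
    if user in updates:
        raise signals.SKIP("{} already posted an update".format(user))
    return user


@task
def send_reminder(user):
    print("Reminding {} to post a standup update".format(user))
    return user


@task(skip_on_upstream_skip=False)
def report(users):
    # runs even if every upstream reminder skipped
    print("Reminder run finished; processed {} team member(s)".format(len(users)))


with Flow("standup-reminders") as flow:
    updates = get_latest_updates()
    team = get_team()
    needs_reminder = is_reminder_needed.map(team, unmapped(updates))
    reminders = send_reminder.map(needs_reminder)
    report(reminders)

# flow success should reflect the reminders themselves, not the final report
flow.set_reference_tasks([reminders])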

This may look straightforward, but we’re doing some very advanced stuff. Let’s walk through some examples.

Mapping

Mapping lets us dynamically spawn new tasks at runtime. Just as a function can be mapped over each item of a list, a new version of our mapped task is created for each item returned by the upstream dependency.

In this case, the get_team task returns a list of Prefect employees, and we are dynamically generating an is_reminder_needed task for each one. Furthermore, we are extending our dynamic per-user pipelines with a mapped send_reminder task.

It’s worth mentioning that when we map a task, we are NOT just calling its function multiple times. Instead, each dynamically created task is a true Prefect task that can run, succeed, fail, or retry completely independently of its neighbors. Mapped “children,” as we call them, can be queried and worked with just like any other task.

Additionally, observe that we marked one input as unmapped. This tells the system to treat that task result as a constant input, and not to attempt to iterate over it.

Signals

Prefect does its best to figure out whether your tasks worked or not: if they raise an error, they FAIL; if they don't, they SUCCEED.

Sometimes, however, users need greater control over task states. Prefect uses a concept called “signals” to adjust task behavior. In our case, each mapped instance of the is_reminder_needed task raises a SKIP signal if an update has already been provided. This tells Prefect that execution should stop and the task should be placed into a Skipped state (with an optional message and even a result).

Because downstream tasks always receive metadata about upstream dependencies, each send_reminder task will also enter a Skipped state if its upstream is_reminder_needed task skips. Our final task, report, disables that behavior by setting skip_on_upstream_skip=False. It will run even if its upstream tasks skip.

Reference Tasks

Our flow’s terminal task is report. However, the success of the report task is unimportant to the true objective of our flow, which is to send reminders. Consequently, we set the collection of mapped reminders (stored in the result task) as the “reference tasks” for this flow. This means that the determination of whether the flow is considered successful will be based entirely on whether those reference tasks were successful.

This pattern is common in jobs that require some “cleanup” upon failure. In a system where terminal tasks determine the workflow state, a successful cleanup would be incorrectly reported as a successful flow! In Prefect, we can ensure that the flow’s success depends only on the appropriate tasks.

This is an advanced use case and like everything else in Prefect, reference tasks do have a common-sense default: the terminal tasks.

What will you build?

There are many more features that we didn’t cover here; a non-exhaustive list would include proper state-change notifications, if/else control flow logic, task triggers, and the ability to manually pause and resume Flow execution.

Regardless, I hope that this has given you a concrete idea of what Prefect is all about. The use cases presented here are simply some of the many diverse things you can do with Prefect. Managing your workflows shouldn’t be hard.

Stay tuned — follow us on Twitter!
