Prefect Runs on Prefect
Every morning, Marvin runs standup for our team. That’s not unusual for a tech startup, except that Marvin is actually a Prefect workflow.
Update: Part 2 of this post, including all code, is available here.
At Prefect, we run our standup ceremony asynchronously through Slack, completely managed by our favorite internal bot: Marvin. For those unaware, “standup” is typically a short daily meeting in which each team member describes what they worked on the day before and what they plan to work on today. To enable that, Marvin is responsible for collecting user updates (via a Slack slash command), reminding users who haven’t provided an update to do so, and ultimately posting all of the updates in our #general Slack channel at a certain time. This setup is an important part of our effort to avoid unnecessary formality while still facilitating a culture of transparency.
Marvin has become an important member of our team. He began life as a playground for testing out new ideas and infrastructure: he gave us a low-risk way to familiarize ourselves with novel technologies, and he allowed everyone on our team, including non-developers, to contribute to a fun project by opening issues and suggesting new features. He quickly graduated to become a key component of Prefect’s internal processes. Using Marvin for standup was one of our earliest goals, but we had to write a lot of custom code to make that possible: schedulers, web servers, database interactions, etc.
As the Prefect Cloud platform came online, it became apparent that our standup routine was a perfect match for an advanced Prefect workflow: a regularly-scheduled series of dependent tasks involving conditional logic, parameterized inputs, and dynamic scope (as we add new teammates). We began to view standup as a key internal use case: if it was easy to migrate our existing custom code to Prefect, then we’d have achieved many of our product goals.
Standup is also a great working example for our team because we’re all consumers of it. It provides a rare opportunity for everyone — from engineering to design — to work with data that’s meaningful to them.
Even better, eating our own dog food is the best — and possibly only — way to ensure that we are keeping our promises to our users. I can’t stress the benefits of doing this enough:
- it enhances customer empathy, which is crucial for building useful software.
- it sparks discussions about security and reliability, because we’re going to put critical sensitive data in the system before we ask anyone else to.
- it creates a relatively low-pressure environment for debugging issues
The number of tiny improvements you’ll make the moment you undertake this is astonishing. Within the first day of undertaking this project, we had made enhancements to improve our logs, simplify our API for common actions, and update user config settings to prevent anyone from accidentally publishing sensitive data.
To shed some more light on the functionality that Prefect provides, let’s walk through our standup problem, why Prefect is well suited to solve it, and how we can implement it with Prefect. In Part 2 of this post we’ll go through the actual code that runs our standup, for those who want a deeper look under the hood.
We seek to create two workflows, each with its own daily schedule:
- The “reminder” workflow: at a certain time every morning, prior to the scheduled standup time, remind any user who hasn’t provided an update to do so.
- The “standup” workflow: at the scheduled standup time, collect all user updates and post them to a provided Slack channel. This flow is more straightforward than the reminder flow.
Note that both of these flows will also require logic for determining which standup update is “active” at any given time. We use Google Cloud Firestore to store the user updates (for persistence and to let us revisit standup history).
This problem has many features that make it well suited for Prefect:
- a regularly-scheduled set of jobs with multiple task dependencies
- conditional workflow logic (we can skip sending a reminder if someone has already provided an update)
- dynamic scope (we don’t know what employees we’ll have in the future, but we want to include them automatically)
- “mapping” (repeating the same tasks for each user)
- secure communication with an external service and database
- sensitive secrets such as Slack tokens and Google Application Credentials
We will utilize the following Prefect concepts to implement our workflows:
Tasks represent individual modular units of work. While some frameworks prefer monolithic tasks, Prefect encourages tasks that are small enough to represent discrete logical steps of a workflow. Therefore, we will need:
- a get_standup_date task for determining the active standup date. For example, if this Flow were manually executed off-schedule on a Saturday afternoon, the next active standup date would be Monday morning.
- an is_reminder_needed task that decides whether a team member has provided an update yet; if a team member has already provided an update, we use a Prefect tool called “signaling” to indicate that the task should skip.
- a send_reminder task that (you guessed it) sends a reminder to someone that they need to publish an update. This task will depend on the outcome of the is_reminder_needed task, which means we don’t need to write an explicit if/else clause. If the upstream task skips, so will this one! This is precisely the sort of “workflow logic” that Prefect takes care of for you. (As with almost everything in Prefect, this behavior is configurable, though it has a common-sense default.)
- a post_updates task that posts all available updates to a specific Slack channel (specified via a Prefect Parameter).
- a get_team task that pulls the list of team members currently participating in standup from Google Cloud Firestore; this task will return a list of team members which the is_reminder_needed task will map over.
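To make the “active standup date” rule concrete, here is a minimal plain-Python sketch of the kind of logic a get_standup_date task might contain. It is an illustration only, not the code from Part 2: the 09:00 standup time and the function signature are assumptions, since the post does not state them.

```python
from datetime import date, datetime, time, timedelta

# Hypothetical standup time; the post does not say when standup is posted.
STANDUP_TIME = time(9, 0)


def get_standup_date(now: datetime, standup_time: time = STANDUP_TIME) -> date:
    """Return the date of the next standup that has not happened yet."""
    candidate = now.date()
    # If today's standup time has already passed, start looking from tomorrow.
    if now.time() >= standup_time:
        candidate += timedelta(days=1)
    # Roll forward past Saturday (weekday 5) and Sunday (weekday 6).
    while candidate.weekday() >= 5:
        candidate += timedelta(days=1)
    return candidate


# 2019-03-02 was a Saturday, so an afternoon run resolves to the following Monday.
saturday_run = get_standup_date(datetime(2019, 3, 2, 15, 0))
```

Keeping this logic in its own small task means it can be checked in isolation with a handful of fixed datetimes, without touching Slack or Firestore.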
Parameters are a special type of Task that represent values that will be provided at runtime. In our case, we create a Parameter called standup_channel which stores the Slack channel that standup updates will be posted to. It defaults to #general for scheduled runs but can be changed (e.g., to #testing) when running the flow off-schedule during local testing.
Flows describe how all the tasks fit together. This includes how the tasks depend on each other and what state or data (if any) needs to be transferred between tasks. For example, is_reminder_needed will determine if a given user has already provided an update and enter a Skipped state if so. The flow is responsible for passing that information to the downstream task.
We will combine all of our tasks into a flow that represents the workflow logic.
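One way to picture what a flow records is as a dependency graph. The sketch below uses Python’s standard-library graphlib (Python 3.9+) rather than Prefect itself, and the edges are assumptions inferred from the task descriptions in this post, not the actual flow definition.

```python
from graphlib import TopologicalSorter

# Each key is a task; each value is the set of tasks it depends on.
# These edges are assumed from the prose and may differ from the real flow.
reminder_flow = {
    "get_standup_date": set(),
    "get_team": set(),
    "is_reminder_needed": {"get_standup_date", "get_team"},
    "send_reminder": {"is_reminder_needed"},
}

# static_order() yields every task only after all of its upstream tasks.
run_order = list(TopologicalSorter(reminder_flow).static_order())
```

Any valid run order places send_reminder after is_reminder_needed, which is what lets a workflow engine hand the upstream task’s state or data to the downstream task.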
Functional API for mapping
Prefect has a very flexible map operator. While mapping can be used in a more traditional sense to parallelize any task, we commonly use it in Prefect to generate dynamic pipelines for inputs of unknown length.
For example, we know that we want to run our send_reminder tasks for every member of the Prefect team, but we obviously don’t know what the team will look like in the future! Instead, we use a get_team task to return a list of every Prefect employee and map our is_reminder_needed task over that result. At runtime, Prefect will automatically generate a copy of the is_reminder_needed task for every employee in that list.
It gets better: if we map our send_reminder task over the (already-mapped) is_reminder_needed task, Prefect will automatically build a dynamic multi-task pipeline for each user.
This functionality goes well beyond a typical “map” in which a function is merely called multiple times: Prefect’s map creates first-class task objects that can succeed, fail, retry, and skip completely independently of the others. Therefore, my dynamically-generated send_reminder task can be skipped while Jeremiah’s dynamically-generated send_reminder task runs successfully.
As the author of this workflow, this means that I only have to write two tasks (send_reminder and is_reminder_needed), and they only need to operate on a single user. This makes it much simpler to design and test the logic. Prefect takes care of scaling that to as many employees as our company has.
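The behavior described above can be modeled in a few lines of plain Python. This is a toy model, not Prefect’s map implementation: the Skip exception, the Child record, the map_task helper, and the sample data are all hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List


class Skip(Exception):
    """Stand-in for a skip signal raised inside a task."""


@dataclass
class Child:
    """One dynamically generated copy of a mapped task."""
    key: Any
    state: str
    result: Any = None


def map_task(fn: Callable[[Any], Any], upstream: Iterable[Any]) -> List[Child]:
    """Run fn once per upstream element, tracking each copy's state separately."""
    children = []
    for item in upstream:
        if isinstance(item, Child):
            if item.state != "Success":
                # Simplification: carry a non-successful upstream state forward
                # without running fn for this element.
                children.append(Child(item.key, item.state))
                continue
            key, value = item.key, item.result
        else:
            key, value = item, item
        try:
            children.append(Child(key, "Success", fn(value)))
        except Skip:
            children.append(Child(key, "Skipped"))
        except Exception:
            children.append(Child(key, "Failed"))
    return children


updates = {"alice": "finished the scheduler docs"}  # hypothetical stored updates


def is_reminder_needed(user: str) -> str:
    if user in updates:
        raise Skip()  # this user already posted, so no reminder is needed
    return user


def send_reminder(user: str) -> str:
    return f"reminded {user}"


team = ["alice", "jeremiah"]  # would come from get_team in the real flow
checked = map_task(is_reminder_needed, team)
reminders = map_task(send_reminder, checked)
states = {child.key: child.state for child in reminders}
```

Both task functions only ever see a single user; the per-user bookkeeping lives entirely in the mapping helper, which is the division of labor the post attributes to Prefect.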
Prefect takes security seriously, and we hate the idea of embedding sensitive information in your workflows as much as you do. When using Prefect Cloud, users can create an encrypted key / value pair called a “Secret”. Any appropriately-permissioned task can request the value of a Secret by making a simple function call. This means that workflows only need to know the name of a secret; the value can be provided at runtime.
The workflows discussed here need a Slack API token to communicate via Slack, which we store as a Secret.
Attached to each Flow is an “Environment” which is, essentially, a description of “how to run this flow”. Most flows use a ContainerEnvironment which contains all the information needed to build (and later retrieve) a Docker container capable of running the flow.
In our case, we need to customize the environment to include a Google Application Credentials file copied from our local machine, as well as an environment variable which points to this file so that we can access Google Cloud. We do this all within Prefect, which also builds and pushes the container to a registry of our choosing.
Additionally, each Flow has a Prefect “Schedule” object; this object encapsulates the logic for when this Flow should run. For our current situation, in which we want to run our Flows every weekday at fixed times, a simple Cron string will suffice. In Prefect, having a bona fide Schedule object allows users to test their scheduling logic prior to deployment (more on this in Part 2).
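As a rough illustration of what “testing scheduling logic” can mean, the plain-Python sketch below lists the next few run times for a weekday schedule. It does not use Prefect’s Schedule classes, and the 09:00 run time (which would correspond to a cron string such as "0 9 * * 1-5") is an assumption, not the time we actually use.

```python
from datetime import datetime, time, timedelta
from itertools import islice
from typing import Iterator


def weekday_runs(after: datetime, at: time = time(9, 0)) -> Iterator[datetime]:
    """Yield Monday-to-Friday run times at 'at', strictly later than 'after'."""
    day = after.date()
    while True:
        run = datetime.combine(day, at)
        # weekday() is 0 for Monday through 4 for Friday.
        if run > after and run.weekday() < 5:
            yield run
        day += timedelta(days=1)


# Starting from Friday 2019-03-01 at noon, the next three runs fall on
# Monday, Tuesday, and Wednesday of the following week.
upcoming = list(islice(weekday_runs(datetime(2019, 3, 1, 12, 0)), 3))
```

Previewing upcoming run times like this makes it easy to catch mistakes, such as a schedule that fires on weekends, before anything is deployed.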
At the end of the day, our setup for the daily reminder Flow looks like this:
To be honest, “running standup through Slack” was not on our original list of Prefect use cases. It certainly doesn’t feel like a traditional data engineering problem. And yet, it turns out to share a surprising number of qualities with the usual data suspects. We were pleased to discover that Prefect’s vocabulary described our standup workflow without any modification. In fact, implementing it in Prefect provided many concrete benefits:
- early adoption of our own product immediately provided usability enhancements
- our previous implementation stored user updates in an in-memory dictionary that would vanish on error or system failure. Prefect encouraged us to be better about designing robust processes.
- our previous implementation required PRs every time we added a new employee, because we had to update the input dictionary that kicked off the flow with their user name and Slack handle. Our Prefect implementation pulls that information from a user database, so all we have to do is update the database and everything continues to work as expected.
- we suddenly have “standup analytics”: we can use our GraphQL API to query the execution history and determine, for example, how many users need reminders on average. If there are a lot, maybe we should push the time back!
- debugging issues is more pleasant now; our UI shows precisely which task failed, and we can look up what was returned from each task when the failure occurred to recreate the problem locally.
In this post, I wanted to describe a real problem we encountered and how we used Prefect to solve it. Our use case was “non-standard” in a way, but what data problem isn’t non-standard? Prefect works very hard to be as use-case agnostic and customizable as possible, while still delivering on its main promise: being a robust tool for building and executing arbitrary data workflows. Hopefully this has shed some light on how Prefect is architected and the novel features it exposes.
If this high-level walkthrough didn’t satisfy you, check out Part 2. The next post walks through the actual code required to implement the standup workflows so you can see how lightweight it really is…