Beyond Scheduling: Event-Driven Flows with Prefect

Will Raphaelson
Published in The Prefect Blog
6 min read · Jul 10, 2023

Forget about Time

Time is, generally speaking, weird. It’s the least-worst way to describe when something has occurred, is occurring, or will occur, but it’s kind of a messy abstraction. In the realm of data pipelines, it’s an abstraction, or a guess, of when something else has happened that demands a pipeline be executed.

Running a refresh pipeline every hour represents a guess that, in the span of an hour, new data that's worth moving or displaying elsewhere was produced. Scheduling a pipeline to run weekly is similarly a guess that the supply of or demand for data changes about once a week.

But modern data practitioners need to take action on data as it is produced — emailing users as soon as they sign up, updating fulfillment statuses as soon as an order is placed, and displaying sensor data as soon as it makes a reading.

Event-driven architectures let us stop guessing. Rather than assuming that some customer data arrived in the past hour, we can run the pipeline the moment new customer data comes in, with no extra steps and no educated guesses.

Time is, of course, useful — pipeline runs almost always cost money, and you might not want to spend money every time an event occurs — but it shouldn’t be the only way to say when something should run. Prefect makes event-driven data pipelines simple.

Event-Driven Flows with Kafka, Event Webhooks and Deployment Triggers

Imagine you have a web app, and luckily, people are signing up! You know that your onboarding email series is most effective if you send it immediately after signup.

In this post, I’ll wire up a system that subscribes to a Kafka topic with my new user signups, pipes those events into Prefect Cloud, and triggers the new user onboarding pipeline on receipt of those events. It assumes basic familiarity with Prefect Cloud. If you’re not quite there yet, check out our docs or hit me up in the Slack Community.

New User Signups → Prefect Events

When a new user signs up for my app, my app produces a message to a Kafka topic, a durable event log commonly used in distributed applications. I have a Kafka topic hosted on Confluent Cloud that contains these messages.
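As a rough illustration of the producing side, here is a minimal sketch of how an app might serialize a signup record before handing it to Kafka. The field names mirror the sample signup message in this post; the topic name `new_user_signups`, the function names, and the injected `send` callable are all hypothetical stand-ins for whatever Kafka client the app actually uses.

```python
import json
import time
from typing import Callable


def build_signup_message(userid: str, regionid: str, gender: str) -> bytes:
    """Serialize one signup record as UTF-8 JSON."""
    record = {
        "registertime": int(time.time() * 1000),  # epoch milliseconds
        "gender": gender,
        "regionid": regionid,
        "userid": userid,
    }
    return json.dumps(record).encode("utf-8")


def publish_signup(
    send: Callable[[str, bytes], None],
    userid: str,
    regionid: str = "Region_4",
    gender: str = "OTHER",
    topic: str = "new_user_signups",  # hypothetical topic name
) -> bytes:
    """Hand the serialized record to `send(topic, value)`.

    `send` stands in for a real Kafka client call; it is injected so this
    sketch runs without a broker.
    """
    value = build_signup_message(userid, regionid, gender)
    send(topic, value)
    return value


if __name__ == "__main__":
    # Stand-in transport that prints instead of talking to Kafka.
    publish_signup(lambda topic, value: print(topic, value.decode()), "User_5")
```

Injecting the transport keeps the serialization logic testable on its own; in a real app, `send` would wrap the produce call of your Kafka client.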

I know that I want to use a Prefect Flow to reach out to those new users as soon as possible after they sign up. To do this, I’m going to need to get these signup messages into Prefect, where I can observe and react to them.

To get messages from my topic into Prefect Cloud, we’re going to take advantage of Prefect Cloud event webhooks. Webhooks are an ultra-lightweight way to expose an endpoint that accepts HTTP traffic, and are used for simple integrations across the modern web. When an HTTP call is made to a Prefect webhook, it emits a Prefect event and captures data from the HTTP call for use downstream.

Using Confluent’s HTTP Sink connector, I can automatically hit a Prefect webhook every time a message appears in the topic.
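Conceptually, the connector makes an HTTP POST to the webhook URL for the messages it reads. The sketch below imitates that call by hand with the standard library, which can be useful for testing a webhook before the connector is configured. The URL is a placeholder (Prefect Cloud shows the real one when you create the webhook), and the function name is made up for this example; it is not how the connector itself is implemented.

```python
import json
import urllib.request

# Placeholder: replace with the URL Prefect Cloud displays for your webhook.
WEBHOOK_URL = "https://api.prefect.cloud/hooks/YOUR-WEBHOOK-SLUG"


def build_webhook_request(
    records: list[dict], url: str = WEBHOOK_URL
) -> urllib.request.Request:
    """Build (but do not send) a POST carrying a batch of records as JSON."""
    body = json.dumps(records).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_webhook_request(
        [{"registertime": 1494287144756, "gender": "OTHER",
          "regionid": "Region_4", "userid": "User_5"}]
    )
    print(request.get_method(), request.full_url)
    # To actually send it, uncomment the next line once the URL is real:
    # urllib.request.urlopen(request)
```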

So if I get a new user signup message in the form of:

"[
  {
    \"registertime\":1494287144756,
    \"gender\":\"OTHER\",
    \"regionid\":\"Region_4\",
    \"userid\":\"User_5\"
  }
]"

The connector will fire the message out to Prefect, which saves it as a Prefect event in real time. We use a Jinja template in our webhook configuration to do some light transformation, instructing Prefect to construct an event name from the user ID of the message:

A Prefect Cloud Event Webhook configuration modal
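Since the template itself lives in the screenshot, here is a plain-Python sketch of the kind of transformation it performs: take the incoming body, pull out the user ID, and shape an event around it. This is an illustration, not the actual webhook template. The `new_user.` event prefix, the `app.users.` resource ID scheme, and the assumption that the body is a JSON array of signup records are all mine.

```python
import json


def signup_to_events(raw_body: str) -> list[dict]:
    """Turn a JSON array of signup records into Prefect-style event dicts."""
    events = []
    for record in json.loads(raw_body):
        userid = record["userid"]
        events.append(
            {
                # Event name built from the user ID (hypothetical prefix).
                "event": f"new_user.{userid}",
                # Resource the event is about (hypothetical ID scheme).
                "resource": {"prefect.resource.id": f"app.users.{userid}"},
                # Keep the original record so downstream automations can read it.
                "payload": record,
            }
        )
    return events


if __name__ == "__main__":
    body = '[{"registertime": 1494287144756, "gender": "OTHER", "regionid": "Region_4", "userid": "User_5"}]'
    for event in signup_to_events(body):
        print(event["event"], event["resource"]["prefect.resource.id"])
```

Keeping the full record under `payload` is what lets a later step reference fields such as the user ID from the event.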

Upon save, we immediately see user events start streaming into Prefect Cloud:

A Prefect Cloud Event Stream showing new user signup events

New User Events → Deployment Triggers

Now that I have my new user signups streaming into Prefect, it’s time to do something with them. As it happens (😉) I have the following Python code, which sends a welcome email, checks to ensure that the email didn’t bounce, and updates an internal marketing database. I’ve turned this code into a flow and my functions into tasks with Prefect decorators and deployed it to Prefect Cloud.

from prefect import flow, task


@task(log_prints=True)
def send_welcome_email(name: str = "will") -> None:
    print(f"Sending onboarding campaign to {name}!")
    ...  # email-sending logic goes here


@task(log_prints=True)
def check_for_bounce(email_id: int = 12345) -> None:
    print(f"checking for bounce on email {email_id}")
    ...  # bounce-check logic goes here


@task(log_prints=True)
def update_marketing_tracker(email_id: int = 12345) -> None:
    print(f"updating marketing tracker for email {email_id}")
    ...  # database update logic goes here


@flow
def new_user_flow(name: str) -> None:
    # Run the three onboarding steps in order for the given user.
    send_welcome_email(name)
    check_for_bounce()
    update_marketing_tracker()


if __name__ == "__main__":
    new_user_flow(name="will")

I need this flow to run when a new user event hits my system, using the user ID from the event payload as the input parameter to the flow. There are a number of ways to do this, both in code and in the UI, but the simplest is to navigate to an event like the ones you want to use to trigger your flow run and click Automate.

This bootstraps a trigger definition that listens for customer events. Then we can proceed to the next step of the wizard to tell it what to do when the trigger fires.

A Prefect Cloud trigger configuration screen
A Prefect Cloud action configuration screen

You’ll notice some interesting syntax in the user_id field. Automations capture information from the event that triggered them, and you can use Jinja syntax to pass that information into your flow run at runtime. {{ event.payload.userid }} says to take the userid from the event we captured, and use that as the main parameter value for the flow. On creation, this automation is now linked to the deployment and visible on the deployment details page under “Triggers”.
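To make the substitution concrete, here is a toy stand-in for what happens when the automation fires: a placeholder such as `{{ event.payload.userid }}` is resolved against the triggering event and the result becomes a flow parameter. This is not Prefect's implementation (Prefect uses real Jinja templating); the helper names are invented, and the parameter name `name` is taken from the flow signature shown earlier, so the field label in your UI may differ.

```python
import re
from typing import Any

# Matches simple placeholders such as "{{ event.payload.userid }}".
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(context: dict, dotted_path: str) -> Any:
    """Walk a dotted path like 'event.payload.userid' through nested dicts."""
    value: Any = context
    for key in dotted_path.split("."):
        value = value[key]
    return value


def render_parameters(templates: dict[str, str], event: dict) -> dict[str, str]:
    """Fill each parameter template using values from the triggering event."""
    context = {"event": event}
    return {
        name: _PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), template)
        for name, template in templates.items()
    }


if __name__ == "__main__":
    event = {"event": "new_user.User_5", "payload": {"userid": "User_5"}}
    params = render_parameters({"name": "{{ event.payload.userid }}"}, event)
    print(params)
```

The rendered dictionary is what would be passed to the deployment as flow run parameters, so each signup event produces a run keyed to that user.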

Putting it all Together

On the creation of this link, flows start kicking off in response to the Kafka new user messages:

New user flow runs kicked off in response to events

And our flows and events tell the story of a new user signing up and getting their welcome email just seconds after their signup:

A new user flow run in action

This is just the tip of the iceberg on event-driven flows with Prefect. Triggers can fire on a lack of observed events, webhooks can use Jinja to flexibly transform payloads into Prefect events from CloudEvents or custom calls, and you can specify trigger policies directly from deployments without having to create automations manually. Give event-driven flows a try and let me know what you think in the Slack Community!
