Event-driven Data Pipelines with AWS Lambda, Prefect and GitHub Actions

Forget about sensors and DAGs. Run your flows based on events.

Anna Geller
Published in The Prefect Blog · 6 min read · Aug 29, 2022

Prefect is a dataflow coordination plane designed for a highly dynamic and rapidly changing world. Your workflows don’t have to be scheduled unless they need to — you can run them from anywhere, at any time and scale, for any reason. This post will look at how to implement scheduled and event-driven data pipelines. We’ll demonstrate how you can deploy such workflows to AWS Lambda with the serverless framework and GitHub Actions and how you can orchestrate and observe their execution with Prefect.

Getting started

To get started, use the accompanying repository template. It contains everything required to build and deploy event-driven serverless dataflows with Prefect and AWS Lambda, including flows, Dockerfiles, and Infrastructure as Code.

Build your event-driven dataflow with Prefect

Within this repository, you should see five directories, one per flow, including one for an ML flow and another for a sample ETL flow. Each of those directories can be treated as an individual project, with custom Python modules and a Dockerfile that keeps each flow’s dependencies separate. You can use any of them as a template and adjust it to your use case.
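
In practice, such a flow project can be as small as a flow definition plus an AWS Lambda handler that kicks it off. A minimal sketch (module and function names are illustrative, not the template’s exact code):

```python
# flows/healthcheck/flow.py: minimal sketch of a single flow project
from prefect import flow, get_run_logger

@flow
def healthcheck():
    logger = get_run_logger()
    logger.info("Hello from AWS Lambda!")

def handler(event, context):
    # AWS Lambda entrypoint: each invocation starts one Prefect flow run
    return healthcheck()
```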

Scheduled and event-based dataflow execution with Prefect and serverless framework

One critical file in this repository is serverless.yml.

This file defines how the serverless framework should deploy your project. It’s currently configured to build an individual Docker image per flow, and it lets you determine when each function should be triggered:

  1. Based on schedule — it can be either rate (an interval) or a cron expression.
  2. Based on a custom event — an API call to some AWS service, which serves as a trigger for the serverless function’s execution.
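
The full file lives in the repository template; a trimmed sketch of those two trigger styles might look like this (the service name, image names, and paths are illustrative):

```yaml
# serverless.yml: trimmed sketch, not the template's exact file
service: event-driven-dataflows

provider:
  name: aws
  environment:
    PREFECT_HOME: /tmp/.prefect  # Lambda functions may only write to /tmp
  ecr:
    images:
      s3-event-image:
        path: ./flows/s3_event_flow  # one Docker image per flow
      s3-reactive-image:
        path: ./flows/s3_reactive_flow

functions:
  s3_event_flow:
    image:
      name: s3-event-image
    events:
      - schedule: rate(1 minute)  # alternatively, a cron(...) expression
  s3_reactive_flow:
    image:
      name: s3-reactive-image
    events:
      - s3:
          bucket: prefectdata
          event: s3:ObjectCreated:*
          rules:
            - prefix: timeseries/
            - suffix: .parquet
          existing: true
```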

Here, we define that the first function should run every minute, and the other one should run any time a new parquet file arrives in the prefectdata bucket under the folder timeseries. The existing: true (the last line) means that the given S3 bucket already exists — otherwise, the serverless framework would attempt to create that bucket for you.

Note: the serverless function has no access to the host machine other than the /tmp directory. This means you need to point PREFECT_HOME at this directory in order for your flow to store results and function properly. You can do that by setting PREFECT_HOME=/tmp/.prefect in serverless.yml, as shown in the sketch above.

Deploy to AWS Lambda with GitHub Actions

To make deployment easy, the repository includes a GitHub Actions workflow that you can trigger manually to deploy your project to AWS and Prefect.

Before running it, make sure to adjust the Prefect Cloud workspace URL in the workflow file and add the following credentials to the GitHub Actions secrets:

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • PREFECT_API_KEY

To do that, go to “Settings” → “Actions” → “New repository secret” and add each of those three secrets.

Once you have configured your repository secrets, you can start the workflow by selecting it on the repository’s Actions tab and clicking the green “Run workflow” button.
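
For reference, here is a trimmed sketch of what such a deployment workflow might contain (the file name, action versions, and step names are illustrative):

```yaml
# .github/workflows/deploy.yml: illustrative sketch, not the template's exact file
name: Deploy flows to AWS Lambda
on:
  workflow_dispatch:  # run manually from the Actions tab

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-node@v3
      - name: Deploy with the serverless framework
        run: npx serverless deploy
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
```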

Under the hood, the serverless framework generates a CloudFormation stack that creates all required resources: AWS Lambda functions with event rules (one per flow), CloudWatch log groups, ECR repositories, IAM roles, and an S3 bucket for deployment artifacts. If you want to clean up all those resources later, you can do so with a single command, sls remove, run from your root project directory, or by using the cleanup GitHub Actions workflow included in the repository.

Once everything has run successfully, congrats! You’ve just deployed serverless with serverless!

Observe the results in Prefect Cloud UI

Given that some workflows are configured to run on a schedule every minute, you should be able to observe your serverless function executions in the Prefect Cloud UI right away. In your flow run dashboard, you should see Marvin greeting you from AWS Lambda.

The s3_event_flow stores a new parquet file in an S3 data lake every minute.

And the s3_reactive_flow validates the data quality of every file, as soon as it lands in the data lake.
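
Under the hood, the reactive pattern is simple: S3 passes the triggering object’s location into the Lambda event payload, and the handler forwards it to the flow as parameters. A minimal sketch (the flow body and names are illustrative):

```python
# flows/s3_reactive_flow/flow.py: illustrative sketch of the reactive pattern
from prefect import flow, get_run_logger

@flow
def s3_reactive_flow(bucket: str, key: str):
    logger = get_run_logger()
    logger.info("Validating s3://%s/%s", bucket, key)
    # read the parquet file and run the data quality checks here

def handler(event, context):
    # AWS Lambda entrypoint: the S3 event contains the new object's location
    record = event["Records"][0]["s3"]
    return s3_reactive_flow(
        bucket=record["bucket"]["name"],
        key=record["object"]["key"],
    )
```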

Benefits of using serverless with Prefect

Using Prefect with serverless provides a wide range of benefits, including scalability and ease of use. Maintaining dataflows and the underlying infrastructure is as simple as committing your code and redeploying your project with a single click.

A critical benefit of using Prefect in this context is blocks. Blocks allow you to securely store credentials, configuration data, and arbitrary key-value pairs within your Prefect workspace.

Our current project includes two flows interacting with S3, using two Prefect blocks: JSON and Slack Webhook.

JSON Block to store configuration data

JSON blocks provide a convenient way of storing an arbitrary piece of information as key-value pairs. The s3_event_flow flow relies on a block called max-value, which holds a threshold value of 42.

You can create blocks directly from the UI. To do that, go to Blocks → New Block → JSON.
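
Alternatively, you can create and read the same block from Python. A minimal sketch using Prefect’s built-in JSON block (the key name inside the value is illustrative):

```python
from prefect.blocks.system import JSON

# create (or overwrite) the block; this is what the UI form does for you
JSON(value={"max_value": 42}).save("max-value", overwrite=True)

# inside a flow, load the block and read the stored threshold
threshold = JSON.load("max-value").value["max_value"]  # 42
```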

If you set it to a value other than 42, the validation checks in s3_reactive_flow will start failing, and Prefect will send you a Slack notification alerting you to the problem.

Slack Webhook block to store secrets and send alerts

To send those notifications, you need to configure a Slack Webhook block, which can be set up from the UI in the same way as the JSON block.
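
It can also be created and used from Python. A minimal sketch (the block name alerts and the webhook URL are illustrative):

```python
from prefect.blocks.notifications import SlackWebhook

# store the webhook URL once, as a reusable block
SlackWebhook(url="https://hooks.slack.com/services/...").save(
    "alerts", overwrite=True
)

# then send a message from any flow
SlackWebhook.load("alerts").notify("Data validation failed!")
```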

The flow leverages this block to post an alert to Slack whenever its validation checks fail, and each alert can be traced back to the flow run that generated it.

Summary

This is just one example showing how Prefect allows you to:

  • observe the state of your serverless functions,
  • use blocks to store custom parameter values,
  • use blocks to securely store sensitive information such as your Slack webhook token,
  • easily configure notifications,
  • orchestrate both scheduled and event-driven workflows with retries, caching, custom task runners, subflows, and more.

Cleanup

To clean up all AWS resources deployed as part of this project, run the command sls remove from your root project directory or use the cleanup GitHub Actions workflow. Either way, the same CloudFormation stack is used to delete all resources configured with infrastructure as code.

Next Steps

We’ve just scratched the surface of what’s possible when using Prefect with AWS Lambda, the serverless framework, and blocks. If you have any questions about your event-driven or scheduled workflow use cases, or want to ask anything about Prefect, you can reach us via our Community Slack.

Happy engineering!
