Event-driven Data Pipelines with AWS Lambda, Prefect and GitHub Actions
Forget about sensors and DAGs. Run your flows based on events.
--
Prefect is a dataflow coordination plane designed for a highly dynamic and rapidly changing world. Your workflows don’t have to be scheduled unless they need to — you can run them from anywhere, at any time and scale, for any reason. This post will look at how to implement scheduled and event-driven data pipelines. We’ll demonstrate how you can deploy such workflows to AWS Lambda with the serverless framework and GitHub Actions and how you can orchestrate and observe their execution with Prefect.
Getting started
To get started, use the following repository template. It contains everything required to build and deploy event-driven serverless dataflows with Prefect and AWS Lambda, including flows, Dockerfiles, and Infrastructure as Code.
Build your event-driven dataflow with Prefect
Within this repository, you should see five directories, one per flow. Among others, there is one directory for ML and another for a sample ETL flow. Each of those directories can be treated as an individual project with custom Python modules and a Dockerfile to separate dependencies for each flow. You can use that as a template and adjust it to your use case.
Scheduled and event-based dataflow execution with Prefect and serverless framework
One critical file in this repository is serverless.yml:
This file defines how the serverless framework should deploy your project. It's currently configured to build an individual Docker image per flow, and it allows you to determine when each function should be triggered (see the sketch after the list below):
- Based on a schedule — it can be either a rate (an interval) or a cron expression.
- Based on a custom event — an API call to some AWS service, which serves as a trigger for the serverless function’s execution.
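A trimmed-down sketch of what such a serverless.yml can look like is shown below; the service name, image paths, and function bodies are illustrative assumptions rather than the exact contents of the template:

```yaml
# Illustrative sketch of a serverless.yml along the lines described above;
# service name, image paths, and function names are assumptions.
service: prefect-serverless-flows

provider:
  name: aws
  ecr:
    images:
      s3_event_flow:
        path: ./s3_event_flow        # directory with this flow's Dockerfile
      s3_reactive_flow:
        path: ./s3_reactive_flow

functions:
  s3_event_flow:
    image:
      name: s3_event_flow
    events:
      # schedule-based trigger: a rate(...) interval or a cron(...) expression
      - schedule: rate(1 minute)
  s3_reactive_flow:
    image:
      name: s3_reactive_flow
    events:
      # event-based trigger: run whenever a parquet file lands in the bucket
      - s3:
          bucket: prefectdata
          event: s3:ObjectCreated:*
          rules:
            - prefix: timeseries/
            - suffix: .parquet
          existing: true             # the bucket already exists; don't create it
```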
Here, we define that the first function should run every minute, and the other one should run any time a new parquet file arrives in the prefectdata bucket under the timeseries folder. The existing: true setting (the last line) means that the given S3 bucket already exists — otherwise, the serverless framework would attempt to create that bucket for you.
Note: the serverless function doesn’t have access to the host machine other than the /tmp directory. This means that you need to set PREFECT_HOME to this directory in order for your flow to store results and function properly. You can do that by setting PREFECT_HOME=/tmp/.prefect, as we do here in serverless.yml.
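In serverless.yml, that translates to an environment variable set on the provider (or per function); roughly:

```yaml
provider:
  name: aws
  environment:
    # Lambda's filesystem is read-only except for /tmp, so point Prefect there
    PREFECT_HOME: /tmp/.prefect
```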
Deploy to AWS Lambda with GitHub Actions
To make deployment easy, the repository includes a GitHub Actions workflow that you can trigger manually to deploy your project to AWS and Prefect.
Before running it, make sure to adjust the Prefect Cloud workspace URL here and add the following credentials to the GitHub Actions secrets:
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
PREFECT_API_KEY
To do that, go to “Settings” → “Actions” → “New repository secret” and configure your secrets in the same way as in the image shown below:
Once you have configured your repository secrets, you can start the workflow by selecting the GitHub Action workflow and clicking on the green “Run workflow” button.
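Conceptually, that workflow is a manually triggered job that installs the serverless framework and runs the deployment with your secrets exposed as environment variables. A rough sketch is shown below; the action versions, step names, and the PREFECT_API_URL placeholder are assumptions, not the exact workflow from the template:

```yaml
name: Deploy flows to AWS Lambda
on:
  workflow_dispatch: {}          # run manually via the "Run workflow" button

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-node@v3
      - run: npm install -g serverless
      - name: Deploy with the serverless framework
        run: serverless deploy
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
          # point at your Prefect Cloud workspace (placeholder values)
          PREFECT_API_URL: https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>
```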
Under the hood, the serverless framework will spin up a CloudFormation stack that creates all required resources, including AWS Lambda functions with event rules (one per flow), CloudWatch log groups, ECR repositories, IAM roles, and an S3 bucket for the serverless deployment. If you want to clean up all those resources later, you can do so with a single sls remove command from your root project directory or by using this GitHub Actions workflow.
Once everything has run successfully, congrats! You’ve just deployed serverless with serverless!
Observe the results in Prefect Cloud UI
Given that some workflows are configured to run on a schedule every minute, you should be able to immediately observe your serverless function executions in the Prefect Cloud UI. In your flow run dashboard, you should see Marvin greeting you from AWS Lambda:
The s3_event_flow is storing a new parquet file in an S3 data lake every minute:
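Behind that screenshot, such a scheduled flow and its Lambda entrypoint might look roughly like this; the payload, column names, and handler are illustrative assumptions, and pandas with s3fs is assumed to be available in the flow's Docker image:

```python
# Sketch of a scheduled flow writing a parquet file to S3 on every run;
# column names and the handler are assumptions, not the template's exact code.
from datetime import datetime, timezone

import pandas as pd  # assumes pandas + s3fs/pyarrow are baked into the image
from prefect import flow, get_run_logger


@flow
def s3_event_flow(bucket: str = "prefectdata") -> str:
    logger = get_run_logger()
    now = datetime.now(timezone.utc)
    df = pd.DataFrame({"timestamp": [now], "value": [40]})
    key = f"timeseries/{now:%Y%m%d_%H%M%S}.parquet"
    df.to_parquet(f"s3://{bucket}/{key}")  # s3:// paths require s3fs
    logger.info("Uploaded s3://%s/%s", bucket, key)
    return key


def handler(event, context):
    # AWS Lambda entrypoint invoked by the schedule rule
    return s3_event_flow()
```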
And the s3_reactive_flow validates the data quality of every new file as soon as it lands in the data lake:
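A corresponding sketch of the reactive side: the Lambda handler receives the S3 event notification, extracts the bucket and key, and hands them to the flow for validation. The value column and the exact checks are assumptions:

```python
# Sketch of the event-driven flow: validate each new parquet file against a
# threshold; the "value" column and the check itself are assumptions.
import pandas as pd  # assumes pandas + s3fs/pyarrow in the flow's Docker image
from prefect import flow, task, get_run_logger


@task
def validate(df: pd.DataFrame, threshold: int = 42) -> None:
    # Fail the task (and therefore the flow run) if any value exceeds the threshold
    if (df["value"] > threshold).any():
        raise ValueError(f"Found values above the threshold of {threshold}")


@flow
def s3_reactive_flow(bucket: str, key: str) -> None:
    logger = get_run_logger()
    logger.info("Validating s3://%s/%s", bucket, key)
    df = pd.read_parquet(f"s3://{bucket}/{key}")
    validate(df)


def handler(event, context):
    # AWS Lambda entrypoint: the S3 event notification carries the bucket and key
    record = event["Records"][0]["s3"]
    return s3_reactive_flow(
        bucket=record["bucket"]["name"],
        key=record["object"]["key"],
    )
```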
Benefits of using serverless with Prefect
Using Prefect with serverless provides a wide range of benefits, including scalability and ease of use. Maintaining dataflows and the underlying infrastructure is as simple as committing your code and redeploying your project with a single click.
A critical benefit of using Prefect in this context is blocks. Blocks allow you to securely store credentials, configuration data, and arbitrary key-value pairs within your Prefect workspace.
Our current project includes two flows interacting with S3, using two Prefect blocks: JSON and Slack Webhook.
JSON Block to store configuration data
JSON blocks provide a convenient way of storing arbitrary pieces of information as key-value pairs. The s3_event_flow relies on a block called max-value, which holds a threshold value of 42.
You can create blocks directly from the UI. To do that, go to Blocks → New Block → JSON block:
If you set it to a different value than 42, the validation checks in s3_reactive_flow will start failing, and Prefect will send you a Slack notification alerting you about the problem.
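Blocks can also be created and read programmatically rather than through the UI. Assuming the block stores its content under a threshold key (the exact schema in the template may differ), that would look roughly like this:

```python
from prefect.blocks.system import JSON

# Register the block once (equivalent to creating it in the UI);
# the key name "threshold" is an assumption about the block's contents.
JSON(value={"threshold": 42}).save("max-value", overwrite=True)

# Load it inside a flow or task and read the stored value
threshold = JSON.load("max-value").value["threshold"]
print(threshold)  # 42
```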
Slack Webhook block to store secrets and send alerts
To send those notifications, you need to configure a Slack Webhook block. Here is how this block can be set up from the UI:
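Programmatically, creating the block and sending an alert from a flow might look roughly like this; the block name, webhook URL, and message are assumptions:

```python
from prefect.blocks.notifications import SlackWebhook

# Create the block once with your webhook URL (or do this in the UI);
# the block name "data-quality-alerts" is an assumption.
SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ").save(
    "data-quality-alerts", overwrite=True
)

# In the flow, load the block and send an alert when validation fails
slack = SlackWebhook.load("data-quality-alerts")
slack.notify("Data validation failed: values above the configured threshold.")
```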
The alerts sent by the flow leveraging the above block will look as follows:
And here is the flow run that generated that alert due to failed validation checks:
Summary
This is just one example showing how Prefect allows you to:
- observe the state of your serverless functions,
- use blocks to store custom parameter values,
- use blocks to securely store sensitive information such as your Slack webhook token,
- easily configure notifications,
- orchestrate both scheduled and event-driven workflows with retries, caching, custom task runners, subflows, and more.
Cleanup
To clean up all AWS resources deployed as part of this project, run the sls remove command from your root project directory or use this GitHub Actions workflow. This will use the same CloudFormation stack to delete all resources configured with infrastructure as code.
Next Steps
We’ve just scratched the surface of what’s possible when using Prefect with AWS Lambda, serverless framework and blocks. If you have any questions about your event-driven or scheduled workflow use cases or want to ask anything about Prefect, you can reach us via our Community Slack.
Happy engineering!