Event-driven workflows with AWS Lambda

Christopher White
Published in The Prefect Blog
Oct 28, 2019


One of Prefect Cloud’s major features is an expressive GraphQL API. You can query, filter, and sort any information about your workflows, from run histories to internal DAG structures, and you can also take actions through GraphQL mutations. This post walks through setting up an AWS Lambda function that triggers Prefect Cloud flow runs using the Cloud GraphQL API.

If you want to dive right in, all code from this post is available in this public GitHub repository: https://github.com/cicdw/prefect-cloud-lambda.

The basic setup

Suppose we want to run an ETL job whenever new data lands in one of our S3 buckets. Rather than running a long-lived process that polls the bucket for changes, we can create an AWS Lambda function that is invoked whenever a PUT event occurs on the bucket. (This is only one of many scenarios that could justify a Lambda function, and nothing presented here is specific to this particular trigger!)

Setting this up is as simple as:

  • creating a new Lambda function with the Python 3.6 runtime
  • setting the appropriate trigger event
  • copying and pasting the following code snippet as your Lambda function code:
Template for an AWS Lambda function which triggers Prefect Cloud flow runs; code available at https://github.com/cicdw/prefect-cloud-lambda
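A minimal, standard-library-only sketch of such a handler follows; it is not necessarily identical to the code in the repository. It reads the three environment variables listed next and posts a single mutation. The mutation name `createFlowRun`, the input type `createFlowRunInput`, the `flowId` input field, and the `Bearer` authorization scheme are assumptions about the Prefect Cloud API of the time, so check them against your API's schema. `build_request` is a hypothetical helper split out so the request can be inspected without sending it.

```python
import json
import os
import urllib.request

# Assumed Prefect Cloud mutation for creating a flow run; verify the mutation
# name, input type, and returned fields against your API's GraphQL schema.
CREATE_FLOW_RUN = """
mutation($input: createFlowRunInput!) {
  createFlowRun(input: $input) {
    flow_run {
      id
    }
  }
}
"""


def build_request(inputs):
    """Build (but do not send) an authenticated POST carrying the mutation."""
    body = json.dumps({"query": CREATE_FLOW_RUN, "variables": {"input": inputs}})
    request = urllib.request.Request(
        os.environ["PREFECT__CLOUD__API"],
        data=body.encode("utf-8"),
        method="POST",
    )
    request.add_header("Content-Type", "application/json")
    # Assumed auth scheme: the API token sent as a bearer token.
    request.add_header(
        "Authorization", "Bearer " + os.environ["PREFECT__CLOUD__AUTH_TOKEN"]
    )
    return request


def lambda_handler(event, context):
    """Lambda entry point: create one flow run per triggering event."""
    inputs = {"flowId": os.environ["PREFECT__FLOW_ID"]}
    with urllib.request.urlopen(build_request(inputs)) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the environment variables set, each invocation sends one mutation and returns the decoded JSON response. Sticking to `urllib` avoids bundling third-party packages such as `requests` with the function.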

The only configuration required to get this working is the following three environment variables:

  • PREFECT__FLOW_ID: the ID of the Flow you wish to run with each event
  • PREFECT__CLOUD__API: the API URL for Prefect Cloud (https://api.prefect.io)
  • PREFECT__CLOUD__AUTH_TOKEN: a Prefect Cloud API token
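Because a missing variable would otherwise surface only as a `KeyError` deep inside the request code, it can help to validate all three up front. The `load_config` helper below is a hypothetical sketch, not part of Prefect or AWS:

```python
import os

# The three environment variables the Lambda function relies on.
REQUIRED_VARS = (
    "PREFECT__FLOW_ID",
    "PREFECT__CLOUD__API",
    "PREFECT__CLOUD__AUTH_TOKEN",
)


def load_config(environ=os.environ):
    """Hypothetical helper: return the settings, failing fast if any are unset or empty."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_VARS}
```

Calling `load_config()` at module import time means a misconfigured function fails on its first invocation with a message naming exactly which variables are absent.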

Enhancements

Depending on the nature of the triggering event and the workflow, Prefect lets you go much further than simply creating a flow run.

Parameters

Prefect supports parameterized workflows as a first-class pattern. Parameters are a special type of task whose output can be specified at runtime (this works because Prefect allows data to be exchanged between tasks). The only requirement is that parameter values be JSON-serializable. Here, we might want to pass information about the triggering event to our Prefect flow as a parameter. This would allow a single flow to process data arriving in many different buckets: simply provide the bucket name as a parameter!

Example of altering the GraphQL inputs to pass parameters
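A sketch of such an alteration, assuming the function is triggered by S3 event notifications: it pulls the bucket name and object key out of the event's first record and adds them under a `parameters` input. The parameter names `bucket` and `key` are hypothetical and must match the `Parameter` names declared in your flow; the `parameters` input field name is an assumption about the Cloud schema.

```python
import urllib.parse


def parameters_from_s3_event(event):
    """Extract the bucket name and object key from the first S3 event record."""
    record = event["Records"][0]["s3"]
    return {
        "bucket": record["bucket"]["name"],
        # S3 URL-encodes object keys in event notifications (spaces become '+').
        "key": urllib.parse.unquote_plus(record["object"]["key"]),
    }


def build_inputs(flow_id, event):
    """Hypothetical helper: createFlowRun inputs that also carry flow parameters."""
    # Assumed input field name: "parameters", holding a JSON object.
    return {"flowId": flow_id, "parameters": parameters_from_s3_event(event)}
```

The resulting dictionary would replace the bare `{"flowId": ...}` inputs in the handler. Only the first record is read here; the event format allows several records, so a more defensive handler might loop over `event["Records"]`.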

Idempotency keys

Sometimes we expect a single logical event to produce multiple, duplicate triggers. For example, perhaps we have a flow that responds to any and all actions performed on an S3 bucket, and we want to treat a group of actions occurring within a short timeframe as a single triggering event. Prefect lets users guard against this scenario through “idempotency keys” on flow runs: when a key is provided, Prefect ensures that one and only one run is created for that key. Just as with parameters, providing an idempotency key is simple:

Example of altering the GraphQL inputs to pass an idempotency key
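One way to apply this to the bucket example, sketched under assumptions: derive the key from the bucket name plus a fixed time window computed from the S3 record's `eventTime`, so that every event in the same five-minute window maps to the same key. The `idempotencyKey` input field name is an assumption about the Cloud schema; the helper names and the 300-second window are illustrative choices.

```python
from datetime import datetime, timezone


def parse_event_time(value):
    """Parse an S3 eventTime such as '2019-10-28T12:34:56.789Z' as a UTC datetime."""
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
        try:
            return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    raise ValueError("Unrecognized eventTime: " + value)


def idempotency_key(bucket, event_time, window_seconds=300):
    """Map every event on `bucket` within the same fixed time window to one key."""
    ts = parse_event_time(event_time).timestamp()
    window_start = int(ts // window_seconds) * window_seconds
    return "{}:{}".format(bucket, window_start)


def build_inputs(flow_id, bucket, event_time):
    """Hypothetical helper: createFlowRun inputs with an idempotency key attached."""
    # Assumed input field name: "idempotencyKey".
    return {"flowId": flow_id, "idempotencyKey": idempotency_key(bucket, event_time)}
```

Fixed windows are simple but not a perfect debounce: a burst that straddles a window boundary still yields two keys, and therefore two runs.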

Flexible Start Times

When creating ad-hoc flow runs, your flow does not have to begin immediately; you can schedule ad-hoc runs far in the future if your use case requires it. To do so, provide the desired scheduled start time as an ISO 8601-formatted timestamp:

Example of altering the GraphQL inputs to pass an altered start time
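A minimal sketch of computing such a timestamp, assuming a `scheduledStartTime` input field (an assumption about the Cloud schema) and a hypothetical `build_inputs` helper. Using a timezone-aware UTC datetime makes `isoformat()` include an explicit `+00:00` offset, so the start time is unambiguous.

```python
from datetime import datetime, timedelta, timezone


def build_inputs(flow_id, delay, now=None):
    """Hypothetical helper: createFlowRun inputs scheduled `delay` into the future."""
    now = now or datetime.now(timezone.utc)
    start = now + delay
    # Assumed input field name: "scheduledStartTime".
    # isoformat() on an aware datetime yields e.g. '2019-10-28T14:00:00+00:00'.
    return {"flowId": flow_id, "scheduledStartTime": start.isoformat()}
```

For example, `build_inputs(flow_id, timedelta(hours=2))` would schedule the run two hours after the Lambda invocation; the optional `now` argument exists only to make the function easy to test.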

Conclusion

This post has presented all of the pieces needed to trigger workflows in Prefect Cloud through AWS Lambda. As you can see, triggering a flow run is as simple as making an authenticated POST request to the Cloud GraphQL API, so anywhere you can make such a request can serve as a trigger for Cloud flow runs!
