Published in The Prefect Blog

Serverless Real-Time Data Pipelines on AWS with Prefect, ECS and GitHub Actions

A guide to fully automated serverless real-time data pipelines

Most data platforms these days are still operated using batch processing. Even though streaming technology has matured, building automated and reliable real-time data pipelines is still difficult and often requires a team of engineers to operate the underlying platform. But it doesn’t have to be that way. We already wrote about it last year.

In this post, we’ll get more hands-on. You’ll see how to turn any batch processing Python script into a real-time data pipeline orchestrated by Prefect. We’ll deploy the real-time streaming flow to a serverless containerized service running on AWS ECS Fargate — all resources will be deployed with Infrastructure as Code (leveraging CloudFormation), and the deployment process can be triggered with a single click from a GitHub Actions workflow.

With a CI/CD template, we’ll then ensure that future changes will be automatically redeployed with no manual intervention and no downtime.

Table of contents:
1. Why Prefect 2.0 for real-time data pipelines?
Drawbacks of batch processing
Opinion: why you likely don’t need a distributed message queue
How can Prefect 2.0 handle such low-latency real-time workflows?
Benefits of moving towards real-time workflows with Prefect 2.0
Why can I not just run a single DAG 24/7?
2. Demo time!
Prerequisites
Typical batch-processing flow
Turn it into a streaming service
What if something goes wrong?
3. Getting value from real-time: take automated action!
Using Prefect Blocks to store key-value pairs
Validate the data
Conclusion on a local demo
4. Deploy the real-time data pipeline as a serverless container
Configure repository secrets
Deploy the entire infrastructure in a single click
Observe the real-time data pipelines in your Prefect UI
5. Automate future deployments with CI/CD
Making changes to the code
(Optional) One flow run gets stuck in a Running state
What happens when there are infrastructure issues?
Limitations of the approach presented in this post
6. Clean up the resources
Next steps

1. Why Prefect 2.0 for real-time data pipelines?

Drawbacks of batch processing

While batch processing minimizes operational complexity, its drawback is that you can’t do anything with that data until it lands in your data warehouse or data lake. This might take from several minutes to hours or days.

But what if you want to take action immediately without waiting for slow-moving batch processing DAGs? What if you want to continuously replicate data from source systems to your staging area in real-time? Or what if you want your dbt transformations to be continuously rebuilt as soon as raw data lands in your staging area? And what if you want to continuously stream metadata changes to a platform such as OpenMetadata?

To solve those problems, many engineers turn to distributed message queues such as Kafka, but such tools don’t lend themselves well to the Modern Data Stack and to (often data science-driven) Python-based workflows. And even if you get such a distributed platform to work, the operational complexity of maintaining a separate streaming platform is high and requires changing how you work. Is there a middle ground? Yes, there is! Say hi to Prefect 2.0 with the Orion engine.

Opinion: why you likely don’t need a distributed message queue

Using tools such as Kafka for real-time streaming use cases may be helpful if your data streams must be simultaneously processed by multiple consumers. However, if your only consumer is your data warehouse (your single source of truth), and all downstream workflows consume data from there, distributed message queues are redundant. Instead, you can build a real-time streaming service for each producer (e.g., one per source system). This service can continuously load new data streams (e.g., via Change Data Capture) into your data warehouse, data lake, or time-series database.

Using Prefect to orchestrate real-time data streams that get stored in your data warehouse can significantly simplify your architecture and eliminate the friction involved in maintaining redundant systems.

Why not have both? If you already stream your Change Data Capture events to Kafka using e.g. Debezium, you still need a consumer that will pick up messages from Kafka and load them into your data warehouse. This consumer could be a Prefect flow, as described in the rest of this post.

How can Prefect 2.0 handle such low-latency real-time workflows?

Most workflow orchestrators allow only slow-moving workflows — DAGs scheduled hourly, or every 15 minutes (if you are brave enough). They also require constant redeployment of infrastructure, which adds unnecessary latency. If you have to redeploy a Kubernetes job or ECS task and pull the container image from a registry with every pipeline run, you can’t build any real-time data flow. The latency of spinning up the infrastructure required for each workflow run already turns it into a batch process.

Benefits of moving towards real-time workflows with Prefect 2.0

If you instead leverage a more performant dynamic orchestration engine such as Prefect 2.0, you don’t need to schedule your workflows. Instead, you can run them as a service and use the Prefect orchestration engine (and the Prefect UI) to observe the state of each run, and get alerted if (or rather when) something goes wrong. This way, the orchestrator is used not just for scheduling but for monitoring and observability — it’s most useful in case of failure. When things run smoothly, you don’t have to care about the orchestrator, but when any real-time data pipeline run fails, it will:

  • alert you that the failure occurred,
  • tell you when and why it happened,
  • help you fix it and easily redeploy a change.

With those capabilities, Prefect 2.0 provides a unified coordination plane to orchestrate dataflow and observe the state of your data platform — you can use it to coordinate batch, real-time streaming, as well as event-driven workflows and applications, giving you visibility into all moving parts in one place.

Why can I not just run a single DAG 24/7?

  1. Because something inevitably goes wrong. If this single run fails, you would need manual intervention to restart that workflow. In contrast, with the approach presented in this post, a failure of a single run doesn’t affect other flow runs.
  2. Because you lose observability. The main reason for using an orchestrator is to know the state of your workflows. In the setup with a single workflow run being on 24/7, your UI will always display a single running instance. You don’t know how long each iteration and each task within it took. To find out why something has failed, you would need to dive into a massive amount of logs collected over the entire time period to find a single log message about what happened. In contrast, having one flow run per streaming iteration (as you’ll see in a demo shortly) allows you to quickly find out when something fails and gives you fine-grained visibility into the state of every flow run and task run, including their runtimes, logs, and outputs.
  3. Because of operational complexity — such a single long-running workflow run would be a single point of failure, and it would be hard to redeploy and scale. In contrast, the approach we demonstrate here makes it easy to redeploy changes, scale the amount of CPU and memory when your data volume grows, and retain the logs and observability of previous iterations. You can track which code artifact (e.g., the Docker image tag, or ECS task revision) was used with each run, and you can redeploy the streaming service with no downtime. Even configuring log retention is much easier that way.

2. Demo time!

Talk is cheap, let’s get hands-on. The prefect-streaming repository on GitHub contains everything used in this post: the Prefect flow, Dockerfile, CloudFormation templates, and GitHub Actions workflows.

You can clone the repository and use it as a template for your streaming flows.

Prerequisites

To follow this demo, you need to:

  1. Install Prefect 2.0
  2. Sign up for Cloud 2.0, create a workspace, generate an API key, and log into that workspace from your local terminal — the getting started documentation is available here
  3. Sign up for GitHub and clone the above repository prefect-streaming
  4. Sign up for AWS — you need an AWS account with an IAM user that has permission to interact with S3, CloudFormation, ECR, and ECS (and if you want to follow the data lake example flow, also Glue and Athena).

Typical batch-processing flow

A typical batch-processing flow includes workflow steps to extract, transform and load data in whatever order you prefer (ETL, ELT, reverse ETL). Here is an example of a typical pipeline, extracting, transforming, and loading data to an S3 data lake table (full code):
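As a minimal sketch of what such a pipeline might look like: the function names, the sample record, and the in-memory sink below are illustrative assumptions, not the code from the repository. In the real flow, each step would be a Prefect task, extract would call an API, and load would write to the S3 data lake table.

```python
from datetime import datetime, timezone
from typing import Any


def extract() -> dict[str, Any]:
    """Stand-in for an API call; returns one hypothetical raw price record."""
    return {"asset": "BTC", "price": "21345.10", "currency": "USD"}


def transform(raw: dict[str, Any]) -> dict[str, Any]:
    """Cast the price to a float and stamp the record with the current UTC time."""
    return {
        "asset": raw["asset"],
        "price": float(raw["price"]),
        "currency": raw["currency"],
        "loaded_at": datetime.now(timezone.utc).isoformat(),
    }


def load(row: dict[str, Any], sink: list[dict[str, Any]]) -> None:
    """Append the row to a sink; a real pipeline would write to S3 instead."""
    sink.append(row)


def main(sink: list[dict[str, Any]]) -> None:
    """One batch run: extract, transform, load."""
    load(transform(extract()), sink)


if __name__ == "__main__":
    table: list[dict[str, Any]] = []
    main(table)
    print(table)
```

Each call to main is one self-contained batch run, which is what makes it easy to repeat in a loop later.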

Turn it into a streaming service

In order to turn that workflow into a real-time streaming service, you need to add a while loop to the main function:
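The idea can be sketched in plain Python as follows. This is a simplified illustration rather than the repository's code: run_as_service, interval_seconds, and max_iterations are hypothetical names, and catching the exception inside the loop is our assumption about how to keep one failed iteration from stopping the service. In the actual setup, run_once would be the function decorated as a Prefect flow, so every iteration shows up as its own flow run.

```python
import time
from typing import Callable, Optional


def run_as_service(
    run_once: Callable[[], None],
    interval_seconds: float = 5.0,
    max_iterations: Optional[int] = None,
) -> int:
    """Call run_once repeatedly and return the number of failed iterations.

    max_iterations=None loops forever (the service case); passing a number
    is handy for local testing.
    """
    failures = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        try:
            run_once()
        except Exception as exc:
            # One failed iteration must not stop the whole service.
            failures += 1
            print(f"iteration {iteration} failed: {exc!r}")
        time.sleep(interval_seconds)
    return failures


if __name__ == "__main__":
    # Three quick iterations of a trivial stand-in for the flow.
    run_as_service(lambda: print("tick"), interval_seconds=0.1, max_iterations=3)
```

Because each iteration is an independent call, a failure is recorded for that iteration only and the next one starts on schedule.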

If you run this script locally, you’ll start seeing a bunch of flow runs triggered every couple of seconds. Each run can be individually inspected in the Prefect UI:

You can see the state of each run, the duration of each flow and task run, the runs triggered over time, and which of them failed (the red dots in the scatterplot).

What if something goes wrong?

To demonstrate a failure, let’s temporarily remove our AWS credentials. You can inspect the failed run directly from the Prefect UI:

We can easily find out what the root cause of the error was:

This error could have been:

  • a failure in your dbt model,
  • a database connection timeout,
  • a failed API request,
  • network disruption, etc.

Troubleshooting such errors would be much more difficult using distributed message queues or using a single DAG running 24/7. In contrast, identifying and fixing errors in real-time data pipelines orchestrated with Prefect 2.0 is easy. And those beautiful logs in the UI are a nice bonus!

3. Getting value from real-time: take automated action!

Doing real-time for the sake of getting more frequent data refreshes in a dashboard doesn’t add that much value. Real-time streaming becomes more important once you take automated actions based on some state of the world. Imagine that you want to get immediately notified via Slack when the Bitcoin price drops below a certain threshold. Here is pseudo-code showing how we may extend our flow to take automated action in real-time (actual code):
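A rough Python rendering of that logic could look like the sketch below. The function name alert_on_price_dip and its three callables are hypothetical stand-ins: in the real flow, the price would come from an API request, the threshold from a value stored in Prefect, and the alert would be posted to a Slack webhook.

```python
from typing import Callable


def alert_on_price_dip(
    get_price: Callable[[], float],
    get_threshold: Callable[[], float],
    send_alert: Callable[[str], None],
) -> bool:
    """Run one check and return True if an alert was sent."""
    price = get_price()
    # Look the threshold up on every run so a changed value applies immediately.
    threshold = get_threshold()
    if price < threshold:
        send_alert(f"Price dip: {price:.2f} is below the threshold of {threshold:.2f}")
        return True
    return False


if __name__ == "__main__":
    # Hypothetical values; print() stands in for a Slack webhook call.
    alert_on_price_dip(lambda: 19500.0, lambda: 20000.0, print)
```

Passing the lookups in as callables keeps the decision logic easy to test without network access or credentials.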

This flow looks up the current price threshold and compares the current price with that threshold. If there is a dip in price, we send a Slack alert that may look as follows:

Using Prefect Blocks to store key-value pairs such as price threshold

Blocks allow you to securely store credentials, configuration data, and arbitrary key-value pairs within your Prefect workspace.

To create a new block, go to the sidebar and select “Blocks”. Then, add a new Block of type String. Enter a name and a price threshold value and click on Submit.

That’s everything you need in order to store this value and look it up at runtime! The benefit is that you don’t need to redeploy the entire service to adjust your desired price threshold: you can do that directly from the Prefect UI by editing the Block and saving it with a new value:

Validate the data

To validate that real-time data gets inserted into an Athena data lake table continuously, let’s run a query:

Conclusion on a local demo

This concludes the demo of how to run the workflow locally. You now have a continuously running data pipeline, which simultaneously:

  • updates an S3 data lake table with real-time price updates,
  • takes action in real-time to alert you once something you care about happens (here: when the price falls below a certain threshold).

4. Deploy the real-time data pipeline as a serverless container with AWS ECS Fargate

Now that the streaming service is ready, we can package and deploy it. Sounds scary? Don’t worry! All resources can be deployed with a single click. All you have to do is:

  • build flows that add value to your business by taking action in real-time,
  • save credentials required to communicate with Prefect Cloud and AWS,
  • commit and push your code.

Configure repository secrets

In order to run this workflow from GitHub Actions, make sure to add your Prefect and AWS credentials as repository secrets:

You may use the command prefect config view in your terminal, which will give you the values you need:

💡 If your terminal is still not authenticated with Prefect Cloud, check this getting started documentation.

All secrets shown in the GitHub screenshot (two for Prefect and four for AWS) must be added. To avoid typing, you can copy-paste from here. And if you don’t know your AWS_ACCOUNT_ID, you can retrieve it using:

aws sts get-caller-identity --query Account --output text

Note: to make it easier for you to reproduce the entire example, the logic sending a Slack alert was moved to a separate flow. To reproduce it using the Slack alert, you would also need to add a repository secret SLACK_WEBHOOK_URL and uncomment all lines adding that secret to the container — here, here and here.

Deploy the entire infrastructure in a single click

We are now ready to deploy the service to AWS ECS Fargate. To do that, simply run the “Deploy Prefect Streaming Service to AWS ECS” GitHub Actions workflow. You may adjust the CPU and memory requirements:

This workflow will take care of creating a VPC, ECS cluster, and ECR repository. It will automatically package your code into a Docker image and push it to Amazon ECR. Finally, it will deploy your real-time data pipeline to AWS ECS Fargate running as a containerized service.

Once the workflow finishes, you should see a summary and task definition artifact. This summary makes it easy to track which image tag and task definition revision were deployed as part of that commit.

Make sure to download the task definition artifact and commit it to the root directory in your repository. Before you commit this task definition, you may remove fields that will be populated at build time, including the image, taskDefinitionArn, requiresAttributes, registeredAt and registeredBy. All the empty fields can be removed, too.
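If you prefer not to edit the JSON by hand, the cleanup can be scripted. The sketch below is one possible approach, not part of the repository: the helper names are hypothetical, the list of fields is taken from the paragraph above, and treating None, empty strings, empty lists, and empty objects as "empty fields" is our assumption (zeros and false values are kept).

```python
import json
from typing import Any

# Top-level fields named above as populated at build time.
TOP_LEVEL_FIELDS = ("taskDefinitionArn", "requiresAttributes", "registeredAt", "registeredBy")
EMPTY_VALUES = (None, "", [], {})


def prune_empty(value: Any) -> Any:
    """Recursively drop None, empty strings, empty lists and empty dicts."""
    if isinstance(value, dict):
        pruned = {k: prune_empty(v) for k, v in value.items()}
        return {k: v for k, v in pruned.items() if v not in EMPTY_VALUES}
    if isinstance(value, list):
        pruned_items = [prune_empty(v) for v in value]
        return [v for v in pruned_items if v not in EMPTY_VALUES]
    return value


def clean_task_definition(task_def: dict[str, Any]) -> dict[str, Any]:
    """Remove build-time fields, each container's image, and empty fields."""
    cleaned = {k: v for k, v in task_def.items() if k not in TOP_LEVEL_FIELDS}
    cleaned["containerDefinitions"] = [
        {k: v for k, v in container.items() if k != "image"}
        for container in cleaned.get("containerDefinitions", [])
    ]
    return prune_empty(cleaned)


if __name__ == "__main__":
    # Hypothetical, heavily shortened task definition for illustration only.
    example = {
        "family": "example",
        "taskDefinitionArn": "arn:aws:ecs:us-east-1:123456789012:task-definition/example:1",
        "volumes": [],
        "containerDefinitions": [
            {"name": "app", "image": "example:latest", "cpu": 0, "mountPoints": []}
        ],
    }
    print(json.dumps(clean_task_definition(example), indent=2))
```

To apply it to the downloaded artifact, load the file with json.load, pass the result through clean_task_definition, and write it back with json.dump before committing.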

Observe the real-time data pipelines in your Prefect UI

We can validate in the Prefect Cloud UI that the real-time data pipelines are running — all without having to manually set up and configure any infrastructure. That’s the magic of Prefect 2.0 paired with serverless and automation through Infrastructure as Code and CI/CD with GitHub Actions.

Congrats on building your first fully-automated serverless real-time streaming service orchestrated with Prefect. 🎉

5. Automate future deployments with CI/CD

So far, we have set up everything for the initial service, but what if we want to make changes to the flow code or code dependencies?

First, make sure that you add the task-definition.json that was generated as an artifact in the initial GitHub Actions workflow. You can further customize it if you want to.

Making changes to the code

Let’s add a log message with some emojis to make it all more fun. Once we commit and push the changes, another GitHub Actions workflow will take care of deploying a new service to AWS ECS. This happens automatically once we push any changes to the flow code or code dependencies in the main branch.

To test the redeployment process, add this line to your flow, then commit and push the changes (including the task-definition.json):

Make sure that the GitHub Actions workflow main.yml points to the right branch in which your code is located (by default, it should be the main branch).

Now you can push the code and watch the magic of automation. The automatically triggered workflow should redeploy the service — you should be able to validate that by looking at the new revision in the task definition and the new image tag (the tag being your Git commit SHA).

During the deployment process, you should see two ECS tasks (containers) running as part of this service. This overlap is what ensures no downtime and continuity in your real-time data pipeline service: your data keeps streaming while the new version starts up!

Then, once the new redeployed service is stable and running, the old one gets shut down.

And the best proof that everything is working as expected is provided by the beautiful logs with emojis in the Prefect Cloud UI:

(Optional) One flow run gets stuck in a Running state due to redeployment — why?

When you redeploy your real-time streaming service, both the old and new services will be running at the same time for a very short period (a couple of seconds). Then, the old service will be shut down. When the old container instance gets terminated as a result of this transition, it’s likely that one flow run will remain in a Running state (it can’t finish since the underlying container is gone). In terms of your flow process, everything is fine: flow runs keep executing, just in a different container.

It can be confusing to see this run listed as running in the UI. To get rid of the stuck run (named wisteria-zebra in this example) that couldn’t finish due to the transition into a new service, you can delete it manually from the UI:

What happens when there are infrastructure issues?

To test the robustness of this setup, we deliberately allocated a very small amount of CPU and memory. But the nice thing about running this as an ECS service is that AWS will automatically redeploy a new ECS task for your streaming service if the container runs out of memory:

Still, to avoid such issues, make sure to allocate enough resources for production deployments.

Limitations of the approach presented in this post

The approach presented in this post is opinionated, and the automated infrastructure deployment works only with AWS. If you decide to approach it differently (e.g., by deploying it to a self-hosted Kubernetes cluster), you are responsible for operationalizing the deployment yourself.

In the future, Prefect will make streaming use cases even easier by operationalizing this process for you as part of the managed Prefect Cloud platform. If you’d like to hear more, reach out.

6. Clean up the resources

One of the greatest benefits of declarative Infrastructure as Code is that you can easily terminate all resources when they are no longer needed. To make the experience as easy for you as possible, the repository provides another GitHub Actions template called “Delete ECS Cluster, Prefect ECS Service and ECR repository”.

Running this workflow will delete all AWS resources created as part of this tutorial.

Next steps

In this post, we demonstrated how you can orchestrate your real-time data pipelines. With Prefect 2.0 and automation enabled by Infrastructure as Code and CI/CD workflows, building real-time data pipelines becomes easy. You can extend it even further and apply it to many more use cases such as data replication (to keep your staging area always up-to-date), continuously rerunning your dbt models, or continuously retraining your ML models — the possibilities are endless. Prefect will help you make those streaming workflows observable and will support you when things go wrong.

We are still exploring real-time streaming use cases, and we welcome your feedback in this regard. If you want to talk about your real-time workflow use case or ask about anything, you can reach us via our Community Slack.

Happy engineering!

Anna Geller

Lead Community Engineer at Prefect, Data Professional, Cloud & .py fan. www.annageller.com. Get my articles via email: https://annageller.medium.com/subscribe