The Prefect Blog
Published in

The Prefect Blog

Orchestrating ELT on Kubernetes with Prefect, dbt & Snowflake (Part 2)

A flow of flows: a guide on how to deploy large-scale data pipelines to production

Photo by Andrea Piacquadio from Pexels

This article is the second in a series of tutorials about orchestrating ELT data pipelines. The first post demonstrated how to organize and orchestrate a variety of flows written by different teams and how to trigger those in the correct order using Prefect. This post builds on that by capturing more advanced use cases and showcases how to deploy the entire project to a Kubernetes cluster on AWS.

Table of contents:
· Snowflake configuration
Creating database credentials
SQL alchemy connection
Using the connection to load raw data (Extract & Load)
Turning the extract & load script into a Prefect flow
· dbt configuration
· Deploying your flows to a remote Kubernetes cluster on AWS EKS
1. Building a custom Docker image
2. Pushing the image to ECR
3. Creating a demo Kubernetes cluster on AWS EKS
4. Deploying a Prefect’s Kubernetes agent
Changing the run configuration in your flows to KubernetesRun
Cleanup no longer needed AWS resources
· Building a repeatable CI/CD process
· Next steps

Snowflake configuration

Let’s start by setting up a playground data warehouse in Snowflake. If you don’t have an account yet, you can signup for a 30-day trial here.

Creating database credentials

Once you’re logged in, you can create a database and a user that we’ll leverage in this demo.

Make sure to keep the above credentials somewhere safe — we will need those in the dbt flow.

SQL alchemy connection

Since we’ll be loading data into Snowflake from a Python script, we need to configure the SQL alchemy connection string. To do that, we need to fill in the following placeholders in the connection string:

  • user,
  • password,
  • account_id,
  • database & schema for the session,
  • warehouse,
  • and role.

An example of a complete connection string:

You need to specify the compute warehouse for the queries to be executed within Snowflake. Here, we use the default warehouse COMPUTE_WH added by default when creating an account.

Using the connection to load raw data (Extract & Load)

First, make sure to clone this repository and install all dependent packages using:

Here is an example of how you can use the previously defined connection string to load raw data into Snowflake:

💡 Why do we replace the table on each run? In this example, we treat the table raw_customers as a temporary table, i.e., one that is replaced at each run, rather than appending values or performing a merge logic. The proper historization of this data can be handled later in dbt.

Turning the extract & load script into a Prefect flow

Ingesting raw data into a staging area is typically the first step in a data pipeline. Thus it’s a perfect candidate for parallelization. Let’s use Prefect’s mapping for that:

If you followed part 1 of this article series, you should already have a Prefect project called “jaffle_shop”. If not, you can create it as shown below. Then, you can register and run your flow:

dbt configuration

Now that the process of extracting raw data and loading it into Snowflake is finished, we can configure the dbt transformations and orchestrate those in a Prefect flow. Compared to the previous post, we only need to adjust the DbtShellTask to make it work with Snowflake. Some of the configuration data is provided as Secrets - those Secret values must be set via the Prefect UI or GraphQL API before referencing them in the flow.

How to find out what is your Snowflake account ID? You can find it in your URL, e.g., in the URL shown below, the account ID is

If you followed the setup from the first section of this article, then your SNOWFLAKE_USER name is DBT_USER, and the SNOWFLAKE_ROLE is SYSADMIN. Here is a complete flow example:

You can register both flows by executing:

At this point, you can notice that some configurations, such as storage and run configuration, get duplicated across flows. Additionally, once we are ready to promote the flows from a Local agent to a production Kubernetes cluster, we need to make changes to all flows individually.

To follow the DRY (Don’t Repeat Yourself) principle and make it easier to switch from the development stage to production, we can add the following module to our custom flow_utilities package:

And here is how it can be used in a flow:

The definition of both storage and run configuration is now a matter of importing the configuration functions and calling those on the Flow() constructor.

An added benefit to this approach is that if we would move all flows to some other execution platform in the future, say, an ECSAgent, or a VertexAgent, all that we need to change is this single module.

This concludes our local development with dbt and Snowflake. In the next section, we’ll package and deploy the project to a Kubernetes cluster on AWS.

Deploying your flows to a remote Kubernetes cluster on AWS EKS

Before we can run the flows on a Kubernetes cluster, we will need to:

  1. Build a custom Docker image.
  2. Push the image to ECR — AWS container registry.
  3. Create a demo Kubernetes cluster on AWS EKS.
  4. Spin up a Prefect’s KubernetesAgent that will poll the Prefect backend API for new flow runs and will deploy those as Kubernetes jobs.
  5. Change the run configuration to KubernetesRun.

Let’s dive into each of those steps.

1. Building a custom Docker image

In order to define a Docker image, we can leverage the following Dockerfile:

You can see that:

  • we start with an official base image from Prefect,
  • we then upgrade pip and define a working directory,
  • finally, we copy our custom modules and the corresponding requirements.txt, and we install those as a package.

To build the image, we need a single command:

2. Pushing the image to ECR

Here are the required steps to push the image to the AWS Elastic Container Registry:

  1. Create a repository,
  2. Login to the container registry,
  3. Tag the image using the repository URL,
  4. Finally, push the image to ECR.

And those are the four commands required to push the image we’ve just built to AWS ECR (make sure to replace XXX with your AWS account ID):

3. Creating a demo Kubernetes cluster on AWS EKS

First, make sure to install eksctl. Then, you can use it to create a cluster. The command below will create a demo cluster with just a single managed node.

Eksctl allows for various configurations such as:

Under the hood, eksctl deploys a CloudFormation stack that creates all resources required by your cluster, including a custom VPC, IAM roles, and compute instances.

Once your cluster is ready, you can run the following commands to validate that your current context points to a remote cluster on AWS:

4. Deploying a Prefect’s Kubernetes agent

Prefect’s CLI contains an install command that will generate a YAML declaration with all resources that you need for a Prefect agent. Before using it, make sure to create a service account API key, as described on this documentation page.

Then, run the following command to generate a YAML file:

Note that:

  • the --rbac option adds a Role and RoleBinding,
  • you can choose an arbitrary label and assign it to your agent — as long as you assign the same label to your flow’s KubernetesRun run configuration, your flows will be picked up by this agent and will get deployed as Kubernetes jobs.

Within the YAML file, you can configure settings such as changing the default image, imagePullPolicy, or adding custom environment variables, e.g.:

Once your YAML declaration is ready, you can deploy it to the cluster using:

Finally, you can verify that a new Kubernetes agent is available in your Prefect Cloud UI:

Changing the run configuration in your flows to KubernetesRun

Since we defined the run configuration as part of a custom module, we can modify it there. This change will be applied to all flows during registration:

To prevent using sensitive information such as AWS account ID directly in our code, we defined it as PrefectSecret in the Prefect Cloud UI. This way, we can reference the Secret instead of hardcoding the value in our flow's configuration.

All that is left is to reregister our flows:

In the CLI output, you should see the new flow IDs:

You can use the ID of the parent flow to trigger a flow run directly from the CLI:

Alternatively, you can start the flow run by referencing the flow name, without the ID:

If you have another flow with the same name in another project, you would need to additionally provide the project name:

Regardless of how you initiated the flow run, you can use the UI to observe how Prefect orchestrates multiple flows running on a remote Kubernetes cluster on AWS.

Switching from “Overview” to the “Run Config” tab, you can inspect the execution details, such as the Docker image used to run this flow, which can be helpful for debugging issues in the future.

The cleanup step

Once you have finished this demo, make sure to delete the AWS resources to avoid unnecessary costs:

Building a repeatable CI/CD process

Now that we have all flows in our Github repository and the execution environment is configured, we can look at how to configure a continuous deployment pipeline.

There are many tools that you could use for CI/CD. One of the most popular platforms in this category is CircleCI. You can include it as an integration to your Github account. Then, you can add a directory .circleci within your project’s root directory, and within .circleci, add a file called config.yml:

The above deployment pipeline registers all flows. However, the re-registration will be skipped for flows with no changes to its metadata since the last registration.

Note that we need to add the PREFECT_API_KEY to the CircleCI Project’s environment variables. To do that, go to your Project Settings, and add an API key generated from the Prefect Cloud UI:

Prefect Cloud UI — generate a Service Account API key for the CI/CD tool
CircleCI — add the API key as an environment variable

As soon as you commit and push the .circleci/config.yaml config file (and later, once you push any code changes to the main branch), CircleCI will automatically trigger the build.

💡 Why is the CI/CD process so small? The pipeline demonstrated here is very minimal. You could, for instance, include the process of building a Docker image and pushing it to ECR as part of your deployment pipeline. However, this would slow down the builds and possibly increase the ECR storage costs when pushing to the registry too frequently. Consider this trade-off for your use case, and include the Docker image build process in your deployment pipeline when necessary.

Next steps

This article investigated how to orchestrate an entire ELT project with Prefect and deploy it to production. We looked at how to switch from local development with a Postgres database to a cloud data warehouse such as Snowflake. We built a custom Docker image for the project and pushed it to ECR. After creating a Kubernetes cluster on AWS, we spun up a Prefect Kubernetes agent and deployed all flows to the same cluster. Finally, we explored how to build a repeatable CI/CD process to ensure that all changes merged into the main branch are automatically registered to the Prefect API.

In the next post, we’ll look at how to extend this project even further by triggering dbt Cloud jobs directly from a Prefect workflow, as well as building a resource manager for Snowflake to allow reusing database connections across tasks in your flow.

If anything about what we’ve discussed in this post is unclear, feel free to ask your questions in our community Slack.

We can’t wait to see what you build. Thanks for reading, and happy engineering!



Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store