Coordinate ELT in 2023 with Airbyte, dbt and Prefect

Orchestrate data ingestion and transformation for Snowflake data warehouse with Airbyte, dbt, and Prefect

Nate Nowack
The Prefect Blog
8 min read · Jan 31, 2023

When wrangling data on a small team, it’s hard not to rely on tools that help us move event data from the world to our data consumers. They’re just too useful to ignore!

As a data practice scales, though, it gets harder to keep track of how those tools interact unless those interactions are automated and visible.

This tutorial is a quick start to a simple, scalable, and maintainable ELT workflow using an open-source stack and a free-tier data warehouse. We’ll be using the following tools:

  • Airbyte to extract data from our sources and load it into a data warehouse (like Snowflake)
  • dbt to transform the data
  • Prefect to automate and observe our ecosystem of tools
  • GitHub as a source of event data and a place to store our code

For the purpose of illustrating an ELT pattern, we’ll pick a fun question to motivate the workflow:

Who are the common contributors to Airbyte, dbt, and Prefect?

Alrighty, let’s get into it!

Set up an Airbyte instance

This tutorial is based on open-source Airbyte. If you’re using Airbyte Cloud, you’ll need to wait for the Airbyte Cloud API before you can interact with Airbyte programmatically from Prefect.

To set up a local Airbyte instance, follow the Airbyte Quickstart guide or use a helm chart.
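
If you’re running Docker locally, the open-source quickstart boils down to something like the following (commands as of early 2023; check the Airbyte docs for the latest, since the ports and compose setup may have changed):

# clone the Airbyte repo and start the platform with Docker Compose
git clone --depth=1 https://github.com/airbytehq/airbyte.git
cd airbyte
docker compose up -d

# the Airbyte UI should then be reachable at http://localhost:8000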

Create some Airbyte connections

For this tutorial, we’ll use Airbyte’s GitHub source connector to pull data from the GitHub organizations behind the three repositories we care about.

Create a GitHub source connection

All Airbyte needs to connect to GitHub is a valid personal access token.

Let’s go ahead and create a new connection in the Airbyte UI by clicking on the Sources tab and then clicking + New Source. Select the GitHub source connector and pass along your token (or a comma-separated list of tokens to spread the load across them):

Airbyte UI when creating a GitHub source connector

For this tutorial, we’ll create a connection for each of the three GitHub organizations we’re interested in, grabbing the relevant repository from each.

Create a Snowflake destination connection

To tell Airbyte where to drop our raw data in Snowflake, we’ll need to provide the following information:

  • account: the name of your Snowflake account
  • username: the name of the user that Airbyte will use in Snowflake
  • password: the password for this user
  • database: the name of the database that Airbyte will write data to
  • schema: the name of the schema that Airbyte will write data to

Let’s go ahead and create a new connection in the Airbyte UI by clicking on the Destinations tab and then clicking on + New Destination. Select the Snowflake destination connector and pass in the information above.

💡 Note: While you’ve got this info handy, you can also use it to connect your dbt CLI to Snowflake. See the dbt docs for more info — psst… dbt debug is your friend!
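
For reference, the corresponding dbt profile for Snowflake looks roughly like this. It’s a minimal sketch with placeholder values, assuming the project’s dbt_project.yml points at a profile named github_common_contributors:

# ~/.dbt/profiles.yml
github_common_contributors:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: <your_account_identifier>
      user: <your_user>
      password: <your_password>
      role: <your_role>
      warehouse: <your_warehouse>
      database: <your_database>
      schema: <your_schema>
      threads: 4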

Create some dbt models to transform the raw data

Now that we’ve given Airbyte the ability to pull data from GitHub and write it to Snowflake, we can use dbt to transform the raw data into a format that’s more useful for our downstream consumers (like BI tools, fun dashboards and downstream dbt models).

While not the focus of this tutorial, the source code for the dbt models we’ll be using can be found in the linked repository.

In this tutorial, we’ll create two models: common_commiters and common_issue_submitters. These create views in Snowflake containing the committers and issue submitters that are common across all three repositories. We’ll then be able to query these views and check out the overlap between the three communities.
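
To give a rough idea of the shape of these models, here’s a hypothetical sketch of the committers model as a set intersection. The raw table and column names below are made up; the real models live in the linked repository:

-- models/common_committers.sql (hypothetical sketch)
select committer_login as login from raw_airbyte_commits
intersect
select committer_login as login from raw_dbt_commits
intersect
select committer_login as login from raw_prefect_commits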

Write a Prefect flow to orchestrate our tools

It’s time to connect our tools with Prefect!

Let’s use some Prefect collections to make our lives easier

pip install prefect-airbyte prefect-dbt prefect-snowflake
it’s 2023, use a venv

… and now let’s import some of their useful functionality for our flow:

from prefect import flow, task  # Prefect core, for the @flow and @task decorators below
from prefect_airbyte import AirbyteConnection, AirbyteServer
from prefect_airbyte.connections import AirbyteSyncResult
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_snowflake.database import SnowflakeConnector, snowflake_multiquery

Define a Prefect flow

It’s often useful to take a step back and think about each step we want to take in our flow. In this case, we want to:

  • trigger existing Airbyte connections to drop raw data into our Snowflake database
  • run dbt models to transform the raw data into something more useful
  • query the transformed data in Snowflake to find the answer to our original question

Let’s dive into the code for each of these steps!

Running Airbyte connections

Lucky for us, the AirbyteConnection block defined in prefect-airbyte takes care of the hard work for us. If we tell it which connection to trigger and which AirbyteServer to use, it'll do the rest!

Let's create our AirbyteConnection blocks, all on the same AirbyteServer:

airbyte_server = AirbyteServer(server_host="localhost", server_port=8000)

airbyte_connections = [
    AirbyteConnection(
        connection_id=connection_id,
        airbyte_server=airbyte_server,
    )
    for connection_id in [
        "fa8d5164-ca22-47da-83e0-829cc86a70b8",  # Airbyte Github Stats
        "8e3a3ef1-6c2d-4255-99d4-21e9e542d853",  # DBT Github Stats
        "980c14d9-2992-46a7-a1dc-2b92e60e1475",  # Prefect Github Stats
    ]
]

…and a task to manage their execution:

@task
def run_airbyte_sync(connection: AirbyteConnection) -> AirbyteSyncResult:
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()

💡 Note: All Prefect blocks that implement the JobBlock interface (like AirbyteConnection) have a trigger method that returns a JobRun, which has wait_for_completion and fetch_result methods.

Run dbt models

To trigger dbt run, you can leverage the predefined Prefect task from the dbt collection called trigger_dbt_cli_command. You can pass the command to run and the path to your dbt project as follows:

dbt_result = trigger_dbt_cli_command(  # run dbt models
    command="dbt run",
    project_dir="github_common_contributors",
    wait_for=airbyte_results,
)

Notice the use of wait_for here. It tells Prefect to make sure the Airbyte tasks have successfully dropped the raw data into Snowflake before we try to transform it with dbt.

💡 Note: If you’re using dbt Cloud, you can use the functionality defined in prefect_dbt.cloud instead.

Run some queries against the transformed data

Assuming we now have transformed data, we need to tell Prefect how to connect to Snowflake and which queries to run. The first part can be accomplished by creating a SnowflakeConnector block:

Prefect UI while creating a `SnowflakeConnector` block

💡 Note: A SnowflakeConnector needs a set of SnowflakeCredentials to authenticate with Snowflake. As with any block, you can create this in the UI or programmatically.
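
If you’d rather register these blocks in code than in the UI, a minimal sketch looks something like this. The credential values are placeholders, and the connector is saved under the github-contributors name we load later:

from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector

credentials = SnowflakeCredentials(
    account="<your_account_identifier>",
    user="<your_user>",
    password="<your_password>",
)
credentials.save("github-contributors-credentials", overwrite=True)

connector = SnowflakeConnector(
    credentials=credentials,
    database="<your_database>",
    warehouse="<your_warehouse>",
    schema="<your_schema>",
)
connector.save("github-contributors", overwrite=True)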

Now we can use our SnowflakeConnector to connect to the right place in Snowflake and snowflake_multiquery (another pre-built task) to ask questions about the transformed data that lives there:

common_authors, common_issue_submitters = snowflake_multiquery(
    queries=[
        "select login from commit_authors",
        "select login from issue_submitters",
    ],
    snowflake_connector=SnowflakeConnector.load("github-contributors"),
    wait_for=[dbt_result],
)

In the same way as before, we use wait_for to make sure the dbt models have successfully run before we try to query the transformed data.

The whole enchilada

Pretty nifty, huh? Let’s put it all together into a @flow:

@flow(log_prints=True)
def my_elt_flow():

    # run airbyte syncs
    airbyte_results = run_airbyte_sync.map(airbyte_connections)

    # run dbt models
    dbt_result = trigger_dbt_cli_command(
        command="dbt run",
        project_dir="github_common_contributors",
        wait_for=airbyte_results,
    )

    # query our fancy new tables
    common_authors, common_issue_submitters = snowflake_multiquery(
        queries=[
            "select login from commit_authors",
            "select login from issue_submitters",
        ],
        snowflake_connector=SnowflakeConnector.load("github-contributors"),
        wait_for=[dbt_result],
    )

    # log the common contributors among our repos
    print(
        f"Common authors: {common_authors} "
        f"Common issue submitters: {common_issue_submitters}"
    )


if __name__ == "__main__":
    my_elt_flow()

Running our flow

To run our flow locally, we can simply run the script:

python elt_orchestrator.py

…and watch it go 🚀

shell output after running our flow locally

Turns out dependabot is the only common contributor since last August — laaaaaaaame.

… but oh well — we have a nice and sturdy ELT pattern now.

Let’s create a deployment for our flow so we can run it when and where we want.

To do this, we need to make two decisions:

  • where do we want to run our flow?
  • where do we want to store our flow?

To keep things simple, I’ll run our code locally and keep it in GitHub (where it is, anyway!).

In terms of Prefect deployments, that means I’m using the default Process infrastructure and a GitHub filesystem block.
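
If you haven’t already created the GitHub block, here’s a quick sketch for registering one in code; the repository URL is a placeholder, and the block name matches the -sb flag used below:

from prefect.filesystems import GitHub

github_block = GitHub(
    repository="https://github.com/<your-org>/<your-repo>",
    reference="main",  # branch, tag, or commit to pull the flow code from
)
github_block.save("demo-repo", overwrite=True)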

The command to create the deployment is:

prefect deployment build flows/elt_orchestrator.py:my_elt_flow \
    -n "elt-pipeline" \
    -sb github/demo-repo \
    -a

💡 Note: If we wanted to use a different runtime infrastructure (like Kubernetes), we could specify that in the prefect deployment build command by passing -ib kubernetes/my-k8s-job-template. The -a flag tells the Prefect CLI to go ahead and apply the deployment.

With our new deployment created, we can hop over to the UI and run it manually or attach an interval/cron schedule.

Note the “Schedule: ✏️ Add” button on the right side
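
You can do the same from the CLI; something like the following should work, assuming the default flow name my-elt-flow and remembering that runs are picked up by an agent (e.g. prefect agent start -q default):

# trigger an ad-hoc run of the deployment
prefect deployment run 'my-elt-flow/elt-pipeline'

# or bake a cron schedule in at build time (daily at 6am, for example)
prefect deployment build flows/elt_orchestrator.py:my_elt_flow \
    -n "elt-pipeline" \
    -sb github/demo-repo \
    --cron "0 6 * * *" \
    -a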

… and that’s it!

We now have an intelligent Prefect workflow that will not waste time/resources running downstream tasks if upstream tasks fail.

When the flow fails (as it probably will at some point), it’s easy to identify and triage issues by observing the failing flow run in the Prefect UI!

If we ever want to change how or when our flow executes, we can do so without ever changing our code!

  • change how often it runs → update the deployment schedule in the UI
  • change where we store our code → add new storage block, rebuild deployment with prefect deployment build ... -sb block-type/name (same goes for infrastructure block determining where it runs)

Extending this example

While this pattern is sophisticated enough to handle many teams’ ELT workflows, there are many ways we could extend it:

  • parameterize my_elt_flow to accept an arbitrary number of Airbyte Connections or dbt commands to run (see the sketch after this list)
  • create flow runs from this deployment via API calls when you know Airbyte has new data to sync instead of on a schedule
  • use the Airbyte sync row count in AirbyteSyncResult to decide how many threads dbt should run!
  • do basically anything you want in Python at any point.
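
Here’s a rough sketch of those first two ideas, reusing the run_airbyte_sync task from earlier; the parameters, defaults, and deployment name below are assumptions based on this tutorial:

from prefect import flow, task
from prefect.deployments import run_deployment
from prefect_airbyte import AirbyteConnection, AirbyteServer
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@task
def run_airbyte_sync(connection: AirbyteConnection):
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()


@flow(log_prints=True)
def parameterized_elt_flow(
    connection_ids: list[str],
    dbt_command: str = "dbt run",
    dbt_project_dir: str = "github_common_contributors",
):
    server = AirbyteServer(server_host="localhost", server_port=8000)
    connections = [
        AirbyteConnection(connection_id=cid, airbyte_server=server)
        for cid in connection_ids
    ]
    airbyte_results = run_airbyte_sync.map(connections)
    trigger_dbt_cli_command(
        command=dbt_command,
        project_dir=dbt_project_dir,
        wait_for=airbyte_results,
    )


if __name__ == "__main__":
    # kick off a run of the deployment we built earlier, e.g. when a webhook
    # tells us that Airbyte has new data to sync
    run_deployment(name="my-elt-flow/elt-pipeline")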

Next steps

If you haven’t already, you can create a Prefect Cloud account for free to serve as your launch pad into a world of more effective data engineering!

Questions? Feel free to ask them in our Slack Community!

Happy Engineering!
