Coordinate ELT in 2023 with Airbyte, dbt and Prefect
Orchestrate data ingestion and transformation for Snowflake data warehouse with Airbyte, dbt, and Prefect
When wrangling data on a small team, it’s hard not to rely on tools that help us get event data from the world to our data consumers — they’re just too useful to ignore!
But as a data practice scales, it gets hard to keep track of how your tools interact when those interactions aren’t automated and visible.
This tutorial is a quick start to a simple, scalable, and maintainable ELT workflow using an open-source stack and a free-tier data warehouse. We’ll be using the following tools:
- Airbyte to extract data sources and load them to a data warehouse (like Snowflake)
- dbt to transform the data
- Prefect to automate and observe our ecosystem of tools
- GitHub as a source of event data and a place to store our code
For the purpose of illustrating an ELT pattern, we choose a fun question that serves as the motivation for the workflow:
Who are the common contributors to airbyte, dbt and prefect?
Alrighty, let’s get into it!
Table of Contents
· Set up an Airbyte instance
· Create some Airbyte connections
∘ Create a GitHub source connection
∘ Create a Snowflake destination connection
· Create some dbt models to transform the raw data
· Write a Prefect flow to orchestrate our tools
∘ Let’s use some Prefect collections to make our lives easier
· Define a Prefect flow
· Running Airbyte connections
· Run dbt models
· Run some queries against the transformed data
· The whole enchilada
· Running our flow
· … and that’s it!
· Extending this example
· Next steps
Set up an Airbyte instance
This tutorial is based on Open-Source Airbyte. If you are using Airbyte Cloud, you’ll need to wait for the Airbyte Cloud API to programmatically interact with Airbyte from Prefect.
To set up a local Airbyte instance, follow the Airbyte Quickstart guide or use a helm chart.
Create some Airbyte connections
For this tutorial, we’ll use Airbyte’s GitHub source connector to pull data from the GitHub organizations behind Airbyte, dbt, and Prefect.
Create a GitHub source connection
All Airbyte needs to connect to GitHub is a valid personal access token.
Let’s go ahead and create a new connection in the Airbyte UI by clicking on the Sources tab and then clicking + New Source. Select the GitHub source connector and pass along your token (or a comma-separated list of tokens to distribute load across multiple actors).
For this tutorial, we’ll create a connection for each of the three GitHub organizations we’re interested in, grabbing the repository of interest for each organization.
Create a Snowflake destination connection
To tell Airbyte where to drop our raw data in Snowflake, we’ll need to provide the following information:
- account: the name of your Snowflake account
- username: the name of the user that Airbyte will use in Snowflake
- password: the password for this user
- database: the name of the database that Airbyte will use to write data to
- schema: the name of the schema that Airbyte will use to write data to
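If you’d rather not keep these five values in your head (or paste them around), here is a minimal sketch that reads them from environment variables and fails loudly when one is missing. The `SNOWFLAKE_*` variable names and the `SnowflakeSettings` class are hypothetical, made up for this example; neither Airbyte nor Snowflake requires them.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SnowflakeSettings:
    """The five values the Snowflake destination form asks for."""
    account: str
    username: str
    password: str
    database: str
    schema: str


def load_snowflake_settings(env=os.environ) -> SnowflakeSettings:
    """Read the settings from environment variables.

    The SNOWFLAKE_* names are hypothetical, chosen for this sketch.
    """
    names = ("account", "username", "password", "database", "schema")
    # Collect every missing or empty value so the error names them all at once
    missing = [n for n in names if not env.get(f"SNOWFLAKE_{n.upper()}")]
    if missing:
        raise KeyError(f"missing Snowflake settings: {', '.join(missing)}")
    return SnowflakeSettings(**{n: env[f"SNOWFLAKE_{n.upper()}"] for n in names})
```

Keeping the values in one place like this makes it easier to reuse them anywhere else that needs the same Snowflake details.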
Let’s go ahead and create a new connection in the Airbyte UI by clicking on the Destinations tab and then clicking on + New Destination. Select the Snowflake destination connector and pass in the information above.
💡 Note: While you’ve got this info handy, you can also use it to connect your dbt CLI to Snowflake. See the dbt docs for more info. Psst… dbt debug is your friend!
Create some dbt models to transform the raw data
Now that we’ve given Airbyte the ability to pull data from GitHub and write it to Snowflake, we can use dbt to transform the raw data into a format that’s more useful for our downstream consumers (like BI tools, fun dashboards and downstream dbt models).
While not the focus of this tutorial, the source code for the dbt models we’ll be using can be found in the linked repository.
In this tutorial, we’ll create two models: common_commiters and common_issue_submitters. These will create views in Snowflake containing the committers and issue submitters that are common across all three repositories. We’ll then be able to query these views and check out the overlap between the three communities.
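To make “common across all three repositories” concrete, here is the same idea as a plain-Python set intersection. This is only an illustration of the logic, not the dbt models themselves (those are SQL and live in the linked repository); the function name and the sample logins are made up.

```python
def common_contributors(logins_by_repo: dict[str, set[str]]) -> set[str]:
    """Return the logins that appear in every repository's set."""
    if not logins_by_repo:
        return set()
    return set.intersection(*logins_by_repo.values())


# Made-up sample data, for illustration only
sample = {
    "airbyte": {"alice", "bob", "dependabot[bot]"},
    "dbt": {"bob", "carol", "dependabot[bot]"},
    "prefect": {"dave", "bob", "dependabot[bot]"},
}
print(sorted(common_contributors(sample)))  # prints ['bob', 'dependabot[bot]']
```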
Write a Prefect flow to orchestrate our tools
It’s time to connect our tools with Prefect!
Let’s use some Prefect collections to make our lives easier
pip install prefect-airbyte prefect-dbt prefect-snowflake
… and now let’s import some of their useful functionality for our flow:
from prefect import flow, task
from prefect_airbyte import AirbyteConnection, AirbyteServer
from prefect_airbyte.connections import AirbyteSyncResult
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect_snowflake.database import SnowflakeConnector, snowflake_multiquery
Define a Prefect flow
It’s often useful to take a step back and think about each step we want to take in our flow. In this case, we want to:
- trigger existing Airbyte connections to drop raw data into our Snowflake database
- run dbt models to transform the raw data into something more useful
- query the transformed data in Snowflake to find the answer to our original question
Let’s dive into the code for each of these steps!
Running Airbyte connections
Lucky for us, the AirbyteConnection block defined in prefect-airbyte takes care of the hard work for us. If we tell it which connection to trigger and which AirbyteServer to use, it'll do the rest!
Let's create our AirbyteConnection blocks, all on the same AirbyteServer:
airbyte_server = AirbyteServer(server_host="localhost", server_port=8000)
airbyte_connections = [
    AirbyteConnection(
        connection_id=connection_id,
        airbyte_server=airbyte_server
    )
    for connection_id in [
        "fa8d5164-ca22-47da-83e0-829cc86a70b8",  # Airbyte Github Stats
        "8e3a3ef1-6c2d-4255-99d4-21e9e542d853",  # DBT Github Stats
        "980c14d9-2992-46a7-a1dc-2b92e60e1475"  # Prefect Github Stats
    ]
]
…and a task to manage their execution:
@task
def run_airbyte_sync(connection: AirbyteConnection) -> AirbyteSyncResult:
    job_run = connection.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()
💡 Note: All Prefect blocks that implement the JobBlock interface (like AirbyteConnection) have a trigger method that returns a JobRun, which has wait_for_completion and fetch_result methods.
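If the trigger / wait / fetch shape is new to you, here is a tiny stand-in written in plain Python. To be clear, `FakeJobBlock` and `FakeJobRun` are hypothetical classes invented for this sketch; they only mimic the three method names from the note above and are not how Prefect implements JobBlock.

```python
import time


class FakeJobRun:
    """Stand-in for a JobRun: something you can wait on and fetch a result from."""

    def __init__(self, name: str, polls_until_done: int):
        self.name = name
        self._polls_left = polls_until_done

    def wait_for_completion(self, poll_seconds: float = 0.01) -> None:
        # Poll until the (pretend) external job reports that it is finished
        while self._polls_left > 0:
            time.sleep(poll_seconds)
            self._polls_left -= 1

    def fetch_result(self) -> dict:
        if self._polls_left > 0:
            raise RuntimeError(f"{self.name} has not finished yet")
        return {"job": self.name, "status": "succeeded"}


class FakeJobBlock:
    """Stand-in for a JobBlock: trigger() starts work and returns a run handle."""

    def __init__(self, name: str):
        self.name = name

    def trigger(self) -> FakeJobRun:
        return FakeJobRun(self.name, polls_until_done=3)


def run_job(block: FakeJobBlock) -> dict:
    # The same three calls as run_airbyte_sync, against the stand-in classes
    job_run = block.trigger()
    job_run.wait_for_completion()
    return job_run.fetch_result()
```

The nice part of this shape is that the caller never needs to know what kind of job is running behind the handle: trigger it, wait, fetch.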
Run dbt models
To trigger dbt run, you can leverage the predefined Prefect task from the dbt collection called trigger_dbt_cli_command. You can pass the command to run and the path to your dbt project as follows:
dbt_result = trigger_dbt_cli_command(  # run dbt models
    command="dbt run",
    project_dir="github_common_contributors",
    wait_for=airbyte_results
)
Notice the use of wait_for here. This tells Prefect that we want to make sure the Airbyte tasks have successfully dropped raw data in Snowflake before we try to transform it with dbt.
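For intuition about what “wait for upstream work” buys us, here is a rough standard-library analogy using concurrent.futures. It is not how Prefect implements wait_for, and `fake_sync` and `run_pipeline` are hypothetical names for this sketch. The point is just that the transform step never starts unless every sync finished without raising.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_sync(name: str) -> str:
    # Pretend to load raw data; a real sync would call out to Airbyte
    if name == "broken":
        raise RuntimeError(f"sync failed for {name}")
    return f"{name}: raw data loaded"


def run_pipeline(sources: list[str]) -> str:
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fake_sync, source) for source in sources]
        # .result() blocks until each upstream finishes and re-raises its error,
        # so the transform step below is skipped if any sync failed
        upstream = [future.result() for future in futures]
    return f"transformed {len(upstream)} raw datasets"
```

Calling `run_pipeline(["airbyte", "dbt", "prefect"])` returns a summary string, while including `"broken"` in the list raises before any transform happens.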
💡 Note: If you’re using dbt Cloud, you can use the functionality defined in prefect_dbt.cloud instead.
Run some queries against the transformed data
Assuming we now have transformed data, we need to tell Prefect how to connect to Snowflake and which queries to run. The first part can be accomplished by creating a SnowflakeConnector block.
💡 Note: A SnowflakeConnector needs a set of SnowflakeCredentials to authenticate with Snowflake. As with any block, you can create this in the UI or programmatically.
Now we can use our SnowflakeConnector to connect to the right place in Snowflake and snowflake_multiquery (another pre-built task) to ask questions about the transformed data that lives there:
common_authors, common_issue_submitters = snowflake_multiquery(
    queries=[
        "select login from commit_authors",
        "select login from issue_submitters"
    ],
    snowflake_connector=SnowflakeConnector.load("github-contributors"),
    wait_for=[dbt_result]
)
In the same way as before, we use wait_for to make sure the dbt models have successfully run before we try to query the transformed data.
The whole enchilada
Pretty nifty, huh? Let’s put it all together into a @flow:
@flow(log_prints=True)
def my_elt_flow():
    # run airbyte syncs
    airbyte_results = run_airbyte_sync.map(airbyte_connections)
    # run dbt models
    dbt_result = trigger_dbt_cli_command(
        command="dbt run",
        project_dir="github_common_contributors",
        wait_for=airbyte_results
    )
    # query our fancy new tables
    common_authors, common_issue_submitters = snowflake_multiquery(
        queries=[
            "select login from commit_authors",
            "select login from issue_submitters"
        ],
        snowflake_connector=SnowflakeConnector.load("github-contributors"),
        wait_for=[dbt_result]  # wait_for takes a list, as in the snippet above
    )
    # log the common contributors among our repos
    print(
        f"Common authors: {common_authors} "
        f"Common issue submitters: {common_issue_submitters}"
    )


if __name__ == "__main__":
    my_elt_flow()
Running our flow
To run our flow locally, we can simply run the script:
python elt_orchestrator.py
…and watch it go 🚀
Turns out dependabot is the only common contributor since last August. Laaaaaaaame.
… but oh well — we have a nice and sturdy ELT pattern now.
Let’s create a deployment for our flow so we can run it when and where we want.
To do this, we need to make two decisions:
- where do we want to run our flow?
- where do we want to store our flow?
To keep things simple, I’ll run our code locally and keep it in GitHub (where it is, anyway!).
In terms of Prefect deployments, that means I’m using the default Process infrastructure and a GitHub filesystem block.
The command to create the deployment is:
prefect deployment build flows/elt_orchestrator.py:my_elt_flow \
    -n "elt-pipeline" \
    -sb github/demo-repo \
    -a
💡 Note: If we wanted to use a different runtime infrastructure (like Kubernetes), we could specify that in the prefect deployment build command by passing -ib kubernetes/my-k8s-job-template. -a tells Prefect to go ahead and apply the deployment.
With our new deployment created, we can hop over to the UI and run it manually or attach an interval/cron schedule.
… and that’s it!
We now have an intelligent Prefect workflow that will not waste time/resources running downstream tasks if upstream tasks fail.
When the flow fails (as it probably will at some point), it’s easy to identify and triage issues by observing the failing flow run in the Prefect UI!
If we ever want to change how or when our flow executes, we can do so without ever changing our code!
- change how often it runs → update the deployment schedule in the UI
- change where we store our code → add new storage block, rebuild deployment with prefect deployment build ... -sb block-type/name (same goes for infrastructure block determining where it runs)
Extending this example
While this pattern is sophisticated enough to handle many teams’ ELT workflows, there are many ways we could extend it:
- parameterize my_elt_flow to accept an arbitrary number of Airbyte Connections or dbt commands to run
- create flow runs from this deployment via API calls when you know Airbyte has new data to sync instead of on a schedule
- use the Airbyte sync row count in AirbyteSyncResult to decide how many threads dbt should run!
- do basically anything you want in Python at any point.
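As a taste of the row-count idea, here is a minimal sketch that turns synced row counts into a dbt command with a --threads value. The function names, the 50,000-rows-per-thread ratio, and the cap of 8 are all arbitrary assumptions for illustration. It takes plain integers; pulling those numbers out of each AirbyteSyncResult (I believe the field is records_synced, but check your prefect-airbyte version) is left to you.

```python
def dbt_threads_for(row_counts: list[int], rows_per_thread: int = 50_000,
                    max_threads: int = 8) -> int:
    """Pick a dbt thread count from the total rows synced (always at least 1)."""
    total = sum(row_counts)
    wanted = -(-total // rows_per_thread)  # ceiling division
    return max(1, min(max_threads, wanted))


def dbt_run_command(row_counts: list[int]) -> str:
    """Build the command string to hand to a dbt CLI task."""
    return f"dbt run --threads {dbt_threads_for(row_counts)}"
```

The resulting string could then be passed as the command argument to trigger_dbt_cli_command in place of the hard-coded "dbt run".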
Next steps
If you haven’t already, you can create a Prefect Cloud account for free to serve as your launch pad into a world of more effective data engineering!
Questions? Feel free to ask them in our Slack Community!
Happy Engineering!