Prefect & Fivetran: integrate all the tools & orchestrate them in Python
Cloud data ingestion and orchestration made easy
--
⚠️ Note: I no longer work at Prefect. This post may be completely outdated. Refer to Prefect docs and website to stay up-to-date.
What is Prefect?
It’s a flexible framework to build, reliably execute, and observe your dataflows while supporting various execution and data-access patterns. It lets you turn any Python script into a fully operationalized application.
What is Fivetran?
It’s a data integration platform that allows businesses to replicate data from various systems into a central data warehouse. It offers pre-built connectors for a wide range of data sources, including databases, SaaS applications, cloud storage, event logs, and more. Fivetran can help you keep your data consistent, complete, and up-to-date by automating the data integration process.
Why Fivetran + Prefect?
Prefect coordinates dataflow across your entire stack. With Fivetran, you can easily move data from sources to destinations. Prefect allows you to:
- schedule and observe your data replication syncs,
- add a sync to the part of your pipeline that needs to operate on the most up-to-date data from any given source system,
- ensure reliable dataflow through retries, alerting, and dependency management.
Getting started with Prefect
First, sign up for a free Prefect Cloud account and create a workspace. Then, install Prefect and log into your Cloud workspace from a terminal:
pip install prefect
prefect cloud login
This command will redirect you to your browser to automatically create an API key and CLI profile. This way, your local development machine gets authenticated with the Prefect Cloud workspace.
Then, create a script called myflow.py:
from prefect import flow

@flow(log_prints=True)
def hello():
    print("You're the Prefectionist now!🤗")

if __name__ == "__main__":
    hello()
Finally, run your first flow using python myflow.py. If you navigate to the Cloud UI, you’ll see the flow run that you triggered from your local terminal.
Getting started with Fivetran
To get started with Fivetran, sign up for a trial account. Then, a welcome screen will walk you through the initial setup to configure your first Fivetran sync by selecting a source, destination, and data to integrate.
Connect source
To get started, you can select GitHub as a connector and pick repositories to sync. This will allow you to run SQL queries to analyze your GitHub stars, pull requests, etc.
Connect destination
You can select a destination from a variety of options. Most of them are cloud data warehouses and cloud databases:
For this demo, we’ll pick Snowflake. Once you select the destination, you get a SQL transaction snippet that you can execute in your Snowflake worksheet to set everything up. As soon as you run that script, you can set corresponding fields (user name, password, database, role) in the form on the left:
The same code block is available in the Fivetran documentation. If the connection test passes, you can continue with data selection:
Select data
Some columns can be blocked or hashed for privacy and compliance:
For this demo, select: “Sync all data” and click “Continue”.
New connector: Google Sheets
Another simple connector can replicate data from a Google Sheet. You can create one that looks similar to the following table:
Once you have created a table, go to Data → Named ranges:
Select the first three columns and save them as a named range, e.g. Fivetran. For instance, to select all rows from columns A, B, and C in Sheet1, you can use Sheet1!A:C. This is important so that Fivetran knows which rows and columns you want to sync.
In Fivetran, paste your Sheet URL and then select the named range. Then, save and test the connector:
If everything works as expected, you can select “Sync all data” and “Continue”:
Now you can start the initial sync:
After the sync, you should see a new schema and a table corresponding to that Google Sheet in your Snowflake worksheet.
Automate data replication with Prefect & Fivetran
Generate a Fivetran API key
To schedule and orchestrate your syncs, your Prefect flows need to authenticate with Fivetran. To do that, you need an API key. Go to your account settings and generate an API key as shown below:
This will give you an API key identifier and API secret token:
Create a Prefect Block for Fivetran
Install the prefect-fivetran collection and register a Fivetran block:
pip install prefect prefect-fivetran
prefect block register -m prefect_fivetran
You can create your Fivetran block from the UI or via code. Here is what the block creation process looks like from the UI:
And here is how you can do that with code:
import os

from dotenv import load_dotenv
from prefect_fivetran import FivetranCredentials

# Load FIVETRAN_API_KEY and FIVETRAN_API_SECRET_KEY from a local .env file
load_dotenv()

fivetran_credentials = FivetranCredentials(
    api_key=os.environ.get("FIVETRAN_API_KEY"),
    api_secret=os.environ.get("FIVETRAN_API_SECRET_KEY"),
)
fivetran_credentials.save("default")
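The script above reads the two secrets from a local .env file via python-dotenv. An example file is shown below; the key names must match the os.environ lookups in the script, and the values are placeholders for the key identifier and secret token generated earlier:

```
FIVETRAN_API_KEY=<your-api-key-identifier>
FIVETRAN_API_SECRET_KEY=<your-api-secret-token>
```

Keeping the credentials out of the script (and out of version control) this way means the saved Prefect Block is the only place the secret is persisted.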
Trigger a Fivetran sync from a Prefect flow
Finally, we can run a flow, gsheet.py, to trigger a sync:
from prefect import flow
from prefect_fivetran import FivetranCredentials
from prefect_fivetran.connectors import (
    wait_for_fivetran_connector_sync,
    start_fivetran_connector_sync,
)

@flow
def example_flow(connector_id: str):
    fivetran_credentials = FivetranCredentials.load("default")
    # Start the sync and capture the timestamp of the last completed sync
    last_sync = start_fivetran_connector_sync(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
    )
    # Block until Fivetran reports a sync completed after last_sync
    return wait_for_fivetran_connector_sync(
        connector_id=connector_id,
        fivetran_credentials=fivetran_credentials,
        previous_completed_at=last_sync,
        poll_status_every_n_seconds=60,
    )

if __name__ == "__main__":
    example_flow("bereft_indices")
How did we get bereft_indices as the connector ID in the example_flow("bereft_indices") call? The answer can be found in your Fivetran connector’s Setup tab:
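You can also look up a connector programmatically via Fivetran’s REST API, which authenticates with HTTP Basic auth using the API key as the username and the secret as the password. The sketch below uses only the standard library; the /v1/connectors/{connector_id} endpoint path is an assumption taken from Fivetran’s API reference, so verify it against the current docs:

```python
import base64
import urllib.request

FIVETRAN_API = "https://api.fivetran.com/v1"

def fivetran_auth_header(api_key: str, api_secret: str) -> str:
    # Fivetran's REST API uses HTTP Basic auth: base64("<key>:<secret>")
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()
    return f"Basic {token}"

def get_connector_details(connector_id: str, api_key: str, api_secret: str) -> bytes:
    # Endpoint path assumed from Fivetran's REST API reference
    req = urllib.request.Request(
        f"{FIVETRAN_API}/connectors/{connector_id}",
        headers={"Authorization": fivetran_auth_header(api_key, api_secret)},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()
```

The JSON response for a connector includes its ID, schema, and sync status, which is handy when you manage many connectors and don’t want to copy IDs from the UI by hand.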
Triggering a data replication sync
To validate that the Fivetran, Prefect, and Snowflake integration is set up properly, we’ll trigger this flow twice. First, we’ll use the Google Sheet table as-is. Then, we’ll modify some records and inspect whether the sync workflow updated them in the destination. This way, we can validate that the Snowflake table stays in sync with the source system.
Let’s trigger this flow for the first time:
python gsheet.py
The flow run succeeded. You can inspect the same execution in the Fivetran dashboard:
The sync was successful, and nothing has changed in the Snowflake table as a result of the first flow run:
Let’s now change the first name for the first customer from Michael to Mike, remove the customer record for Shawn with ID 2, and add another customer with ID 101:
After making those changes, trigger the Prefect flow again:
python gsheet.py
The second flow run took about as long as the first, finishing after 1m 5s:
We can validate that the modified data has been correctly replicated to the Snowflake table: 🎉
Slowly Changing Dimensions
Note that the changes reflecting the update of the first name from Michael to Mike and the deletion of Shawn’s record are not visible in this table. If you are interested in tracking that information as part of your Slowly Changing Dimensions strategy, you need to enable History for that sync. For a list of connectors that support tracking history, check the history mode Fivetran documentation.
Scheduling your syncs
If you want to run this script on schedule, e.g., every day at 9 AM, you can use the following command to create a Prefect deployment:
prefect deployment build --cron "0 9 * * *" gsheet.py:example_flow -n dev -a
And start a Prefect agent that will monitor and execute scheduled runs:
prefect agent start -q default
For more information about remote scheduling, check the Prefect documentation.
Next steps
This post demonstrated how to start with Prefect and Fivetran and create your first data replication syncs from GitHub and Google Sheets to Snowflake. We looked at how you can securely store API keys using Prefect Blocks and orchestrate Fivetran syncs from a Prefect flow.
If anything discussed in this post is unclear, feel free to tag me when asking a question in the Prefect Community Slack or Prefect Discourse.
Thanks for reading, and happy engineering!