Orchestrate Fivetran using Apache Airflow

Rajesh Ku. Rout · Published in Apache Airflow · 7 min read · Mar 15, 2023

In today’s data-driven world, it’s essential to have a robust data pipeline that can handle large volumes of data and automate the ETL process. Fivetran is a cloud-based ETL tool that can help you extract data from various sources and load it into your data warehouse. Airflow, on the other hand, is a platform to programmatically author, schedule, and monitor workflows. In this blog, we’ll explore how to orchestrate Fivetran with Airflow to create a seamless data pipeline. We’ll cover the basics of Fivetran and Airflow, how to set up Fivetran as a source in Airflow, and how to schedule and monitor the data pipeline. By the end of this blog, you’ll have a clear understanding of how to use Fivetran and Airflow together to create a scalable and reliable data pipeline.

Running Fivetran in Airflow with operators, sensors, and hooks

Airflow defines data pipelines as directed acyclic graphs, or DAGs, that are built mostly of tasks called Operators and Sensors. The Fivetran provider enables the creation of FivetranOperators and FivetranSensors. All that is needed to run Fivetran in Airflow is the API Key, API Secret, and connector ID(s) from Fivetran and the Fivetran provider from the Astronomer registry. The API Key and API secret are configured as an Airflow Connection and the connector ID(s) are configured as Airflow Variables.
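For illustration, here is a minimal sketch of how those two pieces of configuration can be read back inside a DAG file. The connection name fivetran_default and the Variable name fivetran_connector_id are placeholders; use whatever names you configure in the steps below.

from airflow.hooks.base import BaseHook
from airflow.models import Variable

# Airflow Connection holding the Fivetran API Key (login) and API Secret (password)
fivetran_conn = BaseHook.get_connection("fivetran_default")

# Airflow Variable holding the ID of the Fivetran connector to orchestrate
connector_id = Variable.get("fivetran_connector_id")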

FivetranOperator

In Airflow, an operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs. The FivetranOperator validates that the connector ID supplied exists and is valid for the API Key and Secret given, changes the connector’s schedule type to run on a schedule defined by Airflow instead of Fivetran, and finally calls for the start of a Fivetran sync.
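As a minimal sketch, a standalone FivetranOperator task looks roughly like this (the connection and connector values are placeholders; the full DAG is built in Step 5 below):

from fivetran_provider.operators.fivetran import FivetranOperator

start_sync = FivetranOperator(
    task_id="start_fivetran_sync",
    fivetran_conn_id="fivetran_default",  # Airflow connection holding the API Key and Secret
    connector_id="your_connector_id",     # placeholder: the Fivetran connector to trigger
)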

FivetranSensor

Sensors are a special kind of operator. When they run, they will check to see if certain criteria are met before they complete and let their downstream tasks execute. A FivetranSensor will monitor the status of a Fivetran sync and allow the DAG to progress as soon as data has been fully loaded into a warehouse.
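A matching sensor can be sketched like this (placeholder names again); mode="reschedule" is an optional setting inherited from Airflow's base sensor that frees the worker slot between checks, which helps for long-running syncs:

from fivetran_provider.sensors.fivetran import FivetranSensor

wait_for_sync = FivetranSensor(
    task_id="wait_for_fivetran_sync",
    fivetran_conn_id="fivetran_default",  # same placeholder connection as above
    connector_id="your_connector_id",
    poke_interval=60,                     # seconds between status checks
    mode="reschedule",                    # release the worker slot between checks
)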

FivetranHook

The FivetranHook is included in the provider but should be accessed and modified by advanced users only. This is where Airflow interacts with the Fivetran REST API, so the complicated logic to start and monitor Fivetran jobs is abstracted away from the FivetranOperator and FivetranSensor, keeping them as readable and easy to use as possible.
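For advanced use cases, the hook can be called directly. The sketch below is illustrative only: the import path mirrors the operator and sensor imports, and method names such as get_connector and start_fivetran_sync reflect the provider source at the time of writing, so check your installed version before relying on them.

from fivetran_provider.hooks.fivetran import FivetranHook

# Placeholder connection name; point this at your Fivetran Airflow connection
hook = FivetranHook(fivetran_conn_id="fivetran_default")

# Inspect the connector's current state through the Fivetran REST API
connector = hook.get_connector(connector_id="your_connector_id")
print(connector.get("status"))

# Start a sync directly, bypassing the operator
hook.start_fivetran_sync(connector_id="your_connector_id")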

Follow the steps below to create a DAG that orchestrates your Fivetran connectors.

Step 1: Download the Fivetran provider

To get started, install the Fivetran provider package in your Airflow environment:

pip install airflow-provider-fivetran

Step 2: Restart the Airflow Webserver and Scheduler (Optional)

If you are running Airflow on Docker, Kubernetes, or a managed Airflow service on AWS, Azure, or GCP, skip this step. Otherwise, restart the webserver and scheduler so the newly installed provider is picked up.

Webserver:

airflow webserver -p 8080
  • 8080 is the port number

Scheduler:

airflow scheduler

Step 3: Create API Keys in your Fivetran Account

Click on your Account Name → API Key → Generate Key

Fivetran Homepage → API Key Generation
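Optionally, you can sanity-check the new key pair before wiring it into Airflow. A minimal sketch using Python's requests library (an extra dependency, not needed by the provider) against Fivetran's REST API might look like this, with placeholder credentials; the groups endpoint is simply a lightweight authenticated call:

import requests

API_KEY = "your_api_key"        # placeholder: key generated above
API_SECRET = "your_api_secret"  # placeholder: secret generated above

# Any authenticated endpoint works as a smoke test; here we list groups
resp = requests.get(
    "https://api.fivetran.com/v1/groups",
    auth=(API_KEY, API_SECRET),
)
print(resp.status_code)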

If you look at your Fivetran connector, you will see that its sync_frequency is currently managed by the Fivetran UI; you can adjust the frequency as needed from that control.

Sync Frequency is managed by Fivetran UI

Now we will orchestrate this with Airflow.

Step 4: Create a Connection through your Airflow UI

Go to your Airflow UI → Click on Admin → Connections → Click on the + icon, as shown below.

Click Admin → Connections

On the Edit Connection page, enter the API Key and API Secret you generated in the Fivetran dashboard in Step 3, as shown below.

Test the Connection

Give this connection a name; you will refer to it later in the DAG when calling the Fivetran connector through the FivetranOperator.

Step 5: Create a DAG

We will use Fivetran Operator and Sensor to create tasks in Airflow.

a) Import FivetranOperator and FivetranSensor in your DAG code

from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor

b) Pass the Connection ID of the Fivetran connection created in the Airflow UI

Fivetran_Con_ID is the connection name for Fivetran Connection

Here, fivetran_conn_id is the parameter name the Fivetran operators look for when resolving the connection. It can be passed in default_args or on each individual task. As a best practice, if you have more than one task for a given provider (Fivetran, AWS, Snowflake, etc.), pass that provider's connection ID in default_args (see the sketch after the screenshot below).

Default args parameters are passed to all the tasks by default
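For reference, a default_args dictionary along the lines of the screenshot above could look like the sketch below; Fivetran_Con_ID matches the connection name chosen in Step 4, while the owner and start_date values are placeholders:

from datetime import datetime

default_args = {
    "owner": "airflow",                     # placeholder owner
    "start_date": datetime(2023, 3, 1),     # placeholder start date for the DAG
    "fivetran_conn_id": "Fivetran_Con_ID",  # name given to the Fivetran connection in Step 4
}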

c) Create two tasks

Two tasks are required: the FivetranOperator starts the connector's sync, while the FivetranSensor keeps track of the sync and lets the downstream tasks run (or some other action be taken) once the data has been successfully loaded into the destination.

from datetime import timedelta

from airflow import DAG

dag = DAG(
    dag_id="Airflow-Fivetran-DAG",
    default_args=default_args,  # includes fivetran_conn_id, as described above
    schedule_interval=timedelta(days=1),
    catchup=False,
)

with dag:

    fivetran_op = FivetranOperator(
        task_id="Fivetran-Operator",
        connector_id="Your connector ID",
    )  # This task triggers the Fivetran connector

    fivetran_sensor = FivetranSensor(
        task_id="Fivetran-Sensor",
        connector_id="Your connector ID",
        poke_interval=5,
    )  # This task monitors the sync of that connector until it
       # finishes as Success or Failed

    fivetran_op >> fivetran_sensor

connector_id: Go to your Fivetran account → Click on the connector you want to trigger through Airflow → Click Setup → Copy the Connector ID

Connector ID for the Google Sheet Connector

Because we passed the connection ID in default_args, the Fivetran tasks pick it up behind the scenes and use it to connect to your Fivetran account.
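If you prefer not to hardcode the connector ID in the DAG file, it can also be stored as an Airflow Variable, as mentioned at the start of this post. A small sketch, assuming a Variable named fivetran_connector_id has been created under Admin → Variables:

from airflow.models import Variable

fivetran_op = FivetranOperator(
    task_id="Fivetran-Operator",
    connector_id=Variable.get("fivetran_connector_id"),  # read the connector ID from an Airflow Variable
)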

d) Trigger the DAG

Unpause the DAG so it runs automatically. Since we set a schedule interval, the DAG will start running on that schedule as soon as it is unpaused.

Unpause the DAG

You can see both Tasks in the DAG

When the DAG runs, the connector starts syncing as well; check your Fivetran account to confirm.

Google Sheet Connector starts syncing

Before, the sync_frequency was managed by the Fivetran UI.

Sync Frequency is managed by Fivetran UI

But now the connector's sync frequency is managed through the Fivetran API, which means Airflow initiates syncs for this connector via API calls rather than Fivetran's own scheduler.

Sync_frequency → Controlled by Fivetran API

In conclusion, I would say orchestrating Fivetran with Airflow can help you create a robust and scalable data pipeline that automates the ETL process. By using Fivetran as a source in Airflow, you can easily extract data from various sources and load it into your data warehouse, while Airflow helps you schedule and monitor the entire process. With this integration, you can save valuable time and resources, reduce errors, and ensure that your data pipeline is always up-to-date. I hope this blog has provided you with a clear understanding of how to use Fivetran and Airflow together to create a seamless data pipeline. So, go ahead and try it out for yourself, and see how it can simplify your data management process. Thank you!

Check out my blog on connecting AWS services and orchestrating them through Airflow.

Snowflake Core Certification 2023 Practice Sets Course

If you’re looking to advance your knowledge and skills in data warehousing and analytics, consider enrolling in the Udemy Snowflake Core Certification course. Snowflake is rapidly becoming one of the most popular cloud-based data warehousing platforms available today. As businesses increasingly rely on data to make critical decisions, having the skills and expertise to manage and analyze that data effectively is more important than ever.

https://www.udemy.com/course/snowflake-snowpro-core-certification-2023-practice-sets/?referralCode=D9755936C300A61FA7A4

Clearing the Snowflake Core Certification is a great way to demonstrate your expertise in using Snowflake to manage and analyze data. It is a globally recognized certification that is highly valued by employers, and having it on your resume can help you stand out in a crowded job market.

By taking this course and clearing the Snowflake Core Certification, students can not only demonstrate their expertise to potential employers but also gain the skills needed to succeed in a data-driven world. So if you’re looking to advance your career in data warehousing and analytics, consider enrolling in my Udemy Snowflake Core Certification course today.


Rajesh Ku. Rout
Apache Airflow Global Champion | Snowflake Squad Member 2024 | Data Scientist | www.linkedin.com/in/rajeshrout97