Learn Airflow and BigQuery by making an ETL for COVID-19 data

Shaked Mor
7 min read · Feb 14, 2022

Apache Airflow is an excellent tool for managing workflows, and more specifically, ETL/ELTs.
In this tutorial we are going to create a simple ETL, while practicing a few Airflow key concepts.

Technologies which will be used in this article:

  • Apache Airflow
  • Google BigQuery — Google’s data warehouse. Don’t worry if you don’t know it, it has very simple APIs and is ANSI SQL compliant.
  • Google Cloud Composer — Airflow hosted on Google Cloud. Used here to avoid the hassle of installing and configuring Airflow (don’t even think about running it locally on Windows)
  • Python 3

ETL Overview

Our goal is to take data on confirmed COVID-19 cases from Our World in Data’s (OWID) GitHub repository, add some fields to it, and load it into BigQuery for analysis.

We will build the ETL in four steps:

  1. Read the data from OWID’s GitHub using pandas.
  2. Drop non-relevant columns from the dataset, and add a new column with an added metric.
  3. Load the processed data into BigQuery, and make sure it was loaded.
  4. Combine all the separate parts inside a DAG, and upload it to Airflow.

Before we begin — a little Airflow background and terminology

  • DAG — our ETL will be created in the form of a DAG.
    In Airflow, a DAG is basically a Python script that defines a set of tasks and the dependencies between them, so that they can be depicted as a Directed Acyclic Graph (hence the name “DAG”).
This is the DAG that we’ll create in this article.
  • Each node in the DAG is called a task. In Airflow, tasks are created by instantiating objects called Operators (we’ll see it in the code in the following steps).
  • There are different Operators for different types of tasks we want our DAG to carry out, e.g., PythonOperator for running Python functions.

Part 1: Reading the data from GitHub

Inside OWID’s COVID-19 data repository there’s a .csv file containing a comprehensive time-series, including many metrics for each country in the world.

In our favorite IDE we’ll create a file called extract.py and write the following:
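The original listing did not survive in this copy, so what follows is a minimal sketch of such a module rather than the author's code. Only the Extract class and execute_extraction() are named in the text. The URL constant, the source parameter, and the assumption that the file has a date column holding YYYY-MM-DD strings are illustrative and should be checked against the repository.

```python
import pandas as pd

# Assumed location of OWID's combined CSV; verify the path in the repository.
OWID_CSV_URL = (
    "https://raw.githubusercontent.com/owid/covid-19-data/"
    "master/public/data/owid-covid-data.csv"
)


class Extract:
    """Reads the OWID CSV and keeps only the rows for one date."""

    def __init__(self, run_date, source=OWID_CSV_URL):
        # Accepts a datetime, a date, or a "YYYY-MM-DD" string.
        self.run_date = pd.Timestamp(run_date).strftime("%Y-%m-%d")
        # `source` is a hypothetical hook: any URL, path, or file-like
        # object that pandas.read_csv accepts.
        self.source = source

    def execute_extraction(self):
        df = pd.read_csv(self.source)
        # The `date` column is read as plain strings, so compare as strings.
        daily = df[df["date"] == self.run_date]
        return daily.reset_index(drop=True)
```

Making the source overridable is a design choice for this sketch: it lets us try the class against a small local file before pointing it at GitHub.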

A few important points:

  • Notice how we read from the raw GitHub URL in order to get the .csv file itself rather than a web page.
  • The Extract object takes a datetime parameter so that we can run the code for any date, not just the current one.

Part 2: Transforming the data using pandas

Once we have the data in a DataFrame, we can start changing it.
For this example we add two columns that help us see each country’s daily confirmed cases on a common scale, relative to all the other countries in the world. To do that, we’ll calculate the standard score (aka z-score) of the daily cases per million column.

*Statistics will not be discussed in this article, as the main focus is the ETL itself, and not the data’s content.

Let’s create a file called transform.py:
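As the original listing is missing here, this is a minimal sketch of what transform.py might contain, not the author's code. The Transform class comes from the text. The method name execute_transformation, the list of kept columns, and the name of the new column are assumptions. The sketch adds only the standard-score column, and it also converts date into real date objects so the column can later map onto a DATE field.

```python
import pandas as pd

# Assumed subset of OWID columns to keep; adjust to the fields you need.
KEEP_COLUMNS = ["date", "iso_code", "location", "new_cases_per_million"]


class Transform:
    def __init__(self, df):
        self.df = df

    def execute_transformation(self):  # hypothetical method name
        df = self.df[KEEP_COLUMNS].copy()
        # Turn "YYYY-MM-DD" strings into datetime.date objects.
        df["date"] = pd.to_datetime(df["date"]).dt.date
        cases = df["new_cases_per_million"]
        # Standard score: distance from the mean in units of standard
        # deviation. ddof=0 uses the population standard deviation of the
        # rows passed in (one day's worth of countries).
        df["new_cases_per_million_zscore"] = (
            (cases - cases.mean()) / cases.std(ddof=0)
        )
        return df
```

If every country reported the same value on a given day, the standard deviation would be zero and the new column would not hold finite numbers, so a real pipeline may want to handle that case explicitly.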

A few important points:

  • The Transform object receives the DataFrame that Extract.execute_extraction() returns.

Part 3: Loading data into BigQuery

To load our data to BigQuery we’ll need to do a few things first:

  1. Create a new project in Google Cloud, create a BQ dataset in it, and a table in which we will store the data from the ETL.
  2. Update the table’s schema to match the data we want to load into the table. This can be done either in the BigQuery UI via a web browser, or through other means.
  3. Make sure the environment you’re running in has the following Python libraries: google-cloud-bigquery, pyarrow, pandas.
    If you are using Cloud Composer: simply add the libraries in the Cloud Composer UI.
    If you are running Airflow locally: run the following command: pip install google-cloud-bigquery[pandas,pyarrow]. At the time of writing I used version 2.28.0 because later versions were causing errors.
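For step 2, one of the "other means" is to keep the schema as a JSON document, which the BigQuery console's text-based schema editor and the bq command-line tool both accept. The sketch below only generates that JSON. The field names are assumptions based on OWID's column names plus one made-up name for the standard-score column, so this is not the author's actual schema.

```python
import json

# Hypothetical schema: field names follow OWID's CSV columns, plus one
# assumed name for the added standard-score column.
SCHEMA = [
    {"name": "date", "type": "DATE", "mode": "NULLABLE"},
    {"name": "iso_code", "type": "STRING", "mode": "NULLABLE"},
    {"name": "location", "type": "STRING", "mode": "NULLABLE"},
    {"name": "new_cases_per_million", "type": "FLOAT", "mode": "NULLABLE"},
    {"name": "new_cases_per_million_zscore", "type": "FLOAT", "mode": "NULLABLE"},
]


def schema_as_json(schema=SCHEMA):
    """Return the schema in BigQuery's JSON schema format."""
    return json.dumps(schema, indent=2)


if __name__ == "__main__":
    print(schema_as_json())
```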

Then, create a file called load.py and write the following:
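The listing itself is missing from this copy, so here is a hedged sketch of a load module built on google-cloud-bigquery's load_table_from_dataframe. The Load class and the table_id variable are named in the text. The execute_load method name, the client and job_config parameters, and the placeholder table ID are assumptions made for illustration.

```python
# Placeholder: replace with your own "project.dataset.table".
TABLE_ID = "your-project.your_dataset.your_table"


class Load:
    def __init__(self, df, table_id=TABLE_ID, client=None, job_config=None):
        self.df = df
        self.table_id = table_id
        # `client` and `job_config` are hypothetical hooks. Leaving
        # job_config as None keeps the defaults; BigQuery load jobs append
        # to an existing table unless a write disposition says otherwise.
        self.client = client
        self.job_config = job_config

    def execute_load(self):  # hypothetical method name
        if self.client is None:
            # Imported lazily so the class can be tried with a stand-in
            # client on a machine without the library or credentials.
            from google.cloud import bigquery

            # With no arguments, the client uses the environment's
            # default credentials and project.
            self.client = bigquery.Client()
        job = self.client.load_table_from_dataframe(
            self.df, self.table_id, job_config=self.job_config
        )
        job.result()  # block until the load job finishes; raises on failure
        return job.output_rows
```

Returning the number of loaded rows is optional, but it gives the calling task something to log.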

A few important points:

  • The Load object gets the transformed DataFrame from the Transform object.
  • Notice we need to write the name of our BigQuery project, dataset, and table in the table_id variable, which we supply to the BigQuery job config.
  • Critical remark: We need to authenticate our requests to BigQuery. To do so we’ll have to supply our Google service account.
    If you are using Cloud Composer: As long as there aren’t any permission problems, the default connection parameters should be set in your environment, and you should be able to authenticate with the above code.
    If you are running Airflow locally: You need to authorize your requests, which requires a bit of configuration, either in code or in the environment. See the BigQuery API docs and the googleapis.dev docs for the how-to.

As for the data check, we will use an Operator called BigQueryCheckOperator, which runs a query that is expected to return a single row (see the documentation).
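To make the pass/fail rule concrete: Airflow's check operators take the first row of the result and fail the task if there is no row or if any value in it is falsy (for example a count of 0). The snippet below is a plain-Python illustration of that rule, assuming a query that counts the rows loaded for the run's date. check_passes is a hypothetical helper, not part of Airflow, and the table name is a placeholder.

```python
# Placeholder table name; `{{ ds }}` is filled in by Airflow's Jinja templating.
CHECK_SQL = (
    "SELECT COUNT(*) FROM `your-project.your_dataset.your_table` "
    "WHERE date = '{{ ds }}'"
)


def check_passes(first_row):
    """Mirror the check rule: a row must exist and every value must be truthy."""
    if not first_row:
        return False
    return all(bool(value) for value in first_row)
```

With this query, a day for which nothing was loaded returns a single row containing 0, so the check fails and the problem shows up in the Airflow UI.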

Part 4: writing a DAG with all the parts, and uploading it to Airflow

In Airflow a DAG usually follows this structure:

  1. Creating a DAG object instance, and supplying it arguments.
  2. Defining tasks using Operators, e.g., creating a PythonOperator and passing it a Python function (the DAG object is passed to the Operator).
  3. Lastly, setting the order and dependencies of the tasks.

So let’s create our DAG!

We’ll create a file called owid_to_bq.py, assuming the following file structure:

owid_to_bq.py
owid_covid_dag/
--------------extract.py
--------------transform.py
--------------load.py

We’ll write the following:

The key points to understand here (and there are a few):

  • Notice the args we passed to the DAG object, whether via the default_args dict or through the constructor. They control different aspects of how the DAG behaves and runs. You can research each one in Airflow’s documentation.
  • In run_etl() we use a parameter called ds. This is an Airflow template variable that holds the logical date of the DAG run as a YYYY-MM-DD string. Airflow’s documentation lists the other available variables.
  • The same ds is later used in the data check, where Jinja templating inserts it into the query.
  • In this DAG, the logical date is used to reference the desired date for each DAG run. Because we rely on the logical date rather than the wall-clock time, the DAG can run for any date, regardless of when it actually executes.
  • This DAG was first run on 2022-02-14, but the start_date parameter was defined as 2022-02-01. That means that when we upload it to Airflow, the DAG will run for each date it had “missed” up until now (unless catchup=False is set).
  • At the end of the DAG, we defined t1 as the upstream task of t2 (and if you scroll up to the Directed Acyclic Graph, you can see just that).
    Upstream means that t2 executes only after t1 has finished successfully.

Recap: Upload our DAG to Airflow

We’ve written all the necessary code, and now we just need to upload it to Airflow.
The next step is to copy the files that we have been working on to the dags folder, which resides in Airflow’s home folder (Cloud Composer allows uploading them through Google Cloud Storage).

Airflow should recognize the DAG automatically.

Now, because of the args we set, Airflow will run the ETL for each of the dates between the start_date and today (not including today).
We can see that in the DAG’s tree view:

The circles are DAG runs, and the squares are the individual task runs.
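To see which runs to expect in that view, here is a small standard-library sketch of the catch-up arithmetic for a daily schedule, using the dates from this article (start_date 2022-02-01, uploaded on 2022-02-14). catchup_logical_dates is a hypothetical helper, not an Airflow function, and it simplifies things to whole days, ignoring time of day and time zones.

```python
from datetime import date, timedelta


def catchup_logical_dates(start_date, today):
    """Logical dates a daily DAG backfills: start_date up to, but excluding, today."""
    days = (today - start_date).days
    return [start_date + timedelta(days=i) for i in range(max(days, 0))]


runs = catchup_logical_dates(date(2022, 2, 1), date(2022, 2, 14))
print(runs[0], runs[-1], len(runs))  # prints: 2022-02-01 2022-02-13 13
```

Today is excluded because a daily run for a given logical date only starts once that day is over.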

Now let’s check in BigQuery to see if the data was loaded:

We see that the data was loaded correctly! Now we can run an additional query, for example to check the number of records that were inserted for each date:

select t.date, count(*) as record_count
from DATASET_NAME.TABLE_NAME t
group by t.date
order by t.date

We can see that as expected, we have results starting from the start_date

Conclusion

We used Airflow to build an ETL that:

  1. Retrieves data from GitHub in .csv format.
  2. Processes it.
  3. Loads it into Google BigQuery.
  4. Checks each time that the data has indeed been loaded.

I hope you’ve found this guide helpful. Make sure to give it claps so others will be able to find it.
