Data Warehousing Made Easy with Google BigQuery and Apache Airflow

Marcus Baitz
Jun 18 · 10 min read

Prefer reading code over blog posts? Head over to our GitHub repository to reproduce our example use case.


Google is releasing new Business Intelligence (BI) and Data Warehousing (DWH) services at a rapid pace. One of the most recent additions, BigQuery BI Engine, is a fast, in-memory analysis service for data stored in BigQuery. Furthermore, with Google Cloud Composer you can run Apache Airflow in a managed Kubernetes cluster to author, schedule, and monitor your ELT workflows. In my opinion, together with Data Studio for creating and sharing interactive reports and dashboards, this is the best combination for managing business data.

In this blog post, we illustrate how we use Google Cloud Platform (GCP) to take on BI, ELT, and DWH projects. In these projects, we frequently encounter similar tasks and challenges that can be summarized as follows:

  • Listen for new data arriving in Google Cloud Storage (or any of the many other supported data sources)
  • Insert only new data (change data capture) into the BigQuery DWH
  • Avoid side effects, such as duplicates, after re-runs of our data pipelines (idempotence) to allow for data reprocessing
  • Manage everything as code, so the pipeline and DWH can be rebuilt from scratch at any time
  • Provide fast dashboards and efficient BI tooling to a variety of stakeholders

In our customer projects, we integrate constantly growing transaction data with master data to allow for an up-to-date BI experience. For this project, we use a clear, self-explanatory example from the world of football (soccer, that is, for our American readers).


Example Scenario: English Premier League

We want to integrate scorer data from the English Premier League into BigQuery as it arrives during a matchday, in order to build live dashboards such as the league table, the top scorer ranking, or live match results.

  • During a matchday, for every new goal, we receive a new CSV file with updated scorer information in a Cloud Storage bucket. These files contain overlapping data records!
  • Our warehousing pipeline, an Apache Airflow workflow, integrates (only) the new data into a BigQuery Data Warehouse.
  • Using BigQuery, we can provide interactive data exploration as well as a set of dynamic SQL views: the current league table based on completed matches, a list of recent match results, and a ranking of the league’s top scorers.

In the following sections, we focus on the most important steps in defining a warehousing pipeline for the GCP using Apache Airflow. If you’d rather follow along with our example scenario by deploying the pipeline to the GCP yourself, you can skip ahead to the Deploying the Warehouse section.


Ingesting Data into BigQuery with Apache Airflow

Our warehousing pipeline needs to cover three steps to ingest new data from Google Cloud Storage into our BigQuery data warehouse:

  1. Copy the new data from Google Cloud Storage to a staging table in BigQuery.
  2. Run a SQL MERGE query between the staging table and the warehouse table in BigQuery, so that duplicated data records are captured only once.
  3. Move the processed data out of our ingestion prefix into a processed prefix within the storage bucket, so we don’t reprocess an ever-growing amount of data.

This functionality is implemented as an Airflow DAG that makes use of the GCP-specific GoogleCloudStoragePrefixSensor operator to react to new data in our storage bucket.

Since the three tasks described above are identical for every data source and entity in our typical ELT workflow, we provide generic DAG creation logic that uses entity names and schemata from a configuration file to instantiate the appropriate Airflow DAG. The logical layout of the workflow can be seen in gc_tasks.py (line 16) and in premier_league_data_to_gc.py (line 49). wait_for_data, i.e., the GoogleCloudStoragePrefixSensor instance, blocks the workflow until new data is detected at the defined ingestion path. Incoming data then triggers the rest of the workflow, whose operators first copy the data to BigQuery, then execute the MERGE SQL query to integrate new records into the DWH, mark the files as processed to avoid unnecessary reprocessing, and finally trigger the same DAG once again. That last operation is a concession to the fact that workflows in Airflow are, well… acyclic, and need to be re-triggered to stay active after a completed execution.

Top-level function to create a new warehousing workflow as an Airflow DAG that integrates with Google Cloud Platform services
ELT workflow creation. Tasks 1 to 3 manage copying data from storage to BigQuery, merging only new or changed data into the DWH, and marking processed data.
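The gists embedded in the original post are not reproduced here, so as a rough orientation, here is a minimal sketch of what such a generic DAG factory could look like using Airflow’s GCP integrations. All identifiers in this sketch (create_ingestion_dag, the dwh dataset, the processed/ prefix, and the sql/merge_<entity>.sql template) are illustrative assumptions, not the repository’s actual code.

```python
# A rough sketch of a generic ingestion-DAG factory; all identifiers are
# illustrative assumptions rather than the repository's actual code.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator


def create_ingestion_dag(entity, schema_fields, bucket):
    """Builds one ingestion DAG (e.g. scorer_data_to_gc) from an entity name and schema."""
    dag_id = f"{entity}_data_to_gc"
    dag = DAG(dag_id, schedule_interval=None,
              start_date=datetime(2019, 1, 1), catchup=False)

    # Block until new CSV files appear under the entity's ingestion prefix.
    wait_for_data = GoogleCloudStoragePrefixSensor(
        task_id="wait_for_data", bucket=bucket, prefix=f"{entity}/", dag=dag)

    # 1. Copy the new files into a staging table in BigQuery.
    stage_data = GoogleCloudStorageToBigQueryOperator(
        task_id="stage_data",
        bucket=bucket,
        source_objects=[f"{entity}/*.csv"],
        destination_project_dataset_table=f"dwh.{entity}_staging",
        schema_fields=schema_fields,
        write_disposition="WRITE_TRUNCATE",
        skip_leading_rows=1,
        dag=dag)

    # 2. MERGE the staging table into the warehouse table (new records only).
    merge_data = BigQueryOperator(
        task_id="merge_data",
        sql=f"sql/merge_{entity}.sql",
        use_legacy_sql=False,
        dag=dag)

    # 3. Move the processed files out of the ingestion prefix.
    mark_processed = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id="mark_processed",
        source_bucket=bucket,
        source_object=f"{entity}/*.csv",
        destination_bucket=bucket,
        destination_object=f"processed/{entity}/",
        move_object=True,
        dag=dag)

    # 4. Re-trigger this DAG so the sensor starts listening again.
    rerun = TriggerDagRunOperator(task_id="rerun", trigger_dag_id=dag_id, dag=dag)

    wait_for_data >> stage_data >> merge_data >> mark_processed >> rerun
    return dag
```

Because the factory only needs an entity name and a schema, adding a new data source boils down to adding an entry to the configuration file.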

These three steps are essentially the same for almost all data warehousing pipelines, regardless of the use case. Whether you want to store sales data for your analytics team or warehouse sensor data from your IoT cluster, you can reuse this exact Airflow DAG and change only some table names and schemata.


Performant Data Access and Dashboarding in BigQuery

Once we have ingested scorer and matchday data into our warehouse tables in BigQuery, we can create views to gain live insights from our data, safely share these views with other teams, and use BigQuery views and tables as the basis for building reports and dashboards using Data Studio.

As an example, the following block of SQL code creates a view of match results by grouping the reported goals that belong to the same match.

The SQL query that populates the match result view in BigQuery
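The embedded query itself is not shown above, so here is a minimal sketch of a view definition in that spirit, created via the BigQuery Python client. The project, dataset, and column names (matchweek, home_team, away_team, scoring_team) are assumptions about the example schema, not the actual query from the repository.

```python
# Sketch: a match_result view that groups goals belonging to the same match.
# Project, dataset, and column names are illustrative assumptions.
from google.cloud import bigquery

client = bigquery.Client(project="your-dwh-project")

view = bigquery.Table("your-dwh-project.premier_league.match_result")
view.view_query = """
    SELECT
      matchweek,
      home_team,
      away_team,
      COUNTIF(scoring_team = home_team) AS home_goals,
      COUNTIF(scoring_team = away_team) AS away_goals
    FROM `your-dwh-project.premier_league.scorer`
    GROUP BY matchweek, home_team, away_team
"""
client.create_table(view, exists_ok=True)
```

Corner cases such as own goals or postponed matches are ignored in this simplified version.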

Building on this SQL, a live league table or top scorer statistics can easily be implemented as views. A screenshot of this view after processing data for the entire 2018/19 season can be found here.
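To illustrate the idea, a league table view could be stacked on top of the match results roughly as follows. Again, the table and column names are assumptions, and the ranking tie-breakers are simplified.

```python
# Sketch: SQL for a league table view derived from match_result
# (3 points per win, 1 per draw; all names are illustrative assumptions).
LEAGUE_TABLE_SQL = """
    WITH outcomes AS (
      SELECT home_team AS team,
             IF(home_goals > away_goals, 3, IF(home_goals = away_goals, 1, 0)) AS points,
             home_goals AS goals_for, away_goals AS goals_against
      FROM `your-dwh-project.premier_league.match_result`
      UNION ALL
      SELECT away_team,
             IF(away_goals > home_goals, 3, IF(away_goals = home_goals, 1, 0)),
             away_goals, home_goals
      FROM `your-dwh-project.premier_league.match_result`
    )
    SELECT team,
           COUNT(*) AS played,
           SUM(points) AS points,
           SUM(goals_for) - SUM(goals_against) AS goal_difference
    FROM outcomes
    GROUP BY team
    ORDER BY points DESC, goal_difference DESC
"""
```

Registering this query as a view works exactly like in the previous snippet.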

BigQuery BI Engine, which is especially useful for time-sensitive queries on very large data sets, enables highly concurrent query execution on BigQuery views and tables for dashboards built with Data Studio. This allows analysts to build highly responsive, interactive dashboards and reports on top of your warehouse data.


Reprocessing

Reprocessing input data is a common task during the development of your pipeline, but it may also be necessary after identifying and fixing bugs in a production system or before a demo to customers. Generally, it is a good idea to be able to recreate both your warehouse data and your DWH setup itself from scratch.

The DWH pipeline implemented by the aforementioned Airflow workflow is idempotent: you can delete parts or all of the warehouse data in BigQuery, then reprocess all input data by moving the contents of your Cloud Storage bucket’s `processed` directory back to its parent directory, and the pipeline will recreate the exact same state of the DWH’s contents.
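The MERGE step is what makes this work: on a re-run, records that already exist in the warehouse table are matched and therefore skipped. The actual statement lives in the repository; a simplified version for the scorer entity could look like the following sketch, where the table names and the match key are assumptions.

```python
# Sketch of the MERGE that keeps re-runs free of duplicates. Table names and
# the match key (matchweek, player, minute) are illustrative assumptions.
MERGE_SCORER_SQL = """
    MERGE `your-dwh-project.premier_league.scorer` AS dwh
    USING `your-dwh-project.premier_league.scorer_staging` AS staging
    ON  dwh.matchweek = staging.matchweek
    AND dwh.player    = staging.player
    AND dwh.minute    = staging.minute
    WHEN NOT MATCHED THEN
      INSERT ROW
"""
```

Since rows that are already present never reach the WHEN NOT MATCHED branch, replaying old input files or overlapping CSVs cannot introduce duplicates.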

In addition to data reprocessing, the repository for this example project contains an initialization script that sets up the DWH in a new Google Cloud Composer environment and deploys all warehousing pipelines from scratch. This allows us to deploy a production system with exactly the same configuration as the one used during development or in a testing environment, by executing one script.


Deploying the Warehouse

Prerequisites

If you want to run the setup script from your local machine, you may need to install some prerequisite tools. However, you can skip this step by running the contents of the initialization script from the Google Cloud Shell. If you want to do the latter, skip to Environment Initialization.

To recreate the example scenario and deploy the data warehouse to the GCP, you’ll need a few prerequisite tools installed on your local development machine: the Google Cloud SDK (including the kubectl component) and jq.

You can find instructions on how to install the Google Cloud SDK for your platform here. If you already have the SDK installed, update all components and, either way, make sure the kubectl component is installed.

jq ships with most Linux distributions but may need to be installed on macOS, either via Homebrew or by downloading the binary from the jq website.

Google Cloud Platform Billing Account

We are going to create a new GCP project for the deployment of the data warehouse and warehousing pipelines. Some of the services and resources used in this setup require you to connect the project with a billing account. Of course, you are free to use any billing account you already have (remember to shut down the deployment afterwards, though), but if you are new to the GCP, you can activate the free tier for your Google account, which includes a 12-month free trial with $300 of credit.

Environment Initialization

If you did not install the prerequisite tools on your local computer, you can check out the project repository and run all initialization and deployment commands in the Google Cloud Shell. To do so, log into the Google Cloud Console using your Google account credentials and click the Cloud Shell button (>_) in the navigation bar.

Start by cloning our GitHub repository to your machine and changing into the project’s root directory.

Now run the initialization and deployment script located at ./scripts/google_init.sh to deploy the Airflow workflows to Google Cloud Composer in a new environment. You will be prompted to enter your organization and billing account IDs. While the organization ID is optional, the billing account ID is required. Before each prompt, you will see a list of all organizations and billing accounts available to you, so you can simply copy and paste the IDs from the respective list.

The initialization script does most of the heavy lifting and makes setting up a new DWH project a breeze

The script will then set up a new project, link it to your provided billing account, activate Cloud Composer, Cloud Storage, and BigQuery, and deploy your Airflow DAGs for data ingestion. This can take up to 20 minutes.

Take note of the ID of the newly created project, which is printed after the setup completes. You will need it to upload data to the associated Cloud Storage bucket and to access the Airflow web UI and logs.

Head to Google Cloud Composer, where you should see a list of environments containing at least one environment that has just been created for the new project. If you see a green checkmark to the left of the environment’s name, setup and deployment are complete. You can find a link to the Airflow web UI in the Airflow Webserver column. Clicking it will land you on the overview page for the DAGs, or workflows, that were deployed during setup. The matchweek_data_to_gc and scorer_data_to_gc DAGs are the ingestion workflows that we discussed in some detail above.

Apache Airflow DAGs overview. The highlighted “Play” buttons manually trigger the corresponding DAGs, which is necessary after the workflows have been deployed for the first time.

Let’s upload some data to Google Cloud Storage to test our data ingestion DAGs and see whether scored goals and matches are correctly copied to BigQuery. We provide a script that handles the data upload, so, back on your local machine or in the Cloud Shell, run google_upload_data.sh.

The provided data upload script requires only your Cloud Platform project’s ID

As the script is running, you will see the *_to_gc DAGs jump into action in the Airflow web UI. Clicking on one of their names will even show you which of the DAG’s operators is currently running (select Graph View for the visualization depicted below). This view also shows which operator was responsible for a failed run.

Apache Airflow Web UI, Graph View visualization of the matchweek_data_to_gc DAG

Finally, after all of our data has successfully been loaded into BigQuery, head to the BigQuery UI to investigate the created tables and views. Select any view or table, click the Query View/Table button, complete the SQL query in the Query editor with a *, and hit Run to have a look at its contents.
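If you prefer to check the contents programmatically instead, a short query with the BigQuery Python client does the same; the project ID below is a placeholder for the one printed by the setup script, and the dataset name is an assumption.

```python
# Sketch: inspect the league_table view from Python instead of the BigQuery UI.
from google.cloud import bigquery

client = bigquery.Client(project="your-dwh-project")  # project id printed during setup
for row in client.query("SELECT * FROM `premier_league.league_table` LIMIT 25").result():
    print(dict(row))
```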

Comparing the Data

The league_table view shows the overall standings based on the data ingested so far. With that, we can compare our results with the official data.

The overall standings computed from the Premier League data

Exporting the Data for Analysis

Hitting the Export button allows you to open the resource in Data Studio to build live dashboards and reports based on the tables and views in BigQuery.

Google BigQuery UI after project setup, with the newly created resources and the query and export buttons highlighted

To recap: So far, we have integrated scorer and matchday data from Google Cloud Storage into BigQuery using our Apache Airflow DAGs. We are now ready to build interactive reports using Data Studio.

Navigate to the Data Studio reports page, create a new report (+), and choose a BigQuery table or view as the data source. In our case, we chose the scorer table to create a simple, interactive report on the top scorers of the season and on how the total number of goals developed over time. Feel free to play around with the different views and tables created by the project as data sources, and with the variety of visualizations available in Data Studio.

Simple Data Studio report on total goals scored per player based on the BigQuery scorer table

Remember to shut down your resources

To avoid incurring unnecessary costs or eating up your GCP trial budget, make sure to shut down all resources in the newly created environment. The easiest way to do so is to delete the entire project and to verify that the deletion was successful.

Deleting the Google Cloud Platform project

Conclusion

Google Cloud Composer is Apache Airflow as a service. It lets you incorporate other Google Cloud Platform services into your tasks and workflows quickly and simply, delivering functionality that is bigger than each individual component. We use Cloud Composer to manage ELT and DWH pipelines in the cloud while still having access to all the perks of Apache Airflow: testability, monitoring, and task retries, just to name a few. In this blog post, we shared an example use case that showcases not only the ease of setting up a warehousing pipeline with Cloud Composer but also some of the capabilities you unlock by building a cloud-based data warehouse on BigQuery.


Thanks to Frederic Schneider, Lawrence Benson, Philipp Schirmer, Christoph Böhm, and Sven Lehmann.
