Airflow for Google Cloud: Part 1 — BigQuery

You know Big Data… it’s a dirty business. All the literature shows you how powerful all those data crunchers and query engines are, but it all assumes that all the data is ready to be consumed. In reality a lot of automated Dataflow, Spark and BigQuery ETL processes are glued together with bash or Python.

Well it’s time to change that… and to take a look at Apache Airflow. Airflow is a workflow engine that makes sure all your transform, crunch and query jobs run at the correct time, in the correct order, and only when the data they need is ready for consumption. No more writing a lot of fragile boilerplate code to schedule, retry and wait: just focus on the workflow.

A lot of work was poured into Airflow in 2016 to make it a first-class workflow engine for Google Cloud. This Medium series will explain how you can use Airflow to automate a lot of Google Cloud products and make your data transition smoothly from one engine into another. I won’t dive into how to construct DAGs in Airflow; you should read the docs for that. Basically all DAGs are built up through Python objects… I will be focusing on the Google Cloud integration.

BigQuery integration

In this first part we’ll explain how you can automate BigQuery tasks from Airflow.

Note: The series talks about the upcoming Airflow 1.8, so make sure you have the latest version. You can now pip install airflow.

BigQuery is very popular for interactively querying very large datasets, but I also love to use it for storing a lot of temporary data. I prefer it over files on Cloud Storage because you can run ad-hoc exploratory queries on it. So let’s get started using Airflow to get data in and out of BigQuery.

Queries

The first BigQuery integration is executing a query and storing the output in a new table. This is done with the BigQueryOperator. The operator takes a query (or a reference to a query file) and an output table.
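As a rough sketch of what such a task can look like: the task id, query file, dataset and table names below are made up for illustration, and the argument names and module path follow the Airflow 1.8 contrib operator as I understand it, so check them against the version you have installed.

```python
# Keyword arguments for a query task. All names (task id, query file,
# dataset and table) are hypothetical placeholders.
BQ_QUERY_KWARGS = {
    "task_id": "bq_daily_metrics",
    # A path ending in .sql is read as a query file; an inline query
    # string works here too.
    "bql": "queries/daily_metrics.sql",
    # Templated: Airflow fills in {{ ds_nodash }} when the task runs.
    "destination_dataset_table": "my_dataset.daily_metrics_{{ ds_nodash }}",
    "write_disposition": "WRITE_TRUNCATE",
    "allow_large_results": True,
}


def build_query_task(dag):
    """Attach the query task to an existing DAG object."""
    # Imported lazily so this sketch can be loaded without Airflow installed;
    # the module path is assumed to be the Airflow 1.8 contrib location.
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    return BigQueryOperator(dag=dag, **BQ_QUERY_KWARGS)
```

Calling build_query_task(dag) inside your DAG file would register the task on that DAG.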

If you look at the destination_dataset_table you will notice the template parameter. Airflow uses the power of jinja templates to make your workflow more dynamic and context aware. In our example it will fill in ds_nodash with the current execution_date. The execution date in Airflow is the contextual date of your data. For example, if you are calculating some metrics for the 4th of July, the execution_date will be 2017-07-04T00:00:00+00:00.
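To make the date handling concrete, here is a small standard-library sketch of how the ds and ds_nodash values relate to an execution date. It does not call Airflow; the table name is a hypothetical placeholder.

```python
from datetime import datetime

# The execution date from the example in the text: the 4th of July.
execution_date = datetime(2017, 7, 4)

# Two renderings of the execution date, matching the ds and ds_nodash
# template variables.
ds = execution_date.strftime("%Y-%m-%d")
ds_nodash = execution_date.strftime("%Y%m%d")

# Hypothetical table name using the date as a suffix.
table = "my_dataset.daily_metrics_" + ds_nodash
print(ds, ds_nodash, table)
```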

You can even use jinja templates in your referenced queries. This is a powerful tool because in general the BigQuery API doesn’t support parameters, but with the template engine you can simulate this.
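The idea can be illustrated without Airflow. The sketch below uses a tiny regex-based stand-in for the template engine (it is not jinja and only handles simple placeholders), and the query, table and column names are invented for the example. In a real DAG, Airflow renders the query file for you.

```python
import re

# Hypothetical contents of a query file; table and column names are made up.
QUERY_TEMPLATE = """
SELECT user_id, COUNT(*) AS events
FROM [my_dataset.raw_events_{{ ds_nodash }}]
WHERE country = '{{ params.country }}'
GROUP BY user_id
"""


def render(template, context):
    """Replace {{ name }} and {{ a.b }} placeholders with values from context."""
    def lookup(match):
        value = context
        # Walk dotted names such as params.country through nested dicts.
        for part in match.group(1).split("."):
            value = value[part]
        return str(value)

    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", lookup, template)


sql = render(QUERY_TEMPLATE, {"ds_nodash": "20170704", "params": {"country": "BE"}})
print(sql)
```

The rendered query reads from the date-suffixed table for that run and filters on the supplied value, which is how a template can stand in for a query parameter.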

For a full list of what’s possible and what kind of parameters are available in the context, have a look at the Airflow macro documentation.

In our example the above query file lives on the filesystem next to the example DAG.

Extract to storage

Next to querying you want to get your data in and out of BigQuery. Let’s start with extracting to Cloud Storage.

Extracting is a very common scenario and is done with the BigQueryToCloudStorageOperator. People familiar with the BigQuery API will recognise a lot of the parameters, but basically you have to specify the source table and the destination storage object pattern. It’s important that you specify a pattern though (example: …/part-*.avro). The pattern makes sure that a huge table is split over multiple storage objects per extract.
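A sketch of an extract task under the same caveats as before: the task id, Airflow variable names (bq_dataset, gcs_bucket) and object paths are hypothetical, and the argument names and module path are those of the Airflow 1.8 contrib operator as far as I know.

```python
# Keyword arguments for an extract task. The Airflow variables bq_dataset
# and gcs_bucket are hypothetical and would have to be defined in Airflow.
BQ_EXTRACT_KWARGS = {
    "task_id": "bq_extract_daily_metrics",
    "source_project_dataset_table": (
        "{{ var.value.bq_dataset }}.daily_metrics_{{ ds_nodash }}"
    ),
    # The * wildcard lets BigQuery write several objects for a large table.
    "destination_cloud_storage_uris": [
        "gs://{{ var.value.gcs_bucket }}/daily_metrics/{{ ds_nodash }}/part-*.avro"
    ],
    "export_format": "AVRO",
}


def build_extract_task(dag):
    """Attach the extract task to an existing DAG object."""
    # Lazy import so the sketch loads without Airflow installed; the module
    # path is assumed to be the Airflow 1.8 contrib location.
    from airflow.contrib.operators.bigquery_to_gcs import (
        BigQueryToCloudStorageOperator,
    )

    return BigQueryToCloudStorageOperator(dag=dag, **BQ_EXTRACT_KWARGS)
```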

Notice again the use of the jinja template parameter. In this case we read variables defined in Airflow, very useful for differentiating between different environments like production or test.

Load from storage

But before doing queries and extraction you need to get the data into BigQuery. That’s done with the GoogleCloudStorageToBigQueryOperator.

Compared to extraction, the load operator has a few more parameters. The reason is that you need to give BigQuery a bit of metadata about the imported objects, like the schema and the format. You do this by specifying schema_fields and source_format.
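For illustration, a load task could be described like this. The bucket variable, object paths, table and schema are invented, and the argument names and module path are my reading of the Airflow 1.8 contrib operator, so treat it as a sketch rather than a reference.

```python
# Keyword arguments for a load task. Variable names, paths, table and
# schema are hypothetical placeholders.
GCS_LOAD_KWARGS = {
    "task_id": "gcs_load_raw_events",
    "bucket": "{{ var.value.gcs_bucket }}",
    "source_objects": ["raw_events/{{ ds_nodash }}/part-*.json"],
    "destination_project_dataset_table": (
        "{{ var.value.bq_dataset }}.raw_events_{{ ds_nodash }}"
    ),
    # Metadata BigQuery needs about the imported objects.
    "source_format": "NEWLINE_DELIMITED_JSON",
    "schema_fields": [
        {"name": "user_id", "type": "STRING", "mode": "REQUIRED"},
        {"name": "event", "type": "STRING", "mode": "NULLABLE"},
        {"name": "created_at", "type": "TIMESTAMP", "mode": "NULLABLE"},
    ],
}


def build_load_task(dag):
    """Attach the load task to an existing DAG object."""
    # Lazy import so the sketch loads without Airflow installed; the module
    # path is assumed to be the Airflow 1.8 contrib location.
    from airflow.contrib.operators.gcs_to_bq import (
        GoogleCloudStorageToBigQueryOperator,
    )

    return GoogleCloudStorageToBigQueryOperator(dag=dag, **GCS_LOAD_KWARGS)
```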

Two other important parameters to specify are create_disposition and write_disposition. I like my operations repeatable so you can run them again and again, so CREATE_IF_NEEDED and WRITE_TRUNCATE are good defaults. Just make sure your tables can handle it, so use partitioned tables. For now you’ll have to use the classic partitioning with the date suffixed in the name. In a later release of Airflow we will add full support for the new BigQuery partitioning.
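Why this combination makes reruns safe can be shown with a toy model. The load function below is not BigQuery or Airflow code, just a dict-based imitation of the two write dispositions, and the table name is a hypothetical date-suffixed table.

```python
# The two dispositions recommended in the text, as operator arguments.
LOAD_DISPOSITIONS = {
    "create_disposition": "CREATE_IF_NEEDED",
    "write_disposition": "WRITE_TRUNCATE",
}


def load(tables, table_name, rows, write_disposition):
    """Toy model of a load job writing rows into a dict of tables."""
    if write_disposition == "WRITE_TRUNCATE":
        # Replace whatever the table held before.
        tables[table_name] = list(rows)
    elif write_disposition == "WRITE_APPEND":
        # Add the rows to what is already there.
        tables.setdefault(table_name, []).extend(rows)
    else:
        raise ValueError("unsupported disposition: " + write_disposition)


rows = [{"user_id": "a"}, {"user_id": "b"}]

truncated, appended = {}, {}
for _ in range(2):  # run the same day's load twice, as a rerun would
    load(truncated, "raw_events_20170704", rows,
         LOAD_DISPOSITIONS["write_disposition"])
    load(appended, "raw_events_20170704", rows, "WRITE_APPEND")

print(len(truncated["raw_events_20170704"]))  # 2: the rerun replaced the data
print(len(appended["raw_events_20170704"]))   # 4: the rerun duplicated the data
```

Because each day writes to its own date-suffixed table, truncating on a rerun only ever replaces that one day’s data.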

Next in this series we’ll have a look at Airflow driving Dataproc. See you then.


The examples are taken from the Airflow for Google Cloud integration tests and examples: https://github.com/alexvanboxel/airflow-gcp-examples