Cloud Composer/Apache Airflow, Dataform & BigQuery

Usman Ali
Google Cloud - Community
4 min read · Mar 28, 2021

How to use Cloud Composer and Dataform together

Disclaimer: Opinions are my own and not the views of my employer

Cloud Composer is Google’s fully managed version of Apache Airflow and is ideal for writing, scheduling and monitoring workflows. Google recently acquired Dataform, which focuses on the Transform step in ELT (Extract, Load, Transform). This is great news for BigQuery lovers, as Dataform can help manage scripts, create dependencies, add data quality checks (aka assertions), document, maintain versions and push changes into different environments with native Git integration. It offers a UI, a CLI and APIs to do all this. In short, it’s awesome!

So if you are using, or planning to use, BigQuery as your DWH and Cloud Composer as your main orchestration tool, and you are fond of ELT, then you have come to the right place. I will showcase how these products can talk to each other based on a typical use case as below:

As shown above, Composer is the main orchestrator of all this, plus it handles simple loading into BigQuery using the gcs_to_bq operator (which can be replaced by any other EL(T) solution as well). Transformation logic is kept in SQLX in Dataform. There are plenty of good articles on Dataform already, so I am using one of them, created by Lak, which solves a BigQuery ML use case using Dataform:

Dataform has its own scheduler too, but normally you want to run your transformation after loading is done, and for that sort of dependency chaining across different applications, Composer comes to the rescue again.

So let’s cut to the chase and get to the code:

Waiting for a file in GCS

Our transformation needs input data to be loaded into BigQuery. Assume that this data will arrive in GCS via an external job and we need to wait for it. For this we can use the GoogleCloudStorageObjectSensor as below:

from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor

# Wait until the CSV file lands in the GCS bucket
gcs_sensor_task = GoogleCloudStorageObjectSensor(
    task_id='gcs_object_sensor',
    bucket='gcs-bucket',
    object='test-files/census_adult_income.csv',
    dag=dag
)

Load from GCS to BigQuery

Once the data has arrived in GCS, we can use the GoogleCloudStorageToBigQueryOperator to load it into BigQuery with the code below. You can alternatively specify the schema in a JSON file as well (see the sketch after this block); more details here.

from airflow.contrib.operators import gcs_to_bq

# Load the CSV from GCS into a BigQuery table, replacing any existing data
load_gcs_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_load',
    bucket='gcs-bucket',
    source_objects=['test-files/census_adult_income.csv'],
    destination_project_dataset_table='gcs-bucket.ds.census_adult_income',
    schema_fields=[
        {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'workclass', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'functional_weight', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'education', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'education_num', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'marital_status', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'occupation', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'relationship', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'race', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'sex', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'capital_gain', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'capital_loss', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'hours_per_week', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'native_country', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'income_bracket', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    skip_leading_rows=1,
    write_disposition='WRITE_TRUNCATE',
    dag=dag)
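
If you would rather keep the schema out of the DAG file, the same operator also accepts a schema_object parameter pointing to a JSON schema file stored in GCS. A minimal sketch, assuming a hypothetical schema file test-files/census_schema.json in the same bucket:

# Sketch: load using a JSON schema file in GCS instead of inline schema_fields.
# The schema file path below is an assumption for illustration.
load_gcs_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_load',
    bucket='gcs-bucket',
    source_objects=['test-files/census_adult_income.csv'],
    destination_project_dataset_table='gcs-bucket.ds.census_adult_income',
    schema_object='test-files/census_schema.json',
    skip_leading_rows=1,
    write_disposition='WRITE_TRUNCATE',
    dag=dag)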

Call Dataform for Transformation

The data loaded by the code above into the BigQuery table ‘census_adult_income’ is an input to the transformations in Dataform, so it’s time to run the Dataform job. As of now there isn’t a Dataform operator in Composer, but we can use a REST API call via PythonOperator to trigger the job and poll for it to finish. More details on the Dataform APIs here.

Although you are going to use Composer for orchestration, you still need to create a schedule in Dataform for the REST API call. You can disable that schedule as shown below:

The REST API also needs a project ID (not to be confused with the GCP project ID). I couldn’t find a place in the Dataform UI that mentions it explicitly, however it is visible in the URL as highlighted below, so copy it from there (and I will update here if I find a better way!).

You can create a Dataform API key as below:

And make the call:

from airflow.operators.python_operator import PythonOperator
import requests
import json
import time


def dataform_def():
    # Trigger a Dataform run for the given environment and schedule
    base_url = 'https://api.dataform.co/v1/project/<Project Id here>/run'
    headers = {'Authorization': 'Bearer <Dataform Key>'}
    run_create_request = {"environmentName": "<environment name here>", "scheduleName": "<schedule name from dataform here>"}
    response = requests.post(base_url, data=json.dumps(run_create_request), headers=headers)

    # Poll the run until it is no longer RUNNING
    run_url = base_url + '/' + response.json()['id']
    response = requests.get(run_url, headers=headers)
    while response.json()['status'] == 'RUNNING':
        time.sleep(10)
        print('Dataform job running')
        response = requests.get(run_url, headers=headers)
    print(response.json())
    return 'Dataform job finished'


run_dataform = PythonOperator(
    task_id='dataform_transform',
    python_callable=dataform_def,
    dag=dag,
)
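
One caveat with the callable above: if the Dataform run ends in a failed state, the PythonOperator still returns successfully. A minimal sketch of a stricter check, placed just before the return in dataform_def (the exact terminal status value is an assumption; check the Dataform API docs for the statuses your project reports):

# Sketch: fail the Airflow task if the Dataform run did not finish successfully.
# 'SUCCESSFUL' as the terminal status value is an assumption for illustration.
final_status = response.json()['status']
if final_status != 'SUCCESSFUL':
    raise ValueError('Dataform run finished with status: ' + final_status)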

Stitching it together

The last part is to chain it all together in a Composer DAG:

gcs_sensor_task >> load_gcs_csv >> run_dataform
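
All the tasks above reference a shared dag object that the snippets assume already exists. A minimal sketch of that DAG definition (the DAG ID, start date and schedule below are assumptions for illustration); note that in the actual DAG file this definition has to appear before the operators that reference it:

from datetime import datetime
from airflow import DAG

# Assumed DAG definition; adjust the ID, start_date and schedule to your needs
dag = DAG(
    dag_id='census_elt_dag',
    start_date=datetime(2021, 3, 1),
    schedule_interval='@daily',
    catchup=False,
)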

Move your DAG into the Composer bucket:

gsutil cp census_elt_dag.py gs://composer-bucket/dags

Composer will automatically pick up this new DAG:

You can schedule and monitor it directly from Composer.

That’s it! Ping me with any questions or suggestions for improvement.
