Airflow meets bigquery-to-datastore

In a previous article, I described a tool to export a BigQuery table to Google Datastore. Of course, the tool is helpful on its own when you build a new version of a data product from the CLI.

In this article, I will describe how Airflow can enhance bigquery-to-datastore by turning it into a scheduled job. I would like to give you a hint for using bigquery-to-datastore in production.

What is bigquery-to-datastore

bigquery-to-datastore is a tool to export a BigQuery table to Google Datastore. We don't have to worry about the table schema; we just execute a single command.

In this regard, I think we can treat BigQuery not only as a data analysis platform, but also as a distributed processing platform. As you know, BigQuery is super fast, has a bunch of convenient functions, and is easy to learn. Many people can join in developing scalable data products.

BigQuery with Airflow

As you probably know, Apache Airflow has various operators related to BigQuery: BigQueryOperator executes BigQuery SQL queries and allows us to store a query result in a BigQuery table; BigQueryTableDeleteOperator deletes BigQuery tables; BigQueryToBigQueryOperator copies one BigQuery table to another; and BigQueryValueCheckOperator and BigQueryIntervalCheckOperator check the result of a BigQuery SQL query. That's why we basically don't need to implement any BigQuery functions ourselves in Airflow anymore.

Airflow meets bigquery-to-datastore

As I described above, we can operate BigQuery entirely from Airflow. We are able to build scheduled jobs, making the most of Airflow's BigQuery operators and bigquery-to-datastore. I will give you an example of how to use them together.

Now, consider that we would like to provide users with stats pages on a blog site like Medium. A stats page shows the number of users who read a page each day. To do that, we must calculate the stats and then store the result in some data store.
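Before wiring anything into Airflow, it helps to see what "the number of users who read a page by day" means concretely. Here is a minimal pure-Python sketch of that aggregation, using hypothetical event rows (the field names mirror the event_log table used in the SQL query of this article):

```python
from collections import defaultdict
from datetime import date

# Hypothetical event log rows: (page, event date, user_id)
events = [
    ("/post/airflow", date(2017, 11, 1), "alice"),
    ("/post/airflow", date(2017, 11, 1), "alice"),  # same user read twice
    ("/post/airflow", date(2017, 11, 1), "bob"),
    ("/post/airflow", date(2017, 11, 2), "alice"),
]

# Collect the distinct users per (page, day), like COUNT(DISTINCT user_id)
distinct_users = defaultdict(set)
for page, dt, user_id in events:
    distinct_users[(page, dt)].add(user_id)

# Final stats: number of distinct readers per page per day
stats = {key: len(users) for key, users in distinct_users.items()}
print(stats)
# e.g. {('/post/airflow', date(2017, 11, 1)): 2, ('/post/airflow', date(2017, 11, 2)): 1}
```

The SQL query later in this article expresses exactly this logic, but BigQuery runs it at scale over the whole event log.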

Directed Acyclic Graph with Airflow

In the example DAG (directed acyclic graph), there are three tasks. First, a BigQueryTableDeleteOperator task deletes the BigQuery table if it already exists. Unfortunately, BigQueryOperator under Airflow 1.8.2 doesn't support a write disposition such as WRITE_TRUNCATE. Hence, we can't overwrite a table directly by executing a query, which is why we need to delete the old table beforehand, just in case. Secondly, a BigQueryOperator task stores the result of a query in another table. Finally, a BashOperator task executes bigquery-to-datastore to transfer the table to Google Datastore.

# Delete the BigQuery table, just in case
delete_table_task = BigQueryTableDeleteOperator(
    dag=dag,
    task_id='delete_table',
    deletion_dataset_table="stats.page_views",
    bigquery_conn_id='google_cloud_default',
    ignore_if_missing=True,
)

where the deletion_dataset_table option specifies the target BigQuery table we would like to delete, and the ignore_if_missing option makes the task succeed even when the target table doesn't exist.

query = """
SELECT
  page,
  DATE(event_time) AS dt,
  COUNT(DISTINCT user_id) AS users
FROM event_log
WHERE DATE_DIFF(CURRENT_DATE(), DATE(event_time), DAY) <= 30
GROUP BY 1, 2
"""
insert_table_task = BigQueryOperator(
    dag=dag,
    task_id='insert_table',
    bigquery_conn_id='google_cloud_default',
    bql=query,
    destination_dataset_table="stats.page_views",
    allow_large_results=True,
    use_legacy_sql=False)

where destination_dataset_table specifies the BigQuery table in which we would like to store the query result, and use_legacy_sql=False means the query is executed as standard SQL.

bash_command = """
java -cp /var/lib/dataflow/bigquery-to-datastore-bundled.jar \
  com.github.yuiskw.beam.BigQuery2Datastore \
  --project=YOUR_GCP_PROJECT \
  --runner=DataflowRunner \
  --gcpTempLocation=gs://dataflow/dataflow-staging/ \
  --tempLocation=gs://dataflow/dataflow-staging/ \
  --inputBigQueryDataset=stats \
  --inputBigQueryTable=page_views \
  --outputDatastoreNamespace=stats \
  --outputDatastoreKind=PageViews \
  --keyColumn=page \
  --workerMachineType=n1-standard-4 \
  --maxNumWorkers=5
"""
bigquery_to_datastore_task = BashOperator(
    dag=dag,
    task_id="bigquery_to_datastore",
    bash_command=bash_command,
)
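Such a long option list is easy to mistype, especially since every line inside bash_command needs a trailing backslash. As a hypothetical alternative (this helper is my own sketch, not part of bigquery-to-datastore), the command string can be assembled from a dict of options:

```python
def build_b2d_command(jar_path, options):
    """Assemble the bigquery-to-datastore java command from an option dict."""
    parts = [
        "java -cp %s" % jar_path,
        "com.github.yuiskw.beam.BigQuery2Datastore",
    ]
    # Render each option as --key=value, sorted for a stable command string
    parts += ["--%s=%s" % (k, v) for k, v in sorted(options.items())]
    # Join with backslash-newline so bash treats it as one command
    return " \\\n  ".join(parts)

options = {
    "project": "YOUR_GCP_PROJECT",
    "runner": "DataflowRunner",
    "inputBigQueryDataset": "stats",
    "inputBigQueryTable": "page_views",
    "outputDatastoreNamespace": "stats",
    "outputDatastoreKind": "PageViews",
    "keyColumn": "page",
}
bash_command = build_b2d_command(
    "/var/lib/dataflow/bigquery-to-datastore-bundled.jar", options)
print(bash_command)
```

The resulting string can be passed to BashOperator exactly like the hand-written bash_command above, and adding or removing an option becomes a one-line change.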

Finally, we just define the dependencies between those tasks.

delete_table_task.set_downstream(insert_table_task)
insert_table_task.set_downstream(bigquery_to_datastore_task)

Conclusion

In this article, I described how to combine Airflow with bigquery-to-datastore. We are able to build scalable data products super easily using Airflow and bigquery-to-datastore. I know it cannot satisfy everything you want. However, it allows people who are not so familiar with big data products to build scalable data products.

We don't have to do everything with machine learning. If you can build something without machine learning, that is often better. In such a case, I am sure what I described in this article will be helpful.