Airflow meets bigquery-to-datastore
In a previous article, I described a tool to export a BigQuery table to Google Datastore. Of course, the tool is helpful when you prototype a new version of a data product from the CLI.
In this article, I will describe how Airflow can enhance bigquery-to-datastore by turning it into a scheduled job. I would like to give you a hint for using bigquery-to-datastore in production.
bigquery-to-datastore is a tool to export a BigQuery table to Google Datastore. We don't have to worry about the table schema; we just execute one command.
bigquery-to-datastore - Export a whole BigQuery table to Google Datastore with Apache Beam/Google Dataflow (github.com)
In that sense, I think we can treat BigQuery not only as a data analysis platform, but also as a distributed processing platform. As you know, BigQuery is super fast, has a bunch of convenient functions, and is easy to learn. Many people can join in developing scalable data products.
BigQuery with Airflow
As you probably know, Apache Airflow has various operators related to BigQuery:
BigQueryOperator executes BigQuery SQL queries and allows us to store the result of a query in a BigQuery table;
BigQueryTableDeleteOperator deletes BigQuery tables;
BigQueryToBigQueryOperator copies one BigQuery table to another; as well as
BigQueryIntervalCheckOperator checks the result of a BigQuery SQL query against past results. That's why we basically don't need to implement any BigQuery functions ourselves in Airflow any more.
As I described above, we can operate BigQuery entirely from Airflow. We can build scheduled jobs that make the most of Airflow's BigQuery operators and bigquery-to-datastore. Let me give you an example.
Now, consider that we would like to give users stats pages on a blog site like Medium. A stats page shows the number of users who read a page per day. To do that, we must calculate the stats and then store the result in some data store.
In the example DAG (directed acyclic graph), there are three tasks. First, a task of BigQueryTableDeleteOperator deletes a BigQuery table if one already exists. Unfortunately, BigQueryOperator under Airflow 1.8.2 doesn't support a write disposition such as WRITE_TRUNCATE. Hence, we can't overwrite a table by executing a query directly. That's why we need to delete the table beforehand, just in case. Secondly, a task of BigQueryOperator stores the result of a query into another table. Finally, a task executes bigquery-to-datastore to transfer the table to Google Datastore.
# Delete the BigQuery table just in case it already exists.
# The table name is a placeholder.
delete_table_task = BigQueryTableDeleteOperator(
    task_id='delete_table',
    deletion_dataset_table='myproject.mydataset.user_stats',
    ignore_if_missing=True,
    dag=dag)
The deletion_dataset_table option specifies the target BigQuery table we would like to delete, and the ignore_if_missing option makes the task succeed even when the target table doesn't exist.
query = """
SELECT
  DATE(event_time) AS dt,
  COUNT(DISTINCT user_id) AS users
FROM `myproject.mydataset.page_view_events`  -- placeholder source table
WHERE DATE_DIFF(CURRENT_DATE(), DATE(event_time), DAY) <= 30
GROUP BY 1
"""

insert_table_task = BigQueryOperator(
    task_id='insert_table',
    bql=query,
    destination_dataset_table='myproject.mydataset.user_stats',  # placeholder
    use_legacy_sql=False,
    dag=dag)
destination_dataset_table specifies the BigQuery table in which to store the result of the BigQuery SQL query, and
use_legacy_sql=False means the query is executed as standard SQL.
# The flag names below follow bigquery-to-datastore's README; the
# project, dataset, table, and bucket names are placeholders.
bash_command = """
java -cp /var/lib/dataflow/bigquery-to-datastore-bundled.jar \
  com.github.yuiskw.beam.BigQuery2Datastore \
  --project=myproject \
  --inputBigQueryDataset=mydataset \
  --inputBigQueryTable=user_stats \
  --outputDatastoreNamespace=blog \
  --outputDatastoreKind=UserStats \
  --keyColumn=dt \
  --tempLocation=gs://mybucket/tmp
"""

bigquery_to_datastore_task = BashOperator(
    task_id='bigquery_to_datastore',
    bash_command=bash_command,
    dag=dag)
Finally, we just define the dependencies between those tasks.
In this article, I described how to combine Airflow with bigquery-to-datastore. We can build scalable data products with Airflow and bigquery-to-datastore super easily. I know it may not satisfy everything you want. However, it allows people who are not so familiar with big data products to build scalable data products.
We don't have to do everything with machine learning. If you can build something without machine learning, that is often better. In such cases, I am sure what I described in this article will be helpful.