Copy data from Cloud SQL to BigQuery using Apache Airflow

TLDR; link to code repo at the bottom with an example Airflow DAG.

When Google made Apache Airflow a managed service in GCP last year I was enthusiastic — mostly because I had looked into airflow before, and I found it nice to have a task scheduler that is written in actual code instead of point and click. Airflow was the first proper task scheduler for GCP and prior to this, if you wanted a scheduler you would have to use a third party service or cron scheduler . Cron is fine if you have tasks like “ping that every minute” or “reload cache every hour”, but once you start having tasks like “load data from that source, then put it into another source, then email someone”, you really need a task manager. Because your jobs will crash, and then you need to restart something in the middle and kick off an entire pipeline. You do not want to create those kinds of scripts without a framework!

Almost a year has passed since Airflow made it to GCP, and while there has been many good additions that makes GCP and BigQuery integration easier, there is no documentation yet that covers how to move data from Cloud SQL to BigQuery in a repetitive manner. Hopefully there will come an easy way of moving data from Cloud SQL to BigQuery in the near future.

First solution: automate with gcloud and Cloud Composer!

What Google has done well, however, is gcloud , the toolbox of GCP. You can do whatever you can do in the GUI (and even a bit more), and in the GUI you can do exports and imports.

The solution I will present now is best suited for batch like data transfers in the “daily” range. Say, if you wanted to move your data warehouse tables to BigQuery every night.

With gcloud it is possible to trigger an export job (to CSV) to Cloud Storage for a table in Cloud Composer. Since table export also takes a custom SQL you can make the export a bit smarter, so you don’t have to do a full copy every day.

So my initial plan looked something like this:

  1. Export table to Cloud Storage
  2. Import into BigQuery staging area
  3. Possibly a BigQuery SQL to merge and join.

After working on this for some hours I ran into a very irritating bug that required me to alter the pipeline slightly. For some reason Google doesn’t know how to export Cloud SQL to CSV properly. If you have a NULL value in the CSV the export is broken (“foo”,””,”bar” becomes “foo”,”N”,”bar”). The bug has been around for quite some time, so to use exports, we have to work around it.

In my case it could be solved by some script hacking and creative uses of gsutil and sed. Relevant code can be found here.

The other cases I hadn’t really thought about when I started this task was the schema. With just a CSV file, you will get very generic names on the columns (colum1, colum2, etc). When I first wrote this, I was planning on adding a JSON file containing the data model for each table to be synced, but then I thought… why not grab it from MySQL? Then we don’t have to spend (manual) time on maintaining a separate data model.

Fortunately it is pretty easy to add new tasks to Airflow with dependencies needed, so that went really well.

Revisited steps:

  1. Create a bucket in the same location as where your BigQuery data set is, this will temporary save data for you. You probably want to set retention policy to a fixed time (for example 1 week), because you will hold quite some duplicate data here.
  2. Export table to Cloud Storage as CSV, using gcloud
  3. Export MySQL schema (column name, type) as CSV, using gcloud
  4. Create a BigQuery JSON schema from the exported CSV file, and fix some data types, like MySQL DATE being exported as YYYY-mm-dd hh:mm:ss, while BigQuery wants this as YYYY-mm-dd only
  5. Fix NULL values in the export due to this bug (https://cloud.google.com/sql/docs/mysql/known-issues#import-export) — basically exported NULL values will break the CSV. Using gsutil and sed.
  6. Import it to bigquery

Disclaimer: I have not tested how step 4 works with very large files.

Example output

Step 1: The bucket

Uhm. Just create the bucket in the UI. Grant access to the Composer service account.

Step 2: Example Bash export script

If you were to write a Bash script to do this for you it might look something like this:

gcloud --project foo-bar sql export csv my-sql-instance gs://my-bucket/export_table_YYYYMMDD --database=production --query="SELECT * from example"
Tip! I am postfixing with YearMonthDay so the files can linger around for some time. In the buckets I am exporting to I have a 14 day retention, before stuff is deleted. Usually a job will crash at some point, and having some data to look at makes debugging a lot easier.

I will use the script created above as a template for my Airflow pipeline.

Once you have the data in Cloud Storage, a lot of options open up on what to do with it next. Moving it into BigQuery from here is a lot easier (Dataflow, Cloud Functions, query federation, probably many more)

Step 3 & 4: Getting schema from Cloud SQL (MySQL)

We also need to get the schema:

gcloud --project foo-bar sql export csv my-sql-instance gs://my-bucket/export_table_YYYYMMDD --database=production --query="SELECT COLUMN_NAME,DATA_TYPE  FROM INFORMATION_SCHEMA.COLUMNS  WHERE TABLE_SCHEMA = '{{ params.export_database }}' AND TABLE_NAME = '{{ params.export_table }}' order by ORDINAL_POSITION;"

That command will export the schema to a CSV file, and then a small script can transform it into a BigQuery schema. That small script can also clean table names so you don’t get columnns like “strange column with / and weird characters”. You can find the Python script in GitHub

Step 5: Fix broken CSV

gsutil cp gs://the/bucket/and/file.csv - \
| sed 's/,"N,/,"",/g' | sed 's/,"N,/,"",/g' | sed 's/^"N,/"",/g' | sed 's/,"N$/,""/g' \
| gsutil cp - gs://the/bucket/and/file.csv

Idea from https://issuetracker.google.com/issues/64579566#comment22

Step 6: Example Bash import script

A BigQuery import command looks something like this:

bq --project foo-bar --location=EU load --replace --source_format=CSV my_dataset.my_table_YYYYMMDD gs://my-bucket/export_table_YYYYMMDD export_table_YYYYMMDD_schema.json

As you can see, I am assuming I have a schema file here (created by the previous script). BigQuery has the option of autodetecting the schema, but I have found that for more complex tables it’s a smart thing to create an explicit schema.

Final thoughts

I have added a sample on GitHub where you can gather some ideas on how to do this for yourself. It fully syncs 3 tables (2 dimensions and one fact) to BigQuery.

https://github.com/ael-computas/gcp_cloudsql_airflow_bigquery

A more flexible solution would be to read from the Cloud SQL database directly, using its API. For example create a new Airflow operator that does this using streaming API (and loosing the glcoud/gsutil/bg commands). This would require a Cloud SQL proxy to run in the Cloud Composer Kubernetes cluster, and possibly some other steps.

Maybe I will come back with a followup to create that operator!