Copy data from Cloud SQL to BigQuery using Apache Airflow / Cloud Composer, part 2

Anders Elton · Published in Compendium · Jun 3, 2019

In my previous post I explained how to load data from Cloud SQL into BigQuery using command-line tools like gcloud and bq. In this post I will go through an example of how to load the data using Apache Airflow operators instead of command-line tools. Doing it this way has a few advantages: cleaner code, fewer hacks to get things working, and better fault tolerance. For example, we no longer have to worry about the Cloud SQL export job limit or the CSV export bugs.

I am using GCP's managed Airflow that runs on Kubernetes, Cloud Composer. The image version that runs the jobs in this example is composer-1.6.1-airflow-1.10.1.

Configure the Cloud SQL Proxy

The first thing we need to do is make the Cloud Composer cluster able to talk to the Cloud SQL instance. This could be achieved with firewall rules, but I think using the Cloud SQL Proxy is a cleaner solution.

Thankfully, Google has provided us with everything we need to spin up SQL proxies. We only need two YAML files describing the deployment and the service.

Deployment file: replace the ALL-CAPS placeholders with your own values. It is also possible to specify a service account, but in my case I want the proxy to use the cluster's service account. (I will put an example using PostgreSQL with a dedicated SA in the GitHub repo as well.)

Working deployment files are here:

https://github.com/ael-computas/gcp_cloudsql_airflow_bigquery/tree/master/yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    run: mysql-dvh-sqlproxy
  name: mysql-dvh-sqlproxy
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      run: mysql-dvh-sqlproxy
  strategy:
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        run: mysql-dvh-sqlproxy
    spec:
      volumes:
        - name: ssl-certs
          hostPath:
            path: /etc/ssl/certs
      containers:
        - image: gcr.io/cloudsql-docker/gce-proxy:1.11
          volumeMounts:
            - name: ssl-certs
              mountPath: /etc/ssl/certs
          command: ["/cloud_sql_proxy",
                    "-instances=PROJECT_ID:REGION:DATABASE=tcp:0.0.0.0:3306"]
          imagePullPolicy: Always
          livenessProbe:
            exec:
              command:
                - /bin/sh
                - -c
                - netstat -tlnp | grep -i cloud_sql_proxy
            failureThreshold: 3
            periodSeconds: 10
            successThreshold: 1
            timeoutSeconds: 1
          name: mysql-dvh-sqlproxy
          ports:
            - containerPort: 3306
              protocol: TCP
          resources: {}
          terminationMessagePath: /dev/termination-log
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      securityContext: {}
      terminationGracePeriodSeconds: 30

The service is pretty straightforward:

kind: Service
apiVersion: v1
metadata:
  labels:
    run: mysql-dvh-sqlproxy
  name: mysql-dvh-sqlproxy-service
spec:
  ports:
    - port: 3306
      protocol: TCP
      targetPort: 3306
  selector:
    run: mysql-dvh-sqlproxy

This means that the service will go into the default namespace, which is fine.

Run “kubectl apply -f” on both files.

Ensure that the deployment and service are running in your Kubernetes cluster. You might need to enable APIs and fix service account permissions; the logs should be pretty helpful here.

Note: I have seen an operator that installs the proxy for you, but I have never tried it myself. If you would rather avoid YAML files, check out https://airflow.readthedocs.io/en/latest/howto/connection/gcp_sql.html; it might be what you are looking for.

Configure Airflow connection

After the Kubernetes cluster reports that the deployment and service are running, you need to configure the connection in Airflow.

You can give the connection ID any name; just remember that it is also the identifier you are going to use from your code. The host is the service name (mysql-dvh-sqlproxy-service); Kubernetes will take care of the DNS magic for you.
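The easiest way is to add the connection through the Airflow web UI, but if you prefer doing it in code, a minimal sketch like the one below should also work. The conn_id matches the mysql_conn_id used in the DAG further down; the user, password and database are placeholders for your own values.

from airflow import settings
from airflow.models import Connection

# Create the MySQL connection programmatically (Airflow 1.10).
conn = Connection(
    conn_id="gcp_dvh_cloudsql",         # referenced by mysql_conn_id in the DAG below
    conn_type="mysql",
    host="mysql-dvh-sqlproxy-service",  # the Kubernetes service name from above
    schema="prod",                      # placeholder: your database name
    login="YOUR_DB_USER",               # placeholder
    password="YOUR_DB_PASSWORD",        # placeholder
    port=3306)

session = settings.Session()
session.add(conn)
session.commit()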

Create the DAG

Once this is configured, take a look at these two operators: MySqlToGoogleCloudStorageOperator and GoogleCloudStorageToBigQueryOperator.

Compared to my previous post, the flow is now a lot simpler:

  • Export to Cloud Storage, with the schema as JSON
  • Import into BigQuery from storage.
  • Cloud Storage should have a lifecycle policy that auto-deletes the files after some time (see the sketch below).
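For the auto-delete part, you can set the lifecycle rule in the console or with gsutil; if you prefer code, a small sketch with the google-cloud-storage client could look like this (the project and bucket names are placeholders):

from google.cloud import storage

# Delete exported staging files automatically after 7 days.
client = storage.Client(project="YOUR_PROJECT_ID")
bucket = client.get_bucket("YOUR_STAGING_BUCKET")
bucket.add_lifecycle_delete_rule(age=7)  # age is in days
bucket.patch()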

The code should look something like this.
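It assumes the usual imports and a DAG object at the top of the file, roughly like the sketch below; the dag_id, schedule and start date are placeholders of my own, not from the original repo.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Placeholder DAG definition - adjust dag_id, start_date and schedule to your setup.
dag = DAG(dag_id='cloudsql_to_bigquery',
          default_args=default_args,
          start_date=datetime(2019, 1, 1),
          schedule_interval='@daily')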

class TableConfig:
    STANDARD_EXPORT_QUERY = None
    _STANDARD_EXPORT_QUERY = "SELECT * from {}"

    def __init__(self,
                 cloud_sql_instance,
                 export_bucket,
                 export_database,
                 export_table,
                 export_query,
                 gcp_project,
                 stage_dataset,
                 stage_table,
                 stage_final_query,
                 bq_location
                 ):

        self.params = {
            'export_table': export_table,
            'export_bucket': export_bucket,
            'export_database': export_database,
            'export_query': export_query or self._STANDARD_EXPORT_QUERY.format(export_table),
            'gcp_project': gcp_project,
            'stage_dataset': stage_dataset,
            'stage_table': stage_table or export_table,
            'stage_final_query': stage_final_query,
            'cloud_sql_instance': cloud_sql_instance,
            'bq_location': bq_location or "EU",
        }


def get_tables():
    dim_tables = ["DimAge", "DimPerson"]
    fact_tables = ["FactPerson"]
    export_tables = dim_tables + fact_tables
    tables = []
    for dim in export_tables:
        tables.append(TableConfig(cloud_sql_instance='CLOUD_SQL_INSTANCE_NAME',
                                  export_table=dim,
                                  export_bucket='YOUR_STAGING_BUCKET',
                                  export_database='prod',
                                  export_query=TableConfig.STANDARD_EXPORT_QUERY,
                                  gcp_project="YOUR_PROJECT_ID",
                                  stage_dataset="YOUR_STAGING_DATASET",
                                  stage_table=None,
                                  stage_final_query=None,
                                  bq_location="EU"))
    return tables

def gen_export_table_task(table_config):
    export_task = MySqlToGoogleCloudStorageOperator(
        task_id='export_{}'.format(table_config.params['export_table']),
        dag=dag,
        sql=table_config.params['export_query'],
        bucket=table_config.params['export_bucket'],
        filename="cloudsql_to_bigquery/{}/{}".format(table_config.params['export_table'],
                                                     table_config.params['export_table']) + "_{}",
        schema_filename="cloudsql_to_bigquery/schema/{}/schema_raw".format(table_config.params['export_table']),
        mysql_conn_id="gcp_dvh_cloudsql")
    return export_task


def gen_import_table_task(table_config):
    import_task = GoogleCloudStorageToBigQueryOperator(
        task_id='{}_to_bigquery'.format(table_config.params['export_table']),
        bucket=table_config.params['export_bucket'],
        source_objects=["cloudsql_to_bigquery/{}/{}*".format(table_config.params['export_table'],
                                                             table_config.params['export_table'])],
        destination_project_dataset_table="{}.{}.{}".format(table_config.params['gcp_project'],
                                                            table_config.params['stage_dataset'],
                                                            table_config.params['stage_table']),
        schema_object="cloudsql_to_bigquery/schema/{}/schema_raw".format(table_config.params['export_table']),
        write_disposition='WRITE_TRUNCATE',
        source_format="NEWLINE_DELIMITED_JSON",
        dag=dag)

    return import_task
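Note that TableConfig also carries stage_final_query and bq_location, which the snippet above does not use. If you want a transform step after the load, a task generator along these lines could be added; this is purely my own illustration of how stage_final_query might be used, not something from the original repo.

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

def gen_transform_table_task(table_config):
    # Hypothetical follow-up step: run stage_final_query against the staged
    # table and write the result to a final table.
    transform_task = BigQueryOperator(
        task_id='transform_{}'.format(table_config.params['export_table']),
        sql=table_config.params['stage_final_query'],
        use_legacy_sql=False,
        destination_dataset_table="{}.{}.{}_final".format(table_config.params['gcp_project'],
                                                          table_config.params['stage_dataset'],
                                                          table_config.params['stage_table']),
        write_disposition='WRITE_TRUNCATE',
        dag=dag)
    return transform_task

Such a task would then be wired in after the import task in the loop below (import_script >> transform_script).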


"""
The code that follows setups the dependencies between the tasks
"""

for table_config in get_tables():
export_script = gen_export_table_task(table_config)
import_script = gen_import_table_task(table_config)

export_script >> import_script

Rendered in the Airflow UI, this job looks like a pair of export and import tasks for each table.

If you are using a managed Airflow instance you might need to install additional dependencies. From your GCP console, go to Composer and the PyPI packages tab to install them.

Compared to the previous solution, this is a more efficient way of doing the exports, since it runs them in parallel rather than one at a time. It is a bit more verbose on storage, since the data is now JSON instead of CSV, but temporary storage is not really a problem here.

It also handles big chunks of data more gracefully, since the Airflow operator will split the export into multiple files when they grow larger than what BigQuery likes.
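If you want to control where that split happens, the MySqlToGoogleCloudStorageOperator takes an approx_max_file_size_bytes parameter. A small sketch, reusing the placeholder values from above:

# Split the export into roughly 200 MB files instead of the operator's default.
export_task = MySqlToGoogleCloudStorageOperator(
    task_id='export_DimPerson',
    sql="SELECT * from DimPerson",
    bucket='YOUR_STAGING_BUCKET',
    filename="cloudsql_to_bigquery/DimPerson/DimPerson_{}",
    schema_filename="cloudsql_to_bigquery/schema/DimPerson/schema_raw",
    approx_max_file_size_bytes=200 * 1024 * 1024,
    mysql_conn_id="gcp_dvh_cloudsql",
    dag=dag)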

My example GitHub repo, https://github.com/ael-computas/gcp_cloudsql_airflow_bigquery, has been updated with new files containing this simplified workflow (the v2 files).

Note: if you need to load from MSSQL, there is an operator for that in Apache Airflow as well. It does not, however, handle schema generation correctly, so for now you will need to generate your own schema by selecting from database.INFORMATION_SCHEMA.COLUMNS and making sure that the float type maps to BigQuery's FLOAT type.
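As a rough sketch of what that schema generation could look like; the type map and helper below are my own illustration, not something the operator provides:

# Map MSSQL column types to BigQuery types; illustrative and incomplete.
MSSQL_TO_BQ = {
    'int': 'INTEGER', 'bigint': 'INTEGER', 'smallint': 'INTEGER', 'tinyint': 'INTEGER',
    'float': 'FLOAT', 'real': 'FLOAT', 'decimal': 'FLOAT', 'numeric': 'FLOAT',
    'bit': 'BOOLEAN',
    'datetime': 'TIMESTAMP', 'date': 'DATE',
}

def bq_schema_for_table(cursor, table_name):
    # cursor is a DB-API cursor against the source MSSQL database;
    # the parameter style may differ depending on your driver.
    cursor.execute(
        "SELECT COLUMN_NAME, DATA_TYPE "
        "FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = %s", (table_name,))
    return [{'name': name,
             'type': MSSQL_TO_BQ.get(data_type.lower(), 'STRING'),
             'mode': 'NULLABLE'}
            for name, data_type in cursor.fetchall()]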

If you like this post feel free to clap :)

Edit: Please check out this post with a relevant alternative to Composer.
