Apache Airflow: How to add a connection to Google Cloud with CLI

In order to connect to Google Cloud Platform from Airflow operators, we need to register a connection for it. Of course we can add one for GCP via the Airflow web UI, but the CLI of airflow-1.8.2rc1 or earlier doesn't work with a connection whose type doesn't follow the URL scheme naming rule of RFC 1738.

Today, I would like to describe how to add a connection to Google Cloud Platform with Airflow as a workaround.

tl;dr

Insert the connection directly into the Airflow metadata DB: write a Python function that creates a Connection row, call it from a DAG with PythonOperator, and trigger that task once with the airflow run command.

Motivation

Nowadays, it is quite natural to manage a system environment with container technology like Docker and Kubernetes. In such a case, one of the most important points is being able to add a connection while building a Docker image with the Airflow CLI, as in the commands shown below.

The Airflow CLI does allow us to add a connection, but airflow-1.8.2rc1 and earlier don't support connection types that don't follow the URL scheme naming rule of RFC 1738. That's why we need a workaround to add a connection to Google Cloud Platform.

Scheme names consist of a sequence of characters. The lower case
letters “a”-“z”, digits, and the characters plus (“+”), period
(“.”), and hyphen (“-”) are allowed. For resiliency, programs
interpreting URLs should treat upper case letters as equivalent to
lower case in scheme names (e.g., allow “HTTP” as well as “http”).
(Cited from https://www.ietf.org/rfc/rfc1738.txt)

# It works
airflow connections --add --conn_id mysql_conn --conn_uri mysql://mysql-host:3306
# It doesn't work
airflow connections --add --conn_id google_cloud_default --conn_uri google_cloud_platform://xxxxx

How can we add connections using the internal API?

Digging into the Airflow CLI code behind adding a connection, that is, airflow connections --add, I found that we can insert a connection directly into the Airflow metadata DB.
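
The internal API boils down to the Connection model and the SQLAlchemy session from airflow.settings. Below is a minimal sketch of the idea, assuming the airflow-1.8.x internals; the conn_id and conn_type are placeholders, and the full GCP version follows in the steps below.

from airflow import settings
from airflow.models import Connection

# Open a SQLAlchemy session bound to the Airflow metadata DB
session = settings.Session()
# Insert a Connection row directly, bypassing the CLI's scheme validation
session.add(Connection(conn_id='my_conn', conn_type='http'))
session.commit()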

  1. Locate a service account credentials JSON file at /var/local/google_cloud_default.json beforehand. Feel free to adapt the path to your case.
  2. Create a Python function that adds a connection to GCP like the one below, where the connection type is google_cloud_platform and the connection id is google_cloud_default. You have to replace the GCP project ID with yours.
import json

from airflow import settings
from airflow.models import Connection


def add_gcp_connection(ds, **kwargs):
    """Add an Airflow connection for GCP."""
    new_conn = Connection(
        conn_id='google_cloud_default',
        conn_type='google_cloud_platform',
    )
    # OAuth scopes the connection is allowed to use
    scopes = [
        "https://www.googleapis.com/auth/pubsub",
        "https://www.googleapis.com/auth/datastore",
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/devstorage.read_write",
        "https://www.googleapis.com/auth/logging.write",
        "https://www.googleapis.com/auth/cloud-platform",
    ]
    conn_extra = {
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": "your-gcp-project",
        "extra__google_cloud_platform__key_path": "/var/local/google_cloud_default.json",
    }
    new_conn.set_extra(json.dumps(conn_extra))

    session = settings.Session()
    if not session.query(Connection).filter(
            Connection.conn_id == new_conn.conn_id).first():
        session.add(new_conn)
        session.commit()
    else:
        msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
        print(msg.format(conn_id=new_conn.conn_id))

How to make an Airflow DAG

We have created a Python function to add a connection to Google Cloud Platform; next we will make an Airflow DAG that calls it. The reason we call the function via a DAG is that configuring a connection to the Airflow metadata DB by hand is a little complicated, and a task running inside Airflow can reuse that configuration as-is.

In the DAG, all we have to do is make a task with PythonOperator that calls the function. Since the DAG should not be a scheduled job, we should set schedule_interval to @once or None.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Minimal default_args (an assumption); start_date matches the run date below
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2001, 1, 1),
}

dag = DAG(
    'add_gcp_connection',
    default_args=default_args,
    schedule_interval="@once")

# Task to add a connection
t1 = PythonOperator(
    dag=dag,
    task_id='add_gcp_connection_python',
    python_callable=add_gcp_connection,
    provide_context=True,
)

How to add a connection when provisioning Airflow

As you know, Airflow allows us to run a specific task with the airflow CLI. By embedding the following command, for example in your Dockerfile, you can add a connection whose type the airflow connections command does not accept.

airflow run add_gcp_connection add_gcp_connection_python 2001-01-01
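
For illustration, the provisioning steps could look like the following Dockerfile sketch. The base image name, the file paths, and the airflow initdb step are assumptions; adapt them to your own image. It also places the service account key from step 1.

# Assumption: a base image with airflow-1.8.x installed and AIRFLOW_HOME set
FROM your-airflow-base-image

# Service account key (see step 1) and the DAG file defined above
COPY google_cloud_default.json /var/local/google_cloud_default.json
COPY dags/add_gcp_connection.py ${AIRFLOW_HOME}/dags/

# Initialize the metadata DB, then run the task that registers the connection
RUN airflow initdb && \
    airflow run add_gcp_connection add_gcp_connection_python 2001-01-01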

Future work

The Airflow community is addressing this issue in https://issues.apache.org/jira/browse/AIRFLOW-1330. I believe the feature will be released with airflow-1.9. I hope you look forward to it!
