Using Airflow Experimental Rest API on Google Cloud Platform: Cloud Composer and IAP

Jacob Ferriero
Aug 9, 2018 · 9 min read

Whenever You Say So

So you’ve started using Apache Airflow and you’re hooked, or you’re just looking into how Airflow can help you, BUT your requirements don’t quite fit into the “we want to migrate our cron scheduling” or the “just do it whenever a file lands in the bucket” patterns. Alas, you have a need for your DAG to run whenever you say so. Well, have I got news for you! Airflow has an experimental REST API which you can use to trigger DAGs. So when your upstream system finishes something, you can invoke a simple Python script to handle authentication with the Google Identity Aware Proxy and make an HTTP request to your Airflow endpoint. This article will walk you through setting up a Cloud Composer environment to do just this!

What you’ll build

So grab the latest version of the Google Cloud SDK and let’s use Google Cloud Composer to automate the transform and load steps of an ETL data pipeline! The pipeline will create a Dataproc cluster, perform transformations on extracted data (via a PySpark job), and then upload the results to BigQuery. You’ll then trigger this pipeline by authenticating with Google Identity Aware Proxy (IAP) and posting to the Airflow endpoint for your DAG.

Architecture diagram: an HTTP POST to an endpoint triggers an Airflow DAG that automates spinning up and tearing down a Dataproc cluster to run a Spark job that enhances the data and writes the enhanced data to BigQuery.

Getting Set Up

Project Setup

Open the Cloud Shell from your Google Cloud Platform Console page:

Check out the code from the Google Cloud Platform Professional Services GitHub repository.

Set a variable equal to your project ID for convenience:
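
If you want to follow along in the Cloud Shell, the setup commands look roughly like this (the path inside the professional-services repository is an assumption based on this example’s name):

    # Grab the example code (repository path within professional-services is an assumption).
    git clone https://github.com/GoogleCloudPlatform/professional-services.git
    cd professional-services/examples/cloud-composer-examples/composer_http_post_example

    # Store your project ID for use in later commands.
    export PROJECT=$(gcloud config get-value project)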

Enable Google Cloud Platform APIs

Enable BigQuery, Compute Engine, Dataproc, Composer and Google Cloud Storage APIs if not already enabled using this helper link. Sip your coffee, this will take a few minutes.
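
You can also enable them from the Cloud Shell; the service names below are my best guess at the current identifiers, so double-check them if gcloud complains:

    # Enable the required APIs (service names assumed; verify with `gcloud services list --available`).
    gcloud services enable \
        bigquery.googleapis.com \
        compute.googleapis.com \
        dataproc.googleapis.com \
        composer.googleapis.com \
        storage-component.googleapis.com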

Create Cloud Storage Bucket

Use the make bucket command to create a new regional bucket in us-central1 within your project.
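
A minimal sketch, assuming you name the bucket after your project ID:

    # Create a regional bucket in us-central1 (bucket name is an assumption).
    gsutil mb -c regional -l us-central1 gs://${PROJECT}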

Create the BigQuery Dataset

Create a dataset in BigQuery. This is where all of your tables will be loaded within BigQuery.
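
Something like the following will do; the dataset name here is just illustrative:

    # Create the dataset that the enhanced data will be loaded into.
    bq mk ${PROJECT}:taxi_dataset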

Export the Data

In this tutorial, you will use a BigQuery public table dump as your upstream data source: the New York City Yellow Cab data. Follow this link to learn more about the data, then export the table in newline-delimited JSON format to your bucket with the command shown below. You will timestamp the export path to avoid collisions. Note that dag_trigger.py, used later in this lab, depends on the $EXPORT_TIMESTAMP bash environment variable.

Next, copy the prepared schema file for your enhanced data from this public Cloud Storage bucket:
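
The two steps together look roughly like this (the public table name, the export paths, and the schema bucket are assumptions or placeholders, not the exact values used in the repository):

    # Timestamp the export path so repeated runs don't collide; dag_trigger.py
    # relies on this same $EXPORT_TIMESTAMP value later.
    export EXPORT_TIMESTAMP=$(date +%s)

    # Export the public yellow cab table (table name assumed) as newline-delimited JSON.
    bq extract --destination_format=NEWLINE_DELIMITED_JSON \
        bigquery-public-data:new_york.tlc_yellow_trips_2016 \
        gs://${PROJECT}/cloud-composer-lab/raw-${EXPORT_TIMESTAMP}/nyc-tlc-yellow-*.json

    # Copy the prepared schema for the enhanced data; <public-bucket> is a placeholder
    # for the public bucket linked above.
    gsutil cp gs://<public-bucket>/schemas/nyc-tlc-yellow.json gs://${PROJECT}/schemas/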

Create a Composer Environment

Create a Cloud Composer Environment for this DAG. This will spin up the necessary compute resources to host your DAG and install the necessary software.

In the Cloud Shell, create the environment with the gcloud command. Note that --env-variables takes a list of variables that will be available to all DAGs in this environment. Sip your coffee again, this will also take a few minutes.
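
A sketch of the create command; the environment name, zone, and variable names are assumptions, so use whatever your DAG actually expects:

    # Create the Composer environment and expose a few variables to the DAGs.
    gcloud composer environments create demo-ephemeral-dataproc \
        --location us-central1 \
        --zone us-central1-b \
        --env-variables=gcs_bucket=${PROJECT},gcp_project=${PROJECT},gce_zone=us-central1-b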

Note the Composer DAG folder

Click the DAG folder icon for your new Composer environment. Note the bucket name (something like: us-central1-demo-ephemeral-********-bucket).

Preparing IAP Authentication

The endpoint to trigger your DAG will sit behind an Identity Aware Proxy and will need to be called using a service account.

Get the latest Python code for making IAP requests

First, you need to get the latest Python script for making IAP requests and install its requirements.
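
Assuming the helper still lives in the GoogleCloudPlatform/python-docs-samples repository under iap/ (the layout may have changed since this was written), fetching it and its requirements looks like this; you will install the requirements inside a virtualenv in the Python setup step below:

    # Download the IAP request helper and its requirements file (paths assumed).
    curl -O https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/make_iap_request.py
    curl -O https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/requirements.txt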

Create a Service Account for POST Trigger

You need to create a service account to facilitate triggering your DAG with a POST to an endpoint.
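
A sketch of the commands, assuming a service account named dag-trigger and the IAP and Composer roles shown below (adjust both to your own security setup):

    # Create the service account that will call the Airflow endpoint.
    gcloud iam service-accounts create dag-trigger --display-name dag-trigger

    # Let it through IAP and let it interact with Composer (roles assumed).
    gcloud projects add-iam-policy-binding ${PROJECT} \
        --member serviceAccount:dag-trigger@${PROJECT}.iam.gserviceaccount.com \
        --role roles/iap.httpsResourceAccessor
    gcloud projects add-iam-policy-binding ${PROJECT} \
        --member serviceAccount:dag-trigger@${PROJECT}.iam.gserviceaccount.com \
        --role roles/composer.user

    # Download a key so dag_trigger.py can authenticate as this account.
    gcloud iam service-accounts keys create ~/dag-trigger-key.json \
        --iam-account dag-trigger@${PROJECT}.iam.gserviceaccount.com
    export GOOGLE_APPLICATION_CREDENTIALS=~/dag-trigger-key.json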

Triggering the DAG

Python Setup

First, you will do a bit of setup for the required python libraries.
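
For example, in a virtualenv (assuming Python 3 and the requirements file fetched in the previous section):

    # Create and activate an isolated environment for the trigger script.
    virtualenv venv --python=python3
    source venv/bin/activate

    # Install the libraries the IAP helper and trigger script need.
    pip install -r requirements.txt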

Getting Airflow web server URL

Next, you need to find the URL of the Airflow web server that exposes the API for this DAG, along with the client ID used by IAP in front of Airflow.

In the console, use the hamburger menu to navigate to Cloud Composer. You should see this:

Click on the name of your environment and copy the URL of the web server into a note; you will need it later.
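
If you prefer the command line, the same URL can be read from the environment description (using the environment name and location assumed earlier):

    # Print the Airflow web server URL for the environment.
    gcloud composer environments describe demo-ephemeral-dataproc \
        --location us-central1 \
        --format="get(config.airflowUri)"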

Getting Airflow Client ID

Visit the Airflow URL https://YOUR_UNIQUE_ID.appspot.com (which you noted in the last step) in an incognito window, and don’t log in. This first landing page for IAP authentication has the client ID in the URL shown in the address bar:
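
If you would rather not open a browser, a quick unofficial trick is to let curl show you the IAP redirect and pull the client ID out of it:

    # Request the Airflow URL without following the redirect and extract client_id
    # from the IAP sign-in URL (sketch; assumes you are not already signed in).
    curl -s -o /dev/null -w '%{redirect_url}\n' https://YOUR_UNIQUE_ID.appspot.com \
        | grep -o 'client_id=[^&]*'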

Constructing Endpoint URL and Triggering the DAG

The endpoint for triggering the DAG has the following structure:

https://<airflow web server url>/api/experimental/dags/<dag-id>/dag_runs
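
For example, with the web server URL you noted earlier and the DAG used in this tutorial (average-speed), the pieces fit together like this:

    # Build the trigger endpoint for the average-speed DAG (URL is a placeholder).
    export AIRFLOW_URL=https://YOUR_UNIQUE_ID.appspot.com
    export ENDPOINT=${AIRFLOW_URL}/api/experimental/dags/average-speed/dag_runs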

Some Notes on the DAG Code

The Spark job in this example is really just a placeholder. However, the DAG is defined in ephemeral_dataproc_spark_dag.py and is the main interest of this post. The two important things to understand about this DAG are:

  1. The schedule_interval property is set to None so that this DAG only runs when there is a post to the endpoint.
  2. Our DAG reads the contents of the HTTP POST payload through the conf property of the dag_run object created each time the DAG runs (a minimal sketch follows this list). Note that there should also be a unique run_id in the POST.
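
Here is a minimal sketch of what those two points look like in a DAG file; the operator and variable names are illustrative, not a copy of ephemeral_dataproc_spark_dag.py:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    default_args = {'start_date': datetime(2018, 1, 1)}

    # schedule_interval=None means this DAG only runs when explicitly triggered,
    # e.g. by a POST to /api/experimental/dags/average-speed/dag_runs.
    dag = DAG('average-speed', default_args=default_args, schedule_interval=None)


    def read_conf(**kwargs):
        # Each POST creates a dag_run; its conf property carries the request payload,
        # for example the GCS path of the raw files to enhance.
        raw_path = kwargs['dag_run'].conf.get('raw_path')
        print('Enhancing files under: {}'.format(raw_path))


    read_conf_task = PythonOperator(
        task_id='read_conf',
        python_callable=read_conf,
        provide_context=True,  # makes dag_run available in kwargs on Airflow 1.x
        dag=dag)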

Staging the code in GCS

After reviewing the code, you need to upload the DAG into the DAG folder that Cloud Composer created for you and upload the Spark job source code to Google Cloud Storage. Sip your coffee again, it takes Cloud Composer about 5 minutes to process your DAG and cascade the changes into the Airflow environment.
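
The copy commands look roughly like this; the Composer bucket is the one you noted earlier and the PySpark file name is a placeholder:

    # Stage the DAG where Composer picks it up, and the Spark job source in your bucket.
    gsutil cp ephemeral_dataproc_spark_dag.py gs://<composer-dag-bucket>/dags/
    gsutil cp <your_pyspark_job>.py gs://${PROJECT}/spark-jobs/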

Trigger the endpoint using a convenience Python script

The code in dag_trigger.py will construct an HTTP POST request containing the Google Cloud Storage path to the new files you wish to enhance. The DAG can read the contents of the POST message using the conf property of the dag_run object.
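
Conceptually, dag_trigger.py does something like the following. This is a simplified sketch, not the script itself; the variable names, payload keys, and the make_iap_request signature are assumptions based on the IAP sample helper:

    import json

    from make_iap_request import make_iap_request  # IAP helper fetched earlier

    # Values gathered in the previous steps (all placeholders here).
    airflow_url = 'https://YOUR_UNIQUE_ID.appspot.com'
    client_id = 'YOUR_IAP_CLIENT_ID.apps.googleusercontent.com'
    raw_path = 'gs://your-bucket/cloud-composer-lab/raw-<EXPORT_TIMESTAMP>'

    endpoint = '{}/api/experimental/dags/average-speed/dag_runs'.format(airflow_url)

    # The payload surfaces in the DAG as dag_run.conf; run_id must be unique per run.
    payload = {
        'run_id': 'post-triggered-run-<EXPORT_TIMESTAMP>',
        'conf': json.dumps({'raw_path': raw_path}),
    }

    # make_iap_request wraps a service-account-signed OIDC token around the request.
    response = make_iap_request(endpoint, client_id, method='POST',
                                data=json.dumps(payload))
    print(response)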

Observing Airflow do its thing

Navigate to the Airflow UI

You will land on the DAGs page. In the row for average-speed, click on the “Graph View” icon.

Here you can view the structure of your DAG and the state of each operator in your workflow.

Navigate to Browse > Task Instances

Here you can see a history of the Task Instances that have run in this environment. By clicking on a task instance, you can dig into the associated metadata, logs, rendered templates, and XComs recorded during the execution of that Task Instance. This is useful when debugging a DAG.

Monitor your GCP Resources from the Console

It is more than mildly entertaining to sit back and watch these resources get created and destroyed while the DAG is executing.

First, navigate to the Dataproc Clusters page and watch the cluster get created. (90 seconds)

Next, hop over to Dataproc Jobs to see the DAG submit the PySpark job, which adds an average_speed field to the data and converts it to CSV. By clicking on your job you can monitor the logs. (40 mins)

Navigate to Google Cloud Storage, choose your project bucket, and watch the enhanced data accumulate in a timestamped GCS folder.

Next, you can observe the DAG clean up the enhanced files in GCS. (10 seconds)

Then navigate to the BigQuery console to see that the enhanced data was written to a BigQuery table, and navigate back to Dataproc Clusters to watch the cluster be torn down. (2 min)

Clean Up

While the Dataproc cluster is torn down for you by the DAG, the other resources will live on, draining your free credits or, worse, charging your credit card. To avoid this scenario, be sure to delete the Cloud Storage bucket, Composer environment, and BigQuery dataset you created during this tutorial.
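
Assuming the names used earlier in this walkthrough, the cleanup boils down to:

    # Delete the Composer environment, the bucket, and the BigQuery dataset.
    gcloud composer environments delete demo-ephemeral-dataproc --location us-central1
    gsutil -m rm -r gs://${PROJECT}
    bq rm -r -f ${PROJECT}:taxi_dataset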

Closing Remarks

When you’re starting out with Airflow there’s a lot to consider, and Composer simplifies much of that for you. With just a few modifications to this very simple DAG you could migrate your existing Spark jobs to Dataproc, pay for the compute only while a job is running, and run it only when you submit an HTTP POST request to the Airflow endpoint.

Security Considerations

I would be remiss not to mention that Composer sets up the Airflow UI and REST API endpoints on a public URL. While the Google Identity Aware Proxy is a robust authentication method, this may not be in line with your company’s security protocols.
