Using the Airflow Experimental REST API on Google Cloud Platform: Cloud Composer and IAP
Whenever You Say So
So you’ve started using Apache Airflow and you’re hooked, or you’re just looking into how Airflow can help you, BUT your requirements don’t quite fit into “we want to migrate our cron scheduling” or “just do it whenever a file lands in the bucket.” Alas, you have a need for your DAG to run whenever you say so. Well, have I got news for you! Airflow has an experimental REST API which you can use to trigger DAGs. So when your upstream system finishes something, you can invoke a simple Python script to handle authentication with the Google Identity Aware Proxy and make an HTTP request to your Airflow endpoint. This article will walk you through setting up a Cloud Composer environment to do just that!
What you’ll build
So grab the latest version of the Google Cloud SDK and let’s use Google Cloud Composer to automate the transform and load steps of an ETL data pipeline! The pipeline will create a Dataproc cluster, perform transformations on extracted data (via a PySpark job), and then upload the results to BigQuery. You’ll then trigger this pipeline by authenticating with Google Identity Aware Proxy (IAP) and posting to the Airflow endpoint for your DAG.
Getting Set Up
Project Setup
Open the Cloud Shell from your Google Cloud Platform Console page:
Check out the code from the Google Cloud Platform Professional Services GitHub repository.
# Contains source code for pyspark job and DAG.
git clone https://github.com/GoogleCloudPlatform/professional-services.git ~/professional-services
Set a variable equal to your project ID for convenience:
export PROJECT=<REPLACE-THIS-WITH-YOUR-PROJECT-ID>
gcloud config set project $PROJECT
Enable Google Cloud Platform APIs
Enable the BigQuery, Compute Engine, Dataproc, Composer, and Google Cloud Storage APIs if they are not already enabled, using this helper link. Sip your coffee; this will take a few minutes.
Create Cloud Storage Bucket
Use the make bucket command to create two new regional buckets in us-central1 within your project: one for your data and one for Dataproc staging.
gsutil mb -c regional -l us-central1 gs://$PROJECT
gsutil mb -c regional -l us-central1 gs://$PROJECT-dataproc-staging
Create the BigQuery Dataset
Create a dataset in BigQuery. This is where all of your tables will be loaded within BigQuery.
bq mk ComposerDemo
Export the Data
In this tutorial, you will use a BigQuery public table dump as your upstream data source: the New York City Yellow Cab data. Follow this link to learn more about the data, then export the table in newline-delimited JSON format to Cloud Storage with the following bq command. You will timestamp the export path to avoid collisions. Note that dag_trigger.py, used later in this lab, depends on the $EXPORT_TS bash environment variable set here.
export EXPORT_TS=`date "+%Y-%m-%dT%H%M%S"` && bq extract \
--destination_format=NEWLINE_DELIMITED_JSON \
nyc-tlc:yellow.trips \
gs://$PROJECT/cloud-composer-lab/new-$EXPORT_TS/nyc-tlc-yellow-*.json
Next, copy the prepared schema file for your enhanced data from this public Cloud Storage bucket:
gsutil cp gs://python-dataflow-example/schemas/nyc-tlc-yellow.json gs://$PROJECT/schemas/nyc-tlc-yellow.json
Create a Composer Environment
Create a Cloud Composer Environment for this DAG. This will spin up the necessary compute resources to host your DAG and install the necessary software.
In the Cloud Shell, create the environment with the gcloud command, then set Airflow Variables that will be available to all DAGs in this environment. Sip your coffee again; this will also take a few minutes.
gcloud composer environments create demo-ephemeral-dataproc1 \
--location us-central1 \
--zone us-central1-b \
--machine-type n1-standard-2 \
--disk-size 20

# Set Airflow Variables in the Composer Environment we just created.
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set gcp_project $PROJECT
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set gce_zone us-central1-b
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set gcs_bucket $PROJECT
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set bq_output_table $PROJECT:ComposerDemo.nyc_tlc_yellow_trips
gcloud composer environments run demo-ephemeral-dataproc1 \
--location=us-central1 variables -- \
--set dataproc_bucket $PROJECT-dataproc-staging
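For reference, inside a DAG these Variables are typically read with Airflow’s Variable model. The snippet below is only a sketch of that pattern; see ephemeral_dataproc_spark_dag.py for how the example DAG actually consumes them.
from airflow.models import Variable

# Keys match the Variables set above via `gcloud composer environments run ... variables`.
project_id = Variable.get('gcp_project')
gce_zone = Variable.get('gce_zone')
gcs_bucket = Variable.get('gcs_bucket')
output_table = Variable.get('bq_output_table')
dataproc_bucket = Variable.get('dataproc_bucket')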
Note the Composer DAG folder
Click the DAG folder icon for your new Composer environment and note the bucket name (something like us-central1-demo-ephemeral-********-bucket).
Preparing IAP Authentication
The endpoint to trigger your DAG will sit behind an Identity Aware Proxy and will need to be called using a service account.
Get the latest Python code for making IAP requests
First, you need to get the latest Python script for making IAP requests and install its requirements.
# Install necessary requirements for making iap requests with
# dag_trigger.py
(curl https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/requirements.txt; echo 'tzlocal>=1.5.1') >> ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/iap_requirements.txt

# Get latest version of make_iap_request from python-docs-samples.
curl https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/iap/make_iap_request.py >> ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/make_iap_request.py
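To give a sense of what this helper does: it obtains an OpenID Connect token for the IAP-protected resource using your service account credentials and attaches it as a Bearer token on the request. A hypothetical usage looks like this (the URL and client ID below are placeholders):
from make_iap_request import make_iap_request

# Any IAP-protected URL will do; Airflow's experimental API exposes a simple
# /api/experimental/test endpoint that is handy for checking authentication.
response = make_iap_request(
    'https://YOUR_AIRFLOW_WEB_SERVER/api/experimental/test',
    'YOUR_IAP_CLIENT_ID.apps.googleusercontent.com',
    method='GET')
print(response)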
Create a Service Account for POST Trigger
You need to create a service account to facilitate triggering your DAG with a POST to an endpoint.
gcloud iam service-accounts create dag-trigger

# Give service account permissions to create tokens for
# iap requests.
gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/iam.serviceAccountActor

# Service account also needs to be authorized to use Composer.
gcloud projects add-iam-policy-binding $PROJECT \
--member \
serviceAccount:dag-trigger@$PROJECT.iam.gserviceaccount.com \
--role roles/composer.user

# We need a service account key to trigger the dag.
gcloud iam service-accounts keys create ~/$PROJECT-dag-trigger-key.json \
--iam-account=dag-trigger@$PROJECT.iam.gserviceaccount.com

export GOOGLE_APPLICATION_CREDENTIALS=~/$PROJECT-dag-trigger-key.json
Triggering the DAG
Python Setup
First, you will do a bit of setup for the required Python libraries.
cd ~/professional-services/examples/cloud-composer-examples

# Here you set up the Python environment.
# Pip is a tool, similar to maven in the java world.
pip install --upgrade virtualenv
pip install -U pip
virtualenv composer-env
source composer-env/bin/activate

# By default one of Airflow's dependencies installs a GPL dependency
# (unidecode). To avoid this dependency set
# SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you
# install or upgrade Airflow.
export SLUGIFY_USES_TEXT_UNIDECODE=yes

# Install requirements for this and other examples in
# cloud-composer-examples.
pip install -r requirements.txt

# Required for dag_trigger.py.
pip install -r composer_http_post_example/iap_requirements.txt

# (Optional for testing spark code locally).
# pip install pyspark>=2.3.1
Getting Airflow web server URL
Next, you need to find the URL of the Airflow web server for this DAG and the client ID of the IAP in front of it.
In the console, use the hamburger menu to navigate to Cloud Composer. You should see something like this:
Click on the name of your environment and copy the URL of the Airflow web server; you will need it later.
Getting Airflow Client ID
Visit the Airflow URL https://YOUR_UNIQUE_ID.appspot.com (which you noted in the last step) in an incognito window and don’t log in. The first landing page for IAP authentication has the client ID in the URL in the address bar. Alternatively, you can fetch it with a helper script from python-docs-samples:
curl https://raw.githubusercontent.com/GoogleCloudPlatform/python-docs-samples/master/composer/rest/get_client_id.py >> ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/get_client_id.py

CLIENT_ID=`python composer_http_post_example/get_client_id.py $PROJECT us-central1 demo-ephemeral-dataproc1`
Constructing Endpoint URL and Triggering the DAG
The endpoint for triggering the DAG has the following structure:
https://<airflow web server url>/api/experimental/dags/<dag-id>/dag_runs
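For example, assembling the pieces used in this tutorial (the web server URL below is purely hypothetical; the DAG id is average-speed):
# Purely illustrative values; substitute your own Airflow web server URL.
airflow_web_server = 'https://a1b2c3d4e5f6g7h8-tp.appspot.com'
dag_id = 'average-speed'
endpoint = '{}/api/experimental/dags/{}/dag_runs'.format(airflow_web_server, dag_id)
# -> https://a1b2c3d4e5f6g7h8-tp.appspot.com/api/experimental/dags/average-speed/dag_runs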
Some Notes on the DAG Code
The Spark job in this example is really just a placeholder. However, the DAG, defined in ephemeral_dataproc_spark_dag.py, is the main interest of this post. The two important things to understand about this DAG are:
- The schedule_interval property is set to None so that this DAG only runs when there is a POST to the endpoint.
- Our DAG reads the contents of the HTTP POST payload through the conf property of the dag_run object created each time the DAG runs (see the sketch below). Note that there should also be a unique run_id in the POST.
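As an illustration of that second point, here is a minimal sketch of a DAG task reading dag_run.conf. This is not the repo’s actual DAG (which creates the Dataproc cluster, submits the PySpark job, and loads BigQuery); it only shows the conf-reading pattern, and the DAG and task names here are made up.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def report_raw_path(**context):
    # `conf` holds whatever JSON was POSTed to the dag_runs endpoint,
    # e.g. {"raw_path": "gs://your-bucket/cloud-composer-lab/new-<timestamp>"}.
    payload = context['dag_run'].conf or {}
    print('Enhancing files under: {}'.format(payload.get('raw_path')))


with DAG('conf-reading-sketch',
         schedule_interval=None,  # only runs when triggered via the REST API
         start_date=datetime(2018, 1, 1)) as dag:
    PythonOperator(
        task_id='report_raw_path',
        python_callable=report_raw_path,
        provide_context=True)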
Staging the code in GCS
After reviewing the code, you need to upload the DAG into the DAG folder that Cloud Composer created for you and upload the Spark job source code to Google Cloud Storage. Sip your coffee again; it takes Cloud Composer about 5 minutes to process your DAG and cascade the changes into the Airflow environment.
gsutil cp ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/ephemeral_dataproc_spark_dag.py gs://<dag-folder>/dags

gsutil cp ~/professional-services/examples/cloud-composer-examples/composer_http_post_example/spark_avg_speed.py gs://$PROJECT/spark-jobs/
Trigger the endpoint using a convenience Python script
The code in dag_trigger.py will construct an HTTP POST request containing the Google Cloud Storage path to the new files you wish to enhance. The DAG can read the contents of the POST message using the conf property of the dag_run object.
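Roughly speaking, dag_trigger.py wraps make_iap_request.py: it obtains an IAP-scoped token and POSTs a small JSON body to the endpoint. The sketch below is a simplified approximation under those assumptions, not the script’s exact code; in particular the payload shape is illustrative, so refer to dag_trigger.py in the repo for the authoritative version.
import argparse
import json

from make_iap_request import make_iap_request  # downloaded earlier from python-docs-samples


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--url', required=True)          # .../api/experimental/dags/<dag-id>/dag_runs
    parser.add_argument('--iapClientId', required=True)  # IAP OAuth client ID
    parser.add_argument('--raw_path', required=True)     # GCS path handed to the DAG via conf
    args = parser.parse_args()

    # The body ends up in dag_run.conf inside the DAG (payload shape is illustrative).
    body = json.dumps({'conf': json.dumps({'raw_path': args.raw_path})})
    print(make_iap_request(args.url, args.iapClientId, method='POST', data=body))


if __name__ == '__main__':
    main()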
AIRFLOW_URI=`gcloud composer environments describe demo-ephemeral-dataproc1 --location us-central1 --format="get(config.airflowUri)"`

python composer_http_post_example/dag_trigger.py \
--url=${AIRFLOW_URI}/api/experimental/dags/average-speed/dag_runs \
--iapClientId=${CLIENT_ID} \
--raw_path=gs://$PROJECT/cloud-composer-lab/new-$EXPORT_TS
Observing Airflow do its thing
Navigate to the Airflow UI
You will land on the DAGs page. In the row for average-speed, click on the “Graph View” icon.
Here you can view the structure of your DAG and the state of each operator in your workflow.
Navigate to Browse > Task Instances
Here you can see a history of the Task Instances that have run in this environment. By clicking on a task instance, you can dig into the associated metadata, logs, rendered templates, and XComs recorded during its execution. This is useful when debugging a DAG.
Monitor your GCP Resources from the Console
It is more than mildly entertaining to sit back and watch these resources get created and destroyed while the DAG is executing.
First, navigate to Dataproc Clusters and watch the cluster get created. (90 seconds)
Next, hop over to Dataproc Jobs to see the DAG submit the PySpark job, which adds an average_speed field to the data and converts it to CSV. By clicking on your job you can monitor the logs. (40 mins)
Navigate to Google Cloud Storage, choose your project bucket, and see the enhanced data accumulate in a timestamped GCS folder.
Next, you can observe the DAG clean up the enhanced files in GCS. (10 seconds)
Then navigate to the BigQuery console to see that the enhanced data was written to a BigQuery table, and navigate back to Dataproc Clusters to watch the cluster be torn down. (2 min)
Clean Up
While the Dataproc cluster is torn down for you by the DAG, the other resources will live on, draining your free credits or, worse, charging your credit card. To avoid this scenario, be sure to delete the Cloud Storage buckets, Composer environment, and BigQuery dataset you created during this tutorial.
gcloud composer environments delete demo-ephemeral-dataproc1 --location us-central1 --quiet

gsutil rm -r gs://$PROJECT gs://$PROJECT-dataproc-staging

bq rm -r -f -d $PROJECT:ComposerDemo
Closing Remarks
When you’re starting out with Airflow there’s a lot to consider, and Composer simplifies a lot of that for you. With just a few modifications to this very simple DAG, you could migrate your existing Spark jobs to Dataproc, pay for the compute only while a job is running, and run that job only when you submit an HTTP POST request to the Airflow endpoint.
Security Considerations
I would be remiss not to mention that Composer sets up the Airflow UI and REST API endpoints on a public URL. While the Google Identity Aware Proxy is a robust authentication method, this may not be in line with your company’s security protocols.