Implementing Trigger Dag Runs on Airflow and Cloud Composer GCP
Background Story
There is a small story behind why we trigger DAG runs through the API rather than relying on the Airflow DAG schedule, or on other pipelines specialized in data streaming such as Dataflow. We have a client that wants their data ingested as soon as possible, but the party uploading the data to us does not upload at a fixed time; it can arrive at any point between 9 AM and 4 PM. At first we considered using Dataflow and treating it as a streaming problem, but for such a small amount of data, setting up Dataflow felt like a waste of money. That is why we decided it is better to trigger the DAG run through the API as soon as the data is received from the client, which for the EDTS case means when the data lands in Google Cloud Storage.
Purpose and Scope
In this article, I will explain how to use the API on Airflow, focusing only on triggering DAG runs, though you can change the URL to call another endpoint if you want (see the Airflow REST API documentation).
I will also explain how to implement it on Cloud Composer 2 (the managed Airflow service on Google Cloud Platform/GCP).
Implementation of API on Airflow
Implementing the API on a local or Docker-based Airflow installation is relatively easy, as you only need to do a few things.
Here are the steps:
1. Enable API Authentication
- Go through Admin -> Configurations
- Make sure that "auth_backends" includes basic_auth as one of its values.
auth_backends : airflow.api.auth.backend.basic_auth
- If basic_auth is not in auth_backends, you need to add it through docker-compose.yaml (the option is called auth_backends from Airflow 2.3 onward and auth_backend in older versions).
AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
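As a rough sketch of where the variable goes in the official docker-compose.yaml (the surrounding keys are abbreviated here; adjust to your own compose file):

x-airflow-common:
  environment:
    # Enables HTTP basic authentication for the stable REST API
    AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"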
2. Create a new Role
Although you can use a default Role, it's better to create a new Role dedicated to the API so you can limit its permissions.
Since we only want to trigger DAG runs through the API, the values below are the only permissions you need to add (for other endpoints, add the permissions they require).
can read on DAG Runs
can create on DAG Runs
can edit on DAGs
3. Create a new User
Now we need to create the user and assign it the Role from the previous step. Below is an example.
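Once the user is created, a quick way to sanity-check the setup is to list DAG runs with the new credentials. This is a minimal sketch, assuming the same test user and the placeholder_dag used in the next step:

import requests

# Sanity check: the Role created above can read DAG Runs, so this should return 200.
# Credentials and DAG name are the test values used elsewhere in this article.
r = requests.get(
    "http://localhost:8080/api/v1/dags/placeholder_dag/dagRuns",
    auth=("test_trigger_api", "test_trigger_api"),
)
print(r.status_code)
print(r.json())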
4. Make the Python Script
After we have created the user, we need to make the Python script. Below is an example of a simple script that calls the Trigger DAG Runs API. In the example, there is a DAG named "placeholder_dag" for testing purposes.
import requests

# Credentials of the API user created in the previous steps
username = 'test_trigger_api'
password = 'test_trigger_api'

# DAG to trigger and the stable REST API endpoint for creating DAG runs
dag_name = 'placeholder_dag'
url = f"http://localhost:8080/api/v1/dags/{dag_name}/dagRuns"

# Optional run configuration passed to the DAG run
data = {
    "conf": {}
}

r = requests.post(url,
                  auth=(username, password),
                  json=data)

if r.status_code == 200:
    print('Trigger Success!')
else:
    print('Trigger Fail')
    print('Error : ' + str(r.content))
5. Run the Python Script
Now you only need to run the script.
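If the DAG needs run-specific parameters, they can be passed through the "conf" object and read inside the DAG from the run context. Below is a minimal sketch; the keys such as source_file are hypothetical examples, not something the API requires:

import requests

username = 'test_trigger_api'
password = 'test_trigger_api'
dag_name = 'placeholder_dag'
url = f"http://localhost:8080/api/v1/dags/{dag_name}/dagRuns"

# Hypothetical run parameters; use whatever keys your DAG expects.
data = {
    "conf": {
        "source_file": "gs://my-bucket/incoming/data.csv",
        "received_at": "2024-01-01T09:30:00+00:00"
    }
}

r = requests.post(url, auth=(username, password), json=data)
print(r.json())  # the response includes the generated dag_run_id

# Inside a task, the values can be read from the run context, for example:
#   context["dag_run"].conf.get("source_file")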
Implementation of API on GCP (Google Cloud Platform) Composer
We have implemented the API on local Airflow, but how about on Cloud Composer 2? The process is a bit more complicated than implementing the API on local Airflow.
Here are the steps:
1. Enable Cloud Composer API
- Go through APIs & Services
- Click ENABLE APIS AND SERVICES
- Search for Cloud Composer API
- Make sure the API is enabled
2. Enable API Authentication
In Cloud Composer, it's enabled by default, but to make sure, you can open Admin → Configurations in the Airflow web UI and check that the value looks like the one below.
auth_backend : airflow.composer.api.backend.composer_auth
3. Create a service account
A service account is an identity used by applications to authenticate and access Google Cloud Platform services. Creating a new service account for triggering the Cloud Composer API is recommended rather than using the default service account.
- Access Service Accounts
- Create service account
4. Create a new Role
Pretty much the same step as before in “Implementation of API on Airflow”.
can read on DAG Runs
can create on DAG Runs
can edit on DAGs
5. Create a new User
This is also the same step as before in "Implementation of API on Airflow", although there are some differences.
For the User Name, use "accounts.google.com:" followed by the OAuth 2 Client ID of the service account. For example:
- Copy the OAuth 2 Client ID from the Service Accounts pages.
- So the User Name is “accounts.google.com:101100294360511164408”.
The result should be something like this.
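If you prefer to look up the OAuth 2 Client ID programmatically instead of copying it from the console, a small sketch like the one below can be used. It assumes the google-api-python-client package is installed and uses a hypothetical service account email:

import google.auth
from googleapiclient import discovery

# Hypothetical service account email; replace with the one created earlier.
sa_email = "composer-trigger@my-project.iam.gserviceaccount.com"

credentials, _ = google.auth.default()
iam = discovery.build("iam", "v1", credentials=credentials)

# The ServiceAccount resource exposes the OAuth 2 Client ID used in the Airflow user name.
sa = iam.projects().serviceAccounts().get(
    name=f"projects/-/serviceAccounts/{sa_email}"
).execute()
print("accounts.google.com:" + sa["oauth2ClientId"])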
6. Make the Python Script in the Cloud Function
Here is an example of triggering the DAG runs from Python in a Cloud Function.
- Create the Cloud Function
- Fill in the rest except for the service account and authentication. For the service account, pick the service account you created before. For authentication, I'm using "Allow unauthenticated invocations" for testing purposes, but for production it's recommended to use "Require authentication" so the Cloud Function is not public.
- Change the Runtime to a Python runtime and the Entry point to "main" (the name of the function in the code below).
- Fill in the code and requirements. The code below goes into main.py and the three lines after it go into requirements.txt. The code is taken from the official Cloud Composer documentation; just change the dag_id and the web server URL that you use.
from __future__ import annotations

from typing import Any

import functions_framework
import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests

# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
        https://github.com/requests/requests/blob/master/requests/api.py
        If no timeout is provided, it is set to 90 by default.
    """
    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """
    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text


@functions_framework.http
def main(request):
    # TODO(developer): replace with your values
    dag_id = ""  # Replace with the ID of the DAG that you want to run.
    dag_config = {}
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )
    print(response_text)
    return response_text
functions-framework==3.*
google-auth
requests
7. Run the script
To test the script just trigger the Cloud Function URL. In my case, I’m using Postman to trigger it.
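If you prefer not to use Postman, the same test can be done from Python. A minimal sketch, assuming a hypothetical Cloud Function trigger URL and the "Allow unauthenticated invocations" setting used above:

import requests

# Hypothetical trigger URL; copy the real one from the Cloud Function's Trigger tab.
function_url = "https://us-central1-my-project.cloudfunctions.net/trigger-composer-dag"

# With "Allow unauthenticated invocations", a plain request is enough for testing.
response = requests.post(function_url)
print(response.status_code)
print(response.text)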
Conclusions and Improvement
Implementing the API in Airflow is relatively easy. In general, all you have to do is:
- Ensure API Authentication is enabled in the Airflow Configurations
- Create a new Role and a User to trigger the API
- Make the script that triggers the API
The improvement I would make is to create a separate Role for each group of related endpoints available in the Airflow API (for example, the DAG Runs and DAGs endpoints are different), and to restrict what the script is allowed to trigger through the API.
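As a sketch of that second idea, the Cloud Function's entry point could restrict which DAGs a caller may trigger with a simple allowlist. The DAG IDs and the request format here are hypothetical, and trigger_dag() refers to the function defined in the Cloud Function code above:

import functions_framework

# Hypothetical allowlist of DAGs this function is permitted to trigger.
ALLOWED_DAG_IDS = {"placeholder_dag", "ingest_client_data"}

@functions_framework.http
def main(request):
    payload = request.get_json(silent=True) or {}
    dag_id = payload.get("dag_id", "")

    if dag_id not in ALLOWED_DAG_IDS:
        return ("DAG not allowed", 403)

    # trigger_dag() is the helper from the Cloud Function code shown earlier.
    return trigger_dag(
        web_server_url="https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com",
        dag_id=dag_id,
        data=payload.get("conf", {}),
    )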