Primal Data Advent Calendar #6: Triggering Airflow jobs via its REST API

Marek Šuppa
Slido developers blog
5 min read · Dec 13, 2020

This piece is a part of the Primal Data Advent Calendar, a series of miscellaneous articles written by the Slido Data Team. We release one on each Advent day that happens to be a prime (like 13 on the 13th of December). Enjoy!

If you use Airflow as a job orchestrator, there is a good chance your DAGs, along with your custom operators and other glue code, are checked into some version control system. If I had to bet, I’d say you are using Git, but Mercurial, Fossil, Bazaar or SVN/CVS would also work.

It is also quite probable that the repository where you store all this code is connected to some Continuous Integration (CI) system which runs things like tests and/or linters to ensure your codebase stays in good shape.

The obvious next step is to push it up a notch and establish a Continuous Delivery (CD) pipeline — something that would ensure the code Airflow uses to execute your DAGs changes when you update your trunk (say the master or main branch). There exist documented ways of doing that but what if the Airflow instance is completely “air-gapped”? Is Continuous Delivery even possible in these situations?

Well, we ended up in a similar situation some time ago and while our solution sounds rather convoluted, it was quite straightforward to implement: we use Airflow to update Airflow by triggering a DAG via Airflow’s REST API.

Let’s see how it works.

Our setup and the catch

At Slido, we run Airflow on AWS in one of the simplest ways possible: as a task in a Fargate container on ECS. For various security reasons, its VPC cannot be reached from the Internet and can only be accessed from within our internal VPN.

With regards to the actual Docker container that is running on ECS, there is not too much to talk about. If you’d like to set one up yourself, the great puckel repo is probably still the best place to start from, despite the fact that it has not been updated for quite a while.

The code repository that contains our DAGs and all the other “glue code” is hosted on Bitbucket.org and so we naturally use Bitbucket Pipelines as a platform for what one would call Continuous Integration.

Putting these two pieces together (Airflow in a specific VPC on AWS and code in Bitbucket), it seems we have arrived at an impasse. How do we affect what code is running in Airflow from Bitbucket Pipelines? Especially since Bitbucket Pipelines is running on the public Internet and Airflow’s VPC was specifically designed not to be reachable from there?

After exploring a ton of crazy ideas (like changing a file on S3 and picking the change up with an Airflow Sensor), we arrived at a rather straightforward solution: use AWS Lambda as a proxy to reach Airflow’s REST API, which can trigger a DAG, which will in turn ensure Airflow’s container has the latest changes (by executing something on the order of git pull).
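The last step of that chain, the "something on the order of git pull", could be sketched as a small Python callable. This is only an illustration under assumptions: the function name, the repository path and the branch are hypothetical, and the actual task we run may differ.

```python
import subprocess


def sync_dags_folder(repo_dir, branch="master", run=subprocess.run):
    """Fast-forward the Git checkout in repo_dir to the tip of `branch`.

    repo_dir and branch are placeholders (assumptions); `run` is injectable
    so the function can be exercised without a real repository.
    """
    command = ["git", "-C", repo_dir, "pull", "--ff-only", "origin", branch]
    # check=True raises CalledProcessError when git exits with a non-zero
    # status, which would make the calling Airflow task fail visibly.
    run(command, check=True)
    return command
```

A callable like this could be wired into a DAG through a PythonOperator; a BashOperator running the same git command would presumably work just as well.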

Airflow REST API

Although not that well known, the Airflow REST API is one of the nice experimental features of the newer Airflow versions. Previously, you needed to install a specific Airflow plugin to get such functionality, but now (tested with version 1.10.13) you get it out of the box with the standard Airflow installation.

So what can this REST API do?

Judging from the REST API reference in the Airflow docs, it can mostly return information on Airflow DAGs, their runs, the tasks that can be found in these runs and a couple of other interesting pieces of information. Among the few non-read-only operations it supports is triggering a DAG run.

It’s actually really simple. Given an ID of a DAG to run, all we need to do is to send a POST request against our Airflow instance:

curl -X POST \
http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"run_id": "api_triggered_run"}'

This will create a new DAG run in Airflow’s database, and one of its workers will pick it up and start executing its tasks on the next “heartbeat” run.
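For readers who prefer Python over curl, the same request can be sketched with nothing but the standard library. This is a minimal illustration, not the code we run: `build_trigger_request` is a hypothetical helper name and `example_dag` is a placeholder DAG ID.

```python
import json
import urllib.parse
import urllib.request


def build_trigger_request(base_url, dag_id, run_id):
    """Build (but do not send) the POST request that creates a DAG run."""
    url = "{}/api/experimental/dags/{}/dag_runs".format(
        base_url.rstrip("/"), urllib.parse.quote(dag_id, safe="")
    )
    body = json.dumps({"run_id": run_id}).encode("utf-8")
    headers = {"Cache-Control": "no-cache", "Content-Type": "application/json"}
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


# "example_dag" is a placeholder DAG ID (assumption).
request = build_trigger_request(
    "http://localhost:8080", "example_dag", "api_triggered_run"
)

# Sending it requires a reachable Airflow instance:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it keeps the URL and payload easy to inspect before anything touches a live Airflow instance.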

All that’s left is wrapping this curl call into a Lambda we can then execute from anywhere we can access AWS.

Calling Airflow REST API from AWS Lambda

There are various options for writing AWS Lambda “serverless” code. As its support includes Bash, we could just take the curl command from above and be done with it. But since we may want to extend this Lambda in the future and because we’ve already had some nice experience with writing Lambdas in Python, we’ve opted for doing the same thing in this case as well.

Possibly the easiest and most straightforward way to do a POST request from Python is to use the requests library. Thanks to its well thought out design, we could reduce the whole Lambda to just about three lines of code.

But then again, we would need to install and pack it as part of the Lambda package. If we’d like to keep the size of the Lambda package to a minimum, we can use the urllib3 library, which is already built in.

Here is what it may look like:

As you can see, the code is rather straightforward. On line 27 we took the liberty of giving any DAG runs that are created this way the lambda_run prefix and appending the date and time of its creation to its name. Put together, the full DAG run ID may look as follows: lambda_run_2020-12-13T16:11:55.360221

If you happen to be in a situation similar to ours, altering this script to your circumstances should be very simple: just update line 7 to the URL at which your Airflow instance is running and line 8 to the ID of the DAG you would like to run. As we mentioned before, the DAG we use ensures the DAGs folder in the Docker container is the same as in the repository (by running the equivalent of git pull).
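A minimal sketch of a handler along these lines, built on urllib3, might look as follows. The URL, the DAG ID and the function names are placeholders (assumptions), and its line numbers do not correspond to the ones mentioned above.

```python
import json
from datetime import datetime, timezone

# Placeholders (assumptions): point these at your own instance and DAG.
AIRFLOW_URL = "http://airflow.internal.example:8080"
DAG_ID = "update_dags"


def build_run_id(now=None):
    """Return a run ID such as lambda_run_2020-12-13T16:11:55.360221."""
    if now is None:
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return "lambda_run_" + now.isoformat()


def lambda_handler(event, context, http=None):
    """Trigger the DAG through Airflow's experimental REST API."""
    if http is None:
        # Imported lazily so the module can be loaded where urllib3 is absent.
        import urllib3

        http = urllib3.PoolManager()

    url = "{}/api/experimental/dags/{}/dag_runs".format(AIRFLOW_URL, DAG_ID)
    response = http.request(
        "POST",
        url,
        body=json.dumps({"run_id": build_run_id()}).encode("utf-8"),
        headers={"Cache-Control": "no-cache", "Content-Type": "application/json"},
    )
    # Pass Airflow's answer back to whoever invoked the Lambda.
    return {"statusCode": response.status, "body": response.data.decode("utf-8")}
```

The optional `http` argument is there only so that the handler can be tested with a stand-in client; Lambda itself calls the handler with just the event and the context.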

So to sum it up, here is the whole flow:

  1. Airflow DAG code gets changed in the repository
  2. CI Pipeline with tests and linters gets executed
  3. If everything passes, the Lambda above is triggered
  4. The Lambda triggers a DAG that ensures Airflow works with code that is up to date
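
Step 3 can be carried out in several ways. One option, sketched below with boto3, is to invoke the Lambda synchronously from a CI step and fail the build if the invocation reports an error. The function name `trigger-airflow-dag` is a placeholder (assumption), and the CI job is assumed to have AWS credentials that allow it to invoke the Lambda.

```python
import json

# Placeholder name of the deployed Lambda function (assumption).
LAMBDA_NAME = "trigger-airflow-dag"


def trigger_lambda(client=None, function_name=LAMBDA_NAME):
    """Invoke the Lambda synchronously and return its decoded JSON result."""
    if client is None:
        # boto3 is the AWS SDK for Python; imported lazily so that a
        # stand-in client can be used in tests.
        import boto3

        client = boto3.client("lambda")

    response = client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # wait for the function to finish
        Payload=json.dumps({}).encode("utf-8"),
    )
    result = json.loads(response["Payload"].read())
    # FunctionError is only present when the function itself raised an error.
    if response.get("FunctionError"):
        raise RuntimeError("Lambda invocation failed: {}".format(result))
    return result
```

Raising on failure makes the CI step exit with an error, so a broken deployment shows up in the pipeline instead of passing silently.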

Conclusion

Data engineering tends to get a bad reputation for not being on par with software engineering when it comes to standard engineering practices. But thanks to an orchestrator like Airflow, we can rest assured that pipelines get executed at scheduled times, that their execution can be retried and, if that does not help, that we at least get notified about it. And thanks to Airflow’s REST API and AWS Lambda, the pipelines can be kept up to date via a CI job, even if Airflow itself is only reachable from internal networks. All in all, it seems that bad reputation may end up being quite short-lived.
