Using Airflow to dynamically schedule workflows

Tomasz Posluszny
Aug 20, 2020 · 2 min read


Apache Airflow is a platform for scalable workflow scheduling and execution with detailed monitoring and management. In this article, I will describe a specific use case: dynamically scheduling API requests to enrich your data.

Issue

We need to update economic event data in our database as soon as it is released. We do not have a subscription API available; we only have a REST API, and we know the specific date at which each event’s data should be released.

We store upcoming economic events in our database and want to dynamically schedule workflows that pull additional data at specified dates. The dates are stored in a table (actually a MongoDB collection).
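For context, here is a minimal sketch of how such a collection could be queried for upcoming events with pymongo; the connection string, database, collection, and field names are assumptions for illustration, not our actual schema.

```python
from datetime import datetime, timezone

from pymongo import MongoClient

# Hypothetical connection and names
client = MongoClient("mongodb://localhost:27017")
events = client["economics"]["upcoming_events"]

# Fetch events whose release date is still in the future
upcoming = list(
    events.find(
        {"release_date": {"$gte": datetime.now(timezone.utc)}},
        {"_id": 0, "event_id": 1, "release_date": 1},
    )
)
```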

We also want to be able to monitor workflow execution and easily retry failed runs.

Airflow to the rescue

Now let’s discuss how Airflow works and how it can help us achieve what we want.

Airflow consists of 3 separate processes:

  1. Web server: serves the user interface for creating, managing, and monitoring workflows
  2. Scheduler: schedules workflows
  3. Worker: runs workflow tasks

The processes communicate with each other via a metadata database.

How do we add a workflow?

Workflows are generated from Python source files containing DAG definitions. DAG is an acronym for Directed Acyclic Graph, a name that describes well how tasks can be connected to each other in a workflow. A minimal definition looks like the sketch below.
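This is a bare-bones example (Airflow 1.10 style, with placeholder task and DAG IDs) just to show the shape of a DAG file, not our production code.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# A minimal DAG: two tasks connected as a directed acyclic graph
with DAG(
    dag_id="example_dag",
    start_date=datetime(2020, 8, 20),
    schedule_interval=None,  # only runs when triggered
) as dag:
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    start >> end  # "start" must finish before "end" runs
```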

Implementation

First, we created a workflow that pulls upcoming events and stores them in an Airflow Variable. In another Python file, one DAG per economic event is created and scheduled to run once at the desired start_date: the economic data release date. A simplified version of that generation loop is sketched below.
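This sketch assumes the Variable holds a JSON list of events with an ID and a release date; the Variable name, field names, and date format are illustrative, and the real Gist differs in details.

```python
import json
from datetime import datetime

from airflow import DAG
from airflow.models import Variable

# Hypothetical payload, e.g.
# [{"event_id": "nfp-2020-09", "release_date": "2020-09-04T12:30:00"}]
events = json.loads(Variable.get("upcoming_events", default_var="[]"))

for event in events:
    dag_id = f"enrich_event_{event['event_id']}"

    dag = DAG(
        dag_id=dag_id,
        start_date=datetime.fromisoformat(event["release_date"]),
        schedule_interval="@once",  # run a single time at the release date
    )

    # Registering the DAG object in the module's global namespace
    # makes it discoverable by the Airflow scheduler
    globals()[dag_id] = dag
```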

The DAG consists of 3 tasks, each representing a part of the ETL process (a sketch of the tasks follows the list):

  1. Extraction of the event data from a REST API endpoint
  2. Transformation of the response
  3. Loading the enriched event object into the database
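Here is a simplified, self-contained version of such a DAG (Airflow 1.10 style). The API URL, event ID, field names, and the print-based load step are placeholders; the actual logic lives in the Gist.

```python
from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Hypothetical endpoint for illustration
API_URL = "https://api.example.com/economic-events/{event_id}"


def extract(event_id, **context):
    # Pull the raw event payload from the REST API
    response = requests.get(API_URL.format(event_id=event_id))
    response.raise_for_status()
    return response.json()  # return value is pushed to XCom


def transform(**context):
    raw = context["ti"].xcom_pull(task_ids="extract")
    # Keep only the fields needed to enrich the stored event
    return {"actual": raw.get("actual"), "forecast": raw.get("forecast")}


def load(**context):
    enriched = context["ti"].xcom_pull(task_ids="transform")
    # Here the MongoDB document for the event would be updated (connection omitted)
    print(f"Updating event with: {enriched}")


with DAG(
    dag_id="enrich_event_example",
    start_date=datetime(2020, 9, 4, 12, 30),
    schedule_interval="@once",
) as dag:
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
        op_kwargs={"event_id": "nfp-2020-09"},
        provide_context=True,
    )
    transform_task = PythonOperator(
        task_id="transform", python_callable=transform, provide_context=True
    )
    load_task = PythonOperator(
        task_id="load", python_callable=load, provide_context=True
    )

    extract_task >> transform_task >> load_task
```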
[Image: DAG for getting economic data, shown in the Airflow UI graph view]

The full DAG definition file is available in the Gist below.

Conclusion

Now we have an efficient mechanism for updating economic data. If there is an issue with a specific event, we can easily find the related DAG by event ID, check what went wrong, and then re-trigger the idempotent workflow.

