Handling API Errors with Airflow

Nathan Lim
StashAway Engineering
Jun 25, 2019

Airflow is a great tool for handling cron jobs. The many operators available out of the box and the extendability through custom plugins make it super versatile for all sorts of scenarios. Furthermore, being written in Python makes it really easy for many developers to pick it up. A simple cron job that is triggered daily can be written in a few lines of code.

Simple DAG for airflow
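
As a rough illustration, a minimal daily DAG can look something like this (the DAG id and task below are placeholders, not the exact code from the original snippet):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# A DAG triggered once a day; the id and the single task are illustrative.
dag = DAG(
    dag_id="simple_daily_job",
    default_args=default_args,
    start_date=datetime(2019, 6, 1),
    schedule_interval="@daily",
    catchup=False,
)

run_job = BashOperator(
    task_id="run_job",
    bash_command="echo 'running the daily job'",
    dag=dag,
)
```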

But sometimes the cron job can get a little more complex. Recently at StashAway, we had to work with a third party API provider to handle a batch money collection job. A few examples of failures that could be encountered are:

  1. The customer has entered the wrong account information
  2. There is insufficient money in the account
  3. The collection is not allowed during weekends/public holidays, and so on

Furthermore, as long as one customer in the batch causes an exception in the API call, the whole batch job will fail. So how do we build a solution that fails gracefully and retries, while still giving the admin good visibility into what happened? Since we already use Airflow extensively for many of our services, we looked into how we could use it to solve this problem. Here is the final DAG that we came up with.

Full DAG for handling money collection

The advantage of using Airflow here is that it allows us to visualise every step of the money collection process. If there are failures, an admin can easily come into the UI and check what has happened through the Airflow logs. Furthermore, if the third party API makes any sudden changes that break our code, we can easily edit the code for the Airflow DAG and restart it from the point of failure.

In this blog post, I won't be going through the whole code for this DAG, but instead picking out the most important concepts that enabled me to build it. We will be learning about XCom, the PythonOperator, as well as the TriggerDagRunOperator.

The final product you will build will consist of 2 DAGs looking like this:

Finished Process Users Dag
Finished Cron DAG

ProcessUsersDag

The first DAG will receive a list of user ids to collect money from. It will perform an API call, and when the API returns an error with a list of user ids that caused it, the DAG will need to remove those user ids and try the API call again. As we want to be able to view all the errors that occurred in the Airflow UI, we have chosen to make use of the TriggerDagRunOperator to trigger a new DAG run whenever we encounter a failure.

Firstly, we will need to be able to trigger this DAG with a custom list of user ids. We will make use of the DAG run configuration parameter, which allows us to provide parameters that can be retrieved throughout the whole DAG run (a short sketch of passing and reading this configuration follows the list below). This provides a few advantages:

  1. An admin can make an API call to this DAG with a set of UUIDs to collect from manually
  2. We can write a separate DAG that provides UUIDs for this DAG to make a collection on
  3. When the API encounters an error, the task that runs after the API call can obtain, from the DAG run configuration, the list of UUIDs that was originally sent to the API
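
As a rough sketch, assuming a user_ids key in the configuration and Airflow 1.x-style operators (the DAG id and task names are placeholders), a run can be triggered with a conf payload and read back from the context inside any task:

```python
from airflow.operators.python_operator import PythonOperator

# Trigger the DAG with a payload, e.g. using the Airflow 1.x CLI:
#   airflow trigger_dag ProcessUsersDag --conf '{"user_ids": ["uuid-1", "uuid-2"]}'


def print_user_ids(**context):
    # dag_run.conf holds the JSON payload the DAG run was triggered with
    conf = context["dag_run"].conf or {}
    user_ids = conf.get("user_ids", [])
    print("Received user ids: %s" % user_ids)
    return user_ids


show_user_ids = PythonOperator(
    task_id="show_user_ids",
    python_callable=print_user_ids,
    provide_context=True,  # Airflow 1.x: pass the task context into the callable
    dag=dag,
)
```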

Our first task in the DAG will be to retrieve the list of UUIDs from the DAG run configuration and then submit a call to the API. We could make use of Airflow macros; however, it is better to validate the UUIDs received before making the API call. The solution we came up with is to merge the SimpleHttpOperator and PythonOperator into a single operator called the ExtendedHttpOperator. This new operator allows us to pass in a Python function that generates the POST body passed to the SimpleHttpOperator.
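
A minimal sketch of such an operator is shown below; the constructor argument data_fn is an illustrative name for the callable that builds the POST body from the task context, and the real plugin may differ in its details:

```python
import json

from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults


class ExtendedHttpOperator(SimpleHttpOperator):
    """A SimpleHttpOperator whose request body is built at runtime by a
    Python callable that receives the task context."""

    @apply_defaults
    def __init__(self, data_fn, *args, **kwargs):
        super(ExtendedHttpOperator, self).__init__(*args, **kwargs)
        self.data_fn = data_fn

    def execute(self, context):
        # Build the POST body from the context (dag_run conf, XComs, ...)
        # and then let SimpleHttpOperator perform the actual HTTP call.
        self.data = json.dumps(self.data_fn(**context))
        return super(ExtendedHttpOperator, self).execute(context)
```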

In order to make this plugin available to our DAG, we need to add an __init__.py containing a class that extends AirflowPlugin.
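
Assuming the operator lives under plugins/extended_http_operator/, the plugin's __init__.py could look roughly like this:

```python
from airflow.plugins_manager import AirflowPlugin

# Assumes the operator class is defined in
# plugins/extended_http_operator/extended_http_operator.py
from extended_http_operator.extended_http_operator import ExtendedHttpOperator


class ExtendedHttpOperatorPlugin(AirflowPlugin):
    name = "extended_http_operator"
    operators = [ExtendedHttpOperator]
```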

Finally, we import it in our DAG to use it.
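
In Airflow 1.x, operators registered through a plugin are exposed under airflow.operators.<plugin name>, so the import in the DAG file looks like this:

```python
from airflow.operators.extended_http_operator import ExtendedHttpOperator
```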

This extended HTTP operator allows us to write a Python function to retrieve data from the context. Other than getting the DAG run configuration, we can also use it to get anything else that the Airflow context provides, such as XComs, the execution date, etc. We can even manipulate the data before attempting the API call.
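
Putting it together, the first task might look roughly like this; the connection id, endpoint, and payload keys are assumptions made for the sketch:

```python
def build_collection_payload(**context):
    # Read the UUIDs from the DAG run configuration and validate them
    # before they are sent to the API (validation kept trivial here).
    conf = context["dag_run"].conf or {}
    user_ids = [uid for uid in conf.get("user_ids", []) if uid]
    return {"user_ids": user_ids}


process_user_ids = ExtendedHttpOperator(
    task_id="process_user_ids",
    http_conn_id="collection_api",      # assumed connection id
    endpoint="/collections",            # assumed endpoint
    method="POST",
    headers={"Content-Type": "application/json"},
    data_fn=build_collection_payload,
    xcom_push=True,  # store the API response so downstream tasks can read it
    dag=dag,
)
```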

Once we receive the API response, we can make use of the BranchPythonOperator to decide which task to move to based on the result of the API call. We will use XCom to pull the API response from the process_user_ids task.

If it is successful, we can direct the flow to the success task. However, if the response tells us that some user ids caused an error, we direct it to the handle_api_error task.
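
A sketch of the branching task, assuming the API response is JSON with an error_user_ids field (the exact response shape will depend on your provider):

```python
import json

from airflow.operators.python_operator import BranchPythonOperator


def route_on_api_response(**context):
    # Pull the raw response that process_user_ids pushed to XCom.
    response = context["ti"].xcom_pull(task_ids="process_user_ids")
    result = json.loads(response) if response else {}
    # "error_user_ids" is an assumed field in the API response.
    if result.get("error_user_ids"):
        return "handle_api_error"
    return "success"


check_api_response = BranchPythonOperator(
    task_id="check_api_response",
    python_callable=route_on_api_response,
    provide_context=True,
    dag=dag,
)
```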

The handle_api_error task will remove the problematic user ids and then trigger a new DAG run with the list of users that are error free. We will make use of the TriggerDagRunOperator here, which allows you to trigger a new DAG run from within a DAG. It requires the parameters task_id and trigger_dag_id, as well as a python_callable parameter that needs to return a DagRunOrder object. By providing a new run_id and payload to the DagRunOrder, we gain the ability to give the new DAG run a new name and data.

For example, our DAG run started with a set of user ids, but when we receive the list of user ids with errors, we can remove them and create a new DAG run with a completely new set of user ids by passing the new list into the DagRunOrder object.
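
A sketch of this task, again assuming an error_user_ids field in the response and the Airflow 1.x TriggerDagRunOperator signature, where the callable receives the context and a DagRunOrder:

```python
import json

from airflow.operators.dagrun_operator import TriggerDagRunOperator


def remove_failed_users(context, dag_run_obj):
    # Airflow 1.x passes the task context and a DagRunOrder to this callable.
    conf = context["dag_run"].conf or {}
    original_ids = conf.get("user_ids", [])
    response = context["ti"].xcom_pull(task_ids="process_user_ids")
    error_ids = json.loads(response).get("error_user_ids", []) if response else []

    clean_ids = [uid for uid in original_ids if uid not in error_ids]
    if not clean_ids:
        return None  # returning None skips triggering a new run

    # Give the new run a distinct id and pass the cleaned list as its conf.
    dag_run_obj.run_id = "retry_%s" % context["ts_nodash"]
    dag_run_obj.payload = {"user_ids": clean_ids}
    return dag_run_obj


handle_api_error = TriggerDagRunOperator(
    task_id="handle_api_error",
    trigger_dag_id="ProcessUsersDag",  # trigger a fresh run of this same DAG
    python_callable=remove_failed_users,
    dag=dag,
)
```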

We will finish up the first DAG by chaining up the tasks using the following code:
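
Using the task names from the sketches above (with success as a simple DummyOperator), the chaining could be:

```python
from airflow.operators.dummy_operator import DummyOperator

success = DummyOperator(task_id="success", dag=dag)

process_user_ids >> check_api_response >> [success, handle_api_error]
```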

CronJobDag

The second DAG is a cron job that runs daily to pull a list of user ids to process. It will make an API call to retrieve the list of user ids and then trigger our ProcessUsersDag with that list.

Let us assume we have an API call that returns us a list of user ids. We can obtain this list using a SimpleHttpOperator:
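
A sketch of that task; the connection id and endpoint are assumptions:

```python
from airflow.operators.http_operator import SimpleHttpOperator

get_user_ids = SimpleHttpOperator(
    task_id="get_user_ids",
    http_conn_id="user_api",               # assumed connection id
    endpoint="/users/pending-collection",  # assumed endpoint
    method="GET",
    xcom_push=True,  # push the response to XCom for the trigger task below
    dag=dag,
)
```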

With this list of user ids, we will trigger the collection DAG using the TriggerDagRunOperator:
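
A sketch of the trigger task, assuming the response is JSON with a user_ids field:

```python
import json

from airflow.operators.dagrun_operator import TriggerDagRunOperator


def pass_user_ids(context, dag_run_obj):
    # Forward the user ids fetched by get_user_ids as the conf of the
    # ProcessUsersDag run.
    response = context["ti"].xcom_pull(task_ids="get_user_ids")
    user_ids = json.loads(response).get("user_ids", []) if response else []
    if not user_ids:
        return None  # nothing to collect today, skip the trigger
    dag_run_obj.payload = {"user_ids": user_ids}
    return dag_run_obj


trigger_process_users = TriggerDagRunOperator(
    task_id="trigger_process_users",
    trigger_dag_id="ProcessUsersDag",
    python_callable=pass_user_ids,
    dag=dag,
)
```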

Finally, we will chain up the tasks with this line of code:
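
With the task names used above:

```python
get_user_ids >> trigger_process_users
```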

Running the DAG

With the code finished, you should be able to start up Airflow and trigger the DAG to make sure everything is working. If you need a mock API endpoint or need help setting up the server, you can refer to the readme and view the full source code at my repository here: Link to Source Code

Summary

Airflow is most definitely a powerful and versatile tool. Even though it is not built for the purpose of handling API errors, its easy-to-read source code and customisability allowed us to use it for this. Of course, there are other tools out there that may be more suitable for the job. But as with many teams, we have limited manpower and resources, and sometimes it is better not to keep adding new tools/services which require new expertise. Hopefully, next time you encounter a new issue in your projects, you may think of creative ways to use Airflow to solve it too!

We are constantly on the lookout for great tech talent to join our engineering team — visit our website to learn more and feel free to reach out to us!
