Data’s Inferno: 7 Circles of Data Testing Hell with Airflow

TL;DR

Real data behaves in many unexpected ways that can break even the most well-engineered data pipelines. To catch as much of this weird behaviour as possible before users are affected, the ING Wholesale Banking Advanced Analytics team has created 7 layers of data testing that they use in their CI setup and Apache Airflow pipelines to stay in control of their data. The 7 layers are:

Original image courtesy of Columbia Spectator (http://spc.columbiaspectator.com/spectrum/2016/03/31/nine-circles-columbia-hell)
  1. DAG Integrity Tests; have your CI (Continuous Integration) check that your DAG is an actual DAG
  2. Split Ingestion from Deployment; separate the ETL that ingests your data sources from the ETL of your project
  3. Data Tests; check that the logic of each ETL step does what you expect it to do
  4. Alerting; get notified (for example in Slack) whenever a DAG fails
  5. Git Enforcing; reset every DAG each day to the last verified version on origin/master
  6. Mock Pipeline Tests; fix the data in place so that your code is the only moving part
  7. DTAP; run your pipelines in separate development, test, acceptance, and production environments, all on real data

We have ordered the 7 layers in order of complexity of implementing them, where Circle 1 is relatively easy to implement, and Circle 7 is more complex. Examples of 5 of these circles can be found at: https://github.com/danielvdende/data-testing-with-airflow.

We cannot make all 7 public (Circles 4 and 5 are missing), as that would allow everyone to push to our git repository and post to our Slack channel :-).

Circle 1: DAG integrity tests

DAG integrity tests are the first Circle of Data’s Inferno. The idea of this circle is to prevent any typos or mistakes in the definition of our Airflow DAG. It does this by performing a very simple check: is the DAG you have written a valid DAG? It verifies this by using Airflow’s own internal API to check if the DAG object is a correct, valid instance of the Airflow DAG model. These tests can be easily integrated into your CI pipeline, which enables you to automatically check a DAG’s validity every time you push to git. The power of this test is not only to catch typos. One of the easiest mistakes to make in Airflow is to accidentally define cycles. An example of a DAG with a cycle could be:

task_a = BashOperator(task_id='task_a', …)
test_task_a = BashOperator(task_id='test_task_a', …)
task_b = BashOperator(task_id='task_a', …)

The mistake here is the task_id value given to the final operator: because task_b reuses the id 'task_a', a dependency chain such as task_a >> test_task_a >> task_b effectively becomes task_a >> test_task_a >> task_a, which is a cycle. This kind of mistake can easily happen with quick copy-pasting, and can be very annoying to debug. But the DAG integrity tests flag this issue before you deploy your code to your Airflow environment. So instead of seeing this in the Airflow UI:

A rather unspecific error message in the Airflow UI

You’ll see this error in your CI:

But in the CI tests, a much clearer message of what the problem is

How to use

Integrating this layer requires a number of steps (assuming you already have a CI environment set up):

  1. In your CI, set up a task/stage that has pytest, airflow, and coverage available
  2. Add a test that imports each of your DAG files and checks that the module actually contains a valid Airflow DAG object; the core of that test is:
assert any(
    isinstance(var, af_models.DAG)
    for var in vars(module).values()
)

This code verifies that the file in which you claim to have defined a DAG actually contains a valid Airflow DAG.
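
To make this concrete, below is a minimal sketch of what a complete DAG integrity test file could look like. The dags/ directory, the use of importlib to load the files, and the test name are our own assumptions for illustration, not necessarily the exact WBAA implementation.

import glob
import importlib.util
import os

import pytest
from airflow import models as af_models

# assumption: all DAG definition files live in a dags/ directory next to this test
DAG_FILES = glob.glob(os.path.join(os.path.dirname(__file__), 'dags', '*.py'))

@pytest.mark.parametrize('dag_file', DAG_FILES)
def test_dag_integrity(dag_file):
    """Import every DAG file and check that it contains at least one valid DAG object."""
    module_name = os.path.splitext(os.path.basename(dag_file))[0]
    spec = importlib.util.spec_from_file_location(module_name, dag_file)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executing the file surfaces obvious definition mistakes

    assert any(
        isinstance(var, af_models.DAG)
        for var in vars(module).values()
    )

Running pytest in your CI then fails the build as soon as one of the DAG files no longer defines a valid DAG.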

Circle 2: Split Ingestion from Deployment

In an environment where you ingest data from multiple sources that end up in your project, you often have ETL specifically for your ingestion that can be logically separated from the ETL that goes into your project. At WBAA we do this by creating one Airflow DAG per data source and one DAG that does the ETL for our project. There were several reasons for us to do this:

  • If your ingestion fails, you do not want your project ETL to fail immediately. It might be perfectly fine that (one of) your data sources is not completely up-to-date. Your project can just use the data that was previously ingested correctly

How to use

This layer is fairly easy to implement: you’ll need to logically separate your ingestion and deployment. The way it is implemented at WBAA:

  1. Create a git repository per data source, containing the ETL for the ingestion, and one per project, containing the ETL for that specific project

As an example, we defined an interface that forces all the different systems in ING that process payment transactions into the same shape. At the end of the ingestion ETL of a payment transaction data source, we enforce the following schema for the data:

  • payer account

Why is this necessary? Because one source system could send its data in a party/counterparty format where the + or - sign of the monetary amount column indicates which way the money went, while another could do the exact same but use a credit or debit column instead; you get the picture :-).
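
To illustrate, here is a hedged PySpark sketch of such a normalisation step for a party/counterparty source; the column names (party, counterparty, amount, payer_account, beneficiary_account) are illustrative assumptions, since the full interface is not listed above.

from pyspark.sql import functions as F

def normalise_party_counterparty(df):
    """Map a source that uses party/counterparty plus a signed amount
    onto the common payment transaction schema (column names are illustrative)."""
    # a negative amount means money flowed from party to counterparty
    outgoing = df.filter(F.col('amount') < 0).select(
        F.col('party').alias('payer_account'),
        F.col('counterparty').alias('beneficiary_account'),
        F.abs(F.col('amount')).alias('amount'),
    )
    # a positive amount means money flowed from counterparty to party
    incoming = df.filter(F.col('amount') >= 0).select(
        F.col('counterparty').alias('payer_account'),
        F.col('party').alias('beneficiary_account'),
        F.col('amount').alias('amount'),
    )
    return outgoing.union(incoming)

A source with a credit/debit column would get its own, similar normalisation function; what matters is that every source ends up in the same shape.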

Circle 3: Data Tests

You can see a data test as a way to check if the logic you have just used in an ETL process has done what you expected it to do. It combines a sort of unit test for your ETL step with a more statistical test to see if the data still makes sense after processing.

As previously mentioned, at WBAA we have split our Airflow DAGs in those for ingestion and those for project ETL. We also make a distinction in what kind of data tests we want to run in these DAGs to make sure that after running the DAG it has done what we expect.

Examples of data tests for ingestion

  • Are there files available for ingestion?

The valid rows test checks whether the customer IDs we get from a data source actually reference a real record in our central customer database (referential integrity). You’d be surprised how often a simple test like this picks out weird records. The same goes for checking whether you have more records after ingestion than before. This simple test has let us know on several occasions that a change in our logic contained a mistake, because the number of records suddenly and unexpectedly decreased.

Examples of project ETL data tests

Imagine you’re dealing with payment transaction data that contains information on both companies and private individuals (PIs), and you need to perform an ETL step that aggregates the PIs into one line per week for all payments from PIs to companies and vice versa, filters out PI-to-PI payments, and leaves company-to-company payments untouched.

An example of tests can then be:

  • Are the known private individuals filtered out?

In general, WBAA aims to have at least one data test after every ETL task in a DAG.

How to use

  1. Make sure your environment has both pytest and airflow available
  2. After each ETL task, add a BashOperator that runs the corresponding data test script with pytest, for example:
pytest_cmd = """
export PYTHONPATH=/usr/spark/python/:/usr/spark/python/lib/\
py4j-0.10.4-src.zip:{spark_directory} &&\
export SPARK_HOME={spark_home} &&\
export ENVIRONMENT={environment} &&\
export MASTER=local &&\
cd /usr/local/airflow &&\
python -m pytest {directory}{script}
"""
test_union_transactions = BashOperator(
task_id='test_union_transactions',
bash_command=pytest_cmd.format(
environment=ENVIRONMENT,
directory=TESTS_DIRECTORY,
spark_directory=SPARK_DIRECTORY,
script='test_union_transactions.py',
spark_home=SPARK_HOME
),
dag=dag
)
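
For completeness, a sketch of what a data test script such as test_union_transactions.py could contain is shown below; the table names and the exact assertions are assumptions for illustration, not the actual WBAA tests.

import os

import pytest
from pyspark.sql import SparkSession

ENVIRONMENT = os.environ['ENVIRONMENT']  # exported by the BashOperator above

@pytest.fixture(scope='session')
def spark():
    return SparkSession.builder.appName('data_tests').enableHiveSupport().getOrCreate()

def test_no_records_lost(spark):
    # the unioned table should never contain fewer records than its sources combined
    unioned = spark.table('transactions_{}'.format(ENVIRONMENT)).count()
    sources = (spark.table('transactions_source_a_{}'.format(ENVIRONMENT)).count()
               + spark.table('transactions_source_b_{}'.format(ENVIRONMENT)).count())
    assert unioned >= sources

def test_referential_integrity(spark):
    # every customer id in the transactions should exist in the central customer table
    orphans = spark.sql("""
        SELECT COUNT(*) AS ct
        FROM transactions_{env} t
        LEFT JOIN customers_{env} c ON t.customer_id = c.customer_id
        WHERE c.customer_id IS NULL
    """.format(env=ENVIRONMENT)).first().ct
    assert orphans == 0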

Circle 4: Alerting

When things go wrong (and we assume that this will happen), it is important that we, as engineers, are notified, so that we can respond quickly. Chuck Norris helps us with this. Chuck Norris is a Slack ‘bot’ that posts to a designated Slack channel when a DAG fails, with some details on which DAG and which task in this DAG failed. For privacy reasons, we don’t post any more information on the error in Slack, but you could add this if your data is not privacy-sensitive. Chuck Norris allows us to avoid constantly checking a DAG’s progress. One danger is that you can become reliant on Chuck to notify you of failure, so if Chuck himself fails (for instance, because a firewall opening is closed), you would wrongfully assume all is well.

How to use

When we initially added Chuck Norris, Airflow’s Slack operator did not meet our requirements. We therefore use a simple BashOperator. Because we don’t want you good people posting into our Slack channel, we haven’t included this in the GitHub repository. You can, however, easily integrate this in your own code with the following steps:

  1. In the default_args object that you use to create your DAG object, add the following:
'on_failure_callback': send_slack_alert

2. Add the following function to your Airflowfile:

def send_slack_alert(context=None):
    """Send slack alert on failure to alert the team"""
    payload_vars = {
        'url': 'https://hooks.slack.com/services/<CREDENTIALS>',
        'run_id': str(context['run_id']),
        'task': str(context['task']),
        'dag_name': str(context['dag'].dag_id)
    }
    error_message = """
    {dag_name} Failure! Task failed: {task} Check log at: \
    {run_id}
    """.format(**payload_vars)

    payload_vars['json'] = """
    payload={{"channel":"#<YOUR_CHANNEL_HERE>","text":"{0}"}}
    """.format(error_message)
    slack_cmd = """
    curl -X POST \
    --data-urlencode '{json}' \
    {url}
    """.format(**payload_vars)
    slack_alert = BashOperator(
        task_id='slack_alert',
        dag=dag,
        bash_command=slack_cmd
    )
    slack_alert.execute(context)

Now, every time a task fails, the send_slack_alert() function is called, which can perform any logic you want (in this case, post a Slack message).
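
For completeness, a minimal sketch of how the callback is wired into a DAG via default_args is shown below; the DAG name, owner, and schedule are illustrative assumptions.

from datetime import datetime

from airflow import DAG

default_args = {
    'owner': 'wbaa',  # illustrative
    'start_date': datetime(2017, 1, 1),
    'on_failure_callback': send_slack_alert,  # the function defined above
}

dag = DAG('awesome_dag', default_args=default_args, schedule_interval='@daily')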

Circle 5: Git Enforcing

Git Enforcing to us means making sure that each day a process resets each DAG to the last verified version (i.e. the code on origin/master). We call this ‘Nuclear git’, because it will blow away any code that has not been checked in. In this way, we can safely guarantee what code is actually executed in our daily ingestion and deployment DAGs.

In our environment, we have a single Airflow installation, which runs all data pipelines for dev, tst, acc and prd. One of the main reasons for this is that it limits access to the central infrastructure to a single place, which makes it easier to control. Moreover, for some of our circles (e.g. Circle 7), it is very important that our code runs on production data. This means, however, that in some cases code in our Airflow environment is undergoing active development. Even the best engineers are only human (well, most of them ;-) ), and thus can forget to check out the correct branch at the end of the day. In order to prevent unexpected results (because of dangling, in-development code), we added Nuclear git to our environment. The assumption is that all DAGs are git repositories that reside on your Airflow environment.

How to use

Nuclear git is, in fact, an Airflow DAG, in our case scheduled to run every day. The code is very simple: a single BashOperator that runs the following script:

BEWARE! Run this code at your own peril!

# this will hard reset all repos to the version on master branch
# any local commits that have not been pushed yet will be lost.
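# note: this snippet is assumed to run from within each DAG repository directory
# checked out on the Airflow machine; $dir comes from an outer loop over those directories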
echo "Resetting "${dir%/*}
git fetch
git checkout -f master
git reset --hard origin/master
git clean -df
git submodule update --recursive --force
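
To run this on a schedule, the script can be wrapped in a DAG with a single BashOperator; below is a hedged sketch in which the DAG name, the daily schedule, and the DAG_LOCATION path are our own assumptions.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

DAG_LOCATION = '/usr/local/airflow/dags'  # assumption: where the DAG repos are checked out

# loop over every checked-out DAG repository and hard reset it to origin/master
nuclear_git_cmd = """
for dir in {dag_location}/*/; do
    cd "$dir"
    echo "Resetting "${{dir%/*}}
    git fetch
    git checkout -f master
    git reset --hard origin/master
    git clean -df
    git submodule update --recursive --force
done
""".format(dag_location=DAG_LOCATION)

dag = DAG('nuclear_git', schedule_interval='@daily', start_date=datetime(2017, 1, 1))

nuclear_git = BashOperator(
    task_id='nuclear_git',
    bash_command=nuclear_git_cmd,
    dag=dag,
)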

A nice side-effect of nuclear git is that it enforces good version control behaviour. A single commit at the end of the day is not good enough, and is very easy to forget. Nuclear git punishes this behaviour, and makes developers think about their git usage more consciously.

Circle 6: Mock Pipeline Tests

One of the most difficult aspects of designing and implementing reliable data pipelines is the fact that when working with real data, there are two moving parts: the data (and its quality) and your code. In order to be able to reliably test your code, it’s very important to ensure that your code is the only ‘moving part’. This circle of tests fixes the data in place, allowing the tests to focus on the code. By fixing the data, you know what data goes in to the functions in your code, and you can verify that the output of the functions in your code matches what you expect. In a sense, we are treating our code as a black box, and verifying its behaviour given certain input data.


Implementing this layer is quite involved, and requires a good understanding and definition of your data. The usefulness of the tests here will very strongly depend on how well your fake data models the real data. Of course, as you learn more about your data and its characteristics, you can and should update your fake data.

How to use

We have implemented our fake data tests with PySpark, as almost all of our data processing logic leverages Spark. In order to use the fake data tests, you’ll need a couple of things (assuming you have a working CI environment):

  1. In your CI, set up a task/stage that has pytest, pyspark, and coverage available

Step 1: Define your fake data

PERSON = [{'name': 'Kim Yong Un', 'country': 'North Korea', 'iban': 'NK99NKBK0000000666'}, …]
TRANSACTIONS = [{'iban': 'NK99NKBK0000000666', 'amount': 10}, …]

Step 2: Run your function

filter_data(spark, "tst")

Step 3: Verify the output of the function

assert spark.sql("""
    SELECT COUNT(*) ct
    FROM filtered_data
    WHERE iban = 'NK99NKBK0000000666'
""").first().ct == 0

Circle 7: DTAP

Splitting application back-ends and front-ends across multiple environments has been a best practice in software development for many years. Generally, four environments are defined: development, test, acceptance, and production. Each of these serves a distinct purpose. At WBAA, in order to deal with the reality of real data, we have extended this concept of multiple environments to our data. Normally, only the production environment of an application runs on production (i.e. real) data. In our case, however, we have created 4 distinct environments, all of which use real data. We do this because real data always contains unexpected, surprising characteristics. We use 4 environments:

  • Development: contains a completely random, very small (0.0025% in our case) sample of your data. The data won’t make any sense, and as a consequence checking the development application against this data will not make sense for a user either. This environment’s purpose is to verify that the tasks in your data pipeline can be executed and complete successfully. Because the amount of data is very small, this pipeline will run very quickly compared to the production one.
  • Test: also runs on a sample of your data; like Development, its purpose is to surface problems quickly, before the full dataset is involved.
  • Acceptance: contains the full dataset. In some cases, mistakes in processing logic will only come to light when running on the full dataset (for example, incorrect statistics).
  • Production: contains all production data. This is the environment your users will use. As such, it is very important that problems with the data are kept to a minimum. Of course, it is very difficult to never have any data issues in your application, but this circle of data testing hell (and the others) can help keep the number of issues to a minimum.

Each of these environments is a separate git branch in our data pipeline repository. Each branch is separately checked out in our Airflow environment, so we have 4 branches of the same repository checked out in our Airflow environment, resulting in 4 DAGs. In order to define which environment a DAG represents, we add a simple text file in each checked out directory, identifying the environment. In order to keep manual work to a minimum, we have added automatic promotion functionality to our DTAP procedure. In short, an average workflow on a feature will be as follows:

  • One of our engineers works on the feature on a branch off the Development branch; this branch will only get merged into Development if it runs through successfully.

How to use

Setting up your own DTAP is a rather involved process that can be tricky. However, it is extremely valuable to have in place, as a lot of issues are caught before your users notice. Moreover, due to the sampling of the data, the deployments on Development and Test give you a quick heads-up if something is horribly wrong. There are a number of steps necessary to successfully set up a DTAP environment:

  1. Make sure you have separate databases for each environment. In our case, that means a separate Hive database, a separate Druid datasource, and a separate PostgreSQL instance.
  2. At the end of each environment’s DAG, add tasks that promote the code to the next environment and trigger that environment’s DAG, for example:
promote_branch_to_acc = BashOperator(
    task_id='promote_branch_to_acc',
    bash_command="""
    cd {}/awesome_dag_acc/ &&
    git checkout acceptance &&
    git pull &&
    git merge origin/test &&
    git push &&
    git submodule update --recursive --force
    """.format(DAG_LOCATION),
    dag=dag
)

# trigger acceptance.
trigger_next_environment_deploy = TriggerDagRunOperator(
    task_id='trigger_next_environment_deploy',
    python_callable=lambda context, dag_run: dag_run,
    trigger_dag_id="awesome_dag_acc",
    dag=dag
)
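
As a final detail, the text file that identifies the environment (mentioned above) can be read at the top of each DAG to derive environment-specific names; below is a minimal sketch in which the file name .environment and the naming scheme are our own assumptions.

import os

# assumption: each checked-out DAG directory contains a small text file
# (here called '.environment') holding one of: dev, tst, acc, prd
DAG_DIRECTORY = os.path.dirname(os.path.abspath(__file__))

with open(os.path.join(DAG_DIRECTORY, '.environment')) as env_file:
    ENVIRONMENT = env_file.read().strip()

# derive environment-specific resources, e.g. one Hive database and one DAG id per environment
HIVE_DATABASE = 'transactions_{}'.format(ENVIRONMENT)
DAG_ID = 'awesome_dag_{}'.format(ENVIRONMENT)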

These are the basic steps required. In our GitHub repository, you can find an example. In this example, we have not included the code promotion, as it would have meant giving write access to the repository for everyone :-). If you spin up the Docker container provided, you’ll see the four branches checked out, with the triggering behaviour described above.

Wrapping Up

In this post, we’ve described our 7 Circles of Data Testing Hell: when to use each circle and also how to use it. Our goal was to present our setup, and to help others in setting up their data tests. You don’t have to apply every circle if you don’t need or want to; the 7 Circles should be seen as an à la carte menu, where you can pick and choose the circles that are applicable or useful in your situation. Once again, you can find an example of the circles here: https://github.com/danielvdende/data-testing-with-airflow

If you have any questions, remarks, or just want to chat about any of this, please get in touch :-).

Daniel van der Ende & John Müller
ING Wholesale Banking Advanced Analytics
