Data’s Inferno: 7 Circles of Data Testing Hell with Airflow

TL;DR

Real data behaves in many unexpected ways that can break even the most well-engineered data pipelines. To catch as much of this weird behaviour as possible before users are affected, the ING Wholesale Banking Advanced Analytics team has created 7 layers of data testing that they use in their CI setup and Apache Airflow pipelines to stay in control of their data. The 7 layers are:

Original image courtesy of Columbia Spectator (http://spc.columbiaspectator.com/spectrum/2016/03/31/nine-circles-columbia-hell)
  1. DAG Integrity Tests; have your CI (Continuous Integration) check if your DAG is an actual DAG
  2. Split your ingestion from your deployment; keep the logic you use to ingest data separate from the logic that deploys your application
  3. Data Tests; check if your logic is outputting what you’d expect
  4. Alerting; get Slack alerts from your data pipelines when they blow up
  5. Git Enforcing; always make sure you’re running your latest verified code
  6. Mock Pipeline Tests; create fake data in your CI so you know exactly what to expect when testing your logic
  7. DTAP; split your data into four different environments: Development is really small, just to see if it runs; Test takes a representative sample of your data for first sanity checks; Acceptance is a carbon copy of Production, allowing you to test performance and have a Product Owner do checks before releasing to Production

We have ordered the 7 layers by how complex they are to implement: Circle 1 is relatively easy, and Circle 7 is the most involved. Examples of 5 of these circles can be found at: https://github.com/danielvdende/data-testing-with-airflow.

We cannot make all 7 public: Circles 4 and 5 are missing, as publishing them would allow everyone to push to our git repository and post to our Slack channel :-).

Circle 1: DAG integrity tests

DAG integrity tests are the first Circle of Data’s Inferno. The idea of this circle is to prevent any typos or mistakes in the definition of our Airflow DAG. It does this by performing a very simple check: is the DAG you have written a valid DAG? It verifies this by using Airflow’s own internal API to check if the DAG object is a correct, valid instance of the Airflow DAG model. These tests can be easily integrated into your CI pipeline, which enables you to automatically check a DAG’s validity every time you push to git. The power of this test is not only to catch typos. One of the easiest mistakes to make in Airflow is to accidentally define cycles. An example of a DAG with a cycle could be:

task_a = BashOperator(task_id='task_a', …)
test_task_a = BashOperator(task_id='test_task_a', …)
task_b = BashOperator(task_id='task_a', …)

The mistake here is the task_id value given to the final operator: it repeats 'task_a', the id of the first operator. This kind of mistake can easily happen with quick copy-pasting, and can be very annoying to debug. But the DAG integrity tests flag this issue before you deploy your code to your Airflow environment. So instead of seeing this in the Airflow UI:

[Image: a rather unspecific error message in the Airflow UI]

You’ll see this error in your CI:

[Image: in the CI tests, a much clearer message of what the problem is]
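To make this failure mode concrete without needing Airflow, here is a minimal sketch in plain Python. It models a DAG as a list of task ids plus dependency edges and checks for duplicate ids and cycles. The helper names (`find_duplicate_ids`, `has_cycle`) and the wiring between the tasks are assumptions for illustration; this is not how Airflow itself implements its checks.

```python
from collections import Counter
from graphlib import CycleError, TopologicalSorter


def find_duplicate_ids(task_ids):
    """Return the task ids that appear more than once, sorted."""
    counts = Counter(task_ids)
    return sorted(task_id for task_id, n in counts.items() if n > 1)


def has_cycle(edges):
    """Return True if the (upstream, downstream) edges contain a cycle."""
    sorter = TopologicalSorter()
    for upstream, downstream in edges:
        # Register that downstream depends on upstream.
        sorter.add(downstream, upstream)
    try:
        sorter.prepare()
    except CycleError:
        return True
    return False


# The three operators from the example above, identified by their task_id.
task_ids = ["task_a", "test_task_a", "task_a"]

# Hypothetical wiring: first operator >> second operator >> third operator.
# Because the third operator reuses the id 'task_a', the chain loops back.
edges = [("task_a", "test_task_a"), ("test_task_a", "task_a")]

print(find_duplicate_ids(task_ids))
print(has_cycle(edges))
```

Running checks like these in CI means the copy-paste mistake is reported by name, before the DAG ever reaches the scheduler.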

How to use

Integrating this layer requires a number of steps (assuming you already have a CI environment set up):

  1. In your CI, set up a task/stage that has pytest, airflow, and coverage available
  2. Define a conftest.py file in which you add any custom Airflow configuration you might need.
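  3. For every file in your repository (get the paths to these files yourself), import the file as module and run the following code:
# af_models refers to Airflow's models module
from airflow import models as af_models

assert any(
    isinstance(var, af_models.DAG)
    for var in vars(module).values()
)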

This code verifies that the file in which you claim to have defined a DAG actually contains a valid Airflow DAG.
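The assertion above needs a `module` object for each file. One way to obtain it, sketched below, is to import each file by path with `importlib`. The function names `load_module_from_path` and `file_defines_instance` are hypothetical, and the class to look for is passed in as a parameter so that the sketch runs without Airflow installed; in a real test you would pass Airflow's DAG class (`af_models.DAG` above).

```python
import importlib.util
from pathlib import Path


def load_module_from_path(path):
    """Import a Python file by path and return the resulting module object."""
    path = Path(path)
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    # Executing the module surfaces syntax errors, bad imports and typos.
    spec.loader.exec_module(module)
    return module


def file_defines_instance(path, expected_class):
    """Return True if the file defines a module-level instance of expected_class."""
    module = load_module_from_path(path)
    return any(isinstance(var, expected_class) for var in vars(module).values())
```

With pytest, a natural fit is to parametrize one test over all DAG file paths (for example with `pytest.mark.parametrize`), so that each broken file shows up as its own failing test.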

Circle 2: Split Ingestion from Deployment

In an environment where you ingest data from multiple sources that end up in your project, you often have ETL specifically for your ingestion that can be logically separated from the ETL that goes into your project. At WBAA we do this by creating one Airflow DAG per data source and one DAG that does the ETL for our project. There were several reasons for us to do this:

  • If your ingestion fails, you do not want your project ETL to fail immediately. It might be perfectly fine that (one of) your data sources is not completely up-to-date. Your project can just use the data that was previously ingested correctly
  • It created a nice place to enforce an interface between data sources that are logically the same, but arrive in (vastly) different formats
  • It also allowed us to have one Git repository per ingestion and per project, which meant that we could separate all the code and CI tests that we want to run per repository
  • It makes it easy to add and remove data sources from a project, and reuse data sources for other projects (or replace them if a newer latest-and-greatest data source shows up).

How to use

This layer is fairly easy to implement: you’ll need to logically separate your ingestion and deployment. The way it is implemented at WBAA:

  1. Create a Git repository per data source, containing the ETL for the ingestion, and one per project, containing the ETL for that specific project
  2. Keep all the logic and CI tests belonging to a source or project isolated
  3. Define an interface per logical part

We will give an example of an interface we defined that forces all the different systems in ING that do payment transactions into the same shape. At the end of the ETL of the ingestion of a payment transaction data source we enforce the following schema for the data:

  • payer account
  • payer name
  • beneficiary account
  • beneficiary name
  • amount
  • currency
  • date

Why is this necessary? Because one source system could send things in a party/counterparty format, where the + or - sign within the monetary amount column indicates which way the money went. Yet another could do the exact same but with a credit or debit column. You get the picture :-).
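As an illustration of such an interface, the sketch below maps two differently shaped sources onto the common schema listed above. Everything about the source formats is an assumption made for the example: the column names (`party_account`, `credit_debit`, and so on), the convention that a negative amount or a 'debit' indicator means the party paid, and the function names.

```python
import datetime
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Transaction:
    """The common shape every ingestion DAG must produce."""
    payer_account: str
    payer_name: str
    beneficiary_account: str
    beneficiary_name: str
    amount: Decimal
    currency: str
    date: datetime.date


def from_signed_source(row):
    """Source A (assumed): party/counterparty columns with a signed amount."""
    amount = Decimal(row["amount"])
    party = (row["party_account"], row["party_name"])
    counterparty = (row["counterparty_account"], row["counterparty_name"])
    # Assumed convention: a negative amount means the party paid.
    payer, beneficiary = (party, counterparty) if amount < 0 else (counterparty, party)
    return Transaction(
        payer_account=payer[0], payer_name=payer[1],
        beneficiary_account=beneficiary[0], beneficiary_name=beneficiary[1],
        amount=abs(amount), currency=row["currency"],
        date=datetime.date.fromisoformat(row["date"]),
    )


def from_credit_debit_source(row):
    """Source B (assumed): unsigned amount plus a 'credit'/'debit' column."""
    indicator = row["credit_debit"].lower()
    if indicator not in ("credit", "debit"):
        raise ValueError("unexpected indicator: {!r}".format(row["credit_debit"]))
    # Translate the indicator into a sign and reuse the signed-amount logic.
    sign = -1 if indicator == "debit" else 1
    signed_row = dict(row, amount=str(sign * Decimal(row["amount"])))
    return from_signed_source(signed_row)
```

Downstream project ETL then only ever sees `Transaction` records, regardless of which source system a payment came from.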

Circle 3: Data Tests

You can see a data test as a way to check if the logic you have just used in an ETL process has done what you expected it to do. It combines a sort of unit test for your ETL step with a more statistical test to see if the data still makes sense after processing.

As previously mentioned, at WBAA we have split our Airflow DAGs into those for ingestion and those for project ETL. We also make a distinction in what kind of data tests we want to run in these DAGs to make sure that after running the DAG it has done what we expect.

Examples of data tests for ingestion

  • Are there files available for ingestion?
  • Did we get the columns that we expected?
  • Are the rows that are in there valid?
  • Did the row count of your table only increase?

The valid rows test is where we check if the customer IDs that we got in a data source actually reference a real record in our central customer database (referential integrity). You’d be surprised how often a simple test like this picks out weird records. The same goes for checking that you have more records after ingestion. This simple test has let us know on several occasions that we made a mistake when changing logic, because the number of records suddenly and unexpectedly decreased.
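The ingestion checks listed above can be sketched in plain Python on rows represented as dictionaries. In our setup such checks would run against Spark tables; the function names and the `customer_id` column below are assumptions for illustration only.

```python
def missing_columns(rows, expected_columns):
    """Return the expected columns that are absent from at least one row, sorted."""
    missing = set()
    for row in rows:
        missing |= set(expected_columns) - set(row)
    return sorted(missing)


def orphan_customer_ids(rows, known_customer_ids):
    """Return customer ids in rows that do not exist in the central customer table."""
    return sorted({row["customer_id"] for row in rows} - set(known_customer_ids))


def row_count_did_not_decrease(previous_count, current_count):
    """Return True if the table has at least as many rows as before ingestion."""
    return current_count >= previous_count
```

A data test then asserts that `missing_columns` and `orphan_customer_ids` return empty lists and that `row_count_did_not_decrease` is true, failing the DAG run otherwise.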

Examples of project ETL data tests

Imagine you’re dealing with payment transaction data that has both information of companies and private individuals (PIs) and you need to perform an ETL step that aggregates these PIs into one line per week for all payments from PIs to companies and vice versa, filters out PI to PI payments, and keeps company to company payments untouched.

An example of tests can then be:

  • Are the known private individuals filtered out?
  • Are the known companies still in?
  • Has the aggregation of PIs worked correctly?

In general, WBAA aims to have at least one data test after every ETL task in a DAG.
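To show what the three project ETL checks above would be testing, here is a small pure-Python sketch of the aggregation step for private individuals. It is one possible reading of that step, not our production logic: transactions are assumed to be `(payer, beneficiary, amount, date)` tuples, weeks are ISO weeks, and `PI` is a hypothetical placeholder name for the aggregated party.

```python
import datetime
from collections import defaultdict

PI = "PRIVATE_INDIVIDUALS"  # hypothetical label for the aggregated party


def aggregate_transactions(transactions, companies):
    """Split transactions into untouched company payments and weekly PI totals."""
    untouched = []
    weekly = defaultdict(int)
    for payer, beneficiary, amount, day in transactions:
        payer_is_company = payer in companies
        beneficiary_is_company = beneficiary in companies
        if payer_is_company and beneficiary_is_company:
            # Company to company payments are kept as they are.
            untouched.append((payer, beneficiary, amount, day))
        elif payer_is_company or beneficiary_is_company:
            iso = day.isocalendar()
            week = (iso[0], iso[1])  # (ISO year, ISO week number)
            key = (
                payer if payer_is_company else PI,
                beneficiary if beneficiary_is_company else PI,
                week,
            )
            weekly[key] += amount
        # PI to PI payments match neither branch and are dropped.
    aggregated = [(p, b, total, week) for (p, b, week), total in sorted(weekly.items())]
    return untouched, aggregated
```

A data test for this step can then check that no known private individual appears in the output, that known companies are still present, and that a weekly total matches a value computed by hand.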

How to use

  1. Make sure your environment has both pytest and airflow available
  2. Define a conftest.py file in which you create a SparkSession and other configuration that you want to set
  3. Create a .py file per test that you want to run
  4. You can now schedule this test file in Airflow by setting it downstream of your ETL step and then calling it with a BashOperator and running it with pytest
# Airflow 1.x import path; in Airflow 2.x BashOperator lives in airflow.operators.bash
from airflow.operators.bash_operator import BashOperator

pytest_cmd = """
export PYTHONPATH=/usr/spark/python/:/usr/spark/python/lib/\
py4j-0.10.4-src.zip:{spark_directory} &&\
export SPARK_HOME={spark_home} &&\
export ENVIRONMENT={environment} &&\
export MASTER=local &&\
cd /usr/local/airflow &&\
python -m pytest {directory}{script}
"""
test_union_transactions = BashOperator(
    task_id='test_union_transactions',
    bash_command=pytest_cmd.format(
        environment=ENVIRONMENT,
        directory=TESTS_DIRECTORY,
        spark_directory=SPARK_DIRECTORY,
        script='test_union_transactions.py',
        spark_home=SPARK_HOME
    ),
    dag=dag
)

Circle 4: Alerting

When things go wrong (and we assume that this will happen), it is important that we, as engineers, are notified, so that we can respond quickly. Chuck Norris helps us with this. Chuck Norris is a Slack ‘bot’ that posts to a designated Slack channel when a DAG fails, with some details on which DAG and which task in this DAG failed. For privacy reasons, we don’t post any more information on the error in Slack, but you could add this if your data is not privacy-sensitive. Chuck Norris allows us to avoid constantly checking a DAG’s progress. One danger is that you can become reliant on Chuck to notify you of failure, so if Chuck himself fails (for instance, because a firewall opening is closed), you would wrongfully assume all is well.

How to use

When we initially added Chuck Norris, Airflow’s Slack operator did not meet our requirements. We therefore use a simple BashOperator. Because we don’t want you good people posting into our Slack channel, we haven’t included this in the GitHub repository. You can, however, easily integrate this in your own code with the following steps:

  1. In the default_args object that you use to create your DAG object, add the following:
'on_failure_callback': send_slack_alert

  2. Add the following function to your Airflow file:

def send_slack_alert(context=None):
    """Send slack alert on failure to alert the team"""
    payload_vars = {
        'url': 'https://hooks.slack.com/services/<CREDENTIALS>',
        'run_id': str(context['run_id']),
        'task': str(context['task']),
        'dag_name': str(context['dag'].dag_id)
    }
    # The string bodies stay unindented so the message text is unchanged
    error_message = """
{dag_name} Failure! Task failed: {task} Check log at: \
{run_id}
""".format(**payload_vars)

    payload_vars['json'] = """
payload={{"channel":"#<YOUR_CHANNEL_HERE>","text":"{0}"}}
""".format(error_message)
    slack_cmd = """
curl -X POST \
--data-urlencode '{json}' \
{url}
""".format(**payload_vars)
    slack_alert = BashOperator(
        task_id='slack_alert',
        dag=dag,
        bash_command=slack_cmd
    )
    slack_alert.execute(context)

Now, every time a task fails, the send_slack_alert() function is called, which can perform any logic you want (in this case, post a Slack message).
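If you would rather not shell out to curl, the same alert can be sketched with the Python standard library. This is an alternative to the BashOperator approach above, not what we run: the function names are hypothetical, and the context is assumed to carry the same `dag`, `task` and `run_id` entries used above.

```python
import json
import urllib.request


def build_slack_message(context, channel):
    """Build the JSON body for a Slack incoming webhook from a context dict."""
    text = "{dag} failure! Task failed: {task} (run: {run})".format(
        dag=context["dag"].dag_id,
        task=context["task"],
        run=context["run_id"],
    )
    # json.dumps takes care of escaping quotes and newlines in the message.
    return json.dumps({"channel": channel, "text": text}).encode("utf-8")


def post_to_slack(webhook_url, body, timeout=10):
    """POST the body to the webhook URL; raises on network or HTTP errors."""
    request = urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Keeping message construction separate from the network call makes the message format easy to unit test without posting to a real channel.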

Circle 5: Git Enforcing

Git Enforcing to us means making sure that each day a process resets each DAG to the last verified version (i.e. the code on origin/master). We call this ‘Nuclear git’, because it will blow away any unchecked code. In this way, we can safely guarantee what code is actually executed in our daily ingestion and deployment DAGs.

In our environment, we have a single Airflow installation, which runs all data pipelines for dev, tst, acc and prd. One of the main reasons for this is that it limits the access to the central infrastructure to a single place, which makes it easier to control. Moreover, for some of our circles (e.g. Circle 7), it is very important that our code runs on production data. This means, however, that in some cases code in our Airflow environment is undergoing active development. Even the best engineers are only human (well, most of them ;-) ), and thus can forget to check out the correct branch at the end of the day. In order to prevent unexpected results (because of dangling, in-development code), we added Nuclear git to our environment. The assumption is that all DAGs are git repositories that reside on your Airflow environment.

How to use

Nuclear git is, in fact, an Airflow DAG, in our case scheduled to run every day. The code is very simple: a single BashOperator with the following code:

BEWARE! Run this code at your own peril!
# this will hard reset all repos to the version on master branch
# any local commits that have not been pushed yet will be lost.
echo "Resetting "${dir%/*}
git fetch
git checkout -f master
git reset --hard origin/master
git clean -df
git submodule update --recursive --force

A nice side-effect of nuclear git is that it enforces good version control behaviour. A single commit at the end of the day is not good enough, and is very easy to forget. Nuclear git punishes this behaviour, and makes developers think about their git usage more consciously.
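Given how destructive these commands are, a dry run is worth having. The sketch below is a Python rendering of the same reset sequence that defaults to only listing what it would do. It assumes that every DAG repository is a direct subdirectory of one DAGs folder; the function names and that layout are assumptions, not part of our setup as described above.

```python
import subprocess
from pathlib import Path

# The same git commands as in the shell snippet above.
RESET_COMMANDS = [
    ["git", "fetch"],
    ["git", "checkout", "-f", "master"],
    ["git", "reset", "--hard", "origin/master"],
    ["git", "clean", "-df"],
    ["git", "submodule", "update", "--recursive", "--force"],
]


def find_git_repos(dags_root):
    """Return the direct subdirectories of dags_root that contain a .git entry."""
    return sorted(
        path for path in Path(dags_root).iterdir()
        if path.is_dir() and (path / ".git").exists()
    )


def reset_all(dags_root, dry_run=True):
    """Hard-reset every repository; with dry_run=True only report the plan."""
    planned = []
    for repo in find_git_repos(dags_root):
        for command in RESET_COMMANDS:
            planned.append((repo, command))
            if not dry_run:
                # check=True stops at the first failing git command.
                subprocess.run(command, cwd=repo, check=True)
    return planned
```

Calling `reset_all(path)` without arguments changes nothing on disk; only `dry_run=False` actually discards local work.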

Circle 6: Mock Pipeline Tests

One of the most difficult aspects of designing and implementing reliable data pipelines is the fact that when working with real data, there are two moving parts: the data (and its quality) and your code. In order to be able to reliably test your code, it’s very important to ensure that your code is the only ‘moving part’. This circle of tests fixes the data in place, allowing the tests to focus on the code. By fixing the data, you know what data goes into the functions in your code, and you can verify that the output of the functions in your code matches what you expect. In a sense, we are treating our code as a black box, and verifying its behaviour given certain input data.

Implementing this layer is quite involved, and requires a good understanding and definition of your data. The usefulness of the tests here will very strongly depend on how well your fake data models the real data. Of course, as you learn more about your data and its characteristics, you can and should update your fake data.

How to use

We have implemented our fake data tests with PySpark, as almost all of our data processing logic leverages Spark. In order to use the fake data tests, you’ll need a couple of things (assuming you have a working CI environment):

  1. In your CI, set up a task/stage that has pytest, pyspark, and coverage available
  2. Create a conftest.py file in which you define the schemas of your data. Once that is in place, use these schemas to generate a given number of rows of data in Spark dataframes. Be sure to create rows of data that are representative of your real data. Write out each Spark dataframe to a table. By default, Spark will use Derby as its metastore. For testing purposes this is fine, but you should be careful to ensure that your data generation in conftest.py uses the same metastore and spark-warehouse that the tests will use (otherwise the data will be missing)
  3. Create as many test files as you want. The general rule of thumb here is one test file for every Spark script in your data pipeline, possibly with multiple tests per file. In each test, call the function you’re testing. Once this completes, use Spark to verify that the output tables now contain the data you would expect given the input data you defined. As an example, imagine you work for a bank and have to ensure that North Korean transactions are removed from your data:

Step 1: Define your fake data

PERSON = [{'name': 'Kim Yong Un', 'country': 'North Korea', 'iban': 'NK99NKBK0000000666'}, …]
TRANSACTIONS = [{'iban': 'NK99NKBK0000000666', 'amount': 10}, …]
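Hand-written rows like these cover the cases your tests rely on. To pad them with a given number of generated rows, one option is a small schema-driven generator, sketched here in plain Python. The schema format (column name mapped to a value-producing function), the generated values and the function names are all assumptions; the resulting list of dictionaries could then be handed to Spark, for example via `spark.createDataFrame`.

```python
import random

# Hypothetical schema: column name -> function that produces a fake value.
PERSON_SCHEMA = {
    "name": lambda rng: rng.choice(["Alice", "Bob", "Carol"]),
    "country": lambda rng: rng.choice(["Netherlands", "Belgium", "Germany"]),
    "iban": lambda rng: "NL00FAKE{:010d}".format(rng.randrange(10**10)),
}


def generate_rows(schema, n, seed=0):
    """Generate n fake rows; a fixed seed keeps the data identical across CI runs."""
    rng = random.Random(seed)
    return [
        {column: make_value(rng) for column, make_value in schema.items()}
        for _ in range(n)
    ]


def with_edge_cases(rows, edge_cases):
    """Append hand-written rows that specific tests depend on."""
    return rows + list(edge_cases)
```

Seeding the generator matters here: the point of this circle is that the data does not move, so two CI runs should see exactly the same rows.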

Step 2: Run your function

filter_data(spark, "tst")

Step 3: Verify the output of the function

assert spark.sql("""
SELECT COUNT(*) ct
FROM filtered_data
WHERE iban = 'NK99NKBK0000000666'
""").first().ct == 0

Circle 7: DTAP

Splitting application back-ends and front-ends across multiple environments has been a best practice in software development for many years. Generally, four environments are defined: development, test, acceptance, and production. Each of these serves a distinct purpose. At WBAA, in order to deal with the reality of real data, we have extended this concept of multiple environments to our data. Normally, only the production environment of an application runs on production (i.e. real) data. In our case, however, we have created 4 distinct environments, all of which use real data. We do this because real data always contains unexpected, surprising characteristics. We use 4 environments:

  • Development: contains a completely random, very small (0.0025% in our case) sample of your data. The data won’t make any sense, and as a consequence checking the development application using this data will not make any sense for a user. This environment’s purpose is to verify that the tasks in your data pipeline can be executed, and can complete with zero (i.e. successful) exit codes. Because the amount of data is very small, this pipeline will run very quickly compared to the production one.
  • Test: contains a slightly larger, non-random sample of the data. This non-random sample is achieved in our case by handpicking some entities for which to keep the data. The advantage of this is that the data for these entities will now make sense in your application. Therefore, you can now access the test application and verify (for this limited set of entities) that the data has been processed correctly. The definition of these entities is very important, and will depend heavily on the application and its use cases. Although the amount of data is significantly smaller than for the production pipeline (and so much quicker), the Test environment contains slightly more data than the Development environment due to the sampling method, and therefore will be slightly slower.
  • Acceptance: contains a carbon copy of the production data. The value of the acceptance environment lies in multiple aspects:
  1. This environment contains the full dataset. In some cases, mistakes in processing logic will only come to light when running on the full dataset (for example, incorrect statistics).
  2. Due to the sampling in the previous two environments, the Acceptance environment is also the first place in which the computational performance of the data pipeline can be examined and verified.
  3. Because all data is present, this environment can be used for your Product Owner to verify changes before pushing them to production (where the users will see them)
  • Production: contains all production data. This is the environment your users will use. As such, it is very important that problems with the data are kept to a minimum. Of course, it is very difficult to never have any data issues in your application, but this circle of data testing hell (and the others) can help keep the number of issues to a minimum.
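The two sampling strategies for Development and Test can be sketched as follows. This is a plain-Python illustration on rows represented as dictionaries, not the Spark code we run; the function names and the `entity_id` column are assumptions. Note that 0.0025% corresponds to a fraction of 0.000025.

```python
import random


def sample_for_development(rows, fraction, seed=0):
    """Random sample: keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


def sample_for_test(rows, entity_ids, key="entity_id"):
    """Non-random sample: keep every row that belongs to a hand-picked entity."""
    wanted = set(entity_ids)
    return [row for row in rows if row[key] in wanted]
```

The random sample only proves that the pipeline runs; the entity-based sample keeps complete histories for a few entities, which is what makes the Test application meaningful to look at.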

Each of these environments is a separate git branch in our data pipeline repository. Each branch is separately checked out in our Airflow environment, so we have 4 branches of the same repository checked out in our Airflow environment, resulting in 4 DAGs. In order to define which environment a DAG represents, we add a simple text file in each checked out directory, identifying the environment. In order to keep manual work to a minimum, we have added automatic promotion functionality to our DTAP procedure. In short, an average workflow on a feature will be as follows:

  • One of our engineers works on the feature on a branch off the Development branch. This branch will only get merged into Development if it runs through successfully.
  • When finished, their code is reviewed and merged into the Development branch.
  • The Development DAG is manually triggered.
  • If the Development DAG successfully runs through, the final two tasks firstly promote the code to Test, then trigger the Test DAG to run.
  • If the Test DAG successfully runs through, the final two tasks promote the code to Acceptance, then trigger the Acceptance DAG to run.
  • Finally, we have chosen to have a ‘green-light’ procedure for deployments to Production. That means that we don’t automatically promote our Acceptance data pipeline code to Production. The main reason for this is that we want to do a final manual verification step with the Acceptance application before giving the ‘green light’ to use the changes on Production.
  • Once we’re happy with the data in Acceptance, we open a Merge Request into master (i.e. Production), which is then reviewed again. This merge may contain multiple changes that several people worked on, so it’s a good final check.
  • Production is not manually triggered, but instead uses the Airflow scheduler to run at fixed times.
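The promotion rules in this workflow boil down to a small decision table, sketched below. The names `PROMOTION_ORDER`, `MANUAL_GATES`, `next_environment` and `may_auto_promote` are hypothetical and only serve to make the rules explicit.

```python
PROMOTION_ORDER = ["development", "test", "acceptance", "production"]

# Promotion into these environments needs a human 'green light'.
MANUAL_GATES = {"production"}


def next_environment(current):
    """Return the environment that follows `current`, or None for the last one."""
    index = PROMOTION_ORDER.index(current)
    if index + 1 < len(PROMOTION_ORDER):
        return PROMOTION_ORDER[index + 1]
    return None


def may_auto_promote(current):
    """True if a successful run in `current` may promote and trigger the next DAG."""
    target = next_environment(current)
    return target is not None and target not in MANUAL_GATES
```

Under these rules a successful Development or Test run promotes automatically, while Acceptance stops and waits for the Merge Request into master.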

How to use

Setting up your own DTAP is a rather involved process that can be tricky. However, it is extremely valuable to have in place, as a lot of issues are caught before your users notice. Moreover, due to the sampling of the data, the deployments on Development and Test give you a quick heads-up if something is horribly wrong. There are a number of steps necessary to successfully set up a DTAP environment:

  1. Make sure you have separate databases for each environment. In our case, that means a separate Hive database, a separate Druid datasource, and a separate PostgreSQL instance.
  2. Check out your data pipeline repository 4 times in your Airflow environment. Make sure each directory checks out the correct branch (i.e. the dev directory checks out the dev branch)
  3. Place an environment.conf file in each checkout, which will specify which environment the directory is (and add this file to the .gitignore)
  4. Ensure your Airflow environment is configured to push to git (this usually means configuring a non-personal account).
  5. For the Development and Test branches, we use automatic promotion and triggering. This means adding two tasks to the DAGs. The first one takes care of the code promotion.
promote_branch_to_acc = BashOperator(
    task_id='promote_branch_to_acc',
    bash_command="""
cd {}/awesome_dag_acc/ &&
git checkout acceptance &&
git pull &&
git merge origin/test &&
git push &&
git submodule update --recursive --force
""".format(DAG_LOCATION),
    dag=dag
)

# trigger acceptance.
trigger_next_environment_deploy = TriggerDagRunOperator(
    task_id='trigger_next_environment_deploy',
    python_callable=lambda context, dag_run: dag_run,
    trigger_dag_id="awesome_dag_acc",
    dag=dag
)
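Step 3 relies on each checkout knowing which environment it represents. A minimal sketch of reading that marker file is shown below. The file format is an assumption (a single line holding one of dev, tst, acc or prd), as are the function names; the DAG id pattern follows the `awesome_dag_acc` example above.

```python
from pathlib import Path

VALID_ENVIRONMENTS = {"dev", "tst", "acc", "prd"}


def read_environment(checkout_dir, filename="environment.conf"):
    """Read the environment name from the marker file in a checked-out directory."""
    value = (Path(checkout_dir) / filename).read_text().strip()
    if value not in VALID_ENVIRONMENTS:
        raise ValueError("Unknown environment {!r} in {}".format(value, checkout_dir))
    return value


def dag_id_for(base_name, environment):
    """Derive an environment-specific DAG id, e.g. awesome_dag_acc."""
    return "{}_{}".format(base_name, environment)
```

Failing loudly on an unknown value is deliberate: a typo in the marker file should stop the DAG from loading rather than silently point it at the wrong database.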

These are the basic steps required. In our GitHub repository, you can find an example. In this example, we have not included the code promotion, as it would have meant giving write access to the repository for everyone :-). If you spin up the Docker container provided, you’ll see the four branches checked out, with the triggering behaviour described above.

Wrapping Up

In this post, we’ve described our 7 Circles of Data Testing Hell; when to use each circle and also how to use it. Our goal was to present our setup, and to help others in setting up their data tests. You don’t have to apply every circle if you don’t need or want to. The 7 Circles should be seen as an à la carte menu, where you can pick and choose the circles that are applicable or useful in your situation. Once again, you can find an example of the circles here: https://github.com/danielvdende/data-testing-with-airflow

If you have any questions, remarks, or just want to chat about any of this, please get in touch :-).

Daniel van der Ende & John Müller
ING Wholesale Banking Advanced Analytics