Data’s Inferno: 7 Circles of Data Testing Hell with Airflow

TL;DR

Real data behaves in many unexpected ways that can break even the most well-engineered data pipelines. To catch as much of this weird behaviour as possible before users are affected, the ING Wholesale Banking Advanced Analytics team has created 7 layers of data testing that they use in their CI setup and Apache Airflow pipelines to stay in control of their data. The 7 layers are:

  1. DAG Integrity Tests; have your CI check that every DAG you have written is a valid DAG
  2. Split your ingestion from your deployment; keep the logic you use to ingest data separate from the logic that deploys your application
  3. Data Tests; check if your logic is outputting what you’d expect
  4. Alerting; get Slack alerts from your data pipelines when they blow up
  5. Git Enforcing; always make sure you’re running your latest verified code
  6. Mock Pipeline Tests; create fake data in your CI so you know exactly what to expect when testing your logic
  7. DTAP; split your data into four different environments: Development is really small, just to see if it runs; Test takes a representative sample of your data for first sanity checks; Acceptance is a carbon copy of Production, allowing you to test performance and have a Product Owner do checks before releasing to Production

Original image courtesy of Columbia Spectator (http://spc.columbiaspectator.com/spectrum/2016/03/31/nine-circles-columbia-hell)

Circle 1: DAG integrity tests

DAG integrity tests are the first Circle of Data’s Inferno. The idea of this circle is to prevent any typos or mistakes in the definition of our Airflow DAG. It does this by performing a very simple check: is the DAG you have written a valid DAG? It verifies this by using Airflow’s own internal API to check if the DAG object is a correct, valid instance of the Airflow DAG model. These tests can be easily integrated into your CI pipeline, which enables you to automatically check a DAG’s validity every time you push to git. The power of this test is not only to catch typos. One of the easiest mistakes to make in Airflow is to accidentally define cycles. An example of a DAG with a cycle could be:

task_a = BashOperator(task_id='task_a', …)
test_task_a = BashOperator(task_id='test_task_a', …)
task_b = BashOperator(task_id='task_a', …)

A mistake like this leads to a rather unspecific error message in the Airflow UI, but in the CI tests you get a much clearer message of what the problem is.
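To make the idea of a cycle check concrete, here is a minimal, library-independent sketch. It is not Airflow’s own implementation; the function name find_cycle and the dictionary representation (task id mapped to its downstream task ids) are assumptions made for illustration.

```python
from typing import Dict, List


def find_cycle(downstream: Dict[str, List[str]]) -> List[str]:
    """Return one cycle as a list of task ids, or [] if the graph is acyclic."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    state = {task: WHITE for task in downstream}
    path: List[str] = []

    def visit(task: str) -> List[str]:
        state[task] = GREY
        path.append(task)
        for nxt in downstream.get(task, []):
            if state.get(nxt, WHITE) == GREY:
                # nxt is already on the current path, so we closed a loop
                return path[path.index(nxt):] + [nxt]
            if state.get(nxt, WHITE) == WHITE:
                cycle = visit(nxt)
                if cycle:
                    return cycle
        path.pop()
        state[task] = BLACK
        return []

    for task in list(downstream):
        if state[task] == WHITE:
            cycle = visit(task)
            if cycle:
                return cycle
    return []


# Hypothetical dependencies: task_a -> test_task_a -> task_b -> task_a
cyclic = {"task_a": ["test_task_a"], "test_task_a": ["task_b"], "task_b": ["task_a"]}
acyclic = {"task_a": ["test_task_a"], "test_task_a": ["task_b"], "task_b": []}
```

Running a check like this in CI turns a vague runtime failure into an assertion that names the tasks involved.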

How to use

Integrating this layer requires a number of steps (assuming you already have a CI environment set up):

  1. Define a conftest.py file in which you add any custom Airflow configuration you might need.
  2. For every file in your repository (get the paths to these files yourself), run the following code:
# `module` is the imported DAG file; `af_models` is assumed to be airflow.models
assert any(
    isinstance(var, af_models.DAG)
    for var in vars(module).values()
)
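The assertion above needs the DAG file to be imported as a module first. A minimal sketch of that step, using only the standard library, could look as follows. The helper names import_file and contains_instance are assumptions for illustration; in a real test you would pass Airflow’s DAG class as cls.

```python
import importlib.util
from pathlib import Path
from types import ModuleType


def import_file(path: str) -> ModuleType:
    """Import a Python file from an arbitrary path and return the module."""
    spec = importlib.util.spec_from_file_location(Path(path).stem, path)
    module = importlib.util.module_from_spec(spec)
    # Executing the module raises on syntax errors or failing imports,
    # which is already a useful integrity check in itself.
    spec.loader.exec_module(module)
    return module


def contains_instance(module: ModuleType, cls: type) -> bool:
    """True if any module-level variable is an instance of cls."""
    return any(isinstance(var, cls) for var in vars(module).values())
```

With pytest, one option is to parametrize a single test over all DAG file paths, so every file gets its own pass or fail line in the CI output.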

Circle 2: Split Ingestion from Deployment

In an environment where you ingest data from multiple sources that end up in your project, you often have ETL specifically for your ingestion that can be logically separated from the ETL that goes into your project. At WBAA we do this by creating one Airflow DAG per data source and one DAG that does the ETL for our project. There were several reasons for us to do this:

  • It created a nice place to enforce an interface between data sources that are logically the same, but arrive in (vastly) different formats
  • It also allowed us to have one Git repository per ingestion and per project, which meant that we could separate all the code and CI tests that we want to run per repository
  • It made it easy to add and remove data sources from a project, and to reuse data sources for other projects (or replace them if a latest-and-greatest data source shows up).

How to use

This layer is fairly easy to implement: you’ll need to logically separate your ingestion and deployment. The way it is implemented at WBAA:

  1. Keep all the logic and CI tests belonging to a source or project isolated
  2. Define an interface per logical part. For transaction data, for example, the interface could consist of:
  • payer name
  • beneficiary account
  • beneficiary name
  • amount
  • currency
  • date
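One way to pin such an interface down in code is a small typed record that every ingestion has to produce. The sketch below is an assumption about how this could look, not the WBAA implementation; the class name Transaction, the function from_source_a, and the source column names (debtor, creditor_iban, and so on) are hypothetical.

```python
import datetime as dt
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict


@dataclass(frozen=True)
class Transaction:
    """Common interface that every ingestion is expected to deliver."""
    payer_name: str
    beneficiary_account: str
    beneficiary_name: str
    amount: Decimal
    currency: str
    date: dt.date


def from_source_a(row: Dict[str, str]) -> Transaction:
    """Map one record of a hypothetical source 'A' onto the interface."""
    return Transaction(
        payer_name=row["debtor"].strip(),
        beneficiary_account=row["creditor_iban"].replace(" ", ""),
        beneficiary_name=row["creditor"].strip(),
        amount=Decimal(row["amt"]),
        currency=row["ccy"].upper(),
        date=dt.datetime.strptime(row["booking_date"], "%Y%m%d").date(),
    )
```

Each data source gets its own mapping function, while the project only ever depends on the shared record type.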

Circle 3: Data Tests

You can see a data test as a way to check if the logic you have just used in an ETL process has done what you expected it to do. It combines a sort of unit test for your ETL step with a more statistical test to see if the data still makes sense after processing. Typical questions a data test answers are:

  • Did we get the columns that we expected?
  • Are the rows that are in there valid?
  • Did the row count of your table only increase?
  • Are the known companies still in?
  • Has the aggregation of PIs worked correctly?
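Several of the questions above translate directly into small checks. The following is a simplified sketch on plain Python rows (lists of dictionaries) rather than Spark tables; the helper names and the example column names are assumptions for illustration.

```python
from typing import Dict, Iterable, List, Set


def missing_columns(rows: List[Dict[str, object]], expected: Set[str]) -> Set[str]:
    """Expected columns that are absent from at least one row."""
    missing: Set[str] = set()
    for row in rows:
        missing |= expected - set(row)
    return missing


def row_count_did_not_shrink(previous_count: int, rows: List[Dict[str, object]]) -> bool:
    """True if the table has at least as many rows as in the previous run."""
    return len(rows) >= previous_count


def missing_known_values(rows: List[Dict[str, object]], column: str,
                         known: Iterable[str]) -> Set[str]:
    """Known values (e.g. company names) that no longer occur in a column."""
    present = {row.get(column) for row in rows}
    return set(known) - present


rows = [
    {"iban": "A", "amount": 1, "company": "ACME"},
    {"iban": "B", "amount": 2, "company": "Globex"},
]
```

Returning the offending columns or values, instead of just True or False, makes the failure message in the task log immediately actionable.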

How to use

  1. Make sure your environment has both pytest and airflow available
  2. Define a conftest.py file in which you create a SparkSession and other configuration that you want to set
  3. Create a .py file per test that you want to run
  4. You can now schedule this test file in Airflow by setting it downstream of your ETL step and then calling it with a BashOperator and running it with pytest
# Shell command that sets up the Spark environment and runs one pytest file
pytest_cmd = """
export PYTHONPATH=/usr/spark/python/:/usr/spark/python/lib/\
py4j-0.10.4-src.zip:{spark_directory} &&\
export SPARK_HOME={spark_home} &&\
export ENVIRONMENT={environment} &&\
export MASTER=local &&\
cd /usr/local/airflow &&\
python -m pytest {directory}{script}
"""

# Set this task downstream of the ETL step whose output it verifies
test_union_transactions = BashOperator(
    task_id='test_union_transactions',
    bash_command=pytest_cmd.format(
        environment=ENVIRONMENT,
        directory=TESTS_DIRECTORY,
        spark_directory=SPARK_DIRECTORY,
        script='test_union_transactions.py',
        spark_home=SPARK_HOME
    ),
    dag=dag
)

Circle 4: Alerting

When things go wrong (and we assume that this will happen), it is important that we, as engineers, are notified, so that we can respond quickly. Chuck Norris helps us with this. Chuck Norris is a Slack ‘bot’ that posts to a designated Slack channel when a DAG fails, with some details on which DAG and which task in this DAG failed. For privacy reasons, we don’t post any more information on the error in Slack, but you could add this if your data is not privacy-sensitive. Chuck Norris allows us to avoid constantly checking a DAG’s progress. One danger is that you can become reliant on Chuck to notify you of failure, so if Chuck himself fails (for instance, because a firewall opening is closed), you would wrongfully assume all is well.

How to use

When we initially added Chuck Norris, Airflow’s Slack operator did not meet our requirements. We therefore use a simple BashOperator. Because we don’t want you good people posting into our Slack channel, we haven’t included this in the GitHub repository. You can, however, easily integrate this in your own code with the following steps:

  1. Register the callback in your DAG’s default_args:
'on_failure_callback': send_slack_alert
  2. Add the callback function to your DAG file:
def send_slack_alert(context=None):
    """Send slack alert on failure to alert the team"""
    payload_vars = {
        'url': 'https://hooks.slack.com/services/<CREDENTIALS>',
        'run_id': str(context['run_id']),
        'task': str(context['task']),
        'dag_name': str(context['dag'].dag_id)
    }
    error_message = """
{dag_name} Failure! Task failed: {task} Check log at: \
{run_id}
""".format(**payload_vars)

    payload_vars['json'] = """
payload={{"channel":"#<YOUR_CHANNEL_HERE>","text":"{0}"}}
""".format(error_message)
    # Note the space in `curl -X`; without it the shell looks for a command named `curl-X`
    slack_cmd = """
curl -X POST \
--data-urlencode '{json}' \
{url}
""".format(**payload_vars)
    # Build a BashOperator for the curl command and execute it right away
    slack_alert = BashOperator(
        task_id='slack_alert',
        dag=dag,
        bash_command=slack_cmd
    )
    slack_alert.execute(context)
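If you would rather not build JSON by string formatting (quotes in a DAG or task name can break the payload), an alternative is to let Python’s json module do the escaping and post with the standard library. This is a sketch of that alternative, not what we run; the function names are hypothetical, and it assumes your Slack webhook accepts a JSON request body.

```python
import json
import urllib.request


def build_slack_payload(dag_name: str, task: str, run_id: str, channel: str) -> bytes:
    """Build the JSON body for a Slack incoming webhook."""
    text = "{} Failure! Task failed: {} Check log at: {}".format(dag_name, task, run_id)
    # json.dumps takes care of escaping quotes and other special characters
    return json.dumps({"channel": channel, "text": text}).encode("utf-8")


def post_to_slack(url: str, payload: bytes) -> int:
    """POST the payload to the webhook URL and return the HTTP status code."""
    request = urllib.request.Request(
        url, data=payload, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Inside the failure callback you would call build_slack_payload with the values from context and pass the result to post_to_slack together with your webhook URL.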

Circle 5: Git Enforcing

Git Enforcing to us means making sure that each day a process resets each DAG to the last verified version (i.e. the code on origin/master). We call this ‘Nuclear git’, because it will blow away any unchecked code. In this way, we can safely guarantee what code is actually executed in our daily ingestion and deployment DAGs.

How to use

Nuclear git is, in fact, an Airflow DAG, in our case scheduled to run every day. The code is very simple: a single BashOperator with the following code:

# Assumes this runs once per repository, with $dir set to that repository's directory.
# This will hard reset the repo to the version on the master branch;
# any local commits that have not been pushed yet will be lost.
echo "Resetting ${dir%/*}"
git fetch
git checkout -f master
git reset --hard origin/master
git clean -df
git submodule update --recursive --force
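The same reset can also be driven from Python, which makes the loop over repositories explicit and testable. The sketch below is an assumption about how that could look, not our actual DAG; it treats every directory directly below a root folder that contains a .git entry as a repository, and the names nuclear_reset and run_in are hypothetical.

```python
import subprocess
from pathlib import Path
from typing import Callable, List, Sequence

# The same git commands as in the shell snippet, in the same order
RESET_COMMANDS: List[List[str]] = [
    ["git", "fetch"],
    ["git", "checkout", "-f", "master"],
    ["git", "reset", "--hard", "origin/master"],
    ["git", "clean", "-df"],
    ["git", "submodule", "update", "--recursive", "--force"],
]


def run_in(repo: Path, command: Sequence[str]) -> None:
    """Run one command inside the repository; raise if it fails."""
    subprocess.run(command, cwd=str(repo), check=True)


def nuclear_reset(root: Path,
                  run: Callable[[Path, Sequence[str]], None] = run_in) -> List[Path]:
    """Hard-reset every git checkout directly below root; return the repos reset."""
    repos = sorted(p for p in root.iterdir() if (p / ".git").exists())
    for repo in repos:
        for command in RESET_COMMANDS:
            run(repo, command)
    return repos
```

Passing the runner as a parameter lets you verify in CI which commands would be executed without touching a real repository.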

Circle 6: Mock Pipeline Tests

One of the most difficult aspects of designing and implementing reliable data pipelines is the fact that when working with real data, there are two moving parts: the data (and its quality) and your code. In order to be able to reliably test your code, it’s very important to ensure that your code is the only ‘moving part’. This circle of tests fixes the data in place, allowing the tests to focus on the code. By fixing the data, you know what data goes into the functions in your code, and you can verify that the output of the functions in your code matches what you expect. In a sense, we are treating our code as a black box, and verifying its behaviour given certain input data.

How to use

We have implemented our fake data tests with PySpark, as almost all of our data processing logic leverages Spark. In order to use the fake data tests, you’ll need a couple of things (assuming you have a working CI environment):

  1. Create a conftest.py file in which you define the schemas of your data. Once that is in place, use these schemas to generate a given number of rows of data in Spark dataframes. Be sure to create rows of data that are representative of your real data. Write out each Spark dataframe to a table. By default, Spark will use Derby as its metastore. For testing purposes this is fine, but you should be careful to ensure that your data generation in conftest.py uses the same metastore and spark-warehouse that the tests will use (otherwise the data will be missing).
  2. Create as many test files as you want. The general rule of thumb here is one test file for every Spark script in your data pipeline, possibly with multiple tests per file. In each test, call the function you’re testing. Once this completes, use Spark to verify that the output tables now contain the data you would expect given the input data you defined. As an example, imagine you work for a bank and have to ensure that North Korean transactions are removed from your data:

Step 1: Define your fake data

PERSON = [{'name': 'Kim Yong Un', 'country': 'North Korea', 'iban': 'NK99NKBK0000000666'}, …]
TRANSACTIONS = [{'iban': 'NK99NKBK0000000666', 'amount': 10}, …]

Step 2: Run your function

filter_data(spark, "tst")

Step 3: Verify the output of the function

assert spark.sql("""
SELECT COUNT(*) ct
FROM filtered_data
WHERE iban = 'NK99NKBK0000000666'
""").first().ct == 0

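The three steps above can be tried end to end without a Spark cluster. The sketch below uses an in-memory SQLite database as a stand-in for Spark tables; the second person and transaction, the table names person and transactions, and this particular filter_data implementation are assumptions made up for the example.

```python
import sqlite3

PERSON = [
    {"name": "Kim Yong Un", "country": "North Korea", "iban": "NK99NKBK0000000666"},
    {"name": "Jane Doe", "country": "Netherlands", "iban": "NL00BANK0000000001"},  # hypothetical
]
TRANSACTIONS = [
    {"iban": "NK99NKBK0000000666", "amount": 10},
    {"iban": "NL00BANK0000000001", "amount": 25},  # hypothetical
]


def load_fake_data(conn: sqlite3.Connection) -> None:
    """Step 1: write the fake data to tables."""
    conn.execute("CREATE TABLE person (name TEXT, country TEXT, iban TEXT)")
    conn.execute("CREATE TABLE transactions (iban TEXT, amount INTEGER)")
    conn.executemany("INSERT INTO person VALUES (:name, :country, :iban)", PERSON)
    conn.executemany("INSERT INTO transactions VALUES (:iban, :amount)", TRANSACTIONS)


def filter_data(conn: sqlite3.Connection) -> None:
    """Stand-in for the function under test: drop North Korean transactions."""
    conn.execute("""
        CREATE TABLE filtered_data AS
        SELECT t.iban, t.amount
        FROM transactions t
        JOIN person p ON p.iban = t.iban
        WHERE p.country <> 'North Korea'
    """)


def test_filter_data() -> None:
    conn = sqlite3.connect(":memory:")
    load_fake_data(conn)
    filter_data(conn)  # Step 2: run the function
    # Step 3: verify the output
    blocked = conn.execute(
        "SELECT COUNT(*) FROM filtered_data WHERE iban = 'NK99NKBK0000000666'"
    ).fetchone()[0]
    kept = conn.execute("SELECT COUNT(*) FROM filtered_data").fetchone()[0]
    assert blocked == 0
    assert kept == 1
```

Checking that the legitimate transaction is still present matters as much as checking that the blocked one is gone; a filter that drops everything would otherwise pass.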
Circle 7: DTAP

Splitting application back-ends and front-ends across multiple environments has been a best practice in software development for many years. Generally, four environments are defined: development, test, acceptance, and production. Each of these serves a distinct purpose. At WBAA, in order to deal with the reality of real data, we have extended this concept of multiple environments to our data. Normally, only the production environment of an application runs on production (i.e. real) data. In our case, however, we have created 4 distinct environments that all use real data. We do this because real data always contains unexpected, surprising characteristics. The 4 environments are:

  • Development: contains a really small sample of the data, just to see if the pipeline runs.
  • Test: contains a slightly larger, non-random sample of the data. This non-random sample is achieved in our case by handpicking some entities for which to keep the data. The advantage of this is that the data for these entities will now make sense in your application. Therefore, you can now access the test application and verify (for this limited set of entities) that the data has been processed correctly. The definition of these entities is very important, and will depend heavily on the application and its use cases. Although the amount of data is significantly smaller than for the production pipeline (and therefore much quicker to process), the Test environment contains slightly more data than the Development environment due to the sampling method, and therefore will be slightly slower.
  • Acceptance: contains a carbon copy of the production data. The value of the acceptance environment lies in multiple aspects:
  1. Due to the sampling in the previous two environments, the Acceptance environment is the first place in which the computational performance of the data pipeline can be examined and verified.
  2. Because all data is present, this environment can be used for your Product Owner to verify changes before pushing them to production (where the users will see them).
  • Production: contains the full production data; this is the environment your users see.

A change moves through these environments as follows:

  • When a developer has finished a change, their code is reviewed and merged into the Development branch.
  • The Development DAG is manually triggered.
  • If the Development DAG successfully runs through, the final two tasks firstly promote the code to Test, then trigger the Test DAG to run.
  • If the Test DAG successfully runs through, the final two tasks promote the code to Acceptance, then trigger the Acceptance DAG to run.
  • Finally, we have chosen to have a ‘green-light’ procedure for deployments to Production. That means that we don’t automatically promote our Acceptance data pipeline code to Production. The main reason for this is that we want to do a final manual verification step with the Acceptance application before giving the ‘green light’ to use the changes on Production.
  • Once we’re happy with the data in Acceptance, we open a Merge Request into master (i.e. Production), which is then reviewed again. This merge may contain multiple changes that several people worked on, so it’s a good final check.
  • Production is not manually triggered, but instead uses the Airflow scheduler to run at fixed times.

How to use

Setting up your own DTAP is a rather involved process that can be tricky. However, it is extremely valuable to have in place, as a lot of issues are caught before your users notice. Moreover, due to the sampling of the data, the deployments on Development and Test give you a quick heads-up if something is horribly wrong. There are a number of steps necessary to successfully set up a DTAP environment:

  1. Check out your data pipeline repository 4 times in your Airflow environment. Make sure each directory checks out the correct branch (i.e. the dev directory checks out the dev branch).
  2. Place an environment.conf file in each checkout, which specifies which environment the directory belongs to (and add this file to the .gitignore).
  3. Ensure your Airflow environment is configured to push to git (this usually means configuring a non-personal account).
  4. For the Development and Test branches, we use automatic promotion and triggering. This means adding two tasks to the DAGs. The first one takes care of the code promotion.
# First task: promote the code by merging the test branch into acceptance
promote_branch_to_acc = BashOperator(
    task_id='promote_branch_to_acc',
    bash_command="""
    cd {}/awesome_dag_acc/ &&
    git checkout acceptance &&
    git pull &&
    git merge origin/test &&
    git push &&
    git submodule update --recursive --force
    """.format(DAG_LOCATION),
    dag=dag
)

# Second task: trigger the Acceptance DAG
trigger_next_environment_deploy = TriggerDagRunOperator(
    task_id='trigger_next_environment_deploy',
    python_callable=lambda context, dag_run: dag_run,
    trigger_dag_id="awesome_dag_acc",
    dag=dag
)
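Because the promotion task looks the same for every environment, the shell command can be generated instead of copied. The following sketch shows one way to do that; the branch names in PROMOTION, the helper names, and the directory layout are assumptions, so adapt them to your own checkouts.

```python
from typing import Optional

# Automatic promotion order. Production is deliberately absent, because
# it is only reached through a manually reviewed merge request.
PROMOTION = {"development": "test", "test": "acceptance"}


def next_environment(branch: str) -> Optional[str]:
    """Branch that code is automatically promoted to, or None if there is none."""
    return PROMOTION.get(branch)


def promotion_command(dag_location: str, checkout_dir: str,
                      source: str, target: str) -> str:
    """Shell command that merges the source branch into the target checkout."""
    steps = [
        "cd {}/{}/".format(dag_location, checkout_dir),
        "git checkout {}".format(target),
        "git pull",
        "git merge origin/{}".format(source),
        "git push",
        "git submodule update --recursive --force",
    ]
    return " && ".join(steps)
```

The resulting string can be passed as bash_command to a BashOperator, so each environment’s DAG only differs in the arguments it supplies.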

Wrapping Up

In this post, we’ve described our 7 Circles of Data Testing Hell: when to use each circle and also how to use it. Our goal was to present our setup, and to help others in setting up their data tests. You don’t have to apply every circle if you don’t need or want to; the 7 Circles should be seen as an à la carte menu, where you can pick and choose the circles that are applicable or useful in your situation. Once again, you can find an example of the circles here: https://github.com/danielvdende/data-testing-with-airflow

wbaa

Wholesale Banking Advanced Analytics team