Data’s Inferno: 7 Circles of Data Testing Hell with Airflow

TL;DR

[Header image; original courtesy of Columbia Spectator (http://spc.columbiaspectator.com/spectrum/2016/03/31/nine-circles-columbia-hell)]
  1. DAG Integrity Tests; have your CI (Continuous Integration) check that your DAG is an actual DAG
  2. Split your ingestion from your deployment; keep the logic you use to ingest data separate from the logic that deploys your application
  3. Data Tests; check if your logic is outputting what you’d expect
  4. Alerting; get slack alerts from your data pipelines when they blow up
  5. Git Enforcing; always make sure you’re running your latest verified code
  6. Mock Pipeline Tests; create fake data in your CI so you know exactly what to expect when testing your logic
  7. DTAP; split your data into four different environments: Development is really small, just to see if the pipeline runs; Test takes a representative sample of your data for first sanity checks; Acceptance is a carbon copy of Production, allowing you to test performance and have a Product Owner do checks before releasing to Production

Circle 1: DAG integrity tests

For example, a copy-paste mistake that gives two tasks the same task_id will break your DAG:
task_a = BashOperator(task_id='task_a', …)
test_task_a = BashOperator(task_id='test_task_a', …)
task_b = BashOperator(task_id='task_a', …)  # oops: duplicate task_id
[Screenshot: a rather unspecific error message in the Airflow UI]
[Screenshot: in the CI tests, a much clearer message of what the problem is]

How to use

  1. In your CI, set up a task/stage that has pytest, airflow, and coverage available
  2. Define a conftest.py file in which you add any custom Airflow configuration you might need.
  3. For every file in your repository (get the paths to these files yourself), run the following code (a fuller sketch follows below):
assert any(
    isinstance(var, af_models.DAG)
    for var in vars(module).values()
)
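
Pulled together, a minimal sketch of such a CI test could look like the following (the dags/ glob, the module loading and the test name are assumptions, not prescribed by Airflow):

# A sketch of a DAG integrity test; adjust the glob to wherever your DAG files live.
import glob
import importlib.util
import os

import pytest
from airflow import models as af_models

DAG_PATHS = glob.glob(os.path.join('dags', '*.py'))


@pytest.mark.parametrize('dag_path', DAG_PATHS)
def test_dag_integrity(dag_path):
    """Import every DAG file and check that it defines at least one valid DAG."""
    module_name = os.path.splitext(os.path.basename(dag_path))[0]
    spec = importlib.util.spec_from_file_location(module_name, dag_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # any error raised while building the DAG surfaces here

    assert any(
        isinstance(var, af_models.DAG)
        for var in vars(module).values()
    )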

Circle 2: Split Ingestion from Deployment

  • If your ingestion fails, you do not want your project ETL to fail immediately. It might be perfectly fine that (one of) your data sources is not completely up-to-date. Your project can just use the data that was previously ingested correctly
  • It created a nice place to enforce an interface between data sources that are logically the same, but arrive in (vastly) different formats
  • It also allowed us to have one Git repository per ingestion and per project, which meant we could separate all the code and CI tests that we want to run per repository
  • It makes it easy to add and remove data sources from a project, and reuse data sources for other projects (or replace them if a newer latest-and-greatest data source shows up).

How to use

  1. Create a Git repository per data source, containing the ETL for the ingestion, and one per project, containing the ETL for that specific project
  2. Keep all the logic and CI tests belonging to source/project isolated
  3. Define an interface per logical part; for transactions, for example, the interface could consist of the following fields (see the sketch after this list):
  • payer account
  • payer name
  • beneficiary account
  • beneficiary name
  • amount
  • currency
  • date
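
As a sketch of what enforcing such an interface during ingestion might look like (assuming PySpark; the source column names and the conform_to_transaction_interface function are illustrative, not from the original):

# Map one source's raw columns onto the shared transaction interface.
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

TRANSACTION_COLUMNS = [
    'payer_account', 'payer_name', 'beneficiary_account',
    'beneficiary_name', 'amount', 'currency', 'date',
]


def conform_to_transaction_interface(raw: DataFrame) -> DataFrame:
    """Rename and cast the source-specific columns of `raw` to the agreed interface."""
    return (
        raw
        .withColumnRenamed('debtor_iban', 'payer_account')        # source-specific names
        .withColumnRenamed('debtor_name', 'payer_name')
        .withColumnRenamed('creditor_iban', 'beneficiary_account')
        .withColumnRenamed('creditor_name', 'beneficiary_name')
        .withColumnRenamed('ccy', 'currency')
        .withColumn('amount', F.col('amount').cast('decimal(18,2)'))
        .withColumn('date', F.to_date('booking_date'))
        .select(*TRANSACTION_COLUMNS)
    )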

Circle 3: Data Tests

  • Are there files available for ingestion?
  • Did we get the columns that we expected?
  • Are the rows that are in there valid?
  • Did the row count of your table only increase?
  • Are the known private individuals (PIs) filtered out?
  • Are the known companies still in?
  • Has the aggregation of PIs worked correctly?

How to use

  1. Make sure your environment has both pytest and airflow available
  2. Define a conftest.py file in which you create a SparkSession and other configuration that you want to set
  3. Create a .py file per test that you want to run
  4. You can now schedule this test file in Airflow by setting it downstream of your ETL step and calling it with a BashOperator that runs pytest (an example test file follows after the code below):
pytest_cmd = """
export PYTHONPATH=/usr/spark/python/:/usr/spark/python/lib/\
py4j-0.10.4-src.zip:{spark_directory} &&\
export SPARK_HOME={spark_home} &&\
export ENVIRONMENT={environment} &&\
export MASTER=local &&\
cd /usr/local/airflow &&\
python -m pytest {directory}{script}
"""
test_union_transactions = BashOperator(
    task_id='test_union_transactions',
    bash_command=pytest_cmd.format(
        environment=ENVIRONMENT,
        directory=TESTS_DIRECTORY,
        spark_directory=SPARK_DIRECTORY,
        script='test_union_transactions.py',
        spark_home=SPARK_HOME
    ),
    dag=dag
)
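
The test file itself is ordinary pytest code; a minimal sketch, assuming a spark session fixture provided by conftest.py and illustrative table names:

# test_union_transactions.py
def test_union_contains_all_sources(spark):
    """The unioned transactions table should be at least as large as each source."""
    union_count = spark.table('union_transactions').count()
    for source_table in ('source_a_transactions', 'source_b_transactions'):
        assert union_count >= spark.table(source_table).count()


def test_no_missing_beneficiary_accounts(spark):
    """Every row in the unioned table should have a beneficiary account."""
    missing = spark.table('union_transactions').where('beneficiary_account IS NULL').count()
    assert missing == 0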

Circle 4: Alerting

How to use

  1. In the default_args object that you use to create your DAG object, add the following (a sketch of the full wiring follows after the callback code below):
'on_failure_callback': send_slack_alert
  2. Define the send_slack_alert callback itself, which posts the failure details to a Slack webhook:
def send_slack_alert(context=None):
    """Send a Slack alert on failure to alert the team"""
    payload_vars = {
        'url': 'https://hooks.slack.com/services/<CREDENTIALS>',
        'run_id': str(context['run_id']),
        'task': str(context['task']),
        'dag_name': str(context['dag'].dag_id)
    }
    error_message = """
{dag_name} Failure! Task failed: {task} Check log at: {run_id}
""".format(**payload_vars)

    payload_vars['json'] = """
payload={{"channel":"#<YOUR_CHANNEL_HERE>","text":"{0}"}}
""".format(error_message)

    slack_cmd = """
curl -X POST \
--data-urlencode '{json}' \
{url}
""".format(**payload_vars)

    slack_alert = BashOperator(
        task_id='slack_alert',
        dag=dag,
        bash_command=slack_cmd
    )
    slack_alert.execute(context)
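
For completeness, a minimal sketch of wiring the callback into a DAG via default_args (the DAG name, owner, schedule and retry settings here are illustrative):

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'data-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': send_slack_alert,  # the callback defined above
}

dag = DAG(
    dag_id='awesome_dag',
    default_args=default_args,
    start_date=datetime(2018, 1, 1),
    schedule_interval='@daily',
)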

Circle 5: Git Enforcing

How to use

Before each run, hard-reset every repository checkout to the latest verified code on master, for example with a script along these lines:
# This will hard-reset all repos to the version on the master branch;
# any local commits that have not been pushed yet will be lost.
# Adjust the glob below to wherever your repositories are checked out.
for dir in /usr/local/airflow/dags/*/.git; do
    (
        cd "${dir%/*}"
        echo "Resetting "${dir%/*}
        git fetch
        git checkout -f master
        git reset --hard origin/master
        git clean -df
        git submodule update --recursive --force
    )
done
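
One way to run this automatically is as the first task of every DAG; a minimal sketch, assuming the script above is saved as /usr/local/airflow/reset_repos.sh:

enforce_git = BashOperator(
    task_id='enforce_git',
    # The trailing space stops Airflow from trying to render the .sh path as a Jinja template.
    bash_command='bash /usr/local/airflow/reset_repos.sh ',
    dag=dag
)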

Circle 6: Mock Pipeline Tests

How to use

  1. In your CI, set up a task/stage that has pytest, pyspark, and coverage available
  2. Create a conftest.py file in which you define the schemas of your data. Once that is in place, use these schemas to generate a given number of rows of data in Spark dataframes. Be sure to create rows of data that are representative of your real data. Write out each Spark dataframe to a table. By default, Spark will use Derby as its metastore. For testing purposes this is fine, but be careful to ensure that your data generation in conftest.py uses the same metastore and spark-warehouse as the tests themselves (otherwise the data will be missing). A sketch of such a conftest.py follows after Step 1 below.
  3. Create as many test files as you want. The general rule of thumb here is one test file for every Spark script in your data pipeline, possibly with multiple tests per file. In each test, call the function you’re testing. Once it completes, use Spark to verify that the output tables now contain the data you would expect given the input data you defined. As an example, imagine you work for a bank and have to ensure that North Korean transactions are removed from your data.

Step 1: Define your fake data

PERSON = [{'name': 'Kim Yong Un', 'country': 'North Korea', 'iban': 'NK99NKBK0000000666'}, …]
TRANSACTIONS = [{'iban': 'NK99NKBK0000000666', 'amount': 10}, …]
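
A sketch of how conftest.py could turn this fake data into tables for the code under test to read (the fixtures, table names and session options are assumptions):

# conftest.py
import pytest
from pyspark.sql import Row, SparkSession

PERSON = [{'name': 'Kim Yong Un', 'country': 'North Korea', 'iban': 'NK99NKBK0000000666'}]
TRANSACTIONS = [{'iban': 'NK99NKBK0000000666', 'amount': 10}]


@pytest.fixture(scope='session')
def spark():
    # Local session backed by the default Derby metastore and spark-warehouse directory.
    return (SparkSession.builder
            .master('local[1]')
            .enableHiveSupport()
            .getOrCreate())


@pytest.fixture(scope='session', autouse=True)
def mock_tables(spark):
    # Write the fake data to tables so the pipeline code under test can read them.
    spark.createDataFrame([Row(**r) for r in PERSON]).write.mode('overwrite').saveAsTable('person')
    spark.createDataFrame([Row(**r) for r in TRANSACTIONS]).write.mode('overwrite').saveAsTable('transactions')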

Step 2: Run your function

filter_data(spark, "tst")

Step 3: Verify the output of the function

assert spark.sql("""
    SELECT COUNT(*) ct
    FROM filtered_data
    WHERE iban = 'NK99NKBK0000000666'
""").first().ct == 0

Circle 7: DTAP

  • Development: contains a completely random, very small (0.0025% in our case) sample of your data. The data won’t make any sense, and as a consequence checking the development application using this data will not make any sense for a user. This environment’s purpose is to verify that the tasks in your data pipeline can be executed and complete successfully (i.e. with a zero exit code). Because the amount of data is very small, this pipeline runs very quickly compared to the production one.
  • Test: contains a slightly larger, non-random sample of the data. This non-random sample is achieved in our case by handpicking some entities for which to keep the data. The advantage of this is that the data for these entities will now make sense in your application. Therefore, you can access the test application and verify (for this limited set of entities) that the data has been processed correctly. The definition of these entities is very important and will depend heavily on the application and its use cases. Although the amount of data is significantly smaller than for the production pipeline (and therefore much quicker to process), the Test environment contains slightly more data than the Development environment due to the sampling method, and so runs slightly slower. (A sketch of both sampling approaches follows after this list.)
  • Acceptance: contains a carbon copy of the production data. The value of the acceptance environment lies in multiple aspects:
  1. This environment contains the full dataset. In some cases, mistakes in processing logic will only come to light when running on the full dataset (for example, incorrect statistics).
  2. Due to the sampling in the previous two environments, the Acceptance environment is also the first place in which the computational performance of the data pipeline can be examined and verified.
  3. Because all data is present, this environment can be used by your Product Owner to verify changes before pushing them to production (where the users will see them).
  • Production: contains all production data. This is the environment your users will use. As such, it is very important that problems with the data are kept to a minimum. Of course, it is very difficult to never have any data issues in your application, but this circle of data testing hell (and the others) can help keep the number of issues to a minimum.
Our workflow for promoting a change through these environments looks like this:
  • One of our engineers works on the feature on a branch off the Development branch; this branch will only get merged into Development if it runs through successfully.
  • When the feature is finished, the code is reviewed and merged into the Development branch.
  • The Development DAG is manually triggered.
  • If the Development DAG successfully runs through, the final two tasks first promote the code to Test and then trigger the Test DAG to run.
  • If the Test DAG successfully runs through, the final two tasks promote the code to Acceptance, then trigger the Acceptance DAG to run.
  • Finally, we have chosen to have a ‘green-light’ procedure for deployments to Production. That means that we don’t automatically promote our Acceptance data pipeline code to Production. The main reason for this is that we want to do a final manual verification step with the Acceptance application before giving the ‘green light’ to use the changes on Production.
  • Once we’re happy with the data in Acceptance, we open a Merge Request into master (i.e. Production), which is then reviewed again. This merge may contain multiple changes that several people worked on, so it’s a good final check.
  • Production is not manually triggered, but instead uses the Airflow scheduler to run at fixed times.
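
A minimal sketch of how the Development and Test samples could be produced (assuming PySpark; the fraction, handpicked accounts and column names are illustrative):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

TEST_ENTITIES = ['NL01BANK0123456789', 'NL02BANK9876543210']  # handpicked accounts


def sample_for_environment(transactions: DataFrame, environment: str) -> DataFrame:
    """Return the slice of the data that belongs to the given environment."""
    if environment == 'dev':
        # Development: a tiny random sample, just enough to exercise the pipeline.
        return transactions.sample(withReplacement=False, fraction=0.000025, seed=42)
    if environment == 'tst':
        # Test: a non-random sample of handpicked entities, so the output makes sense.
        return transactions.where(F.col('payer_account').isin(TEST_ENTITIES))
    # Acceptance and Production use the full dataset.
    return transactions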

How to use

  1. Make sure you have separate databases for each environment. In our case, that means a separate Hive database, a separate Druid datasource, and a separate PostgreSQL instance.
  2. Check out your data pipeline repository 4 times in your Airflow environment. Make sure each directory checks out the correct branch (i.e. the dev directory checks out the dev branch).
  3. Place an environment.conf file in each checkout, which specifies which environment the directory belongs to, and add this file to the .gitignore (a sketch of reading it follows after the code below).
  4. Ensure your Airflow environment is configured to push to git (this usually means configuring a non-personal account).
  5. For the Development and Test branches, we use automatic promotion and triggering. This means adding two tasks to the DAGs: the first one takes care of the code promotion, the second triggers the next environment’s DAG.
promote_branch_to_acc = BashOperator(
    task_id='promote_branch_to_acc',
    bash_command="""
    cd {}/awesome_dag_acc/ &&
    git checkout acceptance &&
    git pull &&
    git merge origin/test &&
    git push &&
    git submodule update --recursive --force
    """.format(DAG_LOCATION),
    dag=dag
)

# trigger acceptance.
trigger_next_environment_deploy = TriggerDagRunOperator(
    task_id='trigger_next_environment_deploy',
    python_callable=lambda context, dag_run: dag_run,
    trigger_dag_id="awesome_dag_acc",
    dag=dag
)
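
Finally, a minimal sketch of how a DAG could read environment.conf to pick the right databases (assuming an ini-style file; the section, key and database names are illustrative):

# Assumed contents of environment.conf:
#   [environment]
#   name = acc
import os
from configparser import ConfigParser

DAG_DIRECTORY = os.path.dirname(os.path.abspath(__file__))


def read_environment():
    """Return 'dev', 'tst', 'acc' or 'prd' for this checkout."""
    parser = ConfigParser()
    parser.read(os.path.join(DAG_DIRECTORY, 'environment.conf'))
    return parser.get('environment', 'name')


ENVIRONMENT = read_environment()
HIVE_DATABASE = 'project_{}'.format(ENVIRONMENT)  # e.g. one Hive database per environment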

Wrapping Up
