Advanced Testing Techniques for your Python Data Pipeline with Dask and Pytest Fixtures

Achieve full coverage; Use fixtures and write tests easily; Refactor with confidence using non-regression tests

Marc Wouts
CFM Insights
12 min read · Dec 10, 2021


Photo by Joshua Sortino on Unsplash

CFM is a Quantitative Hedge Fund with more than 30 years of experience in the domain of quantitative investing. Our daily production (and research) is done with a complex Python data pipeline.

In this post, we share our experience in writing tests for the pipeline. Our first objective was to improve the coverage of the pipeline. Then, we also wanted to identify more reliably the contributions that could break any production or research usage.

The techniques that we have used, and that we document in this article are:

  • Test the functions using fixtures
  • Generate fixtures for every node of the pipeline
  • Identify unexpected code impacts with non-regression tests
  • Put breakpoints programmatically and compare the arguments passed to a given function in two different versions of the pipeline, to refactor with confidence.

Introducing our Python Pipeline

We use a pipeline to best combine the research contributions of the different teams. For instance, we have tasks dedicated to

  • data streams (financial information like past prices, economic data, news, …)
  • signals, derived from the data, that express numerically views on future returns, for one or more financial instruments
  • and many other tasks up to portfolio construction.

A convenient way to schedule these tasks is to use a pipeline. At CFM, we use an in-house Python utility for this, but for this post, I’ll assume that our pipeline is implemented with Dask (see the video Next Generation Big Data Pipelines with Prefect and Dask for a review of Dask and a few other alternatives). Also, for the sake of simplicity, our sample pipeline is made of only four nodes:
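For illustration, here is a minimal sketch of what such a four-node pipeline could look like with dask.delayed. The node names get_prices and get_positions, and the dummy computations, are simplifying assumptions on my side, not CFM's actual tasks:

```python
# pipeline.py - a minimal four-node pipeline sketch
# (the node contents below are dummy placeholders)
import dask
import pandas as pd


def get_tickers():
    return {"AAPL", "GOOG", "MSFT"}


def get_prices(tickers):
    dates = pd.date_range("2021-01-04", periods=5, freq="B")
    return pd.DataFrame(100.0, index=dates, columns=sorted(tickers))


def get_signals(prices):
    return prices.pct_change().fillna(0.0)


def get_positions(signals):
    return signals.clip(-0.1, 0.1)


# wrap the (pure) node functions with dask.delayed to build the task graph
tickers = dask.delayed(get_tickers)()
prices = dask.delayed(get_prices)(tickers)
signals = dask.delayed(get_signals)(prices)
positions = dask.delayed(get_positions)(signals)

# "pipeline" is the terminal node; computing it evaluates the whole graph
pipeline = positions
```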

We can visualize the pipeline with pipeline.visualize().

Combining the nodes into a pipeline is a great improvement compared to using a simple bash scheduler:

  • With the pipeline, running it in full is accessible to every user, since the dependencies are taken care of automatically. Users no longer need to be familiar with every task in the pipeline.
  • The nodes must be implemented with pure functions, that is, functions whose output is deterministic given the inputs and that have no side effects. This is a very important requirement: not using pure functions causes lots of complexity and extra maintenance work when a task must be regenerated.
  • The pipeline, being more Python-oriented, is also notebook friendly: isn’t it easier to call a function than to run a script in a Jupyter notebook?

Testing the pipeline and the nodes

Our next objective after building our pipeline is to improve coverage. While coverage is not enough to make sure that you will detect problems with a new contribution, it is necessary. Without tests, the pipeline will often break in research.

A simple way to improve coverage is to run the pipeline in full on the CI. Obviously, we don’t use the full configuration — our test pipeline runs for just two portfolios, a handful of financial instruments, and extends over just a few days. Overall it runs in under one minute.
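As a sketch, assuming the pipeline object from the module above, such a CI test can be as small as:

```python
# a minimal "run the whole (test-sized) pipeline" smoke test for the CI
from pipeline import pipeline


def test_pipeline_runs_in_full():
    positions = pipeline.compute()
    assert not positions.empty
```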

But we wanted to go a bit further. We have observed that many contributors find it difficult to write tests because preparing the inputs for the function to be tested is challenging. And many tests ended up long just because of the data preparation code:
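Here is a hypothetical example of such a test, where most of the lines only prepare the inputs of the function under test:

```python
# a test where most of the lines prepare the inputs of the function under test
import pandas as pd

from pipeline import get_signals


def test_get_signals_without_fixtures():
    # input preparation (in real tests this part is often much longer)
    tickers = {"AAPL", "GOOG", "MSFT"}
    dates = pd.date_range("2021-01-04", periods=5, freq="B")
    prices = pd.DataFrame(100.0, index=dates, columns=sorted(tickers))

    # the actual test
    signals = get_signals(prices)
    assert signals.shape == prices.shape
```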

We first decided to separate the test input preparation from the test itself. We created fixtures (see below) that can be used in as many tests as we want. With fixtures at hand, writing a test becomes very easy; the example above indeed becomes much shorter:
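A sketch of the shortened version, using a prices fixture that is defined in the conftest.py sketched below:

```python
# the same test, with the input preparation moved to a `prices` fixture
from pipeline import get_signals


def test_get_signals(prices):
    signals = get_signals(prices)
    assert signals.shape == prices.shape
```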

Another important point is that, within a pipeline, the inputs for a node are its parent nodes. So, we decided to generate the fixtures directly from the pipeline — but before discussing that, let us introduce the fixtures.

Introducing pytest fixtures

A pytest fixture is a function decorated with pytest.fixture. You can define the fixtures either directly in the test file, or in a conftest.py file in the same or in a parent directory.

For our first fixture example in the folder tests_1_hand_written_fixtures, we create the fixtures by just calling the corresponding functions:
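A sketch of such a conftest.py, reusing the node functions from the pipeline module above (the scope="session" option is discussed just below):

```python
# tests_1_hand_written_fixtures/conftest.py (sketch)
import pytest

from pipeline import get_prices, get_signals, get_tickers


@pytest.fixture(scope="session")
def tickers():
    return get_tickers()


@pytest.fixture(scope="session")
def prices(tickers):
    # the input of this fixture is the previous fixture, like in the pipeline
    return get_prices(tickers)


@pytest.fixture(scope="session")
def signals(prices):
    return get_signals(prices)
```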

Now we can use these fixtures in the test just by putting them as arguments to the test:
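For instance, the tests in test_1_data.py could look like this (the assertions below are illustrative assumptions on my side):

```python
# tests_1_hand_written_fixtures/test_1_data.py (sketch)
def test_prices_columns_match_tickers(prices, tickers):
    # the prices DataFrame has one column per ticker
    assert set(prices.columns) == tickers


def test_prices_are_positive(prices):
    assert (prices > 0).all().all()
```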

Why use scope="session"?

Our test pipeline executes in full in under one minute, but we don’t want to multiply this by the number of tests. With the scope="session" option, we save a lot of time as the fixtures are generated just once (per worker, so they will still be generated multiple times if you use pytest-xdist).

For instance, if we launch the tests in test_1_data.py, we see that the fixtures are generated on demand, and just once each (cf. the INFO logs).

It is legitimate to use scope="session" as we work with pure functions, as required by the pipeline. If you are not so sure that your functions are pure, and want to double-check that they do not modify their inputs by reference, you can do so in the fixture teardown:
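Here is a sketch of such a check on the tickers fixture, using copy.deepcopy before the yield and DeepDiff in the teardown:

```python
# a `tickers` fixture that checks, at teardown, that no test modified its value
import copy

import pytest
from deepdiff import DeepDiff

from pipeline import get_tickers


@pytest.fixture(scope="session")
def tickers():
    value = get_tickers()
    original = copy.deepcopy(value)
    yield value
    # teardown: fail if a test mutated the fixture in place
    diff = DeepDiff(original, value)
    assert not diff, f"The 'tickers' fixture was modified by a test: {diff}"
```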

In this example we have used DeepDiff to show the recursive differences between two Python objects. It looks like a great library, but I should mention that I have no extensive experience with it.

Do I need to duplicate the pipeline in the conftest.py?

In our first example (tests_1_hand_written_fixtures), we have actually re-implemented the pipeline in the conftest.py file. This causes duplication and will require specific maintenance when you change the pipeline, so we recommend this approach only for short and stable pipelines.

In the second folder tests_2_fixtures_generated_with_the_pipeline, we used another approach. We created a fixture for the evaluated pipeline — a dictionary with the value for every node — and then we exposed each node as a fixture.
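A sketch of this second conftest.py, reusing the module-level Delayed objects from the pipeline module above:

```python
# tests_2_fixtures_generated_with_the_pipeline/conftest.py (sketch)
import dask
import pytest

import pipeline


@pytest.fixture(scope="session")
def evaluated_pipeline():
    """Evaluate every node of the pipeline, exactly once per session."""
    (values,) = dask.compute(
        {
            "tickers": pipeline.tickers,
            "prices": pipeline.prices,
            "signals": pipeline.signals,
            "positions": pipeline.positions,
        }
    )
    return values


@pytest.fixture(scope="session")
def tickers(evaluated_pipeline):
    return evaluated_pipeline["tickers"]


@pytest.fixture(scope="session")
def prices(evaluated_pipeline):
    return evaluated_pipeline["prices"]


@pytest.fixture(scope="session")
def signals(evaluated_pipeline):
    return evaluated_pipeline["signals"]
```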

The advantages of that approach are:

  • Low maintenance: you just need to add or remove fixtures when a node is added to or removed from the pipeline. Changes to the node arguments are automatically reflected in the fixtures.
  • The pipeline is covered in full: every node is evaluated, even the ones that are not used in any test.

But it also has a few drawbacks:

  • The fixtures become fragile. Say you work on the get_signals function and introduce an error while developing. Because of this error, the pipeline cannot be evaluated anymore, so the fixtures have no value, and you cannot even run the tests on get_signals.
  • The fact that the pipeline is evaluated in full makes the creation of the fixtures a bit slower: the logs now show every node being computed when the fixtures are created.

In particular, the signals are generated even when they are not used in the tests.

To conclude, the tests_2_fixtures_generated_with_the_pipeline approach is good for the CI, but not for local development.

Generating the fixtures from a cached run of the pipeline

This is the approach that we use in practice. The pipeline is run in full on the CI, and for local development, we save the results to a cache.

Our sample implementation is available at tests_3_fixtures_from_a_cached_pipeline, and we cite a short extract here:
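Here is a sketch of the idea (the cache file name and the use of pickle are assumptions on my side):

```python
# tests_3_fixtures_from_a_cached_pipeline/conftest.py (sketch)
import pickle
from pathlib import Path

import dask
import pytest

import pipeline

CACHE_PATH = Path(__file__).parent / "pipeline_cache.pkl"


@pytest.fixture(scope="session")
def cached_pipeline_path():
    """Return the path of the cached pipeline, (re)computing the cache if needed."""
    if not CACHE_PATH.exists():
        (values,) = dask.compute(
            {
                "tickers": pipeline.tickers,
                "prices": pipeline.prices,
                "signals": pipeline.signals,
                "positions": pipeline.positions,
            }
        )
        CACHE_PATH.write_bytes(pickle.dumps(values))
    return CACHE_PATH


@pytest.fixture(scope="session")
def cached_pipeline(cached_pipeline_path):
    return pickle.loads(cached_pipeline_path.read_bytes())


@pytest.fixture(scope="session")
def prices(cached_pipeline):
    return cached_pipeline["prices"]


@pytest.fixture(scope="session")
def signals(cached_pipeline):
    return cached_pipeline["signals"]
```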

Note that the cached_pipeline_path fixture may not return instantly: it will evaluate and cache the full pipeline if necessary (e.g. on the CI, or when the user has removed the local cache).

This approach has many advantages:

  • The fixtures are available instantaneously (they are loaded from disk, not computed)
  • We can develop freely and make breaking changes locally; these will not affect the fixture generation (at least not until we decide to regenerate the cache)
  • And we get full coverage of the pipeline on the CI.

The only disadvantage of this method is that the developer must be aware of the cache, and will need to know when to remove and regenerate it.

The first time we run the test suite, the logs mention that the cache is being generated; from the second run on, they point out that the cache is being reused.

My pipeline has parameters. Should I write multiple conftests with different fixtures?

We recommend working with only one set of fixtures. Maintaining a pipeline of fixtures is an investment in code and in user training, so it is best if everyone knows what the sample pipeline is.

We do understand that some tests require specific inputs. When this is the case, we recommend deriving custom fixtures from the reference ones.

Assume for instance that some signal generation requires that "FB" be among the tickers. In that case, we can simply create a new fixture:
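For example (the name of the derived fixture is ours):

```python
import pytest


@pytest.fixture(scope="session")
def tickers_with_fb(tickers):
    # build a new set rather than mutating the session-scoped `tickers` fixture
    return tickers | {"FB"}
```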

⚠️ Be careful not to modify the original fixture by reference, i.e. do tickers | {"FB"} but not tickers.add("FB")!

Non-regression tests

With the test fixtures documented above, we already get excellent coverage for the data pipeline. Still, this is not enough to ensure that the pipeline will work in practical applications.

So we added another kind of test to our platform, the non-regression tests. These tests are run with the complete portfolio configuration, for each portfolio that we have in production. We want a test that is not too slow (< 5 minutes), so we don’t cover the full data history but just one month. Also, we might not cover the full Cartesian product of portfolios and tasks, but only the selection that matters most to us.

Our sample non-regression test is coded at tests_4_non_regression, and the test itself is also reproduced below.
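Here is a sketch of such a test (the reference file name, the use of pickle and the pandas comparison are assumptions on my side):

```python
# tests_4_non_regression/test_non_regression.py (sketch)
import pickle
from pathlib import Path

import dask
import pandas as pd

import pipeline

REFERENCE_PATH = Path(__file__).parent / "non_regression_outputs.pkl"


def test_non_regression():
    # evaluate the nodes with the (production-like) configuration
    (outputs,) = dask.compute(
        {
            "prices": pipeline.prices,
            "signals": pipeline.signals,
            "positions": pipeline.positions,
        }
    )

    if not REFERENCE_PATH.exists():
        # no reference data yet (or it was deleted on purpose): regenerate it
        REFERENCE_PATH.write_bytes(pickle.dumps(outputs))

    reference = pickle.loads(REFERENCE_PATH.read_bytes())
    for name, expected in reference.items():
        # compare the values of the outputs, not just their shapes
        pd.testing.assert_frame_equal(outputs[name], expected)
```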

The differences between the non-regression test and the previous test suite are the following:

  • The non-regression test has better coverage of the production use cases (since it uses the production configuration)
  • It will detect any impact on the outputs of the nodes. Unlike the simple tests that we wrote before, we don’t only check the shape of the outputs, but also their values.
  • It also takes more time to run but gives much more confidence in the updated code.

In our example, we saved the non-regression data into a simple file. When a regression is detected and is actually expected (the change is intentional), that file must be updated with the new outputs (in practice, delete it and the framework will regenerate it). It is also possible to store the non-regression data outside the project repository (e.g. on a shared disk or URL) if it is too big. In that case, make sure the non-regression data sets are incremental (i.e. use a new file name or URL each time the reference data is regenerated), otherwise the non-regression tests on existing branches will break.

Refactor and test that the arguments passed to a certain function don’t change

We will conclude this article with one last technique that we have found useful in the context of large refactorings. The objective is to guarantee that, after the refactoring, a given function is called with the exact same arguments as before (so, in particular, it will have the same outputs).

Our technique is a bit comparable to a breakpoint that we would set programmatically and that would export the arguments at that point of the program. We have implemented this with a context manager: it intercepts the (first) call to the target function, stops the computation without evaluating the target function, and returns the arguments of that call.
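Here is a possible implementation sketch, based on mock.patch and a dedicated exception that aborts the computation at the first call to the target function:

```python
# intercept.py (sketch)
from contextlib import contextmanager
from unittest import mock


class _CallIntercepted(Exception):
    """Raised to stop the computation at the first call to the target function."""


@contextmanager
def intercept_function_arguments(fun_path):
    """Intercept the first call to the function at `fun_path` and expose its arguments."""
    captured = {}

    def _capture(*args, **kwargs):
        captured["args"] = args
        captured["kwargs"] = kwargs
        raise _CallIntercepted()

    with mock.patch(fun_path, side_effect=_capture):
        try:
            yield captured
        except _CallIntercepted:
            # the target function was reached: stop here without evaluating it
            pass
```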

With this intercept_function_arguments context manager we can write tests like test_same_arguments.py:
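A sketch of such a test follows; it assumes that the context manager above lives in an intercept.py helper, and that running the pipeline eventually calls pipeline.get_signals with the prices as its only argument:

```python
# test_same_arguments.py (sketch)
import pickle
from pathlib import Path

import pandas as pd

import pipeline
from intercept import intercept_function_arguments

REFERENCE_ARGS_PATH = Path(__file__).parent / "get_signals_arguments.pkl"


def run_pipeline():
    """A plain-Python run of the pipeline, standing in for the real entry point."""
    tickers = pipeline.get_tickers()
    prices = pipeline.get_prices(tickers)
    signals = pipeline.get_signals(prices)
    return pipeline.get_positions(signals)


def test_get_signals_receives_the_same_arguments():
    with intercept_function_arguments("pipeline.get_signals") as captured:
        # the computation stops as soon as get_signals is called
        run_pipeline()

    if not REFERENCE_ARGS_PATH.exists():
        # on the reference version of the code: save the intercepted arguments
        REFERENCE_ARGS_PATH.write_bytes(pickle.dumps(captured))

    reference = pickle.loads(REFERENCE_ARGS_PATH.read_bytes())
    (reference_prices,) = reference["args"]
    (new_prices,) = captured["args"]
    # after the refactoring, get_signals must receive exactly the same inputs
    pd.testing.assert_frame_equal(new_prices, reference_prices)
```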

A subtlety in the above is that the target function is patched using mock.patch, so you will have to be careful with imports. If you import the target function before entering intercept_function_arguments, then fun_path should be the path under which the function is looked up at call time; see the section on where to patch in the unittest.mock documentation.

Conclusion

We hope this post will help you keep your data pipeline under control! As we have seen, creating a fixture for each task in the pipeline makes the writing of tests very easy. The non-regression tests are also super useful to identify unexpected impacts before a contribution gets accepted. And if you want to go further and guarantee that the inputs of a certain function don’t change, then the intercept_function_arguments technique is all yours!

Acknowledgments

This article was written by Marc Wouts, a researcher at CFM, and the author of Jupytext. Marc would like to thank the Portfolio team for the collaboration on the pipeline, the Open Source Program Office at CFM for the support on this article, Florent Zara for the time spent reading the many draft versions of this post, Emmanuel Serie and the Dask Discourse Group for advice on Dask.

Originally published at http://github.com.
