Data pipeline recipes in Python

Iftimie Alexandru
10 min readDec 24, 2023

--

As a Computer Vision engineer, one of my jobs was to train Neural Network models. To train those models I had access to very large datasets (TB) of videos, but the problem was that it was difficult to manage all that data.

In this article, I will share with you some of the techniques I used in Python to make my life easier as a researcher.

Messy data

The data was not always in the best format. Sometimes, I had video files of a few GBs associated with a metadata file containing labelings for each frame, other times the video was split into .png and .xml files for each frame.

The location of the data was not very well organized either. Some datasets were located on Windows Servers, others on Linux systems, others on some developer desktops, and it wasn’t easy to have proper code reading from all of these sources.

Also, for some projects, the datasets had to be processed only on Windows systems because some internal image processing libraries were built only for Windows.

Dealing with problem

Sometimes, the work of a researcher can feel overwhelming just because of the data. But there is a light at the end of the tunnel, and we can write nice and clean code for dealing with the imperfect world that we live in.

Using some advanced Python features and libraries, I managed to solve important problems like dealing with different OS environments, data-formats, versioning, and performance, and here I am going to share with you some simple recipes for solving complex problems.

A dummy problem

We’ll start with a simple problem. Suppose we have a dataset organized in multiple files, and we want to get every word inside it.

It is a trivial problem, and it can be solved in many ways, even in fewer lines of code than I am about to show you, but hopefully, it is complex enough that you can relate to real-world projects that you worked on.

The data

To do such a thing, my junior self would write something like below.

It’s not too bad. The first thing we can do is to refactor it a bit just to make it easier to understand for other developers. I will apply some of the lessons learned from reading Clean Code by Robert C. Martin.

Clean up the code

One of the lessons from the aforementioned book is that if there are commented sections of code, that is a sign that they need to be moved to separate functions.

Also, it’s better to arrange them in a top-down hierarchical view. The main function should stay at the top, then, the rest of the functions should be defined below in a Breadth-First-Search fashion.

This should be better. If a new developer looks at this code, it’s sufficient to look at the main function to get a good idea of what is happening without looking at the internals.

Improving memory performance

The problem with all of this however is that it’s not very memory friendly. Especially when dealing with large datasets, the computer may not have enough memory to fit all of the data. One quick way to solve all of this is to use python generators which will lazy load the data, and process one item at a time.

Quick python generators introduction

Suppose we have the following function that returns a list of numbers

def normal_function():
result = []
for i in range(5):
result.append(i)
return result
print(normal_function()) # [0, 1, 2, 3, 4]

We can change it to a generator, but we also need to change the way we load the results from it.

 def generator():
for i in range(5):
yield i

If we call it simply we will get an instance of a generator.

print(generator()) # <generator object generator at 0x7f94fbfad5f0>

To actually load the results we can iterate like below.

generator_instance = generator()
for item in generator_instance:
print(item)

So what is the difference? The flow of execution inside the generator function is paused until elements from it are needed again. It means that the instruction pointer will jump back and forth from the main function to the generator. This will allow to process one element at a time instead of having an entire list of items in the memory.

Maybe it’s a bit confusing now, but I hope the video at the end of this section will make it clearer.

I will also share with you some other ways to get data from a generator, that I will use over the course of this article. For example, I can just call the list function over the generator instance.

print(list(generator())) # [0, 1, 2, 3, 4]

Or we can iterate in the most verbose way.

generator_instance = generator()
while True:
try:
print(next(generator_instance))
except StopIteration:
break

There are also syntactic sugar ways to create a generator from an iterable. I can re-write the generator above in the following way which can save 1 or 2 lines from time to time, and maintain the same behavior.

def generator():
yield from range(5)

The next great thing about generators is that we can combine them easily in the following way.

def generator1():
for number in range(1, 5):
yield number
def generator2(another_generator):
for number in another_generator:
new_value = number * 2
yield new_value
g1 = generator1()
g2 = generator2(g1)
for number in g2:
print(number)

To understand better how the execution works in this scenario, maybe the following video will help you understand better.

Refactor with generators

Coming back to the original problem, it’s pretty easy to change the previous functions to generators.

What will happen now is that a filepath is read, then the file is opened, the first line is read, the line is split into a list of words, and each word from that line is collected in the main function.

Then the whole process repeats until we have no more words on the current line, then no more lines in the current file up until there are no more files.

Now it’s better. It only took a few lines to change, and the layout is pretty much the same. We can visualize it as in the image below. Everything is linear.

Each box is a generator

Copying and chaining generators

But things can still get complicated when contributing to a real-life project. What if we discover that we want to add multiple input sources or we may want to process a file or a line in a different way. We can solve this problem by creating a Directed Acyclic Graph in Python using generators.

Suppose for this problem that we want to collect all the words from the file, but also all the words after each line have been reversed, so we’ll have double the amount of data. It’s a silly requirement, but you can think of it as another operation that needs to be applied to an item. So we may want to change the flow like in the image below.

To solve this problem, the new thing that I will introduce is the itertools.tee function that will duplicate a generator. Normally a generator can be used only once, as you can see in the snippet below.

def generates_numbers():
yield from range(5)
g = generates_numbers()
print(list(g)) # [0, 1, 2, 3, 4]
print(list(g)) # []

Thetee function is inspired by the Linux command, and its name comes from the fact that it looks like a T or more visually like a pipe with one input and two outputs. In the snippet below you can see how it acts.

from itertools import teedef generates_numbers():
yield from range(10)
g = generates_numbers()
g1, g2 = tee(g)
print(list(g1)) # [0, 1, 2, 3, 4]
print(list(g2)) # [0, 1, 2, 3, 4]

Now, the original g should be carefully used. Iterating over it or pulling one item from it will leave its two copies with fewer elements.

from itertools import teedef generates_numbers():
yield from range(10)
g = generates_numbers()
g1, g2 = tee(g)
print(next(g)) # # consumes one element from g and prints 0
print(list(g1)) # [1, 2, 3, 4]
print(list(g2)) # [1, 2, 3, 4]

Coming back to the original problem, it will be adapted in a similar way. Now the graph will have 2 branches. To combine the generators, the itertools.chain function will be used, which will make it possible to concatenate the results. There are many ways to combine generators that can have interesting results. More on them on itertools package.

Improving time performance

Ok, that’s cool. But what if some of the operations are CPU or IO bound. We can parallelize this. There are 2 popular libraries for doing this in Python. One of them is multiprocessing, the other is concurent.futures. I used the latter in this article because I think it has a simpler API.

In the snippet below, I came back to the original linear problem, but now one of the stages will be parallelized. Let’s assume that processing a file in the “lines” generator is a slow operation. We can solve this problem by extracting the actual processing of the file to a new function, and mapping this function over the input filepaths generator.

Using a cluster

We can improve our pipeline by using frameworks for distributed tasks. I believe there are tens of good frameworks. Personally, I played with Celery and Dask only.

So instead of submitting the job to a few CPUs, we can submit the job to a few machines.

Obviously, some things could be improved here. Instead of doing a blocking gather, we could fetch from whichever task finished first. This would require more code, but it’s a feasible idea.

Considerations when using multiprocessing or cluster configurations

In distributed programming, you should use only simple data types, like ints or strings, or floats when passing data from one generator to another. It’s also a good idea to keep things simple. Anything other than that either complicates the codebase or creates a network or CPU overhead.

In the past, I wasted a lot of time on failed experiments by trying to work with file pointers in multiprocessing, or I was trying to share a generator between multiple processes.

Including monitoring

Up until now, we assumed that our code works perfectly, that data has the same format everywhere and every file could be opened. But in the real world, things may not stay the same. It can happen that after a few hours of processing data, you end up with a broken pipeline, and you have to restart the whole process.

To deal with such a problem in our example, we could move the inner part of the processing to try-catch blocks. That would solve our problem, but the code is starting to look verbose.

When things are getting duplicated, it’s a good idea to remove them. Luckily, in Python, we can do this easily by using decorators.

A quick intro to decorators

In Python, a decorator is just a function that takes as input a function and returns another function.

A very basic example looks like below. The Python interpreter will replace underneath say_world with new_function , so when the say_world is called, it will actually go inside new_function , call the original say_world then it will also do its extra functionality which is to print("Hello")

def decorator(function_to_decorate):
def new_function():
print("Hello")
function_to_decorate()

return new_function
@decorator
def say_world():
print("World")
say_world() # will print "Hello World"

Writing @decorator is just syntactic sugar for writing

say_hello = decorator(say_hello)

Furthermore, we can make our decorator more general by capturing all input arguments to the original function and returning whatever is returned.

In the example below, the new_function is capturing all positional arguments args , which in this case is simply "Alex"and all keyword arguments *kwargs which is num_exclamation_marks=5

def decorator(function_to_decorate):
def new_function(*args, **kwargs):
print("Hello")
return function_to_decorate(*args, **kwargs)

return new_function
@decorator
def say_world(username, num_exclamation_marks):
print("World", "!" * num_exclamation_marks)
return 10
say_world("Alex", num_exclamation_marks=5)
# will print "Hello World !!!!!"

And the last step is to make our decorator even more flexible by making it accept parameters. I will take the initial decorator example and update it.

def decorator(user):
def inner_decorator(function_to_decorate):
def new_function():
print(f"{user} says Hello")
function_to_decorate()

return new_function
return inner_decorator
@decorator("Alex")
def say_world():
print("World")
say_world() # will print "Alex says Hello Word"

Again, @decorator("Alex")is again just syntactic sugar for the code below:

parametrized_decorator = decorator("Alex")
say_world = parametrized_decorator(say_world)

Removing duplication using decorators

Coming back to our original problem now we can create a decorator for error handling. To do this, I extracted the inner part of the loops to separate functions that were decorated with monitor_error . Because this decorator returns a default value, either None or [] in case of exceptions, I have to add an extra check in every generator to not yield it.

Decorator for generator

I especially wanted to add this one, because it may happen that you want to add some monitoring to every item in your pipeline. Maybe you may find it useful to run a running average or count the number of processed items.

Conclusions

Writing data pipelines is a complex task. Especially when you work as a researcher and you need to make tons of experiments that require different data processing changes.

I am pretty sure that there are other ways to solve such problems, but these were the tools that helped me, and I hope that maybe these will help some of you as well.

--

--