Building a Data Pipeline with Python Generators

Ilan Uzan
Ilan Uzan
Nov 30, 2020 · 5 min read

In this post you’ll learn how we can use Python’s Generators feature to create data streaming pipelines. For production grade pipelines we’d probably use a suitable framework like Apache Beam, but this feature is needed to build Apache Beam’s custom components of your own.

The Problem

Let’s look at the following demands for a data pipeline:

Write a small framework that processes an endless event stream of integers. The assumption is that processing will be static and sequential, where each processing unit pass the output to the next processing unit, unless defined differently (e.g. filter, fixed-event-window). The framework should include the 6 following “building blocks”:

1. stdin-source: reads one number from stdin, prints ‘> ‘ and the number afterwards. For example, if the user entered 1, it will print “> 1”
2. filter: only passes events that match a predicate (a function that returns true or false given a number). The predicate is given during the initialisation time of filter.
3. fixed-event-window: aggregates events into a fixed size array, pass it forward when full. The size of the fixed array is defined during the initialisation of fixed-event-window.
4. fold-sum: sums the value of the events in the array, and pass forward the sum.
5. fold-median: calculate the median value of the events in the array, and pass forward the median.
6. stdout-sink: prints the number to stdout and pass forward the number.

From these 6 building blocks, any pipeline can be built, here is one for example:
stdin-source -> filter(i=>i>0) -> fixed-event-window(2) -> fold-sum -> fixed-event-window(3) -> fold-median -> stdout-sink

How would you approach to solve such a problem? What I’d try to do at first was writing a function for each of those “building blocks”, and then running each of those function in an endless loop of user input, and just re-organise the functions in the order needed for the pipeline.

This approach has 2 obvious problems:

1. The filter function has to return null if it the predicate returns false, which either forces the other functions to handle nulls gracefully, or forces the pipeline to handle nulls itself - meaning, either the pipeline or the other functions have to be aware of the implementation of the filter function — if we change the order of the filter function in the pipeline, we’d have to move the relevant null checks as well.

2. The fixed-event-window function has to batch input elements and output them every X times — meaning, the function has to preserve its state across multiple executions. This requires the pipeline to not only be aware of the implementation of the filter function, but it actually has to be a part of the implementation (using variables out of the function’s scope to handle batches).

The code to write the pipeline in the example is fairly easy to write, but to write it in a way that will allow us to play around with the building blocks however we want (writing it as an actual pipeline and not just a bunch of code pieces put together in a specific manner) - that requires us to use Generators.

Generators introduction

A Generator function is a function that returns an object called a generator (a subclass of iterator). We create a generator function by using the keyword yield instead of the keyword return. Let’s start by creating one:

Here is an example of a generator that returns the 0-4 integers:

>>> def gen(): 
… for i in range(5):
… yield i

But what happens here? Unlike the return keyword, the function isn’t finished after the first yield statement. The function will finish it’s execution only when there are no more values to yield - here, it’s when the loop reaches its end.

Let’s run the function and see what happens:

>>> gen() 
<generator object gen at 0x7f58682b7820>

Well, I did say that a generator function returns a Generator object, and I also said it’s an Iterator - so if we want to evaluate the results we’ll have to do this:

# returns a generator object, the function's code isn't executed yet
>>> res = gen()
# here the code inside the generator is actually being run
>>> for j in res:
... print(j)
0
1
2
3
4

The for keyword implicitly calls the next function on the iterator until it’s out of values (we can also just use the list function on the object - this will implicitly call the next on all of the elements).

If you’re still curious about how exactly Generators work behind the scenes (and you should, it’s quite interesting), here is a really cool blog post about the subject.

Building the Pipeline

Now that we pretty much know what generators are and how we create them, let’s get back to solving the problem at hand. The task is writing a Streaming Framework, where each building block is doing some processing on the current value and passing it forward if needed, while still processing more values .
For the fixed-event-window step, we need to preserve the state of the function across multiple executions - which is exactly what Generators can do for us.

Let’s begin with the building blocks of the framework:

def stdin_source():
def try_to_int(val):
try:
return int(val)
except ValueError:
return None
for input in sys.stdin:
if input.strip() == 'exit':
exit()
val = try_to_int(input)
if val is not None:
print(‘> %d’ % val)
yield val

2. Passing forward the values that match the predicate:

def filter_numbers(numbers, predicate):
for val in numbers:
if predicate(val):
yield val

3. Batching user input to an array defined locally (the array’s state is preserved across executions because it’s a generator), passing an list of fixed size every batch_size elements:

def fixed_event_window(numbers, batch_size):
arr = []
for val in numbers:
arr.append(val)
if len(arr) == batch_size:
res = arr.copy()
arr = []
yield res

4. Summing the array’s elements:

def fold_sum(arrs):
for arr in arrs:
yield sum(arr)

5. Calculating the median for the array’s elements:

# 5. fold-median
def fold_median(arrs):
for arr in arrs:
yield median(arr)

6. Printing and passing forward the value:

# 6. stdout-sink
def stdout_sink(numbers):
for val in numbers:
print(val)
yield val

Then, let’s put all of these building blocks together as requested:

numbers = stdin_source()
filtered = filter_numbers(numbers, lambda x: x > 0)
windowed_for_sum = fixed_event_window(filtered, 2)
folded_sum = fold_sum(windowed_for_sum)
windowed_for_median = fixed_event_window(folded_sum, 3)
folded_median = fold_median(windowed_for_median)
res = stdout_sink(folded_median)

Pretty neat, right? It’s quite clear what is happening here, and it is also pretty easy to play with the order of the functions, and creating a different pipeline altogether without having to change anything else in the pipeline.

But, this pipeline won’t actually start running - this is just the pipeline’s definition, and if we’ll leave it as is nothing will happen - we created a bunch of generator objects and put them together to a pipeline - but no data is sent to the pipeline since nothing is iterating over the generator, so the user isn’t even prompted for input. In order for this to run, we just have to iterate over the pipeline’s result:

....
res = stdout_sink(folded_median)
# implicitly calling next() on the generator object until there are # no more values
list(res)

We can just run a part of the pipeline if we want - if we choose to iterate over the folded_sum generator then the pipeline will run only up to that step.

Summary

All of the code in the post can be found here - hope you found the post interesting and learned a bit about Generators and how they can be of use. In the next post I’ll talk about how we can use this feature to write custom components in Apache Beam pipelines.

Analytics Vidhya

Analytics Vidhya is a community of Analytics and Data…

Sign up for Analytics Vidhya News Bytes

By Analytics Vidhya

Latest news from Analytics Vidhya on our Hackathons and some of our best articles! Take a look.

By signing up, you will create a Medium account if you don’t already have one. Review our Privacy Policy for more information about our privacy practices.

Check your inbox
Medium sent you an email at to complete your subscription.

Analytics Vidhya

Analytics Vidhya is a community of Analytics and Data Science professionals. We are building the next-gen data science ecosystem https://www.analyticsvidhya.com

Ilan Uzan

Written by

Ilan Uzan

Analytics Vidhya

Analytics Vidhya is a community of Analytics and Data Science professionals. We are building the next-gen data science ecosystem https://www.analyticsvidhya.com

Medium is an open platform where 170 million readers come to find insightful and dynamic thinking. Here, expert and undiscovered voices alike dive into the heart of any topic and bring new ideas to the surface. Learn more

Follow the writers, publications, and topics that matter to you, and you’ll see them on your homepage and in your inbox. Explore

If you have a story to tell, knowledge to share, or a perspective to offer — welcome home. It’s easy and free to post your thinking on any topic. Write on Medium

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store