In this post you’ll learn how we can use Python’s Generators feature to create data streaming pipelines. For production grade pipelines we’d probably use a suitable framework like Apache Beam, but this feature is needed to build Apache Beam’s custom components of your own.
Let’s look at the following demands for a data pipeline:
Write a small framework that processes an endless event stream of integers. The assumption is that processing will be static and sequential, where each processing unit pass the output to the next processing unit, unless defined differently (e.g. filter, fixed-event-window). The framework should include the 6 following “building blocks”:
1. stdin-source: reads one number from stdin, prints ‘> ‘ and the number afterwards. For example, if the user entered 1, it will print “> 1”
2. filter: only passes events that match a predicate (a function that returns true or false given a number). The predicate is given during the initialisation time of filter.
3. fixed-event-window: aggregates events into a fixed size array, pass it forward when full. The size of the fixed array is defined during the initialisation of fixed-event-window.
4. fold-sum: sums the value of the events in the array, and pass forward the sum.
5. fold-median: calculate the median value of the events in the array, and pass forward the median.
6. stdout-sink: prints the number to stdout and pass forward the number.
From these 6 building blocks, any pipeline can be built, here is one for example:
stdin-source -> filter(i=>i>0) -> fixed-event-window(2) -> fold-sum -> fixed-event-window(3) -> fold-median -> stdout-sink
How would you approach to solve such a problem? What I’d try to do at first was writing a function for each of those “building blocks”, and then running each of those function in an endless loop of user input, and just re-organise the functions in the order needed for the pipeline.
This approach has 2 obvious problems:
1. The filter function has to return null if it the predicate returns false, which either forces the other functions to handle nulls gracefully, or forces the pipeline to handle nulls itself - meaning, either the pipeline or the other functions have to be aware of the implementation of the filter function — if we change the order of the filter function in the pipeline, we’d have to move the relevant null checks as well.
2. The fixed-event-window function has to batch input elements and output them every X times — meaning, the function has to preserve its state across multiple executions. This requires the pipeline to not only be aware of the implementation of the filter function, but it actually has to be a part of the implementation (using variables out of the function’s scope to handle batches).
The code to write the pipeline in the example is fairly easy to write, but to write it in a way that will allow us to play around with the building blocks however we want (writing it as an actual pipeline and not just a bunch of code pieces put together in a specific manner) - that requires us to use Generators.
A Generator function is a function that returns an object called a generator (a subclass of iterator). We create a generator function by using the keyword yield instead of the keyword return. Let’s start by creating one:
Here is an example of a generator that returns the 0-4 integers:
>>> def gen():
… for i in range(5):
… yield i
But what happens here? Unlike the return keyword, the function isn’t finished after the first yield statement. The function will finish it’s execution only when there are no more values to yield - here, it’s when the loop reaches its end.
Let’s run the function and see what happens:
<generator object gen at 0x7f58682b7820>
Well, I did say that a generator function returns a Generator object, and I also said it’s an Iterator - so if we want to evaluate the results we’ll have to do this:
# returns a generator object, the function's code isn't executed yet
>>> res = gen()# here the code inside the generator is actually being run
>>> for j in res:
The for keyword implicitly calls the next function on the iterator until it’s out of values (we can also just use the list function on the object - this will implicitly call the next on all of the elements).
If you’re still curious about how exactly Generators work behind the scenes (and you should, it’s quite interesting), here is a really cool blog post about the subject.
Building the Pipeline
Now that we pretty much know what generators are and how we create them, let’s get back to solving the problem at hand. The task is writing a Streaming Framework, where each building block is doing some processing on the current value and passing it forward if needed, while still processing more values .
For the fixed-event-window step, we need to preserve the state of the function across multiple executions - which is exactly what Generators can do for us.
Let’s begin with the building blocks of the framework:
- Reading a stream of integer input from the user, and passing it forward:
return None for input in sys.stdin:
if input.strip() == 'exit':
exit() val = try_to_int(input)
if val is not None:
print(‘> %d’ % val)
2. Passing forward the values that match the predicate:
def filter_numbers(numbers, predicate):
for val in numbers:
3. Batching user input to an array defined locally (the array’s state is preserved across executions because it’s a generator), passing an list of fixed size every batch_size elements:
def fixed_event_window(numbers, batch_size):
arr = 
for val in numbers:
arr.append(val) if len(arr) == batch_size:
res = arr.copy()
arr = 
4. Summing the array’s elements:
for arr in arrs:
5. Calculating the median for the array’s elements:
# 5. fold-median
for arr in arrs:
6. Printing and passing forward the value:
# 6. stdout-sink
for val in numbers:
Then, let’s put all of these building blocks together as requested:
numbers = stdin_source()
filtered = filter_numbers(numbers, lambda x: x > 0)
windowed_for_sum = fixed_event_window(filtered, 2)
folded_sum = fold_sum(windowed_for_sum)
windowed_for_median = fixed_event_window(folded_sum, 3)
folded_median = fold_median(windowed_for_median)
res = stdout_sink(folded_median)
Pretty neat, right? It’s quite clear what is happening here, and it is also pretty easy to play with the order of the functions, and creating a different pipeline altogether without having to change anything else in the pipeline.
But, this pipeline won’t actually start running - this is just the pipeline’s definition, and if we’ll leave it as is nothing will happen - we created a bunch of generator objects and put them together to a pipeline - but no data is sent to the pipeline since nothing is iterating over the generator, so the user isn’t even prompted for input. In order for this to run, we just have to iterate over the pipeline’s result:
res = stdout_sink(folded_median)# implicitly calling next() on the generator object until there are # no more values
We can just run a part of the pipeline if we want - if we choose to iterate over the folded_sum generator then the pipeline will run only up to that step.
All of the code in the post can be found here - hope you found the post interesting and learned a bit about Generators and how they can be of use. In the next post I’ll talk about how we can use this feature to write custom components in Apache Beam pipelines.