The cuStreamz Series: Checkpointing Through Reference Counting in RAPIDS cuStreamz

Jarod Maupin
RAPIDS AI
Published in
7 min readSep 8, 2020

Introduction

Checkpointing is a necessary feature for production streaming data pipelines and one of the major milestones in bringing cuStreamz into reality. It saves a record of the application’s position in a data stream, so it can be restarted from where it left off in case of failure. cuStreamz is built on top of the Python Streamz library, much of which operates asynchronously. We cannot know when data has been completely processed unless there is some mechanism for tracking when these asynchronous operations complete. For example, if a checkpoint is made only upon reading data from a source, it is possible that the processed result can be lost in the pipeline. The application hosting the pipeline can be terminated before the processed result of the data is written to the target. To prevent data loss, a checkpoint can only be created after each micro-batch has been successfully processed.

In this post, we walk through how a technique involving metadata and reference counting can be used to determine when a checkpoint should be created to achieve a zero data-loss, high-speed pipeline with at-least-once semantics.

Summary

cuStreamz has an assortment of functions that can be used to manipulate streaming data and control the flow of the pipeline. These functions can be tied together into a pipeline by defining which functions receive the output of other functions. In the pipeline, metadata is passed downstream to accompany the associated data. This metadata contains a reference counter that is incremented for each function node. It enters and decremented when the node no longer holds a reference to the associated data. When there are no longer any nodes holding a reference to the data, it is assumed that the data has exited the pipeline.

For the purposes of checkpointing, we can categorize the functions in cuStreamz as either synchronous or asynchronous.

  • Synchronous functions return only after emitting data downstream every time it is received.
  • Asynchronous functions may have a cache, a delay, or drop data. They may emit data at some time in the future, but they may return before emitting the data.

Metadata

Metadata is a common feature in data pipelines. In Kafka, for example, the message key is often used to save details about the value. It proved useful for future use cases to implement metadata and use it as the container for the reference counting. This container is what is passed downstream.

For most of the functions in cuStreamz, managing this container is a simple matter of forwarding it downstream without making any changes. However, the asynchronous functions require more attention due to the fact that they do not always immediately emit data downstream. For these functions, the metadata must also be retained with the associated data. In other functions, data is combined from multiple streams. We must carefully consider how to merge the metadata from each stream. Also, functions that collect data, and eventually emit data as tuples need solutions on how they will emit the metadata. The main rule on how metadata is handled is that it must be emitted with the data to which it is tied, even if that data is emitted multiple times. This means that some of the nodes cache and group the metadata in order to obey the rule.

Reference Counting

Reference counting is a method often used in memory management. When an object is instantiated, the number of references to that object is maintained in a counter. When the counter reaches zero, then the memory used by the object can be freed. The same technique is used in cuStreamz to determine if any of the functions in the pipeline still hold a reference to a datum. When the reference counter associated with a datum reaches zero, we can say the datum is “done.”

In practice, most users will be reading data from an external source. Using this provided data source has the reference counting built-in, and it will work out-of-the-box with Kafka’s existing checkpointing mechanism. Notice in the following example; the user does not need to define anything more than the group.id parameter for Kafka to specify the consumer group in Kafka.

args = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my-group'
}
source.from_kafka_batched('my-topic', args, npartitions=1) \
.map(work) \
.sink(print)

How It Works

For synchronous functions, it is simple to know when data has exited the pipeline. In cuStreamz, when the user invokes a function like .map() or .sink()the functions are not immediately executed; they are only staged. The functions only execute on the data when data is emitted into the pipeline with a call to .emit(). During execution, each stage will have an .update() function that is called from the previous stage. This causes the call stack to grow until the end of the pipeline is reached, at which point, the stack is unwound back to the starting point as each function completes. If you are unfamiliar with the APIs in the following examples, please see the Streamz documentation.

To illustrate, take the following example:

# Example UDF
def inc(x):
return x + 1
# Stage the pipeline
source = Stream()
L = source.map(inc).map(inc).sink_to_list()
# Create the reference counter and pass it into
# the pipeline via the metadata
ref = RefCounter()
source.emit(1, metadata=[{'ref': ref}])

Note that the user should not have to create a RefCounter in most cases. It is already managed by the source. When .emit(1) is called, the call stack will evolve like so:

call-stack
time offset frame
0 0 source.emit(1)
1 1 map.update(1) # first call to map
2 2 self._emit(2) # from map
3 3 map.update(2) # second call to map
4 4 self._emit(3) # from map
5 5 stream.sink_to_list(3)
6 # Unwind the stack 0

We can see that a call from the user to .emit(1) from the code above will not return until the result has been returned from the last function in the pipeline.

The difficulty arises when asynchronous functions are introduced into the pipeline because they often return before calling the next function in the pipeline. The following example illustrates this using the .buffer() function.

# Example UDF
def inc(x):
return x + 1
# Stage the pipeline
source = Stream()
source.map(inc) \
.buffer(1000) \
.filter(lambda x: x % 2 == 0) \
.sink(print)
# Will be called when the data is “done”
def done_callback():
print('Your data is done!')
# Create the reference counter and pass it into
# the pipeline via the metadata
ref = RefCounter(cb=done_callback)
source.emit(1, metadata=[{'ref': ref}])

This example will produce the following series of events:

call-stack
time offset frame
0 0 source.emit(1)
1 1 map.update(1)
2 2 self._emit(2) # From map
3 3 buffer.update(2)
4 0 # buffer returns. unwind stack to 0
n+0 1 self._emit(2) # from buffer at time n
n+1 2 filter.update(2) # "2" passes filter
n+2 3 self._emit(2) # from filter
n+3 4 sink(print)
n+5 0 # The stack unwinds to 0

Here we can see two problems. The first being that the asynchronous node creates a disconnect in the pipeline. The .emit(1) call will return after the data is cached in the buffer. The data will be emitted from the buffer at some unknown time in the future. This means that relying on when the original call to .emit(1) returns is not sufficient in determining when data has been completely processed. It is possible that an error can occur after .buffer() has emitted the data downstream. Secondly, the pipeline is split into two pipelines. How can we track that the data has been fully processed from both pipelines?

With the use of reference counters, we can better track when data has exited the pipeline. In this technique, a counter object is created and emitted into the pipeline to accompany the data. The counter provides a callback to notify the original sender when the count is decremented to zero. The callback is an asynchronous notification to indicate that data has been completely processed. Before the data is emitted forward in the pipeline, the count is incremented by the number of downstream functions. After each function completes, the count is decremented by one. If an asynchronous function receives and caches the data, it is responsible for incrementing the counter by one, and decrementing the counter after it is no longer holding a reference to that data.

Let’s revisit the previous example and add reference counters.

Animation of RAPIDS cuStreamz event streaming workflow
call-stack
time offset frame counter
0 0 source.emit(1) 1
1 1 map.update(1) 1
2 2 self._emit(2) # From map 2
3 3 buffer.update(2) # Retains a reference 3
4 0 # buffer returns. unwind stack to 0 1
n+0 1 self._emit(2) # from buffer at time n 2
n+1 2 filter.update(2) # "2" passes filter 2
n+2 3 self._emit(2) # from filter 3
n+3 4 sink(print) 3
n+5 0 # The stack unwinds to 0 0

The .emit() method increments the reference counter before each function and decrements it after each function has completed. The .buffer() function does the same to indicate that it is holding a reference to the data. We can see that, even though the call stack has unwind after the call to .buffer(), the reference counter has not reached zero. Only after the .sink_to_list() function has completed do we send a notification of completion.

Conclusion

In the end, what this means for the average user who is likely going to be reading from a streaming source like Kafka, is that everything will work transparently and behind the scenes. The sources and sinks for Kafka that are included in cuStreamz already have the metadata handling implemented. However, each new addition will need to properly implement the metadata handling and the reference counting if it has any asynchronous behavior as described above.

The asynchronous functionality of many of the cuStreamz functions make it difficult to track when data has been committed to the target system. Reference counting is a solution to this problem. The overhead of passing references becomes negligible when compared to the large data frames that are being processed by the GPU. At a minimum, it’s a new technique for your toolbox to solve problems in asynchronous environments.

To get started, I invite you to take a loot at the Getting Started section of the introduction post and try to use the Kafka source as shown above.

--

--