The cuStreamz Series: The Accelerated Kafka Datasource

RAPIDS AI · Nov 20, 2020

By: Chinmay Chandak, Jeremy Dyer


A large percentage of production streaming pipelines today have Kafka as their source. Over the years, Apache Kafka has become one of the most popular open-source distributed stream-processing platforms for handling real-time data feeds. Consuming streaming data from Kafka, especially at extremely high speeds, is a CPU-intensive operation. When we started development at NVIDIA on cuStreamz, the first GPU-accelerated streaming data processing library built on top of RAPIDS, reading from Kafka at high speeds turned out to be the bottleneck for processing streaming data efficiently on GPUs. Because data could not be consumed from Kafka and uploaded to the GPU fast enough, we could not fully exploit the GPU's parallelism for streaming computations.

Currently, reading from Kafka is CPU-bound. No questions there. But in the case of cuStreamz, a considerable portion of this reading-from-Kafka bottleneck stems from a Python Global Interpreter Lock (GIL) issue within the Confluent Kafka (CK) Python library. The CK library is a robust, reliable Python API for interacting with Kafka, used by multiple open-source libraries including Streamz. However, the GIL issue was a definite roadblock to maximizing performance in cuStreamz. Let’s look at how this particular GIL issue severely impacted end-to-end streaming performance.

G(u)il(ty) as Charged!

When the CK library retrieves messages from Kafka via librdkafka (the Apache Kafka C/C++ library) in the C/C++ layer, it creates a PyObject for each message before returning them to the Python layer for the Python CK consumer to consume. Because of the GIL, the Python interpreter executes only one thread at a time while creating these PyObjects, which in turn caps the speed at which a batch of messages can be read from Kafka. So there is really no benefit to multithreading.
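To make the problem concrete, here is a minimal sketch of the multithreaded approach that the GIL defeats. The broker address, topic name, and group id are hypothetical placeholders, not taken from the original post.

```python
# A sketch of reading from Kafka with multiple threads using confluent-kafka.
# Broker, topic, and group id below are placeholders.
import threading
from confluent_kafka import Consumer

CONF = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "ck-gil-demo",
    "auto.offset.reset": "earliest",
}

def consume(messages_to_read=100_000):
    consumer = Consumer(CONF)
    consumer.subscribe(["events"])
    read = 0
    while read < messages_to_read:
        # librdkafka fetches the batch in C/C++, but every message is then
        # wrapped in a PyObject while holding the GIL, so threads serialize here.
        batch = consumer.consume(num_messages=10_000, timeout=1.0)
        read += len(batch)
    consumer.close()

threads = [threading.Thread(target=consume) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Adding threads barely moves the needle, because the per-message PyObject creation is the serialized step.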

As expected, using multiple processes to consume from Kafka delivers a massive speedup compared to using multiple threads within a single process. Starting multiple Dask workers, each worker being its own process, yields high consumer throughput and therefore high end-to-end stream processing performance. But starting many discrete processes on a GPU is not at all an ideal solution. Each process (in this context, a Dask worker) creates a CUDA device context on the GPU, which eats up a considerable amount of GPU memory that could otherwise have been used for useful streaming computations. For example, on NVIDIA T4 GPUs (16GB of GPU memory) with 20 Dask worker processes, we noticed each worker using ~400MB for its CUDA context (with RAPIDS cuDF v0.16). That means we expended ~8GB of GPU memory just like that, leaving only ~8GB for actual streaming data computation. We needed to resolve this somehow!
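For reference, the multi-process workaround looks roughly like the sketch below; the worker count is illustrative, not a recommendation, and the comment spells out the memory arithmetic.

```python
# A sketch of the multi-process workaround: one GIL per Dask worker process.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=20, threads_per_worker=1)  # 20 consumer processes
client = Client(cluster)

# Every worker process that touches the GPU also creates its own CUDA context.
# At ~400 MB per context (observed with cuDF v0.16 on a 16 GB T4):
#     20 workers x ~400 MB ≈ 8 GB of GPU memory gone before any computation starts.
```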

An Accelerated Kafka Data Source

Since our goal is to upload batches of streaming data onto the GPU as RAPIDS cuDF dataframes for downstream processing, why not avoid the intermediate Python layer altogether? How about making the librdkafka C++ layer upload the batch of Kafka messages directly to the GPU, instead of first gathering the messages back into Python and then creating a cuDF dataframe? We implemented exactly this in the cudf_kafka package as the accelerated Kafka data source. It circumvents the Python GIL issue (by eliminating Python altogether, heh!) and accelerates streaming pipelines by eliminating multiple in-memory copies of each batch, providing the most efficient path currently possible for data to travel from Kafka into GPU memory. The cuStreamz package provides a Streamz-compatible API as a wrapper on top of the cudf_kafka functionality.
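For a sense of what the lower-level wrapper looks like, here is a sketch using the custreamz.kafka consumer that sits on top of cudf_kafka. The broker, group id, topic, and offset range are placeholders, and parameter names may differ slightly between RAPIDS releases, so treat this as illustrative rather than definitive.

```python
# A sketch of reading a range of Kafka offsets straight into a cuDF DataFrame.
# Broker, group id, topic, and offsets are placeholders.
from custreamz import kafka

kafka_conf = {
    "metadata.broker.list": "localhost:9092",
    "group.id": "custreamz-demo",
}

consumer = kafka.Consumer(kafka_conf)

# Messages are parsed and uploaded to the GPU in the C++ layer; no per-message
# PyObjects are created, so the GIL never becomes the bottleneck.
gdf = consumer.read_gdf(
    topic="events",
    partition=0,
    start=0,
    end=10_000,
    message_format="json",
)
```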

Performance Impact

Our testing reveals that we are able to achieve double the streaming throughput (processing ~2x the data in the same time) using less than half the number of Dask worker processes, and hence less than half the GPU memory previously spent on non-computation overhead like CUDA context creation. This is a huge boost to performance. More importantly, the improvement comes at a much lower cost, because we cut down CPU usage (both the CPU cores and the system memory used by the Dask worker processes) considerably. In general, though, there is still a non-trivial amount of control flow in Python and in the various libraries that cuStreamz encapsulates, which keeps threading less efficient than using processes. We are not yet at the stage where we can get the same streaming performance with a single process.

API

As for ease of use, the accelerated data source is literally just one additional parameter in the API call that reads from Kafka; a sketch follows below. For a full example, please refer to our Word Count @ Scale blog. Currently, the accelerated Kafka data source supports reading data from Kafka only in JSON format directly into cuDF dataframes, but we plan to roll out support for other Kafka data formats like Avro and CSV soon.
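A minimal sketch with the Streamz API is below, assuming a local broker and a hypothetical topic; in recent streamz releases the extra parameter is the engine keyword on Stream.from_kafka_batched, which swaps the CK-based reader for the cudf_kafka data source.

```python
# A sketch of the accelerated reader via the Streamz API.
# Broker, group id, and topic are placeholders.
from streamz import Stream

consumer_conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "custreamz-demo",
    "session.timeout.ms": "60000",
}

source = Stream.from_kafka_batched(
    "events",                 # topic
    consumer_conf,
    poll_interval="1s",
    npartitions=4,
    dask=True,
    asynchronous=True,
    engine="cudf",            # the one extra parameter: batches arrive as cuDF DataFrames
)
```

Downstream operators then see cuDF DataFrames instead of lists of raw messages, and calling source.start() begins polling.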

Next Steps

Look forward to upcoming cuStreamz blogs covering:

  • Running cuStreamz Word Count @ Scale on Kubernetes as an Orchestration Layer
  • cuStreamz’ Journey through Performance and Benchmarking

Stay tuned, and stream on!
