The cuStreamz Series: Running Streaming Word Count at Scale with RAPIDS and Dask on NVIDIA GPUs

Chinmay Chandak
RAPIDS AI
Published in
4 min read · Nov 19, 2020

In the cuStreamz introduction blog, we demonstrated how to implement a classic Streaming Word Count example using RAPIDS cuStreamz on GPUs. In this blog, we show how to easily and efficiently scale that same word count job in a distributed fashion to leverage multiple GPU machines.

Here is the notebook that runs streaming word count end-to-end in a distributed mode using Dask. This cuStreamz job reads streaming data in JSON format from a Kafka topic, computes the updated word count after reading every batch, and writes the latest word count result back to Kafka. The details and logistics on how one can start the Dask scheduler and workers are all provided in the notebook.
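The accumulation at the heart of that job can be sketched in plain Python. This is a minimal, library-free illustration of the per-batch update; the notebook does the same thing with cuDF dataframes on the GPU and Dask for distribution:

```python
from collections import Counter

def update_counts(total, batch_words):
    # Each Kafka batch contributes its words to the running total,
    # mirroring the streaming word-count accumulator in the notebook.
    total.update(batch_words)
    return total

counts = Counter()
for batch in (["gpu", "stream", "gpu"], ["stream"]):
    counts = update_counts(counts, batch)

print(counts["gpu"], counts["stream"])  # 2 2
```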

Deployment Tips and Tricks

Here are some helpful tips and tricks that have helped us write and monitor production-grade large-scale streaming jobs in cuStreamz.

Tune Dask Configurations

While running cuStreamz jobs in a distributed setting, Dask workers can get bogged down in long-running, computationally expensive functions, which can then cause connection issues between the workers and the scheduler. For that reason, we recommend tweaking a few default Dask configurations, such as “distributed.comm.timeouts.tcp”, as mentioned in the notebook.
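For example, the TCP timeout can be raised either through Dask’s environment-variable convention (dots in the key become double underscores) or in-process via dask.config.set. The 120s value below is an assumption; tune it for your workload:

```python
import os

# Dask maps the config key "distributed.comm.timeouts.tcp" to this
# environment variable: DASK_ prefix, dots become double underscores.
os.environ["DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP"] = "120s"

# The in-process equivalent (requires dask installed) would be:
#   import dask
#   dask.config.set({"distributed.comm.timeouts.tcp": "120s"})
```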

Use Multiple Processes

In general, there’s a non-trivial amount of control flow in Python that makes threading less efficient than using processes. For this reason, we recommend using multiple processes (one per Dask worker) in workflows with high-speed input rates. Each worker process can in turn run multiple threads. Since these workers may also use system memory intensively, increasing the per-process worker memory limit is also a good idea. All of this can be configured from the CLI when starting up the workers.
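As a sketch, the worker sizing described above might look like the following. The numbers are assumptions, not recommendations; size them to your machines:

```python
# Hypothetical sizing: several worker processes, a few threads each,
# and a raised per-process system-memory limit.
worker_opts = dict(
    n_workers=4,           # one process per Dask worker
    threads_per_worker=2,  # multiple threads within each process
    memory_limit="16GiB",  # increased memory limit per worker process
)

# These options correspond roughly to the dask-worker CLI flags
# (names as of this writing):
#   dask-worker <scheduler-address> --nprocs 4 --nthreads 2 --memory-limit 16GiB
# or, for a single machine, to dask.distributed.LocalCluster(**worker_opts).
```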

Use RAPIDS cuIO for Fast Data Loading

A large percentage of streaming pipelines have their data in Kafka in JSON format. Users can call APIs like cudf.read_json() inside a Stream.map() function to convert the batches of data read from Kafka into RAPIDS cuDF dataframes in a GPU-accelerated manner before starting the actual computation. If the data format is not JSON, have a look at the IO APIs in cuDF; you will most likely find a GPU-accelerated reader that can be leveraged.
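The shape of that map step can be sketched with the standard-library json module standing in for cudf.read_json, so it runs anywhere; in the real job the parsing happens on the GPU and Stream.map comes from cuStreamz:

```python
import json

def parse_batch(raw_messages):
    # Stand-in for cudf.read_json(...) inside Stream.map():
    # turn a batch of raw JSON messages from Kafka into records
    # before the word-count computation begins.
    return [json.loads(m) for m in raw_messages]

batch = ['{"word": "gpu"}', '{"word": "stream"}']
records = parse_batch(batch)
print(records)
```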

Upgrade your CPU Specs Too

The majority of the heavy lifting is done by the GPUs, you would say? Partly, yes. But reading from Kafka makes up a significant portion of the end-to-end processing time for streaming data. Since reading from Kafka is CPU-bound, it needs a considerable number of CPU cores and plenty of RAM (system memory). Especially when data arrives in Kafka at a high input rate and cuStreamz must process it in real time (or under a strict SLA), reading from Kafka becomes the bottleneck — so the more RAM and CPU cores your machines have, the better.

New Accelerated Kafka Datasource

Expanding on the point above, we will soon release a blog on an accelerated Kafka datasource that speeds up reading from Kafka and delivers substantial end-to-end improvements to streaming pipelines (our experiments show up to ~2x throughput using half the CPUs). To leverage this accelerated Kafka reader, pass engine=”cudf” when setting up the stream from Kafka, as shown in the notebook. This eliminates the preprocessing step in which the messages from Kafka have to be explicitly uploaded to the GPU as cuDF dataframes using the cuIO methods mentioned above. For now, this cuStreamz accelerated Kafka reader requires the data in Kafka to be in JSON format; we are working to support other data formats like Avro and CSV soon.

Dashboards

The Dask diagnostic dashboard provides a wealth of information for both real-time monitoring and debugging of cuStreamz jobs. Combine it with NVDashboard for GPU metrics, and you have an excellent real-time view of system resources and performance. For example, when you run word count in distributed mode, you can watch the workers on both machines processing the data: how long each batch took, how long each function execution within a worker took, how much system memory (RAM) each worker needed, and so on.

Next Steps

Look forward to more upcoming cuStreamz blogs.

Stay tuned, and stream on!
