In my last project I stumbled on the problem of having to process enormous amounts of data that did not fit into memory. Obviously that meant I had to process smaller chunks and stream the data. But I had a hard time doing that while keeping the code clean and readable.
In my frustration I looked around and was pointed towards the way NodeJS uses streams, with the suggestion that this approach might be what was missing from my app, even though it is a monolithic Swift app. So I gave it a shot to see whether copying their implementation would solve my problems. Here’s what I found.
I divided this article into the following sections:
- The problem: Performing multiple asynchronous processing steps on a chunk
- How NOT to do it: The naïve and easy approach of processing data chunks
- How to use NodeJS’ stream pattern
- Reflecting: Is this actually a good idea?
The problem: Performing multiple asynchronous processing steps on a chunk
The scenario I am working on is a video processing application. I need to read all frames, process some of them with an asynchronous routine on the GPU, create a video based on the result of that routine, and finally combine all the smaller videos into a new video.
To make this a little more complicated, I also need to carry the audio of all the frames over into the new video, and I am not guaranteed that the frames are read in order.
How NOT to do it: The naïve and easy approach of processing data chunks
The first solution that might come to your mind, and the first one I tried, was to just create an array, read all frames into it and process the frames from there. That way I can define an upper limit on the array’s size and only read in new frames if there is room left. Once a frame is processed, I can remove it from the array and the next frame can be added.
Here is the code if you are interested:
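(A minimal, self-contained sketch of roughly what that code looked like. All names here — `Frame`, `analyzeOnGPU`, the buffer size — are hypothetical placeholders; input and output are simulated, and the asynchronous GPU routine is reduced to a synchronous stub. Note the order in which the steps appear.)

```swift
// A sketch of the naïve approach. Frame, analyzeOnGPU and friends are
// hypothetical placeholders; input and output are simulated so the code
// is self-contained, and frames arrive slightly out of order.

struct Frame {
    let index: Int
    var isAnalyzed = false
}

// Simulated input: adjacent frames swapped, i.e. read out of order.
var pendingInput = [1, 0, 3, 2, 5, 4, 7, 6].map { Frame(index: $0) }
var output: [Int] = []

func readNextFrame() -> Frame? {
    pendingInput.isEmpty ? nil : pendingInput.removeFirst()
}

func analyzeOnGPU(_ frame: Frame) -> Frame {
    var analyzed = frame        // stand-in for the async GPU routine
    analyzed.isAnalyzed = true
    return analyzed
}

let maxFramesInMemory = 4
var buffer: [Frame] = []
var nextIndexToWrite = 0

while !(pendingInput.isEmpty && buffer.isEmpty) {
    // 4. Clean data: drop frames that must not reach the output
    //    (nothing qualifies in this simulation).
    buffer.removeAll { $0.index < nextIndexToWrite }

    // 5. Write data: flush analyzed frames that are next in line,
    //    so room is freed up before reading new frames.
    while let first = buffer.first, first.isAnalyzed, first.index == nextIndexToWrite {
        output.append(buffer.removeFirst().index)
        nextIndexToWrite += 1
    }

    // 1. Read data: only while there is room left in the buffer.
    while buffer.count < maxFramesInMemory, let frame = readNextFrame() {
        buffer.append(frame)
    }

    // 2. Sort data: frames are not guaranteed to arrive in order.
    buffer.sort { $0.index < $1.index }

    // 3. Analyze data: run the (simulated) GPU routine.
    for i in buffer.indices where !buffer[i].isAnalyzed {
        buffer[i] = analyzeOnGPU(buffer[i])
    }
}

print(output)  // [0, 1, 2, 3, 4, 5, 6, 7]
```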
The first thing I notice when I look at the code is that the logical order of the process is not the same as the order of the processing steps in the code. Here is the code order, with a number indicating the logical position of each step: 4. Clean data, 5. Write data, 1. Read data, 2. Sort data and 3. Analyze data.
There is a reason for the steps to appear in the code in this order. Namely, you want to write as much data to your output as possible so that room is freed up to read in new frames. So step 5, writing data, goes above step 1, reading data. But to be able to write the data you first need to filter out all frames which should not be part of the output. So step 4, cleaning data, needs to happen before step 5, writing data.
The second problem is that some of the processing steps are hidden by control flow. That is the case for steps 1. Read data through 3. Analyze data.
Finally, this code will lead to a lot of busy waiting, since you need to walk through the entire queue in every iteration to see which data needs to be filtered. That is because you cannot guarantee that an analyzed frame can be written right after being analyzed. As a result, there might be quite a few frames in the array which still need to be written but have already been passed through the filter.
Summing up, this naïve implementation results in the code order of the processing steps differing from the logical order, processing steps being hidden by control flow, and a lot of potential busy waiting while processing the data.
How to use NodeJS’ stream pattern
NodeJS’ streams are implemented as a combination of a push and a pull flow. A stream producer starts producing elements once a subscriber registers itself. Once started, the producer passes a message to the subscriber. The subscriber then tells the producer whether it wants more messages. Once the subscriber is filled up, it asks the producer to stop producing messages, thereby ending the push piece of the flow.
When the subscriber is ready to receive elements again, it calls a continue method on the producer, which then starts emitting messages until the subscriber is filled up again. This allows the stream to deal with back pressure in your system and, in my case, allows me to keep the memory usage under control.
In the case of a finite producer, the producer ends the stream with a complete message, signaling to all its subscribers that the stream is finished.
With the concept out of the way, let’s look at the code for a producer. (I will not include the complete message in the code, since you can imagine a complete method on the subscriber. There is no magic there.)
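(Below is a minimal sketch of such a producer. `Frame`, `StreamSubscriber` and `FrameProducer` are hypothetical names of my own, not from a real library; the demo subscriber at the end pauses the producer after every second frame and is resumed from the outside via the saved callback.)

```swift
// A sketch of the producer side. StreamSubscriber, FrameProducer and
// Frame are hypothetical names, not from a real library.

struct Frame {
    let index: Int
}

protocol StreamSubscriber: AnyObject {
    /// Returns true if the producer may keep pushing. The `resume`
    /// callback restarts the producer once the subscriber has drained.
    func receive(_ frame: Frame, resume: @escaping () -> Void) -> Bool
}

final class FrameProducer {
    private var nextIndex = 0
    private let frameCount: Int
    private weak var subscriber: StreamSubscriber?

    init(frameCount: Int) {
        self.frameCount = frameCount
    }

    func subscribe(_ subscriber: StreamSubscriber) {
        self.subscriber = subscriber
        emit()
    }

    // Push frames until the subscriber signals "stop" or the input ends.
    private func emit() {
        while let subscriber = subscriber, nextIndex < frameCount {
            let frame = Frame(index: nextIndex)
            nextIndex += 1
            let wantsMore = subscriber.receive(frame) { [weak self] in
                self?.emit()  // the saved callback restarts the push loop
            }
            if !wantsMore { return }  // back pressure: pause pushing
        }
        // A finite producer would send a `complete` message here (omitted).
    }
}

// Demo: a subscriber that pauses the producer after every second frame.
final class CollectingSubscriber: StreamSubscriber {
    var received: [Int] = []
    var savedResume: (() -> Void)?

    func receive(_ frame: Frame, resume: @escaping () -> Void) -> Bool {
        received.append(frame.index)
        if received.count % 2 == 0 {
            savedResume = resume  // pretend our queue is full
            return false
        }
        return true
    }
}

let collector = CollectingSubscriber()
let producer = FrameProducer(frameCount: 5)
producer.subscribe(collector)
while let resume = collector.savedResume {  // drain and restart
    collector.savedResume = nil
    resume()
}
print(collector.received)  // [0, 1, 2, 3, 4]
```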
As you can see, the producer sends (in my case) frames to the subscriber, and the subscriber returns whether or not the producer may send more messages. In this implementation the producer provides a callback to the subscriber with every event. It is the subscriber’s responsibility to save the callback if it wants to be able to restart the producer.
This subscriber contains a queue to hold elements while they are being processed on the GPU. Based on the queue, the subscriber can easily detect whether or not it can handle more elements at the current time.
If the subscriber cannot handle more elements, it saves the producer’s callback in the drainHandler property. Once the GPU routine is finished and the queue has shrunk, the callback is executed and the producer can start sending messages again.
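(A minimal sketch of such a subscriber, again with hypothetical names. The asynchronous GPU routine is faked here: completion is driven manually via a `gpuDidFinish` call so the drain logic stays visible.)

```swift
// A sketch of the queueing subscriber. Names are hypothetical, and the
// asynchronous GPU routine is faked: completion is driven manually via
// gpuDidFinish instead of an async completion handler.

struct Frame {
    let index: Int
}

final class GPUSubscriber {
    private let maxQueueLength: Int
    private(set) var queue: [Frame] = []
    private var drainHandler: (() -> Void)?

    init(maxQueueLength: Int) {
        self.maxQueueLength = maxQueueLength
    }

    // Called by the producer with every message.
    func receive(_ frame: Frame, resume: @escaping () -> Void) -> Bool {
        queue.append(frame)
        // (Here the real app would kick off the async GPU routine.)
        guard queue.count < maxQueueLength else {
            drainHandler = resume  // full: save the callback for later...
            return false           // ...and tell the producer to pause
        }
        return true
    }

    // Completion of the (faked) GPU routine for one frame.
    func gpuDidFinish(_ frame: Frame) {
        queue.removeAll { $0.index == frame.index }
        if queue.count < maxQueueLength, let resume = drainHandler {
            drainHandler = nil
            resume()  // the queue has room again: restart the producer
        }
    }
}

// Demo: the second frame fills the queue and pauses the producer;
// finishing a frame triggers the saved drainHandler.
var producerResumed = false
let subscriber = GPUSubscriber(maxQueueLength: 2)
let first = subscriber.receive(Frame(index: 0)) { producerResumed = true }
let second = subscriber.receive(Frame(index: 1)) { producerResumed = true }
subscriber.gpuDidFinish(Frame(index: 0))
print(first, second, producerResumed)  // true false true
```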
The final piece of the puzzle is a simple operator. The nice thing about a simple synchronous operator is that you do not need to concern yourself with the callback of the producer. You can just pass it along to the next subscriber, since the operator does not hold a queue of its own.
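(A sketch of a synchronous map-style operator under the same naming assumptions. Because it keeps no queue of its own, it forwards both the back-pressure return value and the resume callback untouched.)

```swift
// A sketch of a synchronous operator. Because it keeps no queue of its
// own, it forwards the producer's resume callback untouched. All names
// are hypothetical.

struct Frame {
    let index: Int
}

// The downstream contract: receives a frame plus the resume callback
// and returns whether more messages are welcome.
typealias Downstream = (Frame, () -> Void) -> Bool

final class MapOperator {
    private let transform: (Frame) -> Frame
    private let downstream: Downstream

    init(transform: @escaping (Frame) -> Frame, downstream: @escaping Downstream) {
        self.transform = transform
        self.downstream = downstream
    }

    // Called by the upstream producer. Back pressure passes straight
    // through: both the return value and the callback come from downstream.
    func receive(_ frame: Frame, resume: @escaping () -> Void) -> Bool {
        downstream(transform(frame), resume)
    }
}

// Demo: scale the frame index by 10 and collect the results downstream.
var received: [Int] = []
let mapOp = MapOperator(
    transform: { Frame(index: $0.index * 10) },
    downstream: { frame, _ in
        received.append(frame.index)
        return true
    }
)
_ = mapOp.receive(Frame(index: 1), resume: {})
_ = mapOp.receive(Frame(index: 2), resume: {})
print(received)  // [10, 20]
```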
Important note: You will want to schedule the execution of the drainHandler on the main thread (with DispatchQueue.main.async) in order not to end up in multi-threaded callback hell. Otherwise it is possible that you fill a queue in a callback handler while you are still stuck in a method on a different thread which assumes that the queue is empty. Finding these errors really takes some time and is not worth the hassle.
Is this actually a good idea?
To make it short: I don’t have a definite answer to this question. It works well for me and it is a lot more efficient than the naïve approach. But had I known how complex this system would become, I would have implemented a simpler one. Nevertheless, since I already did the work, I am going to keep it.
One of the benefits is that the stream implementation makes it easy to combine many different producers, subscribers and operators. For example, here is a diagram of a part of my solution:
Pros: what are the benefits of this implementation?
It solves the 3 problems I set out to solve:
- ✅ Order of process steps in code equals the logical order
- ✅ No processing steps are hidden by control flow
- ✅ No busy waiting
And there are some more benefits:
- Clean code
- Clear way to extend and alter the process later on
Cons: what are the downsides of this implementation?
- Does not use a battle-tested library
- Difficult to develop since you need to make sure that no events are lost somewhere in the stream. (Unit test!!!)
- Lots of boilerplate code especially if you want objects which combine multiple streams and act as a publisher.
- Determining whether or not an operator has completed needs some thinking to get right. This is a lot more complex than I first expected.
- You potentially run into callback hell.
- If you don’t schedule every operator on the same thread, you get funny errors, e.g. a queue being full even though you just checked that it was empty, reordered messages, etc.
Alternatives to implementing streams yourself:
- Apple’s Combine Framework
- Push-only streams
- Pull-only streams (This is what I would try if I were to re-implement the workflow and I could not use Apple’s Combine Framework.)
- RxSwift (There are a lot of sites out there which only mention back pressure with the terms: “we will not go into details here since this has been solved in many other articles”. However, none of the articles I found really solved the problem.)
I hope you enjoyed the article 🎉