MultiThread processing pipeline with ordered output in Java

Ricky Iudica
Published in CodeX
10 min read, Nov 14, 2021

Recently I faced a challenge: given an audio stream as input, how can I process it in near real time? And what if the processing involves several stages? I worked on a simple solution in Java, described here together with the reasoning behind my choices. Enjoy!

The Problem

As with every software engineering problem, we have to start by asking: what is our use case, and what do we want to accomplish?

Requirements

Our system is an example of the producer / consumer pattern, where the audio source (the producer) injects data into our system (the consumer).

Let’s make some assumptions that should hold in most cases. They will help us define the requirements.

Audio Source

The audio source produces raw data (i.e. byte arrays) at fixed intervals of time. Let’s assume that the audio is divided into frames of fixed size and each frame contains a sequence number. If this is not the case, we can always decorate the audio source with some logic to split the data into frames.

Another requirement is that our system is linear: the output for each frame depends only on that frame and not on the previous or following ones (we’ll see how this affects our design).

The last requirement on the data source is implicit and is a characteristic of multimedia signals: after processing, the frames of the audio source must keep the same order (order preservation). Putting the last two requirements together, we don’t care how we process each frame as long as the processed frames appear in the output in the same order they had in the input.

In the following snippet you can see the APIs of the Frame that I defined (some of the methods will make more sense in the next sections).
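
A minimal sketch of what such a Frame could look like is shown below. The class and method names (data, last, isLast, getLayerSize, decreaseLayerSize) are assumptions made for illustration, inferred from the sections that follow, and may differ from the original API.

```java
/** Hypothetical Frame sketch; names are illustrative, not the original API. */
final class Frame {
    private final long sequenceNumber; // position of the frame in the stream
    private final byte[] data;         // raw audio payload
    private int layerSize;             // only meaningful for the LastFrame marker

    private Frame(long sequenceNumber, byte[] data, int layerSize) {
        this.sequenceNumber = sequenceNumber;
        this.data = data;
        this.layerSize = layerSize;
    }

    /** Creates a regular data frame with a defensive copy of the payload. */
    static Frame data(long sequenceNumber, byte[] data) {
        return new Frame(sequenceNumber, data.clone(), 0);
    }

    /** Creates the end-of-stream marker; Long.MAX_VALUE makes it sort last. */
    static Frame last(int layerSize) {
        return new Frame(Long.MAX_VALUE, new byte[0], layerSize);
    }

    long getSequenceNumber() { return sequenceNumber; }

    byte[] getData() { return data.clone(); }

    boolean isLast() { return sequenceNumber == Long.MAX_VALUE; }

    int getLayerSize() { return layerSize; }

    /** Not thread-safe on its own; assumed to be called while holding the queue's lock. */
    void decreaseLayerSize() { layerSize--; }
}
```

The marker frame and its layerSize counter only become relevant in the section on the automatic shut down.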

The Engine

Typically, in an audio processing flow there are two, three or more steps (filters), which, for example, enhance the audio, compress it, etc… For simplicity, let’s assume that we have just two steps (the same reasoning can be applied with many more).

Two steps audio processing Engine

The last requirement of the Engine is that it will stop automatically. This is not mandatory for the goal of our discussion, but it’s a nice-to-have and will make the implementation more interesting.

To Recap

Here are our requirements:

  • The data stream shall be split in numbered frames.
  • The system shall be linear.
  • The system shall shut down automatically.
  • The filtered audio signal in the output shall contain the frames ordered by sequence number.

The architecture

Here I will describe three alternative architectures of the system, from the simplest (and slowest) to the most complex (and fastest).

Serial Processing

The next diagram shows perhaps the simplest implementation:

Serial implementation of the Engine

The engine is composed of two filters; we apply the first filter to all the frames before applying the second (imagine two for loops, one after the other). This solution is the simplest to implement, but it is also the one that takes the most time to process our audio source.

If we call t1 and t2 the average time to process a frame in the first and second filter respectively, the total processing time of a signal composed of n frames will be: n*(t1 + t2)
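
The "two for loops" can be sketched as follows. This is only an illustration: frames are reduced to plain byte arrays, and SerialEngine and its process method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical serial engine: one thread, one filter after the other. */
final class SerialEngine {
    /** Applies the first filter to every frame, then the second filter to every result. */
    static List<byte[]> process(List<byte[]> frames,
                                UnaryOperator<byte[]> first,
                                UnaryOperator<byte[]> second) {
        List<byte[]> intermediate = new ArrayList<>();
        for (byte[] frame : frames) {
            intermediate.add(first.apply(frame));   // costs t1 per frame
        }
        List<byte[]> output = new ArrayList<>();
        for (byte[] frame : intermediate) {
            output.add(second.apply(frame));        // costs t2 per frame
        }
        return output;                              // input order is preserved for free
    }
}
```

Order preservation comes for free here, since nothing runs concurrently; the price is that no output is available until the first loop has finished.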

Parallel Processing

Since our system is linear, we can easily improve its performance by running the filters in two separate threads which process the frames independently.

Parallel implementation of the Engine

For example, while the frame i is being processed by the second filter, the frame i+1 is being processed by the first. Meanwhile, the frame i+2 is put in a queue to be taken by the first filter as soon as this is free and so on.

By doing that, we decrease the time needed to process the whole audio stream. Using the same notation as before, the total processing time will be: n*max(t1, t2) + min(t1, t2). If the two processing times are similar and n is large, this is roughly half the time of the Serial Processing.
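
To make the comparison concrete, here is a small calculation of the two formulas given so far. The values of n, t1 and t2 are made up for the example, and PipelineTiming is a hypothetical helper.

```java
/** Evaluates the two processing-time formulas from the text. */
final class PipelineTiming {
    /** Serial: every frame pays for both filters, one after the other. */
    static long serial(long n, long t1, long t2) {
        return n * (t1 + t2);
    }

    /** Two-thread pipeline: the slower filter sets the pace, the faster one adds a one-off latency. */
    static long pipelined(long n, long t1, long t2) {
        return n * Math.max(t1, t2) + Math.min(t1, t2);
    }

    public static void main(String[] args) {
        long n = 1_000, t1 = 10, t2 = 10; // hypothetical values, in milliseconds
        System.out.println("serial    = " + serial(n, t1, t2) + " ms");    // 20000 ms
        System.out.println("pipelined = " + pipelined(n, t1, t2) + " ms"); // 10010 ms
    }
}
```

With these numbers the pipelined version takes 10010 ms against 20000 ms, which is the "roughly half" mentioned above.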

Parallel Processing — MultiThread

Let’s spice things up! Nowadays most, if not all, of our hardware (laptops, smartphones, etc.) has many CPU cores; we can take advantage of this and run the same filter in more than one thread. In the previous example, we could run the two filters in two and three parallel threads respectively.

Parallel and MultiThread implementation of the Engine

To estimate the processing time, let’s use the same notation as before, plus f1 and f2 as the number of threads on which we run the first and the second filter respectively. Assuming enough CPU cores are available, the first layer completes a frame every t1/f1 on average and the second every t2/f2; the slower layer sets the pace, so for large n the processing time is approximately: n*max(t1/f1, t2/f2)

If f1 and f2 are both at least 2 and t1 is similar to t2, for large values of n the processing time is at least halved again, which means a quarter of the time of the Serial Processing or less (I leave you this and the previous calculation as homework).

The Chosen Architecture

We will use this last architecture, parallel multithread, which is the most performant.

The Engine is made up of several Layers; each Layer corresponds to a step of the audio processing and is characterized by a filter and the number of threads on which that filter will run. At the input and output of each Layer there is a queue.

The queues and their APIs

So far we have discussed two of the requirements: the audio source divided into numbered frames and the linearity of the system. These allowed us to analyze a few architectures and to choose an engine with parallel and MultiThread filters.

What is missing is to analyze the consequences of the other two requirements:

  • the filtered audio signal in the output shall contain the frames correctly ordered;
  • the system shall shut down automatically.

Both requirements drive our choice of the type of queue to use.

Queues for multithread

When dealing with multithreading, the best place to start is the java.util.concurrent package. There you can find the PriorityBlockingQueue class, which is interesting for two reasons:

  • it implements the BlockingQueue interface and is thread-safe: every insertion or extraction is guarded by a lock to prevent race conditions. Moreover, there is a Condition associated with the lock; when a thread wants to extract an element and the queue is empty, it waits until a new element is inserted;
  • you can configure it to order its elements by custom logic by providing a comparator. In our case, we can sort the frames by sequence number; this way, when more than one frame is in the queue, the one with the lowest sequence number is always extracted first.
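
As a small illustration of these two properties, the sketch below builds a PriorityBlockingQueue ordered by sequence number. The Frame class here is a stripped-down stand-in and OrderedQueueDemo is a hypothetical name.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

final class OrderedQueueDemo {
    /** Stripped-down stand-in for the real Frame: only the sequence number matters here. */
    static final class Frame {
        final long sequenceNumber;
        Frame(long sequenceNumber) { this.sequenceNumber = sequenceNumber; }
    }

    /** Builds a queue whose head is always the frame with the lowest sequence number. */
    static PriorityBlockingQueue<Frame> newFrameQueue() {
        return new PriorityBlockingQueue<>(
                11, Comparator.comparingLong((Frame f) -> f.sequenceNumber));
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<Frame> queue = newFrameQueue();
        queue.put(new Frame(2)); // frames arrive out of order
        queue.put(new Frame(0));
        queue.put(new Frame(1));
        // take() blocks while the queue is empty and always returns the smallest element.
        for (int i = 0; i < 3; i++) {
            System.out.println(queue.take().sequenceNumber); // prints 0, then 1, then 2
        }
    }
}
```

Note that the ordering only covers the frames that are in the queue at the moment of extraction; a frame that has not arrived yet cannot be waited for by the queue alone.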

While using the PriorityBlockingQueue would solve race conditions and frame ordering, it can’t help us with the synchronization between the layers, the queues and the output needed for the automatic shut down.

Here is how I solved this problem.

The Automatic Shut Down (which I called the “Rolling Last Frame” technique)

The logic of the automatic shut down is based on a few rules:

  • when the source or any other filter finishes its processing, it will emit a special frame, let’s call it LastFrame. This frame has its sequence number set to “infinity” (Long.MAX_VALUE), so that it is always sorted last in the queue. The LastFrame contains a counter called layerSize, initially set to the size of the emitting layer. In our example, the source has size 1, the first layer has size 2 and the second layer has size 3.
  • when a LastFrame enters a queue, the queue will check if there is already a LastFrame present (possibly emitted by the same filter of the previous layer running in a different thread). If so, it will decrease the layerSize of the LastFrame already in the queue.
  • when a frame is requested from the queue, if the first frame is a data frame, it will be extracted; if it’s a LastFrame, its reference will be returned but the LastFrame will remain in the queue, available for other threads.
  • when a LastFrame is taken by a filter in the following layer, if the layerSize of the LastFrame is equal to 1, that filter will stop its processing and, as a consequence, will emit a LastFrame to the following queue. If the LastFrame contains a layerSize bigger than 1, it will be ignored.

The next diagram shows the automatic shut down with the Rolling Last Frame. The LastFrame will be propagated (rolled) through the system. As soon as any filter receives a LastFrame with layerSize equal to 1, it will shut down (represented in grey).

Automatic shut down of the Engine using the Rolling LastFrame technique

This logic has to be implemented in two places: in the queue and in the filters.

Filter

This snippet shows the logic of the filter.

A frame is extracted from the queue (I am passing the condition as a predicate, since I wanted to decouple the implementation of the queue from the inner logic of the Frame). If the frame is a data frame (not a LastFrame), it will be processed and inserted in the output queue. Otherwise, the filter checks if the frame is the last of the previous layer (i.e. if the layerSize is equal to 1). If so, the filter will emit a LastFrame to the output queue and set itself as inactive; otherwise, the filter will ignore the Frame.
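
The loop described above could be sketched like this. Everything here is hypothetical: FilterSketch, the FrameQueue interface and its take, put and putLast methods are names chosen for the example, and the Thread.yield() call on an ignored LastFrame is an addition of this sketch, not something stated in the text.

```java
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

final class FilterSketch {
    /** Minimal frame: a sequence number plus the counter carried by the LastFrame. */
    static final class Frame {
        final long sequenceNumber;
        volatile int layerSize; // volatile so a worker sees updates made by the queue
        Frame(long sequenceNumber, int layerSize) {
            this.sequenceNumber = sequenceNumber;
            this.layerSize = layerSize;
        }
        boolean isLast() { return sequenceNumber == Long.MAX_VALUE; }
    }

    /** The queue operations the filter relies on (assumed names). */
    interface FrameQueue {
        Frame take(Predicate<Frame> shouldExtract) throws InterruptedException;
        void put(Frame frame);
        void putLast(int layerSize); // inserts a LastFrame or decrements the queued one
    }

    /** One worker of a layer: runs until it sees a LastFrame whose layerSize is 1. */
    static Runnable worker(FrameQueue input, FrameQueue output,
                           UnaryOperator<Frame> filter, int ownLayerSize) {
        return () -> {
            try {
                boolean active = true;
                while (active) {
                    // Data frames are removed from the queue, a LastFrame is only peeked.
                    Frame frame = input.take(f -> !f.isLast());
                    if (!frame.isLast()) {
                        output.put(filter.apply(frame));
                    } else if (frame.layerSize == 1) {
                        output.putLast(ownLayerSize); // roll the LastFrame forward
                        active = false;
                    } else {
                        Thread.yield(); // previous layer still has running workers: ignore
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly if interrupted
            }
        };
    }
}
```

One consequence of ignoring a LastFrame with layerSize above 1 is that the worker keeps polling until the previous layer has fully stopped; yielding only softens that busy wait.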

Queue

The other part of the logic is implemented in the queue. I extended the logic of PriorityBlockingQueue and implemented what I called ConditionalPriorityBlockingQueueImpl (I know, I could have found a better name).

The method to extract the frame is relatively simple: the queue looks at the first frame (let’s remember that they’re ordered by sequence number) and applies to it the predicate passed as a parameter, which decides whether the frame should be extracted. Depending on the result of the predicate, the frame is extracted and the queue updated, or just returned as a reference.

The method to insert a LastFrame is a little more complex. Here, once again, I decoupled the logic of the queue from the Frame. First, the queue checks if there’s already a LastFrame in the queue. If so, it calls the editFunction on it (to decrease the layerSize). If not, it will create a new frame (with the elementCreatorMethod, a static constructor that is out of scope for this discussion) and insert it in the queue.
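
The two methods can be illustrated with a simplified stand-in. This is not the ConditionalPriorityBlockingQueueImpl itself: the sketch below wraps a plain PriorityQueue in a monitor instead of extending PriorityBlockingQueue, and the names ConditionalQueue, take and putOrEdit are assumptions.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Simplified stand-in for the conditional queue, guarded by its own monitor. */
final class ConditionalQueue<E> {
    private final PriorityQueue<E> elements;

    ConditionalQueue(Comparator<? super E> order) {
        this.elements = new PriorityQueue<>(order);
    }

    /** Ordinary insertion of a data element; wakes up waiting consumers. */
    synchronized void put(E element) {
        elements.add(element);
        notifyAll();
    }

    /**
     * Waits for a head element. The head is removed only if shouldExtract
     * accepts it; otherwise its reference is returned and it stays queued.
     */
    synchronized E take(Predicate<? super E> shouldExtract) throws InterruptedException {
        while (elements.isEmpty()) {
            wait();
        }
        E head = elements.peek();
        if (shouldExtract.test(head)) {
            elements.poll();
        }
        return head;
    }

    /** Edits the queued element matching isMarker, or inserts a new one if none is queued. */
    synchronized void putOrEdit(Predicate<? super E> isMarker,
                                Consumer<? super E> editFunction,
                                Supplier<? extends E> elementCreator) {
        for (E element : elements) {
            if (isMarker.test(element)) {
                editFunction.accept(element); // e.g. decrease the layerSize
                notifyAll();
                return;
            }
        }
        elements.add(elementCreator.get());
        notifyAll();
    }
}
```

Editing the marker in place is safe for the ordering only because the edited field (the layerSize) is not part of the sort key.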

The MultiThread Engine

Since the engine shuts down automatically, its APIs become really simple. Besides, each filter contains its input and output queues as properties, so I can design the engine as a simple wrapper of those filters and delegate its construction to a factory, also out of scope.
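
Under these assumptions the engine can be little more than a holder of threads. The sketch below is hypothetical (Engine, start and awaitShutdown are made-up names) and treats each filter worker as a Runnable that already knows its queues.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical engine: a thin wrapper that runs every filter worker in its own thread. */
final class Engine {
    private final List<Thread> threads = new ArrayList<>();

    /** Each Runnable is one filter worker, already wired to its input and output queues. */
    Engine(List<? extends Runnable> filters) {
        for (Runnable filter : filters) {
            threads.add(new Thread(filter));
        }
    }

    /** Starts all workers; no stop method is needed, since the filters stop themselves. */
    void start() {
        for (Thread thread : threads) {
            thread.start();
        }
    }

    /** Blocks until every worker has shut itself down. */
    void awaitShutdown() throws InterruptedException {
        for (Thread thread : threads) {
            thread.join();
        }
    }
}
```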

Testing

To test the engine, I used the configuration described before: 1 source, 2 threads in the first layer, 3 threads in the second layer, 1 serial writer.
For the sake of this test, the filters in each layer copy the input frame to the output, print a log to the console and add a random delay to simulate a real-world scenario where we can’t predict the exact processing time of each frame.

The writer implements logic similar to that of the filters, with the only difference that it keeps an internal counter to know which frame it has to wait for and extract from the queue, so that the required order is preserved in the output. Its getFrameList method waits until the writer has finished processing before returning the processed frames.
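
The role of the counter can be shown with a deliberately reduced sketch: frames are represented by their sequence numbers only, there is no LastFrame handling, and nothing blocks. OrderedWriter, offer and writeAvailable are hypothetical names; only getFrameList comes from the text, and here it returns immediately instead of waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

/** Reduced writer sketch: writes a frame only when it is the next one in sequence. */
final class OrderedWriter {
    private final PriorityBlockingQueue<Long> input = new PriorityBlockingQueue<>();
    private final List<Long> written = new ArrayList<>();
    private long nextExpected = 0; // sequence number the writer is waiting for

    /** Called by the last layer when a frame is ready. */
    void offer(long sequenceNumber) {
        input.put(sequenceNumber);
    }

    /** Writes every frame that is ready, stopping at the first gap in the sequence. */
    void writeAvailable() {
        Long head = input.peek();
        while (head != null && head == nextExpected) {
            written.add(input.poll());
            nextExpected++;
            head = input.peek();
        }
    }

    /** Unlike the method described in the text, this returns without waiting. */
    List<Long> getFrameList() {
        return written;
    }
}
```

If frame 2 reaches the writer before frames 0 and 1, it simply stays in the queue until the counter reaches 2, which is why the output stays ordered even when the layers finish frames out of order.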

Here is the unit test I wrote to verify the behavior of the system when the data source emits 10 frames.

And this is the output log. As you can see, the frames are not always processed in the same order in both layers, but they reach the output in order.

Frame 1 processed in layer 1
Frame 2 processed in layer 1
Frame 0 processed in layer 2
Frame 3 processed in layer 1
Frame 2 processed in layer 2
Frame 4 processed in layer 1
Frame 1 processed in layer 2
Frame 5 processed in layer 1
Frame 3 processed in layer 2
Writing frame 0
Writing frame 1
Writing frame 2
Frame 4 processed in layer 2
Frame 6 processed in layer 1
Writing frame 3
Frame 7 processed in layer 1
Writing frame 4
Frame 5 processed in layer 2
Frame 8 processed in layer 1
Frame 6 processed in layer 2
Writing frame 5
Frame 9 processed in layer 1
Frame 7 processed in layer 2
Frame 8 processed in layer 2
Writing frame 6
Writing frame 7
Frame 9 processed in layer 2
Writing frame 8
Writing frame 9

Conclusions

I described here my solution to a common problem: how to implement a MultiThread parallel data processing engine that preserves the final order of the processed data, all with a simple Java implementation (not using any message broker or event-driven architecture). I showed what I think are the most interesting parts of the code in the snippets included, and I left out other parts such as the factories or the implementation details of the Frame.

In my solution I focused more on abstraction (e.g. using generics, or predicates and functions as parameters), which made the code more reusable, at the expense of readability.

An alternative would have been a more coupled but simpler implementation, maybe with some code duplication but more readable, postponing the need for generalization. This would also be a valid option.

As always, it depends! It depends on the situation, on our use cases, on our current knowledge of if/how/when our system will evolve in the future and, of course, on our personal taste and sense of aesthetics!
