Thought process: High-throughput data fetching and processing in C#
Software development is a process, one that can and should evolve.
Let us assume that we have a serious piece of equipment that outputs high volumes of data in a specific format, which needs to be loaded onto a computer in real time and processed, then saved onto a storage medium. You are tasked with creating a .NET 5 program that takes said data from the equipment via a dedicated driver API, does the processing, then outputs the results.
If the above sounds scary, it shouldn’t be. Software development is almost always the result of an evolutionary process. Experience will usually allow you to start later and later in that process, but, while learning, starting from the beginning is what everyone should do.
The setup
The easiest situation to imagine in which this could be necessary is an image acquisition setup. Usually, dedicated hardware operates in real time, whereas a PC does not, so timing becomes crucial for success.
At 60 frames per second of uncompressed 1080p frames (approximately 6MB per frame), the total amount of data that needs to be transferred, processed and saved would be:
6MB/f × 60f/s = 360MB/s
Since we’re talking about 60 frames per second real-time, we’re talking about:
1000 ms/s / 60f/s = 16.(6) ms/f
This means that the time between two frames is slightly less than 17 milliseconds.
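If you want to sanity-check these figures in code, a throwaway snippet (top-level statements, using only the numbers above and nothing from the API we are about to define) reproduces both results:

const int BytesPerFrame = 6 * 1024 * 1024; // ~6MB uncompressed 1080p frame
const int FramesPerSecond = 60;

// 6MB/frame at 60 frames/second
long bytesPerSecond = (long)BytesPerFrame * FramesPerSecond;
Console.WriteLine($"Throughput: {bytesPerSecond / (1024.0 * 1024.0):F0} MB/s"); // 360 MB/s

// Time budget between two successive frames
double millisecondsPerFrame = 1000.0 / FramesPerSecond;
Console.WriteLine($"Frame budget: {millisecondsPerFrame:F1} ms"); // 16.7 ms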
Considerations
Usually, since 360MB/s is quite a hefty load to transfer via a chain of multiple interfaces, drivers, OS layers and software layers, and to do processing on it as well, a few things are required for success:
- Enough buffer space should be available so that, should processing slow down, data can accumulate instead of being lost
- Either a fast-enough or a parallel-enough processing cycle must be applied, so that the time between successive moments at which the software can process does not overrun the data input time from the dedicated hardware
- A fast-enough storage medium, or a good-enough compression cycle, must be used so that the time it takes to save the result to a non-volatile storage medium does not overrun the data input times
For point #1, a lot will depend on hardware. Usually, dedicated hardware will have enough buffer so that whatever it’s attached to has the chance to read data before that same buffer is overwritten by new data. This is usually achieved with a buffer several times larger than the actual data package being received (for the sake of simplicity, let’s just assume that the size of each data package, in our example a bitmap-format 1080p frame, is constant).
Point #2 will mostly depend on software factors. If the requirements are for processing into a specific format (say, a series of PNG images), then the processing cycle defines which strategy should be employed. A still image encoding, for instance, will take a lot longer than 16.6 milliseconds, the gap between two frames at 60f/s. So, in such a situation, the only strategy that can ensure proper timing is parallel processing.
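To put an illustrative number on this: if a single PNG encode were to take, say, 50 milliseconds (a made-up figure for the sake of argument, not a measurement), frames would arrive roughly three times faster than a single thread could encode them, and at least ceiling(50 / 16.6) = 4 encodes would have to run in parallel just to keep up.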
A separate problem that again forces a parallel strategy is the choice of software platform. If you choose a managed platform, such as .NET, you must contend with timing skews and delays that come from platform-specific sources, such as thread pool management, garbage collection, marshalling, P/Invoke, etc.
Point #3 will mostly be based on requirements and available hardware. If the machine you are running on has dedicated encoding equipment that only takes 5ms to process a frame, and a dedicated API to call it, then, of course, you won’t need a parallel strategy for point #2. But, should the requirements call for custom encoding, or should no such hardware be available, then you need to plan accordingly.
Ultimately, if the requirements can comfortably be met, then the considerations for points #1–3 will only have determined how many resources you need, and how efficiently you use them.
However, in real-life scenarios, the considerations for points #1–3 will determine your strategy, as the inability to offer the speed or the timing required for dedicated hardware on a PC will usually greatly limit your range of available strategies.
The simulated API
Let’s assume that we have a driver API (translated in C#) that follows some principles:
- The driver does not push anything itself — anything that is required must be fetched
- The driver notifies that data is available in a blocking way, and writes data into a specified buffer in a non-blocking way. This ensures maximum speed for the driver itself and the maximum amount of non-blocking time for the API caller, while guaranteeing that the caller has data available for reading upon notification
- The driver uses the fastest possible data transfer strategies, which means that PC RAM must be allocated and registered with the driver in order to be used
These principles are mostly in line with what one would expect from a dedicated hardware’s API. It would look something like this:
public static class Acquisition
{
    #region Notification API

    public static DriverResponse WaitForNotification(DriverOptions options) => ...;

    #endregion

    #region Buffer pinning

    public static DriverResponse PinBuffer(
        byte[] buffer,
        DriverOptions options) { ... }

    #endregion

    #region Data fetching API

    public static uint StartFetchingData(
        DriverOptions options,
        byte[] buffer) => ...;

    public static DriverResponse EndFetchingData(
        uint fetchingId) => ...;

    #endregion
}
We will use WaitForNotification in order to block a listening thread until data is available. We’ll use StartFetchingData to initiate writing of data into a predefined buffer, which would be non-blocking, and we’ll use EndFetchingData to block (if necessary) until the buffer is filled. The buffers that we’ll use are going to be pre-allocated and registered with PinBuffer.
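Put together, the intended call sequence looks roughly like this (a bare sketch only; options and running are placeholders here, and the complete, error-checked version is developed step by step below):

byte[] buffer = new byte[Constants.PayloadSize];
Acquisition.PinBuffer(buffer, options); // once, up front

while (running)
{
    Acquisition.WaitForNotification(options);                      // blocks until data exists
    uint fetchId = Acquisition.StartFetchingData(options, buffer); // returns immediately
    Acquisition.EndFetchingData(fetchId);                          // blocks until buffer is filled

    // process and save the buffer here
}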
Drivers will require option sets and will probably want to return complex results as structured data types. For the sake of example, we’ll be using two structures for this purpose.
- A struct called DriverOptions, which is basically input for driver API methods:
[StructLayout(LayoutKind.Sequential)]
public struct DriverOptions
{
    public uint ChannelId { get; set; }

    public uint TimeoutInMilliseconds { get; set; }

    public byte BufferIdentifier { get; set; }
}
- A struct called DriverResponse, which is a status response with a minor information payload:
[StructLayout(LayoutKind.Sequential)]
public struct DriverResponse
{
    public byte ChannelId { get; set; }

    public ResponseStatus Status { get; set; }

    public byte BufferIdentifier { get; set; }

    public uint ResponseDuration { get; set; }
}

public enum ResponseStatus : byte
{
    Success = 0,
    Timeout = 1,
    NoData = 2,
    EquipmentNotInitialized = 3,
    HardwareFailure = 4,
}
- A set of constants about the API:
public static class Constants
{
    public static int PayloadSize { get; set; } = 6291456; // 6MB
}
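As an aside, 6291456 is simply 6 × 1024 × 1024 (6MB). For reference, assuming a 24-bit bitmap, a 1080p frame is 1920 × 1080 × 3 = 6,220,800 bytes, so a 6MB buffer holds one frame with a little headroom.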
Beginning to design
For the sake of simplicity, let’s make a few decisions that are not likely to influence us in the long term, and that can easily be adjusted to the situation:
- We will be using a console app
- We will not be limiting the resources that are being used: CPU, RAM, threads
- We’ll be skipping some data validation (doing data validation is always a good idea, so please do validate user input, but, for the sake of brevity, we won’t do it here)
Beginning the code, we’ll do some simple plumbing to get the input parameters. I’ve found that the CommandLineParser NuGet package is a good way to do simple parsing of command-line arguments, by creating a class that will hold the argument data or default values.
Let’s define such a class, inside the static Program class, with options such as the output folder:
private class ConsoleArgumentData
{
    [Option('o', "output")]
    public string OutputFolder { get; set; }

    [Option('c', "channel")]
    public uint Channel { get; set; }
}
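With these two options in place, the program can be launched with something like the following (the folder and channel values are, of course, just placeholders):

dotnet run -- --output C:\AcquiredData --channel 1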
Let’s also create a Main function that parses arguments using the default parser, uses a separate thread to do the job, and then simply waits for user input to finish calling the driver:
static void Main(string[] args)
{
    Parser.Default.ParseArguments<ConsoleArgumentData>(args)
        .WithParsed(
            data =>
            {
                // We use a cancellation token source to be able
                // to stop the process later
                CancellationTokenSource cts = new CancellationTokenSource();

                // We run the worker method MainLoop on a separate
                // thread, giving it the output folder, the channel
                // and the cancellation token
                Task runningTask = Task.Run(
                    () => MainLoop(data.OutputFolder, data.Channel, cts.Token));

                // Let's wait for any user input
                Console.WriteLine("Press any key to end...");
                Console.Read();
                Console.WriteLine("Finishing...");

                // Cancel the task immediately
                cts.Cancel();

                // Let's wait for up to 30 seconds so that anything
                // still in flight finishes processing
                runningTask.Wait(TimeSpan.FromSeconds(30));
            });
}
The only thing left is to implement the main method that waits to be notified, then fetches the data, processes it, and saves it. We have already called the MainLoop method; let’s write a basic skeleton for it below.
First draft, a naïve but functional version
Let’s try to write our method without considering things like timing and performance:
private static void MainLoop(string outputFolder, uint channel, CancellationToken cancellationToken)
{
    // Buffer and initial driver options
    byte[] buffer = new byte[Constants.PayloadSize];
    DriverOptions driverOptions = new()
    {
        ChannelId = channel
    };

    // Attempt to pin the buffer
    DriverResponse response = Acquisition.PinBuffer(
        buffer,
        driverOptions);

    if (response.Status != ResponseStatus.Success)
    {
        // Failure to pin
        Console.WriteLine("Buffer could not be pinned.");
        return;
    }

    // Let's read until we're told to quit
    int fileIndex = 0;
    while (!cancellationToken.IsCancellationRequested)
    {
        // Block while waiting for notification
        response = Acquisition.WaitForNotification(driverOptions);

        if (response.Status != ResponseStatus.Success)
        {
            // Missed frame
            continue;
        }

        // Set options for fetching
        DriverOptions nextDriverOptions = new()
        {
            ChannelId = channel,
            BufferIdentifier = response.BufferIdentifier
        };

        // Start data fetching (non-blocking)
        uint fetchId = Acquisition.StartFetchingData(
            nextDriverOptions,
            buffer);

        response = Acquisition.EndFetchingData(fetchId);

        if (response.Status != ResponseStatus.Success)
        {
            // Missed frame
            continue;
        }

        /*
         * Add processing here
         */

        try
        {
            // Let's save the file
            using FileStream sw = File.Create(
                Path.Combine(
                    outputFolder,
                    $"{fileIndex++}.bin"));
            sw.Write(buffer);
        }
        catch
        {
            // Missed frame
            continue;
        }
    }
}
The method does what we want it to do: it creates a buffer, it registers it (“pins”) with the driver, then starts a cycle waiting for a notification, reading the data when one is received, then saving the data.
First improvement, offloading to another thread
Let’s think of a few things here:
- The entire method runs synchronously on one thread, which means that it has to execute within the time interval of 16.6 milliseconds, otherwise it will miss data
- The method does not use the blocking/non-blocking mechanism offered by the driver
These are both improvement points that we can capitalize on, so let’s try and do a small improvement in the second part of the method:
// Start data fetching (non-blocking)
uint fetchId = Acquisition.StartFetchingData(
    nextDriverOptions,
    buffer);

(uint, byte[], int, string) outerState = (fetchId, buffer, fileIndex, outputFolder);
fileIndex++;

ThreadPool.QueueUserWorkItem(
    state =>
    {
        var (fId, buf, fI, oF) = ((uint, byte[], int, string))state;

        var r2 = Acquisition.EndFetchingData(fId);

        if (r2.Status != ResponseStatus.Success)
        {
            // Missed frame
            return;
        }

        /*
         * Add processing here
         */

        try
        {
            // Let's save the file
            using FileStream sw = File.Create(
                Path.Combine(
                    oF,
                    $"{fI}.bin"));
            sw.Write(buf);
        }
        catch
        {
            // Missed frame
            return;
        }
    }, outerState);
Now this looks somewhat better. Once the data fetching is initiated, a value is created that holds the state that needs to be passed to a different thread. This state is then handed to a worker thread, which takes over and waits (in a blocking way) for the data to be brought into system memory, then processes and saves it, all while leaving the original thread free to go on and catch another notification.
Now, should the driver method that initiates reading be of sufficient performance (and there is little reason for it not to be), this new way of writing the method would allow the thread that is waiting for notifications to run with minimal interference, and start off threads with every notification it gets.
Resource-wise, we can watch the threads in the thread pool and do some calculations of our own. We will clearly have more than one thread processing at a time, but it is important to note that multiple processing threads do not necessarily mean that the performance of our software will be bad. It just means that we are using the advantage of our PC (that it can be massively parallel) to complement the advantage of our equipment (that it is real-time), ensuring that the high throughput of the entire cycle is maintained.
Ideally, in this case, the number of parallel processing threads will grow sharply at the beginning of the cycle, then taper off and oscillate around the same value after a short while. This means that the PC can handle the load well. We can even calculate how many threads there will be:
Let Tp be the time it takes to process one payload (we’ll include the time it takes to spawn the thread and save the data here as well), and let Tn be the time between two successive notifications. As long as both are constant, the minimum and maximum thread counts coincide:
Tmin = Tmax = ceiling(Tp / Tn)
Tmin and Tmax bound the interval in which the number of parallel processing threads oscillates.
We can further expand this formula to a non-constant time elapsed for processing or notifications, considering Tpmin and Tnmin as the smallest intervals for processing and notification, and Tpmax and Tnmax as the largest. We therefore get:
Tmin = ceiling(Tpmin / Tnmax)
Tmax = ceiling(Tpmax / Tnmin)
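To make this concrete with the numbers from our setup: Tn ≈ 16.6ms at 60f/s, and if processing plus saving one payload takes, say, between 40ms and 110ms (illustrative values, not measurements), then:
Tmin = ceiling(40 / 16.6) = 3
Tmax = ceiling(110 / 16.6) = 7
so the number of parallel processing threads should settle somewhere between 3 and 7.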
Ensuring that buffers don’t overrun
If you were to actually run the code written in the previous step, you would notice that the data is garbled and unrecognizable. This is because we use the same buffer both to receive new data and to process the previous payload, so the driver overwrites it while a worker thread is still reading it.
Let’s switch our code to use a fresh buffer each time we need one:
private static void MainLoop(string outputFolder, uint channel, CancellationToken cancellationToken)
{
    // Let's read until we're told to quit
    int fileIndex = 0;
    while (!cancellationToken.IsCancellationRequested)
    {
        // Buffer and initial driver options
        byte[] buffer = new byte[Constants.PayloadSize];
        DriverOptions driverOptions = new()
        {
            ChannelId = channel
        };

        // Attempt to pin the buffer
        DriverResponse response = Acquisition.PinBuffer(
            buffer,
            driverOptions);

        if (response.Status != ResponseStatus.Success)
        {
            // Failure to pin
            Console.WriteLine("Buffer could not be pinned.");
            continue;
        }

        // Block while waiting for notification
        response = Acquisition.WaitForNotification(driverOptions);

        // ... the rest of the method stays the same
This looks like it’s running, but it seems to miss quite a few frames. Why is that?
Digging deeper into garbage collection
We seem to be forgetting that our buffers are 6MB in size, each! The garbage collector places them directly on the large object heap (LOH) and, as you’ve noticed, we’re discarding each one after every use. This forces the runtime to keep finding fresh 6MB chunks of memory to allocate, which it can’t do forever, so it has to collect the unused ones. Since the LOH is only swept during a generation 2 collection, these full collections keep triggering, which severely slows down the process.
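You can verify this hypothesis without a profiler: generation 2 collection counts can be sampled around the loop using only the standard GC.CollectionCount API (a diagnostic sketch, not part of the acquisition code):

int gen2Before = GC.CollectionCount(2);

// ... run the acquisition loop for a while ...

int gen2After = GC.CollectionCount(2);

// Allocations of 85,000 bytes or more go to the large object heap,
// so every one of our 6MB buffers qualifies
Console.WriteLine($"Gen-2 collections during the run: {gen2After - gen2Before}");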
Naturally, the solution is to use multiple pre-allocated buffers. But how would you know how many?
A clue lies in the formulas we computed in the previous phase: create enough buffers to accommodate the maximum number of threads that one can have. This is a risky proposition, though: the processing speed depends on available CPU time, so the Tpmax value will remain constant only as long as there is CPU time available for all processing threads. Once resources become limited (e.g. the user is doing something else on that machine), the Tpmax value will increase, and we will be in exactly the same situation as before.
Clearly, a solution with a dynamic number of buffers is ultimately required. Let’s start to prepare our code for multiple buffers.
The values that we have computed before can be used as reference points. The Tmin value is the lowest that we will have, so it is reasonable that we would begin with that number of buffers, and let them increase as needed.
We could also use Tmax as a starting point, but whether we should depends on how the minimum and maximum times for processing and notification were calculated. If the maximum includes serious outliers, it might be wasteful to pre-allocate that many buffers, since you will only benefit from them in edge cases. Perhaps a smarter way to stay safe and keep throughput high, at the cost of using more RAM, would be to calculate a "superior average" (the average of only those values above the overall average) and start from there:
Tpmax = avg(Tp where Tp > avg(Tp))
This way, we would begin with an already-hefty allocation that would serve us well in the vast majority of cases, and only increase it if absolutely necessary. This would be a trial-and-error determination, a practice which is a lot more useful in the world of development than you might think.
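The "superior average" is straightforward to compute from a set of measured processing times. Here is a sketch, where timings stands for a hypothetical collection of measured Tp values in milliseconds:

using System.Collections.Generic;
using System.Linq;

static double SuperiorAverage(IReadOnlyCollection<double> timings)
{
    double average = timings.Average();

    // Average only the values above the overall average; if every value
    // is identical, fall back to the plain average
    return timings.Where(t => t > average)
        .DefaultIfEmpty(average)
        .Average();
}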
Let’s modify our API constants class to have that value for number of initial buffers:
public static class Constants
{
    public static int PayloadSize { get; set; } = 6291456; // 6MB

    public static int EstimatedNumberOfThreads { get; set; } = 8;
}
And now let’s modify our class so that the buffers are there. We first take the buffer initialization to the outside of the cycle, and then we run through buffers in the cycle:
// Initialize buffers
byte[][] buffers = new byte[Constants.EstimatedNumberOfThreads][];
for (int i = 0; i < Constants.EstimatedNumberOfThreads; i++)
{
    buffers[i] = new byte[Constants.PayloadSize];
}

// Let's read until we're told to quit
int fileIndex = 0;
int bufferIndex = 0;
while (!cancellationToken.IsCancellationRequested)
{
    // Buffer and initial driver options
    byte[] buffer = buffers[bufferIndex++];
    if (bufferIndex == Constants.EstimatedNumberOfThreads)
    {
        bufferIndex = 0;
    }

    DriverOptions driverOptions = new()
    {
        ChannelId = channel
    };

    ...
We now see that the software executes correctly as long as we still have buffers available, and it has no timing issues, but we’ve used up a lot of RAM (with the values above, 8 buffers × 6MB = 48MB for buffers alone). This might be a trade-off that you are willing to make in order to keep performance at an all-time high.
As an added bonus, we’ve kept our application relatively garbage-collection-free. Let’s think of a few ways to improve this; the very first is to switch to fully dynamic buffers.
Using dynamic buffers
In order to use dynamic buffers, we can and should write our own dynamic object pool. But for the purpose of this exercise, we will use a ready-made one from the IX.StandardExtensions package. If you want to research how to make your own dynamic object pool, you could look through the ObjectPool<T> class that we will use (along with its associated PooledObject<T> class).
After adding the NuGet package, our code thus becomes:
// Initialize buffers
ObjectPool<byte[]> buffers = new ObjectPool<byte[]>(
    () => new byte[Constants.PayloadSize],
    Constants.EstimatedNumberOfThreads);

// Let's read until we're told to quit
int fileIndex = 0;
while (!cancellationToken.IsCancellationRequested)
{
    // Buffer and initial driver options
    using var bufferObject = buffers.Get();
    var buffer = bufferObject.Value;

    DriverOptions driverOptions = new()
    {
        ChannelId = channel
    };

    ...
And we can go on and on, improving our algorithms more and more. One detail to tackle early: a using declaration returns the buffer to the pool at the end of the loop iteration, so, in the thread-offloading variant, the pooled object should travel with the work item and be disposed on the worker thread only after saving completes; otherwise the pool could hand the same buffer out again while it is still being processed, which is the exact overrun we just fixed.
Final word
As you can see, a complex problem has been tackled in an evolutionary manner: we started small, and, with each solution, a new set of problems or concerns showed themselves, and were tackled one by one.
We used C#, a little basic maths, some logical deduction, and our own analysis of how the software behaves. And we went from a simple, somewhat-working solution to one that is actually usable and performant.
This technique is not a magic bullet that will solve all your software development problems, but it is a good way to start gaining experience.
By looking at how your software behaves beyond the first working solution, you can discover new things to consider. After, for instance, you tackle the buffer problem, you start considering memory usage more. After starting to use multiple buffers, garbage collection is suddenly on your mind. And with every software or component that you code, more considerations come to you naturally, and that “first working solution” will get more and more complex. This, in turn, will open up new considerations, and higher levels of performance and refinement.
I hope that, at the end of this article, you will have realized that even a problem that sounds complex and slightly frightening can, in fact, be solved.