Sitemap
CodeX

Everything connected with Tech & Code. Follow to join our 1M+ monthly readers

Follow publication

Thought process: High-throughput data fetching and processing in C#

14 min read · May 19, 2021

--

The setup

Considerations

The simulated API

// Simulated driver API used by the acquisition loops in this listing.
// Every method body is elided ("...") here, so only the signatures and the
// way the callers below use them are documented.
public static class Acquisition
{
#region Notification API

// Waits for a driver notification for the given options and returns the
// driver's response. Callers in this listing treat the call as blocking and
// read Status and BufferIdentifier from the result.
public static DriverResponse WaitForNotification(DriverOptions options) => ...;

#endregion

#region Buffer pinning

// Asks the driver to pin the supplied managed buffer for the given options.
// Callers check the returned Status for Success before using the buffer.
// NOTE(review): no matching "unpin" call is visible in this listing — confirm
// how the pin is released.
public static DriverResponse PinBuffer(
byte[] buffer,
DriverOptions options)
{ ... }

#endregion

#region Data fetching API

// Begins a fetch into the supplied buffer and returns an identifier for the
// pending operation. Callers describe this call as non-blocking and pass the
// identifier to EndFetchingData.
public static uint StartFetchingData(DriverOptions options,
byte[] buffer) => ...;

// Completes the fetch identified by fetchingId and returns its outcome.
// NOTE(review): presumably blocks until the buffer is filled — confirm.
public static DriverResponse EndFetchingData(
uint fetchingId) =>
...;

#endregion
}
// Options passed to every Acquisition call.
// The struct is declared with sequential layout; with auto-properties that
// layout applies to the compiler-generated backing fields.
// NOTE(review): presumably this mirrors a native driver structure — confirm
// field order and sizes against the driver header.
[StructLayout(LayoutKind.Sequential)]
public struct DriverOptions
{
// Channel to operate on; the loops below set it from the command-line channel.
public uint ChannelId { get; set; }

// Timeout for the call, in milliseconds (per the name). Never assigned in this
// listing, so it stays at its default of 0.
public uint TimeoutInMilliseconds { get; set; }

// Identifies the driver-side buffer; the fetch step copies it from the
// notification response's BufferIdentifier.
public byte BufferIdentifier { get; set; }
}
// Result returned by the Acquisition calls, declared with sequential layout.
[StructLayout(LayoutKind.Sequential)]
public struct DriverResponse
{
// Channel the response refers to.
// NOTE(review): this is a byte, while DriverOptions.ChannelId is a uint —
// confirm the width mismatch is intentional.
public byte ChannelId { get; set; }

// Outcome of the call; the loops below only proceed on ResponseStatus.Success.
public ResponseStatus Status { get; set; }

// Buffer identifier reported by the driver; after a notification it is copied
// into the DriverOptions used for the fetch.
public byte BufferIdentifier { get; set; }

// Duration of the response. NOTE(review): units are not shown in this
// listing — confirm (milliseconds?).
public uint ResponseDuration { get; set; }
}

// Status codes carried in DriverResponse.Status, stored as a single byte.
// The acquisition loops treat every value other than Success as a missed frame.
// NOTE(review): the numeric values presumably match the driver's own codes —
// do not renumber without confirming.
public enum ResponseStatus : byte
{
// The call completed and its result can be used.
Success = 0,
// The call did not complete in time (per the name).
Timeout = 1,
// No data was available (per the name).
NoData = 2,
// The equipment has not been initialized (per the name).
EquipmentNotInitialized = 3,
// The hardware reported a failure (per the name).
HardwareFailure = 4,
}
/// <summary>
/// Tunable values shared by the acquisition samples.
/// </summary>
public static class Constants
{
    /// <summary>
    /// Size, in bytes, of one acquisition payload buffer:
    /// 6 * 1024 * 1024 = 6,291,456 bytes (6 MiB).
    /// </summary>
    public static int PayloadSize { get; set; } = 6 * 1024 * 1024;
}

Beginning to design

// Command-line options for the program; Main receives a populated instance
// from the argument parser.
// NOTE(review): the Option attribute presumably comes from the CommandLineParser
// package, where the two arguments are the short and long option names.
private class ConsoleArgumentData
{
// Folder the captured frames are written to ('o' / "output").
[Option('o', "output")]
public string OutputFolder { get; set; }
// Acquisition channel passed to the driver ('c' / "channel").
[Option('c', "channel")]
public uint Channel { get; set; }
}
/// <summary>
/// Entry point: parses the command line, runs the acquisition loop on a
/// background task, and stops it once the user provides console input.
/// </summary>
/// <param name="args">Raw command-line arguments (output folder and channel).</param>
static void Main(string[] args)
{
    // The options type has to be supplied as the generic argument. Without it
    // the call binds to the non-generic ParseArguments(args, params Type[])
    // overload with an empty type list, which the parser rejects at runtime.
    Parser.Default.ParseArguments<ConsoleArgumentData>(args)
        .WithParsed(
            data =>
            {
                // The token source lets us ask the worker to stop later on;
                // it is disposed when this callback exits.
                using CancellationTokenSource cts = new CancellationTokenSource();

                // Run the worker method MainLoop on a separate thread, handing
                // it the output folder, the channel and the cancellation token.
                Task runningTask = Task.Run(() => MainLoop(data.OutputFolder, data.Channel, cts.Token));

                // Wait for user input. Console.Read returns once input is
                // available, which on an interactive console typically means
                // after the user presses Enter.
                Console.WriteLine("Press any key to end...");

                Console.Read();

                Console.WriteLine("Finishing...");

                // Request cancellation immediately.
                cts.Cancel();

                // Give the worker up to 30 seconds to finish whatever it is
                // still processing before the program exits.
                runningTask.Wait(TimeSpan.FromSeconds(30));
            });
}

First draft, a naïve but functional version

/// <summary>
/// First-draft acquisition loop: pins one buffer, then repeatedly waits for a
/// driver notification, fetches the frame into that buffer and writes it to
/// a numbered ".bin" file until cancellation is requested.
/// </summary>
/// <param name="outputFolder">Folder the frame files are created in.</param>
/// <param name="channel">Acquisition channel passed to the driver.</param>
/// <param name="cancellationToken">
/// Checked once per iteration; the loop exits when cancellation is requested.
/// </param>
private static void MainLoop(string outputFolder, uint channel, CancellationToken cancellationToken)
{
    // Buffer and initial driver options
    byte[] buffer = new byte[Constants.PayloadSize];
    DriverOptions driverOptions = new()
    {
        ChannelId = channel
    };

    // Attempt to pin the buffer
    DriverResponse response = Acquisition.PinBuffer(
        buffer,
        driverOptions);

    if (response.Status != ResponseStatus.Success)
    {
        // Failure to pin: nothing can be acquired without the buffer
        Console.WriteLine("Buffer could not be pinned.");
        return;
    }

    // Let's read until we're told to quit
    int fileIndex = 0;
    while (!cancellationToken.IsCancellationRequested)
    {
        // Block while waiting for notification
        response = Acquisition.WaitForNotification(driverOptions);

        if (response.Status != ResponseStatus.Success)
        {
            // Missed frame
            continue;
        }

        // Set options for fetching, targeting the buffer the driver reported
        DriverOptions nextDriverOptions = new()
        {
            ChannelId = channel,
            BufferIdentifier = response.BufferIdentifier
        };

        // Start data fetching (non-blocking), then wait for it to complete
        uint fetchId = Acquisition.StartFetchingData(
            nextDriverOptions,
            buffer);

        response = Acquisition.EndFetchingData(fetchId);

        if (response.Status != ResponseStatus.Success)
        {
            // Missed frame
            continue;
        }

        /*
         * Add processing here
         */

        // The index advances for every fetched frame, even if saving it
        // fails, so file numbers keep matching frame order.
        string fileName = $"{fileIndex++}.bin";

        try
        {
            // Let's save the file
            using FileStream sw = File.Create(
                Path.Combine(
                    outputFolder,
                    fileName));

            sw.Write(buffer);
        }
        catch (Exception ex)
        {
            // Missed frame: keep acquiring, but report the failure instead of
            // dropping it silently (e.g. a missing output folder would
            // otherwise lose every frame without any indication).
            Console.Error.WriteLine($"Frame {fileName} could not be saved: {ex.Message}");
        }
    }
}

First improvement, offloading to another thread

// Start data fetching (non-blocking); the returned id identifies the pending fetch
uint fetchId = Acquisition.StartFetchingData(nextDriverOptions, buffer);

// Snapshot everything the worker needs, then advance the file counter so the
// next frame gets its own number
(uint, byte[], int, string) outerState = (fetchId, buffer, fileIndex, outputFolder);
fileIndex++;

// Completing the fetch and saving the frame happen on a thread-pool thread
ThreadPool.QueueUserWorkItem(
    state =>
    {
        (uint pendingFetch, byte[] payload, int frameNumber, string targetFolder) =
            ((uint, byte[], int, string))state;

        DriverResponse fetchResult = Acquisition.EndFetchingData(pendingFetch);

        // Anything other than Success is a missed frame: nothing to save
        if (fetchResult.Status == ResponseStatus.Success)
        {
            /*
             * Add processing here
             */

            try
            {
                // Let's save the file
                string fileName = $"{frameNumber}.bin";
                string filePath = Path.Combine(targetFolder, fileName);

                using FileStream output = File.Create(filePath);
                output.Write(payload);
            }
            catch
            {
                // Missed frame
            }
        }
    }, outerState);

Ensuring that buffers don’t overrun

// Acquisition loop variant that allocates and pins a fresh buffer on every
// iteration, so each frame gets its own buffer. The remainder of the method
// is elided in this listing.
private static void MainLoop(string outputFolder, uint channel, CancellationToken cancellationToken)
{
// Let's read until we're told to quit
int fileIndex = 0;
while (!cancellationToken.IsCancellationRequested)
{
// Buffer and initial driver options
// A new PayloadSize-byte array is allocated per iteration.
// NOTE(review): at the default 6 MiB this is well above the .NET large-object
// threshold, so each iteration presumably allocates on the large object heap.
byte[] buffer = new byte[Constants.PayloadSize];
DriverOptions driverOptions = new()
{
ChannelId = channel
};

// Attempt to pin the buffer
DriverResponse response = Acquisition.PinBuffer(
buffer,
driverOptions);

if (response.Status != ResponseStatus.Success)
{
// Failure to pin
Console.WriteLine("Buffer could not be pinned.");

// Unlike the first draft, a pin failure retries with a new buffer instead of
// returning. NOTE(review): a persistent failure would loop here, allocating
// and logging on every pass, until cancellation — confirm this is intended.
continue;
}

// Block while waiting for notification
response = Acquisition.WaitForNotification(driverOptions);
... the rest of the method

Digging in deeper in garbage collection

/// <summary>
/// Tunable values shared by the acquisition samples.
/// </summary>
public static class Constants
{
    /// <summary>
    /// Size, in bytes, of one acquisition payload buffer:
    /// 6 * 1024 * 1024 = 6,291,456 bytes (6 MiB).
    /// </summary>
    public static int PayloadSize { get; set; } = 6 * 1024 * 1024;

    /// <summary>
    /// Estimated number of concurrently working threads; the buffer setup
    /// code sizes its set of pre-allocated buffers from this value.
    /// </summary>
    public static int EstimatedNumberOfThreads { get; set; } = 8;
}
// Initialize buffers
// Pre-allocates EstimatedNumberOfThreads buffers of PayloadSize bytes once,
// up front, instead of allocating a new array for every frame.
byte[][] buffers = new byte[Constants.EstimatedNumberOfThreads][];
for (int i = 0; i < Constants.EstimatedNumberOfThreads; i++)
{
buffers[i] = new byte[Constants.PayloadSize];
}

// Let's read until we're told to quit
int fileIndex = 0;
int bufferIndex = 0;

while (!cancellationToken.IsCancellationRequested)
{
// Buffer and initial driver options
// Take the next buffer in round-robin order, wrapping back to the first one
// after the last.
// NOTE(review): nothing visible here checks that an earlier work item has
// finished with a buffer before it is handed out again — confirm whether the
// elided code guards against that.
byte[] buffer = buffers[bufferIndex++];
if (bufferIndex == Constants.EstimatedNumberOfThreads)
{
bufferIndex = 0;
}

DriverOptions driverOptions = new()
{
ChannelId = channel
};
...

Using dynamic buffers

// Initialize buffers
// The pool creates PayloadSize-byte arrays through the factory delegate.
// NOTE(review): ObjectPool<T> is not defined in this listing; the second
// argument is presumably the pool's size or capacity — confirm against the
// pool implementation in use.
ObjectPool<byte[]> buffers = new ObjectPool<byte[]>(
() => new byte[Constants.PayloadSize],
Constants.EstimatedNumberOfThreads);

// Let's read until we're told to quit
int fileIndex = 0;

while (!cancellationToken.IsCancellationRequested)
{
// Buffer and initial driver options
// Get() hands out a disposable wrapper; the byte array is its Value.
// The using declaration disposes the wrapper at the end of each loop
// iteration. NOTE(review): if disposal returns the buffer to the pool while a
// queued work item is still using it, the buffer could be reused too early —
// confirm against the elided code.
using var bufferObject = buffers.Get();
var buffer = bufferObject.Value;

DriverOptions driverOptions = new()
{
ChannelId = channel
};
...

Final word

--

--

CodeX
CodeX

Published in CodeX

Everything connected with Tech & Code. Follow to join our 1M+ monthly readers

Adrian Mos
Adrian Mos

Written by Adrian Mos

Software developer, architect and father, with a love of science and writing, and a desire for global social and economic development.

No responses yet