The Magic of .NET Dataflow

Ori Hers
CodeX
Published in
7 min readJul 22, 2023
Photo by Markus Spiske on Unsplash

Do you want to process data like a boss? Do you hate dealing with threads, locks, and queues? Do you love blocks, pipelines, and transformations? If you answered yes to any of these questions, then C# Dataflow is for you. Dataflow is a library that makes concurrent and asynchronous data processing a breeze. It lets you create Dataflow blocks that do all the hard work for you and connect them into awesome pipelines that handle data like magic.

In this article, you’ll learn how to use Dataflow to process CSV files in parallel and get tips and tricks for making your Dataflow pipelines faster and smoother.

By the end of this article, you’ll be a C# Dataflow master, and your data will thank you for it.

Why.

  1. Simplified Development: Dataflow offers a high-level abstraction that simplifies the implementation of complex data processing pipelines. It allows you to focus on the logic and transformation of your data, rather than getting tangled in low-level threading details. This leads to cleaner and more maintainable code.
  2. Efficient and Scalable Processing: The built-in support for backpressure handling in Dataflow ensures that data processing remains efficient and scalable, even under heavy workloads. It manages the flow of data between blocks, preventing overload and maintaining optimal performance.
  3. Flexible Block Types: Dataflow provides a variety of block types to suit different processing needs. Whether you need to transform data, perform actions, or broadcast data to multiple consumers, the library has you covered. This flexibility allows you to design and construct data pipelines that meet your specific requirements with ease.

When.

  1. Concurrent Data Processing: When you need to process data concurrently, Dataflow provides an excellent solution. It allows you to easily construct Dataflow pipelines where multiple blocks can execute in parallel, enabling efficient and concurrent data processing.
  2. Asynchronous Workflows: Dataflow is well-suited for handling asynchronous workflows, especially when dealing with tasks that have dependencies or require synchronization. It simplifies the coordination and management of asynchronous operations, making it easier to handle complex data processing scenarios.
  3. Complex Data Transformations: If you have complex data transformation requirements, Dataflow can be a valuable tool. Its block-based architecture and built-in support for transformation functions allow you to define and manage intricate data processing logic, making it easier to implement and maintain complex data transformations.

Which.

Dataflow has three different types of blocks:

  • Source Blocks: Source blocks produce data or act as the entry point into the Dataflow pipeline. They are not connected to any preceding blocks but provide data to downstream blocks. Source blocks generate data asynchronously and make it available for consumption by other blocks in the pipeline.
var sourceBlock = new BufferBlock<int>();
  • Target Blocks: Target blocks are the final destination for data in the Dataflow pipeline. They receive input from preceding blocks but do not produce any output themselves. Target blocks typically perform actions or consume the data for the final processing or result. Examples of actions could be writing to a database, sending an email, or performing a specific computation.
var targetBlock = new ActionBlock<string>(result => Console.WriteLine(result));
  • Propagator Blocks: Propagator blocks, also known as processing blocks, consume and produce data. They receive input from preceding blocks, perform some processing or transformation, and produce output to be passed on to subsequent blocks. Propagator blocks play a crucial role in the Dataflow pipeline by manipulating and transforming data as it flows through it.
var propagatorBlock = new TransformBlock<int, string>(input => (input * 2).ToString());

Connect blocks.

We connect blocks using the linkTo() method. Each source block can be connected to one or many target blocks, and each target block could be connected to one or many source blocks. The linkTo() method can also get a delegate that determines whether the target block accepts or rejects a message based on the value of that message. This filtering mechanism guarantees that a Dataflow block receives only specific values. For most predefined Dataflow block types, if a source block is connected to multiple target blocks when a target block rejects a message, the source offers that message to the next target.

Dataflow Block Completion signifies the moment when a Dataflow block completes processing all incoming data. It facilitates coordination and synchronization in a data processing pipeline by monitoring block completion. By handling block completion events, you can execute additional actions when a block finishes processing. the syntax is similar to the async-await we usually use in .NET.

// Create our action block.
var block = new ActionBlock<int>(async number =>
{
await Task.Delay(1000);
Console.WriteLine($"Processed number: {number}");
});

// Post data to the pipeline.
for (int i = 0; i < 10; i++)
{
block.Post(i);
}

block.Complete();

//Await the pipeline complition.
await block.Completion;

//Run post-pipeline code.
Console.WriteLine("All numbers processed.");

Types Types Types.

  1. TransformBlock<TInput, TOutput>: This block allows you to transform each input item of type TInput into an output item of type TOutput. It is often used for data processing and manipulation, such as performing calculations, applying business logic, or converting data from one format to another.
  2. ActionBlock<TInput>: This block enables you to perform an action or execute a method on each input item of type TInput. It is useful when you need to trigger side effects, perform I/O operations, or execute tasks that don’t require producing an output.
  3. BroadcastBlock<T>: This block broadcasts the input item of type T to all connected blocks in the Dataflow pipeline. It is commonly used when you want to distribute the same input data to multiple downstream blocks or perform parallel processing on the same input.
  4. BufferBlock<T>: This block provides a buffer for storing and passing on input items of type T to subsequent blocks. It allows you to control the flow of data and can be useful in scenarios where you want to temporarily store or batch-process items before passing them down the pipeline.
  5. BatchBlock<T>: This block collects a specified number of input items of type T or waits for a defined time span before releasing them as a batch. It can be beneficial when you aggregate or process items in groups rather than individually.
Photo by Ryan Quintal on Unsplash

Coding Time.

The best way to learn is by example. The bellow code is processing a person's income/outcome from different CSV files in parallel and prints when there is a person who has more than 50$ in his account. Using optimized resources, we would use the Dataflow framework to complete the task in parallel.

We would use the following:

  • TransformBlock to read CSV files and skip header rows.
  • TransformManyBlock to process CSV lines and update the aggregation dictionary.
  • ActionBlock to print results for people with more than $50.
  • DataflowLinkOptions to connect blocks and propagate completion signals.
  • SendAsync to send data to blocks.
  • Completion to signal block completion.
  • Task.WhenAll to wait for all blocks completion.
using System.Collections.Concurrent;
using System.Threading.Tasks.Dataflow;

class Program
{
static async Task Main()
{
// Specify the directory where the CSV files are located
var directoryPath = @".\Files";

// Read all CSV files in the directory
var csvFiles = Directory.GetFiles(directoryPath, "*.csv").ToList();

// Create data structure for aggregations
var aggregationResults = new ConcurrentDictionary<string, int>();

// Initialize a dataflow block for importing data from CSV files
// Read the CSV file and skip the header row
var importBlock = new TransformBlock<string, IEnumerable<string>>(csvFile => File.ReadAllLines(csvFile).Skip(1));

// Initialize a dataflow block for parsing and aggregating the data
var aggregationBlock = new TransformManyBlock<IEnumerable<string>, KeyValuePair<string, int>>(lines =>
{
// Process each line in the CSV file
foreach (string line in lines)
{
// Split the line by comma
var fields = line.Split(',');

// Extract the necessary data
var name = fields[0];
var money = int.Parse(fields[1]);

aggregationResults.AddOrUpdate(
name,
money,
(n, _) => aggregationResults[n] + money);
}

return aggregationResults;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

// Initialize a dataflow block for printing the results if there is a person which currently has more than 50$ in his account
var printBlock = new ActionBlock<KeyValuePair<string, int>>(result =>
{
if(result.Value > 50)
{
Console.WriteLine($"Name: {result.Key}, Total amount: {result.Value}");
}
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

// Connect the dataflow blocks
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
importBlock.LinkTo(aggregationBlock, linkOptions);
aggregationBlock.LinkTo(printBlock, linkOptions);

// Process each CSV file
foreach (string csvFile in csvFiles)
{
await importBlock.SendAsync(csvFile);
}

// Signal the completion of the import block
importBlock.Complete();

// Wait for all blocks to complete
await Task.WhenAll(importBlock.Completion, aggregationBlock.Completion, printBlock.Completion);
Console.ReadLine();
}
}
Photo by Peter Olexa on Unsplash

Best Practices.

In software development, perfection requires time and experience, here are some best practices that can help us take our pipelines to the next level.

  • Properly Configure Degree of Parallelism: Adjust the MaxDegreeOfParallelism property of the Dataflow blocks to control the number of parallel operations. Set it to an optimal value based on the available system resources and the nature of your processing workload.
  • Use Batch Blocks for Grouping: If you have a large number of incoming data items, consider using BatchBlock to group them into batches. This reduces the overhead of passing individual items through the pipeline and improves overall throughput.
  • Properly Dispose of Dataflow Blocks: Ensure proper disposal of Dataflow blocks to release resources when they are no longer needed. Use the Complete() method to signal the end of processing, and use the await block.Completion pattern to await the completion of all blocks before disposing of them.
  • Monitor and Tune Performance: Monitor the performance of your data processing pipeline using tools like performance counters or logging. Adjust the configuration parameters and block settings, such as buffer sizes, to optimize the performance and throughput of the pipeline. The following article can be useful for that matter: 5 Rules for .NET Services Optimization

Heavy computations and async data processing are never easy tasks, it requires writing edge performance code and vast knowledge. Now, we can also add a Dataflow to our toolbox, and trust me, it’s worth trying.

More info can also be found in Microsoft documentation, and also a guide for creating our first DataFlow pipeline.

--

--

Ori Hers
CodeX
Writer for

Out of the box thinker. Software engineer @ Microsoft by day, horse rider by night.