C#: Async Pipeline with ActionBlock, TransformBlock and BatchBlock

Saurabh Singh
Jul 18, 2020

I wonder if you have ever struggled with the time it takes to fetch and process data.

Sometimes we need to read data from one data source, do some processing or filtering, and then fetch more data, maybe from another source.

At times, the processing takes so many CPU cycles that we cannot process all records at once or fully in parallel.

This is a very basic scenario; real ones can be a lot more complex, and the pipeline may keep growing.

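The TPL Dataflow library (the System.Threading.Tasks.Dataflow namespace) provides blocks for building pipelines like this. As per MSDN: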

BatchBlock(T)

The BatchBlock<T> class combines sets of input data, which are known as batches, into arrays of output data. You specify the size of each batch when you create a BatchBlock<T> object. When the BatchBlock<T> object receives the specified count of input elements, it asynchronously propagates out an array that contains those elements. If a BatchBlock<T> object is set to the completed state but does not contain enough elements to form a batch, it propagates out a final array that contains the remaining input elements.
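A minimal sketch of that batching behaviour, assuming System.Threading.Tasks.Dataflow is available to the project: seven integers go into a `BatchBlock<int>` of size 3, and completing the block flushes the leftover item as a smaller final batch. `BatchBlockDemo` and `BatchSizesAsync` are illustrative names, not part of any library.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchBlockDemo
{
    // Posts `count` integers into a BatchBlock and returns the length of each array it emits.
    public static async Task<int[]> BatchSizesAsync(int count, int batchSize)
    {
        var batchBlock = new BatchBlock<int>(batchSize);

        for (int i = 0; i < count; i++)
        {
            batchBlock.Post(i); // unbounded by default, so Post accepts every item
        }

        // Marking the block complete flushes the leftover items as one final, smaller batch.
        batchBlock.Complete();

        var sizes = new List<int>();
        // OutputAvailableAsync returns false once the block is complete and fully drained.
        while (await batchBlock.OutputAvailableAsync())
        {
            int[] batch = await batchBlock.ReceiveAsync();
            sizes.Add(batch.Length);
        }
        return sizes.ToArray();
    }
}
```

Calling `BatchSizesAsync(7, 3)` should return the sizes 3, 3 and 1: two full batches, then the remainder emitted on `Complete()`.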

TransformBlock(TInput, TOutput)

The TransformBlock<TInput,TOutput> class resembles the ActionBlock<TInput> class, except that it acts as both a source and as a target. The delegate that you pass to a TransformBlock<TInput,TOutput> object returns a value of type TOutput.

The delegate that you provide to a TransformBlock<TInput,TOutput> object can be of type System.Func<TInput, TOutput> or type System.Func<TInput, Task<TOutput>>. When you use a TransformBlock<TInput,TOutput> object with System.Func<TInput, TOutput>, processing of each input element is considered completed when the delegate returns.

When you use a TransformBlock<TInput,TOutput> object with System.Func<TInput, Task<TOutput>>, processing of each input element is considered completed only when the returned Task<TResult> object is completed. As with ActionBlock<TInput>, by using these two mechanisms, you can use TransformBlock<TInput,TOutput> for both synchronous and asynchronous processing of each input element.
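To illustrate the two delegate forms, here is a small sketch, assuming System.Threading.Tasks.Dataflow is referenced. It feeds a value through a synchronous `TransformBlock<int, int>` and then an asynchronous `TransformBlock<int, string>`; `TransformBlockDemo` is a hypothetical name and `Task.Delay` merely stands in for real I/O.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformBlockDemo
{
    public static async Task<(int Squared, string Text)> RunAsync(int input)
    {
        // Synchronous delegate, Func<int, int>: an element is done when the delegate returns.
        var square = new TransformBlock<int, int>(x => x * x);

        // Asynchronous delegate, Func<int, Task<string>>: an element is done only when the
        // returned task completes. Task.Delay stands in for real I/O (an assumption).
        var describe = new TransformBlock<int, string>(async x =>
        {
            await Task.Delay(10);
            return $"value={x}";
        });

        square.Post(input);
        int squared = await square.ReceiveAsync();

        describe.Post(squared);
        string text = await describe.ReceiveAsync();

        // No more input will be sent to either block.
        square.Complete();
        describe.Complete();

        return (squared, text);
    }
}
```

`RunAsync(4)` should produce `(16, "value=16")`. Both blocks act as a target (`Post`) and a source (`ReceiveAsync`), which is what lets a TransformBlock sit in the middle of a pipeline.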

ActionBlock(T)

The ActionBlock<TInput> class is a target block that calls a delegate when it receives data. Think of an ActionBlock<TInput> object as a delegate that runs asynchronously when data becomes available.

The delegate that you provide to an ActionBlock<TInput> object can be of type Action<T> or type System.Func<TInput, Task>. When you use an ActionBlock<TInput> object with Action<T>, processing of each input element is considered completed when the delegate returns.

When you use an ActionBlock<TInput> object with System.Func<TInput, Task>, processing of each input element is considered completed only when the returned Task object is completed. By using these two mechanisms, you can use ActionBlock<TInput> for both synchronous and asynchronous processing of each input element.
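A minimal sketch of an `ActionBlock<TInput>` with a `Func<TInput, Task>` delegate, assuming System.Threading.Tasks.Dataflow is referenced. The key pattern is `Complete()` followed by `await Completion`, which finishes only after every posted element's task has completed. `ActionBlockDemo.SumAsync` is an illustrative name.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    // Adds 1..count inside an ActionBlock and returns the total once every element is processed.
    public static async Task<int> SumAsync(int count)
    {
        int total = 0;

        // Func<int, Task> delegate: an element counts as processed only when its task completes.
        var adder = new ActionBlock<int>(async x =>
        {
            await Task.Delay(1); // placeholder for real asynchronous work
            Interlocked.Add(ref total, x);
        });

        for (int i = 1; i <= count; i++)
        {
            adder.Post(i);
        }

        // Signal that no more data will arrive, then wait for all queued work to finish.
        adder.Complete();
        await adder.Completion;
        return total;
    }
}
```

`SumAsync(100)` should return 5050. Because the lambda is `async`, the compiler picks the `Func<int, Task>` constructor overload rather than `Action<int>`, so the block really waits on each returned task.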

Here is the scenario from one of the reports for which I implemented an async pipeline.

The BatchBlock receives the names of users (50,000 in total). For each user, I fetch additional information by calling an external API, which doesn't accept more than 300 names at a time.

After fetching and merging the data from the API, I need to fetch data from MongoDB and Redis. That lookup depends on the result of the API, so I cannot fetch it at the beginning. Finally, I combine, process, and transform everything to send to the UI.

Pipeline Design

Since the API doesn't accept more than 300 names, I used a BatchBlock with a batch size of 300. The BatchBlock keeps receiving records until all 50,000 names have been posted, but forwards them to the TransformBlock in arrays of at most 300.
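The numbers in this design can be checked with a sketch. 50,000 is not a multiple of 300, so the BatchBlock should emit 166 full batches (49,800 names) plus a final batch of 200 once it is completed, 167 batches in total. This assumes System.Threading.Tasks.Dataflow is referenced; the generated `user{i}` names are placeholders for the real data source, and `NameBatchingDemo` is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class NameBatchingDemo
{
    // Feeds `userCount` generated names through a BatchBlock and records each batch size.
    public static async Task<List<int>> BatchSizesAsync(int userCount, int batchSize)
    {
        // Placeholder names; the real report reads them from its data source.
        IEnumerable<string> names = Enumerable.Range(0, userCount).Select(i => $"user{i}");

        var batchBlock = new BatchBlock<string>(batchSize);
        var sizes = new List<int>();

        // Default MaxDegreeOfParallelism is 1, so List.Add is never called concurrently.
        var sink = new ActionBlock<string[]>(batch => sizes.Add(batch.Length));
        batchBlock.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string name in names)
        {
            await batchBlock.SendAsync(name);
        }

        batchBlock.Complete();   // emits the final partial batch, then completes the sink
        await sink.Completion;
        return sizes;
    }
}
```

`BatchSizesAsync(50_000, 300)` should report 167 batches, the first 166 of size 300 and the last of size 200, which means 167 calls to the external API.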

Inside the TransformBlock is the time-consuming work: calling the external API and processing/merging its result before sending it to the next stage.
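The article does not say how many API calls run at once. If the API or the CPU-heavy merging needs throttling, `ExecutionDataflowBlockOptions` offers `MaxDegreeOfParallelism` and `BoundedCapacity`. The sketch below shows one possible configuration, not the author's code, assuming System.Threading.Tasks.Dataflow is referenced. `FetchUserInfoAsync` is a hypothetical stand-in for the external API, and the lock-based counter exists only to observe how many calls overlap.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledFetchDemo
{
    // Hypothetical stand-in for the external API: waits briefly and echoes the names.
    private static async Task<string[]> FetchUserInfoAsync(string[] names)
    {
        await Task.Delay(20);
        return names.Select(n => n.ToUpperInvariant()).ToArray();
    }

    // Sends `batchCount` batches of 300 names; returns how many names came out and
    // the highest number of API calls that were in flight at the same time.
    public static async Task<(int Processed, int MaxInFlight)> RunAsync(int batchCount, int maxParallel)
    {
        var gate = new object();
        int inFlight = 0, maxInFlight = 0, processed = 0;

        var fetch = new TransformBlock<string[], string[]>(async names =>
        {
            lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
            try
            {
                return await FetchUserInfoAsync(names);
            }
            finally
            {
                lock (gate) { inFlight--; }
            }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallel, // at most this many API calls at once
            BoundedCapacity = maxParallel * 2     // SendAsync waits while this many batches are held
        });

        var sink = new ActionBlock<string[]>(batch => { processed += batch.Length; });
        fetch.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int b = 0; b < batchCount; b++)
        {
            await fetch.SendAsync(Enumerable.Repeat($"user{b}", 300).ToArray());
        }

        fetch.Complete();
        await sink.Completion;
        return (processed, maxInFlight);
    }
}
```

With `RunAsync(10, 4)`, all 3,000 names should come out and the observed maximum should never exceed 4. `BoundedCapacity` adds back-pressure, so a fast producer cannot queue up all 50,000 names' worth of batches in memory ahead of a slow API.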

The ActionBlock receives the output of the TransformBlock and performs the final processing.
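Putting the three stages together, a minimal sketch of the pipeline described above might look like the following, assuming System.Threading.Tasks.Dataflow is referenced. It is not the author's report code: `UserInfo`, `CallExternalApiAsync` and `LoadStoredDataAsync` are hypothetical placeholders for the external API and the MongoDB/Redis lookups, and the degree of parallelism of 4 is an arbitrary choice.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical shape of the merged API data (not from the original report).
public sealed record UserInfo(string Name, string ApiDetails);

public static class ReportPipeline
{
    // Hypothetical stand-in for the external API, which accepts at most 300 names per call.
    private static async Task<UserInfo[]> CallExternalApiAsync(string[] names)
    {
        if (names.Length > 300) throw new ArgumentException("API accepts at most 300 names.");
        await Task.Delay(10);
        return names.Select(n => new UserInfo(n, $"api:{n}")).ToArray();
    }

    // Hypothetical stand-in for the MongoDB/Redis lookups that depend on the API result.
    private static Task<string> LoadStoredDataAsync(UserInfo user) =>
        Task.FromResult($"db:{user.Name}");

    public static async Task<List<string>> BuildReportAsync(IEnumerable<string> userNames)
    {
        var rows = new ConcurrentQueue<string>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Stage 1: group names into arrays of at most 300.
        var batch = new BatchBlock<string>(300);

        // Stage 2: one API call per batch, with a few batches in flight at once.
        var fetch = new TransformBlock<string[], UserInfo[]>(
            names => CallExternalApiAsync(names),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Stage 3: look up the dependent data and build the final rows.
        var finish = new ActionBlock<UserInfo[]>(async users =>
        {
            foreach (var user in users)
            {
                string stored = await LoadStoredDataAsync(user);
                rows.Enqueue($"{user.ApiDetails}|{stored}");
            }
        });

        batch.LinkTo(fetch, link);
        fetch.LinkTo(finish, link);

        foreach (var name in userNames)
        {
            await batch.SendAsync(name);
        }

        batch.Complete();          // flush the last partial batch
        await finish.Completion;   // completes only after every stage has drained
        return rows.ToList();
    }
}
```

Linking with `PropagateCompletion = true` means a single `batch.Complete()` flushes the last partial batch and lets completion flow downstream, so awaiting the final block's `Completion` is enough to know every stage has finished. Because `TransformBlock` preserves input order by default and the ActionBlock runs one batch at a time, the rows come out in the same order as the names.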
