C#: Async Pipeline with Action/Transform/Batch Blocks
I wonder if you guys have ever struggled with the time it takes to fetch and process data.
Sometimes we need to read data from one data source, do some processing or filtering, and then fetch more data, maybe from another source.
At times, the processing requires so many CPU cycles that we cannot process all the records at once or fully in parallel.
This is a very basic scenario, but it can get a lot more complex, and the pipeline may grow.
As per MSDN
BatchBlock(T)
The BatchBlock<T> class combines sets of input data, which are known as batches, into arrays of output data. You specify the size of each batch when you create a BatchBlock<T> object. When the BatchBlock<T> object receives the specified count of input elements, it asynchronously propagates out an array that contains those elements. If a BatchBlock<T> object is set to the completed state but does not contain enough elements to form a batch, it propagates out a final array that contains the remaining input elements.
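To see this behaviour concretely, here is a minimal sketch. It assumes the System.Threading.Tasks.Dataflow package is referenced, and the names BatchBlockDemo and CollectBatchesAsync are illustrative only. It posts a run of integers, completes the block, and drains the arrays it emits, including the smaller final batch that appears only on completion.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchBlockDemo
{
    // Posts the integers 1..count into a BatchBlock with the given batch size,
    // completes the block, and returns every array it propagates.
    public static async Task<List<int[]>> CollectBatchesAsync(int count, int batchSize)
    {
        var batchBlock = new BatchBlock<int>(batchSize);

        for (int i = 1; i <= count; i++)
        {
            batchBlock.Post(i); // a full batch is formed each time batchSize items arrive
        }

        // Completing the block flushes any leftover items as one final, smaller batch.
        batchBlock.Complete();

        var batches = new List<int[]>();
        // OutputAvailableAsync returns false once the block is completed and drained.
        while (await batchBlock.OutputAvailableAsync())
        {
            batches.Add(batchBlock.Receive());
        }
        return batches;
    }
}
```

For example, CollectBatchesAsync(7, 3) yields the arrays [1, 2, 3], [4, 5, 6] and [7].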
TransformBlock(TInput, TOutput)
The TransformBlock<TInput,TOutput> class resembles the ActionBlock<TInput> class, except that it acts as both a source and as a target. The delegate that you pass to a TransformBlock<TInput,TOutput> object returns a value of type TOutput.
The delegate that you provide to a TransformBlock<TInput,TOutput> object can be of type System.Func<TInput, TOutput> or type System.Func<TInput, Task<TOutput>>. When you use a TransformBlock<TInput,TOutput> object with System.Func<TInput, TOutput>, processing of each input element is considered completed when the delegate returns.
When you use a TransformBlock<TInput,TOutput> object with System.Func<TInput, Task<TOutput>>, processing of each input element is considered completed only when the returned Task<TResult> object is completed. As with ActionBlock<TInput>, by using these two mechanisms, you can use TransformBlock<TInput,TOutput> for both synchronous and asynchronous processing of each input element.
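The two delegate shapes can be compared side by side in a short sketch. It assumes the System.Threading.Tasks.Dataflow package; TransformBlockDemo, CreateSyncBlock, CreateAsyncBlock and RunAsync are hypothetical names, and Task.Delay merely stands in for real asynchronous work such as an HTTP call.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformBlockDemo
{
    // Func<int, string>: an element counts as processed as soon as the delegate returns.
    public static TransformBlock<int, string> CreateSyncBlock() =>
        new TransformBlock<int, string>(n => $"item-{n}");

    // Func<int, Task<int>>: an element counts as processed only when the returned task completes.
    public static TransformBlock<int, int> CreateAsyncBlock() =>
        new TransformBlock<int, int>(async n =>
        {
            await Task.Delay(10); // stand-in for asynchronous I/O
            return n * n;
        });

    // Posts the inputs, completes the block and drains its outputs.
    // TransformBlock preserves input order by default.
    public static async Task<List<TOut>> RunAsync<TOut>(TransformBlock<int, TOut> block, IEnumerable<int> inputs)
    {
        foreach (var n in inputs)
        {
            block.Post(n);
        }
        block.Complete();

        var results = new List<TOut>();
        while (await block.OutputAvailableAsync())
        {
            results.Add(block.Receive());
        }
        await block.Completion; // surfaces any exception thrown by the delegate
        return results;
    }
}
```

RunAsync(CreateAsyncBlock(), new[] { 1, 2, 3, 4 }) returns 1, 4, 9 and 16 in input order. The compiler picks the Task-returning overload automatically for the async lambda.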
ActionBlock(T)
The ActionBlock<TInput> class is a target block that calls a delegate when it receives data. Think of an ActionBlock<TInput> object as a delegate that runs asynchronously when data becomes available.
The delegate that you provide to an ActionBlock<TInput> object can be of type Action<T> or type System.Func<TInput, Task>. When you use an ActionBlock<TInput> object with Action<T>, processing of each input element is considered completed when the delegate returns.
When you use an ActionBlock<TInput> object with System.Func<TInput, Task>, processing of each input element is considered completed only when the returned Task object is completed. By using these two mechanisms, you can use ActionBlock<TInput> for both synchronous and asynchronous processing of each input element.
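A minimal sketch of both ActionBlock forms follows, again assuming the System.Threading.Tasks.Dataflow package; ActionBlockDemo and its methods are hypothetical. Because an ActionBlock has no output, the results are captured through side effects, and awaiting Completion after Complete() waits until every posted element has been processed.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockDemo
{
    // Action<int>: each element is done when the delegate returns.
    public static async Task<int> SumWithSyncActionAsync(int[] inputs)
    {
        int sum = 0;
        // The default MaxDegreeOfParallelism is 1, so the delegate never runs
        // concurrently with itself and this unsynchronized update is safe.
        var block = new ActionBlock<int>(n => { sum += n; });
        foreach (var n in inputs)
        {
            block.Post(n);
        }
        block.Complete();
        await block.Completion; // waits until every posted element has been processed
        return sum;
    }

    // Func<int, Task>: each element is done only when its task completes,
    // so with the default parallelism the next element starts after that.
    public static async Task<string[]> LogWithAsyncActionAsync(int[] inputs)
    {
        var log = new ConcurrentQueue<string>();
        var block = new ActionBlock<int>(async n =>
        {
            await Task.Delay(10); // stand-in for asynchronous I/O
            log.Enqueue($"processed {n}");
        });
        foreach (var n in inputs)
        {
            block.Post(n);
        }
        block.Complete();
        await block.Completion;
        return log.ToArray();
    }
}
```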
The following is a code snippet from one of the reports for which I implemented an async pipeline.
The BatchBlock receives the names of users (50,000 in total). For each user, I fetch additional information by calling an external API. The API doesn’t accept more than 300 names at a time.
After fetching and merging the data from the API, I need to fetch data from MongoDB and Redis (this depends on the result of the API, so I cannot fetch it at the beginning). Finally, I combine, process and transform everything and send it to the UI.
Pipeline Design
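Since the API doesn’t accept more than 300 names, I used a BatchBlock with a batch size of 300. The BatchBlock keeps receiving names until all 50,000 have been posted, but it does not wait for all of them before forwarding: every time 300 names accumulate, it sends them on to the TransformBlock as one array. Because 50,000 is not a multiple of 300, this gives 166 full batches plus a final batch of the remaining 200 names, which is emitted when the BatchBlock is completed.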
Inside the TransformBlock is the time-consuming work: calling the external API and processing/merging the response before sending it to the next stage.
The ActionBlock receives the output of the TransformBlock and performs the final processing.
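The three stages can be sketched roughly as follows. This is a simplified illustration of the same shape, not the report's actual code. UserDetails, FetchUserDetailsAsync, ApiBatchLimit and UserReportPipeline are hypothetical names, the external API is simulated with Task.Delay, and the MongoDB/Redis lookups are left out. The record type needs C# 9 or later, and the System.Threading.Tasks.Dataflow package must be referenced.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical shape of what the external API returns for one user.
public record UserDetails(string Name, string Info);

public static class UserReportPipeline
{
    // Maximum number of names the (hypothetical) API accepts per call.
    public const int ApiBatchLimit = 300;

    // Hypothetical stand-in for the external API; the real one would be an HTTP request.
    public static async Task<UserDetails[]> FetchUserDetailsAsync(string[] names)
    {
        if (names.Length > ApiBatchLimit)
            throw new ArgumentException($"The API accepts at most {ApiBatchLimit} names per call.");
        await Task.Delay(1); // simulated network latency
        return names.Select(n => new UserDetails(n, $"details for {n}")).ToArray();
    }

    // BatchBlock (300 names) -> TransformBlock (one API call per batch) -> ActionBlock (final step).
    // Returns how many API calls were made and how many users reached the final stage.
    public static async Task<(int ApiCalls, int UsersProcessed)> RunAsync(IEnumerable<string> names)
    {
        int apiCalls = 0;
        var processed = new ConcurrentBag<UserDetails>();
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var batchBlock = new BatchBlock<string>(ApiBatchLimit);

        var fetchBlock = new TransformBlock<string[], UserDetails[]>(async batch =>
        {
            Interlocked.Increment(ref apiCalls);
            // Lookups that depend on the API result (e.g. MongoDB/Redis) could also be awaited here.
            return await FetchUserDetailsAsync(batch);
        });

        var finalBlock = new ActionBlock<UserDetails[]>(details =>
        {
            // Final combine/transform step; this sketch only collects the results.
            foreach (var d in details) processed.Add(d);
        });

        batchBlock.LinkTo(fetchBlock, linkOptions);
        fetchBlock.LinkTo(finalBlock, linkOptions);

        foreach (var name in names)
            await batchBlock.SendAsync(name);

        batchBlock.Complete();       // emits the last, partial batch, then completes
        await finalBlock.Completion; // completion (or a fault) flows down the links
        return (apiCalls, processed.Count);
    }
}
```

With 50,000 names this makes 167 API calls. Since the TransformBlock uses the default MaxDegreeOfParallelism of 1, batches are sent to the API one at a time; PropagateCompletion is what lets a single Complete() on the BatchBlock shut the whole pipeline down in order.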
There can be more than one way to design a pipeline that produces the desired result for the same problem. To design the best-optimized solution, you need to study context switching, parallel tasks and multithreading in more depth.
Check out my other articles for an overview of the Task Parallel Library and async design patterns.
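One knob that matters when tuning a pipeline like this is ExecutionDataflowBlockOptions. The sketch below is illustrative (ThrottlingDemo is a hypothetical name, and the work is simulated with Task.Delay). It caps how many delegate invocations run at once with MaxDegreeOfParallelism and limits how many items a block buffers with BoundedCapacity, which is one way to keep a CPU-heavy stage from processing everything at once.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottlingDemo
{
    // Processes itemCount items with at most maxParallel delegate invocations
    // in flight, and returns the highest concurrency actually observed.
    public static async Task<int> RunAsync(int itemCount, int maxParallel)
    {
        int current = 0, peak = 0;

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallel, // default is 1 (sequential)
            BoundedCapacity = maxParallel * 2     // the block holds at most this many items
        };

        var block = new ActionBlock<int>(async _ =>
        {
            int now = Interlocked.Increment(ref current);
            // Record the peak with a compare-and-swap loop.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
            await Task.Delay(5); // simulated work
            Interlocked.Decrement(ref current);
        }, options);

        for (int i = 0; i < itemCount; i++)
        {
            // With BoundedCapacity set, Post returns false (rejecting the item) when
            // the block is full; SendAsync waits until there is room instead.
            await block.SendAsync(i);
        }

        block.Complete();
        await block.Completion;
        return peak;
    }
}
```

The observed peak never exceeds maxParallel; with maxParallel set to 1 it is always 1. A bounded capacity also gives back-pressure: a fast producer slows down instead of queuing all 50,000 items in memory.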