“TPL Dataflow is an interesting mix of asynchronous and parallel technologies. It’s useful when you have a sequence of processes that need to be applied to your data.”
— Stephen Cleary, Concurrency in C# Cookbook
I recently needed to traverse a recursive category tree — starting with ~20 root nodes, each with an unknown number of subcategories (and sub-subcategories, and so on).
This kind of dynamically expanding workload doesn’t play well with static parallelization tools like Parallel.ForEach.
If we run Parallel.ForEach on the 20 root categories:
- Thread 1 might get a small branch and finish in 1 second.
- Thread 2 might get a massive branch and take 10 minutes.
Even with MaxDegreeOfParallelism = 50, most threads finish early and go idle — while a few get stuck processing deep, heavy trees. Load imbalance and wasted resources.
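To make the imbalance concrete, here is a minimal sketch (the `Branch` type and helper names are illustrative, not from the original code): Parallel.ForEach only partitions the *root* items, so everything beneath a root stays on whichever worker picked that root up.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the article's category nodes.
public class Branch
{
    public List<Branch> Children { get; } = new();
}

public static class StaticPartitionDemo
{
    private static int _processed;

    // Everything under a root is walked by whichever worker picked the
    // root up, so one deep branch keeps a single thread busy while the
    // rest of the pool sits idle.
    private static void ProcessBranch(Branch node)
    {
        Interlocked.Increment(ref _processed);
        foreach (var child in node.Children)
            ProcessBranch(child); // recursion never returns work to the pool
    }

    public static int Run(IEnumerable<Branch> roots, int maxDop)
    {
        _processed = 0;
        Parallel.ForEach(
            roots,
            new ParallelOptions { MaxDegreeOfParallelism = maxDop },
            ProcessBranch);
        return _processed;
    }
}
```

No matter how high `maxDop` is, a single deep chain of `Branch` nodes is still serialized onto one worker.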
The Solution: ActionBlock<T> as a Dynamic Work Queue
ActionBlock<T> from TPL Dataflow provides a centralized, thread-safe queue with dynamic task generation and controlled concurrency.
Here’s how it works:
- Create one ActionBlock<CategoryNode> with a fixed concurrency limit.
- “Prime” it with the initial root categories.
- Each worker processes its node, finds subcategories, and posts them back into the same ActionBlock.
- The block keeps running until all items (current + pending) are done.
This pattern is effectively a recursive, self-balancing queue — all threads stay busy until the entire tree is processed.
Example Implementation
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class MultithreadTreeParser
{
    private int _activeItems;
    private ActionBlock<CategoryNode> _actionBlock = null!;

    public async Task<int> StartAsync()
    {
        var rootNode = await GetDepartmentsRootNodeAsync()
            ?? throw new Exception("Failed to get root node");

        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

        _actionBlock = new ActionBlock<CategoryNode>(async node =>
        {
            try
            {
                await ProcessCategoryAsync(node);
                var subs = node.SubCategories;
                if (subs.Count > 0)
                {
                    // Register the children before posting them so the
                    // in-flight count can never prematurely reach zero.
                    Interlocked.Add(ref _activeItems, subs.Count);
                    foreach (var sub in subs)
                        await _actionBlock.SendAsync(sub);
                }
            }
            finally
            {
                var remaining = Interlocked.Decrement(ref _activeItems);
                if (remaining == 0)
                    _actionBlock.Complete(); // nothing queued, nothing running
            }
        }, options);

        var rootSubs = rootNode.SubCategories;
        Interlocked.Add(ref _activeItems, rootSubs.Count);
        foreach (var sub in rootSubs)
            await _actionBlock.SendAsync(sub);

        if (rootSubs.Count == 0)
            _actionBlock.Complete(); // empty tree: complete immediately instead of hanging

        await _actionBlock.Completion;
        return 0;
    }
}
```
Why This Works
- Dynamic load balancing: Workers pull from a shared queue. As soon as they finish, they grab the next available node.
- Recursive workload expansion: Each task can add new tasks safely to the same queue.
- Controlled parallelism: MaxDegreeOfParallelism keeps resource usage in check.
- Clean completion tracking: _activeItems counts in-flight work — when it hits zero, the pipeline gracefully completes.
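The completion-tracking idea in that last bullet is independent of Dataflow itself. Here is a self-contained sketch of the same recursive, self-balancing queue built on System.Threading.Channels instead of an ActionBlock (the `Node` type, `CountAsync` name, and worker count are illustrative): every post increments an in-flight counter, and the decrement that reaches zero closes the channel, draining all workers.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical node type mirroring the article's CategoryNode.
public class Node
{
    public List<Node> Children { get; } = new();
}

public static class TreeWalker
{
    public static async Task<int> CountAsync(IEnumerable<Node> roots, int workers)
    {
        var channel = Channel.CreateUnbounded<Node>();
        int active = 0, processed = 0;

        void Post(Node n)
        {
            Interlocked.Increment(ref active);  // register before enqueueing
            channel.Writer.TryWrite(n);         // unbounded: always succeeds
        }

        foreach (var r in roots) Post(r);
        if (active == 0)
            channel.Writer.Complete();          // empty input: finish immediately

        var tasks = new Task[workers];
        for (int i = 0; i < workers; i++)
        {
            tasks[i] = Task.Run(async () =>
            {
                await foreach (var node in channel.Reader.ReadAllAsync())
                {
                    Interlocked.Increment(ref processed);
                    foreach (var child in node.Children)
                        Post(child);            // children go back into the shared queue
                    // The decrement that hits zero means nothing is queued
                    // or running, so close the channel and end all readers.
                    if (Interlocked.Decrement(ref active) == 0)
                        channel.Writer.Complete();
                }
            });
        }

        await Task.WhenAll(tasks);
        return processed;
    }
}
```

The counting discipline is the same as in the ActionBlock version: children are added to the counter *before* the current node is subtracted, so the counter can only reach zero when the whole tree is done.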
Source: DEV Community.
