Creating a Throttle with an ActionBlock – Addendum (Cancelling)

In my previous post I described how to create a throttle with an action block so you wouldn’t have too many tasks running simultaneously. But what if you want to cancel the tasks?

In our use case, we have a hard limit of 2 minutes to complete the work (or as much of it as possible). A typical run takes about 30-40 seconds. Sometimes, due to network or database issues, we can’t complete everything in that time, so we have to stop what we’re doing and come back later – hopefully things will be better then and we can complete our run.

So, we need to tell the ActionBlock to stop processing tasks. To do this we pass it a CancellationToken. When we’ve finished posting work items to the ActionBlock we tell the CancellationTokenSource to cancel after a set time. We also check the cancellation token from within our task for the cancelled state and exit at appropriately safe points.

// Before setting up the ActionBlock create a CancellationTokenSource
CancellationTokenSource cts = new CancellationTokenSource();

// Set up the ActionBlock with the CancellationToken passed in the options
ActionBlock<int> throttle = new ActionBlock<int>(
    action: i=>DoStuff(i),
    dataflowBlockOptions: new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 3,
        CancellationToken = cts.Token
    });

// ...Other code  to post work items to the action block...

// After posting the work items, set the timeout.
cts.CancelAfter(2000); // milliseconds; 2 seconds for this demo

// Wrap the await up to catch the cancellation
Task completionTask = throttle.Completion;
try
{
    await completionTask;
}
catch (TaskCanceledException e)
{
    Console.WriteLine(e);
}

The code is available on GitHub: https://github.com/colinangusmackay/ActionBlockThrottle/tree/master/src/04.CancellingTasksInTheActionBlock
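
The snippet above doesn’t show the in-task check mentioned earlier. A minimal sketch of what that part might look like (assuming DoStuff can see the token, e.g. via a field or an extra parameter – this wiring is my own illustration, not the code in the repository):

static void DoStuff(int data, CancellationToken token)
{
    // Bail out at a safe point before starting any expensive work.
    if (token.IsCancellationRequested)
        return;

    // ... do a chunk of work ...

    // Check again between chunks so cancellation takes effect quickly.
    if (token.IsCancellationRequested)
        return;

    // ... do the remaining work ...
}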

Things to watch for

If you start your timer (when you call cts.CancelAfter(...)) before you’ve posted your work items, it is possible for the cancellation to trigger before all the work items have been posted. In that case you should check the cancellation token as you’re posting your work items, as sketched below; otherwise you will waste time posting work items that will never be processed.
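
A minimal sketch of guarding the posting loop (my own illustration, reusing the cts and throttle from above):

// Stop posting as soon as cancellation has been requested;
// anything posted after this point would never be processed.
for (int i = 0; i < work.Length; i++)
{
    if (cts.Token.IsCancellationRequested)
        break;
    throttle.Post(i);
}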

Creating a Throttle with ActionBlock

We have an application that needs to perform a repetitive task on many external services and record then aggregate the results. As the system has grown, the number of external systems has increased, which caused some issues: we originally just created a number of tasks and waited on them all completing, and because all these tasks were launched near-simultaneously they overwhelmed various things. We needed a way to throttle the work, so we used an ActionBlock, part of the Task Parallel Library’s System.Threading.Tasks.Dataflow package.

Basic setup

I’ve created a little application that does some “work” (by sleeping for random periods of up to 255ms). It looks like this:

using System;
using System.Threading;

class Program
{
    private static byte[] work = new byte[100];
    static void Main(string[] args)
    {
        new Random().NextBytes(work);
        for (int i = 0; i < work.Length; i++)
        {
            DoStuff(i);
        }
        Console.WriteLine("All done!");
        Console.ReadLine();
    }

    static void DoStuff(int data)
    {
        int wait = work[data];
        Console.WriteLine($"{data:D3} : Work will take {wait}ms");
        Thread.Sleep(wait);
    }
}

Also available on GitHub: https://github.com/colinangusmackay/ActionBlockThrottle/tree/master/src/00.BasicSerialImplementation

This is the very basic application that I’ll be parallelising.

A simple ActionBlock

Here is the same program, but with the work wrapped in an ActionBlock. It is a bit more complex, and currently for little extra benefit, as we’ve not done anything to parallelise it yet.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static byte[] work = new byte[100];
    static async Task Main(string[] args)
    {
        new Random().NextBytes(work);

        // Define the throttle
        var throttle = new ActionBlock<int>(i=>DoStuff(i));
        
        // Create the work set.
        for (int i = 0; i < work.Length; i++)
        {
            throttle.Post(i);
        }

        // indicate that there is no more work 
        throttle.Complete();

        // Wait for the work to complete.
        await throttle.Completion;

        Console.WriteLine("All done!");
        Console.ReadLine();
    }

    static void DoStuff(int data)
    {
        int wait = work[data];
        Console.WriteLine($"{data:D3} : Work will take {wait}ms");
        Thread.Sleep(wait);
    }
}

Also available on GitHub: https://github.com/colinangusmackay/ActionBlockThrottle/tree/master/src/01.SimpleActionBlock

This does the same as the first version: by default, an ActionBlock does not parallelise any of the processing of the work. All the work is still processed sequentially.

The producer and consumer run in parallel

I said before this is for “little extra benefit”. So I should explain what I mean by that. There is now some parallelisation between the producer and the consumer portions. The for loop that contains the throttle.Post(...) (the producer) is running in parallel with the calls to DoStuff() (the consumer). You can see this if you slow down the producer and introduce some Console.WriteLine(...) statements to see things in action.
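
For example, the posting loop from the previous version can be slowed down and instrumented roughly like this (a sketch; the 50ms delay matches the run shown below):

// Create the work set, slowly, reporting each post.
for (int i = 0; i < work.Length; i++)
{
    Console.WriteLine($"{i:D3} : Posting Work Item {i}.");
    throttle.Post(i);
    Thread.Sleep(50); // slow the producer so the overlap is visible
}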

This is some example output from that version of the code.

000 : Posting Work Item 0.
000 : Work will take 32ms
001 : Posting Work Item 1.
001 : Work will take 179ms
002 : Posting Work Item 2.
003 : Posting Work Item 3.
004 : Posting Work Item 4.
002 : Work will take 28ms
005 : Posting Work Item 5.
003 : Work will take 9ms
004 : Work will take 100ms
006 : Posting Work Item 6.
007 : Posting Work Item 7.
005 : Work will take 109ms
008 : Posting Work Item 8.
009 : Posting Work Item 9.

I slowed the producer by introducing a wait of 50ms between posting items. As you can see, in the time it took to post ten items (the numbering is zero-based) only six items had been processed. The producer and consumer are running simultaneously, so processing does not wait for the producer to complete before it starts on the items.

Available on GitHub: https://github.com/colinangusmackay/ActionBlockThrottle/blob/master/src/02.SimpleActionBlockShowingProducerConsumer

Setting the Throttle

Finally, we get to the point that we can set some sort of throttle. In our use case we had a lot of work to do, most of which was actually waiting for external systems to respond, but if we threw everything in at once it would be overwhelmed.

Now we can set up some parallelism. The ActionBlock can take some options in the form of an ExecutionDataflowBlockOptions object. It has many options, but the one we’re interested in is MaxDegreeOfParallelism. The creation of the action block now looks like this:

ActionBlock<int> throttle = new ActionBlock<int>(
    action: i=>DoStuff(i),
    dataflowBlockOptions: new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 3
    });

In our example, we’re just going to set it to 3 for demonstration purposes, but you’ll likely want to experiment to see where you get the best results.

In the example application, I also added a small counter (tasksInProgress) to keep a count of the number of active tasks, and included it in the Console.WriteLine(...) in the DoStuff(...) method.
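
A sketch of that change (the Interlocked usage is my assumption; the version in the repository may differ):

private static int tasksInProgress;

static void DoStuff(int data)
{
    // Count how many work items are in flight at once.
    int tip = Interlocked.Increment(ref tasksInProgress);
    int wait = work[data];
    Console.WriteLine($"{data:D3} : [TIP:{tip}] Work will take {wait}ms");
    Thread.Sleep(wait);
    Interlocked.Decrement(ref tasksInProgress);
}

The output looks like this: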

000 : Posting Work Item 0.
000 : [TIP:1] Work will take 34ms
001 : Posting Work Item 1.
001 : [TIP:2] Work will take 216ms
002 : Posting Work Item 2.
002 : [TIP:2] Work will take 177ms
003 : Posting Work Item 3.
003 : [TIP:3] Work will take 183ms
004 : Posting Work Item 4.
005 : Posting Work Item 5.
006 : Posting Work Item 6.
007 : Posting Work Item 7.
008 : Posting Work Item 8.
004 : [TIP:3] Work will take 15ms
009 : Posting Work Item 9.
005 : [TIP:3] Work will take 57ms
006 : [TIP:3] Work will take 85ms
010 : Posting Work Item 10.
... etc...

You can see at the start the number of simultaneously running tasks builds up to the MaxDegreeOfParallelism value that was set. So long as the producer part is producing work items faster than the consumer can consume them, the tasks in progress (TIP) will stay at or close to the MaxDegreeOfParallelism.

Code available on GitHub: https://github.com/colinangusmackay/ActionBlockThrottle/tree/master/src/03.MaxDegreesOfParallelismAsAThrottle

But why is the iterator operating in multiple threads?

Background

Recently, I had a bit of a problem with NHibernate when I was converting some code into parallel tasks. (If you have no interest in NHibernate, then don’t worry – it is just background to the issue I was having when I spotted this gulf between my expectation and reality. NHibernate is incidental to this and I won’t mention it much beyond this paragraph.) It turns out that Parallel.ForEach runs the iterator in multiple threads, not just the action it performs on each item received from the iterator. NHibernate, being the source of the data, was running inside the iterator, and when I attached NHibernate Profiler to see what it could turn up, it very quickly began reporting that the NHibernate session was running in multiple threads and that NHibernate is not designed to be thread safe.

The Iterator Pattern in .NET

In .NET the iterator pattern is exposed via an IEnumerator or IEnumerator<T> and there is some syntactic sugar so that you can create an iterator method using yield return. There is also syntactic sugar surrounding the consumption of iterators via foreach. This almost completely hides the complexities of IEnumerator implementations.
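
For example, this iterator method and loop hide all the MoveNext()/Current machinery (a trivial illustration of the sugar, not code from the original project):

// The compiler generates the IEnumerator<int> implementation for us.
static IEnumerable<int> FirstThree()
{
    yield return 1;
    yield return 2;
    yield return 3;
}

// foreach hides the calls to MoveNext() and Current.
foreach (int n in FirstThree())
    Console.WriteLine(n);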

There are some limitations to this. The interface is inherently not thread safe, as it does not provide an atomic operation that retrieves an element and moves the internal pointer on to the next. You have to call MoveNext() and then, if it returned true, read Current. If the iterator needs thread-safety, it is the responsibility of the caller to provide it.
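
To make that concrete, a caller sharing an enumerator across threads would have to make the MoveNext()/Current pair atomic itself, along these lines (a sketch of the caller’s responsibility, not library code):

// Returns false when the sequence is exhausted; the lock makes the
// MoveNext()/Current pair atomic for callers on multiple threads.
static bool TryGetNext<T>(IEnumerator<T> enumerator, object gate, out T item)
{
    lock (gate)
    {
        if (enumerator.MoveNext())
        {
            item = enumerator.Current;
            return true;
        }
    }
    item = default(T);
    return false;
}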

But, then this happens…

Knowing this, I would have assumed (always a bad idea, but I’m only human) that Parallel.ForEach() operates over the iterator in a single thread but farms out each item’s processing to different threads. I was wrong. Try the following code for yourself and see what happens:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    public static void Main(string[] args)
    {
        Parallel.ForEach(
            YieldedNumbers(),
            (n) => { Thread.Sleep(n); });
        Console.WriteLine("Done!");
        Console.ReadLine();
    }

    public static IEnumerable<int> YieldedNumbers()
    {
        Random rnd = new Random();
        int lastKnownThread = Thread.CurrentThread.ManagedThreadId;
        int detectedSwitches = 0;
        for (int i = 0; i < 1000; i++)
        {
            int currentThread = Thread.CurrentThread.ManagedThreadId;
            if (lastKnownThread != currentThread)
            {
                detectedSwitches++;
                Console.WriteLine(
                    $"{detectedSwitches}: Last known thread ({lastKnownThread}) is not the same as the current thread ({currentThread}).");
                lastKnownThread = currentThread;
            }
            yield return rnd.Next(10,150);
        }
    }
}

The Action<int> passed to the Parallel.ForEach simply simulates some work being done (and the times sent to the Thread.Sleep() are roughly analogous to the times of the tasks in the original project).

What I’ve done here also is detect when the thread changes and report that to the console. It happens on roughly 15%-18% of the iterations on the runs I’ve made on my machine. Now that was surprising (well, not really, because NHibernate Profiler had already told me – but having such a clean example of the same behaviour was). I can’t blame any weirdness in third-party libraries: it happens with some very basic .NET code in a console application.

Possible Solutions

1. My first thought was to dump all the data retrieved from the iterator into a collection of some sort (e.g. an array or list), but the iterator was originally put in place because the volume of data was causing memory pressure. The app runs overnight and will process anything from a few hundred to a few hundred thousand customers; testing found that it slowed down significantly around the 7000 mark because of the size of the data, and fell over completely not far past that. The iterator I created hides the fact that I now page the data: the calling code knows nothing about the paging and didn’t have to be modified. So that solution was out of the question; we’d be back to the problem we had a while ago.

2. The data could be processed in batches, with each fully retrieved batch run in parallel one at a time. I did try that, but it made the calling code difficult to read and more complex than it needed to be. The reader has to understand why there are batches, and the person writing the code has to remember that the data may not fit an exact number of batches and must process the final batch outside the loop, which adds to the cognitive load on the reader/maintainer.

public static void Main(string[] args)
{
    int batchSize = 97;
List<int> batch = new List<int>();
    foreach (int item in YieldedNumbers())
    {
        batch.Add(item);
        if (batch.Count >= batchSize)
            ProcessBatch(batch);
    }
ProcessBatch(batch); // process the final (possibly partial) batch

    Console.WriteLine("Done!");
    Console.ReadLine();
}

private static int batchCount = 0;
private static void ProcessBatch(List<int> batch)
{
    batchCount ++;
    Console.WriteLine($"Processing batch {batchCount} containing {batch.Count} items");
    Parallel.ForEach(batch, (n) => { Thread.Sleep(n); });
    batch.Clear();
}

// The YieldedNumbers() method is unchanged from before.

With this setup the iterator is always called from a single thread, and therefore it never complains.

3. Use the TPL Dataflow library (the System.Threading.Tasks.Dataflow package for the Task Parallel Library). Personally, I think this one is best because the pattern is clear and the complex bits can be moved away from the main algorithm. The only part I didn’t like was the effort needed to set up the producer/consumer pattern using this library, but it handles all the bits I want to abstract away quite nicely – and that setup can be extracted out later. Here’s the basic algorithm.

public static void Main(string[] args)
{
    var producerOptions = new DataflowBlockOptions { BoundedCapacity = 97 };
    var buffer = new BufferBlock<int>(producerOptions);
    var consumerOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = Environment.ProcessorCount,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    var consumer = new ActionBlock<int>( n=> {  Thread.Sleep(n); }, consumerOptions);
    buffer.LinkTo(consumer, linkOptions);
    Produce(buffer);
    Task.WaitAll(consumer.Completion);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

private static void Produce(ITargetBlock<int> target)
{
    foreach (var n in YieldedNumbers())
    {
        // Normally, this will return immediately, but if the queue has
        // reached its limit then it will wait until the consumer has
        // processed items on the queue.
        Task.WaitAll(target.SendAsync(n));
    }
    // Set the target to the completed state to signal to the consumer
    // that no more data will be available.
    target.Complete();
}

I originally had the Produce() method as an async/await method, but that didn’t work: the iterator shifted between threads again, because when the code wakes up after an await it may resume on a different thread. So I put it back to a simple Task.WaitAll() and everything stayed on the same thread.
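
For reference, the shape of the async version I’m describing looked roughly like this (a reconstruction, not the original code):

// After each await the method may resume on a different thread-pool
// thread, and the foreach (and so the iterator) moves with it.
private static async Task ProduceAsync(ITargetBlock<int> target)
{
    foreach (var n in YieldedNumbers())
    {
        await target.SendAsync(n);
    }
    target.Complete();
}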

The producer options limit the queue size: if the queue reaches capacity, SendAsync does not complete until the consumer has processed some items, so the producer won’t over-produce and the app keeps running smoothly.

The consumer options need to be set explicitly, otherwise it acts on a single thread. Unlike some other parts of the TPL, it won’t automatically optimise for the number of cores you have; you have to specify that, and a crude rule of thumb for the number is Environment.ProcessorCount (crude because, with hyper-threading, it reports logical rather than physical cores). However, it is good enough unless you really need to tune things accurately.

Now, a lot of this can be abstracted away so that the calling code can just get on with what it needs without the distractions that this pattern introduces.

Most of this code can be extracted out to an extension method on IEnumerable<T>:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class IEnumerableExtensions
{
    public static void ConsumeInParallel<T>(this IEnumerable<T> source, Action<T> action, int queueLimit = int.MaxValue)
    {
        var producerOptions = new DataflowBlockOptions { BoundedCapacity = queueLimit };
        var buffer = new BufferBlock<T>(producerOptions);
        var consumerOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = Environment.ProcessorCount,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var consumer = new ActionBlock<T>(action, consumerOptions);
        buffer.LinkTo(consumer, linkOptions);
        Produce(source, buffer);
        Task.WaitAll(consumer.Completion);
    }

    private static void Produce<T>(IEnumerable<T> source, ITargetBlock<T> target)
    {
        foreach (var n in source)
            Task.WaitAll(target.SendAsync(n));
        target.Complete();
    }
}

With this, we can use any IEnumerable<T> as a source of data and it will happily process it. The queueLimit ensures that we don’t end up with too much data waiting to be processed, as we don’t want memory pressure making the app unstable.

The calling code now looks much neater:

public static void Main(string[] args)
{
    YieldedNumbers().ConsumeInParallel(n=> {Thread.Sleep(n);}, 97);

    Console.WriteLine("Done!");
    Console.ReadLine();
}