Parallelisation Talk examples – Cancelling Tasks

This example shows what happens when tasks are cancelled. Some tasks will run to completion and others will be cancelled. Others won't even get a chance to start, because the cancellation token was signalled before they could start.

Here is the code for the cancellation example shown in the talk:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        const int numTasks = 9;

        // Set up the cancellation source and get the token.
        CancellationTokenSource tokenSource = new CancellationTokenSource();
        CancellationToken token = tokenSource.Token;

        // Set up the tasks
        Task[] tasks = new Task[numTasks];
        for (int i = 0; i < numTasks; i++)
            tasks[i] = Task.Factory.StartNew(() => PerformTask(token), token);

        // Now the tasks are all set up, show the state.
        // Most will be WaitingToRun, some will be Running
        foreach (Task t in tasks.OrderBy(t => t.Id))
            Console.WriteLine("Tasks {0} state: {1}", t.Id, t.Status);

        // Give some of the tasks a chance to do something.
        Thread.Sleep(1500);

        // Cancel the tasks
        Console.WriteLine("Cancelling tasks");
        tokenSource.Cancel();
        Console.WriteLine("Cancellation Signalled");

        try
        {
            // Wait for the tasks to cancel if they've not already completed
            Task.WaitAll(tasks);
        }
        catch (AggregateException aex)
        {
            aex.Handle(ex =>
            {
                // Handle the cancelled tasks
                TaskCanceledException tcex = ex as TaskCanceledException;
                if (tcex != null)
                {
                    Console.WriteLine("Handling cancellation of task {0}", tcex.Task.Id);
                    return true;
                }

                // Not handling any other types of exception.
                return false;
            });
        }

        // Show the state of each of the tasks.
        // Some will be RanToCompletion, others will be Cancelled.
        foreach(Task t in tasks.OrderBy(t => t.Id))
            Console.WriteLine("Tasks {0} state: {1}", t.Id, t.Status);


        Console.WriteLine("Program End");
        Console.ReadLine();
    }

    static void PerformTask(CancellationToken token)
    {
        try
        {
            // The loop simulates work that can be cooperatively cancelled.
            Console.WriteLine("Task {0}: Starting", Task.CurrentId);
            for (int i = 0; i < 4; i++)
            {
                // Check for the cancellation to be signalled
                token.ThrowIfCancellationRequested();

                // Write out a little bit showing the progress of the task
                Console.WriteLine("Task {0}: {1}/4 In progress", Task.CurrentId, i + 1);
                Thread.Sleep(500); // Simulate doing some work
            }
            // By getting here the task will RunToCompletion even if the
            // token has been signalled.
            Console.WriteLine("Task {0}: Finished", Task.CurrentId);
        }
        catch (OperationCanceledException)
        {
            // Any clean up code goes here.
            Console.WriteLine("Task {0}: Cancelling", Task.CurrentId);
            throw; // To ensure that the calling code knows the task was cancelled.
        }
        catch(Exception)
        {
            // Clean up other stuff
            throw; // If the calling code also needs to know.
        }
    }
}


Here is the output of the program (your results may vary):

Task 1: Starting
Task 1: 1/4 In progress
Task 2: Starting
Task 2: 1/4 In progress
Tasks 1 state: Running
Task 3: Starting
Task 3: 1/4 In progress
Tasks 2 state: Running
Task 4: Starting
Task 4: 1/4 In progress
Tasks 3 state: Running
Tasks 4 state: Running
Tasks 5 state: WaitingToRun
Tasks 6 state: WaitingToRun
Tasks 7 state: WaitingToRun
Tasks 8 state: WaitingToRun
Tasks 9 state: WaitingToRun
Task 1: 2/4 In progress
Task 2: 2/4 In progress
Task 3: 2/4 In progress
Task 4: 2/4 In progress
Task 1: 3/4 In progress
Task 2: 3/4 In progress
Task 4: 3/4 In progress
Task 3: 3/4 In progress
Task 1: 4/4 In progress
Task 2: 4/4 In progress
Task 4: 4/4 In progress
Task 3: 4/4 In progress
Task 5: Starting
Task 5: 1/4 In progress

Up to this point the tasks have been given a chance to operate normally. The tasks that have started are writing their progress to the console. The main thread reports on the state of the tasks, showing that tasks 1 to 4 are Running while the remainder are WaitingToRun. After a while the scheduler decides to start task 5.

Next the tasks are going to be cancelled.

Cancelling tasks
Cancellation Signalled
Task 1: Finished
Task 2: Finished
Task 4: Finished
Task 3: Finished
Task 5: Cancelling

When the cancellation token is signalled, the tasks have to cooperate. Tasks 1 to 4 are too far gone: each has already passed its last call to ThrowIfCancellationRequested, so each runs to completion. Task 5 had only just started. It cooperates with the cancellation request and writes that it is cancelling. No waiting tasks are started.

Meanwhile, the main thread is blocked in WaitAll until every task has either finished or cooperated with the cancellation request. WaitAll then throws an AggregateException because some tasks were cancelled, and the program handles the cancelled tasks in the catch block.

Handling cancellation of task 9
Handling cancellation of task 8
Handling cancellation of task 7
Handling cancellation of task 6
Handling cancellation of task 5

Tasks 6 to 9 never got a chance to start. Task 5 did start, but was then cancelled. Its cancellation can therefore be handled both inside the task and outside it. Different clean up may be required in each place.
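Tasks 6 to 9 end up Canceled without ever running. A task that was given a cancellation token checks it before invoking its delegate. The sketch below isolates that case by signalling the token before the task is even created. The class and method names are hypothetical, made up for this illustration; only the TPL calls are real.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class PreCancelledTaskDemo
{
    // Starts a task whose token is already cancelled.
    // Returns the task's final status and whether its delegate ever ran.
    public static Tuple<TaskStatus, bool> Run()
    {
        bool delegateRan = false;
        using (var tokenSource = new CancellationTokenSource())
        {
            tokenSource.Cancel(); // signal before the task is even created

            Task task = Task.Factory.StartNew(() => { delegateRan = true; }, tokenSource.Token);

            try
            {
                task.Wait(); // throws because the task was cancelled
            }
            catch (AggregateException aex)
            {
                // As in the talk example, the inner exception is a TaskCanceledException.
                aex.Handle(ex => ex is TaskCanceledException);
            }

            return Tuple.Create(task.Status, delegateRan);
        }
    }
}
```

With this setup, you would expect the status to be Canceled and the delegate never to have run. That matches tasks 6 to 9 in the output above.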

Finally, the program lists the end state (See also: Task state transitions) of each of the tasks:

Tasks 1 state: RanToCompletion
Tasks 2 state: RanToCompletion
Tasks 3 state: RanToCompletion
Tasks 4 state: RanToCompletion
Tasks 5 state: Canceled
Tasks 6 state: Canceled
Tasks 7 state: Canceled
Tasks 8 state: Canceled
Tasks 9 state: Canceled
Program End

When writing code to handle cancelled tasks, watch out for this gotcha that can trip you up if you are not careful.

Tip of the day: Splitting a string when encountering whitespace

In .NET the string class has a Split method that splits the string at the separator character(s) that you specify. You might want to split the string at any instance of whitespace, though, and there are actually quite a lot of different types of whitespace! You don't have to create a Split call that enumerates them all. Instead you can just call Split without any parameters, and it will split at any character that char.IsWhiteSpace treats as whitespace, regardless of the type.

For example, the following program, in which I hope I’ve managed to use all the different types of whitespace in Unicode, will produce the output below:

using System;

class Program
{
  static void Main(string[] args)
  {
    string source = "An\u0020inspired\rcalligrapher\ncan\u1680create\u180epages\u2000of\u2001"+
      "beauty\tusing\u2002stick\u2003ink,\u2004quill,\u2005brush,\u2006pick-axe,\u2007buzz\u2008"+
      "saw,\u2009or\u200aeven\u202fstrawberry\u205fjam."+
      Environment.NewLine+
      "The\u3000quick\u2028brown\u2029fox\u0009jumps\u000aover\u000bthe\u000clazy\u000ddog."+
      Environment.NewLine+
      "Whitespace\u0085For\u00a0the win!";

    // Split() with no arguments splits at every whitespace character
    string[] words = source.Split();

    foreach(string word in words)
    {
      Console.WriteLine(word);
    }
  }
}

Produces this output:

An
inspired
calligrapher
can
create
pages
of
beauty
using
stick
ink,
quill,
brush,
pick-axe,
buzz
saw,
or
even
strawberry
jam.

The
quick
brown
fox
jumps
over
the
lazy
dog.

Whitespace
For
the
win!
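The empty lines in that output come from Environment.NewLine, which is "\r\n" on Windows. Two adjacent whitespace characters produce an empty string between them. The sketch below shows that behaviour, and how to drop the empty entries. The class and method names are made up for this illustration. Passing a null separator array together with StringSplitOptions.RemoveEmptyEntries also means "split on whitespace" and discards the empty strings.

```csharp
using System;

static class WhitespaceSplitDemo
{
    // Split() with no arguments splits on every character for which
    // char.IsWhiteSpace returns true. Adjacent separators yield empty strings.
    public static string[] SplitKeepingEmpties(string source)
    {
        return source.Split();
    }

    // A null separator array also means "split on whitespace";
    // RemoveEmptyEntries drops the empty strings between adjacent separators.
    public static string[] SplitDroppingEmpties(string source)
    {
        return source.Split((char[])null, StringSplitOptions.RemoveEmptyEntries);
    }
}
```

For example, "one\r\ntwo\u00a0three" gives four entries with the first method (one of them empty) and three with the second.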

Parallel Loop Anti-pattern

Here’s a quick parallel loop anti-pattern. In other words, don’t do this, it will only make you miserable.

If you want to start tasks in a loop watch out for including the loop variable as a closure to the task body. For example:

Task[] tasks = new Task[20];
for (int i = 0; i < 20; i++)
    tasks[i] = Task.Factory.StartNew(
        () => Console.WriteLine("The loop index is {0}", i));
Task.WaitAll(tasks);

What happens is that the variable i is updated in each loop iteration, and every task captures that same variable rather than its value at the time the task was created. So each task uses the value of i as it is when the task runs. The task does not necessarily run inside the loop; it may run whenever the task scheduler sees fit. By the time the task actually runs, the value of i could have been updated.

In the example program above, my output showed that i was 20 for every single iteration. Incidentally, if you are using i for an indexer you’ll notice that 20 will be out of range for something with 20 elements (which uses indexes 0 to 19) which just adds to the misery.

If you have something like ReSharper installed, it will warn you about "Access to modified closure" and underline the uses of i that are affected.

So, if you must run the body of a loop in parallel, you are much better off using Parallel.For than trying the above.
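There are two ways to avoid the captured loop variable. The first copies i into a variable declared inside the loop body, so each closure captures its own copy. The second uses Parallel.For, which passes the index to the body as a parameter. C# 5 changed foreach so that each iteration gets a fresh variable, but a for loop variable is still shared by every iteration. A sketch of both approaches, with hypothetical class and method names:

```csharp
using System;
using System.Threading.Tasks;

static class ClosureFix
{
    // Each task returns the value it captured, so the caller can see what each task observed.
    public static int[] ValuesSeenWithLocalCopy(int count)
    {
        var tasks = new Task<int>[count];
        for (int i = 0; i < count; i++)
        {
            int index = i; // declared inside the loop body: a new variable for every iteration
            tasks[i] = Task.Factory.StartNew(() => index);
        }
        Task.WaitAll(tasks);

        int[] seen = new int[count];
        for (int i = 0; i < count; i++)
            seen[i] = tasks[i].Result;
        return seen;
    }

    // Parallel.For hands each iteration its own index, so there is nothing shared to capture.
    public static int[] ValuesSeenWithParallelFor(int count)
    {
        int[] seen = new int[count];
        Parallel.For(0, count, i => { seen[i] = i; });
        return seen;
    }
}
```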

Task status state changes

Over the course of the last couple of months I've blogged a lot about the Task Parallel Library and mentioned a number of statuses that a task can have. Nowhere, though, have I shown a handy chart of what those statuses are and how a task transitions from one to another. The green statuses show the normal path that most tasks will take. The purple and red show alternative paths that may be taken.

When a task is first created its status is Created; however, you will almost never see this. This status only exists when you create a task using its constructor. Most of the time you would create tasks with Task.Factory.StartNew(…), which returns a task that has already been started.

When you start a task the status will be WaitingToRun until such time as the task scheduler can actually run the task. For some tasks the transition will be almost instant, for others it will mean a wait while other tasks complete.

From WaitingToRun a task can transition to either Running or Canceled. The latter transition happens if the cancellation token is signalled before the task gets a chance to start.

Once a task is running there are three possible exits for it. The normal exit is for the status to transition to RanToCompletion which means that the task completed without incident. The other two exits require exception handling in the calling code.

If an error happens that the task cannot (or does not) handle internally, then it will transition to Faulted (see Tasks that throw exceptions). The AggregateException thrown by Wait or WaitAll will contain details of all the exceptions in Faulted tasks as part of its InnerExceptions collection.

If the cancellation token is signalled, any task that has not yet started will transition to Canceled. A running task transitions to Canceled only if it cooperates by throwing an OperationCanceledException for that token, for example via ThrowIfCancellationRequested. The AggregateException will contain a TaskCanceledException for every task that was cancelled, whether or not it had a chance to run. Some tasks were already running when the token was signalled and either ran to completion normally or did not respond to the cancellation request. Those tasks are not cancelled; they end up RanToCompletion (see Cancelling parallel tasks).
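The sketch below illustrates two of the transitions described above. A task created with its constructor stays in Created until Start is called. A task whose delegate throws an unhandled exception ends in Faulted. The class and method names are hypothetical; the cancelled path is shown in the cancellation examples.

```csharp
using System;
using System.Threading.Tasks;

static class TaskStatusDemo
{
    // A task built with its constructor sits in Created until Start() is called.
    public static TaskStatus[] ConstructorThenStart()
    {
        var task = new Task(() => { });
        TaskStatus beforeStart = task.Status; // Created
        task.Start();
        task.Wait();
        return new[] { beforeStart, task.Status };
    }

    // A task whose delegate throws, and does not handle the exception, ends Faulted.
    public static TaskStatus UnhandledException()
    {
        Action failingWork = () => { throw new InvalidOperationException("Something went wrong"); };
        Task task = Task.Factory.StartNew(failingWork);
        try
        {
            task.Wait();
        }
        catch (AggregateException)
        {
            // The InvalidOperationException is in the AggregateException's InnerExceptions.
        }
        return task.Status;
    }
}
```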

Cancelling parallel tasks

UPDATE (7-June-2011): The post as it originally appeared had a bug in the code: the catch block in the task caught the wrong exception type. See the Gotcha section at the end for an explanation of why there are two types of exception for this.

I think, to date, I’ve mentioned most of the task lifecycle, but I’ve not talked about cancelling tasks yet. So here goes.

You can cancel tasks, for whatever reason, by passing a cancellation token to them. The task must be cooperative. It must watch the cancellation token to detect whether cancellation has been signalled, and then clean up and exit.

The basic program

So, here is the little example program to demonstrate this:

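using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        const int numTasks = 9;
        Task[] tasks = new Task[numTasks];
        for (int i = 0; i < numTasks; i++) // loop to numTasks so the index stays inside the array
            tasks[i] = Task.Factory.StartNew(PerformTask);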

        Task.WaitAll(tasks);

        foreach(Task t in tasks)
            Console.WriteLine("Tasks {0} state: {1}", t.Id, t.Status);

        Console.WriteLine("Program End");
        Console.ReadLine();
    }

    static void PerformTask()
    {
        Console.WriteLine("Task {0}: Starting", Task.CurrentId);
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine("Task {0}: {1}/3 In progress", Task.CurrentId, i+1);
            Thread.Sleep(500); // Simulate doing some work
        }
        Console.WriteLine("Task {0}: Finished", Task.CurrentId);
    }
}

So far this doesn’t do much. It starts 9 tasks and each runs to completion. Each task’s end state is RanToCompletion.

Setting up the CancellationToken

Now, if we introduce the cancellation token to the task we can cancel the task at some point during its execution. The Main method then gets changed to this:

static void Main(string[] args)
{
    const int numTasks = 9;

    CancellationTokenSource tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;

    Task[] tasks = new Task[numTasks];
    for (int i = 0; i < numTasks; i++)
        tasks[i] = Task.Factory.StartNew(() => PerformTask(token), token);

    Thread.Sleep(1500);
    Console.WriteLine("Cancelling tasks");
    tokenSource.Cancel();
    Console.WriteLine("Cancellation Signalled");

    Task.WaitAll(tasks);

    foreach(Task t in tasks)
        Console.WriteLine("Tasks {0} state: {1}", t.Id, t.Status);


    Console.WriteLine("Program End");
    Console.ReadLine();
}

The PerformTask method now takes a CancellationToken (but doesn't do anything with it yet).

If this code is run, the Task.WaitAll method call will throw an AggregateException containing a TaskCanceledException for each task that had not started when the token was signalled.

Handling cancelled tasks

You therefore have to surround your WaitAll call with a try/catch block, look out for TaskCanceledException objects and handle them as you need (see also: handling AggregateException exceptions). In my example I'm just going to output the fact to the console. The try/catch block looks like this:

try
{
    Task.WaitAll(tasks);
}
catch (AggregateException aex)
{
    aex.Handle(ex =>
    {
        TaskCanceledException tcex = ex as TaskCanceledException;
        if (tcex != null)
        {
            Console.WriteLine("Handling cancellation of task {0}", tcex.Task.Id);
            return true;
        }
        return false;
    });
}

The tasks that were already in progress when cancellation was signalled complete as normal. Signalling the token does not, by itself, stop a task that is already running.

Responding to a cancellation request: IsCancellationRequested

If the tasks are sufficiently short-running, allowing them to complete may be perfectly acceptable.

However, a task may be long-running, or it may be safe to cancel it. In that case you can make the task cooperate and respond to the token being signalled.

The CancellationToken object has a property you can check called IsCancellationRequested. For example:

static void PerformTask(CancellationToken token)
{
    Console.WriteLine("Task {0}: Starting", Task.CurrentId);
    for (int i = 0; i < 4; i++)
    {
        if (token.IsCancellationRequested)
        {
            Console.WriteLine("Task {0}: Cancelling", Task.CurrentId);
            return;
        }
        Console.WriteLine("Task {0}: {1}/4 In progress", Task.CurrentId, i+1);
        Thread.Sleep(500); // Simulate doing some work
    }
    Console.WriteLine("Task {0}: Finished", Task.CurrentId);
}

If you simply exit from your task, like the above example, then the Status of the task will be RanToCompletion as if the task completed normally. If you do not need to know whether a task actually completed or was cancelled then this may be completely acceptable.

Responding to a cancellation request: ThrowIfCancellationRequested

If you need to perform clean up or the calling code needs to know that a task has been cancelled then using the CancellationToken’s ThrowIfCancellationRequested() method may be a better choice.

If you do need to perform clean up inside your task, ensure that the OperationCanceledException is rethrown so that the calling code knows that the task was cancelled.

static void PerformTask(CancellationToken token)
{
    try
    {
        Console.WriteLine("Task {0}: Starting", Task.CurrentId);
        for (int i = 0; i < 4; i++)
        {
            token.ThrowIfCancellationRequested();
            Console.WriteLine("Task {0}: {1}/4 In progress", Task.CurrentId, i + 1);
            Thread.Sleep(500); // Simulate doing some work
        }
        Console.WriteLine("Task {0}: Finished", Task.CurrentId);
    }
    catch (OperationCanceledException)
    {
        // Any clean up code goes here.
        Console.WriteLine("Task {0}: Cancelling", Task.CurrentId);
        throw; // To ensure that the calling code knows the task was cancelled.
    }
    catch(Exception)
    {
        // Clean up other stuff
        throw; // If the calling code also needs to know.
    }
}

Remember that if you allow other exceptions to escape your task then the task’s status will be Faulted.
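The sketch below shows the difference between simply returning and calling ThrowIfCancellationRequested. It starts a task, waits until the task's delegate is definitely running, and only then signals cancellation. A ManualResetEventSlim handles that handshake. The task blocks on the token's WaitHandle as a stand-in for real work. Both are choices made for this illustration, and the class and method names are hypothetical. Returning early should leave the task RanToCompletion. Throwing should leave it Canceled, because the OperationCanceledException carries the same token the task was started with.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CooperativeCancellationDemo
{
    // throwOnCancel = false: the task just returns when it sees the request.
    // throwOnCancel = true:  the task calls ThrowIfCancellationRequested.
    public static TaskStatus Run(bool throwOnCancel)
    {
        using (var tokenSource = new CancellationTokenSource())
        using (var started = new ManualResetEventSlim(false))
        {
            CancellationToken token = tokenSource.Token;

            Task task = Task.Factory.StartNew(() =>
            {
                started.Set();              // tell the caller the delegate is running
                token.WaitHandle.WaitOne(); // stand-in for work: block until cancellation

                if (throwOnCancel)
                    token.ThrowIfCancellationRequested();
                // Otherwise fall through and return normally.
            }, token);

            started.Wait();       // make sure cancellation happens after the task starts
            tokenSource.Cancel();

            try
            {
                task.Wait();
            }
            catch (AggregateException)
            {
                // Only thrown when the task ended Canceled (or Faulted).
            }
            return task.Status;
        }
    }
}
```

The handshake matters. If the token were signalled before the delegate started, the task would go straight to Canceled whichever branch it contained.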

Gotcha!

This section was added on 7-June-2011.

One thing to watch out for is that the exception you get inside the task is different from the exception you get inside the AggregateException outside the task. Normally, you'd expect the exception to be passed through and become one of the InnerExceptions of the AggregateException.

If you want to keep the code consistent and deal with only one exception type for cancelled tasks, you can simply deal with OperationCanceledException throughout, both inside and outside the tasks. It is the base class of TaskCanceledException. Outside the task the exception object is actually a TaskCanceledException.

The advantage of referencing the more specific TaskCanceledException outside the task is that the exception object also contains a reference to the Task that was cancelled. Inside the task the exception that ThrowIfCancellationRequested throws is an OperationCanceledException. It doesn't contain the Task object; however, you are inside the task at this point.

The other point to note is that outside the task, the TaskCanceledException object in the AggregateException object doesn’t contain much of the information you’d expect to find in an Exception object (such as a Stack Trace).
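The gotcha can be observed directly. The sketch below records the type of exception caught inside the task. It also returns the exception found in the AggregateException thrown by Task.WaitAll, so the two can be compared. It uses a ManualResetEventSlim handshake so that cancellation is signalled only after the task is running. The class and method names are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class CancellationGotchaDemo
{
    // Item1: type of exception caught inside the task.
    // Item2: exception found in the AggregateException thrown by Task.WaitAll.
    // Item3: the task itself.
    public static Tuple<Type, Exception, Task> Observe()
    {
        Type insideType = null;
        using (var tokenSource = new CancellationTokenSource())
        using (var started = new ManualResetEventSlim(false))
        {
            CancellationToken token = tokenSource.Token;

            Task task = Task.Factory.StartNew(() =>
            {
                started.Set();
                token.WaitHandle.WaitOne(); // wait until cancellation is signalled
                try
                {
                    token.ThrowIfCancellationRequested();
                }
                catch (OperationCanceledException ex)
                {
                    insideType = ex.GetType(); // record what the task itself sees
                    throw;                     // rethrow so the task ends up Canceled
                }
            }, token);

            started.Wait();
            tokenSource.Cancel();

            try
            {
                Task.WaitAll(task);
            }
            catch (AggregateException aex)
            {
                return Tuple.Create(insideType, aex.InnerExceptions[0], task);
            }
            return Tuple.Create(insideType, (Exception)null, task);
        }
    }
}
```

Inside the task the type should be exactly OperationCanceledException. Outside it, the inner exception should be a TaskCanceledException whose Task property refers to the cancelled task.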

Tip of the day: Mount ISO files as drives

You can download lots of DVD and CD images from the likes of MSDN, and it can become rather annoying having to burn those ISO images to physical CDs or DVDs. However, Virtual Clone Drive mounts an ISO image as a virtual drive, so you don't have to burn physical discs. This saves on the cost of physical discs, and once mounted the image will typically run faster than a CD or DVD drive.

Building messages in parallel

I recently saw some code where the developer was attempting to build up messages inside tasks that were being reported outside of the task.

In a sequential system it is easy enough to do this. You have various options available to you, such as:

  • message += …;
  • StringBuilder
  • Streams

However, in a parallel system these all fall down because you lose control over the sequencing. You can regain some control by using appropriate locks but then you add in bottlenecks around the synchronisation points which is something you want to minimise in a parallel system.

I'll show you what I mean. Each example below attempts to build up a large message containing messages from smaller subroutines. For the moment, let's assume that the exact order of the individual messages is not important. It may be a series of log entries, or a list of errors to correct. The only important thing is that each individual message is not garbled in any way.

The example message is actually just a set of letters and numbers. In the final message each letter must appear 10 times and each number 26 times. Once the tasks have finished, the final message is examined to see what happened.

Sequential Reference code

Here is the code:

using System;

class Program
{
    static void Main(string[] args)
    {
        string result = SequentialReference();

        ShowResult(result);

        Console.WriteLine("Program finished");
        Console.ReadLine();
    }

    private static string SequentialReference()
    {
        string result = string.Empty;

        for(int i=0; i<10; i++)
        {
            for(char c='A'; c<='Z'; c++)
            {
                result += string.Format("{0}{1}", i, c);
            }
            result += Environment.NewLine;
        }

        return result;
    }

    private static void ShowResult(string message)
    {
        // Code to display the message and the
        // results of the tests
    }
}

The code generates the messages, then outputs the results. For the reference sequential code (which is what we want all the results to look like) we get:

0A0B0C0D0E0F0G0H0I0J0K0L0M0N0O0P0Q0R0S0T0U0V0W0X0Y0Z
1A1B1C1D1E1F1G1H1I1J1K1L1M1N1O1P1Q1R1S1T1U1V1W1X1Y1Z
2A2B2C2D2E2F2G2H2I2J2K2L2M2N2O2P2Q2R2S2T2U2V2W2X2Y2Z
3A3B3C3D3E3F3G3H3I3J3K3L3M3N3O3P3Q3R3S3T3U3V3W3X3Y3Z
4A4B4C4D4E4F4G4H4I4J4K4L4M4N4O4P4Q4R4S4T4U4V4W4X4Y4Z
5A5B5C5D5E5F5G5H5I5J5K5L5M5N5O5P5Q5R5S5T5U5V5W5X5Y5Z
6A6B6C6D6E6F6G6H6I6J6K6L6M6N6O6P6Q6R6S6T6U6V6W6X6Y6Z
7A7B7C7D7E7F7G7H7I7J7K7L7M7N7O7P7Q7R7S7T7U7V7W7X7Y7Z
8A8B8C8D8E8F8G8H8I8J8K8L8M8N8O8P8Q8R8S8T8U8V8W8X8Y8Z
9A9B9C9D9E9F9G9H9I9J9K9L9M9N9O9P9Q9R9S9T9U9V9W9X9Y9Z

Does the result contain all the necessary parts?
10 of each letter; 26 of each number
0: 26 occurrences: Pass
1: 26 occurrences: Pass
2: 26 occurrences: Pass
3: 26 occurrences: Pass
4: 26 occurrences: Pass
5: 26 occurrences: Pass
6: 26 occurrences: Pass
7: 26 occurrences: Pass
8: 26 occurrences: Pass
9: 26 occurrences: Pass
A: 10 occurrences: Pass
B: 10 occurrences: Pass
C: 10 occurrences: Pass
D: 10 occurrences: Pass
E: 10 occurrences: Pass
F: 10 occurrences: Pass
G: 10 occurrences: Pass
H: 10 occurrences: Pass
I: 10 occurrences: Pass
J: 10 occurrences: Pass
K: 10 occurrences: Pass
L: 10 occurrences: Pass
M: 10 occurrences: Pass
N: 10 occurrences: Pass
O: 10 occurrences: Pass
P: 10 occurrences: Pass
Q: 10 occurrences: Pass
R: 10 occurrences: Pass
S: 10 occurrences: Pass
T: 10 occurrences: Pass
U: 10 occurrences: Pass
V: 10 occurrences: Pass
W: 10 occurrences: Pass
X: 10 occurrences: Pass
Y: 10 occurrences: Pass
Z: 10 occurrences: Pass
Does the result contain correctly sequenced individual messages?
Each sequence 52 chars; 0A0B0C... 1A1B1C.... etc.
Message 0: PASS - 52 char; PASS - Message content as expected
Message 1: PASS - 52 char; PASS - Message content as expected
Message 2: PASS - 52 char; PASS - Message content as expected
Message 3: PASS - 52 char; PASS - Message content as expected
Message 4: PASS - 52 char; PASS - Message content as expected
Message 5: PASS - 52 char; PASS - Message content as expected
Message 6: PASS - 52 char; PASS - Message content as expected
Message 7: PASS - 52 char; PASS - Message content as expected
Message 8: PASS - 52 char; PASS - Message content as expected
Message 9: PASS - 52 char; PASS - Message content as expected
Program finished
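The body of ShowResult is not shown in the post. Checks that would produce this kind of report might look like the hypothetical sketch below. It counts each digit and letter, then splits the message into lines and compares each line against the expected 52-character sequence. The class and method names, and the decision to ignore empty lines, are assumptions.

```csharp
using System;
using System.Linq;

static class MessageChecks
{
    // Expected content of one message: "0A0B...0Z" for digit 0, and so on.
    public static string ExpectedLine(int digit)
    {
        return string.Concat(Enumerable.Range('A', 26).Select(c => digit.ToString() + (char)c));
    }

    public static int CountOccurrences(string message, char ch)
    {
        return message.Count(c => c == ch);
    }

    // True if every digit appears 26 times, every letter 10 times,
    // and every non-empty line is one complete, correctly sequenced message.
    public static bool IsValid(string message)
    {
        for (char d = '0'; d <= '9'; d++)
            if (CountOccurrences(message, d) != 26) return false;
        for (char c = 'A'; c <= 'Z'; c++)
            if (CountOccurrences(message, c) != 10) return false;

        string[] lines = message.Split(new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);
        if (lines.Length != 10) return false;
        foreach (string line in lines)
        {
            if (line.Length != 52) return false;
            int digit = line[0] - '0';
            if (line != ExpectedLine(digit)) return false;
        }
        return true;
    }
}
```

Because each line is checked against its own leading digit, these checks ignore the order of the messages. That matches the assumption above that ordering is not important.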

String Concatenation in parallel

The first bad parallel example is this one, where the message is built up using string concatenation.  The code is almost identical to the sequential example, except that the for loop is now a Parallel.For and I’ve injected a Sleep to simulate performing other work (such as getting the data necessary to build the messages).

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        string message = StringConcat();

        ShowResult(message);

        Console.WriteLine("Program finished");
        Console.ReadLine();
    }

    private static string StringConcat()
    {
        string result = string.Empty;

        Parallel.For(0, 10,
                        (int i) =>
                            {
                                for (char c = 'A'; c <= 'Z'; c++)
                                {
                                    result += string.Format("{0}{1}",i, c);
                                    Thread.Sleep(15);
                                }
                                result += Environment.NewLine;
                            });

        return result;
    }

    // ShowResult is the same as in the sequential reference code above.
}

And the results are starkly different:

0A2A4A6A8A2B0B6B4B8B2C0C6C8C2D0D6D4D8D2E0E4E8E2F0F4F6F8F2G0G4G6G8G2H0H4H8H2I0I4I
8I2J0J6J4J8J2K0K4K8K2L0L6L4L8L2M0M6M8M2N0N4N8N2O0O4O8O2P0P6P4P8P2Q0Q4Q8Q2R0R6R4R
8R2S0S6S4S8S2T0T4T8T2U0U4U8U2V0V6V4V8V2W0W6W8W2X0X6X8X2Y0Y4Y8Y2Z0Z4Z6Z8Z
3A
1A
5A
7A
9A3B1B5B7B9B3C1C7C9C3D1D5D9D3E1E7E9E3F1F5F7F9F3G1G5G9G3H1H5H9H3I1I7I5I9I3J1J7J9J
3K1K5K7K9K3L1L7L5L9L3M1M5M9M3N1N5N7N9N3O1O5O7O9O3P1P7P9P3Q1Q7Q5Q9Q3R1R7R5R9R1S3S
7S5S9S3T1T5T7T9T3U7U5U9U1V5V9V3W7W9W1X3X7X5X9X3Y1Y7Y5Y9Y1Z7Z9Z

Does the result contain all the necessary parts?
10 of each letter; 26 of each number
0: 26 occurrences: Pass
1: 24 occurrences: Fail
2: 26 occurrences: Pass
3: 24 occurrences: Fail
4: 22 occurrences: Fail
5: 20 occurrences: Fail
6: 16 occurrences: Fail
7: 21 occurrences: Fail
8: 26 occurrences: Pass
9: 26 occurrences: Pass
A: 10 occurrences: Pass
B: 10 occurrences: Pass
C: 8 occurrences: Fail
D: 9 occurrences: Fail
E: 8 occurrences: Fail
F: 10 occurrences: Pass
G: 9 occurrences: Fail
H: 8 occurrences: Fail
I: 9 occurrences: Fail
J: 9 occurrences: Fail
K: 9 occurrences: Fail
L: 10 occurrences: Pass
M: 8 occurrences: Fail
N: 9 occurrences: Fail
O: 9 occurrences: Fail
P: 9 occurrences: Fail
Q: 9 occurrences: Fail
R: 10 occurrences: Pass
S: 10 occurrences: Pass
T: 9 occurrences: Fail
U: 8 occurrences: Fail
V: 8 occurrences: Fail
W: 7 occurrences: Fail
X: 9 occurrences: Fail
Y: 9 occurrences: Fail
Z: 8 occurrences: Fail
Does the result contain correctly sequenced individual messages?
Each sequence 52 chars; 0A0B0C... 1A1B1C.... etc.
Message 0: FAIL - Expected 52 / Got 232 characters
Message 1: FAIL - Expected 52 / Got 2 characters
Message 2: FAIL - Expected 52 / Got 2 characters
Message 3: FAIL - Expected 52 / Got 2 characters
Message 4: FAIL - Expected 52 / Got 2 characters
Message 5: FAIL - Expected 52 / Got 222 characters
Program finished

As you can see, some of it works out… Most of it is a mess!

So what happened?

The string that will contain the result was created outside of the parallel tasks. Inside the tasks the result was updated without any synchronisation in place. Each "result += ..." statement reads the current string, builds a new one and writes it back. Two tasks can therefore read the same intermediate value, and one task's write then replaces the other's. As a result, the tasks could overwrite each other's changes, insert items out of sequence and so on.
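Earlier I mentioned regaining some control with locks. The sketch below does that by wrapping every update of the shared string in a lock; it is not code from the original program, and the class and method names are hypothetical. That should stop updates being lost, so every digit and letter appears the right number of times. The individual messages can still be interleaved, though, because the lock only protects one 2-character fragment at a time, and every task now queues at the same synchronisation point.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class LockedConcat
{
    // Same shape as StringConcat, but every update of the shared string
    // happens inside a lock, so no update is lost.
    public static string Build()
    {
        string result = string.Empty;
        object sync = new object();

        Parallel.For(0, 10, i =>
        {
            for (char c = 'A'; c <= 'Z'; c++)
            {
                lock (sync)
                {
                    result += string.Format("{0}{1}", i, c);
                }
                Thread.Sleep(15); // simulate other work, as in the original example
            }
            lock (sync)
            {
                result += Environment.NewLine;
            }
        });

        return result;
    }
}
```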

I've written before about some of the ways that the Parallel Extensions can help with synchronising data across parallel tasks, for example using a ConcurrentDictionary to help aggregate grouped counts. Perhaps this is an example of where another of the concurrent collections may come in handy. A ConcurrentBag could be used to hold each of the individual completed messages.

A ConcurrentBag is an unordered collection of objects that you can access across multiple threads in a safe way. You can add the same object to the bag as many times as you like. As it is unordered you cannot rely on the objects being retrieved in any particular sequence.

The code that builds the messages now looks like this:

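// Requires: using System; using System.Collections.Concurrent;
// using System.Threading; using System.Threading.Tasks;
private static string ConcurrentBagExample()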
{
    ConcurrentBag<string> bag = new ConcurrentBag<string>();

    Parallel.For(0, 10,
                    (i) =>
                    {
                        string result = string.Empty;
                        for (char c = 'A'; c <= 'Z'; c++)
                        {
                            result += string.Format("{0}{1}", i, c);
                            Thread.Sleep(15);
                        }
                        bag.Add(result);
                    });

    return string.Join(Environment.NewLine, bag);
}

What has changed is that the building of the string has moved inside the task. This means that each task only builds the string for itself. Once it is done, the string is added to the ConcurrentBag. The final string is built outside the parallel tasks: at the end of the method a simple string.Join() pulls together all the messages that have been built up in the ConcurrentBag.

And now the messages are formed correctly. The only difference between the output of this program and that of the sequential reference program [see above] is the ordering of the individual messages:

1A1B1C1D1E1F1G1H1I1J1K1L1M1N1O1P1Q1R1S1T1U1V1W1X1Y1Z
0A0B0C0D0E0F0G0H0I0J0K0L0M0N0O0P0Q0R0S0T0U0V0W0X0Y0Z
3A3B3C3D3E3F3G3H3I3J3K3L3M3N3O3P3Q3R3S3T3U3V3W3X3Y3Z
2A2B2C2D2E2F2G2H2I2J2K2L2M2N2O2P2Q2R2S2T2U2V2W2X2Y2Z
5A5B5C5D5E5F5G5H5I5J5K5L5M5N5O5P5Q5R5S5T5U5V5W5X5Y5Z
4A4B4C4D4E4F4G4H4I4J4K4L4M4N4O4P4Q4R4S4T4U4V4W4X4Y4Z
7A7B7C7D7E7F7G7H7I7J7K7L7M7N7O7P7Q7R7S7T7U7V7W7X7Y7Z
6A6B6C6D6E6F6G6H6I6J6K6L6M6N6O6P6Q6R6S6T6U6V6W6X6Y6Z
9A9B9C9D9E9F9G9H9I9J9K9L9M9N9O9P9Q9R9S9T9U9V9W9X9Y9Z
8A8B8C8D8E8F8G8H8I8J8K8L8M8N8O8P8Q8R8S8T8U8V8W8X8Y8Z
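If the order of the messages did matter, one alternative is to give each iteration its own slot in an array. This is a sketch with hypothetical names, not from the original post. No two iterations write to the same element, so no lock is needed. Calling string.Join over the array reproduces the sequential order.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class OrderedParallelMessages
{
    // Each iteration writes only to its own slot, so no locking is needed,
    // and joining the array afterwards restores the sequential order.
    public static string Build()
    {
        string[] messages = new string[10];

        Parallel.For(0, 10, i =>
        {
            string result = string.Empty;
            for (char c = 'A'; c <= 'Z'; c++)
            {
                result += string.Format("{0}{1}", i, c);
                Thread.Sleep(15); // simulate other work, as in the original example
            }
            messages[i] = result;
        });

        return string.Join(Environment.NewLine, messages);
    }
}
```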

Parallel Tasks and the HttpContext

A few days ago I spotted a question on StackOverflow by someone trying to use a parallel loop in an ASP.NET application. It may have been an ASP.NET MVC application (I don’t recall) but the issue is the same.

This person had some code in a parallel task that was using the HttpContext object. I would be hesitant to use that object in the first instance as I don’t know how thread safe it is. I suspect that since it holds a lot of information about the state of a request/response that it would be quite dangerous to access an instance in many threads.

His main issue was that he was getting null back from HttpContext.Current inside the parallel tasks.

ASP.NET is already multithreaded. It abstracts most of that away so that when you are writing against it you only really ever see the request you are currently dealing with. Many other requests are happening around you, but the framework does its best to shield you from that so that you can write code cleanly. It is also its downfall in some cases.

If you don't realise what the framework is doing for you, then you could very easily fall into a number of traps when you get to the edges of that abstraction. Someone may use HttpContext.Current inside parallel tasks without realising that multiple requests must already be being handled. There must therefore be multiple simultaneous HttpContext objects floating around, each masquerading as the Current context. It can become very difficult to track down bugs if you don't know the constraints of what Current means in this… erm… context.

Ultimately, HttpContext.Current is only available on the thread that you started with in ASP.NET. If you create new threads then it is no longer available unless you explicitly set it yourself.

Handling AggregateExceptions

I’ve written a couple of posts (see also) about how the AggregateException plays its part in exception handling in parallel systems. However, it has another trick up its sleeve when it comes to handling exceptions.

AggregateException has a Handle method that takes a delegate of type Func<Exception, bool>, i.e. it takes an Exception as a parameter and returns a bool. The return value indicates whether the function handled the exception or not.

Here is an example program that shows how it works.

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        try
        {
            DoWork();
        }
        catch(AggregateException aex)
        {
            Console.WriteLine("Handle Remaining Exceptions");
            foreach(Exception ex in aex.InnerExceptions)
            {
                Console.WriteLine("{0}: {1}", ex.GetType().Name, ex.Message);
            }
        }

        Console.ReadLine();
    }

    private static void DoWork()
    {
        const int numTasks = 20;
        Task[] tasks = new Task[numTasks];
        for (int i = 0; i < numTasks; i++)
            tasks[i] = Task.Factory.StartNew(PerformTask);

        Thread.Sleep(2500);

        try
        {
            Task.WaitAll(tasks);
        }
        catch(AggregateException aex)
        {
            Console.WriteLine("AggregateException.Handle...");
            aex.Handle(ex => HandleException(ex));
            Console.WriteLine("Finished handling exceptions."); // This never shows
        }
    }

    public static bool HandleException(Exception ex)
    {
        if (ex is OddException)
        {
            Console.WriteLine("Handling: {0}", ex.Message);
            return true;
        }

        Console.WriteLine("Not handling: {0}", ex.Message);
        return false;
    }

    public static void PerformTask()
    {
        int? id = Task.CurrentId;
        Console.WriteLine("Performing Task {0}", id);

        if (id.Value % 13 == 0)
            throw new TriskaidekaException("Mwaaahaahaahaahaaaaaaaa!");

        if (id.Value % 2 == 1)
            throw new OddException("The task ("+id+") is distinctly odd.");
    }
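}

// Simple custom exception types used by PerformTask.
class OddException : Exception
{
    public OddException(string message) : base(message) { }
}

class TriskaidekaException : Exception
{
    public TriskaidekaException(string message) : base(message) { }
}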

The program starts 20 tasks (DoWork). Each task (PerformTask) simply outputs a line to the console to say what its id is and then may throw an exception, depending on that id. There are two types of exception that it can throw: a TriskaidekaException if the id is a multiple of 13, otherwise an OddException if the id is odd.

Back in the main thread (DoWork) a Sleep statement gives the tasks a chance to get going (and hopefully complete). During this time, the tasks get the opportunity to output the following.

Performing Task 1
Performing Task 2
Performing Task 4
Performing Task 3
Performing Task 5
Performing Task 8
Performing Task 9
Performing Task 10
Performing Task 12
Performing Task 13
Performing Task 6
Performing Task 14
Performing Task 7
Performing Task 16
Performing Task 17
Performing Task 18
Performing Task 15
Performing Task 11
Performing Task 19
Performing Task 20

Then Task.WaitAll is called, which may throw an AggregateException. In fact it does, with 10 InnerExceptions: one for each odd-numbered task, including task 13.

Since the Task.WaitAll call is wrapped in a try/catch the AggregateException is caught. We output a message to say the exceptions are being handled. The AggregateException.Handle method calls the method given (HandleException) once for each of the InnerExceptions.

AggregateException.Handle...

The HandleException method either handles the exception (in which case it returns true) or it doesn’t (so returning false). In each case it also writes to the console to say what it has done. That console output looks like this:

Handling: The task (19) is distinctly odd.
Handling: The task (17) is distinctly odd.
Handling: The task (15) is distinctly odd.
Not handling: Mwaaahaahaahaahaaaaaaaa!
Handling: The task (11) is distinctly odd.
Handling: The task (9) is distinctly odd.
Handling: The task (7) is distinctly odd.
Handling: The task (5) is distinctly odd.
Handling: The task (3) is distinctly odd.
Handling: The task (1) is distinctly odd.

The AggregateException.Handle method then checks to see if any of the exceptions remain unhandled. If there are, it throws a new AggregateException containing only the unhandled exceptions. Since one exception remains unhandled here, the line of code after the call to Handle is never executed.

The try/catch block in the Main method catches AggregateException and just loops over the remaining exceptions to show what was left unhandled.

Handle Remaining Exceptions
TriskaidekaException: Mwaaahaahaahaahaaaaaaaa!
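The same Handle behaviour can be seen without any tasks at all. The sketch below builds an AggregateException by hand and handles only the ArgumentException instances. It then returns whatever is left in the new AggregateException that Handle throws. The class and method names are hypothetical.

```csharp
using System;
using System.Linq;

static class HandleDemo
{
    // Returns the exceptions left over after Handle() has dealt with every ArgumentException.
    public static Exception[] Remaining()
    {
        var aex = new AggregateException(
            new ArgumentException("handled 1"),
            new InvalidOperationException("not handled"),
            new ArgumentException("handled 2"));
        try
        {
            aex.Handle(ex => ex is ArgumentException); // true means "handled"
            return new Exception[0];                   // only reached if everything was handled
        }
        catch (AggregateException remaining)
        {
            return remaining.InnerExceptions.ToArray();
        }
    }
}
```

Only the InvalidOperationException should come back, just as only the TriskaidekaException reached Main in the program above.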

See also