Parallelisation Talk examples – Parallel.For

This is some example code from my introductory talk on parallelisation, showing the difference between a standard sequential for loop and its parallel equivalent.

Code Example 1: Serial processing of a for loop

using System;
using System.Threading;

class Program
{
    private static Random rnd = new Random();

    static void Main(string[] args)
    {
        DateTime start = DateTime.UtcNow;

        for (int i = 0; i < 20; i++)
            ProcessLoop(i);

        DateTime end = DateTime.UtcNow;
        TimeSpan duration = end - start;

        Console.WriteLine("Finished. Took {0}", duration);
    }

    private static void ProcessLoop(long i)
    {
        Console.WriteLine("Processing index {0}", i);

        // Simulate similar but slightly variable length processing
        int pause = rnd.Next(900, 1000);
        Thread.Sleep(pause);
    }
}

The output of the above code may look something like this:

[Screenshot: console output of the sequential for loop]

As you can see, this takes just shy of 20 seconds to process the 20 items (20 iterations at roughly 950 ms each).

Code Example 2: Parallel processing of a for loop

The Parallel class can be found in the System.Threading.Tasks namespace.

using System;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    // Note: a single shared Random is not thread-safe. It is tolerable for
    // this demo, but concurrent calls to Next can corrupt its state; real
    // parallel code should use one Random per thread or lock around Next.
    private static Random rnd = new Random();

    static void Main(string[] args)
    {
        DateTime start = DateTime.UtcNow;

        Parallel.For(0, 20,
            (i) => ProcessLoop(i));

        DateTime end = DateTime.UtcNow;
        TimeSpan duration = end - start;

        Console.WriteLine("Finished. Took {0}", duration);

        Console.ReadLine();
    }

    private static void ProcessLoop(long i)
    {
        Console.WriteLine("Processing index {0}", i);

        // Simulate similar but slightly variable length processing
        int pause = rnd.Next(900, 1000);
        Thread.Sleep(pause);
    }
}

The output of the above code may look something like this:

[Screenshot: console output of the Parallel.For loop]

The result of this code is that it takes just shy of 5 seconds to process the 20 items. I have a 4-core processor, so this is in line with the expectation that the work is distributed across all 4 cores.
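
Incidentally, Parallel.For has overloads that accept a ParallelOptions instance if you want to control how widely the work is spread. A minimal sketch, capping the loop at the number of cores (the cap shown is illustrative; by default the scheduler decides for itself):

var options = new ParallelOptions
{
    // Environment.ProcessorCount reports the number of logical cores.
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.For(0, 20, options, i => ProcessLoop(i));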

Tasks that create more work

I’m creating a program that parses a web page, follows the links it finds, and then parses the next set of web pages to eventually build a picture of an entire site. This means that as the program runs more work is generated, and more tasks can be launched to process each new page as it is discovered.

My original solution was simply to create code like this:

private void ProcessLink(string link)
{
    var page = GetPageInformation(link);
    var newLinks = GetNewLinks(page);

    foreach (var newLink in newLinks)
    {
        // Copy the loop variable so the closure captures the current value
        // (in C# 4 capturing the foreach variable directly is a classic bug).
        string currentLink = newLink;
        Action action = () => { ProcessLink(currentLink); };
        Task.Factory.StartNew(action, TaskCreationOptions.AttachedToParent);
    }
}

The premise is simple enough: build a list of new links from a page, then start a new task for each of those links. Each new task is attached to the parent task (the task that launched it).
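
To put that in context, the benefit of AttachedToParent is that waiting on the outermost task implicitly waits for the whole tree. A minimal sketch of how the crawl would be kicked off (startingUrl and the surrounding wiring are hypothetical):

// Start the root of the tree; ProcessLink attaches its children to it.
Task root = Task.Factory.StartNew(() => ProcessLink(startingUrl));

// Because each child task is attached to its parent, this single Wait
// returns only once the entire tree of tasks has completed.
root.Wait();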

However, it soon became apparent that this was quickly getting out of control: I had no idea what was still waiting to be processed, the same link was being queued up multiple times on different threads, and so on. I ended up putting in place so many mechanisms to prevent the code processing the same page over again in different threads that it was getting silly. For a small number of new tasks being launched, I’m sure that Task.Factory.StartNew() is perfectly suitable.

I eventually realised that I was heading down the wrong path and needed to rethink my strategy altogether. I wanted to make the code parallelisable so that while I was waiting on one page I could be parsing and processing another. So, I eventually refactored it to this:

     1:  public class SiteScraper
     2:  {
     3:      private ConcurrentDictionary<string, ScraperResults> completedWork =
     4:          new ConcurrentDictionary<string, ScraperResults>();
     5:
     6:      private List<string> currentWork;
     7:
     8:      private ConcurrentQueue<string> futureWorkQueue =
     9:          new ConcurrentQueue<string>();
    10:
    11:      public void GetSiteInformation(string startingUrl)
    12:      {
    13:          currentWork = new List<string>();
    14:          currentWork.Add(startingUrl.ToLowerInvariant());
    15:
    16:          while (currentWork.Any())
    17:          {
    18:              Parallel.ForEach(currentWork, item => GetPageInformation(item));
    19:              BuildWorkQueue();
    20:          }
    21:      }
    22:
    23:      private void BuildWorkQueue()
    24:      {
    25:          currentWork = new List<string>(futureWorkQueue
    26:              .Select(link => link.ToLowerInvariant()).Distinct()
    27:              .Where(link => IsLinkToBeProcessed(link)));
    28:
    29:          futureWorkQueue = new ConcurrentQueue<string>();
    30:      }
    31:
    32:      private void GetPageInformation(string url)
    33:      {
    34:          // Do stuff: parse the page, record results, gather newLinks
    35:          ProcessNewLinks(newLinks);
    36:      }
    37:
    38:      private void ProcessNewLinks(IEnumerable<string> newLinks)
    39:      {
    40:          foreach (string url in newLinks.Where(l => IsLinkToBeProcessed(l)))
    41:          {
    42:              futureWorkQueue.Enqueue(url);
    43:          }
    44:      }
    45:
    46:
    47:      // Other bits
    48:
    49:  }

There is still some code to ensure duplicates are removed and not processed, but it became much easier to debug and to know what has been processed and what is still to be processed than it was before.

The method GetSiteInformation (lines 11-21) handles the main part of the parallelisation. This is the key to this particular algorithm.

Before discussing what that does, I just want to explain the three collections set up as fields on the class (lines 3 to 9). completedWork is a dictionary keyed on the URL, containing an object graph representing the bits of the page we are interested in. currentWork (line 6) is a list of the URLs currently being processed. Finally, futureWorkQueue is a queue of all the new links that are discovered, which feeds into the next iteration.

The GetSiteInformation method creates the initial list of currentWork and processes it using Parallel.ForEach (line 18). On the first iteration only one item is processed, but it should yield many new links. A call to BuildWorkQueue then builds the work list for the next iteration, which is controlled by the while loop (lines 16-20). When BuildWorkQueue produces no new items, currentWork is empty, the work is complete and the while loop exits.

BuildWorkQueue is called once all the existing work has completed, and builds the new set of URLs to be processed. The futureWorkQueue is the collection that was populated as links were processed (see later). All the links are forced into lower case (this may not be advisable for all websites, but for my case it is sufficient), only distinct elements are kept since the futureWorkQueue could quite easily contain duplicates, and finally a check is made to ensure each link has not already been processed (lines 25-27).

During the processing of a specific URL (lines 32-36 – mostly not shown) new links may be generated. Each of these is added to the futureWorkQueue (lines 40-43). Before enqueuing any link, a check is made to ensure it has not already been processed.

There are other parts of the class that are not shown, for example the IsLinkToBeProcessed method (which checks the domain, whether the link has been processed already and so on) and the code that populates completedWork.
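
For illustration, a sketch of what IsLinkToBeProcessed might look like; the original isn't shown, so the domain check and the completedWork lookup here are my assumptions:

private bool IsLinkToBeProcessed(string link)
{
    // Assumed checks: stay within the site being scraped and skip anything
    // we have already recorded results for.
    return link.StartsWith("http://www.example.com/", StringComparison.OrdinalIgnoreCase)
        && !completedWork.ContainsKey(link);
}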

In this version of the code it is much easier to see what has been completed and what is still to do (or at least, what has been found to do).

Using semaphores to restrict access to resources

I’m in the process of building a small data extraction application. It uses the new Parallel Extensions in .NET 4 in order to more efficiently extract data from a web service. While some threads are blocked waiting on the web service to respond, other threads are working away processing the results of the previous call.

Initially, when I set this up, I didn’t throttle the calls to the web service; I let everything through. However, I quickly discovered that I was having to retry calls a lot because the original call for some data was timing out. When I looked in Fiddler to see what was going on, I discovered that as the application ran I was getting over a screenful of started requests that were either not finishing or taking a very long time to complete. I was overloading the server and it couldn’t cope with the volume of requests.

With this in mind, I added some code to the class that initiated the web service calls to ensure it didn’t call the web service too frequently. This is where semaphores come into play.

Semaphores are a type of synchronisation mechanism that allows you to limit access to a segment of code: no more than a specified number of threads may be inside that segment at any one time. If more threads attempt to enter than are permitted, each new arrival is forced to wait until access is granted.

I’ll show you what I mean:

   1:  public class WebServiceHelper
   2:  {
   3:      private static Semaphore pool = new Semaphore(3, 3);
   4:   
   5:      public ResultsData GetData(RequestData request)
   6:      {
   7:          try
   8:          {
   9:              pool.WaitOne();
  10:              return GetDataImpl(request);
  11:          }
  12:          finally
  13:          {
  14:              pool.Release();
  15:          }
  16:      }
  17:   
  18:      private ResultsData GetDataImpl(RequestData request)
  19:      {
  20:          // Do stuff here
  21:      }
  22:   
  23:  }

This is just a fragment of the class, showing only the important bits.

In line 3 we set up the Semaphore as a static, so that all instances of the class can have access to it. It doesn’t need to be a static if you are going to reuse the same instance of the class in many places, but for the purposes of this example I’m using a static.

The Semaphore is initialised with an initial count of 3 (first parameter), meaning three resources are currently available, and a maximum count also of 3 (second parameter), meaning a maximum of three resources can be in use at any one time.

In the GetData method (lines 5-16) I wrap the call that does the actual work in a try-finally block. This is not the place to handle any exceptions that are thrown; the only thing this method should be concerned with is ensuring the resource is properly synchronised. In line 9 we wait for a resource to become available (the first three calls will not block because we started off with three available, but after that calls may block). On line 10 we call the method that does the actual work we are interested in (keeping the work separate from the synchronisation code stops one method becoming cluttered with both). The finally block (lines 12 to 15) ensures that the resource is released regardless of the outcome: whether an exception was thrown or the call succeeded, we always release the resource back at the end of the operation.

WaitOne (line 9) has overloads that accept a time to wait, either as a TimeSpan or as an integer number of milliseconds, and these return false if the wait timed out. This means you can ensure you are not blocked indefinitely if an error occurs and a resource is never released.
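
For example, a sketch of GetData using a 30-second timeout (the value is arbitrary). Because the timeout overloads return false rather than throwing, the return value must be checked:

public ResultsData GetData(RequestData request)
{
    // WaitOne(TimeSpan) returns false if no resource became available
    // before the timeout elapsed.
    if (!pool.WaitOne(TimeSpan.FromSeconds(30)))
        throw new TimeoutException("Timed out waiting for a web service slot.");

    try
    {
        return GetDataImpl(request);
    }
    finally
    {
        pool.Release(); // only reached once the wait has succeeded
    }
}

As a side benefit, moving the WaitOne outside the try block means the finally can never release a slot that was never actually acquired.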

That just about sums it up. I now have an application that I can parallelise while still ensuring I don’t overload the web server.

I should also point out that using semaphores (or any kind of locking or synchronisation mechanism) reduces the parallelisability of the application, but they can be useful to ensure safe access to data or resources. There are, however, other techniques which help reduce the need for these synchronisation schemes altogether.
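
One such technique, for illustration, is the Parallel.For overload that takes thread-local state: each thread accumulates into its own private value, and synchronisation happens only once per thread when it finishes. A minimal sketch (the items array is a stand-in for real data):

long total = 0;
long[] items = new long[1000]; // stand-in data

Parallel.For(0, items.Length,
    () => 0L,                                          // localInit: per-thread subtotal
    (i, loopState, subtotal) => subtotal + items[i],   // body: no locking required
    subtotal => Interlocked.Add(ref total, subtotal)); // localFinally: one sync per thread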

Parallelisation in .NET 4.0 – The concurrent dictionary

One thing I was always conscious of when developing concurrent code is that shared state is very difficult to deal with. It still is, but the parallel extensions provide some things that help deal with shared information better, and one of them is the subject of this post.

The ConcurrentDictionary has accessors and mutators that “try” to operate on the data: if the operation fails they return false; if it works you get true, naturally. To show this, I’ve written a small program that counts the words in Grimm’s Fairy Tales (which I downloaded from the Project Gutenberg website) and displays the forty most used words.

Here is the program:

   1:  class Program
   2:  {
   3:      private static ConcurrentDictionary<string, int> wordCounts =
   4:          new ConcurrentDictionary<string, int>();
   5:   
   6:      static void Main(string[] args)
   7:      {
   8:          string[] lines = File.ReadAllLines("grimms-fairy-tales.txt");
   9:          Parallel.ForEach(lines, line => { ProcessLine(line); });
  10:   
  11:          Console.WriteLine("There are {0} distinct words", wordCounts.Count);
  12:          var topForty = wordCounts.OrderByDescending(kvp => kvp.Value).Take(40);
  13:          foreach (KeyValuePair<string, int> word in topForty)
  14:          {
  15:              Console.WriteLine("{0}: {1}", word.Key, word.Value);
  16:          }
  17:          Console.ReadLine();
  18:      }
  19:   
  20:      private static void ProcessLine(string line)
  21:      {
  22:          var words = line.Split(' ')
  23:              .Select(w => w.Trim().ToLowerInvariant())
  24:              .Where(w => !string.IsNullOrEmpty(w));
  25:          foreach (string word in words)
  26:              CountWord(word);
  27:      }
  28:   
  29:      private static void CountWord(string word)
  30:      {
  31:          if (!wordCounts.TryAdd(word, 1))
  32:              UpdateCount(word);
  33:      }
  34:   
  35:      private static void UpdateCount(string word)
  36:      {
  37:          int value = wordCounts[word];
  38:          if (!wordCounts.TryUpdate(word, value + 1, value))
  39:          {
  40:              Console.WriteLine("Failed to count '{0}' (was {1}), trying again...",
  41:                  word, value);
  42:   
  43:              UpdateCount(word);
  44:          }
  45:      }
  46:  }

The ConcurrentDictionary is set up in lines 3 & 4 with the word as the key and the count as the value, but the important part is in the CountWord and UpdateCount methods (starting on lines 29 and 35 respectively).

We start by attempting to add a word to the dictionary with a count of 1 (line 31). If that fails then we must already have added the word, in which case we need to update the existing value (lines 37-44). To do that we first get hold of the existing value with a simple indexer, using the word as the key (line 37), and then attempt to update it (line 38). I say “attempt” because there are many threads operating on the same dictionary object and the update may fail.

The TryUpdate method ensures that you are updating the correct thing, as it asks you to pass in both the original value and the new value. If someone got there before you (a race condition) the original value will be different from what is currently in the dictionary and the update will not happen. This ensures the data stays consistent. In our case, we simply try again.
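
As an aside, ConcurrentDictionary also provides AddOrUpdate, which performs this same add-then-retry dance internally; CountWord and UpdateCount could be collapsed into a single call:

private static void CountWord(string word)
{
    // Adds the word with a count of 1, or atomically applies the update
    // delegate (retrying internally) if the key already exists.
    wordCounts.AddOrUpdate(word, 1, (key, existingCount) => existingCount + 1);
}

You would, of course, lose the console output below showing how often the race condition was hit.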

The result of the application is as follows.

Failed to count 'the' (was 298), trying again...
Failed to count 'the' (was 320), trying again...
Failed to count 'and' (was 337), trying again...
Failed to count 'of' (was 113), trying again...
Failed to count 'the' (was 979), trying again...
Failed to count 'the' (was 989), trying again...
Failed to count 'and' (was 698), trying again...
Failed to count 'well' (was 42), trying again...
Failed to count 'the' (was 4367), trying again...
Failed to count 'and' (was 3463), trying again...
Failed to count 'the' (was 4654), trying again...
Failed to count 'to' (was 1772), trying again...
Failed to count 'the' (was 4798), trying again...
Failed to count 'the' (was 4805), trying again...
Failed to count 'the' (was 4858), trying again...
Failed to count 'her' (was 508), trying again...
Failed to count 'and' (was 3693), trying again...
Failed to count 'and' (was 3705), trying again...
Failed to count 'and' (was 3719), trying again...
Failed to count 'the' (was 4909), trying again...
Failed to count 'she' (was 600), trying again...
Failed to count 'to' (was 1852), trying again...
Failed to count 'curdken' (was 3), trying again...
Failed to count 'the' (was 4665), trying again...
Failed to count 'which' (was 124), trying again...
Failed to count 'the' (was 5361), trying again...
Failed to count 'and' (was 4327), trying again...
Failed to count 'to' (was 2281), trying again...
Failed to count 'they' (was 709), trying again...
Failed to count 'they' (was 715), trying again...
Failed to count 'and' (was 4668), trying again...
Failed to count 'you' (was 906), trying again...
Failed to count 'of' (was 1402), trying again...
Failed to count 'the' (was 6708), trying again...
Failed to count 'and' (was 5149), trying again...
Failed to count 'snowdrop' (was 21), trying again...
Failed to count 'draw' (was 18), trying again...
Failed to count 'he' (was 1834), trying again...
There are 10369 distinct words
the: 7168
and: 5488
to: 2725
a: 1959
he: 1941
of: 1477
was: 1341
in: 1136
she: 1134
his: 1031
that: 1024
you: 981
it: 921
her: 886
but: 851
had: 829
they: 828
as: 770
i: 755
for: 740
with: 731
so: 693
not: 691
said: 678
when: 635
then: 630
at: 628
on: 576
will: 551
him: 544
all: 537
be: 523
have: 481
into: 478
is: 444
went: 432
came: 424
little: 381
one: 358
out: 349

As you can see in this simple example, a race condition was encountered 38 times.

Parallelisation in .NET 4.0 – Part 2 Throwing Exceptions

With more threads running simultaneously in an application there is increasing complexity when it comes to debugging. When exceptions are thrown you usually catch them somewhere and handle them. But what happens if you throw an exception inside a thread?

Naturally, if you can handle the exception within the thread then that makes life much easier. But what if an exception bubbles up and out into code that created the thread?

The example in my previous post on Parallelisation in .NET 4.0 had the calls to a third-party service happening in separate threads. So, what happens if somewhere in that call an exception is raised?

In the service call GetAvailability, I’ve simulated some error conditions, throwing exceptions based on the input to illustrate the examples. This is what it looks like:

public HotelAvail GetAvailability(string hotelCode, DateTime startDate, int nights)
{
    // Throw some exceptions depending on the input.
    if (hotelCode == null)
        throw new ArgumentNullException("hotelCode");

    if ((hotelCode.Length > 10) || (hotelCode.Length == 0))
        throw new ArgumentOutOfRangeException("hotelCode",
            "Hotel Codes are 1 to 10 chars in length. Got code which was " +
            hotelCode.Length + " chars.");

    if (hotelCode.StartsWith("Z"))
        throw new AvailabilityException("Hotel code '" + hotelCode +
                                        "' does not exist"); // A custom exception type
    // ... etc. ...
}

The calling code, from the previous example, looks like this:

public IEnumerable<HotelAvail> GetAvailability(IEnumerable<string> codes,
    DateTime startDate, int numNights)
{
    return codes.AsParallel().Select(code =>
        new AvailService().GetAvailability(code, startDate, numNights))
        .ToList();
}

If we provide incorrect input into the service such that it causes exceptions to be raised then Visual Studio responds in the normal way by breaking the debugging session at the point closest to where the exception is thrown.

If we were to wrap the call to the service in a try-catch block (as in the following code sample) then we’d expect that Visual Studio wouldn’t break the debugging session, as there is a handler (the catch block) for the exception.

public IEnumerable<HotelAvail> GetAvailabilityPlinqException(IEnumerable<string> codes,
        DateTime startDate, int numNights)
{
    try
    {
        return codes.AsParallel().Select(code =>
            new AvailService().GetAvailability(code, startDate, numNights))
            .ToList();
    }
    catch (Exception ex)
    {
        // Do stuff to handle the exception.
    }
    return null;
}

Normally that would be the case; however, if the handler is outside the thread that threw the exception, as in the above example, the situation is somewhat different. In this case the Exception Assistant will appear and highlight the exception (or the code nearest the exception if it can’t highlight the throw statement itself*).

[Screenshot: the AvailabilityException shown in the Exception Assistant]

This happens because the exception is not caught within the thread in which it was originally thrown.

The AggregateException

If you just tell the debugger to continue executing, the application will carry on, but the code that created the threads will have to handle an AggregateException. This is a special exception class with an InnerExceptions property (note the plural) that contains all the exceptions thrown from each of the threads.

[Screenshot: inspecting AggregateException.InnerExceptions in the debugger]

You can enumerate over each of the inner exceptions to find out what happened in each of the threads.

Be aware, however, that an AggregateException can itself contain an AggregateException, so enumerating InnerExceptions may yield yet another AggregateException. For example, if the hierarchy of exceptions looks like this:

[Diagram: a root AggregateException whose InnerExceptions contain a nested AggregateException (wrapping a NullReferenceException, an ArgumentException and a DivideByZeroException) and an ApplicationException]
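
A hierarchy like that could be constructed as follows (purely for illustration):

var nested = new AggregateException(
    new NullReferenceException(),
    new ArgumentException(),
    new DivideByZeroException());

// The root aggregate contains one nested aggregate and one ordinary exception.
var aggregateException = new AggregateException(nested, new ApplicationException());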

Then the results of iterating over the InnerExceptions will be:

foreach(Exception ex in aggregateException.InnerExceptions)
{
    // ... do stuff ...
}
  • AggregateException
  • ApplicationException

You can flatten the hierarchy into a single AggregateException whose InnerExceptions contains no further AggregateException objects. To do this, call Flatten() on the original AggregateException; it returns a new AggregateException whose InnerExceptions you can enumerate without worrying about any hierarchy.

For example:

foreach(Exception ex in aggregateException.Flatten().InnerExceptions)
{
    // ... do stuff ...
}

Which results in the following exceptions being enumerated by the loop:

  • ApplicationException
  • NullReferenceException
  • ArgumentException
  • DivideByZeroException
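
On a related note, AggregateException also offers a Handle method that invokes a predicate for each inner exception; any exception for which the predicate returns false is re-thrown wrapped in a new AggregateException. A sketch:

// Deal with the exception types we know about and let the rest propagate.
aggregateException.Flatten().Handle(ex => ex is ApplicationException);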

But it’s broken, why doesn’t it just stop?

Well, it does. Once a thread has thrown an exception that bubbles up and out, no new tasks are started, so no new threads are created and no new work gets done. However, remember that other threads will still be running; if one breaks, maybe others will break too, or maybe they will complete successfully. We won’t know unless they are allowed to finish what they are doing.

Going back to the room availability example, if the input hotel codes contain invalid codes then an exception will be thrown that is not caught within the thread. What if a selection of good and bad hotel codes is passed?

1, 2, 3, Z123, 4, 5, 6, 1234567890ABC, 7, 8, 9

Of the above list, “Z123” and “1234567890ABC” are both invalid and produce different exceptions. However, when running tests, the AggregateException only contains one of the exceptions.

To show what happens, I’ve modified my “service” like this and run it through a console application. Here’s the full code:

The service class

public class AvailService
{
    // ...

    public HotelAvail GetAvailability(string hotelCode, DateTime startDate, int nights)
    {
        Console.WriteLine("Start @ {0:HH-mm-ss.fff}: {1}", DateTime.Now, hotelCode);

        ValidateInput(hotelCode);

        // ... do stuff to process the request and build the result ...

        Console.WriteLine("  End @ {0:HH-mm-ss.fff}: {1}", DateTime.Now, hotelCode);
        return result;
    }

    private void ValidateInput(string hotelCode)
    {
        if (hotelCode == null)
        {
            Console.WriteLine("Error @ {0:HH-mm-ss.fff}: hotelCode is null", DateTime.Now);
            throw new ArgumentNullException("hotelCode");
        }

        if ((hotelCode.Length > 10) || (hotelCode.Length == 0))
        {
            Console.WriteLine("Error @ {0:HH-mm-ss.fff}: hotelCode is {1}", DateTime.Now, hotelCode);
            throw new ArgumentOutOfRangeException("hotelCode",
                "Hotel Codes are 1 to 10 chars in length. Got code which was " +
                hotelCode.Length + " chars.");
        }

        if (hotelCode.StartsWith("Z"))
        {
            Console.WriteLine("Error @ {0:HH-mm-ss.fff}: hotelCode is {1}", DateTime.Now, hotelCode);
            throw new AvailabilityException("Hotel code '" + hotelCode +
                                            "' does not exist");
        }
    }
}

The method on the controller class

public IEnumerable<HotelAvail> GetAvailability(IEnumerable<string> codes,
        DateTime startDate, int numNights)
{
    return codes.AsParallel().Select(code =>
        new AvailService().GetAvailability(code, startDate, numNights))
        .ToList();
}

The Main method on the Program class

static void Main(string[] args)
{
    string[] codes = "1,2,3,Z123,4,5,6,1234567890ABC,,7,8,9".Split(',');
    AvailController ctrl = new AvailController();

    DateTime start = DateTime.Now;
    try
    {
        var result = ctrl.GetAvailability(codes,
            DateTime.Today.AddDays(7.0), 2);
    }
    catch (AggregateException aex)
    {
        Console.WriteLine(aex.Message);

        foreach (Exception ex in aex.InnerExceptions)
            Console.WriteLine(" -- {0}", ex.Message);

    }
    finally
    {
        DateTime end = DateTime.Now;
        Console.WriteLine("Total time in ms: {0}",
                            (end - start).TotalMilliseconds);

    }
}

And the console output is:

Start @ 16-36-36.518: 7
Start @ 16-36-36.518: Z123
Start @ 16-36-36.518: 6
Start @ 16-36-36.518: 1
Error @ 16-36-36.526: hotelCode is Z123
  End @ 16-36-42.438: 1
  End @ 16-36-42.654: 6
  End @ 16-36-42.900: 7
One or more errors occurred.
 -- Hotel code 'Z123' does not exist
Total time in ms: 6400

As you can see, only 4 items got started out of the 12 items in the input collection. The error occurred 8 ms after those items started, and the items that did not cause an error were allowed to continue to completion. The result variable in the Main method never gets assigned because of the exception, so we never see the results of the three items that did succeed.

Naturally, the best course of action is not to let the exception bubble up and out of the thread in which the code is executing.
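
One way to do that is to catch the known failure cases inside the lambda itself, so nothing ever escapes the thread. A sketch against the availability example (the null-result convention here is my own, not part of the original code):

return codes.AsParallel()
    .Select(code =>
    {
        try
        {
            return new AvailService().GetAvailability(code, startDate, numNights);
        }
        catch (AvailabilityException)
        {
            return null; // treat a known failure as "no availability"
        }
    })
    .Where(avail => avail != null)
    .ToList();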

* Note, there appears to be a bug in Visual Studio with the Exception Assistant not always highlighting the correct line of code.

Parallelisation in .NET 4.0 – Part 1 Looping

In an upcoming project we need to use some parallelisation features. There are two aspects to this: we have to make multiple calls out to a web service that can take some time to return, and in the meantime we have to get data out of the CMS to match up with the data coming back from the web service.

I’ll be writing a series (just for the irony of it) of posts on these new features in .NET and how we will be implementing them.

The problem

We have a web service that we have to call to get data back to our system. Calls to the web service take in the region of 4 to 7 seconds each to return data to us. The performance of the web service does not degrade significantly if we make multiple calls to it.

The Solution (first attempt)

Since the calls to the web service do not alter state, we can safely make them in parallel. There is a class called AvailService that calls the web service and returns the results to us. If you’re interested, the web service checks the availability of rooms in a hotel based on the hotel code you pass and the stay date range (expressed as a start date and a number of nights).

What we could have done in a serial implementation is this:

public IEnumerable<HotelAvail> GetAvailabilitySerial(
    IEnumerable<string> codes, DateTime startDate, int numNights)
{
    List<HotelAvail> result = new List<HotelAvail>();

    foreach (string code in codes)
    {
        AvailService service = new AvailService();
        HotelAvail singleResult = service.
                GetAvailability(code, startDate, numNights);
        result.Add(singleResult);
    }

    return result;
}

This simply goes round each item and gets the availability of rooms in that hotel for the stay date range. It could be refactored into a LINQ expression, but I’m leaving it as a full loop to show more clearly what’s going on.
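
For reference, that LINQ refactoring would look something like this (the method name is mine; the behaviour is identical to the loop above):

public IEnumerable<HotelAvail> GetAvailabilitySerialLinq(
    IEnumerable<string> codes, DateTime startDate, int numNights)
{
    return codes
        .Select(code => new AvailService().GetAvailability(code, startDate, numNights))
        .ToList();
}

Keeping this shape in mind makes the PLINQ version later on easy to recognise.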

Using Parallel.For

When we change this to a parallel version it doesn’t really change much. I’ve changed the list to an array, set up with the correct size for the result set, and each parallel iteration only interacts with one slot in the array.

public IEnumerable<HotelAvail> GetAvailability(
    IList<string> codes, DateTime startDate, int numNights)
{
    HotelAvail[] result = new HotelAvail[codes.Count];

    Parallel.For(0, codes.Count, i =>
        {
            string code = codes[i];
            result[i] = new AvailService().
                GetAvailability(
                    code, startDate, numNights);
        });

    return result;
}

The code in the lambda expression is the part that is parallelised. I’ve had to make some concessions here as well: the IEnumerable of codes is now an IList, because I need to be able to access specific indexes in order to align the input with the result array. That way there is no accidental overwriting of elements in the result set.

If the ordering of the output is important (e.g. it must be in the same order as the input) then this approach maintains that ordering. Many of the remaining solutions do not.

However, there is a better way to do this that doesn’t involve setting up and maintaining structures in this way, and it is closer to our serial code.

Using a Parallel.ForEach and a ConcurrentBag

public IEnumerable<HotelAvail> GetAvailabilityConcurrentCollection(
    IEnumerable<string> codes,
    DateTime startDate, int numNights)
{
    ConcurrentBag<HotelAvail> result = new ConcurrentBag<HotelAvail>();

    Parallel.ForEach(codes, code => result.Add(
        new AvailService().
            GetAvailability(code, startDate, numNights)));

    return result;
}

The content of the Parallel.ForEach here is almost identical to the serial foreach version, except that the code is slightly more terse. This time, however, I’m using a ConcurrentBag for the result collection. As the method has always returned an IEnumerable, this does not change the return type, meaning a serial method can be made parallel fairly easily (assuming the other considerations necessary for parallelism are taken into account).

The order of the results may be quite different from the order of the input.

Using PLINQ

Finally, I’ve refactored the code using PLINQ. This example is really quite terse, but if you understand LINQ then it should be very easy to pick up.

public IEnumerable<HotelAvail> GetAvailabilityPlinq(
    IEnumerable<string> codes,
    DateTime startDate, int numNights)
{

    return codes.AsParallel().Select(code =>
        new AvailService().GetAvailability(code, startDate, numNights))
        .ToList();
}

Again, the order of the results may be quite different from the order of the input. If ordering is important to you, you can instruct PLINQ to maintain it by using .AsOrdered(), which ensures that the output from the PLINQ expression is in the same order as the input.

e.g.

public IEnumerable<HotelAvail> GetAvailabilityPlinq(
    IEnumerable<string> codes,
    DateTime startDate, int numNights)
{

    return codes.AsParallel().AsOrdered().Select(code =>
        new AvailService().GetAvailability(code, startDate, numNights))
        .ToList();
}

The important part in all of this is the call to the service, which has remained the same throughout the samples; it is really just the infrastructure around that call that has changed across these examples.

The results

This table and graph show the results of some tests I ran comparing the serial and parallel versions. The numbers across the top row and the X-axis represent the number of calls made to the service; the numbers in the table body and on the Y-axis represent the time (in seconds) taken to complete the calls.

              1      2      3      4      5      6      7      8      9
Serial     5.52  11.22  16.72  22.60  28.59  34.32  40.32  45.67  51.64
Parallel   5.52   6.20   6.35   6.45  11.24  12.35  12.45  12.86  16.66

[Graph: parallel vs serial processing times]

Finally, a warning about using parallelised code from Alex Mackey’s book Introducing .NET 4.0:

“Although parallelization enhancements make writing code to run in parallel much easier, don’t underestimate the increase in complexity that parallelizing an application can bring. Parallelization shares many of the same issues you might have experienced when creating multithreaded applications. You must take care when developing parallel applications to isolate code that can be parallelized.”

Improving performance with parallel code

While waiting for my car to get serviced today I finally managed to catch up on reading some articles in MSDN Magazine. One of them, on optimising managed code for multi-core machines, really caught my attention.

The article was about a new technology from Microsoft called Parallel Extensions to .NET Framework 3.5 (download), currently released as a Community Technology Preview (CTP). The blurb on the Microsoft site says “Parallel Extensions to the .NET Framework is a managed programming model for data parallelism, task parallelism, and coordination on parallel hardware unified by a common work scheduler.”

What I was most excited about was the ease with which it becomes possible to make an algorithm take advantage of multiple cores without all that tedious mucking about with thread pools. From what I gather, the extensions are able to optimise the parallelism across the available cores: a looping construct can be set up across many cores, each thread is internally given a queue of work to do, and if a thread finishes before the others it can take up the slack by stealing work from another thread’s queue.

Obviously, if an algorithm never lent itself well to parallelism in the first place, these extensions won’t help much. The developer is also still going to have to deal with concurrent access to shared resources, so they are not a panacea. Those caveats aside, these extensions to .NET will make the job of getting the best out of multi-core machines much easier.