Feedback from my DDD Scotland Talk on Parallelisation

I got my feedback from my DDD Scotland 2011 talk on Parallelisation. I was actually pleasantly surprised. I guess I was being a little too self-critical and the talk went over a lot better than I thought it had.

Some of the highlights:

  • Good clear samples and demos.
  • Enthusiastic speaker who really knew his stuff. Great talk!
  • Nice easy to understand examples. Getting the concepts across without clutter.
  • Useful info. Genuinely learnt something new.

And of course my favourite comment (despite being somewhat irrelevant, or should that be irreverent): Colin’s funky hair.

There were a couple of points that I need to address in future versions of the talk.

I only gave an overview of locking, and the only demo that came close was the ConcurrentDictionary example, in which all the locking mechanisms are internal to the ConcurrentDictionary. One person wanted more detail on locking, so I shall endeavour to add a little extra on locking to the presentation for DDD South West including, if time allows, a specific demo.

The other aspect of locking I need to address is that I talked about using a semaphore in a project to restrict access to a scarce resource, but again I didn’t elaborate on it, and another person would have found an example of a semaphore in use helpful. I have already written about semaphores in a previous blog post, so I shall try and work that into the next version of the presentation.
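To give a flavour of the kind of semaphore example that was asked for, here is a minimal sketch using SemaphoreSlim. It is not the code from the project mentioned above: the class name, the Run method, and the sleep that stands in for the scarce resource are all made up for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ScarceResourceDemo
{
    // Runs `workers` tasks but lets at most `limit` of them use the
    // (imaginary) scarce resource at once. Returns the highest number of
    // tasks that were seen inside the guarded section at the same time.
    public static int Run(int workers, int limit)
    {
        int inside = 0;
        int maxInside = 0;
        object gate = new object();

        using (var semaphore = new SemaphoreSlim(limit, limit))
        {
            Task[] tasks = Enumerable.Range(0, workers)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    semaphore.Wait();            // blocks while `limit` tasks are inside
                    try
                    {
                        lock (gate)
                        {
                            inside++;
                            maxInside = Math.Max(maxInside, inside);
                        }
                        Thread.Sleep(20);        // pretend to use the resource
                        lock (gate) { inside--; }
                    }
                    finally
                    {
                        semaphore.Release();     // always hand the slot back
                    }
                }))
                .ToArray();

            Task.WaitAll(tasks);
        }

        return maxInside;
    }
}
```

Calling ScarceResourceDemo.Run(8, 2) starts eight tasks, and the value returned should never be greater than two, because the semaphore only has two slots to hand out.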

The other part is that the intro appears to be a little long and I need to shorten that slightly. If I can do that, it will at least free up space in a one-hour talk to add the additional information on locking later on.

I really appreciate the additional few moments people took at the end of my talk to write specifically what they enjoyed and what they disliked about my presentation, especially as lunch was waiting for them. It really gives me something to work with in order to improve the talk.

Parallelisation Talk Examples – ConcurrentDictionary

The example used in the talk was one I had already blogged about. The original blog entry the example was based upon is here: Parallelisation in .NET 4.0 – The ConcurrentDictionary.

Code Example

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    private static ConcurrentDictionary<string, int> wordCounts =
        new ConcurrentDictionary<string, int>();

    static void Main(string[] args)
    {
        string[] lines = File.ReadAllLines("grimms-fairy-tales.txt");
        Parallel.ForEach(lines, ProcessLine);

        Console.WriteLine("There are {0} distinct words", wordCounts.Count);
        var topForty = wordCounts.OrderByDescending(kvp => kvp.Value).Take(40);
        foreach (KeyValuePair<string, int> word in topForty)
            Console.WriteLine("{0}: {1}", word.Key, word.Value);
    }

    private static void ProcessLine(string line)
    {
        var words = line.Split(' ')
            .Select(w => w.Trim().ToLowerInvariant())
            .Where(w => !string.IsNullOrEmpty(w));
        foreach (string word in words)
            CountWord(word);
    }

    // Add the word with a count of 1, or bump the count if it is already there.
    private static void CountWord(string word)
    {
        if (!wordCounts.TryAdd(word, 1))
            UpdateCount(word);
    }

    // TryUpdate only succeeds if the stored value is still the one we read.
    private static void UpdateCount(string word)
    {
        int value = wordCounts[word];
        if (!wordCounts.TryUpdate(word, value + 1, value))
        {
            Console.WriteLine("Failed to count '{0}' (was {1}), trying again...",
                word, value);
            UpdateCount(word);
        }
    }
}



Tasks that create more work

I’m creating a program that parses a web page then follows the links and then parses the next set of web pages to eventually build a picture of an entire site. This means that as the program runs more work is being generated and more tasks can be launched to process each new page as it is discovered.

My original solution was simply to create code like this:

    private void ProcessLink(string link)
    {
        var page = GetPageInformation(link);
        var newLinks = GetNewLinks(page);

        foreach (var newLink in newLinks)
        {
            // Copy the loop variable: before C# 5, every closure created in a
            // foreach loop shares the same variable.
            var linkToProcess = newLink;
            Action action = () => { ProcessLink(linkToProcess); };
            Task.Factory.StartNew(action, TaskCreationOptions.AttachedToParent);
        }
    }

The premise is simple enough: build a list of new links from a page, then start a new task for each of them. Each new task is attached to the parent task (the task that is launching the new set of tasks).
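To show what attaching to the parent buys you, here is a small sketch, separate from the scraper code: a parent task does not count as finished until the children attached to it have finished too. The class and method names are invented for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AttachedChildDemo
{
    // Starts a parent task that launches `children` attached child tasks,
    // then waits on the parent only. Returns how many children had finished
    // by the time parent.Wait() returned.
    public static int Run(int children)
    {
        int finished = 0;

        Task parent = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < children; i++)
            {
                Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(50);                      // simulate slow work
                    Interlocked.Increment(ref finished);
                }, TaskCreationOptions.AttachedToParent);
            }
        });

        parent.Wait();   // does not return until the attached children finish
        return finished;
    }
}
```

AttachedChildDemo.Run(4) should return 4. Without TaskCreationOptions.AttachedToParent the children would be detached, and the parent could complete while some of them were still running.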

However, it soon became apparent that this was quickly getting out of control: I had no idea what was still waiting to be processed, the same link was being queued up multiple times in many different threads, and so on. I ended up putting in place so many mechanisms to prevent the code processing the same page over again in different threads that it was getting silly. For a small number of new tasks being launched, I’m sure that Task.Factory.StartNew() is perfectly suitable.

I eventually realised that I was heading the wrong way and needed to rethink my strategy altogether. I wanted to make the code parallelisable so that while I was waiting on one page I could be parsing and processing another page. So, I eventually refactored it to this:

     1:  public class SiteScraper
     2:  {
     3:      private ConcurrentDictionary<string, ScraperResults> completedWork =
     4:          new ConcurrentDictionary<string, ScraperResults>();
     6:      private List<string> currentWork;
     8:      private ConcurrentQueue<string> futureWorkQueue =
     9:          new ConcurrentQueue<string>();
    11:      public void GetSiteInformation(string startingUrl)
    12:      {
    13:          currentWork = new List<string>();
    14:          currentWork.Add(startingUrl.ToLowerInvariant());
    16:          while(currentWork.Any())
    17:          {
    18:              Parallel.ForEach(currentWork, item => GetPageInformation(item));
    19:              BuildWorkQueue();
    20:          }
    21:      }
    23:      private void BuildWorkQueue()
    24:      {
    25:          currentWork = new List<string>(futureWorkQueue
    26:              .Select(link => link.ToLowerInvariant()).Distinct()
    27:              .Where(link => IsLinkToBeProcessed(link)));
    29:          futureWorkQueue = new ConcurrentQueue<string>();
    30:      }
    32:      private void GetPageInformation(string url)
    33:      {
    34:          // Do stuff
    35:          ProcessNewLinks(newLinks);
    36:      }
    38:      private void ProcessNewLinks(IEnumerable<string> newLinks)
    39:      {
    40:          foreach (string url in newLinks.Where(l => IsLinkToBeProcessed(l)))
    41:          {
    42:              futureWorkQueue.Enqueue(url);
    43:          }
    44:      }
    47:      // Other bits
    49:  }

There is still some code to ensure duplicates are removed and not processed, but it became much easier to debug and know what has been processed and what is still to be processed than it was before.

The method GetSiteInformation (lines 11-21) handles the main part of the parallelisation. This is the key to this particular algorithm.

Before discussing what that does, I just want to explain the three collections set up as fields on the class (lines 3 to 9). The completedWork is a dictionary keyed on the url containing an object graph representing the bits of the page we are interested in. The currentWork (line 6) is a list of the current urls that are being processed. Finally, the futureWorkQueue contains a queue of all the new links that are discovered, which will feed into the next iteration.

The GetSiteInformation method creates the initial list of currentWork and processes it using Parallel.ForEach (line 18). On the first iteration only one item will be processed, but it should result in many new links to be processed. A call to BuildWorkQueue builds the new work queue for the next iteration, which is controlled by the while loop (lines 16-20). When BuildWorkQueue creates no new items for currentWork, the work is complete and the while loop exits.
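The batch-by-batch loop described above can be tried out without touching a real website. The sketch below is a cut-down, runnable variation and not the SiteScraper class itself: the site is faked as an in-memory dictionary from page to links, and the names BatchCrawlSketch, Crawl, and site are assumptions made for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchCrawlSketch
{
    // `site` maps each page to the links found on it; it stands in for
    // fetching and parsing real pages and is only read, never modified.
    // Returns every page reachable from `start`.
    public static ISet<string> Crawl(IDictionary<string, string[]> site, string start)
    {
        var completed = new ConcurrentDictionary<string, bool>();
        var currentWork = new List<string> { start };

        while (currentWork.Any())
        {
            var futureWork = new ConcurrentQueue<string>();

            // Process the whole batch in parallel; new links are only queued.
            Parallel.ForEach(currentWork, page =>
            {
                completed.TryAdd(page, true);
                string[] links;
                if (site.TryGetValue(page, out links))
                {
                    foreach (string link in links)
                        futureWork.Enqueue(link);
                }
            });

            // Single-threaded step between batches: drop duplicates and
            // anything that has already been processed.
            currentWork = futureWork
                .Distinct()
                .Where(link => !completed.ContainsKey(link))
                .ToList();
        }

        return new HashSet<string>(completed.Keys);
    }
}
```

Because the next batch is built on a single thread between the parallel passes, each page ends up in at most one batch, which is what makes it easy to see what has been done and what is still waiting.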

BuildWorkQueue is called when all the existing work is completed. It then builds the new set of urls to be processed. The futureWorkQueue is the collection that was populated when the links get processed (see later). All the links are forced into lower case (while this may not be advisable for all websites, for my case it is sufficient). Only distinct elements are kept, as the futureWorkQueue could quite easily have been filled with duplicates. Finally, a check is made to ensure that the link has not already been processed (lines 25-27).

During the processing of a specific URL (lines 32-36 – mostly not shown) new links may be generated. Each of these will be added to the futureWorkQueue (lines 40-43). Before enqueuing any link a check is made to ensure it has not already been processed.

There are other bits of the class that are not shown. For example the IsLinkToBeProcessed method (which checks the domain, whether it has been processed already and so on) and the code that populates the completedWork.

In this version of the code it is much easier to see what has been completed and what is still to do (or at least, what has been found to do).

Parallelisation in .NET 4.0 – The concurrent dictionary

One thing that I was always conscious of when developing concurrent code was that shared state is very difficult to deal with. It still is difficult to deal with, however the Parallel extensions have some things to help deal with shared information better and one of them is the subject of this post.

The ConcurrentDictionary has accessors and mutators that “try” and work over the data. If the operation fails then it returns false. If it works you get a true, naturally. To show this, I’ve written a small program that counts the words in Grimm’s Fairy Tales (which I downloaded from the Project Gutenberg website) and displays the top forty most used words.
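Before getting to the word counter, here is a tiny single-threaded sketch of that true/false behaviour. The class name, the Run method, and the "wolf" key are made up for the illustration; the Try methods themselves are the real ConcurrentDictionary ones.

```csharp
using System.Collections.Concurrent;

public static class TryMethodsDemo
{
    // Returns the results of a few Try* calls, in order, so the
    // true/false pattern can be inspected.
    public static bool[] Run()
    {
        var counts = new ConcurrentDictionary<string, int>();

        bool added = counts.TryAdd("wolf", 1);               // true: key was absent
        bool addedAgain = counts.TryAdd("wolf", 5);          // false: key already present
        bool staleUpdate = counts.TryUpdate("wolf", 9, 3);   // false: stored value is 1, not 3
        bool update = counts.TryUpdate("wolf", 2, 1);        // true: stored value was 1

        int value;
        bool found = counts.TryGetValue("wolf", out value);   // true: value is 2
        bool removed = counts.TryRemove("wolf", out value);   // true: key removed
        bool missing = counts.TryGetValue("wolf", out value); // false: key is gone

        return new[] { added, addedAgain, staleUpdate, update, found, removed, missing };
    }
}
```

None of the failing calls throws an exception; they just report false and leave the dictionary as it was, so it is up to the caller to decide what to do next.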

Here is the program:

   1:  class Program
   2:  {
   3:      private static ConcurrentDictionary<string, int> wordCounts =
   4:          new ConcurrentDictionary<string, int>();
   6:      static void Main(string[] args)
   7:      {
   8:          string[] lines = File.ReadAllLines("grimms-fairy-tales.txt");
   9:          Parallel.ForEach(lines, line => { ProcessLine(line); });
  11:          Console.WriteLine("There are {0} distinct words", wordCounts.Count);
  12:          var topForty = wordCounts.OrderByDescending(kvp => kvp.Value).Take(40);
  13:          foreach (KeyValuePair<string, int> word in topForty)
  14:          {
  15:              Console.WriteLine("{0}: {1}", word.Key, word.Value);
  16:          }
  17:          Console.ReadLine();
  18:      }
  20:      private static void ProcessLine(string line)
  21:      {
  22:          var words = line.Split(' ')
  23:              .Select(w => w.Trim().ToLowerInvariant())
  24:              .Where(w => !string.IsNullOrEmpty(w));
  25:          foreach (string word in words)
  26:              CountWord(word);
  27:      }
  29:      private static void CountWord(string word)
  30:      {
  31:          if (!wordCounts.TryAdd(word, 1))
  32:              UpdateCount(word);
  33:      }
  35:      private static void UpdateCount(string word)
  36:      {
  37:          int value = wordCounts[word];
  38:          if (!wordCounts.TryUpdate(word, value + 1, value))
  39:          {
  40:              Console.WriteLine("Failed to count '{0}' (was {1}), trying again...",
  41:                  word, value);
  43:              UpdateCount(word);
  44:          }
  45:      }
  46:  }

The ConcurrentDictionary is set up in lines 3 and 4 with the word as the key and the count as the value, but the important part is in the CountWord and UpdateCount methods (starting on lines 29 and 35 respectively).

We start by attempting to add a word to the dictionary with a count of 1 (line 31). If that fails then we must have already added the word to the dictionary, in which case we will need to update the existing value (lines 37-44). In order to do that we need to get hold of the existing value (line 37). We can do that with a simple indexer using the word as the key; we then attempt to update the value (line 38). The reason I say we attempt to do that is that there are many threads operating on the same dictionary object and the update may fail.

The TryUpdate method ensures that you are updating the correct thing as it asks you to pass in the original value and the new value. If someone got there before you (a race condition) the original value will be different to what is currently in the dictionary and the update will not happen. This ensures that the data is consistent. In our case, we simply try again.
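The "try again" step does not have to be recursive. As a sketch, and not the code used in the program above, the same idea can be written as a loop, or handed over to the dictionary's own AddOrUpdate method, which does the retrying for you. The class and method names here are invented for the example.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class WordCountSketch
{
    // Retry loop: re-read the value and try again until no other thread
    // has changed it between the read and the TryUpdate.
    public static void CountWithRetry(ConcurrentDictionary<string, int> counts, string word)
    {
        while (true)
        {
            if (counts.TryAdd(word, 1))
                return;

            int current;
            if (counts.TryGetValue(word, out current)
                && counts.TryUpdate(word, current + 1, current))
                return;
        }
    }

    // The same operation using the built-in AddOrUpdate.
    public static void CountWithAddOrUpdate(ConcurrentDictionary<string, int> counts, string word)
    {
        counts.AddOrUpdate(word, 1, (key, current) => current + 1);
    }

    // Counts the word "the" `times` times in parallel (times must be at
    // least 1) and returns the final count.
    public static int CountMany(int times, bool useAddOrUpdate)
    {
        var counts = new ConcurrentDictionary<string, int>();
        Parallel.For(0, times, i =>
        {
            if (useAddOrUpdate)
                CountWithAddOrUpdate(counts, "the");
            else
                CountWithRetry(counts, "the");
        });
        return counts["the"];
    }
}
```

Either way, WordCountSketch.CountMany(10000, true) and WordCountSketch.CountMany(10000, false) should both come back with 10000: a failed attempt is repeated, not dropped, so no increments are lost.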

The result of the application is as follows.

Failed to count 'the' (was 298), trying again...
Failed to count 'the' (was 320), trying again...
Failed to count 'and' (was 337), trying again...
Failed to count 'of' (was 113), trying again...
Failed to count 'the' (was 979), trying again...
Failed to count 'the' (was 989), trying again...
Failed to count 'and' (was 698), trying again...
Failed to count 'well' (was 42), trying again...
Failed to count 'the' (was 4367), trying again...
Failed to count 'and' (was 3463), trying again...
Failed to count 'the' (was 4654), trying again...
Failed to count 'to' (was 1772), trying again...
Failed to count 'the' (was 4798), trying again...
Failed to count 'the' (was 4805), trying again...
Failed to count 'the' (was 4858), trying again...
Failed to count 'her' (was 508), trying again...
Failed to count 'and' (was 3693), trying again...
Failed to count 'and' (was 3705), trying again...
Failed to count 'and' (was 3719), trying again...
Failed to count 'the' (was 4909), trying again...
Failed to count 'she' (was 600), trying again...
Failed to count 'to' (was 1852), trying again...
Failed to count 'curdken' (was 3), trying again...
Failed to count 'the' (was 4665), trying again...
Failed to count 'which' (was 124), trying again...
Failed to count 'the' (was 5361), trying again...
Failed to count 'and' (was 4327), trying again...
Failed to count 'to' (was 2281), trying again...
Failed to count 'they' (was 709), trying again...
Failed to count 'they' (was 715), trying again...
Failed to count 'and' (was 4668), trying again...
Failed to count 'you' (was 906), trying again...
Failed to count 'of' (was 1402), trying again...
Failed to count 'the' (was 6708), trying again...
Failed to count 'and' (was 5149), trying again...
Failed to count 'snowdrop' (was 21), trying again...
Failed to count 'draw' (was 18), trying again...
Failed to count 'he' (was 1834), trying again...
There are 10369 distinct words
the: 7168
and: 5488
to: 2725
a: 1959
he: 1941
of: 1477
was: 1341
in: 1136
she: 1134
his: 1031
that: 1024
you: 981
it: 921
her: 886
but: 851
had: 829
they: 828
as: 770
i: 755
for: 740
with: 731
so: 693
not: 691
said: 678
when: 635
then: 630
at: 628
on: 576
will: 551
him: 544
all: 537
be: 523
have: 481
into: 478
is: 444
went: 432
came: 424
little: 381
one: 358
out: 349

As you can see in this simple example, a race condition was encountered 38 times.