Software Development

But why is the iterator operating in multiple threads?

Background

Recently, I had a bit of a problem with NHibernate when I was converting some code into parallel tasks. (If you have no interest in NHibernate, don’t worry: it is just background to the issue I was having when I spotted this gulf between my expectation and reality. NHibernate is incidental to this and I won’t mention it much beyond this paragraph.) It turns out that Parallel.ForEach runs the iterator on multiple threads, not just the action it performs on each item received from the iterator. NHibernate, being the source of the data, was running inside the iterator, and when I attached NHibernate Profiler to see what it could turn up, it very quickly began reporting that the NHibernate session was being used on multiple threads and that the session is not designed to be thread safe.

The Iterator Pattern in .NET

In .NET the iterator pattern is exposed via an IEnumerator or IEnumerator<T> and there is some syntactic sugar so that you can create an iterator method using yield return. There is also syntactic sugar surrounding the consumption of iterators via foreach. This almost completely hides the complexities of IEnumerator implementations.

There are some limitations to this. The interface is inherently not thread safe as it does not provide for an atomic operation that retrieves an element and moves the internal pointer on to the next. You have to call MoveNext() followed by Current if it returned true. If the iterator needs thread-safety, it is the responsibility of the caller to provide it.
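To make the missing atomic operation concrete, here is a minimal sketch of a wrapper that performs MoveNext() and Current as one step under a lock. SynchronizedEnumerator and TryGetNext are hypothetical names of my own, not part of .NET.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of .NET): wraps an enumerator so that
// "advance and read" happens as one atomic step under a lock.
public sealed class SynchronizedEnumerator<T> : IDisposable
{
    private readonly IEnumerator<T> inner;
    private readonly object gate = new object();

    public SynchronizedEnumerator(IEnumerable<T> source)
    {
        inner = source.GetEnumerator();
    }

    // Returns false once the sequence is exhausted.
    public bool TryGetNext(out T item)
    {
        lock (gate)
        {
            if (inner.MoveNext())
            {
                item = inner.Current;
                return true;
            }
            item = default(T);
            return false;
        }
    }

    public void Dispose()
    {
        lock (gate) { inner.Dispose(); }
    }
}
```

Note that this only serialises access. The body of the iterator still runs on whichever thread happens to call TryGetNext, so it does not give the iterator a single owning thread.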

But, then this happens…

Knowing this, I would have assumed (always a bad idea, but I’m only human) that Parallel.ForEach() operates over the iterator in a single thread, but farms out each loop to different threads, but I was wrong. Try the following code for yourself and see what happens:

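using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Program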
{
    public static void Main(string[] args)
    {
        Parallel.ForEach(
            YieldedNumbers(),
            (n) => { Thread.Sleep(n); });
        Console.WriteLine("Done!");
        Console.ReadLine();
    }

    public static IEnumerable<int> YieldedNumbers()
    {
        Random rnd = new Random();
        int lastKnownThread = Thread.CurrentThread.ManagedThreadId;
        int detectedSwitches = 0;
        for (int i = 0; i < 1000; i++)
        {
            int currentThread = Thread.CurrentThread.ManagedThreadId;
            if (lastKnownThread != currentThread)
            {
                detectedSwitches++;
                Console.WriteLine(
                    $"{detectedSwitches}: Last known thread ({lastKnownThread}) is not the same as the current thread ({currentThread}).");
                lastKnownThread = currentThread;
            }
            yield return rnd.Next(10,150);
        }
    }
}

The Action<int> passed to the Parallel.ForEach simply simulates some work being done (and the times sent to the Thread.Sleep() are roughly analogous to the times of the tasks in the original project).

What I’ve done here also is detect when the thread changes and report that to the console. It happens roughly 15%-18% of the time on the runs I’ve made on my machine. Now that was surprising (not really, because NHibernate Profiler had already told me – but to have a very clean example of the same was). I can’t blame any weirdness in third party libraries. It happens with some very basic .NET code in a console application.

Possible Solutions

1. My first thought was to dump all the data retrieved from the iterator into a collection of some sort (e.g. an array or list), but the iterator was originally put in place because the volume of data was causing memory pressure. The app runs overnight and processes anything from a few hundred to a few hundred thousand customers. Testing found that it slowed down significantly around the 7000 mark because of the size of the data, and fell over completely not far past that. So the iterator I created hides the fact that I now page the data; the calling code knows nothing about this paging and didn’t have to be modified. That solution was therefore out of the question: we’d be back to the problem we had a while ago.

2. The data could be processed in batches, with each fully retrieved batch run in parallel, one batch at a time. I did try that but it just made the calling code difficult to read and more complex than it needed to be. The reader has to be able to understand why there are batches, and the person writing the code has to remember that the data may not fit an exact number of batches, so the final batch has to be processed outside the loop. All of this adds to the cognitive load on the reader/maintainer.

public static void Main(string[] args)
{
    int batchSize = 97;
    List<int> batch = new List<int>();
    foreach (int item in YieldedNumbers())
    {
        batch.Add(item);
        if (batch.Count >= batchSize)
            ProcessBatch(batch);
    }
    ProcessBatch(batch);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

private static int batchCount = 0;
private static void ProcessBatch(List<int> batch)
{
    batchCount ++;
    Console.WriteLine($"Processing batch {batchCount} containing {batch.Count} items");
    Parallel.ForEach(batch, (n) => { Thread.Sleep(n); });
    batch.Clear();
}

// The YieldedNumbers() method is unchanged from before.

The iterator is always called from a single thread and therefore never complains in this setup.
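Some of that cognitive load could be moved out of the calling code. The following is a rough sketch only: InBatches is a hypothetical extension method of my own naming, which groups any sequence into lists of a given size and also yields the final, shorter batch.

```csharp
using System;
using System.Collections.Generic;

public static class BatchingExtensions
{
    // Hypothetical helper: lazily groups a sequence into lists of at most
    // batchSize items. The final, possibly shorter, batch is yielded too.
    public static IEnumerable<List<T>> InBatches<T>(this IEnumerable<T> source, int batchSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        return Iterate(source, batchSize);
    }

    private static IEnumerable<List<T>> Iterate<T>(IEnumerable<T> source, int batchSize)
    {
        var batch = new List<T>(batchSize);
        foreach (T item in source)
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize); // fresh list; the caller keeps the old one
            }
        }
        if (batch.Count > 0)
            yield return batch; // the leftover items that did not fill a batch
    }
}
```

The calling code would then be an ordinary foreach over YieldedNumbers().InBatches(97) with a Parallel.ForEach over each batch inside it. The source iterator is only advanced by the thread running the outer foreach. On .NET 6 or later, the built-in Enumerable.Chunk method does a similar job.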

3. Use the TPL Dataflow library (System.Threading.Tasks.Dataflow). Personally, I think this one is best because the pattern is clear and the complex bits can be moved away from the main algorithm. The only part I didn’t like was the effort to set up the Producer/Consumer pattern using this library, but it handles all the bits I want to abstract away quite nicely… And that set up can be abstracted out later. Here’s the basic algorithm.

public static void Main(string[] args)
{
    var producerOptions = new DataflowBlockOptions { BoundedCapacity = 97 };
    var buffer = new BufferBlock<int>(producerOptions);
    var consumerOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = Environment.ProcessorCount,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    var consumer = new ActionBlock<int>( n=> {  Thread.Sleep(n); }, consumerOptions);
    buffer.LinkTo(consumer, linkOptions);
    Produce(buffer);
    Task.WaitAll(consumer.Completion);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

private static void Produce(ITargetBlock<int> target)
{
    foreach (var n in YieldedNumbers())
    {
        // Normally, this will return immediately, but if the queue has
        // reached its limit then it will wait until the consumer has
        // processed items on the queue.
        Task.WaitAll(target.SendAsync(n));
    }
    // Set the target to the completed state to signal to the consumer
    // that no more data will be available.
    target.Complete();
}

I originally had the Produce() method as an async/await method, but that didn’t work. It seems that doing so makes the iterator shift between threads again, because when the code resumes after the await it may continue on a different thread. So I put it back to a simple Task.WaitAll() and that kept it all on the same thread.
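The thread hopping after an await can be observed with a small sketch like the one below. AwaitThreadDemo is a hypothetical name, and Task.Delay simply stands in for a SendAsync call that has to wait for space in the queue.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class AwaitThreadDemo
{
    // Returns the managed thread ids observed before and after an await.
    public static async Task<int[]> ObserveAsync()
    {
        int before = Thread.CurrentThread.ManagedThreadId;
        // Stand-in for an asynchronous wait such as a SendAsync that cannot
        // complete immediately.
        await Task.Delay(50).ConfigureAwait(false);
        int after = Thread.CurrentThread.ManagedThreadId;
        return new[] { before, after };
    }
}
```

When this is called from Main in a console application, the two ids will often differ because the continuation is picked up by a thread pool thread, although that is not guaranteed on any particular run.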

The producer options are set so that the queue size is limited. Production pauses when the queue reaches capacity, which keeps the app running smoothly: the producer won’t overproduce.

The consumer options need to be set explicitly, otherwise the consumer acts on a single thread. Unlike other things in the TPL, it won’t necessarily optimise for the number of cores you have; you have to specify that. A crude rule of thumb for that number is Environment.ProcessorCount (crude, because with hyper-threading it counts logical processors rather than physical cores). However, it is good enough unless you really need to optimise things accurately.

Now, a lot of this can be abstracted away so that the calling code can just get on with what it needs without the distractions that this pattern introduces.

Most of this code can be extracted into a static class containing an extension method for IEnumerable<T>:

public static class IEnumerableExtensions
{
    public static void ConsumeInParallel<T>(this IEnumerable<T> source, Action<T> action, int queueLimit = int.MaxValue)
    {
        var producerOptions = new DataflowBlockOptions { BoundedCapacity = queueLimit };
        var buffer = new BufferBlock<T>(producerOptions);
        var consumerOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = Environment.ProcessorCount,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var consumer = new ActionBlock<T>(action, consumerOptions);
        buffer.LinkTo(consumer, linkOptions);
        Produce(source, buffer);
        Task.WaitAll(consumer.Completion);
    }

    private static void Produce<T>(IEnumerable<T> source, ITargetBlock<T> target)
    {
        foreach (var n in source)
            Task.WaitAll(target.SendAsync(n));
        target.Complete();
    }
}

With this, we can use any IEnumerable<T> as a source of data and it will happily process it. The queueLimit ensures that we don’t end up with too much data waiting to be processed as we don’t want memory pressures causing the app to become unstable.

The calling code now looks much neater:

public static void Main(string[] args)
{
    YieldedNumbers().ConsumeInParallel(n=> {Thread.Sleep(n);}, 97);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

Parallelisation talk example – Independent Object Graphs

Parallelised code works best when data is not shared. This example shows a simple piece of parallel code where each task operates independently on its own object graph without dependencies on other objects outside its own graph.

Each iteration of the Parallel.ForEach statement operates on only one item in the List named initialObjectGraph. The operations do not make reference to any other objects in the list. The PopulateDetails method (which effectively operates on each item in the list) only uses and updates the information in just that one item.

The program outputs the initial object graph with just the very basic information that is put into it in the GetObjectGraph method. After the Parallel.ForEach has called PopulateDetails on each element in the list, the object graph is output once again.

Example Code

class Program
{
    static void Main(string[] args)
    {
        List<HotelRoomAvailability> initialObjectGraph = GetObjectGraph();

        DisplayObjectGraph(initialObjectGraph);

        Parallel.ForEach(initialObjectGraph, item => PopulateDetails(item));

        DisplayObjectGraph(initialObjectGraph);

        Console.WriteLine("Program finished");
        Console.ReadLine();

    }

    private static void PopulateDetails(HotelRoomAvailability hotel)
    {
        Console.WriteLine("Populating details of {0}", hotel.HotelCode);
        hotel.Name = HotelRespository.GetHotelName(hotel.HotelCode);
        hotel.Rates = AvailabilityRespository.GetRateInformation(
            hotel.HotelCode, hotel.StayDate, hotel.NumberOfNights);
    }

    private static List<HotelRoomAvailability> GetObjectGraph()
    {
        List<HotelRoomAvailability> result = new List<HotelRoomAvailability>
            {
                new HotelRoomAvailability
                    {
                        StayDate = new DateTime(2011, 7, 1),
                        NumberOfNights = 3,
                        HotelCode = "LONSOHO"
                    },
                new HotelRoomAvailability
                    {
                        StayDate = new DateTime(2011, 7, 1),
                        NumberOfNights = 3,
                        HotelCode = "LONCOVGDN"
                    },
                new HotelRoomAvailability
                    {
                        StayDate = new DateTime(2011, 7, 1),
                        NumberOfNights = 3,
                        HotelCode = "LONLEISQR"
                    },
                new HotelRoomAvailability
                    {
                        StayDate = new DateTime(2011, 7, 1),
                        NumberOfNights = 3,
                        HotelCode = "LONHIGHOL"
                    },
                new HotelRoomAvailability
                    {
                        StayDate = new DateTime(2011, 7, 1),
                        NumberOfNights = 3,
                        HotelCode = "LONKINGSX"
                    },
                new HotelRoomAvailability
                    {
                        StayDate = new DateTime(2011, 7, 1),
                        NumberOfNights = 3,
                        HotelCode = "LONEUSTON"
                    }
            };

        return result;
    }

    private static void DisplayObjectGraph(
        IEnumerable<HotelRoomAvailability> initialObjectGraph)
    {
        foreach (HotelRoomAvailability availability in initialObjectGraph)
        {
            Console.WriteLine(availability);
            Console.WriteLine(new string('=', 80));
        }
    }
}

Note that not all the code is shown here. There are a number of classes outside the Program class that simulate calls to services or perform other calculations on the data. If you want to see the full example please download the full code sample using the link at the bottom of this post.

Example output

LONSOHO :
Staying on 01/Jul/2011 for 3 nights

================================================================================

LONCOVGDN :
Staying on 01/Jul/2011 for 3 nights

================================================================================

LONLEISQR :
Staying on 01/Jul/2011 for 3 nights

================================================================================

LONHIGHOL :
Staying on 01/Jul/2011 for 3 nights

================================================================================

LONKINGSX :
Staying on 01/Jul/2011 for 3 nights

================================================================================

LONEUSTON :
Staying on 01/Jul/2011 for 3 nights

================================================================================

Populating details of LONSOHO
Populating details of LONCOVGDN
Populating details of LONLEISQR
Populating details of LONHIGHOL
Populating details of LONKINGSX
Populating details of LONEUSTON
LONSOHO : London Soho
Staying on 01/Jul/2011 for 3 nights
Available Rates:
Rate: One Month Advanced Rate Room Only
01/Jul/2011: £48.40
02/Jul/2011: £52.80
03/Jul/2011: £44.00
Total Price: £145.20
----------------------------------------
Rate: One Month Advanced Rate Bed and Breakfast
01/Jul/2011: £58.40
02/Jul/2011: £62.80
03/Jul/2011: £54.00
Total Price: £175.20
----------------------------------------
Rate: One Month Advanced Rate Dinner, Bed and Breakfast
01/Jul/2011: £83.40
02/Jul/2011: £87.80
03/Jul/2011: £79.00
Total Price: £250.20
----------------------------------------
Rate: Two Week Advances Rate Room Only
01/Jul/2011: £54.45
02/Jul/2011: £59.40
03/Jul/2011: £49.50
Total Price: £163.35
----------------------------------------
Rate: Two Week Advances Rate Bed and Breakfast
01/Jul/2011: £64.45
02/Jul/2011: £69.40
03/Jul/2011: £59.50
Total Price: £193.35
----------------------------------------
Rate: Two Week Advances Rate Dinner, Bed and Breakfast
01/Jul/2011: £89.45
02/Jul/2011: £94.40
03/Jul/2011: £84.50
Total Price: £268.35
----------------------------------------
Rate: Fully Flexible Rate Room Only
01/Jul/2011: £60.50
02/Jul/2011: £66.00
03/Jul/2011: £55.00
Total Price: £181.50
----------------------------------------
Rate: Fully Flexible Rate Bed and Breakfast
01/Jul/2011: £70.50
02/Jul/2011: £76.00
03/Jul/2011: £65.00
Total Price: £211.50
----------------------------------------
Rate: Fully Flexible Rate Dinner, Bed and Breakfast
01/Jul/2011: £95.50
02/Jul/2011: £101.00
03/Jul/2011: £90.00
Total Price: £286.50
----------------------------------------

================================================================================

LONCOVGDN : London Covent Garden
Staying on 01/Jul/2011 for 3 nights
Available Rates:
Rate: One Month Advanced Rate Room Only
01/Jul/2011: £59.84
02/Jul/2011: £65.28
03/Jul/2011: £54.40
Total Price: £179.52
----------------------------------------
Rate: One Month Advanced Rate Bed and Breakfast
01/Jul/2011: £69.84
02/Jul/2011: £75.28
03/Jul/2011: £64.40
Total Price: £209.52
----------------------------------------
Rate: One Month Advanced Rate Dinner, Bed and Breakfast
01/Jul/2011: £94.84
02/Jul/2011: £100.28
03/Jul/2011: £89.40
Total Price: £284.52
----------------------------------------
Rate: Two Week Advances Rate Room Only
01/Jul/2011: £67.32
02/Jul/2011: £73.44
03/Jul/2011: £61.20
Total Price: £201.96
----------------------------------------
Rate: Two Week Advances Rate Bed and Breakfast
01/Jul/2011: £77.32
02/Jul/2011: £83.44
03/Jul/2011: £71.20
Total Price: £231.96
----------------------------------------
Rate: Two Week Advances Rate Dinner, Bed and Breakfast
01/Jul/2011: £102.32
02/Jul/2011: £108.44
03/Jul/2011: £96.20
Total Price: £306.96
----------------------------------------
Rate: Fully Flexible Rate Room Only
01/Jul/2011: £74.80
02/Jul/2011: £81.60
03/Jul/2011: £68.00
Total Price: £224.40
----------------------------------------
Rate: Fully Flexible Rate Bed and Breakfast
01/Jul/2011: £84.80
02/Jul/2011: £91.60
03/Jul/2011: £78.00
Total Price: £254.40
----------------------------------------
Rate: Fully Flexible Rate Dinner, Bed and Breakfast
01/Jul/2011: £109.80
02/Jul/2011: £116.60
03/Jul/2011: £103.00
Total Price: £329.40
----------------------------------------

================================================================================

LONLEISQR : London Leicester Square
Staying on 01/Jul/2011 for 3 nights
Available Rates:
Rate: One Month Advanced Rate Room Only
01/Jul/2011: £61.60
02/Jul/2011: £67.20
03/Jul/2011: £56.00
Total Price: £184.80
----------------------------------------
Rate: One Month Advanced Rate Bed and Breakfast
01/Jul/2011: £71.60
02/Jul/2011: £77.20
03/Jul/2011: £66.00
Total Price: £214.80
----------------------------------------
Rate: One Month Advanced Rate Dinner, Bed and Breakfast
01/Jul/2011: £96.60
02/Jul/2011: £102.20
03/Jul/2011: £91.00
Total Price: £289.80
----------------------------------------
Rate: Two Week Advances Rate Room Only
01/Jul/2011: £69.30
02/Jul/2011: £75.60
03/Jul/2011: £63.00
Total Price: £207.90
----------------------------------------
Rate: Two Week Advances Rate Bed and Breakfast
01/Jul/2011: £79.30
02/Jul/2011: £85.60
03/Jul/2011: £73.00
Total Price: £237.90
----------------------------------------
Rate: Two Week Advances Rate Dinner, Bed and Breakfast
01/Jul/2011: £104.30
02/Jul/2011: £110.60
03/Jul/2011: £98.00
Total Price: £312.90
----------------------------------------
Rate: Fully Flexible Rate Room Only
01/Jul/2011: £77.00
02/Jul/2011: £84.00
03/Jul/2011: £70.00
Total Price: £231.00
----------------------------------------
Rate: Fully Flexible Rate Bed and Breakfast
01/Jul/2011: £87.00
02/Jul/2011: £94.00
03/Jul/2011: £80.00
Total Price: £261.00
----------------------------------------
Rate: Fully Flexible Rate Dinner, Bed and Breakfast
01/Jul/2011: £112.00
02/Jul/2011: £119.00
03/Jul/2011: £105.00
Total Price: £336.00
----------------------------------------

================================================================================

LONHIGHOL : London High Holborn
Staying on 01/Jul/2011 for 3 nights
Available Rates:
Rate: One Month Advanced Rate Room Only
01/Jul/2011: £59.84
02/Jul/2011: £65.28
03/Jul/2011: £54.40
Total Price: £179.52
----------------------------------------
Rate: One Month Advanced Rate Bed and Breakfast
01/Jul/2011: £69.84
02/Jul/2011: £75.28
03/Jul/2011: £64.40
Total Price: £209.52
----------------------------------------
Rate: One Month Advanced Rate Dinner, Bed and Breakfast
01/Jul/2011: £94.84
02/Jul/2011: £100.28
03/Jul/2011: £89.40
Total Price: £284.52
----------------------------------------
Rate: Two Week Advances Rate Room Only
01/Jul/2011: £67.32
02/Jul/2011: £73.44
03/Jul/2011: £61.20
Total Price: £201.96
----------------------------------------
Rate: Two Week Advances Rate Bed and Breakfast
01/Jul/2011: £77.32
02/Jul/2011: £83.44
03/Jul/2011: £71.20
Total Price: £231.96
----------------------------------------
Rate: Two Week Advances Rate Dinner, Bed and Breakfast
01/Jul/2011: £102.32
02/Jul/2011: £108.44
03/Jul/2011: £96.20
Total Price: £306.96
----------------------------------------
Rate: Fully Flexible Rate Room Only
01/Jul/2011: £74.80
02/Jul/2011: £81.60
03/Jul/2011: £68.00
Total Price: £224.40
----------------------------------------
Rate: Fully Flexible Rate Bed and Breakfast
01/Jul/2011: £84.80
02/Jul/2011: £91.60
03/Jul/2011: £78.00
Total Price: £254.40
----------------------------------------
Rate: Fully Flexible Rate Dinner, Bed and Breakfast
01/Jul/2011: £109.80
02/Jul/2011: £116.60
03/Jul/2011: £103.00
Total Price: £329.40
----------------------------------------

================================================================================

LONKINGSX : London Kings Cross
Staying on 01/Jul/2011 for 3 nights
Available Rates:
Rate: One Month Advanced Rate Room Only
01/Jul/2011: £61.60
02/Jul/2011: £67.20
03/Jul/2011: £56.00
Total Price: £184.80
----------------------------------------
Rate: One Month Advanced Rate Bed and Breakfast
01/Jul/2011: £71.60
02/Jul/2011: £77.20
03/Jul/2011: £66.00
Total Price: £214.80
----------------------------------------
Rate: One Month Advanced Rate Dinner, Bed and Breakfast
01/Jul/2011: £96.60
02/Jul/2011: £102.20
03/Jul/2011: £91.00
Total Price: £289.80
----------------------------------------
Rate: Two Week Advances Rate Room Only
01/Jul/2011: £69.30
02/Jul/2011: £75.60
03/Jul/2011: £63.00
Total Price: £207.90
----------------------------------------
Rate: Two Week Advances Rate Bed and Breakfast
01/Jul/2011: £79.30
02/Jul/2011: £85.60
03/Jul/2011: £73.00
Total Price: £237.90
----------------------------------------
Rate: Two Week Advances Rate Dinner, Bed and Breakfast
01/Jul/2011: £104.30
02/Jul/2011: £110.60
03/Jul/2011: £98.00
Total Price: £312.90
----------------------------------------
Rate: Fully Flexible Rate Room Only
01/Jul/2011: £77.00
02/Jul/2011: £84.00
03/Jul/2011: £70.00
Total Price: £231.00
----------------------------------------
Rate: Fully Flexible Rate Bed and Breakfast
01/Jul/2011: £87.00
02/Jul/2011: £94.00
03/Jul/2011: £80.00
Total Price: £261.00
----------------------------------------
Rate: Fully Flexible Rate Dinner, Bed and Breakfast
01/Jul/2011: £112.00
02/Jul/2011: £119.00
03/Jul/2011: £105.00
Total Price: £336.00
----------------------------------------

================================================================================

LONEUSTON : London Euston
Staying on 01/Jul/2011 for 3 nights
Available Rates:
Rate: One Month Advanced Rate Room Only
01/Jul/2011: £62.48
02/Jul/2011: £68.16
03/Jul/2011: £56.80
Total Price: £187.44
----------------------------------------
Rate: One Month Advanced Rate Bed and Breakfast
01/Jul/2011: £72.48
02/Jul/2011: £78.16
03/Jul/2011: £66.80
Total Price: £217.44
----------------------------------------
Rate: One Month Advanced Rate Dinner, Bed and Breakfast
01/Jul/2011: £97.48
02/Jul/2011: £103.16
03/Jul/2011: £91.80
Total Price: £292.44
----------------------------------------
Rate: Two Week Advances Rate Room Only
01/Jul/2011: £70.29
02/Jul/2011: £76.68
03/Jul/2011: £63.90
Total Price: £210.87
----------------------------------------
Rate: Two Week Advances Rate Bed and Breakfast
01/Jul/2011: £80.29
02/Jul/2011: £86.68
03/Jul/2011: £73.90
Total Price: £240.87
----------------------------------------
Rate: Two Week Advances Rate Dinner, Bed and Breakfast
01/Jul/2011: £105.29
02/Jul/2011: £111.68
03/Jul/2011: £98.90
Total Price: £315.87
----------------------------------------
Rate: Fully Flexible Rate Room Only
01/Jul/2011: £78.10
02/Jul/2011: £85.20
03/Jul/2011: £71.00
Total Price: £234.30
----------------------------------------
Rate: Fully Flexible Rate Bed and Breakfast
01/Jul/2011: £88.10
02/Jul/2011: £95.20
03/Jul/2011: £81.00
Total Price: £264.30
----------------------------------------
Rate: Fully Flexible Rate Dinner, Bed and Breakfast
01/Jul/2011: £113.10
02/Jul/2011: £120.20
03/Jul/2011: £106.00
Total Price: £339.30
----------------------------------------

================================================================================

Program finished



Parallelisation Talk Examples – Parallel.ForEach

These are some code examples from my introductory talk on Parallelisation. They show the difference between a standard sequential foreach loop and its parallel equivalent.

Code example 1: Serial processing of a foreach loop

class Program
{
    private static Random rnd = new Random();

    static void Main(string[] args)
    {
        DateTime start = DateTime.UtcNow;

        IEnumerable<int> items = Enumerable.Range(0,20);

        foreach(int item in items)
            ProcessLoop(item);

        DateTime end = DateTime.UtcNow;
        TimeSpan duration = end - start;

        Console.WriteLine("Finished. Took {0}", duration);

        Console.ReadLine();
    }

    private static void ProcessLoop(int item)
    {
        Console.WriteLine("Processing item {0}", item);

        // Simulate similar but slightly variable length processing
        int pause = rnd.Next(900, 1100);
        Thread.Sleep(pause);
    }
}

The output of the above code may look something like this:

[Image: Sequential foreach Example]

As you can see, this takes roughly 20 seconds to process 20 items, with each item taking about one second to process.

Code Example 2: Parallel processing of a foreach loop

The Parallel class can be found in the System.Threading.Tasks namespace.

class Program
{
    private static Random rnd = new Random();

    static void Main(string[] args)
    {
        DateTime start = DateTime.UtcNow;

        IEnumerable<int> items = Enumerable.Range(0,20);

        Parallel.ForEach(items,
            (item) => ProcessLoop(item));

        DateTime end = DateTime.UtcNow;
        TimeSpan duration = end - start;

        Console.WriteLine("Finished. Took {0}", duration);

        Console.ReadLine();
    }

    private static void ProcessLoop(int item)
    {
        Console.WriteLine("Processing item {0}", item);

        // Simulate similar but slightly variable length processing
        int pause = rnd.Next(900, 1100);
        Thread.Sleep(pause);
    }
}

The output of the above code may look something like this:

[Image: Parallel.ForEach Example]

The result of this code is that it takes roughly 5 seconds to process the 20 items. I have a 4 core processor, so this is in line with the expectation that the work is distributed across all 4 cores.
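The arithmetic behind that figure is 20 items, 4 at a time, at about one second each: 20 / 4 × 1 second = 5 seconds. The sketch below is one way to check that kind of estimate on any machine. ParallelTiming and Measure are hypothetical names, the fixed sleep replaces the random pause used above, and the explicit MaxDegreeOfParallelism is my assumption to mimic a 4 core machine.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelTiming
{
    // Runs `count` simulated work items of `millisecondsPerItem` each with at
    // most `maxParallelism` running at once, and returns the elapsed time.
    public static TimeSpan Measure(int count, int millisecondsPerItem, int maxParallelism)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallelism };
        Stopwatch stopwatch = Stopwatch.StartNew();
        Parallel.ForEach(Enumerable.Range(0, count), options,
            item => Thread.Sleep(millisecondsPerItem));
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }
}
```

Calling ParallelTiming.Measure(20, 1000, 4) should take about five seconds. Using a fixed sleep also sidesteps sharing one Random instance between threads, which the examples above do and which is not guaranteed to be thread safe.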


Tasks that create more work

I’m creating a program that parses a web page then follows the links and then parses the next set of web pages to eventually build a picture of an entire site. This means that as the program runs more work is being generated and more tasks can be launched to process each new page as it is discovered.

My original solution was simply to create code like this:

private void ProcessLink(string link)
{
    var page = GetPageInformation(link);
    var newLinks = GetNewLinks(page);

    foreach(var newLink in newLinks)
    {
        Action action = () => {  ProcessLink(newLink); };
        Task.Factory.StartNew(action, TaskCreationOptions.AttachedToParent);
    }
}

The premise is simple enough: build a list of new links from a page, then start a new task for each of them. The new task is attached to the parent task (the task that is launching the new set of tasks).

However, it soon became apparent that this was quickly getting out of control. I had no idea what was still waiting to be processed, or whether the same link was being queued up multiple times in many different threads, and so on. I ended up putting in place so many mechanisms to prevent the code processing the same page over again in different threads that it was getting silly. For a small number of new tasks being launched, I’m sure that Task.Factory.StartNew() is perfectly suitable.
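To give a flavour of the kind of mechanism involved, here is a minimal sketch of one way to stop the same link being processed twice across threads. It is an illustration rather than the code I actually had; LinkRegistry and TryClaim are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: remembers which links have been claimed so that
// only the first caller for a given link gets to process it.
public sealed class LinkRegistry
{
    private readonly ConcurrentDictionary<string, bool> seen =
        new ConcurrentDictionary<string, bool>(StringComparer.OrdinalIgnoreCase);

    // Atomic: returns true for exactly one caller per distinct link.
    public bool TryClaim(string link)
    {
        return seen.TryAdd(link, true);
    }

    // Number of distinct links claimed so far.
    public int Count
    {
        get { return seen.Count; }
    }
}
```

Each task would call TryClaim before doing any work and return immediately on false. This handles duplicates, but it still says nothing about what is waiting to be processed, which was the other half of the problem.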

I eventually realised that I was heading down the wrong way and I needed to rethink my strategy altogether. I wanted to make the code parallelisable so that while I was waiting on one page I could be parsing and processing another page. So, I eventually refactored it to this:

     1:  public class SiteScraper
     2:  {
     3:      private ConcurrentDictionary<string, ScraperResults> completedWork = 
     4:          new ConcurrentDictionary<string, ScraperResults>();
     5:   
     6:      private List<string> currentWork;
     7:   
     8:      private ConcurrentQueue<string> futureWorkQueue = 
     9:          new ConcurrentQueue<string>();
    10:   
    11:      public void GetSiteInformation(string startingUrl)
    12:      {
    13:          currentWork = new List<string>();
    14:          currentWork.Add(startingUrl.ToLowerInvariant());
    15:   
    16:          while(currentWork.Any())
    17:          {
    18:              Parallel.ForEach(currentWork, item => GetPageInformation(item));
    19:              BuildWorkQueue();
    20:          }
    21:      }
    22:   
    23:      private void BuildWorkQueue()
    24:      {
    25:          currentWork = new List<string>(futureWorkQueue
    26:              .Select(link => link.ToLowerInvariant()).Distinct()
    27:              .Where(link => IsLinkToBeProcessed(link)));
    28:   
    29:          futureWorkQueue = new ConcurrentQueue<string>();
    30:      }
    31:   
    32:      private void GetPageInformation(string url)
    33:      {
    34:          // Do stuff
    35:          ProcessNewLinks(newLinks);
    36:      }
    37:   
    38:      private void ProcessNewLinks(IEnumerable<string> newLinks)
    39:      {
    40:          foreach (string url in newLinks.Where(l => IsLinkToBeProcessed(l)))
    41:          {
    42:              futureWorkQueue.Enqueue(url);
    43:          }
    44:      }
    45:   
    46:   
    47:      // Other bits
    48:   
    49:  }

There is still some code to ensure duplicates are removed and not processed, but it became much easier to debug and to know what has been processed and what is still to be processed than it was before.

The method GetSiteInformation (lines 11-21) handles the main part of the parallelisation. This is the key to this particular algorithm.

Before discussing what that does, I just want to explain the three collections set up as fields on the class (lines 3 to 9). The completedWork is a dictionary keyed on the url containing an object graph representing the bits of the page we are interested in. The currentWork (line 6) is a list of the current urls that are being processed. Finally, the futureWorkQueue contains a queue of all the new links that are discovered, which will feed into the next iteration.

The GetSiteInformation method creates the initial list of currentWork and processes it using Parallel.ForEach (line 18). On the first iteration only one item will be processed, but it should result in many new links to be processed. A call to BuildWorkQueue builds the new work list for the next iteration, which is controlled by the while loop (lines 16-20). When BuildWorkQueue produces no new items for currentWork, the work is complete and the while loop exits.

BuildWorkQueue is called when all the existing work is completed. It then builds the new set of urls to be processed from the futureWorkQueue, the collection that was populated while the links were being processed (see later). Three things happen (lines 25-27): all the links are forced into lower case (this may not be advisable for all websites, but for my case it is sufficient); only distinct elements are kept, as the futureWorkQueue could quite easily have been filled with duplicates; and finally a check is made to ensure that the link has not already been processed.

During the processing of a specific URL (lines 32-36 – mostly not shown) new links may be generated. Each of these will be added to the futureWorkQueue (lines 40-43). Before enqueuing any link a check is made to ensure it has not already been processed.

There are other bits of the class that are not shown. For example the IsLinkToBeProcessed method (which checks the domain, whether it has been processed already and so on) and the code that populates the completedWork.
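As a rough idea of what such a check might look like, here is a sketch under several assumptions. LinkFilter is a hypothetical standalone class, the site is identified by a single host name, and the results are stored as plain objects rather than the ScraperResults type used in the real class.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for the filtering logic that is not shown above.
public sealed class LinkFilter
{
    private readonly string allowedHost;
    private readonly ConcurrentDictionary<string, object> completedWork;

    public LinkFilter(string allowedHost, ConcurrentDictionary<string, object> completedWork)
    {
        this.allowedHost = allowedHost;
        this.completedWork = completedWork;
    }

    public bool IsLinkToBeProcessed(string link)
    {
        Uri uri;
        // Reject anything that is not an absolute URL.
        if (!Uri.TryCreate(link, UriKind.Absolute, out uri))
            return false;
        // Stay on the site being scraped.
        if (!string.Equals(uri.Host, allowedHost, StringComparison.OrdinalIgnoreCase))
            return false;
        // Skip pages that already have results (keys are stored in lower case).
        return !completedWork.ContainsKey(link.ToLowerInvariant());
    }
}
```

Because the check only reads from a ConcurrentDictionary, it can be called safely from inside the Parallel.ForEach as well as from BuildWorkQueue.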

In this version of the code it is much easier to see what has been completed and what is still to do (or at least, what has been found to do).