Software Development

Paramore Brighter: The Russian Doll Model

I’ve mentioned a bit about attributing handlers with the step and timing parameters, but I’ve not explained them properly in previous posts (“Retrying commands” mentions steps, and “Don’t Repeat Yourself” also mentions timings). So, I’ve created a small project to demonstrate what they mean and how it all operates.

The code for this post is available on GitHub.

If you just have the target handler (that is, the handler directly tied to the Command that got Sent) without any decorations, then you don’t have to worry about the Russian Doll Model; there is only one handler, and the command goes directly to it. However, as soon as you start decorating your handler with other handlers, it comes into effect.

Timing

As the name suggests, this affects when the decorated handler will run: either before or after the target handler. However, handlers set to run “before” also get an opportunity to do things afterwards, due to the Russian Doll Model, as we’ll see.

The Before handler wraps the target handler, and the target handler wraps the After handler. At the very centre is the innermost After handler. Like this:

Russian Doll Model with Before and After handlers

The red arrows in the diagram show the flow of the code. So, for a handler with a before and after decoration, the code will execute in the following order:

  • The “Before” timing Handle method
  • The Target Handle method
  • The “After” timing Handle method
  • The Target Handle method continued (after any call to the base.Handle())
  • The “Before” timing Handle method continued (after any call to the base.Handle())

Obviously, you do not have to call base.Handle() from your handler, but if you don’t, you break the Russian Doll Model and subsequent steps will not be called. Throwing an exception will also stop subsequent steps from being called. According to Ian Cooper, the originator of the Paramore Brighter framework, “An exception is the preferred mechanism to exit a pipeline”.
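The call order above can be sketched in plain C#, without Brighter at all, by modelling each handler as a delegate that wraps the next one; anything placed after the call through to the inner handler runs on the way back out. The names here are illustrative, not part of the framework:

```csharp
using System;
using System.Collections.Generic;

// A stand-in for a decorated pipeline: each "handler" is a delegate that
// wraps the next one, Russian Doll style.
public static class PipelineDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Innermost: the After-timing handler.
        Action after = () => log.Add("After Handle");

        // The target handler wraps the After handler; anything following
        // the call through (the base.Handle() equivalent) runs on the way
        // back out.
        Action target = () =>
        {
            log.Add("Target Handle");
            after();                       // base.Handle()
            log.Add("Target continued");
        };

        // Outermost: the Before-timing handler wraps the target.
        Action before = () =>
        {
            log.Add("Before Handle");
            target();                      // base.Handle()
            log.Add("Before continued");
        };

        before();
        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
            Console.WriteLine(entry);
    }
}
```

Running it prints the five entries in the same order as the list above.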

Steps

If you have multiple decorators with the same timing, it may be important to let the framework know in which order to run them.

For Before handlers the steps ascend, so step 1, followed by step 2, followed by step 3, etc. For After handlers the steps descend, so step 3, followed by step 2, followed by step 1.

7 Layer Russian Doll Model (3 Before, Target, and 3 After)

The red arrows in the diagram show the flow of the code. So, for a handler with three Before and three After decorations, the code will execute in the following order:

  • Step 1 for the “Before” timing Handle method
  • Step 2 for the “Before” timing Handle method
  • Step 3 for the “Before” timing Handle method
  • The Target Handle method
  • Step 3 for the “After” timing Handle method
  • Step 2 for the “After” timing Handle method
  • Step 1 for the “After” timing Handle method
  • Step 2 for the “After” timing Handle method continued (after any call to the base.Handle())
  • Step 3 for the “After” timing Handle method continued (after any call to the base.Handle())
  • The Target Handle method continued (after any call to the base.Handle())
  • Step 3 for the “Before” timing Handle method continued (after any call to the base.Handle())
  • Step 2 for the “Before” timing Handle method continued (after any call to the base.Handle())
  • Step 1 for the “Before” timing Handle method continued (after any call to the base.Handle())

Base Handler classes

You can, of course, create a class between RequestHandler and your own target handler class, and this adds its own complexity to the model.

Any handler attributes added to the base class will be added to the pipeline, and those handlers will run at the timing and step they specify. Also, remember that the base class has its own Handle method, which can have code before and after the call to the base class’s implementation.

This can be seen in the sample project on GitHub, which you can download and experiment with to see how the code is executed.

Software Development

Paramore Brighter: DRY with Custom Decorated Command Handlers

You may wish to add similar functionality to many (or all) command handlers. The typical example is logging. You can decorate a command handler in a similar way to the policies I showed in previous posts to add common functionality. I’ve used this technique to guard the handler from invalid command arguments/parameters (essentially a validator), and for ensuring that we ping our APM (Application Performance Management) tool when a command completes. I’ll use the latter to demonstrate creating a custom decorator and handler to initiate this common code.

Paramore Brighter Command Processor will look for any attributes derived from RequestHandlerAttribute that are added to the Handle method on your command handler class. It will then use them to build a pipeline for your command.

So, in the example here, our attribute class looks like this:

public class HeartbeatAttribute : RequestHandlerAttribute
{
    public HeartbeatAttribute(int step, HandlerTiming timing = HandlerTiming.After) : base(step, timing)
    {
    }

    public override Type GetHandlerType()
    {
        return typeof(HeartbeatHandler<>);
    }
}

We are deriving from RequestHandlerAttribute, and it has an abstract method that you need to implement. GetHandlerType() returns the type of handler that needs to be instantiated to handle the common task.

The RequestHandlerAttribute class also takes two arguments for its constructor that you either need to capture from users of your attribute or supply yourself. It takes a step and a timing parameter. Since we’ve already talked about step in a previous post we’ll move on to talking about timing.

The two options for timing are Before and After. In the previous examples the timing has been implicitly set to Before because the handler needed to perform actions before your target handler (the one that you decorated). If you set the timing to After, it only acts after your target handler.

In the example here, the timing is set to After because we want to make sure that the target handler completed correctly before our handler runs. So, if it throws an exception, our heartbeat handler won’t run. If you need to perform actions both before and after, then set the timing to Before, and put code on either side of the call to base.Handle().

Our heartbeat handler looks like this:

public class HeartbeatHandler<TRequest> : RequestHandler<TRequest> where TRequest : class, IRequest
{
    public override TRequest Handle(TRequest command)
    {
        // We would probably call a heartbeat service at this point.
        // But for demonstration we'll just write to the console.

        Console.WriteLine($"Heartbeat pulsed for {command.GetType().FullName}");
        string jsonString = JsonConvert.SerializeObject(command);
        Console.WriteLine(jsonString);

        return base.Handle(command);
    }
}

The important thing, as with all handlers, is to remember the call to base.Handle(), which ensures the pipeline is continued.

The target handler decoration looks like this:

[FallbackPolicy(step:1, backstop:true, circuitBreaker:false)]
[UsePolicy(policy: "GreetingRetryPolicy", step:2)]
[Heartbeat(step:3)]
public override SalutationCommand Handle(SalutationCommand command)
{
    // Stuff to handle the command.

    return base.Handle(command);
}

The first two decorators are from previous posts (Retrying Commands and Implementing a fallback exception handler) while the third is our new decorator.

When run, you can see that if the service fails completely (i.e. all the retries failed) then the heartbeat handler does not run. However, if the command succeeds, then the heartbeat handler runs; our APM knows the command succeeded and can display that.

Remember

Remember to wire up the handler, as with all handlers, to your dependency injection framework, so that it can be correctly instantiated:

serviceCollection.AddScoped(typeof(HeartbeatHandler<>));

Software Development

Paramore Brighter with Quality of Service: Retrying Commands

Paramore Brighter supports Policies to maintain quality of service. This is useful when your command makes calls to external services, whether they are databases, web services, or any other end point that exists out of the process of your application. You can set up retry policies, circuit-breaker policies, and timeout policies. For this post, we’ll concentrate on setting up a retry policy.

The full code from this post is available on GitHub.com.

The SalutationHandler that we’ve been using in previous posts now emulates an external failure by throwing an exception in some cases. The policy handler will catch the exception and act on it, retrying the command if necessary.

Set up the policy

First off let’s set up the policy. In this case I’m going for an exponential backoff (doubling the wait time on each attempt) and it will perform a maximum of 4 attempts.

private static IAmAPolicyRegistry GetPolicies()
{
    var policyRegistry = new PolicyRegistry();

    // These are the default policies that must exist. 
    // We're not using them, so we're setting them to No-op
    policyRegistry.Add(CommandProcessor.RETRYPOLICY, Policy.NoOp());
    policyRegistry.Add(CommandProcessor.RETRYPOLICYASYNC, Policy.NoOpAsync());
    policyRegistry.Add(CommandProcessor.CIRCUITBREAKER, Policy.NoOp());
    policyRegistry.Add(CommandProcessor.CIRCUITBREAKERASYNC, Policy.NoOpAsync());
    
    // Sets up the policy that we're going to use 
    // for the SalutationHandler
    var greetingRetryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetry(new[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(2), 
            TimeSpan.FromSeconds(4) 
        }, (exception, timeSpan) =>
        {
            Console.WriteLine($" ** An error occurred: {exception.Message}");
            Console.WriteLine($" ** Waiting {timeSpan.Seconds} seconds until retry.");
        });

    policyRegistry.Add("GreetingRetryPolicy", greetingRetryPolicy);
    return policyRegistry;
}

The policies are defined using Polly, a .NET resilience and transient-fault-handling library.

The .Handle<Exception>() means the policy handles all exceptions. You might want it to be more specific for your use case, e.g. SqlException for database errors.

The WaitAndRetry(...) takes a set of timings (as TimeSpan objects) for how long to wait between attempts and an Action which is run between attempts. Although there are only 3 times here, it will make 4 attempts. Each time represents the amount of time after an attempt before retrying. The first attempt is performed immediately.
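To spell out that arithmetic: each wait sits between two attempts, so three wait times give four attempts in total. A minimal sketch of the same loop without Polly (RetryDemo and its always-failing action are hypothetical stand-ins, not part of the sample project):

```csharp
using System;

public static class RetryDemo
{
    // Runs the action immediately, then once more after each wait time,
    // so attempts = waitTimes.Length + 1 when every attempt fails.
    public static int Execute(Action action, TimeSpan[] waitTimes)
    {
        int attempts = 0;
        for (int i = 0; ; i++)
        {
            attempts++;
            try
            {
                action();
                return attempts; // success: no more retries needed
            }
            catch (Exception)
            {
                if (i >= waitTimes.Length)
                    return attempts; // retries exhausted (a real policy would rethrow)
                // A real implementation would Thread.Sleep(waitTimes[i]) here.
            }
        }
    }

    public static void Main()
    {
        var waits = new[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(2),
            TimeSpan.FromSeconds(4)
        };
        int attempts = Execute(() => throw new Exception("always fails"), waits);
        Console.WriteLine($"Attempts made: {attempts}"); // prints 4
    }
}
```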

The Action allows you to set up what you want to do between attempts. In this case, I’ve only had it output to the console. You may wish to log the error, or take corrective action that might help the next attempt succeed.

Finally, we add the policy to the registry and give it a name, so we can refer to it on our Handle method in our command handler class.

In order for Brighter to be able to use this policy, the Handler for it needs to be registered in the IoC container.

serviceCollection.AddScoped(typeof(ExceptionPolicyHandler<>));

The Command Handler

It should be noted that, regardless of the number of retries that are made, they are all processed through the same instance of the command handler. This may be important if you store state about the progress of the command. It may also be important if any services injected into the command handler are left in an undefined state when things go wrong.

[FallbackPolicy(step:1, backstop:true, circuitBreaker:false)]
[UsePolicy(policy: "GreetingRetryPolicy", step:2)]
public override SalutationCommand Handle(SalutationCommand command)
{
    ...
}

We still have our fallback that we set up in the previous post on Paramore Brighter, but we now have a UsePolicy attribute. And since we have two attributes, the step argument now becomes important.

The command processor sets up the policy and command handlers like a Russian doll, with the command handler right in the middle. The outer handler (doll) is step 1, the one inside that is step 2, and so on until you get to the actual command handler. So, in this case, the FallbackPolicy is on the very outside; since it only does its thing when it receives an exception, the UsePolicy handler will act on exceptions before the fallback sees them, most of the time.

The UsePolicy attribute takes the name of the policy that we set up earlier when we were creating the policy registry.

Analysing the StackTrace

So, when we ask to greet “Voldemort” it will always fail. We get a stack trace that shows off the Russian Doll quite well.

System.ApplicationException: A death-eater has appeared.
   at QualityOfService.SalutationHandler.ThrowOnTheDarkLord(SalutationCommand command) in C:\dev\BrighterRecipes\src\quality-of-service\quality-of-service\SalutationHandler.cs:line 46
   at QualityOfService.SalutationHandler.Handle(SalutationCommand command) in C:\dev\BrighterRecipes\src\quality-of-service\quality-of-service\SalutationHandler.cs:line 21
   at Paramore.Brighter.RequestHandler`1.Handle(TRequest command)

The above is our SalutationHandler, starting from the top where the exception is thrown, down to the point where our code is called by Paramore Brighter itself.

   at Paramore.Brighter.Policies.Handlers.ExceptionPolicyHandler`1.<>n__0(TRequest command)
   at Paramore.Brighter.Policies.Handlers.ExceptionPolicyHandler`1.<>c__DisplayClass2_0.b__0()
   at Polly.Policy.<>c__DisplayClass33_0`1.b__0(Context ctx, CancellationToken ct)
   at Polly.Policy.<>c__DisplayClass42_0`1.b__0(Context ctx, CancellationToken ct)
   at Polly.RetrySyntax.<>c__DisplayClass19_0.b__1(Context ctx, CancellationToken ct)
   at Polly.Retry.RetryEngine.Implementation[TResult](Func`3 action, Context context, CancellationToken cancellationToken, IEnumerable`1 shouldRetryExceptionPredicates, IEnumerable`1 shouldRetryResultPredicates, Func`1 policyStateFactory)
   at Polly.RetrySyntax.<>c__DisplayClass19_1.b__0(Action`2 action, Context context, CancellationToken cancellationToken)
   at Polly.Policy.Execute[TResult](Func`3 action, Context context, CancellationToken cancellationToken)
   at Polly.Policy.Execute[TResult](Func`1 action)
   at Paramore.Brighter.Policies.Handlers.ExceptionPolicyHandler`1.Handle(TRequest command)
   at Paramore.Brighter.RequestHandler`1.Handle(TRequest command)

The above section is all part of the retry handler, as defined by the policy we set up. Most of this code is in Polly, which is the quality of service package that Brighter uses.

   at Paramore.Brighter.Policies.Handlers.FallbackPolicyHandler`1.CatchAll(TRequest command)
// The rest of this isn't really part of the exception 
// stack trace, but I wanted to show you where it came from.
   at Paramore.Brighter.Policies.Handlers.FallbackPolicyHandler`1.Handle(TRequest command)
   at Paramore.Brighter.CommandProcessor.Send[T](T command)
   at QualityOfService.Program.Main(String[] args)

Finally, we reach the outermost of the handlers (you cannot normally see all of it, because it has caught the exception in CatchAll) before it hands off to our fallback handler.

Software Development

Paramore Brighter: Implementing a fallback exception handler

So far in this series, we have a basic command processor set up and able to dispose of resources. But what happens when things go wrong? The command processor has a fallback mechanism to handle exceptions that are not caught.

To add this functionality all you need to do is to decorate your handler with a fallback policy attribute, add the Fallback handler into your Dependency Injection framework, and then override the Fallback method.

To add the Fallback handler to .NET Core’s Dependency Injection framework we add

serviceCollection.AddScoped(typeof(FallbackPolicyHandler<>));

to the BuildServiceProvider method. The typeof variant of AddScoped allows a more general type to be expressed. Otherwise, we’d have to add a fallback policy handler for each command.

Our little salutation handler now looks like this:

[FallbackPolicy(backstop:true, circuitBreaker:false, step:1)]
public override SalutationCommand Handle(SalutationCommand command)
{
    Console.WriteLine($"Greetings, {command.Name}.");
    ThrowOnTheDarkLord(command);
    return base.Handle(command);
}

(If you’ve not read Harry Potter, the reference is that if you use He-who-must-not-be-named’s name, then a Death Eater will appear and take you away. So, if we use The Dark Lord’s name, we’re throwing an exception.)

Back to the code: The first line is the attribute decoration. In this case we say we have a fallback policy that acts as a backstop for any unhandled exception (backstop:true). We’ve not covered the Circuit Breaker, so we’re not interested in that for the moment (circuitBreaker:false), and we’ve also not covered what happens if you have multiple attributes (step:1), so we’ll leave that as step 1 (of 1). I’ll come back to those things later.

Now, if we put “Voldemort” in as the Name in the command, it will throw an exception. So we have to handle that somehow. The RequestHandler class has a Fallback method which you can override in your derived handler class.

public override SalutationCommand Fallback(SalutationCommand command)
{
    if (this.Context.Bag
            .ContainsKey(FallbackPolicyHandler<SalutationCommand>
                         .CAUSE_OF_FALLBACK_EXCEPTION))
    {
        Exception exception = (Exception)this.Context
                              .Bag[FallbackPolicyHandler<SalutationCommand>
                                   .CAUSE_OF_FALLBACK_EXCEPTION];
        Console.WriteLine(exception);
    }
    return base.Fallback(command);
}

What is happening here is that we are retrieving the Exception from the Context’s Bag, which is just a Dictionary. Then we can do what we want with the Exception. In this simple example, I’m just writing it to the console, but you’ll most likely want to do something more with it in your application.

As you can see, this is a bit on the clunky side, so where I’ve used Brighter before, I’ve tended to introduce a class between RequestHandler and the specific handler to put in some things that help clean things up.

In this case the MyRequestHandler class looks like this:

public class MyRequestHandler<TCommand> 
             : RequestHandler<TCommand> where TCommand : class, IRequest
{
    public override TCommand Fallback(TCommand command)
    {
        if (this.Context.Bag
            .ContainsKey(FallbackPolicyHandler<TCommand>
                .CAUSE_OF_FALLBACK_EXCEPTION))
        {
            Exception exception = (Exception)this.Context
                .Bag[FallbackPolicyHandler<TCommand>
                    .CAUSE_OF_FALLBACK_EXCEPTION];
            return base.Fallback(ExceptionFallback(command, exception));
        }
        return base.Fallback(command);
    }

    public virtual TCommand ExceptionFallback(TCommand command, Exception exception)
    {
        // If exceptions need to be handled, 
        // this should be implemented in a derived class
        return command;
    }
}

At the top we can see that instead of a specific command we still have the generic TCommand, which needs to be a class and implement IRequest. We didn’t see that in the specific command handler because the concrete command type already satisfies those constraints, so we didn’t need to express them again.

The Fallback method now contains the code that extracts the exception from the Context and calls ExceptionFallback. In this class ExceptionFallback does nothing except return the command back. When we implement it in our SalutationHandler, the code for handling the exception now looks like this:

public override SalutationCommand ExceptionFallback(SalutationCommand command, Exception exception)
{
    Console.WriteLine(exception);
    return base.ExceptionFallback(command, exception);
}

And that is so much nicer to read. We’ve extracted away the plumbing of retrieving the exception to the newly introduced base class and our command handler looks much neater as a result.

To view the source as a whole, see it on GitHub.

Open Source Software

Round Robin class

We recently needed round robin functionality, and since there is no round robin class built into .NET, I had to build my own.

It is a fairly simple algorithm: each call returns the next item in the sequence, and when the end of the sequence is reached it goes back to the beginning and starts over.

In our case, we also needed it to be thread safe as we were calling it from tasks that are running in parallel.

using System;
using System.Collections.Generic;
using System.Linq;

namespace Xander.RoundRobin
{
    public class RoundRobin<T>
    {
        private readonly T[] _items;
        private readonly object _syncLock = new object();

        private int _currentIndex = -1;

        public RoundRobin(IEnumerable<T> sequence)
        {
            _items = sequence.ToArray();
            if (_items.Length == 0)
                throw new ArgumentException("Sequence contains no elements.", nameof(sequence));
        }

        public T GetNextItem()
        {
            lock (this._syncLock)
            {
                _currentIndex++;
                if (_currentIndex >= _items.Length)
                    _currentIndex = 0;
                return _items[_currentIndex];
            }
        }
    }
}

To use the class you can create it like this:

var rr = new RoundRobin<int>(items);

(Replacing int with the type you need)

And to retrieve the next item in the sequence, call

var item = rr.GetNextItem();
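The wrap-around behaviour means that with three items the fourth call returns the first item again. Here is a minimal, self-contained sketch of the same index arithmetic (using a modulo, which is equivalent to the if-check in GetNextItem above; the names are illustrative):

```csharp
using System;

// Minimal demonstration of round-robin index arithmetic: the index
// advances by one per call and wraps back to zero with a modulo.
public static class RoundRobinDemo
{
    private static readonly int[] Items = { 10, 20, 30 };
    private static int currentIndex = -1;

    public static int GetNextItem()
    {
        currentIndex = (currentIndex + 1) % Items.Length;
        return Items[currentIndex];
    }

    public static void Main()
    {
        Console.WriteLine(GetNextItem()); // 10
        Console.WriteLine(GetNextItem()); // 20
        Console.WriteLine(GetNextItem()); // 30
        Console.WriteLine(GetNextItem()); // 10, wrapped around
    }
}
```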

I’ve got a few ideas for features to add as well, so I’ve put this code on GitHub and I’ll be creating a NuGet package when I’ve got the time.

Software Development

But why is the iterator operating in multiple-threads

Background

Recently, I had a bit of a problem with NHibernate when I was converting some code to run as parallel tasks. (If you have no interest in NHibernate, don’t worry; it is just background to the issue I was having when I spotted this gulf between my expectation and reality, and I won’t mention it much beyond this paragraph.) It turns out that Parallel.ForEach runs the iterator in multiple threads, not just the action it performs on each item received from the iterator. NHibernate, being the source of the data, was running inside the iterator, and when I attached NHibernate Profiler to see what it could turn up, it very quickly began reporting that the NHibernate session was running in multiple threads and that NHibernate is not designed to be thread safe.

The Iterator Pattern in .NET

In .NET the iterator pattern is exposed via an IEnumerator or IEnumerator<T> and there is some syntactic sugar so that you can create an iterator method using yield return. There is also syntactic sugar surrounding the consumption of iterators via foreach. This almost completely hides the complexities of IEnumerator implementations.

There are some limitations to this. The interface is inherently not thread safe, as it does not provide an atomic operation that retrieves an element and moves the internal pointer on to the next. You have to call MoveNext() and then, if it returned true, read Current. If the iterator needs thread-safety, it is the responsibility of the caller to provide it.
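One common way for a caller to provide that thread-safety is to hold a lock across both calls, so the MoveNext()/Current pair is observed as a single atomic step. This is only a sketch of that idea; TryGetNext and the counting sequence are illustrative, not library code:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SafeIteration
{
    // Wraps an enumerator so that MoveNext() and Current happen under one
    // lock, making the retrieve-and-advance step atomic.
    public static bool TryGetNext<T>(IEnumerator<T> enumerator, object syncLock, out T item)
    {
        lock (syncLock)
        {
            if (enumerator.MoveNext())
            {
                item = enumerator.Current;
                return true;
            }
            item = default(T);
            return false;
        }
    }

    public static void Main()
    {
        var syncLock = new object();
        using (var enumerator = Enumerable.Range(1, 1000).GetEnumerator())
        {
            long total = 0;
            // Several workers pull from the same enumerator; the lock makes
            // each MoveNext/Current pair atomic, so no item is lost or
            // delivered twice.
            Parallel.For(0, 4, _ =>
            {
                while (TryGetNext(enumerator, syncLock, out int n))
                    System.Threading.Interlocked.Add(ref total, n);
            });
            Console.WriteLine(total); // 500500 = sum of 1..1000
        }
    }
}
```

Note that this makes the consumption safe, but (as the rest of this post shows) it does not stop Parallel.ForEach itself from running the iterator body on different threads.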

But, then this happens…

Knowing this, I would have assumed (always a bad idea, but I’m only human) that Parallel.ForEach() operates over the iterator in a single thread and farms out each iteration to different threads. But I was wrong. Try the following code for yourself and see what happens:

public class Program
{
    public static void Main(string[] args)
    {
        Parallel.ForEach(
            YieldedNumbers(),
            (n) => { Thread.Sleep(n); });
        Console.WriteLine("Done!");
        Console.ReadLine();
    }

    public static IEnumerable<int> YieldedNumbers()
    {
        Random rnd = new Random();
        int lastKnownThread = Thread.CurrentThread.ManagedThreadId;
        int detectedSwitches = 0;
        for (int i = 0; i < 1000; i++)
        {
            int currentThread = Thread.CurrentThread.ManagedThreadId;
            if (lastKnownThread != currentThread)
            {
                detectedSwitches++;
                Console.WriteLine(
                    $"{detectedSwitches}: Last known thread ({lastKnownThread}) is not the same as the current thread ({currentThread}).");
                lastKnownThread = currentThread;
            }
            yield return rnd.Next(10,150);
        }
    }
}

The Action<int> passed to the Parallel.ForEach simply simulates some work being done (and the times sent to the Thread.Sleep() are roughly analogous to the times of the tasks in the original project).

What I’ve done here is also detect when the thread changes and report that to the console. It happens on roughly 15%-18% of the iterations in the runs I’ve made on my machine. Now that was surprising (well, not really, because NHibernate Profiler had already told me; but having a very clean example of the same behaviour was). I can’t blame any weirdness in third-party libraries: it happens with some very basic .NET code in a console application.

Possible Solutions

1. My first thought was to dump all the data retrieved from the iterator into a collection of some sort (e.g. an array or list), but the iterator was originally put in place because the volume of data was causing memory pressure. The app ran overnight and would process anything between a few hundred and a few hundred thousand customers, and testing found that it slowed down significantly around the 7000 mark because of the size of the data, and fell over completely not far past that. So the iterator I created hides the fact that I now page the data; the calling code knows nothing about the paging and didn’t have to be modified. That solution was out of the question, as we’d be back to the problem we had a while ago.

2. The data could be processed in batches, with each fully retrieved batch run in parallel one at a time. I did try that, but it just made the calling code harder to read and more complex than it needed to be. The reader has to understand why there are batches, and the person writing the code has to remember that the data may not fit an exact number of batches and must process the final batch outside the loop, which adds to the cognitive load on the reader/maintainer.

public static void Main(string[] args)
{
    int batchSize = 97;
    List<int> batch = new List<int>();
    foreach (int item in YieldedNumbers())
    {
        batch.Add(item);
        if (batch.Count >= batchSize)
            ProcessBatch(batch);
    }
    ProcessBatch(batch);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

private static int batchCount = 0;
private static void ProcessBatch(List<int> batch)
{
    batchCount ++;
    Console.WriteLine($"Processing batch {batchCount} containing {batch.Count} items");
    Parallel.ForEach(batch, (n) => { Thread.Sleep(n); });
    batch.Clear();
}

// The YieldedNumbers() method is unchanged from before.

The iterator is always called from a single thread and therefore never complains with this set-up.

3. Use the TPL Dataflow library (Microsoft’s Dataflow library for the Task Parallel Library). Personally, I think this one is best because the pattern is clear and the complex bits can be moved away from the main algorithm. The only part I didn’t like was the effort needed to set up the Producer/Consumer pattern using this library, but it handles all the bits I want to abstract away quite nicely... and that set-up can be abstracted out later. Here’s the basic algorithm.

public static void Main(string[] args)
{
    var producerOptions = new DataflowBlockOptions { BoundedCapacity = 97 };
    var buffer = new BufferBlock<int>(producerOptions);
    var consumerOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = Environment.ProcessorCount,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    var consumer = new ActionBlock<int>( n=> {  Thread.Sleep(n); }, consumerOptions);
    buffer.LinkTo(consumer, linkOptions);
    Produce(buffer);
    Task.WaitAll(consumer.Completion);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

private static void Produce(ITargetBlock<int> target)
{
    foreach (var n in YieldedNumbers())
    {
        // Normally, this will return immediately, but if the queue has
        // reached its limit then it will wait until the consumer has
        // processed items on the queue.
        Task.WaitAll(target.SendAsync(n));
    }
    // Set the target to the completed state to signal to the consumer
    // that no more data will be available.
    target.Complete();
}

I originally had the Produce() method as an async/await method, but that didn’t work; it seems that the iterator shifts around threads again, because when the code wakes up after an await it may be resumed on a different thread. So I put it back to a simple Task.WaitAll() and it kept it all on the same thread.

The producer options are set so that the queue size is limited; it stops pulling from the producer if the queue reaches capacity, which keeps the app running smoothly. The producer won’t over-produce.

The consumer options need to be set explicitly, otherwise it acts on a single thread. Unlike other parts of the TPL, it won’t necessarily optimise for the number of cores you have; you have to specify that, and a crude rule of thumb for getting that number is Environment.ProcessorCount (crude because, with hyper-threading, it can count multiple logical cores per physical core). However, it is good enough unless you really need to tune things accurately.

Now, a lot of this can be abstracted away so that the calling code can just get on with what it needs without the distractions that this pattern introduces.

Most of this code can be extracted out to a class of extension methods on IEnumerable<T>:

public static class IEnumerableExtensions
{
    public static void ConsumeInParallel<T>(this IEnumerable<T> source, Action<T> action, int queueLimit = int.MaxValue)
    {
        var producerOptions = new DataflowBlockOptions { BoundedCapacity = queueLimit };
        var buffer = new BufferBlock<T>(producerOptions);
        var consumerOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = Environment.ProcessorCount,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var consumer = new ActionBlock<T>(action, consumerOptions);
        buffer.LinkTo(consumer, linkOptions);
        Produce(source, buffer);
        Task.WaitAll(consumer.Completion);
    }

    private static void Produce<T>(IEnumerable<T> source, ITargetBlock<T> target)
    {
        foreach (var n in source)
            Task.WaitAll(target.SendAsync(n));
        target.Complete();
    }
}

With this, we can use any IEnumerable<T> as a source of data and it will happily process it. The queueLimit ensures that we don’t end up with too much data waiting to be processed, as we don’t want memory pressure making the app unstable.

The calling code now looks much neater:

public static void Main(string[] args)
{
    YieldedNumbers().ConsumeInParallel(n=> {Thread.Sleep(n);}, 97);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

Software Development

Debugging a process that cannot be invoked through Visual Studio.

Sometimes it is rather difficult to debug through Visual Studio directly, even though the project is right there in front of you. In my case, I have an assembly that is invoked from a wrapper that is itself invoked from an MSBuild script. I could potentially get VS to invoke the whole pipeline, but it seemed a less convoluted process to attach the debugger to the running process and debug from there.

But what if the process is something quite ephemeral? If the process starts up, does its thing, then shuts down, you might not have time to attach a debugger before the process has completed. Or the thing you are debugging is in the start-up code, and there is no way to attach a debugger in time for that.

However, there is something that can be done (if you have access to the source code and can rebuild).

for (int i = 30; i >= 0; i--)
{
    Console.WriteLine("Waiting for debugger to attach... {0}", i);
    if (Debugger.IsAttached)
        break;
    Thread.Sleep(1000);
}

if (Debugger.IsAttached)
{
    Console.WriteLine("A debugger has attached to the process.");
    Debugger.Break();
}
else
{
    Console.WriteLine("A debugger was not detected... Continuing with process anyway.");
}

You could get away with less code, but I like this because it is reasonably flexible and I get to see what’s happening.

First up, I set a loop to count down thirty seconds to give me time to attach the debugger. On each loop it checks whether the debugger is already attached, and exits early if it is. (This is important, as otherwise you could attach the debugger and then get frustrated waiting for the process to continue.)

After the loop (regardless of whether it simply timed out or a debugger was detected) it does a final check and breaks the execution if a debugger is detected.

Each step of the way it outputs to the console what it is doing so you can see when to attach the debugger and you can see when the debugger got attached, or not.

My recommendation, if you want to use this code, is to put it in a utility class somewhere that you can call when needed, then take the call out afterwards.