Paramore Brighter: DRY with Custom Decorated Command Handlers

You may wish to add similar functionality to many (or all) command handlers. The typical example is logging. You can decorate a command handler in a similar way to the policies I showed in previous posts to add common functionality. I’ve used this technique to guard the handler from invalid command arguments/parameters (essentially a validator), and for ensuring that we ping our APM (Application Performance Management) tool when a command completes. I’ll use the latter to demonstrate creating a custom decorator and handler to initiate this common code.

Paramore Brighter Command Processor will look for any attributes derived from RequestHandlerAttribute that are added to the Handle method on your command handler class. It will then use them to build a pipeline for your command.

So, in the example here, our attribute class looks like this:

public class HeartbeatAttribute : RequestHandlerAttribute
{
    public HeartbeatAttribute(int step, HandlerTiming timing = HandlerTiming.After) : base(step, timing)
    {
    }

    public override Type GetHandlerType()
    {
        return typeof(HeartbeatHandler<>);
    }
}

We are deriving from RequestHandlerAttribute, and it has an abstract method that you need to implement. GetHandlerType() returns the type of handler that needs to be instantiated to handle the common task.

The RequestHandlerAttribute class also takes two arguments for its constructor that you either need to capture from users of your attribute or supply yourself. It takes a step and a timing parameter. Since we’ve already talked about step in a previous post we’ll move on to talking about timing.

The two options for timing are Before and After. In the previous examples the timing has been implicitly set to Before because the handler needed perform actions before your target handler (the one that you decorated). If you set the timing to After it only actions after your target handler.

In the example here, the timing is set After because we want to make sure that the the handler completed correctly before our handler runs. So, if it throws an exception then our heartbeat handler won’t run. If you need to perform an action before and after, then set the timing to Before, and perform actions before the call to base.Handle() and after the call.

Our heartbeat handler looks like this:

public class HeartbeatHandler<TRequest> : RequestHandler<TRequest> where TRequest : class, IRequest
{
    public override TRequest Handle(TRequest command)
    {
        // We would probably call a heartbeat service at this point.
        // But for demonstration we'll just write to the console.

        Console.WriteLine($"Heartbeat pulsed for {command.GetType().FullName}");
        string jsonString = JsonConvert.SerializeObject(command);
        Console.WriteLine(jsonString);

        return base.Handle(command);
    }
}

The important thing, as will all handlers, is to remember the call to the base.Handle() which ensures the pipeline is continued.

The target handler decoration looks like this:

[FallbackPolicy(step:1, backstop:true, circuitBreaker:false)]
[UsePolicy(policy: "GreetingRetryPolicy", step:2)]
[Heartbeat(step:3)]
public override SalutationCommand Handle(SalutationCommand command)
{
    // Stuff to handle the command.

    return base.Handle(command);
}

The first two decorators are from previous posts (Retrying Commands and Implementing a fallback exception handler) while the third is our new decorator.

When run, you can see that if the service fails completely (i.e. all the retries failed) then the Heartbeat does not get run. However, if the command succeeds then the heartbeat handler is run. Our APM knows the command succeeded and can display that.

Remember

Remember to wire up the handler, as with all handlers, to your dependency injection framework, so that it can be correctly instantiated:

serviceCollection.AddScoped(typeof(HeartbeatHandler<>));

Paramore Brighter with Quality of Service: Retrying Commands

Paramore Brighter supports Policies to maintain quality of service. This is useful when your command makes calls to external services, whether they are databases, web services, or any other end point that exists out of the process of your application. You can set up retry policies, circuit-breaker policies, and timeout policies. For this post, we’ll concentrate on setting up a retry policy.

The full code from this post is available on GitHub.com.

The SalutationHandler that we’ve been using in previous posts now emulates an external failure by throwing an exception in some cases. The policy handler will catch the exception and act on it, retrying the command if necessary.

Set up the policy

First off let’s set up the policy. In this case I’m going for an exponential backoff (doubling the wait time on each attempt) and it will perform a maximum of 4 attempts.

private static IAmAPolicyRegistry GetPolicies()
{
    var policyRegistry = new PolicyRegistry();

    // These are the default policies that must exist. 
    // We're not using them, so we're setting them to No-op
    policyRegistry.Add(CommandProcessor.RETRYPOLICY, Policy.NoOp());
    policyRegistry.Add(CommandProcessor.RETRYPOLICYASYNC, Policy.NoOpAsync());
    policyRegistry.Add(CommandProcessor.CIRCUITBREAKER, Policy.NoOp());
    policyRegistry.Add(CommandProcessor.CIRCUITBREAKERASYNC, Policy.NoOpAsync());
    
    // Sets up the policy that we're going to use 
    // for the SaluationHandler
    var greetingRetryPolicy = Policy
        .Handle<Exception>()
        .WaitAndRetry(new[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(2), 
            TimeSpan.FromSeconds(4) 
        }, (exception, timeSpan) =>
        {
            Console.WriteLine($" ** An error occurred: {exception.Message}");
            Console.WriteLine($" ** Waiting {timeSpan.Seconds} seconds until retry.");
        });

    policyRegistry.Add("GreetingRetryPolicy", greetingRetryPolicy);
    return policyRegistry;
}

The policies are defined using Polly, a .NET resilience and transient-fault-handling library.

The .Handle<Exception>() means the policy handles all exceptions. You might want it to be more specific for your use case. e.g. SqlException for database errors.

The WaitAndRetry(...) takes a set of timings (as TimeSpan objects) for how long to wait between attempts and an Action which is run between attempts. Although there are only 3 times here, it will make 4 attempts. Each time represents the amount of time after an attempt before retrying. The first attempt is performed immediately.

The Action allows you to set up what you want to do between attempts. In this case, I’ve only had it output to the console. You may wish to log the error, or take other actions that might help it work.

Finally, we add the policy to the registry and give it a name, so we can refer to it on our Handle method in our command handler class.

In order for Brighter to be able to use this policy, the Handler for it needs to be registered in the IoC container.

serviceCollection.AddScoped(typeof(ExceptionPolicyHandler<>));

The Command Handler

It should be noted that the regardless of the number retries that are made, they are all processed through the same instance of the command handler. This may be important if you store state to do with the progress of the command. It also might be important in case any services you rely on that are injected into the command handler get left in an undefined state if things go wrong.

[FallbackPolicy(step:1, backstop:true, circuitBreaker:false)]
[UsePolicy(policy: "GreetingRetryPolicy", step:2)]
public override SalutationCommand Handle(SalutationCommand command)
{
    ...
}

We still have our fallback that we set up in the previous post on Paramore Brighter, but we now have a UsePolicy attribute. And since we have two attributes the Step argument now becomes important.

The command processor sets up the policy and command handlers like a Russian doll, with the command handler right in the middle. The outer handler (doll) is step 1, then the one inside that is step 2, and so on until you get to the actual command handler. So, in this case at the very outside is the FallbackPolicy and it only does its thing if it gets an exception, the UsePolicy will act on exceptions before the fallback sees them most of the time.

The UsePolicy attribute takes the name of the policy that we set up earlier when we were creating the policy registry.

Analysing the StackTrace

So, when we ask to greet “Voldemort” it will always fail. We get a stack trace that shows off the Russian Doll quite well.

System.ApplicationException: A death-eater has appeared.
   at QualityOfService.SalutationHandler.ThrowOnTheDarkLord(SalutationCommand command) in C:\dev\BrighterRecipes\src\quality-of-service\quality-of-service\SalutationHandler.cs:line 46
   at QualityOfService.SalutationHandler.Handle(SalutationCommand command) in C:\dev\BrighterRecipes\src\quality-of-service\quality-of-service\SalutationHandler.cs:line 21
   at Paramore.Brighter.RequestHandler`1.Handle(TRequest command)

The above is our SaulatationHandler, starting from the top where the exception is thrown, until the point that our code is called by Paramore Brighter itself.

   at Paramore.Brighter.Policies.Handlers.ExceptionPolicyHandler`1.<>n__0(TRequest command)
   at Paramore.Brighter.Policies.Handlers.ExceptionPolicyHandler`1.<>c__DisplayClass2_0.b__0()
   at Polly.Policy.<>c__DisplayClass33_0`1.b__0(Context ctx, CancellationToken ct)
   at Polly.Policy.<>c__DisplayClass42_0`1.b__0(Context ctx, CancellationToken ct)
   at Polly.RetrySyntax.<>c__DisplayClass19_0.b__1(Context ctx, CancellationToken ct)
   at Polly.Retry.RetryEngine.Implementation[TResult](Func`3 action, Context context, CancellationToken cancellationToken, IEnumerable`1 shouldRetryExceptionPredicates, IEnumerable`1 shouldRetryResultPredicates, Func`1 policyStateFactory)
   at Polly.RetrySyntax.<>c__DisplayClass19_1.b__0(Action`2 action, Context context, CancellationToken cancellationToken)
   at Polly.Policy.Execute[TResult](Func`3 action, Context context, CancellationToken cancellationToken)
   at Polly.Policy.Execute[TResult](Func`1 action)
   at Paramore.Brighter.Policies.Handlers.ExceptionPolicyHandler`1.Handle(TRequest command)
   at Paramore.Brighter.RequestHandler`1.Handle(TRequest command)

The above section is all part of the retry handler, as defined by the policy we set up. Most of this code is in Polly, which is the quality of service package that Brighter uses.

   at Paramore.Brighter.Policies.Handlers.FallbackPolicyHandler`1.CatchAll(TRequest command)
// The rest of this isn't really part of the exception 
// stack trace, but I wanted to show you where it came from.
   at Paramore.Brighter.Policies.Handlers.FallbackPolicyHandler`1.Handle(TRequest command)
   at Paramore.Brighter.CommandProcessor.Send[T](T command)
   at QualityOfService.Program.Main(String[] args)

Finally, the most outer of the handlers (which you cannot normally see all of because it has caught the exception in CatchAll) before handing it off to our fallback handler.

Paramore Brighter: Implementing a fallback exception handler

So far in this series, we have a basic command processor set up and able to dispose of resources. But what happens when things go wrong? The command processor has a fallback mechanism to handle exceptions that are not caught.

To add this functionality all you need to do is to decorate your handler with a fallback policy attribute, add the Fallback handler into your Dependency Injection framework, and then override the Fallback method.

To add the Fallback handler to .NET Core’s Dependency Injection framework we add

serviceCollection.AddScoped(typeof(FallbackPolicyHandler<>));

to the BuildServiceProvider method. The typeof variant of AddScoped allows a more general type to be expressed. Otherwise, we’d have to add a fallback policy handler for each command.

Our little salutation handler now looks like this:

[FallbackPolicy(backstop:true, circuitBreaker:false, step:1)]
public override SalutationCommand Handle(SalutationCommand command)
{
    Console.WriteLine($"Greetings, {command.Name}.");
    ThrowOnTheDarkLord(command);
    return base.Handle(command);
}

(If you’ve not read Harry Potter, the reference is that if you use He-who-must-not-be-named’s name, then a death eater will appear and take you away. So if we use The Dark Lord’s name we’re throwing an exception)

Back to the code: The first line is the attribute decoration. In this case we say we have a fallback policy that acts as a backstop for any unhandled exception (backstop:true). We’ve not covered the Circuit Breaker so we’re not interested in that for the moment (circuitBreaker:false), and we’ve also not covered what happens if you have multiple attributes (step:1) so we’ll leave that as step 1 (of 1). I’ll come back to those things later.

Now, if we put “Voldemort” in as the Name in the command, it will throw an exception. So we have to handle that somehow. The RequestHandler class has a Fallback method which you can override in your derived handler class.

public override SalutationCommand Fallback(SalutationCommand command)
{
    if (this.Context.Bag
            .ContainsKey(FallbackPolicyHandler<SalutationCommand>
                         .CAUSE_OF_FALLBACK_EXCEPTION))
    {
        Exception exception = (Exception)this.Context
                              .Bag[FallbackPolicyHandler
                                   .CAUSE_OF_FALLBACK_EXCEPTION];
        Console.WriteLine(exception);
    }
    return base.Fallback(command);
}

What is happening here is that we are retrieving the Exception from the Context‘s Bag, which is just a Dictionary. Then we can do what we want with the Exception. In this simple example, I’m just writing it to the console, but you’ll most likely want to do something more with it in your application.

As you can see, this is a bit on the clunky side, so where I’ve used Brighter before, I’ve tended to introduce a class between RequestHandler and the specific handler to put in some things that help clean things up.

In this case the MyRequestHandler class looks like this:

public class MyRequestHandler<TCommand> 
             : RequestHandler<TCommand> where TCommand : class, IRequest
{
    public override TCommand Fallback(TCommand command)
    {
        if (this.Context.Bag
            .ContainsKey(FallbackPolicyHandler
                .CAUSE_OF_FALLBACK_EXCEPTION))
        {
            Exception exception = (Exception)this.Context
                .Bag[FallbackPolicyHandler
                    .CAUSE_OF_FALLBACK_EXCEPTION];
            return base.Fallback(ExceptionFallback(command, exception));
        }
        return base.Fallback(command);
    }

    public virtual TCommand ExceptionFallback(TCommand command, Exception exception)
    {
        // If exceptions need to be handled, 
        // this should be implemented in a derived class
        return command;
    }
}

At the top we can see that instead of a specific command we still have the generic TCommand, which needs to be a class and derived from IRequest. That wasn’t seen in the specific command handler because the explicit command already has these properties, so we didn’t need to express them again.

The Fallback method now contains the code that extracts the exception from the Context and calls ExceptionFallback. In this class ExceptionFallback does nothing except return the command back. When we implement it in our SalutationHandler, the code for handling the exception now looks like this:

public override SalutationCommand ExceptionFallback(SalutationCommand command, Exception exception)
{
    Console.WriteLine(exception);
    return base.ExceptionFallback(command, exception);
}

And that is so much nicer to read. We’ve extracted away the plumbing of retrieving the exception to the newly introduced base class and our command handler looks much neater as a result.

To view the source as a whole, see it on GitHub.

Paramore Brighter: Ensuring Dependencies are Disposed.

In my previous post, I showed how to set up Paramore Brighter with the built in Dependency Injection provided with .NET Core 2. However, it wasn’t the full story.

The code for this post is on GitHub.

In reality the various classes you might need will have different lifecyles, and along with that there are different needs for cleaning up. Some objects might be singletons and you get the same object back every time, some might be transient where you get a different object back every time, and in some cases you need the same object back for the the duration of the action you are doing, but a different object back at other times.

We’re going to look at the last scenario, objects that have a “scope”. For example, say somewhere you need to access a DbContext from Entity Framework. You probably want the same context for the duration of handling the command, but a separate one next time around. This is especially true if your application can handle multiple commands at the same time (e.g. An ASP.NET Core application) – You don’t want one handler to initiate SaveChanges() on the same context as another is still making changes to the data model.

We also want to make sure that any objects that need to be disposed of at the end of handling a command are properly disposed of, whether it is the handler itself, or an object that was injected into it.

To that end we’re going to make some changes to the code from the previous application.

The BuildServiceProvider() method changes the handlers to being scoped:

private static IServiceProvider BuildServiceProvider()
{
    var serviceCollection = new ServiceCollection();
    serviceCollection.AddScoped<SalutationHandler>();
    return serviceCollection.BuildServiceProvider();
}

The ServiceProviderHandler class that was created in the previous post needs to take into account that after a command is handled, the resources it uses need to be disposed.

public class ServiceProviderHandler : IAmAHandlerFactory
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ConcurrentDictionary<IHandleRequests, IServiceScope> _activeHandlers;
    public ServiceProviderHandler(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
        _activeHandlers = new ConcurrentDictionary<IHandleRequests, IServiceScope>();
    }
    public IHandleRequests Create(Type handlerType)
    {
        IServiceScope scope = _serviceProvider.CreateScope();
        IServiceProvider scopedProvider = scope.ServiceProvider;
        IHandleRequests result = (IHandleRequests)scopedProvider.GetService(handlerType);
        if (_activeHandlers.TryAdd(result, scope))
            return result;

        scope.Dispose();
        throw new InvalidOperationException("The handler could not be tracked properly. It may be declared in the service collection with the wrong lifecyle.");
    }

    public void Release(IHandleRequests handler)
    {
        if (_activeHandlers.TryRemove(handler, out IServiceScope scope))
        {
            scope.Dispose();
        }
    }
}

The changes are that we now keep a dictionary of active command handlers and the scope they are in. When we are asked to Create() a new handler, we:

  • create a new scope,
  • get a services in the context of that scope, and
  • store the handler and scope in the dictionary (keyed on the handler)

If the handler cannot be added to the dictionary, then we Dispose() of the scope (which also disposes the handler if it is disposable) and we throw an exception to say that something went wrong. Generally, the same handler should never end up in the dictionary twice, but it might if it was set up with a Singleton lifecycle and multiple threads are trying to use it. So, we guard against that. In a single threaded application, this code is not likely to be hit even if the handler was defined as a Singleton because it will have been removed from the dictionary at the end of its previous operation.

Once the command has been handled, the Release() method is called which

  • looks up the handler in the dictionary to get the scope, while it
  • removes the handler and scope from the dictionary, and then
  • disposes of everything in that scope.

Just to show that this all works, I made the command handler implement the IDisposable interface and just put in a Console.WriteLine() to show that it was called.

public class SalutationHandler : RequestHandler<SalutationCommand>, IDisposable
{
    public override SalutationCommand Handle(SalutationCommand command)
    {
        Console.WriteLine($"Greetings, {command.Name}.");
        return base.Handle(command);
    }

    public void Dispose()
    {
        Console.WriteLine("I'm being disposed.");
    }
}

The Main() method now creates two commands:

commandProcessor.Send(new SalutationCommand("Christian"));
commandProcessor.Send(new SalutationCommand("Alisdair"));

And the resulting output is:

Greetings, Christian.
I'm being disposed.
Greetings, Alisdair.
I'm being disposed.

Paramore Brighter: Using .NET Core’s Dependency Injection

This is the first in a series of posts on Paramore.Brighter. I’m writing this as a series of recipes, with the aim of you picking up a point quickly and getting going with it.

The code for this post is on GitHub, you can find it here: GitHub Basic solution

In .NET Core there is now a Dependency Injection framework built in. Obviously, you can use your own, but for simplicity (and because a lot of people will take what comes in the box) I’m going to show you how to use the dependency injection framework that comes out of the box. It is what ASP.NET Core applications will use by default.

The Command & Handler

If you’ve already read a bit about how Paramore Brighter works, you’ll probably already know how to create commands and command handlers, but we’ll just recap anyway. We’re going to create a simple Hello World scenario.

Our command and handler look like this:

public class SalutationCommand : IRequest
{
    public Guid Id { get; set; }

    public string Name { get; }

    public SalutationCommand(string name)
    {
        Id = Guid.NewGuid();
        Name = name;
    }
}

public class SalutationHandler : RequestHandler<SalutationCommand>
{
    public override SalutationCommand Handle(SalutationCommand command)
    {
        Console.WriteLine($"Greetings, {command.Name}.");
        return base.Handle(command);
    }
}

Nothing too complex here. The command is used to pass some information to the handler, in this case a name, we’ll not worry about the Id for the moment, it is required by the IRequest interface, and at this stage can be anything you want. The handler then writes a greeting to the console using the name it was given.

Configuring the command processor

At a most basic level, the command processor needs to know just two things.

  1. How to map commands to their handler
  2. How to build a handler

Everything else it can do can come later, but without those two things it does not work.

The first thing the configuration does it build a registry of commands and their handlers.

private static SubscriberRegistry CreateRegistry()
{
    var registry = new SubscriberRegistry();
    registry.Register<SalutationCommand, SalutationHandler>();
    return registry;
}

The second thing it does is create a class, implementing the IAmAHandlerFactory interface, that will build the handler, and in our case, it uses the IServiceProvider to do that.

public class ServiceProviderHandler : IAmAHandlerFactory
{
    private readonly IServiceProvider _serviceProvider;
    public ServiceProviderHandler(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }
    public IHandleRequests Create(Type handlerType)
    {
        return (IHandleRequests)_serviceProvider.GetService(handlerType);
    }

    public void Release(IHandleRequests handler)
    {
    }
}

This is a very simple implementation that just calls the GetService() in the Create() method to get the command handler object from the container. It doesn’t do any clean up, or any validation.

Putting it all together

Finally, a builder object is used to wire all that together and produce a command processor

private static IAmACommandProcessor BuildCommandProcessor(IServiceProvider serviceProvider)
{
    var registry = CreateRegistry(); // 1. Maps commands to Handlers
    var factory = new ServiceProviderHandler(serviceProvider); // 2. Builds handlers

    var builder = CommandProcessorBuilder.With()
        .Handlers(new HandlerConfiguration(
            subscriberRegistry: registry,
            handlerFactory: factory))
        .DefaultPolicy()
        .NoTaskQueues()
        .RequestContextFactory(new InMemoryRequestContextFactory());

    return builder.Build();
}

There are other things this is doing, but for the moment we’re not concerned about them.

And that’s it, the only thing left is the entry point (the Main method) of the application.

static void Main(string[] args)
{
    var serviceProvider = BuildServiceProvider();
    var commandProcessor = BuildCommandProcessor(serviceProvider);

    commandProcessor.Send(new SalutationCommand("Christian"));

    Console.ReadLine();
}

When run, it emits a single line at the console, which reads:

Greetings, Christian

Join Null Check with Assignment

2017-07-16-join-null-check-with-assignment

I recently wrote some code and asked ReSharper to add a null check for me, which it did. Then it suggested that I could simplify the null check by joining it to the assignment.

Intrigued, I let it.

The code went from this:

public void SetMessage(string message)
{
    if (message == null) throw new ArgumentNullException(nameof(message));
    Message = message;
}

To this:

public void SetMessage(string message)
{
    Message = message ?? throw new ArgumentNullException(nameof(message));
}

So, I assign message to the property Message unless it is null in which case I throw the exception. This is a new feature in C# 7 called a “throw expression”.

At first glance, I thought it would still assign null to Message before throwing the exception, but that’s not what the code looks like underneath.

I got out my trusty dotPeek to see what it actually compiled to. (Don’t worry, I’m not going to show you IL, just what the C# looks like without the syntactic sugar). The result was this:

public void SetMessage(string message)
{
  string str = message;
  if (str == null)
    throw new ArgumentNullException("message");
  this.Message = str;
}

Excellent, it is still doing the null check in advance. So the semantics of what I wrote have not changed. That’s great. I learned something new today.

But…

ReShaper also suggested it in an overloaded version of that function that takes two parameters. And the result was not semantically equivalent. So, be careful. Here’s what happened there. I started with this:

public void SetMessage(string message, string transitionMessage)
{
    if (message == null) throw new ArgumentNullException(nameof(message));
    if (transitionMessage == null) throw new ArgumentNullException(nameof(transitionMessage));

    Message = message;
    TransitionMessage = transitionMessage;
}

Let ReSharper refactor to this:

public void SetMessage(string message, string transitionMessage)
{
    Message = message ?? throw new ArgumentNullException(nameof(message));
    TransitionMessage = transitionMessage ?? throw new ArgumentNullException(nameof(transitionMessage));
}

And, I’m beginning to get a little apprehensive at this point because I think I see a problem. In fact, when I look at it in dotPeek, I can see exactly what the issue is. Here’s the same code with the syntactic sugar removed:

public void SetMessage(string message, string transitionMessage)
{
  string str1 = message;
  if (str1 == null)
    throw new ArgumentNullException("message");
  this.Message = str1;
  string str2 = transitionMessage;
  if (str2 == null)
    throw new ArgumentNullException("transitionMessage");
  this.TransitionMessage = str2;
}

It does the first null check, then assigns to the Message property. Then it does the second null check… And that’s not what I want at all. This method should be an all or nothing proposition. Either both properties are set, or neither are changed and this isn’t the case any more.

Caveat Programmator, as they say in Latin.

Aggregate of columns, not rows

In SQL Server it is very easy to aggregate a value across many rows. You simply just write something like this:

SELECT MAX(SomeColumn)
FROM SomeTable

This will return one row with the aggregate value (in this case MAXimum) of the column.

However, what if you want to do this across columns. What if you have a table with two or more columns you want to aggregate for each row?

You can do this with a tedious case statement that just gets more and more cumbersome for each additional column you have, especially if any of the columns are nullable. Or, you can create a subquery with a table in it and you aggregate the table in the subquery exposing the final value to your main query already aggregated.

Here’s an example:

SELECT Id, 
       (SELECT MAX(num) FROM (VALUES(st.ANumber), (st.AnotherNumber)) AS AllNumbers(num)) AS MaxNumber
FROM SomeTable st

The second line contains the subquery which is then exposed as a column in the final result set.

The subquery effectively pivots the columns into rows, then aggregates the rows. Just be careful with where you put the brackets so that it interprets them as separate rows rather than columns.

This also deals with NULL values quite effectively too, since the aggregate function will ignore any null value it finds.

But why is the iterator operating in multiple-threads

Background

Recently, I had a bit of a problem with NHibernate when I was converting some code into parallel tasks. (If you have no interest in NHibernate, then don’t worry – it is just background to the issue I was having when I spotted this gulf between my expectation and reality. NHibernate is incidental to this and I won’t mention it much beyond this paragraph.) It turns out that Parallel.ForEach runs the iterator in multiple threads, not just the the action it performs on each item received from the iterator. NHibernate, being the source of the data was running inside the iterator and when I attached NHibernate Profiler to see what it could turn up it very quickly began reporting that the NHibernate session was running in multiple-threads and that NHibernate was not designed to be thread safe.

The Iterator Patten in .NET

In .NET the iterator pattern is exposed via an IEnumerator or IEnumerator<T> and there is some syntactic sugar so that you can create an iterator method using yield return. There is also syntactic sugar surrounding the consumption of iterators via foreach. This almost completely hides the complexities of IEnumerator implementations.

There are some limitations to this. The interface is inherently not thread safe as it does not provide for an atomic operation that retrieves an element and moves the internal pointer on to the next. You have to call MoveNext() followed by Current if it returned true. If the iterator needs thread-safety, it is the responsibility of the caller to provide it.

But, then this happens…

Knowing this, I would have assumed (always a bad idea, but I’m only human) that Parallel.ForEach() operates over the iterator in a single thread, but farms out each loop to different threads, but I was wrong. Try the following code for yourself and see what happens:

public class Program
{
    public static void Main(string[] args)
    {
        Parallel.ForEach(
            YieldedNumbers(),
            (n) => { Thread.Sleep(n); });
        Console.WriteLine("Done!");
        Console.ReadLine();
    }

    public static IEnumerable<int> YieldedNumbers()
    {
        Random rnd = new Random();
        int lastKnownThread = Thread.CurrentThread.ManagedThreadId;
        int detectedSwitches = 0;
        for (int i = 0; i < 1000; i++)
        {
            int currentThread = Thread.CurrentThread.ManagedThreadId;
            if (lastKnownThread != currentThread)
            {
                detectedSwitches++;
                Console.WriteLine(
                    $"{detectedSwitches}: Last known thread ({lastKnownThread}) is not the same as the current thread ({currentThread}).");
                lastKnownThread = currentThread;
            }
            yield return rnd.Next(10,150);
        }
    }
}

The Action<int> passed to the Parallel.ForEach simply simulates some work being done (and the times sent to the Thread.Sleep() are roughly analogous to the times of the tasks in the original project).

What I’ve done here also is detect when the thread changes and report that to the console. It happens roughly 15%-18% of the time on the runs I’ve made on my machine. Now that was surprising (not really, because NHibernate Profiler had already told me – but to have a very clean example of the same was). I can’t blame any weirdness in third party libraries. It happens with some very basic .NET code in a console application.

Possible Solutions

1. My first thought was to dump all the data retrieved from the iterator into a collection of some sort (e.g. an array or list), but the iterator was originally put in place because the volume of data was causing memory pressure. The app ran overnight and will process anything between a few hundred to a few hundred thousand customers and testing found that it significantly slowed down around the 7000 mark because of the size of the data, and fell over completely not far past that. So, the iterator that I created hides the fact that I now page the data, the calling code knows nothing about this paging and didn’t have to be modified. So that solution was out of the question, we’d be back to the problem we had a while ago.

2.The data could be processed in batches and each fully retrieved batch be run in parallel one at at time. I did try that but it just made the calling code difficult to read and more complex than it needed to be. The reader has to be able to understand why there are batches, and the person writing the code has to remember that the data may not fit an exact number of batches and will have to process the final batch outside the loop which adds to the cognitive load on the reader/maintainer.

public static void Main(string[] args)
{
    int batchSize = 97;
    List batch = new List<int>();
    foreach (int item in YieldedNumbers())
    {
        batch.Add(item);
        if (batch.Count >= batchSize)
            ProcessBatch(batch);
    }
    ProcessBatch(batch);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

private static int batchCount = 0;
private static void ProcessBatch(List<int> batch)
{
    batchCount ++;
    Console.WriteLine($"Processing batch {batchCount} containing {batch.Count} items");
    Parallel.ForEach(batch, (n) => { Thread.Sleep(n); });
    batch.Clear();
}

// The YieldedNumbers() method is unchanged from before.

The iterator is always called from a single thread and therefore never complains on this set up.

3. Use the Microsoft Data Flow for the Task Parallel library. Personally, I think this one is best because the pattern is clear and the complex bits can be moved away from the main algorithm. The only part I didn’t like was the effort to set up the Producer/Consumer pattern using this library, but it handles all the bits I want to abstract away quite nicely… And that set up can be abstracted out later. Here’s the basic algorithm.

public static void Main(string[] args)
{
    var producerOptions = new DataflowBlockOptions { BoundedCapacity = 97 };
    var buffer = new BufferBlock<int>(producerOptions);
    var consumerOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = Environment.ProcessorCount,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    var consumer = new ActionBlock<int>( n=> {  Thread.Sleep(n); }, consumerOptions);
    buffer.LinkTo(consumer, linkOptions);
    Produce(buffer);
    Task.WaitAll(consumer.Completion);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

private static void Produce(ITargetBlock target)
{
    foreach (var n in YieldedNumbers())
    {
        // Normally, this will return immediately, but if the queue has
        // reached its limit then it will wait until the consumer has
        // processed items on the queue.
        Task.WaitAll(target.SendAsync(n));
    }
    // Set the target to the completed state to signal to the consumer
    // that no more data will be available.
    target.Complete();
}

I originally had the the Produce() method as an async/await method… But that didn’t work, it seems that doing that the iterator shifts around threads again because when the code wakes up after the await it may be restarted on a new thread. So I put it back to a simple Task.WaitAll() and it kept it all on the same thread.

The producer options are set so that the queue size is limited, it stops pulling from the producer if the queue reaches capacity and thus it keeps the app running smoothly. The producer won’t over produce.

The consumer options need to be set explicitly otherwise it acts on a single thread. Unlike other things in the TPL it won’t necessarily optimise for the number of cores you have, you have to specify that, and a crude rule of thumb for getting that number is Environment.ProcessorCount (crude, because if you have hyper threading it can treat that as being multiple processor cores). However, it is good enough unless you really need to optimise things accurately.

Now, a lot of this can be abstracted away so that the calling code can just get on with what it needs without the distractions that this pattern introduces.

Most of this code can be extracted out to a class that extends IEnumerable<T>

public static class IEnumerableExtensions
{
    public static void ConsumeInParallel<T>(this IEnumerable<T> source, Action<T> action, int queueLimit = int.MaxValue)
    {
        var producerOptions = new DataflowBlockOptions { BoundedCapacity = queueLimit };
        var buffer = new BufferBlock<T>(producerOptions);
        var consumerOptions = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = Environment.ProcessorCount,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var consumer = new ActionBlock<T>(action, consumerOptions);
        buffer.LinkTo(consumer, linkOptions);
        Produce(source, buffer);
        Task.WaitAll(consumer.Completion);
    }

    private static void Produce<T>(IEnumerable<T> source, ITargetBlock<T> target)
    {
        foreach (var n in source)
            Task.WaitAll(target.SendAsync(n));
        target.Complete();
    }
}

With this, we can use any IEnumerator<T> as a source of data and it will happily process it. The queueLimit ensures that we don’t end up with too much data waiting to be processed as we don’t want memory pressures causing the app to become unstable.

The calling code now looks much neater:

public static void Main(string[] args)
{
    YieldedNumbers().ConsumeInParallel(n=> {Thread.Sleep(n);}, 97);

    Console.WriteLine("Done!");
    Console.ReadLine();
}

Overusing the Null-Conditional Operator

The null-conditional operator is the ?. between object and field/property/method. It simply says that if the thing on the left hand side is null then the thing on the right hand side is not evaluated. It is a shorthand, so:

if (a != null)
{
    a.DoSomething();
}

becomes

a?.DoSomething();

And that’s great. It makes life much simpler, and if you’re using ReSharper it will alert you when you could use this operator over a null guard check.

But, and this is quite a big “but”, I have noticed a trend to overuse it.

I’ve seen people do crazy stuff like replace most (almost all) instances of the dot-operator so their code is littered with these question-marks before the dot.

var result = myObject?.GetSomething()?.SomeValue?.ToString()?.Split()?.Where(s=>s?.Length > 0);

And when you get to that level of lunacy you’re basically turning on the old Visual Basic OnError Resume Next head-in-the-sand error handling anti-pattern.

I want to make this absolutely clear. The null-conditional operator is not bad, per se. However, over using it or using it without thought to the logic of your application is bad as it hides potential bugs.

You should only use it when you would normally do a null check in advance. If ReSharper says you can refactor your code to use it, then it most likely is fine to use (you were probably using a longer construct for the same thing already which implies you’ve most likely thought about it – No on likes writing reams of code for no good reason).

Application Configuration in .NET Core – Part 2

In the first part we got started by pulling in configuration data from multiple source in .NET Core, in this part we’ll look at mapping the configuration onto a set of classes so that is becomes easier to access.

This is something that is built into ASP.NET Core through dependency injection, so we’ll look at that.

In ASP.NET Core applications, the configuration is setup in the Startup class normally. It is added in the ConfigureServices method. There are two lines you need to add:

public void ConfigureServices(IServiceCollection services)
{
  services.AddOptions();
  services.Configure(GetConfiguration());
  // Other services are configured here.
}

In order to get them to compile you need an additional NuGet package called Microsoft.Extensions.Options.ConfigurationExtensions. This ensures that everything you need to have configuration converted to the type you specify is set up and that it can be dependency injected into your code.

The GetConfiguration() call is to get the generated IConfigurationRoot and looks similar to the way we set up the configuration int the previous post. For this example it looks like this:

private IConfigurationRoot GetConfiguration()
{
  var builder = new ConfigurationBuilder();

  builder.AddInMemoryCollection(new Dictionary<string, string>
  {
    { "InMemory", "This value comes from the in-memory collection" }
  });

  builder.AddEnvironmentVariables();
  builder.AddJsonFile("appSettings.json");

  return builder.Build();
}

I’ve added configuration from the environment variables as well.

As the app is configured to map the configuration settings to an object structure, here’s what it looks like:

public class MyConfiguration
{
  public string UserName { get; set; } // From the environment variable of the same name
  public string InMemory { get; set; } // From the in-memory collection
  public string RootItem { get; set; } // from appSettings.json
  public FavouriteStuff Favourites { get; set; } // from appSettings.json
  public string[] Fruits { get; set; } // from appSettings.json
}

public class FavouriteStuff
{
  public string TvShow { get; set; }
  public string Movie { get; set; }
  public string Food { get; set; }
  public string Drink { get; set; }
}

As you can see the structure can be deep if necessary which gives you quite a lot of flexibility.

The appSettings.json file looks like this:

{
  "RootItem": "This is at the root",
  "Favourites": {
    "TvShow": "Star Trek: The Next Generation",
    "Movie": "First Contact",
    "Food": "Haggis",
    "Drink":  "Cream Soda"  
  },
  "Fruits": [
    "Apples",
    "Oranges",
    "Pears",
    "Cherries",
    "Bananas",
    "Strawberries",
    "Raspberries"
  ]           
}

Now, the controller that needs the configuration information looks like this:

public class HomeController : Controller
{
    private readonly IOptions<MyConfiguration> _config;

    public HomeController(IOptions<MyConfiguration> config)
    {
        _config = config;
    }
    public IActionResult Index()
    {
        JsonSerializerSettings settings = new JsonSerializerSettings();
        settings.Formatting = Formatting.Indented;
        return new JsonResult(_config.Value, settings);
    }
}

All this does is take the `MyConfiguration` object created by the framework on our behalf and render it as JSON to the browser. The key parts are that the constructor takes an IOptions<MyConfiguration> reference, which you store in the controller and you can then access as needed in any of the methods of the controller.

Finally, the output in the browser looks as follows.

{
  "UserName": "colin.mackay",
  "InMemory": "This value comes from the in-memory collection",
  "RootItem": "This is at the root",
  "Favourites": {
    "TvShow": "Star Trek: The Next Generation",
    "Movie": "First Contact",
    "Food": "Haggis",
    "Drink": "Cream Soda"
  },
  "Fruits": [
    "Apples",
    "Oranges",
    "Pears",
    "Cherries",
    "Bananas",
    "Strawberries",
    "Raspberries"
  ]
}

It looks very similar to the appSettings.json file, but you can see that it has, in addition, the “UserName” and “InMemory” elements which don’t appear in that file.