High performance message processing with disruptor.net

30 December 2015

Concurrency and message queueing are not easy. In fact, they can be quite a pain, and a careless solution can end up destroying the performance of your application.

Some years ago, Martin Thompson, who runs a blog called Mechanical Sympathy (which I highly recommend), decided enough was enough and invented the disruptor pattern. It was first implemented in Java and later ported to various other languages, including .NET.

This post will demo some of the basic features of the disruptor framework including basic error handling and monitoring.


What's the problem?

The disruptor was designed to act as a high performance message queue for the financial world, where macro events cause massive data spikes that the existing systems could not keep up with. It works by receiving messages on one thread and dispatching them to workers (event handlers) on other threads as they become available, without the use of locks. Sounds quite simple. Let's keep it there :)

To illustrate, here are a couple of examples of how the disruptor can be set up. Each producer and consumer runs in its own thread.

 

The ring buffer serves to absorb data spikes coming in from the producers. When the consumers are available, events are dispatched to them. If your consumers are consistently slower than your producers, a queue will not help you: it will fill up and all kinds of pain will hit you.
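To make the ring buffer idea a bit more concrete, here is a minimal single-threaded sketch of how an ever-increasing sequence number can map onto a fixed set of preallocated slots. It is an illustration only and not the disruptor's actual implementation. The `TinyRing` class and its members are hypothetical names, and the lock-free coordination between producer and consumer threads is deliberately left out.

```csharp
using System;

// Hypothetical, single-threaded illustration of slot reuse in a ring buffer.
// This is NOT the disruptor implementation; it only shows the indexing idea.
public class TinyRing
{
    private readonly int[] slots;
    private readonly int mask;
    private long nextSequence;

    public TinyRing(int size)
    {
        // A power-of-two size lets us wrap around with a cheap bit mask.
        if (size <= 0 || (size & (size - 1)) != 0)
            throw new ArgumentException("size must be a power of two", nameof(size));
        slots = new int[size];
        mask = size - 1;
    }

    // Maps an ever-increasing sequence number to a slot index.
    public int SlotFor(long sequence)
    {
        return (int)(sequence & mask);
    }

    // Writes a value into the next slot and returns the sequence it was given.
    // This sketch overwrites old slots without checking for slow consumers.
    public long Publish(int value)
    {
        long sequence = nextSequence++;
        slots[SlotFor(sequence)] = value;
        return sequence;
    }

    // Reads whatever currently sits in the slot for the given sequence.
    public int Read(long sequence)
    {
        return slots[SlotFor(sequence)];
    }
}
```

With a size of 4, sequences 0 to 3 land in slots 0 to 3, and sequence 4 wraps around to slot 0 again. A real ring buffer has to stop the producer from overwriting a slot the consumer has not finished with, which is exactly where a permanently slow consumer starts to hurt.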

 

Hello world

This hello world example will implement a single producer and a single consumer in a console application.

To get started, create a new console application and get the disruptor package from NuGet.

PM> Install-Package Disruptor.Net

 

Add the following three classes to your project.

QueueItem.cs

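// The event type stored in the ring buffer. Instances are preallocated by the
// disruptor and reused, so publishing copies values in instead of allocating.
public class QueueItem
{
    public int IntValue { get; set; }

    // Copies the payload from another item into this preallocated slot.
    public void Update(QueueItem other)
    {
        IntValue = other.IntValue;
    }
}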

 

EventHandler.cs

using System;
using System.Threading;
using Disruptor;

// Consumer: OnNext is called by the disruptor on the consumer's own thread.
public class EventHandler : IEventHandler<QueueItem>
{
    public void OnNext(QueueItem data, long sequence, bool endOfBatch)
    {
        Console.WriteLine("Received seqno: {0} on thread id {1}. Int: {2}", sequence, Thread.CurrentThread.ManagedThreadId, data.IntValue);
    }
}

 

Queue.cs

using System;
using System.Threading;
using System.Threading.Tasks;
using Disruptor.Dsl;
using Disruptor;

public class Queue
{
    // The ring buffer size must be a power of two.
    private const int RingBufferSize = 4096;

    private RingBuffer<QueueItem> ringBuffer;
    private Disruptor<QueueItem> disruptor;

    public void Start()
    {
        var eventHandler = new EventHandler();
        // The factory preallocates one QueueItem per slot in the ring buffer.
        disruptor = new Disruptor<QueueItem>(() => new QueueItem(), RingBufferSize, TaskScheduler.Default);
        disruptor.HandleEventsWith(eventHandler);
        ringBuffer = disruptor.Start();
    }

    public void Stop()
    {
        disruptor.Halt();
    }

    public void Enqueue(QueueItem item)
    {
        // Claim the next sequence, copy the data into its slot, then publish it.
        var next = ringBuffer.Next();
        ringBuffer[next].Update(item);
        ringBuffer.Publish(next);
        Console.WriteLine("Enqueuing on thread id {0}", Thread.CurrentThread.ManagedThreadId);
    }
}

 

And finally, your Program.cs should look something like this.

using System;

class Program
{
    static void Main(string[] args)
    {
        var queue = new Queue();
        queue.Start();

        var qItem = new QueueItem();
        int counter = 1;

        // Enqueue one item per key press; press Escape to exit the loop,
        // so that queue.Stop() below is actually reachable.
        do
        {
            qItem.IntValue = counter;
            queue.Enqueue(qItem);
            counter++;
        } while (Console.ReadKey(true).Key != ConsoleKey.Escape);

        queue.Stop();
    }
}

 

This example produces an event each time you hit a key. It outputs which thread is used for enqueuing and which thread is used for processing.

 

Throughput example

To see how much we can pump through the disruptor, I have created a small test application. It's available on GitHub here.

Running it in release mode on my home PC (i7-8320 @ 3.6 GHz) gives me about 10 million events per second. Please note that there is no processing in the example, only counting.

It measures performance via a nifty NuGet package, made by yours truly, called Disruptor-net.Metrics. It's also available on my GitHub page.

 

Also note that if you run a memory profiler (dotMemory in this case), the memory graph is completely flat, meaning that the application does not produce any garbage. At all. That includes the metrics library. That's good news, as garbage equals death of performance.