How to use a disruptor with multiple message types

Configure the disruptor to use objects containing a fixed size byte array (as recommended by How should one use Disruptor (Disruptor Pattern) to build real-world message systems?). In this case, the main thread must encode the messages into byte arrays before publishing them to the disruptor and each of the business logic threads must decode the byte arrays back into objects upon receipt. The downside of this setup is that the business logic threads are not truly sharing the memory from the disruptor - instead they are creating new objects (and thus creating garbage) from the byte array provided by the disruptor. The upside of this setup is that all business logic threads can read multiple different types of messages from the same disruptor.

This would be my preferred approach, but I'm slightly coloured by our use cases: just about every place that we've used the Disruptor, it's either receiving from or sending to some sort of I/O device, so our basic currency is byte arrays. You can get around the object creation by using a flyweight approach to marshalling. To see an example of this, I used Javolution's Struct and Union classes in an example that I presented at Devoxx (https://github.com/mikeb01/ticketing). If you can completely deal with the object before returning from the onEvent call of the event handler, then this approach works well. If the event needs to live beyond that point, then you need to make some sort of copy of the data, e.g. de-serialising it into an object.
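To make the flyweight idea concrete, here is a minimal sketch using only java.nio.ByteBuffer rather than Javolution. The class names, the 17-byte layout and the type tag are all assumptions made up for illustration; they are not taken from the ticketing example. The point is that the reader is repositioned over the slot's bytes instead of a new message object being created per event.

```java
import java.nio.ByteBuffer;

// Hypothetical ring-buffer slot: a fixed-size byte array plus a preallocated view of it.
final class ByteArrayEvent {
    final byte[] data = new byte[32];
    final ByteBuffer view = ByteBuffer.wrap(data); // created once, reused for every event
}

// Hypothetical flyweight over an assumed layout:
// [0] type tag (byte), [1..8] id (long), [9..16] price (double)
final class MarketDataFlyweight {
    static final byte TYPE_TAG = 1;
    private static final int TYPE_OFFSET = 0;
    private static final int ID_OFFSET = 1;
    private static final int PRICE_OFFSET = 9;

    private ByteBuffer buffer;

    // Point the flyweight at an existing buffer; nothing is copied or allocated.
    MarketDataFlyweight wrap(ByteBuffer buffer) {
        this.buffer = buffer;
        return this;
    }

    // Producer side: write the fields straight into the slot's bytes.
    void encode(long id, double price) {
        buffer.put(TYPE_OFFSET, TYPE_TAG);
        buffer.putLong(ID_OFFSET, id);
        buffer.putDouble(PRICE_OFFSET, price);
    }

    // Consumer side: check the tag, then read fields in place.
    boolean matches() { return buffer.get(TYPE_OFFSET) == TYPE_TAG; }
    long id()         { return buffer.getLong(ID_OFFSET); }
    double price()    { return buffer.getDouble(PRICE_OFFSET); }
}
```

A consumer would keep one flyweight instance per handler and call wrap(event.view) inside onEvent. As noted above, the values are only valid until the handler returns, so anything that must outlive the call has to be copied out.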

Configure the disruptor to use a single type of object but create multiple disruptors, one for each object type. In the case above, there would be two separate disruptors - one for objects of type A and another for objects of type B. The upside of this setup is that the main thread doesn't have to encode the object to a byte array and the business logic threads can share the same objects as used in the disruptor (no garbage created). The downside of this setup is that somehow each business logic thread will have to subscribe to messages from multiple disruptors.

I've not tried this approach; you'd probably need a custom EventProcessor that can poll from multiple ring buffers.
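As a rough illustration of what "poll from multiple ring buffers" could mean, the sketch below uses a tiny hand-written single-producer ring as a stand-in. MiniRing and MultiRingPoller are hypothetical names and are not part of the Disruptor API; a real custom EventProcessor would also need sequence barriers, gating sequences and a wait strategy, none of which are shown here.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for a single-producer ring buffer (NOT the Disruptor API).
final class MiniRing<T> {
    private final Object[] slots;
    private final int mask;
    private final AtomicLong published = new AtomicLong(-1); // last published sequence

    MiniRing(int sizePowerOfTwo) {
        slots = new Object[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
    }

    // Single producer only; no protection against wrapping past a slow consumer.
    void publish(T value) {
        long next = published.get() + 1;
        slots[(int) (next & mask)] = value;
        published.set(next); // volatile write makes the slot visible to the consumer
    }

    long cursor() { return published.get(); }

    @SuppressWarnings("unchecked")
    T get(long sequence) { return (T) slots[(int) (sequence & mask)]; }
}

// One consumer thread that tracks its own read position on each ring.
final class MultiRingPoller<A, B> {
    private final MiniRing<A> ringA;
    private final MiniRing<B> ringB;
    private long nextA = 0;
    private long nextB = 0;

    MultiRingPoller(MiniRing<A> ringA, MiniRing<B> ringB) {
        this.ringA = ringA;
        this.ringB = ringB;
    }

    // One polling pass: drain whatever is currently published on each ring.
    // Returns the number of events handled so the caller can back off on zero.
    int pollOnce(Consumer<A> onA, Consumer<B> onB) {
        int handled = 0;
        long availableA = ringA.cursor();
        while (nextA <= availableA) { onA.accept(ringA.get(nextA++)); handled++; }
        long availableB = ringB.cursor();
        while (nextB <= availableB) { onB.accept(ringB.get(nextB++)); handled++; }
        return handled;
    }
}
```

Note that a poller like this preserves ordering within each ring but not across rings, which may matter if messages of type A and B must be processed in publication order.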

Configure the disruptor to use a single type of "super" object that contains all fields of both message A and B. This is very against OO style, but will allow for a compromise between option #1 and #2.

Configure the disruptor to use object references. However, in this case I lose the performance benefits of object preallocation and memory ordering.

We've done this in a couple of cases where the lack of preallocation is tolerable. It works okay. If you are passing objects then you need to make sure that you null them out once you are finished with them on the consumer side. We found that using a double dispatch pattern for the "super" object kept the implementation fairly clean. One drawback to this is that you will get slightly longer GC stalls than with something that was a straight array of objects, as the GC has more live objects to traverse during the mark phase.
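One possible reading of the double dispatch suggestion, sketched with hypothetical names (MessageVisitor, SuperEvent and all of their fields are made up for illustration): the "super" object records which message it currently holds and calls the matching method on the consumer's visitor, so the handler never has to switch on a type itself. The clear() method shows the nulling-out step mentioned above.

```java
// Hypothetical consumer-side callback interface, one method per message type.
interface MessageVisitor {
    void onTimer(long fireTimeNanos);
    void onMarketData(String symbol, double price);
}

// Hypothetical "super" event holding the fields of every message type.
final class SuperEvent {
    private enum Kind { NONE, TIMER, MARKET_DATA }

    private Kind kind = Kind.NONE;
    private long fireTimeNanos;   // used by TIMER
    private String symbol;        // used by MARKET_DATA
    private double price;         // used by MARKET_DATA

    void setTimer(long fireTimeNanos) {
        this.kind = Kind.TIMER;
        this.fireTimeNanos = fireTimeNanos;
    }

    void setMarketData(String symbol, double price) {
        this.kind = Kind.MARKET_DATA;
        this.symbol = symbol;
        this.price = price;
    }

    // The event selects the visitor method for whatever it currently holds.
    void dispatch(MessageVisitor visitor) {
        switch (kind) {
            case TIMER:
                visitor.onTimer(fireTimeNanos);
                break;
            case MARKET_DATA:
                visitor.onMarketData(symbol, price);
                break;
            default:
                throw new IllegalStateException("slot holds no message");
        }
    }

    // Drop object references so the ring-buffer slot does not keep them alive.
    void clear() {
        kind = Kind.NONE;
        symbol = null;
    }
}
```

In an event handler this would amount to calling event.dispatch(visitor) followed by event.clear() in the last consumer of the chain.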

What do you recommend for this situation? I feel that option #2 is the cleanest solution, but I don't know whether or how consumers can technically subscribe to messages from multiple disruptors. If anyone can provide an example for how to implement option #2, it would be much appreciated!

Another option, if you want complete flexibility with regard to the use of data, is to not use the ring buffer at all, but instead talk directly to the Sequencer and define your object layout as you see fit.


Ben Baumgold, I am sure you found a solution by now. Your #4 (or #3) can be implemented trivially by creating an event holder. Think of it as an enum for objects. To speed up look-ups, events should be enriched with an enum type. Notice that I am storing a reference to the original event in the holder. It may be more appropriate to create a copy constructor or clone() and copy events on insertion into the ring buffer.

Illustrating by example:

// this is enum used in events

public enum MyEventEnum {
EVENT_TIMER,
EVENT_MARKETDATA;
}

// this is the holder. At any time, an instance in the ring buffer holds just one event, stored at array[ type.ordinal() ]. The array gives each event type its own slot.

public class RingBufferEventHolder {
    // type of the event currently held; its ordinal indexes into the array
    private MyEventEnum type;
    // one slot per event type
    private final EventBase[] array;

    public RingBufferEventHolder() {
        array = new EventBase[MyEventEnum.values().length];
    }

    // TODO: null the rest
    public void setEvent(EventBase event) {
        type = event.getType();
        switch (type) {
            case EVENT_TIMER:
                array[MyEventEnum.EVENT_TIMER.ordinal()] = event;
                break;
            case EVENT_MARKETDATA:
                array[MyEventEnum.EVENT_MARKETDATA.ordinal()] = event;
                break;
            default:
                throw new RuntimeException("Unknown event type " + event);
        }
    }
}

// publish event

   EventBase newEvent=new EventMarketData(....);
   // prepare
   long nextSequence = ringBuffer.next(); 
   RingBufferEventHolder holder = ringBuffer.get(nextSequence);
   holder.setEvent(newEvent);
   // make the event available to EventProcessors 
   ringBuffer.publish(nextSequence);