Software Consulting

LMAX Disruptor Example Project

The LMAX Disruptor code was released way back in 2011 with much fanfare, a white paper, and various blog posts. Since then, it has been refined, documented, emulated and referenced by a number of projects, but when it was first released there were few example projects and tutorials. Figuring out how it worked and what the use-cases were was an exercise in reverse engineering the source and digging through the various slide decks and blog posts that mostly discussed the why of the framework, not the how-to.

In 2012, I worked up a sample project while trying to learn how I might leverage the disruptor pattern for a concurrent programming problem I was trying to solve at the time. I was won over by the apparent simplicity of its promise - avoiding the problems of multi-threading by simply not doing multi-threading was attractive, and the initial benchmarks were impressive. The concept of mechanical sympathy seemed to make sense, even to someone with no real talent in electrical engineering or CPU design.

Now achieving a level of maturity, it is relatively easy for a developer to come up to speed on the framework, but I decided to write up my original sample project anyway, just in case it might help someone just starting to ascend the learning curve.

Background

I won't go into great detail on what the disruptor is, since the white-paper referenced in the Resources section below does a far better job than I could, but a minimal background will be helpful going forward, so we'll only hit a couple of the high points here.

The core structure of the disruptor is the Ring Buffer, which, as the name implies is a circular buffer of sequential slots that hold the events (or tasks) to be processed. producers and consumers track the sequence numbers of available vs. in-use slots to achieve concurrency with a minimum of inter-thread communication. This, plus the contiguous structure of the buffer in memory, coupled with the pre-allocation of events or tasks within the ring allow for the optimal use of CPU caches, a concept referred to by LMAX as mechanical sympathy.

The library open-sourced by LMAX contains the Ring Buffer and supporting code, plus a Java-based DSL for configuring and starting it up.

A Real-world Example

I chose for my example a real-world concurrency pattern often referred to as a diamond configuration. In this arrangement, two tasks must complete before a third can begin, but these two may be processed concurrently. When diagramed, this looks a bit like a diamond, hence the name:

Our real-world example is inspired by a pattern of in-memory storage known as event-sourcing. In this pattern, the state of a system can always be rebuilt by replaying a journal of events. Data retrieval from the system is very fast, as the current state is always in memory.

Such systems require a persistent journal and some form of redundancy for both performance and fault-tolerance. In our example code, we examine a simple bank-account transaction processor patterned on event-sourcing. Before processing a transaction event, it must be replicated and journaled. These three phases: journaling, replication, and posting, form the points on our diamond.

Business Logic Layer

A simple in-memory database of bank accounts is implemented. It is important to note that this implementation is completely independent of the Disruptor-specific features to be discussed a little later.

AccountStore

For demo purposes, the simplest possible in-memory store is implemented based on a HashMap with account numbers as keys. The map stores instances of Account model objects.

The Model

For purposes of this simplistic example, only the bare minimum is implemented to keep the code as brief and clear as possible.

Account - The account object is a simple storehouse for state. In the case of this example, the current account balance. For demo purposes, each account also keeps an ordered list of transactions that have posted against it, though this is not the way one might keep historical data in a real-world scenario.

Transaction - The transaction object represents the data required to increase or decrease the account balance.

Disruptor Implementation

This part of the implementation contains the Disruptor-specific portion of the application logic. Here we establish the infrastructure classes, build the ring buffer, and use it to process transaction events.

Transaction Event

The TransactionEvent is just a wrapper around a Transaction model object. Its main purpose is to separate the disruptor-specific logic from our model, and to expose one static factory class that will be used when publishing events to the ring buffer:

    public final static EventFactory EVENT_FACTORY = new EventFactory() {
        public TransactionEvent newInstance() {
            return new TransactionEvent();
        }
    };        

EventFactory is an interface specified by the Disruptor framework. In our simple example, it does not do much, but it should be clear that it's purpose is to build events of a type compatible with the ring buffer.

Event Handlers

The EventHandler intferface is also part of the Disruptor framework. It specifies one method:

    onEvent(T event, long sequence, boolean endOfBatch);

In our example, we define three event handlers corresponding to the three processing phases previously described: journaling, replication, and posting.

Transaction Processor

The TransactionProcessor is where the ringbuffer is instantiated and its lifecycle managed. In addition, it exposes a service method for posting a transaction. The TransactionProcessor uses the Disruptor DSL to establish the ring buffer in an init() method:

    public void init() {
        disruptor = new Disruptor(
                TransactionEvent.EVENT_FACTORY, 
                1024, 
                EXECUTOR,
                ProducerType.SINGLE,
                new YieldingWaitStrategy());
        
        // Pretend that we have real journaling, just to demo it...
        File journalDir = new File("target/test");
        journalDir.mkdirs();
        File journalFile = new File(journalDir, "test-journal.txt");
        
        // In this example start fresh each time - though a real implementation
        // might roll over the journal or the like.
        if (journalFile.exists()) {
            journalFile.delete(); 
        }

        journal = new JournalTransactionHandler(journalFile);
        
        replicate = new ReplicateTransactionHandler();
        
        post = new PostTransactionHandler(accountStore);

        // This is where the magic happens 
        // (see "diamond configuration" above)
        disruptor.handleEventsWith(journal, replicate).then(post);
        
        // We don't do any fancy exception handling in this demo, but if we
        // did, one way to set it up for each handler is like this:
        ExceptionHandler exh = new GenericExceptionHandler();
        disruptor.handleExceptionsFor(journal).with(exh);
        disruptor.handleExceptionsFor(replicate).with(exh);
        disruptor.handleExceptionsFor(post).with(exh);

        ringBuffer = disruptor.start();
        
    }

In a real implementation, the lifecycle of the initialized ring-buffer would be managed by a Spring bean or similarly long-lived application object. At that point, posting a transaction is as simple as invoking the postTransaction method, implemented in TransactionProcessor as:

    public void postTransaction(Transaction transaction) {
        disruptor.publishEvent(new TransactionEventPublisher(transaction));
    }

This method uses another Disruptor specific entity, an event publisher, which implements the EventTranslator interface. Implementing classes must copy the attributes of a particular event into attributes of one of the pre-allocated slots in the ring-buffer. If needed, the actual sequence of the availabe buffer slot is provided.

    class TransactionEventPublisher implements EventTranslator {
        private Transaction transaction;
        
        public TransactionEventPublisher(Transaction transaction) {
            this.transaction = transaction;
        }

        public void translateTo(TransactionEvent event, long sequence) {
            event.setTransaction(transaction);
            event.setBufferSeq(sequence); // We don't really use this, just demonstrating its availability
        }
    }

Sample Project

Resources


comments powered by Disqus