
Node.js, AMQP, and Server-Sent Events

In my recent article Event Collection with Node.js, I explored (among other things) using Node.js and Express to integrate with RabbitMQ to produce messages. In this article, we'll explore using a similar setup to consume messages from RabbitMQ and relay them to a browser-based client using server-sent events. Getting this working requires a server-side component, as well as a surprisingly small amount of client-side JavaScript.

The Server Side

We can reuse the basic framework of our Express application from the previous article. Our dependencies are once again the AMQP library and Express. The server must establish a connection to the RabbitMQ broker and consume messages from a queue. It then exposes a pair of resources: one is the URL that serves our events, and the other is a static HTML file that contains our client-side code. Serving the page from the same application satisfies the browser's same-origin policy, under which our JavaScript can, by default, only connect to the server it originated from. There are other ways to avoid this problem, but they are beyond the scope of this article.

{
  "name" : "ServerSentEvents",
  "version" : "0.0.1",
  "dependencies" : {
    "express" : "3.5.0",
    "amqp" : "https://github.com/postwait/node-amqp/tarball/master"
  }
}
        

As of this writing, there is a bug affecting server confirms in the release version of the amqp package. The issue is fixed in the main codeline, hence the Git URL in place of a release version. Take a look at the code below, keeping in mind that robust error handling and other production features have been omitted for brevity.

var express = require('express');
var amqp = require('amqp'); 
var connections = [];

var connection = amqp.createConnection({ 
    host: 'localhost', 
    port: 5672, // RabbitMQ's default AMQP port
    login: 'guest', 
    password: 'guest', 
    authMechanism: 'AMQPLAIN', 
    vhost: '/', 
    ssl: { enabled : false }
});

var queue;

connection.addListener('ready', function(){
    console.log('Connection is ready');
    // Once the connection is ready, we subscribe to the queue
    connection.queue('org.wjb.ha.TestQ', {'noDeclare': true}, function (q) {
        console.log('Queue ' + q.name + ' is open');
        queue = q;
        queue.subscribe(function(message, headers, deliveryInfo, messageObject) {
            consume(message, headers, deliveryInfo, messageObject);
        });
    });
});

var app = express();
app.use(express.static(__dirname + '/public'));

app.get('/', function(req, res){
    res.sendfile('index.html', { root: __dirname + "/public" } );
});

app.get('/events', function(req, res) {
    if (req.headers.accept == 'text/event-stream') {
        res.writeHead(200, {
            'content-type': 'text/event-stream',
            'cache-control': 'no-cache',
            'connection': 'keep-alive'
        });
        connections.push(res);
        console.log('Connection added for ' + req.ip);

        req.on('close', function () {
            removeConnection(res);
        });
    } else {
        res.send(500, 'This path for EventSource subscription only...');
    }
});

app.listen(8083);

function consume(message, headers, deliveryInfo, messageObject) {
    if (deliveryInfo.contentType==='text/plain') {
        broadcast(message.data);
    } else if (deliveryInfo.contentType==='application/json') {
        broadcast(JSON.stringify(message));
    }
}

function broadcast(data) {
    var id = (new Date()).toLocaleTimeString();
    connections.forEach(function (res) {
        writeEvent(res, id, data);
    });
}

function writeEvent(res, id, data) {
    res.write('id: ' + id + '\n');
    res.write('data: ' + data + '\n\n');
}

function removeConnection(res) {
    var i = connections.indexOf(res);
    if (i !== -1) {
        connections.splice(i, 1);
        console.log('Connection closed.');
    }
}
        

Let's examine what's happening here. The first few blocks of code establish our connectivity to RabbitMQ. Once the connection is established, we subscribe to a queue. In this example, the queue has already been set up on the broker, so we pass the noDeclare: true option. The call to subscribe delegates to our consume function, discussed in a moment. Next, we expose two resources via GET requests: one at "/", which serves our HTML page, and the more interesting one at "/events", which is the resource our client uses to receive events.

Because server-sent events work by holding the HTTP connection open for as long as the client is listening, we accommodate multiple clients by keeping an array of the open responses, one per connected client. As messages are received on the queue, we can then broadcast them to all of our clients. At this point, all the basic infrastructure is established.
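To make the bookkeeping concrete, here is a minimal sketch of the same pattern that runs without a broker or a browser. The names createRegistry and fakeResponse are hypothetical and do not appear in the listing above; the fake response simply records what is written to it, standing in for a real Express response.

```javascript
// Hypothetical helper (not part of the article's server): a small registry
// that tracks open responses and fans a chunk of text out to all of them.
function createRegistry() {
    var connections = [];
    return {
        add: function (res) {
            connections.push(res);
        },
        remove: function (res) {
            var i = connections.indexOf(res);
            if (i !== -1) {
                connections.splice(i, 1);
            }
        },
        broadcast: function (text) {
            connections.forEach(function (res) {
                res.write(text);
            });
        },
        size: function () {
            return connections.length;
        }
    };
}

// Stand-in for an HTTP response: it only records what was written to it.
function fakeResponse() {
    var chunks = [];
    return {
        chunks: chunks,
        write: function (chunk) { chunks.push(chunk); }
    };
}

var registry = createRegistry();
var first = fakeResponse();
var second = fakeResponse();
registry.add(first);
registry.add(second);
registry.broadcast('data: hello\n\n');  // both clients receive this event
registry.remove(second);                // the second client disconnects
registry.broadcast('data: again\n\n');  // only the first client receives this one
```

Removing a response as soon as its client disconnects matters here: otherwise the array keeps growing and every broadcast writes to connections that are no longer there.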

This leaves the final four functions in our source code, and this is where all the magic happens. The first function, consume, is how messages from the queue get to us. In this simple example implementation, you can see that we support two message types: plain text and JSON objects. Note that the AMQP client is smart enough to parse JSON messages into objects, hence the use of JSON.stringify before broadcasting them to our clients.
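The content-type handling can be isolated into a pure function, which makes it easy to try out without a queue. This is only a sketch: toPayload is a hypothetical name, and it assumes, as the listing above does, that plain-text bodies are available on message.data (for example as a Buffer) while JSON bodies have already been parsed into an object.

```javascript
// Hypothetical pure version of the consume step: it returns the string to
// broadcast, or null when the content type is not one we handle.
function toPayload(message, contentType) {
    if (contentType === 'text/plain') {
        // Assumption: the raw body is on message.data; String() converts a
        // Buffer to text, just as string concatenation does in writeEvent.
        return String(message.data);
    }
    if (contentType === 'application/json') {
        // The client library has already parsed the body into an object,
        // so turn it back into text before sending it to the browser.
        return JSON.stringify(message);
    }
    return null;
}

var textPayload = toPayload({ data: Buffer.from('hello') }, 'text/plain');
var jsonPayload = toPayload({ sensor: 'door', open: true }, 'application/json');
var ignored = toPayload({}, 'image/png');
```

Returning null for unknown content types mirrors the original function, which silently skips any message that is neither plain text nor JSON.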

The next two functions, broadcast and writeEvent, are how we relay the messages to the browser client as events. There are some restrictions on the content and formatting of these events, so please do read up on them in the Resources section below.
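One such restriction is worth illustrating: the event stream format is line-based, so a payload that contains a line break needs a separate "data:" field for each of its lines, and a blank line ends the event. The writeEvent function above writes the payload on a single line, so a multi-line message would not reach the client intact. The sketch below shows one way to handle that; formatEvent is a hypothetical helper, not part of the original code.

```javascript
// Hypothetical helper: build the text of one server-sent event.
function formatEvent(id, data) {
    // The id must fit on a single line, so strip any line breaks from it.
    var lines = ['id: ' + String(id).replace(/[\r\n]/g, '')];
    // Every line of the payload gets its own "data:" field.
    String(data).split(/\r\n|\r|\n/).forEach(function (line) {
        lines.push('data: ' + line);
    });
    // A blank line terminates the event.
    return lines.join('\n') + '\n\n';
}

var single = formatEvent('1', 'hello');
var multi = formatEvent('2', 'first line\nsecond line');
```

With a helper like this, writeEvent could reduce to a single res.write(formatEvent(id, data)) call. On the client, the browser joins the "data:" lines back together with newline characters before handing them to onmessage.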

Finally, we provide a function to clean up connections. It is called when a client closes its connection.

The Client Side

For this simple example, all we do is dump the event content to the page body. The key component in server-sent events is the EventSource object, which is constructed with the path to the event resource we created on our server above. This object's onmessage property is assigned an anonymous function that writes the data to the DOM. In a real application, you could do almost anything with the data, especially if you were receiving a JSON string that could be parsed into complex objects.

<html>
  <head>
  </head>
  <body>
    <h1>Server Sent Events</h1>
    <p>Received these events from RabbitMQ: </p>
    <script>
      var source = new EventSource('/events');
      source.onmessage = function(e) {
        document.body.innerHTML += ('<p>' + e.data + '</p>');
      };
    </script>
  </body>
</html>        
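
If the server relays JSON messages, the handler can parse them before use. The following is a small sketch rather than part of the page above: parseEventData is a hypothetical helper that falls back to the raw text when the payload is not valid JSON, so plain-text messages keep working.

```javascript
// Hypothetical helper: turn the text of an event into a value.
// Valid JSON is parsed; anything else is returned unchanged.
function parseEventData(text) {
    try {
        return JSON.parse(text);
    } catch (err) {
        return text;
    }
}

// In the browser it could be wired up like this (assumes the /events
// resource from the server above):
//
//   var source = new EventSource('/events');
//   source.onmessage = function (e) {
//     var payload = parseEventData(e.data);
//     console.log(payload);
//   };

var parsed = parseEventData('{"sensor":"door","open":true}');
var plain = parseEventData('hello from the queue');
```

Note that text which happens to be valid JSON, such as "42", comes back as a number rather than a string, so a real application may prefer to have the server state the payload type explicitly.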
        

 

Once again, I have been surprised at the ease with which a relatively complex set of behaviors can be accomplished using elements of the HTML5 and Node.js ecosystems!

Resources:

