
Event Collection with Node.js

In the past few years I've worked on several projects that shared a similar requirement. In each case, remote systems needed to communicate events to some central processing engine for aggregation and long-term storage. Often, support for thousands of such events per second is required. In most cases, some form of data collection was implemented as a REST-like HTTP service, with POST or GET requests used to forward data via a cluster of web servers. It is interesting to contemplate the many uses of this pattern.

Like the original Java developers of some of the systems I have encountered, when confronted with such a requirement I would be tempted to bring out a copy of Jetty or Tomcat and whip up a quick Java web app to handle the inbound HTTP requests. But recently I've been playing around with Node.js and Express to create simple, standalone web applications. I am a complete Node.js neophyte, but I am pretty impressed with it so far. As much as I loves me my Java, doing simple HTTP-powered services in Jetty or Tomcat still takes quite a bit of code. Projects like Dropwizard improve upon the situation a great deal, and even layer on data-center features like health checks and JMX monitoring. But sometimes less is more, and to this end, I set out to capture three flavors of the "Event Collection" use case using Node.js.

Note that this is not a Node.js tutorial, so you'll want to have at least a cursory understanding of Node and npm (the Node package manager) before proceeding (see resources below). Please also note that the sample code has been stripped down to illustrate specific points, so more work would be needed before any of these samples were production-ready.

The Use Case

We want to create a generic event-capture mechanism that receives events in the form of an HTTP POST. The path portion of the URL identifies the event type, and the body of the request is expected to be JSON representing the event data. The event collector is a front-line application that must do as little as possible with each event, reserving its resources to process as many events as possible (i.e., favor throughput). It is understood that some downstream back-end system actually processes the events.
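
Concretely, posting an event to the collector might look like the following sketch, written with Node's core http module. The event type ('pageView') and the payload fields are made-up examples; port 8083 matches the samples below:

var http = require('http');

// A hypothetical event; any JSON object will do.
var body = JSON.stringify({ userId: 42, page: '/home' });

var req = http.request({
  host: 'localhost',
  port: 8083,                   // matches app.listen(8083) in the samples below
  path: '/api/event/pageView',  // ':eventType' is 'pageView' here
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Content-Length': Buffer.byteLength(body)
  }
}, function(res) {
  console.log('Collector responded with status ' + res.statusCode);
});

req.on('error', function(err) {
  console.log('Request failed: ' + err.message);
});

req.write(body);
req.end();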

Option #1: Capturing Events and Saving to MySQL

This is the simplest thing that could possibly work, and if extreme scale is not really needed, it might just do the trick. In this example, the event is simply inserted into a database table. I had no doubt that Node.js would handle this well, as MySQL support is ubiquitous. As expected, this was the easiest of the three cases and needed the least code to prototype:

{
  "name" : "EventsToMySql",
  "version" : "0.0.1",
  "dependencies" : {
    "express" : "3.5.0",
    "mysql" : "2.1.0"
  }
}

Our project requires Express and the mysql driver.

var express = require('express');
var mysql   = require('mysql');

// Use a connection pool so each request doesn't pay connection-setup costs.
var pool = mysql.createPool({
  host     : 'localhost',
  user     : 'root',
  database : 'test'
});

var app = express();
app.use(express.bodyParser());

app.post('/api/event/:eventType', function(req, res) {
  pool.getConnection(function(err, connection) {
    if (err) {
      console.log('ERROR: Unable to get connection due to ' + err.message);
      res.send(500, 'Unable to get a database connection');
    } else {
      connection.query('INSERT INTO stats (data_event, payload, submission_date) values (?, ?, ?)',
        [req.params.eventType, JSON.stringify(req.body), new Date()], function(err, result) {
        if (err) {
          res.send(500, err);
        } else {
          res.send(200, 'Event is queued...');
        }
        // Return the connection to the pool whether or not the insert succeeded.
        connection.release();
      });
    }
  });
});

app.listen(8083);

Of special note is the use of a variable as part of the path: :eventType becomes a parameter containing our event type, allowing an open-ended number of event types to be handled by this same block. A straightforward SQL INSERT is used to store the event data. JSON.stringify is needed because the Express body parser has already turned the JSON payload of the POST into a JavaScript object. Normally we would do some validation and error handling as well.
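
For reference, the stats table targeted by the INSERT might be declared roughly as follows. This is just a sketch: only the column names come from the code above; the types and sizes are my assumptions.

CREATE TABLE stats (
  id              INT AUTO_INCREMENT PRIMARY KEY,  -- assumed surrogate key
  data_event      VARCHAR(255) NOT NULL,           -- the :eventType path parameter
  payload         TEXT,                            -- the stringified JSON body
  submission_date DATETIME
);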

Option #2: Capturing Events and Queuing with RabbitMQ

Now I wanted to see if I could optimize throughput using my favorite rabbit-themed message broker. RabbitMQ is pretty popular, so I felt fairly sure that npm would reveal an existing client, and I was not disappointed. In this case, the goal was to enqueue the event on an AMQP exchange. In the "real world", a downstream consumer would receive and process the events (a sketch of such a consumer follows the collector code below). This nicely decouples event reception from processing, and if durable queues with server confirms are used, the likelihood of losing events during failures should be very small.

{
  "name" : "EventsToRabbit",
  "version" : "0.0.1",
  "dependencies" : {
    "express" : "3.5.0",
    "amqp" : "https://github.com/postwait/node-amqp/tarball/master"
  }
}

In developing this example, I discovered a bug in the server-confirm support in the release version of the amqp package. The issue is fixed in the main codeline, hence the Git URL in place of a release version.

var express = require('express');

var amqp = require('amqp'); 
var exchange;

var connection = amqp.createConnection({
    host: 'localhost',
    port: 5672,  // the default AMQP port
    login: 'guest',
    password: 'guest',
    authMechanism: 'AMQPLAIN',
    vhost: '/',
    ssl: { enabled : false }
});

connection.addListener('ready', function(){
    console.log('Connection is ready');
    // Once the connection is ready, we declare our exchange, in this example
    // we activate server confirms
    connection.exchange('org.wjb.ha.TestExchange',
         {'type':'topic','autoDelete':false,'durable':true,'confirm':true}, 
          function(exch){
             console.log('Exchange \'' + exch.name + '\' is open');
             exchange = exch;        
          });
});

var app = express();
app.use(express.bodyParser());

app.post('/api/event/:eventType', function(req, res) {
  if (exchange) {
    // In this example, we use message-headers to include the 'event type'.
    // (Only include the callback function when server confirms are active)
    exchange.publish('', req.body, {'headers':{'eventType':req.params.eventType}}, function(hasError){
      if (hasError) {
        res.send(500, 'Message not confirmed by broker');
      } else {
        res.send('Message is queued...');
      }
    });
  } else {
    res.send(500, 'Exchange not initialized');
  }
});

app.listen(8083);

As you can see, a little more code is required for enqueuing our events on RabbitMQ, but not much. Pretty sweet!
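
For completeness, here is a rough sketch of the downstream consumer mentioned earlier, using the same amqp package. The 'event-worker' queue name and the catch-all '#' binding are my assumptions; the exchange name matches the collector above:

var amqp = require('amqp');

var connection = amqp.createConnection({ host: 'localhost' });

connection.addListener('ready', function() {
    // Declare a durable queue and bind it to the collector's exchange
    // with a catch-all topic pattern.
    connection.queue('event-worker', {'durable':true,'autoDelete':false}, function(q) {
        q.bind('org.wjb.ha.TestExchange', '#');
        q.subscribe(function(message, headers, deliveryInfo) {
            // The collector put the event type in a message header.
            console.log('Processing ' + headers.eventType + ' event');
        });
    });
});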

Option #3: Capturing Events and Queuing with Kafka

I've only just started playing around with Kafka. It is definitely more complex to set up and work with than RabbitMQ, but its distributed, partitioned design promises massive throughput. I thought it would be unlikely that I would find a pre-built client for Kafka in npm, but how wrong I was. I used the kafka-node package to quickly hook up my event collector to a Kafka topic. I also included a quick-and-dirty example of using the event type to select a Kafka partition.

{
  "name" : "EventsToKafka",
  "version" : "0.0.1",
  "dependencies" : {
    "express" : "3.5.0",
    "kafka-node" : "0.1.2"
  }
}

Our project requires Express and kafka-node.

var express = require('express');
var kafka = require('kafka-node');
// Use Zookeeper connect String in Client:
var client = new kafka.Client('localhost:2181','test-client');
var producer = new kafka.Producer(client);
producer.addListener('ready', function () {
    console.log('Kafka producer is ready');
});

var app = express();
app.use(express.bodyParser());

app.post('/api/event/:eventType', function(req, res) {
    if(producer) {
        // In this example, we add the 'event type' to the message itself, as
        // our kafka protocol doesn't have explicit headers as in AMQP
        req.body.eventType = req.params.eventType;

        // Use the event type to choose the partition (if there were more than one)
        var hash_key = req.body.eventType;
        var part = getPartition(hash_key);

        var payload = [
            { topic: 'test', messages: JSON.stringify(req.body), partition: part }
        ];
        producer.send(payload, function (err, data) {
            if (err) {
                res.send(500, err);
            } else {
                res.send(200, 'Message is queued...');
            }
        });
        
    } else {
        res.send(500, 'Producer is not initialized');
    }
});

app.listen(8083);

var numPartitions = 1;  // set to match the topic's partition count

function getPartition(key) {
    if (key) {
        // hashString can return a negative number, so take the absolute
        // value before reducing it to a partition index.
        return Math.abs(hashString(key)) % numPartitions;
    } else {
        return 0;
    }
}

// Simple 32-bit string hash (the classic Java-style hash).
function hashString(str) {
    var hash = 0;
    if (str.length === 0) return hash;
    for (var i = 0; i < str.length; i++) {
        var char = str.charCodeAt(i);
        hash = ((hash << 5) - hash) + char;
        hash = hash & hash; // force to a 32-bit integer
    }
    return hash;
}
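
And as with RabbitMQ, the downstream side is easy to sketch. A minimal kafka-node consumer, assuming the same single-partition 'test' topic used by the producer above, might look like this:

var kafka = require('kafka-node');
var client = new kafka.Client('localhost:2181', 'test-consumer');

// Read from partition 0 of the 'test' topic; with more partitions you
// would list each one (or run one consumer per partition).
var consumer = new kafka.Consumer(client, [{ topic: 'test', partition: 0 }], { autoCommit: true });

consumer.on('message', function(message) {
    // The collector stringified the event, so parse it back.
    var event = JSON.parse(message.value);
    console.log('Processing ' + event.eventType + ' event');
});

consumer.on('error', function(err) {
    console.log('Consumer error: ' + err.message);
});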

The Kafka example takes the most code, but if you are trying to scale massively, it is likely the way to go. Even with the added complexity, it was still pretty simple to get something up and running quickly. More proof of the great community surrounding Node.js!

Resources:

