node-amqp: Setting up a dead letter exchange

06 Sep 2013 posted by Jaz Lalli

We’ve started to utilize Node.js a lot more over the last year at Media Ingenuity, to the point where a few of the team might even be classed as fanboys (guilty as charged). There’s so little friction to getting things done that it’s become my go to technology for trying things out or prototyping new ideas. That’s exactly the approach we took when we needed to figure out how to collect and store data about user interaction on our sites. Following a few tests and iterations, fast-forward a couple of months, and we’re now publishing data from web clients, either via websockets (nodejs & socket.io) or jsonp. That data then gets pushed to CloudAMQP, where it gets routed to the appropriate queue. From there, we have consumers (also Node processes that use node-amqp) doing the processing. At the moment, most of the data gets pumped into SQL for analysts to play with, but there’s also the potential for some cool realtime stuff.

One of the challenges with the setup described was in ensuring we don’t lose data. Whilst at first we accepted that we might lose some data while we figured out what we were doing, we’re now reliant on that data for a number of things, so we can’t afford any gaps. We tried a couple of different approaches to address that, before we settled on the current (and so far, best) solution, dead letter exchanges in RabbitMQ.

Initially, we had the queue consumers go about their business, and if anything went wrong along the way, we just dumped data into log files. We took the view that by tailing the logs, and with some simple alerts, we could react to problems and recover data from the logs if needed. In reality, it didn’t quite turn out that way. The cost of recovering data from files was ultimately too prohibitive, so we ended up just accepting the loss and moving on. Certainly not ideal in the long run.

We then implemented manual message acknowledgements in RabbitMQ. In this scenario, the queue delivers a message, but doesn’t discard it until the consumer acknowledges that it was handled successfully. Messages that are unsuccessful then need to be dealt with somehow, which is where the dead letter feature comes into play. RabbitMQ allows you to specify an exchange as the dead letter exchange for any queue on creation. When a message is then rejected, it is automatically sent to the dead letter exchange. The consumer process ends up looking something like this

var connection = amqp.createConnection({url: config.CLOUD_AMQP}, connOptions);

connection.on('ready', function() {

    // dead letter exchange for rejected messages
    connection.exchange(
        'DeadLetterExchange',
        options = {
            durable: true,
            autoDelete: false,
            type: 'topic'
        },
        function (ex) {
            // queue that'll receive all dead-lettered messages
            connection.queue('DeadLetterQueue',
                options = {durable: true, autoDelete: false},
                function (q) {
                    q.bind(ex, '#');
                }
            );
        }
    );

    // primary exchange
    connection.exchange(
        config.EXCHANGE_NAME,
        options = {durable: true, autoDelete: false, type: 'fanout'},
        function (ex) {
            connection.queue(
                config.QUEUE_NAME,
                options = {
                    durable: true,
                    autoDelete: false,
                    closeChannelOnUnsubscribe: true,
                    arguments: {
                        'x-dead-letter-exchange': 'DeadLetterExchange'
                    }
                },
                function (q) {
                    q.bind(ex, '#');
                    
                    // define my message handler
                    q.subscribe(options = {ack: true}, function (payload, headers, info, msg) {	                    	
                    	// HANDLE THE MESSAGE THEN...
                    	if (success) {
                            msg.acknowledge();
                        } else {
                            msg.reject(false); // reject=true, requeue=false causes dead-lettering
                        }
                    });
                }
            );
        }
    );
});

A couple of points on the setup above

  • I chose a topic exchange for the dead letter exchange, which allows multiple queues to bind to it using specific binding key patterns. I felt this offered a bit more flexible in the long run over a direct exchange.

  • I couldn’t get acknowledgments to work using queue.shift() or queue.shift(true, false), so chose the method above based on this thread. I haven’t looked into it any further, as the code above is working, but it’s one for the future perhaps.

  • In general, there didn’t seem to be many docs or articles that talked about achieving this using node-amqp, so I was flying blind a little. Therefore, I make no claim that this is the way to do it. It’s working for us as it stands, but if anyone has used an alternative approach, we’d love to hear about it.