Node.js と amqp を組み合わせる

サーバサイドからのクライアント(ブラウザ)に push するのを色々調べていた。
最初 TwitterFacebook がやっているように、Transfer-Encoding: chunked を試していて、Node.js を使ったら chunked なデータを送信するのは割と簡単にできた。
が、ある程度データを流しているとバッファがたまったり、コネクションが切れたりした場合の対応を一々書くのが煩わしくなったので、みんな大好き socket.io を使ってみる事にした。


他のサーバからデータを push したい場合があるので、amqp を使って連携してみた。
amqp のライブラリは node-amqp を使用。

var amqp = require('amqp'),
    io = require('socket.io'),
    express = require('express'),
    ejs = require('ejs');

var app = express.createServer();
var port = 3000;
app.configure(function () {
  app.use(express.static(__dirname + '/static'));
  app.set('view options', {layout: false})
});

app.get('/', function(req, res) {
  res.render('index.ejs', {locals: {port: port}});
});

app.listen(port);
console.log('Express server listening on port %d', app.address().port)

var options = {};
var connection = amqp.createConnection(options);
connection.addListener('error', function (e) {
  throw e;
})
connection.addListener('close', function (e) {
  console.log('connection closed.');
});

var socket = io.listen(app);
connection.addListener('ready', function () {
  console.log('connected to ' + connection.serverProperties.product);
  var exchange = connection.exchange('node-direct-exchange', {type: 'direct'});
  var q = connection.queue('my_queue', function () {
    //q.bind("#"); // # の場合全ての direct を受けれる?
    q.bind(exchange, 'feed');
    q.on('queueBindOk', function () {
      socket.on('connection', function (client) {
        client.on('disconnect', function () {});
        client.on('message', function (msg) {
          client.send(msg);
          client.broadcast(msg);   
        });
  
        q.subscribe({routingKeyInPayload: true}, function (msg) {
          console.log(msg.message);
          //var message = JSON.stringify(msg.message);
          var message = msg.message;

          client.send(message);
          client.broadcast(message);
        });
      });
    });
  });
});

express と ejs を使ったけどそこら辺は割愛。


クライアント側

$(function() {
  var socket = new io.Socket(null, {port: 3000});
  socket.connect();
  socket.on('connect', function() {
    console.log('connect');
  });
  socket.on('message', function(msg) {
    $('#list').prepend($(msg));
  });
  socket.on('disconnect', function(){
    console.log('disconnect');
  });
});

amqp の publisher は Perl の Net::RabbitMQ を使用。

#!/usr/bin/env perl
use strict;
use warnings;
use Data::Dumper;
use JSON qw/encode_json/;
use Encode;
use Net::RabbitMQ;

my $mq = Net::RabbitMQ->new;
# localhost にしてるけど、他のサーバだと当然そのサーバの IP になる
$mq->connect('localhost', { user => 'guest', password => 'guest'});
$mq->channel_open(1);
   
my $data = encode_json({message => Encode::decode_utf8('こんにちはこんにちは!')});
$mq->publish(1, 'my_queue', $data, {}, {content_type => 'application/json'});

node-amqp が癖があって色々ハマった…。
content_type を付けないと buffer なデータとして処理されるので適切なものを付ける。


Node.js がイベント駆動なので、こういうのには凄く向いていると思う。


参考文献:
node.jsでchatアプリっぽいもの作るメモ - すぎゃーんメモ