強まっていこう

あっちゃこっちゃへ強まっていくためのブログです。

RabbitMQ でPub/Sub (チュートリアル3 Publish/Subscribe)

今回は本家チュートリアル 3 Publish/Subscribe です。これは何か?を簡単に言うと、ブロードキャストです。
今までの例では Receiver 側は Sender から送られた Message をどれか一つの Receiver だけで受けていましたが、これを複数の Receiver で受ける事が可能になります。

ではまず Receiver 側です。

pub.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertExchange(a.exchange, 'fanout', {
    durable: false
  });
  const q = await ch.assertQueue('', {
    exclusive: true
  });
  await ch.bindQueue(q.queue, a.exchange, '');
  console.log('Waiting exchange=%s', a.exchange);
  await ch.consume(q.queue, 
    msg => {
      console.log('Received: %s', msg.content.toString());
    },
    {
      noAck: true
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 3) { usage(); }
  return {
    exchange: process.argv[2],
  };
}

function usage() {
  const usage = `sub.js EXCHANGE_NAME`;
  console.log(usage);
  process.exit();
}
await ch.assertExchange(a.exchange, 'fanout', {
  durable: false
});

Exchange を定義します。Exchange については第1回の記事で少しだけ触れましたが、ここで改めて説明し直します。
基本的には Queue と同じですが違いは送受信方法や、名前に対して部分一致、OR などで受信が可能になります。
この Exchange には Queue を格納します。durable の意味は Queue と同じです。
そして Queue と同じように RabbitMQ 側に定義が残ります。一覧と削除は以下で可能です。

sudo rabbitmqadmin list exchanges
sudo rabbitmqadmin delete exchange name=EXCHANGE_NAME

fanout はブロードキャストです。

const q = await ch.assertQueue('', {
  exclusive: true
});
await ch.bindQueue(q.queue, a.exchange, '');

Queue を定義しますが、名前を空にしています。空を与えると RabbitMQ がランダムユニークな Queue を作成してくれます。
exclusive: true とすると、作成した Queue をこの Receiver で独占します。他の Receiver で受けようとするとエラーになります。
そして、Receiver が落ちると Queue は RabbitMQ 上から削除されます。
自動削除だけであれば autoDelete: true でも同じ事ができますが、ブロードキャストなので、
複数で同じ Queue を待ち受ける必要が無いため exclusive を指定をします。

bindQueue で Exchange と Queue を結びつけます。

次に Sender 側です。

sub.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.publish(a.exchange, '', Buffer.from(a.msg));
  console.log('Published exchange=%s: %s', a.exchange, a.msg);
  await ch.close();
  await conn.close();
};
main();

function getArgs() {
  if (process.argv.length < 4) { usage(); }
  return {
    exchange: process.argv[2],
    msg: process.argv.slice(3).join(' '),
  };
}

function usage() {
  const usage = `pub.js EXCHANGE_NAME MESSAGE`;
  console.log(usage);
  process.exit();
}
await ch.publish(a.exchange, '', Buffer.from(a.msg));

送信側は Exchange に対して publish します。

./sub.sh pubsub
./sub.sh pubsub

と複数の Receiver を立ち上げ

./pub.sh pubsub HOGEHOGE

で Message を送信してみると立ち上げた Receiver 全てに Message が送信される事がわかると思います。

次回は、本家チュートリアル4 相当となる、Routing に移りたいと思います。
Routing と言うとわけがわかりませんが、Receiver 側が名前を OR 指定できるだけだと思えば OK です。