今回は本家チュートリアル 3 Publish/Subscribe です。これは何か?を簡単に言うと、ブロードキャストです。
今までの例では Receiver 側は Sender から送られた Message をどれか一つの Receiver だけで受けていましたが、これを複数の Receiver で受ける事が可能になります。
ではまず Receiver 側です。
pub.js
#!/usr/bin/env node const amqp = require('amqplib'); const my = require('./my.js'); async function main() { const a = getArgs(); const conn = await amqp.connect(my.conf).catch(my.die); const ch = await conn.createChannel(); await ch.assertExchange(a.exchange, 'fanout', { durable: false }); const q = await ch.assertQueue('', { exclusive: true }); await ch.bindQueue(q.queue, a.exchange, ''); console.log('Waiting exchange=%s', a.exchange); await ch.consume(q.queue, msg => { console.log('Received: %s', msg.content.toString()); }, { noAck: true } ); }; main(); function getArgs() { if (process.argv.length < 3) { usage(); } return { exchange: process.argv[2], }; } function usage() { const usage = `sub.js EXCHANGE_NAME`; console.log(usage); process.exit(); }
await ch.assertExchange(a.exchange, 'fanout', { durable: false });
Exchange を定義します。Exchange については第1回の記事で少しだけ触れましたが、ここで改めて説明し直します。
基本的には Queue と同じですが違いは送受信方法や、名前に対して部分一致、OR などで受信が可能になります。
この Exchange には Queue を格納します。durable の意味は Queue と同じです。
そして Queue と同じように RabbitMQ 側に定義が残ります。一覧と削除は以下で可能です。
sudo rabbitmqadmin list exchanges
sudo rabbitmqadmin delete exchange name=EXCHANGE_NAME
fanout はブロードキャストです。
const q = await ch.assertQueue('', { exclusive: true }); await ch.bindQueue(q.queue, a.exchange, '');
Queue を定義しますが、名前を空にしています。空を与えると RabbitMQ がランダムユニークな Queue を作成してくれます。
exclusive: true とすると、作成した Queue をこの Receiver で独占します。他の Receiver で受けようとするとエラーになります。
そして、Receiver が落ちると Queue は RabbitMQ 上から削除されます。
自動削除だけであれば autoDelete: true でも同じ事ができますが、ブロードキャストなので、
複数で同じ Queue を待ち受ける必要が無いため exclusive を指定をします。
bindQueue で Exchange と Queue を結びつけます。
次に Sender 側です。
sub.js
#!/usr/bin/env node const amqp = require('amqplib'); const my = require('./my.js'); async function main() { const a = getArgs(); const conn = await amqp.connect(my.conf).catch(my.die); const ch = await conn.createChannel(); await ch.publish(a.exchange, '', Buffer.from(a.msg)); console.log('Published exchange=%s: %s', a.exchange, a.msg); await ch.close(); await conn.close(); }; main(); function getArgs() { if (process.argv.length < 4) { usage(); } return { exchange: process.argv[2], msg: process.argv.slice(3).join(' '), }; } function usage() { const usage = `pub.js EXCHANGE_NAME MESSAGE`; console.log(usage); process.exit(); }
await ch.publish(a.exchange, '', Buffer.from(a.msg));
送信側は Exchange に対して publish します。
./sub.sh pubsub
./sub.sh pubsub
と複数の Receiver を立ち上げ
./pub.sh pubsub HOGEHOGE
で Message を送信してみると立ち上げた Receiver 全てに Message が送信される事がわかると思います。
次回は、本家チュートリアル4 相当となる、Routing に移りたいと思います。
Routing と言うとわけがわかりませんが、Receiver 側が名前を OR 指定できるだけだと思えば OK です。