今回は本家チュートリアル 3 Publish/Subscribe です。これは何か?を簡単に言うと、ブロードキャストです。
今までの例では Receiver 側は Sender から送られた Message をどれか一つの Receiver だけで受けていましたが、これを複数の Receiver で受ける事が可能になります。
ではまず Receiver 側です。
pub.js
#!/usr/bin/env node const amqp = require('amqplib'); const my = require('./my.js'); async function main() { const a = getArgs(); const conn = await amqp.connect(my.conf).catch(my.die); const ch = await conn.createChannel(); await ch.assertExchange(a.exchange, 'fanout', { durable: false }); const q = await ch.assertQueue('', { exclusive: true }); await ch.bindQueue(q.queue, a.exchange, ''); console.log('Waiting exchange=%s', a.exchange); await ch.consume(q.queue, msg => { console.log('Received: %s', msg.content.toString()); }, { noAck: true } ); }; main(); function getArgs() { if (process.argv.length < 3) { usage(); } return { exchange: process.argv[2], }; } function usage() { const usage = `sub.js EXCHANGE_NAME`; console.log(usage); process.exit(); }
await ch.assertExchange(a.exchange, 'fanout', { durable: false });
Exchange を定義します。Exchange については第1回の記事で少しだけ触れましたが、ここで改めて説明し直します。
基本的には Queue と同じですが違いは送受信方法や、名前に対して部分一致、OR などで受信が可能になります。
この Exchange には Queue を格納します。durable の意味は Queue と同じです。
そして Queue と同じように RabbitMQ 側に定義が残ります。一覧と削除は以下で可能です。