強まっていこう

あっちゃこっちゃへ強まっていくためのブログです。

RabbitMQ で Topics (チュートリアル 5 Topics)

今回はチュートリアル 5 Topics をやっていきます。Topic と言ったって単にワイルドカードを使った部分一致で待ち受けできますよ、と言うだけです。

では早速 Receiver から。

sub_t.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertExchange(a.exchange, 'topic', {
    durable: false
  });
  const q = await ch.assertQueue('', {
    exclusive: true
  });
  a.routingKeys.split(',').forEach(async routingKey => {
    await ch.bindQueue(q.queue, a.exchange, routingKey);
  });
  console.log('Waiting exchange=%s, routing_keys=%s', a.exchange, a.routingKeys);
  await ch.consume(q.queue, 
    msg => {
      console.log('Received: %s', msg.content.toString());
    },
    {
      noAck: true
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 4) { usage(); }
  return {
    exchange: process.argv[2],
    routingKeys: process.argv[3]
  };
}

function usage() {
  const usage = `sub_r.js EXCHANGE_NAME ROUTING_KEYS(CSV)`;
  console.log(usage);
  process.exit();
}
await ch.assertExchange(a.exchange, 'topic', {
  durable: false
});

Exchange をタイプ topic で定義します。前回のコードとの違いはこれだけです。

Sender 側は前回のコードと何も変わりません。


pub_t.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.publish(a.exchange, a.routingKey, Buffer.from(a.msg));
  console.log('Published exchange=%s, routing_key: %s', a.exchange, a.routingKey, a.msg);
  await ch.close();
  await conn.close();
};
main();

function getArgs() {
  if (process.argv.length < 5) { usage(); }
  return {
    exchange: process.argv[2],
    routingKey: process.argv[3],
    msg: process.argv.slice(3).join(' '),
  };
}

function usage() {
  const usage = `pub_t.js EXCHANGE_NAME ROUTING_KEY MESSAGE`;
  console.log(usage);
  process.exit();
}
./sub_t.js topic aaa.*
./sub_t.js topic aaa.bbb.*

と2つ Receiver を立ち上げ

./pub_t.js topic aaa hoge
./pub_t.js topic aaa.bbb hoge
./pub_t.js topic aaa.ccc hoge

と Message を送ると aaa.* には aaa.bbb と aaa.ccc が、aaa.bbb.* には何も届きません。

ワイルドカードでそこに何かしらの文字列が必要であることを表します。

では次に * を # に変えて実行してみてください。

./sub_t.js topic aaa.#
./sub_t.js topic aaa.bbb.#

aaa.# には aaa、aaa.bbb、aaa.ccc のメッセージがすべて届きます。aaa.bbb.# には aaa.bbb のみが届きます。

と違い # はそこに文字があってもなくてもマッチするようになります。

Topic を使うと部分一致での待受ができることがよくわかるかと思います。

次回は、本家チュートリアル6 RabbitMQ を用いた RPC の実現方法を述べていこうかと思っています。