強まっていこう

あっちゃこっちゃへ強まっていくためのブログです。

RabbitMQ でメッセージ送受信(チュートリアル1 "Hello World!")

RabbitMQ を仕事で使う必要があり使い始めたんですがこれがまた分かり辛い。むちゃくちゃ単純な代物なのにやたらと難しくする病気を患っているシリーズです。
自分は Node.js から使うんですが、本家チュートリアルのコードがコールバック地獄コードだったり、
他の人が書いているものもPromise 地獄コードだったりしてどうも具合が悪かったり。

同じように困っている人がいるのでは?と思いましてキーを打ち始めたわけです。一助となればと。

では、まず主要な用語を説明します。これがコアなのでしっかり把握してください。

Producer: 送信者(以後 Sender)
Consumer: 受信者(以後 Receiver)
Message: 送受信するメッセージ
Queue: Message を入れるものでラベルを付ける事が出来きそのラベルを使った送受信が可能
Exchange: Queueを入れるもので送受信の方法を切り替える事が出来る
     Queue同様ラベルを付ける事が可能で OR や部分一致での受信が可能になる

Sender がクライアントで Receiver がサーバ的なノリです。
Queue と Exchange の関係が最初はわけがわからんと思います。そらそうです。本来であれば別に分けなくても良いですから・・・。不毛に複雑化してますが我慢するしか無いです。

まずは一番単純な Queue に Message を詰め込んで送る方法を説明しましょう。本家サイトのチュートリアル1に相当します。
コードは JS です。

まず npm で必要なライブラリを入れます。

npm i amqplib -S

Receiver 側のコードが以下になります。

recv.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertQueue(a.queue, {
    durable: false
  });
  console.log('Waiting queue=%s', a.queue);
  await ch.consume(a.queue,
    msg => {
      console.log('Received: %s', msg.content.toString());
    },
    {
      noAck: true
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 3) { usage(); }
  return {
    queue: process.argv[2],
  };
}

function usage() {
  const usage = `recv.js QUEUE_NAME`;
  console.log(usage);
  process.exit();
}

このコマンドは QUEUE_NAME を引数で受けてメッセージを待ち受けます。

recv.js QUEUE_NAME

コードの説明をしていきます。


my.js にはコンフィグとエラー用のメッセージ関数が入っています。

my.js

module.exports = {
  conf: {
    protocol: 'amqp',
    hostname: 'localhost',
    port: 5672,
    username: 'guest',
    password: 'guest',
  },
  die: e => {
    console.error(e);  
    process.exit(1);
  }
};
const a = getArgs();
const conn = await amqp.connect(my.conf).catch(my.die);
const ch = await conn.createChannel();

getArgs はコマンド引数を返します。
amqp.connect(my.conf) で Connection をはって conn.createChannel() で Channel を作ります。
この Channnel ですべてのやり取りをします。

await ch.assertQueue(a.queue, {
  durable: false
});

Message の入れ物である Queue を作成します。a.queue が Queue の名前になります。

durable は RabbitMQ を再起動しても Message を存続させるかどうかのフラグです。後で詳しく説明します。ここでは false にしているので Message は残りません。

ただ、何も指定しない場合 Queue 自体は RabbitMQ が落ちても残ります。存在する Queue の設定を変更する事はできません。なので durable を true にしてみようとするとすでに存在する Queue がある、と言ってエラーになります。再定義したい場合は Queue を消す必要があります。Queue の一覧表示と削除は以下で可能です。

sudo rabbitmqadmin list queues
sudo rabbitmqadmin delete queue name=QUEUE_NAME

durable: false と共に autoDelete を指定するとQueue は Receiver を落とすと共に消えるので色々試す際楽かもしれません。

await ch.consume(a.queue,
  msg => {
    console.log('Received: %s', msg.content.toString());
  },
  {
    noAck: true
  }
);

これで a.queue 宛の Message を待ち受けます。noAck: true は処理の失敗成功を RabbitMQ に返さない宣言です。後に詳しく説明します。

次はSender 側のコード。

send.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.sendToQueue(a.queue, Buffer.from(a.msg));
  console.log('Sent queue=%s: %s', a.queue, a.msg);
  await ch.close();
  await conn.close();
};
main();

function getArgs() {
  if (process.argv.length < 4) { usage(); }
  return {
    queue: process.argv[2],
    msg: process.argv.slice(3).join(' '),
  };
}

function usage() {
  const usage = `send.js QUEUE_NAME MESSAGE`;
  console.log(usage);
  process.exit();
}
send.js QUEUE_NAME MESSAGE

QUEUE_NAME に向けてMESSAGE を送信します。

本家チュートリアルでは送信側でも Queue を定義していますがいろいろ試す際に邪魔になるので Receiver だけで定義した方が良いです。

await ch.sendToQueue(a.queue, Buffer.from(a.msg));
console.log('Sent queue=%s: %s', a.queue, a.msg);
await ch.close();
await conn.close();

この部分で a.queue に向かって a.msg を送信します。送信が終わったら Channel と Connection を close します。

後は、これらのコマンドに対して実行ビットを立てて

./recv.js hoge

で Receiver を立ち上げて

./send.js hoge HOGEHOGE

とやるとメッセージが送信されるのが解ると思います。Receiver は複数立ち上げる事ができます。複数立ち上げて Message を送信しまくるとラウンドロビンされる事がわかると思います。

今回は一番単純なメッセージ送受信を行いました。次回はタスク実行を行ってみたいと思います。