強まっていこう

あっちゃこっちゃへ強まっていくためのブログです。

Rabbit MQ で RPC(チュートリアル 6 RPC)

今回はチュートリアル 6 RPC についてやっていきます。

まずは RPCのサーバ側です。引数で受けた文字列に日時をつけて返します。

rpc_server.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertQueue(a.queue, {
    durable: false
  });
  ch.prefetch(1);
  console.log('Waiting queue=%s', a.queue);
  await ch.consume(a.queue,
    req => {
      var msg = req.content.toString();
      var res = `${msg}(${new Date()})`;
      console.log('Response: %s', res);
      ch.sendToQueue(req.properties.replyTo, 
        Buffer.from(res), {
          correlationId: req.properties.correlationId
        }
      );
      ch.ack(req);
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 3) { usage(); }
  return {
    queue: process.argv[2],
  };
}

function usage() {
  const usage = `rpc_server.js QUEUE_NAME`;
  console.log(usage);
  process.exit();
}
await ch.consume(a.queue,
  req => {
    var msg = req.content.toString();
    var res = `${msg}(${new Date()})`;
    console.log('Response: %s', res);
    ch.sendToQueue(req.properties.replyTo, 
      Buffer.from(res), {
        correlationId: req.properties.correlationId
      }
    );
    ch.ack(req);
  }
);

req.properties.replyTo は、Caller 側に返すための Queue です。
correlationId は RPC のやり取り上で使う一意の ID です。Caller 側で作ります。

次に Caller 側です。


rpc.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  const correlationId = genUUID();
  const q = await ch.assertQueue('', { exclusive: true });
  await ch.sendToQueue(a.queue, Buffer.from(a.msg), {
    correlationId: correlationId,
    replyTo: q.queue
  });
  ch.consume(q.queue, async res => {
    if (res.properties.correlationId == correlationId) {
      console.log('Response: %s', res.content.toString());
      await ch.close();
      await conn.close();
    }
  },
  {
    noAck: true
  });
  console.log('Request queue=%s: %s', a.queue, a.msg);
};
main();

function genUUID() {
  return Math.random().toString() + Math.random().toString() + Math.random().toString();
}

function getArgs() {
  if (process.argv.length < 4) { usage(); }
  return {
    queue: process.argv[2],
    msg: process.argv.slice(3).join(' '),
  };
}

function usage() {
  const usage = `rpc.js QUEUE_NAME MESSAGE`;
  console.log(usage);
  process.exit();
}
const correlationId = genUUID();

一意の ID を作ります。ここでは適当にランダムで作っています。一意になるかどうかは運次第です。

await ch.sendToQueue(a.queue, Buffer.from(a.msg), {
  correlationId: correlationId,
  replyTo: q.queue
});

これで、RPC を call します。a.msg が引数にあたります。
replyTo は rpc_server 側で結果を詰め込んで返すための Queue です。

ch.consume(q.queue, async res => {
  if (res.properties.correlationId == correlationId) {
    console.log('Response: %s', res.content.toString());
    await ch.close();
    await conn.close();
  }
},


結果を待ち受けます。correlationId が一致するかどうかで結果がちゃんと自分宛てのものかチェックします。

./rpc_server.js rpc
./rpc.js rpc hoge

これで RPC の結果が返ることがわかるかと思います。

で、ここまでやっといてなんですがこの RPC は使わない方が良いです。
RPC の仕組みとしては下の下かと。いちいち MQ を経由してやり取りしているためかなり遅いですし、オススメしません。
他の RPC の仕組みを使った方が確実に幸せになれます。

最後のエントリーがかなりいまいちなので、次回、RabbitMQ をちょっとだけ便利にするコマンドを作ったのでそれを紹介して RabbitMQ シリーズは完とします。