強まっていこう

あっちゃこっちゃへ強まっていくためのブログです。

RabbitMQ でタスク処理(チュートリアル2 Work queues)

前回、単純なメッセージ送信を行いました。今回は本家チュートリアル 2 Work queues にあたる、タスク処理を行ってみます。タスク処理で大事なのはきちんとタスクが完遂されることです。なので、もし処理途中でプロセスが死んだ場合、他のプロセスがタスク処理を引き受ける必要があります。以下のコードでそれが実現できます。

worker.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

// worker の処理が正常終了した場合 ack でmsg を投げるとMQ 側からキューが消える
// それが行われずプロセスが死んだら他のワーカーがそのキューを処理する
async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertQueue(a.queue, {
    durable: true
  });
  console.log('Waiting queue=%s', a.queue);
  ch.prefetch(1);
  await ch.consume(a.queue,
    task => {
      var secs = task.content.toString().split('.').length - 1;
      console.log('Received: %s', task.content.toString());
      setTimeout(() => {
        console.log('Done');
        ch.ack(task);
      }, secs * 1000);
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 3) { usage(); }
  return {
    queue: process.argv[2],
  };
}

function usage() {
  const usage = `worker.js QUEUE_NAME`;
  console.log(usage);
  process.exit();
}
await ch.assertQueue(a.queue, {
  durable: true
});

durable: true にするのは RabbitMQ が死んだ時でも残る Queue にするためです。
durable: false が指定された Queue がすでに存在する場合はその Queue を消す必要があります。

ch.prefetch(1);

ワーカーが一つ前のメッセージを処理し、成功を返すまで、ワーカーに新しいメッセージを送出しなくなります。
要するにビジー状態を作り出し他に効率良く作業を振るようになるわけです。

await ch.consume(a.queue,
  task => {
    var secs = task.content.toString().split('.').length - 1;
    console.log('Received: %s', task.content.toString());
    setTimeout(() => {
      console.log('Done');
      ch.ack(task);
    }, secs * 1000);
  }
);

前回の例では noAck: true を指定していましたが、今回は指定しません。ようするに noAck: false と言う事です。
noAck を false にし、RabbitMQ 側に処理成功を通知します。それが

ch.ack(task);

の部分です。

この worker.js は ... を受け取りこのドットの1つあたり 1秒 とし、その秒数分処理を wait させるコマンドで、処理を途中で殺してちゃんと処理が引き継がれるかをテストしやすくするためのコマンドになります。

次に、task を投げ込む側のコマンドです。

task.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.sendToQueue(a.queue, Buffer.from(a.task), {
    persistent: true
  });
  console.log('Sent queue=%s: %s', a.queue, a.task);
  await ch.close();
  await conn.close();
};
main();

function getArgs() {
  if (process.argv.length < 4) { usage(); }
  return {
    queue: process.argv[2],
    task: process.argv.slice(3).join(' '),
  };
}

function usage() {
  const usage = `task.js QUEUE_NAME TASK`;
  console.log(usage);
  process.exit();
}
await ch.sendToQueue(a.queue, Buffer.from(a.task), {
  persistent: true
});

persistent: true にするとRabbitMQ がダウンしても Message が残るようになります。基本 RabbitMQ は Message をメモリに置いているのですが persistent: true にすると disk に書き込むようになります(なのでちょっと遅くはなる)。重要な task 等は true にすると良いでしょう。ただし、絶対に消えない、と言うわけではありません(RabbitMQ が disk sync 成功前に死ぬと当然アウト)。
Message の永続化は Receiver 側で durable: true にするだけでは駄目で、Sender 側での persistent: true の指定も必要になります。
Sender側が persistent: true と指定していても、Receiver 側で durable: false にしていると RabbitMQ が死んだ場合 Message は破棄されます。

さて、では実際動作を見る方法ですが

./worker.js task

で Receiver を起動し

./task.js task ..........

で、task を送り、10秒ほど終了処理を wait させた状態で RabbitMQ サーバを落とします。そうすると worker.js が処理終了時にエラーで死にます。

その後 RabbitMQ を起動し、worker.js を起動してください。処理が続行される事がわかると思います。

durable: false にしたりしてその違いを確かめると良いかもしれません。

ちなみに、RabbitMQ 停止・起動は Ubuntu の場合

sudo systemctl stop rabbitmq-server
sudo systemctl start rabbitmq-server

です。

次回は、本家チュートリアル3 相当となる、Pub/Sub に移りたいと思います。