強まっていこう

あっちゃこっちゃへ強まっていくためのブログです。

Rabbit MQ で RPC(チュートリアル 6 RPC)

今回はチュートリアル 6 RPC についてやっていきます。

まずは RPCのサーバ側です。引数で受けた文字列に日時をつけて返します。

rpc_server.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertQueue(a.queue, {
    durable: false
  });
  ch.prefetch(1);
  console.log('Waiting queue=%s', a.queue);
  await ch.consume(a.queue,
    req => {
      var msg = req.content.toString();
      var res = `${msg}(${new Date()})`;
      console.log('Response: %s', res);
      ch.sendToQueue(req.properties.replyTo, 
        Buffer.from(res), {
          correlationId: req.properties.correlationId
        }
      );
      ch.ack(req);
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 3) { usage(); }
  return {
    queue: process.argv[2],
  };
}

function usage() {
  const usage = `rpc_server.js QUEUE_NAME`;
  console.log(usage);
  process.exit();
}
await ch.consume(a.queue,
  req => {
    var msg = req.content.toString();
    var res = `${msg}(${new Date()})`;
    console.log('Response: %s', res);
    ch.sendToQueue(req.properties.replyTo, 
      Buffer.from(res), {
        correlationId: req.properties.correlationId
      }
    );
    ch.ack(req);
  }
);

req.properties.replyTo は、Caller 側に返すための Queue です。
correlationId は RPC のやり取り上で使う一意の ID です。Caller 側で作ります。

次に Caller 側です。

続きを読む

RabbitMQ で Topics (チュートリアル 5 Topics)

今回はチュートリアル 5 Topics をやっていきます。Topic と言ったって単にワイルドカードを使った部分一致で待ち受けできますよ、と言うだけです。

では早速 Receiver から。

sub_t.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertExchange(a.exchange, 'topic', {
    durable: false
  });
  const q = await ch.assertQueue('', {
    exclusive: true
  });
  a.routingKeys.split(',').forEach(async routingKey => {
    await ch.bindQueue(q.queue, a.exchange, routingKey);
  });
  console.log('Waiting exchange=%s, routing_keys=%s', a.exchange, a.routingKeys);
  await ch.consume(q.queue, 
    msg => {
      console.log('Received: %s', msg.content.toString());
    },
    {
      noAck: true
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 4) { usage(); }
  return {
    exchange: process.argv[2],
    routingKeys: process.argv[3]
  };
}

function usage() {
  const usage = `sub_r.js EXCHANGE_NAME ROUTING_KEYS(CSV)`;
  console.log(usage);
  process.exit();
}
await ch.assertExchange(a.exchange, 'topic', {
  durable: false
});

Exchange をタイプ topic で定義します。前回のコードとの違いはこれだけです。

Sender 側は前回のコードと何も変わりません。

続きを読む

RabbitMQ で Routing (チュートリアル4 Routing)

さて今回は本家チュートリアル 4 Routing をやってまいります。
Routing と言ったって Receiver 側で名前を OR で待ち受けれるだけと言う簡単な機能です。

pub_r.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertExchange(a.exchange, 'direct', {
    durable: false
  });
  const q = await ch.assertQueue('', {
    exclusive: true
  });
  a.routingKeys.split(',').forEach(async routingKey => {
    await ch.bindQueue(q.queue, a.exchange, routingKey);
  });
  console.log('Waiting exchange=%s, routing_keys=%s', a.exchange, a.routingKeys);
  await ch.consume(q.queue, 
    msg => {
      console.log('Received: %s', msg.content.toString());
    },
    {
      noAck: true
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 4) { usage(); }
  return {
    exchange: process.argv[2],
    routingKeys: process.argv[3]
  };
}

function usage() {
  const usage = `sub_r.js EXCHANGE_NAME ROUTING_KEYS(CSV)`;
  console.log(usage);
  process.exit();
}
await ch.assertExchange(a.exchange, 'direct', {
  durable: false
});

Exchange のタイプを direct にします。

a.routingKeys.split(',').forEach(async routingKey => {
  await ch.bindQueue(q.queue, a.exchange, routingKey);
});

bindQueue する時に、第3引数に対して待ち受ける routingKey を指定します。複数指定が可能です。

次は Sender 側。

続きを読む

RabbitMQ でPub/Sub (チュートリアル3 Publish/Subscribe)

今回は本家チュートリアル 3 Publish/Subscribe です。これは何か?を簡単に言うと、ブロードキャストです。
今までの例では Receiver 側は Sender から送られた Message をどれか一つの Receiver だけで受けていましたが、これを複数の Receiver で受ける事が可能になります。

ではまず Receiver 側です。

pub.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertExchange(a.exchange, 'fanout', {
    durable: false
  });
  const q = await ch.assertQueue('', {
    exclusive: true
  });
  await ch.bindQueue(q.queue, a.exchange, '');
  console.log('Waiting exchange=%s', a.exchange);
  await ch.consume(q.queue, 
    msg => {
      console.log('Received: %s', msg.content.toString());
    },
    {
      noAck: true
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 3) { usage(); }
  return {
    exchange: process.argv[2],
  };
}

function usage() {
  const usage = `sub.js EXCHANGE_NAME`;
  console.log(usage);
  process.exit();
}
await ch.assertExchange(a.exchange, 'fanout', {
  durable: false
});

Exchange を定義します。Exchange については第1回の記事で少しだけ触れましたが、ここで改めて説明し直します。
基本的には Queue と同じですが違いは送受信方法や、名前に対して部分一致、OR などで受信が可能になります。
この Exchange には Queue を格納します。durable の意味は Queue と同じです。
そして Queue と同じように RabbitMQ 側に定義が残ります。一覧と削除は以下で可能です。

続きを読む

RabbitMQ でタスク処理(チュートリアル2 Work queues)

前回、単純なメッセージ送信を行いました。今回は本家チュートリアル 2 Work queues にあたる、タスク処理を行ってみます。タスク処理で大事なのはきちんとタスクが完遂されることです。なので、もし処理途中でプロセスが死んだ場合、他のプロセスがタスク処理を引き受ける必要があります。以下のコードでそれが実現できます。

worker.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

// worker の処理が正常終了した場合 ack でmsg を投げるとMQ 側からキューが消える
// それが行われずプロセスが死んだら他のワーカーがそのキューを処理する
async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertQueue(a.queue, {
    durable: true
  });
  console.log('Waiting queue=%s', a.queue);
  ch.prefetch(1);
  await ch.consume(a.queue,
    task => {
      var secs = task.content.toString().split('.').length - 1;
      console.log('Received: %s', task.content.toString());
      setTimeout(() => {
        console.log('Done');
        ch.ack(task);
      }, secs * 1000);
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 3) { usage(); }
  return {
    queue: process.argv[2],
  };
}

function usage() {
  const usage = `worker.js QUEUE_NAME`;
  console.log(usage);
  process.exit();
}
await ch.assertQueue(a.queue, {
  durable: true
});

durable: true にするのは RabbitMQ が死んだ時でも残る Queue にするためです。
durable: false が指定された Queue がすでに存在する場合はその Queue を消す必要があります。

続きを読む

RabbitMQ でメッセージ送受信(チュートリアル1 "Hello World!")

RabbitMQ を仕事で使う必要があり使い始めたんですがこれがまた分かり辛い。むちゃくちゃ単純な代物なのにやたらと難しくする病気を患っているシリーズです。
自分は Node.js から使うんですが、本家チュートリアルのコードがコールバック地獄コードだったり、
他の人が書いているものもPromise 地獄コードだったりしてどうも具合が悪かったり。

同じように困っている人がいるのでは?と思いましてキーを打ち始めたわけです。一助となればと。

では、まず主要な用語を説明します。これがコアなのでしっかり把握してください。

Producer: 送信者(以後 Sender)
Consumer: 受信者(以後 Receiver)
Message: 送受信するメッセージ
Queue: Message を入れるものでラベルを付ける事が出来きそのラベルを使った送受信が可能
Exchange: Queueを入れるもので送受信の方法を切り替える事が出来る
     Queue同様ラベルを付ける事が可能で OR や部分一致での受信が可能になる

Sender がクライアントで Receiver がサーバ的なノリです。
Queue と Exchange の関係が最初はわけがわからんと思います。そらそうです。本来であれば別に分けなくても良いですから・・・。不毛に複雑化してますが我慢するしか無いです。

まずは一番単純な Queue に Message を詰め込んで送る方法を説明しましょう。本家サイトのチュートリアル1に相当します。
コードは JS です。

まず npm で必要なライブラリを入れます。

npm i amqplib -S

Receiver 側のコードが以下になります。

recv.js

#!/usr/bin/env node

const amqp = require('amqplib');
const my = require('./my.js');

async function main() {
  const a = getArgs();
  const conn = await amqp.connect(my.conf).catch(my.die);
  const ch = await conn.createChannel();
  await ch.assertQueue(a.queue, {
    durable: false
  });
  console.log('Waiting queue=%s', a.queue);
  await ch.consume(a.queue,
    msg => {
      console.log('Received: %s', msg.content.toString());
    },
    {
      noAck: true
    }
  );
};
main();

function getArgs() {
  if (process.argv.length < 3) { usage(); }
  return {
    queue: process.argv[2],
  };
}

function usage() {
  const usage = `recv.js QUEUE_NAME`;
  console.log(usage);
  process.exit();
}

このコマンドは QUEUE_NAME を引数で受けてメッセージを待ち受けます。

recv.js QUEUE_NAME

コードの説明をしていきます。

続きを読む

テレビの終焉 吉本騒動

ジャニーズの崩壊に続いて吉本の崩壊。テレビの歴史を作ってきた2事務所が今砕け散ろうとしている。

本日行われた岡本社長らの会見も終始保身で「おかちゃんはやめへんでー!」と自己保身ばかりで話にならない。

事の発端は宮迫氏らのウソで事務所側がすっかりタレントを信用しなくなってしまっているのもわかりはするのだがあの会見はマイナスにしか働かない。

しかし何が一番の問題かと言うとテレビ・新聞・週刊誌等のマスメディアがもはや成立しなくなっていること。

週刊誌は大手事務所が砕け散るようなスキャンダルをぶちまけ、なんとか小銭を稼ごうとしている状態。

テレビは叩かれるのを恐れて新聞・週刊誌のネタをそのまま垂れ流して小銭を稼ごうとしている状態。

これらはもはやタコの足食い。番組が潰れたり再放送出来なくなったりダメージは自分らにすべて返ってくることをわかっているのだろうか。

新聞は社員の大量リストラで既に死に体。新聞とテレビはオーナーが同じと言う世界的には稀というか、あらゆる国で禁止されていること(クロスオーナーシップ=メディア腐敗の原因の1つ)を日本はやってしまっており新聞社が受けるダメージは直接テレビも受ける。

テレビはもう何をやって良いかわからない状態だと思う。

そもそもこういった問題を四方八方から叩いて一体誰が得をするのやら。いっときの数字欲しさに致命傷を負う姿は愚か過ぎて哀れになってくる。

犯罪を犯したわけでもなく、法的には全く問題の無い人間達を犯罪者のように扱い社会的制裁を課す。法治国家としても崩壊している。

山口達也氏の問題、遡れば山本圭一氏の問題。どちらも不起訴なのにも関わらず重罪を犯した人間のような扱いには呆れ果てる。

ダメージを一番食うのは自分達メディア自身だというのが全くわかっていないことにも呆れ果てる。

吉本は自民党の仕事を受けたりネットに進出しようと試みたり様々工夫をしていたが今回の件で崩壊するだろう。

そもそもね、テレビは田中角栄の呪いの放送免許制度で局が少なすぎて見たくもないものを見せられる。でも今の世の中好きなものを好きな時間にネットで見ることができる。

だから今の子供ってテレビを全く見ない。ネット特にユーチューブしか見ない。

嫌なものでも強制的に見せられるテレビにはヘイトが集まる。ちょっとしたことで苦情があふれる。その苦情はヘイトを常々探している連中がたむろするSNSなどでさらにブーストする。SNSで広がったヘイトはテレビ局だけではなくスポンサーなどにまで波及しスポンサーの売上、テレビ局の広告収入と言う金銭面にダメージを与える。

このループが完成してしまっている。

このヘイトを放出しまくっている連中はネットでは自分の好きなものを好きな時間に見ている。だからこそ、嫌いなものを決まった時間に見せられるテレビに対するストレスは倍増する。

長引く不況、やりたい放題で無責任な官僚、自分達とお友達の事しか考えていない政治家に対してイライラが溜まっている土壌がさらに怒りを増長させている。

今回の件はテレビの本格的終焉を現していると思う。

謹慎だの契約解除の解雇だのされた芸能人はさっさとユーチューバーになれば良い。テレビがすべてと言うのは勘違いで、そこにはもう未来はないんだから。

山口達也氏も農家に住み込んでみた、みたいな感じでユーチューバーになれば良い。本来ジャニーズと対立し元SMAP3人を抱える飯島三智氏がネットタレント事務所を起こせば良いのに。

UUUM に媚びへつらっている場合じゃないと思うんだが。まぁあまりセンスが無いのかもしれないが・・・。一番ユーチューブに向いていない草なぎをユーチューバーにしてしまったし・・>・。

島田紳助あたりが戻ってきて一旗あげてくれないかな。