メモ

プログラミングなどの備忘録を書きます

RabbitMQでロストしないようにpublish/consumeする

RabbitMQでメッセージをロストせずにpublish/consumeする方法を軽く調べたのでメモしておく。

コードはJavaのクライアントライブラリを使用したものをscalaぽっく書いている。
RabbitMQ - RabbitMQ Java Client Library

consume

メッセージの処理が終わったらacknowledgementを返す。

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
// noAck=falseに設定
channel.basicConsume(queue, false, consumer)
.
.
.
// メッセージの処理の成否によってAckかNackを返す
if (isSuccess) {
  channel.basicAck(deliveryTag, multiple)
} else {
  channel.basicNack(deliveryTag, multiple, requeue)
}

acknowledgementが返ってくるまでRabbitMQはメッセージを保持するので、consume中にクライアントが死んだりコネクションが切れたりした場合でもメッセージはキューに残る。
multipleは指定したdeliveryTag(キューに入っているメッセージの番号のようなもの)までの全てのメッセージにacknowledgementを返すときtrue、指定したdeliveryTagのメッセージのみにacknowledgementを返す時falseにする。
requeueはnack時にメッセージをキューに詰め直す場合trueに、メッセージを捨てたりdead letterキューに入れたりする場合falseにする。

publish

publishの失敗を検知するには、transactionとconfirmという2通りの方法がある。
普通にbasicPublishするだけだとpublish後の失敗は検知できない。(publish先のExchageやキューが存在しなかったりしても何も言われない。)

transaction

RabbitMQがキューにデータを永続化するまでロックする。
AMQPの仕様ではトランザクションはatomicな処理とされているが、RabbitMQではこの仕様を満たしていないらしい。(失敗したがメッセージのpublishができていることがある?)

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body)
channel.txCommit()
channel.close()

mandatoryをtrueにするとメッセージをキューにルーティングできない時にエラーになる。falseの時はバインド先が存在しない場合メッセージが吐き捨てられる。
immediateをtrueにすると、メッセージは送信先のキューからすぐにコンシュームされない場合エラーになる。

transactionはロックを行うため、処理速度がかなり遅くなる。 メッセージをロストしたくないという目的に対してコストが高すぎるため、RabbitMQではconfirmという仕組みが提供されている。(AMQPの仕様には存在しないRabbitMQオリジナルの仕様らしい)
メッセージをロストしたくないだけならconfirmで十分なので、transactionを使用するケースはあんまり多くなさそうに感じる。

confirm

confirmを使うとbrokerがメッセージを処理した段階でpublisherにacknowledgementが返る。
下記のようにするとackknowledgementが帰ってくるまでwaitする。

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
channel.confirmSelect()
channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body)
channel.waitForConfirmsOrDie(timeout)
channel.close()

下記のようにListenerを設定することで非同期的にacknowledgementを受け取ることもできる

channel.addConfirmListener(new ConfirmListener {
  override def handleAck(deliveryTag: Long, multiple: Boolean): Unit = {
    //ackの時の処理
  }
  override def handleNack(deliveryTag: Long, multiple: Boolean): Unit = {
    //nackの時の処理
  }
})