RabbitMQでロストしないようにpublish/consumeする

RabbitMQでメッセージをロストせずにpublish/consumeする方法を軽く調べたのでメモしておく。

consume

メッセージの処理が終わったらacknowledgementを返す。

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
// noAck=false
channel.basicConsume(queue, false, consumer)
//メッセージの処理
if (isSuccess) {
  channel.basicAck(deliveryTag, multiple)
} else {
  channel.basicNack(deliveryTag, multiple, requeue)
}

acknowledgementが返ってくるまでRabbitMQはメッセージを保持するので、consume中にクライアントが死んだりコネクションが切れたりした場合でもメッセージはキューに残る。
multipleは指定したdeliveryTagまでの全てのメッセージにacknowledgementを返すときtrue、指定したdeliveryTagのメッセージのみにacknowledgementを返す時falseにする。
requeueはnack時にメッセージをキューに詰め直す場合trueに、メッセージを捨てたりdead letterキューに入れたりする場合falseにする。

publish

publishの失敗を検知するには、transactionとconfirmという2通りの方法がある。
普通にbasicPublishするだけだとpublish後の失敗は検知できない。(publish先のExchageやキューが存在しなかったりしても何も言われない。)

transaction

RabbitMQがキューにデータを保存し、永続化されるまでをブロックする?(このへんの理解は不十分です…) AMQPの仕様ではトランザクションはatomicな処理とされているが、RabbitMQではこの仕様を満たしていないらしい。(失敗したがメッセージのpublishはできているということがあるらしい。)

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body)
channel.txCommit()
channel.close()

mandatoryをtrueにするとメッセージをキューにルーティングできない時にエラーになる。falseの時はバインド先が存在しない場合メッセージが吐き捨てられる。
immediateをtrueにすると、メッセージは送信先のキューからすぐにコンシュームされない場合エラーになる。

transactionはコストの高い処理をしているらしく、処理速度がかなり遅くなる。 メッセージをロストしたくないという目的に対してコストが高すぎるため、RabbitMQではconfirmという仕組みが提供されている。(AMQPの仕様には存在しないRabbitMQオリジナルの仕様)

confirm

confirmを使うとbrokerがメッセージを処理した段階でpublisherにacknowledgementが返る。
下記のようにするとackknowledgementが帰ってくるまでwaitする。

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
channel.confirmSelect()
channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body)
channel.waitForConfirmsOrDie(timeout)
channel.close()

下記のようにListenerを設定することで非同期的にacknowledgementを受け取ることもできる

channel.addConfirmListener(new ConfirmListener {
  override def handleAck(deliveryTag: Long, multiple: Boolean): Unit = {
    //ackの時の処理
  }
  override def handleNack(deliveryTag: Long, multiple: Boolean): Unit = {
    //nackの時の処理
  }
})

メッセージをロストしたくないだけならconfirmで十分なので、transactionを使用するケースはあんまり多くなさそうに感じる。