RabbitMQでロストしないようにpublish/consumeする

RabbitMQでメッセージをロストせずにpublish/consumeする方法を軽く調べたのでメモしておく。

consume

メッセージの処理が終わったらacknowledgementを返す。

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
// noAck=false
channel.basicConsume(queue, false, consumer)
//メッセージの処理
if (isSuccess) {
  channel.basicAck(deliveryTag, multiple)
} else {
  channel.basicNack(deliveryTag, multiple, requeue)
}

acknowledgementが返ってくるまでRabbitMQはメッセージを保持するので、consume中にクライアントが死んだりコネクションが切れたりした場合でもメッセージはキューに残る。
multipleは指定したdeliveryTagまでの全てのメッセージにacknowledgementを返すときtrue、指定したdeliveryTagのメッセージのみにacknowledgementを返す時falseにする。
requeueはnack時にメッセージをキューに詰め直す場合trueに、メッセージを捨てたりdead letterキューに入れたりする場合falseにする。

publish

publishの失敗を検知するには、transactionとconfirmという2通りの方法がある。
普通にbasicPublishするだけだとpublish後の失敗は検知できない。(publish先のExchageやキューが存在しなかったりしても何も言われない。)

transaction

RabbitMQがキューにデータを保存し、永続化されるまでをブロックする?(このへんの理解は不十分です…) AMQPの仕様ではトランザクションはatomicな処理とされているが、RabbitMQではこの仕様を満たしていないらしい。(失敗したがメッセージのpublishはできているということがあるらしい。)

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body)
channel.txCommit()
channel.close()

mandatoryをtrueにするとメッセージをキューにルーティングできない時にエラーになる。falseの時はバインド先が存在しない場合メッセージが吐き捨てられる。
immediateをtrueにすると、メッセージは送信先のキューからすぐにコンシュームされない場合エラーになる。

transactionはコストの高い処理をしているらしく、処理速度がかなり遅くなる。 メッセージをロストしたくないという目的に対してコストが高すぎるため、RabbitMQではconfirmという仕組みが提供されている。(AMQPの仕様には存在しないRabbitMQオリジナルの仕様)

confirm

confirmを使うとbrokerがメッセージを処理した段階でpublisherにacknowledgementが返る。
下記のようにするとackknowledgementが帰ってくるまでwaitする。

val connection = factory.newConnection(addresses)
val channel = connection.createChannel()
channel.confirmSelect()
channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body)
channel.waitForConfirmsOrDie(timeout)
channel.close()

下記のようにListenerを設定することで非同期的にacknowledgementを受け取ることもできる

channel.addConfirmListener(new ConfirmListener {
  override def handleAck(deliveryTag: Long, multiple: Boolean): Unit = {
    //ackの時の処理
  }
  override def handleNack(deliveryTag: Long, multiple: Boolean): Unit = {
    //nackの時の処理
  }
})

メッセージをロストしたくないだけならconfirmで十分なので、transactionを使用するケースはあんまり多くなさそうに感じる。

pythonのプロジェクト構成

今までちょっとしたスクリプトくらいでしかpython使ってこなかったので、ちゃんとしたプロジェクト構成について知識がなかった。
githubリポジトリを漁って見たが、結構まちまちであまり決まった構成というのはないのかなという印象。

下記のドキュメントが参考になりそうだったのでひとまずこれに従ってやってみようと思う。
python初学者向けのドキュメントで構成についてだけでなく全般的な内容が書かれている。

Structuring Your Project — The Hitchhiker's Guide to Python

google financeのAPIのメモ

株価データを取得するためにgoogle financeのAPIを触ってみたときのメモ。
このAPIはおそらくGoogleFinanceの裏で使われているAPIで、公式のドキュメントが存在しない。
公式で提供しているAPIは終了しているっぽいので、使い続けられるか怪しいかもしれない。 https://developers.google.com/finance/?hl=ja

リクエス

下記はトヨタの過去1年分の株価を取得する例 。
https://www.google.com/finance/getprices?p=1Y&i=86400&x=TYO&q=7203

p:データを取得する期間。年(Y)または日(d)で指定ができる
i:データの間隔。秒単位で指定する。上記では1日を(86400秒)指定している。
x:銘柄が取引される市場。TYOは東京証券取引所
q:銘柄のコード。

またデータの取得を開始する日時をts(UNIXタイムスタンプ)で指定できる
https://www.google.com/finance/getprices?p=1Y&i=86400&x=TYO&q=7203&ts=1451574000

レスポンス

下記のようなレスポンスが返ってくる

EXCHANGE%3DTYO
MARKET_OPEN_MINUTE=540
MARKET_CLOSE_MINUTE=900
INTERVAL=86400
COLUMNS=DATE,CLOSE,HIGH,LOW,OPEN,VOLUME
DATA_SESSIONS=[MORNING,540,690],[AFTERNOON,750,900]
DATA=
TIMEZONE_OFFSET=540
a1455861600,5999,6086,5961,6050,14020000
3,5980,6044,5850,5906,11427200
4,5994,6070,5954,6065,10895500
5,5974,5989,5835,5900,10888100
6,5931,5967,5827,5881,15819200
7,5910,6006,5895,5960,12753100
10,5897,6062,5897,6001,18539900
11,5893,5935,5824,5853,11093200
...

株価のデータは9行目から。どの列が何を表すのかはCOLUMNSを見れば分かる。
株価データの一行目のdateはタイムスタンプになっており、2行目以降は最初のデータからiで指定した期間が何回経過したかを表している。
上記はデータの間隔が1日の例だが、dateの数値が飛んでいるのは土日などを挟んでいるため。
また、終値は単にCLOSEとされているが、ちゃんと調整後終値株式分割を反映させた終値)が返ってくる。

サンプル

すごく雑に1年分の株価をcsvにして吐き出すものを書いた。
データ解析に向いていると聞いてpythonで書いてみたが、正直pythonよくわからずに書いているのでいいコードではないと思う。

gist.github.com

elasticsearchでfilterとpost_filterを使ったらうまくいかなかった

termによる検索結果をさらに絞り込むためにpost_filterを使ったがうまくいかなかった。

post_filter

検索結果からさらに絞り込みを行うためのクエリで主にaggregationと組み合わせる。

このようなクエリを投げたが、 filterの結果から絞り込むのではなく、post_filterのみを適用した結果が返ってきてしまう。(2.2で確認。5以降では動くかも)

{
  "filter":{
      {"term":{"field1":"hoge"}},
  },
  "from":0,
  "size":100,
  "sort":{"createdAt":{"order":"desc"}},
  "post_filter":{"not":{"term":{"field2":"huga"}}}
}

queryと一緒に使うようにしたら動いた。

{
  "query":{
      {"term":{"field1":"hoge"}},
  },
  "from":0,
  "size":100,
  "sort":{"createdAt":{"order":"desc"}},
  "post_filter":{"not":{"term":{"field2":"huga"}}}
}

filterとqueryの違いについては公式のドキュメントで説明されている。 https://www.elastic.co/guide/en/elasticsearch/guide/current/_queries_and_filters.html

これによるとqueryはスコアリングをしたい時に使用するもので、filterは単に絞り込みだけがしたい時に使うものらしい。 post_filterが使われる場合は単なる絞り込みではないと言うことなのだろうか。

python環境構築

今までデフォルトで入ってたpythonを使ってたけど、ちょっとpythonで作ってみたいものが出てきたので環境構築する。 といってもpyenv、virtualenvを入れただけ。 https://github.com/yyuu/pyenv#basic-github-checkout

pyenv

brewが使えるようなのでbrewで入れる。

brew install pyenv

任意のバージョンをインストール

pyenv install 3.5.0

コマンドラインツールがなくてエラーが出ることがあるらしい。3.5.0をインストールしようとしたら下記のエラーが出た

make: *** [Python/importlib_external.h] Trace/BPT trap: 5
make: *** Waiting for unfinished jobs....
make: *** [Python/importlib.h] Trace/BPT trap: 5

その場合以下を実行

xcode-select --install。

リハッシュしてグローバルに設定。

pyenv rehash
pyenv global 3.5.0

pathの設定をする。下記はbashの例。

echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bash_profile
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bash_profile
echo 'eval "$(pyenv init -)"' >> ~/.bash_profile

下記のコマンドでpyenv下のpythonが帰ってきたら完了。

which python

virtualenv

virtualenvを使うとプロジェクト毎に異なるバージョンのモジュールを使用できるようになる。
pipでインストールできる。

pip install virtualenv

下記のようにプロジェクト用の仮想環境を作り、activateする

virtualenv [仮想環境の名前]
source [仮想環境の名前]/bin/activate

これでグローバルではなく、プロジェクト毎の環境にパッケージがインストールされるようになる。

iosアプリのsegueをプログラムから呼び出す

StoryBoard上ではなく、プログラムからsegueを呼び出したかった。 やり方をググったが何故か中々ヒットしなかったのでメモしておく。

事前準備

StoryBoard上で、ViewController間をつなぐsegueを作っておく。 作ったsegueのidentifierはちゃんと埋めておく。

コード

以下のコードでsegueを呼び出せる。

performSegueWithIdentifier("segueのidentifier", sender: self)

またsegueが呼ばれた際の処理はVieController内で以下のように書ける。

override func prepareForSegue(segue: UIStoryboardSegue, sender: AnyObject?) {
  // やりたい処理
}

簡単そうなのに何故ヒットしなかったのだろう。

iosアプリの起動時の画面をコントロールする

ログイン状態など条件に応じてrootViewControllerを切り替えたかった。 ちょっと調べたところUIWindowのrootViewControllerを設定すれば良いらしい。

var window: UIWindow?

let storyboard = UIStoryboard(name: "Main", bundle: nil)
if (flag) {
  let mainViewController = storyboard.instantiateViewControllerWithIdentifier("MainViewController")
  let navigationController = UINavigationController(rootViewController: mainViewController)
  self.window!.rootViewController = navigationController
} else {
  let loginViewController = storyboard.instantiateViewControllerWithIdentifier("LoginViewController")
  let navigationController = UINavigationController(rootViewController: loginViewController)
  self.window!.rootViewController = navigationController
}

しかし、このようにすると最初のviewが表示される前に一瞬真っ暗な画面が表示されてしまった。
原因はデフォルトのrootViewControllerが設定されておらず、上記のコードの実行前に未設定のrootViewControllerを表示しようとしているためだった。 ひとまずlaunch中に表示するviewを作りデフォルトのrootViewControllerとして設定することで見かけ上は解決した。

launch前にrootViewControllerを設定する方法があればそれを使いたいが、ググっても見つからなかった。 より良い方法が見つかったら追記したい。