Scala勉強日誌 - Akka その2 Future
さて、実は前回の(その1)からかなり時間が経っているのですが、また再開していきたいと思います。
今日のテーマ
- 非同期でWebAPIを呼ぶ
- 結果を受け取って処理する
環境
非同期でWebAPIを呼ぶ
まずは、API呼ぶActorを作ります。
こんかいはシンプルに天気を取ってくるAPIです。
package akka.sample.actor import akka.actor.Actor import scala.io.Source class WeatherApi extends Actor { def receive = { case i: Int => println(s"receive $i") Thread.sleep(1000) sender ! (i, Source.fromURL("http://weather.livedoor.com/forecast/webservice/json/v1?city=130010", "utf8").getLines.mkString) } }
少し補足しておきますと、
- i
を受け取っているのは、何回もこのActorを使いたいので、何回目かを受け取ってます(深い意味は無いです)
- sleep
しているのはAPI呼び出しに時間がかかる事を演出する為なので、直接は関係ないです
結果を受け取って処理する
http://straitwalk.hatenablog.com/entry/2013/02/13/223039:title=前回]の「ActorからActorを呼んで結果を受け取ってみる」あたりで、 結果受け取ったんですが、今回はFutureを使ってみたいと思います。
Futureに関しては、 Future と Promise - Scala Documentation この辺みるのが良いのかな??
package akka.sample import akka.actor.{Props, ActorSystem} import akka.util.Timeout import akka.pattern.ask import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext} import ExecutionContext.Implicits.global import spray.json._ import akka.sample.actor.WeatherApi object AkkaExample03_01 extends App { val system = ActorSystem() val actor = system.actorOf(Props[WeatherApi]) implicit val timeout = Timeout(10.seconds) try { (1 to 5).foreach { i => println(s"start $i") val f = (actor ? i).map { case (i, jsonStr: String) => jsonStr.parseJson.asJsObject } f.onSuccess { case (json: JsObject) => println(s"$i ${json.getFields("location").head.asJsObject.getFields("city").head}") } } Thread.sleep(5000) } finally { system.shutdown } }
という事をやってます。(なんかprintデバッグが多いからか?コードが汚い感じするなぁ...うーむ, LL脳だからか??)
- timeout
AskableActorRef#?
(askのalias?)に必要なimplicit parameterです- Actor#receiveが結果返すまでのtimeoutかと思います
actor ? i
がFutureのオブジェクトを返すので、mapしてJSONをparseしてます- jsonのparseに成功すると、onSuccessでprintlnします
sleep
させてるのは。そのまま終わらせると、actorが仕事し終わる前にexitしてしまうので、待ってるだけ。(sleepでやるのキモいけど)
実行結果
[info] Running akka.sample.AkkaExample03_01 start 1 receive 1 start 2 start 3 start 4 start 5 receive 2 1 "東京" receive 3 2 "東京" receive 4 3 "東京" receive 5 4 "東京"
実行してみるとわかると思うんですが、1個ずつメッセージが順番に出力されます。 つまり、1個1個順番に処理している、並列に処理させたい...
Routerを使ってみる
というわけで、Routerを使ってみます。
Router使ったバージョン
object AkkaExample03_02 extends App { val system = ActorSystem() val router = system.actorOf(Props[WeatherApi].withRouter(new RoundRobinPool(3))) implicit val timeout = Timeout(10.seconds) try { (1 to 5).foreach { i => println(s"start $i") val f = (router ? i).map { case (i, jsonStr:String) => jsonStr.parseJson.asJsObject } f.onSuccess { case json: JsObject => println(s"$i ${json.getFields("location").head.asJsObject.getFields("city").head}") } } Thread.sleep(5000) } finally { system.shutdown } }
変えたのは
val router = system.actorOf(Props[WeatherApi].withRouter(new RoundRobinPool(3)))
ここです。
前回やった時はRoundRobinRouter
を使ったんですが、
2.3からdeprecatedになったようで
@deprecated("Use RoundRobinPool or RoundRobinGroup", "2.3") object RoundRobinRouter { ...
と書いてあったので、RoundRobinPool
使ってみました。
実行してみると、3つずつ処理されているのがわかると思います。
とりあえず、今日はここまで。