読者です 読者をやめる 読者になる 読者になる

成らぬは人の為さぬなりけり

エンジニアライフをエンジョイする為のブログ

Scala勉強日誌 - Akka その2 Future

さて、実は前回の(その1)からかなり時間が経っているのですが、また再開していきたいと思います。

今日のテーマ

  • 非同期でWebAPIを呼ぶ
  • 結果を受け取って処理する

環境

  • MacOSX 10.9.4
  • Scala 2.11.1
  • sbt 0.13.5
  • akka-actor 2.3.4

非同期でWebAPIを呼ぶ

まずは、API呼ぶActorを作ります。

こんかいはシンプルに天気を取ってくるAPIです。

package akka.sample.actor

import akka.actor.Actor
import scala.io.Source

class WeatherApi extends Actor {
  def receive = {
    case i: Int =>
      println(s"receive $i")
      Thread.sleep(1000)
      sender ! (i, Source.fromURL("http://weather.livedoor.com/forecast/webservice/json/v1?city=130010", "utf8").getLines.mkString)
  }
}

少し補足しておきますと、 - iを受け取っているのは、何回もこのActorを使いたいので、何回目かを受け取ってます(深い意味は無いです) - sleepしているのはAPI呼び出しに時間がかかる事を演出する為なので、直接は関係ないです

結果を受け取って処理する

http://straitwalk.hatenablog.com/entry/2013/02/13/223039:title=前回]の「ActorからActorを呼んで結果を受け取ってみる」あたりで、 結果受け取ったんですが、今回はFutureを使ってみたいと思います。

Futureに関しては、 Future と Promise - Scala Documentation この辺みるのが良いのかな??

package akka.sample

import akka.actor.{Props, ActorSystem}
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext}
import ExecutionContext.Implicits.global
import spray.json._
import akka.sample.actor.WeatherApi

object AkkaExample03_01 extends App {
  val system = ActorSystem()
  val actor = system.actorOf(Props[WeatherApi])
  implicit val timeout = Timeout(10.seconds)

  try {
    (1 to 5).foreach { i =>
      println(s"start $i")
      val f = (actor ? i).map {
        case (i, jsonStr: String) => jsonStr.parseJson.asJsObject
      }
      f.onSuccess {
        case (json: JsObject) =>
          println(s"$i ${json.getFields("location").head.asJsObject.getFields("city").head}")
      }
    }
    Thread.sleep(5000)
  } finally {
    system.shutdown
  }

}
  • 5回API呼ぶ
  • それぞれ結果を受け取ってJSONをparseする
  • 結果のJSONからcityプロパティをprintlnしてみる

という事をやってます。(なんかprintデバッグが多いからか?コードが汚い感じするなぁ...うーむ, LL脳だからか??)

  • timeout
    • AskableActorRef#?(askのalias?)に必要なimplicit parameterです
    • Actor#receiveが結果返すまでのtimeoutかと思います
  • actor ? iがFutureのオブジェクトを返すので、mapしてJSONをparseしてます
  • jsonのparseに成功すると、onSuccessでprintlnします
  • sleepさせてるのは。そのまま終わらせると、actorが仕事し終わる前にexitしてしまうので、待ってるだけ。(sleepでやるのキモいけど)

実行結果

[info] Running akka.sample.AkkaExample03_01 
start 1
receive 1
start 2
start 3
start 4
start 5
receive 2
1 "東京"
receive 3
2 "東京"
receive 4
3 "東京"
receive 5
4 "東京"

実行してみるとわかると思うんですが、1個ずつメッセージが順番に出力されます。 つまり、1個1個順番に処理している、並列に処理させたい...

Routerを使ってみる

というわけで、Routerを使ってみます。

Router使ったバージョン

object AkkaExample03_02 extends App {
  val system = ActorSystem()
  val router = system.actorOf(Props[WeatherApi].withRouter(new RoundRobinPool(3)))
  implicit val timeout = Timeout(10.seconds)

  try {
    (1 to 5).foreach { i =>
      println(s"start $i")
      val f = (router ? i).map {
        case (i, jsonStr:String) => jsonStr.parseJson.asJsObject
      }
      f.onSuccess {
        case json: JsObject => println(s"$i ${json.getFields("location").head.asJsObject.getFields("city").head}")
      }
    }
    Thread.sleep(5000)
  } finally {
    system.shutdown
  }
}

変えたのは

  val router = system.actorOf(Props[WeatherApi].withRouter(new RoundRobinPool(3)))

ここです。 前回やった時はRoundRobinRouterを使ったんですが、 2.3からdeprecatedになったようで

@deprecated("Use RoundRobinPool or RoundRobinGroup", "2.3")
object RoundRobinRouter {
...

と書いてあったので、RoundRobinPool使ってみました。 実行してみると、3つずつ処理されているのがわかると思います。

とりあえず、今日はここまで。