読者です 読者をやめる 読者になる 読者になる

成らぬは人の為さぬなりけり

エンジニアライフをエンジョイする為のブログ

Scala勉強日誌 - Actor

今日のテーマは「Actor」です。


チャット作ってみようと思って、Play2.0のサンプルにある
WebsocketChatのコードを見ていたらActorが出てきて、
全く理解できずw
そういえば、さらっと勉強して、すっかり忘れていたので
ちゃんと勉強してみる事にしました。


今日は、本当に基礎のみ。
まずはActorトレイトを実装する方法でやってみます。


1.非同期でメッセージ送信

ScalaのActorはメッセージパッシングという方法で
メッセージを使って、情報の伝達を行います。
例として、受け取ったメッセージをそのまま標準出力に出力するActorを作ってみます。

import scala.actors._
class MyActor extends Actor {
  def act = {
    receive {
      case msg => println("echo -> " + msg)
    }
  }
}

まずは、

  1. actメソッドを実装する
  2. メッセージを受け取った時に実行される処理をreceiveメソッドに渡す


ではこいつを実行してみましょう。

object Main extends App {
  val myActor = new MyActor
  myActor.start
  myActor ! "hoge"
}

MyActorをnewして、そのActorさんをstartしてあげます。
ここで登場するのが「!」メソッド。
こいつは非同期で、Actorに対してメッセージを送信します。
※メッセージを送信するメソッドは他にもありますが、後ほど使用します。


では、実行してみます。

echo -> hoge

送信したメッセージがちゃんと受け取れている事が確認できました。


Actorはメッセージキューを持っているので、メッセージを貯めておくことができます。
なので、startする前にメッセージを送っても、キューにちゃんと持っていて、
startしてから処理を開始してくれます。
試しにstartと!を逆にしてみましょう。

object Main extends App {
  val myActor = new MyActor
  myActor ! "hoge"
  myActor.start
}
echo -> hoge

前回と同じ結果になりました。


では、次は、こいつに対して、
2回メッセージを送ってみましょう。

object Main extends App {
  val myActor = new MyActor
  myActor.start
  myActor ! "hoge"
  myActor ! "foo"
}
echo -> hoge

「hoge」しか受信できませんでした。
receiveの処理を何回も実行する為には、
loopメソッドを使う必要があります。


MyActorを直してみます。

class MyActor extends Actor {
  def act = {
    loop {
      receive {
        case msg => println("echo -> " + msg)
      }
    }
  }
}
echo -> hoge
echo -> foo

よし、今度は両方実行できました。


2.結果を受け取る
次はActorからなにかしら結果を受け取ってみます。
結果を受け取る為には「replay」メソッドを使用します。

class MyActor extends Actor {
  def act = {
    loop {
      receive {
        case msg => reply(msg)
      }
    }
  }
}
  val myActor = new MyActor
  myActor.start
  println(myActor ! "hoge")
  println(myActor ! 2)
()
()

お、受け取れない、、、
!メソッドは、

def !(msg: Any): Unit 

こうなっている。そりゃ受け取れないわ、、、
というわけで、結果を受け取るんだから同期のメソッドを使ってみる。

  val myActor = new MyActor
  myActor.start
  println(myActor !? "hoge")
  println(myActor !? 2)
hoge
2

同期でメッセージ送信するメソッド「!?」を使う事で結果を受け取る事ができる。


3.receiveとreact

ScalaのActorは内部的にはJavaのスレッドプールで実装されているようです。
なので、使えるスレッドがなくなると、内部で新しいスレッドを作っていきます。
receiveメソッドは、スレッドをブロックする為、
スレッドをブロックしないreactメソッドを使用してみます。

class MyActor extends Actor {
  def act = {
    loop {
      react {
        case msg => reply(msg)
      }
    }
  }
}
hoge
2

はい、特に何も変わりがありません。

うむ、なんかどう違って、どう使い分けたら良いのかよくわからない。
何が違うんでしょ?

Scala の Actor - receive/react と Thread - なんとなくな Developer のメモ
こちらを参考にさせて頂きました。

  • reactは制御を返さない
  • Actor と Thread は 1対1 に対応しない(状況に応じて動的に割り当てられる模様)
  • receive は個別の Thread で処理する
  • react は Thread を共有して処理する

まず、一個目から検証してみましょう。


■reactは制御を返さない

class MyActor extends Actor {
  def act = {
    loop {
      receive {
        case msg => reply(msg)
      }
      println("Test!!!")
    }
  }
}
Test!!!
hoge
Test!!!
2
class MyActor extends Actor {
  def act = {
    loop {
      react {
        case msg => reply(msg)
      }
      println("Test!!!")
    }
  }
}
hoge
2

なるほど、reactの後の処理の部分が評価されないんですね。
確かに、制御を返さないんだから、当たり前ですね。


■スレッドの割当

こちらは、参考サイトのサンプルを参照してください。
receiveはActor毎に別のスレッドが割り当てられているのに対して
reactは同じスレッドが使いまわされていますね。
なるほど。


4.終了させる

Eclipseを使ってサンプルコードを書いていたら、
メインスレッドが終了していない事に気づく。。。
Oh,,,ソリャソウダorz

では、スレッドを終了させてやりましょう。

case class Exit

class MyActor extends Actor {
  def act = {
    loop {
      react {
        case Exit =>
          reply("exit")
          exit
        case msg => reply(msg)
      }
    }
  }
}
  val myActor = new MyActor
  myActor.start
  println(myActor !? "hoge")
  println(myActor !? 2)
  println(myActor !? Exit)
hoge
2
exit


5.Actorオブジェクトのメソッドを使ってやってみる

Actorオブジェクトのactorメソッドを使えば、
クラスを定義しなくてもActorインスタンスが生成できる。

import scala.actors.Actor
import scala.actors.Actor._

object ActorSample extends App {
  // receive
  val actor_receive = actor {
    val res = receive {
      case i:Int =>
        println("receive message : " + i)
        i
    }
    println("res : " + res)
  }
  actor_receive ! 1
  actor_receive ! 2 // loopしてないので、こいつは無視
  
  // react
  val actor_react = actor {
    loop {
      react {
        case i: Int => 
          println("react message : " + i)
          reply(i)
      }
      println("hoge") // reactは制御を返さないのでここは評価されない
    }
  }
  actor_react ! 1000
  actor_react ! 9999
  var res = actor_react !? 10 // 結果を受け取る場合は同期(!?)でメッセージ送信
  println("reply response: " + res)
  
  // 非同期で結果を受け取り処理をさせる方法
  actor_react !! (111, {case i:Int => println("callback : " + i)})
}


今日は、ここまで。
我ながら、分からない事がたくさんありすぎるw


次回は、Akkaにチャレンジしてみよう。