提供: tty00
(ページの作成:「== はじめに == [http://akka.io/ Akka]は、並列、分散処理のためのツールキットとランタイム。 [http://akka.io/ Akka]は、 [http://ja.wikipedia....」) |
(相違点なし)
|
2014年1月18日 (土) 22:10時点における最新版
目次
はじめに
Akkaは、並列、分散処理のためのツールキットとランタイム。 Akkaは、 アクターモデルに基づくメッセージパッシングを使用したコンピュータアーキテクチャを採用し、プログラミングの際はアクターの振る舞いやアクター間のメッセージのやりとりを記述する。 ライブラリはJavaとScalaに対応している。
システム構成
- OS:Debian
- Scala:2.10.2
- SBT:0.13
SBTの設定
build.sbt
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.2.3", "com.typesafe.akka" %% "akka-slf4j" % "2.2.3", "com.typesafe.akka" %% "akka-remote" % "2.2.3", "com.typesafe.akka" %% "akka-kernel" % "2.2.3" )
Actor/アクター
アクタークラスの定義
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props case class Message(x: String) object Main extends App { val system = ActorSystem("sample") val actor = system.actorOf(Props[MyActor], "PathName") //アクターの作成 actor ! Message("Hello") //アクターにメッセージを送る system.shutdown() } //アクタークラスの定義 class MyActor extends Actor { def receive = { case Message(x) => println(x) } }
アクターに初期値を渡す
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props case class Message(x: String) object Main extends App { val system = ActorSystem("sample") val actor = system.actorOf(Props(new MyActor("World")), "PathName") //アクターの作成 actor ! Message("Hello") //アクターにメッセージを送る system.shutdown() } //アクタークラスの定義 class MyActor(arg: String) extends Actor { def receive = { case Message(x) => println(x + arg) } }
メッセージ
送信元のアクターにメッセージ送る
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props case class Message(x: String) case class Start(x: String) object Main extends App { val system = ActorSystem("sample") val actor = system.actorOf(Props[MyActor], "PathName") //アクターの作成 actor ! Start("Hello") //アクターにメッセージを送る system.shutdown() } class MyActor extends Actor { val echo = context.actorOf(Props[Echo]) def receive = { case Start(x) => println("Start=" + x); echo ! Message(x) case Message(x) => println("Echo=" + x) } } class Echo extends Actor { def receive = { case Message(x) => sender ! Message(x) //送信元のアクターにメッセージ送る } }
アクターを指定してメッセージを送る
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props case class Message(x: String) object Main extends App { val system = ActorSystem("sample") system.actorOf(Props(new MyActor("foo")), "foo") //アクターの作成 system.actorOf(Props(new MyActor("bar")), "bar") //アクターの作成 system.actorSelection("/user/foo") ! Message("Foo!") //アクターパスを指定して、アクターの参照を得る system.actorSelection("/user/bar") ! Message("Bar!") //アクターパスを指定して、アクターの参照を得る system.shutdown() } class MyActor(val name:String) extends Actor { def receive = { case Message(x) => println(x +"@"+ name) } }
アクターからメッセージを受信する
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props import akka.pattern.{ask,pipe} import scala.concurrent.Await import scala.concurrent.duration._ import akka.util.Timeout case class Add(x:Int,y:Int) object Main extends App { val system = ActorSystem("sample") val actor = system.actorOf(Props[MyActor]) //アクターの作成 implicit val timeout = Timeout(10 seconds) val future = (actor ? Add(10,5)) //アクターからメッセージを受信する val result = Await.result( future , timeout.duration).asInstanceOf[Int] //メッセージを待つ println("result="+result) system.shutdown() } class MyActor extends Actor { def receive = { case Add(x,y) => sender ! (x+y) } }
Futureでアクターからのメッセージを非同期処理
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props import akka.pattern.{ ask, pipe } import scala.concurrent.Await import scala.concurrent.duration._ import akka.util.Timeout import scala.concurrent._ import scala.util.{ Success, Failure } import ExecutionContext.Implicits.global import scala.language.postfixOps case class Add(x: Int, y: Int) object Main extends App { val system = ActorSystem("sample") val actor = system.actorOf(Props[MyActor]) //アクターの作成 implicit val timeout = Timeout(10 seconds) val future = (actor ? Add(10, 5)) //アクターからメッセージを受信する future onComplete { case Success(x) => println("x=" + x) case Failure(x) => println(x.getMessage()) } Await.ready(future, timeout.duration) system.shutdown() } class MyActor extends Actor { def receive = { case Add(x, y) => sender ! (x + y) } }
ルータ
ルータの作成
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props import akka.routing.RoundRobinRouter case class Message(x: String) object Main extends App { val system = ActorSystem("sample") //5つのMyActorにメッセージを送信するルータ(ラウンドロビン)を作成する val router = system.actorOf(Props[MyActor].withRouter(RoundRobinRouter(nrOfInstances = 5))) for (i <- 1 to 10) { router ! Message("" + i) } system.shutdown() } class MyActor extends Actor { def receive = { case Message(x) => println("Message=" + x + "@" + self.path) } }
アクターを指定してルータを作成する
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.ActorRef import akka.actor.Props import akka.routing.RoundRobinRouter case class Message(x: Int) object Main extends App { val system = ActorSystem("sample") //3つのMyActorにメッセージを送信するルータ(ラウンドロビン)を作成する val actor1 = system.actorOf(Props[MyActor], "actor1") val actor2 = system.actorOf(Props[MyActor], "actor2") val actor3 = system.actorOf(Props[MyActor], "actor3") val routees = Vector[ActorRef](actor1, actor2, actor3) val router = system.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = routees))) for (i <- 1 to 10) { router ! Message(i) } system.shutdown() } class MyActor extends Actor { def receive = { case Message(x) => println("Message=" + x + "@" + self.path) } }
Networking/ネットワーク
リモートにアクターを作成する
common.conf
akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { netty.tcp { hostname = "127.0.0.1" } } }
application.conf
//#server server { include "common" akka { # LISTEN on tcp port 2552 remote.netty.tcp.port = 2552 } } //#client client { include "common" akka { actor { deployment { /echo { remote = "akka.tcp://ServerSystem@127.0.0.1:2552" } } } remote.netty.tcp.port = 2553 } }
Echo.scala
import akka.actor._ //受信メッセージを返すアクターを定義 class Echo extends Actor { def receive = { case msg: String => { println("self.path=" + self.path) println("sender.path=" + sender.path) println("msg=" + msg) sender ! msg + "@server" } } }
Client.scala
import akka.actor._ import akka.kernel.Bootable import com.typesafe.config.ConfigFactory case class Start() class ClientApp extends Bootable { val system = ActorSystem("ClientSystem", ConfigFactory.load.getConfig("client")) //リモートのアクターシステム(127.0.0.1:2552)にエコーアクターを生成する。 val echo = system.actorOf(Props[Echo], name = "echo") val startActor = system.actorOf(Props(classOf[StartActor], echo), name = "startActor") startActor ! Start def startup() { println("client start") } def shutdown() { println("client end") } } class StartActor(remoteActor: ActorRef) extends Actor { def receive = { case Start => remoteActor ! "hello@client" case msg: String => println("msg=" + msg) } } object Client extends App { new ClientApp }
Server.scala
import akka.actor._ import akka.kernel.Bootable import com.typesafe.config.ConfigFactory class ServerApp extends Bootable { var system = ActorSystem("ServerSystem", ConfigFactory.load.getConfig("server")) def startup() { println("server start") } def shutdown() { println("server stop") } } object Server extends App { new ServerApp() }