提供: tty00

移動: 案内検索

はじめに

Akkaは、並列、分散処理のためのツールキットとランタイム。 Akkaは、 アクターモデルに基づくメッセージパッシングを使用したコンピュータアーキテクチャを採用し、プログラミングの際はアクターの振る舞いやアクター間のメッセージのやりとりを記述する。 ライブラリはJavaとScalaに対応している。

システム構成

  • OS:Debian
  • Scala:2.10.2
  • SBT:0.13

SBTの設定

build.sbt

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
 
libraryDependencies ++= Seq(
	"com.typesafe.akka" %% "akka-actor" % "2.2.3",
	"com.typesafe.akka" %% "akka-slf4j" % "2.2.3",
	"com.typesafe.akka" %% "akka-remote" % "2.2.3",
	"com.typesafe.akka" %% "akka-kernel" % "2.2.3"
)

Actor/アクター

アクタークラスの定義

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
 
case class Message(x: String)
 
object Main extends App {
 
  val system = ActorSystem("sample")
  val actor = system.actorOf(Props[MyActor], "PathName") //アクターの作成
  actor ! Message("Hello") //アクターにメッセージを送る
 
  system.shutdown()
}
 
//アクタークラスの定義
class MyActor extends Actor {
  def receive = {
    case Message(x) => println(x)
  }
 
}

アクターに初期値を渡す

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
 
case class Message(x: String)
 
object Main extends App {
 
  val system = ActorSystem("sample")
  val actor = system.actorOf(Props(new MyActor("World")), "PathName") //アクターの作成
  actor ! Message("Hello") //アクターにメッセージを送る
 
  system.shutdown()
}
 
//アクタークラスの定義
class MyActor(arg: String) extends Actor {
  def receive = {
    case Message(x) => println(x + arg)
  }
 
}

メッセージ

送信元のアクターにメッセージ送る

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
 
case class Message(x: String)
case class Start(x: String)
 
object Main extends App {
 
  val system = ActorSystem("sample")
  val actor = system.actorOf(Props[MyActor], "PathName") //アクターの作成
  actor ! Start("Hello") //アクターにメッセージを送る
 
  system.shutdown()
}
 
class MyActor extends Actor {
  val echo = context.actorOf(Props[Echo])
  def receive = {
    case Start(x) =>
      println("Start=" + x); echo ! Message(x)
    case Message(x) => println("Echo=" + x)
  }
 
}
 
class Echo extends Actor {
  def receive = {
    case Message(x) => sender ! Message(x) //送信元のアクターにメッセージ送る
  }
}


アクターを指定してメッセージを送る

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
 
case class Message(x: String)
 
object Main extends App {
 
  val system = ActorSystem("sample")
  system.actorOf(Props(new MyActor("foo")), "foo") //アクターの作成
  system.actorOf(Props(new MyActor("bar")), "bar") //アクターの作成
  system.actorSelection("/user/foo") ! Message("Foo!") //アクターパスを指定して、アクターの参照を得る
  system.actorSelection("/user/bar") ! Message("Bar!") //アクターパスを指定して、アクターの参照を得る
  system.shutdown()
}
 
class MyActor(val name:String) extends Actor {
 
  def receive = {
    case Message(x) => println(x +"@"+ name)
  }
 
}

アクターからメッセージを受信する

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.{ask,pipe}
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.util.Timeout
 
 
case class Add(x:Int,y:Int)
 
object Main extends App {
 
  val system = ActorSystem("sample")
  val actor = system.actorOf(Props[MyActor]) //アクターの作成
 
  implicit val timeout = Timeout(10 seconds)
  val future = (actor ? Add(10,5)) //アクターからメッセージを受信する
 
  val result = Await.result( future , timeout.duration).asInstanceOf[Int] //メッセージを待つ
  println("result="+result)
 
  system.shutdown()
}
 
class MyActor extends Actor {
 
  def receive = {
    case Add(x,y) => sender ! (x+y)
  }
 
}

Futureでアクターからのメッセージを非同期処理

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.pattern.{ ask, pipe }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.util.Timeout
import scala.concurrent._
import scala.util.{ Success, Failure }
import ExecutionContext.Implicits.global
import scala.language.postfixOps
 
case class Add(x: Int, y: Int)
 
object Main extends App {
 
  val system = ActorSystem("sample")
  val actor = system.actorOf(Props[MyActor]) //アクターの作成
 
  implicit val timeout = Timeout(10 seconds)
  val future = (actor ? Add(10, 5)) //アクターからメッセージを受信する
 
  future onComplete {
    case Success(x) => println("x=" + x)
    case Failure(x) => println(x.getMessage())
  }
 
  Await.ready(future, timeout.duration)
 
  system.shutdown()
 
}
 
class MyActor extends Actor {
 
  def receive = {
    case Add(x, y) => sender ! (x + y)
  }
 
}

ルータ

ルータの作成

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import akka.routing.RoundRobinRouter
 
case class Message(x: String)
 
object Main extends App {
 
  val system = ActorSystem("sample")
 
  //5つのMyActorにメッセージを送信するルータ(ラウンドロビン)を作成する
  val router = system.actorOf(Props[MyActor].withRouter(RoundRobinRouter(nrOfInstances = 5)))
 
  for (i <- 1 to 10) {
    router ! Message("" + i)
 
  }
 
  system.shutdown()
 
}
 
class MyActor extends Actor {
  def receive = {
    case Message(x) => println("Message=" + x + "@" + self.path)
  }
}

アクターを指定してルータを作成する

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.actor.Props
import akka.routing.RoundRobinRouter
 
case class Message(x: Int)
 
object Main extends App {
 
  val system = ActorSystem("sample")
 
  //3つのMyActorにメッセージを送信するルータ(ラウンドロビン)を作成する
  val actor1 = system.actorOf(Props[MyActor], "actor1")
  val actor2 = system.actorOf(Props[MyActor], "actor2")
  val actor3 = system.actorOf(Props[MyActor], "actor3")
 
  val routees = Vector[ActorRef](actor1, actor2, actor3)
 
  val router = system.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = routees)))
 
  for (i <- 1 to 10) {
    router ! Message(i)
  }
 
  system.shutdown()
 
}
 
class MyActor extends Actor {
  def receive = {
    case Message(x) => println("Message=" + x + "@" + self.path)
  }
}


Networking/ネットワーク

リモートにアクターを作成する

common.conf

akka {
 
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
 
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
    }
  }
}

application.conf

//#server
server {
  include "common"
 
  akka {
    # LISTEN on tcp port 2552
    remote.netty.tcp.port = 2552
  }
}
 
//#client
client {
  include "common"
 
  akka {
    actor {
      deployment {
        /echo {
          remote = "akka.tcp://ServerSystem@127.0.0.1:2552"
        }
      }
    }
 
    remote.netty.tcp.port = 2553
  }
}

Echo.scala

import akka.actor._
 
//受信メッセージを返すアクターを定義
class Echo extends Actor {
  def receive = {
    case msg: String => {
      println("self.path=" + self.path)
      println("sender.path=" + sender.path)
      println("msg=" + msg)
      sender ! msg + "@server"
    }
  }
}

Client.scala

import akka.actor._
import akka.kernel.Bootable
import com.typesafe.config.ConfigFactory
 
case class Start()
 
class ClientApp extends Bootable {
 
  val system = ActorSystem("ClientSystem", ConfigFactory.load.getConfig("client"))
  //リモートのアクターシステム(127.0.0.1:2552)にエコーアクターを生成する。
  val echo = system.actorOf(Props[Echo], name = "echo")
  val startActor = system.actorOf(Props(classOf[StartActor], echo), name = "startActor")
 
  startActor ! Start
 
  def startup() {
    println("client start")
  }
 
  def shutdown() {
    println("client end")
  }
 
}
 
class StartActor(remoteActor: ActorRef) extends Actor {
  def receive = {
    case Start => remoteActor ! "hello@client"
    case msg: String => println("msg=" + msg)
  }
}
 
object Client extends App {
  new ClientApp
}

Server.scala

import akka.actor._
import akka.kernel.Bootable
import com.typesafe.config.ConfigFactory
 
class ServerApp extends Bootable {
  var system = ActorSystem("ServerSystem", ConfigFactory.load.getConfig("server"))
 
  def startup() {
    println("server start")
  }
  def shutdown() {
    println("server stop")
 
  }
}
 
object Server extends App {
  new ServerApp()
}