- package com.vdncloud.las.da.akka
- import java.util.concurrent.TimeUnit
- import akka.actor.{Actor, ActorRef, ActorSystem, Props}
- import akka.routing.RoundRobinRouter
- import junit.framework.TestCase
- import scala.concurrent.duration._
- /**
- *
- * Calculate – 发送给 主 actor 来启动计算。
- * Work – 从 主 actor 发送给各 工作 actor,包含工作分配的内容。
- * Result – 从 工作 actors 发送给 主 actor,包含工作actor的计算结果。
- * PiApproximation – 从 主 actor发送给 监听器 actor,包含pi的最终计算结果和整个计算耗费的时间。
- *
- * 发送给actor的消息应该永远是不可变的,以避免共享可变状态。
- * 在scala里我们有 'case classes' 来构造完美的消息。
- * 现在让我们用case class创建3种消息。
- * 我们还为消息们创建一个通用的基础trait(定义为sealed以防止在我们不可控的地方创建消息):
- */
- sealed trait PiMessage
- case object Calculate extends PiMessage
- case class Work(start: Int, nrOfElements: Int) extends PiMessage
- case class Result(value: Double) extends PiMessage
- case class PiApproximation(pi: Double, duration: Duration) {
- override def toString: String = "Pi approximation: \t\t%s\tCalculation time: \t%s".format(pi, duration)
- }
- /**
- * 现在创建工作 actor。 方法是混入 Actor trait 并定义其中的 receive 方法.
- * receive 方法定义我们的消息处理器。我们让它能够处理 Work 消息,所以添加一个针对这种消息的处理器:
- */
- class Worker extends Actor {
- /**
- * 可以看到我们现在创建了一个 Actor 和一个 receive 方法作为 Work 消息的处理器.
- * 在这个处理器中我们调用calculatePiFor(..) 方法, 将结果包在 Result 消息里并使用sender异步发送回消息的原始发送者。
- * 在Akka里,sender引用是与消息一起隐式发送的,这样接收者可以随时回复或将sender引用保存起来以备将来使用。
- *
- * 现在在我们的 Worker actor 中唯一缺少的就是实现 calculatePiFor(..) 方法。
- * 虽然在Scala里我们可以有很多方法来实现这个算法,在这个入门指南中我们选择了一种命令式的风格,使用了for写法和一个累加器:
- */
- def calculatePiFor(start: Int, nrOfElements: Int): Double = {
- var acc = 0.0
- for (i ← start until (start + nrOfElements))
- acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
- acc
- }
- def receive = {
- case Work(start, nrOfElements) ?
- sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
- }
- }
- /**
- * 现在我们有了一个路由,可以在一个单一的抽象中表达所有的工作actor。现在让我们创建主actor. 传递给它三个整数变量:
- *
- * 我们还缺少 主 actor的消息处理器. 这个处理器需要能够对两种消息进行响应:
- * Calculate – 用来启动计算过程
- * Result – 用来汇总不同的计算结果
- *
- * @param nrOfWorkers – 定义我们会启动多少工作actor
- * @param nrOfMessages – 定义会有多少整数段发送给工作actor
- * @param nrOfElements – 定义发送给工作actor的每个整数段的大小
- * @param listener – 用来向外界报告最终的计算结果。
- */
- class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef)
- extends Actor {
- var pi: Double = _
- var nrOfResults: Int = _
- val start: Long = System.currentTimeMillis
- /**
- * 主actor会稍微复杂一些。
- * 在它的构造方法里我们创建一个round-robin的路由器来简化将工作平均地分配给工作actor们的过程,先做这个:
- */
- val workerRouter = context.actorOf(
- Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
- // val workerRouter = context.actorOf(Props[Worker].withRouter(FromConfig()), "workerRouter")
- /**
- * Calculate 处理器会通过其路由器向所有的工作 actor 发送工作内容.
- * Result 处理器从 Result 消息中获取值并汇总到我们的 pi 成员变量中.
- *
- * 我们还会记录已经接收的结果数据的数量,它是否与发送出去的任务数量一致 .
- * 主 actor 发现计算完成了,会将最终结果发送给监听者.
- * 当整个过程都完成了,它会调用 context.stop(self) 方法来终止自己 和 它所监管的所有actor.
- * 在本例中,主actor监管一个actor,我们的路由器,而路由器监管着所有 nrOfWorkers 个工作actors.
- * 所有的actor都会在其监管者的stop方法被调用时自动终止,并会传递给所有它监管的子actor。
- */
- def receive = {
- case Calculate ?
- for (i ← 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
- case Result(value) ?
- pi += value
- nrOfResults += 1
- if (nrOfResults == nrOfMessages) {
- // Send the result to the listener
- val pa = PiApproximation(pi, duration = Duration(System.currentTimeMillis - start, TimeUnit.MILLISECONDS))
- listener ! pa
- sender ! pa
- // Stops this actor and all its supervised children
- context.stop(self)
- }
- }
- }
- /**
- * 创建计算结果监听者
- * 监听者很简单,当它接收到从 Master发来的PiApproximation ,就将结果打印出来并关闭整个 Actor系统。
- */
- class Listener extends Actor {
- def receive = {
- // PiApproximation(pi, duration)
- case a: PiApproximation ?
- println("Listener " + a)
- // context.system.terminate()
- context.system.shutdown()
- }
- }
- /**
- * Created by hdfs on 17-2-21.
- * ref:http://blog.csdn.net/beijicy/article/details/50587180
- *
- */
- class TestAkka extends TestCase {
- /**
- * 现在只剩下实现启动和运行计算的执行者了。
- * 我们创建一个调用 Pi的对象, 这里我们可以继承Scala中的 Apptrait, 这个trait使我们能够在命令行上直接运行这个应用.
- */
- def testAkkConcurrent(): Unit = {
- //Pi 对象是我们的actor和消息的很好的容器。
- // 所以我们把它们都放在这儿。我们还创建一个 calculate 方法来启动 主 actor 并等待它结束:
- calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
- }
- /**
- * calculate 方法创建一个 Actor系统,这是包括所有创建出的actor的 "上下文"。
- * 如何在容器中创建actor的例子在calculate方法的 'system.actorOf(...)' 这一行。
- * 这里我们创建两个顶级actor. 如果你是在一个actor上下文(i.e. 在一个创建其它actor的actor中),
- * 你应该使用 context.actorOf(...). 这在以上的主actor代码中有所体现。
- *
- * @param nrOfWorkers
- * @param nrOfElements
- * @param nrOfMessages
- */
- def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
- // Create an Akka system
- val system = ActorSystem("PiSystem")
- // val system = ActorSystem("MasterApp", ConfigFactory.load.getConfig("multiThread"))
- // create the result listener, which will print the result and shutdown the system
- val listener = system.actorOf(Props[Listener], name = "listener")
- // create the master
- val master = system.actorOf(Props(new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener)),
- name = "master")
- master ! Calculate
- }
- }
来源: http://www.bubuko.com/infodetail-1953886.html