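In the previous discussion we covered supervision: in Akka it is a tree of direct parent-child relationships, in which a parent Actor handles the exceptions raised by its direct children. At the time we treated BackoffSupervisor as just one form of parent-child supervision. In fact, BackoffSupervisor differs from an Actor that defines its own supervisorStrategy, and is better viewed as a single, self-contained Actor. Internally it is still implemented as a parent-child pair, and the supervision strategy (SupervisorStrategy) lives inside BackoffSupervisor. From the outside, BackoffSupervisor looks like one Actor: the business logic is defined in the child, while the so-called parent does nothing besides supervising and forwarding the messages it receives to the child. That forwarding is built into BackoffSupervisor, so there is nowhere for us to define any other parent behaviour. Consequently, although we send messages to the BackoffSupervisor, we are really talking to its child. Consider the following example: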
- package backoffSupervisorDemo
- import akka.actor._
- import akka.pattern._
- import backoffSupervisorDemo.InnerChild.TestMessage
- import scala.concurrent.duration._
- object InnerChild {
- case class TestMessage(msg: String)
- class ChildException extends Exception
- def props = Props[InnerChild]
- }
- class InnerChild extends Actor with ActorLogging {
- import InnerChild._
- override def receive: Receive = {
- case TestMessage(msg) => // simulate the child's work
- log.info(s"Child received message: ${msg}")
- }
- }
- object Supervisor {
- def props: Props = { // the supervision strategy and the child Actor are both defined here
- def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
- case _: InnerChild.ChildException => SupervisorStrategy.Restart
- }
- val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
- .withManualReset
- .withSupervisorStrategy(
- OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
- decider.orElse(SupervisorStrategy.defaultDecider)
- )
- )
- BackoffSupervisor.props(options)
- }
- }
- // Note: the actor below is the parent of Supervisor, not of InnerChild
- object ParentalActor {
- case class SendToSupervisor(msg: InnerChild.TestMessage)
- case class SendToInnerChild(msg: InnerChild.TestMessage)
- case class SendToChildSelection(msg: InnerChild.TestMessage)
- def props = Props[ParentalActor]
- }
- class ParentalActor extends Actor with ActorLogging {
- import ParentalActor._
- // create the child Actor "supervisor" here
- val supervisor = context.actorOf(Supervisor.props, "supervisor")
- supervisor ! BackoffSupervisor.GetCurrentChild // ask the supervisor for its current child Actor
- var innerChild: Option[ActorRef] = None // the current child ActorRef returned by the supervisor
- val selectedChild = context.actorSelection("/user/parent/supervisor/innerChild")
- override def receive: Receive = {
- case BackoffSupervisor.CurrentChild(ref) => // received the child Actor reference
- innerChild = ref
- case SendToSupervisor(msg) => supervisor ! msg
- case SendToChildSelection(msg) => selectedChild ! msg
- case SendToInnerChild(msg) => innerChild.foreach(child => child ! msg)
- }
- }
- object BackoffSupervisorDemo extends App {
- import ParentalActor._
- val testSystem = ActorSystem("testSystem")
- val parent = testSystem.actorOf(ParentalActor.props,"parent")
- Thread.sleep(1000)//wait for BackoffSupervisor.CurrentChild(ref) received
- parent ! SendToSupervisor(TestMessage("Hello message 1 to supervisor"))
- parent ! SendToInnerChild(TestMessage("Hello message 2 to innerChild"))
- parent ! SendToChildSelection(TestMessage("Hello message 3 to selectedChild"))
- scala.io.StdIn.readLine()
- testSystem.terminate()
- }
In the example above we sent messages to supervisor, innerChild and selectedChild respectively, yet every message was handled by InnerChild, as the log shows:
- [INFO] [05/29/2017 16:11:48.167] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 1 to supervisor
- [INFO] [05/29/2017 16:11:48.177] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 2 to innerChild
- [INFO] [05/29/2017 16:11:48.179] [testSystem-akka.actor.default-dispatcher-3] [akka://testSystem/user/parent/supervisor/innerChild] Child received message: Hello message 3 to selectedChild
Above we sent the supervisor a BackoffSupervisor.GetCurrentChild message to obtain its child Actor. BackoffSupervisor handles this and a few other special messages as follows:
- private[akka] trait HandleBackoff {this: Actor ⇒
- def childProps: Props
- def childName: String
- def reset: BackoffReset
- var child: Option[ActorRef] = None
- var restartCount = 0
- import BackoffSupervisor._
- import context.dispatcher
- override def preStart(): Unit = startChild()
- def startChild(): Unit = {
- if (child.isEmpty) {
- child = Some(context.watch(context.actorOf(childProps, childName)))
- }
- }
- def handleBackoff: Receive = {
- case StartChild ⇒
- startChild()
- reset match {
- case AutoReset(resetBackoff) ⇒
- val _ = context.system.scheduler.scheduleOnce(resetBackoff, self, ResetRestartCount(restartCount))
- case _ ⇒ // ignore
- }
- case Reset ⇒
- reset match {
- case ManualReset ⇒ restartCount = 0
- case msg ⇒ unhandled(msg)
- }
- case ResetRestartCount(current) ⇒
- if (current == restartCount) {
- restartCount = 0
- }
- case GetRestartCount ⇒
- sender() ! RestartCount(restartCount)
- case GetCurrentChild ⇒
- sender() ! CurrentChild(child)
- case msg if child.contains(sender()) ⇒
- // use the BackoffSupervisor as sender
- context.parent ! msg
- case msg ⇒ child match {
- case Some(c) ⇒ c.forward(msg)
- case None ⇒ context.system.deadLetters.forward(msg)
- }
- }
- }
The handling of these messages can be found in the handleBackoff function.
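To make the order of the cases in handleBackoff easier to follow, here is a minimal, Akka-free sketch of the same routing decision. Everything in it is a stand-in invented for illustration: the `Route` values, the string-based actor names, and the `route` function are not part of Akka, and only two of the special messages are modelled.

```scala
object RoutingSketch {
  // Hypothetical outcomes of the routing decision (not Akka types).
  sealed trait Route
  case object ReplyWithChild extends Route // answer GetCurrentChild
  case object ReplyWithCount extends Route // answer GetRestartCount
  case object ToParent extends Route       // the child replied, pass it upwards
  case object ToChild extends Route        // forward to the running child
  case object ToDeadLetters extends Route  // no child is running right now

  // Stand-ins for two of the special messages.
  case object GetCurrentChild
  case object GetRestartCount

  // Actors are modelled as plain names; `child` is None while the child is stopped.
  def route(msg: Any, sender: String, child: Option[String]): Route = msg match {
    case GetCurrentChild             => ReplyWithChild
    case GetRestartCount             => ReplyWithCount
    case _ if child.contains(sender) => ToParent
    case _ =>
      child match {
        case Some(_) => ToChild
        case None    => ToDeadLetters
      }
  }

  def main(args: Array[String]): Unit = {
    println(route("hello", "parent", Some("innerChild")))
    println(route("hello", "parent", None))
  }
}
```

The last branch is the one that matters for the rest of this discussion: an ordinary message that arrives while there is no child is sent to dead letters rather than queued.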
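When building the Props of the Supervisor in the example above, we defined a supervision strategy (SupervisorStrategy) that responds to the ChildException raised by InnerChild with Restart. Let us now modify InnerChild so that it throws exceptions at random: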
- import scala.util.Random
- object InnerChild {
- case class TestMessage(msg: String)
- class ChildException(val errmsg: TestMessage) extends Exception
- object CException { // extractor for pattern matching on a class with a parameter
- def apply(msg: TestMessage) = new ChildException(msg)
- def unapply(cex: ChildException) = Some(cex.errmsg)
- }
- def props = Props[InnerChild]
- }
- class InnerChild extends Actor with ActorLogging {
- import InnerChild._
- context.parent ! BackoffSupervisor.Reset // reset backoff counts
- override def receive: Receive = {
- case TestMessage(msg) => // simulate the child's work
- if (Random.nextBoolean()) // throw an exception at random
- throw new ChildException(TestMessage(msg))
- else
- log.info(s"Child received message: ${msg}")
- }
- }
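We use Random.nextBoolean to throw exceptions at random. Note that we also turned ChildException into a class with a parameter, because we may need to retrieve the message that caused the exception before restarting, as follows: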
- def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
- case InnerChild.CException(tmsg) =>
- println(s"Message causing exception: ${tmsg.msg}") // we can extract message here
- SupervisorStrategy.Restart
- }
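The two techniques the decider relies on, a hand-written extractor for a non-case class and `orElse` to fall back to a default decider, can be tried without Akka. The sketch below assumes a simplified `Directive` type with only `Restart` and `Escalate`; these are stand-ins for illustration, not Akka's `SupervisorStrategy` directives, and `fallback` is a hypothetical substitute for the default decider.

```scala
object DeciderSketch {
  final case class TestMessage(msg: String)

  // A plain class, so pattern matching needs an explicit extractor.
  class ChildException(val errmsg: TestMessage) extends Exception

  object CException {
    def apply(msg: TestMessage): ChildException = new ChildException(msg)
    def unapply(cex: ChildException): Option[TestMessage] = Some(cex.errmsg)
  }

  // Hypothetical stand-ins for supervision directives.
  sealed trait Directive
  case object Restart extends Directive
  case object Escalate extends Directive

  // Defined only for ChildException; the extractor exposes the failing message.
  val decider: PartialFunction[Throwable, Directive] = {
    case CException(tmsg) =>
      println(s"Message causing exception: ${tmsg.msg}")
      Restart
  }

  // Catch-all used when the custom decider does not apply.
  val fallback: PartialFunction[Throwable, Directive] = {
    case _ => Escalate
  }

  val combined: PartialFunction[Throwable, Directive] = decider.orElse(fallback)

  def main(args: Array[String]): Unit = {
    println(combined(CException(TestMessage("boom"))))
    println(combined(new IllegalStateException("other")))
  }
}
```

Because `decider` is a partial function, any Throwable that is not a ChildException simply falls through to `fallback`; no explicit type check is needed in the custom part.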
Now every message can simply be sent to the supervisor:
- class ParentalActor extends Actor with ActorLogging {
- import ParentalActor._
- // create the child Actor "supervisor" here
- val supervisor = context.actorOf(Supervisor.props, "supervisor")
- override def receive: Receive = {
- case msg @ _ => supervisor ! msg
- }
- }
- object BackoffSupervisorDemo extends App {
- import ParentalActor._
- import InnerChild._
- val testSystem = ActorSystem("testSystem")
- val parent = testSystem.actorOf(ParentalActor.props,"parent")
- parent ! TestMessage("Hello message 1 to supervisor")
- parent ! TestMessage("Hello message 2 to supervisor")
- parent ! TestMessage("Hello message 3 to supervisor")
- parent ! TestMessage("Hello message 4 to supervisor")
- parent ! TestMessage("Hello message 5 to supervisor")
- parent ! TestMessage("Hello message 6 to supervisor")
- scala.io.StdIn.readLine()
- testSystem.terminate()
- }
Running this shows that once an exception occurs, the messages that follow end up as dead letters:
- [INFO] [05/29/2017 18:22:11.689] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/parent/supervisor/innerChild] Message [backoffSupervisorDemo.InnerChild$TestMessage] from Actor[akka://testSystem/user/parent#2140150413] to Actor[akka://testSystem/user/parent/supervisor/innerChild#-1047097634] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
- ....
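This also shows that BackoffSupervisor handles Restart differently: instead of the usual suspend-and-resume, it appears to terminate InnerChild outright, destroying its ActorRef and mailbox, so every message sent to InnerChild before the new instance has started is routed to the dead-letter queue. In other words, we lose not only the message that caused the exception but also every message that arrives during the restart window.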
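How long that window lasts depends on the backoff settings passed to Backoff.onFailure (1 second minimum, 5 seconds maximum, random factor 0.0 in this example). The following sketch estimates the delay before each restart. It assumes the delay doubles with every restart and is capped at the maximum, which is my understanding of exponential backoff in Akka rather than something verified here; `backoffDelay` is a hypothetical helper, not an Akka API.

```scala
import scala.concurrent.duration._

object BackoffDelaySketch {
  // Assumed rule: double minBackoff on every restart and cap the result at maxBackoff.
  // Jitter is left out, mirroring the random factor of 0.0 used in the example.
  def backoffDelay(restartCount: Int,
                   minBackoff: FiniteDuration,
                   maxBackoff: FiniteDuration): FiniteDuration =
    if (restartCount >= 30) maxBackoff // keeps the shift below from growing too large
    else {
      val candidateMs = minBackoff.toMillis * (1L << restartCount)
      math.min(candidateMs, maxBackoff.toMillis).millis
    }

  def main(args: Array[String]): Unit =
    (0 to 4).foreach { n =>
      println(s"restart #$n -> wait ${backoffDelay(n, 1.second, 5.seconds)}")
    }
}
```

Under this assumed rule the delays for the example settings are 1 s, 2 s, 4 s, 5 s and 5 s. This is also why InnerChild sends BackoffSupervisor.Reset when it starts: with manual reset, the restart count only returns to zero when the child asks for it.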
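Let us now deal with the lost messages. First, how do we resend the message that caused the exception? We can send it from the supervision strategy, just before restarting: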
- def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
- case InnerChild.CException(tmsg) =>
- println(s"Message causing exception: ${tmsg.msg}") // we can extract message here
- BackoffSupervisorDemo.sendToParent(tmsg) // resend message
- SupervisorStrategy.Restart
- }
First declare the sendToParent function in BackoffSupervisorDemo:
- def sendToParent(msg: TestMessage) = parent ! msg
Then we need a way to recover the dead letters. We can subscribe to messages of type DeadLetter through Akka's eventStream:
- object DeadLetterMonitor {
- def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
- }
- class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
- import InnerChild._
- import context.dispatcher
- override def receive: Receive = {
- case DeadLetter(msg: TestMessage, _, _) => // wait till InnerChild finishes restart then resend
- context.system.scheduler.scheduleOnce(1.second, receiver, msg)
- }
- }
- object BackoffSupervisorDemo extends App {
- import ParentalActor._
- import InnerChild._
- def sendToParent(msg: TestMessage) = parent ! msg
- val testSystem = ActorSystem("testSystem")
- val parent = testSystem.actorOf(ParentalActor.props,"parent")
- val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent),"dlmonitor")
- testSystem.eventStream.subscribe(deadLetterMonitor,classOf[DeadLetter]) //listen to DeadLetter
- parent ! TestMessage("Hello message 1 to supervisor")
- parent ! TestMessage("Hello message 2 to supervisor")
- parent ! TestMessage("Hello message 3 to supervisor")
- parent ! TestMessage("Hello message 4 to supervisor")
- parent ! TestMessage("Hello message 5 to supervisor")
- parent ! TestMessage("Hello message 6 to supervisor")
- scala.io.StdIn.readLine()
- testSystem.terminate()
- }
A test run shows that InnerChild successfully processed all 6 messages.
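One caveat about the monitor: the event stream publishes every dead letter, not only those that wrap a TestMessage, so it is safer to select by type than to cast. The Akka-free sketch below illustrates the idea with `collect`; `Undelivered` is a hypothetical stand-in for akka.actor.DeadLetter and `resendable` is an invented helper.

```scala
object ResendSketch {
  final case class TestMessage(msg: String)

  // Hypothetical stand-in for a dead letter: the payload can be of any type.
  final case class Undelivered(message: Any, recipient: String)

  // Keep only payloads we know how to resend; everything else is skipped
  // instead of being cast, which would fail at runtime for other types.
  def resendable(letters: Seq[Undelivered]): Seq[TestMessage] =
    letters.collect { case Undelivered(m: TestMessage, _) => m }

  def main(args: Array[String]): Unit = {
    val letters = Seq(
      Undelivered(TestMessage("Hello message 2 to supervisor"), "innerChild"),
      Undelivered("some unrelated payload", "elsewhere")
    )
    resendable(letters).foreach(m => println(s"would resend: ${m.msg}"))
  }
}
```

The same typed pattern can be used directly in an actor's receive block, where unmatched dead letters are then simply left unhandled.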
Below is the complete sample code for this discussion:
- package backoffSupervisorDemo
- import akka.actor._
- import akka.pattern._
- import scala.util.Random
- import scala.concurrent.duration._
- object InnerChild {
- case class TestMessage(msg: String)
- class ChildException(val errmsg: TestMessage) extends Exception
- object CException { // extractor for pattern matching on a class with a parameter
- def apply(msg: TestMessage) = new ChildException(msg)
- def unapply(cex: ChildException) = Some(cex.errmsg)
- }
- def props = Props[InnerChild]
- }
- class InnerChild extends Actor with ActorLogging {
- import InnerChild._
- context.parent ! BackoffSupervisor.Reset // reset backoff counts
- override def receive: Receive = {
- case TestMessage(msg) => // simulate the child's work
- if (Random.nextBoolean()) // throw an exception at random
- throw new ChildException(TestMessage(msg))
- else
- log.info(s"Child received message: ${msg}")
- }
- }
- object Supervisor {
- def props: Props = { // the supervision strategy and the child Actor are both defined here
- def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
- case InnerChild.CException(tmsg) =>
- println(s"Message causing exception: ${tmsg.msg}") // we can extract message here
- BackoffSupervisorDemo.sendToParent(tmsg) // resend message
- SupervisorStrategy.Restart
- }
- val options = Backoff.onFailure(InnerChild.props, "innerChild", 1.second, 5.seconds, 0.0)
- .withManualReset
- .withSupervisorStrategy(
- OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
- decider.orElse(SupervisorStrategy.defaultDecider)
- )
- )
- BackoffSupervisor.props(options)
- }
- }
- // Note: the actor below is the parent of Supervisor, not of InnerChild
- object ParentalActor {
- case class SendToSupervisor(msg: InnerChild.TestMessage)
- case class SendToInnerChild(msg: InnerChild.TestMessage)
- case class SendToChildSelection(msg: InnerChild.TestMessage)
- def props = Props[ParentalActor]
- }
- class ParentalActor extends Actor with ActorLogging {
- import ParentalActor._
- // create the child Actor "supervisor" here
- val supervisor = context.actorOf(Supervisor.props, "supervisor")
- override def receive: Receive = {
- case msg @ _ => supervisor ! msg
- }
- }
- object DeadLetterMonitor {
- def props(parentRef: ActorRef) = Props(new DeadLetterMonitor(parentRef))
- }
- class DeadLetterMonitor(receiver: ActorRef) extends Actor with ActorLogging {
- import InnerChild._
- import context.dispatcher
- override def receive: Receive = {
- case DeadLetter(msg: TestMessage, _, _) => // wait till InnerChild finishes restart then resend
- context.system.scheduler.scheduleOnce(1.second, receiver, msg)
- }
- }
- object BackoffSupervisorDemo extends App {
- import ParentalActor._
- import InnerChild._
- def sendToParent(msg: TestMessage) = parent ! msg
- val testSystem = ActorSystem("testSystem")
- val parent = testSystem.actorOf(ParentalActor.props,"parent")
- val deadLetterMonitor = testSystem.actorOf(DeadLetterMonitor.props(parent),"dlmonitor")
- testSystem.eventStream.subscribe(deadLetterMonitor,classOf[DeadLetter]) //listen to DeadLetter
- parent ! TestMessage("Hello message 1 to supervisor")
- parent ! TestMessage("Hello message 2 to supervisor")
- parent ! TestMessage("Hello message 3 to supervisor")
- parent ! TestMessage("Hello message 4 to supervisor")
- parent ! TestMessage("Hello message 5 to supervisor")
- parent ! TestMessage("Hello message 6 to supervisor")
- scala.io.StdIn.readLine()
- testSystem.terminate()
- }
Source: http://www.cnblogs.com/tiger-xc/p/6918830.html