在开始讨论 Akka 中对 Actor 的生命周期管理前,我们先探讨一下所谓的 Actor 编程模式。对比起我们习惯的行令式(imperative)编程模式,Actor 编程模式更接近现实中的应用场景和功能测试模式。这是因为 Actor 是靠消息来驱动的,每种消息代表一项功能的运算指令。由于消息驱动式的程序是松散耦合的,每项功能都是在独立的线程中运算,互不干扰依赖,所以我们可以很自然的分开来实现各项功能以及独立测试每项功能。虽然 Akka 同时提供了 Java 和 Scala 两种 API,但可能由于 Akka 本身是用 Scala 开发的,所以感觉用 Scala 来开发 Akka 程序会更自然些:笼统来讲,Actor 编程主要就是对 receive 函数的实现。而 receive 函数就是几个普通的功能函数用模式匹配的方式按消息类型进行调用。receive 函数所调用的功能函数可以是任何 JVM 兼容语言函数,由于每个 Actor 的运算都在自己独立的线程里进行,所以我们不必担心 Actor 函数在运行中的交叉调用问题。Akka 程序本就是一种原生的多线程程序,每个 Actor 都在一个自己的线程内独立运算它的 receive 函数。除此之外 Actor 的运算环境可以在任何不同的 JVM 里,只要 Akka 信息发送能实现跨 JVM 投递的话,实现分布式程序也是自然而然的事了。所以,理论上 Akka 编程初学者应该把主要注意力放在这个 receive 函数的实现上来,按照一种模版式的方式来编写 Akka 程序就可以了,如下面演示的这个模版例子:
- import akka.actor._
- objectMyActor {//在这个伴生对象里申明MyActor所支持的功能指令
- sealed trait ActorCommands
- case object RunFuncA extends ActorCommands
- case object RunFuncB extends ActorCommands
- }
- //假设有funcA,funcB. 它们可以从任何JVM函数库里调用val funcA : () => Any =
- val funcB : () => Any =class MyActor extends Actor {
- import MyActor._
- varstateValue: Any = _//内部状态,代表这个Actor的当前运算结果
- overridedef receive: Receive = {
- caseRunFuncA => stateValue = funcA//运算funcA,更新stateValue
- caseRunFuncB => stateValue = funcB//运算funcB,更新stateValue
- ...
- }
- }
以上是一个 Actor 需要实现的功能编程样板。可以说其它 Akka 编程部分也都不过是标准的铺垫代码而已。在我来看就是把原来的一个完整程序按功能(应该是按程序状态)切分开来并按上面的模板套入各种 Actor 的 receive 函数里。想想看,如此这般我们就可以实现一个分布式的多线程程序了。
行令式(imperative)程序的运算流程是按代码顺序进行的。这种流程模式不方便运算流程控制,这个缺点在进行异常处理时更加明显。对于一段我们认为在运算中可能发生异常的代码,我们只能用 try-catch 来把这段代码包裹起来。那么对于一个安全考虑的比较详细的程序来讲就会出现许多 try-catch 代码段混合在运算流程里造成整体程序逻辑紊乱,不利对程序的理解和维护。再试想如果容许重试异常运算的话会是怎样的一个场景。而这个问题在 Akka 编程中得到了完美的解决。在 Akka 编程里我们可以把每段可能产生异常的代码放到一个独立的 Actor 中去运算。Akka 的 Actor 组织是一个层级结构。下层 Actor 是由直接上一层 Actor 产生,形成一种父子 Actor 关系。父级 Actor 除维护自身状态之外还必须负责处理下一层子级 Actor 所发生的异常,形成一种树形父子层级监管结构。任何子级 Actor 在运算中发生异常后立即将自己和自己的子级 Actor 运算挂起,并将下一步行动交付给自己的父级 Actor 决定。父级 Actor 对发生异常的子级 Actor 有以下几种处理方式:
1、恢复运算(Resume):不必理会异常,保留当前状态,跳过当前异常消息,照常继续处理其它消息
2、重新启动(Restart):清除当前状态,保留邮箱及内容,终止当前 Actor,再重新构建一个新的 Actor 实例,沿用原来的消息地址 ActorRef 继续工作
3、彻底终止(Stop):销毁当前 Actor 及 ActorRef 邮箱,把所有消息导向 DeadLetter 队列。
4、向上提交(Esculate):如果父级无法处理子级异常,则这种情况也视为父级出现的异常。按照规定,父级会将自己和子级 Actor 运算暂停挂起并把子级 Actor 实际产生的异常当作自己发生的异常提交给上一层父级处理(也就是说异常信息的发送者 sender 变成了父级 Actor)。
Akka 处理异常的方式简单直接:如果发生异常就先暂停挂起然后交给直属父级 Actor 去处理。这就把异常封闭在这个 Actor 的监管链条里。Akka 系统的监管链条实际代表一个功能的分散封闭运算,所以一个监管链条里发生的异常不会影响其它监管链条。换句话说就是 Actor 发生异常是封闭在它所属的功能内部的,一个功能发生异常不会影响其它功能。而在行令式程序中,如果没有 try-catch,任何一段产生异常的代码都会导致整个程序中断。
Akka 提供了 OneForOneStrategy 和 AllForOneStrategy 两种对待异常 Actor 的策略配置,策略中定义了对下属子级发生的各种异常的处理方式。异常处理策略是以策略施用对象分类的,如下:
OneForOneStrategy:只针对发生异常的 Actor 施用策略
AllForOneStrategy:虽然一个直属子级 Actor 发生了异常,监管父级 Actor 把它当作所有下属子级同时发生了相同异常,对所有子级 Actor 施用策略
下面是一个典型的策略例子:
- OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
- case _:
- ArithmeticException = >Resume
- case _:
- SomeMinerExecption = >Resume
- case _:
- NullPointerException = >Restart
- case _:
- IllegalArgumentException = >Stop
- case _:
- Exception = >Escalate
- }
Akka 对待这种父子监管的原则保证了在 Akka 系统中不会出现任何孤儿,也就是说保证不会出现断裂的监管树。这就要求当任何一个 Actor 在暂停挂起前都要保证先暂停挂起它的所有直属子级 Actor,而子级则必须先暂停挂起它们的直属子级,如此递归。同样,任何 Actor 在重启(Restart)时也必须递归式地重启直属子级,因为重启一个 Actor 需要先停止再启动,我们必须肯定在停止时不会产生孤儿 Actor。如果一个父级 Actor 无法处理子级异常需要向上提交(Esculate)的话,首先它需要采取递归方式来暂停挂起自身以下的监管链条。它的直属父级 Actor 会按自己的异常处理策略来对待提交上来的异常,处理的结果将会递归式沿着监管树影响属下的所有子子孙孙。但如果这个级别的 Actor 异常处理策略还是无法覆盖这个异常时,它又会挂起自己,再向上提交(Esculate)。那么如果到达了顶级 Actor 又如何向上提交呢?Akka 系统最终的异常处理策略可以在 config 文件里配置:
- # The guardian"/user"will usethis class to obtain its supervisorStrategy.
- # It needs to be a subclass of akka.actor.SupervisorStrategyConfigurator.
- # In addition to the defaultthereis akka.actor.StoppingSupervisorStrategy.
- guardian-supervisor-strategy ="akka.actor.DefaultSupervisorStrategy"
默认策略是 DefaultSupervisorStrategy。以下是 Akka 提供的默认策略:
- /**
- * When supervisorStrategy is not specified for an actor this
- * `Decider` is used by default in the supervisor strategy.
- * The child will be stopped when [[akka.actor.ActorInitializationException]],
- * [[akka.actor.ActorKilledException]], or [[akka.actor.DeathPactException]] is
- * thrown. It will be restarted for other `Exception` types.
- * The error is escalated if it's a `Throwable`, i.e. `Error`.
- */
- final val defaultDecider: Decider = {
- case _:
- ActorInitializationException⇒Stop
- case _:
- ActorKilledException⇒Stop
- case _:
- DeathPactException⇒Stop
- case _:
- Exception⇒Restart
- }
- /**
- * When supervisorStrategy is not specified for an actor this
- * is used by default. OneForOneStrategy with decider defined in
- * [[#defaultDecider]].
- */
- f inal val defaultStrategy:
- SupervisorStrategy = {
- OneForOneStrategy()(defaultDecider)
- }
我们看到前面三种异常直属父级直接终止子级 Actor,其它类型重启。当然我们可以在这个默认策略之上再添加自定义的一些异常处理策略:
- overrideval supervisorStrategy =
- OneForOneStrategy(maxNrOfRetries =10, withinTimeRange =1 minute) {
- case_: ArithmeticException => Resume
- case_: MyException => Restart
- caset =>
- super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
- }
上面提到 Akka 绝对不容许有孤儿 Actor 存在(断裂的监管树),所以停止任何一个 Actor,它下属的子子孙孙都会自下而上依次停止运算。为了更好的理解 Actor 的监管策略,我们必须先从了解 Actor 的生命周期(lift-cycle)开始。一个 Actor 从构建产生 ActorRef 开始到彻底终止为整个生命周期。其中可以发生多次重启(Restart)。我们在下面对 Actor 的开始、终止、重启这三个环节中发生的事件进行描述:
1、开始
当 Akka 通过 Props 构建一个 Actor 后,这个 Actor 可以立即开始处理消息,进入开始(started)状态。Akka 提供了针对开始状态的事件接口(event hooks)preStart 如下:
- /**
- * User overridable callback.
- * <p/>
- * Is called when an Actor is started.
- * Actors are automatically started asynchronously when created.
- * Empty default implementation.
- */
- @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
- //#lifecycle-hooksdef preStart(): Unit = ()
我们可以重载 preStart 在 Actor 开始处理消息前进行一些初始化准备工作,如下:
- overridedef preStart={
- log.info ("Starting storage actor...")
- initDB
- }
2、终止
一个 Actor 可能因为完成运算、发生异常又或者人为通过发送 Kill,PoisonPill 强行终止等而进入停止(stopping)状态。在停止过程中这个 Actor 会先以递归方式停止它属下的所有子孙 Actor 然后停止处理消息并将所有发给它的消息导向 DeadLetter 队列。Akka 提供了事件接口 postStop:
- /**
- * User overridable callback.
- * <p/>
- * Is called asynchronously after 'actor.stop()' is invoked.
- * Empty default implementation.
- */
- @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
- //#lifecycle-hooksdef postStop(): Unit = ()
我们可以重载 postStop 来进行一些事后清理工作:
- overridedef postStop={
- log.info ("Stopping storage actor...")
- db.release
- }
3、重启
重启是 Actor 生命周期里一个最重要的环节。在一个 Actor 的生命周期里可能因为多种原因发生重启(Restart)。造成一个 Actor 需要重启的原因可能有下面几个:
1、在处理某特定消息时造成了系统性的异常,必须通过重启来清理系统错误
2、内部状态毁坏,必须通过重启来重新构建状态
3、在处理消息时无法使用到一些依赖资源,需要重启来重新配置资源
重启是一个先停止再开始的过程。父级 Actor 通过递归方式先停止下面的子孙 Actor,那么在启动过程中这些停止的子孙 Actor 是否会自动构建呢?这里需要特别注意:因为父级 Actor 是通过 Props 重新构建的,如果子级 Actor 的构建是在父级 Actor 的类构建器内而不是在消息处理函数内构建的,那么子级 Actor 会自动构建。Akka 提供了 preRestart 和 postRestart 两个事件接口。preRestart 发生在停止之前,postRestart 发生在开始前,如下:
- /**
- * Scala API: User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
- * @param reason the Throwable that caused the restart to happen
- * @param message optionally the current message the actor processed when failing, if applicable
- * <p/>
- * Is called on a crashed Actor right BEFORE it is restarted to allow clean
- * up of resources before Actor is terminated.
- */
- @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
- //#lifecycle-hooksdef preRestart(reason: Throwable, message: Option[Any]): Unit = {
- context.children foreach { child ⇒
- context.unwatch(child)
- context.stop(child)
- }
- postStop()
- }
- //#lifecycle-hooks
- /**
- * User overridable callback: By default it calls `preStart()`.
- * @param reason the Throwable that caused the restart to happen
- * <p/>
- * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
- */
- @throws(classOf[Exception]) // when changing this you MUST also change ActorDocTest
- //#lifecycle-hooksdef postRestart(reason: Throwable): Unit = {
- preStart()
- }
可以看到:Akka 提供给 Actor 的默认事件接口 preRestart 先将所有直属子级 Actor 全部停止并把它们从监视清单里剔除,然后调用 postStop 执行事后清理。所以如果我们需要重载 preRestart 应该注意调用 super.preRestart 才能保留这些动作,如下:
- overridedef preRestart(reason: Throwable, message: Option[Any]): Unit = {
- log.info(s"Parent restarting with error ${message}...")
- doSomeWorkBeforeStopping
- super.preRestart(reason, message)
- }
postRestart 发生在开始之前,调用了事件接口 preStart。如果我们重载了 preStart 进行初始化,那么在重载 postRestart 时可以选择是否在重启时需要再进行初始化,如果需要则必须调用 super.postRestart:
- overridedef postRestart(reason: Throwable): Unit = {
- log.info("need to initialize too ...")
- doSomeExtraInit
- super.postRestart(reason)
- }
我们知道:很多时候由于外界原因,Actor 的重启无法保证一次成功。这种现象在使用依赖资源如数据库、网络连接等最为明显。我们前面介绍过的异常处理策略中就包含了重试(retry)次数及最长重试时间,如下:
- /**
- * Applies the fault handling `Directive` (Resume, Restart, Stop) specified in the `Decider`
- * to the child actor that failed, as opposed to [[akka.actor.AllForOneStrategy]] that applies
- * it to all children.
- *
- * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, negative value means no limit,
- * if the limit is exceeded the child actor is stopped
- * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf means no window
- * @param decider mapping from Throwable to [[akka.actor.SupervisorStrategy.Directive]], you can also use a
- * [[scala.collection.immutable.Seq]] of Throwables which maps the given Throwables to restarts, otherwise escalates.
- * @param loggingEnabled the strategy logs the failure if this is enabled (true), by default it is enabled
- */
- case class OneForOneStrategy(
- maxNrOfRetries: Int = -1,
- withinTimeRange: Duration = Duration.Inf,
- overrideval loggingEnabled: Boolean =true)(val decider: SupervisorStrategy.Decider)
- extends SupervisorStrategy {...}
为了应付更复杂的重启方式,Akka 提供了一种逐步延时重启策略(BackoffSupervisor)。BackoffSupervisor 的定义如下:
- /**
- * Back-off supervisor that stops and starts a child actor using a back-off algorithm when the child actor stops.
- * This back-off supervisor is created by using `akka.pattern.BackoffSupervisor.props`
- * with `Backoff.onStop`.
- */
- final class BackoffSupervisor(
- val childProps: Props,
- val childName: String,
- minBackoff: FiniteDuration,
- maxBackoff: FiniteDuration,
- val reset: BackoffReset,
- randomFactor: Double,
- strategy: SupervisorStrategy)
- extends Actor with HandleBackoff {...}
- /**
- * Props for creating a [[BackoffSupervisor]] actor from [[BackoffOptions]].
- *
- * @param options the [[BackoffOptions]] that specify how to construct a backoff-supervisor.
- */
- def props(options: BackoffOptions): Props = options.props
- /**
- * Builds back-off options for creating a back-off supervisor.
- * You can pass `BackoffOptions` to `akka.pattern.BackoffSupervisor.props`.
- * An example of creating back-off options:
- * {{{
- * Backoff.onFailure(childProps, childName, minBackoff, maxBackoff, randomFactor)
- * .withManualReset
- * .withSupervisorStrategy(
- * OneforOneStrategy(){
- * case e: GivingUpException => Stop
- * case e: RetryableException => Restart
- * }
- * )
- *
- * }}}
- */
- object Backoff {
- /**
- * Back-off options for creating a back-off supervisor actor that expects a child actor to restart on failure.
- *
- * This explicit supervisor behaves similarly to the normal implicit supervision where
- * if an actor throws an exception, the decider on the supervisor will decide when to
- * `Stop`, `Restart`, `Escalate`, `Resume` the child actor.
- *
- * When the `Restart` directive is specified, the supervisor will delay the restart
- * using an exponential back off strategy (bounded by minBackoff and maxBackoff).
- *
- * This supervisor is intended to be transparent to both the child actor and external actors.
- * Where external actors can send messages to the supervisor as if it was the child and the
- * messages will be forwarded. And when the child is `Terminated`, the supervisor is also
- * `Terminated`.
- * Transparent to the child means that the child does not have to be aware that it is being
- * supervised specifically by this actor. Just like it does
- * not need to know when it is being supervised by the usual implicit supervisors.
- * The only caveat is that the `ActorRef` of the child is not stable, so any user storing the
- * `sender()` `ActorRef` from the child response may eventually not be able to communicate with
- * the stored `ActorRef`. In general all messages to the child should be directed through this actor.
- *
- * An example of where this supervisor might be used is when you may have an actor that is
- * responsible for continuously polling on a server for some resource that sometimes may be down.
- * Instead of hammering the server continuously when the resource is unavailable, the actor will
- * be restarted with an exponentially increasing back off until the resource is available again.
- *
- * '''***
- * This supervisor should not be used with `Akka Persistence` child actors.
- * `Akka Persistence` actors shutdown unconditionally on `persistFailure()`s rather
- * than throw an exception on a failure like normal actors.
- * [[#onStop]] should be used instead for cases where the child actor
- * terminates itself as a failure signal instead of the normal behavior of throwing an exception.
- * ***'''
- * You can define another
- * supervision strategy by using `akka.pattern.BackoffOptions.withSupervisorStrategy` on [[akka.pattern.BackoffOptions]].
- *
- * @param childProps the [[akka.actor.Props]] of the child actor that
- * will be started and supervised
- * @param childName name of the child actor
- * @param minBackoff minimum (initial) duration until the child actor will
- * started again, if it is terminated
- * @param maxBackoff the exponential back-off is capped to this duration
- * @param randomFactor after calculation of the exponential back-off an additional
- * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
- * In order to skip this additional delay pass in `0`.
- */
- def onFailure(
- childProps: Props,
- childName: String,
- minBackoff: FiniteDuration,
- maxBackoff: FiniteDuration,
- randomFactor: Double): BackoffOptions =
- BackoffOptionsImpl(RestartImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor)
- /**
- * Back-off options for creating a back-off supervisor actor that expects a child actor to stop on failure.
- *
- * This actor can be used to supervise a child actor and start it again
- * after a back-off duration if the child actor is stopped.
- *
- * This is useful in situations where the re-start of the child actor should be
- * delayed e.g. in order to give an external resource time to recover before the
- * child actor tries contacting it again (after being restarted).
- *
- * Specifically this pattern is useful for persistent actors,
- * which are stopped in case of persistence failures.
- * Just restarting them immediately would probably fail again (since the data
- * store is probably unavailable). It is better to try again after a delay.
- *
- * It supports exponential back-off between the given `minBackoff` and
- * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and
- * `maxBackoff` 30 seconds the start attempts will be delayed with
- * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset
- * if the actor is not terminated within the `minBackoff` duration.
- *
- * In addition to the calculated exponential back-off an additional
- * random delay based the given `randomFactor` is added, e.g. 0.2 adds up to 20%
- * delay. The reason for adding a random delay is to avoid that all failing
- * actors hit the backend resource at the same time.
- *
- * You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild`
- * message to this actor and it will reply with [[akka.pattern.BackoffSupervisor.CurrentChild]]
- * containing the `ActorRef` of the current child, if any.
- *
- * The `BackoffSupervisor`delegates all messages from the child to the parent of the
- * `BackoffSupervisor`, with the supervisor as sender.
- *
- * The `BackoffSupervisor` forwards all other messages to the child, if it is currently running.
- *
- * The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor
- * if it wants to do an intentional stop.
- *
- * Exceptions in the child are handled with the default supervisionStrategy, which can be changed by using
- * [[BackoffOptions#withSupervisorStrategy]] or [[BackoffOptions#withDefaultStoppingStrategy]]. A
- * `Restart` will perform a normal immediate restart of the child. A `Stop` will
- * stop the child, but it will be started again after the back-off duration.
- *
- * @param childProps the [[akka.actor.Props]] of the child actor that
- * will be started and supervised
- * @param childName name of the child actor
- * @param minBackoff minimum (initial) duration until the child actor will
- * started again, if it is terminated
- * @param maxBackoff the exponential back-off is capped to this duration
- * @param randomFactor after calculation of the exponential back-off an additional
- * random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
- * In order to skip this additional delay pass in `0`.
- */
- def onStop(
- childProps: Props,
- childName: String,
- minBackoff: FiniteDuration,
- maxBackoff: FiniteDuration,
- randomFactor: Double): BackoffOptions =
- BackoffOptionsImpl(StopImpliesFailure, childProps, childName, minBackoff, maxBackoff, randomFactor)
- }
- /**
- * Configures a back-off supervisor actor. Start with `Backoff.onStop` or `Backoff.onFailure`.
- * BackoffOptions is immutable, so be sure to chain methods like:
- * {{{
- * val options = Backoff.onFailure(childProps, childName, minBackoff, maxBackoff, randomFactor)
- * .withManualReset
- * context.actorOf(BackoffSupervisor.props(options), name)
- * }}}
- */
- trait BackoffOptions {
- /**
- * Returns a new BackoffOptions with automatic back-off reset.
- * The back-off algorithm is reset if the child does not crash within the specified `resetBackoff`.
- * @param resetBackoff The back-off is reset if the child does not crash within this duration.
- */
- def withAutoReset(resetBackoff: FiniteDuration): BackoffOptions
- /**
- * Returns a new BackoffOptions with manual back-off reset. The back-off is only reset
- * if the child sends a `BackoffSupervisor.Reset` to its parent (the backoff-supervisor actor).
- */
- def withManualReset: BackoffOptions
- /**
- * Returns a new BackoffOptions with the supervisorStrategy.
- * @param supervisorStrategy the supervisorStrategy that the back-off supervisor will use.
- * The default supervisor strategy is used as fallback if the specified supervisorStrategy (its decider)
- * does not explicitly handle an exception. As the BackoffSupervisor creates a separate actor to handle the
- * backoff process, only a [[OneForOneStrategy]] makes sense here.
- */
- def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy): BackoffOptions
- /**
- * Returns a new BackoffOptions with a default `SupervisorStrategy.stoppingStrategy`.
- * The default supervisor strategy is used as fallback for throwables not handled by `SupervisorStrategy.stoppingStrategy`.
- */
- def withDefaultStoppingStrategy: BackoffOptions
- /**
- * Returns the props to create the back-off supervisor.
- */
- private[akka] def props: Props
- }
注意以上源代码中 Backoff.onFailure 和 Backoff.onStop 的使用说明:当一个预设为永生的子级 Actor 由于某些原因而停止后再重启时用 onStop、当一个子级 Actor 因为异常造成失败中断再重启时用 onFailure。所以在处理异常时我们应该使用 onFailure。我们看到 BackoffSupervior 提供了更详细的重启方式支持。下面是使用 BackoffSupervisor 的一个典型例子:
- val childProps = Props(classOf[EchoActor])
- val supervisor = BackoffSupervisor.props(
- Backoff.onFailure(
- childProps,
- childName ="myEcho",
- minBackoff =3.seconds,
- maxBackoff =30.seconds,
- randomFactor =0.2 // adds 20% "noise" to vary the intervals slightly
- ).withManualReset
- .withSupervisorStrategy(
- OneforOneStrategy(){
- case_: GivingUpException => Stop
- case_: RetryableException => Restart
- case_: MinorException => Resume
- }
- )
- )
- system.actorOf(supervisor, name ="echoSupervisor")
以上的 withManualReset 是个 BackoffOption 的方法:要求子级 Actor 在成功重启后手动发送 akka.pattern.BackoffSupervisor.Reset 给它的监管父级 Actor 使其可以清除那些计数器,Akka 源代码中是这样处理的:
- case Reset ⇒
- reset match {
- caseManualReset ⇒ restartCount =0
- case msg ⇒ unhandled(msg)
- }
我们也可以用自动方式 withAutoReset(3.seconds):
- /**
- * Returns a new BackoffOptions with automatic back-off reset.
- * The back-off algorithm is reset if the child does not crash within the specified `resetBackoff`.
- * @param resetBackoff The back-off is reset if the child does not crash within this duration.
- */
- def withAutoReset(resetBackoff: FiniteDuration): BackoffOptions
- /**
- * Returns a new BackoffOptions with manual back-off reset. The back-off is only reset
- * if the child sends a `BackoffSupervisor.Reset` to its parent (the backoff-supervisor actor).
- */
- def withManualReset: BackoffOptions
现在我们发现:异常处理策略没有包括对下属正常终止(termination)信息的监听。那么如何捕捉 Actor 终止的信息呢?Akka 提供了 context.watch 和 context.unwatch 来设置通过 ActorRef 对任何 Actor 的终止状态监视,无须父子级别关系要求。下面是 Akka 提供的这两个函数:
- /**
- * Have this FunctionRef watch the given Actor. This method must not be
- * called concurrently from different threads, it should only be called by
- * its parent Actor.
- *
- * Upon receiving the Terminated message, unwatch() must be called from a
- * safe context (i.e. normally from the parent Actor).
- */
- def watch(actorRef: ActorRef): Unit = {
- watching += actorRef
- actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Watch(actorRef.asInstanceOf[InternalActorRef], this))
- }
- /**
- * Have this FunctionRef unwatch the given Actor. This method must not be
- * called concurrently from different threads, it should only be called by
- * its parent Actor.
- */
- def unwatch(actorRef: ActorRef): Unit = {
- watching -= actorRef
- actorRef.asInstanceOf[InternalActorRef].sendSystemMessage(Unwatch(actorRef.asInstanceOf[InternalActorRef], this))
- }
被监视对象的终止事件是通过 Terminate 消息获取的。典型的监视方式示范如下:
- class DeathPactExceptionParentActor extends Actor with ActorLogging{
- def receive={
- case "create_child"=> {
- log.info ("creating child")
- val child=context.actorOf(Props[DeathPactExceptionChildActor])
- context.watch(child) //watch child's deathchild!"stop"
- }
- case "someMessage"=> log.info ("some message")
- caseTerminated(_) => context.stop(self)//child has stopped
- }
- }
讲了这么多,还是感到有许多疑问,可能还是用一些代码来了解一下这些策略的具体用法。我想,无可否认的 BackoffSupervisor 应该是个最硬的骨头,我们先设计一个场景来示范 BackoffSupervisor 的用法和效果:假设一个咖啡餐厅场景,其中有个厨房,厨房内有大厨,这几个环节都可以用 Actor 来表达。其中餐厅是顶层 Actor,直属子级是厨房,而大厨则是厨房的直属子级。我们可以把厨房 Actor 作为一个 BackoffSupervisor,这样当大厨 Actor 出现任何异常时厨房 Actor 可以用一种逐步延时的方式来重启大厨 Actor。我们先定义这个大厨 Actor:
- object Chef {
- sealed trait Cooking
- case object CookSpecial extends Cooking
- class ChefBusy(msg: String) extends Exception(msg)
- def props = Props(new Chef)
- }
- class Chef extends Actor with ActorLogging {
- import Chef._
- log.info(s"Chef actor created at ${System.currentTimeMillis()}")
- overridedef receive: Receive = {
- case_ =>throw newChefBusy("Chef is busy cooking!")
- }
- overridedef preRestart(reason: Throwable, message: Option[Any]): Unit = {
- super.preRestart(reason, message)
- log.info(s"Restarting Chef for $message")
- }
- overridedef postRestart(reason: Throwable): Unit = {
- super.postRestart(reason)
- log.info(s"Chef restarted for ${reason.getMessage}")
- }
- overridedef postStop(): Unit = {
- log.info("Chef stopped!")
- }
- }
以上还包括了 Chef 的生命周期跟踪。现在 Chef 的唯一功能就是收到消息就立即产生异常 ChefBusy,控制马上交到直属父级 Actor。Chef 的直属父级 Actor 是 Kitchen:
- object Kitchen {
- def kitchenProps = {
- import Chef._
- val options = Backoff.onFailure(Chef.props,"chef",200millis,10seconds,0.0)
- .withSupervisorStrategy(OneForOneStrategy(maxNrOfRetries =4, withinTimeRange =30 seconds) {
- case_: ChefBusy => SupervisorStrategy.Restart
- })
- BackoffSupervisor.props(options)
- }
- }
- class Kitchen extends Actor with ActorLogging {
- overridedef receive: Receive = {
- casex => context.childrenforeach{child => child ! x}
- }
- }
上面示范了 BackoffSupervisor 的 Props 定义方法。Chef Actor 的实例构建(ActorRef 产生)应该在 Backoff.onFailure() 函数里。现在我们了解了 BackoffSupervisor 只容许独子,所以 context.children 只有一个 child:"chef"。我们必须给每个需要逐步延缓监管的 Actor 设置独立的 BackoffSupervisor 监管父级。
下面我们试试 BackoffSupervisor 的具体效果:
- object Cafe extends App {
- import Kitchen._
- val cafeSystem = ActorSystem("cafe")
- val kitchen = cafeSystem.actorOf(kitchenProps,"kitchen")
- println(s"Calling chef at ${System.currentTimeMillis()}")
- kitchen !"CookCook"
- println(s"Calling chef at ${System.currentTimeMillis()}")
- Thread.sleep(1000)
- println(s"Calling chef at ${System.currentTimeMillis()}")
- kitchen !"CookCook"
- Thread.sleep(1000)
- kitchen !"CookCook"
- Thread.sleep(1000)
- kitchen !"CookCook"
- Thread.sleep(1000)
- kitchen !"CookCook"
- Thread.sleep(1000*30)
- cafeSystem.terminate()}
测试运行结果中其中一轮显示:
- Calling chef at 1495108529380[INFO][05 / 18 / 2017 19 : 55 : 29.384][cafe - akka.actor.
- default - dispatcher - 2][akka: //cafe/user/kitchen/chef] Chef actor created at 1495108529382
- [ERROR][05 / 18 / 2017 19 : 55 : 29.392][cafe - akka.actor.
- default - dispatcher - 3][akka: //cafe/user/kitchen/chef] Chef is busy cooking!
- Chef$ChefBusy: Chef is busy cooking ! at Chef$$anonfun$receive$1.applyOrElse(Cafe.scala: 24)... [INFO][05 / 18 / 2017 19 : 55 : 29.394][cafe - akka.actor.
- default - dispatcher - 2][akka: //cafe/user/kitchen/chef] Chef stopped!
- [INFO][05 / 18 / 2017 19 : 55 : 29.614][cafe - akka.actor.
- default - dispatcher - 4][akka: //cafe/user/kitchen/chef] Chef actor created at 1495108529614
- Calling chef at 1495108530382[ERROR][05 / 18 / 2017 19 : 55 : 30.382][cafe - akka.actor.
- default - dispatcher - 3][akka: //cafe/user/kitchen/chef] Chef is busy cooking!
我们看到 Chef 被重启过程。值得注意的是:生命周期监控函数中只有 postStop 被调用过,preRestart 和 postRestart 都没引用。如果这样的话 BackoffSupervisor 就是一锤子买卖,是正真的 let it crash 模式体现了。那如果需要重新处理造成异常的消息又怎么办呢?看来只好试试 SupervisorStrategy 了。我们用下面的例子来示范一下:
- import akka.actor._
- import scala.util.Random
- import scala.concurrent.duration._
- object ChildActor {
- class RndException(msg: String) extends Exception(msg)
- def props = Props[ChildActor]
- }
- class ChildActor extends Actor with ActorLogging {
- import ChildActor._
- overridedef receive: Receive = {
- casemsg: String => {//任意产生一些RndExcption
- if (Random.nextBoolean())
- throw newRndException("Any Exception!")
- else
- log.info(s"Processed message: $msg !!!")
- }
- }
- overridedef preStart(): Unit = {
- log.info("ChildActor Started.")
- super.preStart()
- }
- //在重启时preRestart是在原来的Actor实例上调用preRestart的
- overridedef preRestart(reason: Throwable, message: Option[Any]): Unit = {
- log.info(s"Restarting ChildActor for ${reason.getMessage}...")
- message match {
- caseSome(msg) =>
- log.info(s"Exception message: ${msg.toString}")
- self ! msg//把异常消息再摆放到信箱最后
- caseNone =>
- }
- super.preRestart(reason, message)
- }
- overridedef postRestart(reason: Throwable): Unit = {
- super.postRestart(reason)
- log.info(s"Restarted ChildActor for ${reason.getMessage}...")
- }
- overridedef postStop(): Unit = {
- log.info(s"Stopped ChildActor.")
- super.postStop()
- }
- }
- //监管父级
- class Parent extends Actor with ActorLogging {
- def decider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
- case_: ChildActor.RndException => SupervisorStrategy.Restart
- }
- overrideval supervisorStrategy =
- OneForOneStrategy(maxNrOfRetries =30, withinTimeRange =3 seconds) {
- decider.orElse(SupervisorStrategy.defaultDecider)
- }
- val childActor = context.actorOf(ChildActor.props,"childActor")
- overridedef receive: Receive = {
- casemsg@ _ => childActor ! msg//把所有收到的消息都转给childActor
- }
- }
以上就是一个 SupervisorStrategy 的父子结构例子。特别要注意的是 OneForOneStrategy 参数 maxNrOfRetries,这个是一次性次数设置,每次重启成功后不会重设。在整体程序运行时这个次数会不断增加直到设置数,之后发生异常直接终止被监管 Actor。下面是这个例子的运行示范:
- object TestMyActor extends App {
- val system = ActorSystem("testSystem")
- val parentActor = system.actorOf(Props[Parent],"parentActor")
- parentActor !"Hello 1"
- parentActor !"Hello 2"
- parentActor !"Hello 3"
- parentActor !"Hello 4"
- parentActor !"Hello 5"
- Thread.sleep(5000)
- system.terminate()
- }
运算结果显示所有消息都得到处理,只是顺序变得混乱了。
好了,明白了如何使用 BackoffSupervior,我们还是把整个例子完善一下吧:还是这个 Cafe 场景。Cafe 里分厨房 Kitchen、收款员 Cashier 几个部分。上面已经介绍过 Kitchen 下还有 Chef,而 Cashier 之下还有收据打印机 ReceiptPrinter。整个工作流程大致如下:
1、一个客人进店要求一杯特价咖啡
2、Cafe 要求厨房在三种咖啡中即时选择任意一款作为特价咖啡
3、Cafe 同时要求 Cashier 按厨房提供的特价咖啡价钱收款并打印收据
4、以上 2,3 成功后完成一单完整销售,更新销售额
5、完成销售目标后通知厨房打烊
6、收款员看到厨房打烊后停业关门
另外,可能出现几种异常情况:厨房的大厨可能忙不过来准备特价咖啡、收据打印机有可能卡纸。遇到这几种情况直接通知客人迟点再来光顾。
很明显 Cafe 需要维护内部状态即当前销售额 salesAmount,Kitchen 的状态是当前特餐 currentSpecial,Cashier 的状态是 paperJammed 代表收据打印机是否卡纸。
我们先从 Chef,Kitchen 及 BackoffSupervisor 监管开始:
- object Chef {
- sealedtrait Order//消息类型
- case objectMakeSpecial extends Order//烹制特饮
- classChefBusy(msg: String) extends Exception(msg)//异常类型def props = Props[Chef]
- }
- class Chef extends Actor with ActorLogging {
- import Chef._
- log.info("Chef says: I am ready to work ...")//构建成功信息
- //内部状态
- varcurrentSpecial: Cafe.Coffee = Cafe.Original
- varchefBusy: Boolean =false
- val specials = Map(0-> Cafe.Original,1-> Cafe.Espresso,2-> Cafe.Cappuccino)
- overridedef receive: Receive = {
- caseMakeSpecial => {
- if((Random.nextInt(6) %6) ==0) {//任意产生异常 2/6log.info("Chef is busy ...")
- chefBusy =true
- throw newChefBusy("Busy!")
- }
- else {
- currentSpecial = randomSpecial//选出当前特饮log.info(s"Chef says: Current special is ${currentSpecial.toString}.")
- sender() ! currentSpecial
- }
- }
- }
- def randomSpecial = specials(Random.nextInt(specials.size))//选出当前特饮
- overridedef preRestart(reason: Throwable, message: Option[Any]): Unit = {
- log.info(s"Restarting Chef for ${reason.getMessage}...")
- super.preRestart(reason, message)
- }
- overridedef postRestart(reason: Throwable): Unit = {
- log.info(s"Restarted Chef for ${reason.getMessage}.")
- context.parent ! BackoffSupervisor.Reset
- super.postRestart(reason)
- }
- overridedef postStop(): Unit = {
- log.info("Stopped Chef.")
- super.postStop()
- }
- }
- //Kitchen只是Chef的Backoff监管,没有任何其它功能
- class Kitchen extends Actor with ActorLogging {
- overridedef receive: Receive = {
- //context.children.size == 1,就是chef。 直接把所有消息转发到Chef
- casemsg@_ =>//注意,无法使用Chef ?因为sender不明context.childrenforeach( chef => chef forward msg)
- }
- overridedef postStop(): Unit = {
- log.info("Kitchen close!")
- super.postStop()
- }
- }
- object Kitchen {
- //指定的异常处理策略val kitchenDecider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
- case_: Chef.ChefBusy => SupervisorStrategy.Restart
- }
- def kitchenProps: Props = {//定义BackoffSupervisor strategyval option = Backoff.onFailure(Chef.props,"chef",1seconds,5seconds,0.0)
- .withManualReset
- .withSupervisorStrategy {
- OneForOneStrategy(maxNrOfRetries =5, withinTimeRange =5 seconds) {
- kitchenDecider.orElse(SupervisorStrategy.defaultDecider)
- }
- }
- BackoffSupervisor.props(option)
- }
- }
Kitchen 是存粹为监管 Chef 而设置的,没有任何其它功能。收到任何消息就直接 forward 给 Chef。这里值得注意的是当我们用? 发消息给 Kitchen 再 forward 给 Chef 时,sender() 是不确定的。所以必须想法子直接 ? Chef
Chef 的唯一功能就是烹制当前特饮。如果太忙无法接单,产生 ChefBusy 异常。
Cashier 和 ReceiptPrinter 同样是一种父子监管关系。我们用 SupervisorStrategy 来实现这两个 Actor:
- object ReceiptPrinter {
- case classPrintReceipt(sendTo: ActorRef, receipt: Cafe.Receipt)//print command
- class PaperJamException extends Exception
- def props = Props[ReceiptPrinter]
- }
- class ReceiptPrinter extends Actor with ActorLogging {
- import ReceiptPrinter._
- varpaperJammed: Boolean =false
- overridedef receive: Receive = {
- casePrintReceipt(customer, receipt) =>//打印收据并发送给顾客
- if((Random.nextInt(6) %6) ==0) {
- log.info("Printer jammed paper ...")
- paperJammed =true
- throw new PaperJamException
- } else {
- log.info(s"Printing receipt $receipt and sending to ${customer.path.name}")
- customer ! receipt
- }
- }
- overridedef preRestart(reason: Throwable, message: Option[Any]): Unit = {
- log.info(s"Restarting ReceiptPrinter for ${reason.getMessage}...")
- super.preRestart(reason, message)
- }
- overridedef postRestart(reason: Throwable): Unit = {
- log.info(s"Started ReceiptPrinter for ${reason.getMessage}.")
- super.postRestart(reason)
- }
- overridedef postStop(): Unit = {
- log.info("Stopped ReceiptPrinter.")
- super.postStop()
- }
- }
- object Cashier {
- case classRingRegister(cup: Cafe.Coffee, customer: ActorRef)//收款并出具收据
- def props(kitchen: ActorRef) = Props(classOf[Cashier],kitchen)
- }
- class Cashier(kitchen: ActorRef) extends Actor with ActorLogging {
- import Cashier._
- import ReceiptPrinter._
- context.watch(kitchen) //监视厨房。如果打烊了就关门歇业val printer = context.actorOf(ReceiptPrinter.props,"printer")
- //打印机卡纸后重启策略def cashierDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
- case_: PaperJamException => SupervisorStrategy.Restart
- }
- overridedef supervisorStrategy: SupervisorStrategy =
- OneForOneStrategy(maxNrOfRetries =5, withinTimeRange =5 seconds){
- cashierDecider.orElse(SupervisorStrategy.defaultDecider)
- }
- val menu = Map[Cafe.Coffee,Double](Cafe.Original ->5.50,
- Cafe.Cappuccino ->12.95, Cafe.Espresso ->11.80)
- overridedef receive: Receive = {
- caseRingRegister(coffee, customer) =>//收款并出具收据log.info(s"Producing receipt for a cup of ${coffee.toString}...")
- val amt = menu(coffee)//计价val rcpt = Cafe.Receipt(coffee.toString,amt)
- printer ! PrintReceipt(customer,rcpt)//打印收据。可能出现卡纸异常sender() ! Cafe.Sold(rcpt)//通知Cafe销售成功 sender === Cafe
- caseTerminated(_) =>
- log.info("Cashier says: Oh, kitchen is closed. Let's make the end of day!")
- context.system.terminate() //厨房打烊,停止营业。
- }
- }
Cashier 必须确定成功打印收据后才通知 Cafe 销售成功完成。另一个功能是监视厨房打烊情况,厨房打烊则关门停止营业。
下面是 Cafe 和 Customer 的实现代码:
- object Cafe {
- sealedtrait Coffee//咖啡种类
- case object Original extends Coffee
- case object Espresso extends Coffee
- case object Cappuccino extends Coffee
- case class Receipt(item: String, amt: Double)
- sealed trait Routine
- case object PlaceOrder extends Routine
- case class Sold(receipt: Receipt) extends Routine
- }
- class Cafe extends Actor with ActorLogging {
- import Cafe._
- import Cashier._
- import context.dispatcher
- implicitval timeout = Timeout(1 seconds)
- vartotalAmount: Double =0.0
- val kitchen = context.actorOf(Kitchen.kitchenProps,"kitchen")
- //Chef可能重启,但path不变。必须直接用chef ? msg,否则经Kitchen转发无法获取正确的senderval chef = context.actorSelection("/user/cafe/kitchen/chef")
- val cashier = context.actorOf(Cashier.props(kitchen),"cashier")
- varcustomer: ActorRef = _//当前客户
- overridedef receive: Receive = {
- caseSold(rcpt) =>
- totalAmount += rcpt.amt
- log.info(s"Today's sales is up to $totalAmount")
- customer ! Customer.OrderServed(rcpt)//send him the order
- if(totalAmount >100.00
来源: http://www.cnblogs.com/tiger-xc/p/6898180.html