=
  {
    Source.fromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor))
  }
}

/**
 * A RestartFlow wraps a [[Flow]] that gets restarted when it completes or fails.
 *
 * They are useful for graphs that need to run for longer than the [[Flow]] can necessarily guarantee it will, for
 * example, for [[Flow]] streams that depend on a remote server that may crash or become partitioned. The
 * RestartFlow ensures that the graph can continue running while the [[Flow]] restarts.
 */
object RestartFlow {

  /**
   * Wrap the given [[Flow]] with a [[Flow]] that will restart it when it fails or completes, using an exponential
   * backoff.
   *
   * This [[Flow]] will not cancel, complete or emit a failure until the opposite end of it has been cancelled or
   * completed. Any termination by the [[Flow]] before that time will be handled by restarting it. Any termination
   * signals sent to this [[Flow]], however, will terminate the wrapped [[Flow]], if it's running, and then the [[Flow]]
   * will be allowed to terminate without being restarted.
   *
   * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
   * messages. A termination signal from either end of the wrapped [[Flow]] will cause the other end to be terminated,
   * and any in-transit messages will be lost. During backoff, this [[Flow]] will backpressure.
   *
   * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
   *
   * @param minBackoff minimum (initial) duration until the wrapped [[Flow]] will be
   *   started again, if it is terminated
   * @param maxBackoff the exponential back-off is capped to this duration
   * @param randomFactor after calculation of the exponential back-off an additional
   *   random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
   *   In order to skip this additional delay pass in `0`.
   * @param flowFactory A factory for producing the [[Flow]] to wrap.
   */
  def withBackoff[In, Out](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(flowFactory: () ⇒ Flow[In, Out, _]): Flow[In, Out, NotUsed] = {
    Flow.fromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor))
  }
}

/**
 * A RestartSink wraps a [[Sink]] that gets restarted when it completes or fails.
 *
 * They are useful for graphs that need to run for longer than the [[Sink]] can necessarily guarantee it will, for
 * example, for [[Sink]] streams that depend on a remote server that may crash or become partitioned. The
 * RestartSink ensures that the graph can continue running while the [[Sink]] restarts.
 */
object RestartSink {

  /**
   * Wrap the given [[Sink]] with a [[Sink]] that will restart it when it fails or completes, using an exponential
   * backoff.
   *
   * This [[Sink]] will never cancel, since cancellation by the wrapped [[Sink]] is always handled by restarting it.
   * The wrapped [[Sink]] can, however, be completed by feeding a completion or error into this [[Sink]]. When that
   * happens, the [[Sink]], if currently running, will terminate and will not be restarted. This can be triggered
   * simply by the upstream completing, or externally by introducing a [[KillSwitch]] right before this [[Sink]] in the
   * graph.
   *
   * The restart process is inherently lossy, since there is no coordination between cancelling and the sending of
   * messages. When the wrapped [[Sink]] does cancel, this [[Sink]] will backpressure; however, any elements already
   * sent may have been lost.
   *
   * This uses the same exponential backoff algorithm as [[akka.pattern.Backoff]].
   *
   * @param minBackoff minimum (initial) duration until the wrapped [[Sink]] will be
   *   started again, if it is terminated
   * @param maxBackoff the exponential back-off is capped to this duration
   * @param randomFactor after calculation of the exponential back-off an additional
   *   random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay.
   *   In order to skip this additional delay pass in `0`.
   * @param sinkFactory A factory for producing the [[Sink]] to wrap.
   */
  def withBackoff[T](minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double)(sinkFactory: () ⇒ Sink[T, _]): Sink[T, NotUsed] = {
    Sink.fromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor))
  }
}

Source: http://www.cnblogs.com/tiger-xc/p/7569305.html
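The `minBackoff`, `maxBackoff` and `randomFactor` parameters described in the Scaladoc above are easier to reason about with concrete numbers. The following is only a rough sketch of how such a delay could be computed, not Akka's actual implementation. `BackoffSketch`, `delayFor` and the `jitter` argument are hypothetical names introduced for illustration. It also assumes that the base delay doubles on every restart and that the random part is added after the cap is applied.

```scala
import scala.concurrent.duration._

// Hypothetical helper for illustration only; not part of the Akka API.
object BackoffSketch {

  /**
   * Delay before restart number `restartCount` (0 = first restart).
   * `jitter` stands in for a random value in [0.0, 1.0), passed in
   * explicitly so that the result is reproducible.
   */
  def delayFor(
    restartCount: Int,
    minBackoff:   FiniteDuration,
    maxBackoff:   FiniteDuration,
    randomFactor: Double,
    jitter:       Double): FiniteDuration = {
    // Assumption: the base delay doubles per restart; the exponent is
    // clamped so the intermediate value stays well within range.
    val exponent = math.min(restartCount, 30)
    val grown = minBackoff.toMillis.toDouble * math.pow(2, exponent)
    // Cap the exponential part at maxBackoff.
    val capped = math.min(grown, maxBackoff.toMillis.toDouble)
    // Assumption: the random delay is added after capping, so a
    // randomFactor of 0.2 adds up to 20% on top of the capped value.
    val withJitter = capped * (1.0 + jitter * randomFactor)
    withJitter.toLong.millis
  }
}
```

In real use, `jitter` would come from a random number generator, for example `scala.util.Random.nextDouble()`. Passing `randomFactor = 0` makes the delay fully deterministic, which matches the note in the parameter documentation about skipping the additional delay.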