由于本人对 Akka 比较感兴趣, 也用 Akka 开发了一些系统, 但对 Akka 的源码还没有具体分析过, 希望研究源码的同时写一点博客跟大家分享. 有不当之处还请指正. 我准备采取 Debug 的方式来研究 Akka 的运行过程, 从入口开始, 直至分析 Akka 是如何运转的. 这样虽然会有点乱, 但比较直接, 大家凑合着看吧.
使用 Akka 首先要创建的一个对象就是 ActorSystem, 那么我们就先分析这个类及其相关的技术细节.
val system = ActorSystem("WhilePattern1",ConfigFactory.load())
第一步就是创建 ActorSystem, 很明显, 这是调用了 ActorSystem 伴生对象的 apply 方法. ActorSystem 的伴生对象并不复杂, 有很多的 apply 和 create 方法来创建 ActorSystem 的实例. apply/create 分别供 scala 和 java 开发使用. 其他字段都是一些环境变量, 例如 version,envHome,systemHome. 还有一个内部类 Settings, 主要是用来给 ActorSystem 提供参数配置.
下面我们来看 ActorSystem 类, 这是一个抽象类, 它继承了 ActorRefFactory 特质, 下面是源码中对该特质的描述. 很明显, 这个特质是用来创建 Actor 实例的. 我们常用的 actorFor 和 actorSelection 是该特质提供的比较重要的方法, 当然还有与创建 actor 有关的其他函数和字段. ActorSystem 是一个抽象类, 除了继承 ActorRefFactory 特质的函数和字段之外, 定义了一些其他字段和方法, 但也都没有具体的实现.
/**
* Interface implemented by ActorSystem and ActorContext, the only two places
* from which you can get fresh actors.
*/
通过跟踪 AcotSystem 的 apply 我们发现最终调用了以下代码, 主要涉及了两个对象: ActorSystemSetup,ActorSystemImpl. 其中源码中对 ActorSystemSetup 的描述是 "A set of setup settings for programmatic configuration of the actor system." 很明显主要是提供一些可编程的配置, 我们不再深入这个类. ActorSystemImpl 则是我们需要关心的类, 因为 ActorSystem.apply 最终创建了这个类的实例. 而 ActorSystemImpl 由继承了 ExtendedActorSystem,ExtendedActorSystem 抽象类提供了有限的几个函数, 暴露了 ActorRefFactory 中本来是 protected 的函数, 也并没有具体的实现, 我们也暂时忽略.
- /**
- * Scala API: Creates a new actor system with the specified name and settings
- * The core actor system settings are defined in [[BootstrapSetup]]
- */
- def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
- val bootstrapSettings = setup.get[BootstrapSetup]
- val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
- val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
- val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)
- new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
- }
由于 ActorSystemImpl 代码比较多, 如果从头到尾读一遍代码效率比较低. 而且从上面代码可以看出, apply 在创建 ActorSystemImpl 实例之后, 调用了 start 函数, 那么我们就从 start 切入, 看看做了哪些操作.
- private lazy val _start: this.type = try {
- registerOnTermination(stopScheduler())
- // the provider is expected to start default loggers, LocalActorRefProvider does this
- provider.init(this)
- // at this point it should be initialized "enough" for most extensions that we might want to guard against otherwise
- _initialized = true
- if (settings.LogDeadLetters> 0)
- logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
- eventStream.startUnsubscriber()
- loadExtensions()
- if (LogConfigOnStart) logConfiguration()
- this
- } catch {
- case NonFatal(e)
- try terminate() catch { case NonFatal(_) Try(stopScheduler()) }
- throw e
- }
其实 start 的代码还是比较清晰的, 首先用 registerOnTermination 注册了 stopScheduler(), 也就是给 ActorSystem 的退出注册了一个回调函数 stopScheduler(), 这一点也不再具体分析. 而 provider.init(this) 这段代码比较重要, 从 provider 的类型来看, 它是一个 ActorRefProvider, 前面我们已经分析过, 这是一个用来创建 actor 的工厂类. provider 初始化完成意味着就可以创建 actor 了, 源码注释中也明确的说明了这一点.
- val provider: ActorRefProvider = try {
- val arguments = Vector(
- classOf[String] name,
- classOf[Settings] settings,
- classOf[EventStream] eventStream,
- classOf[DynamicAccess] dynamicAccess)
- dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get
- } catch {
- case NonFatal(e)
- Try(stopScheduler())
- throw e
- }
上面是 provider 的创建过程, 最重要的一段代码是 dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get, 它使用 DynamicAccess 创建了 ActorRefProvider 对象的实例. 跟踪 dynamicAccess 创建我们发现这是一个 ReflectiveDynamicAccess 实例, 其实这个类也比较简单, 就是从 ClassLoader 中根据 ProviderClass 字段加载对应的类并创建对应的实例. ProviderClass 定义如下, 这是配置文件中经常看到的配置. 目前的 provider 一共有三种: LocalActorRefProvider,akka.remote.RemoteActorRefProvider,akka.cluster.ClusterActorRefProvider, 当然我们也可以自定义.
- final val ProviderClass: String =
- setup.get[BootstrapSetup]
- .flatMap(_.actorRefProvider).map(_.identifier)
- .getOrElse(getString("akka.actor.provider")) match {
- case "local" classOf[LocalActorRefProvider].getName
- // these two cannot be referenced by class as they may not be on the classpath
- case "remote" "akka.remote.RemoteActorRefProvider"
- case "cluster" "akka.cluster.ClusterActorRefProvider"
- case fqcn fqcn
- }
自此 provider 创建结束, 简单来说就是根据配置, 通过 Class.forName 加载了对应的 ActorRefProvider 实现类, 并把当前的参数传给它, 调用对应的构造函数, 完成实例的创建. provider 创建完成后调用 init 完成初始化, 就可以创建 actor 了.
start 函数还创建了一个 DeadLetterListener 类型的 actor, 这也是我们经常会遇到的. 如果给一个不存在的目标 actor 发消息, 或者发送消息超时, 都会把消息转发给这个 DeadLetter. 这就是一个普通的 actor, 主要用来接收没有发送成功的消息, 并把消息打印出来. 后面还调用了 eventStream.startUnsubscriber(), 由于 eventStream 也不是我们关注的重点, 先忽略. loadExtensions() 功能也比较单一, 就是根据配置加载 ActorSystem 的扩展类, 并进行注册, 关于 Extensions 也不再深入分析.
- private def loadExtensions() {
- /**
- * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility)
- */
- def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = {
- immutableSeq(settings.config.getStringList(key)) foreach { fqcn
- dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match {
- case Success(p: ExtensionIdProvider) registerExtension(p.lookup())
- case Success(p: ExtensionId[_]) registerExtension(p)
- case Success(other)
- if (!throwOnLoadFail) log.error("[{}] is not an'ExtensionIdProvider'or'ExtensionId', skipping...", fqcn)
- else throw new RuntimeException(s"[$fqcn] is not an'ExtensionIdProvider'or'ExtensionId'")
- case Failure(problem)
- if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
- else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem)
- }
- }
- }
- // eager initialization of CoordinatedShutdown
- CoordinatedShutdown(this)
- loadExtensions("akka.library-extensions", throwOnLoadFail = true)
- loadExtensions("akka.extensions", throwOnLoadFail = false)
- }
至此, 我们就对 ActorSystem 的创建和启动分析完毕, 但还有一些细节需要说明, 在 start 之前还是有一些其他字段的初始化. 由于这些字段同样重要, 且初始化的顺序没有太大关联, 我就按照代码结构从上至下依次分析几个重要的字段.
- final val threadFactory: MonitorableThreadFactory =
- MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler)
threadFactory 这是一个线程工厂类, 默认是 MonitorableThreadFactory, 我们只需要记住这是一个线程工厂类, 默认创建 ForkJoinWorkerThread 的线程就好了.
val scheduler: Scheduler = createScheduler()
scheduler 是一个调度器, 主要用来定时发送一些消息, 这个我们也会经常遇到, 但不是此次分析的重点, 略过就好.
val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters)
mailboxes 是一个非常重要的字段, 它是 Mailboxes 一个实例, 用来创建对应的 Mailbox,Mailbox 用来接收消息, 并通过 dispatcher 分发给对应的 actor.
- val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
- threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))
- val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
dispatchers 是 Dispatchers 的一个实例, 它用来创建, 查询对应的 MessageDispatcher. 它有一个默认的全局 dispatcher, 从代码来看, 它从配置中读取 akka.actor.default-dispatcher, 并创建 MessageDispatcher 实例. MessageDispatcher 也是一个非常重要的类, 我们后面再具体分析.
- /**
- * The one and only default dispatcher.
- */
- def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
- object Dispatchers {
- /**
- * The id of the default dispatcher, also the full key of the
- * configuration of the default dispatcher.
- */
- final val DefaultDispatcherId = "akka.actor.default-dispatcher"
- }
到这里我们就算分析完了 ActorSystem 的创建过程及其技术细节, 当然 ActorSystem 创建只是第一步, 后面需要创建 actor,actor 如何收到 dispatcher 的消息, 还是需要进一步研究的.
来源: https://www.cnblogs.com/gabry/p/9336477.html