Akka-Cluster 可以在一部物理机或一组网络连接的服务器上搭建部署。用 Akka 开发同一版本的分布式程序可以在任何硬件环境中运行, 这样我们就可以确定以 Akka 分布式程序作为标准的编程方式了。
在上面两篇讨论里我们介绍了 Akka-Remoting。Akka-Remoting 其实是一种 ActorSystem 之间 Actor 对 Actor 点对点的沟通协议。通过 Akka-Remoting 来实现一个 ActorSystem 中的一个 Actor 与另一个 Actorsystem 中的另一个 Actor 之间的沟通。在 Remoting 功能之后,Akka 又发展了集群 Cluster 功能。Akka-Cluster 是基于 Akka-Remoting 之上的新一代分布式运算环境,所以 Remoting 已经成为了 Akka-Cluster 的内部支持功能,在生产环境中的分布式运算应该尽量使用 Akka-Cluster。当然,人们仍然可以在学习和测试环境中使用 Akka-Remoting 来了解 Akka 的分布式运算机制和原理。Remoting 和 Cluster 的明显分别之一就是真正实现了 Actor 的位置透明化。让编程人员可以更轻松自然的实现分布式编程。当然,更重要的是相对 Akka-Remoting 而言,Akka-Cluster 提供了一整套更安全、更高效的分布式运算环境。
简单来说 Akka-Cluster 将多个 JVM 连接整合起来,实现消息地址的透明化和统一化使用管理,集成一体化的消息驱动系统。最终目的是能够把一个大型程序分割成多个子程序,然后部署到很多 JVM 上去实现程序的分布式并行运算。更重要的是:Cluster 的构建过程与 Actor 编程没有牵连,当 Cluster 把多个 ActorSystem 集合成一个统一系统后,我们可以用在单一 ActorSystem 里编程的习惯方式编写分布式运算程序。由于在单一机器上就可以配置多个节点形成一个集群,我们开发的分布式程序可以在单机或多机群上运行,不同的只是如何部署和配置集群环境。
我们首先来认识一些有关 Akka-Cluster 的基础概念:
Node:集群节点,也可以说是代表一个独立的 ActorSystem,用 hostname:port 来表示。一部物理机器上可以构建多个集群节点 Node,这时它们有着相同的 hostname 和不同的 port,在不同机器上的 Node 则可以使用不同的 hostname 和相同的 port。
Cluster:由多个节点 Node 作为集群成员通过一种集群组织协议形成集群的一个整体。
Leader:集群中的某个成员节点 Node。由 Akka 自动在集群成员中选定,负责集群成员生命周期状态的具体转换操作。
Seed-Node:由一或多个集群中的节点组成。一个节点在加入集群之前先向所有用户指定的 Seed-Node 发出联系消息,然后向最先答复的 Seed-Node 发出加入集群请求。Seed-Node 的主要作用是为申请加入集群的节点提供具体的联络地址,毕竟申请加入的节点需要一个具体的地址来发送申请加入消息,从这个方面来说:Seed-Node 可以是集群中任何已知地址的节点。
Node-Lifecycle-State:一个节点的生命周期里包括以下几个状态转换:
Joining->Up,Leaving->Exiting,Exiting->Removed,Unreachable->Up,Unreachable->Down,Down->Removed
另外,Akka-Cluster 通过交流心跳信号(heart-beat signal)方式可以监测任何节点是否处于无法联络 Unreachable 状态。
Membership:集群成员组织是通过 Gossip 沟通协议把多个节点组织起来形成的一个集群整体。
Membership-State: 集群状态,是一个集群内所有节点共享的数据结构,用于存放群内所有节点状态。集群状态是一种 CRDT 数据结构,提供安全便捷的数据合并操作,方便逐步累加型数据合并更新。
Gossip-Protocal:是 Node 之间的交流协议。集群内的节点分邻里相互通过 Gossip 交流更新集群状态数据,逐步扩散交流覆盖整个集群所有节点并形成完整的统一集群状态数据。
Gossip-Convergence:集群统一状态。当 Gossip 交流覆盖了集群中所有节点,即所有节点都获得统一的集群状态,就达到集群统一状态 Convergence。
Failure-Detector fd:所有节点都具备心跳信号交流功能。集群中某个节点可能被多个节点用 heartbeat 检测在线是否 Reachable/Unreachable。如果集群中任何一个节点处于 Unreachable 状态则整个集群无法达至 Convergence 状态。
Leader-Actions:当集群达到 Convergence 后系统自动选定一个 Leader 节点进行以上描述的节点状态转换操作。如果集群内有节点处于 Unreachable 状态,无法达到集群 Convergence,则无法满足任何节点状态转换请求。
在 Akka-Cluster 中一个节点加入集群是自动的,只要在配置文件里设置一个 Seed-Node 清单,否则就必须在 Actor 程序里用 Cluster.join 或 Cluster.joinSeedNodes 方法加人:
- /**
- * Try to join this cluster node with the node specified by 'address'.
- * A 'Join(selfAddress)' command is sent to the node to join.
- *
- * An actor system can only join a cluster once. Additional attempts will be ignored.
- * When it has successfully joined it must be restarted to be able to join another
- * cluster or to join the same cluster again.
- *
- * The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
- * cluster.
- */
- def join(address: Address): Unit =
- clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))
- /**
- * Join the specified seed nodes without defining them in config.
- * Especially useful from tests when Addresses are unknown before startup time.
- *
- * An actor system can only join a cluster once. Additional attempts will be ignored.
- * When it has successfully joined it must be restarted to be able to join another
- * cluster or to join the same cluster again.
- */
- def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit =
- clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes.toVector.map(fillLocal))
集群节点 Leave 和 Down 实现方法如下:
- /**
- * Send command to issue state transition to LEAVING for the node specified by 'address'.
- * The member will go through the status changes [[MemberStatus]] `Leaving` (not published to
- * subscribers) followed by [[MemberStatus]] `Exiting` and finally [[MemberStatus]] `Removed`.
- *
- * Note that this command can be issued to any member in the cluster, not necessarily the
- * one that is leaving. The cluster extension, but not the actor system or JVM, of the
- * leaving member will be shutdown after the leader has changed status of the member to
- * Exiting. Thereafter the member will be removed from the cluster. Normally this is
- * handled automatically, but in case of network failures during this process it might
- * still be necessary to set the node's status to Down in order to complete the removal.
- */
- def leave(address: Address): Unit =
- clusterCore ! ClusterUserAction.Leave(fillLocal(address))
- /**
- * Send command to DOWN the node specified by 'address'.
- *
- * When a member is considered by the failure detector to be unreachable the leader is not
- * allowed to perform its duties, such as changing status of new joining members to 'Up'.
- * The status of the unreachable member must be changed to 'Down', which can be done with
- * this method.
- */
- def down(address: Address): Unit =
- clusterCore ! ClusterUserAction.Down(fillLocal(address))
Akka-Cluster 的集群节点状态转换可以作为事件在 Akka 的 EventBus 上发布:
- /**
- * Marker interface for membership events.
- * Published when the state change is first seen on a node.
- * The state change was performed by the leader when there was
- * convergence on the leader node, i.e. all members had seen previous
- * state.
- */
- sealed trait MemberEvent extends ClusterDomainEvent {
- def member: Member
- }
- /**
- * Member status changed to Joining.
- */
- final
- case class MemberJoined(member:
- Member) extends MemberEvent {
- if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member)
- }
- /**
- * Member status changed to WeaklyUp.
- * A joining member can be moved to `WeaklyUp` if convergence
- * cannot be reached, i.e. there are unreachable nodes.
- * It will be moved to `Up` when convergence is reached.
- */
- final
- case class MemberWeaklyUp(member:
- Member) extends MemberEvent {
- if (member.status != WeaklyUp) throw new IllegalArgumentException("Expected WeaklyUp status, got: " + member)
- }
- /**
- * Member status changed to Up.
- */
- final
- case class MemberUp(member:
- Member) extends MemberEvent {
- if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
- }
- /**
- * Member status changed to Leaving.
- */
- final
- case class MemberLeft(member:
- Member) extends MemberEvent {
- if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
- }
- /**
- * Member status changed to `MemberStatus.Exiting` and will be removed
- * when all members have seen the `Exiting` status.
- */
- final
- case class MemberExited(member:
- Member) extends MemberEvent {
- if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
- }
- /**
- * Member completely removed from the cluster.
- * When `previousStatus` is `MemberStatus.Down` the node was removed
- * after being detected as unreachable and downed.
- * When `previousStatus` is `MemberStatus.Exiting` the node was removed
- * after graceful leaving and exiting.
- */
- final
- case class MemberRemoved(member:
- Member, previousStatus: MemberStatus) extends MemberEvent {
- if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
- }
- /**
- * Marker interface to facilitate subscription of
- * both [[UnreachableMember]] and [[ReachableMember]].
- */
- sealed trait ReachabilityEvent extends ClusterDomainEvent {
- def member: Member
- }
- /**
- * A member is considered as unreachable by the failure detector.
- */
- final
- case class UnreachableMember(member:
- Member) extends ReachabilityEvent
- /**
- * A member is considered as reachable by the failure detector
- * after having been unreachable.
- * @see [[UnreachableMember]]
- */
- final
- case class ReachableMember(member:
- Member) extends ReachabilityEvent
集群的当前状态值是存放在下面 CurrentClusterState 结构里的:
- /**
- * Current snapshot state of the cluster. Sent to new subscriber.
- */
- final case class CurrentClusterState(
- members: immutable.SortedSet[Member] = immutable.SortedSet.empty,
- unreachable: Set[Member] = Set.empty,
- seenBy: Set[Address] = Set.empty,
- leader: Option[Address] = None,
- roleLeaderMap: Map[String, Option[Address]] = Map.empty) {
- /**
- * Java API: get current member list.
- */
- def getMembers: java.lang.Iterable[Member] = {
- import scala.collection.JavaConverters._
- members.asJava
- }
- /**
- * Java API: get current unreachable set.
- */
- def getUnreachable: java.util.Set[Member] =
- scala.collection.JavaConverters.setAsJavaSetConverter(unreachable).asJava
- /**
- * Java API: get current "seen-by" set.
- */
- def getSeenBy: java.util.Set[Address] =
- scala.collection.JavaConverters.setAsJavaSetConverter(seenBy).asJava
- /**
- * Java API: get address of current leader, or null if none
- */
- def getLeader: Address = leader orNull
- /**
- * All node roles in the cluster
- */
- def allRoles: Set[String] = roleLeaderMap.keySet
- /**
- * Java API: All node roles in the cluster
- */
- def getAllRoles: java.util.Set[String] =
- scala.collection.JavaConverters.setAsJavaSetConverter(allRoles).asJava
- /**
- * get address of current leader, if any, within the role set
- */
- def roleLeader(role: String): Option[Address] = roleLeaderMap.getOrElse(role, None)
- /**
- * Java API: get address of current leader within the role set,
- * or null if no node with that role
- */
- def getRoleLeader(role: String): Address = roleLeaderMap.get(role).flatten.orNull
- }
用户可以监听这些事件的发生:
- cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
- classOf[MemberEvent], classOf[UnreachableMember])
另外,我们还可以用 callback 方式在状态转换前后调用一些运算来进行准备处理和事后处理:
- /**
- * The supplied thunk will be run, once, when current cluster member is `Up`.
- * Typically used together with configuration option `akka.cluster.min-nr-of-members`
- * to defer some action, such as starting actors, until the cluster has reached
- * a certain size.
- */
- def registerOnMemberUp[T](code: ⇒ T): Unit =
- registerOnMemberUp(newRunnable { def run() = code })
- /**
- * Java API: The supplied callback will be run, once, when current cluster member is `Up`.
- * Typically used together with configuration option `akka.cluster.min-nr-of-members`
- * to defer some action, such as starting actors, until the cluster has reached
- * a certain size.
- */
- def registerOnMemberUp(callback: Runnable): Unit =
- clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)
- /**
- * The supplied thunk will be run, once, when current cluster member is `Removed`.
- * If the cluster has already been shutdown the thunk will run on the caller thread immediately.
- * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
- */
- def registerOnMemberRemoved[T](code: ⇒ T): Unit =
- registerOnMemberRemoved(newRunnable {overridedef run(): Unit = code })
- /**
- * Java API: The supplied thunk will be run, once, when current cluster member is `Removed`.
- * If the cluster has already been shutdown the thunk will run on the caller thread immediately.
- * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.terminate()`.
- */
- def registerOnMemberRemoved(callback: Runnable): Unit = {
- if(_isTerminated.get())
- callback.run()
- else
- clusterDaemons ! InternalClusterAction.AddOnMemberRemovedListener(callback)
- }
下面我们就用个例子来示范 Akka-Cluster 的运作过程:
首先需要 Akka-Cluster 的 dependency:build.sbt
- name :="cluster-states-demo"
- version :="1.0"
- scalaVersion :="2.11.8"
- libraryDependencies ++= {
- val akkaVersion ="2.5.3"
- Seq(
- "com.typesafe.akka"%%"akka-actor"% akkaVersion,
- "com.typesafe.akka"%%"akka-cluster"% akkaVersion
- )
- }
然后是基本的配置:cluster.conf
- akka {
- actor {
- provider ="cluster"
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname ="127.0.0.1"
- port =2551
- }
- }
- cluster {
- seed-nodes = [
- "akka.tcp://clusterSystem@127.0.0.1:2551"]
- }
- }
下面是一个集群状态转换事件的监听 Actor:
- import akka.actor._
- import akka.cluster.ClusterEvent._
- import akka.cluster._
- import com.typesafe.config.ConfigFactory
- class EventLisener extends Actor with ActorLogging {
- val cluster = Cluster(context.system)
- overridedef preStart(): Unit = {
- cluster.subscribe(self,initialStateMode = InitialStateAsEvents
- ,classOf[MemberEvent],classOf[UnreachableMember]) //订阅集群状态转换信息
- super.preStart()
- }
- overridedef postStop(): Unit = {
- cluster.unsubscribe(self) //取消订阅
- super.postStop()
- }
- overridedef receive: Receive = {
- caseMemberJoined(member) =>
- log.info("Member is Joining: {}", member.address)
- caseMemberUp(member) =>
- log.info("Member is Up: {}", member.address)
- caseMemberLeft(member) =>
- log.info("Member is Leaving: {}", member.address)
- caseMemberExited(member) =>
- log.info("Member is Exiting: {}", member.address)
- caseMemberRemoved(member, previousStatus) =>
- log.info(
- "Member is Removed: {} after {}",
- member.address, previousStatus)
- caseUnreachableMember(member) =>
- log.info("Member detected as unreachable: {}", member)
- cluster.down(member.address) //手工驱除,不用auto-down
- case_: MemberEvent =>// ignore
- }
- }
下面是 EventListener 使用测试代码,增加了 Node 加人集群后可能进行的前期设置及退出集群后的事后清理:
- object ClusterEventsDemo {
- def main(args: Array[String]): Unit = {
- //重设port,seed-node-addressval port =if(args.isEmpty)"0"
- elseargs(0)
- val addr =if(args.length <2)"2551"
- elseargs(1)
- val seednodeSetting ="akka.cluster.seed-nodes = ["+"\"akka.tcp://clusterSystem@127.0.0.1:"+
- s"${addr}"+"\"]"
- val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}")
- .withFallback(ConfigFactory.parseString(seednodeSetting))
- .withFallback(ConfigFactory.load("cluster.conf"))
- val clusterSystem = ActorSystem(name="clusterSystem",config=config)
- val eventListener = clusterSystem.actorOf(Props[EventLisener],"eventListener")
- val cluster = Cluster(clusterSystem)
- cluster.registerOnMemberRemoved(println("Leaving cluster. I should cleanup... "))
- cluster.registerOnMemberUp(println("Hookup to cluster. Do some setups ..."))
- println("actor system started!")
- scala.io.StdIn.readLine()
- clusterSystem.terminate()
- }
- }
我们在多个 terminal 上用 sbt 来测试运行:
1、run "2551" "2551" // 这是个 seed-node
- [INFO][06 / 26 / 2017 21 : 25 : 46.743][clusterSystem - akka.actor.
- default - dispatcher - 3][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Node [akka.tcp://clusterSystem@127.0.0.1:2551] is JOINING, roles []
- [INFO][06 / 26 / 2017 21 : 25 : 46.751][clusterSystem - akka.actor.
- default - dispatcher - 3][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://clusterSystem@127.0.0.1:2551] to [Up]
- [INFO][06 / 26 / 2017 21 : 25 : 46.755][clusterSystem - akka.actor.
- default - dispatcher - 16][akka.tcp: //clusterSystem@127.0.0.1:2551/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551
2、run "0" "2551" //port=0 代表由系统自动选择端口
- [INFO][06 / 26 / 2017 21 : 26 : 57.467][run - main - 1e][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52459] - Started up successfully
- actor system started ! [INFO][06 / 26 / 2017 21 : 26 : 57.735][clusterSystem - akka.actor.
- default - dispatcher - 4][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52459] - Welcome from [akka.tcp://clusterSystem@127.0.0.1:2551]
- [INFO][06 / 26 / 2017 21 : 26 : 57.751][clusterSystem - akka.actor.
- default - dispatcher - 3][akka.tcp: //clusterSystem@127.0.0.1:52459/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551
- [INFO][06 / 26 / 2017 21 : 26 : 57.752][clusterSystem - akka.actor.
- default - dispatcher - 3][akka.tcp: //clusterSystem@127.0.0.1:52459/user/eventListener] Member is Joining: akka.tcp://clusterSystem@127.0.0.1:52459
- [INFO][06 / 26 / 2017 21 : 26 : 57.809][clusterSystem - akka.actor.
- default - dispatcher - 16][akka.tcp: //clusterSystem@127.0.0.1:52459/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52459
3、run "0" "2551" //port=0 代表由系统自动选择端口
- [INFO][06 / 26 / 2017 21 : 28 : 22.577][run - main - 1][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52467] - Started up successfully
- actor system started ! [INFO][06 / 26 / 2017 21 : 28 : 22.736][clusterSystem - akka.actor.
- default - dispatcher - 2][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:52467] - Welcome from [akka.tcp://clusterSystem@127.0.0.1:2551]
- [INFO][06 / 26 / 2017 21 : 28 : 22.747][clusterSystem - akka.actor.
- default - dispatcher - 16][akka.tcp: //clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:2551
- [INFO][06 / 26 / 2017 21 : 28 : 22.749][clusterSystem - akka.actor.
- default - dispatcher - 16][akka.tcp: //clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52459
- [INFO][06 / 26 / 2017 21 : 28 : 22.749][clusterSystem - akka.actor.
- default - dispatcher - 16][akka.tcp: //clusterSystem@127.0.0.1:52467/user/eventListener] Member is Joining: akka.tcp://clusterSystem@127.0.0.1:52467
- [INFO][06 / 26 / 2017 21 : 28 : 24.611][clusterSystem - akka.actor.
- default - dispatcher - 22][akka.tcp: //clusterSystem@127.0.0.1:52467/user/eventListener] Member is Up: akka.tcp://clusterSystem@127.0.0.1:52467
在 terminal2 运算 cluster.leave(cluster.selfAddress):
- [INFO][06 / 26 / 2017 22 : 40 : 47.614][clusterSystem - akka.actor.
- default - dispatcher - 4][akka.tcp: //clusterSystem@127.0.0.1:2551/user/eventListener] Member is Leaving: akka.tcp://clusterSystem@127.0.0.1:53986
- [INFO][06 / 26 / 2017 22 : 40 : 48.032][clusterSystem - akka.actor.
- default - dispatcher - 15][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://clusterSystem@127.0.0.1:53986] to [Exiting]
- [INFO][06 / 26 / 2017 22 : 40 : 48.032][clusterSystem - akka.actor.
- default - dispatcher - 21][akka.tcp: //clusterSystem@127.0.0.1:2551/user/eventListener] Member is Exiting: akka.tcp://clusterSystem@127.0.0.1:53986
- [INFO][06 / 26 / 2017 22 : 40 : 48.047][clusterSystem - akka.actor.
- default - dispatcher - 21][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Exiting confirmed [akka.tcp://clusterSystem@127.0.0.1:53986]
- [INFO][06 / 26 / 2017 22 : 40 : 49.033][clusterSystem - akka.actor.
- default - dispatcher - 15][akka.cluster.Cluster(akka: //clusterSystem)] Cluster Node [akka.tcp://clusterSystem@127.0.0.1:2551] - Leader is removing confirmed Exiting node [akka.tcp://clusterSystem@127.0.0.1:53986]
- [INFO][06 / 26 / 2017 22 : 40 : 49.033][clusterSystem - akka.actor.
- default - dispatcher - 21][akka.tcp: //clusterSystem@127.0.0.1:2551/user/eventListener] Member is Removed: akka.tcp://clusterSystem@127.0.0.1:53986 after Exiting
下面就是本次示范的源代码:
resources/cluster.conf
- akka {
- actor {
- provider ="akka.cluster.ClusterActorRefProvider"
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname ="127.0.0.1"
- port =0
- }
- }
- cluster {
- seed-nodes = [
- "akka.tcp://clusterSystem@127.0.0.1:2551"]
- }
- }
ClusterEventsDemo.scala
- import akka.actor._
- import akka.cluster.ClusterEvent._
- import akka.cluster._
- import com.typesafe.config.ConfigFactory
- class EventLisener extends Actor with ActorLogging {
- val cluster = Cluster(context.system)
- overridedef preStart(): Unit = {
- cluster.subscribe(self,initialStateMode = InitialStateAsEvents
- ,classOf[MemberEvent],classOf[UnreachableMember]) //订阅集群状态转换信息
- super.preStart()
- }
- overridedef postStop(): Unit = {
- cluster.unsubscribe(self) //取消订阅
- super.postStop()
- }
- overridedef receive: Receive = {
- caseMemberJoined(member) =>
- log.info("Member is Joining: {}", member.address)
- caseMemberUp(member) =>
- log.info("Member is Up: {}", member.address)
- caseMemberLeft(member) =>
- log.info("Member is Leaving: {}", member.address)
- caseMemberExited(member) =>
- log.info("Member is Exiting: {}", member.address)
- caseMemberRemoved(member, previousStatus) =>
- log.info(
- "Member is Removed: {} after {}",
- member.address, previousStatus)
- caseUnreachableMember(member) =>
- log.info("Member detected as unreachable: {}", member)
- cluster.down(member.address) //手工驱除,不用auto-down
- case_: MemberEvent =>// ignore
- }
- }
- object ClusterEventsDemo {
- def main(args: Array[String]): Unit = {
- //重设port,seed-node-addressval port =if(args.isEmpty)"0"
- elseargs(0)
- val addr =if(args.length <2)"2551"
- elseargs(1)
- val seednodeSetting ="akka.cluster.seed-nodes = ["+"\"akka.tcp://clusterSystem@127.0.0.1:"+
- s"${addr}"+"\"]"
- val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = ${port}")
- .withFallback(ConfigFactory.parseString(seednodeSetting))
- .withFallback(ConfigFactory.load("cluster.conf"))
- val clusterSystem = ActorSystem(name="clusterSystem",config=config)
- val eventListener = clusterSystem.actorOf(Props[EventLisener],"eventListener")
- val cluster = Cluster(clusterSystem)
- cluster.registerOnMemberRemoved(println("Leaving cluster. I should cleanup... "))
- cluster.registerOnMemberUp(println("Hookup to cluster. Do some setups ..."))
- println("actor system started!")
- scala.io.StdIn.readLine()
- clusterSystem.terminate()
- }
- }
来源: http://www.cnblogs.com/tiger-xc/p/7082905.html