In the previous article on Cluster-Singleton we saw the programming support Akka provides for distributed applications: the message-driven computation model is a natural fit for distributed programming, and ordinary Actor code becomes a clustered, distributed program with no special effort. Cluster-Singleton guarantees that, no matter what happens to individual nodes, computation continues safely as long as at least one node in the cluster remains online: a unique instance of a given Actor runs safely and stably in the cluster environment. But there is another scenario: many resource-hungry Actors need to run at the same time, and together they demand far more resources than a single server can provide. These Actors must then be spread over multiple servers, or a cluster built from multiple servers, and that is the problem the Cluster-Sharding pattern helps solve.
Let me share the goals that using Cluster-Sharding achieves, and we can examine together whether those goals include distributing Actors across cluster nodes:
First, I have an Actor whose name is a self-assigned code, and Cluster-Sharding constructs it on some node in the cluster. Because we are in a cluster environment I have no idea which node this Actor lives on or what its concrete address is; I only need its code to communicate with it. If I have many resource-consuming Actors with self-assigned codes, I can use the shard number embedded in each code to place them in different shards. Akka-Cluster can also rebalance shards across nodes as the number of cluster nodes grows or shrinks, including rebuilding all the Actors of a node that drops out of the cluster on the remaining online nodes. Seen this way, this self-assigned Actor code is the core element of a Cluster-Sharding application. As usual we demonstrate the use of Cluster-Sharding with an example. The Actor we want to shard is the Calculator from the previous discussions:

```scala
package clustersharding.entity

import akka.actor._
import akka.cluster._
import akka.persistence._
import scala.concurrent.duration._
import akka.cluster.sharding._

object Calculator {
  sealed trait Command
  case class Num(d: Double) extends Command
  case class Add(d: Double) extends Command
  case class Sub(d: Double) extends Command
  case class Mul(d: Double) extends Command
  case class Div(d: Double) extends Command
  case object ShowResult extends Command
  case object Disconnect extends Command   // exit cluster

  sealed trait Event
  case class SetNum(d: Double) extends Event
  case class Added(x: Double, y: Double) extends Event
  case class Subtracted(x: Double, y: Double) extends Event
  case class Multiplied(x: Double, y: Double) extends Event
  case class Divided(x: Double, y: Double) extends Event

  case class State(result: Double) {
    def updateState(evt: Event): State = evt match {
      case SetNum(n)         => copy(result = n)
      case Added(x, y)       => copy(result = x + y)
      case Subtracted(x, y)  => copy(result = x - y)
      case Multiplied(x, y)  => copy(result = x * y)
      case Divided(x, y)     => copy(result = {
        val _ = x.toInt / y.toInt // throws ArithmeticException on division by 0.00
        x / y
      })
    }
  }

  def props = Props(new Calculator)
}

class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  val cluster = Cluster(context.system)
  var state: State = State(0)

  override def persistenceId: String = self.path.parent.name + "-" + self.path.name

  override def receiveRecover: Receive = {
    case evt: Event => state = state.updateState(evt)
    case SnapshotOffer(_, st: State) => state = state.copy(result = st.result)
  }

  override def receiveCommand: Receive = {
    case Num(n) => persist(SetNum(n))(evt => state = state.updateState(evt))
    case Add(n) => persist(Added(state.result, n))(evt => state = state.updateState(evt))
    case Sub(n) => persist(Subtracted(state.result, n))(evt => state = state.updateState(evt))
    case Mul(n) => persist(Multiplied(state.result, n))(evt => state = state.updateState(evt))
    case Div(n) => persist(Divided(state.result, n))(evt => state = state.updateState(evt))
    case ShowResult =>
      log.info(s"Result on ${cluster.selfAddress.hostPort} is: ${state.result}")
    case Disconnect =>
      log.info(s"${cluster.selfAddress} is leaving cluster!!!")
      cluster.leave(cluster.selfAddress)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalcSupervisor extends Actor {
  def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds) {
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(Calculator.props, "calculator")

  override def receive: Receive = {
    case msg => calcActor.forward(msg)
  }
}
```
As we can see, Calculator is an ordinary PersistentActor: its internal state can be persisted, and the state is restored when the Actor restarts. CalcSupervisor is Calculator's supervisor; it exists to install a custom SupervisorStrategy.
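The state survives restarts and relocation because persistenceId is derived purely from the actor's path, not from the node it runs on: the same entity replays the same journal wherever it is rebuilt. Here is a minimal plain-Scala sketch of that derivation (no Akka required; the example path segments are an assumption based on the sharding path layout `/system/sharding/<typeName>/<shardId>/<entityId>/<child>` seen in the demo log):

```scala
// Sketch: Calculator computes self.path.parent.name + "-" + self.path.name.
// We model the actor path as its list of name segments.
def persistenceIdOf(pathSegments: List[String]): String =
  pathSegments.init.last + "-" + pathSegments.last

// Assumed segments under sharding: type / shard / entity / child actor
val pid = persistenceIdOf(List("calcShard", "1", "1001", "calculator"))
// pid == "1001-calculator": the entity id is baked into the persistenceId,
// so whichever node rebuilds entity 1001 replays the same event journal
```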
Calculator is the entity we are going to shard across the cluster. Sharding for an actor type is set up in the cluster with Akka's ClusterSharding.start method; we need to run this method on every node that will host shards in order to deploy them:
```scala
/**
 * Register a named entity type by defining the [[akka.actor.Props]] of the entity actor and
 * functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
 * for this type can later be retrieved with the [[#shardRegion]] method.
 *
 * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
 * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
 *
 * Some settings can be configured as described in the `akka.cluster.sharding` section
 * of the `reference.conf`.
 *
 * @param typeName the name of the entity type
 * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
 * @param settings configuration settings, see [[ClusterShardingSettings]]
 * @param extractEntityId partial function to extract the entity id and the message to send to the
 *   entity from the incoming message, if the partial function does not match the message will
 *   be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
 * @param extractShardId function to determine the shard id for an incoming message, only messages
 *   that passed the `extractEntityId` will be used
 * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
 */
def start(
  typeName: String,
  entityProps: Props,
  settings: ClusterShardingSettings,
  extractEntityId: ShardRegion.ExtractEntityId,
  extractShardId: ShardRegion.ExtractShardId): ActorRef = {

  val allocationStrategy = new LeastShardAllocationStrategy(
    settings.tuningParameters.leastShardAllocationRebalanceThreshold,
    settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance)

  start(typeName, entityProps, settings, extractEntityId, extractShardId,
    allocationStrategy, PoisonPill)
}
```
start returns a ShardRegion, which is of type ActorRef. The ShardRegion is a special Actor that manages the Actor instances, called entities, living in possibly several shards; these shards may be distributed over different cluster nodes, and the outside world communicates with the entities under a region through the ShardRegion. From the entityProps parameter of start we can see that each region hosts only one type of Actor. The concrete entity instances are constructed by another internal Actor, the shard, and one shard can construct many entity instances. The multi-shard, multi-entity layout can be read from the two functions extractShardId and extractEntityId. As noted earlier, the Actor's self-assigned code, the entity-id, is the core element of Cluster-Sharding. The entity-id also embeds a shard-id, so users can design the whole sharding layout, including the number of shards and entities under each ShardRegion, through the entity-id encoding rules. When a ShardRegion receives an entity-id, it first extracts the shard-id; if that shard does not yet exist in the cluster, a new one is constructed on a node chosen according to the current load of the cluster nodes. The region then looks up the entity-id inside that shard, constructing a new entity instance if none exists. Both construction steps are driven by the user-supplied functions extractShardId and extractEntityId; that is how Cluster-Sharding builds and uses shards and entities according to the user's design. The self-assigned codes need not follow any order; they only have to be unique. Here is an example encoding:
```scala
object CalculatorShard {
  import Calculator._

  // users should use this wrapper to talk to the ShardRegion
  case class CalcCommands(eid: String, msg: Command)

  val shardName = "calcShard"

  val getEntityId: ShardRegion.ExtractEntityId = {
    case CalcCommands(id, msg) => (id, msg)
  }
  val getShardId: ShardRegion.ExtractShardId = {
    case CalcCommands(id, _) => id.head.toString
  }

  def entityProps = Props(new CalcSupervisor)
}
```
Users talk to the ShardRegion through CalcCommands, a wrapper message type defined solely for communicating with the sharding system. Besides the Command messages that Calculator normally supports, the wrapper carries the id of the target entity instance, eid. The first character of this eid represents the shard-id, so we can either address the shard a target entity lives in directly, or pick a shard at random, e.g. Random.nextInt(9).toString. Since each region hosts only one type of Actor, distinct entity-ids mean multiple instances of the same Actor type existing side by side, just like the Router discussed earlier: all instances perform the same computation over different inputs. Typically a user generates entity-ids with some algorithm that spreads entities evenly over the shards; Cluster-Sharding then automatically rebalances the shards across the cluster nodes according to the actual load.
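As a concrete illustration of that encoding rule, here is a small plain-Scala sketch (no Akka required; the helper names are made up for this example) that generates entity ids with a random shard prefix and recovers the shard id the same way getShardId does:

```scala
import scala.util.Random

object EntityIds {
  // pick a random shard 0..8 and prepend it to a unique entity suffix
  def random(suffix: String): String = Random.nextInt(9).toString + suffix

  // mirror of getShardId: the shard id is the first character of the entity id
  def shardOf(entityId: String): String = entityId.head.toString
}

val eid = EntityIds.random("001") // e.g. "7001": shard "7", entity "7001"
EntityIds.shardOf("1001")         // "1", the shard the demo below addresses
```

Any scheme works as long as ids stay unique; the first-character convention just keeps the shard count small and predictable.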
The following code demonstrates how to deploy shards on a cluster node:
```scala
package clustersharding.shard

import akka.persistence.journal.leveldb._
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import clustersharding.entity.CalculatorShard

object CalcShards {
  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}")
      .withFallback(ConfigFactory.load("sharding"))
    // Create an Akka system
    val system = ActorSystem("ShardingSystem", config)
    startupSharding(port, system)
  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = system.actorSelection(path) ? Identify(None)
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }

  def startupSharding(port: Int, system: ActorSystem) = {
    startupSharedJournal(system, startStore = (port == 2551),
      path = ActorPath.fromString("akka.tcp://ShardingSystem@127.0.0.1:2551/user/store"))

    ClusterSharding(system).start(
      typeName = CalculatorShard.shardName,
      entityProps = CalculatorShard.entityProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = CalculatorShard.getEntityId,
      extractShardId = CalculatorShard.getShardId
    )
  }
}
```
The actual deployment code lives in the startupSharding method. The following code demonstrates how to use the entities inside the shards:
```scala
package clustersharding.demo

import akka.actor.ActorSystem
import akka.cluster.sharding._
import clustersharding.entity.CalculatorShard.CalcCommands
import clustersharding.entity._
import clustersharding.shard.CalcShards
import com.typesafe.config.ConfigFactory

object ClusterShardingDemo extends App {
  CalcShards.create(2551)
  CalcShards.create(0)
  CalcShards.create(0)
  CalcShards.create(0)

  Thread.sleep(1000)

  val shardingSystem = ActorSystem("ShardingSystem", ConfigFactory.load("sharding"))
  CalcShards.startupSharding(0, shardingSystem)

  Thread.sleep(1000)

  val calcRegion = ClusterSharding(shardingSystem).shardRegion(CalculatorShard.shardName)

  calcRegion ! CalcCommands("1001", Calculator.Num(13.0))   // shard 1, entity 1001
  calcRegion ! CalcCommands("1001", Calculator.Add(12.0))
  calcRegion ! CalcCommands("1001", Calculator.ShowResult)  // shows address too
  calcRegion ! CalcCommands("1001", Calculator.Disconnect)  // disengage cluster

  calcRegion ! CalcCommands("2003", Calculator.Num(10.0))   // shard 2, entity 2003
  calcRegion ! CalcCommands("2003", Calculator.Mul(3.0))
  calcRegion ! CalcCommands("2003", Calculator.Div(2.0))

  Thread.sleep(15000)

  calcRegion ! CalcCommands("1001", Calculator.ShowResult)  // check if result is restored on another node
  calcRegion ! CalcCommands("2003", Calculator.ShowResult)
}
```
The code above hand-picks the shards and entity-ids, and includes pulling one node out of the cluster. The run produces the following output:
```
[INFO][07/14/2017 13:52:05.911][ShardingSystem-akka.actor.default-dispatcher-28][akka.tcp://ShardingSystem@127.0.0.1:2551/system/sharding/calcShard/1/1001/calculator] Result on ShardingSystem@127.0.0.1:2551 is: 25.0
[INFO][07/14/2017 13:52:05.911][ShardingSystem-akka.actor.default-dispatcher-28][akka.tcp://ShardingSystem@127.0.0.1:2551/system/sharding/calcShard/1/1001/calculator] akka.tcp://ShardingSystem@127.0.0.1:2551 is leaving cluster!!!
[INFO][07/14/2017 13:52:15.826][ShardingSystem-akka.actor.default-dispatcher-34][akka.tcp://ShardingSystem@127.0.0.1:58287/system/sharding/calcShard/2/2003/calculator] Result on ShardingSystem@127.0.0.1:58287 is: 15.0
[INFO][07/14/2017 13:52:17.819][ShardingSystem-akka.actor.default-dispatcher-23][akka.tcp://ShardingSystem@127.0.0.1:58288/system/sharding/calcShard/1/1001/calculator] Result on ShardingSystem@127.0.0.1:58288 is: 25.0
```
The results show that after node 2551 left the cluster, entity 1001 was relocated to node 58288 with its state preserved.
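The reported numbers can be checked by replaying the demo's event stream with the same fold that Calculator.State performs. The following stand-alone sketch re-implements just that fold in plain Scala (note that the persisted events carry pre-computed operands, exactly as receiveCommand stores them):

```scala
// Plain-Scala re-implementation of the State.updateState fold, for checking
// the demo output by hand. Events hold operands captured at persist time.
sealed trait Event
case class SetNum(d: Double) extends Event
case class Added(x: Double, y: Double) extends Event
case class Multiplied(x: Double, y: Double) extends Event
case class Divided(x: Double, y: Double) extends Event

// replay a journal: each event fully determines the next result
def replay(events: List[Event]): Double =
  events.foldLeft(0.0) {
    case (_, SetNum(n))        => n
    case (_, Added(x, y))      => x + y
    case (_, Multiplied(x, y)) => x * y
    case (_, Divided(x, y))    => x / y
  }

// entity 1001: Num(13.0), Add(12.0)          -> replay gives 25.0
// entity 2003: Num(10.0), Mul(3.0), Div(2.0) -> replay gives 15.0
```

Because the journal is shared, the node that rebuilds entity 1001 runs exactly this replay and arrives at the same 25.0 the original node reported.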
The full source code for this demonstration follows:
build.sbt
```scala
name := "cluster-sharding"

version := "1.0"

scalaVersion := "2.11.9"

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"

val akkaversion = "2.4.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaversion,
  "com.typesafe.akka" %% "akka-remote" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-tools" % akkaversion,
  "com.typesafe.akka" %% "akka-cluster-sharding" % akkaversion,
  "com.typesafe.akka" %% "akka-persistence" % "2.4.8",
  "com.typesafe.akka" %% "akka-contrib" % akkaversion,
  "org.iq80.leveldb" % "leveldb" % "0.7",
  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
)
```
resources/sharding.conf
```
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off

akka {
  loglevel = INFO
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = ["akka.tcp://ShardingSystem@127.0.0.1:2551"]
    log-info = off
  }
  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE 'native = off' IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshots"
  }
}
```
Calculator.scala
```scala
package clustersharding.entity

import akka.actor._
import akka.cluster._
import akka.persistence._
import scala.concurrent.duration._
import akka.cluster.sharding._

object Calculator {
  sealed trait Command
  case class Num(d: Double) extends Command
  case class Add(d: Double) extends Command
  case class Sub(d: Double) extends Command
  case class Mul(d: Double) extends Command
  case class Div(d: Double) extends Command
  case object ShowResult extends Command
  case object Disconnect extends Command   // exit cluster

  sealed trait Event
  case class SetNum(d: Double) extends Event
  case class Added(x: Double, y: Double) extends Event
  case class Subtracted(x: Double, y: Double) extends Event
  case class Multiplied(x: Double, y: Double) extends Event
  case class Divided(x: Double, y: Double) extends Event

  case class State(result: Double) {
    def updateState(evt: Event): State = evt match {
      case SetNum(n)         => copy(result = n)
      case Added(x, y)       => copy(result = x + y)
      case Subtracted(x, y)  => copy(result = x - y)
      case Multiplied(x, y)  => copy(result = x * y)
      case Divided(x, y)     => copy(result = {
        val _ = x.toInt / y.toInt // throws ArithmeticException on division by 0.00
        x / y
      })
    }
  }

  def props = Props(new Calculator)
}

class Calculator extends PersistentActor with ActorLogging {
  import Calculator._

  val cluster = Cluster(context.system)
  var state: State = State(0)

  override def persistenceId: String = self.path.parent.name + "-" + self.path.name

  override def receiveRecover: Receive = {
    case evt: Event => state = state.updateState(evt)
    case SnapshotOffer(_, st: State) => state = state.copy(result = st.result)
  }

  override def receiveCommand: Receive = {
    case Num(n) => persist(SetNum(n))(evt => state = state.updateState(evt))
    case Add(n) => persist(Added(state.result, n))(evt => state = state.updateState(evt))
    case Sub(n) => persist(Subtracted(state.result, n))(evt => state = state.updateState(evt))
    case Mul(n) => persist(Multiplied(state.result, n))(evt => state = state.updateState(evt))
    case Div(n) => persist(Divided(state.result, n))(evt => state = state.updateState(evt))
    case ShowResult =>
      log.info(s"Result on ${cluster.selfAddress.hostPort} is: ${state.result}")
    case Disconnect =>
      log.info(s"${cluster.selfAddress} is leaving cluster!!!")
      cluster.leave(cluster.selfAddress)
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting calculator: ${reason.getMessage}")
    super.preRestart(reason, message)
  }
}

class CalcSupervisor extends Actor {
  def decider: PartialFunction[Throwable, SupervisorStrategy.Directive] = {
    case _: ArithmeticException => SupervisorStrategy.Resume
  }

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds) {
      decider.orElse(SupervisorStrategy.defaultDecider)
    }

  val calcActor = context.actorOf(Calculator.props, "calculator")

  override def receive: Receive = {
    case msg => calcActor.forward(msg)
  }
}

object CalculatorShard {
  import Calculator._

  // users should use this wrapper to talk to the ShardRegion
  case class CalcCommands(eid: String, msg: Command)

  val shardName = "calcShard"

  val getEntityId: ShardRegion.ExtractEntityId = {
    case CalcCommands(id, msg) => (id, msg)
  }
  val getShardId: ShardRegion.ExtractShardId = {
    case CalcCommands(id, _) => id.head.toString
  }

  def entityProps = Props(new CalcSupervisor)
}
```
CalcShard.scala
```scala
package clustersharding.shard

import akka.persistence.journal.leveldb._
import akka.actor._
import akka.cluster.sharding._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern._
import clustersharding.entity.CalculatorShard

object CalcShards {
  def create(port: Int) = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${port}")
      .withFallback(ConfigFactory.load("sharding"))
    // Create an Akka system
    val system = ActorSystem("ShardingSystem", config)
    startupSharding(port, system)
  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don't crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")
    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = system.actorSelection(path) ? Identify(None)
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.terminate()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.terminate()
    }
  }

  def startupSharding(port: Int, system: ActorSystem) = {
    startupSharedJournal(system, startStore = (port == 2551),
      path = ActorPath.fromString("akka.tcp://ShardingSystem@127.0.0.1:2551/user/store"))

    ClusterSharding(system).start(
      typeName = CalculatorShard.shardName,
      entityProps = CalculatorShard.entityProps,
      settings = ClusterShardingSettings(system),
      extractEntityId = CalculatorShard.getEntityId,
      extractShardId = CalculatorShard.getShardId
    )
  }
}
```
ClusterShardingDemo.scala
```scala
package clustersharding.demo

import akka.actor.ActorSystem
import akka.cluster.sharding._
import clustersharding.entity.CalculatorShard.CalcCommands
import clustersharding.entity._
import clustersharding.shard.CalcShards
import com.typesafe.config.ConfigFactory

object ClusterShardingDemo extends App {
  CalcShards.create(2551)
  CalcShards.create(0)
  CalcShards.create(0)
  CalcShards.create(0)

  Thread.sleep(1000)

  val shardingSystem = ActorSystem("ShardingSystem", ConfigFactory.load("sharding"))
  CalcShards.startupSharding(0, shardingSystem)

  Thread.sleep(1000)

  val calcRegion = ClusterSharding(shardingSystem).shardRegion(CalculatorShard.shardName)

  calcRegion ! CalcCommands("1001", Calculator.Num(13.0))   // shard 1, entity 1001
  calcRegion ! CalcCommands("1001", Calculator.Add(12.0))
  calcRegion ! CalcCommands("1001", Calculator.ShowResult)  // shows address too
  calcRegion ! CalcCommands("1001", Calculator.Disconnect)  // disengage cluster

  calcRegion ! CalcCommands("2003", Calculator.Num(10.0))   // shard 2, entity 2003
  calcRegion ! CalcCommands("2003", Calculator.Mul(3.0))
  calcRegion ! CalcCommands("2003", Calculator.Div(2.0))

  Thread.sleep(15000)

  calcRegion ! CalcCommands("1001", Calculator.ShowResult)  // check if result is restored on another node
  calcRegion ! CalcCommands("2003", Calculator.ShowResult)
}
```
Source: http://www.cnblogs.com/tiger-xc/p/7170127.html