前面我们完成了一个 CQRS 模式的数据采集 (录入) 平台. 可以预见: 数据的产生是在线下各式各样的终端系统中, 包括 Web, 桌面, 移动终端. 那么, 为了实现一个完整的系统, 必须把前端设备通过某种网络连接形式与数据采集平台集成为一体. 有两种方式可以实现需要的网络连接: Restful-API, gRPC. 由于 gRPC 支持 http/2 通讯协议, 支持持久连接方式及双向数据流. 所以对于 POS 设备这样的前端选择 gRPC 作为网络连接方式来实现实时的操作控制应该是正确的选择, 毕竟采用恒久连接和双向数据流效率会高很多. gRPC 是 google 公司的标准, 基于 protobuffer 消息: 一种二进制序列化数据交换机制. gRPC 的优势在这里就不再细说, 读者可以参考前面有关 gRPC 的讨论博文.
下面是系统结构示意图:
这篇讨论焦点集中在 gRPC 的 server,client 两头的具体实现. 刚才提过, gRPC 是 google 公司的开源库, 同时还提供了各种语言的客户端, 有: java, C++,python,go ... 但就是没有 scala 的, 只能找第三方的 scala 客户端了. 现在市面可供选择的 gRPC-scala - 客户端有 scalaPB 和 akka-grpc 两个, akka-grpc 是基于 akka-stream 和 akka-http 构建的, 按理来说会更合适, 但由于还是处于 preview 版本, 以后再说吧, 现在只有 scalaPB 可选了. scalaPB 是一个比较成熟的 gRPC 客户端, 在前面的博客里我们也进行了介绍和示范. 下面我们就用 scalaPB 来实现上面这个例子的客户端 - 平台集成.
首先, gRPC 是通过 protobuffer 进行序列化数据传输的. 下面是这个例子的 .proto 定义文件:
- syntax = "proto3";
- import "google/protobuf/wrappers.proto";
- import "google/protobuf/any.proto";
- import "scalapb/scalapb.proto";
- option (scalapb.options) = {
- // use a custom Scala package name
- // package_name: "io.ontherocks.introgrpc.demo"
- // don't append file name to package
- flat_package: true
- // generate one Scala file for all messages (services still get their own file)
- single_file: true
- // add imports to generated file
- // useful when extending Traits or using custom types
- // import: "io.ontherocks.hellogrpc.RockingMessage"
- // code to put at the top of generated file
- // works only with `single_file: true`
- //preamble: "sealed trait SomeSealedTrait"
- };
- package com.datatech.pos.messages;
- message PBVchState { // voucher (sales slip) state
- string opr = 1; // cashier/operator id
- int64 jseq = 2; //begin journal sequence for read-side replay
- int32 num = 3; // current voucher number
- int32 seq = 4; // current item sequence number
- bool void = 5; // void (cancel-item) mode
- bool refd = 6; // refund mode
- bool susp = 7; // voucher suspended (on hold)
- bool canc = 8; // voucher cancelled
- bool due = 9; // balance-due flag; NOTE(review): original comment said "current balance" but the type is bool — confirm intent
- string su = 10; // supervisor id
- string mbr = 11; // member id
- int32 mode = 12; // current workflow mode: 0=logOff, 1=LogOn, 2=Payment
- }
- message PBTxnItem { // transaction record
- string txndate = 1; // transaction date
- string txntime = 2; // entry time
- string opr = 3; // operator id
- int32 num = 4; // sales voucher number
- int32 seq = 5; // transaction sequence number
- int32 txntype = 6; // transaction type
- int32 salestype = 7; // sales type
- int32 qty = 8; // quantity
- int32 price = 9; // unit price (cents)
- int32 amount = 10; // gross amount (cents)
- int32 disc = 11; // discount rate (%)
- int32 dscamt = 12; // discount amount: negative; net = amount + dscamt
- string member = 13; // member card number
- string code = 14; // item code (product, card ...)
- string acct = 15; // account number
- string dpt = 16; // department
- }
- message PBPOSResponse {
- int32 sts = 1; // status code
- string msg = 2; // status/diagnostic message
- PBVchState voucher = 3; // voucher state after the command
- repeated PBTxnItem txnitems = 4; // transaction lines returned with the response
- }
- message PBPOSCommand {
- int64 shopid = 1; // shop id (used to address the shard entity)
- string commandname = 2; // command name, e.g. "LogOn", "LogOff" (matched case-insensitively)
- string delimitedparams = 3; //for multiple parameters, use ; to delimit
- }
- service SendCommand {
- rpc SingleResponse(PBPOSCommand) returns (PBPOSResponse) {}; // one request, one response
- rpc MultiResponse(PBPOSCommand) returns (stream PBPOSResponse) {}; // one request, response stream
- }
前端通过两种模式向平台发送指令 PBPOSCommand: SingleResponse 代表传统的 request/response 交互模式; MultiResponse, 又或者 server-streaming, 代表前端发送一个指令, 服务端返回一串 Response, 或 response-stream. POSMessage 和 PBPOSCommand, POSResponse 和 PBPOSResponse 之间必须具备相互转换函数:
- package com.datatech.pos.cloud
- import Messages._
- import com.datatech.pos.messages._
- // Implicit converters between the domain types (POSMessage, POSResponse, VchStates,
- // TxnItem) and their protobuf wire forms (PBPOSCommand, PBPOSResponse, PBVchState, PBTxnItem).
- object PBConverter {
- implicit class PBConverter(pbmsg: PBPOSCommand) {
- // Map the wire command name (case-insensitive) plus ";"-delimited params onto a typed POSMessage.
- def toPOSComand: POSMessage = pbmsg.commandname.toUpperCase match {
- case "LOGON" => POSMessage(pbmsg.shopid,LogOn(pbmsg.delimitedparams))
- case "LOGOFF" => POSMessage(pbmsg.shopid,LogOff)
- ...
- }
- }
- implicit class POSResponseConvert(resp: POSResponse) {
- // Convert a domain response (including nested voucher state and txn lines) to protobuf.
- def toPBPOSResponse: PBPOSResponse = new PBPOSResponse(
- sts = resp.sts,
- msg = resp.msg,
- voucher = Some(resp.voucher.toPBVchState),
- txnitems = resp.txnItems.map(_.toPBTxnItem)
- )
- }
- implicit class VchStateConvert(state: VchStates) {
- def toPBVchState: PBVchState = new PBVchState(
- opr = state.opr, // cashier/operator id
- jseq = state.jseq, //begin journal sequence for read-side replay
- num = state.num, // current voucher number
- seq = state.seq, // current item sequence number
- void = state.void, // void (cancel-item) mode
- refd = state.refd, // refund mode
- susp = state.susp, // voucher suspended (on hold)
- canc = state.canc, // voucher cancelled
- due = state.due, // balance-due flag (original comment said "current balance" — confirm)
- su = state.su, // supervisor id
- mbr = state.mbr, // member id
- mode = state.mode // current workflow mode: 0=logOff, 1=LogOn, 2=Payment
- )
- }
- implicit class TxnItemConvert(item: TxnItem) {
- def toPBTxnItem: PBTxnItem = new PBTxnItem(
- txndate = item.txndate, // transaction date
- txntime = item.txntime, // entry time
- opr = item.opr, // operator id
- num = item.num, // sales voucher number
- seq = item.seq, // transaction sequence number
- txntype = item.txntype, // transaction type
- salestype = item.salestype, // sales type
- qty = item.qty, // quantity
- price = item.price, // unit price (cents)
- amount = item.amount, // gross amount (cents)
- disc = item.disc, // discount rate (%)
- dscamt = item.dscamt, // discount amount: negative; net = amount + dscamt
- member = item.member, // member card number
- code = item.code, // item code (product, card ...)
- acct = item.acct, // account number
- dpt = item.dpt // department
- )
- }
- }
然后可以开始实现平台端 POS 接口服务了:
- package com.datatech.pos.cloud
- import com.datatech.pos.messages._
- import io.grpc.stub.StreamObserver
- import PBConverter._
- import akka.actor.ActorRef
- import akka.pattern.ask
- import scala.concurrent.duration._
- import akka.util.Timeout
- import Messages._
- import scala.concurrent.{Await, Future}
- import com.typesafe.config.ConfigFactory
- import com.datatech.sdp
- import sdp.logging._
- // gRPC service implementation: converts incoming PBPOSCommand messages to domain
- // POSMessages and asks the write-side (via writerRouter) for the response.
- class gRPCServices(writerRouter: ActorRef) extends SendCommandGrpc.SendCommand with LogSupport {
-   import gRPCServices._
-   import PBConverter._
-   import scala.util.control.NonFatal
-   var posConfig: com.typesafe.config.Config = _
-   // ask-timeout (seconds) for the write-side; overridden from pos.conf when present
-   var exetimeout: Int = 5
-   try {
-     posConfig = ConfigFactory.load("pos.conf").getConfig("pos.server")
-     exetimeout = posConfig.getInt("executimeout")
-   }
-   catch {
-     // FIX: was `case excp: Throwable` — catching NonFatal only lets fatal JVM
-     // errors (OutOfMemoryError, etc.) propagate instead of being swallowed here.
-     case NonFatal(excp) =>
-       log.warn(s"gRPCServices: ${excp.getMessage}")
-       exetimeout = 5
-   }
-   // request/response mode: one PBPOSCommand in, one PBPOSResponse out
-   override def singleResponse(request: PBPOSCommand): Future[PBPOSResponse] = {
-     getPBResponse(writerRouter,request.toPOSComand, exetimeout)
-   }
-   // server-streaming mode: not implemented yet
-   override def multiResponse(request: PBPOSCommand, responseObserver: StreamObserver[PBPOSResponse]): Unit = ???
- }
- object gRPCServices {
-   // NOTE(review): maps the response on the global ExecutionContext — acceptable in
-   // a demo; a dedicated dispatcher would be preferable in production.
-   import scala.concurrent.ExecutionContext.Implicits.global
-   // ask() the POS handler with a typed command and convert the typed reply to its
-   // protobuf form. `executimeout` is the ask timeout in seconds.
-   def getPBResponse(ref: ActorRef, cmd: POSMessage, executimeout: Int = 5): Future[PBPOSResponse] = {
-     // FIX: `.seconds` instead of the postfix `executimeout second`, which relies on
-     // scala.language.postfixOps (discouraged/deprecated style).
-     implicit val timeout: Timeout = Timeout(executimeout.seconds)
-     val futRes: Future[POSResponse] = ask(ref, cmd).mapTo[POSResponse]
-     futRes.map(_.toPBPOSResponse)
-   }
- }
现在需要把 gRPCServices 与 POS 系统集成为一体, 这样前端发来的 PBPOSCommand 转换成 POSMessage 后经 POSAgent 转发给集群分片 writerRouter, writerRouter 再发给 writer 去进行具体的操作处理, 完成后把 POSResponse 转换成 PBPOSResponse 通过 service 再返回前端:
- // Shown again for discussion: the ask() round-trip used by the service. `ref` here
- // is the intermediary POSAgent actor; `executimeout` is the ask timeout in seconds.
- def getPBResponse(ref: ActorRef, cmd: POSMessage, executimeout: Int = 5): Future[PBPOSResponse] = {
- implicit val timeout = Timeout(executimeout second)
- val futRes: Future[POSResponse] = ask(ref, cmd).mapTo[POSResponse]
- futRes.map(_.toPBPOSResponse)
- }
可以看到上面使用了 ask()模式来进行双向沟通. 这个 ref 是一个中间信息交互 actor (POSAgent):
- // Node bootstrap: per-node Akka config assembled from command-line host/port,
- // Cassandra contact points, optional seed nodes, and this node's cluster role.
- var config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
- .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
- .withFallback(ConfigFactory.parseString("cassandra-journal.contact-points=[\"" + host + "\"]"))
- .withFallback(ConfigFactory.parseString("cassandra-snapshot-store.contact-points=[\"" + host + "\"]"))
- if (!seednodes.isEmpty)
- config = config.withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=[" + seednodes + "]"))
- //roles can be deployed on this node
- config = config.withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
- .withFallback(ConfigFactory.load())
- val posSystem = ActorSystem(systemName, config)
- posSystem.actorOf(ClusterMonitor.props, "cps-cluster-monitor")
- posSystem.actorOf(ActionReader.readerProps(showSteps),"reader")
- val readerRouter = posSystem.actorOf(ReaderRouter.props(showSteps),"reader-router")
- // Deploy the write-side shard region, then wire the chain:
- // gRPCServices -> POSAgent -> WriterRouter -> shard region (writer)
- WriterShard.deployShard(posSystem)(ReaderInfo(readerRouter,writeOnly),showSteps)
- val posHandler = ClusterSharding(posSystem).shardRegion(WriterShard.shardName)
- val posref = posSystem.actorOf(WriterRouter.props(posHandler), "writer-router")
- val passer = posSystem.actorOf(POSAgent.props(posref), "pos-agent")
- // Bind the gRPC service to the agent and start the server.
- val svc = SendCommandGrpc.bindService(new gRPCServices(passer), posSystem.dispatcher)
- runServer(svc)
- ...
- package com.datatech.pos.cloud
- import akka.actor._
- import com.datatech.sdp
- import sdp.logging._
- import Messages._
- object POSAgent {
-   // BUG FIX: was `Props(new WriterRouter(pos))` — POSAgent.props constructed a
-   // WriterRouter instead of a POSAgent, so the agent actor was never created.
-   def props(pos: ActorRef) = Props(new POSAgent(pos))
- }
- // Intermediary actor between the gRPC service and the write-side: remembers who
- // asked and relays the eventual POSResponse back to that sender.
- class POSAgent(posHandler: ActorRef) extends Actor with LogSupport {
-   // NOTE(review): single slot — overwritten if a second command arrives before the
-   // previous response; assumes one outstanding request at a time — confirm.
-   var _sender: ActorRef = _
-   override def receive: Receive = {
-     case msg @ POSMessage(_,_) =>
-       _sender = sender()
-       posHandler ! msg
-     case resp: POSResponse => _sender ! resp
-   }
- }
- ...
- package com.datatech.pos.cloud
- import akka.actor._
- import com.datatech.sdp
- import sdp.logging._
- import Messages._
- object WriterRouter {
- def props(pos: ActorRef) = Props(new WriterRouter(pos))
- }
- // Forwards POSMessages to the sharded write-side handler and relays the resulting
- // POSResponse back to the original sender.
- class WriterRouter(posHandler: ActorRef) extends Actor with LogSupport {
- // NOTE(review): single slot — overwritten if a second command arrives before the
- // previous response; assumes one outstanding request at a time — confirm.
- var _sender: ActorRef = _
- override def receive: Receive = {
- case msg @ POSMessage(_,_) =>
- _sender = sender()
- posHandler ! msg
- case resp: POSResponse => _sender ! resp
- // log.info(s"*********response from server: $resp *********")
- }
- }
前端是 gRPC 的客户端. 我们构建一个来测试后台控制逻辑:
- package poc.client
- import scala.concurrent.Future
- import com.datatech.pos.messages._
- import com.datatech.pos.messages.SendCommandGrpc
- import io.grpc.netty.{NegotiationType, NettyChannelBuilder}
- // Test client: drives the server's SingleResponse rpc with a LogOn/LogOff sequence.
- object POCClient {
- def main(args: Array[String]): Unit = {
- // Build a plaintext (no TLS) channel with the production NettyChannelBuilder;
- // the experimental io.grpc.ManagedChannelBuilder variant is kept below for reference.
- val channel = NettyChannelBuilder
- .forAddress("192.168.11.189",50051)
- .negotiationType(NegotiationType.PLAINTEXT)
- .build()
- /*
- //build connection channel
- val channel = io.grpc.ManagedChannelBuilder
- .forAddress("192.168.11.189",50051)
- .usePlaintext(true)
- .build()
- val pbCommand = PBPOSCommand(1022,"LogOn","888")
- //async call
- val asyncStub = SendCommandGrpc.blockingStub(channel)
- val futResponse: Future[PBPOSResponse] = asyncStub.singleResponse(pbCommand)
- import scala.concurrent.ExecutionContext.Implicits.global
- futResponse.foreach(result => println(result)) */
- // Commands must be issued strictly in order, so the blocking stub is used: each
- // call waits for the server's PBPOSResponse before the next command is sent.
- val pbCommand = PBPOSCommand(1022,"LogOn","888")
- val syncStub1: SendCommandGrpc.SendCommandBlockingClient = SendCommandGrpc.blockingStub(channel)
- val response1: PBPOSResponse = syncStub1.singleResponse(pbCommand)
- println(s"${response1.msg}")
- val pbCommand2 = PBPOSCommand(1022,"LogOff","")
- //sync call
- val syncStub: SendCommandGrpc.SendCommandBlockingClient = SendCommandGrpc.blockingStub(channel)
- val response: PBPOSResponse = syncStub.singleResponse(pbCommand2)
- println(s"${response.msg}")
- scala.io.StdIn.readLine()
- channel.shutdown()
- }
- }
这里有几点必须注意:
1,protobuffer 对象的强名称必须一致. 在客户端用了同一个 posmessages.proto 定义文件:
- syntax = "proto3";
- import "google/protobuf/wrappers.proto";
- import "google/protobuf/any.proto";
- import "scalapb/scalapb.proto";
- option (scalapb.options) = {
- // use a custom Scala package name
- // package_name: "io.ontherocks.introgrpc.demo"
- // don't append file name to package
- flat_package: true
- // generate one Scala file for all messages (services still get their own file)
- single_file: true
- // add imports to generated file
- // useful when extending Traits or using custom types
- // import: "io.ontherocks.hellogrpc.RockingMessage"
- // code to put at the top of generated file
- // works only with `single_file: true`
- //preamble: "sealed trait SomeSealedTrait"
- };
- package com.datatech.pos.messages;
- message PBVchState { // voucher (sales slip) state
- string opr = 1; // cashier/operator id
- int64 jseq = 2; //begin journal sequence for read-side replay
- int32 num = 3; // current voucher number
- int32 seq = 4; // current item sequence number
- bool void = 5; // void (cancel-item) mode
- bool refd = 6; // refund mode
- bool susp = 7; // voucher suspended (on hold)
- bool canc = 8; // voucher cancelled
- bool due = 9; // balance-due flag; NOTE(review): original comment said "current balance" but the type is bool — confirm intent
- string su = 10; // supervisor id
- string mbr = 11; // member id
- int32 mode = 12; // current workflow mode: 0=logOff, 1=LogOn, 2=Payment
- }
- message PBTxnItem { // transaction record
- string txndate = 1; // transaction date
- string txntime = 2; // entry time
- string opr = 3; // operator id
- int32 num = 4; // sales voucher number
- int32 seq = 5; // transaction sequence number
- int32 txntype = 6; // transaction type
- int32 salestype = 7; // sales type
- int32 qty = 8; // quantity
- int32 price = 9; // unit price (cents)
- int32 amount = 10; // gross amount (cents)
- int32 disc = 11; // discount rate (%)
- int32 dscamt = 12; // discount amount: negative; net = amount + dscamt
- string member = 13; // member card number
- string code = 14; // item code (product, card ...)
- string acct = 15; // account number
- string dpt = 16; // department
- }
- message PBPOSResponse {
- int32 sts = 1; // status code
- string msg = 2; // status/diagnostic message
- PBVchState voucher = 3; // voucher state after the command
- repeated PBTxnItem txnitems = 4; // transaction lines returned with the response
- }
- message PBPOSCommand {
- int64 shopid = 1; // shop id (used to address the shard entity)
- string commandname = 2; // command name, e.g. "LogOn", "LogOff" (matched case-insensitively)
- string delimitedparams = 3; // for multiple parameters, use ; to delimit
- }
- service SendCommand {
- rpc SingleResponse(PBPOSCommand) returns (PBPOSResponse) {}; // one request, one response
- rpc MultiResponse(PBPOSCommand) returns (stream PBPOSResponse) {}; // one request, response stream
- }
注意 package com.datatech.pos.messages, 这项在服务端和客户端必须一致.
2, 另外就是客户端的 channelbuilder: 在 scalaPB 例子里使用的是 ManagedChannelBuilder, 这是一个实验阶段的东东:
- //build connection channel
- val channel = io.grpc.ManagedChannelBuilder
- .forAddress("132.232.229.60",50051)
- .usePlaintext(true)
- .build()
要用 gRPC 中正式的 channelbuilder:
- val channel = NettyChannelBuilder
- .forAddress("192.168.11.189",50051)
- .negotiationType(NegotiationType.PLAINTEXT)
- .build()
上面这个 NettyChannelBuilder 的设置与那个 io.grpc.ManagedChannelBuilder 功能相等. 但 NettyChannelBuilder 还具备更多的设置参数, 如 ssl/tls 设置.
3, 还有: 因为客户端是按照顺序来发送操作指令的, 每发一个指令, 等待返回结果后才能再发下一个指令. 所以必须使用同步客户端调用函数 blockingStub.
下面是本次示范的一些配置文档:
- project/plugins.sbt
- addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
- addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2")
- addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15")
- addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.21")
- libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.9.0-M6"
- build.sbt
- name := "pos-on-cloud"
- version := "0.1"
- scalaVersion := "2.12.8"
- scalacOptions += "-Ypartial-unification"
- val akkaVersion = "2.5.23"
- // FIX: use ++= to append to libraryDependencies; the original `:=` replaces the
- // whole setting and silently drops any dependencies contributed by plugins.
- libraryDependencies ++= Seq(
-   "com.typesafe.akka" %% "akka-cluster-metrics" % akkaVersion,
-   "com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
-   "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
-   "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.0.1",
-   "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
-   "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.0.1",
-   "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
-   "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.97",
-   "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
-   "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
-   "ch.qos.logback" % "logback-classic" % "1.2.3",
-   "io.monix" %% "monix" % "3.0.0-RC2",
-   "org.typelevel" %% "cats-core" % "2.0.0-M1",
-   "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion,
-   "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
-   "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
- )
- // generate Scala sources from .proto files at compile time
- PB.targets in Compile := Seq(
-   scalapb.gen() -> (sourceManaged in Compile).value
- )
- enablePlugins(JavaAppPackaging)
- resouces/application.conf
- akka.actor.warn-about-java-serializer-usage = off
- akka.log-dead-letters-during-shutdown = off
- akka.log-dead-letters = off
- akka.remote.use-passive-connections=off
- akka {
- loglevel = INFO
- actor {
- provider = "cluster"
- }
- remote {
- log-remote-lifecycle-events = on
- netty.tcp {
- hostname = "127.0.0.1"
- # port set to 0 for netty to randomly choose from
- port = 0
- }
- }
- cluster {
- seed-nodes = [
- "akka.tcp://cloud-pos-server@172.27.0.8:2551"
- ,"akka.tcp://cloud-pos-server@172.27.0.7:2551"
- ]
- log-info = off
- sharding {
- role = "poswriter"
- passivate-idle-entity-after = 30 m
- }
- }
- persistence {
- journal.plugin = "cassandra-journal"
- snapshot-store.plugin = "cassandra-snapshot-store"
- }
- }
- cassandra-journal {
- contact-points = [
- "172.27.0.8",
- "172.27.0.7",
- "172.27.0.15"
- ]
- }
- cassandra-snapshot-store {
- contact-points = [
- "172.27.0.8",
- "172.27.0.7",
- "172.27.0.15"
- ]
- }
- # Enable metrics extension in akka-cluster-metrics.
- akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
- # Adaptive-group router for the read side, balanced by cluster metrics.
- akka.actor.deployment {
- /reader-router/readerRouter = {
- # Router type provided by metrics extension.
- router = cluster-metrics-adaptive-group
- # Router parameter specific for metrics extension.
- # metrics-selector = heap
- # metrics-selector = load
- # metrics-selector = CPU
- metrics-selector = mix
- #
- routees.paths = ["/user/reader"]
- cluster {
- max-nr-of-instances-per-node = 10
- max-total-nr-of-instances = 1000
- enabled = on
- # set to on when an instance of the routee is created
- # on the same node as the router
- # NOTE(review): the original note claimed this must be off to avoid missing
- # msgs in a local cluster, yet the value below is on — confirm which is intended
- allow-local-routees = on
- }
- }
- }
- dbwork-dispatcher {
- # Dispatcher is the name of the event-based dispatcher
- type = Dispatcher
- # What kind of ExecutionService to use
- executor = "fork-join-executor"
- # Configuration for the fork join pool
- fork-join-executor {
- # Min number of threads to cap factor-based parallelism number to
- parallelism-min = 2
- # Parallelism (threads) ... ceil(available processors * factor)
- parallelism-factor = 2.0
- # Max number of threads to cap factor-based parallelism number to
- parallelism-max = 10
- }
- # Throughput defines the maximum number of messages to be
- # processed per actor before the thread jumps to the next actor.
- # Set to 1 for as fair as possible.
- throughput = 100
- }
resources/logback.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <configuration>
- <!-- Console-only logging: com.datatech.* at info, everything else at warn. -->
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <Pattern>
- %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
- </Pattern>
- </encoder>
- </appender>
- <logger name="com.datatech" level="info"
- additivity="false">
- <appender-ref ref="STDOUT" />
- </logger>
- <!-- NOTE(review): this logger looks redundant; com.datatech.sdp is already covered
- by the com.datatech logger above with identical settings. -->
- <logger name="com.datatech.sdp" level="info"
- additivity="false">
- <appender-ref ref="STDOUT" />
- </logger>
- <root level="warn">
- <appender-ref ref="STDOUT" />
- </root>
- </configuration>
- resources/pos.conf
- pos {
- server {
- debug = false
- cqlport = 9042 # presumably the Cassandra CQL port used by the read side — confirm
- readinterval = 1000 # presumably a polling interval in ms — confirm against reader code
- executimeout = 5 # ask-timeout (seconds) read by gRPCServices for write-side calls
- }
- }
来源: https://www.cnblogs.com/tiger-xc/p/11007529.html