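Earlier posts in this series covered the basic features and usage of scala-gRPC, and we more or less settled on gRPC as an effective tool for integrating internal systems, mainly because of the service modes it supports:
- Unary-Call: a single client-request/server-response pair, the familiar HTTP-style interaction
- Server-Streaming: the client sends one request and receives a stream of responses from the server
- Client-Streaming: the client sends a stream of requests to the server and receives one response
- Bidirectional-Streaming: the client sends a request to open the connection, after which both ends can keep exchanging messages over it.
Clearly, gRPC supports streaming in both directions. If the two gRPC types ListenableFuture and StreamObserver could be converted into akka-stream's basic types, we should be able to build what might be called reactive-gRPC. Programming gRPC service calls in the akka-stream style would look like this: on the server side we only need to implement an akka-stream Flow that turns incoming requests into outgoing responses, as follows: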
- // Unary case
- Flow[Request].map(computeResponse)
- // Server streaming
- Flow[Request].flatMapConcat(computeResponses)
- // Client streaming
- Flow[Request].fold(defaultResponse)(computeResponse)
- // Bidirectional streaming
- Flow[Request].flatMapConcat(computeResponses)
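To make these shapes concrete without pulling in akka-stream, here is a rough analogy on plain Scala collections, where a List stands in for the stream of requests. This is only a sketch of the operator semantics, not akka-stream code: the object name `ShapeAnalogy` and the bodies of `computeResponse` and `computeResponses` are invented for illustration.

```scala
// Collection analogy only: List[Int] stands in for a stream of requests.
object ShapeAnalogy {
  // Hypothetical request handlers, invented for this sketch.
  def computeResponse(req: Int): Int = req * 2
  def computeResponses(req: Int): List[Int] = List.fill(req)(req)

  // Unary: exactly one response per request (the role of Flow.map).
  def unary(reqs: List[Int]): List[Int] =
    reqs.map(computeResponse)

  // Server streaming: each request expands into several responses
  // (the role of flatMapConcat; order of requests is preserved).
  def serverStreaming(reqs: List[Int]): List[Int] =
    reqs.flatMap(computeResponses)

  // Client streaming: many requests collapse into one response
  // (the role of fold, starting from a default value).
  def clientStreaming(reqs: List[Int]): Int =
    reqs.foldLeft(0)(_ + _)
}
```

The bidirectional case in the listing above reuses the same flatMapConcat shape as server streaming; the difference lies in the client, which keeps sending requests while responses arrive.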
Since this is an akka-stream Flow, we can use any akka-stream operator inside it, for example:
- Flow[Request]
- .throttle(1, 10.millis, 1, ThrottleMode.Shaping)
- .map(computeResponse)
On the client side we can run requests through the Flow exposed by the client stub, as follows:
- Source
- .single(request)
- .via(stub.doSomething)
- .runForeach(println)
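Conveniently, the open-source gRPCAkkaStream project from beyond-the-lines provides exactly this kind of bridge from gRPC's StreamObserver to an akka-stream Flow. What follows is a walkthrough of gRPCAkkaStream, starting with Unary-Call. Here is the IDL service description in the .proto file: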
- syntax = "proto3";
- package learn.grpc.akka.stream.services;
- message NumPair {
- int32 num1 = 1;
- int32 num2 = 2;
- }
- message Num {
- int32 num = 1;
- }
- message SumResult {
- int32 result = 1;
- }
- service SumNumbers {
- rpc SumPair(NumPair) returns (SumResult) {}
- }
Let's look at some of the relevant types and functions in the SumGrpcAkkaStream.scala file generated by the compiler.
Service interface:
trait SumNumbers extends AbstractService {
- override def serviceCompanion = SumNumbers
- def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
- }
The service function sumPair is an akka-stream Flow[NumPair,SumResult,NotUsed]. Here is the implementation of SumNumbers.sumPair:
- class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
- val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)
- override def sumPair: Flow[NumPair, SumResult, NotUsed] = {
- logger.info(s"*** calling sumPair ... ***")
- Flow[NumPair].map {
- case NumPair(a,b) => {
- logger.info(s"serving ${a} + ${b} = ???")
- SumResult(a + b)
- }
- }
- }
- }
The generated client stub source looks like this:
- class SumNumbersStub(
- channel: Channel,
- options: CallOptions = CallOptions.DEFAULT
- ) extends AbstractStub[SumNumbersStub](channel, options) with SumNumbers {
- override def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] =
- Flow[learn.grpc.akka.stream.services.sum.NumPair].flatMapConcat(request =>
- Source.fromFuture(
- Grpc.guavaFuture2ScalaFuture(
- ClientCalls.futureUnaryCall(channel.newCall(METHOD_SUM_PAIR, options), request)
- )
- )
- )
- }
- // defined in the generated SumGrpcAkkaStream object:
- def stub(channel: Channel): SumNumbersStub = new SumNumbersStub(channel)
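The stub relies on Grpc.guavaFuture2ScalaFuture to turn a callback-style future into a Scala Future. The library's actual implementation is not shown in this post, so the following is only a minimal sketch of the general idea, using the standard library alone: java.util.concurrent.CompletableFuture stands in for Guava's ListenableFuture, and the name `FutureBridge.toScala` is hypothetical.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

object FutureBridge {
  // Sketch: complete a Scala Promise from the Java future's completion callback.
  // CompletableFuture is a stand-in here; the real stub deals with Guava futures.
  def toScala[T](cf: CompletableFuture[T]): Future[T] = {
    val p = Promise[T]()
    cf.whenComplete(new BiConsumer[T, Throwable] {
      override def accept(value: T, error: Throwable): Unit = {
        // Exactly one of value/error is meaningful when the callback fires.
        if (error != null) p.failure(error) else p.success(value)
        ()
      }
    })
    p.future
  }
}
```

Once the result is a Scala Future, wrapping it in a single-element Source (as the generated stub does with Source.fromFuture) lets it compose with the rest of the stream.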
We can call sumPair through the stub as follows:
- val channel = ManagedChannelBuilder
- .forAddress(host,port)
- .usePlaintext(true)
- .build()
- val stub = SumGrpcAkkaStream.stub(channel)
- def addPair(num1: Int, num2: Int): Source[String,NotUsed] = {
- logger.info(s"Requesting to add $num1, $num2")
- Source
- .single(NumPair(num1,num2))
- .via(stub.sumPair)
- .map(r => s"the result: ${r.result}")
- }
Here is how the Unary-Call is actually invoked:
- object UnaryCallClient extends App {
- implicit val system = ActorSystem("UnaryClient")
- implicit val mat = ActorMaterializer.create(system)
- val client = new gRPCAkkaStreamClient("localhost", 50051)
- client.addPair(29,33).runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
In Server-Streaming, a single request returns a stream of responses. The IDL description is:
service SumNumbers {
rpc SumPair(NumPair) returns (SumResult) {}
rpc GenIncsFrom(Num) returns (stream Num) {}
}
The service trait generated by the compiler is:
trait SumNumbers extends AbstractService {
- override def serviceCompanion = SumNumbers
- def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
- def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed]
- }
The service function genIncsFrom is a Flow[Num,Num,NotUsed], implemented as follows:
- class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
- val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)
- override def genIncsFrom: Flow[Num, Num, NotUsed] = {
- logger.info("*** calling genIncsFrom")
- Flow[Num].mapConcat {
- n => (1 to n.num).map {m =>
- logger.info(s"genIncFrom producing num: ${m}")
- Num(m)
- }
- }
- }
- }
Because the response is a stream, we can produce it by using mapConcat to flatten a Seq. On the client side, genIncsFrom is called like this:
- def genIncNumbers(len: Int): Source[Int,NotUsed] = {
- logger.info(s"Requesting to produce ${len} inc numbers")
- Source
- .single(Num(len))
- .via(stub.genIncsFrom)
- .map(n => n.num)
- }
Again we use runForeach to run this Source:
- object ServerStreamingClient extends App {
- implicit val system = ActorSystem("ServerStreamingClient")
- implicit val mat = ActorMaterializer.create(system)
- val client = new gRPCAkkaStreamClient("localhost", 50051)
- client.genIncNumbers(5).runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
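As a quick check on what this call should produce, the mapConcat step of genIncsFrom can be mirrored on plain collections. This is an analogy rather than the akka-stream code itself: `IncsAnalogy` is a made-up name, and its `Num` case class is a local stand-in for the generated message type.

```scala
object IncsAnalogy {
  // Local stand-in for the ScalaPB-generated Num message.
  final case class Num(num: Int)

  // Mirrors Flow[Num].mapConcat: every request n expands to Num(1) .. Num(n.num),
  // and the expansions are concatenated in request order.
  def genIncsFrom(reqs: List[Num]): List[Num] =
    reqs.flatMap(n => (1 to n.num).map(m => Num(m)))
}
```

So for the request Num(5) used above, the client should see the numbers 1 through 5, one response each.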
Next, let's see how Client-Streaming is implemented with reactive streams. The IDL service description is:
service SumNumbers {
rpc SumPair(NumPair) returns (SumResult) {}
rpc GenIncsFrom(Num) returns (stream Num) {}
rpc SumStreamNums(stream Num) returns (SumResult) {}
}
The generated service interface is:
trait SumNumbers extends AbstractService {
- override def serviceCompanion = SumNumbers
- def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
- def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed]
- def sumStreamNums: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
- }
The sumStreamNums Flow is implemented as follows:
- override def sumStreamNums: Flow[Num, SumResult, NotUsed] = {
- logger.info("*** calling sumStreamNums")
- Flow[Num].fold(SumResult(0)) {
- case (a, b) =>
- logger.info(s"receiving operand ${b.num}")
- SumResult(b.num + a.result)
- }
- }
The request is a stream, so an aggregation such as fold can collapse it into a single response. On the client side we call stub.sumStreamNums:
- def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = {
- logger.info(s"Requesting to sum up ${nums}")
- Source(nums.map(Num(_)).to[collection.immutable.Iterable])
- .via(stub.sumStreamNums)
- .map(r => s"the result: ${r.result}")
- }
- object ClientStreamingClient extends App {
- implicit val system = ActorSystem("ClientStreamingClient")
- implicit val mat = ActorMaterializer.create(system)
- val client = new gRPCAkkaStreamClient("localhost", 50051)
- client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
Finally, a demonstration of Bidirectional-Streaming. First we define keepAdding in the IDL, a service function with streaming input and output:
service SumNumbers {
rpc SumPair(NumPair) returns (SumResult) {}
rpc GenIncsFrom(Num) returns (stream Num) {}
rpc SumStreamNums(stream Num) returns (SumResult) {}
rpc KeepAdding(stream Num) returns (stream SumResult) {}
}
The implementation of this function:
- override def keepAdding: Flow[Num, SumResult, NotUsed] = {
- Flow[Num].scan(SumResult(0)) {
- case (a,b) =>
- logger.info(s"receiving operand ${b.num}")
- SumResult(b.num + a.result)
- }
- }
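This service function adds up a stream of input numbers one by one and emits the running result. scan provides exactly this behavior. Here is sample client code calling the service: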
- def ContSum(nums: Seq[Int]): Source[String,NotUsed] = {
- logger.info(s"Requesting to sum up ${nums}")
- Source(nums.map(Num(_)).to[collection.immutable.Iterable])
- .throttle(1, 500.millis, 1, ThrottleMode.shaping)
- .map { n =>
- logger.info(s"Sending number: $n")
- n
- }
- .via(stub.keepAdding)
- .map(r => s"current sum = ${r.result}")
- }
Run it with the following code:
- object BiDiStreamingClient extends App {
- implicit val system = ActorSystem("BiDiStreamingClient")
- implicit val mat = ActorMaterializer.create(system)
- val client = new gRPCAkkaStreamClient("localhost", 50051)
- client.ContSum(Seq(12,4,8,19)).runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
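One detail of scan is easy to miss: like scanLeft on collections, akka-stream's scan emits its initial value before any input has been combined, whereas fold emits only the final total. The sketch below shows the difference on plain collections. `RunningSum` is a hypothetical name, and the code is an analogy for keepAdding and sumStreamNums, not the service itself.

```scala
object RunningSum {
  // Analogue of Flow[Num].fold(SumResult(0)): a single final total.
  def total(nums: List[Int]): Int =
    nums.foldLeft(0)(_ + _)

  // Analogue of Flow[Num].scan(SumResult(0)): the seed comes out first,
  // followed by one running total per input element.
  def runningTotals(nums: List[Int]): List[Int] =
    nums.scanLeft(0)(_ + _)
}
```

For the input 12, 4, 8, 19 this gives the running totals 0, 12, 16, 24, 43, so the client would likely print five "current sum" lines rather than four, assuming the gRPC bridge forwards every element the Flow emits.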
Finally, here is all the source code covered in this discussion:
- project/scalapb.sbt
- addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
- resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
- libraryDependencies ++= Seq(
- "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1",
- "beyondthelines" %% "grpcakkastreamgenerator" % "0.0.5"
- )
- build.sbt
- import scalapb.compiler.Version.scalapbVersion
- import scalapb.compiler.Version.grpcJavaVersion
- name := "gRPCAkkaStreamDemo"
- version := "0.1"
- scalaVersion := "2.12.6"
- resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
- libraryDependencies ++= Seq(
- "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
- "io.grpc" % "grpc-netty" % grpcJavaVersion,
- "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
- "io.monix" %% "monix" % "2.3.0",
- // for GRPC Akkastream
- "beyondthelines" %% "grpcakkastreamruntime" % "0.0.5"
- )
- PB.targets in Compile := Seq(
- scalapb.gen() -> (sourceManaged in Compile).value,
- // generate the akka stream files
- grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
- )
- src/main/protobuf/sum.proto
- syntax = "proto3";
- package learn.grpc.akka.stream.services;
- message NumPair {
- int32 num1 = 1;
- int32 num2 = 2;
- }
- message Num {
- int32 num = 1;
- }
- message SumResult {
- int32 result = 1;
- }
- service SumNumbers {
rpc SumPair(NumPair) returns (SumResult) {}
rpc GenIncsFrom(Num) returns (stream Num) {}
rpc SumStreamNums(stream Num) returns (SumResult) {}
rpc KeepAdding(stream Num) returns (stream SumResult) {}
- }
- src/main/scala/gRPCAkkaStreamService.scala
- package learn.grpc.akka.stream.services.impl
- import akka.NotUsed
- import akka.stream.scaladsl.Flow
- import learn.grpc.akka.stream.services.sum._
- import java.util.logging.Logger
- class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
- val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)
- override def sumPair: Flow[NumPair, SumResult, NotUsed] = {
- logger.info(s"*** calling sumPair ... ***")
- Flow[NumPair].map {
- case NumPair(a, b) => {
- logger.info(s"serving ${a} + ${b} = ???")
- SumResult(a + b)
- }
- }
- }
- override def genIncsFrom: Flow[Num, Num, NotUsed] = {
- logger.info("*** calling genIncsFrom ... ***")
- Flow[Num].mapConcat {
- n =>
- (1 to n.num).map { m =>
- logger.info(s"genIncFrom producing num: ${m}")
- Num(m)
- }
- }
- }
- override def sumStreamNums: Flow[Num, SumResult, NotUsed] = {
- logger.info("*** calling sumStreamNums ... ***")
- Flow[Num].fold(SumResult(0)) {
- case (a, b) =>
- logger.info(s"receiving operand ${b.num}")
- SumResult(b.num + a.result)
- }
- }
- override def keepAdding: Flow[Num, SumResult, NotUsed] = {
- Flow[Num].scan(SumResult(0)) {
- case (a,b) =>
- logger.info(s"receiving operand ${b.num}")
- SumResult(b.num + a.result)
- }
- }
- }
- src/main/scala/gRPCAkkaStreamServer.scala
- package learn.grpc.akka.stream.server
- import java.util.logging.Logger
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import io.grpc.Server
- import learn.grpc.akka.stream.services.impl.gRPCAkkaStreamService
- import io.grpc.ServerBuilder
- import learn.grpc.akka.stream.services.sum._
- class gRPCServer(server: Server) {
- val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)
- def start(): Unit = {
- server.start()
- logger.info(s"Server started, listening on ${server.getPort}")
- sys.addShutdownHook {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- System.err.println("*** shutting down gRPC server since JVM is shutting down")
- stop()
- System.err.println("*** server shut down")
- }
- ()
- }
- def stop(): Unit = {
- server.shutdown()
- }
- /**
- * Await termination on the main thread since the grpc library uses daemon threads.
- */
- def blockUntilShutdown(): Unit = {
- server.awaitTermination()
- }
- }
- object DemoServer extends App {
- implicit val system = ActorSystem("UnaryServer")
- implicit val mat = ActorMaterializer.create(system)
- val server = new gRPCServer(
- ServerBuilder
- .forPort(50051)
- .addService(
- SumGrpcAkkaStream.bindService(
- new gRPCAkkaStreamService
- )
- ).build()
- )
- server.start()
- // server.blockUntilShutdown()
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- src/main/scala/gRPCAkkaStreamClient.scala
- package learn.grpc.akka.stream.client
- import learn.grpc.akka.stream.services.sum._
- import java.util.logging.Logger
- import akka.stream.scaladsl._
- import akka.NotUsed
- import akka.actor.ActorSystem
- import akka.stream.{ActorMaterializer, ThrottleMode}
- import scala.concurrent.duration._
- import io.grpc._
- class gRPCAkkaStreamClient(host: String, port: Int) {
- val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamClient].getName)
- val channel = ManagedChannelBuilder
- .forAddress(host,port)
- .usePlaintext(true)
- .build()
- val stub = SumGrpcAkkaStream.stub(channel)
- def addPair(num1: Int, num2: Int): Source[String,NotUsed] = {
- logger.info(s"Requesting to add $num1, $num2")
- Source
- .single(NumPair(num1,num2))
- .via(stub.sumPair)
- .map(r => s"the result: ${r.result}")
- }
- def genIncNumbers(len: Int): Source[Int,NotUsed] = {
- logger.info(s"Requesting to produce ${len} inc numbers")
- Source
- .single(Num(len))
- .via(stub.genIncsFrom)
- .map(n => n.num)
- }
- def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = {
- logger.info(s"Requesting to sum up ${nums}")
- Source(nums.map(Num(_)).to[collection.immutable.Iterable])
- .throttle(1, 500.millis, 1, ThrottleMode.shaping)
- .map { n =>
- logger.info(s"Sending number: $n")
- n
- }
- .via(stub.sumStreamNums)
- .map(r => s"the result: ${r.result}")
- }
- def ContSum(nums: Seq[Int]): Source[String,NotUsed] = {
- logger.info(s"Requesting to sum up ${nums}")
- Source(nums.map(Num(_)).to[collection.immutable.Iterable])
- .throttle(1, 500.millis, 1, ThrottleMode.shaping)
- .map { n =>
- logger.info(s"Sending number: $n")
- n
- }
- .via(stub.keepAdding)
- .map(r => s"current sum = ${r.result}")
- }
- }
- object UnaryCallClient extends App {
- implicit val system = ActorSystem("UnaryClient")
- implicit val mat = ActorMaterializer.create(system)
- val client = new gRPCAkkaStreamClient("localhost", 50051)
- client.addPair(29,33).runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- object ServerStreamingClient extends App {
- implicit val system = ActorSystem("ServerStreamingClient")
- implicit val mat = ActorMaterializer.create(system)
- val client = new gRPCAkkaStreamClient("localhost", 50051)
- client.genIncNumbers(5).runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- object ClientStreamingClient extends App {
- implicit val system = ActorSystem("ClientStreamingClient")
- implicit val mat = ActorMaterializer.create(system)
- val client = new gRPCAkkaStreamClient("localhost", 50051)
- client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- object BiDiStreamingClient extends App {
- implicit val system = ActorSystem("BiDiStreamingClient")
- implicit val mat = ActorMaterializer.create(system)
- val client = new gRPCAkkaStreamClient("localhost", 50051)
- client.ContSum(Seq(12,4,8,19)).runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
Source: https://www.cnblogs.com/tiger-xc/p/9066799.html