Following the previous post on gRPC unary services, this post covers gRPC streaming: Server-Streaming, Client-Streaming and Bidirectional-Streaming. We start by describing a Server-Streaming service in IDL in the .proto file:
- /*
- * responding stream of increment results
- */
- service SumOneToMany {
- rpc AddOneToMany(SumRequest) returns (stream SumResponse) {}
- }
- message SumRequest {
- int32 toAdd = 1;
- }
- message SumResponse {
- int32 currentResult = 1;
- }
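In the SumOneToMany service, the AddOneToMany function takes one SumRequest and returns a stream of SumResponse messages; it is that simple. Compilation generates SumOneToManyGrpc.scala, which provides the API for the RPC operations. Here is the Scala function that the compiler produces from the service function described in IDL: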
def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit
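Calling the Scala function addOneToMany requires two arguments, a SumRequest and a StreamObserver[SumResponse], so the caller has to prepare both. The caller builds the StreamObserver before calling addOneToMany and passes it with the call, and the server's results are delivered back to the caller through it. gRPC implements data streaming through instances of the StreamObserver type. An example of building one: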
- val responseObserver = new StreamObserver[SumResponse] {
- def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
- def onCompleted(): Unit = println("ON_COMPLETED")
- def onNext(value: SumResponse): Unit =
- println(s"ON_NEXT: Current sum: ${value.currentResult}")
- }
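The server pushes results back to the client by calling onNext repeatedly on its responseObserver, and gRPC delivers each result to the observer that the client built. Below is the implementation of SumOneToMany: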
- class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
- override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
- val currentSum: AtomicInt = Atomic(0)
- (1 to request.toAdd).map { _ =>
- responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
- }
- Thread.sleep(1000) //delay and then finish
- responseObserver.onCompleted()
- }
- }
The addOneToMany service function returns the numbers from 1 to request.toAdd to the caller one at a time through responseObserver. The client calls the service like this:
- // get async stub
- val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)
- // prepare stream observer
- val streamObserver = new StreamObserver[SumResponse] {
- override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
- override def onCompleted(): Unit = println("Done incrementing !!!")
- override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
- }
- // call service with stream observer
- client.addOneToMany(SumRequest().withToAdd(6),streamObserver)
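To make the callback order concrete, here is a minimal in-process sketch of the server-streaming flow. It assumes a local `StreamObserver` trait and plain case classes as stand-ins for `io.grpc.stub.StreamObserver` and the ScalaPB-generated messages, so it runs without a gRPC dependency. `ServerStreamingSketch` and `run` are hypothetical names, and no channel or network is involved.

```scala
import scala.collection.mutable.ListBuffer

// Local stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
trait StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical simplified messages mirroring the proto definitions.
final case class SumRequest(toAdd: Int)
final case class SumResponse(currentResult: Int)

object ServerStreamingSketch {
  // Same shape as the generated addOneToMany: one request in, many responses out.
  def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
    (1 to request.toAdd).foreach(i => responseObserver.onNext(SumResponse(i)))
    responseObserver.onCompleted() // signals the end of the response stream
  }

  // Runs the handler against a collecting observer and returns the event log.
  def run(toAdd: Int): List[String] = {
    val events = ListBuffer.empty[String]
    val observer = new StreamObserver[SumResponse] {
      def onNext(value: SumResponse): Unit = events += s"next:${value.currentResult}"
      def onError(t: Throwable): Unit = events += s"error:${t.getMessage}"
      def onCompleted(): Unit = events += "completed"
    }
    addOneToMany(SumRequest(toAdd), observer)
    events.toList
  }

  def main(args: Array[String]): Unit = run(3).foreach(println)
}
```

`ServerStreamingSketch.run(3)` records three `next` events followed by a single `completed`, which is the same order in which the real client observer would see them.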
The IDL for the Client-Streaming service is:
- /*
- * responding a result from a request of stream of numbers
- */
- service SumManyToOne {
- rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {}
- }
It takes a stream of SumRequest and returns a single SumResponse. The addManyToOne function in the Scala code generated by ScalaPB has this signature:
def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
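The caller supplies a StreamObserver[SumResponse] to receive the result, and the function returns the StreamObserver[SumRequest] that the client uses to send the request stream. Note: although AddManyToOne returns a single SumResponse in the .proto file, the generated Scala function exposes a StreamObserver[SumResponse], so remember that onNext may be called on it only once. Here is the implementation of this service: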
- class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
- override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
- new StreamObserver[SumRequest] {
- // one running total per call, so separate calls do not share state
- val currentSum: AtomicInt = Atomic(0)
- override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
- override def onCompleted(): Unit = println("Done summing!")
- override def onNext(value: SumRequest): Unit = {
- // only allow one response: a non-positive value is the signal to send it
- if (value.toAdd > 0)
- currentSum.add(value.toAdd)
- else {
- responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
- responseObserver.onCompleted() // close the call after the single response
- }
- }
- }
- }
A sample client call:
- //pass to server for result
- val respStreamObserver = new StreamObserver[SumResponse] {
- override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
- override def onCompleted(): Unit = println("Done responding!")
- override def onNext(value: SumResponse): Unit =
- println(s"Result: ${value.currentResult}")
- }
- //get async stub
- val client = SumManyToOneGrpc.stub(channel)
- //get request stream observer from server
- val reqStreamObserver = client.addManyToOne(respStreamObserver)
- List(2,5,8,4,0).map { n =>
- reqStreamObserver.onNext(SumRequest(n))
- }
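The implementation above sends its single response when a non-positive number arrives. A different approach, sketched below as an alternative and not as the article's code, accumulates in `onNext` and replies once when the client finishes its stream by calling `onCompleted`. The `StreamObserver` trait and the case classes are local stand-ins for the gRPC and ScalaPB types, and `ClientStreamingSketch` and `run` are hypothetical names.

```scala
import scala.collection.mutable.ListBuffer

// Local stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
trait StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical simplified messages mirroring the proto definitions.
final case class SumRequest(toAdd: Int)
final case class SumResponse(currentResult: Int)

object ClientStreamingSketch {
  // Same shape as the generated addManyToOne: returns the observer for the request stream.
  def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
    new StreamObserver[SumRequest] {
      // Assumes callbacks on one observer arrive sequentially.
      private var sum = 0
      def onNext(value: SumRequest): Unit = sum += value.toAdd
      def onError(t: Throwable): Unit = System.err.println(s"error: ${t.getMessage}")
      def onCompleted(): Unit = {
        responseObserver.onNext(SumResponse(sum)) // exactly one response
        responseObserver.onCompleted()
      }
    }

  // Sends the numbers, finishes the request stream, and returns what the caller observed.
  def run(numbers: List[Int]): List[String] = {
    val events = ListBuffer.empty[String]
    val responses = new StreamObserver[SumResponse] {
      def onNext(value: SumResponse): Unit = events += s"next:${value.currentResult}"
      def onError(t: Throwable): Unit = events += s"error:${t.getMessage}"
      def onCompleted(): Unit = events += "completed"
    }
    val requests = addManyToOne(responses)
    numbers.foreach(n => requests.onNext(SumRequest(n)))
    requests.onCompleted() // the client signals that it has no more requests
    events.toList
  }

  def main(args: Array[String]): Unit = run(List(2, 5, 8, 4)).foreach(println)
}
```

With this shape the request stream needs no sentinel value: a zero or negative number is just another input, and the rule of calling `onNext` only once is enforced by where the call sits.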
The IDL for Bidirectional-Streaming is:
- /*
- * Sums up numbers received from the client and returns the current result after each received request.
- */
- service SumInter {
- rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
- }
The SumInter service describes an operation that takes a stream of SumRequest and returns a stream of SumResponse. The corresponding generated Scala function is:
def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
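This function has the same signature as the Client-Streaming service function. Here, however, we can send multiple SumResponse messages through responseObserver. The service is implemented like this: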
- class Many2ManyService extends SumInterGrpc.SumInter {
- override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
- new StreamObserver[SumRequest] {
- val currentSum: AtomicInt = Atomic(0)
- override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
- override def onCompleted(): Unit = println("Done requesting!")
- override def onNext(value: SumRequest): Unit = {
- responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
- }
- }
- }
We can call responseObserver.onNext as many times as needed. The client source code is:
- //create stream observer for result stream
- val responseObserver = new StreamObserver[SumResponse] {
- def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
- def onCompleted(): Unit = println("ON_COMPLETED")
- def onNext(value: SumResponse): Unit =
- println(s"ON_NEXT: Current sum: ${value.currentResult}")
- }
- //get request container
- val requestObserver = client.addInter(responseObserver)
- scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
- val toBeAdded = Random.nextInt(11)
- println(s"Adding number: $toBeAdded")
- requestObserver.onNext(SumRequest(toBeAdded))
- }
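The interleaving of requests and responses in the bidirectional case can be traced with a small in-process sketch. It assumes a local `StreamObserver` trait and plain case classes in place of the gRPC and ScalaPB types, uses fixed inputs where the client above uses random numbers on a timer, and also completes the response stream when the request stream ends, which the article's server does not do. `BidiStreamingSketch` and `run` are hypothetical names.

```scala
import scala.collection.mutable.ListBuffer

// Local stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
trait StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical simplified messages mirroring the proto definitions.
final case class SumRequest(toAdd: Int)
final case class SumResponse(currentResult: Int)

object BidiStreamingSketch {
  // Same shape as the generated addInter: every request produces one running-total response.
  def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
    new StreamObserver[SumRequest] {
      // Assumes callbacks on one observer arrive sequentially.
      private var sum = 0
      def onNext(value: SumRequest): Unit = {
        sum += value.toAdd
        responseObserver.onNext(SumResponse(sum))
      }
      def onError(t: Throwable): Unit = System.err.println(s"error: ${t.getMessage}")
      def onCompleted(): Unit = responseObserver.onCompleted()
    }

  // Sends each number and returns the running totals the caller received.
  def run(numbers: List[Int]): List[Int] = {
    val sums = ListBuffer.empty[Int]
    val responses = new StreamObserver[SumResponse] {
      def onNext(value: SumResponse): Unit = sums += value.currentResult
      def onError(t: Throwable): Unit = System.err.println(s"error: ${t.getMessage}")
      def onCompleted(): Unit = ()
    }
    val requests = addInter(responses)
    numbers.foreach(n => requests.onNext(SumRequest(n)))
    requests.onCompleted()
    sums.toList
  }

  def main(args: Array[String]): Unit = println(run(List(3, 7, 10)))
}
```

Sending 3, 7 and 10 yields the running totals 3, 10 and 20, one response per request.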
Below is the complete source code for this demo:
- project/scalapb.sbt
- addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
- libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"
- build.sbt
- import scalapb.compiler.Version.scalapbVersion
- import scalapb.compiler.Version.grpcJavaVersion
- name := "learn-gRPC"
- version := "0.1"
- scalaVersion := "2.12.6"
- libraryDependencies ++= Seq(
- "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
- "io.grpc" % "grpc-netty" % grpcJavaVersion,
- "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
- "io.monix" %% "monix" % "2.3.0"
- )
- PB.targets in Compile := Seq(
- scalapb.gen() -> (sourceManaged in Compile).value
- )
- src/main/protobuf/sum.proto
- syntax = "proto3";
- package learn.grpc.services;
- /*
- * responding stream of increment results
- */
- service SumOneToMany {
- rpc AddOneToMany(SumRequest) returns (stream SumResponse) {}
- }
- /*
- * responding a result from a request of stream of numbers
- */
- service SumManyToOne {
- rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {}
- }
- /*
- * Sums up numbers received from the client and returns the current result after each received request.
- */
- service SumInter {
- rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
- }
- message SumRequest {
- int32 toAdd = 1;
- }
- message SumResponse {
- int32 currentResult = 1;
- }
- gRPCServer.scala
- package learn.grpc.server
- import io.grpc.{ServerBuilder,ServerServiceDefinition}
- trait gRPCServer {
- def runServer(service: ServerServiceDefinition): Unit = {
- val server = ServerBuilder
- .forPort(50051)
- .addService(service)
- .build
- .start
- // make sure our server is stopped when jvm is shut down
- Runtime.getRuntime.addShutdownHook(new Thread() {
- override def run(): Unit = server.shutdown()
- })
- server.awaitTermination()
- }
- }
- OneToManyServer.scala
- package learn.grpc.sum.one2many.server
- import io.grpc.stub.StreamObserver
- import learn.grpc.services.sum._
- import monix.execution.atomic.{Atomic,AtomicInt}
- import learn.grpc.server.gRPCServer
- object One2ManyServer extends gRPCServer {
- class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
- override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
- val currentSum: AtomicInt = Atomic(0)
- (1 to request.toAdd).map { _ =>
- responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
- }
- Thread.sleep(1000) //delay and then finish
- responseObserver.onCompleted()
- }
- }
- def main(args: Array[String]): Unit = {
- val svc = SumOneToManyGrpc.bindService(new SumOne2ManyService, scala.concurrent.ExecutionContext.global)
- runServer(svc)
- }
- }
- OneToManyClient.scala
- package learn.grpc.sum.one2many.client
- import io.grpc.stub.StreamObserver
- import learn.grpc.services.sum._
- object One2ManyClient {
- def main(args: Array[String]): Unit = {
- //build connection channel
- val channel = io.grpc.ManagedChannelBuilder
- .forAddress("LocalHost",50051)
- .usePlaintext(true)
- .build()
- // get async stub
- val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)
- // prepare stream observer
- val streamObserver = new StreamObserver[SumResponse] {
- override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
- override def onCompleted(): Unit = println("Done incrementing !!!")
- override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
- }
- // call service with stream observer
- client.addOneToMany(SumRequest().withToAdd(6),streamObserver)
- // wait for async execution
- scala.io.StdIn.readLine()
- }
- }
- ManyToOneServer.scala
- package learn.grpc.sum.many2one.server
- import io.grpc.stub.StreamObserver
- import learn.grpc.services.sum._
- import learn.grpc.server.gRPCServer
- import monix.execution.atomic.{Atomic,AtomicInt}
- object Many2OneServer extends gRPCServer {
- class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
- override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
- new StreamObserver[SumRequest] {
- // one running total per call, so separate calls do not share state
- val currentSum: AtomicInt = Atomic(0)
- override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
- override def onCompleted(): Unit = println("Done summing!")
- override def onNext(value: SumRequest): Unit = {
- // only allow one response: a non-positive value is the signal to send it
- if (value.toAdd > 0)
- currentSum.add(value.toAdd)
- else {
- responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
- responseObserver.onCompleted() // close the call after the single response
- }
- }
- }
- }
- def main(args: Array[String]): Unit = {
- val svc = SumManyToOneGrpc.bindService(new Many2OneService, scala.concurrent.ExecutionContext.global)
- runServer(svc)
- }
- }
- ManyToOneClient.scala
- package learn.grpc.sum.many2one.client
- import io.grpc.stub.StreamObserver
- import learn.grpc.services.sum._
- object Many2OneClient {
- def main(args: Array[String]): Unit = {
- //build channel
- val channel = io.grpc.ManagedChannelBuilder
- .forAddress("LocalHost",50051)
- .usePlaintext(true)
- .build()
- //pass to server for result
- val respStreamObserver = new StreamObserver[SumResponse] {
- override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
- override def onCompleted(): Unit = println("Done responding!")
- override def onNext(value: SumResponse): Unit =
- println(s"Result: ${value.currentResult}")
- }
- //get async stub
- val client = SumManyToOneGrpc.stub(channel)
- //get request stream observer from server
- val reqStreamObserver = client.addManyToOne(respStreamObserver)
- List(2,5,8,4,0).map { n =>
- reqStreamObserver.onNext(SumRequest(n))
- }
- scala.io.StdIn.readLine()
- }
- }
- ManyToManyServer.scala
- package learn.grpc.sum.many2many.server
- import io.grpc.stub.StreamObserver
- import learn.grpc.services.sum._
- import learn.grpc.server.gRPCServer
- import monix.execution.atomic.{Atomic,AtomicInt}
- object Many2ManyServer extends gRPCServer {
- class Many2ManyService extends SumInterGrpc.SumInter {
- override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
- new StreamObserver[SumRequest] {
- val currentSum: AtomicInt = Atomic(0)
- override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
- override def onCompleted(): Unit = println("Done requesting!")
- override def onNext(value: SumRequest): Unit = {
- responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
- }
- }
- }
- def main(args: Array[String]): Unit = {
- val svc = SumInterGrpc.bindService(new Many2ManyService, scala.concurrent.ExecutionContext.global)
- runServer(svc)
- }
- }
- ManyToManyClient.scala
- package learn.grpc.sum.many2many.client
- import monix.execution.Scheduler.{global => scheduler}
- import learn.grpc.services.sum._
- import scala.concurrent.duration._
- import scala.util.Random
- import io.grpc._
- import io.grpc.stub.StreamObserver
- object Many2ManyClient {
- def main(args: Array[String]): Unit = {
- val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build
- val client = SumInterGrpc.stub(channel)
- //create stream observer for result stream
- val responseObserver = new StreamObserver[SumResponse] {
- def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
- def onCompleted(): Unit = println("ON_COMPLETED")
- def onNext(value: SumResponse): Unit =
- println(s"ON_NEXT: Current sum: ${value.currentResult}")
- }
- //get request container
- val requestObserver = client.addInter(responseObserver)
- scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
- val toBeAdded = Random.nextInt(11)
- println(s"Adding number: $toBeAdded")
- requestObserver.onNext(SumRequest(toBeAdded))
- }
- scala.io.StdIn.readLine()
- }
- }
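The clients above keep the JVM alive with `scala.io.StdIn.readLine()` while the asynchronous callbacks run. One possible alternative, sketched here under assumptions and not taken from the article, is to bridge the observer to a `Future` with a `Promise` from the standard library and block on that. The `StreamObserver` trait is a local stand-in for `io.grpc.stub.StreamObserver`, the producer thread stands in for gRPC's callback thread, and `AwaitCompletionSketch`, `collecting` and `demo` are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Local stand-in for io.grpc.stub.StreamObserver (assumption: same three callbacks).
trait StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

object AwaitCompletionSketch {
  // Returns an observer plus a Future that completes with every value it received.
  def collecting[T](): (StreamObserver[T], Future[Vector[T]]) = {
    val done = Promise[Vector[T]]()
    val observer = new StreamObserver[T] {
      // Assumes callbacks on one observer arrive sequentially.
      private var received = Vector.empty[T]
      def onNext(value: T): Unit = { received = received :+ value }
      def onError(t: Throwable): Unit = { done.tryFailure(t); () }
      def onCompleted(): Unit = { done.trySuccess(received); () }
    }
    (observer, done.future)
  }

  // Feeds the observer from another thread and waits for the stream to finish.
  def demo(): Vector[Int] = {
    val (observer, result) = collecting[Int]()
    val producer = new Thread(new Runnable {
      def run(): Unit = {
        (1 to 3).foreach(i => observer.onNext(i))
        observer.onCompleted()
      }
    })
    producer.start()
    Await.result(result, 5.seconds) // fails with a TimeoutException if the stream never completes
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

This only helps when the server actually completes the response stream; with a server that never calls `onCompleted`, the wait would time out.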
Source: https://www.cnblogs.com/tiger-xc/p/9023937.html