gRPC Streaming 的操作对象由服务端和客户端组成. 在一个包含了多个不同服务的集群环境中可能需要从一个服务里调用另一个服务端提供的服务. 这时调用服务端又成为了提供服务端的客户端了 (服务消费端). 那么如果我们用 streaming 形式来提交服务需求及获取计算结果就是以一个服务端为 Source 另一个服务端为通过式 passthrough Flow 的 stream 运算了. 讲详细点就是请求方用需求构建 Source, 以连接 Flow 的方式把需求传递给服务提供方. 服务提供方在 Flow 内部对需求进行处理后再把结果返回来, 请求方 run 这个连接的 stream 应该就可以得到需要的结果了. 下面我们就针对以上场景在一个由 JDBC,Cassandra,MongoDB 几种 gRPC 服务组成的集群环境里示范在这几个服务之间的 stream 连接和运算.
首先, 我们设计一个简单但比较有代表性的例子: 从 JDBC 的客户端传一个字符型消息 hello 给 JDBC 服务端, JDBC 服务端在 hello 后面添加 ",from jdbc to cassandra" 然后通过 Cassandra 客户端把消息当作请求传给 Cassandra 服务端, Cassandra 服务端在消息后面再加上 ",from cassandra to mongo" 并通过 MongoDB 客户端把消息传给 MongoDB 服务端, 最后 MongoDB 服务端在消息后面添加 ",mongo says hi". 整个 stream 的形状是 jdbc-client->jdbc-service->cassandra-service-mongodb-service. 如果 run 这个 stream 得到的结果应该是一个描述完整移动路径的消息. 从请求 - 服务角度来描述: 我们可以把每个节点消息更新处理当作某种完整的数据处理过程.
以下分别是 JDBC,Cassandra,MongoDB gRPC IDL 定义:
- service JDBCServices {
- rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
- }
- service CQLServices {
- rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
- }
- service MGOServices {
- rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
- }
三个服务共用了 protobuf 消息类型 HelloMsg. 我们把共用的消息统一放到一个 common.proto 文件里:
- syntax = "proto3";
- package sdp.grpc.services;
- message HelloMsg {
- string hello = 1;
- }
- message DataRow {
- string countyname = 1;
- string statename = 2;
- int32 reportyear = 3;
- int32 value = 4;
- }
然后在示范应用的. proto 文件中用 import 把所有 protobuf,gRPC 服务定义都集中起来:
- syntax = "proto3";
- import "google/protobuf/wrappers.proto";
- import "google/protobuf/any.proto";
- import "scalapb/scalapb.proto";
- option (scalapb.options) = {
- // use a custom Scala package name
- // package_name: "io.ontherocks.introgrpc.demo"
- // don't append file name to package
- flat_package: true
- // generate one Scala file for all messages (services still get their own file)
- single_file: true
- // add imports to generated file
- // useful when extending traits or using custom types
- // import: "io.ontherocks.hellogrpc.RockingMessage"
- // code to put at the top of generated file
- // works only with `single_file: true`
- //preamble: "sealed trait SomeSealedTrait"
- };
- /*
- * Demoes various customization options provided by ScalaPBs.
- */
- package sdp.grpc.services;
- import "misc/sdp.proto";
- import "common.proto";
- import "cql/cql.proto";
- import "jdbc/jdbc.proto";
- import "mgo/mgo.proto";
下面我们把最核心的服务实现挑出来讲解一下, 先看看 Cassandra 服务的实现:
- import sdp.grpc.mongo.client.MGOClient
- class CQLStreamingServices(implicit ec: ExecutionContextExecutor,
- mat: ActorMaterializer, session: Session)
- extends CqlGrpcAkkaStream.CQLServices with LogSupport{
- val mongoClient = new MGOClient
- val stub = mongoClient.stub
- def sayHelloTo(msg: String): Flow[HelloMsg, HelloMsg, NotUsed] =
- Flow[HelloMsg].map { r => HelloMsg(r.hello + msg)}
- .via(stub.greeting)
- override def greeting: Flow[HelloMsg, HelloMsg, NotUsed] =
- Flow[HelloMsg]
- .via(sayHelloTo(",from cassandra to mongo"))
- }
streaming 方式的 gRPC 服务其实就是一个 akka-stream 的 Flow[R1,R2,M], 它把收到的数据 R1 处理后转换成 R2 输出. 在处理 R1 的环节里可能会需要其它服务的运算结果. 在以上例子里 CQLService 把收到的消息加工转换后传给 MGOService 并等待 MGOService 再深度加工返还的结果, 所以 sayHelloTo 还是一个有两个节点的 Flow: 在第一个节点中对收到的消息进行加工, 第二个节点把加工的消息传给另一个服务并连接它的运算结果作为本身最终的输出. 调用其它跨集群节点的服务必须经该服务的 gRPC 客户端进行, 这里调用的 MGOClient:
- package sdp.grpc.mongo.client
- import sdp.grpc.services._
- import sdp.logging.LogSupport
- import io.grpc._
- import common._
- import sdp.grpc.services._
- import akka.stream.scaladsl._
- import akka.NotUsed
- class MGOClient extends LogSupport {
- val channel = ManagedChannelBuilder
- .forAddress("localhost", 50051)
- .usePlaintext()
- .build()
- val stub = MgoGrpcAkkaStream.stub(channel)
- }
JDBCService 连接 CQLService, CQLService 连接 MGOService:
- import sdp.grpc.cassandra.client.CQLClient
- class JDBCStreamingServices(implicit ec: ExecutionContextExecutor)
- extends JdbcGrpcAkkaStream.JDBCServices with LogSupport {
- val cassandraClient = new CQLClient
- val stub = cassandraClient.stub
- def sayHelloTo(msg: String): Flow[HelloMsg,HelloMsg,NotUsed] =
- Flow[HelloMsg]
- .map {r => HelloMsg(r.hello + msg)}
- .via(stub.greeting)
- override def greeting: Flow[HelloMsg, HelloMsg, NotUsed] =
- Flow[HelloMsg]
- .via(sayHelloTo(",from jdbc to cassandra"))
- }
最后我们用 DemoApp 来示范整个过程:
- package demo.sdp.grpc
- import akka.actor.ActorSystem
- import akka.stream.{ActorMaterializer, ThrottleMode}
- import sdp.grpc.jdbc.client.JDBCClient
- object DemoApp extends App {
- implicit val system = ActorSystem("jdbcClient")
- implicit val mat = ActorMaterializer.create(system)
- implicit val ec = system.dispatcher
- val jdbcClient = new JDBCClient
- jdbcClient.sayHello.runForeach(r => println(r.hello))
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
DemoApp 调用了 JDBCClient:
- package sdp.grpc.jdbc.client
- import sdp.grpc.services._
- import sdp.logging.LogSupport
- import io.grpc._
- import common._
- import sdp.grpc.services._
- import akka.stream.scaladsl._
- import akka.NotUsed
- class JDBCClient extends LogSupport {
- val channel = ManagedChannelBuilder
- .forAddress("localhost", 50053)
- .usePlaintext()
- .build()
- val stub = JdbcGrpcAkkaStream.stub(channel)
- def sayHello: Source[HelloMsg, NotUsed] = {
- val row = HelloMsg("hello")
- val rows = List.fill[HelloMsg](100)(row)
- Source
- .fromIterator(() => rows.iterator)
- .via(stub.greeting)
- }
- }
运行 DemoApp 显示的结果:
- hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
- hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
- hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
- hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
- hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
- ...
来源: https://www.cnblogs.com/tiger-xc/p/9660828.html