我在前面提到过 MongoDB 不支持像 SQL 般字符式的操作指令, 所以我们必须对所有的 MongoDB 操作指令建立 protobuf 类型才能支持 MongoDB 指令的序列化. 在对上一篇博文里我们把 MongoDB 的消息指令序列化单独挑出来讨论了一番, 在这篇我们准备在一个 MongoDB scala 开发环境里通过 streaming 运算来示范这些 protobuf 消息的应用.
与前面我们介绍过的 JDBC-streaming 和 Cassandra-streaming 对应操作指令的处理相同, MGO-streaming 也是是通过一个 Context 对象来描述操作方式和内容细节的, MGOContext 定义如下:
- case class MGOContext(
- dbName: String,
- collName: String,
- actionType: MGO_ACTION_TYPE = MGO_QUERY,
- action: Option[MGOCommands] = None,
- actionOptions: Option[Any] = None,
- actionTargets: Seq[String] = Nil
- ) {
- ctx =>
- def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
- def setCollName(name: String): MGOContext = ctx.copy(collName = name)
- def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
- def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
- def toSomeProto = MGOProtoConversion.ctxToProto(this)
- }
- object MGOContext {
- def apply(db: String, coll: String) = new MGOContext(db, coll)
- def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
- MGOProtoConversion.ctxFromProto(proto)
- }
上面的代码里包括了 toSomeProto, fromProto 两个函数来实现 MGOContext 的序列化转换处理. 这两个函数的实现包含在文章后面提供的源代码中. MongoDB 的. proto 文件 idl 定义如下:
- syntax = "proto3";
- import "google/protobuf/wrappers.proto";
- import "google/protobuf/any.proto";
- import "scalapb/scalapb.proto";
- option (scalapb.options) = {
- // use a custom Scala package name
- // package_name: "io.ontherocks.introgrpc.demo"
- // don't append file name to package
- flat_package: true
- // generate one Scala file for all messages (services still get their own file)
- single_file: true
- // add imports to generated file
- // useful when extending traits or using custom types
- // import: "io.ontherocks.hellogrpc.RockingMessage"
- // code to put at the top of generated file
- // works only with `single_file: true`
- //preamble: "sealed trait SomeSealedTrait"
- };
- /*
- * Demoes various customization options provided by ScalaPBs.
- */
- package sdp.grpc.services;
- import "misc/sdp.proto";
- message ProtoMGOBson {
- bytes bson = 1;
- }
- message ProtoMGODocument {
- bytes document = 1;
- }
- message ProtoMGOResultOption { //FindObservable
- int32 optType = 1;
- ProtoMGOBson bsonParam = 2;
- int32 valueParam = 3;
- }
- message ProtoMGOAdmin{
- string tarName = 1;
- repeated ProtoMGOBson bsonParam = 2;
- ProtoAny options = 3;
- string objName = 4;
- }
- message ProtoMGOContext { //MGOContext
- string dbName = 1;
- string collName = 2;
- int32 commandType = 3;
- repeated ProtoMGOBson bsonParam = 4;
- repeated ProtoMGOResultOption resultOptions = 5;
- repeated string targets = 6;
- ProtoAny options = 7;
- repeated ProtoMGODocument documents = 8;
- google.protobuf.BoolValue only = 9;
- ProtoMGOAdmin adminOptions = 10;
- }
下面是本次示范的. proto 文件:
- syntax = "proto3";
- import "google/protobuf/wrappers.proto";
- import "google/protobuf/any.proto";
- import "scalapb/scalapb.proto";
- option (scalapb.options) = {
- // use a custom Scala package name
- // package_name: "io.ontherocks.introgrpc.demo"
- // don't append file name to package
- flat_package: true
- // generate one Scala file for all messages (services still get their own file)
- single_file: true
- // add imports to generated file
- // useful when extending traits or using custom types
- // import: "io.ontherocks.hellogrpc.RockingMessage"
- // code to put at the top of generated file
- // works only with `single_file: true`
- //preamble: "sealed trait SomeSealedTrait"
- };
- /*
- * Demoes various customization options provided by ScalaPBs.
- */
- package sdp.grpc.services;
- import "misc/sdp.proto";
- import "cql/cql.proto";
- import "jdbc/jdbc.proto";
- import "mgo/mgo.proto";
- service MGOServices {
- rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
- rpc runQueries(stream ProtoMGOContext) returns (stream ProtoMGODocument) {}
- }
以上通过 import "mgo/mgo.proto" 引用了 ProtoMGOContext 类型, 并在服务定义 rpc runQueries 里用作了传入参数.
下面这段是本次示范的服务实现代码:
- package sdp.grpc.mongo.server
- import sdp.mongo.engine._
- import MGOClasses._
- import MGOEngine._
- import akka.NotUsed
- import akka.stream.scaladsl.Flow
- import sdp.logging.LogSupport
- import sdp.grpc.services._
- import org.mongodb.scala._
- import scala.concurrent._
- import akka.stream.ActorMaterializer
- import sdp.mongo.engine.MGOProtoConversion.MGODocument
- class MGOStreamingServices(implicit ec: ExecutionContextExecutor,
- mat: ActorMaterializer, client: MongoClient)
- extends MgostreamingGrpcAkkaStream.MGOServices with LogSupport {
- override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
- Flow[HelloMsg]
- .map {r => HelloMsg(r.hello+", mongo ...")}
- }
- override def runQueries: Flow[ProtoMGOContext, ProtoMGODocument, NotUsed] =
- Flow[ProtoMGOContext]
- .flatMapConcat {p =>
- val ctx = MGOContext.fromProto(p)
- mongoStream(ctx).map{doc => MGODocument.toProto(doc)}
- }
- }
这个 runQueries 服务函数的处理流程是: 接收 ProtoMGOContext, 转换成 MGOContext, 传给 mongoStream, 运算 mongoStream 返回 ProtoMGODocument 结果. mongoStream 函数的代码如下:
- def mongoStream(ctx: MGOContext)(
- implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
- log.info(s"mongoStream> MGOContext: ${ctx}")
- def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
- rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
- val db = client.getDatabase(ctx.dbName)
- val coll = db.getCollection(ctx.collName)
- if ( ctx.action == None) {
- log.error(s"mongoStream> uery action cannot be null!")
- throw new IllegalArgumentException("query action cannot be null!")
- }
- try {
- ctx.action.get match {
- case Find(None, Nil, false) => //FindObservable
- MongoSource(coll.find())
- case Find(None, Nil, true) => //FindObservable
- MongoSource(coll.find().first())
- case Find(Some(filter), Nil, false) => //FindObservable
- MongoSource(coll.find(filter))
- case Find(Some(filter), Nil, true) => //FindObservable
- MongoSource(coll.find(filter).first())
- case Find(None, sro, _) => //FindObservable
- val next = toResultOption(sro)
- MongoSource(next(coll.find[Document]()))
- case Find(Some(filter), sro, _) => //FindObservable
- val next = toResultOption(sro)
- MongoSource(next(coll.find[Document](filter)))
- case _ =>
- log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
- throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
- }
- }
- catch { case e: Exception =>
- log.error(s"mongoStream> runtime error: ${e.getMessage}")
- throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
- }
- }
调用服务的 gRPC 客户端实现代码如下:
- class MGOStreamClient(host: String, port: Int)(
- implicit ec: ExecutionContextExecutor) extends LogSupport {
- val channel = ManagedChannelBuilder
- .forAddress(host, port)
- .usePlaintext()
- .build()
- val stub = MgostreamingGrpcAkkaStream.stub(channel)
- def echoHello: Source[HelloMsg, NotUsed] = {
- val row = HelloMsg("hello")
- val rows = List.fill[HelloMsg](100)(row)
- Source
- .fromIterator(() => rows.iterator)
- .via(stub.clientStreaming)
- }
- val filter = and(equal("state","California"),
- equal("county","Alameda"),
- equal("value",10))
- val proj = exclude("rowid","_id")
- val proj1 = include("county","value")
- val rtxfmr = Seq(
- ResultOptions(
- optType = FOD_LIMIT,
- value = 3),
- ResultOptions(
- optType = FOD_PROJECTION,
- bson = Some(proj)) /*,
- ResultOptions(
- optType = FOD_PROJECTION,
- bson = Some(proj1)) */
- )
- val cmd = Find(filter = Some(filter), andThen = rtxfmr)
- val ctx = MGOContext("testdb","aqmrpt").setCommand(cmd)
- def mgoQueries: Source[ProtoMGODocument,NotUsed] = {
- Source
- .single[ProtoMGOContext](ctx.toSomeProto.get)
- .via(stub.runQueries)
- }
- }
- object MGOStreamingClient extends App {
- implicit val system = ActorSystem("EchoNumsClient")
- implicit val mat = ActorMaterializer.create(system)
- implicit val ec = system.dispatcher
- val mgoClient = new MGOStreamClient("localhost", 50051)
- mgoClient.mgoQueries.runForeach(pd =>
- println(MGODocument.fromProto(pd).toJson()))
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
调用服务函数运算结果:
- { "measureid" : { "$numberLong" : "83" }, "state" : "California", "county" : "Alameda", "year" : 1999, "value" : 10 }
- { "measureid" : { "$numberLong" : "84" }, "state" : "California", "county" : "Alameda", "year" : 1999, "value" : 10 }
- { "measureid" : { "$numberLong" : "84" }, "state" : "California", "county" : "Alameda", "year" : 2000, "value" : 10 }
下面是 MongoDB 序列化类型转换工具函数的源代码:
- MGOProtoConversion.scala
- package sdp.mongo.engine
- import org.mongodb.scala.bson.collection.immutable.Document
- import org.bson.conversions.Bson
- import sdp.grpc.services._
- import protobuf.bytes.Converter._
- import MGOClasses._
- import MGOAdmins._
- import MGOCommands._
- import org.bson.BsonDocument
- import org.bson.codecs.configuration.CodecRegistry
- import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
- import org.mongodb.scala.FindObservable
- object MGOProtoConversion {
- type MGO_COMMAND_TYPE = Int
- val MGO_COMMAND_FIND = 0
- val MGO_COMMAND_COUNT = 20
- val MGO_COMMAND_DISTICT = 21
- val MGO_COMMAND_DOCUMENTSTREAM = 1
- val MGO_COMMAND_AGGREGATE = 2
- val MGO_COMMAND_INSERT = 3
- val MGO_COMMAND_DELETE = 4
- val MGO_COMMAND_REPLACE = 5
- val MGO_COMMAND_UPDATE = 6
- val MGO_ADMIN_DROPCOLLECTION = 8
- val MGO_ADMIN_CREATECOLLECTION = 9
- val MGO_ADMIN_LISTCOLLECTION = 10
- val MGO_ADMIN_CREATEVIEW = 11
- val MGO_ADMIN_CREATEINDEX = 12
- val MGO_ADMIN_DROPINDEXBYNAME = 13
- val MGO_ADMIN_DROPINDEXBYKEY = 14
- val MGO_ADMIN_DROPALLINDEXES = 15
- case class AdminContext(
- tarName: String = "",
- bsonParam: Seq[Bson] = Nil,
- options: Option[Any] = None,
- objName: String = ""
- ){
- def toProto = sdp.grpc.services.ProtoMGOAdmin(
- tarName = this.tarName,
- bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
- objName = this.objName,
- options = this.options.map(b => ProtoAny(marshal(b)))
- )
- }
- object AdminContext {
- def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
- tarName = msg.tarName,
- bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
- objName = msg.objName,
- options = msg.options.map(b => unmarshal[Any](b.value))
- )
- }
- case class Context(
- dbName: String = "",
- collName: String = "",
- commandType: MGO_COMMAND_TYPE,
- bsonParam: Seq[Bson] = Nil,
- resultOptions: Seq[ResultOptions] = Nil,
- options: Option[Any] = None,
- documents: Seq[Document] = Nil,
- targets: Seq[String] = Nil,
- only: Boolean = false,
- adminOptions: Option[AdminContext] = None
- ){
- def toProto = new sdp.grpc.services.ProtoMGOContext(
- dbName = this.dbName,
- collName = this.collName,
- commandType = this.commandType,
- bsonParam = this.bsonParam.map(bsonToProto),
- resultOptions = this.resultOptions.map(_.toProto),
- options = { if(this.options == None)
- Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else
- Some(ProtoAny(marshal(this.options.get))) },
- documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
- targets = this.targets,
- only = Some(this.only),
- adminOptions = this.adminOptions.map(_.toProto)
- )
- }
- object MGODocument {
- def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
- unmarshal[Document](msg.document)
- def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
- new ProtoMGODocument(marshal(doc))
- }
- object MGOProtoMsg {
- def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
- dbName = msg.dbName,
- collName = msg.collName,
- commandType = msg.commandType,
- bsonParam = msg.bsonParam.map(protoToBson),
- resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
- options = msg.options.map(a => unmarshal[Any](a.value)),
- documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
- targets = msg.targets,
- adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
- )
- }
- def bsonToProto(bson: Bson) =
- ProtoMGOBson(marshal(bson.toBsonDocument(
- classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
- def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
- val bsdoc = unmarshal[BsonDocument](proto.bson)
- override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
- }
- def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
- case MGO_COMMAND_FIND => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_QUERY,
- action = Some(Find())
- )
- def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
- rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
- (proto.bsonParam, proto.resultOptions, proto.only) match {
- case (Nil, Nil, None) => ctx
- case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
- case (bp,Nil,None) => ctx.setCommand(
- Find(filter = Some(protoToBson(bp.head))))
- case (bp,Nil,Some(b)) => ctx.setCommand(
- Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
- case (bp,fo,None) => {
- ctx.setCommand(
- Find(filter = Some(protoToBson(bp.head)),
- andThen = fo.map(ResultOptions.fromProto)
- ))
- }
- case (bp,fo,Some(b)) => {
- ctx.setCommand(
- Find(filter = Some(protoToBson(bp.head)),
- andThen = fo.map(ResultOptions.fromProto),
- firstOnly = b))
- }
- case _ => ctx
- }
- }
- case MGO_COMMAND_COUNT => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_QUERY,
- action = Some(Count())
- )
- (proto.bsonParam, proto.options) match {
- case (Nil, None) => ctx
- case (bp, None) => ctx.setCommand(
- Count(filter = Some(protoToBson(bp.head)))
- )
- case (Nil,Some(o)) => ctx.setCommand(
- Count(options = Some(unmarshal[Any](o.value)))
- )
- case _ => ctx
- }
- }
- case MGO_COMMAND_DISTICT => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_QUERY,
- action = Some(Distict(fieldName = proto.targets.head))
- )
- (proto.bsonParam) match {
- case Nil => ctx
- case bp: Seq[ProtoMGOBson] => ctx.setCommand(
- Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))
- )
- case _ => ctx
- }
- }
- case MGO_COMMAND_AGGREGATE => {
- new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_QUERY,
- action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
- )
- }
- case MGO_ADMIN_LISTCOLLECTION => {
- new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_QUERY,
- action = Some(ListCollection(proto.dbName)))
- }
- case MGO_COMMAND_INSERT => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_UPDATE,
- action = Some(Insert(
- newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
- )
- proto.options match {
- case None => ctx
- case Some(o) => ctx.setCommand(Insert(
- newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
- options = Some(unmarshal[Any](o.value)))
- )
- }
- }
- case MGO_COMMAND_DELETE => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_UPDATE,
- action = Some(Delete(
- filter = protoToBson(proto.bsonParam.head)))
- )
- (proto.options, proto.only) match {
- case (None,None) => ctx
- case (None,Some(b)) => ctx.setCommand(Delete(
- filter = protoToBson(proto.bsonParam.head),
- onlyOne = b))
- case (Some(o),None) => ctx.setCommand(Delete(
- filter = protoToBson(proto.bsonParam.head),
- options = Some(unmarshal[Any](o.value)))
- )
- case (Some(o),Some(b)) => ctx.setCommand(Delete(
- filter = protoToBson(proto.bsonParam.head),
- options = Some(unmarshal[Any](o.value)),
- onlyOne = b)
- )
- }
- }
- case MGO_COMMAND_REPLACE => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_UPDATE,
- action = Some(Replace(
- filter = protoToBson(proto.bsonParam.head),
- replacement = unmarshal[Document](proto.documents.head.document)))
- )
- proto.options match {
- case None => ctx
- case Some(o) => ctx.setCommand(Replace(
- filter = protoToBson(proto.bsonParam.head),
- replacement = unmarshal[Document](proto.documents.head.document),
- options = Some(unmarshal[Any](o.value)))
- )
- }
- }
- case MGO_COMMAND_UPDATE => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_UPDATE,
- action = Some(Update(
- filter = protoToBson(proto.bsonParam.head),
- update = protoToBson(proto.bsonParam.tail.head)))
- )
- (proto.options, proto.only) match {
- case (None,None) => ctx
- case (None,Some(b)) => ctx.setCommand(Update(
- filter = protoToBson(proto.bsonParam.head),
- update = protoToBson(proto.bsonParam.tail.head),
- onlyOne = b))
- case (Some(o),None) => ctx.setCommand(Update(
- filter = protoToBson(proto.bsonParam.head),
- update = protoToBson(proto.bsonParam.tail.head),
- options = Some(unmarshal[Any](o.value)))
- )
- case (Some(o),Some(b)) => ctx.setCommand(Update(
- filter = protoToBson(proto.bsonParam.head),
- update = protoToBson(proto.bsonParam.tail.head),
- options = Some(unmarshal[Any](o.value)),
- onlyOne = b)
- )
- }
- }
- case MGO_ADMIN_DROPCOLLECTION =>
- new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_ADMIN,
- action = Some(DropCollection(proto.collName))
- )
- case MGO_ADMIN_CREATECOLLECTION => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_ADMIN,
- action = Some(CreateCollection(proto.collName))
- )
- proto.options match {
- case None => ctx
- case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
- options = Some(unmarshal[Any](o.value)))
- )
- }
- }
- case MGO_ADMIN_CREATEVIEW => {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_ADMIN,
- action = Some(CreateView(viewName = proto.targets.head,
- viewOn = proto.targets.tail.head,
- pipeline = proto.bsonParam.map(p => protoToBson(p))))
- )
- proto.options match {
- case None => ctx
- case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
- viewOn = proto.targets.tail.head,
- pipeline = proto.bsonParam.map(p => protoToBson(p)),
- options = Some(unmarshal[Any](o.value)))
- )
- }
- }
- case MGO_ADMIN_CREATEINDEX=> {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_ADMIN,
- action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
- )
- proto.options match {
- case None => ctx
- case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
- options = Some(unmarshal[Any](o.value)))
- )
- }
- }
- case MGO_ADMIN_DROPINDEXBYNAME=> {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_ADMIN,
- action = Some(DropIndexByName(indexName = proto.targets.head))
- )
- proto.options match {
- case None => ctx
- case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
- options = Some(unmarshal[Any](o.value)))
- )
- }
- }
- case MGO_ADMIN_DROPINDEXBYKEY=> {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_ADMIN,
- action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
- )
- proto.options match {
- case None => ctx
- case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
- options = Some(unmarshal[Any](o.value)))
- )
- }
- }
- case MGO_ADMIN_DROPALLINDEXES=> {
- var ctx = new MGOContext(
- dbName = proto.dbName,
- collName = proto.collName,
- actionType = MGO_ADMIN,
- action = Some(DropAllIndexes())
- )
- proto.options match {
- case None => ctx
- case Some(o) => ctx.setCommand(DropAllIndexes(
- options = Some(unmarshal[Any](o.value)))
- )
- }
- }
- }
- def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
- case None => None
- case Some(act) => act match {
- case Count(filter, options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_COMMAND_COUNT,
- bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
- else Seq(bsonToProto(filter.get))},
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) }
- ))
- case Distict(fieldName, filter) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_COMMAND_DISTICT,
- bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
- else Seq(bsonToProto(filter.get))},
- targets = Seq(fieldName)
- ))
- case Find(filter, andThen, firstOnly) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_COMMAND_FIND,
- bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
- else Seq(bsonToProto(filter.get))},
- resultOptions = andThen.map(_.toProto)
- ))
- case Aggregate(pipeLine) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_COMMAND_AGGREGATE,
- bsonParam = pipeLine.map(bsonToProto)
- ))
- case Insert(newdocs, options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_COMMAND_INSERT,
- documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) }
- ))
- case Delete(filter, options, onlyOne) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_COMMAND_DELETE,
- bsonParam = Seq(bsonToProto(filter)),
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) },
- only = Some(onlyOne)
- ))
- case Replace(filter, replacement, options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_COMMAND_REPLACE,
- bsonParam = Seq(bsonToProto(filter)),
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) },
- documents = Seq(ProtoMGODocument(marshal(replacement)))
- ))
- case Update(filter, update, options, onlyOne) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_COMMAND_UPDATE,
- bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) },
- only = Some(onlyOne)
- ))
- case DropCollection(coll) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = coll,
- commandType = MGO_ADMIN_DROPCOLLECTION
- ))
- case CreateCollection(coll, options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = coll,
- commandType = MGO_ADMIN_CREATECOLLECTION,
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) }
- ))
- case ListCollection(dbName) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- commandType = MGO_ADMIN_LISTCOLLECTION
- ))
- case CreateView(viewName, viewOn, pipeline, options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_ADMIN_CREATEVIEW,
- bsonParam = pipeline.map(bsonToProto),
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) },
- targets = Seq(viewName,viewOn)
- ))
- case CreateIndex(key, options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_ADMIN_CREATEINDEX,
- bsonParam = Seq(bsonToProto(key)),
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) }
- ))
- case DropIndexByName(indexName, options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_ADMIN_DROPINDEXBYNAME,
- targets = Seq(indexName),
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) }
- ))
- case DropIndexByKey(key, options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_ADMIN_DROPINDEXBYKEY,
- bsonParam = Seq(bsonToProto(key)),
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) }
- ))
- case DropAllIndexes(options) =>
- Some(new sdp.grpc.services.ProtoMGOContext(
- dbName = ctx.dbName,
- collName = ctx.collName,
- commandType = MGO_ADMIN_DROPALLINDEXES,
- options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
- else Some(ProtoAny(marshal(options.get))) }
- ))
- }
- }
- }
来源: https://www.cnblogs.com/tiger-xc/p/9536859.html