PICE(5): MongoDB Streaming - gRPC - MGO Service

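As I mentioned earlier, MongoDB does not support text-based commands the way SQL does, so we have to define a protobuf type for every MongoDB operation before those commands can be serialized. The previous post singled out the serialization of MongoDB command messages for discussion. In this post we demonstrate how these protobuf messages are used, through streaming in a MongoDB Scala development environment.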

As with the JDBC-streaming and Cassandra-streaming services covered earlier, MGO-streaming describes the operation and its details through a Context object. MGOContext is defined as follows:

 case class MGOContext(
                         dbName: String,
                         collName: String,
                         actionType: MGO_ACTION_TYPE = MGO_QUERY,
                         action: Option[MGOCommands] = None,
                         actionOptions: Option[Any] = None,
                         actionTargets: Seq[String] = Nil
                       ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

    def setCollName(name: String): MGOContext = ctx.copy(collName = name)

    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)

    def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))

    def toSomeProto = MGOProtoConversion.ctxToProto(this)

  }

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)
    def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
      MGOProtoConversion.ctxFromProto(proto)
  }
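The setters above never mutate the context: each returns a modified copy. A minimal sketch of that pattern, using simplified stand-in types (Ctx, Command and the string filter are assumptions for illustration, not the real MGOContext API):

```scala
// Simplified stand-ins for the real types; only the shape matters here.
object ContextSketch {
  sealed trait Command
  final case class Find(filter: Option[String] = None, firstOnly: Boolean = false) extends Command

  final case class Ctx(dbName: String, collName: String, action: Option[Command] = None) {
    // Each setter returns a modified copy; the receiver is never mutated.
    def setDbName(name: String): Ctx = copy(dbName = name)
    def setCollName(name: String): Ctx = copy(collName = name)
    def setCommand(cmd: Command): Ctx = copy(action = Some(cmd))
  }

  // A base context can be shared and specialised without side effects.
  val base: Ctx = Ctx("testdb", "aqmrpt")
  val query: Ctx = base.setCommand(Find(filter = Some("""{"state":"California"}""")))
  val other: Ctx = query.setCollName("members")
}
```

Because every step yields a new value, a base context can safely be reused across several requests.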

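The code above includes two functions, toSomeProto and fromProto, which convert MGOContext to and from its serialized form. Their implementations are in the source code at the end of this post. The .proto IDL for MongoDB is defined as follows: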

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demos various customization options provided by ScalaPB.
 */

package sdp.grpc.services;

import "misc/sdp.proto";

message ProtoMGOBson {
  bytes bson = 1;
}

message ProtoMGODocument {
  bytes document = 1;
}

message ProtoMGOResultOption { //FindObservable
   int32 optType = 1;
   ProtoMGOBson bsonParam = 2;
   int32 valueParam = 3;
}

message ProtoMGOAdmin{
  string tarName = 1;
  repeated ProtoMGOBson bsonParam  = 2;
  ProtoAny options = 3;
  string objName = 4;
}

message ProtoMGOContext {  //MGOContext
  string dbName = 1;
  string collName = 2;
  int32 commandType = 3;
  repeated ProtoMGOBson bsonParam = 4;
  repeated ProtoMGOResultOption resultOptions = 5;
  repeated string targets = 6;
  ProtoAny options = 7;
  repeated ProtoMGODocument documents = 8;
  google.protobuf.BoolValue only = 9;
  ProtoMGOAdmin adminOptions = 10;
}
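ProtoMGOContext is a flat record: commandType selects the command, and the remaining fields carry whatever arguments that command needs. A minimal sketch of this tagged-message encoding, with stand-in types (Wire, Find and Count here are simplified assumptions, and filters are plain strings instead of BSON):

```scala
object FlatMessageSketch {
  // Stand-ins for the command ADT and the flat wire message.
  sealed trait Command
  final case class Find(filter: Option[String], firstOnly: Boolean) extends Command
  final case class Count(filter: Option[String]) extends Command

  final case class Wire(commandType: Int,
                        bsonParam: Seq[String] = Nil,
                        only: Option[Boolean] = None)

  // Integer tags identifying the command on the wire.
  val CmdFind = 0
  val CmdCount = 20

  // Encode: pick the tag, then fill only the fields this command uses.
  def toWire(cmd: Command): Wire = cmd match {
    case Find(f, first) => Wire(CmdFind, f.toList, Some(first))
    case Count(f)       => Wire(CmdCount, f.toList)
  }

  // Decode: dispatch on the tag and read back the same fields.
  def fromWire(w: Wire): Option[Command] = w.commandType match {
    case CmdFind  => Some(Find(w.bsonParam.headOption, w.only.getOrElse(false)))
    case CmdCount => Some(Count(w.bsonParam.headOption))
    case _        => None
  }
}
```

The trade-off of a flat message is that the schema does not say which fields belong to which command, so the encoder and decoder have to agree on that by convention.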

Below is the .proto file for this demonstration:

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demos various customization options provided by ScalaPB.
 */

package sdp.grpc.services;

import "misc/sdp.proto";
import "cql/cql.proto";
import "jdbc/jdbc.proto";
import "mgo/mgo.proto";

service MGOServices {
  rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
  rpc runQueries(stream ProtoMGOContext) returns (stream ProtoMGODocument) {}
}

Here import "mgo/mgo.proto" brings in the ProtoMGOContext type, which the runQueries rpc in the service definition takes as its input parameter.

The service implementation for this demonstration follows:

package sdp.grpc.mongo.server

import sdp.mongo.engine._
import MGOClasses._
import MGOEngine._
import akka.NotUsed
import akka.stream.scaladsl.Flow
import sdp.logging.LogSupport
import sdp.grpc.services._
import org.mongodb.scala._
import scala.concurrent._
import akka.stream.ActorMaterializer
import sdp.mongo.engine.MGOProtoConversion.MGODocument

class MGOStreamingServices(implicit ec: ExecutionContextExecutor,
                            mat: ActorMaterializer, client: MongoClient)
  extends MgostreamingGrpcAkkaStream.MGOServices with LogSupport {
  override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
    Flow[HelloMsg]
      .map {r => HelloMsg(r.hello+", mongo ...")}
  }

  override def runQueries: Flow[ProtoMGOContext, ProtoMGODocument, NotUsed] =
    Flow[ProtoMGOContext]
    .flatMapConcat {p =>
      val ctx = MGOContext.fromProto(p)
      mongoStream(ctx).map{doc => MGODocument.toProto(doc)}
    }

}
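In runQueries, flatMapConcat expands each incoming context into a sub-stream of documents and concatenates those sub-streams in request order. The same shape can be sketched with plain Scala iterators. This is only an analogy without Akka's back-pressure, and query is a hypothetical stand-in for mongoStream:

```scala
object FlatMapConcatSketch {
  final case class Ctx(collName: String)
  final case class Doc(json: String)

  // Hypothetical stand-in for mongoStream: one context yields a sub-stream of documents.
  def query(ctx: Ctx): Iterator[Doc] =
    Iterator.tabulate(2)(i => Doc(s"${ctx.collName}-$i"))

  // Each incoming context is expanded in turn; results are concatenated in request order.
  def runQueries(requests: Iterator[Ctx]): Iterator[Doc] =
    requests.flatMap(ctx => query(ctx))

  val out: List[String] =
    runQueries(Iterator(Ctx("a"), Ctx("b"))).map(_.json).toList
}
```

All documents for the first context are emitted before any for the second, which is the ordering guarantee that distinguishes concatenation from merging.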

The runQueries service function works as follows: receive a ProtoMGOContext, convert it to an MGOContext, pass it to mongoStream, and run mongoStream to return the results as ProtoMGODocument. The code of mongoStream is:

  def mongoStream(ctx: MGOContext)(
    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
    log.info(s"mongoStream>  MGOContext: ${ctx}")

    def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
      rts.foldRight(findObj)((a,b) => a.toFindObservable(b))

    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    if (ctx.action.isEmpty) {
      log.error("mongoStream> query action cannot be null!")
      throw new IllegalArgumentException("query action cannot be null!")
    }
    try {
      ctx.action.get match {
        case Find(None, Nil, false) => //FindObservable
          MongoSource(coll.find())
        case Find(None, Nil, true) => //FindObservable
          MongoSource(coll.find().first())
        case Find(Some(filter), Nil, false) => //FindObservable
          MongoSource(coll.find(filter))
        case Find(Some(filter), Nil, true) => //FindObservable
          MongoSource(coll.find(filter).first())
        case Find(None, sro, _) => //FindObservable
          val next = toResultOption(sro)
          MongoSource(next(coll.find[Document]()))
        case Find(Some(filter), sro, _) => //FindObservable
          val next = toResultOption(sro)
          MongoSource(next(coll.find[Document](filter)))
        case _ =>
          log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
          throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")

      }
    }
    catch { case e: Exception =>
      log.error(s"mongoStream> runtime error: ${e.getMessage}")
      throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
    }

  }
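One detail of toResultOption is easy to miss: foldRight applies the last option in the sequence first. The sketch below makes the order visible with stand-in types (Query and Opt are assumptions that merely record the order of application; they are not the driver's FindObservable or the real ResultOptions):

```scala
object FoldOrderSketch {
  // Stand-in for FindObservable: records the order in which options were applied.
  final case class Query(applied: List[String] = Nil)

  // Stand-in for ResultOptions: applying an option appends its name to the record.
  final case class Opt(name: String) {
    def toQuery(q: Query): Query = q.copy(applied = q.applied :+ name)
  }

  val opts = Seq(Opt("limit"), Opt("projection"))

  // foldRight nests as limit(projection(start)): projection is applied first.
  val viaFoldRight: Query = opts.foldRight(Query())((a, b) => a.toQuery(b))

  // foldLeft applies the options in the order they were written.
  val viaFoldLeft: Query = opts.foldLeft(Query())((b, a) => a.toQuery(b))
}
```

For find modifiers such as limit and projection the order of application should not normally change the query result, since they configure the same query, but foldLeft would be the form to use if options ever had to be applied in the order written.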

The gRPC client that calls the service is implemented as follows:

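// Imports assumed from the packages used elsewhere in this post.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import io.grpc.ManagedChannelBuilder
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Projections._
import scala.concurrent.ExecutionContextExecutor
import sdp.grpc.services._
import sdp.logging.LogSupport
import sdp.mongo.engine.MGOClasses._
import sdp.mongo.engine.MGOProtoConversion.MGODocument

class MGOStreamClient(host: String, port: Int)(
  implicit ec: ExecutionContextExecutor) extends LogSupport {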

  val channel = ManagedChannelBuilder
    .forAddress(host, port)
    .usePlaintext()
    .build()

  val stub = MgostreamingGrpcAkkaStream.stub(channel)

  def echoHello: Source[HelloMsg, NotUsed] = {
    val row = HelloMsg("hello ")
    val rows = List.fill[HelloMsg](100)(row)
    Source
      .fromIterator(() => rows.iterator)
      .via(stub.clientStreaming)
  }

  val filter = and(equal("state","California"),
    equal("county","Alameda"),
    equal("value",10))

  val proj = exclude("rowid","_id")
  val proj1 = include("county","value")
  val rtxfmr = Seq(
    ResultOptions(
      optType = FOD_LIMIT,
      value = 3),
    ResultOptions(
      optType = FOD_PROJECTION,
      bson = Some(proj)) /*,
    ResultOptions(
      optType = FOD_PROJECTION,
      bson = Some(proj1)) */
  )

  val cmd = Find(filter = Some(filter), andThen = rtxfmr)

  val ctx = MGOContext("testdb","aqmrpt").setCommand(cmd)

  def mgoQueries: Source[ProtoMGODocument, NotUsed] = {
    Source
      .single[ProtoMGOContext](ctx.toSomeProto.get)
      .via(stub.runQueries)
  }
}

object MGOStreamingClient extends App {
  implicit val system = ActorSystem("EchoNumsClient")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher
  val mgoClient = new MGOStreamClient("localhost", 50051)

  mgoClient.mgoQueries.runForeach(pd =>
    println(MGODocument.fromProto(pd).toJson()))

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

Result of calling the service function:

{ "measureid" : { "$numberLong" : "83" }, "state" : "California", "county" : "Alameda", "year" : 1999, "value" : 10 }
{ "measureid" : { "$numberLong" : "84" }, "state" : "California", "county" : "Alameda", "year" : 1999, "value" : 10 }
{ "measureid" : { "$numberLong" : "84" }, "state" : "California", "county" : "Alameda", "year" : 2000, "value" : 10 }

Below is the source code of the MongoDB serialization conversion utilities:

MGOProtoConversion.scala

package sdp.mongo.engine
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import sdp.grpc.services._
import protobuf.bytes.Converter._
import MGOClasses._
import MGOAdmins._
import MGOCommands._
import org.bson.BsonDocument
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.FindObservable

object MGOProtoConversion {

  type MGO_COMMAND_TYPE = Int
  val MGO_COMMAND_FIND            = 0
  val MGO_COMMAND_COUNT           = 20
  val MGO_COMMAND_DISTICT         = 21
  val MGO_COMMAND_DOCUMENTSTREAM  = 1
  val MGO_COMMAND_AGGREGATE       = 2
  val MGO_COMMAND_INSERT          = 3
  val MGO_COMMAND_DELETE          = 4
  val MGO_COMMAND_REPLACE         = 5
  val MGO_COMMAND_UPDATE          = 6

  val MGO_ADMIN_DROPCOLLECTION    = 8
  val MGO_ADMIN_CREATECOLLECTION  = 9
  val MGO_ADMIN_LISTCOLLECTION    = 10
  val MGO_ADMIN_CREATEVIEW        = 11
  val MGO_ADMIN_CREATEINDEX       = 12
  val MGO_ADMIN_DROPINDEXBYNAME   = 13
  val MGO_ADMIN_DROPINDEXBYKEY    = 14
  val MGO_ADMIN_DROPALLINDEXES    = 15

  case class AdminContext(
                           tarName: String = "",
                           bsonParam: Seq[Bson] = Nil,
                           options: Option[Any] = None,
                           objName: String = ""
                         ){
    def toProto = sdp.grpc.services.ProtoMGOAdmin(
      tarName = this.tarName,
      bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
      objName = this.objName,
      options = this.options.map(b => ProtoAny(marshal(b)))

    )
  }

  object AdminContext {
    def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
      tarName = msg.tarName,
      bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      objName = msg.objName,
      options = msg.options.map(b => unmarshal[Any](b.value))
    )
  }

  case class Context(
                      dbName: String = "",
                      collName: String = "",
                      commandType: MGO_COMMAND_TYPE,
                      bsonParam: Seq[Bson] = Nil,
                      resultOptions: Seq[ResultOptions] = Nil,
                      options: Option[Any] = None,
                      documents: Seq[Document] = Nil,
                      targets: Seq[String] = Nil,
                      only: Boolean = false,
                      adminOptions: Option[AdminContext] = None
                    ){

    def toProto = new sdp.grpc.services.ProtoMGOContext(
      dbName = this.dbName,
      collName = this.collName,
      commandType = this.commandType,
      bsonParam = this.bsonParam.map(bsonToProto),
      resultOptions = this.resultOptions.map(_.toProto),
      // Leave the field unset when there are no options, so the receiver sees None
      options = this.options.map(o => ProtoAny(marshal(o))),
      documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
      targets = this.targets,
      only = Some(this.only),
      adminOptions = this.adminOptions.map(_.toProto)
    )

  }

  object MGODocument {
    def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
      unmarshal[Document](msg.document)
    def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
      new ProtoMGODocument(marshal(doc))
  }

  object MGOProtoMsg {
    def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
      dbName = msg.dbName,
      collName = msg.collName,
      commandType = msg.commandType,
      bsonParam = msg.bsonParam.map(protoToBson),
      resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
      options = msg.options.map(a => unmarshal[Any](a.value)),
      documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
      targets = msg.targets,
      only = msg.only.getOrElse(false),
      adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
    )
  }

  def bsonToProto(bson: Bson) =
    ProtoMGOBson(marshal(bson.toBsonDocument(
      classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))

  def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
    val bsdoc = unmarshal[BsonDocument](proto.bson)
    override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
  }

  def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
    case MGO_COMMAND_FIND => {
      // Rebuild the Find command field by field, so that a missing filter,
      // missing result options, or a missing `only` flag are handled independently
      // (the filter is read with headOption, so an empty bsonParam cannot throw).
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Find(
          filter = proto.bsonParam.headOption.map(protoToBson),
          andThen = proto.resultOptions.map(ResultOptions.fromProto),
          firstOnly = proto.only.getOrElse(false)))
      )
    }
    case MGO_COMMAND_COUNT => {
      // Filter and options are decoded independently, so neither is dropped when
      // both are present. An options payload with no bytes means "no options".
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Count(
          filter = proto.bsonParam.headOption.map(protoToBson),
          options = proto.options
            .filterNot(_.value.isEmpty)
            .map(o => unmarshal[Any](o.value))))
      )
    }
    case MGO_COMMAND_DISTICT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Distict(fieldName = proto.targets.head))
      )
      proto.bsonParam match {
        case Nil => ctx
        case bp => ctx.setCommand(
          Distict(fieldName = proto.targets.head, filter = Some(protoToBson(bp.head)))
        )
      }
    }
    case MGO_COMMAND_AGGREGATE => {
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
      )
    }
    case MGO_ADMIN_LISTCOLLECTION => {
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(ListCollection(proto.dbName)))
    }
    case MGO_COMMAND_INSERT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Insert(
          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(Insert(
          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_COMMAND_DELETE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Delete(
          filter = protoToBson(proto.bsonParam.head)))
      )
      (proto.options, proto.only) match {
        case (None,None) => ctx
        case (None,Some(b)) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          onlyOne = b))
        case (Some(o),None) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
        case (Some(o),Some(b)) => ctx.setCommand(Delete(
          filter = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)),
          onlyOne = b)
        )
      }
    }
    case MGO_COMMAND_REPLACE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Replace(
          filter = protoToBson(proto.bsonParam.head),
          replacement = unmarshal[Document](proto.documents.head.document)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(Replace(
          filter = protoToBson(proto.bsonParam.head),
          replacement = unmarshal[Document](proto.documents.head.document),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_COMMAND_UPDATE => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head)))
      )
      (proto.options, proto.only) match {
        case (None,None) => ctx
        case (None,Some(b)) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          onlyOne = b))
        case (Some(o),None) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          options = Some(unmarshal[Any](o.value)))
        )
        case (Some(o),Some(b)) => ctx.setCommand(Update(
          filter = protoToBson(proto.bsonParam.head),
          update = protoToBson(proto.bsonParam.tail.head),
          options = Some(unmarshal[Any](o.value)),
          onlyOne = b)
        )
      }
    }
    case MGO_ADMIN_DROPCOLLECTION =>
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropCollection(proto.collName))
      )
    case MGO_ADMIN_CREATECOLLECTION => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateCollection(proto.collName))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_CREATEVIEW => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateView(viewName = proto.targets.head,
          viewOn = proto.targets.tail.head,
          pipeline = proto.bsonParam.map(p => protoToBson(p))))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
          viewOn = proto.targets.tail.head,
          pipeline = proto.bsonParam.map(p => protoToBson(p)),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_CREATEINDEX=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPINDEXBYNAME=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropIndexByName(indexName = proto.targets.head))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPINDEXBYKEY=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }
    case MGO_ADMIN_DROPALLINDEXES=> {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_ADMIN,
        action = Some(DropAllIndexes())
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(DropAllIndexes(
          options = Some(unmarshal[Any](o.value)))
        )
      }
    }

  }

  def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
    case None => None
    case Some(act) => act match {
      case Count(filter, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_COUNT,
          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                        else Seq(bsonToProto(filter.get))},
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o)))
      ))
      case Distict(fieldName, filter) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_DISTICT,
          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                        else Seq(bsonToProto(filter.get))},
          targets = Seq(fieldName)

        ))

      case Find(filter, andThen, firstOnly) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_FIND,
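          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
                        else Seq(bsonToProto(filter.get))},
          resultOptions = andThen.map(_.toProto),
          // Carry the firstOnly flag across the wire; otherwise it is lost on the server side
          only = Some(firstOnly)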
        ))

      case Aggregate(pipeLine) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_AGGREGATE,
          bsonParam = pipeLine.map(bsonToProto)
        ))

      case Insert(newdocs, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_INSERT,
          documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o)))
        ))

      case Delete(filter, options, onlyOne) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_DELETE,
          bsonParam = Seq(bsonToProto(filter)),
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o))),
          only = Some(onlyOne)
        ))

      case Replace(filter, replacement, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_REPLACE,
          bsonParam = Seq(bsonToProto(filter)),
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o))),
          documents = Seq(ProtoMGODocument(marshal(replacement)))
        ))

      case Update(filter, update, options, onlyOne) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_COMMAND_UPDATE,
          bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o))),
          only = Some(onlyOne)
        ))

      case DropCollection(coll) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = coll,
          commandType = MGO_ADMIN_DROPCOLLECTION
        ))

      case CreateCollection(coll, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = coll,
          commandType = MGO_ADMIN_CREATECOLLECTION,
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o)))
        ))

      case ListCollection(dbName) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          commandType = MGO_ADMIN_LISTCOLLECTION
        ))

      case CreateView(viewName, viewOn, pipeline, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_CREATEVIEW,
          bsonParam = pipeline.map(bsonToProto),
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o))),
          targets = Seq(viewName,viewOn)
        ))

      case CreateIndex(key, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_CREATEINDEX,
          bsonParam = Seq(bsonToProto(key)),
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o)))
        ))

      case DropIndexByName(indexName, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPINDEXBYNAME,
          targets = Seq(indexName),
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o)))
        ))

      case DropIndexByKey(key, options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPINDEXBYKEY,
          bsonParam = Seq(bsonToProto(key)),
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o)))
        ))

      case DropAllIndexes(options) =>
        Some(new sdp.grpc.services.ProtoMGOContext(
          dbName = ctx.dbName,
          collName = ctx.collName,
          commandType = MGO_ADMIN_DROPALLINDEXES,
          // Leave the field unset when there are no options, so the receiver sees None
          options = options.map(o => ProtoAny(marshal(o)))
        ))

    }
  }

}
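The conversion code relies on marshal and unmarshal from protobuf.bytes.Converter, whose source is not shown in this post. A minimal sketch of what such a pair might look like, assuming plain Java serialization and using Array[Byte] in place of protobuf's ByteString (both are assumptions, not the author's implementation). It also shows an optional value travelling as an Option, so that an absent value is never deserialized:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object MarshalSketch {
  // Serialize any java.io.Serializable value to bytes (assumed Java serialization).
  def marshal(value: Any): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value.asInstanceOf[AnyRef]) finally out.close()
    bytes.toByteArray
  }

  // Deserialize bytes produced by marshal; the cast is unchecked, as in the original API shape.
  def unmarshal[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Absent options stay absent on the wire instead of becoming an empty payload.
  def optionsToWire(options: Option[Any]): Option[Array[Byte]] =
    options.map(o => marshal(o))

  def optionsFromWire(wire: Option[Array[Byte]]): Option[Any] =
    wire.map(b => unmarshal[Any](b))
}
```

With Java serialization an empty byte array is not a valid payload, and reading one fails, which is why mapping over the Option is safer than sending an empty placeholder.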

Original article: https://www.cnblogs.com/tiger-xc/p/9536859.html
