我在前面提到过MongoDB不支持像SQL般字符式的操作指令,所以我们必须对所有的MongoDB操作指令建立protobuf类型才能支持MongoDB指令的序列化。在对上一篇博文里我们把MongoDB的消息指令序列化单独挑出来讨论了一番,在这篇我们准备在一个MongoDB scala开发环境里通过streaming运算来示范这些protobuf消息的应用。
与前面我们介绍过的JDBC-streaming和Cassandra-streaming对应操作指令的处理相同,MGO-streaming也是是通过一个Context对象来描述操作方式和内容细节的,MGOContext定义如下:
代码语言:javascript复制 case class MGOContext(
dbName: String,
collName: String,
actionType: MGO_ACTION_TYPE = MGO_QUERY,
action: Option[MGOCommands] = None,
actionOptions: Option[Any] = None,
actionTargets: Seq[String] = Nil
) {
ctx =>
def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
def setCollName(name: String): MGOContext = ctx.copy(collName = name)
def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
def toSomeProto = MGOProtoConversion.ctxToProto(this)
}
object MGOContext {
def apply(db: String, coll: String) = new MGOContext(db, coll)
def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
MGOProtoConversion.ctxFromProto(proto)
}
上面的代码里包括了toSomeProto, fromProto两个函数来实现MGOContext的序列化转换处理。这两个函数的实现包含在文章后面提供的源代码中。MongoDB的.proto文件idl定义如下:
代码语言:javascript复制syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";
option (scalapb.options) = {
// use a custom Scala package name
// package_name: "io.ontherocks.introgrpc.demo"
// don't append file name to package
flat_package: true
// generate one Scala file for all messages (services still get their own file)
single_file: true
// add imports to generated file
// useful when extending traits or using custom types
// import: "io.ontherocks.hellogrpc.RockingMessage"
// code to put at the top of generated file
// works only with `single_file: true`
//preamble: "sealed trait SomeSealedTrait"
};
/*
* Demoes various customization options provided by ScalaPBs.
*/
package sdp.grpc.services;
import "misc/sdp.proto";
message ProtoMGOBson {
bytes bson = 1;
}
message ProtoMGODocument {
bytes document = 1;
}
message ProtoMGOResultOption { //FindObservable
int32 optType = 1;
ProtoMGOBson bsonParam = 2;
int32 valueParam = 3;
}
message ProtoMGOAdmin{
string tarName = 1;
repeated ProtoMGOBson bsonParam = 2;
ProtoAny options = 3;
string objName = 4;
}
message ProtoMGOContext { //MGOContext
string dbName = 1;
string collName = 2;
int32 commandType = 3;
repeated ProtoMGOBson bsonParam = 4;
repeated ProtoMGOResultOption resultOptions = 5;
repeated string targets = 6;
ProtoAny options = 7;
repeated ProtoMGODocument documents = 8;
google.protobuf.BoolValue only = 9;
ProtoMGOAdmin adminOptions = 10;
}
下面是本次示范的.proto文件:
代码语言:javascript复制syntax = "proto3";
import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";
option (scalapb.options) = {
// use a custom Scala package name
// package_name: "io.ontherocks.introgrpc.demo"
// don't append file name to package
flat_package: true
// generate one Scala file for all messages (services still get their own file)
single_file: true
// add imports to generated file
// useful when extending traits or using custom types
// import: "io.ontherocks.hellogrpc.RockingMessage"
// code to put at the top of generated file
// works only with `single_file: true`
//preamble: "sealed trait SomeSealedTrait"
};
/*
* Demoes various customization options provided by ScalaPBs.
*/
package sdp.grpc.services;
import "misc/sdp.proto";
import "cql/cql.proto";
import "jdbc/jdbc.proto";
import "mgo/mgo.proto";
service MGOServices {
rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
rpc runQueries(stream ProtoMGOContext) returns (stream ProtoMGODocument) {}
}
以上通过import "mgo/mgo.proto"引用了ProtoMGOContext类型,并在服务定义rpc runQueries里用作了传入参数。
下面这段是本次示范的服务实现代码:
代码语言:javascript复制package sdp.grpc.mongo.server
import sdp.mongo.engine._
import MGOClasses._
import MGOEngine._
import akka.NotUsed
import akka.stream.scaladsl.Flow
import sdp.logging.LogSupport
import sdp.grpc.services._
import org.mongodb.scala._
import scala.concurrent._
import akka.stream.ActorMaterializer
import sdp.mongo.engine.MGOProtoConversion.MGODocument
class MGOStreamingServices(implicit ec: ExecutionContextExecutor,
mat: ActorMaterializer, client: MongoClient)
extends MgostreamingGrpcAkkaStream.MGOServices with LogSupport {
override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
Flow[HelloMsg]
.map {r => HelloMsg(r.hello ", mongo ...")}
}
override def runQueries: Flow[ProtoMGOContext, ProtoMGODocument, NotUsed] =
Flow[ProtoMGOContext]
.flatMapConcat {p =>
val ctx = MGOContext.fromProto(p)
mongoStream(ctx).map{doc => MGODocument.toProto(doc)}
}
}
这个runQueries服务函数的处理流程是:接收ProtoMGOContext、转换成MGOContext、传给mongoStream、运算mongoStream返回ProtoMGODocument结果。mongoStream函数的代码如下:
代码语言:javascript复制 def mongoStream(ctx: MGOContext)(
implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
log.info(s"mongoStream> MGOContext: ${ctx}")
def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
val db = client.getDatabase(ctx.dbName)
val coll = db.getCollection(ctx.collName)
if ( ctx.action == None) {
log.error(s"mongoStream> uery action cannot be null!")
throw new IllegalArgumentException("query action cannot be null!")
}
try {
ctx.action.get match {
case Find(None, Nil, false) => //FindObservable
MongoSource(coll.find())
case Find(None, Nil, true) => //FindObservable
MongoSource(coll.find().first())
case Find(Some(filter), Nil, false) => //FindObservable
MongoSource(coll.find(filter))
case Find(Some(filter), Nil, true) => //FindObservable
MongoSource(coll.find(filter).first())
case Find(None, sro, _) => //FindObservable
val next = toResultOption(sro)
MongoSource(next(coll.find[Document]()))
case Find(Some(filter), sro, _) => //FindObservable
val next = toResultOption(sro)
MongoSource(next(coll.find[Document](filter)))
case _ =>
log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
}
}
catch { case e: Exception =>
log.error(s"mongoStream> runtime error: ${e.getMessage}")
throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
}
}
调用服务的gRPC客户端实现代码如下:
代码语言:javascript复制class MGOStreamClient(host: String, port: Int)(
implicit ec: ExecutionContextExecutor) extends LogSupport {
val channel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.build()
val stub = MgostreamingGrpcAkkaStream.stub(channel)
def echoHello: Source[HelloMsg, NotUsed] = {
val row = HelloMsg("hello ")
val rows = List.fill[HelloMsg](100)(row)
Source
.fromIterator(() => rows.iterator)
.via(stub.clientStreaming)
}
val filter = and(equal("state","California"),
equal("county","Alameda"),
equal("value",10))
val proj = exclude("rowid","_id")
val proj1 = include("county","value")
val rtxfmr = Seq(
ResultOptions(
optType = FOD_LIMIT,
value = 3),
ResultOptions(
optType = FOD_PROJECTION,
bson = Some(proj)) /*,
ResultOptions(
optType = FOD_PROJECTION,
bson = Some(proj1)) */
)
val cmd = Find(filter = Some(filter), andThen = rtxfmr)
val ctx = MGOContext("testdb","aqmrpt").setCommand(cmd)
def mgoQueries: Source[ProtoMGODocument,NotUsed] = {
Source
.single[ProtoMGOContext](ctx.toSomeProto.get)
.via(stub.runQueries)
}
}
object MGOStreamingClient extends App {
implicit val system = ActorSystem("EchoNumsClient")
implicit val mat = ActorMaterializer.create(system)
implicit val ec = system.dispatcher
val mgoClient = new MGOStreamClient("localhost", 50051)
mgoClient.mgoQueries.runForeach(pd =>
println(MGODocument.fromProto(pd).toJson()))
scala.io.StdIn.readLine()
mat.shutdown()
system.terminate()
}
调用服务函数运算结果:
代码语言:javascript复制{ "measureid" : { "$numberLong" : "83" }, "state" : "California", "county" : "Alameda", "year" : 1999, "value" : 10 }
{ "measureid" : { "$numberLong" : "84" }, "state" : "California", "county" : "Alameda", "year" : 1999, "value" : 10 }
{ "measureid" : { "$numberLong" : "84" }, "state" : "California", "county" : "Alameda", "year" : 2000, "value" : 10 }
下面是MongoDB序列化类型转换工具函数的源代码:
MGOProtoConversion.scala
代码语言:javascript复制package sdp.mongo.engine
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import sdp.grpc.services._
import protobuf.bytes.Converter._
import MGOClasses._
import MGOAdmins._
import MGOCommands._
import org.bson.BsonDocument
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.FindObservable
object MGOProtoConversion {
type MGO_COMMAND_TYPE = Int
val MGO_COMMAND_FIND = 0
val MGO_COMMAND_COUNT = 20
val MGO_COMMAND_DISTICT = 21
val MGO_COMMAND_DOCUMENTSTREAM = 1
val MGO_COMMAND_AGGREGATE = 2
val MGO_COMMAND_INSERT = 3
val MGO_COMMAND_DELETE = 4
val MGO_COMMAND_REPLACE = 5
val MGO_COMMAND_UPDATE = 6
val MGO_ADMIN_DROPCOLLECTION = 8
val MGO_ADMIN_CREATECOLLECTION = 9
val MGO_ADMIN_LISTCOLLECTION = 10
val MGO_ADMIN_CREATEVIEW = 11
val MGO_ADMIN_CREATEINDEX = 12
val MGO_ADMIN_DROPINDEXBYNAME = 13
val MGO_ADMIN_DROPINDEXBYKEY = 14
val MGO_ADMIN_DROPALLINDEXES = 15
case class AdminContext(
tarName: String = "",
bsonParam: Seq[Bson] = Nil,
options: Option[Any] = None,
objName: String = ""
){
def toProto = sdp.grpc.services.ProtoMGOAdmin(
tarName = this.tarName,
bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
objName = this.objName,
options = this.options.map(b => ProtoAny(marshal(b)))
)
}
object AdminContext {
def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
tarName = msg.tarName,
bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
objName = msg.objName,
options = msg.options.map(b => unmarshal[Any](b.value))
)
}
case class Context(
dbName: String = "",
collName: String = "",
commandType: MGO_COMMAND_TYPE,
bsonParam: Seq[Bson] = Nil,
resultOptions: Seq[ResultOptions] = Nil,
options: Option[Any] = None,
documents: Seq[Document] = Nil,
targets: Seq[String] = Nil,
only: Boolean = false,
adminOptions: Option[AdminContext] = None
){
def toProto = new sdp.grpc.services.ProtoMGOContext(
dbName = this.dbName,
collName = this.collName,
commandType = this.commandType,
bsonParam = this.bsonParam.map(bsonToProto),
resultOptions = this.resultOptions.map(_.toProto),
options = { if(this.options == None)
Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else
Some(ProtoAny(marshal(this.options.get))) },
documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
targets = this.targets,
only = Some(this.only),
adminOptions = this.adminOptions.map(_.toProto)
)
}
object MGODocument {
def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
unmarshal[Document](msg.document)
def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
new ProtoMGODocument(marshal(doc))
}
object MGOProtoMsg {
def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
dbName = msg.dbName,
collName = msg.collName,
commandType = msg.commandType,
bsonParam = msg.bsonParam.map(protoToBson),
resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
options = msg.options.map(a => unmarshal[Any](a.value)),
documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
targets = msg.targets,
adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
)
}
def bsonToProto(bson: Bson) =
ProtoMGOBson(marshal(bson.toBsonDocument(
classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
val bsdoc = unmarshal[BsonDocument](proto.bson)
override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
}
def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
case MGO_COMMAND_FIND => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Find())
)
def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
(proto.bsonParam, proto.resultOptions, proto.only) match {
case (Nil, Nil, None) => ctx
case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
case (bp,Nil,None) => ctx.setCommand(
Find(filter = Some(protoToBson(bp.head))))
case (bp,Nil,Some(b)) => ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
case (bp,fo,None) => {
ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)),
andThen = fo.map(ResultOptions.fromProto)
))
}
case (bp,fo,Some(b)) => {
ctx.setCommand(
Find(filter = Some(protoToBson(bp.head)),
andThen = fo.map(ResultOptions.fromProto),
firstOnly = b))
}
case _ => ctx
}
}
case MGO_COMMAND_COUNT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Count())
)
(proto.bsonParam, proto.options) match {
case (Nil, None) => ctx
case (bp, None) => ctx.setCommand(
Count(filter = Some(protoToBson(bp.head)))
)
case (Nil,Some(o)) => ctx.setCommand(
Count(options = Some(unmarshal[Any](o.value)))
)
case _ => ctx
}
}
case MGO_COMMAND_DISTICT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Distict(fieldName = proto.targets.head))
)
(proto.bsonParam) match {
case Nil => ctx
case bp: Seq[ProtoMGOBson] => ctx.setCommand(
Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))
)
case _ => ctx
}
}
case MGO_COMMAND_AGGREGATE => {
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
)
}
case MGO_ADMIN_LISTCOLLECTION => {
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_QUERY,
action = Some(ListCollection(proto.dbName)))
}
case MGO_COMMAND_INSERT => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Insert(
newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(Insert(
newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_COMMAND_DELETE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Delete(
filter = protoToBson(proto.bsonParam.head)))
)
(proto.options, proto.only) match {
case (None,None) => ctx
case (None,Some(b)) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
onlyOne = b))
case (Some(o),None) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
case (Some(o),Some(b)) => ctx.setCommand(Delete(
filter = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)),
onlyOne = b)
)
}
}
case MGO_COMMAND_REPLACE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Replace(
filter = protoToBson(proto.bsonParam.head),
replacement = unmarshal[Document](proto.documents.head.document)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(Replace(
filter = protoToBson(proto.bsonParam.head),
replacement = unmarshal[Document](proto.documents.head.document),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_COMMAND_UPDATE => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_UPDATE,
action = Some(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head)))
)
(proto.options, proto.only) match {
case (None,None) => ctx
case (None,Some(b)) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
onlyOne = b))
case (Some(o),None) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
options = Some(unmarshal[Any](o.value)))
)
case (Some(o),Some(b)) => ctx.setCommand(Update(
filter = protoToBson(proto.bsonParam.head),
update = protoToBson(proto.bsonParam.tail.head),
options = Some(unmarshal[Any](o.value)),
onlyOne = b)
)
}
}
case MGO_ADMIN_DROPCOLLECTION =>
new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropCollection(proto.collName))
)
case MGO_ADMIN_CREATECOLLECTION => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateCollection(proto.collName))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_CREATEVIEW => {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateView(viewName = proto.targets.head,
viewOn = proto.targets.tail.head,
pipeline = proto.bsonParam.map(p => protoToBson(p))))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
viewOn = proto.targets.tail.head,
pipeline = proto.bsonParam.map(p => protoToBson(p)),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_CREATEINDEX=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPINDEXBYNAME=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropIndexByName(indexName = proto.targets.head))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPINDEXBYKEY=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
options = Some(unmarshal[Any](o.value)))
)
}
}
case MGO_ADMIN_DROPALLINDEXES=> {
var ctx = new MGOContext(
dbName = proto.dbName,
collName = proto.collName,
actionType = MGO_ADMIN,
action = Some(DropAllIndexes())
)
proto.options match {
case None => ctx
case Some(o) => ctx.setCommand(DropAllIndexes(
options = Some(unmarshal[Any](o.value)))
)
}
}
}
def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
case None => None
case Some(act) => act match {
case Count(filter, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_COUNT,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case Distict(fieldName, filter) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_DISTICT,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
targets = Seq(fieldName)
))
case Find(filter, andThen, firstOnly) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_FIND,
bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
else Seq(bsonToProto(filter.get))},
resultOptions = andThen.map(_.toProto)
))
case Aggregate(pipeLine) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_AGGREGATE,
bsonParam = pipeLine.map(bsonToProto)
))
case Insert(newdocs, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_INSERT,
documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case Delete(filter, options, onlyOne) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_DELETE,
bsonParam = Seq(bsonToProto(filter)),
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
only = Some(onlyOne)
))
case Replace(filter, replacement, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_REPLACE,
bsonParam = Seq(bsonToProto(filter)),
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
documents = Seq(ProtoMGODocument(marshal(replacement)))
))
case Update(filter, update, options, onlyOne) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_COMMAND_UPDATE,
bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
only = Some(onlyOne)
))
case DropCollection(coll) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = coll,
commandType = MGO_ADMIN_DROPCOLLECTION
))
case CreateCollection(coll, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = coll,
commandType = MGO_ADMIN_CREATECOLLECTION,
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case ListCollection(dbName) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
commandType = MGO_ADMIN_LISTCOLLECTION
))
case CreateView(viewName, viewOn, pipeline, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_CREATEVIEW,
bsonParam = pipeline.map(bsonToProto),
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) },
targets = Seq(viewName,viewOn)
))
case CreateIndex(key, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_CREATEINDEX,
bsonParam = Seq(bsonToProto(key)),
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropIndexByName(indexName, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPINDEXBYNAME,
targets = Seq(indexName),
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropIndexByKey(key, options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPINDEXBYKEY,
bsonParam = Seq(bsonToProto(key)),
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
case DropAllIndexes(options) =>
Some(new sdp.grpc.services.ProtoMGOContext(
dbName = ctx.dbName,
collName = ctx.collName,
commandType = MGO_ADMIN_DROPALLINDEXES,
options = { if(options == None) Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
else Some(ProtoAny(marshal(options.get))) }
))
}
}
}