博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Akka-Cluster(3)- ClusterClient, 集群客户端
阅读量:5304 次
发布时间:2019-06-14

本文共 86434 字,大约阅读时间需要 288 分钟。

  上篇我们介绍了distributed pub/sub消息传递机制。这是在同一个集群内的消息共享机制:发布者(publisher)和订阅者(subscriber)都在同一个集群的节点上,所有节点上的DistributedPubSubMediator通过集群内部的沟通机制在底层构建了消息流通渠道。在actor pub/sub层面可以实现对象位置透明化。在现实里很多前端都会作为某个集群的客户端但又与集群分离,又或者两个独立的集群之间可能会发生交互关系,这是也会出现客户端与服务端不在同一集群内的情况,ClusterClient就是为集群外部actor与集群内部actor进行沟通的解决方案。

实际上ClusterClient模式就代表一种依赖于消息发布订阅机制的服务方式:客户端通过消息来请求服务,服务端接收请求服务消息并提供相应运算服务。

我们可以把集群客户端模式分成集群客户端ClusterClient和集群服务端ClusterClientReceptionist,从字面理解这就是个接待员这么个角色,负责接待集群外客户端发起的服务请求。在集群所有节点上(或者选定角色role)都部署ClusterClientReceptionist,它们都与本节点的DistributedPubSubMediator对接组成更上一层的消息订阅方,ClusterClient与ClusterClientReceptionist的对接又组成了一种统一集群环境可以实现上集所讨论的distributed pub/sub机制。

ClusterClient就是消息发布方,它是在目标集群之外机器上的某个actor。这个机器上的actor如果需要向集群内部actor发送消息可以通过这个机器上的ClusterClient actor与集群内的ClusterClientReceptionist搭建的通道向集群内某个ClusterClientReceptionist连接的DistributedPubSubMediator所登记的actor进行消息发送。所以使用集群客户端的机器必须在本机启动ClusterClient服务(运行这个actor),这是通讯桥梁的一端。

ClusterClient在启动时用预先配置的地址(contact points)与ClusterClientReceptionist连接,然后通过ClusterClientReceptionist发布的联络点清单来维护内部的对接点清单,可以进行持久化,在发生系统重启时用这个名单来与集群连接。一旦连接,ClusterClient会监控对方运行情况,自动进行具体ClusterClientReceiptionist的替换。ClusterClient发布消息是包嵌在三种结构里的:

1、ClusterClient.Send

2、ClusterClient.SendAll

3、ClusterClient.Publish

这几种方法我们在上篇已经讨论过,这里就略去。

ClusterClientReceiptionist是集群内的消息接收接口。集群内需要接收消息的actor必须在本地的DistributedPubSubMediator上注册自己的地址,ClusterClientReceptionist由此获得集群内所有服务项目actor的地址清单。通过ClusterClient发布的消息内指定接收方类型信息来确定最终接收消息并提供服务的actor。服务注册示范如下:

//注册服务A  val serviceA = system.actorOf(Props[Service], "serviceA")  ClusterClientReceptionist(system).registerService(serviceA)//注册服务B  val serviceB = system.actorOf(Props[Service], "serviceB")  ClusterClientReceptionist(system).registerService(serviceB)

ClusterClient调用服务示范:

val client = system.actorOf(ClusterClient.props(  ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")  client ! ClusterClient.Send("/user/serviceA", DoThis, localAffinity = true)  client ! ClusterClient.SendToAll("/user/serviceB", DoThat)

注意:ClusterClientReceptionist需要接收DoThis,DoThat消息并实现相关的运算。

在具体应用中要注意sender()的具体意义:从提供服务的actor方面看,sender()代表ClusterClientReceptionist。从发布消息的actor角度看,sender()代表的是DeadLetter。如果服务actor需要知道请求者具体地址,发布方可以把自己的地址嵌在发布的消息结构里。

下面我们就通过一个简单的例子来进行示范。先设计两个服务actor:Cat,Dog 。假设它们会提供不同的叫声作为服务吧:

class Cat extends Actor with ActorLogging {  //使用pub/sub方式设置  val mediator = DistributedPubSub(context.system).mediator  override def preStart() = {    mediator ! Subscribe("Shout", self)    super.preStart()  }  override def receive: Receive = {    case "Shout" =>      log.info("*******I am a cat, MIAOM ...******")  }}class Dog extends Actor with ActorLogging {  //使用pub/sub方式设置  val mediator = DistributedPubSub(context.system).mediator  override def preStart() = {    mediator ! Subscribe("Shout", self)    super.preStart()  }  override def receive: Receive = {    case "Shout" =>      log.info("*****I am a dog, WANG WANG...*****")  }}

我们看到,这就是两个很普通的actor。但我们还是可以和上一篇分布式pub/sub结合起来验证cluster-client是基于distributed-pub/sub的。然后我们分别把这两个actor(服务)放到不同的集群节点上:

object Cat {  def props = Props[Cat]  def create(port: Int): ActorSystem  = {    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")      .withFallback(ConfigFactory.load())    val system = ActorSystem("ClusterSystem",config)    val catSound = system.actorOf(props,"CatSound")    ClusterClientReceptionist(system).registerService(catSound)    system  }}object Dog {  def props = Props(new Dog)  def create(port: Int): ActorSystem = {    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")      .withFallback(ConfigFactory.load())    val system = ActorSystem("ClusterSystem",config)    val dogSound = system.actorOf(props,"DogSound")    ClusterClientReceptionist(system).registerService(dogSound)    system  }}

注意:集群名称是ClusterSystem。我们分别在actor所在节点用ClusterClientReceptionist.registerService登记了服务。这个集群所使用的conf如下:

akka.actor.warn-about-java-serializer-usage = offakka.log-dead-letters-during-shutdown = offakka.log-dead-letters = offakka {  loglevel = INFO  extensions = ["akka.cluster.client.ClusterClientReceptionist"]  actor {    provider = "cluster"    serializers {      java = "akka.serialization.JavaSerializer"      proto = "akka.remote.serialization.ProtobufSerializer"    }    serialization-bindings {      "java.lang.String" = java      "scalapb.GeneratedMessage" = proto    }  }  remote {    log-remote-lifecycle-events = off    netty.tcp {      hostname = "127.0.0.1"      port = 0    }  }  cluster {    seed-nodes = [      "akka.tcp://ClusterSystem@127.0.0.1:2551"]    log-info = off  }}

这是一个比较完整的集群配置文档,只有port需要再配置。然后运行这两个节点:

object PetHouse extends App {  val sysCat = Cat.create(2551)  val sysDog = Dog.create(2552)  scala.io.StdIn.readLine()  sysCat.terminate()  sysDog.terminate()}

完成了在2551,2552节点上的Cat,Dog actor构建及ClusterClientReceptionist.registerService服务登记。现在看看客户端:

object PetClient extends App {  val conf = ConfigFactory.load("client")  val clientSystem = ActorSystem("ClientSystem",conf)/* 从 conf 文件里读取 contact-points 地址  val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {    case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"  }.toSet*/  //先放一个contact-point, 系统会自动增加其它的点  val initialContacts = Set(    ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")  )  val clusterClient = clientSystem.actorOf(    ClusterClient.props(      ClusterClientSettings(clientSystem)        .withInitialContacts(initialContacts)),    "petClient")  clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)  clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)  println(s"sent shout messages ...")  scala.io.StdIn.readLine()  clusterClient ! Publish("Shout","Shout")  println(s"publish shout messages ...")  scala.io.StdIn.readLine()  clientSystem.terminate();}

客户端的ActorSystem名称为ClientSystem,是在ClusterSystem集群之外的。conf文件如下:

akka {  actor.provider = remote  remote.netty.tcp.port= 2553  remote.netty.tcp.hostname=127.0.0.1}contact-points = [  "akka.tcp://ClusterSystem@127.0.0.1:2551",  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

把它设成actor.provider=remote可以免去提供seednodes。运算结果:

[12/08/2018 09:32:51.432] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******[INFO] [12/08/2018 09:32:51.435] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****

无论ClusterClient或Receptionist都会针对自己的状态发送消息。我们可以截取这些消息来做些相应的工作。参考下面的截听器示范代码: 

package petsoundimport akka.actor._import akka.cluster.client._class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {  override def preStart(): Unit = {    clusterClient ! SubscribeContactPoints    super.preStart()  }  override def receive: Receive = {    case ContactPoints(cps) =>      cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}    case ContactPointAdded(cp) =>      log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")    case ContactPointRemoved(cp) =>      log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")  }}class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {  override def preStart(): Unit = {    receptionist ! SubscribeClusterClients    super.preStart()  }  override def receive: Receive = {    case ClusterClients(cs) =>      cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}    case ClusterClientUp(cc) =>      log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")    case ClusterClientUnreachable(cc) =>      log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")  }}

这两个event-listener的安装方法如下:

val receptionist = ClusterClientReceptionist(system).underlying    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listner")    val receptionist = ClusterClientReceptionist(system).underlying    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listner")  val clusterClient = clientSystem.actorOf(    ClusterClient.props(      ClusterClientSettings(clientSystem)        .withInitialContacts(initialContacts)),    "petClient")  clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listner")

看看运算结果:

[INFO] [12/09/2018 09:42:40.838] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPoints:akka.tcp://ClusterSystem@127.0.0.1:2551******[INFO] [12/09/2018 09:42:40.947] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPointAdded: akka.tcp://ClusterSystem@127.0.0.1:2552*******[INFO] [12/09/2018 09:42:40.967] [ClientSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClientSystem@127.0.0.1:2553/user/petClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist][INFO] [12/09/2018 09:42:40.979] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUp: akka.tcp://ClientSystem@127.0.0.1:2553*******[INFO] [12/09/2018 09:54:34.363] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUnreachable: akka.tcp://ClientSystem@127.0.0.1:2553*******

下面我们再做个示范,还是与上篇讨论一样:由集群客户端发送MongoDB指令至某个在集群里用ClusterClientReceptionist注册的MongoDB操作服务actor。服务方接收指令后在MongoDB上进行运算。下面是MongoDB的服务actor: 

package petsoundimport akka.actor._import com.typesafe.config._import akka.actor.ActorSystemimport org.mongodb.scala._import sdp.grpc.services.ProtoMGOContextimport sdp.mongo.engine.MGOClasses._import sdp.mongo.engine.MGOEngine._import sdp.result.DBOResult._import akka.cluster.client._import scala.collection.JavaConverters._import scala.util._class MongoAdder extends Actor with ActorLogging {  import monix.execution.Scheduler.Implicits.global  implicit val mgosys = context.system  implicit val ec = mgosys.dispatcher  val clientSettings: MongoClientSettings = MongoClientSettings.builder()    .applyToClusterSettings {b =>      b.hosts(List(new ServerAddress("localhost:27017")).asJava)    }.build()  implicit val client: MongoClient = MongoClient(clientSettings)  val ctx = MGOContext("testdb","friends")  override def receive: Receive = {    case someProto @ Some(proto:ProtoMGOContext) =>      val ctx = MGOContext.fromProto(proto)      log.info(s"****** received MGOContext: $someProto *********")      val task = mgoUpdate[Completed](ctx).toTask      task.runOnComplete {        case Success(s) => println("operations completed successfully.")        case Failure(exception) => println(s"error: ${exception.getMessage}")      }  }}object MongoAdder {  def create(port: Int): ActorSystem = {    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")      .withFallback(ConfigFactory.load())    val system = ActorSystem("ClusterSystem", config)    val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")    ClusterClientReceptionist(system).registerService(mongoAdder)    val receptionist = ClusterClientReceptionist(system).underlying    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")    system  }}

MongoAdder处于同一个集群ClusterSystem中。代码里已经包括了服务注册部分。客户端发送MongoDB指令的示范如下:

//MongoDB 操作示范  import org.mongodb.scala._  import sdp.mongo.engine.MGOClasses._  val ctx = MGOContext("testdb","friends")  val chen = Document("姓" -> "陈", "名" -> "大文","age" -> 28)  val zhang = Document("姓" -> "张", "名" -> "小海","age" -> 7)  val lee = Document("姓" -> "李", "名" -> "四","age" -> 45)  val ouyang = Document("姓" -> "欧阳", "名" -> "锋","age" -> 120)  val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))  clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)

由于MongoDB指令是通过protobuffer方式进行序列化的,所以需要修改client.conf通知akka使用protobuf格式的消息:

akka {  actor {    provider = remote    serializers {      java = "akka.serialization.JavaSerializer"      proto = "akka.remote.serialization.ProtobufSerializer"    }    serialization-bindings {      "java.lang.String" = java      "scalapb.GeneratedMessage" = proto    }  }  remote.netty.tcp.port= 2553  remote.netty.tcp.hostname=127.0.0.1}contact-points = [  "akka.tcp://ClusterSystem@127.0.0.1:2551",  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

下面是本次讨论完整示范源代码:

build.sbt

import scalapb.compiler.Version.scalapbVersionimport scalapb.compiler.Version.grpcJavaVersionname := "akka-cluster-client"version := "0.1"scalaVersion := "2.12.7"scalacOptions += "-Ypartial-unification"libraryDependencies := Seq(  "com.typesafe.akka" %% "akka-actor" % "2.5.17",  "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.17",  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",  // "io.grpc" % "grpc-netty" % grpcJavaVersion,  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,  "io.monix" %% "monix" % "2.3.0",  //for mongodb 4.0  "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",  //other dependencies  "co.fs2" %% "fs2-core" % "0.9.7",  "ch.qos.logback"  %  "logback-classic"   % "1.2.3",  "org.typelevel" %% "cats-core" % "0.9.0",  "io.monix" %% "monix-execution" % "3.0.0-RC1",  "io.monix" %% "monix-eval" % "3.0.0-RC1")PB.targets in Compile := Seq(  scalapb.gen() -> (sourceManaged in Compile).value)

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")libraryDependencies ++= Seq(  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4")

resouces/application.conf

akka.actor.warn-about-java-serializer-usage = offakka.log-dead-letters-during-shutdown = offakka.log-dead-letters = offakka {  loglevel = INFO  extensions = ["akka.cluster.client.ClusterClientReceptionist"]  actor {    provider = "cluster"    serializers {      java = "akka.serialization.JavaSerializer"      proto = "akka.remote.serialization.ProtobufSerializer"    }    serialization-bindings {      "java.lang.String" = java      "scalapb.GeneratedMessage" = proto    }  }  remote {    log-remote-lifecycle-events = off    netty.tcp {      hostname = "127.0.0.1"      port = 0    }  }  cluster {    seed-nodes = [      "akka.tcp://ClusterSystem@127.0.0.1:2551"]    log-info = off  }}

resources/client.conf

akka {  actor {    provider = remote    serializers {      java = "akka.serialization.JavaSerializer"      proto = "akka.remote.serialization.ProtobufSerializer"    }    serialization-bindings {      "java.lang.String" = java      "scalapb.GeneratedMessage" = proto    }  }  remote.netty.tcp.port= 2553  remote.netty.tcp.hostname=127.0.0.1}contact-points = [  "akka.tcp://ClusterSystem@127.0.0.1:2551",  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

protobuf/spd.proto

syntax = "proto3";import "google/protobuf/wrappers.proto";import "google/protobuf/any.proto";import "scalapb/scalapb.proto";option (scalapb.options) = {  // use a custom Scala package name  // package_name: "io.ontherocks.introgrpc.demo"  // don't append file name to package  flat_package: true  // generate one Scala file for all messages (services still get their own file)  single_file: true  // add imports to generated file  // useful when extending traits or using custom types  // import: "io.ontherocks.hellogrpc.RockingMessage"  // code to put at the top of generated file  // works only with `single_file: true`  //preamble: "sealed trait SomeSealedTrait"};package sdp.grpc.services;message ProtoDate {  int32 yyyy = 1;  int32 mm   = 2;  int32 dd   = 3;}message ProtoTime {  int32 hh   = 1;  int32 mm   = 2;  int32 ss   = 3;  int32 nnn  = 4;}message ProtoDateTime {   ProtoDate date = 1;   ProtoTime time = 2;}message ProtoAny {  bytes value = 1;}

protobuf/mgo.proto

syntax = "proto3";import "google/protobuf/wrappers.proto";import "google/protobuf/any.proto";import "scalapb/scalapb.proto";option (scalapb.options) = {  // use a custom Scala package name  // package_name: "io.ontherocks.introgrpc.demo"  // don't append file name to package  flat_package: true  // generate one Scala file for all messages (services still get their own file)  single_file: true  // add imports to generated file  // useful when extending traits or using custom types  // import: "io.ontherocks.hellogrpc.RockingMessage"  // code to put at the top of generated file  // works only with `single_file: true`  //preamble: "sealed trait SomeSealedTrait"};/* * Demoes various customization options provided by ScalaPBs. */package sdp.grpc.services;import "sdp.proto";message ProtoMGOBson {  bytes bson = 1;}message ProtoMGODocument {  bytes document = 1;}message ProtoMGOResultOption { //FindObservable   int32 optType = 1;   ProtoMGOBson bsonParam = 2;   int32 valueParam = 3;}message ProtoMGOAdmin{  string tarName = 1;  repeated ProtoMGOBson bsonParam  = 2;  ProtoAny options = 3;  string objName = 4;}message ProtoMGOContext {  //MGOContext  string dbName = 1;  string collName = 2;  int32 commandType = 3;  repeated ProtoMGOBson bsonParam = 4;  repeated ProtoMGOResultOption resultOptions = 5;  repeated string targets = 6;  ProtoAny options = 7;  repeated ProtoMGODocument documents = 8;  google.protobuf.BoolValue only = 9;  ProtoMGOAdmin adminOptions = 10;}

converters/ByteConverter.scala

package protobuf.bytesimport java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}import com.google.protobuf.ByteStringobject Converter {  def marshal(value: Any): ByteString = {    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()    val oos = new ObjectOutputStream(stream)    oos.writeObject(value)    oos.close()    ByteString.copyFrom(stream.toByteArray())  }  def unmarshal[A](bytes: ByteString): A = {    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))    val value = ois.readObject()    ois.close()    value.asInstanceOf[A]  }}

converters/DBOResultType.scala

package sdp.resultimport cats._import cats.data.EitherTimport cats.data.OptionTimport monix.eval.Taskimport cats.implicits._import scala.concurrent._import scala.collection.TraversableOnceobject DBOResult {  type DBOError[A] = EitherT[Task,Throwable,A]  type DBOResult[A] = OptionT[DBOError,A]  implicit def valueToDBOResult[A](a: A): DBOResult[A] =         Applicative[DBOResult].pure(a)  implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =         OptionT((o: Option[A]).pure[DBOError])  implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = { //   val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))         OptionT.liftF(EitherT.fromEither[Task](e))  }  implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {       val task = Task.fromFuture[A](fut)       val et = EitherT.liftF[Task,Throwable,A](task)       OptionT.liftF(et)  }  implicit class DBOResultToTask[A](r: DBOResult[A]) {    def toTask = r.value.value  }  implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {    def someValue: Option[A] = r match {      case Left(err) => (None: Option[A])      case Right(oa) => oa    }  }  def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =    if (coll.isEmpty)      optionToDBOResult(None: Option[C[A]])    else      optionToDBOResult(Some(coll): Option[C[A]])}

filestream/FileStreaming.scala

package sdp.fileimport java.io.{ByteArrayInputStream, InputStream}import java.nio.ByteBufferimport java.nio.file.Pathsimport akka.stream.Materializerimport akka.stream.scaladsl.{FileIO, StreamConverters}import akka.util._import scala.concurrent.Awaitimport scala.concurrent.duration._object Streaming {  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(    implicit mat: Materializer):ByteBuffer = {    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>      hd ++ bs    }    (Await.result(fut, timeOut)).toByteBuffer  }  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(    implicit mat: Materializer): Array[Byte] = {    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>      hd ++ bs    }    (Await.result(fut, timeOut)).toArray  }  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(    implicit mat: Materializer): InputStream = {    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>      hd ++ bs    }    val buf = (Await.result(fut, timeOut)).toArray    new ByteArrayInputStream(buf)  }  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(    implicit mat: Materializer) = {    val ba = new Array[Byte](byteBuf.remaining())    byteBuf.get(ba,0,ba.length)    val baInput = new ByteArrayInputStream(ba)    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))    source.runWith(FileIO.toPath(Paths.get(fileName)))  }  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(    implicit mat: Materializer) = {    val bb = ByteBuffer.wrap(bytes)    val baInput = new ByteArrayInputStream(bytes)    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))    source.runWith(FileIO.toPath(Paths.get(fileName)))  }  def InputStreamToFile(is: InputStream, fileName: String)(    implicit mat: Materializer) = {    val source = StreamConverters.fromInputStream(() => is)    source.runWith(FileIO.toPath(Paths.get(fileName)))  }}

logging/Log.scala

package sdp.loggingimport org.slf4j.Logger/**  * Logger which just wraps org.slf4j.Logger internally.  *  * @param logger logger  */class Log(logger: Logger) {  // use var consciously to enable squeezing later  var isDebugEnabled: Boolean = logger.isDebugEnabled  var isInfoEnabled: Boolean = logger.isInfoEnabled  var isWarnEnabled: Boolean = logger.isWarnEnabled  var isErrorEnabled: Boolean = logger.isErrorEnabled  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {    level match {      case 'debug | 'DEBUG => debug(msg)      case 'info | 'INFO => info(msg)      case 'warn | 'WARN => warn(msg)      case 'error | 'ERROR => error(msg)      case _ => // nothing to do    }  }  def debug(msg: => String): Unit = {    if (isDebugEnabled && logger.isDebugEnabled) {      logger.debug(msg)    }  }  def debug(msg: => String, e: Throwable): Unit = {    if (isDebugEnabled && logger.isDebugEnabled) {      logger.debug(msg, e)    }  }  def info(msg: => String): Unit = {    if (isInfoEnabled && logger.isInfoEnabled) {      logger.info(msg)    }  }  def info(msg: => String, e: Throwable): Unit = {    if (isInfoEnabled && logger.isInfoEnabled) {      logger.info(msg, e)    }  }  def warn(msg: => String): Unit = {    if (isWarnEnabled && logger.isWarnEnabled) {      logger.warn(msg)    }  }  def warn(msg: => String, e: Throwable): Unit = {    if (isWarnEnabled && logger.isWarnEnabled) {      logger.warn(msg, e)    }  }  def error(msg: => String): Unit = {    if (isErrorEnabled && logger.isErrorEnabled) {      logger.error(msg)    }  }  def error(msg: => String, e: Throwable): Unit = {    if (isErrorEnabled && logger.isErrorEnabled) {      logger.error(msg, e)    }  }}

logging/LogSupport.scala

package sdp.loggingimport org.slf4j.LoggerFactorytrait LogSupport {  /**    * Logger    */  protected val log = new Log(LoggerFactory.getLogger(this.getClass))}

mgo/engine/MGOProtoConversion.scala

package sdp.mongo.engineimport org.mongodb.scala.bson.collection.immutable.Documentimport org.bson.conversions.Bsonimport sdp.grpc.services._import protobuf.bytes.Converter._import MGOClasses._import MGOAdmins._import MGOCommands._import org.bson.BsonDocumentimport org.bson.codecs.configuration.CodecRegistryimport org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRYimport org.mongodb.scala.FindObservableobject MGOProtoConversion {  type MGO_COMMAND_TYPE = Int  val MGO_COMMAND_FIND            = 0  val MGO_COMMAND_COUNT           = 20  val MGO_COMMAND_DISTICT         = 21  val MGO_COMMAND_DOCUMENTSTREAM  = 1  val MGO_COMMAND_AGGREGATE       = 2  val MGO_COMMAND_INSERT          = 3  val MGO_COMMAND_DELETE          = 4  val MGO_COMMAND_REPLACE         = 5  val MGO_COMMAND_UPDATE          = 6  val MGO_ADMIN_DROPCOLLECTION    = 8  val MGO_ADMIN_CREATECOLLECTION  = 9  val MGO_ADMIN_LISTCOLLECTION    = 10  val MGO_ADMIN_CREATEVIEW        = 11  val MGO_ADMIN_CREATEINDEX       = 12  val MGO_ADMIN_DROPINDEXBYNAME   = 13  val MGO_ADMIN_DROPINDEXBYKEY    = 14  val MGO_ADMIN_DROPALLINDEXES    = 15  case class AdminContext(                           tarName: String = "",                           bsonParam: Seq[Bson] = Nil,                           options: Option[Any] = None,                           objName: String = ""                         ){    def toProto = sdp.grpc.services.ProtoMGOAdmin(      tarName = this.tarName,      bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},      objName = this.objName,      options = this.options.map(b => ProtoAny(marshal(b)))    )  }  object AdminContext {    def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(      tarName = msg.tarName,      bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),      objName = msg.objName,      options = msg.options.map(b => unmarshal[Any](b.value))    )  }  case class Context(                      dbName: String = "",                      collName: String = "",                      commandType: MGO_COMMAND_TYPE,                      bsonParam: Seq[Bson] = Nil,                      resultOptions: Seq[ResultOptions] = Nil,                      options: Option[Any] = None,                      documents: Seq[Document] = Nil,                      targets: Seq[String] = Nil,                      only: Boolean = false,                      adminOptions: Option[AdminContext] = None                    ){    def toProto = new sdp.grpc.services.ProtoMGOContext(      dbName = this.dbName,      collName = this.collName,      commandType = this.commandType,      bsonParam = this.bsonParam.map(bsonToProto),      resultOptions = this.resultOptions.map(_.toProto),      options = { if(this.options == None)        None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))      else        Some(ProtoAny(marshal(this.options.get))) },      documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),      targets = this.targets,      only = Some(this.only),      adminOptions = this.adminOptions.map(_.toProto)    )  }  object MGODocument {    def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =      unmarshal[Document](msg.document)    def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =      new ProtoMGODocument(marshal(doc))  }  object MGOProtoMsg {    def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(      dbName = msg.dbName,      collName = msg.collName,      commandType = msg.commandType,      bsonParam = msg.bsonParam.map(protoToBson),      resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),      options = msg.options.map(a => unmarshal[Any](a.value)),      documents = msg.documents.map(doc => unmarshal[Document](doc.document)),      targets = msg.targets,      adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))    )  }  def bsonToProto(bson: Bson) =    ProtoMGOBson(marshal(bson.toBsonDocument(      classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))  def protoToBson(proto: ProtoMGOBson): Bson = new Bson {    val bsdoc = unmarshal[BsonDocument](proto.bson)    override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc  }  def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {    case MGO_COMMAND_FIND => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_QUERY,        action = Some(Find())      )      def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>        rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))      (proto.bsonParam, proto.resultOptions, proto.only) match {        case (Nil, Nil, None) => ctx        case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))        case (bp,Nil,None) => ctx.setCommand(          Find(filter = Some(protoToBson(bp.head))))        case (bp,Nil,Some(b)) => ctx.setCommand(          Find(filter = Some(protoToBson(bp.head)), firstOnly = b))        case (bp,fo,None) => {          ctx.setCommand(            Find(filter = Some(protoToBson(bp.head)),              andThen = fo.map(ResultOptions.fromProto)            ))        }        case (bp,fo,Some(b)) => {          ctx.setCommand(            Find(filter = Some(protoToBson(bp.head)),              andThen = fo.map(ResultOptions.fromProto),              firstOnly = b))        }        case _ => ctx      }    }    case MGO_COMMAND_COUNT => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_QUERY,        action = Some(Count())      )      (proto.bsonParam, proto.options) match {        case (Nil, None) => ctx        case (bp, None) => ctx.setCommand(          Count(filter = Some(protoToBson(bp.head)))        )        case (Nil,Some(o)) => ctx.setCommand(          Count(options = Some(unmarshal[Any](o.value)))        )        case _ => ctx      }    }    case MGO_COMMAND_DISTICT => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_QUERY,        action = Some(Distict(fieldName = proto.targets.head))      )      (proto.bsonParam) match {        case Nil => ctx        case bp: Seq[ProtoMGOBson] => ctx.setCommand(          Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))        )        case _ => ctx      }    }    case MGO_COMMAND_AGGREGATE => {      new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_QUERY,        action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))      )    }    case MGO_ADMIN_LISTCOLLECTION => {      new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_QUERY,        action = Some(ListCollection(proto.dbName)))    }    case MGO_COMMAND_INSERT => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_UPDATE,        action = Some(Insert(          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))      )      proto.options match {        case None => ctx        case Some(o) => ctx.setCommand(Insert(          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),          options = Some(unmarshal[Any](o.value)))        )      }    }    case MGO_COMMAND_DELETE => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_UPDATE,        action = Some(Delete(          filter = protoToBson(proto.bsonParam.head)))      )      (proto.options, proto.only) match {        case (None,None) => ctx        case (None,Some(b)) => ctx.setCommand(Delete(          filter = protoToBson(proto.bsonParam.head),          onlyOne = b))        case (Some(o),None) => ctx.setCommand(Delete(          filter = protoToBson(proto.bsonParam.head),          options = Some(unmarshal[Any](o.value)))        )        case (Some(o),Some(b)) => ctx.setCommand(Delete(          filter = protoToBson(proto.bsonParam.head),          options = Some(unmarshal[Any](o.value)),          onlyOne = b)        )      }    }    case MGO_COMMAND_REPLACE => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_UPDATE,        action = Some(Replace(          filter = protoToBson(proto.bsonParam.head),          replacement = unmarshal[Document](proto.documents.head.document)))      )      proto.options match {        case None => ctx        case Some(o) => ctx.setCommand(Replace(          filter = protoToBson(proto.bsonParam.head),          replacement = unmarshal[Document](proto.documents.head.document),          options = Some(unmarshal[Any](o.value)))        )      }    }    case MGO_COMMAND_UPDATE => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_UPDATE,        action = Some(Update(          filter = protoToBson(proto.bsonParam.head),          update = protoToBson(proto.bsonParam.tail.head)))      )      (proto.options, proto.only) match {        case (None,None) => ctx        case (None,Some(b)) => ctx.setCommand(Update(          filter = protoToBson(proto.bsonParam.head),          update = protoToBson(proto.bsonParam.tail.head),          onlyOne = b))        case (Some(o),None) => ctx.setCommand(Update(          filter = protoToBson(proto.bsonParam.head),          update = protoToBson(proto.bsonParam.tail.head),          options = Some(unmarshal[Any](o.value)))        )        case (Some(o),Some(b)) => ctx.setCommand(Update(          filter = protoToBson(proto.bsonParam.head),          update = protoToBson(proto.bsonParam.tail.head),          options = Some(unmarshal[Any](o.value)),          onlyOne = b)        )      }    }    case MGO_ADMIN_DROPCOLLECTION =>      new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_ADMIN,        action = Some(DropCollection(proto.collName))      )    case MGO_ADMIN_CREATECOLLECTION => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_ADMIN,        action = Some(CreateCollection(proto.collName))      )      proto.options match {        case None => ctx        case Some(o) => ctx.setCommand(CreateCollection(proto.collName,          options = Some(unmarshal[Any](o.value)))        )      }    }    case MGO_ADMIN_CREATEVIEW => {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_ADMIN,        action = Some(CreateView(viewName = proto.targets.head,          viewOn = proto.targets.tail.head,          pipeline = proto.bsonParam.map(p => protoToBson(p))))      )      proto.options match {        case None => ctx        case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,          viewOn = proto.targets.tail.head,          pipeline = proto.bsonParam.map(p => protoToBson(p)),          options = Some(unmarshal[Any](o.value)))        )      }    }    case MGO_ADMIN_CREATEINDEX=> {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_ADMIN,        action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))      )      proto.options match {        case None => ctx        case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),          options = Some(unmarshal[Any](o.value)))        )      }    }    case MGO_ADMIN_DROPINDEXBYNAME=> {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_ADMIN,        action = Some(DropIndexByName(indexName = proto.targets.head))      )      proto.options match {        case None => ctx        case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,          options = Some(unmarshal[Any](o.value)))        )      }    }    case MGO_ADMIN_DROPINDEXBYKEY=> {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_ADMIN,        action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))      )      proto.options match {        case None => ctx        case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),          options = Some(unmarshal[Any](o.value)))        )      }    }    case MGO_ADMIN_DROPALLINDEXES=> {      var ctx = new MGOContext(        dbName = proto.dbName,        collName = proto.collName,        actionType = MGO_ADMIN,        action = Some(DropAllIndexes())      )      proto.options match {        case None => ctx        case Some(o) => ctx.setCommand(DropAllIndexes(          options = Some(unmarshal[Any](o.value)))        )      }    }  }  def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {    case None => None    case Some(act) => act match {      case Count(filter, options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_COMMAND_COUNT,          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]                        else Seq(bsonToProto(filter.get))},          options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))                      else Some(ProtoAny(marshal(options.get))) }      ))      case Distict(fieldName, filter) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_COMMAND_DISTICT,          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]                        else Seq(bsonToProto(filter.get))},          targets = Seq(fieldName)        ))      case Find(filter, andThen, firstOnly) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_COMMAND_FIND,          bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]          else Seq(bsonToProto(filter.get))},          resultOptions = andThen.map(_.toProto)        ))      case Aggregate(pipeLine) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_COMMAND_AGGREGATE,          bsonParam = pipeLine.map(bsonToProto)        ))      case Insert(newdocs, options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_COMMAND_INSERT,          documents = newdocs.map(d => ProtoMGODocument(marshal(d))),          options = { if(options == None) None      //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) }        ))      case Delete(filter, options, onlyOne) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_COMMAND_DELETE,          bsonParam = Seq(bsonToProto(filter)),          options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) },          only = Some(onlyOne)        ))      case Replace(filter, replacement, options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_COMMAND_REPLACE,          bsonParam = Seq(bsonToProto(filter)),          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) },          documents = Seq(ProtoMGODocument(marshal(replacement)))        ))      case Update(filter, update, options, onlyOne) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_COMMAND_UPDATE,          bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) },          only = Some(onlyOne)        ))      case DropCollection(coll) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = coll,          commandType = MGO_ADMIN_DROPCOLLECTION        ))      case CreateCollection(coll, options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = coll,          commandType = MGO_ADMIN_CREATECOLLECTION,          options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) }        ))      case ListCollection(dbName) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          commandType = MGO_ADMIN_LISTCOLLECTION        ))      case CreateView(viewName, viewOn, pipeline, options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_ADMIN_CREATEVIEW,          bsonParam = pipeline.map(bsonToProto),          options = { if(options == None) None  //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) },          targets = Seq(viewName,viewOn)        ))      case CreateIndex(key, options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_ADMIN_CREATEINDEX,          bsonParam = Seq(bsonToProto(key)),          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) }        ))      case DropIndexByName(indexName, options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_ADMIN_DROPINDEXBYNAME,          targets = Seq(indexName),          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) }        ))      case DropIndexByKey(key, options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_ADMIN_DROPINDEXBYKEY,          bsonParam = Seq(bsonToProto(key)),          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) }        ))      case DropAllIndexes(options) =>        Some(new sdp.grpc.services.ProtoMGOContext(          dbName = ctx.dbName,          collName = ctx.collName,          commandType = MGO_ADMIN_DROPALLINDEXES,          options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))          else Some(ProtoAny(marshal(options.get))) }        ))    }  }}

mgo/engine/MongoDBEngine.scala

package sdp.mongo.engineimport java.text.SimpleDateFormatimport java.util.Calendarimport akka.NotUsedimport akka.stream.Materializerimport akka.stream.alpakka.mongodb.scaladsl._import akka.stream.scaladsl.{Flow, Source}import org.bson.conversions.Bsonimport org.mongodb.scala.bson.collection.immutable.Documentimport org.mongodb.scala.bson.{BsonArray, BsonBinary}import org.mongodb.scala.model._import org.mongodb.scala.{MongoClient, _}import protobuf.bytes.Converter._import sdp.file.Streaming._import sdp.logging.LogSupportimport scala.collection.JavaConverters._import scala.concurrent._import scala.concurrent.duration._object MGOClasses {  type MGO_ACTION_TYPE = Int  val MGO_QUERY        = 0  val MGO_UPDATE       = 1  val MGO_ADMIN        = 2  /*  org.mongodb.scala.FindObservable    import com.mongodb.async.client.FindIterable    val resultDocType = FindIterable[Document]    val resultOption = FindObservable(resultDocType)      .maxScan(...)    .limit(...)    .sort(...)    .project(...) */  type FOD_TYPE       = Int  val FOD_FIRST       = 0  //def first(): SingleObservable[TResult], return the first item  val FOD_FILTER      = 1  //def filter(filter: Bson): FindObservable[TResult]  val FOD_LIMIT       = 2  //def limit(limit: Int): FindObservable[TResult]  val FOD_SKIP        = 3  //def skip(skip: Int): FindObservable[TResult]  val FOD_PROJECTION  = 4  //def projection(projection: Bson): FindObservable[TResult]  //Sets a document describing the fields to return for all matching documents  val FOD_SORT        = 5  //def sort(sort: Bson): FindObservable[TResult]  val FOD_PARTIAL     = 6  //def partial(partial: Boolean): FindObservable[TResult]  //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)  val FOD_CURSORTYPE  = 7  //def cursorType(cursorType: CursorType): FindObservable[TResult]  //Sets the cursor type  val FOD_HINT        = 8  //def hint(hint: Bson): FindObservable[TResult]  //Sets the hint for which index to use. A null value means no hint is set  val FOD_MAX         = 9  //def max(max: Bson): FindObservable[TResult]  //Sets the exclusive upper bound for a specific index. A null value means no max is set  val FOD_MIN         = 10 //def min(min: Bson): FindObservable[TResult]  //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set  val FOD_RETURNKEY   = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]  //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents  val FOD_SHOWRECORDID=12  //def showRecordId(showRecordId: Boolean): FindObservable[TResult]  //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents  case class ResultOptions(                            optType: FOD_TYPE,                            bson: Option[Bson] = None,                            value: Int = 0 ){    def toProto = new sdp.grpc.services.ProtoMGOResultOption(      optType = this.optType,      bsonParam = this.bson.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},      valueParam = this.value    )    def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {      optType match {        case  FOD_FIRST        => find        case  FOD_FILTER       => find.filter(bson.get)        case  FOD_LIMIT        => find.limit(value)        case  FOD_SKIP         => find.skip(value)        case  FOD_PROJECTION   => find.projection(bson.get)        case  FOD_SORT         => find.sort(bson.get)        case  FOD_PARTIAL      => find.partial(value != 0)        case  FOD_CURSORTYPE   => find        case  FOD_HINT         => find.hint(bson.get)        case  FOD_MAX          => find.max(bson.get)        case  FOD_MIN          => find.min(bson.get)        case  FOD_RETURNKEY    => find.returnKey(value != 0)        case  FOD_SHOWRECORDID => find.showRecordId(value != 0)      }    }  }  object ResultOptions {    def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = new ResultOptions(      optType = msg.optType,      bson = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),      value = msg.valueParam    )  }  trait MGOCommands  object MGOCommands {    case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands    case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands    /*  org.mongodb.scala.FindObservable    import com.mongodb.async.client.FindIterable    val resultDocType = FindIterable[Document]    val resultOption = FindObservable(resultDocType)      .maxScan(...)    .limit(...)    .sort(...)    .project(...) */    case class Find(filter: Option[Bson] = None,                       andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],                       firstOnly: Boolean = false) extends MGOCommands    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands  }  object MGOAdmins {    case class DropCollection(collName: String) extends MGOCommands    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands    case class ListCollection(dbName: String) extends MGOCommands    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands  }  case class MGOContext(                         dbName: String,                         collName: String,                         actionType: MGO_ACTION_TYPE = MGO_QUERY,                         action: Option[MGOCommands] = None,                         actionOptions: Option[Any] = None,                         actionTargets: Seq[String] = Nil                       ) {    ctx =>    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)    def setCollName(name: String): MGOContext = ctx.copy(collName = name)    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)    def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))    def toSomeProto = MGOProtoConversion.ctxToProto(this)  }  object MGOContext {    def apply(db: String, coll: String) = new MGOContext(db, coll)    def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =      MGOProtoConversion.ctxFromProto(proto)  }  case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {    ctxs =>    def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)    def appendContext(ctx: MGOContext): MGOBatContext =      ctxs.copy(contexts = contexts :+ ctx)  }  object MGOBatContext {    def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)  }  type MGODate = java.util.Date  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {    val ca = Calendar.getInstance()    ca.set(yyyy,mm,dd)    ca.getTime()  }  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {    val ca = Calendar.getInstance()    ca.set(yyyy,mm,dd,hr,min,sec)    ca.getTime()  }  def mgoDateTimeNow: MGODate = {    val ca = Calendar.getInstance()    ca.getTime  }  def mgoDateToString(dt: MGODate, formatString: String): String = {    val fmt= new SimpleDateFormat(formatString)    fmt.format(dt)  }  type MGOBlob = BsonBinary  type MGOArray = BsonArray  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(    implicit mat: Materializer) = FileToByteArray(fileName,timeOut)  def mgoBlobToFile(blob: MGOBlob, fileName: String)(    implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)  def mgoGetStringOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getString(fieldName))    else None  }  def mgoGetIntOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getInteger(fieldName))    else None  }  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getLong(fieldName))    else None  }  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getDouble(fieldName))    else None  }  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getBoolean(fieldName))    else None  }  def mgoGetDateOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getDate(fieldName))    else None  }  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      doc.get(fieldName).asInstanceOf[Option[MGOBlob]]    else None  }  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      doc.get(fieldName).asInstanceOf[Option[MGOArray]]    else None  }  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {    (arr.getValues.asScala.toList)      .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]  }  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]}object MGOEngine extends LogSupport {  import MGOClasses._  import MGOAdmins._  import MGOCommands._  import sdp.result.DBOResult._  object TxUpdateMode {    private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(              implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {      log.info(s"mgoTxUpdate> calling ...")      observable.map(clientSession => {        val transactionOptions =          TransactionOptions.builder()            .readConcern(ReadConcern.SNAPSHOT)            .writeConcern(WriteConcern.MAJORITY).build()        clientSession.startTransaction(transactionOptions)/*        val fut = Future.traverse(ctxs.contexts) { ctx =>          mgoUpdateObservable[Completed](ctx).map(identity).toFuture()        }        Await.ready(fut, 3 seconds) */        ctxs.contexts.foreach { ctx =>          mgoUpdateObservable[Completed](ctx).map(identity).toFuture()        }        clientSession      })    }    private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {      log.info(s"commitAndRetry> calling ...")      observable.recoverWith({        case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {          log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")          commitAndRetry(observable)        }        case e: Exception => {          log.error(s"commitAndRetry> Exception during commit ...: $e")          throw e        }      })    }    private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {      log.info(s"runTransactionAndRetry> calling ...")      observable.recoverWith({        case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {          log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")          runTransactionAndRetry(observable)        }      })    }    def mgoTxBatch(ctxs: MGOBatContext)(            implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {      log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")      val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())      val commitTransactionObservable: SingleObservable[Completed] =        updateObservable.flatMap(clientSession => clientSession.commitTransaction())      val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)      runTransactionAndRetry(commitAndRetryObservable)      valueToDBOResult(Completed())    }  }  def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {    log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")    if (ctxs.tx) {        TxUpdateMode.mgoTxBatch(ctxs)      } else {/*        val fut = Future.traverse(ctxs.contexts) { ctx =>          mgoUpdate[Completed](ctx).map(identity) }        Await.ready(fut, 3 seconds)        Future.successful(new Completed) */        ctxs.contexts.foreach { ctx =>          mgoUpdate[Completed](ctx).map(identity) }         valueToDBOResult(Completed())      }  }  def mongoStream(ctx: MGOContext)(    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {    log.info(s"mongoStream>  MGOContext: ${ctx}")    def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>      rts.foldRight(findObj)((a,b) => a.toFindObservable(b))    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    if ( ctx.action == None) {      log.error(s"mongoStream> uery action cannot be null!")      throw new IllegalArgumentException("query action cannot be null!")    }    try {      ctx.action.get match {        case Find(None, Nil, false) => //FindObservable          MongoSource(coll.find())        case Find(None, Nil, true) => //FindObservable          MongoSource(coll.find().first())        case Find(Some(filter), Nil, false) => //FindObservable          MongoSource(coll.find(filter))        case Find(Some(filter), Nil, true) => //FindObservable          MongoSource(coll.find(filter).first())        case Find(None, sro, _) => //FindObservable          val next = toResultOption(sro)          MongoSource(next(coll.find[Document]()))        case Find(Some(filter), sro, _) => //FindObservable          val next = toResultOption(sro)          MongoSource(next(coll.find[Document](filter)))        case _ =>          log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")          throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")      }    }    catch { case e: Exception =>      log.error(s"mongoStream> runtime error: ${e.getMessage}")      throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")    }  }  // T => FindIterable  e.g List[Document]  def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {    log.info(s"mgoQuery>  MGOContext: ${ctx}")    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>      rts.foldRight(findObj)((a,b) => a.toFindObservable(b))    if ( ctx.action == None) {      log.error(s"mgoQuery> uery action cannot be null!")      Left(new IllegalArgumentException("query action cannot be null!"))    }    try {      ctx.action.get match {        /* count */        case Count(Some(filter), Some(opt)) => //SingleObservable          coll.countDocuments(filter, opt.asInstanceOf[CountOptions])            .toFuture().asInstanceOf[Future[T]]        case Count(Some(filter), None) => //SingleObservable          coll.countDocuments(filter).toFuture()            .asInstanceOf[Future[T]]        case Count(None, None) => //SingleObservable          coll.countDocuments().toFuture()            .asInstanceOf[Future[T]]        /* distinct */        case Distict(field, Some(filter)) => //DistinctObservable          coll.distinct(field, filter).toFuture()            .asInstanceOf[Future[T]]        case Distict(field, None) => //DistinctObservable          coll.distinct((field)).toFuture()            .asInstanceOf[Future[T]]        /* find */        case Find(None, Nil, false) => //FindObservable          if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]          else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]        case Find(None, Nil, true) => //FindObservable          if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]          else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]        case Find(Some(filter), Nil, false) => //FindObservable          if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]          else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]        case Find(Some(filter), Nil, true) => //FindObservable          if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]          else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]        case Find(None, sro, _) => //FindObservable          val next = toResultOption(sro)          if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]          else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]        case Find(Some(filter), sro, _) => //FindObservable          val next = toResultOption(sro)          if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]          else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]        /* aggregate AggregateObservable*/        case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]        /* mapReduce MapReduceObservable*/        case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]        /* list collection */        case ListCollection(dbName) => //ListConllectionObservable          client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]      }    }    catch { case e: Exception =>      log.error(s"mgoQuery> runtime error: ${e.getMessage}")      Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))    }  }  //T => Completed, result.UpdateResult, result.DeleteResult  def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =    try {      mgoUpdateObservable[T](ctx).toFuture()    }    catch { case e: Exception =>      log.error(s"mgoUpdate> runtime error: ${e.getMessage}")      Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))    }  def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {    log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    if ( ctx.action == None) {      log.error(s"mgoUpdateObservable> uery action cannot be null!")      throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")    }    try {      ctx.action.get match {        /* insert */        case Insert(docs, Some(opt)) => //SingleObservable[Completed]          if (docs.size > 1)            coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]          else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]        case Insert(docs, None) => //SingleObservable          if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]          else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]        /* delete */        case Delete(filter, None, onlyOne) => //SingleObservable          if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]          else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]        case Delete(filter, Some(opt), onlyOne) => //SingleObservable          if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]          else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]        /* replace */        case Replace(filter, replacement, None) => //SingleObservable          coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]        case Replace(filter, replacement, Some(opt)) => //SingleObservable          coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]        /* update */        case Update(filter, update, None, onlyOne) => //SingleObservable          if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]          else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]        case Update(filter, update, Some(opt), onlyOne) => //SingleObservable          if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]          else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]        /* bulkWrite */        case BulkWrite(commands, None) => //SingleObservable          coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]        case BulkWrite(commands, Some(opt)) => //SingleObservable          coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]      }    }    catch { case e: Exception =>      log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")      throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")    }  }  def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {    log.info(s"mgoAdmin>  MGOContext: ${ctx}")    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    if ( ctx.action == None) {      log.error(s"mgoAdmin> uery action cannot be null!")      Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))    }    try {      ctx.action.get match {        /* drop collection */        case DropCollection(collName) => //SingleObservable          val coll = db.getCollection(collName)          coll.drop().toFuture()        /* create collection */        case CreateCollection(collName, None) => //SingleObservable          db.createCollection(collName).toFuture()        case CreateCollection(collName, Some(opt)) => //SingleObservable          db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()        /* list collection      case ListCollection(dbName) =>   //ListConllectionObservable        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]        */        /* create view */        case CreateView(viewName, viewOn, pline, None) => //SingleObservable          db.createView(viewName, viewOn, pline).toFuture()        case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable          db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()        /* create index */        case CreateIndex(key, None) => //SingleObservable          coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] //   asInstanceOf[SingleObservable[Completed]]        case CreateIndex(key, Some(opt)) => //SingleObservable          coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]        /* drop index */        case DropIndexByName(indexName, None) => //SingleObservable          coll.dropIndex(indexName).toFuture()        case DropIndexByName(indexName, Some(opt)) => //SingleObservable          coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()        case DropIndexByKey(key, None) => //SingleObservable          coll.dropIndex(key).toFuture()        case DropIndexByKey(key, Some(opt)) => //SingleObservable          coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()        case DropAllIndexes(None) => //SingleObservable          coll.dropIndexes().toFuture()        case DropAllIndexes(Some(opt)) => //SingleObservable          coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()      }    }    catch { case e: Exception =>      log.error(s"mgoAdmin> runtime error: ${e.getMessage}")      throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")    }  }/*    def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    ctx.action match {      /* count */      case Count(Some(filter), Some(opt)) =>   //SingleObservable        coll.countDocuments(filter, opt.asInstanceOf[CountOptions])          .toFuture().asInstanceOf[Future[T]]      case Count(Some(filter), None) =>        //SingleObservable        coll.countDocuments(filter).toFuture()          .asInstanceOf[Future[T]]      case Count(None, None) =>                //SingleObservable        coll.countDocuments().toFuture()          .asInstanceOf[Future[T]]      /* distinct */      case Distict(field, Some(filter)) =>     //DistinctObservable        coll.distinct(field, filter).toFuture()          .asInstanceOf[Future[T]]      case Distict(field, None) =>             //DistinctObservable        coll.distinct((field)).toFuture()          .asInstanceOf[Future[T]]      /* find */      case Find(None, None, optConv, false) =>  //FindObservable        if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]        else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]      case Find(None, None, optConv, true) =>   //FindObservable        if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]        else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]      case Find(Some(filter), None, optConv, false) =>   //FindObservable        if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]        else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]      case Find(Some(filter), None, optConv, true) =>   //FindObservable        if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]        else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]      case Find(None, Some(next), optConv, _) =>   //FindObservable        if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]        else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]      case Find(Some(filter), Some(next), optConv, _) =>  //FindObservable        if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]        else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]      /* aggregate AggregateObservable*/      case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]      /* mapReduce MapReduceObservable*/      case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]      /* insert */      case Insert(docs, Some(opt)) =>                  //SingleObservable[Completed]        if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()          .asInstanceOf[Future[T]]        else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()          .asInstanceOf[Future[T]]      case Insert(docs, None) =>                       //SingleObservable        if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]        else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]      /* delete */      case Delete(filter, None, onlyOne) =>            //SingleObservable        if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]        else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]      case Delete(filter, Some(opt), onlyOne) =>       //SingleObservable        if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]        else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]      /* replace */      case Replace(filter, replacement, None) =>        //SingleObservable        coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]      case Replace(filter, replacement, Some(opt)) =>    //SingleObservable        coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]      /* update */      case Update(filter, update, None, onlyOne) =>      //SingleObservable        if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]        else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]      case Update(filter, update, Some(opt), onlyOne) => //SingleObservable        if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]        else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]      /* bulkWrite */      case BulkWrite(commands, None) =>                  //SingleObservable        coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]      case BulkWrite(commands, Some(opt)) =>             //SingleObservable        coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]      /* drop collection */      case DropCollection(collName) =>                   //SingleObservable        val coll = db.getCollection(collName)        coll.drop().toFuture().asInstanceOf[Future[T]]      /* create collection */      case CreateCollection(collName, None) =>           //SingleObservable        db.createCollection(collName).toFuture().asInstanceOf[Future[T]]      case CreateCollection(collName, Some(opt)) =>      //SingleObservable        db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]      /* list collection */      case ListCollection(dbName) =>   //ListConllectionObservable        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]      /* create view */      case CreateView(viewName, viewOn, pline, None) =>       //SingleObservable        db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]      case CreateView(viewName, viewOn, pline, Some(opt)) =>  //SingleObservable        db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]      /* create index */      case CreateIndex(key, None) =>                     //SingleObservable        coll.createIndex(key).toFuture().asInstanceOf[Future[T]]      case CreateIndex(key, Some(opt)) =>                //SingleObservable        coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]      /* drop index */      case DropIndexByName(indexName, None) =>           //SingleObservable        coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]      case DropIndexByName(indexName, Some(opt)) =>      //SingleObservable        coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]      case DropIndexByKey(key, None) =>                  //SingleObservable        coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]      case DropIndexByKey(key, Some(opt)) =>             //SingleObservable        coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]      case DropAllIndexes(None) =>                       //SingleObservable        coll.dropIndexes().toFuture().asInstanceOf[Future[T]]      case DropAllIndexes(Some(opt)) =>                  //SingleObservable        coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]    }  }*/}object MongoActionStream {  import MGOClasses._  case class StreamingInsert[A](dbName: String,                                collName: String,                                converter: A => Document,                                parallelism: Int = 1                               ) extends MGOCommands  case class StreamingDelete[A](dbName: String,                                collName: String,                                toFilter: A => Bson,                                parallelism: Int = 1,                                justOne: Boolean = false                               ) extends MGOCommands  case class StreamingUpdate[A](dbName: String,                                collName: String,                                toFilter: A => Bson,                                toUpdate: A => Bson,                                parallelism: Int = 1,                                justOne: Boolean = false                               ) extends MGOCommands  case class InsertAction[A](ctx: StreamingInsert[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =      Flow[A].map(ctx.converter)        .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))  }  case class UpdateAction[A](ctx: StreamingUpdate[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =      if (ctx.justOne) {        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))      } else        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))  }  case class DeleteAction[A](ctx: StreamingDelete[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =      if (ctx.justOne) {        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))      } else        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))  }}object MGOHelpers {  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {    override val converter: (Document) => String = (doc) => doc.toJson  }  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {    override val converter: (C) => String = (doc) => doc.toString  }  trait ImplicitObservable[C] {    val observable: Observable[C]    val converter: (C) => String    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)    def headResult() = Await.result(observable.head(), 10 seconds)    def printResults(initial: String = ""): Unit = {      if (initial.length > 0) print(initial)      results().foreach(res => println(converter(res)))    }    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")  }  def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {    Await.result(fut, timeOut)  }  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {    Await.result(fut, timeOut)  }  import monix.eval.Task  import monix.execution.Scheduler.Implicits.global  final class FutureToTask[A](x: => Future[A]) {    def asTask: Task[A] = Task.deferFuture[A](x)  }  final class TaskToFuture[A](x: => Task[A]) {    def asFuture: Future[A] = x.runAsync  }}

PetSound.scala

package petsoundimport akka.actor._import akka.cluster.client._import com.typesafe.config.ConfigFactoryimport akka.cluster.pubsub.DistributedPubSubMediator._import akka.cluster.pubsub._object Cat {  def props = Props[Cat]  def create(port: Int): ActorSystem  = {    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")      .withFallback(ConfigFactory.load())    val system = ActorSystem("ClusterSystem",config)    val catSound = system.actorOf(props,"CatSound")    ClusterClientReceptionist(system).registerService(catSound)    val receptionist = ClusterClientReceptionist(system).underlying    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listner")    system  }}object Dog {  def props = Props(new Dog)  def create(port: Int): ActorSystem = {    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")      .withFallback(ConfigFactory.load())    val system = ActorSystem("ClusterSystem",config)    val dogSound = system.actorOf(props,"DogSound")    ClusterClientReceptionist(system).registerService(dogSound)    val receptionist = ClusterClientReceptionist(system).underlying    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listner")    system  }}class Cat extends Actor with ActorLogging {  //使用pub/sub方式设置  val mediator = DistributedPubSub(context.system).mediator  override def preStart() = {    mediator ! Subscribe("Shout", self)    super.preStart()  }  override def receive: Receive = {    case "Shout" =>      log.info("*******I am a cat, MIAOM ...******")  }}class Dog extends Actor with ActorLogging {  //使用pub/sub方式设置  val mediator = DistributedPubSub(context.system).mediator  override def preStart() = {    mediator ! Subscribe("Shout", self)    super.preStart()  }  override def receive: Receive = {    case "Shout" =>      log.info("*****I am a dog, WANG WANG...*****")  }}

EventListener.scala

package petsoundimport akka.actor._import akka.cluster.client._class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {  override def preStart(): Unit = {    clusterClient ! SubscribeContactPoints    super.preStart()  }  override def receive: Receive = {    case ContactPoints(cps) =>      cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}    case ContactPointAdded(cp) =>      log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")    case ContactPointRemoved(cp) =>      log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")  }}class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {  override def preStart(): Unit = {    receptionist ! SubscribeClusterClients    super.preStart()  }  override def receive: Receive = {    case ClusterClients(cs) =>      cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}    case ClusterClientUp(cc) =>      log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")    case ClusterClientUnreachable(cc) =>      log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")  }}

MongoAdder.scala

package petsoundimport akka.actor._import com.typesafe.config._import akka.actor.ActorSystemimport org.mongodb.scala._import sdp.grpc.services.ProtoMGOContextimport sdp.mongo.engine.MGOClasses._import sdp.mongo.engine.MGOEngine._import sdp.result.DBOResult._import akka.cluster.client._import scala.collection.JavaConverters._import scala.util._class MongoAdder extends Actor with ActorLogging {  import monix.execution.Scheduler.Implicits.global  implicit val mgosys = context.system  implicit val ec = mgosys.dispatcher  val clientSettings: MongoClientSettings = MongoClientSettings.builder()    .applyToClusterSettings {b =>      b.hosts(List(new ServerAddress("localhost:27017")).asJava)    }.build()  implicit val client: MongoClient = MongoClient(clientSettings)  val ctx = MGOContext("testdb","friends")  override def receive: Receive = {    case someProto @ Some(proto:ProtoMGOContext) =>      val ctx = MGOContext.fromProto(proto)      log.info(s"****** received MGOContext: $someProto *********")      val task = mgoUpdate[Completed](ctx).toTask      task.runOnComplete {        case Success(s) => println("operations completed successfully.")        case Failure(exception) => println(s"error: ${exception.getMessage}")      }  }}object MongoAdder {  def create(port: Int): ActorSystem = {    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")      .withFallback(ConfigFactory.load())    val system = ActorSystem("ClusterSystem", config)    val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")    ClusterClientReceptionist(system).registerService(mongoAdder)    val receptionist = ClusterClientReceptionist(system).underlying    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")    system  }}

PetHouse.scala

package petsoundimport akka.actor._import akka.japi.Util.immutableSeqimport akka.actor.AddressFromURIStringimport com.typesafe.config.ConfigFactoryimport akka.cluster.client._import akka.cluster.client.ClusterClient._object PetHouse extends App {  val sysCat = Cat.create(2551)  val sysDog = Dog.create(2552)  val mongo = MongoAdder.create(2555)  scala.io.StdIn.readLine()  sysCat.terminate()  sysDog.terminate()  mongo.terminate()}object PetClient extends App {  val conf = ConfigFactory.load("client")  val clientSystem = ActorSystem("ClientSystem",conf)/* 从 conf 文件里读取 contact-points 地址  val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {    case AddressFromURIString(addr) ⇒ RootActorPath(addr) / "system" / "receptionist"  }.toSet*/  //先放一个contact-point, 系统会自动增加其它的点  val initialContacts = Set(    ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")  )  val clusterClient = clientSystem.actorOf(    ClusterClient.props(      ClusterClientSettings(clientSystem)        .withInitialContacts(initialContacts)), "petClient")  clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listner")  clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)  clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)  println(s"sent shout messages ...")  scala.io.StdIn.readLine()  clusterClient ! Publish("Shout","Shout")  println(s"publish shout messages ...")  //MongoDB 操作示范  import org.mongodb.scala._  import sdp.mongo.engine.MGOClasses._  val ctx = MGOContext("testdb","friends")  val chen = Document("姓" -> "陈", "名" -> "大文","age" -> 28)  val zhang = Document("姓" -> "张", "名" -> "小海","age" -> 7)  val lee = Document("姓" -> "李", "名" -> "四","age" -> 45)  val ouyang = Document("姓" -> "欧阳", "名" -> "锋","age" -> 120)  val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))  clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)  scala.io.StdIn.readLine()  clientSystem.terminate()}

 

转载于:https://www.cnblogs.com/tiger-xc/p/10094659.html

你可能感兴趣的文章
Activity的启动模式
查看>>
on duplicate key update
查看>>
CSS设置字体为楷体
查看>>
nginx配置反向代理解决前后端分离跨域问题
查看>>
SP2 网络故障解决
查看>>
VC中pragma指令简介(转)
查看>>
用 Jenkins + .netcore 2.0 构建
查看>>
POJ 2546
查看>>
NAT的三种方式
查看>>
cacti 主机/网络设备流量监控 图解
查看>>
根据id获取某一类的最大最小值
查看>>
[SDOI2014]数数 --- AC自动机 + 数位DP
查看>>
使用floodfill()函数颜色填充一个联通的区域
查看>>
spark记录
查看>>
每天一个linux命令(1):ls命令
查看>>
深入浅出 Java Concurrency (3): 原子操作 part 2[转]
查看>>
MapReduce实现数据去重
查看>>
TOJ 1153
查看>>
机器学习技法(林轩田)学习笔记:Lecture 3 & Lecture 4
查看>>
找到数组中频次大于1/k的数
查看>>