PICE(6):集群环境里多异类端点gRPC Streaming - Heterogeneous multi-endpoints gRPC streaming

gRPC Streaming的操作对象由服务端和客户端组成。在一个包含了多个不同服务的集群环境中可能需要从一个服务里调用另一个服务端提供的服务。这时调用服务端又成为了提供服务端的客户端了(服务消费端)。那么如果我们用streaming形式来提交服务需求及获取计算结果就是以一个服务端为Source另一个服务端为通过式passthrough Flow的stream运算了。讲详细点就是请求方用需求构建Source,以连接Flow的方式把需求传递给服务提供方。服务提供方在Flow内部对需求进行处理后再把结果返回来,请求方run这个连接的stream应该就可以得到需要的结果了。下面我们就针对以上场景在一个由JDBC,Cassandra,MongoDB几种gRPC服务组成的集群环境里示范在这几个服务之间的stream连接和运算。

首先,我们设计一个简单但比较有代表性的例子:从JDBC的客户端传一个字符型消息hello给JDBC服务端、JDBC服务端在hello后面添加“,from jdbc to cassandra”然后通过Cassandra客户端把消息当作请求传给Cassandra服务端、Cassandra服务端在消息后面再加上“,from cassandra to mongo”并通过MongoDB客户端把消息传给MongoDB服务端、最后MongoDB服务端在消息后面添加“,mongo says hi”。整个stream的形状是 jdbc-client->jdbc-service->cassandra-service-mongodb-service。如果run这个stream得到的结果应该是一个描述完整移动路径的消息。从请求-服务角度来描述:我们可以把每个节点消息更新处理当作某种完整的数据处理过程。

以下分别是JDBC,Cassandra,MongoDB gRPC IDL定义:

service JDBCServices {
  rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
}

service CQLServices {
  rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
}

service MGOServices {
  rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
}

三个服务共用了protobuf消息类型HelloMsg。我们把共用的消息统一放到一个common.proto文件里:

syntax = "proto3";

package sdp.grpc.services;

message HelloMsg {
  string hello = 1;
}

message DataRow {
    string countyname = 1;
    string statename = 2;
    int32 reportyear = 3;
    int32 value = 4;
}

然后在示范应用的.proto文件中用import 把所有protobuf,gRPC服务定义都集中起来:

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don‘t append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

package sdp.grpc.services;

import "misc/sdp.proto";
import "common.proto";
import "cql/cql.proto";
import "jdbc/jdbc.proto";
import "mgo/mgo.proto";

下面我们把最核心的服务实现挑出来讲解一下,先看看Cassandra服务的实现:

import sdp.grpc.mongo.client.MGOClient

class CQLStreamingServices(implicit ec: ExecutionContextExecutor,
                           mat: ActorMaterializer,  session: Session)
  extends CqlGrpcAkkaStream.CQLServices with LogSupport{
  val mongoClient = new MGOClient
  val stub = mongoClient.stub

  def sayHelloTo(msg: String): Flow[HelloMsg, HelloMsg, NotUsed] =
    Flow[HelloMsg].map { r => HelloMsg(r.hello + msg)}
      .via(stub.greeting)

  override def greeting: Flow[HelloMsg, HelloMsg, NotUsed] =
    Flow[HelloMsg]
      .via(sayHelloTo(",from cassandra to mongo"))

}

streaming方式的gRPC服务其实就是一个akka-stream的Flow[R1,R2,M],它把收到的数据R1处理后转换成R2输出。在处理R1的环节里可能会需要其它服务的运算结果。在以上例子里CQLService把收到的消息加工转换后传给MGOService并等待MGOService再深度加工返还的结果,所以sayHelloTo还是一个有两个节点的Flow:在第一个节点中对收到的消息进行加工,第二个节点把加工的消息传给另一个服务并连接它的运算结果作为本身最终的输出。调用其它跨集群节点的服务必须经该服务的gRPC客户端进行,这里调用的MGOClient:

package sdp.grpc.mongo.client

import sdp.grpc.services._
import sdp.logging.LogSupport
import io.grpc._
import common._
import sdp.grpc.services._
import akka.stream.scaladsl._
import akka.NotUsed

class MGOClient extends LogSupport {

  val channel = ManagedChannelBuilder
    .forAddress("localhost", 50051)
    .usePlaintext()
    .build()

  val stub = MgoGrpcAkkaStream.stub(channel)

}

JDBCService连接CQLService, CQLService连接MGOService:

import sdp.grpc.cassandra.client.CQLClient

class JDBCStreamingServices(implicit ec: ExecutionContextExecutor)
       extends JdbcGrpcAkkaStream.JDBCServices with LogSupport {
  val cassandraClient = new CQLClient
  val stub = cassandraClient.stub
  def sayHelloTo(msg: String): Flow[HelloMsg,HelloMsg,NotUsed] =
    Flow[HelloMsg]
    .map {r => HelloMsg(r.hello + msg)}
    .via(stub.greeting)

  override def greeting: Flow[HelloMsg, HelloMsg, NotUsed] =
    Flow[HelloMsg]
    .via(sayHelloTo(",from jdbc to cassandra"))

}

最后我们用DemoApp来示范整个过程:

package demo.sdp.grpc

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}

import sdp.grpc.jdbc.client.JDBCClient

object DemoApp extends App {
  implicit val system = ActorSystem("jdbcClient")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher

  val jdbcClient = new JDBCClient

  jdbcClient.sayHello.runForeach(r => println(r.hello))

  scala.io.StdIn.readLine()
  mat.shutdown()
  system.terminate()

}

DemoApp调用了JDBCClient:

package sdp.grpc.jdbc.client

import sdp.grpc.services._
import sdp.logging.LogSupport
import io.grpc._
import common._
import sdp.grpc.services._
import akka.stream.scaladsl._
import akka.NotUsed

class JDBCClient extends LogSupport {

  val channel = ManagedChannelBuilder
    .forAddress("localhost", 50053)
    .usePlaintext()
    .build()

  val stub = JdbcGrpcAkkaStream.stub(channel)

  def sayHello: Source[HelloMsg, NotUsed] = {
    val row = HelloMsg("hello ")
    val rows = List.fill[HelloMsg](100)(row)
    Source
      .fromIterator(() => rows.iterator)
      .via(stub.greeting)
  }
}

运行DemoApp显示的结果:

hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
...

原文地址:https://www.cnblogs.com/tiger-xc/p/9660828.html

时间: 2024-10-11 23:18:12

PICE(6):集群环境里多异类端点gRPC Streaming - Heterogeneous multi-endpoints gRPC streaming的相关文章

EhCache缓存在集群环境中同步问题

由于 EhCache 是进程中的缓存系统,一旦将应用部署在集群环境中,当每一个节点维护各自的缓存数据,某个节点对缓存数据进行更新,这些更新的数据无法在其它节点中共享,这不仅会降低节点运行的效率,而且会导致数据不同步的情况发生.例如某个网站采用 A.B 两个节点作为集群部署,当 A 节点的缓存更新后,而 B 节点缓存尚未更新就可能出现用户在浏览页面的时候,一会是更新后的数据,一会是尚未更新的数据,尽管我们也可以通过 Session Sticky 技术来将用户锁定在某个节点上,但对于一些交互性比较强

Hadoop学习三十二:Win7下无法提交MapReduce Job到集群环境

一. 对hadoop eclipse plugin认识不足 http://zy19982004.iteye.com/blog/2024467曾经说到我最hadoop eclipse plugin作用的认识.但事实上我犯了一个错误,Win7 Eclipse里的MyWordCount程序一直在本地运行,没有提交到集群环境上运行(查看192.168.1.200:50030)没有这个Job.运行方式分为两种,右键Run As Java Application Run on Hadoop 如果说Run A

SUSE11 Oracle 11g RAC双机集群环境部署

Oracle RAC集群环境部署 一. 安装前准备 (1) Linux系统版本 SUSE Linux Enterprise Server 11 (x86_64) (2) Oracle database和Grid安装包 linux.x64_11gR2_database_1of2.zip linux.x64_11gR2_database_2of2.zip linux.x64_11gR2_grid.zip (3) ASMlib安装包 oracleasm-support-2.1.8-1.SLE11.x8

集群环境中使用 EhCache 缓存系统

EhCache 缓存系统 : 本章节将要介绍EhCache及EhCache实现分布式的一些解决方案.并针对于这些解决性方案做一个实现,后续将出一个提供项目模块化.服务化.插件化的VieMall快速开发平台,同时集成Dubbo服务化.Zookeeper(分布式调度/分布式配置管理服务).Redis分布式缓存技术及Memcache/Ehcache 二级缓存切换.FastDFS分布式文件系统.ActiveMQ异步消息中间件.Solr搜索.Nginx负载均衡等分布式及读写分离.如果有时间可以深入分表分库

TroubleShooting - 迁移到集群环境数据错乱问题

最近系统出了个问题,发现系统发布出去的某一条链式结构的数据出现断链的问题,调查发现这是在单系统迁移到集群环境中没有考虑清楚导致多个节点同时修改数据而出现的错误,这里介绍下问题的来龙去脉. 我们的系统持续接受外部系统提供的产品的信息(每次一个产品),对产品信息进行加工,把加工后的产品数据按产品类型加入内存数组,每次有新产品加入数组,就需要把数组数据打包成链式结构(外部数据结构要求),然后发给其他外部系统.单节点的时候对数组的产品添加,以及整个链式数据发送在同个类中使用了synchronized,所

Win7下无法提交MapReduce Job到集群环境(转)

一. 对hadoop eclipse plugin认识不足 http://zy19982004.iteye.com/blog/2024467曾经说到我最hadoop eclipse plugin作用的认识.但事实上我犯了一个错误,Win7 Eclipse里的MyWordCount程序一直在本地运行,没有提交到集群环境上运行(查看192.168.1.200:50030)没有这个Job.运行方式分为两种,右键Run As Java Application Run on Hadoop 如果说Run A

ZooKeeper伪集群环境搭建

1.从官网下载程序包. 2.解压. [[email protected] software]$ tar xzvf zookeeper-3.4.6.tar.gz 3.进入zookeeper目录后创建data目录. [[email protected] software]$ cd zookeeper-3.4.6 [[email protected] software]$ mkdir data [[email protected] software]$ cd data [[email protecte

ELKB5.2.2集群环境部署及优化终极文档

ELKB5.2.2集群环境部署 本人陆陆续续接触了ELK的1.4,2.0,2.4,5.0,5.2版本,可以说前面使用当中一直没有太多感触,最近使用5.2才慢慢有了点感觉,可见认知事务的艰难,本次文档尽量详细点,现在写文档越来越喜欢简洁了,不知道是不是不太好.不扯了看正文(注意这里的配置是优化前配置,正常使用没问题,量大时需要优化). 备注: 本次属于大版本变更,有很多修改,部署重大修改如下: 1,filebeat直接输出kafka,并drop不必要的字段如beat相关的 2,elasticsea

elasticsearch与mongodb分布式集群环境下数据同步

1.ElasticSearch是什么 ElasticSearch 是一个基于Lucene构建的开源.分布式,RESTful搜索引擎.它的服务是为具有数据库和Web前端的应用程序提供附加的组件(即可搜索的存储库).ElasticSearch为应用程序提供搜索算法和相关的基础架构,用户只需要将应用程序中的数据上载到ElasticSearch数据存储中,就可以通过RESTful URL与其交互.ElasticSearch的架构明显不同于它之前的其他搜索引擎架构,因为它是通过水平伸缩的方式来构建的.不同