grpc.go

package naming

import (
    "encoding/json"

    etcd "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/naming"
)

// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
//GRPCResolver  创建一个 grpc.Watcher对于目标对象,追踪目标对象的改变
type GRPCResolver struct {
    // Client is an initialized etcd client.
    Client *etcd.Client
}
//监听对象更新
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
    switch nm.Op {
    case naming.Add:
        var v []byte
        if v, err = json.Marshal(nm); err != nil {
            return grpc.Errorf(codes.InvalidArgument, err.Error())
        }
        _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
    case naming.Delete:
        _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
    default:
        return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op")
    }
    return err
}

func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
    ctx, cancel := context.WithCancel(context.Background())
    w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
    return w, nil
}

type gRPCWatcher struct {
    c      *etcd.Client
    target string
    ctx    context.Context
    cancel context.CancelFunc
    wch    etcd.WatchChan
    err    error
}

// Next gets the next set of updates from the etcd resolver.
// Calls to Next should be serialized; concurrent calls are not safe since
// there is no way to reconcile the update ordering.
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
    if gw.wch == nil {
        // first Next() returns all addresses
        return gw.firstNext()
    }
    if gw.err != nil {
        return nil, gw.err
    }

    // process new events on target/*
    wr, ok := <-gw.wch
    if !ok {
        gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed")
        return nil, gw.err
    }
    if gw.err = wr.Err(); gw.err != nil {
        return nil, gw.err
    }

    updates := make([]*naming.Update, 0, len(wr.Events))
    for _, e := range wr.Events {
        var jupdate naming.Update
        var err error
        switch e.Type {
        case etcd.EventTypePut:
            err = json.Unmarshal(e.Kv.Value, &jupdate)
            jupdate.Op = naming.Add
        case etcd.EventTypeDelete:
            err = json.Unmarshal(e.PrevKv.Value, &jupdate)
            jupdate.Op = naming.Delete
        }
        if err == nil {
            updates = append(updates, &jupdate)
        }
    }
    return updates, nil
}

func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
    // Use serialized request so resolution still works if the target etcd
    // server is partitioned away from the quorum.
    resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
    if gw.err = err; err != nil {
        return nil, err
    }

    updates := make([]*naming.Update, 0, len(resp.Kvs))
    for _, kv := range resp.Kvs {
        var jupdate naming.Update
        if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
            continue
        }
        updates = append(updates, &jupdate)
    }

    opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
    gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
    return updates, nil
}

func (gw *gRPCWatcher) Close() { gw.cancel() }
				
时间: 2024-07-30 18:41:42

grpc.go的相关文章

golang环境中grpc与zipkin的集成

对于分布式系统服务,一个请求需要系统中多个模块,上百台机器配合才能完成.当进行系统调优时单靠分析日志是很难发现到系能瓶颈的,效率非常低下,为此google上线了分布式服务追踪系统Drapper.目前市面上的实现有Twitter的zipkin和阿里的鹰眼系统.最近我在做服务追踪,我们后台使用golang开发,grpc做服务通信,最终选择集成zipkin来做,这个公开资料比较少,写文章来总结一下,方便后来者. 一.首先zipkin的安装使用 1.下载 wget -O zipkin.jar 'http

Google 高性能 RPC 框架 gRPC 1.0.0 发布(附精彩评论)

gRPC是一个高性能.开源.通用的RPC框架,面向移动和HTTP/2设计,是由谷歌发布的首款基于Protocol Buffers的RPC框架. gRPC基于HTTP/2标准设计,带来诸如双向流.流控.头部压缩.单TCP连接上的多复用请求等特性.这些特性使得其在移动设备上表现更好,更省电且节省空间占用. gRPC 1.0版本是2015年面世以后的第一次版本发布,开发者可以把该版本用于生产.API现在也是很稳定的. 关于Java版本发布情况,大家阅读发布日志:https://github.com/g

grpc set up..

get the grpc source file.. 1 git clone https://github.com/grpc/grpc 2 git submodule update --init --recursive 3 cd third-party/protobuf 4 ./autogen.sh #this step consist of a step of curl getting gmock, might failed.. advice is to use thunder to down

gRPC的Go语言使用例子

gRPC刚被Google开源, gRPC是啥? 先照抄一段说明: gRPC是一个高性能.通用的开源RPC框架,其由Google主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers)序列化协议开发,且支持众多开发语言.gRPC提供了一种简单的方法来精确地定义服务和为iOS.Android和后台支持服务自动生成可靠性很强的客户端功能库.客户端充分利用高级流和链接功能,从而有助于节省带宽.降低的TCP链接次数.节省CPU使用.和电池寿命. 这段抄

谷歌发布的首款基于HTTP/2和protobuf的RPC框架:GRPC

Google 刚刚开源了grpc,  一个基于HTTP2 和 Protobuf 的高性能.开源.通用的RPC框架.Protobuf 本身虽然提供了RPC  的定义语法,但是一直以来,Google 只开源了Protobuf 序列化反序列化的代码,而没有开源RPC 的实现,于是存在着众多良莠不齐的第三方RPC 实现,不过我在项目中采用WCF搭配Protobuf是一个很不错的RPC实现,Google这个框架是是基于HTTP2的,这是他有特色的地方,带来诸如双向流.流控.头部压缩.单TCP连接上的多复用

Windows上编译GRPC

Windows上源码编译多数开源软件都很麻烦 编译环境:VS2015(grpc支持2013及以上,2012上没有Nuget,编译起来要费劲的多) 编译GRPC涉及内容 grpc protobuf grpc_protoc_plugin(本文以c++语言为编译目标,因此只涉及grpc_cpp_plugin) zlib grpc代码下载后,执行git submodule update --init初始化依赖的submodule 1. protobuf 参考readme用CMAKE生成工程文件,编译即可

可以用grpc+delphi(远程调用)

datasnap用多账套+连接池,短连接,一个服务支持2000个客户端完全没有问题,你如果客户端量还大,可以考虑把datasnap做成集群 这个老方法了,可以用grpc+delphi.java+delphi.net都快死了.现在很多项目服务端都用golang http://www.grpc.io/docs/quickstart/cpp.html

gRPC Client的负载均衡器

一.gRPC是什么? gRPC是一个高性能.通用的开源RPC框架,其由Google主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers)序列化协议开发,且支持众多开发语言.gRPC提供了一种简单的方法来精确地定义服务和为iOS.Android和后台支持服务自动生成 可靠性很强的客户端功能库.客户端充分利用高级流和链接功能,从而有助于节省带宽.降低的TCP链接次数.节省CPU使用.和电池寿命. 二.为什么使用gRPC? 有了 gRPC, 我们

java 使用grpc步骤

1.配置grpc maven依赖 2.设置idea 插件 <build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.4.1.Final</version> </extension> </extensions

gRPC中Any类型的使用(Java和NodeJs端)

工作中要把原来Java服务端基于SpringMVC的服务改为使用gRPC直接调用.由于原Service的返回值为动态的Map类型,key值不确定,且value的类型不唯一,因此使用了protobuf 3中的map和Any类型.在这个过程中遇到了一些困难,查阅资料时发现这一块的资料不是很多,尤其是在NodeJS的gRPC-Client处理google.protobuf.Any类型,完全找不到相关的资料.好在自己摸索和调试后解决了问题,因此记录下来以供他人之需. testservice.proto: