grpc| python 实战 grpc【h】

title: grpc| python 实战 grpc
description: 只要代码可以跑起来, 很多难题都会迎刃而解. so, keep coding and stay hungry.

之前用 swoole 写 server 时就接触过 protobuf, 本来以为基于 protobuf 的 grpc, 上手起来会轻轻松松, 没想到结结实实的折腾了许久, 从 php 开始配置 grpc 需要的环境, 到无奈转到 grpc 最亲和 的 go 语言, 又无奈面对各种 go get 撞墙, 直到现在使用 python 语言, 终于 丝般顺滑 的跑完了官网 demo. 代码运行起来后, 之前 grpc 中不太理解的概念, 终于可以 会心一笑 了.

  • grpc 的基础: protobuf
  • grpc helloworld: python 实战 grpc 环境配置
  • grpc basic: grpc 4 种通信方式

grpc 的基础: protobuf

grpc 使用 protobuf 进行数据传输. protobuf 是一种数据交换格式, 由三部分组成:

  • proto 文件: 使用的 proto 语法的文本文件, 用来定义数据格式

proto语法现在有 proto2 和 proto3 两个版本, 推荐使用 proto3, 更加简洁明了

// [python quickstart](https://grpc.io/docs/quickstart/python.html#run-a-grpc-application)
// python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto

// helloworld.proto
syntax = "proto3";

service Greeter {
    rpc SayHello(HelloRequest) returns (HelloReply) {}
    rpc SayHelloAgain(HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}
  • protoc: protobuf 编译器(compile), 将 proto 文件编译成不同语言的实现, 这样不同语言中的数据就可以和 protobuf 格式的数据进行交互
  • protobuf 运行时(runtime): protobuf 运行时所需要的库, 和 protoc 编译生成的代码进行交互

使用 protobuf 的过程:

编写 proto 文件 -> 使用 protoc 编译 -> 添加 protobuf 运行时 -> 项目中集成

更新 protobuf 的过程:

修改 proto 文件 -> 使用 protoc 重新编译 -> 项目中修改集成的地方

PS: proto3 的语法非常非常的简单, 上手 protobuf 也很轻松, 反而是配置 protoc 的环境容易卡住, 所以推荐使用 python 入门, 配置 protoc 这一步非常省心.

grpc helloworld: python 实战 grpc 环境配置

上面已经定义好了 grpc helloworld demo 所需的 proto 文件, 现在来具体看看 python 怎么一步步把 grpc helloworld 的环境搭建起来:

  • protobuf 运行时(runtime)

这一步很简单, 安装 grpc 相关的 python 模块(module) 即可

pip install grpcio
  • 使用 protoc 编译 proto 文件, 生成 python 语言的实现
# 安装 python 下的 protoc 编译器
pip install grpcio-tools

# 编译 proto 文件
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. helloworld.proto

python -m grpc_tools.protoc: python 下的 protoc 编译器通过 python 模块(module) 实现, 所以说这一步非常省心
--python_out=. : 编译生成处理 protobuf 相关的代码的路径, 这里生成到当前目录
--grpc_python_out=. : 编译生成处理 grpc 相关的代码的路径, 这里生成到当前目录
-I. helloworld.proto : proto 文件的路径, 这里的 proto 文件在当前目录

编译后生成的代码:

  1. helloworld_pb2.py: 用来和 protobuf 数据进行交互
  2. helloworld_pb2_grpc.py: 用来和 grpc 进行交互
  • 最后一步, 编写 helloworld 的 grpc 实现:

服务器端: helloworld_grpc_server.py

from concurrent import futures
import time
import grpc
import helloworld_pb2
import helloworld_pb2_grpc

# 实现 proto 文件中定义的 GreeterServicer
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    # 实现 proto 文件中定义的 rpc 调用
    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message = ‘hello {msg}‘.format(msg = request.name))

    def SayHelloAgain(self, request, context):
        return helloworld_pb2.HelloReply(message=‘hello {msg}‘.format(msg = request.name))

def serve():
    # 启动 rpc 服务
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port(‘[::]:50051‘)
    server.start()
    try:
        while True:
            time.sleep(60*60*24) # one day in seconds
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == ‘__main__‘:
    serve()

客户端: helloworld_grpc_client.py

import grpc
import helloworld_pb2
import helloworld_pb2_grpc

def run():
    # 连接 rpc 服务器
    channel = grpc.insecure_channel(‘localhost:50051‘)
    # 调用 rpc 服务
    stub = helloworld_pb2_grpc.GreeterStub(channel)
    response = stub.SayHello(helloworld_pb2.HelloRequest(name=‘czl‘))
    print("Greeter client received: " + response.message)
    response = stub.SayHelloAgain(helloworld_pb2.HelloRequest(name=‘daydaygo‘))
    print("Greeter client received: " + response.message)

if __name__ == ‘__main__‘:
    run()

运行 python helloworld_grpc_server.pypython helloworld_grpc_client.py, 就可以看到效果了

grpc basic: 4 种通信方式

helloworld 使用了最简单的 grpc 通信方式: 类似 http 协议的一次 request+response.

根据不同的业务场景, grpc 支持 4 种通信方式:

  • 客服端一次请求, 服务器一次应答
  • 客服端一次请求, 服务器多次应答(流式)
  • 客服端多次请求(流式), 服务器一次应答
  • 客服端多次请求(流式), 服务器多次应答(流式)

官方提供了一个 route guide service 的 demo, 应用到了这 4 种通信方式, 具体的业务如下:

  • 数据源: json 格式的数据源, 存储了很多地点, 每个地点由经纬度(point)和地名(location)组成
  • 通信方式 1: 客户端请求一个地点是否在数据源中
  • 通信方式 2: 客户端指定一个矩形范围(矩形的对角点坐标), 服务器返回这个范围内的地点信息
  • 通信方式 3: 客户端给服务器发送多个地点信息, 服务器返回汇总信息(summary)
  • 通信方式 4: 客户端和服务器使用地点信息 聊天(chat)

对应的 proto 文件: routeguide.proto:

// [python quickstart](https://grpc.io/docs/quickstart/python.html#run-a-grpc-application)
// python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. routeguide.proto

syntax = "proto3";

service RouteGuide {
    // simple rpc
    rpc GetFeature(Point) returns (Feature) {}
    // server2client stream rpc
    rpc ListFeature(Rectangle) returns (stream Feature) {}
    // client2server stream rpc
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    // stream rpc
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

message Point {
    int32 latitude = 1;
    int32 longitude = 2;
}

message Rectangle {
    Point lo = 1;
    Point hi = 2;
}

message Feature {
    string name = 1;
    Point location = 2;
}

message RouteNote {
    Point location = 1;
    string message = 2;
}

message RouteSummary {
    int32 point_count = 1;
    int32 feature_count = 2;
    int32 distance = 3;
    int32 elapsed_time = 4;
}

proto 中想要表示流式传输, 只需要添加 stream 关键字即可

同样的, 使用 protoc 生成代码:

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. routeguide.proto

生成了 routeguide_pb2.py routeguide_pb2_grpc.py 文件, 和上面的 helloworld 对应

这里需要增加一个 routeguide_db.py, 用来处理 demo 中数据源(routeguide_db.json)文件:

import json
import routeguide_pb2

def read_routeguide_db():
    feature_list = []
    with open(‘routeguide_db.json‘) as f:
        for item in json.load(f):
            feature = routeguide_pb2.Feature(
                name = item[‘name‘],
                location = routeguide_pb2.Point(
                    latitude=item[‘location‘][‘latitude‘],
                    longitude=item[‘location‘][‘longitude‘]
                )
            )
            feature_list.append(feature)
    return feature_list

处理 json 的过程很简单, 解析 json 数据得到由坐标点组成的数组

好了, 还剩下一个难题: 怎么处理流式数据呢?. 答案是 for-in + yield

  • 客户端读取服务器发送的流式数据
print("-------------- ListFeatures --------------")
response = stub.ListFeature(routeguide_pb2.Rectangle(
    lo = routeguide_pb2.Point(latitude=400000000, longitude=-750000000),
    hi=routeguide_pb2.Point(latitude=420000000, longitude=-730000000)
))
for feature in response:
    print("Feature called {name} at {location}".format(name=feature.name, location=feature.location))
  • 客户端发送流式数据给服务器
def generate_route(feature_list):
    for _ in range(0, 20):
        random_feature = feature_list[random.randint(0, len(feature_list) - 1)]
        print("random feature {name} at {location}".format(
            name=random_feature.name, location=random_feature.location))
        yield random_feature.location

print("-------------- RecordRoute --------------")
feature_list = routeguide_db.read_routeguide_db()
route_iterator = generate_route(feature_list)
response = stub.RecordRoute(route_iterator)
print("point count: {point_count} feature count: {feature_count} distance: {distance} elapsed time:{elapsed_time}".format(
    point_count  = response.point_count,
    feature_count = response.feature_count,
    distance = response.distance,
    elapsed_time = response.elapsed_time
))
  • 完整的服务器端代码: routeguide_grpc_server.py:

from concurrent import futures
import math
import time
import grpc
import routeguide_pb2
import routeguide_pb2_grpc
import routeguide_db

def get_feature(db, point):
    for feature in db:
        if feature.location == point:
            return feature
    return None

def get_distance(start, end):
    """Distance between two points."""
    coord_factor = 10000000.0
    lat_1 = start.latitude / coord_factor
    lat_2 = end.latitude / coord_factor
    lon_1 = start.longitude / coord_factor
    lon_2 = end.longitude / coord_factor
    lat_rad_1 = math.radians(lat_1)
    lat_rad_2 = math.radians(lat_2)
    delta_lat_rad = math.radians(lat_2 - lat_1)
    delta_lon_rad = math.radians(lon_2 - lon_1)

    # Formula is based on http://mathforum.org/library/drmath/view/51879.html
    a = (pow(math.sin(delta_lat_rad / 2), 2) +
         (math.cos(lat_rad_1) * math.cos(lat_rad_2) * pow(
             math.sin(delta_lon_rad / 2), 2)))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    R = 6371000
    # metres
    return R * c

class RouteGuide(routeguide_pb2_grpc.RouteGuideServicer):
    def __init__(self):
        self.db = routeguide_db.read_routeguide_db()

    def GetFeature(self, request, context):
        feature = get_feature(self.db, request)
        if feature is None:
            return routeguide_pb2.Feature(name = ‘‘, location = request)
        else:
            return feature

    def ListFeature(self, request, context):
        left = min(request.lo.longitude, request.hi.longitude)
        right = max(request.lo.longitude, request.hi.longitude)
        top = max(request.lo.latitude, request.hi.latitude)
        bottom = min(request.lo.latitude, request.hi.latitude)
        for feature in self.db:
            if (feature.location.longitude >= left
                and feature.location.longitude <= right
            and feature.location.latitude >= bottom
            and feature.location.latitude <= top):
                yield feature

    def RecordRoute(self, request_iterator, context):
        point_count = 0
        feature_count = 1
        distance = 0.0
        prev_point = None

        start_time = time.time()
        for point in request_iterator:
            point_count += 1
            if get_feature(self.db, point):
                feature_count += 1
            if prev_point:
                distance += get_distance(prev_point, point)
            prev_point = point
        elapsed_time = time.time() - start_time
        return routeguide_pb2.RouteSummary(
            point_count = point_count,
            feature_count = feature_count,
            distance = int(distance),
            elapsed_time = int(elapsed_time)
        )

    def RouteChat(self, request_iterator, context):
        prev_notes = []
        for new_note in request_iterator:
            for prev_note in prev_notes:
                if prev_note.location == new_note.location:
                    yield prev_note
            prev_notes.append(new_note)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    routeguide_pb2_grpc.add_RouteGuideServicer_to_server(RouteGuide(), server)
    server.add_insecure_port(‘[::]:50051‘)
    server.start()
    try:
        while True:
            time.sleep(60*60*24) # one day in seconds
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == ‘__main__‘:
    serve()
  • 完整的客户端代码: routeguide_grpc_client.py:
import grpc
import routeguide_pb2
import routeguide_pb2_grpc
import routeguide_db
import random

def get_feature(feature):
    if not feature.location:
        print("Server returned incomplete feature")
        return
    if feature.name:
        print("Feature called {name} at {location}".format(name = feature.name, location = feature.location))
    else:
        print("Found no feature at {location}".format(location = feature.location))

def generate_route(feature_list):
    for _ in range(0, 20):
        random_feature = feature_list[random.randint(0, len(feature_list) - 1)]
        print("random feature {name} at {location}".format(
            name=random_feature.name, location=random_feature.location))
        yield random_feature.location

def make_route_note(message, latitude, longitude):
    return routeguide_pb2.RouteNote(
        message=message,
        location=routeguide_pb2.Point(latitude=latitude, longitude=longitude))

def generate_route_note():
    msgs = [
        make_route_note(‘msg 1‘, 0, 0),
        make_route_note(‘msg 2‘, 1, 0),
        make_route_note(‘msg 3‘, 0, 1),
        make_route_note(‘msg 4‘, 0, 0),
        make_route_note(‘msg 5‘, 1, 1),
    ]
    for msg in msgs:
        print("send message {message} location {location}".format(message = msg.message, location = msg.location))
        yield msg

def run():
    channel = grpc.insecure_channel(‘localhost:50051‘)
    stub = routeguide_pb2_grpc.RouteGuideStub(channel)
    print("-------------- GetFeature --------------")
    response = stub.GetFeature(routeguide_pb2.Point(latitude=409146138, longitude=-746188906))
    get_feature(response)
    response = stub.GetFeature(routeguide_pb2.Point(latitude=0, longitude=-0))
    get_feature(response)

    print("-------------- ListFeatures --------------")
    response = stub.ListFeature(routeguide_pb2.Rectangle(
        lo = routeguide_pb2.Point(latitude=400000000, longitude=-750000000),
        hi=routeguide_pb2.Point(latitude=420000000, longitude=-730000000)
    ))
    for feature in response:
        print("Feature called {name} at {location}".format(name=feature.name, location=feature.location))

    print("-------------- RecordRoute --------------")
    feature_list = routeguide_db.read_routeguide_db()
    route_iterator = generate_route(feature_list)
    response = stub.RecordRoute(route_iterator)
    print("point count: {point_count} feature count: {feature_count} distance: {distance} elapsed time:{elapsed_time}".format(
        point_count  = response.point_count,
        feature_count = response.feature_count,
        distance = response.distance,
        elapsed_time = response.elapsed_time
    ))

    print("-------------- RouteChat --------------")
    response = stub.RouteChat(generate_route_note())
    for msg in response:
        print("recived message {message} location {location}".format(
            message=msg.message, location=msg.location))

if __name__ == ‘__main__‘:
    run()

运行 python routeguide_grpc_server.pypython routeguide_grpc_client.py 就可以看到效果

写在最后

只要代码可以跑起来, 很多难题都会 迎刃而解

so, keep coding and stay hungry

关于 protobuf 的更多物料:

关于 python 实战 grpc 的更多物料:

原文地址:https://www.cnblogs.com/ExMan/p/12112678.html

时间: 2024-12-20 06:37:12

grpc| python 实战 grpc【h】的相关文章

python 使用gRPC

Python gRPC 概述: gRPC 是谷歌开源的一个rpc(远程程序调用)框架,可以轻松实现跨语言,跨平台编程,其采用gRPC协议(基于HTTP2). rpc: remote procedure call, 翻译过来就是是远程程序调用.具体来说,就是客户端c1需要调用服务器s1上的某个方法(函数),得到相应的返回值并传递给c1. gRPC协议 要说gRPC协议需要先了解HTTP2, 虽然HTTP1.X 协议至今仍是主流协议,但是随着我们对性能要求越来越高,和web规模的不断扩大,HTTP2

python使用grpc,并打包成python模块

xmlrpc依然是可行的方案一.环境python3.6 二.安装模块 pip3 install grpcio pip3 install protobuf pip3 install grpcio-tools 三.准备grpc配置文件grpcdatabase.proto目录结构:内容如下: syntax = "proto3"; package grpcServer; service Greeter { rpc GetSummary (Request) returns (Return) {}

python的gRPC示例

参考URL: https://segmentfault.com/a/1190000015220713?utm_source=channel-hottest gRPC 是一个高性能.开源和通用的 RPC 框架,面向移动和 HTTP/2 设计.目前提供 C.Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持. gRPC 基于 HTT

gRPC Python 入门到生产环境

所有的代码在 https://github.com/xsren/learning_record/tree/master/grpc,欢迎star. 一.先了解几个概念 RPC RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议. gRPC gRPC是一个高性能.通用的开源RPC框架,其由Google主要由开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers)序列化协议

python实战--数据结构二叉树

此文将讲述如何用python实战解决二叉树实验 前面已经讲述了python语言的基本用法,现在让我们实战一下具体明确python的用法 点击我进入python速成笔记 先看一下最终效果图: 首先我们要定义二叉树结点的一个类,在python中定义二叉树结点代码如下: #二叉链表 class BiTree: def __init__(self, elementType=None, lchild=None, rchild=None): self.elementType = elementType se

BoW图像检索Python实战

下文来自我的博客:BoW图像检索Python实战 前几天把HABI哈希图像检索工具包更新到V2.0版本后,小白菜又重新回头来用Python搞BoW词袋模型,一方面主要是练练Python,另一方面也是为了CBIR群开讲的关于图像检索群活动第二期而准备的一些素材.关于BoW,网上堆资料讲得挺好挺全的了,小白菜自己在曾留下过一篇讲解BoW词袋构建过程的博文Bag of Words模型,所以这里主要讲讲BoW的实战.不过在实战前,小白菜还想在结合自己这两年多BoW的思考和沉淀重新以更直白的方式对BoW做

Python实战:美女图片下载器,海量图片任你下载

Python应用现在如火如荼,应用范围很广.因其效率高开发迅速的优势,快速进入编程语言排行榜前几名.本系列文章致力于可以全面系统的介绍Python语言开发知识和相关知识总结.希望大家能够快速入门并学习Python这门语言. 本文是在前一部分Python基础之上Python实战:Python爬虫学习教程,获取电影排行榜,再次升级的Python网页爬虫实战课程. 1.项目概述. 利用XPath和requests模块进行网页抓取与分析,达到网页图片下载的效果. 抓爬图片地址:http://www.2c

Python实战:Python爬虫学习教程,获取电影排行榜

Python应用现在如火如荼,应用范围很广.因其效率高开发迅速的优势,快速进入编程语言排行榜前几名.本系列文章致力于可以全面系统的介绍Python语言开发知识和相关知识总结.希望大家能够快速入门并学习Python这门语言. 本文是在前一部分Python基础之上程序员带你十天快速入门Python,玩转电脑软件开发(四),再次进行的Python爬虫实战课程. 正则表达式实例简单详解 正则表达式干什么用? 就是在字符串中提取我们需要的内容的. 记得哦,要先引用正则表达式模块的哦. re就是正则表达式相

zeromq 学习和python实战

参考文档: 官网 http://zeromq.org/ http://www.cnblogs.com/rainbowzc/p/3357594.html 原理解读 zeromq只是一层针对socket的封装,介于传输层和应用层之间,并不是单独的服务或者程序,仅仅是一套组件. zeromq使用c语言编写,相应速度非常快. 主要有以下几个部分: - 主线程:负责相应用户的请求,比如创建zmq等 - IO线程:主要负责网络IO的调度,每个IO线程会对应一个异步poll(如select,epoll等),使