RabbitMQ中RPC的实现及其通信机制

RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id。

RPC调用流程:

当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到rpc_queue队列,消费者等待该队列上的请求。当一个请求出现时,它会执行该任务,将带有结果的消息发送回生产者。生产者等待回调队列上的数据,当消息出现时,它检查相关ID属性,如果它与请求中的值匹配,则返回对应用程序的响应。

RabbitMQ斐波拉契计算的RPC,消费者实现:

"""
基于RabbitMQ实现RPC通信机制 --> 服务端
"""

import pika
import uuid
from functools import lru_cache

class RabbitServer(object):
    def __init__(self):
        self.conn = pika.BlockingConnection(
            pika.ConnectionParameters(host=‘localhost‘, port=5672)
        )
        self.channel = self.conn.channel()

        # 声明一个队列,并进行持久化,exclusive设置为false
        self.channel.queue_declare(
            exclusive=False, durable=True, queue=‘task_queue‘
        )

        # 声明一个exhange交换机,类型为topic
        self.channel.exchange_declare(
            exchange=‘logs_rpc‘, exchange_type=‘topic‘, durable=True
        )

        # 将队列与交换机进行绑定
        routing_keys = [‘#‘]  # 接受所有的消息
        for routing_key in routing_keys:
            self.channel.queue_bind(
                exchange=‘logs_rpc‘, queue=‘task_queue‘, routing_key=routing_key
            )

    @lru_cache()
    def fib(self, n):
        """
        斐波那契数列.===>程序的处理逻辑
        使用lru_cache 优化递归
        :param n:
        :return:
        """
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return self.fib(n - 1) + self.fib(n - 2)

    def call_back(self, channel, method, properties, body):
        print(‘------------------------------------------‘)
        print(‘接收到的消息为(斐波那契数列的入参项为):{}‘.format(str(body)))
        print(‘消息的相关属性为:‘)
        print(properties)
        value = self.fib(int(body))
        print(‘斐波那契数列的运行结果为:{}‘.format(str(value)))

        # 交换机将消息发送到队列
        self.channel.basic_publish(
            exchange=‘‘,
            routing_key=properties.reply_to,
            body=str(value),
            properties=pika.BasicProperties(
                delivery_mode=2,
                correlation_id=properties.correlation_id,
            ))

        # 消费者对消息进行确认
        self.channel.basic_ack(delivery_tag=method.delivery_tag)

    def receive_msg(self):
        print(‘开始接受消息...‘)
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            consumer_callback=self.call_back,
            queue=‘task_queue‘,
            no_ack=False,  # 消费者对消息进行确认
            consumer_tag=str(uuid.uuid4())
        )

    def consume(self):
        self.receive_msg()
        self.channel.start_consuming()

if __name__ == ‘__main__‘:
    rabbit_consumer = RabbitServer()
    rabbit_consumer.consume()

生产者实现:

"""
基于RabbitMQ实现RPC通信机制 --> 客户端
"""

import pika
import uuid
import time

class RabbitClient(object):
    def __init__(self):
        # 与RabbitMq服务器建立连接
        self.conn = pika.BlockingConnection(
            pika.ConnectionParameters(host=‘localhost‘, port=5672)
        )
        self.channel = self.conn.channel()

        # 声明一个exchange交换机,交换机的类型为topic
        self.channel.exchange_declare(
            exchange=‘logs_rpc‘, exchange_type=‘topic‘, durable=True
        )

        # 声明一个回调队列,用于接受RPC回调结果的运行结果
        result = self.channel.queue_declare(durable=True, exclusive=False)
        self.call_queue = result.method.queue

        # 从回调队列当中获取运行结果.
        self.channel.basic_consume(
            consumer_callback=self.on_response,
            queue=self.call_queue,
            no_ack=False
        )

    def on_response(self, channel, method, properties, body):
        """
        对收到的消息进行确认
        找到correlation_id与服务端的消息标识匹配的消息结果
        :param channel:
        :param method:
        :param properties:
        :param body:
        :return:
        """
        if self.corr_id == properties.correlation_id:
            self.response = body
            print(‘斐波那契数列的RPC返回结果是:{}‘.format(body))
            print(‘相关属性信息:‘)
            print(properties)
        self.channel.basic_ack(delivery_tag=method.delivery_tag)

    def send_msg(self, routing_key, message):
        """
        exchange交换机将根据消息的路由键将消息路由到对应的queue当中
        :param routing_key: 消息的路由键
        :param message: 生成者发送的消息
        :return:
        """
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange=‘logs_rpc‘,
            routing_key=routing_key,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,
                correlation_id=self.corr_id,
                reply_to=self.call_queue,
            ))

        while self.response is None:
            print(‘等待远程服务端的返回结果...‘)
            self.conn.process_data_events()  # 非阻塞式的不断获取消息.

        return self.response

    def close(self):
        self.conn.close()

if __name__ == "__main__":
    rabbit_producer = RabbitClient()
    routing_key = ‘hello every one‘
    start_time = int(time.time())
    for item in range(2000):
        num = str(item)
        print(‘生产者发送的消息为:{}‘.format(num))
        rabbit_producer.send_msg(routing_key, num)
    end_time = int(time.time())
    print("耗时{}s".format(str(end_time - start_time)))

计算2000以内的斐波拉契数列,执行结果如下:

原文地址:https://www.cnblogs.com/FG123/p/10137411.html

时间: 2024-08-13 08:05:06

RabbitMQ中RPC的实现及其通信机制的相关文章

RabbitMQ 实现RPC

实现RPC 首先要弄明白,RPC是个什么东西. (RPC) Remote Procedure Call Protocol 远程过程调用协议 在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器.但是在做开发时候往往要用到其它团队的方法,因为已经有了实现.但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效.RPC协议定义了规划,其它的公司都给出了不同的实现.比如微软的wcf,以及现在火热的WebApi. 在Rabbit

Android中的常见通信机制和Linux中的通信机制

Handler Handler是Android系统中的一种消息传递机制,起作用是应对多线程场景.将A进程的消息传递给B线程,实现异步消息处理.很多情况是将工作线程中需要更新UI的操作消息传递给UI主线程,而实现更新UI操作. 因为工作线程和主线程是共享地址空间,即Handler实例对象mHandler位于线程间共享的内存堆上,工作线程和主线程直接使用该对象,只需要注意多线程的同步问题.工作系统通过mHandler向其成员变量MessageQueue中添加Message,而主线程一直处于loop中

TensorFlow中的通信机制——Rendezvous(二)gRPC传输

背景 [作者:DeepLearningStack,阿里巴巴算法工程师,开源TensorFlow Contributor] 本篇是TensorFlow通信机制系列的第二篇文章,主要梳理使用gRPC网络传输部分模块的结构和源码.如果读者对TensorFlow中Rendezvous部分的基本结构和原理还不是非常了解,那么建议先从这篇文章开始阅读.TensorFlow在最初被开源时还只是个单机的异构训练框架,在迭代到0.8版本开始正式支持多机分布式训练.与其他分布式训练框架不同,Google选用了开源项

.Net中Remoting通信机制简单实例

.Net中Remoting通信机制 前言: 本程序例子实现一个简单的Remoting通信案例 本程序采用语言:c# 编译工具:vs2013工程文件 编译环境:.net 4.0 程序模块: Test测试 Talker Server端 Client端 源代码工程文件下载 Test测试程序截图: Talker类: 1 public class Talker : MarshalByRefObject 2 { 3 public void Talk(string word) 4 { 5 System.Con

.Net中Remoting通信机制

Remoting通信机制 Remoting介绍 主要元素 通道类型 激活方式 对象定义 Remoting介绍 什么是Remoting,简而言之,我们可以将其看作是一种分布式处理方式. 从微软的产品角度来看,可以说Remoting就是DCOM(分布式组件对象模型,分布式组件对象模式)的一种升级,它改善了很多功能,并极好的融合到.Net平台下.Microsoft .NET Remoting 提供了一种允许对象通过应用程序域与另一对象进行交互的框架.这也正是我们使用Remoting的原因.为什么呢?在

(3)MEF插件系统中通信机制的设计和实现

1.背景 一般的WinForm中通过C#自带的Event机制便能很好的实现事件的注册和分发,但是,在插件系统中却不能这么简单的直接用已有的类来完成.一个插件本不包含另外一个插件,它们均是独立解耦的,实现插件和插件间的通信还需要我们设计出一个事件引擎来完成这个需求. 目前很多高级语言中基本都实现了观察者模式,并进行了自己的包装.比如C#中的delegate和event组合,java awt中的Event和addActionListener组合,Flex中的Event.addEventListene

TensorFlow中的通信机制——Rendezvous(一)本地传输

背景 [作者:DeepLearningStack,阿里巴巴算法工程师,开源TensorFlow Contributor] 在TensorFlow源码中我们经常能看到一个奇怪的词--Rendezvous.如果从仔细统计该单词出现的频率和模块,你会发现无论在单机还是分布式,无论在core目录还是contrib目录都存在它的身影,所涉及的模块非常多.Rendezvous是一个法语单词,发音也比较特殊,一般直译为"约会.相会.会和",而在TensorFlow中,Rendezvous是用来完成消

RabbitMQ中文文档PHP版本(六)--远程过程调用(RPC)

2019年12月10日10:05:54 原文:https://www.rabbitmq.com/tutorials/tutorial-six-php.html 远程过程调用(RPC) (使用php-amqplib) 先决条件 本教程假定RabbitMQ 已在标准端口(5672)的本地主机上安装并运行.如果您使用其他主机,端口或凭据,则连接设置需要进行调整. 在哪里获得帮助 如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系. 在第二篇教程中,我们学习了如何使用工作队列在多个工作人员之间分

RabbitMQ中 exchange、route、queue的关系

从AMQP协议可以看出,MessageQueue.Exchange和Binding构成了AMQP协议的核心,下面我们就围绕这三个主要组件    从应用使用的角度全面的介绍如何利用Rabbit MQ构建消息队列以及使用过程中的注意事项. 1. 声明MessageQueue 在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue.这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确: a)消费者是无法订阅或者获取不存在的Me