RabbitMQ 使用(一)

RabbitMQ中的使用

这篇文章将会介绍关于RabbbitMQ的使用,并且使用的是kombo(客户端的Python实现)来实现;

安装

如果使用的是mac安装的话,可以先安装到指定的位置,接着配置命令访问路径:

  1. cd ~
  2. vi .bash_profile,输入下面两行
    RABBIT_HOME=/usr/local/Cellar/rabbitmq/3.6.9_1
    PATH=$PATH:$RABBIT_HOME/sbin
    
  3. esc,:wq保存并退出即可

启动和停止

开始:sudo rabbitmq-server start结束:sudo rabbitmq-server stop

Producer 和 Consumer

首先我们需要知道Producer和Consumer的初始化和其对应的publish和consumer方法。

Producer

class kombu.Producer(channel, exchange=None, routing_key=None,
serializer=None, auto_declare=None,
compression=None, on_return=None)
# 发布消息
.publish(body, routing_key=None, delivery_mode=None, mandatory=False,
immediate=False, priority=0, content_type=None, content_encoding=None,
serializer=None, headers=None, compression=None, exchange=None,
retry=False, retry_policy=None, declare=None, expiration=None, **properties)

Consumer

class kombu.Consumer(channel, queues=None, no_ack=None,
auto_declare=None, callbacks=None, on_decode_error=None,
on_message=None, accept=None, prefetch_count=None, tag_prefix=None)
# 消费
.consume(no_ack=None)

Hello world

当收到消息的时候,除非你已经对这个message进行了相关的操作,否则像是某个消费者的通道关闭等特殊情况下,RabbitMQ不会丢失掉这个信息,如果存在其它的消费者,则丢给其它消费者,没有就扔回队列中;当然你也可以通过no_ack=True来关闭消息确认机制。

from kombu import Exchange, Queue, Connection, Consumer, Producer
task_queue = Queue(‘tasks‘, exchange=Exchange(‘tasks‘, type=‘direct‘), routing_key=‘tasks‘)
# 生产者
with Connection(‘amqp://[email protected]:5672//‘) as conn:
    with conn.channel() as channel:
        producer = Producer(channel)
        producer.publish({‘hello‘: ‘world‘},
                         retry=True,
                         exchange=task_queue.exchange,
                         routing_key=task_queue.routing_key,
                         declare=[task_queue])
def get_message(body, message):
    print(body)
    # message.ack()
# 消费者
with Connection(‘amqp://[email protected]:5672//‘) as conn:
    with conn.channel() as channel:
        consumer = Consumer(channel, queues=task_queue, callbacks=[get_message,], prefetch_count=10)
        consumer.consume(no_ack=True)

生产者和消费者相互对应,这样一个简易的消息队列就可以使用了。

任务队列

我们将创建一个工作队列,专门用来处理分配耗时的任务。原理就是将任务封装成一个消息,由客户端发送到消息队列中,而后台运行的工作进程负责弹出任务并且分配给消费者来执行任务。这种方案在一些IO密集型的情况下很有用,比如在短时间内HTTP请求窗口中无法处理复杂的任务。

  1. 我们先创建相关的exchange和queue,queues.py文件如下:

    from kombu import Exchange, Queue
    task_exchange = Exchange(‘tasks‘, type=‘direct‘)
    task_queues = [Queue(‘high‘, exchange=task_exchange, routing_key=‘high‘),
                   Queue(‘middle‘, exchange=task_exchange, routing_key=‘middle‘),
                   Queue(‘low‘, exchange=task_exchange, routing_key=‘low‘)]
    
  2. 接下来再创建消费者,worker.py文件如下:
    from kombu.mixins import ConsumerMixin
    from queues import task_queues
    # 消费者
    class Worker(ConsumerMixin):
        def __init__(self, connection):
            self.connection = connection
        def get_consumers(self, Consumer, channel):
            consumer = Consumer(queues=task_queues, callbacks=[self.process_task], accept=[‘text/plain‘, ‘json‘, ‘pickle‘])
            consumer.qos(prefetch_count=10)  # 最多一下子获取10个任务
            return [consumer]
        def process_task(self, body, message):
            fun = body[‘fun‘]; args = body[‘args‘]; kwargs = body[‘kwargs‘]
            try:
                fun(*args, **kwargs)
            except Exception as exc:
                print(exc)
                message.requeue()
            else:
                message.ack()
    if __name__ == ‘__main__‘:
        from kombu import Connection
        with Connection(‘amqp://[email protected]:5672//‘) as conn:
            try:
                worker = Worker(conn)
                worker.run()
            except KeyboardInterrupt:
                print(‘bye bye‘)
    
  3. 创建需要传递给消费者执行的任务,tasks.py如下:
    def hello_task(who=‘world‘):
        import time
        print(‘wait one second‘)
        time.sleep(1)
        print(‘Hello {}‘.format(who))
    
  4. 最后,创建生产者,client.py如下:
    from kombu.pools import producers
    from queues import task_exchange
    routing_keys = {
        ‘high‘: ‘high‘,
        ‘middle‘: ‘middle‘,
        ‘low‘: ‘low‘
    }
    # 将消息序列化后发送到队列中
    def send_as_task(connection, fun, key=‘middle‘, args=(), kwargs={}):
        payload = {‘fun‘: fun, ‘args‘: args, ‘kwargs‘: kwargs}
        routing_key = routing_keys[key]
        with producers[connection].acquire(block=True) as producer:
            producer.publish(payload, serializer=‘pickle‘, exchange=task_exchange,
                             routing_key=routing_key, declare=[task_exchange])
    if __name__ == ‘__main__‘:
        from kombu import Connection
        from tasks import hello_task
        with Connection(‘amqp://[email protected]:5672//‘) as conn:
            send_as_task(conn, fun=hello_task, args=(‘wang‘,))
    

上面的代码主要实现的是,将hello_task这个任务经过pickle序列化以后发送到指定的middle消息队列中,接着消费者(可以开多个进程)从中取出消息后再执行任务。

时间: 2024-08-03 20:04:34

RabbitMQ 使用(一)的相关文章

windows上部署rabbitmq遇到的一些问题及解决方法

在目前这家公司,刚进公司的时候接手了一个服务,算是个比较完备的服务,其中几台电脑之间通信用到了rabbitmq,一开始没出什么问题,然后后来勒索病毒wanner cry来的时候,系服把所有服务器装了一个什么杀毒软件,重启之后rabibtmq集群就出现了一些问题,经过一番学习,把这些问题都搞定了,现在做一个总结. 一开始,我按照官网的描述,把四台服务器加入了一个集群,但是不知道为什么,除了主节点外,另外三台都看不了集群状态,由于并不影响什么,就先放在那没管,其实想起来,是因为之前集群的配置文件没删

Spring rabbitMq 中 correlationId或CorrelationIdString 消费者获取为null的问题

问题 在用Spring boot 的 spring-boot-starter-amqp   快速启动 rabbitMq 是遇到了个坑 消费者端获取不到:correlationId或CorrelationIdString 问题产生的原因 correlationId 的在 spring rabbitmq 2.0 以后 byte方式会被放弃,所以 目前 代码中有些地方没有改过来,应该算一个BUG @SuppressWarnings("deprecation") public class De

在Node.js中使用RabbitMQ系列二 任务队列

在上一篇文章在Node.js中使用RabbitMQ系列一 Hello world我有使用一个任务队列,不过当时的场景是将消息发送给一个消费者,本篇文章我将讨论有多个消费者的场景. 其实,任务队列最核心解决的问题是避免立即处理那些耗时的任务,也就是避免请求-响应的这种同步模式.取而代之的是我们通过调度算法,让这些耗时的任务之后再执行,也就是采用异步的模式.我们需要将一条消息封装成一个任务,并且将它添加到任务队列里面.后台会运行多个工作进程(worker process),通过调度算法,将队列里的任

RabbitMQ 很成熟 不是阿里的

简介 官网 http://www.rabbitmq.com RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现 RabbitMQ实现了AMQP标准 安装 参考 http://www.cnblogs.com/LipeiNet/p/5973061.html 安装 erlang 有安装C运行库,给 Erlang 用的  配置环境变量 ERLANG_HOME C:\Program Files\erl8.3 安装 MQ服务器软件 3.6.9  配

杂项之rabbitmq

杂项之rabbitmq 本节内容 rabbitmq简介 AMQP协议 rabbitmq使用 应用举例 rabbitmq简介 介绍rabbitmq之前,先介绍一下AMQP协议,因为rabbitmq就是基于AMQP协议实现的一个服务程序.(目前为止应该也是唯一实现了AMQP协议的服务) AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信. arbbitmq使用erlan

RabbitMQ安装和使用(和Spring集成)

一.安装Rabbit MQ Rabbit MQ 是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlang.通过下面两个连接下载安装3.2.3 版本: 下载并安装 Eralng OTP For Windows (vR16B03) 运行安装 Rabbit MQ Server Windows Installer (v3.2.3) 具体操作步骤参考:在 Windows 上安装Rabbit MQ 指南 本人遇到的问题 当安装RabbitMQ后,使用rabbitmqctl

rabbitMQ、activeMQ、zeroMQ、Kafka、Redis 比较

Kafka作为时下最流行的开源消息系统,被广泛地应用在数据缓冲.异步通信.汇集日志.系统解耦等方面.相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一流的读写性能. 针对Kafka性能方面进行简单分析,相关数据请参考:https://segmentfault.com/a/1190000003985468,下面介绍一下Kafka的架构和涉及到的名词: Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Parti

NET操作RabbitMQ组件EasyNetQ

NET操作RabbitMQ组件EasyNetQ使用中文简版文档. 本文出自EasyNetQ官方文档,内容为自己理解加翻译.文档地址:https://github.com/EasyNetQ/EasyNetQ/wiki/Quick-Start EasyNetQ简介 EasyNetQ是基于官方.NET组件RabbitMQ.Client 的又一层封装,使用起来更加方便,开发者不用关心具体队列声明,路由声明等细节,几句简单代码即可发送消息到队列,接收消息也很简单,下面将简单介绍EasyNetQ的使用方法.

Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memc

Centos7安装rabbitmq server 3.6.0

###假设所有操作在opt目录下进行 cd /opt mkdir apps cd apps ### 下载 RabbitMQ Server wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-generic-unix-3.6.0.tar.xz ### 解压 tar -xvJf rabbitmq-server-generic-unix-3.6.0.tar.xz ### 安装 Erlang yum i