rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。)

rabbitmq作为消息队列可以有消息消费确认机制,redis的list结构可以简单充当消息队列,但不具备消费确认机制,随意关停程序,会丢失一部分正在程序中处理但还没执行完的消息。

使用rabbitmq的最常用库pika

# coding=utf-8
"""
一个通用的rabbitmq生产者和消费者。使用多个线程消费同一个消息队列。
"""
import abc
import functools
import time
from threading import Lock
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from app.utils_ydf import (LoggerMixin, LogManager, decorators, RabbitMqHelper, BoundedThreadPoolExecutor)

class RabbitmqPublisher(LoggerMixin):
    def __init__(self, queue_name):
        self._queue_name = queue_name
        channel = RabbitMqHelper().creat_a_channel()
        channel.queue_declare(queue=queue_name, durable=True)
        self.channel = channel
        self.lock = Lock()

    def publish(self, msg):
        with self.lock:
            self.channel.basic_publish(exchange=‘‘,
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  # make message persistent
                                       )
                                       )
            self.logger.debug(f‘放入 {msg} 到 {self._queue_name} 队列中‘)

class RabbitmqConsumer(LoggerMixin, ):
    def __init__(self, queue_name, consuming_function=None, threads_num=100, max_retry_times=3, log_level=1, is_print_detail_exception=True):
        """
        :param queue_name:
        :param consuming_function: 处理消息的函数,函数有且只能有一个参数,参数表示消息。
        :param threads_num:
        :param max_retry_times:
        :param log_level:
        :param is_print_detail_exception:
        """
        self._queue_name = queue_name
        self.consuming_function = consuming_function
        self.threadpool = BoundedThreadPoolExecutor(threads_num)
        self._max_retry_times = max_retry_times
        self.logger.setLevel(log_level * 10)
        self.logger.info(f‘{self.__class__} 被实例化‘)
        self._is_print_detail_exception = is_print_detail_exception
        self.rabbitmq_helper = RabbitMqHelper(heartbeat_interval=30)
        channel = self.rabbitmq_helper.creat_a_channel()
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=threads_num)
        self.channel = channel
        LogManager(‘pika.heartbeat‘).get_logger_and_add_handlers(1)

    @decorators.keep_circulating(1)    # 是为了保证无论rabbitmq异常中断多久,无需重启程序就能保证恢复后,程序正常。
    def start_consuming_message(self):
        def callback(ch, method, properties, body):
            msg = body.decode()
            self.logger.debug(f‘从rabbitmq取出的消息是:  {msg}‘)
            # ch.basic_ack(delivery_tag=method.delivery_tag)
            self.threadpool.submit(self.__consuming_function, ch, method, properties, msg)

        self.channel.basic_consume(callback,
                                   queue=self._queue_name,
                                   # no_ack=True
                                   )
        self.channel.start_consuming()

    @staticmethod
    def ack_message(channelx, delivery_tagx):
        """Note that `channel` must be the same pika channel instance via which
        the message being ACKed was retrieved (AMQP protocol constraint).
        """
        if channelx.is_open:
            channelx.basic_ack(delivery_tagx)
        else:
            # Channel is already closed, so we can‘t ACK this message;
            # log and/or do something that makes sense for your app in this case.
            pass

    def __consuming_function(self, ch, method, properties, msg, current_retry_times=0):
        if current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                self.consuming_function(msg)
                # ch.basic_ack(delivery_tag=method.delivery_tag)
                self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
            except Exception as e:
                self.logger.error(f‘函数 {self.consuming_function}  第{current_retry_times+1}次发生错误,\n 原因是{e}‘, exc_info=self._is_print_detail_exception)
                self.__consuming_function(ch, method, properties, msg, current_retry_times + 1)
        else:
            self.logger.critical(f‘达到最大重试次数 {self._max_retry_times} 后,仍然失败‘)
            # ch.basic_ack(delivery_tag=method.delivery_tag)
            self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))

if __name__ == ‘__main__‘:
    rabbitmq_publisher = RabbitmqPublisher(‘queue_test‘)
    [rabbitmq_publisher.publish(str(i)) for i in range(1000)]

    def f(msg):
        print(‘....  ‘, msg)
        time.sleep(10)  # 模拟做某事需要10秒种。

    rabbitmq_consumer = RabbitmqConsumer(‘queue_test‘, consuming_function=f, threads_num=20)
    rabbitmq_consumer.start_consuming_message()

原文地址:https://www.cnblogs.com/ydf0509/p/10142922.html

时间: 2024-10-01 03:12:27

rabbitmq 生产者 消费者(多个线程消费同一个队列里面的任务。)的相关文章

生产者消费者模型中线程怎样正常退出

生产者:不停地往队列中放数据 消费者:不停地从队列中拿数据 两者通过两个信号量同步 当生产者不再生产数据时,消费者正好挂在一个信号量上,处于睡眠状态,这时候pthread_join也会一直挂着的.该怎样使得消费者正常退出呢? 我的做法是让生产者在往队列中放一个[结束数据],也就是一个标识,消费者拿到数据后,如果这个数据是结束标识则自杀退出. 生产者消费者模型中线程怎样正常退出

生产者消费者问题-05-多线程

1 // 2 // ViewController.m 3 // 06-生产者消费者问题 4 // 5 // Created by mac on 16/4/20. 6 // Copyright © 2016年 mac. All rights reserved. 7 // 8 9 /*生产者消费者处理线程同步问题 10 思路: 11 1).生产者要取得锁,然后生产(去库房中放其生产的商品),如果库房满了,则wait,这时释放锁.直到有线程唤醒它再去生产:如果没有满,则生产商品后发送signal(消息

Android-Java多线程通讯(生产者 消费者)&amp;10条线程对-等待唤醒/机制的管理

上一篇博客 Android-Java多线程通讯(生产者 消费者)&等待唤醒机制 是两条线程(Thread-0 / Thread-1) 在被CPU随机切换执行: 而今天这篇博客是,在上一篇博客Android-Java多线程通讯(生产者 消费者)&等待唤醒机制 的基础上,扩大规模增加10条线程去执行 生产者 消费者: 注意:?? 上一篇博客是两条线程在执行(生产者 消费者)例如:当Thread-0 锁.wait(); 等待 冻结后,  Thread-1 锁.notify(); 唤醒的一定是 T

多线程,生产者消费者模型(生产馒头,消费馒头)

先建立一个容器 /** * 容器 * 共享资源 * @author Administrator * */ public class SynStack { int index = 0; //容器 SteamBread[] stb = new SteamBread[6]; /** * 往容器中放产品 */ public synchronized void push(SteamBread st){ while(index == stb.length){ try { this.wait(); } cat

RabbitMQ生产者消费者

package com.ra.car.rabbitMQ; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.

生产者消费者模型中线程如何正常退出

生产者:不停地往队列中放数据 消费者:不停地从队列中拿数据 两者通过两个信号量同步 当生产者不再生产数据时,消费者正好挂在一个信号量上,处于睡眠状态.这时候pthread_join也会一直挂着的.该如何使得消费者正常退出呢? 我的做法是让生产者在往队列中放一个[结束数据],也就是一个标识,消费者拿到数据后,假设这个数据是结束标识则自杀退出.

转: 【Java并发编程】之十三:生产者—消费者模型(含代码)

转载请注明出处:http://blog.csdn.net/ns_code/article/details/17249321 生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据. 这里实现如下情况的生产--消费模型: 生产者不断交替地生产两组数据"姓名--1 --> 内容--1","姓名--2--> 内容--2",消费者不断交替地取得这两组数据,这里的"姓名--1&quo

Thinking in Java---线程通信+三种方式实现生产者消费者问题

前面讲过线程之间的同步问题:同步问题主要是为了保证对共享资源的并发访问不会出错,主要的思想是一次只让一个线程去访问共享资源,我们是通过加锁的方法实现.但是有时候我们还需要安排几个线程的执行次序,而在系统内部线程的调度是透明的,没有办法准确的控制线程的切换.所以Java提供了一种机制来保证线程之间的协调运行,这也就是我们所说的线程调度.在下面我们会介绍三种用于线程通信的方式,并且每种方式都会使用生产者消费者问题进行检验. 一.使用Object类提供的线程通信机制 Object类提供了wait(),

Java并发编程】之十三:生产者—消费者模型(含代码)

生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据. 这里实现如下情况的生产--消费模型: 生产者不断交替地生产两组数据"姓名--1 --> 内容--1","姓名--2--> 内容--2",消费者不断交替地取得这两组数据,这里的"姓名--1"和"姓名--2"模拟为数据的名称,"内容--1 "和"内容--2 &q