rabbitmq消费端加入精确控频。

控制频率之前用的是线程池的数量来控制,很难控制。因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率。

现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务。

与celery相比

在推送任务方面比celery的delay要快,推送的任务小。

使用更简单,没那么花哨给函数加装饰器来注册函数路由。

可以满足生产了。

比之前的 使用redis原生list结构作为消息队列取代celery框架。 更好,主要是rabbitmq有消费确认的概念,redis没有,对随意关停正在运行的程序会造成任务丢失。

# -*- coding: utf-8 -*-
from collections import Callable
import time
from threading import Lock
import unittest
import rabbitpy
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from rabbitpy.message import Properties
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pymongo.errors import PyMongoError
from app.utils_ydf import LogManager
from app.utils_ydf.mixins import LoggerMixin
from app.utils_ydf import decorators
from app.utils_ydf import BoundedThreadPoolExecutor
from app import config as app_config

LogManager(‘pika.heartbeat‘).get_logger_and_add_handlers(1)
LogManager(‘rabbitpy‘).get_logger_and_add_handlers(2)
LogManager(‘rabbitpy.base‘).get_logger_and_add_handlers(2)

class ExceptionForRetry(Exception):
    """为了重试的,抛出错误。只是定义了一个子类,用不用都可以"""

class ExceptionForRabbitmqRequeue(Exception):
    """遇到此错误,重新放回队列中"""

class RabbitmqClientRabbitPy:
    """
    使用rabbitpy包。
    """

    # noinspection PyUnusedLocal
    def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
        rabbit_url = f‘amqp://{username}:{password}@{host}:{port}/{virtual_host}‘
        self.connection = rabbitpy.Connection(rabbit_url)

    def creat_a_channel(self) -> rabbitpy.AMQP:
        return rabbitpy.AMQP(self.connection.channel())  # 使用适配器,使rabbitpy包的公有方法几乎接近pika包的channel的方法。

class RabbitmqClientPika:
    """
    使用pika包,多线程不安全的包。
    """

    def __init__(self, username, password, host, port, virtual_host, heartbeat=60):
        """
        parameters = pika.URLParameters(‘amqp://guest:[email protected]:5672/%2F‘)

        connection = pika.SelectConnection(parameters=parameters,
                                  on_open_callback=on_open)
        :param username:
        :param password:
        :param host:
        :param port:
        :param virtual_host:
        :param heartbeat:
        """
        credentials = pika.PlainCredentials(username, password)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host, port, virtual_host, credentials, heartbeat=heartbeat))

    def creat_a_channel(self) -> BlockingChannel:
        return self.connection.channel()

class RabbitMqFactory:
    def __init__(self, username=app_config.RABBITMQ_USER, password=app_config.RABBITMQ_PASS, host=app_config.RABBITMQ_HOST, port=app_config.RABBITMQ_PORT, virtual_host=app_config.RABBITMQ_VIRTUAL_HOST, heartbeat=60, is_use_rabbitpy=1):
        """
        :param username:
        :param password:
        :param port:
        :param virtual_host:
        :param heartbeat:
        :param is_use_rabbitpy: 为0使用pika,多线程不安全。为1使用rabbitpy,多线程安全的包。
        """
        if is_use_rabbitpy:
            self.rabbit_client = RabbitmqClientRabbitPy(username, password, host, port, virtual_host, heartbeat)
        else:
            self.rabbit_client = RabbitmqClientPika(username, password, host, port, virtual_host, heartbeat)

    def get_rabbit_cleint(self):
        return self.rabbit_client

class RabbitmqPublisher(LoggerMixin):
    def __init__(self, queue_name, is_use_rabbitpy=1, log_level_int=10):
        """
        :param queue_name:
        :param is_use_rabbitpy: 是否使用rabbitpy包。不推荐使用pika。
        :param log_level_int:
        """
        self._queue_name = queue_name
        self._is_use_rabbitpy = is_use_rabbitpy
        self.logger.setLevel(log_level_int)
        self.rabbit_client = RabbitMqFactory(is_use_rabbitpy=is_use_rabbitpy).get_rabbit_cleint()
        self.channel = self.rabbit_client.creat_a_channel()
        self.queue = self.channel.queue_declare(queue=queue_name, durable=True)
        self._lock_for_pika = Lock()
        self._lock_for_count = Lock()
        self._current_time = None
        self.count_per_minute = None
        self._init_count()
        self.logger.info(f‘{self.__class__} 被实例化了‘)

    def _init_count(self):
        with self._lock_for_count:
            self._current_time = time.time()
            self.count_per_minute = 0

    def publish(self, msg: str):
        if self._is_use_rabbitpy:
            self._publish_rabbitpy(msg)
        else:
            self._publish_pika(msg)
        self.logger.debug(f‘向{self._queue_name} 队列,推送消息 {msg}‘)
        """
        # 屏蔽统计减少加锁,能加快速度。
        with self._lock_for_count:
            self.count_per_minute += 1
        if time.time() - self._current_time > 60:
            self._init_count()
            self.logger.info(f‘一分钟内推送了 {self.count_per_minute} 条消息到 {self.rabbit_client.connection} 中‘)
        """

    @decorators.tomorrow_threads(100)
    def _publish_rabbitpy(self, msg: str):
        # noinspection PyTypeChecker
        self.channel.basic_publish(
            exchange=‘‘,
            routing_key=self._queue_name,
            body=msg,
            properties={‘delivery_mode‘: 2},
        )

    def _publish_pika(self, msg: str):
        with self._lock_for_pika:  # 亲测pika多线程publish会出错。
            self.channel.basic_publish(exchange=‘‘,
                                       routing_key=self._queue_name,
                                       body=msg,
                                       properties=BasicProperties(
                                           delivery_mode=2,  # make message persistent
                                       )
                                       )

    def clear(self):
        self.channel.queue_purge(self._queue_name)

    def get_message_count(self):
        if self._is_use_rabbitpy:
            return self._get_message_count_rabbitpy()
        else:
            return self._get_message_count_pika()

    def _get_message_count_pika(self):
        queue = self.channel.queue_declare(queue=self._queue_name, durable=True)
        return queue.method.message_count

    def _get_message_count_rabbitpy(self):
        ch = self.rabbit_client.connection.channel()
        q = rabbitpy.amqp_queue.Queue(ch, self._queue_name)
        q.durable = True
        msg_count = q.declare(passive=True)[0]
        ch.close()
        return msg_count

class RabbitmqConsumer(LoggerMixin):
    def __init__(self, queue_name, consuming_function: Callable = None, threads_num=100, max_retry_times=3, log_level=10, is_print_detail_exception=True, msg_schedule_time_intercal=0.0, is_use_rabbitpy=1):
        """
        :param queue_name:
        :param consuming_function: 处理消息的函数,函数有且只能有一个参数,参数表示消息。是为了简单,放弃策略和模板来强制参数。
        :param threads_num:
        :param max_retry_times:
        :param log_level:
        :param is_print_detail_exception:
        :param msg_schedule_time_intercal:消息调度的时间间隔,用于控频
        :param is_use_rabbitpy: 是否使用rabbitpy包。不推荐使用pika.
        """
        self._queue_name = queue_name
        self.consuming_function = consuming_function
        self._threads_num = threads_num
        self.threadpool = BoundedThreadPoolExecutor(threads_num)
        self._max_retry_times = max_retry_times
        self.logger.setLevel(log_level)
        self.logger.info(f‘{self.__class__} 被实例化‘)
        self._is_print_detail_exception = is_print_detail_exception
        self._msg_schedule_time_intercal = msg_schedule_time_intercal
        self._is_use_rabbitpy = is_use_rabbitpy

    def start_consuming_message(self):
        if self._is_use_rabbitpy:
            self._start_consuming_message_rabbitpy()
        else:
            self._start_consuming_message_pika()

    @decorators.tomorrow_threads(100)
    @decorators.keep_circulating(1)  # 是为了保证无论rabbitmq异常中断多久,无需重启程序就能保证恢复后,程序正常。
    def _start_consuming_message_rabbitpy(self):
        # noinspection PyArgumentEqualDefault
        channel = RabbitMqFactory(is_use_rabbitpy=1).get_rabbit_cleint().creat_a_channel()  # type:  rabbitpy.AMQP         #
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=self._threads_num)
        for message in channel.basic_consume(self._queue_name):
            body = message.body.decode()
            self.logger.debug(f‘从rabbitmq取出的消息是:  {body}‘)
            time.sleep(self._msg_schedule_time_intercal)
            self.threadpool.submit(self._consuming_function_rabbitpy, message)

    def _consuming_function_rabbitpy(self, message: rabbitpy.message.Message, current_retry_times=0):
        if current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                self.consuming_function(message.body.decode())
                message.ack()
            except Exception as e:
                if isinstance(e, (PyMongoError, ExceptionForRabbitmqRequeue)):
                    return message.nack(requeue=True)
                self.logger.error(f‘函数 {self.consuming_function}  第{current_retry_times+1}次发生错误,\n 原因是 {type(e)}  {e}‘, exc_info=self._is_print_detail_exception)
                self._consuming_function_rabbitpy(message, current_retry_times + 1)
        else:
            self.logger.critical(f‘达到最大重试次数 {self._max_retry_times} 后,仍然失败‘)  # 错得超过指定的次数了,就确认消费了。
            message.ack()

    @decorators.tomorrow_threads(100)
    @decorators.keep_circulating(1)  # 是为了保证无论rabbitmq异常中断多久,无需重启程序就能保证恢复后,程序正常。
    def _start_consuming_message_pika(self):
        channel = RabbitMqFactory(is_use_rabbitpy=0).get_rabbit_cleint().creat_a_channel()  # 此处先固定使用pika.
        channel.queue_declare(queue=self._queue_name, durable=True)
        channel.basic_qos(prefetch_count=self._threads_num)

        def callback(ch, method, properties, body):
            body = body.decode()
            self.logger.debug(f‘从rabbitmq取出的消息是:  {body}‘)
            time.sleep(self._msg_schedule_time_intercal)
            self.threadpool.submit(self._consuming_function_pika, ch, method, properties, body)

        channel.basic_consume(callback,
                              queue=self._queue_name,
                              # no_ack=True
                              )
        channel.start_consuming()

    @staticmethod
    def __ack_message_pika(channelx, delivery_tagx):
        """Note that `channel` must be the same pika channel instance via which
        the message being ACKed was retrieved (AMQP protocol constraint).
        """
        if channelx.is_open:
            channelx.basic_ack(delivery_tagx)
        else:
            # Channel is already closed, so we can‘t ACK this message;
            # log and/or do something that makes sense for your app in this case.
            pass

    def _consuming_function_pika(self, ch, method, properties, body, current_retry_times=0):
        if current_retry_times < self._max_retry_times:
            # noinspection PyBroadException
            try:
                self.consuming_function(body)
                ch.basic_ack(delivery_tag=method.delivery_tag)
                # self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))
            except Exception as e:
                if isinstance(e, (PyMongoError, ExceptionForRabbitmqRequeue)):
                    return ch.basic_nack(delivery_tag=method.delivery_tag)
                self.logger.error(f‘函数 {self.consuming_function}  第{current_retry_times+1}次发生错误,\n 原因是 {type(e)}  {e}‘, exc_info=self._is_print_detail_exception)
                self._consuming_function_pika(ch, method, properties, body, current_retry_times + 1)
        else:
            self.logger.critical(f‘达到最大重试次数 {self._max_retry_times} 后,仍然失败‘)  # 错得超过指定的次数了,就确认消费了。
            ch.basic_ack(delivery_tag=method.delivery_tag)
            # self.rabbitmq_helper.connection.add_callback_threadsafe(functools.partial(self.ack_message, ch, method.delivery_tag))

# noinspection PyMethodMayBeStatic
class _Test(unittest.TestCase):
    def test_publish(self):
        rabbitmq_publisher = RabbitmqPublisher(‘queue_test‘, is_use_rabbitpy=1, log_level_int=10)
        [rabbitmq_publisher.publish(str(msg)) for msg in range(2000)]

    def test_consume(self):
        def f(body):
            print(‘....  ‘, body)
            time.sleep(10)  # 模拟做某事需要阻塞10秒种,必须用并发。

        rabbitmq_consumer = RabbitmqConsumer(‘queue_test‘, consuming_function=f, threads_num=200, is_use_rabbitpy=1, msg_schedule_time_intercal=0.5)
        rabbitmq_consumer.start_consuming_message()

if __name__ == ‘__main__‘:
    unittest.main()

原文地址:https://www.cnblogs.com/ydf0509/p/10272375.html

时间: 2024-11-09 03:43:47

rabbitmq消费端加入精确控频。的相关文章

RabbitMQ消费端限流策略(十)

消费端限流: 什么是消费端限流? 场景: 我们RabbitMQ服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据.(导致服务器崩溃,线上故障) 生产端一次推送几百条数据库,客户端只接收一两条,在高并发的情况下,不能再生产端做限流,只能在消费端处理. 解决方法: RabbitMQ提供了一种qos(服务质量保证)功能,在非自动确认消息的前提下, 如果一定数据的消息(通过基于consumer或者channel

RabbitMQ消费端自定义监听(九)

场景: 我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理. 实际环境: 我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用. 操作: //生产端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory

RabbitMQ消费端自定义监听器DefaultConsumer

消费者 package com.flying.rabbitmq.api.consumer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 自定义消费者类型 */ public class Consumer { public static void main(String[] args) th

rabbitmq消费端的nack和重回队列的总结

重回队列模式,是当投递消息失败时,让该消息重新回到队列的模式,该模式需要手动签收,并需要在消费者中进行判断,调用重回队列的确认模式 消费者 package com.flying.rabbitmq.api.ack; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Consumer

RabbitMQ消费端消息的获取方式(.Net Core)

1[短链接]:BasicGet(String queue, Boolean autoAck) 通过request的方式独自去获取消息,断开式,一次次获取,如果返回null,则说明队列中没有消息. 隐患:每次获取消息都会创建channel. 优点:最安全的获取方式且性能不算太差. 2[长链接]: 1).EventingBasicConsumer[订阅式] 使用这种方式消息会全部打入当前消费者中,不管是否启用确认机制. 隐患:①根据消息的长短多少将影响当前消费者的占用资源. ②如果当前消费者挂掉,那

消费端限流策略

使用场景 首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现如下情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据! Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consumer或者channel设置qos的值)未被确认前,不进行消费新的消息. 具体方法 void BasicQos(unit prefetchSize, ushort prefetchCount,

RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题? 回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情? 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理 消息的持久化处理 上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储. 本文先从消费端讲起

小米新旗舰“翻车” 冲击中高端凸显品控短板(小米的缺点还真不少:电商、性价比、爆款、粉丝经济,说到底也都只是商业上的创新)

小米新旗舰“翻车” 冲击中高端凸显品控短板 按照消费者的理解,旗舰手机应该是绝大部分用户在手机选购上的终极选择,任何产品一旦定位旗舰市场,必定有顶级的价格和顶级的使用体验,功能上无所不能,质量上更是坚若磐石.然而,2017年小米手机有点烦,其刚刚发布的小米手机6是小米今年的重磅中高端新品,也是小米重塑品牌形象再次冲击中高端市场的关键,不过上市以来却出现了充电重启.WiFi断流等问题. 实际上,小米从2015年推出小米Note系列起就开始向中高端进军,只是效果一直不理想.既要有销量又要有口碑,从这

js组件开发-移动端地区选择控件mobile-select-area

移动端地区选择控件mobile-select-area 由于之前的[js开源组件开发]js手机联动选择地区仿ios 开源git 很受欢迎,于是我又对其进行了一些优化,包括可选的范围变大了,添加了默认空首地址的功能,也添加了更多api参数,首先我们先来看下这次的效果图. 它的github地址请点击https://github.com/tianxiangbing/mobile-select-area 它的demo演示请点击 http://www.lovewebgames.com/jsmodule/m