RabbitMQ(二) ——工作队列

RabbitMQ(二)

——工作队列

(转载请附上本文链接——linhxx)

一、概述

工作队列模式(work queue),是有多个消费者的情况下,可以共同消费队列内的内容,加快消息处理速度。这是RabbitMQ的基本工作模式。

二、使用方式

和上一篇中的生产和消费消息的方式一样,就是需要多在cli进程中打开一个消费者的php文件。即需要打开3个php,一个是生产者的php文件,两个消费者的php文件(或多个php文件)。

三、工作机制

3.1 轮询(Round-robin dispatching)

当开启多个生产者的时候,消费者产生消息并发送到队列的情况下,队列会将消息均衡的分发给同时打开的多个消费者。即采用轮询的方式,假设有两个消费者c1和c2,第一次有消息给c1,第二次有消息给c2,第三个再给c1,以此类推。

3.2 回馈机制(Message acknowledgment)

为了保证消息的可靠性,RabbitMQ允许用户采用消费者的ack机制,即只有消费者回馈给队列ack后,队列才会将消息从队列中剔除。这样,可以确保队列中的每个消息都是确认被消费者处理完毕的。

开启的方式,只要将方法basic_consume第四个参数设置成false,就表示开启ack机制。

开启ack,就必须要记得在消费者的代码总,加入回馈的代码,否则,消息会被队列认为没有消费,不断的堵在队列中,导致队列堵塞。

要查看队列中还没确认的内容,可以采用RabbitMQ的管理工具——rabbitmqctl。

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

3.3 消息持久化(Message durability)

RabbitMQ具有完善的持久化机制,能够确保消息的安全性。可以在代码中开启持久化。消息持久化需要在队列和消息分别开启持久化。

1)队列持久化:

queue_declare的第三个参数设置为true。

2)消息持久化:

$msg = new AMQPMessage($data,

array(‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSISTENT)

);

这样可以确保消息可以保存在本地磁盘,因此即使rabbitmq的服务器宕机的时候,也可以保证消息的安全性。

当然,这也也存在一定的风险,因为操作系统自身的机制,开启持久化的时候,操作系统为了保证运作速度,消息会保存在操作系统层面的缓存,并且定时将消息存入硬盘。因此,还没存入磁盘的这一小段间隔(30秒左右),如果服务器宕机,有可能消息丢失。但是这个可能性很低,因此安全性已经很高。

3.4 公平分发(Fair dispatch)与预取机制(prefetch)

消息的轮询机制,每次有消息,队列则直接按照排好的顺序将消息传给对应的队列,有可能出现一部分队列上一条消息还没处理完,就出现了下一条的消息,而另外一部分队列则空闲的情况。

因此,RabbitMQ允许用户开启公平分发机制,让每个消费者只能接收一个消息,如果还没给队列回复ack,则消息来的时候,即使顺序轮到队列,也不会分发给它,而是分发给它下一个空队列。

开启方式很简单,在消费者的channel,在消费消息之前(使用basic_consume方法),给channel加一个qos机制:

$channel->basic_qos(null, 1, null);

第二个参数就是要求消费者每次只能处理几个消息。

这样也存在一个隐患,即如果所有的消费者都还没ack,而生产者又不断的往队列发数据,则队列有可能会塞满。这个处理方案只有增加消费者,或者从代码、逻辑层面控制消息的产生速度。

——written by linhxx

更多最新文章,欢迎关注微信公众号“决胜机器学习”,或扫描右边二维码。

原文地址:https://www.cnblogs.com/linhxx/p/8434165.html

时间: 2024-08-28 11:18:11

RabbitMQ(二) ——工作队列的相关文章

RabbitMQ之工作队列

工作队列 工作队列(又称:任务队列Task Queues)是为了避免等待一些占用大量资源.时间的操作,当我们把任务Task当做消息发送队列中,一个运行在后台的工作者worker进程就会取出任务然后处理. 当有多个works,任务在它们之间共享 创建任务 创建任务的new_task.py #!/usr/bin/env python #-*- coding:utf8 -*- import sys import pika import logging logging.basicConfig(forma

RabbitMQ(二):理解消息通信RabbitMQ

原文:RabbitMQ(二):理解消息通信RabbitMQ 一.消费者.生产者和信道 生产者(producer):生产者创建消息,然后发布(发送)到代理服务器(RabbitMQ),可以说发送消息的程序就是生产者.什么是消息?消息包含两部分:有效载荷和标签.有效载荷就是传输的数据,可以是任何内容,包括json数据和图片等等.而标签(一个叫交换器名称和可选的主题标记)描述了有效载荷,RabbitMQ用它来决定谁将获得这个消息. 消费者(consumer):消费者就是接收消息并处理消息的程序,他们连接

RabbitMQ指南(C#)(二)工作队列

上一节我们实现了向指定的队列发送和接收消息.这一节,我们主要讲工作队列,用于在多个消费者之间分配置实时任务. 工作队列方式主要是为了防止在执行一个耗费资源的任务时,要等待其结束才能处理其它事情.我们将任务的执行延迟,将其封装成一个消息,然后发送给一个列队.后台再运行一个程序从队列里取出消息,然后执行任务.如果有多个消费者,还可以分享任务. 对于Web应用程序来说,这样就可以使用Http的短请求来处理复杂的业务. 准备 我们发送一个字符串来代表复杂的任务,然后用Thread.Sleep()来模拟耗

【译】RabbitMQ:工作队列(Work Queue)

在第一篇我们写了两个程序通过一个命名的队列分别发送和接收消息.在这一篇,我们将创建一个工作队列在多个工作线程间分发耗时的工作任务. 工作队列的核心思想是避免立刻处理资源密集型任务导致必须等待其执行完成.相反的,我们安排这些任务在稍晚的时间完成.我们将一个任务封装为一个消息并把它发送到队列中.一个后台的工作线程将从队列中取出任务并最终执行.当你运行多个工作线程,这些任务将在这些工作线程间共享. 这个概念对于在一个HTTP请求中处理复杂任务的Web应用尤其有用. 准备工作 在前一篇中,我们发送了一条

一个winform带你玩转rabbitMQ(二)

接上一篇内容 安装,简介和初探 下面我们接着来学习下RabbitMQ 一.  exchange属性 Type 前一章我们说了exchange的类型分为fanout,direct,topic.还有一种不常用的headers. headers这种类型的exchange绑定的时候会忽略掉routingkey,Headers是一个键值对,可以定义成成字典等.发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息.匹配有两种方式all和any.这两

RabbitMQ 笔记-工作队列

工作队列的主要思想是不用等待资源密集型的任务处理完成, 为了确保消息或者任务不会丢失,rabbitmq 支持消息确信 ACK.ACK机制是消费者端从rabbitmq收到消息并处理完成后,反馈给rabbitmq,rabbitmq收到反馈信息后将消息从队列中删除 如果rabbitmq向消费者改善消息时,消费者服务器挂了,消息也不会超时,即使一个消息需要非常长的时间处理,也不会导致消息超时,永远不会从rabbitmq中删除, 忘记通过basicAck返回确认信息是个严重的错误 rabbitmq不允许重

RabbitMQ(二):mandatory标志的作用

本文转自:http://m.blog.csdn.net/article/details?id=54311277 在生产者通过channel的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看Channel接口,会发现存在3个重载的basicPublish方法 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[]

rabbitmq(二)原理

一.基本概念1.1 可以看到提供方提供一个Broker(消息队列实体)当中的虚拟主机->>包含了Exchange(交换器)通过binding绑定一个队列Queue 客户端再通过连接不同渠道(Channel)给客户端提供消息而一个消息队列又分几种模式 1.2VirtualHost虚拟主机.表示一批交换器,消息队列和相关对象.虚拟主机是共享相同的身份认证和加密环境的独立服务器域.每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列.交换器.绑定和权限机制.vh

RabbitMQ-linux安装rabbitmq(二)

说明 本地装了个虚拟机模拟集群 所以记下安装步骤 安装Erlang 安装类库 yum -y install ncurses-devel yum -y install openssl-devel yum -y install unixODBC-devel yum -y install gcc-c++ 下载otp_src资源包并安装 1.下载资源包(可以通过-P ~/download file.name 指定下载文件地址默认再~目录) wget http://erlang.org/download/

SpringBoot整合RabbitMq(二)

       本文序列化和添加package参考:https://www.jianshu.com/p/13fd9ff0648d RabbitMq安装 [[email protected] ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE elasticsearch latest 874179f19603 11 days ago 771 MB springbootdemo4docker latest cd13bc7f56a0 2 week