RabbitMQ使用场景_002_工作队列

工作队列

利用轮循分配来消费任务信息(竞争消费者模式)

背后的主要思想工作队列(又名:任务队列)是为了避免立即做一个资源密集型任务,不得不等待它完成。相反,我们安排以后的任务要做。我们封装任务作为消息并将其发送到一个队列。一个工作进程在后台运行将流行的任务和最终执行这项工作。当您运行许多消费者的任务将在他们之间共享。

循环调度与公平的分配

使用一个任务队列的优点之一是能够轻易并行化"parallelise"工作。如果建立一个任务队列并添加多个消费者,,RabbitMQ将发送每个消息到下一个消费者,在序列中平均每个消费者将获得相同数量的信息。这种分发消息方式称为循环。

消息答复

为了确保消息不会丢失,RabbitMQ支持消息应答。发送ack(nowledgement)消费者告诉RabbitMQ特定的消息已经收到,RabbitMQ可以处理删除它。消息确认默认是关闭的。可以把他们的第四个参数设置为basic_consume为false(true意味着没有ack)消费队列一旦完成一个任务就发送适当的确认应答。

消息的持久性

知道如何确保即使消费者死亡任务也不会丢失。但是RabbitMQ服务器挂了的话任务消息仍旧会丢失。

任务队列和消费队列

1. 任务队列 tasker.php

require_once __DIR__ . '/config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建好服务器连接
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
// 创建"channel"通道和声明队列名和发送消息到队列中
$channel = $connection->channel();
//声明队列名
$queue  = 'task_queue';

/*
    name: 队列名称
    passive: false
    durable: true //服务器重启后队列依旧存活
    exclusive: false //队列能被其他channel访问
    auto_delete: false //channel关闭之后队列不删除
*/
$channel->queue_declare($queue, false, true, false, false);

//实例化一个消息,并设置消息持久化
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

$channel->basic_publish($msg, '', $queue);

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

1. 工作队列 worker.php

require_once __DIR__ . '/config.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建好服务器连接
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
// 创建"channel"通道和声明队列名和发送消息到队列中
$channel = $connection->channel();
//声明队列名
$queue  = 'task_queue';

/*
    name: 队列名称
    passive: false
    durable: true //服务器重启后队列依旧存活
    exclusive: false //队列能被其他channel访问
    auto_delete: false //channel关闭之后队列不删除
*/
$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

/**
 * prefetch_count = 1设置。这告诉RabbitMQ不同时给多个消息到同一个消费者
 * 也就是说 在消息未处理完成前不分配新的任务给消费者
 */
$channel->basic_qos(null, 1, null);

/*
    消费消息
    queue: 制定队列
    consumer_tag: Consumer identifier
    no_local: Don't receive messages published by this consumer.
    no_ack: 服务器是否消息确认,默认为true是关闭的
    exclusive: 独占该消息,只有该channel才能消费这条消息
    nowait:
    callback: 回调函数
*/
$channel->basic_consume($queue, '', false, false, false, false, $callback);

// 循环监听回调
while(count($channel->callbacks)) {
    $channel->wait();
}

// 最后关闭通道和连接
$channel->close();
$connection->close();

在命令行(终端)运行3个脚本

同时开两个终端,运行:

php receive.php

运行发送消息脚本

php send.php
时间: 2024-10-14 07:14:57

RabbitMQ使用场景_002_工作队列的相关文章

RabbitMQ(二) ——工作队列

RabbitMQ(二) --工作队列 (转载请附上本文链接--linhxx) 一.概述 工作队列模式(work queue),是有多个消费者的情况下,可以共同消费队列内的内容,加快消息处理速度.这是RabbitMQ的基本工作模式. 二.使用方式 和上一篇中的生产和消费消息的方式一样,就是需要多在cli进程中打开一个消费者的php文件.即需要打开3个php,一个是生产者的php文件,两个消费者的php文件(或多个php文件). 三.工作机制 3.1 轮询(Round-robin dispatchi

RabbitMQ --- Work Queues(工作队列)

目录 RabbitMQ --- Hello Mr.Tua 前言 Work Queues 即工作队列,它表示一个 Producer 对应多个 Consumer,包括两种分发模式:轮循分发(Round-robin)和公平分发(Fair dispatch).旨在为了避免立即执行任务时出现占用很多资源和时间却又必须等待完成的现象. 原理分析: Producer 把工作任务转化为消息发送给队列,当后台有一个 Consumer 进程在运行时,它会不间断地从队列中取出消息来执行:当后台有多个 Consumer

2. RabbitMQ 之Work Queues (工作队列)

在上一篇揭开RabbitMQ的神秘面纱一文中,我们编写了程序来发送和接收来自命名队列的消息. 本篇我们将会创建一个 Work Queue(工作队列) 来使用分发任务在多个任务中. 前提:本教程假定RabbitMQ 已在标准端口(15672)上的localhost上安装并运行.如果您使用不同的主机,端口或凭据,则需要调整连接设置. 1. Work Queue 工作队列 工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成. 相反,我们安排任务稍后完成.我们将任务

RabbitMQ (二)工作队列 -摘自网络

这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务.工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成.相反地,我们进行任务调度:我们把任务封装为消息发送给队列.工作进行在后台运行并不断的从队列中取出任务然后执行.当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行. 这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务. 1. 准备 我们使用Thread.sleep来模拟耗时的任务.我们在发送到队列的消息的末尾添加

RabbitMQ应用场景及原理

转载自:http://blog.csdn.net/whoamiyang/article/details/54954780 1.背景 RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现. 2.应用场景 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式 (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端. 这有一个问题是,邮件

rabbitmq消息队列学习——"工作队列"

二."工作队列" 在第一节中我们发送接收消息直接从队列中进行.这节中我们会创建一个工作队列来分发处理多个工作者中的耗时性任务. 工作队列主要是为了避免进行一些必须同步等待的资源密集型的任务.实际上我们将这些任务时序话稍后分发完成.我们将某个任务封装成消息然后发送至队列,后台运行的工作进程将这些消息取出然后执行这些任务.当你运行多个工作进程的时候,这些任务也会在它们之间共享. 前期准备 上一节的练习中我们发送的是简单包含"Hello World!"的消息,这节我们还发

RabbitMQ官网教程---工作队列

(使用python的pika 0.9.8客户端) 在第一个教程中,我们写了一个从命名的队列中发送和接收消息的程序.在这一个里面,我们将创建一个Work Queue来用于在多个工作者之间分类耗时任务. Work Queues后面的主要思想是避免理解做一些资源密集的任务并且需要等待它完成.我们用计划任务在后面完成它.我们把一个任务封装为一个消息发送给队列.一个在后台执行的工作者队列将弹出任务并且完全的执行这个工作.当你运行许多工作者的时候,这个任务将在它们之间共享. 这种概念在web应用程序中是特别

RabbitMQ指南(C#)(二)工作队列

上一节我们实现了向指定的队列发送和接收消息.这一节,我们主要讲工作队列,用于在多个消费者之间分配置实时任务. 工作队列方式主要是为了防止在执行一个耗费资源的任务时,要等待其结束才能处理其它事情.我们将任务的执行延迟,将其封装成一个消息,然后发送给一个列队.后台再运行一个程序从队列里取出消息,然后执行任务.如果有多个消费者,还可以分享任务. 对于Web应用程序来说,这样就可以使用Http的短请求来处理复杂的业务. 准备 我们发送一个字符串来代表复杂的任务,然后用Thread.Sleep()来模拟耗

rabbitmq系列二 之工作队列

---恢复内容开始--- 1.工作队列的简介 在上一篇中,我们已经写了一个从已知队列中发送和获取消息的程序,在这里,我们创建一个工作队列(work queue), 会发送一些耗时的任务给多个工作者.模型图如下: 工作队列,由称为任务队列(task queue), 主要是为了避免一些占用大量资源,时间的操作.当我们把任务(task)当作消息发送到队列时, 一个运行在后台的工作者(worker),当你运行多个工作者,任务就会在它们之间共享. 这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请