php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)

1、AMQP_EX_TYPE_DIRECT:直连型

直连型又包括: 1对1 和1对N(N对1、 N对N)

接收端receive.php代码如下

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

<!--?php

$connect =newAMQPConnection();

$connect--->connect();

$channel =newAMQPChannel($connect);

$exchange =newAMQPExchange($channel);

$exchange->setName(‘exchange‘);

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->declare();

$queue =newAMQPQueue($channel);

$queue->setName(‘logs‘);

$queue->declare();

$queue->bind(‘exchange‘,‘logs‘);

while(true) {

$queue->consume(‘callback‘);

}

$connection->close();

function callback($envelope, $queue) {

var_dump($envelope->getBody());

$queue->nack($envelope->getDeliveryTag());

}

发送端send.php代码如下

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

<!--?php

$connect =newAMQPConnection();

$connect--->connect();

$channel =newAMQPChannel($connect);

$exchange =newAMQPExchange($channel);

$exchange->setName(‘exchange‘);

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->declare();

$exchange->publish(‘direct type test‘,‘logs‘);

var_dump("Send Message OK");

$connect->disconnect();

运行结果如图所示

创建receive_one.php和receive_two.php 并把send.php代码改成如下代码方便我们观看 receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

<!--?php

$connect =newAMQPConnection();

$connect--->connect();

$channel =newAMQPChannel($connect);

$exchange =newAMQPExchange($channel);

$exchange->setName(‘exchange‘);

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->declare();

$queue =newAMQPQueue($channel);

$queue->setName(‘logs‘);

@$queue->declare();

$queue->bind(‘exchange‘,‘logs‘);

while(true) {

$queue->consume(‘callback‘);

}

$connection->close();

function callback($envelope, $queue) {

var_dump($envelope->getBody());

$queue->nack($envelope->getDeliveryTag());

}

send.php

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

<!--?php

$connect =newAMQPConnection();

$connect--->connect();

$channel =newAMQPChannel($connect);

$exchange =newAMQPExchange($channel);

$exchange->setName(‘exchange‘);

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->declare();

for($index =1; $index <5; $index++) {

$exchange->publish($index,‘logs‘);

var_dump("Send:$index");

}

$exchange->delete();

$connect->disconnect();

运行结果如下

列队会把消息分配给每一个接收端分配处理这里看似完美但是如果想要更好的处理不同的任务就需要
公平调度

比如当1、3处理的都是简单的人 2、4都是处理的复杂的任务 如果任务过多时 receive_one.php是空闲的而receive_two.php是任务繁重的 我们进行如下测试 send.php改成5改成50

?


1

2

3

4

for($index =1; $index <50; $index++) {

$exchange->publish($index,‘logs‘);

var_dump("Send:$index");

}

receive_two.php 加上 sleep(3)

?


1

2

3

4

5

function callback($envelope, $queue) {

var_dump($envelope->getBody());

sleep(3);

$queue->nack($envelope->getDeliveryTag());

}

我们运行程序结果如下

receive_one全部运行完而receive_two才运行一个 之后receive_one一直空闲 我们可以通过 在接收端设置 $channel->setPrefetchCount(1);

任务没人完成前不接收新的消息把消息发送给其他接收端

如下receive_one.php 和 receive_two.php

?


1

$channel =newAMQPChannel($connect);

改成如下

?


1

2

$channel =newAMQPChannel($connect);

$channel->setPrefetchCount(1);

时间: 2024-11-24 01:45:34

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)的相关文章

消息队列rabbitmq的五种工作模式(go语言版本)

前言:如果你对rabbitmq基本概念都不懂,可以移步此篇博文查阅消息队列RabbitMQ 一.单发单收 二.工作队列Work Queue 三.发布/订阅 Publish/Subscribe 四.路由Routing 五.Topic类型的exchange 六.rabbitmq部分封装代码及装备工作 一.单发单收 在下图中,“ P”是我们的生产者,“ C”是我们的消费者.中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区. 单发单收模式下:一发一收 发送端只需要创建队列,然后向队列发送消

消息队列RabbitMQ

消息队列RabbitMQ 一.RabbitMQ是什么? AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.J

消息队列--RabbitMQ(一)

1.消息队列概述 可以理解为保存消息的一个媒介/或者是个容器,与之相关有两个概念(即生产者(Publish)与消费者(Consumer)).所谓生产者,就是生产创造消息的一方,那么,消费者便是从队列中取出消息,完成相应的某种目的.总而言之,消息队列是搭建生产者与消费者沟通的一座桥梁. 消息队列的产品也很多,作用也各有千秋.常见的消息队列有RabbitMQ.RocketMq.KafKa,本系列文章将以(RabbitMQ+C#客户端+Windonw)为例去探索消息队列的基本用途及相关使用.闲话不说,

ASP.NET Core消息队列RabbitMQ基础入门实战演练

一.课程介绍 人生苦短,我用.NET Core!消息队列RabbitMQ大家相比都不陌生,本次分享课程阿笨将给大家分享一下在一般项目中99%都会用到的消息队列MQ的一个实战业务运用场景.本次分享课程不是零基础教学,课程内容的侧重点是讲解的RabbitMQ的最实用.最简单的实战运用场景:Publish/Subscrib(发布/订阅)模式,发送端发送消息,单个接收端接收处理消息. 学完本次"是猴子都看的懂的消息队列RabbitMQ实战课程"后,阿笨带直接让你也能如此优雅简单的上手使用Rab

AMQP消息队列之RabbitMQ简单示例

前面一篇文章讲了如何快速搭建一个ActiveMQ的示例程序,ActiveMQ是JMS的实现,那这篇文章就再看下另外一种消息队列AMQP的代表实现RabbitMQ的简单示例吧.在具体讲解之前,先通过一个图来概览下: 1.添加Maven依赖 <!-- rabbitmq begin --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit

消息队列 RabbitMQ 入门介绍

来源:http://ityen.com/archives/578 一.什么是RabbitMQ? RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:   例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在

SpringBoot(八) Spring和消息队列RabbitMQ

概述 1.大多数应用中,可以通过消息服务中间件来提升系统异步能力和拓展解耦能力. 2.消息服务中的两个重要概念:消息代理(Message broker)和目的地(destination) 当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地. 3.消息队列主要有两种形式的目的地: 队列:点对点方式通信(point-to-point) 主题:发布/订阅消息服务 点对点式:消息发送者发送消息后,消息代理将其放入一个队列中,消息接受者从队列中读取数据,接受者接收数据后,将消息移除

C# 消息队列 RabbitMQ

1.引言 RabbitMQ——Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适. RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下: RabbitMQ作为一个消息代理,主要和消息打交道,负责接收并转发消息. RabbitMQ提供了可靠的消息机制.跟踪机制和灵活的消息路由,支持消息集群和分布式部署. 适用于排队算法.秒杀活动.消息分发.异步处理.数据同步.处理耗时任务.CQRS等应用场景. 下面我们

消息队列RabbitMQ、缓存数据库Redis

1.RabbitMQ消息队列 1.1 RabbitMQ简介 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全. RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..N