场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。
场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。
最近的一个项目遇到了这种情况,如果运单30分钟还没有被接单,则状态自动变为已取消。实现延迟消息原理如下,借用一张图:
php代码如下:
/** * 死信 * 创建交换机1、交换机2、队列1、队列2 * 交换机1绑定队列1、交换机2绑定队列2 * 其中交换机1为死信交换机,队列1处理消息的ttl,队列1没有消费者 * * 由于队列1没有消费者,所以它里面的消息过期后会变成死信,再定义规则,让死信进入交换机2,交换机2再把消息路由到队列2 * 我们的客户端只需要消费队列2即可 * * 需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。 */ public function actionDeadletter(){ $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost); $channel = $connection->channel(); // 死信交换机和队列的设置 $channel->exchange_declare(‘dead_exchange‘, ‘direct‘, false, true, false); $channel->queue_declare(‘queue1‘, false, true, false, false, false, [ ‘x-dead-letter-exchange‘ => [‘S‘, ‘normal_exchange‘], // 死信被转发到哪个交换机 ‘x-dead-letter-routing-key‘ => [‘S‘, ‘normal_routingkey‘] // 死信路由 ]); $channel->queue_bind(‘queue1‘, ‘dead_exchange‘, ‘dead_routingkey‘); // 正常交换机和队列的设置 $channel->exchange_declare(‘normal_exchange‘, ‘direct‘, false, true, false); $channel->queue_declare(‘queue2‘, false, true, false, false, false); $channel->queue_bind(‘queue2‘, ‘normal_exchange‘, ‘normal_routingkey‘); $msg = new AMQPMessage(‘hello world‘, [ ‘delivery_mode‘ => 2, ‘expiration‘ => 10*1000 //毫秒 ]); $channel->basic_publish($msg, ‘dead_exchange‘, ‘dead_routingkey‘); echo " [x] Sent ‘Hello World!‘\n"; $channel->close(); $connection->close(); }
运行程序,打开rabbitmq的web管理界面,可以看到消息先进入队列1,当消息过期后会自动进入队列2
参考:https://stackoverflow.com/questions/21942063/how-to-delay-php-amqplib
参考:http://www.cnblogs.com/haoxinyue/p/6613706.html
时间: 2024-11-02 10:13:55