PHP中RabbitMQ之amqp扩展实现(四)

目前我在PHP里接触实现RabbitMQ的方式有两种,一种是通过amqp扩展,一种是使用php-amqplib,本章讲诉RabbitMQ的安装及amqp扩展及amqp扩展如何实现RabbitMQ

环境:CoentOS,PHP 7

1、RabbitMQ的安装

需要下载的两个包

erlang-21.0.7-1.el7.centos.x86_64.rpm

rabbitmq-server-3.7.7-1.el7.noarch.rpm

这两个包我已经放在了百度云盘的分享上

链接:https://pan.baidu.com/s/1rMv_yFpLnH-D1S5wrOZrbA#list/path=%2FRabbitMQ

提取码:ipyu

然后参照 weixin_41368339的博客linux rabbitmq3.7.7安装与使用一文中的步骤安装步,基本上没有什么问题

2、amqp扩展的安装

这个请参照一只猪儿虫linux 编译安装amqp一文中的步骤安装,也没有什么问题

当安装成功之后,即可开始用amqp实现RabbitMQ了

3、Demo示例

在安装完成后我们就可以开始我们的RabbitMQ之旅了,本Demo示例只创建了一个直连交换机,共有四个文件Consum.php (消费者),Publish.php (生产者) ,RabbitMqParernt.php (自己封装的RabbitMQ的方法) ,以及test.php (测试数据)

RabbitMqParernt.php如下所示

  1. <?php

  2.  

    abstract class RabbitMqParernt

  3.  

    {

  4.  

    //rabbitMQ配置信息(默认配置)

  5.  

    public $config = array(

  6.  

    ‘host‘=>‘127.0.0.1‘, //host

  7.  

    ‘port‘=>5672, //端口

  8.  

    ‘username‘=>‘guest‘, //账号

  9.  

    ‘password‘=>‘guest‘, //密码

  10.  

    ‘vhost‘=>‘/‘ //虚拟主机

  11.  

    );

  12.  

  13.  

    public $exchangeName = ‘‘; //交换机

  14.  

    public $queueName = ‘‘; //队列名

  15.  

    public $routeKey = ‘‘; //路由键

  16.  

    public $exchangeType = ‘‘; //交换机类型

  17.  

  18.  

    public $channel; //信道

  19.  

    public $connection; //连接

  20.  

    public $exchange; //交换机

  21.  

    public $queue; //队列

  22.  

  23.  

    //初始化RabbitMQ($config数组是用来修改rabbitMQ的配置信息的)

  24.  

    public function __construct($exchangeName, $queueName, $routeKey, $exchangeType = ‘‘,$config = array())

  25.  

    {

  26.  

    $this->exchangeName = $exchangeName;

  27.  

    $this->queueName = $queueName;

  28.  

    $this->routeKey = $routeKey;

  29.  

    $this->exchangeType = $exchangeType;

  30.  

    if(!empty($config))

  31.  

    {

  32.  

    $this->setConfig($config);

  33.  

    }

  34.  

    $this->createConnet();

  35.  

    }

  36.  

  37.  

    //对RabbitMQ的配置重新进行配置

  38.  

    public function setConfig($config)

  39.  

    {

  40.  

    if (!is_array($config))

  41.  

    {

  42.  

    throw new Exception(‘config不是一个数组‘);

  43.  

    }

  44.  

    foreach($config as $key => $value)

  45.  

    {

  46.  

    $this->config[$key] = $value;

  47.  

    }

  48.  

    }

  49.  

  50.  

    //创建连接与信道

  51.  

    public function createConnet()

  52.  

    {

  53.  

    //创建连接

  54.  

    $this->connection = new AMQPConnection($this->config);

  55.  

    if(!$this->connection->connect())

  56.  

    {

  57.  

    throw new Exception(‘RabbitMQ创建连接失败‘);

  58.  

    }

  59.  

    //创建信道

  60.  

    $this->channel = new AMQPChannel($this->connection);

  61.  

    //创建交换机

  62.  

    $this->createExchange();

  63.  

    //生产时不需要队列,故队列名为空,只有消费时需要队列名

  64.  

    if(!empty($this->queueName))

  65.  

    {

  66.  

    $this->createQueue();

  67.  

    }

  68.  

    }

  69.  

  70.  

    //创建交换机

  71.  

    public function createExchange()

  72.  

    {

  73.  

    $this->exchange = new AMQPExchange($this->channel);

  74.  

    $this->exchange->setName($this->exchangeName);

  75.  

    $this->exchange->setType(AMQP_EX_TYPE_DIRECT);

  76.  

    $this->exchange->setFlags(AMQP_DURABLE);

  77.  

    }

  78.  

  79.  

    //创建队列,绑定交换机

  80.  

    public function createQueue()

  81.  

    {

  82.  

    $this->queue = new AMQPQueue($this->channel);

  83.  

    $this->queue->setName($this->queueName);

  84.  

    $this->queue->setFlags(AMQP_DURABLE);

  85.  

    $this->queue->bind($this->exchangeName, $this->routeKey);

  86.  

    }

  87.  

  88.  

    public function dealMq($flag)

  89.  

    {

  90.  

    if($flag)

  91.  

    {

  92.  

    $this->queue->consume(function($envelope){$this->getMsg($envelope, $this->queue);},AMQP_AUTOACK);//自动ACK应答

  93.  

    }

  94.  

    else

  95.  

    {

  96.  

    $this->queue->consume(function($envelope){$this->processMessage($envelope, $this->queue);});

  97.  

    }

  98.  

    }

  99.  

  100.  

    public function getMsg($envelope, $queue)

  101.  

    {

  102.  

    $msg = $envelope->getBody();

  103.  

    $this->doProcess($msg);

  104.  

    }

  105.  

  106.  

    public function processMessage($envelope, $queue)

  107.  

    {

  108.  

    $msg = $envelope->getBody();

  109.  

    $this->doProcess($msg);

  110.  

    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答

  111.  

    }

  112.  

  113.  

    //处理消息的真正函数,在消费者里使用

  114.  

    abstract public function doProcess($msg);

  115.  

  116.  

    //发送消息

  117.  

    public function sendMessage($message)

  118.  

    {

  119.  

    $this->exchange->publish($message, $this->routeKey);

  120.  

    }

  121.  

  122.  

  123.  

    //关闭连接

  124.  

    public function closeConnect()

  125.  

    {

  126.  

    $this->channel->close();

  127.  

    $this->connection->disconnect();

  128.  

    }

  129.  

    }

Consum.php 如下所示

  1. <?php

  2.  

    include_once(‘RabbitMqParernt.php‘);

  3.  

    class Consum extends RabbitMqParernt

  4.  

    {

  5.  

    public function __construct()

  6.  

    {

  7.  

    parent::__construct(‘exchange‘, ‘queue‘, ‘routeKey‘);

  8.  

    }

  9.  

    public function doProcess($msg)

  10.  

    {

  11.  

    echo $msg;

  12.  

    }

  13.  

    }

  14.  

    $consum = new Consum();

  15.  

    //$consum->dealMq(false);

  16.  

    $consum->dealMq(true);

Publish.php如下所示

  1. <?php

  2.  

    include_once(‘RabbitMqParernt.php‘);

  3.  

    class Publish extends RabbitMqParernt

  4.  

    {

  5.  

    public function __construct()

  6.  

    {

  7.  

    parent::__construct(‘exchange‘, ‘‘, ‘routeKey‘);

  8.  

    }

  9.  

    public function doProcess($msg)

  10.  

    {

  11.  

  12.  

    }

  13.  

  14.  

    }

test.php如下所示

  1. <?php

  2.  

    include_once(‘Publish.php‘);

  3.  

    $publish = new Publish();

  4.  

    $publish->sendMessage(‘Hello,World!‘);

  5.  

    $publish->closeConnect();

4、添加交换机与队列

打开http://ip(你的RabbitMQ安装的主机):15672/,会进入到RabbitMQ的可视化管理后台登录页面,登录你的账号密码(如果你是按照第一步提到的博客里的教程来装的,那你的账号密码就是guest),然后新加交换机和队列,

以下是新加交换机的操作,注意vhost与以及交换机的名称要与代码里的消费者与生产者传入的参数值保持一致,如果你不想使用"/"这个默认的vhost,也可以新建一个vhost(什么?你问我如何新建,那么请百度一下),但是要记住在代码里创建消费者与生产者时把你新加的这个vhost传进去,覆盖RabbitMqParernt.php里的vhost

以下是新加队列,这里的vhost要与上一步的vhost保持一致,保证交换机与队列在同一个vhost下,不然交换机会找不到队列的,队列名与消费者代码里传入进去的队列名保持一致

5、运行代码

然后我们在一个窗口先启动消费者

在另外一个窗口运行test.php文件

出现这个生产者发布的hello,world!就成功了

注意:消费者与生产者传入的交换机名称,路由键必须相同

            交换机类型请务必选择直连,各种交换机的路由键形式不大相同,有兴趣的同学可以去试试其它类型的交换机实现哦

            当修改了vhost或者交换机名称,队列名称等时,需要修改对应代码

            至于注释里的ack应答,我会在之后的博客里详细介绍,包括RabbitMQ的持久化,这里使用默认的ack应答即可

            对于amqp的拓展的使用,大家也可以去研究一下

            关于管理后台及RabbitMQ的命令,我这里就不多介绍了,有兴趣的同学去网上搜索一下就能搜到好多

下一篇:RabbitMQ的持久化(六)

原文地址:https://www.cnblogs.com/php-linux/p/11293892.html

时间: 2024-11-13 07:51:38

PHP中RabbitMQ之amqp扩展实现(四)的相关文章

用PHP尝试RabbitMQ(amqp扩展)

装好了amqp后就可以开始编写代码了: 消费者:接收消息 逻辑: 创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息 <?php /************************************* * PHP amqp(RabbitMQ) Demo - consumer * Author: Linvo * Date: 2012/7/30 **********************************

用PHP尝试RabbitMQ(amqp扩展)实现消息的发送和接收

消费者:接收消息 逻辑:创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息 生产者:发送消息 逻辑:创建连接-->创建channel-->创建交换机对象-->发送消息

RabbitMQ 在 PHP 下的简单使用 (一) -- 安装 AMQP 扩展和 Direct Exchange 模式

Windows 安装 amqp 扩展 RabbitMQ 是基于 amqp(高级消息队列协议) 协议的.使用 RabbitMQ 前必须为 PHP 安装相应的 amqp 扩展. 下载相应版本的 amqp 扩展:http://pecl.php.net/package/amqp,解压缩文件. 将 php_amqp.dll 复制到 php 的扩展目录 ext 下,修改配置文件 php.ini: [amqp] extension=php_amqp.dll 将 rabbitmq.*.dll 文件复制到 php

iOS8中添加的extensions总结(四)——Action扩展

Action扩展 注:此教程来源于http://www.raywenderlich.com的<iOS8 by Tutorials> 1.准备 本次教程利用网站bitly.com进行 bitly网站进行对网络链接的精简化,比如将YouTube某个视频链接转化为bit.ly/1wOl2zf这种形式:同时,该网站提供对链接进行数据分析等服务,直接在链接后+“+”即可查看流量.点击量等信息,比如 bit.ly/1wOl2zf+ . 1.注册点击:https://bitly.com/a/sign_up 

php 的rabbitmq 扩展模块amqp安装

error 提示: Please reinstall the librabbitmq distribution itself or (re)install librabbitmq development package if it available in your system 或者 checking for amqp using pkg-config... configure: error: librabbitmq not found 或者 ERROR: `/tmp/pear/temp/am

php7.1 安装amqp扩展

在php开发中使用rabbitmq消息队列时,需要安装PHP扩展amqp,安装步骤如下: 直接使用pecl进行amqp扩展的安装, /usr/local/php/bin/pecl install amqp 如果缺少librabbitmq库文件,需要先安装librabbitmq,步骤如下: 1 wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.7.1/rabbitmq-c-0.7.1.tar.gz 2 tar -zxvf

perl C/C++ 扩展(四)

在前面三篇博客中,我们了解到如何使用c/c++ 扩展自己的perl 库,但是博主在学习过程中,对动态库或静态库的加载不是十分了解,后来自己又细挖一下.后来就有了这篇博文,再后来,没有再后来了,囧!! 我们先来看看 perl c/c++ 扩展(二)中的 Makefile.PL WriteMakefile( NAME => 'two_test', VERSION_FROM => 'lib/two_test.pm', # finds $VERSION PREREQ_PM => {}, # e.

探索 OpenStack 之(14):OpenStack 中 RabbitMQ 使用研究 (上半部分)

本文是 OpenStack 中的 RabbitMQ 使用研究 两部分中的第一部分,将介绍 RabbitMQ 的基本概念,即 RabbitMQ 是什么.第二部分将介绍其在 OpenStack 中的使用. 1 RabbitMQ 的基本概念 RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件). AMQP是一个定义了在应用或者组织之间传送消息的协议的开放标准 (an open standard for passing business messages be

ActionScript3游戏中的图像编程(连载四)

1.1 RGB模式,ARGB模式及其运算 1.1.1 RGB模式及其运算 RGB是色光三原色(Red红,Green绿,Blue蓝)的简写,物理学上,自然界的所有颜色都可以分解为这三种色光.不同的颜色,色光的值会有所差别.反过来,任何颜色都可以通过红绿蓝三种色光合成出来.白色把任何色光都反射出来了,所以它的红绿蓝成分都等于100%,黑色不反射任何色光,三原色的成分都等于0%.红色只反射红光,所以红色光等于100%,另外两种色光值为0%. 计算机软件习惯将色光称作“通道”,并将该概念扩展到其它的色彩