装好了amqp后就可以开始编写代码了:
消费者:接收消息
逻辑:
创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息
<?php /************************************* * PHP amqp(RabbitMQ) Demo - consumer * Author: Linvo * Date: 2012/7/30 *************************************/ //配置信息 $conn_args = array( ‘host‘ => ‘192.168.1.93‘, ‘port‘ => ‘5672‘, ‘login‘ => ‘guest‘, ‘password‘ => ‘guest‘, ‘vhost‘=>‘/‘ ); $e_name = ‘e_linvo‘; //交换机名 $q_name = ‘q_linvo‘; //队列名 $k_route = ‘key_1‘; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //创建交换机 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo "Exchange Status:".$ex->declare()."\n"; //创建队列 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 echo "Message Total:".$q->declare()."\n"; //绑定交换机与队列,并指定路由键 echo ‘Queue Bind: ‘.$q->bind($e_name, $k_route)."\n"; //阻塞模式接收消息 echo "Message:\n"; while(True){ $q->consume(‘processMessage‘); //$q->consume(‘processMessage‘, AMQP_AUTOACK); //自动ACK应答 } $conn->disconnect(); /** * 消费回调函数 * 处理消息 */ function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }
生产者:发送消息
逻辑:
创建连接-->创建channel-->创建交换机对象-->发送消息
<?php /************************************* * PHP amqp(RabbitMQ) Demo - publisher * Author: Linvo * Date: 2012/7/30 *************************************/ //配置信息 $conn_args = array( ‘host‘ => ‘192.168.1.93‘, ‘port‘ => ‘5672‘, ‘login‘ => ‘guest‘, ‘password‘ => ‘guest‘, ‘vhost‘=>‘/‘ ); $e_name = ‘e_linvo‘; //交换机名 //$q_name = ‘q_linvo‘; //无需队列名 $k_route = ‘key_1‘; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($conn); //消息内容 $message = "TEST MESSAGE! 测试消息!"; //创建交换机对象 $ex = new AMQPExchange($channel); $ex->setName($e_name); //发送消息 //$channel->startTransaction(); //开始事务 for($i=0; $i<5; ++$i){ echo "Send Message:".$ex->publish($message, $k_route)."\n"; } //$channel->commitTransaction(); //提交事务 $conn->disconnect();
需要注意的地方是:
queue对象有两个方法可用于取消息:consume和get。
前者是阻塞的,无消息时会被挂起,适合循环中使用;
后者则是非阻塞的,取消息时有则取,无则返回false。
测试截图
运行消费者:
运行生产者,发消息:
消费者接收到消息:
RabbitMQ消息队列在PHP下的应用
消息队列的实现中,RabbitMQ以其健壮和可靠见长.公司的项目中选择了它作为消息队列的实现.关于MQ的机制和原理网上有很多文章可以看,这里就不再赘述,只讲几个比较容易混淆的问题
1,binding key和routing key
binding key和routing key是都不过是自己设置的一组字符,只是用的地方不同,binding key是在绑定交换机和队列时候通过方法传递的字符串,routing key是在发布消息时候,顺便带上的字符串,有些人说这两个其实是一个东西,也对也不对,说对,是因为这两个可以完全一样,说不对,是因为这两个起的作用不同,一个交换机可以绑定很多队列,但是每个队列也许需要的消息类型不同,binding key就是这个绑定时候留在交换机和队列之间的提示信息,当消息发送出来后,随着消息一起发送的routing key如果和binding key一样就说明消息是这个队列要的东西,如果不一样那就不要给这个队列,交换机你找找下个队列看看要不要.明白了吧,这两个key就是暗号,对上了就是自己人,对不上那麻烦你再找找去.
binding key和routing key的配对其实也不是就要完全一样,还可以‘相似‘配对,建立交换机的时候,就要告诉MQ,我要声明的这个交换机和它上面的队列之间传输消息时候要求routing key和binding key完全一样,这种模式叫Direct,如果routing key和binding key可以‘模糊‘匹配,这种模式叫Topic,如果不需要匹配,尽管发,叫Fanout.
2,持久化
交换机和队列都可以在创建时候设置为持久化,重启以后会回复,但是其中的消息未不会,如果要消息也恢复,将消息发布到交换机的时候,可以指定一个标志“Delivery Mode”(投递模式), 1为非持久化,2为持久化.
3,流控机制
当消息生产的速度更快,而进程的处理能力低时,消息就会堆积起来,占用内存越来越多,导致MQ崩溃,所以rabbitmq有一个流控机制,当超过限定时候就会阻止接受消息,mq流控有三种机制
1,主动阻塞住发消息太快的连接,这个无法调整,如果被阻塞了,在abbitmqctl 控制台上会显示一个blocked的状态。
2,内存超过限量,会阻塞连接,在vm_memory_high_watermark可调
3,剩余磁盘在限定以下mq会 主动阻塞所有的生产者,默认为50m,在disk_free_limit可调.
下面是在centos7上面的,MQ安装过程.
1,必要的支持
yum install ncurses-devel unixODBC unixODBC-devel
2,erlang环境
wget http://www.erlang.org/download/ otp_src_17.3.tar.gz tar zxvf otp_src_17.3.tar.gz cd otp_src_17.3 ./configure --without-javac #忽略警告 make && make install
3,安装rabbitmq依赖文件,安装rabbitmq
yum install xmlto wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.1/rabbitmq-server-3.4.1.tar.gz tar zxvf rabbitmq-server-3.4.1.tar.gz cd rabbitmq-server-3.4.1/ make TARGET_DIR=/usr/rabbitmq SBIN_DIR=/usr/rabbitmq/sbin MAN_DIR=/usr/rabbitmq/man DOC_INSTALL_DIR=/usr/rabbitmq/doc make TARGET_DIR=/usr/rabbitmq SBIN_DIR=/usr/rabbitmq/sbin MAN_DIR=/usr/rabbitmq/man DOC_INSTALL_DIR=/usr/rabbitmq/doc install /usr/rabbitmq/sbin/rabbitmq-server -detached 启动rabbitmq /usr/rabbitmq/sbin/rabbitmqctl status 查看状态 /usr/rabbitmq/sbin/rabbitmqctl stop 关闭rabbitmq
4,启用管理插件
mkdir /etc/rabbitmq cd /usr/rabbitmq/sbin ./rabbitmq-plugins enable rabbitmq_management (启用插件) ./rabbitmq-plugins disable rabbitmq_management (禁用插件) # 重启rabbitmq # 访问 http://127.0.0.1:15672/ # 如果有iptables # vi /etc/sysconfig/iptables 增加 # -A INPUT -m state --state NEW -m tcp -p tcp --dport 15672 -j ACCEPT # 重启动iptable systemctl restart iptables.service
5,创建配置文件
#在/usr/rabbitmq/sbin/rabbitmq-defaults 查看config文件路径 # 创建配置文件 touch/usr/rabbitmq/sbin #vm_memory_high_watermark 内存低水位线,若低于该水位线,则开启流控机制,阻止所有请求,默认值是0.4,即内存总量的40%, #vm_memory_high_watermark_paging_ratio 内存低水位线的多少百分比开始通过写入磁盘文件来释放内存 vi /usr/rabbitmq/sbin/rabbitmq.config 输入 [ {rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75}, {vm_memory_high_watermark, 0.7}]} ].
6,创建环境文件
touch /etc/rabbitmq/rabbitmq-env.conf #输入 RABBITMQ_NODENAME=FZTEC-240088 节点名称 RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 监听IP RABBITMQ_NODE_PORT=5672 监听端口 RABBITMQ_LOG_BASE=/data/rabbitmq/log 日志目录 RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 插件目录 RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 后端存储目录
7,安装php的rabbitmq扩展
yum install librabbitmq-devel.x86_64 wget http://pecl.php.net/get/amqp-1.4.0.tgz tar zxvf amqp-1.4.0.tgz cd amqp-1.4.0 /usr/local/php/bin/phpize ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp make && make install vim /usr/local/php/etc/php.ini #输入 extension=amqp.so service nginx reload service php-fpm restart
操作命令
查看exchange信息 /usr/rabbitmq/sbin/rabbitmqctl list_exchanges name type durable auto_delete arguments 查看队列信息 /usr/rabbitmq/sbin/rabbitmqctl list_queues name durable auto_delete messages consumers me 查看绑定信息 /usr/rabbitmq/sbin/rabbitmqctl list_bindings 查看连接信息 /usr/rabbitmq/sbin/rabbitmqctl list_connections
php的server端脚本
<?php $routingkey=‘key‘; //设置你的连接 $conn_args = array(‘host‘ => ‘localhost‘, ‘port‘ => ‘5672‘, ‘login‘ => ‘guest‘, ‘password‘ => ‘guest‘); $conn = new AMQPConnection($conn_args); if ($conn->connect()) { echo "Established a connection to the broker \n"; } else { echo "Cannot connect to the broker \n "; } //你的消息 $message = json_encode(array(‘Hello World3!‘,‘php3‘,‘c++3:‘)); //创建channel $channel = new AMQPChannel($conn); //创建exchange $ex = new AMQPExchange($channel); $ex->setName(‘exchange‘);//创建名字 $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE); //$ex->setFlags(AMQP_AUTODELETE); //echo "exchange status:".$ex->declare(); echo "exchange status:".$ex->declareExchange(); echo "\n"; for($i=0;$i<100;$i++){ if($routingkey==‘key2‘){ $routingkey=‘key‘; }else{ $routingkey=‘key2‘; } $ex->publish($message,$routingkey); } /* $ex->publish($message,$routingkey); 创建队列 $q = new AMQPQueue($channel); 设置队列名字 如果不存在则添加 $q->setName(‘queue‘); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); echo "queue status: ".$q->declare(); echo "\n"; echo ‘queue bind: ‘.$q->bind(‘exchange‘,‘route.key‘); 将你的队列绑定到routingKey echo "\n"; $channel->startTransaction(); echo "send: ".$ex->publish($message, ‘route.key‘); //将你的消息通过制定routingKey发送 $channel->commitTransaction(); $conn->disconnect(); */
php客户端脚本
<?php $bindingkey=‘key2‘; //连接RabbitMQ $conn_args = array( ‘host‘=>‘127.0.0.1‘ , ‘port‘=> ‘5672‘, ‘login‘=>‘guest‘ , ‘password‘=> ‘guest‘,‘vhost‘ =>‘/‘); $conn = new AMQPConnection($conn_args); $conn->connect(); //设置queue名称,使用exchange,绑定routingkey $channel = new AMQPChannel($conn); $q = new AMQPQueue($channel); $q->setName(‘queue2‘); $q->setFlags(AMQP_DURABLE); $q->declare(); $q->bind(‘exchange‘,$bindingkey); //消息获取 $messages = $q->get(AMQP_AUTOACK) ; if ($messages){ var_dump(json_decode($messages->getBody(), true )); } $conn->disconnect(); ?>
翻译了部分mq常量设置,不正确的地方,大家以试验为准
/** * Passing in this constant as a flag will forcefully disable all other flags. * Use this if you want to temporarily disable the amqp.auto_ack ini setting. * 传递这个参数作为标志将完全禁用其他标志,如果你想临时禁用amqp.auto_ack设置起效 */ define(‘AMQP_NOPARAM‘, 0); /** * Durable exchanges and queues will survive a broker restart, complete with all of their data. * 持久化交换机和队列,当代理重启动后依然存在,并包括它们中的完整数据 */ define(‘AMQP_DURABLE‘, 2); /** * Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist. * 被动模式的交换机和队列不能被重新定义,但是如果交换机和队列不存在,代理将扔出一个错误提示 */ define(‘AMQP_PASSIVE‘, 4); /** * Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue. * 仅对队列有效,这个人标志定义队列仅允许一个客户端连接并且从其消费消息 */ define(‘AMQP_EXCLUSIVE‘, 8); /** * For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound * to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete * flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no * subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be * automatically deleted with the client disconnects. * 对交换机而言,自动删除标志表示交换机将在没有队列绑定的情况下被自动删除,如果从没有队列和其绑定过,这个交换机将不会被删除. * 对队列而言,自动删除标志表示如果没有消费者和你绑定的话将被自动删除,如果从没有消费者和其绑定,将不被删除,独占队列在客户断 * 开连接的时候将总是会被删除 */ define(‘AMQP_AUTODELETE‘, 16); /** * Clients are not allowed to make specific queue bindings to exchanges defined with this flag. * 这个标志标识不允许自定义队列绑定到交换机上 */ define(‘AMQP_INTERNAL‘, 32); /** * When passed to the consume method for a clustered environment, do not consume from the local node. * 在集群环境消费方法中传递这个参数,表示将不会从本地站点消费消息 */ define(‘AMQP_NOLOCAL‘, 64); /** * When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag, * the messages will be immediately marked as acknowledged by the server upon delivery. * 当在队列get方法中作为标志传递这个参数的时候,消息将在被服务器输出之前标志为acknowledged (已收到) */ define(‘AMQP_AUTOACK‘, 128); /** * Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty. * 在队列建立时候传递这个参数,这个标志表示队列将在为空的时候被删除 */ define(‘AMQP_IFEMPTY‘, 256); /** * Passed on queue or exchange creation, this flag indicates that the queue or exchange should be * deleted when no clients are connected to the given queue or exchange. * 在交换机或者队列建立的时候传递这个参数,这个标志表示没有客户端连接的时候,交换机或者队列将被删除 */ define(‘AMQP_IFUNUSED‘, 512); /** * When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned. * 当发布消息的时候,消息必须被正确路由到一个有效的队列,否则将返回一个错误 */ define(‘AMQP_MANDATORY‘, 1024); /** * When publishing a message, mark this message for immediate processing by the broker. (High priority message.) * 当发布消息时候,这个消息将被立即处理. */ define(‘AMQP_IMMEDIATE‘, 2048); /** * If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple * messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message. * If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding * messages. * 当在调用AMQPQueue::ack时候设置这个标志,传递标签将被视为最大包含数量,以便通过单个方法标示多个消息为已收到,如果设置为0 * 传递标签指向单个消息,如果设置了AMQP_MULTIPLE,并且传递标签是0,将所有未完成消息标示为已收到 */ define(‘AMQP_MULTIPLE‘, 4096); /** * If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait * for a reply method. If the server could not complete the method it will raise a channel or connection exception. * 当在调用AMQPExchange::bind()方法的时候,服务器将不响应请求,客户端将不应该等待响应,如果服务器无法完成该方法,将会抛出一个异常 */ define(‘AMQP_NOWAIT‘, 8192); /** * If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue. * 如果在调用AMQPQueue::nack方法时候设置,消息将会被传递回队列 */ define(‘AMQP_REQUEUE‘, 16384); /** * A direct exchange type. * direct类型交换机 */ define(‘AMQP_EX_TYPE_DIRECT‘, ‘direct‘); /** * A fanout exchange type. * fanout类型交换机 */ define(‘AMQP_EX_TYPE_FANOUT‘, ‘fanout‘); /** * A topic exchange type. * topic类型交换机 */ define(‘AMQP_EX_TYPE_TOPIC‘, ‘topic‘); /** * A header exchange type. * header类型交换机 */ define(‘AMQP_EX_TYPE_HEADERS‘, ‘headers‘); /** * socket连接超时设置 */ define(‘AMQP_OS_SOCKET_TIMEOUT_ERRNO‘, 536870947);