由于公司打算将 php-resque 的消息队列切换到 ActiveMQ,这里记录一下使用 CentOS + PHP + ActiveMQ + STOMP 搭建消息队列的过程。
一、安装JDK
下载JDK(官网:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html)
1.1 选择:jdk-8u131-linux-x64.tar.gz 下载到 /data/service
1.2 解压:tar -zxvf jdk-8u131-linux-x64.tar.gz
1.3 移到:mkdir -p /usr/local/java && cp -R jdk1.8.0_131/ /usr/local/java/
1.4 添加JAVA环境变量 vi /etc/profile
在文件末尾添加以下内容(每条赋值必须单独占一行):
JAVA_HOME=/usr/local/java/jdk1.8.0_131
JRE_HOME=/usr/local/java/jdk1.8.0_131/jre
PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
CLASSPATH=:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
1.5 重新加载 profile
source /etc/profile
二、安装activemq
2.1:下载activemq
cd /data/service && wget http://mirrors.hust.edu.cn/apache//activemq/5.14.5/apache-activemq-5.14.5-bin.tar.gz
2.2:解压
mkdir -p /data/software && tar -zxvf apache-activemq-5.14.5-bin.tar.gz -C /data/software
(解压到 /data/software,以下步骤中的路径均以该目录为准)
2.3 配置stomp
vim /data/software/apache-activemq-5.14.5/conf/activemq.xml
在 <transportConnectors> 中添加如下内容(注意:XML 属性值中的 & 必须写成 &amp;,否则 ActiveMQ 解析配置文件时会报错):
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
2.4 启动 activemq
/data/software/apache-activemq-5.14.5/bin/activemq start
2.5:监控activemq
http://activemqserverip:8161/admin/ (默认的账号:admin 默认密码:admin)
2.6:修改监控账号
vi /data/software/apache-activemq-5.14.5/conf/jetty-realm.properties
注意的是 用户名和密码的格式是
用户名 : 密码 ,角色名
三、PHP使用activemq测试消息队列(发送数据:sender,获取数据:receiver)
3.1 sender(生产端)
/**
 * Demo producer: pushes one SMS job onto an ActiveMQ queue over STOMP.
 *
 * Requires the PECL "stomp" extension (stomp_connect / stomp_send / stomp_close).
 */
class MqController extends Controller
{
    /**
     * ActiveMQ STOMP connection settings.
     *
     * url   - broker STOMP endpoint
     * id    - login name
     * pswd  - login password
     * queue - destination the demo message is sent to
     */
    public static $activemqConf = array(
        'url'    => 'tcp://activemqServerIp:61613',
        'id'     => 'admin',
        'pswd'   => 'admin',
        'queue'  => 'sms',
        'enable' => TRUE,
    );

    /**
     * Opens a STOMP connection using self::$activemqConf.
     *
     * Terminates the script with an error message when the connection
     * cannot be established.
     *
     * @return resource STOMP link identifier
     */
    public function connectActiveMq()
    {
        $link = stomp_connect(
            self::$activemqConf['url'],
            self::$activemqConf['id'],
            self::$activemqConf['pswd']
        );

        if (!$link) {
            die("Can't connect MQ !!");
        }

        return $link;
    }

    /**
     * Sends one persistent JSON message to the configured queue and
     * prints the outcome.
     *
     * @return void
     */
    public function actionSender()
    {
        $link = $this->connectActiveMq();

        $item = '{
            "mobile":"13900000000",
            "content":"您好!您的验证码为112233",
            "callback":"demo.xiaohuideng.com/sms/callback"
        }';

        // Mark the message persistent so the broker stores it until it is consumed.
        $result = stomp_send(
            $link,
            self::$activemqConf['queue'],
            $item,
            array("persistent" => "true")
        );

        // Release the broker connection; the original never closed it.
        stomp_close($link);

        if (FALSE === $result) {
            echo '推入队列失败';
        } else {
            // NOTE(review): stomp_send() is documented to return a bool, so this
            // prints "job_id:1" rather than a real job identifier.
            echo 'job_id:' . $result;
        }
    }
}
3.2 receiver(接收端)
/**
 * Demo consumer: drains pending messages from an ActiveMQ queue over STOMP
 * and prints each one.
 *
 * Requires the PECL "stomp" extension (Stomp class and stomp_* functions).
 */
class MqController extends Controller
{
    /**
     * ActiveMQ STOMP connection settings.
     *
     * url   - broker STOMP endpoint
     * id    - login name
     * pswd  - login password
     * queue - destination the consumer subscribes to
     */
    public static $activemqConf = array(
        'url'    => 'tcp://activemqServerIp:61613',
        'id'     => 'admin',
        'pswd'   => 'admin',
        'queue'  => 'sms',
        'enable' => TRUE,
    );

    /**
     * Opens a STOMP connection using self::$activemqConf.
     *
     * Terminates the script with an error message when the connection
     * cannot be established.
     *
     * @return resource STOMP link identifier
     */
    public function connectActiveMq()
    {
        $link = stomp_connect(
            self::$activemqConf['url'],
            self::$activemqConf['id'],
            self::$activemqConf['pswd']
        );

        if (!$link) {
            die("Can't connect MQ !!");
        }

        return $link;
    }

    /**
     * Subscribes to the configured queue, then reads, prints and acknowledges
     * every frame that is currently available. Returns once hasFrame() reports
     * nothing more to read.
     *
     * @return void
     */
    public function actionReceiver()
    {
        // A single connection is enough. The original also called
        // connectActiveMq() here and discarded the link, leaking a connection.
        try {
            $stomp = new Stomp(
                self::$activemqConf['url'],
                self::$activemqConf['id'],
                self::$activemqConf['pswd']
            );
        } catch (StompException $e) {
            // Same failure behaviour as connectActiveMq().
            die("Can't connect MQ !!");
        }

        // Subscribe to the configured queue only. The original subscribed to
        // '*', which would consume and acknowledge messages from other queues.
        $stomp->subscribe(self::$activemqConf['queue']);

        while ($stomp->hasFrame()) {
            $frame = $stomp->readFrame();
            if (!$frame) {
                continue;
            }

            // The received payload is in $frame->body.
            $res = json_decode($frame->body, true);
            $res = json_encode($res, JSON_UNESCAPED_UNICODE);

            // The body comes from the broker, so escape it before writing HTML.
            echo '<pre>';
            print_r(htmlspecialchars($res, ENT_QUOTES, 'UTF-8'));
            echo '</pre>';

            // Acknowledge so the broker does not redeliver this message.
            $stomp->ack($frame);
        }

        // Dropping the last reference closes the connection.
        unset($stomp);
    }
}
时间: 2024-10-12 04:38:18