PHP + Redis 实现消息队列

Redis做消息队列的好处在于它的轻量级,高并发,延迟敏感,应用场景有 即时数据分析、秒杀计数器、缓存等

Redis做消息队列待解决的问题:

  1、消息的可靠性: 没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理。生产者只管向队列中插入数据,不管消费者是否成功消费。

  2、消费者挂掉消息不会丢失,但是需要重新触发一下消费者,才能够继续消费消息。

代码如下:

  lib.php 是工具文件,里面有数据库的连接、Redis的连接:

<?php
/**
 * 获取数据库连接
 *
 * @param $host
 * @param $username
 * @param $password
 * @param $database
 * @return mysqli
 */
function getDBConnection($host, $username, $password, $database){
    $connection = new mysqli(‘p:‘.$host, $username, $password, $database);
    if (!$connection) {
        echo "Error: Unable to connect to MySQL." . PHP_EOL;
        echo "Debugging errno: " . mysqli_connect_errno() . PHP_EOL;
        echo "Debugging error: " . mysqli_connect_error() . PHP_EOL;
        exit;
    }
    mysqli_query($connection, "set names ‘utf8‘");
    return $connection;
}

/**
 * 获取Redis连接
 *
 * @param $host
 * @param $port
 * @param string $password
 * @param int $database
 * @return Redis
 */
function getRedis($host=‘127.0.0.1‘, $port=‘6379‘, $password=null, $database=0){
    $redis = new Redis();
    if(!$redis->connect($host, $port)){
        die("Redis连接失败:IP或端口有误");
    }
    if(!empty($password) && !$redis->auth($password)){
        die("Redis连接失败:密码错误");
    }
    if($database){
        $redis->select($database);
    }
    // work中 subscribe 如果一段时间没有接到消息,就会停掉然后停掉,所以加这个语句让其永不超时
    $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
    return $redis;
}

/**
 * 打印消息日志
 *
 * @param $msg
 */
function stdout($msg=null){
    $msg = ‘[‘.date(‘Y-m-d H:i:s‘).‘]‘.$msg.chr(10);;
    fwrite(STDOUT, $msg);
}

 

 register.php 是消息发布者,注释的是将消息存入数据库部分的代码。

  首先想消息存入 register_users 队列中,存入的 key是register_users;value是一个list,消息全部存入其中。用 redis-cli 查看数据的命令是:

LRANGE register_users 0 -1

  register.php:

<?php
require ‘./lib.php‘;
$name = $argv[1];
$mobile = $argv[2];
if(empty($name) || empty($mobile)){
    die("参数错误");
}
// $connection = getDBConnection(‘localhost:3306‘, ‘root‘, ‘root‘, ‘blog‘);
// // 开启事务
// mysqli_begin_transaction($connection);
// $sql = "insert into mq_user(name, mobile) values (‘$name‘, ‘$mobile‘)";
// if(!mysqli_query($connection, $sql)){
//     die("写入用户信息失败,原因:".$connection->error);
// }
$redis = getRedis();
// 添加消息
$result = $redis->lpush(‘register_users‘, json_encode(array(‘name‘=>$name, ‘mobile‘=>$mobile), JSON_UNESCAPED_UNICODE));
if($result === false){
    mysqli_rollback($connection);
    die("添加消息队列失败");
}
// 发布消息
$redis->publish(‘register_success‘, ‘ok‘);
// 所有操作完成后提交事务
// mysqli_commit($connection);
// $connection->close();
$redis->close();

  

  work.php 做为消息的消费者

<?php
require ‘./lib.php‘;
$redis = getRedis();
$redis->subscribe([‘register_success‘], function ($instance, $channelName, $message) {
    if($channelName == "register_success" && $message = "ok") {
        $redis = getRedis();
        while($redis->lsize("register_users")>0) {
            $arr = $redis->brPop([‘register_users‘], 20);
            if(count($arr)) {
                $userInfo = json_decode($arr[1], true);
                stdout("新注册用户信息:");
                stdout("姓名:".$userInfo[‘name‘]);
                stdout("手机号:".$userInfo[‘mobile‘]);
                stdout();
                sleep(3);
            }
        }
    }
});

  

  register.php将消息放入redis 的 register_users队列中,然后再使用 publish 将 register_success 消息发不出去。work.php 使用 subscribe 订阅 register_success 的消息。接收到 register_success 消息之后,读取 register_users 的消息进行处理。

我是按照这个github上面的代码做的参照,有些改动:

  https://github.com/jormin/php-redis-mq

原文地址:https://www.cnblogs.com/Lyh1997/p/11491046.html

时间: 2024-10-04 14:35:03

PHP + Redis 实现消息队列的相关文章

NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 “消息”是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时呢,也使响应延迟加剧.这也说明

php+redis实现消息队列

原文地址:http://www.cnblogs.com/lisqiong/p/6039460.htmlphp+redis实现消息队列 ? 个人理解在项目中使用消息队列一般是有如下几个原因: 把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 ?redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis队列 1 2 3 4 5 6 7 8 9 10 11 12 13

Redis 做消息队列

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式.利用redis这两种场景的消息队列都能够实现.定义: 生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息:即对于每个消息只能被最多一个消费者拥有. 发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息:即正常情况下每个消费者收到的消息应该都是一样的. 那么如此多的MQ产品,为什么要使用redis作消息队列呢?以下附上一份总结了别人的一些r

为什么学习Redis作为消息队列服务器

使用Redis作为消息队列服务场景 " 消息 "是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中," 消息队列 "是在消息的传输过程中保存消息的 容器 . 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件

用redis实现消息队列

为什么需要消息队列 系统中引入消息队列机制是对系统一个非常大的改善.例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中.你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验. 有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作.例如极端例子,一个在线编译系统任务,后台编译完成需要30分钟.这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据情况再此

Java利用Redis实现消息队列

应用场景 为什么要用redis?二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: 1 package Utils; 2 import java.io.*; 3 /** 4 * Created

使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,"消息队列"是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时

jedis实现redis的消息队列、发布对象消息、字节数组与字符串相互转换

redis支持发布/订阅的消息队列机制,jedis提供了java访问redis的客户端,本文将描述如何用jedis实现简单的消息队列,并传输对象. redis支持发布.订阅的功能,基本的命令有publish.subscribe等.在jedis中,有对应的java方法,并且只能发布字符串消息.为了传输对象,需要将对象进行序列化,并封装成字符串进行处理.将对象序列化后,只能成为字节流,如何封装成字符串是一个难点,具体可参考下面的代码. 实现三个类,一个对应publish.一个对应subscribe.

【转】NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 “消息”是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时呢,也使响应延迟加剧.这也说明