PHP(Mysql/Redis)消息队列的介绍及应用场景案例--转载

郑重提示:本博客转载自好友博客,个人觉得写的很牛逼所以未经同意强行转载,原博客连接 http://www.cnblogs.com/wt645631686/p/8243438.html 欢迎访问

在进行网站设计的时候,有时候会遇到给用户大量发送短信,或者订单系统有大量的日志需要记录,还有做秒杀设计的时候,服务器无法承受这种瞬间的压力,无法正常处理,咱们怎么才能保证系统正常有效的运行呢?这时候我们就要引用消息队列来实现这类的需求,这时候就需要一个中间的系统进行分流和解压。消息队列就是一个中间件,需要配合其他合理使用。

消息队列的概念、原理和场景

本质上讲,消息队列结构就是一个队列结构的中间件
,也就是说把消息和内容放入这个容器之后,就可以直接的返回,不需要等它后期处理的结果,另外会有一个程序会读取这些数据,并按照顺序进行逐个的处理,也就是说按照并发非常大的一个环节的时候,同时呢你又不需要立即获得这个环节的返回结果,那么使用消息队列可以比较好的解决这个问题。一个经典的消息队列结果应该是这样的过程:
由一个业务系统进行入队,把消息逐个插入消息队列中,插入成功之后直接返回成功的结果,然后后续有一个消息处理系统,这个系统会把消息队列中的记录逐个进行取出并且进行处理,进行出队的操作。
消息系统适合的场景
冗余
首先数据需要冗余的时候,比如经常做订单系统,后续需要严格的转换和记录,消息队列可以把这些数据持久化存储在队列中,然后由订单处理程序进行获取,后续处理完成之后再把这条记录删除,保证每条记录都能处理完成。
解耦
消息队列分离了两套系统,解决了两套系统深度耦合的问题。使用消息队列后,入队的系统和出队的系统没有直接的关系,入队系统和出队系统其中一套系统崩溃的时候,都不会影响到另一个系统的正常运转。
流量削峰
这种场景最经典的就是秒杀抢购,这种情况会出现很大的流量剧增,大量的需求集中在短短的几秒内,对服务器的瞬间压力非常大,而我们配合缓存使用消息队列能非常有效的顶住瞬间访问量,防止服务器顶不住而崩溃。
异步通讯
消息本身可以使入队的系统直接返回,所以实现了程序的异步操作,因此只要适合于异步的场景都可以使用消息队列来实现。
扩展性
比如订单入队之后或许会有财务系统进行处理,但是后期我想加配货系统,我只需要让配货系统订阅消息队列就可以了,这样就很容易扩展。
排序保证
这种情况指的是在有些场景下数据处理顺序是非常重要的,这种情况非常适合队列处理,因为队列本身就可以做成单线程的单进单出的系统,从而保证数据按照顺序进行处理。

常见消息队列实现优缺点
队列介质有哪些?
Mysql:可靠性高、易实现,速度慢,比如表就可以。
Redis:速度快,单条大消息包时效率低。redis提供了list,适合做消息队列,但是redis有一个问题,消息包过大的时候,效率就慢了,一般单条内容都不大
消息系统:专业性强、可靠,但学习成本高,如RabbitMQ

消息处理三种触发机制
死循环方式读取处理:让一个死循环的程序不断地读取一个队列,并且进行后期处理,这种方式失效性是比较强的,因为这种程序不断地扫描消息队列,因此消息队列里一旦有数据,就可以进行后续处理。但是这样会造成服务器压力,最关键的是也不会知道程序什么时候会挂掉,一旦出现故障,没办法及时恢复,这种情况比较适合做秒杀,因为秒杀的时间点比较集中,一旦有秒杀可以立即处理。
定时任务:每隔几秒或者几分钟执行一次,这样做的最大好处就是把压力分开了,无论入队的系统在哪个时间点入队的峰值是多么不平均,但由于出队的系统是定时执行的,所以会把压力均摊,每个时间点的压力会差不太多,所以还是比较流行的,尤其是订单系统和物流配货系统这类的,如订单系统会把写入队列,用户就可以看到我的订单在等物流配货了,这样物流系统就会定时把订单进行汇总处理,这样压力就不会太大,唯一的缺点就是定时和间隔和数量要把握好,不要
等上一个定时任务没有执行完呢,下一个定时任务又开始了,这样容易出现不可预测的问题。
守护进程:类似于PHP-FPM和PHP-CGI进程,需要linux的shell基础。

解耦案例:队列处理订单系统和配送系统
在网购的时候,提交订单之后,看到自己的订单货物在配送中,这样就参与进来一个系统是配送系统,如果我们在做架构的时候,把订单系统和配送系统设计到一起,就会出现问题。首先对于订单系统来说,订单系统处理压力较大,对于配送系统来说没必要对这些压力做及时反映,我们没必要在订单系统出现问题的情况下,同时配送系统出现问题,这时候就会同时影响两个系统的运转,我们可以解耦解决。这两个系统分开之后,我们可以通过一个队列表来实现两个系统的沟通。首先,订单系统会接收用户的订单,进行订单的处理,会把这些订单写到队列表中,这个队列表是沟通两个系统的关键,由配送系统中的定时执行的程序来读取队列表进行处理,配送系统处理之后,会把已经处理的记录进行标记,这就是流程。
具体细节设计如下(Mysql队列举例):

首先,由order.php的文件接收用户的订单,然后生成订单号并对订单进行处理,订单系统处理完成之后会把配送系统需要的数据增加到队列表中,队列表可以这么设计,大概六个字段,order_id(订单主键id),status(订单状态),mobile(用户手机号),address(收获地址),created_at(创建的时间),updated_at(后期配送系统处理完成时间),然后有一个定时脚本,每分钟启动配送处理程序,配送处理程序这个goods.php用来处理队列表中的数据,当处理完成之后,会把队列表中的字段状态改为处理完成,这样整个流程结束。

创建订单表

CREATE TABLE `order_queue` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT ‘订单的id号‘,
  `order_id` int(11) NOT NULL,
  `mobile` varchar(20) NOT NULL COMMENT ‘用户的手机号‘,
  `address` varchar(100) NOT NULL COMMENT ‘用户的地址‘,
  `created_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘订单创建的时间‘,
  `updated_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘物流系统处理完成的时间‘,
  `status` tinyint(2) NOT NULL COMMENT ‘当前状态,0 未处理,1 已处理,2处理中‘,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;  

处理订单的order.php文件

<?php
include ‘class/db.php‘;  

if(!empty($_GET[‘mobile‘])){
    $order_id = rand(10000,99999);
    $insert_data = array(
        ‘order_id‘=>$order_id,
        ‘mobile‘=>$_GET[‘mobile‘],      //记得过滤
        ‘created_at‘=>date(‘Y-m-d H:i:s‘,time()),
        ‘order_id‘=>$order_id,
        ‘status‘=>0,    //0,未处理状态
    );
    $db = DB::getIntance();
//把数据放入队列表中
    $res = $db->insert(‘order_queue‘,$insert_data);
    if($res){
        echo $insert_data[‘order_id‘]."保存成功";
    }else{
        echo "保存失败";
    }
}else{
    echo "1";
}
?> 

配送系统处理订单的文件goods.php

<?php
//配送系统处理订单并进行标记
include ‘class/db.php‘;
$db = DB::getIntance();
//1:先要把要处理的数据状态改为待处理
$waiting = array(‘status‘=>0,);
$lock = array(‘status‘=>2,);
$res_lock = $db->update(‘order_queue‘,$lock,$waiting,2);
//2:选择出刚刚更新的数据,然后进行配送系统处理
if($res_lock){
    //选择出要处理的订单内容
    $res = $db->selectAll(‘order_queue‘,$lock);
     //然后由配货系统进行处理.....等操作
    //3:把处理过的改为已处理状态
    $success = array(
        ‘status‘=>1,
        ‘updated_at‘=>date(‘Y-m-d H;i:s‘,time()),
    );
    $res_last = $db->update(‘order_queue‘,$success,$lock);
    if($res_last){
       echo "Success:".$res_last;
    }else{
        echo "Fail:".$res_last;
    }
}else{
    echo "ALL Finished";
}
?>  

定时执行脚本的goods.sh,每分钟执行一次

#!/bin/bash
date "+%G-%m-%d %H:%M:%S"    //当前年月日
cd /data/wwwroot/default/mq/
php goods.php  

然后crontab任务定时执行脚本,并创建日志文件,然后指定输出格式

*/1 * * * * /data/wwwroot/default/mq/good.sh >> /data/wwwroot/default/mq/log.log 2>&1 //指定脚本目录并格式化输出//当然要创建log.log文件
tail -f log.log  //监控日志

这样订单系统和配送系统是相互独立的,并不影响另一个系统的正常运行。

再举一个关于Mysql消息队列的例子,以发送短信为例:

<?php
$db = new Db();
$sms = new Sms();
while(true){
    $item = $db->getFirstRecord(); //获取数据表第一条记录
    if(!$item){
        //如果队列中没有数据,则结束定时器
        break;
    }
    $res = $sms->send($item[‘phone‘],$item[‘content‘]); //发送短信
    if($res){
        $db->deleteFristRecord(); //删除发送成功的记录
        echo $item[‘phone‘].‘发送成功‘;
    }else{
        echo $item[‘phone‘].‘发送失败,稍后继续尝试‘;
    }
    sleep(10); //每隔十秒循环一次
}

echo ‘发送完毕!‘;
?>

将代码保存为timer_sms.php,打开命令行,执行定时器:

php定时器将会根据设定的时间间隔(这里设的是10秒),自动完成发送短信的任务。任务完成后将自动退出定时器,不再占用服务器资源。

根据我的测试,PHP定时器占用资源并不多,不会对服务器造成压力。而且是异步访问数据库,也不会影响数据库的运行。

这种方式的优点是:

1、后台运行,前台无需等待

2、成功率高,失败的记录会自动重发,直到成功

流量削峰案例:Redis的List类型实现秒杀

为什么要使用Redis而不适用Mysql呢?因为Redis是基于内存,速度要快很多,而Mysql需要往硬盘里写,因为其他业务还要使用Mysql,如果秒杀使用Mysql的话,会把Mysql的资源耗光,这样其他的业务在读取Mysql肯定出问题。另外Redis对数据有一个持久化作用,这样要比Memcache要有优势,并且数据类型要多,这次要用的就是Redis的List,可以向头部或者尾部向Redis的链表增加元素,这样Redis在实现一个轻量级的队列非常有优势。
LPUSH/LPUSHX:LPUSH是将值插入到链表的头部,LPUSHX是检测这个链表是否存在,如果存在的话会插入头部,如果不存在会忽略这个数据
RPUSH/RPUSHX:将值插入到链表的尾部。同上,位置相反
LPOP:移除并获取链表中的第一个元素。
RPOP:移除并获取链表中最后一个元素。
LTRIM:保留指定区间内的元素。
LLEN:获取链表的长度。
LSET:用索引设置链表元素的值。
LINDEX:通过索引获取链表中的元素。
LRANGE:获取链表指定范围内的元素。

【秒杀业务程序】记录哪个用户参与了秒杀,同时记录时间,这样方便后续处理,用户的ID会存储到【Redis】的链表里进行排队,比如打算让前10个人秒杀成功,后面的人秒杀失败,这样让redis链表的长度保持为10就可以了,10个以后如果再到redis请求追加数据,那么程序上拒绝请求,在redis存取之后,后面的程序会对redis进行取值,因为数据不能长久放在缓存,后面有一个程序遍历处理redis的值,放入数据库永久保存,因为秒杀本来不会太长,可以用脚本循环扫描。
详细说明:
首先Redis程序会把用户的请求数据放入redis,主要是uid和微秒时间戳;然后检查redis链表的长度,超出长度就放弃处理;死循环数据读取redis链表的内容,入库。

数据库代码:

 CREATE TABLE `redis_queue` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `uid` int(11) NOT NULL DEFAULT ‘0‘,
  `time_stamp` varchar(24) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=88 DEFAULT CHARSET=utf8;

另外两个程序,一个是把用户的请求接收写入redis的程序,另一个是把redis的数据拿出写入数据库的程序。

接收用户请求的程序:

<?php

$redis = new Redis();
$redis->connect(‘127.0.0.1‘,6379);
$redis-_name = ‘miaosha‘;

//秒杀用户涌入模拟,100个用户
for ($i =0; $i < 100; $i++) {
    $uid = rand(1000000,99999999);
}
//检查redis链表长度(已存在数量)
$num = 10;
if ($redis->lLen($redis_name) < 10 ) {
    //加入链表尾部
    $redis->rPush($redis_name, $uid.‘%‘.microtime());
} else {  //如果达到10个
    //秒杀结束
}
$redis->close();

处理程序(拿redis数据写入mysql)

<?php
//从队列头部读一个值,判断这个值是否存在,如果存在则切割出时间、uid保存到数据库中。(对于redis而言,如果从redis取出这个值,那么这个值就不在redis队列里了,如果出现问题失败了,那么我们需要有一个机制把失败的数据重新放入redis链表中)
$redis = new Redis();
$redis->connect(‘127.0.0.1‘,6379);
$redis-_name = ‘miaosha‘;

//死循环检测redis队列
while(1) {
    $user = $redis->lpop($redis_name);
    if (!$user || $user == ‘null‘) {  //如果没有数据跳出循环
        //如果一直执行,速度是非常快的,那么服务器压力大,这里2秒一次
        sleep(2);
        //跳出循环
        continue;
    }
    //拿出微秒时间戳和uid
    $user_arr = explode(‘%‘, $user);
    $insert_data = array(
        ‘uid‘ => $user_arr[0];
        ‘time_stamp‘ => $user_arr[1];
    );
    $res = $db->insert(‘redis_queue‘, $insert_data);
    //如果插入失败
    if (!$res) {
        //从哪个方向取出,从哪个方向插回
        $redis->lpush($redis_name, $user);
        sleep(2);
    }
}
$redis->close();

测试的话,可以先执行循环检测脚本,然后再执行秒杀脚本开始测试,监测Mysql数据库的变化。

RabbitMQ:更专业额消息系统实现方案

可以看本人博客当前栏目了解

http://www.cnblogs.com/wt645631686/category/1171220.html

原文地址:https://www.cnblogs.com/yszr/p/8807513.html

时间: 2024-10-18 18:28:57

PHP(Mysql/Redis)消息队列的介绍及应用场景案例--转载的相关文章

mysql实现消息队列

mysql之消息队列 消息队列:在消息的传输过程中保存消息的容器. 消息队列管理器在将消息从它的源中继到它的目标时充当中间人.队列的主要目的是提供路由并保证消息的传递:如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它. 如图所示: 在不使用消息队列的情况下,用户的请求数据直接写入数据库,再高并发的情况下,会对数据库造成巨的压力,同时也使得响应延迟加剧.在使用消息队列后,用户请求的数据发送给消息队列后立即返回,再由消息队列的消费者进程(通常情况下,该进程独立部署在专门的服务器集

redis消息队列简单应用

参考 https://blog.yxccan.cn/blog/detail/3 一.什么是消息队列 是一个消息的链表,是一个异步处理的数据处理引擎. PS:可以理解为在redis的list列表中存放消息数据,然后按照排队方式先进先出(左进右出:右进左出) 二.可以使用的应用场景 主要应用一些延迟或异步操作的场景比如:发送邮件.发送短信.视频转码.图片转码.日志存储.导入数据等在发送邮件或者短信,我们不希望程序一直停留,等待发送成功才相应,而是异步进行处理,即:将待发送的邮件数据添加到消息队列中,

Redis 消息队列的实现

概述 Redis实现消息队列有两种形式: 广播订阅模式:基于Redis的 Pub/Sub 机制,一旦有客户端往某个key里面 publish一个消息,所有subscribe的客户端都会触发事件 集群订阅模式:基于Redis List双向+ 原子性 + BRPOP Redis消息队列时,当Redis宕机后,消息可能会丢失(也要看持久化的策略).如果收消息方未有重发和验证机制,Redis内的数据会出现丢失.所以,使用Redis的作为消息队列,通常是对于消息的准确性并非特别高的场景. 原理 基于Red

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

php mysql 实现消息队列

最近遇到一个批量发送短信的需求,短信接口是第三方提供的.刚开始想到,获取到手机号之后,循环调用接口发送不就可以了吗? 但很快发现问题:当短信数量很大时,不仅耗时,而且成功率很低. 于是想到,用PHP和MySQL实现一个消息队列,一条一条的发送短信.下面介绍具体的实现方法: 首先,建立一个数据表sms,包含以下字段: id, phone, //手机号 content //短信内容 将需要发送的短信和手机号存入sms表中. 接下来,需要用PHP实现一个定时器,定时读取一条记录,并发送短信: <?ph

PHP和MySQL实现消息队列

最近遇到一个批量发送短信的需求,短信接口是第三方提供的.刚开始想到,获取到手机号之后,循环调用接口发送不就可以了吗? 但很快发现问题:当短信数量很大时,不仅耗时,而且成功率很低. 于是想到,用PHP和MySQL实现一个消息队列,一条一条的发送短信.下面介绍具体的实现方法: 首先,建立一个数据表sms,包含以下字段: id, phone, //手机号 content //短信内容 将需要发送的短信和手机号存入sms表中. 接下来,需要用PHP实现一个定时器,定时读取一条记录,并发送短信: <?ph

PHP使用MySQL实现消息队列

消息队列常用在流量削峰(秒杀场景),异步通信等地方. 大体的结构如下: 类似于消费者和生产者的关系,首先生产者在消息队列未满的时候,才将生产的产品放进消息队列中:消费者在消息队列不为空的时候,才从消息队列中取出产品进行消费.出队的那个步骤常用的方法是一直轮询和定时操作. 这里举一个外卖送餐的案例: 有个生意很好的饭店,好到什么程度呢?一分钟有500人下单,这样的话,店家掌柜肯定处理不过来,于是,就先暂时不通知用户是够接单,先把所有的订单先存着,只告诉他们正在处理中,但是呢,还有一个问题,就是有一

Redis消息队列

一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式.利用redis这两种场景的消息队列都能够实现. 生产者消费者模式:生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息:即对于每个消息只能被最多一个消费者拥有: 发布者订阅者模式:发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息:即正常情况下每个消费者收到的消息应该都是一样的: 对应的使用场景包括:A系统向队列中存放数据:B系统在队列中取数据: 1.redis队列模式:可

java redis使用之利用jedis实现redis消息队列

应用场景 对于数据库查询的IO连接数高.连接频繁的情况,可以考虑使用缓存实现. 从网上了解到redis可以对所有的内容进行二进制的存储,而java是可以对所有对象进行序列化的,序列化的方法会在下面的代码中提供实现. 序列化 这里我编写了一个java序列化的工具,主要是对对象转换成byte[],和根据byte[]数组反序列化成java对象: 主要是用到了ByteArrayOutputStream和ByteArrayInputStream: 需要注意的是每个自定义的需要序列化的对象都要实现Seria