redis 队列缓存 + mysql 批量入库 + php 离线整合

问题分析

思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择;但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试。

解决:

  • 问题一:要求日志最好入库;但是,直接入库mysql确实扛不住,批量入库没有问题,done。【批量入库和直接入库性能差异参考文章
  • 问题二:批量入库就需要有高并发的消息队列,决定采用redis list 仿真实现,而且方便回滚。
  • 问题三:日志量毕竟大,保存最近30条足矣,决定用php写个离线统计和清理脚本。

一:设计数据库表和存储

  • 考虑到log系统对数据库的性能更多一些,稳定性和安全性没有那么高,存储引擎自然是只支持select insert 没有索引的archive。如果确实有update需求,也可以采用myISAM。
  • 考虑到log是实时记录的所有数据,数量可能巨大,主键采用bigint,自增即可
  • 考虑到log系统以写为主,统计采用离线计算,字段均不要出现索引,因为一方面可能会影响插入数据效率,另外读时候会造成死锁,影响写数据。

二:redis存储数据形成消息队列

由于高并发,尽可能简单,直接,上代码。

<?php
/***************************************************************************
*
* 获取到的调用日志,存入redis的队列中.
* $Id$
*
**************************************************************************/

/**
* @file saveLog.php
* @date 2015/11/06 20:47:13
* @author:cuihuan
* @version $Revision$
* @brief
*
**/

// 获取info
$interface_info = $_GET[‘info‘];

// 存入redis队列
$redis = new Redis();
$redis->connect(‘xx‘, 6379);
$redis->auth("password");

// 加上时间戳存入队列
$now_time = date("Y-m-d H:i:s");
$redis->rPush("call_log", $interface_info . "%" . $now_time);
$redis->close();

/* vim: set ts=4 sw=4 sts=4 tw=100 */
?>

三:数据定时批量入库。

定时读取redis消息队列里面的数据,批量入库。

<?php
/**
* 获取redis消息队列中的脚本,拼接sql,批量入库。
* @update 2015-11-07 添加失败消息队列回滚机制
*
* @Author:cuihuan
* 2015-11-06
* */

// init redis
$redis_xx = new Redis();
$redis_xx->connect(‘ip‘, port);
$redis_xx->auth("password");

// 获取现有消息队列的长度
$count = 0;
$max = $redis_xx->lLen("call_log");

// 获取消息队列的内容,拼接sql
$insert_sql = "insert into fb_call_log (`interface_name`, `createtime`) values ";

// 回滚数组
$roll_back_arr = array();

while ($count < $max) {
$log_info = $redis_cq01->lPop("call_log");
$roll_back_arr = $log_info;
if ($log_info == ‘nil‘ || !isset($log_info)) {
$insert_sql .= ";";
break;
}

// 切割出时间和info
$log_info_arr = explode("%",$log_info);
$insert_sql .= " (‘".$log_info_arr[0]."‘,‘".$log_info_arr[1]."‘),";
$count++;
}

// 判定存在数据,批量入库
if ($count != 0) {
$link_2004 = mysql_connect(‘ip:port‘, ‘user‘, ‘password‘);
if (!$link_2004) {
die("Could not connect:" . mysql_error());
}

$crowd_db = mysql_select_db(‘fb_log‘, $link_2004);
$insert_sql = rtrim($insert_sql,",").";";
$res = mysql_query($insert_sql);

// 输出入库log和入库结果;
echo date("Y-m-d H:i:s")."insert ".$count." log info result:";
echo json_encode($res);
echo "</br>\n";

// 数据库插入失败回滚
if(!$res){
foreach($roll_back_arr as $k){
$redis_xx->rPush("call_log", $k);
}
}

// 释放连接
mysql_free_result($res);
mysql_close($link_2004);
}

// 释放redis
$redis_cq01->close();
?>

四:离线天级统计和清理数据脚本

?php
/**
* static log :每天离线统计代码日志和删除五天前的日志
*
* @Author:cuihuan
* 2015-11-06
* */

// 离线统计
$link_2004 = mysql_connect(‘ip:port‘, ‘user‘, ‘pwd‘);
if (!$link_2004) {
die("Could not connect:" . mysql_error());
}

$crowd_db = mysql_select_db(‘fb_log‘, $link_2004);

// 统计昨天的数据
$day_time = date("Y-m-d", time() - 60 * 60 * 24 * 1);
$static_sql = "get sql";

$res = mysql_query($static_sql, $link_2004);

// 获取结果入库略

// 清理15天之前的数据
$before_15_day = date("Y-m-d", time() - 60 * 60 * 24 * 15);
$delete_sql = "delete from xxx where createtime < ‘" . $before_15_day . "‘";
try {
$res = mysql_query($delete_sql);
}catch(Exception $e){
echo json_encode($e)."\n";
echo "delete result:".json_encode($res)."\n";
}

mysql_close($link_2004);
?>

五:代码部署

主要是部署,批量入库脚本的调用和天级统计脚本,crontab例行运行。

# 批量入库脚本
*/2 * * * * /home/cuihuan/xxx/lamp/php5/bin/php /home/cuihuan/xxx/batchLog.php >>/home/cuihuan/xxx/batchlog.log

# 天级统计脚本
0 5 * * * /home/cuihuan/xxx/php5/bin/php /home/cuihuan/xxx/staticLog.php >>/home/cuihuan/xxx/staticLog.log

总结:相对于其他复杂的方式处理高并发,这个解决方案简单有效:通过redis缓存抗压,mysql批量入库解决数据库瓶颈,离线计算解决统计数据,通过定期清理保证库的大小。

时间: 2024-10-13 23:27:39

redis 队列缓存 + mysql 批量入库 + php 离线整合的相关文章

转载:【高并发简单解决方案 | 靠谱崔小拽 】redis队列缓存 + mysql 批量入库 + php离线整合

需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中:存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把mysql干垮. 问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试. 解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题,done.[批量入库和直接入库性能差异参考文章] 问题二:批量入库就需要有高并发的消息队列,决定

【高并发简单解决方案】redis队列缓存 + mysql 批量入库 + php离线整合

原文地址 :https://segmentfault.com/a/1190000004136250需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中:存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把mysql干垮. 问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试. 解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题

高并发简单解决方案————redis队列缓存+mysql 批量入库

问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试. 解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题,done.[批量入库和直接入库性能差异] 问题二:批量入库就需要有高并发的消息队列,决定采用redis list 仿真实现,而且方便回滚. 问题三:日志量毕竟大,保存最近30条足矣,决定用php写个离线统计和清理脚本. done,下面是小拽的

mysql批量入库问题

mysql批量入库,数据量太多出错 Too many arguments. 7 SQLSTATE[HY000]: General error: 1390 Prepared statement contains too many placeholders /data/app/framework/thinkphp/library/think/db/Connection.php 451 https://blog.csdn.net/qq_18875541/article/details/69392823

springboot2.0+redis实现消息队列+redis做缓存+mysql

本博客仅供参考,本人实现没有问题. 1.环境 先安装redis.mysql 2.springboot2.0的项目搭建(请自行完成),本人是maven项目,因此只需配置,获取相应的jar包,配置贴出. <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifact

redis缓存队列+MySQL +php任务脚本定时批量入库

原文地址:http://blog.jobbole.com/99567/ 需求背景:有个调用统计日志存储和统计需求,要求存储到mysql中:存储数据高峰能达到日均千万,瓶颈在于直接入库并发太高,可能会把mysql干垮. 问题分析 思考:应用网站架构的衍化过程中,应用最新的框架和工具技术固然是最优选择:但是,如果能在现有的框架的基础上提出简单可依赖的解决方案,未尝不是一种提升自我的尝试.解决: 问题一:要求日志最好入库:但是,直接入库mysql确实扛不住,批量入库没有问题,done.[批量入库和直接

使用redis作为缓存,数据还需要存入数据库中吗?(转)

转自https://blog.csdn.net/wypersist/article/details/79955704 使用redis作为缓存,数据还需要存入数据库中吗? 我的答案是: 1redis只是缓存,不是数据库如mysql,所以redis中有的数据库,mysql中一定有. 2用户请求先去请求redis,如果没有,再去数据库中去读取. 3redis中缓存一些请求量比较大的数据(这些缓存数据,mysql中一定也是有的),没必要所有数据都缓存到redis中. 5之所以从缓存中拿数据会快,是因为缓

redis缓存mysql

Redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合)和zset(有序集合).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排序.与memcached一样,为了保证效率,数据都是缓存在内存中.区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且

Java-No.14 基于redis分布式缓存队列实现抢红包功能

package com.shma.util; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; public class ObjectUtil {      /**对象转byte[]      * @par