Cache应用/任务Mutex,用于高并发任务处理经过多个项目使用

<?php
/**
 * Class Cache redis 用于报表的缓存基本存储和读写 2.0
 * <pre>
 *  Cache::read("diamond.account",$nick);
 *  Cache::readSync("diamond.account",$nick);
 *  $finder = Cache::createFinder("diamond.account",$nick);
 *  $finder->read();
 *  $finder->readSync();
 *
 *  Cache::save("diamond.account",$nick,$data);
 *  $storage = Cache::createStorage("diamond.account",$nick);
 *  $storage->save($data);
 *  $storage->save($data,7200);
 * </pre>
 * @category cache
 * @package cache
 * @author oShine <[email protected]>
 * @version 2.0.0.0
 * @copyright oShine 2015/08/07
 */
class Cache {

    /**
     * 非安全读取的数据
     * @param $key
     * @param string $nick
     * @return array|null
     */
    public static function read($key,$nick = "sys"){
        $finder = self::createFinder($key,$nick);
        return $finder->read();
    }

    /**
     * 同步读取数据
     * @param $key
     * @param string $nick
     * @return mixed
     */
    public static function readSync($key,$nick = "sys"){
        $finder = self::createFinder($key,$nick);
        return $finder->readSync();
    }

    /**
     * 创建Finder
     * @param $key
     * @param string $nick
     * @return Finder
     */
    public static function createFinder($key,$nick = "sys"){
        $key = Generate::key($key,$nick);
        return new Finder($key);
    }

    /**
     * 创建Storage
     * @param $key
     * @param string $nick
     * @return Storage
     */
    public static function createStorage($key,$nick = "sys"){
        $key = Generate::key($key,$nick);
        return new Storage($key);
    }

    /**
     * 保存数据
     * @param $key
     * @param string $nick
     * @param array $data
     * @param int $expired
     * @return bool
     */
    public static function save($key,$nick = "sys",$data = array(),$expired=7200){
        $storage = self::createStorage($key,$nick);
        return $storage->save($data,$expired);
    }

    /**
     * @param string $nick
     */
    public static function clear($nick = "sys"){
        $redis = CacheFactory::create();
        $redis->del($redis->keys(md5($nick).".data.*"));
    }

}

/**
 * Class Finder  数据读取
 * @category cache
 * @package cache
 * @author oShine <[email protected]>
 * @version 2.0.0.1
 * @copyright oShine 2015/08/07
 */
class Finder {

    /**
     * @var string $key
     */
    public $key;

    /**
     * @param string $key
     */
    public function __construct($key){
        $this->key = $key;
    }

    /**
     * 非安全读取数据
     * @return mixed
     */
    public function read(){
        $data = $this->readData();
        if($data->isRead === true && !$data->isExpired()) {
            return json_decode(json_encode($data->data), true);
        }
        return null;
    }

    /**
     * @return Data
     */
    protected function readData(){
        $redis =  CacheFactory::create();
        $rptData = new Data();
        $data = json_decode($redis->get($this->key));
        if(false == $data){
            $rptData->isRead = false;
            $rptData->expiredTime = time();
            $rptData->expired = 24*3600;
        }else{
            $rptData->expired = $data->expired;
            $rptData->isRead = isset($data->isRead) && $data->isRead === true?true:false;
            $rptData->expiredTime = $data->expiredTime;
            $rptData->data = $data->data;
        }
        return $rptData;
    }

    /**
     * 同步读取数据
     * @return mixed
     */
    public function readSync(){
        while(true){
            $rptData = $this->readData();
            if($rptData->isRead && !$rptData->isExpired())
                return $this->read();
            sleep(1);
        }
    }
}

/**
 * Class Storage  数据存储
 * @category cache
 * @package cache
 * @author oShine <[email protected]>
 * @version 2.0.0.0
 * @copyright oShine 2015/08/07
 */
class Storage {

    /**
     * @var string key
     */
    public $key;

    /**
     * @param string $key
     */
    public function __construct($key){
      $this->key = $key;
    }

    /**
     * @return bool
     */
    public function flush(){
        $rptData = new Data();
        $rptData->data = null;
        $rptData->expiredTime = time();
        $rptData->isRead = false;
        $rptData->expired = 1;
        $redis =  CacheFactory::create();

        return $redis->setex($this->key, $rptData->expired,json_encode($rptData));
    }

    /**
     * 写入数据
     * @param $data
     * @param int $expired
     * @return bool
     */
    public function save($data,$expired=7200){

        $rptData = new Data();
        $rptData->data = $data;
        $rptData->expiredTime = time();
        $rptData->isRead = true;
        $rptData->expired = $expired;
        $redis = CacheFactory::create();

        return $redis->setex($this->key, $rptData->expired,json_encode($rptData));
    }
}

/**
 * Class Data redis存储数据实体
 * @category cache
 * @package cache
 * @author oShine <[email protected]>
 * @version 2.0.0.0
 * @copyright oShine 2015/08/07
 */
class Data {
    /**
     * @var int $expired 失效间隔时长
     */
    public $expired;
    /**
     * @var int
     */
    public $expiredTime;
    /**
     * @var mixed 存储的具体数据
     */
    public $data;
    /**
     * @var bool 是否可以读取
     */
    public $isRead;

    /**
     * 是否失效
     * @return bool
     */
    public function isExpired(){
        if(time()-$this->expiredTime > $this->expired)
            return true;
        return false;
    }
}

/**
 * Class Generate key生成
 * @category cache
 * @package cache
 * @author oShine <[email protected]>
 * @version 2.0.0.0
 * @copyright oShine 2015/08/07
 */
class Generate {
    /**
     * @static
     * @param $key
     * @param $nick
     * @return string
     */
    public static function key($key,$nick){
        return md5($nick).".data.".$key;
    }
}

CacheFactory:

<?php
/**
 * @category cache
 * @package cache
 * @author oShine <[email protected]>
 * @version 2.0.0.0
 * @copyright oShine 2015/08/07
 */
class CacheFactory {

    /**
     * @var Redis $instance
     */
    private static $instance = null;

    /**
     * @return Redis
     */
    public static function create(){

        if(self::$instance == null){
            self::$instance = new Redis();
            self::$instance->connect(Yii::app()->params["RedisServerIP"]);
        }else{
            try{
                if(preg_match("/PONG/",self::$instance->ping())){
                    return self::$instance;
                }
            }catch (Exception $e){
                self::$instance = new Redis();
                self::$instance->connect(Yii::app()->params["RedisServerIP"]);
            }
        }
        return self::$instance;

    }

}

Mutex:用于任务锁,辅助任务处理,使用Cache记录任务状态,告别表锁任务标记

<?php

/**
 * Class Mutex 用于任务锁
 * @category mutex
 * @package cache
 * @author oShine <[email protected]>
 * @version 2.0.0.0
 * @copyright oShine 2015/08/07
 * @example
 * Mutex::create($nick)->init("download")->wait()
 * Mutex::create($nick)->init("download")->doing()
 * Mutex::create($nick)->init("download")->done()
 * Mutex::create($nick)->init("download")->error()
 */
class Mutex {

    /**
     * @var string
     */
    private $nick;

    /**
     * @param string $nick
     */
    public function __construct($nick){
        $this->nick = $nick;
    }

    /**
     * @param $nick
     * @return Mutex
     */
    public static function create($nick){
        return new self($nick);
    }

    /**
     * @param $nick
     */
    public static function clear($nick){
        $redis =  CacheFactory::create();
        $redis->del($redis->keys(md5($nick).".mutex.*"));
    }

    /**
     * @param $key
     * @return MutexStorage
     */
    public function init($key){
        $key = md5($this->nick).".mutex.".$key;
        return new MutexStorage($key, CacheFactory::create());
    }

}

/**
 * Class MutexStorage
 * @category mutex
 * @package cache
 * @author oShine <[email protected]>
 * @version 2.0.0.0
 * @copyright oShine 2015/08/07
 */
class MutexStorage {

    private $key;

    /**
     * @var Redis $redis
     */
    private $redis;

    public function __construct($key,$redis){
        $this->key = $key;
        $this->redis = $redis;
    }

    /**
     * @return $this
     * @throws Exception
     */
    public function wait(){
        $this->save("WAIT");
        return $this;
    }

    /**
     * @return $this
     * @throws Exception
     */
    public function doing(){
        $this->save("DOING");
        return $this;
    }

    /**
     * @return $this
     * @throws Exception
     */
    public function done(){
        $this->save("DONE");
        return $this;
    }

    /**
     * @return $this
     * @throws Exception
     */
    public function error(){
        $this->save("ERROR");
        return $this;
    }

    /**
     * @param $data
     * @return $this
     * @throws Exception
     */
    protected function save($data){

        $data = json_encode(array("status"=>$data,"date"=>date("Y-m-d"),"timestamp"=>time()));

        $flag = $this->redis->setex($this->key,3*24*2400,$data);
        if(!$flag)
            throw new Exception("SAVE Error!");
        return $this;
    }

    /**
     * @return string|null
     */
    protected function get(){

        $data = json_decode($this->redis->get($this->key),true);
        if(strtotime(date("Y-m-d")) == strtotime($data["date"])){
            return $data["status"];
        }
        return null;
    }

    /**
     * @return bool
     */
    public function isDoing(){
        $data = json_decode($this->redis->get($this->key),true);
        if(isset($data) && isset($data["status"]) && $data["status"] == "DOING" && isset($data["timestamp"]) && (time()-$data["timestamp"])<60)
            return true;
        return false;
    }

    /**
     * @return bool
     */
    public function isDone(){
        $status = $this->get();
        if(isset($status) && $status == "DONE")
            return true;
        return false;
    }

    /**
     * @return bool
     */
    public function isError(){
        $status = $this->get();
        if(isset($status) && $status == "ERROR")
            return true;
        return false;
    }
}
时间: 2024-08-01 22:05:15

Cache应用/任务Mutex,用于高并发任务处理经过多个项目使用的相关文章

Memcache的mutex设计模式 -- 高并发解决方案

场景 Mutex主要用于有大量并发访问并存在cache过期的场合,如 首页top 10, 由数据库加载到memcache缓存n分钟: 微博中名人的content cache, 一旦不存在会大量请求不能命中并加载数据库: 需要执行多个IO操作生成的数据存在cache中, 比如查询db多次: 问题 在大并发的场合,当cache失效时,大量并发同时取不到cache,会同一瞬间去访问db并回设cache,可能会给系统带来潜在的超负荷风险.我们曾经在线上系统出现过类似故障. 解决方法  方法一 高并发时,

Java大型互联网-构建高并发和高可用的电商平台架构实践原理

并发,在操作系统中,是指一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行. "高可用性"(High Availability)通常来描述一个系统经过专门的设计,从而减少停工时间,而保持其服务的高度可用性. 一. 设计理念 1. 空间换时间 多级缓存,静态化 客户端页面缓存(http header中包含Expires/Cache of Control,last modified(304,server不返

程序员面试,为什么不跟我谈高并发?

作为一个看过几千份简历,面试过几百人的面试官,常常会看到简历中有如下文字: 对业务逻辑解耦,高并发等有比较深入的研究和丰富的开发实战经验 对解决高并发问题有深入理解 熟悉大并发技术,如:反向代理.负载均衡.Keepalived 而当我在面试中,问及对方的职业规划的时候,也有一大半人会回答 希望将来可以处理高并发业务 希望学习高并发相关技术 希望开发数千万/数亿级别并发的应用 但是当我问及以下问题的时候,绝大多数人都会麻爪: 负载均衡有几种分配方式?(大概不到1/10的简历提及高并发的人能答出来)

程序员面试,为什么不要大谈高并发?

作为一个看过几千份简历,面试过几百人的面试官,常常会看到简历中有如下文字: 对业务逻辑解耦,高并发等有比较深入的研究和丰富的开发实战经验 对解决高并发问题有深入理解 熟悉大并发技术,如:反向代理.负载均衡.Keepalived 而当我在面试中,问及对方的职业规划的时候,也有一大半人会回答 希望将来可以处理高并发业务 希望学习高并发相关技术 希望开发数千万/数亿级别并发的应用 但是当我问及以下问题的时候,绝大多数人都会麻爪: 负载均衡有几种分配方式?(大概不到1/10的简历提及高并发的人能答出来)

Java 高并发缓存与Guava Cache

一.背景 缓存是我们在开发中为了提高系统的性能,把经常的访问业务的数据第一次把处理结果先放到缓存中,第二次就不用在对相同的业务数据在重新处理一遍,这样就提高了系统的性能.缓存分好几种: (1)本地缓存. (2)数据库缓存. (3)分布式缓存. 分布式缓存比较常用的有memcached等,memcached是高性能的分布式内存缓存服务器,缓存业务处理结果,减少数据库访问次数和相同复杂逻辑处理的时间,以提高动态Web应用的速度. 提高可扩展性. 二.本地缓存在高并发下的问题以及解决 今天我们介绍的是

构建高并发高可用的电商平台架构实践

从各个角度总结了电商平台中的架构实践,由于时间仓促,定了个初稿,待补充完善,欢迎大家一起交流. 转载请声明出处:http://blog.csdn.net/yangbutao/article/details/12242441 作者:杨步涛 关注分布式架构.大数据.搜索.开源技术 QQ:306591368 技术Blog:http://blog.csdn.net/yangbutao 一. 设计理念 1.      空间换时间 1)      多级缓存,静态化 客户端页面缓存(http header中包

高并发电子商务平台技术架构

原文出自:http://blog.csdn.net/yangbutao/article/details/12242441 http://stamen.iteye.com/blog/1525924 我自己的大型B2B和B2C站点原来也是用Hibernate,可是后来不得不换成mybatis, 第一是用Hibernate 因为它封装得太高了.非常多东西是隐式进行的.常常引起问题,非常难定位.毕竟凡事有利必有弊: 第二大型站点肯定不是一个数据库.这点Hibernate是非常麻烦的,用Jdbc或Myba

构建高并发高可用的电商平台架构实践(上)

构建高并发高可用的电商平台架构实践(上) 一. 设计理念 1.      空间换时间 1)      多级缓存,静态化 客户端页面缓存(http header中包含Expires/Cache of Control,last modified(304,server不返回body,客户端可以继续用cache,减少流量),ETag) 反向代理缓存 应用端的缓存(memcache) 内存数据库 Buffer.cache机制(数据库,中间件等) 2)      索引 哈希.B树.倒排.bitmap 哈希索

构建高并发高可用的架构

从各个角度总结了电商平台中的架构实践,由于时间仓促,定了个初稿,待补充完善,欢迎大家一起交流. 转载请声明出处:http://blog.csdn.net/yangbutao/article/details/12242441 作者:杨步涛 关注分布式架构.大数据.搜索.开源技术 QQ:306591368 技术Blog:http://blog.csdn.net/yangbutao 一. 设计理念 1.      空间换时间 1)      多级缓存,静态化 客户端页面缓存(http header中包