四 分析easyswoole源码(启动服务&Cache组件原理)

前文提到的在系统设置Cache组件 Cache::getInstance()的时候

Cache是以单例模式实现的。构造器会进行如下操作

//根据配置创建指定数目的Cache服务进程,然后启动。
$num = intval(Config::getInstance()->getConf("EASY_CACHE.PROCESS_NUM"));//默认配置数目是1,在Config.php里‘EASY_CACHE.PROCESS_NUM‘=>1
if($num <= 0){
   return;
}
$this->cliTemp = new SplArray();//这个数组以后会给单元测试时候单独使用,正常模式这个数组是不使用的
//若是在主服务创建,而非单元测试调用
if(ServerManager::getInstance()->getServer()){
    //创建了一个swoole_table ,表名为__Cache,里面存储data(后面就讲到其实这里存储的是操作Cache的指令)作用是用来做GC(防止Cache被撑爆)
    TableManager::getInstance()->add(self::EXCHANGE_TABLE_NAME,[
        ‘data‘=>[
            ‘type‘=>Table::TYPE_STRING,
            ‘size‘=>10*1024
        ],
        ‘microTime‘=>[
            ‘type‘=>Table::TYPE_STRING,
            ‘size‘=>15
        ]
    ],2048);
    $this->processNum = $num;
    for ($i=0;$i < $num;$i++){
        ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class);
    }
}

ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class)这句话才是Cache的核心逻辑。

ProcessManager::getInstance()这句话主要做了下面的操作
ProcessManager 的__construct构造函数创建了一个swoole_table,表名是process_hash_map

TableManager::getInstance()->add(
    ‘process_hash_map‘,[
        ‘pid‘=>[
            ‘type‘=>Table::TYPE_INT,
            ‘size‘=>10
        ]
    ],256
);

addProcess($this->generateProcessName($i),CacheProcess::class);
$this->generateProcessName($i)这个代码很简单就是根据$i来设置进程名称
addProcess 是在processList存储CacheProcess::class的实例,具体代码如下

$key = md5($processName);
if(!isset($this->processList[$key])){
    try{

        $process = new $processClass($processName,$args,$async);
        $this->processList[$key] = $process;
        return true;
    }catch (\Throwable $throwable){
        Trigger::throwable($throwable);
        return false;
    }
}else{
    trigger_error("you can not add the same name process : {$processName}.{$processClass}");
    return false;
}

那么CacheProcess::class的实例话做了什么操作呢
$this->cacheData = new SplArray();//这里很关键,为什么这么说每个Cache进程实际保存的缓存值都是在这里的,每个Cache进程都有自己的一个cacheData数组
$this->persistentTime = Config::getInstance()->getConf(‘EASY_CACHE.PERSISTENT_TIME‘);
parent::__construct($processName, $args);
CacheProcess::class继承于AbstractProcess
AbstractProcess的构造方法

$this->async = $async;
$this->args = $args;
$this->processName = $processName;
$this->swooleProcess = new \swoole_process([$this,‘__start‘],false,2);
ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);//然后swoole服务会addProcess一个Cache的任务进程。

__start方法主要是给swoole_table,表名为process_hash_map插入当前CacheProcess的进程名为key,进程IDpid为value。并且注册进程退出的事件。

if(PHP_OS != ‘Darwin‘){
    $process->name($this->getProcessName());
}
TableManager::getInstance()->get(‘process_hash_map‘)->set(
    md5($this->processName),[‘pid‘=>$this->swooleProcess->pid]
);
ProcessManager::getInstance()->setProcess($this->getProcessName(),$this);
if (extension_loaded(‘pcntl‘)) {
    pcntl_async_signals(true);
}
Process::signal(SIGTERM,function ()use($process){
    $this->onShutDown();
    TableManager::getInstance()->get(‘process_hash_map‘)->del(md5($this->processName));
    swoole_event_del($process->pipe);
    $this->swooleProcess->exit(0);
});
if($this->async){
    swoole_event_add($this->swooleProcess->pipe, function(){
        $msg = $this->swooleProcess->read(64 * 1024);
        $this->onReceive($msg);
    });
}
$this->run($this->swooleProcess);

$this->run($this->swooleProcess)这个函数是CacheProcess如果配置了persistentTime,就会开启一个定时器定时去取$file = Config::getInstance()->getConf(‘TEMP_DIR‘)."/{$processName}.data";的数据备份,默认是0也就是不会去做定时数据落地的操作

看到这里才是Cache组件在第一次实例化的时候做的相关事情,总结就是创建了指定数量的Cache进程绑定到swoole服务器上。在全局的process_hash_map表中能找到对应的Cache进程ID。然后Cache进程是可以以管道方式来进行通信。

set缓存方法

public function set($key,$data)
{
    if(!ServerManager::getInstance()->isStart()){
        $this->cliTemp->set($key,$data);
    }
    if(ServerManager::getInstance()->getServer()){
        $num = $this->keyToProcessNum($key);
        $msg = new Msg();
        $msg->setCommand(‘set‘);
        $msg->setArg(‘key‘,$key);
        $msg->setData($data);
        ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num))->getProcess()->write(\swoole_serialize::pack($msg));//直接把需要缓存的数据,封装成msg然后write给hash映射到的Cache进程
    }
}

当进程获取到的时候会回调onReceive方法

public function onReceive(string $str,...$agrs)
{
    // TODO: Implement onReceive() method.

    $msg = \swoole_serialize::unpack($str);
    $table = TableManager::getInstance()->get(Cache::EXCHANGE_TABLE_NAME);
    if(count($table) > 1900){
        //接近阈值的时候进行gc检测
        //遍历Table 依赖pcre 如果发现无法遍历table,检查机器是否安装pcre-devel
        //超过0.1s 基本上99.99%为无用数据。
        $time = microtime(true);
        foreach ($table as $key => $item){
            if(round($time - $item[‘microTime‘]) > 0.1){
                $table->del($key);
            }
        }
    }
    if($msg instanceof Msg){
        switch ($msg->getCommand()){
            case ‘set‘:{
                $this->cacheData->set($msg->getArg(‘key‘),$msg->getData());
                break;
            }
            case ‘get‘:{
                $ret = $this->cacheData->get($msg->getArg(‘key‘));
                $msg->setData($ret);
                $table->set($msg->getToken(),[
                    ‘data‘=>\swoole_serialize::pack($msg),
                    ‘microTime‘=>microtime(true)
                ]);
                break;
            }
            case ‘del‘:{
                $this->cacheData->delete($msg->getArg(‘key‘));
                break;
            }
            case ‘flush‘:{
                $this->cacheData->flush();
                break;
            }
            case ‘enQueue‘:{
                $que = $this->cacheData->get($msg->getArg(‘key‘));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                    $this->cacheData->set($msg->getArg(‘key‘),$que);
                }
                $que->enqueue($msg->getData());
                break;
            }
            case ‘deQueue‘:{

                $que = $this->cacheData->get($msg->getArg(‘key‘));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                    $this->cacheData->set($msg->getArg(‘key‘),$que);
                }
                $ret = null;
                if(!$que->isEmpty()){
                    $ret = $que->dequeue();
                }
                $msg->setData($ret);
                //deQueue 有cli 服务未启动的请求,但无token
                if(!empty($msg->getToken())){
                    $table->set($msg->getToken(),[
                        ‘data‘=>\swoole_serialize::pack($msg),
                        ‘microTime‘=>microtime(true)
                    ]);
                }
                break;
            }
            case ‘queueSize‘:{
                $que = $this->cacheData->get($msg->getArg(‘key‘));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                }
                $msg->setData($que->count());
                $table->set($msg->getToken(),[
                    ‘data‘=>\swoole_serialize::pack($msg),
                    ‘microTime‘=>microtime(true)
                ]);
                break;
            }
        }
    }
}

这里一开始会进行缓存GC确保内存不会撑爆

set方法会直接给$this->cacheData,设置缓存值。

get方法比较特殊,它会去给Cache进程发送get的命令,然后Cache读取到命令会将值写到_Cache,Swoole_table表中。然后再去读取(这个会有一个while循环,类似自旋)出缓存内容。这样的好处,可以确保可以读取到当时的数据缓存,不会因为高并发读取到最新的缓存值内容。而且还能更有效的做gc,防止Cache内存撑爆。

public function get($key,$timeOut = 0.01)
{
    if(!ServerManager::getInstance()->isStart()){
        return $this->cliTemp->get($key);
    }
    $num = $this->keyToProcessNum($key);
    $token = Random::randStr(9);//这个是一个凭证,是确保获取到自己此刻想获取的cache数据,和事务类似为了保证可重复读
    $process = ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num));
    $msg = new  Msg();
    $msg->setArg(‘timeOut‘,$timeOut);
    $msg->setArg(‘key‘,$key);
    $msg->setCommand(‘get‘);
    $msg->setToken($token);
    $process->getProcess()->write(\swoole_serialize::pack($msg));
    return $this->read($token,$timeOut);
}

$process->getProcess()->write(\swoole_serialize::pack($msg))发这个包给Cache进程,Cache进程会进行下面这些操作

$ret = $this->cacheData->get($msg->getArg(‘key‘));//获取到当前的缓存值
$msg->setData($ret);
//将当前的内容设置到_Cache表中,token是请求的时候发过来的凭证原样拼装。这有什么好处呢,就是确保在高并发下,在A时刻获取的缓存,不会拿到后面B时刻更新的值。
$table->set($msg->getToken(),[
    ‘data‘=>\swoole_serialize::pack($msg),
    ‘microTime‘=>microtime(true)
]);

$this->read($token,$timeOut);
//这里的操作是直接从_Cache表中获取缓存数据,如果缓存存在并且进程调度没有超时,然后在表中将取过数据的内容删除掉返回
private function read($token,$timeOut)
{
    $table = TableManager::getInstance()->get(self::EXCHANGE_TABLE_NAME);
    $start = microtime(true);
    $data = null;
    while(true){
        usleep(1);
        if($table->exist($token)){
            $data = $table->get($token)[‘data‘];
            $data = \swoole_serialize::unpack($data);
            if(!$data instanceof Msg){
                $data = null;
            }
            break;
        }
        if(round($start - microtime(true),3) > $timeOut){
            break;
        }
    }
    $table->del($token);
    if($data){
        return $data->getData();
    }else{
        return null;
    }
}

原文地址:https://www.cnblogs.com/gavinjunftd/p/9438826.html

时间: 2024-10-29 02:01:57

四 分析easyswoole源码(启动服务&Cache组件原理)的相关文章

分析easyswoole源码

分析easyswoole源码 1以启动为例 //检查是否已经安装 installCheck();//检查锁文件是否存在,不存在结束 //启动服务 serverStart showLogo();//显示logo $conf = Conf::getInstance();//获取config单例 $inst = Core::getInstance()->initialize();//获取Core(核心类)的单例并且initialize //这一步干了这些事情 //__construct() Core的

网狐6878完整大厅源码+完美服务端组件+金蟾捕鱼(完美库存)

网狐6878完整大厅源码+完美服务端组件+金蟾捕鱼(完美库存+前台控制)运营版!源码完整,可有朋友架设成功.看图,带多款游戏.本源码不提供技术服务请自行解决.大家互相学习一起研究. 源码下载:http://www.yxkfw.com/thread-4773-1-1.html

zookeeper源码之服务端启动模块

服务端启动模块主要负责解析配置文件,启动服务器监听并执行zookeeper命令. 类图 QuorumPeerMain QuorumPeerMain是服务端主程序,主要功能是解析配置文件,启动zookeeper服务.内部使用QuorumPeerConfig来解析配置文件:使用QuorumPeer来解析命令:使用QuorumPeer来启动zookeeper服务. QuorumPeerConfig 解析properties配置文件zoo.cfg,主要获取一下信息: 配置 说明 dataDir 数据存放

【一起学源码-微服务】Nexflix Eureka 源码十三:Eureka源码解读完结撒花篇~!

前言 想说的话 [一起学源码-微服务-Netflix Eureka]专栏到这里就已经全部结束了. 实话实说,从最开始Eureka Server和Eureka Client初始化的流程还是一脸闷逼,到现在Eureka各种操作都了然于心了. 本专栏从12.17开始写,一直到今天12.30(文章在平台是延后发布的),这将近半个月的时间确实收获很多.每天都会保持一定的时间学习,只要肯下功夫,没有学不会的东西. 2020年将继续保持学习的节奏,自己定的目标是把spring cloud几个重要的组件都学一遍

OpenStack_Swift源码分析——Object-auditor源码分析(2)

1 Object-aduitor审计具体分析 上一篇文章中,讲解了Object-aduitor的启动,其中审计的具体执行是AuditorWorker实现的,在run_audit中实例化了AuditorWorker类,并调用audit_all_objects方法,下面看此方法的具体代码实现: def audit_all_objects(self, mode='once', device_dirs=None): #run_forever传过来的mode 为forever description =

Android源码分析--MediaServer源码分析(二)

在上一篇博客中Android源码分析–MediaServer源码分析(一),我们知道了ProcessState和defaultServiceManager,在分析源码的过程中,我们被Android的Binder通信机制中的各种复杂的类关系搞的眼花缭乱,接下来我们就以MediaPlayerService为例来分析一下Binder的通信机制.首先来回顾一下: BpBinder和BBinder都是Android中Binder通信的代表类,其中BpBinder是客户端用来与Server交互的代理类,p代

OpenStack_Swift源码分析——ObjectReplicator源码分析(2)

1.Replicator执行代码详细分析 上篇问中介绍了启动Replicator的具体过程,下面讲解Replicator的执行代码的具体实现,首先看replicate方法: def replicate(self, override_devices=None, override_partitions=None): """Run a replication pass""" self.start = time.time() self.suffix_co

【一起学源码-微服务】Nexflix Eureka 源码十:服务下线及实例摘除,一个client下线到底多久才会被其他实例感知?

前言 前情回顾 上一讲我们讲了 client端向server端发送心跳检查,也是默认每30钟发送一次,server端接收后会更新注册表的一个时间戳属性,然后一次心跳(续约)也就完成了. 本讲目录 这一篇有两个知识点及一个疑问,这个疑问是在工作中真真实实遇到过的. 例如我有服务A.服务B,A.B都注册在同一个注册中心,当B下线后,A多久能感知到B已经下线了呢? 不知道大家有没有这个困惑,这篇文章最后会对此问题答疑,如果能够看到文章的结尾,或许你就知道答案了,当然答案也会在结尾揭晓. 目录如下: C

如何分析SpringBoot源码模块及结构?--SpringBoot源码(二)

注:该源码分析对应SpringBoot版本为2.1.0.RELEASE 1 前言 本篇接如何搭建自己的SpringBoot源码调试环境?--SpringBoot源码(一). 前面搭建好了自己本地的SpringBoot源码调试环境后,此时我们不要急着下手进入到具体的源码调试细节中,刚开始阅读源码,此时我们一定要对项目结构等有一个整体的认识,然后再进行源码分析调试.推荐阅读下笔者之前写的的分析开源项目源码,我们该如何入手分析?一文,干货满满哦. 2 SpringBoot源码模块一览 我们先来对Spr