Linux C++ 实现时间轮 优化超时检测机制

参考资料:

http://www.ijilei.com/8357

https://www.zhihu.com/question/38427301

https://www.ibm.com/developerworks/cn/linux/l-cn-timers/

http://www.cnblogs.com/processakai/archive/2012/04/11/2442294.html

思路和代码的编写主要是参考的csdn上的一个java的代码

http://blog.csdn.net/mindfloating/article/details/8033340

我参考java代码的思路来编写的C++代码 ,说来其实也没啥技术难度,把代码分享下供后来的技术人使用

我使用c++ 实现的时间轮主要是用于检测超时,是tcp会话的超时与否

所以定义一个sessionKey作为唯一的主键key.

因为我使用了C++11中的unordered_map,所以需要重载operator == 操作符,并自己编写hash函数,也就是SessionHash中的函数

class Sessionkey{
public:
    Sessionkey(){srcIp=dstIp=srcPort=dstPort = 0;};
    Sessionkey(const Sessionkey& skey);
    Sessionkey(uint32_t src,uint32_t dst,uint16_t sp,uint16_t dp):srcIp(src),dstIp(dst),srcPort(sp),dstPort(dp){}

    bool operator == (const Sessionkey &skey) const;

public:
    uint32_t srcIp;
    uint32_t dstIp;
    uint16_t srcPort;
    uint16_t dstPort;
};

class SessionHash{
public:
    size_t operator()(const Sessionkey& sk) const;
};

include “sessionKey.h”

include “murmurHash.h”

Sessionkey::Sessionkey(const Sessionkey& sk)

{

srcIp = sk.srcIp;

dstIp = sk.dstIp;

srcPort = sk.srcPort;

dstPort = sk.dstPort;

}

bool Sessionkey::operator==(const Sessionkey& sk) const

{

return (srcIp == sk.srcIp) && (dstIp == sk.dstIp)

&& (srcPort == sk.srcPort) && (dstPort == sk.dstPort);

}

/通过Sessionkey的全部成员构造hash值,切勿随意在Sessionkey类中添加成员变量/

size_t SessionHash::operator()(const Sessionkey& sk) const

{

ull64_t ul64 = murmurHash64A(&sk, sizeof(sk), 0xee6b27eb);

return ul64;

}

哈希值生成算法采用的是

ull64_t murmurHash64A ( const void * key, int len, ull64_t seed ){

//const uint64_t m = BIG_CONSTANT(0xc6a4a7935bd1e995);

const ull64_t m = 0xc6a4a7935bd1e995;

const int r = 47;

ull64_t h = seed ^ (len * m);

const ull64_t * data = (const ull64_t *)key;
const ull64_t * end = data + (len/8);

while(data != end)
{
    ull64_t k = *data++;

    k *= m;
    k ^= k >> r;
    k *= m; 

    h ^= k;
    h *= m;
}

const unsigned char * data2 = (const unsigned char*)data;

switch(len & 7)
{
case 7: h ^= ull64_t(data2[6]) << 48;
case 6: h ^= ull64_t(data2[5]) << 40;
case 5: h ^= ull64_t(data2[4]) << 32;
case 4: h ^= ull64_t(data2[3]) << 24;
case 3: h ^= ull64_t(data2[2]) << 16;
case 2: h ^= ull64_t(data2[1]) << 8;
case 1: h ^= ull64_t(data2[0]);
    h *= m;
};

h ^= h >> r;
h *= m;
h ^= h >> r;

return h;

}

#ifndef __TIME_WHEEL_H__
#define __TIME_WHEEL_H__

#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#include <string>
#include <vector>
#include <unordered_map>

#include <mutex>

#include <sessionKey.h>
#include <glog/logging.h>
#include "lfds611.h"

using namespace std;

/****************************全局函数定义区**********************************/
#define TIMEOUT_SESSKEY_CNT  1000 //default
#define SESSKEY_BUFFER_CNT  2000

void *tickStepThreadGlobal(void* param);

typedef std::unordered_map<Sessionkey,int,SessionHash> tDurationMap;

/*定义槽类Slot*/
class Slot
{
public:
    Slot();
    Slot(int nId);
    ~Slot();
    void addElement(Sessionkey& key,int num);
    void removeElement(Sessionkey& key);

public:
    /*unordered_map的int是预留字段*/
    tDurationMap slotDurationMap;//sessionKey和int
    int id;
};

typedef std::unordered_map<Sessionkey,Slot*,SessionHash> tRelationMap;

/*时间轮类*/
class CTimeWheel
{
public:
    CTimeWheel();
    ~CTimeWheel();

    /*tickDuration:一个tick持续时间    ticksPerWheel 一轮的tick数(会话超时时间)       timeUnit:时间单位,如毫秒*/
    //构造函数中开启tick step线程
    CTimeWheel(int tickDuration,int ticksPerWheel,int timeUnit);
public:
    /*添加元素,返回新加入元素的timeout时间*/
    long addElement(Sessionkey& key,int num);
    bool removeElement(Sessionkey& key);

    /*开启线程步进tick数*/
    void tickStepRun();

private:
    void waitForNextTick();
    int getPreviousTickIndex();
    bool checkAdd(Sessionkey& key);
    void notifyExpired(int idx);//此处有疑问
    /*返回值以秒为单位*/
    long getCurrentTime();

public:
    /*读写加锁*/
    mutex mtx;

    /*时间轮,元素是Slot类型*/
    std::vector<Slot*> wheel;

    /*维护sessionKey和slot槽对应关系的哈希表*/
    tRelationMap keyToSlotMap;

    /*存储超时会话的SessionKey*/
    struct lfds611_queue_state *timeoutSessionQueue;

    Sessionkey  *sessKeyPool;

private:
    uint32_t tickDuration;
    uint32_t ticksPerWheel;
    uint32_t currentTickIndex;
    pthread_t tickThread;

    long startTime;
    long tick;
};

#endif /* __STW_TIMER_H__ */
#include "timeWheel.h"

void *tickStepThreadGlobal(void* param)
{
    CTimeWheel* pThis = (CTimeWheel*)param;
    pThis->tickStepRun();

    return NULL;
}

Slot::Slot()
{
    id = 0;
}

Slot::Slot(int nId)
{
    id = nId;
}

Slot::~Slot()
{

}

void Slot::addElement(Sessionkey& key,int num)
{
    if(0 == key.dstPort || 0 == key.srcPort)
    {
        VLOG(4)<<"addElement ERROR! 8888888888888888888888";
    }

    slotDurationMap.insert(make_pair(key,num));
}

void Slot::removeElement(Sessionkey& key)
{
    slotDurationMap.erase(key);
}

/************************CTimeWheel类实现**********************************/

CTimeWheel::CTimeWheel()
{
    tick = 0;
    currentTickIndex = 0;
}
CTimeWheel::~CTimeWheel()
{
    /*释放申请的slot资源*/
    for(uint32_t i=0;i<ticksPerWheel;i++)
    {
        Slot* delSlot = wheel.at(i);
        free(delSlot);
        delSlot = NULL;
    }
}

CTimeWheel::CTimeWheel(int tickDuration,int ticksPerWheel,int timeUnit)
{
    VLOG(3)<<"CTimeWheel Construct 2 Called!";

    currentTickIndex = 0;

    this->tickDuration = tickDuration*timeUnit;//单位:s
    this->ticksPerWheel = ticksPerWheel + 1;

    /*申请ticksPerWheel个slot槽*/
    for(uint32_t i=0;i<this->ticksPerWheel;i++)
    {
        Slot* tmp = new Slot(i);
        wheel.push_back(tmp);
    }

    sessKeyPool = new Sessionkey[SESSKEY_BUFFER_CNT];

    /*申请无锁队列存储超时sessionkey*/
    lfds611_queue_new(&timeoutSessionQueue,TIMEOUT_SESSKEY_CNT);

    /*开启线程,传递this指针*/
    if(pthread_create(&tickThread,NULL,tickStepThreadGlobal,this)!=0)
    {
        LOG(ERROR) <<"create tickStepThreadGlobal thread failed!";
    }
}

void CTimeWheel::tickStepRun()
{
    //获取当前时间
    startTime = getCurrentTime();

    //设置tick为1
    tick = 1;

    //1.获取当前tick指针的slot
    for(int i=0;;i++)
    {
        if(i == wheel.size())
        {
            i=0;
        }

        //加锁
        mtx.try_lock();

        currentTickIndex = i;

        //解锁
        mtx.unlock();

        //2.对当前slot所有元素进行timeout处理(重要,暂时没完成)
        notifyExpired(currentTickIndex);

        //3.等待下一次tick到来
        waitForNextTick();
    }
}

void CTimeWheel::waitForNextTick()
{
    while(1)
    {
        long currentTime = getCurrentTime();
        long sleepTime = tickDuration * tick - (currentTime - startTime);//单位

        /*这块的值可能过大,加调试信息*/
        //VLOG(3)<<"tick step Thread sleepTime is "<<sleepTime;

        if(sleepTime <= 0)
        {
            break;
        }
        else
        {
            sleep(sleepTime);
        }
    }

    //tick步进1
    VLOG(3)<<"tick step add 1";

    tick++;
}

long CTimeWheel::addElement(Sessionkey& key,int num)
{
    //1.检测时间轮是否添加相同元素,有则删除后重新将元素插入到wheel中
    if(false == checkAdd(key))
    {
        VLOG(3)<<"SessionKey is first Add";
    }

    //2.获取当前tick指针的前一个slot槽位
    int previousTickindex = getPreviousTickIndex();

    //3.添加元素到wheel->slot中
    Slot* pSlot = wheel.at(previousTickindex);
    if(NULL == pSlot)
    {
        return -1;
    }

    pSlot->addElement(key,num);
    VLOG(3)<<"threadID "<<num<<" addElement to TimeWheel!";

    //4.记录SessionKey和slot的对应关系
    keyToSlotMap.insert(make_pair(key,pSlot));
    VLOG(4)<<"keyToSlotMap size: "<<keyToSlotMap.size();

    VLOG(3)<<"threadID "<<num<<" insert sessionKey to keyToSlotMap!";

    //5.返回新加入元素的timeout时间
    return (ticksPerWheel - 1) * tickDuration;
}

bool CTimeWheel::removeElement(Sessionkey& key)
{
    Sessionkey reverseKey(key.dstIp,key.srcIp,key.dstPort,key.srcPort);

    //删除keyToSlotMap关系表key对应元素
    tRelationMap::iterator iteGot = keyToSlotMap.find(key);
    if(iteGot == keyToSlotMap.end())
    {
        iteGot = keyToSlotMap.find(reverseKey);
        if(iteGot == keyToSlotMap.end())
        {
            VLOG(3)<<"checkAdd function:sessionkey is not in keyToSlotMap";
            return false;
        }
        else
        {
            Slot* pSlot = iteGot->second;
            if(NULL == pSlot)
            {
                VLOG(3)<<"checkAdd() function,keyToSlotMap Element is NULL";
                return false;
            }

            //4.删除wheel slot中的元素
            pSlot->removeElement(reverseKey);
            VLOG(3)<<"Erase key from wheel slot!";

            //5.删除keyToSlotMap关系表中元素,便于后续添加和更新
            keyToSlotMap.erase(reverseKey);
            VLOG(3)<<"Erase key from keyToSlotMap!";

            return true;
        }
    }
    else
    {
        Slot* pSlot = iteGot->second;
        if(NULL == pSlot)
        {
            VLOG(3)<<"checkAdd() function,keyToSlotMap Element is NULL";
            return false;
        }

        //6.删除wheel slot中的元素
        pSlot->removeElement(key);
        VLOG(3)<<"Erase key from wheel slot!";

        //7.删除keyToSlotMap关系表中元素,便于后续添加和更新
        keyToSlotMap.erase(key);
        VLOG(3)<<"Erase key from keyToSlotMap!" << key.ToString();
    }

    /////////////测试代码///////////////////////
    struct in_addr addrSrc,addrDst;
    uint32_t srcIp = htonl(key.srcIp);
    uint32_t dstIp = htonl(key.dstIp);

    memcpy(&addrSrc,&srcIp,4);
    memcpy(&addrDst,&dstIp,4);

    string strSrcIP = inet_ntoa(addrSrc);
    string strDstIP = inet_ntoa(addrDst);

    //VLOG(4)<<strSrcIP<<"->"<<strDstIP;

    //VLOG(4)<<key.srcPort<<"->"<<key.dstPort;

    return true;
}

/*对当前tick索引对应slot中的所有元素做超时处理*/
void CTimeWheel::notifyExpired(int idx)
{
    //1.返回idx索引对应的slot槽
    if(idx<0 || idx >= ticksPerWheel)//0~ticksPerWheel-1
    {
        VLOG(4)<<"notifyExpired() function Failed!Reason:invalid Index!";
        return;
    }

    Slot*pSlot = wheel.at(idx);
    if(NULL == pSlot)
    {
        VLOG(4)<<"notifyExpired() function Failed!Reason:wheel slot is empty!";
        return;
    }

    //2.返回slot槽中元素集合
    //VLOG(4)<<"slot Element size is "<<pSlot->slotDurationMap.size();
    int index = 0;
    tDurationMap::iterator iteMap;

    for(iteMap=pSlot->slotDurationMap.begin();iteMap!=pSlot->slotDurationMap.end();iteMap++)
    {
        Sessionkey* timeoutKey = sessKeyPool+index;

        Sessionkey key = iteMap->first;
        timeoutKey->srcIp = key.srcIp;
        timeoutKey->dstIp = key.dstIp;
        timeoutKey->srcPort= key.srcPort;
        timeoutKey->dstPort= key.dstPort;

        //将超时的sessionkey压入Queue
        lfds611_queue_enqueue(timeoutSessionQueue,(void*)timeoutKey);

        struct in_addr addrSrc,addrDst;

        uint32_t srcIp = htonl(timeoutKey->srcIp);
        uint32_t dstIp = htonl(timeoutKey->dstIp);

        memcpy(&addrSrc,&srcIp,4);
        memcpy(&addrDst,&dstIp,4);

        string strSrcIP = inet_ntoa(addrSrc);
        string strDstIP = inet_ntoa(addrDst);

        VLOG(4)<<strSrcIP<<"->"<<strDstIP;

        VLOG(4)<<"srcPort:"<<timeoutKey->srcPort<<"dstPort:"<<timeoutKey->dstPort;

        VLOG(4)<<"sessionKey enqueue enqueue enqueue";

        //删除记录
        removeElement(key);

        if(++index  == SESSKEY_BUFFER_CNT)
        {
            index = 0;
        }
    }
}

bool CTimeWheel::checkAdd(Sessionkey& key)
{
    //检测集合中是否存在,如存在则删除slot槽中元素,删除keyToSlotMap对应表中元素
    Sessionkey reverseKey(key.dstIp,key.srcIp,key.dstPort,key.srcPort);

    //删除keyToSlotMap关系表key对应元素
    tRelationMap::iterator iteGot = keyToSlotMap.find(key);
    if(iteGot == keyToSlotMap.end())
    {
        iteGot = keyToSlotMap.find(reverseKey);
        if(iteGot == keyToSlotMap.end())
        {
            VLOG(3)<<"checkAdd function:sessionkey is not in keyToSlotMap";
            return false;
        }
        else
        {
            Slot* pSlot = iteGot->second;
            if(NULL == pSlot)
            {
                VLOG(3)<<"checkAdd() function,keyToSlotMap Element is NULL";
                return false;
            }

            //4.删除wheel slot中的元素
            pSlot->removeElement(reverseKey);
            VLOG(3)<<"Erase key from wheel slot!";

            //5.删除keyToSlotMap关系表中元素,便于后续添加和更新
            keyToSlotMap.erase(reverseKey);
            VLOG(3)<<"Erase key from keyToSlotMap!";

            return true;
        }
    }
    else
    {
        Slot* pSlot = iteGot->second;
        if(NULL == pSlot)
        {
            VLOG(3)<<"checkAdd() function,keyToSlotMap Element is NULL";
            return false;
        }

        //6.删除wheel slot中的元素
        pSlot->removeElement(key);
        VLOG(3)<<"Erase key from wheel slot!";

        //7.删除keyToSlotMap关系表中元素,便于后续添加和更新
        keyToSlotMap.erase(key);
        VLOG(3)<<"Erase key from keyToSlotMap!";

        return true;
    }

    return true;
}

int CTimeWheel::getPreviousTickIndex()
{
    //加锁
    mtx.try_lock();

    int cti = currentTickIndex;
    if(0 == cti)
    {
        return ticksPerWheel - 1;//4
    }

    return cti - 1;

    //解锁
    mtx.unlock();
}

long CTimeWheel::getCurrentTime()
{
    struct timeval tv;
    gettimeofday(&tv,NULL);
    return tv.tv_sec  + tv.tv_usec / 1000000;
}
时间: 2024-10-23 16:25:06

Linux C++ 实现时间轮 优化超时检测机制的相关文章

Linux系统的时间管理及优化

一直以来对Linux下的时间管理知之不详,GFree_wind在微博发起过几次Linux下时钟的讨论,和Godbach这些大牛比,我完全插不上话,因为不懂.近来闲暇时间研究了下Linux下的时间管理,分享出来,请大家指正.   从我们大白话的角度想,时间管理其实分成两部分,就像我们小时候学习物理的时候物理老师不断强调时间和时刻的区别.一个是时刻,比如现在是20:44:37秒,指的是时刻,我们手机上看时间,指的也是时刻.另一块是时间,比如说,我每天工作八小时,再比如说,半小时之后,我要出门了,结束

Linux php.ini的安全优化配置

Linux php.ini的安全优化配置 (1) PHP函数禁用找到 disable_functions = 该选项可以设置哪些PHP函数是禁止使用的,PHP中有一些函数的风险性还是相当大的,可以直接执行一些CentOS系统级脚本命令,如果允许这些函数执行,当PHP 程序出现漏洞时,损失是非常严重的!以下我们给出推荐的禁用函数设置: disable_functions = phpinfo,passthru,exec,system,popen,chroot,escapeshellcmd,escap

时间轮算法

问题引入:游戏里面每个Player身上有很多buffs,在每一个tick(最小时间段)都要去检查buff里面的每一个buff是不是过期,产生的效果如何,造成在每个tick里面都去遍历一个长list,明显很不好. 怎么优化? 1.原始模型:    buff的状态在每一个tick里面都要更新!可以想象指针每移动一下,都会非常沉重地拖着所有的BuffList,好可怕-- 2. 优化模型1:   我们要避免的是:原始模型在每一个tick里面都要遍历List,那么我们试下以Times为key,在加入buf

时间轮

老早之前就听说时间轮算法特别高效,Linux内核都用的它,这两天抽空实现了遍--嗯,被差一bug搞死(~ ̄▽ ̄~) 啊哈 网上扣来的图,原理好懂:轮子里的每格代表一小段时间(精度),连起来就能表示时间点了(我去年买了个表),格子内含链表,中存回调函数:时间指针每次转动一格,指向某格时,取出链表里的回调函数依次执行,后清空链表,等待下一次转动. 加入节点逻辑也简单:在轮子可表示的时间范围内(格子数*格子精度),配合当前时针位置,对格子总数取余,即得本节点需放哪个格子. 进一步为扩大时间轮的表示范围

时间轮(TimeWheel)的设计与实现

一.前言 由于工作的需要,得实现一个用于控制事件超时抛弃的时间轮,由于这是一个相对独立的接口,就总结分享一下. 首先看下需求,此时间轮需要具备下面几个功能: 1)能添加事件,同时附上其超时时间: 2)如果事件正常执行结束,可以显示将其从时间轮上剔除掉,而不需要等时间轮自动移除: 3)如果事件到了设定的超时时间还没执行完,则时间轮需将其剔除掉,并发送一个超时的消息给系统. 基于这样的需求,下面就进行相应的设计和实现. 二.时间轮的设计 基于前面的需求,可以抽象出两个实体来:时钟和槽,其中时钟去负责

高性能定时器时间轮的探究

时间轮的概念 关于定时器有很多种,有基于升序的定时器时间链表,但是这种链表存在效率的不足,就是当插入定时器的时候时间复杂度是O(n).今天,我们来认识一下高性能定时器时间轮. 如上图所示,就是一个时间轮的基本轮廓.一个轮子上有很多槽slot,每一个槽指向一个定时器链表,这个链表是无序的.时间轮每转动一步就会指向下一个槽,其实也可以理解为一个滴答时间成为时间轮的槽间隔si (slot interval).它实际上就是心跳时间.如果该时间轮有n个槽,因此它运转一周的时间是n*si. 如果现在指针指向

Linux之tomcat 安装&配置&优化

Tomcat简介: 1.Tomcat就是用来解析jsp程序的一个软件 .(jsp是一种编程语言,即JAVA) 2.Tomcat是Apache 软件基金会(Apache Software Foundation)的Jakarta项目中的一个核心项目,由   Apache.Sun和其他一些公司及个人共同开发而成. 3.Tomcat是一个轻量级应用服务器,在中小型系统和并发访问用户不是很多的场合下被普遍使用,是开   发和调试JSP程序的首选. Tomcat的安装分为两个步骤:安装JDK和安装Tomca

网络超时检测的三种方法

作者:于老师,华清远见嵌入式学院讲师. 网络通信中,很多操作会使得进程阻塞,这时我们要设定时间,到时间后强制返回,避免进程在没有数据的情况下无限阻塞 这里我们总结一下网络超时检测的三种方法: 通过setsockopt设置套接字属性SO_RCVTIMEO struct timeval t = {5, 0}           if  (setsockopt(listenfd, SOL_SOCKET, SO_RCVTIMEO, &t, sizeof(t)) == -1) {             

网络编程中的超时检测

我们在网络编程中常见的一种做法是:创建好套接字后以阻塞的方式读写,如果没有数据可读的话,程序会一直等待.事实上,网络状况一直不断变化,很有可能在通讯过程中出现网络连接断开.我们在程序中有必要对这种情况进行检测,从而及时做出响应.下面介绍几种常用的超时检测方法(假设我们要求通过套接字等待数据的最大时间为8秒): 一. 设置套接字接收超时 setsockopt可以设置套接字的属性,其中包括接收超时时间.参考代码如下        struct timeval tv; // 描述时间的结构体变量