参考资料:
https://www.zhihu.com/question/38427301
https://www.ibm.com/developerworks/cn/linux/l-cn-timers/
http://www.cnblogs.com/processakai/archive/2012/04/11/2442294.html
思路和代码的编写主要参考了CSDN上的一份Java实现:
http://blog.csdn.net/mindfloating/article/details/8033340
我参考这份Java代码的思路编写了C++版本,说来其实也没什么技术难度,这里把代码分享出来,供后来的技术人使用。
我用C++实现的时间轮主要用于检测TCP会话是否超时,
所以定义一个sessionKey作为唯一的主键key.
因为我使用了C++11中的unordered_map,所以需要重载operator == 操作符,并自己编写hash函数,也就是SessionHash中的函数
/**
 * Identifies a session by its source/destination IP addresses and ports.
 * Used as the key type of the unordered_map containers in this code, which is
 * why it provides operator== (the matching hasher is SessionHash).
 */
class Sessionkey{
public:
    /** Builds a key whose four fields are all zero. */
    Sessionkey() : srcIp(0), dstIp(0), srcPort(0), dstPort(0) {}

    /** Copy constructor; defined out of line. */
    Sessionkey(const Sessionkey& skey);

    /**
     * Builds a key from explicit endpoints.
     * @param src source IP address
     * @param dst destination IP address
     * @param sp  source port
     * @param dp  destination port
     */
    Sessionkey(uint32_t src,uint32_t dst,uint16_t sp,uint16_t dp)
    {
        srcIp = src;
        dstIp = dst;
        srcPort = sp;
        dstPort = dp;
    }

    /** Field-by-field equality; defined out of line. */
    bool operator == (const Sessionkey &skey) const;

public:
    uint32_t srcIp;   // source IP address
    uint32_t dstIp;   // destination IP address
    uint16_t srcPort; // source port
    uint16_t dstPort; // destination port
};
/* Hash functor for Sessionkey; supplied as the hasher argument of the
   unordered_map typedefs that are keyed by Sessionkey. */
class SessionHash{
public:
/* Returns the hash value computed for sk (defined out of line). */
size_t operator()(const Sessionkey& sk) const;
};
#include "sessionKey.h"
#include "murmurHash.h"
/* Copy constructor: takes over the two addresses and two ports of sk. */
Sessionkey::Sessionkey(const Sessionkey& sk)
    : srcIp(sk.srcIp),
      dstIp(sk.dstIp),
      srcPort(sk.srcPort),
      dstPort(sk.dstPort)
{
}
/* Two keys are equal only when both addresses and both ports match. */
bool Sessionkey::operator==(const Sessionkey& sk) const
{
    if (srcIp != sk.srcIp)
    {
        return false;
    }
    if (dstIp != sk.dstIp)
    {
        return false;
    }
    if (srcPort != sk.srcPort)
    {
        return false;
    }
    return dstPort == sk.dstPort;
}
/*通过Sessionkey的全部成员构造hash值,切勿随意在Sessionkey类中添加成员变量*/
/* Hashes the raw bytes of the key object (sizeof(sk) bytes starting at &sk)
   with murmurHash64A and a fixed seed. Because the whole object is hashed,
   any member added to Sessionkey changes what is fed to the hash. */
size_t SessionHash::operator()(const Sessionkey& sk) const
{
    // The 64-bit hash is converted to size_t by the return statement.
    return murmurHash64A(&sk, sizeof(sk), 0xee6b27eb);
}
哈希值生成算法采用的是MurmurHash64A:
/*
 * Computes a 64-bit hash of the len bytes starting at key, mixed with seed.
 *
 * key  : start of the buffer to hash
 * len  : number of bytes to hash
 * seed : initial value folded into the hash state
 *
 * NOTE(review): the main loop reads the buffer through a ull64_t pointer, so
 * it presumably expects key to be suitably aligned for 8-byte loads - confirm
 * for the callers' buffers.
 */
ull64_t murmurHash64A ( const void * key, int len, ull64_t seed )
{
    const ull64_t m = 0xc6a4a7935bd1e995;
    const int r = 47;

    ull64_t h = seed ^ (len * m);

    // Body: mix the input one 8-byte word at a time.
    const ull64_t * const wordsBegin = (const ull64_t *)key;
    const ull64_t * const wordsEnd = wordsBegin + (len/8);
    for (const ull64_t * word = wordsBegin; word != wordsEnd; ++word)
    {
        ull64_t k = *word;
        k *= m;
        k ^= k >> r;
        k *= m;
        h ^= k;
        h *= m;
    }

    // Tail: XOR each of the 0..7 leftover bytes into h at bit offset 8*i,
    // then apply one extra multiply only if there was a tail at all.
    const unsigned char * const tail = (const unsigned char*)wordsEnd;
    const int tailLen = len & 7;
    for (int i = tailLen - 1; i >= 0; --i)
    {
        h ^= ull64_t(tail[i]) << (8 * i);
    }
    if (tailLen != 0)
    {
        h *= m;
    }

    // Final avalanche.
    h ^= h >> r;
    h *= m;
    h ^= h >> r;
    return h;
}
#ifndef __TIME_WHEEL_H__
#define __TIME_WHEEL_H__
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <sessionKey.h>
#include <glog/logging.h>
#include "lfds611.h"
using namespace std;
/**************************** Global declarations **********************************/
// Element count passed to lfds611_queue_new() for the timed-out-session queue.
#define TIMEOUT_SESSKEY_CNT 1000 //default
// Number of Sessionkey objects allocated for CTimeWheel::sessKeyPool.
#define SESSKEY_BUFFER_CNT 2000
// pthread entry point; param is the CTimeWheel whose tickStepRun() is executed.
void *tickStepThreadGlobal(void* param);
// Per-slot container: Sessionkey -> int (the int is a reserved field).
typedef std::unordered_map<Sessionkey,int,SessionHash> tDurationMap;
/* Slot: one bucket of the time wheel, holding the session keys stored in it. */
class Slot
{
public:
/* Creates a slot with id 0. */
Slot();
/* Creates a slot with the given id. */
Slot(int nId);
~Slot();
/* Inserts key with the associated value num into this slot. */
void addElement(Sessionkey& key,int num);
/* Erases key from this slot. */
void removeElement(Sessionkey& key);
public:
/* The int mapped value of the unordered_map is a reserved field. */
tDurationMap slotDurationMap;// Sessionkey -> int
/* Identifier of this slot; the wheel constructor passes the slot's index. */
int id;
};
/* Maps a Sessionkey to the Slot that currently holds it. */
typedef std::unordered_map<Sessionkey,Slot*,SessionHash> tRelationMap;
/* Time wheel: a ring of slots that a background thread steps through one tick
   at a time; the keys found in the slot being visited are reported as timed out. */
class CTimeWheel
{
public:
CTimeWheel();
~CTimeWheel();
/* tickDuration: duration of one tick; ticksPerWheel: number of ticks in one
   revolution (the session timeout); timeUnit: time unit, multiplied into
   tickDuration by the constructor. */
// This constructor starts the tick-step thread.
CTimeWheel(int tickDuration,int ticksPerWheel,int timeUnit);
public:
/* Adds an element and returns the timeout of the newly added element. */
long addElement(Sessionkey& key,int num);
/* Removes key (or its reversed-direction key); returns whether one was found. */
bool removeElement(Sessionkey& key);
/* Body of the tick-step thread: advances the tick index in a loop. */
void tickStepRun();
private:
/* Sleeps until the next tick is due, then increments tick. */
void waitForNextTick();
/* Returns the index of the slot just before the current tick index. */
int getPreviousTickIndex();
/* Removes key (or its reversed-direction key) if it is already stored. */
bool checkAdd(Sessionkey& key);
void notifyExpired(int idx);// open question noted here by the original author
/* The return value is in seconds. */
long getCurrentTime();
public:
/* Lock for reads and writes. */
mutex mtx;
/* The wheel itself; each element is a Slot. */
std::vector<Slot*> wheel;
/* Hash table recording which slot each Sessionkey is stored in. */
tRelationMap keyToSlotMap;
/* Queue holding the Sessionkeys of timed-out sessions. */
struct lfds611_queue_state *timeoutSessionQueue;
/* Array of Sessionkey objects whose addresses are pushed onto the queue. */
Sessionkey *sessKeyPool;
private:
uint32_t tickDuration;     // duration of one tick (constructor argument times timeUnit)
uint32_t ticksPerWheel;    // number of slots (constructor argument plus one)
uint32_t currentTickIndex; // index of the slot the tick thread is visiting
pthread_t tickThread;      // handle of the tick-step thread
long startTime;            // getCurrentTime() value taken when tickStepRun() starts
long tick;                 // count used to compute when the next tick is due
};
#endif /* __TIME_WHEEL_H__ */
#include "timeWheel.h"
void *tickStepThreadGlobal(void* param)
{
CTimeWheel* pThis = (CTimeWheel*)param;
pThis->tickStepRun();
return NULL;
}
/* Default constructor: the slot gets id 0. */
Slot::Slot()
    : id(0)
{
}
/* Creates a slot identified by nId. */
Slot::Slot(int nId)
    : id(nId)
{
}
/* Nothing to release explicitly; the map member cleans up after itself. */
Slot::~Slot() {}
/* Stores key with the associated value num in this slot. A key with a zero
   source or destination port is logged but still inserted. */
void Slot::addElement(Sessionkey& key,int num)
{
    const bool hasZeroPort = (key.dstPort == 0) || (key.srcPort == 0);
    if (hasZeroPort)
    {
        VLOG(4)<<"addElement ERROR! 8888888888888888888888";
    }
    // insert() leaves an already-present key untouched.
    slotDurationMap.insert(tDurationMap::value_type(key, num));
}
/* Removes key from this slot; does nothing when the key is not stored here. */
void Slot::removeElement(Sessionkey& key)
{
    tDurationMap::iterator found = slotDurationMap.find(key);
    if (found != slotDurationMap.end())
    {
        slotDurationMap.erase(found);
    }
}
/************************CTimeWheel类实现**********************************/
/* Default constructor: creates an empty wheel with no slots, no queue, no key
   pool and no tick thread. Every scalar and pointer member is given a defined
   value so that later code (in particular the destructor, which runs for
   default-constructed objects too) never reads an indeterminate value. */
CTimeWheel::CTimeWheel()
    : timeoutSessionQueue(NULL),
      sessKeyPool(NULL),
      tickDuration(0),
      ticksPerWheel(0),
      currentTickIndex(0),
      tickThread(),
      startTime(0),
      tick(0)
{
}
/* Destructor: releases the Slot objects owned by the wheel.
 *
 * The slots are created with `new`, so they must be released with `delete`
 * (not free(), which skips the destructor of the slot's map and is undefined
 * behaviour for new-allocated objects). The loop walks the vector's actual
 * contents instead of counting to ticksPerWheel, so it is also safe for a
 * wheel that holds fewer slots than that counter says.
 *
 * NOTE(review): sessKeyPool, timeoutSessionQueue and the tick thread started
 * by the three-argument constructor are not released or stopped here; the
 * thread's loop never exits, so it presumably must be stopped before the
 * wheel is destroyed - confirm the intended shutdown sequence.
 */
CTimeWheel::~CTimeWheel()
{
    for (std::vector<Slot*>::iterator it = wheel.begin(); it != wheel.end(); ++it)
    {
        delete *it;
        *it = NULL;
    }
    wheel.clear();
}
/* Builds a wheel and starts its tick-step thread.
 *
 * tickDuration  : duration of one tick, in units of timeUnit
 * ticksPerWheel : number of ticks in one revolution; one extra slot is
 *                 allocated on top of this
 * timeUnit      : multiplier applied to tickDuration (the product is used as
 *                 seconds by the tick thread)
 *
 * All members are given defined values before the thread is created. If the
 * timeout queue cannot be created the error is logged and the thread is not
 * started, because the thread would otherwise enqueue into a missing queue.
 */
CTimeWheel::CTimeWheel(int tickDuration,int ticksPerWheel,int timeUnit)
    : timeoutSessionQueue(NULL),
      sessKeyPool(NULL),
      tickDuration(0),
      ticksPerWheel(0),
      currentTickIndex(0),
      tickThread(),
      startTime(0),
      tick(0)
{
    VLOG(3)<<"CTimeWheel Construct 2 Called!";
    this->tickDuration = tickDuration*timeUnit; // unit: seconds
    this->ticksPerWheel = ticksPerWheel + 1;

    // Allocate one Slot per wheel position.
    wheel.reserve(this->ticksPerWheel);
    for(uint32_t i=0;i<this->ticksPerWheel;i++)
    {
        wheel.push_back(new Slot(i));
    }

    // Pool whose entries are handed to the queue by address.
    sessKeyPool = new Sessionkey[SESSKEY_BUFFER_CNT];

    // Lock-free queue that receives the timed-out session keys.
    if(!lfds611_queue_new(&timeoutSessionQueue,TIMEOUT_SESSKEY_CNT))
    {
        LOG(ERROR) <<"create timeoutSessionQueue failed!";
        return;
    }

    // Start the tick thread last, handing it this object.
    if(pthread_create(&tickThread,NULL,tickStepThreadGlobal,this)!=0)
    {
        LOG(ERROR) <<"create tickStepThreadGlobal thread failed!";
    }
}
/* Body of the tick-step thread; never returns.
 *
 * Records the start time, then loops forever over the wheel positions: it
 * publishes the position as currentTickIndex, expires the keys stored in that
 * slot, and waits for the next tick.
 */
void CTimeWheel::tickStepRun()
{
    // Reference point used by waitForNextTick().
    startTime = getCurrentTime();
    tick = 1;

    for(size_t i=0;;i++)
    {
        // Wrap around at the end of the wheel.
        if(i >= wheel.size())
        {
            i=0;
        }

        {
            // Non-blocking acquisition, as before. The guard unlocks in its
            // destructor only if the lock was actually obtained; calling
            // unlock() on a mutex this thread does not own is undefined.
            std::unique_lock<std::mutex> guard(mtx, std::try_to_lock);
            currentTickIndex = static_cast<uint32_t>(i);
        }

        // Time out every element stored in the current slot.
        notifyExpired(currentTickIndex);

        // Block until the next tick is due.
        waitForNextTick();
    }
}
void CTimeWheel::waitForNextTick()
{
while(1)
{
long currentTime = getCurrentTime();
long sleepTime = tickDuration * tick - (currentTime - startTime);//单位
/*这块的值可能过大,加调试信息*/
//VLOG(3)<<"tick step Thread sleepTime is "<<sleepTime;
if(sleepTime <= 0)
{
break;
}
else
{
sleep(sleepTime);
}
}
//tick步进1
VLOG(3)<<"tick step add 1";
tick++;
}
/* Adds key to the wheel, or re-adds it if it is already present.
 *
 * key : session key to track
 * num : value stored alongside the key in the slot; also printed as
 *       "threadID" in the log lines
 *
 * Returns (ticksPerWheel - 1) * tickDuration as the timeout of the new
 * element, or -1 when the target slot pointer is NULL.
 */
long CTimeWheel::addElement(Sessionkey& key,int num)
{
    // Drop any existing entry for this session (either direction) first.
    const bool alreadyPresent = checkAdd(key);
    if(!alreadyPresent)
    {
        VLOG(3)<<"SessionKey is first Add";
    }

    // New elements go into the slot just behind the current tick position.
    Slot* const target = wheel.at(getPreviousTickIndex());
    if(target == NULL)
    {
        return -1;
    }
    target->addElement(key,num);
    VLOG(3)<<"threadID "<<num<<" addElement to TimeWheel!";

    // Remember which slot holds the key.
    keyToSlotMap.insert(tRelationMap::value_type(key,target));
    VLOG(4)<<"keyToSlotMap size: "<<keyToSlotMap.size();
    VLOG(3)<<"threadID "<<num<<" insert sessionKey to keyToSlotMap!";

    return (ticksPerWheel - 1) * tickDuration;
}
/* Removes a session from the wheel.
 *
 * The session is looked up under key first and, if that is not found, under
 * the key with source and destination swapped. The matching entry is erased
 * from its slot and from keyToSlotMap.
 *
 * Returns true when an entry was removed; false when neither direction is
 * present or the recorded slot pointer is NULL.
 */
bool CTimeWheel::removeElement(Sessionkey& key)
{
    Sessionkey reverseKey(key.dstIp,key.srcIp,key.dstPort,key.srcPort);

    // Find the entry under whichever direction it was stored.
    Sessionkey* matchedKey = &key;
    tRelationMap::iterator iteGot = keyToSlotMap.find(key);
    if(iteGot == keyToSlotMap.end())
    {
        matchedKey = &reverseKey;
        iteGot = keyToSlotMap.find(reverseKey);
        if(iteGot == keyToSlotMap.end())
        {
            VLOG(3)<<"removeElement function:sessionkey is not in keyToSlotMap";
            return false;
        }
    }

    Slot* pSlot = iteGot->second;
    if(NULL == pSlot)
    {
        VLOG(3)<<"removeElement() function,keyToSlotMap Element is NULL";
        return false;
    }

    // Remove the key from its wheel slot.
    pSlot->removeElement(*matchedKey);
    VLOG(3)<<"Erase key from wheel slot!";

    // Remove the key -> slot record so the key can be added again later.
    keyToSlotMap.erase(iteGot);
    VLOG(3)<<"Erase key from keyToSlotMap!";
    return true;
}
/*对当前tick索引对应slot中的所有元素做超时处理*/
void CTimeWheel::notifyExpired(int idx)
{
//1.返回idx索引对应的slot槽
if(idx<0 || idx >= ticksPerWheel)//0~ticksPerWheel-1
{
VLOG(4)<<"notifyExpired() function Failed!Reason:invalid Index!";
return;
}
Slot*pSlot = wheel.at(idx);
if(NULL == pSlot)
{
VLOG(4)<<"notifyExpired() function Failed!Reason:wheel slot is empty!";
return;
}
//2.返回slot槽中元素集合
//VLOG(4)<<"slot Element size is "<<pSlot->slotDurationMap.size();
int index = 0;
tDurationMap::iterator iteMap;
for(iteMap=pSlot->slotDurationMap.begin();iteMap!=pSlot->slotDurationMap.end();iteMap++)
{
Sessionkey* timeoutKey = sessKeyPool+index;
Sessionkey key = iteMap->first;
timeoutKey->srcIp = key.srcIp;
timeoutKey->dstIp = key.dstIp;
timeoutKey->srcPort= key.srcPort;
timeoutKey->dstPort= key.dstPort;
//将超时的sessionkey压入Queue
lfds611_queue_enqueue(timeoutSessionQueue,(void*)timeoutKey);
struct in_addr addrSrc,addrDst;
uint32_t srcIp = htonl(timeoutKey->srcIp);
uint32_t dstIp = htonl(timeoutKey->dstIp);
memcpy(&addrSrc,&srcIp,4);
memcpy(&addrDst,&dstIp,4);
string strSrcIP = inet_ntoa(addrSrc);
string strDstIP = inet_ntoa(addrDst);
VLOG(4)<<strSrcIP<<"->"<<strDstIP;
VLOG(4)<<"srcPort:"<<timeoutKey->srcPort<<"dstPort:"<<timeoutKey->dstPort;
VLOG(4)<<"sessionKey enqueue enqueue enqueue";
//删除记录
removeElement(key);
if(++index == SESSKEY_BUFFER_CNT)
{
index = 0;
}
}
}
/* Checks whether the session is already tracked and, if so, removes it.
 *
 * The lookup uses key first and then the key with source and destination
 * swapped. A match is erased from its slot and from keyToSlotMap so that the
 * caller can insert the key afresh.
 *
 * Returns true when an existing entry was removed; false when the session is
 * not tracked or the recorded slot pointer is NULL.
 */
bool CTimeWheel::checkAdd(Sessionkey& key)
{
    Sessionkey swapped(key.dstIp,key.srcIp,key.dstPort,key.srcPort);

    // Determine under which direction, if any, the session is stored.
    Sessionkey* stored = &key;
    tRelationMap::iterator hit = keyToSlotMap.find(key);
    if(hit == keyToSlotMap.end())
    {
        stored = &swapped;
        hit = keyToSlotMap.find(swapped);
    }
    if(hit == keyToSlotMap.end())
    {
        VLOG(3)<<"checkAdd function:sessionkey is not in keyToSlotMap";
        return false;
    }

    Slot* const holder = hit->second;
    if(holder == NULL)
    {
        VLOG(3)<<"checkAdd() function,keyToSlotMap Element is NULL";
        return false;
    }

    // Remove the element from its wheel slot.
    holder->removeElement(*stored);
    VLOG(3)<<"Erase key from wheel slot!";

    // Remove the key -> slot record so it can be added or updated later.
    keyToSlotMap.erase(*stored);
    VLOG(3)<<"Erase key from keyToSlotMap!";
    return true;
}
/* Returns the index of the slot immediately before the current tick position,
 * wrapping from 0 to ticksPerWheel - 1.
 *
 * currentTickIndex is read under mtx. The guard releases the mutex on every
 * return path; previously both return statements came before the unlock call,
 * so the mutex was never released once acquired.
 */
int CTimeWheel::getPreviousTickIndex()
{
    std::lock_guard<std::mutex> guard(mtx);
    const int cti = currentTickIndex;
    if(0 == cti)
    {
        return ticksPerWheel - 1;
    }
    return cti - 1;
}
/* Returns the current time reported by gettimeofday(), in seconds. */
long CTimeWheel::getCurrentTime()
{
    struct timeval now;
    gettimeofday(&now,NULL);
    // Integer division: the microsecond part contributes whole seconds only.
    return now.tv_sec + now.tv_usec / 1000000;
}