muduo::TimerId、Timer、TimerQueue分析

  • Linux时间函数介绍

    • linux中用以获取当前时间的的函数有
    • 定时函数
  • timerfd介绍
  • TimerId介绍
  • Timer
  • TimerQueue

Linux时间函数介绍

linux中用以获取当前时间的的函数有:

time(2) / time_t(秒)

ftime(3) / struct timeb(毫秒)

gettimeofday(2) / struct timeval(微秒)

clock_gettime(2) / struct timespec(微秒)

还有gmtime / localtime / timegm / mktime / strftime / struct tm等与当前时间无关的时间格式转换函数。

定时函数

sleep(3)

alarm(3)

usleep(3)

nanosleep(2)

clock_nanosleep(2)

gettimer(2) / settitimer(2)

timer_create(2) / timer_settime(2) / tiemr_gettime(2) / timer_delete(2)

timerfd_create(2) / timerfd_gettime(2) / timerfd_settime(2)

取舍如下:

1、计时只使用gettimeofday(2)来获取当前时间。

2、定时只使用timerfd_*系列函数来处理定时任务。

timerfd介绍

这节介绍muduo中定时器的实现。先看一个2.6内核新增的有关定时的系统调用,基于这几个系统调用可以实现基于文件描述符的定时器。即可是定时,使文件描述符在某一特定时间可读。

#include <sys/timerfd.h>
    int timerfd_create(int clockid, int flags);

    int timerfd_settime(int fd, int flags,
         onst struct itimerspec *new_value,
         struct itimerspec *old_value);

    int timerfd_gettime(int fd, struct itimerspec *curr_value);

1、timerfd_create用于创建一个定时器文件,函数返回值是一个文件句柄fd。

2、timerfd_settime用于设置新的超时时间,并开始计时。flag为0表示相对时间,为1表示绝对时间。new_value为这次设置的新时间,old_value为上次设置的时间。返回0表示设置成功。

3、timerfd_gettime用于获得定时器距离下次超时还剩下的时间。如果调用时定时器已经到期,并且该定时器处于循环模式(设置超时时间时struct itimerspec::it_interval不为0),那么调用此函数之后定时器重新开始计时。

TimerId介绍

TimerId非常简单,它被设计用来取消Timer的,它的结构很简单,只有一个Timer指针和其序列号。

class TimerId : public muduo::copyable
{
 public:
  TimerId()
    : timer_(NULL),
      sequence_(0)
  {
  }

  TimerId(Timer* timer, int64_t seq)
    : timer_(timer),
      sequence_(seq)
  {
  }

  // default copy-ctor, dtor and assignment are okay

  friend class TimerQueue;

 private:
  Timer* timer_;
  int64_t sequence_;
};

TimerQueue为其友元,可以操作其私有数据。

Timer

Timer封装了定时器的一些参数,例如超时回调函数、超时时间、定时器是否重复、重复间隔时间、定时器的序列号。其函数大都是设置这些参数,run()用来调用回调函数,restart()用来重启定时器(如果设置为重复)。其源码相对简单

Timer.h

class Timer : boost::noncopyable
{
 public:
  Timer(const TimerCallback& cb, Timestamp when, double interval)
    : callback_(cb),//回调函数
      expiration_(when),//超时时间
      interval_(interval),//如果重复,间隔时间
      repeat_(interval > 0.0),//是否重复
      sequence_(s_numCreated_.incrementAndGet())//当前定时器的序列号
  { }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
  Timer(TimerCallback&& cb, Timestamp when, double interval)
    : callback_(std::move(cb)),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  { }
#endif

  void run() const//超时时调用回调函数
  {
    callback_();
  }

  Timestamp expiration() const  { return expiration_; }
  bool repeat() const { return repeat_; }
  int64_t sequence() const { return sequence_; }

  void restart(Timestamp now);

  static int64_t numCreated() { return s_numCreated_.get(); }

 private:
  const TimerCallback callback_;//回调函数
  Timestamp expiration_;//超时时间(绝对时间)
  const double interval_;//间隔多久重新闹铃
  const bool repeat_;//是否重复
  const int64_t sequence_;//Timer序号

  static AtomicInt64 s_numCreated_;//创建Timer序号使用,static
};

Timer.cc

AtomicInt64 Timer::s_numCreated_;

void Timer::restart(Timestamp now)
{
  if (repeat_)//如果设置重复,则重新添加
  {
    expiration_ = addTime(now, interval_);//将now和interval_相加
  }
  else
  {
    expiration_ = Timestamp::invalid();
  }
}

TimerQueue

虽然TimerQueue中有Queue,但是其实现时基于Set的,而不是Queue。这样可以高效地插入、删除定时器,且找到当前已经超时的定时器。TimerQueue的public接口只有两个,添加和删除。

void addTimerInLoop(Timer* timer);
void cancelInLoop(TimerId timerId);

内部有channel,和timerfd关联。添加新的Timer后,在超时后,timerfd可读,会处理channel事件,之后调用Timer的回调函数;在timerfd的事件处理后,还有检查一遍超时定时器,如果其属性为重复还有再次添加到定时器集合中。

内部有两种类型的Set

typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
typedef std::pair<Timer*, int64_t> ActiveTimer;
typedef std::set<ActiveTimer> ActiveTimerSet;

一个Set元素类型为超时事件和Timer*指针;另一种为Timer*指针和定时器序列号。

下面是源码

TimerQueue.h

class TimerQueue : boost::noncopyable
{
 public:
  TimerQueue(EventLoop* loop);
  ~TimerQueue();

  ///
  /// Schedules the callback to be run at given time,
  /// repeats if @c interval > 0.0.
  ///
  /// Must be thread safe. Usually be called from other threads.
  TimerId addTimer(const TimerCallback& cb,
                   Timestamp when,
                   double interval);
#ifdef __GXX_EXPERIMENTAL_CXX0X__
  TimerId addTimer(TimerCallback&& cb,
                   Timestamp when,
                   double interval);
#endif

  void cancel(TimerId timerId);

 private:

  // FIXME: use unique_ptr<Timer> instead of raw pointers.
  typedef std::pair<Timestamp, Timer*> Entry;//std::pair支持比较运算
  typedef std::set<Entry> TimerList;//元素为超时时间和指向超时的定时器
  typedef std::pair<Timer*, int64_t> ActiveTimer;
  typedef std::set<ActiveTimer> ActiveTimerSet;//元素为定时器和其序列号

  void addTimerInLoop(Timer* timer);
  void cancelInLoop(TimerId timerId);
  // called when timerfd alarms
  void handleRead();
  // move out all expired timers
  std::vector<Entry> getExpired(Timestamp now);
  void reset(const std::vector<Entry>& expired, Timestamp now);

  bool insert(Timer* timer);

  EventLoop* loop_;
  const int timerfd_;
  Channel timerfdChannel_;
  // Timer list sorted by expiration
  TimerList timers_;//定时器集合

  // for cancel()
  ActiveTimerSet activeTimers_;
  bool callingExpiredTimers_; /* atomic *///是否正在处理超时事件
  ActiveTimerSet cancelingTimers_;//取消了的定时器的集合
};

TimerQueue.cpp

int createTimerfd()//创建timerfd
{
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);//非阻塞
  if (timerfd < 0)
  {
    LOG_SYSFATAL << "Failed in timerfd_create";
  }
  return timerfd;
}

struct timespec howMuchTimeFromNow(Timestamp when)//现在距离超时时间when还有多久
{
  int64_t microseconds = when.microSecondsSinceEpoch()
                         - Timestamp::now().microSecondsSinceEpoch();
  if (microseconds < 100)
  {
    microseconds = 100;
  }
  struct timespec ts;
  ts.tv_sec = static_cast<time_t>(
      microseconds / Timestamp::kMicroSecondsPerSecond);
  ts.tv_nsec = static_cast<long>(
      (microseconds % Timestamp::kMicroSecondsPerSecond) * 1000);
  return ts;
}

void readTimerfd(int timerfd, Timestamp now)//处理超时事件。超时后,timerfd变为可读
{
  uint64_t howmany;
  ssize_t n = ::read(timerfd, &howmany, sizeof howmany);//读timerfd,howmany为超时次数
  LOG_TRACE << "TimerQueue::handleRead() " << howmany << " at " << now.toString();
  if (n != sizeof howmany)
  {
    LOG_ERROR << "TimerQueue::handleRead() reads " << n << " bytes instead of 8";
  }
}

void resetTimerfd(int timerfd, Timestamp expiration)//重新设置定时器
{
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  bzero(&newValue, sizeof newValue);
  bzero(&oldValue, sizeof oldValue);
  newValue.it_value = howMuchTimeFromNow(expiration);
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
  if (ret)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}

TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),//创建timerfd
    timerfdChannel_(loop, timerfd_),//timerfd相关的channel
    timers_(),
    callingExpiredTimers_(false)
{
  timerfdChannel_.setReadCallback(
      boost::bind(&TimerQueue::handleRead, this));//设置回调函数,读timerfd
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  timerfdChannel_.enableReading();//timerfd对应的channel监听事件为可读事件
}

TimerQueue::~TimerQueue()
{
  timerfdChannel_.disableAll();
  timerfdChannel_.remove();
  ::close(timerfd_);
  // do not remove channel, since we‘re in EventLoop::dtor();
  for (TimerList::iterator it = timers_.begin();
      it != timers_.end(); ++it)
  {
    delete it->second;//释放Timer*
  }
}

TimerId TimerQueue::addTimer(const TimerCallback& cb,//添加新的定时器
                             Timestamp when,
                             double interval)
{
  Timer* timer = new Timer(cb, when, interval);
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}

#ifdef __GXX_EXPERIMENTAL_CXX0X__
TimerId TimerQueue::addTimer(TimerCallback&& cb,
                             Timestamp when,
                             double interval)
{
  Timer* timer = new Timer(std::move(cb), when, interval);
  loop_->runInLoop(
      boost::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}
#endif

void TimerQueue::cancel(TimerId timerId)
{
  loop_->runInLoop(
      boost::bind(&TimerQueue::cancelInLoop, this, timerId));
}

void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  bool earliestChanged = insert(timer);//插入成功,则启动

  if (earliestChanged)
  {
    resetTimerfd(timerfd_, timer->expiration());//启动定时器
  }
}

void TimerQueue::cancelInLoop(TimerId timerId)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  ActiveTimer timer(timerId.timer_, timerId.sequence_);
  ActiveTimerSet::iterator it = activeTimers_.find(timer);
  if (it != activeTimers_.end())//要取消的在当前激活的Timer集合中
  {
    size_t n = timers_.erase(Entry(it->first->expiration(), it->first));//在timers_中取消
    assert(n == 1); (void)n;
    delete it->first; // FIXME: no delete please
    activeTimers_.erase(it);//在activeTimers_中取消
  }
  else if (callingExpiredTimers_)//如果正在执行超时定时器的回调函数,则加入到cancelingTimers集合中
  {
    cancelingTimers_.insert(timer);
  }
  assert(timers_.size() == activeTimers_.size());
}

void TimerQueue::handleRead()//处理timerfd读事件
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);//读timerfd

  std::vector<Entry> expired = getExpired(now);//找到超时定时器

  callingExpiredTimers_ = true;
  cancelingTimers_.clear();
  // safe to callback outside critical section
  for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    it->second->run();//调用timer的回调函数
  }
  callingExpiredTimers_ = false;

  reset(expired, now);//把重复的定时器重新加入到定时器中
}

std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  TimerList::iterator end = timers_.lower_bound(sentry);//返回第一个大于等于now的迭代器,小于now的都已经超时
  assert(end == timers_.end() || now < end->first);
  std::copy(timers_.begin(), end, back_inserter(expired));//[begin end)之间的元素追加到expired末尾
  timers_.erase(timers_.begin(), end);//删除超时定时器

  for (std::vector<Entry>::iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    size_t n = activeTimers_.erase(timer);//删除超时定时器
    assert(n == 1); (void)n;
  }

  assert(timers_.size() == activeTimers_.size());
  return expired;
}

void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
  Timestamp nextExpire;

  for (std::vector<Entry>::const_iterator it = expired.begin();
      it != expired.end(); ++it)
  {
    ActiveTimer timer(it->second, it->second->sequence());
    if (it->second->repeat()//重复
        && cancelingTimers_.find(timer) == cancelingTimers_.end())//且不在cancelingTimers_集合中
    {
      it->second->restart(now);//重启定时器
      insert(it->second);//重新插入倒timers_和activeTimers
    }
    else
    {
      // FIXME move to a free list
      delete it->second; // FIXME: no delete please
    }
  }

  if (!timers_.empty())
  {
    nextExpire = timers_.begin()->second->expiration();
  }

  if (nextExpire.valid())
  {
    resetTimerfd(timerfd_, nextExpire);
  }
}

bool TimerQueue::insert(Timer* timer)//插入一个timer
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  bool earliestChanged = false;
  Timestamp when = timer->expiration();
  TimerList::iterator it = timers_.begin();
  if (it == timers_.end() || when < it->first)//当前插入的定时器是否时最早到时的
  {
    earliestChanged = true;
  }
  {
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;//为什么(void)result
  }
  {
    std::pair<ActiveTimerSet::iterator, bool> result
      = activeTimers_.insert(ActiveTimer(timer, timer->sequence()));
    assert(result.second); (void)result;
  }

  assert(timers_.size() == activeTimers_.size());
  return earliestChanged;
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-10 17:21:21

muduo::TimerId、Timer、TimerQueue分析的相关文章

muduo::Connector、TcpClient分析

Connector TcpClient Connector Connector用来发起连接.在非阻塞网络中,主动发起连接比被动接收连接更为复杂,因为要考虑错误处理,还要考虑重试. 主要难点在于 1.socket是一次性的,一旦出错无法恢复:只能关闭重来.使用新的fd后,用新的channel. 2.错误代码与acce(2)不同.及时是socket可写,也不意味着已经成功建立连接,还需要用getsockopt(sockfd, SOL_SOCKET, SO_ERROR, --)再次确认. 3.重试的间

muduo:Channel、Poller分析

Channel Poller Channel Channel是Reactor结构中的"事件",它自始至终都属于一个EventLoop,负责一个文件描述符的IO事件,它包含又文件描述符fd_,但实际上它不拥有fd_,不用负责将其关闭.在Channel类中保存这IO事件的类型以及对应的回调函数,当IO事件发生时,最终会调用到Channel类中的回调函数.Channel类一般不单独使用,它常常包含在其他类中(Acceptor.Connector.EventLoop.TimerQueue.Tc

muduo::Logging、LogStream分析

Logging LogStream Logging Logging类是用来记录分析日志用的. 下面是Logging.h的源码 namespace muduo { class TimeZone;//forward declaration class Logger { public: enum LogLevel//用来设置不同的日志级别 { TRACE, DEBUG, INFO, WARN, ERROR, FATAL, NUM_LOG_LEVELS,//级别个数 }; // compile time

JAVA随笔篇一(Timer源代码分析和scheduleAtFixedRate的使用)

写完了基础篇,想了非常久要不要去写进阶篇.去写JSP等等的用法.最后决定先不去写.由于自己并非JAVA方面的大牛.眼下也在边做边学,所以决定先将自己不懂的拿出来学并记下来. Timer是Java自带的java.util.Timer类,通过调度一个java.util.TimerTask任务.这样的方式能够让程序依照某一个频度运行. 1.Timer类的源代码分析: public class Timer { /** * The timer task queue. This data structure

Swoole源码学习记录(十五)——Timer模块分析

swoole版本:1.7.7-stable Github地址:点此查看 1.Timer 1.1.swTimer_interval_node 声明: // swoole.h 1045-1050h typedef struct _swTimer_interval_node { struct _swTimerList_node *next, *prev; struct timeval lasttime; uint32_t interval; } swTimer_interval_node; 成员 说明

muduo::EventLoopThread、EventLoopThreadPool分析

EventLoopThread EventLoopThreadPool muduo的并发模型为one loop per thread+ threadpool.为了方便使用,muduo封装了EventLoop和Thread为EventLoopThread,为了方便使用线程池,又把EventLoopThread封装为EventLoopThreadPool. EventLoopThread 任何一个线程,只要创建并运行了EventLoop,就是一个IO线程. EventLoopThread类就是一个封

muduo::FileUtil、LogFile分析

FileUtil LogFile FileUtil 这个类负责把日志写入到文件 FileUtil.h namespace FileUtil { // read small file < 64KB class ReadSmallFile : boost::noncopyable { public: ReadSmallFile(StringArg filename); ~ReadSmallFile(); // return errno template<typename String> in

Swoole定时器Timer特性分析与使用

Swoole是一个使用c开发的php扩展,通过php就可以实现高性能web服务器,同时,还内置了定时器Timer.任务队列Task特性.这样,基于swoole,你可以在程序层面控制实现方式,减少对外部工具 - 独立的消息队列服务器.定时任务管理工具等的依赖性. swoole的强大之处就在与其进程模型的设计,既解决了异步问题,又解决了并行.用法如下: swoole_server_addtimer($serv, 10); 第二个参数是定时器的间隔时间,单位为秒.swoole定时器的最小颗粒是1秒.支

muduo::BlockingQueue、BoundedBlockingQueue分析

BlockingQueue BoundedBlockingQueue 在学习源码之前,先了解一个概念:有界缓冲和无界缓冲. 以生产者.消费者模型. 有界缓冲是指生产者在向仓库添加数据时要先判断仓库是否已满,如果已满则通知消费者来取走数据.消费者在消费时,先判断仓库是否已空,如果是则先通知生产者生产数据. BlockingQueue 在无界缓冲中,生产者不用关心仓库是否已满,只需添加数据;消费者在判断仓库已空时要等待生产者的信号.这时只需要用一个信号量. BlockingQueue就是这样的一个模