muduo源码-EventLoop.h

0 设计

EventLoop 类,主要是用来管理一个进程中的channel、计时器、以及epoll 的类。

每个类只有一个,(因为如果有两个,那么一个eventloop 在loop()的时候,另一个eventloop 得不到执行)。

每一个channel或者说是每一个文件描述符都必须属于eventloop,换句话说,每一个文件描述符属于一个线程。(这是必然的,因为eventloop在代表了一个线程)

eventloop 类的主要工作,就是wait epoll,然后执行channel 的事件处理函数。

1 源码

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <vector>

#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

#include <muduo/base/Mutex.h>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/TimerId.h>

namespace muduo
{
namespace net
{

class Channel;
class Poller;
class TimerQueue;

// 事件处理类
class EventLoop : boost::noncopyable
{
  public:
    typedef boost::function<void()> Functor;

    EventLoop();
    ~EventLoop();

    // 里面一个while循环,直接开始工作.
    void loop();

    // 别的线程调用eventloop 的quit,设置 loop() 中循环的判断条件
    // 从而在下一次循环的时候停止
    void quit();

    // 返回上一次 epoll 的返回时间
    Timestamp pollReturnTime() const { return pollReturnTime_; }

    int64_t iteration() const { return iteration_; }

    // 添加函数对象到eventloop线程中执行。
    void runInLoop(const Functor &cb);

    // 当调用函数添加函数对象到eventloop中执行的时候
    // 需要判断是否是在eventloop 线程中.如果是在当前eventloop 的线程中
    // 那么直接执行,否则,就添加到队列中,然后唤醒epoll
    void queueInLoop(const Functor &cb);

    // 返回当前 等待执行函数对象的队列长度
    size_t queueSize() const;

    // 下面三个函数,是用来添加计时器对象的
    TimerId runAt(const Timestamp &time, const TimerCallback &cb);

    TimerId runAfter(double delay, const TimerCallback &cb);

    TimerId runEvery(double interval, const TimerCallback &cb);

    void cancel(TimerId timerId);

    // wakeup函数用来显示的唤醒 epoll ,主要是在 eventloop执行了quit
    // 还有添加了函数对象到队列中的时候 主动的唤醒 epoll
    // 能唤醒的原因在于,eventloop 对象持有一个 eventfd,注册在了epoll中
    // 然后需要唤醒的时候,直接 eventfd 中写数据即可.
    void wakeup();

    // 这两个函数被channel 的update 和remove 调用
    // 然后,这俩函数由去调用epoll 的对应接口
    void updateChannel(Channel *channel);
    void removeChannel(Channel *channel);

    // 用来判断一个 epoll 是否持有 这个 channel
    // 调用的是 epoll 的对象
    bool hasChannel(Channel *channel);

    // 用来判断 绑定在eventloop 中的channel 的运行是否在 本eventloop 线程中
    // 或是其他操作,需要属于本eventloop 线程
    void assertInLoopThread()
    {
        if (!isInLoopThread())
        {
            abortNotInLoopThread();
        }
    }

    // 被上面的函数调用,就是简单的判断下初始化eventloop 的线程是不是当前运行函数的线程
    bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

    // eventHandling() 用来判断是否是在已就绪channel的事件处理函数。
    // 因为channel的处理只能是在 eventloop 线程中,也就是说在 loop 处理的时候
    // 如果这个时候 remove channel 那么要删除的channel 就是当前正在处理channel
    bool eventHandling() const { return eventHandling_; }

    // 没被用到,貌似
    void setContext(const boost::any &context)
    {
        context_ = context;
    }

    const boost::any &getContext() const
    {
        return context_;
    }

    boost::any *getMutableContext()
    {
        return &context_;
    }

    static EventLoop *getEventLoopOfCurrentThread();

  private:
    void abortNotInLoopThread();
    // 这个函数是在,eventloop 主动唤醒 epoll 的时候处理eventfd 读事件的函数
    void handleRead(); // waked up
    // 执行队列中的函数对象
    void doPendingFunctors();
    // 用来打印本次已就绪channel 发生的对应的时间,也就是去调用了channel 的tostring
    void printActiveChannels() const; // DEBUG

    typedef std::vector<Channel *> ChannelList;

    bool looping_;
    bool quit_;
    bool eventHandling_;
    bool callingPendingFunctors_;
    // iteration_ 是poller 被唤醒的次数
    int64_t iteration_;
    // 初始化时候,本eventloop 所在的线程
    const pid_t threadId_;
    // 上一次epoll被唤醒的时间戳
    Timestamp pollReturnTime_;
    // 具体的epoll ,实现上可是用poll的,因此是一个继承体系
    boost::scoped_ptr<Poller> poller_;
    // 计时器队列.
    boost::scoped_ptr<TimerQueue> timerQueue_;
    // eventfd,是eventloop主动唤醒epoll的fd
    int wakeupFd_;
    // 是分装eventfd的channel
    boost::scoped_ptr<Channel> wakeupChannel_;
    boost::any context_;

    // 这个容器被epoll 填充。装了所有已就绪的channel
    ChannelList activeChannels_;
    // 当前正在处理的 channel
    Channel *currentActiveChannel_;

    mutable MutexLock mutex_;
    // 添加到eventloop 中需要执行的函数对象,就在这里。
    std::vector<Functor> pendingFunctors_;
};
}
}
#endif // MUDUO_NET_EVENTLOOP_H

实现

#include <muduo/net/EventLoop.h>

#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/Channel.h>
#include <muduo/net/Poller.h>
#include <muduo/net/SocketsOps.h>
#include <muduo/net/TimerQueue.h>

#include <boost/bind.hpp>

#include <signal.h>
#include <sys/eventfd.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

// 当定义一个命名空间时,可以忽略这个命名空间的名称:
//  编译器在内部会为这个命名空间生成一个唯一的名字,而且还会为这个匿名的命名空间生成一条using指令。
// 命名空间都是具有external 连接属性的,只是匿名的命名空间产生的__UNIQUE_NAME__在别的文件中无法得到,这个唯一的名字是不可见的.
namespace
{
// 使用了 __thread 每个线程保存自己的eventloop指针
__thread EventLoop *t_loopInThisThread = 0;

// epoll 的超时时间
const int kPollTimeMs = 10000;

// 是eventloop 的eventfd ,用来唤醒 epoll
// 唤醒的时机在前面说了
int createEventfd()
{
    int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (evtfd < 0)
    {
        LOG_SYSERR << "Failed in eventfd";
        // 如果创建失败,那么直接结束程序,没有必要执行下去了
        abort();
    }
    return evtfd;
}

// 这是两条预处理执行令,表示发生 -Wold-style-cast 的时候忽略
// 然后到下一条.
#pragma GCC diagnostic ignored "-Wold-style-cast"

// 这个对象只是用一次。
// 是为了防止管道破裂的。因此在构造的时候,直接将SIGPIPE设置为忽略
// 并且初始化的时候就执行了
class IgnoreSigPipe
{
  public:
    IgnoreSigPipe()
    {
        ::signal(SIGPIPE, SIG_IGN);
        // LOG_TRACE << "Ignore SIGPIPE";
    }
};
// 到这里的时候开启.
#pragma GCC diagnostic error "-Wold-style-cast"

IgnoreSigPipe initObj;
}

// 返回本线程的 eventloop 对象
EventLoop *EventLoop::getEventLoopOfCurrentThread()
{
    return t_loopInThisThread;
}

EventLoop::EventLoop()
    : looping_(false),
      quit_(false),
      eventHandling_(false),
      callingPendingFunctors_(false),
      iteration_(0),
      threadId_(CurrentThread::tid()),
      poller_(Poller::newDefaultPoller(this)), // 使用一个工厂模式返回poll
      timerQueue_(new TimerQueue(this)),       // 和计时器有关的
      wakeupFd_(createEventfd()),              // 创建eventloop 主动唤醒 epoll 的channel
      wakeupChannel_(new Channel(this, wakeupFd_)),
      currentActiveChannel_(NULL)
{
    LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
    //   如果当前线程已经有一个eventloop 了那么就退出啊。 LOG_FATAL 应该会直接停了。
    if (t_loopInThisThread)
    {
        LOG_FATAL << "Another EventLoop " << t_loopInThisThread
                  << " exists in this thread " << threadId_;
    }
    else
    {
        t_loopInThisThread = this;
    }
    wakeupChannel_->setReadCallback(
        boost::bind(&EventLoop::handleRead, this));
    // we are always reading the wakeupfd
    wakeupChannel_->enableReading();
}

// 析构函数
EventLoop::~EventLoop()
{
    LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
              << " destructs in thread " << CurrentThread::tid();
    // 先将 wakechannel 注销掉,然后直接关闭就行了。
    // 因为也没什么需要回收的资源,都是使用指针指针.
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = NULL;
}

void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;
    LOG_TRACE << "EventLoop " << this << " start looping";

    // 使用 quit 判断是否应该继续循环
    while (!quit_)
    {
        // activeChannels_ 用来填充就绪channel的
        activeChannels_.clear();
        // 传入了 activeChannels_ 的地址,由poll 填充已就绪的channel 到activeChannels_中
        pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
        // iteration_ 是poller 被唤醒的次数
        ++iteration_;

        // 根据日志级别,决定是否打印出所有的channel 的事件
        if (Logger::logLevel() <= Logger::TRACE)
        {
            printActiveChannels();
        }

        // 接下来遍历所有的 activeChannels_ 中的元素,去执行它们的
        // handleEvent() 实际的执行channel 的事件处理函数
        // 这里面可能有 TimerQueue 的channel ,用来处理已经超时的计时器
        // 这些计时器,被 TimerQueue 通过 add 添加函数对象的形式添加到了队列中?或是直接执行了。
        // 想一下,应该是直接执行了,因为是在本线程中。
        // 嗯在for中直接执行了每一个超时的计时器
        eventHandling_ = true;
        for (ChannelList::iterator it = activeChannels_.begin();
             it != activeChannels_.end(); ++it)
        {
            // 这里保存一次  currentActiveChannel_ 的意义是
            // 只是在删除额时候判断,删除的是否是这个channel、
            // 还有一个没看懂
            currentActiveChannel_ = *it;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }
        currentActiveChannel_ = NULL;
        eventHandling_ = false;

        // 执行add 到eventloop 的函数对象。
        doPendingFunctors();
    }

    LOG_TRACE << "EventLoop " << this << " stop looping";
    looping_ = false;
}

// 退出。
void EventLoop::quit()
{
    quit_ = true;

    // 推出的时候,直接wakeup 唤醒 epoll
    if (!isInLoopThread())
    {
        wakeup();
    }
}

void EventLoop::runInLoop(const Functor &cb)
{
    // 添加函数对象,如果是在本线程中就直接执行,否则就添加到队列中
    if (isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(cb);
    }
}

// 添加到队列中
void EventLoop::queueInLoop(const Functor &cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(cb);
    }

    // 如果不是在本队列中,或是在执行添加的函数对象
    // 那么本次添加的就已经不能在本次epoll唤醒的时候调用了
    // 因此这里直接wakeup。那么epoll 的wait直接被唤醒了
    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}

size_t EventLoop::queueSize() const
{
    MutexLockGuard lock(mutex_);
    return pendingFunctors_.size();
}
// 下面三个函数用来添加计时器的
TimerId EventLoop::runAt(const Timestamp &time, const TimerCallback &cb)
{
    return timerQueue_->addTimer(cb, time, 0.0);
}

TimerId EventLoop::runAfter(double delay, const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(), delay));
    return runAt(time, cb);
}

TimerId EventLoop::runEvery(double interval, const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(), interval));
    return timerQueue_->addTimer(cb, time, interval);
}

// 取消计时器。
void EventLoop::cancel(TimerId timerId)
{
    return timerQueue_->cancel(timerId);
}

// channel 调用的updatechannel,被转发了
void EventLoop::updateChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    poller_->updateChannel(channel);
}

// 同上
void EventLoop::removeChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    // 这里使用了 eventHandling_ ,如果是正在处理channel 的事件函数,
    // 此时又要移除channel,那么要溢出的channel 必须是本channel 或是不在 activechannels中的channel
    // 否则报错.这里为什么要这样处理? 一般来说都是本channel移除自己.
    if (eventHandling_)
    {
        assert(currentActiveChannel_ == channel ||
               std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
    }
    poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    return poller_->hasChannel(channel);
}

// 当不是在eventloop 的线程执行 eventloop 的相关操作,就会调用该函数.end
// 打印日志,然后退出
void EventLoop::abortNotInLoopThread()
{
    LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
              << " was created in threadId_ = " << threadId_
              << ", current thread id = " << CurrentThread::tid();
}

// 主动唤醒的函数,向eventfd 中写入数据。
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
    }
}

// 主动唤醒epoll 后,需要处理eventfd 的读事件,因此读出来就行了
void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
    }
}

// 执行添加到eventloop 的函数对象。
// 这里没有使用拷贝,而是通过交换 vector 的方式。
// 更加效率
void EventLoop::doPendingFunctors()
{
    std::vector<Functor> functors;
    callingPendingFunctors_ = true;

    {
        MutexLockGuard lock(mutex_);
        functors.swap(pendingFunctors_);
    }

    for (size_t i = 0; i < functors.size(); ++i)
    {
        functors[i]();
    }
    callingPendingFunctors_ = false;
}

// 打印所有就绪channel 事件的函数
void EventLoop::printActiveChannels() const
{
    for (ChannelList::const_iterator it = activeChannels_.begin();
         it != activeChannels_.end(); ++it)
    {
        const Channel *ch = *it;
        LOG_TRACE << "{" << ch->reventsToString() << "} ";
    }
}

2 什么时候需要runinloop()

eventloop所在线程是实际进行IO操作的线程。

虽然,eventloop 被绑定在每一个channel,但是实际上channel 又被关联在各种其他的类中,比如TcpConnection 中,当TcpConnection在别的线程中,需要发送数据的时候,那么会调用send,此时TcpConnection判断运行函数的线程是否是关联的channel绑定的evenloop,如果是,那么直接执行,如果不是,那么runinloop(),通过bind的方式传入。因为send需要传入数据,而在bind中绑定数据即可。

对于这点,后面再添加。

因为涉及到一个什么时候析构的问题。

原文地址:https://www.cnblogs.com/perfy576/p/8656746.html

时间: 2024-10-12 08:04:25

muduo源码-EventLoop.h的相关文章

muduo源码-EpollPoller.h

0 设计 EpollPoller继承自Poller,为的是能够使用epoll和poll两种io函数.(这貌似是策略模式?记不清了) 1 源码 EpollPoller.h #ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H #define MUDUO_NET_POLLER_EPOLLPOLLER_H #include <muduo/net/Poller.h> #include <vector> struct epoll_event; namespace mu

muduo源码-HttpResponse.h

1 设计 HttpResponse类用来,存放需要发送给客户端的数据. 这些数据是一些原始的数据.并没有合成完整的报文. 但是提供一个接口,可以填充传入的 buffer 对象,合成完整的响应报文. 2 源码 #ifndef MUDUO_NET_HTTP_HTTPRESPONSE_H #define MUDUO_NET_HTTP_HTTPRESPONSE_H #include <muduo/base/copyable.h> #include <muduo/base/Types.h>

muduo源码-HttpRequest.h

1 设计 HttpRequest 类,是解析完毕的请求体所存放的地方,被关联在 HttpConnext中. 2 源码 #ifndef MUDUO_NET_HTTP_HTTPREQUEST_H #define MUDUO_NET_HTTP_HTTPREQUEST_H #include <muduo/base/copyable.h> #include <muduo/base/Timestamp.h> #include <muduo/base/Types.h> #includ

muduo源码分析--我对muduo的理解

分为几个模块 EventLoop.TcpServer.Acceptor.TcpConnection.Channel等 对于EventLoop来说: 他只关注里面的主驱动力,EventLoop中只关注poll,这类系统调用使得其成为Reactor模式,EventLoop中有属于这个loop的所有Channel,这个loop属于哪一个Server. 几个类存在的意义: 从应用层使用的角度来看,用户需要初始化一个EventLoop,然后初始化一个TcpServer(当然也可以自定义个TcpServer

muduo源码分析--Reactor模式在muduo中的使用

一. Reactor模式简介 Reactor释义"反应堆",是一种事件驱动机制.和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为"回调函数". 二. moduo库Reactor模式的实现 muduo主要通过3个类来实现Reactor模式:EventLoop,Chann

muduo源码分析--Reactor模式的在muduo中的使用(二)

一. TcpServer类: 管理所有的TCP客户连接,TcpServer供用户直接使用,生命期由用户直接控制.用户只需设置好相应的回调函数(如消息处理messageCallback)然后TcpServer::start()即可. 主要数据成员: boost::scoped_ptr<Accepter> acceptor_; 用来接受连接 std::map<string,TcpConnectionPtr> connections_; 用来存储所有连接 connectonCallbac

SGI STL源码stl_bvector.h分析

前言 上篇文章讲了 STL vector 泛化版本的实现,其采用普通指针作为迭代器,可以接受任何类型的元素.但如果用来存储 bool 类型的数据,可以实现功能,但每一个 bool 占一个字节(byte),而一个字节有 8 位(bit),这样就有点浪费了.所以 SGI STL 设计了一个特化版本的位向量容器 bit_vector 来节省空间内存.bit_vector 是一个 bit 位元素的序列容器,具有 vector 容器一样的成员函数,常用于硬件端口的控制. 原文地址:https://www.

(素材源码)猫猫学IOS(二十六)UI之iOS抽屉效果小Demo

猫猫分享,必须精品 素材代码地址:http://download.csdn.net/detail/u013357243/8635679 原创文章,欢迎转载.转载请注明:翟乃玉的博客 地址:http://blog.csdn.net/u013357243?viewmode=contents 先看效果 源码 NYDrawViewController.h // // NYDrawViewController.h // 06-抽屉效果 // // Created by apple on 14-9-1. /

h5微信房卡牛牛源码学习讲解

h5微信房卡牛牛源码Q 2171793408 官网地址: http://wowotouba.com/h5 比较仔细的学习了<c++primer>,并对每个习题都自己写代码实现了一遍,包括稍微复杂一点的例子. 认真读完了<effective c++>,<effective stl>. 比较仔细的学完了<数据结构与算法分析>,并把其中的每种数据结构和算法都用c++实现了一遍.包括各种线性表,树(二叉树.AVL树.RB树的各种操作),图(BFS.DFS.prim.