MIT 2012分布式课程基础源码解析-事件管理封装

这部分的内容主要包括Epoll/select的封装,在封装好相应函数后,再使用一个类来管理相应事件,实现的文件为pollmgr.{h, cc}。

事件函数封装

可看到pollmgr.h文件下定一个了一个虚基类aio_mgr

1 class aio_mgr {
2     public:
3         virtual void watch_fd(int fd, poll_flag flag) = 0;
4         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
5         virtual bool is_watched(int fd, poll_flag flag) = 0;
6         virtual void wait_ready(std::vector<int> *readable, std::vector<int> *writable) = 0;
7         virtual ~aio_mgr() {}
8 };

这便是具体事件类实现的基类,可看到文件末尾处的继承关系

 1 class SelectAIO : public aio_mgr {
 2     public :
 3
 4         SelectAIO();
 5         ~SelectAIO();
 6         void watch_fd(int fd, poll_flag flag);
 7         bool unwatch_fd(int fd, poll_flag flag);
 8         bool is_watched(int fd, poll_flag flag);
 9         void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
10
11     private:
12
13         fd_set rfds_;
14         fd_set wfds_;
15         int highfds_;
16         int pipefd_[2];
17
18         pthread_mutex_t m_;
19
20 };
21
22 #ifdef __linux__
23 class EPollAIO : public aio_mgr {
24     public:
25         EPollAIO();
26         ~EPollAIO();
27         void watch_fd(int fd, poll_flag flag);
28         bool unwatch_fd(int fd, poll_flag flag);
29         bool is_watched(int fd, poll_flag flag);
30         void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
31
32     private:
33         int pollfd_;
34         struct epoll_event ready_[MAX_POLL_FDS];
35         int fdstatus_[MAX_POLL_FDS];
36
37 };
38 #endif /* __linux */

相应是使用select和epoll分别实现的事件管理类,其中最主要的方法是wait_ready,这个方法实现了具体的事件查询,其余几个函数用于管理套接字,如增加套接字,删除套接字以及判断套接字是否还存活着。这里我们主要看下epoll实现部分,select实现部分类似。epoll的详解可看这里

  1 EPollAIO::EPollAIO()
  2 {
  3     pollfd_ = epoll_create(MAX_POLL_FDS);
  4     VERIFY(pollfd_ >= 0);
  5     bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
  6 }
  7
  8 EPollAIO::~EPollAIO()
  9 {
 10     close(pollfd_);
 11 }
 12
 13 //状态转换
 14 static inline
 15 int poll_flag_to_event(poll_flag flag)
 16 {
 17     int f;
 18     if (flag == CB_RDONLY) {
 19         f = EPOLLIN;
 20     }else if (flag == CB_WRONLY) {
 21         f = EPOLLOUT;
 22     }else { //flag == CB_RDWR
 23         f = EPOLLIN | EPOLLOUT;
 24     }
 25     return f;
 26 }
 27 /*
 28  *   这个函数就相当于:准备下一个监听事件的类型
 29  */
 30 void
 31 EPollAIO::watch_fd(int fd, poll_flag flag)
 32 {
 33     VERIFY(fd < MAX_POLL_FDS);
 34
 35     struct epoll_event ev;
 36     int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
 37     fdstatus_[fd] |= (int)flag;
 38
 39     //边缘触发模式
 40     ev.events = EPOLLET;
 41     ev.data.fd = fd;
 42     //注册读事件
 43     if (fdstatus_[fd] & CB_RDONLY) {
 44         ev.events |= EPOLLIN;
 45     }//注册写事件
 46     if (fdstatus_[fd] & CB_WRONLY) {
 47         ev.events |= EPOLLOUT;
 48     }
 49
 50     if (flag == CB_RDWR) {
 51         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
 52     }
 53     //更改
 54     VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
 55 }
 56
 57 bool
 58 EPollAIO::unwatch_fd(int fd, poll_flag flag)
 59 {
 60     VERIFY(fd < MAX_POLL_FDS);
 61     fdstatus_[fd] &= ~(int)flag;
 62
 63     struct epoll_event ev;
 64     int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
 65
 66     ev.events = EPOLLET;
 67     ev.data.fd = fd;
 68
 69     if (fdstatus_[fd] & CB_RDONLY) {
 70         ev.events |= EPOLLIN;
 71     }
 72     if (fdstatus_[fd] & CB_WRONLY) {
 73         ev.events |= EPOLLOUT;
 74     }
 75
 76     if (flag == CB_RDWR) {
 77         VERIFY(op == EPOLL_CTL_DEL);
 78     }
 79     VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
 80     return (op == EPOLL_CTL_DEL);
 81 }
 82
 83 bool
 84 EPollAIO::is_watched(int fd, poll_flag flag)
 85 {
 86     VERIFY(fd < MAX_POLL_FDS);
 87     return ((fdstatus_[fd] & CB_MASK) == flag);
 88 }
 89 /**
 90  *  事件循环,查看有哪些事件已经准备好,准备好的事件则插入相应列表中
 91  */
 92 void
 93 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
 94 {
 95     //得到已准备好的事件数目
 96     int nfds = epoll_wait(pollfd_, ready_,    MAX_POLL_FDS, -1);
 97     //遍历套接字数组,将可读/可写套接字添加到readable/writable数组中,便于后面处理
 98     for (int i = 0; i < nfds; i++) {
 99         if (ready_[i].events & EPOLLIN) {
100             readable->push_back(ready_[i].data.fd);
101         }
102         if (ready_[i].events & EPOLLOUT) {
103             writable->push_back(ready_[i].data.fd);
104         }
105     }
106 }

事件管理

在pollmgr.h中还有个重要的类

class aio_callback {
    public:
        virtual void read_cb(int fd) = 0;
        virtual void write_cb(int fd) = 0;
        virtual ~aio_callback() {}
};

这是一个回调虚基类,里面两个函数可从函数名猜到功能,即从对应的套接字读取/写入数据。该基类在后面底层通信中扮演着重要的角色。

然后我们再看后面的PollMgr类,这便是事件管理类,同时它还使用了单例模式。

 1 class PollMgr {
 2     public:
 3         PollMgr();
 4         ~PollMgr();
 5
 6         static PollMgr *Instance();
 7         static PollMgr *CreateInst();
 8         //在对应的套接字上添加事件
 9         void add_callback(int fd, poll_flag flag, aio_callback *ch);
10         //删除套接字上的所有事件
11         void del_callback(int fd, poll_flag flag);
12         bool has_callback(int fd, poll_flag flag, aio_callback *ch);
13         //阻塞删除套接字,为何阻塞呢?因为删除时,其它线程正在使用该套接字
14         void block_remove_fd(int fd);
15         //主要事件循环方法
16         void wait_loop();
17
18         static PollMgr *instance;
19         static int useful;
20         static int useless;
21
22     private:
23         pthread_mutex_t m_;
24         pthread_cond_t changedone_c_;
25         pthread_t th_;
26
27         aio_callback *callbacks_[MAX_POLL_FDS]; //事件数组,即数组下标为相应的套接字
28         aio_mgr *aio_;   //具体的事件函数类,可实现为epoll/select
29         bool pending_change_;
30 };

其中最主要的函数是wait_loop

接下来我们看具体实现。

 1 PollMgr *PollMgr::instance = NULL;
 2 static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
 3
 4 void
 5 PollMgrInit()
 6 {
 7     PollMgr::instance = new PollMgr();
 8 }
 9
10 PollMgr *
11 PollMgr::Instance()
12 {
13     //保证PollMgrInit在本线程内只初始化一次
14     pthread_once(&pollmgr_is_initialized, PollMgrInit);
15     return instance;
16 }

这里实现单例,pthread_once保证了线程中只初始化一次PollMgrInit()函数,所以在具体使用时,只需调用PollMgr::Instance()即可获得该管理类,再在其上处理各种各种事件。这里有个小疑问是:instance变量不应该是私有变量吗?

接下来我们看构造析构函数:

PollMgr::PollMgr() : pending_change_(false)
{
    bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
    //aio_ = new SelectAIO();
    aio_ = new EPollAIO();
    VERIFY(pthread_mutex_init(&m_, NULL) == 0);
    VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0);
    //this表示本类,wait_loop是本类中的一个方法,false表示不分离(detach)
    VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
}

PollMgr::~PollMgr()
{
    //never kill me!!!
    VERIFY(0);
}

构造函数中初始化了事件类,使用了EpollAIO类,初始化了互斥量和条件变量,然后创建了一个线程调用wait_loop。有意思的是析构函数(never kill me)

接下来是几个管理函数,管理套接字和回调的函数

 1 void
 2 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 3 {
 4     VERIFY(fd < MAX_POLL_FDS);
 5
 6     ScopedLock ml(&m_);
 7     aio_->watch_fd(fd, flag);
 8
 9     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
10     callbacks_[fd] = ch;
11 }
12
13 //remove all callbacks related to fd
14 //the return guarantees that callbacks related to fd
15 //will never be called again
16 void
17 PollMgr::block_remove_fd(int fd)
18 {
19     ScopedLock ml(&m_);
20     aio_->unwatch_fd(fd, CB_RDWR);
21     pending_change_ = true;
22     VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
23     callbacks_[fd] = NULL;
24 }
25
26 //删除相应的回调函数
27 void
28 PollMgr::del_callback(int fd, poll_flag flag)
29 {
30     ScopedLock ml(&m_);
31     if (aio_->unwatch_fd(fd, flag)) {
32         callbacks_[fd] = NULL;
33     }
34 }
35
36 //
37 bool
38 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
39 {
40     ScopedLock ml(&m_);
41     if (!callbacks_[fd] || callbacks_[fd]!=c)
42         return false;
43
44     return aio_->is_watched(fd, flag);
45 }

下面便是循环的主方法,该方法一直循环获取相应的事件,但此方法有个问题是,当某个回调读取需要长时间阻塞时,

会耽误后续事件的读取或写入。

//循环的主方法
void
PollMgr::wait_loop()
{

    std::vector<int> readable;  //可读套接字的vector
    std::vector<int> writable;  //可写套接字的vector
    //
    while (1) {
        {
            ScopedLock ml(&m_);
            if (pending_change_) {
                pending_change_ = false;
                VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
            }
        }
        //首先清空两个vector
        readable.clear();
        writable.clear();
        //这里便监听了事件,读或写事件,有时间发生便将事件的fd插入相应的vector
        aio_->wait_ready(&readable,&writable);
        //如果这次没有可读和可写事件,则继续下一次循环
        if (!readable.size() && !writable.size()) {
            continue;
        }
        //no locking of m_
        //because no add_callback() and del_callback should
        //modify callbacks_[fd] while the fd is not dead
        for (unsigned int i = 0; i < readable.size(); i++) {
            int fd = readable[i];
            if (callbacks_[fd]) //相应的回调函数读取套接字上的数据
                callbacks_[fd]->read_cb(fd);
        }

        for (unsigned int i = 0; i < writable.size(); i++) {
            int fd = writable[i];
            if (callbacks_[fd])
                callbacks_[fd]->write_cb(fd);
        }
    }
}

具体使用时,只需获得单例类即可,然后再添加相应的套接字及回调函数,添加都是线程安全的,因为在相应的实现上都会阻塞在内部互斥变量m_上

时间: 2024-10-15 19:43:48

MIT 2012分布式课程基础源码解析-事件管理封装的相关文章

MIT 2012分布式课程基础源码解析一-源码概述

课程主页 课程介绍:本课程会在给出的源码的基础上要求完成8个lab lab overviewLab 1 - Lock ServerLab 2 - Basic File ServerLab 3 - MKDIR, UNLINK, and LockingLab 4 - Caching Lock ServerLab 5 - Caching Extent Server + ConsistencyLab 6 - PaxosLab 7 - Replicated lock serverLab 8 - Proje

MIT 2012 分布式课程基础源码解析-底层通讯实现

本节内容和前节事件管理封装是息息相关的,本节内容主要包含的代码在connection{.h, .cc}中. 这里面最主要的有两个类:connection类和tcpsconn类,connetion类主要服务于单个套接字,包括套接字上的数据读取写入等,而tcpsconn类则是服务于套接字集合,如接收连接,更新失效套接字等.具体我们看头文件. class chanmgr { public: virtual bool got_pdu(connection *c, char *b, int sz) = 0

Ejabberd源码解析前奏--管理

一.ejabberdctl 使用ejabberdctl命令行管理脚本,你可以执行ejabberdctl命令和一些普通的ejabberd命令(后面会详细解说).这意味着你可以在一个本地或远程ejabberd服务器(通过提供参数 --node NODENAME)上启动.停止以及执行很多其它管理任务. ejabberdctl脚本可在文件 ejabberdctl.cfg 里配置. 这个文件包含每个可配置选项的详细信息. ejabberdctl脚本返回一个数字状态码. 成功显示为0, 错误显示为1, 其它

convnet源码解析(一):基础准备

Jeremy Lin ConvNet是一个基于GPU实现的卷积神经网络开源代码(C++11),是由多伦多大学的Geoffrey Hinton深度学习团队编写的,它的最初版本是Hinton的学生Alex Krizhevsky编写的cuda-convnet(其项目地址在google code上面),最近cuda-convnet也从1.0版本更新到2.0版本(地址). 这份开源代码的官方地址是:http://deeplearning.cs.toronto.edu/codes 在CNN的开源代码中最出名

设计模式课程 设计模式精讲 12-3 适配器模式源码解析

1 源码解析 1.1 源码解析1(在jdk中的应用) 1.2 源码解析2(Spring中的通知管理) 1.3 源码解析3(SpringMVC中的应用) 1 源码解析 1.1 源码解析1(在jdk中的应用) xmlAdapter(此类是用于适配xml的一个类,是处理xml序列化和反序列化的一个类) public abstract class XmlAdapter<ValueType,BoundType> { /** * Do-nothing constructor for the derived

设计模式课程 设计模式精讲 4-3 简单工厂源码解析

1 源码解析 1.1 Calendar源码解析 1.2 DriverManager源码解析 1 源码解析 1.1 Calendar源码解析 /** * Gets a calendar using the specified time zone and default locale. * The <code>Calendar</code> returned is based on the current time * in the given time zone with the d

设计模式课程 设计模式精讲 6-3 抽象工厂源码解析

1 源码解析 1.1 mysql源码解析 1.2 mybaties 的sqlsession源码解析 1 源码解析 1.1 mysql源码解析 1.2 mybaties 的sqlsession源码解析 原文地址:https://www.cnblogs.com/1446358788-qq/p/11295158.html

设计模式课程 设计模式精讲 8-11 单例模式源码解析(jdk+spring+mybaties)

1 源码解析 1.1 单例解析1 1.2 单例解析2(容器单例) 1.3 单例解析3 1.4 单例解析4 1 源码解析 1.1 单例解析1 java.lang.Runtime /** * 饿汉式加载,初始化的时候,就已经new出了对象 */ private static Runtime currentRuntime = new Runtime(); /** * Returns the runtime object associated with the current Java applicat

设计模式课程 设计模式精讲 10-2 外观模式源码解析

1 源码解析 1.1 源码解析1(jdk中的JDBCUtils工具类) 1.2 源码解析2 1.3 源码解析3 1.4 源码解析4 1 源码解析 1.1 源码解析1(jdk中的JDBCUtils工具类) jdbc在springJDBC中的封装 /** * Close the given JDBC Connection and ignore any thrown exception. * This is useful for typical finally blocks in manual JDB