zmq笔记二: io线程和poller_t

int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0

本文主要是分析代码,方便自己日后查阅.

=========================================

上一篇中讲到io_thread_t的线程循环函数实际上调用的,是根据不同平台下的首选I/O多路复用(select_t/poll_t/epoll_t/kqueue_t)的成员函数loop().

怎样确定选用哪种I/O多路复用,由一些预编译宏确定,请看poller.hpp头文件.

本文是在windows平台下进行分析. windows下选用的是select_t,并不是iocp.

1. I/O线程

io_thread_t有三个成员变量:

        //  I/O thread accesses incoming commands via this mailbox.
        mailbox_t mailbox; //接收命令消息的邮箱,mailbox相关资料会在后面的文章展开介绍. 当需要和io_thread_t通信时,给它的邮箱发一个command_t命令

        //  Handle associated with mailbox‘ file descriptor.
        poller_t::handle_t mailbox_handle; //与邮箱绑定的句柄

        //  I/O multiplexing is performed using a poller object.
        poller_t *poller; //选用的i/o多路复用

io_thread_t这个类的功能很简洁,主要操作有: 线程开启,线程结束,处理在mailbox里的命令队列的消息(in_events函数).而mailbox里有消息待处理,是通过mailbox的fd状态可读来进行通知的,这个fd就是io_thread_t的成员变量mailbox_handle.

zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_)
{
    poller = new (std::nothrow) poller_t (*ctx_);
    alloc_assert (poller);

    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);//初始化时加进去poller_t的可读fd集合了.
}

2. poller_t

poller_t实际上是一个typedef的类型:

typedef select_t poller_t;

typedef epoll_t poller_t;

...

它根据不同平台下的首选I/O多路复用(select_t/poll_t/epoll_t/kqueue_t).本文只分析select_t.

就select_t而言,在windows和linux平台下,套接字集合的管理有所不同,但原理也差不多. 在select_t里有两个平台无关的通用的结构体:

        //  Internal state.
        struct fds_set_t //select_t对感兴趣的fd的事件集合管理
        {
            fds_set_t ();
            fds_set_t (const fds_set_t& other_);
            fds_set_t& operator=(const fds_set_t& other_);
            //  Convinient method to descriptor from all sets.
            void remove_fd (const fd_t& fd_);

            fd_set read;
            fd_set write;
            fd_set error;
        };

        struct fd_entry_t //与fd对应的事件处理对象(可理解成实现了i_poll_events相关接口并需要在io线程处理事件的对象)
        {
            fd_t fd;
            zmq::i_poll_events* events;
        };
        typedef std::vector<fd_entry_t> fd_entries_t;
#if defined ZMQ_HAVE_WINDOWS
        ......
#else
        fd_entries_t fd_entries; //fd对应的事件对象集合
        fds_set_t fds_set; //select系统调用需要的所有感兴趣的fd集合
        fd_t maxfd; //select系统调用需要的最大fd值
        bool retired; //是否需要移除fd对应的事件对象,如果为true,则从fd_entries删除fd所对应的事件对象
#endif

poller_t还有一个thread_t成员变量worker,它才是系统线程的包装 (io_thread_t.poller->worker).worker线程开启后,实际执行的就是poller_t:loop()函数.

        //  Handle of the physical thread doing the I/O work.
        thread_t worker;

poller_t对某些事件对象(实现了i_poll_events:in_events接口)感兴趣,就以fd为key,加进去集合fd_entries_t.同时把fd加到fds_set.error集合里,监听fd的错误事件.

zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{
    fd_entry_t fd_entry;
    fd_entry.fd = fd_;
    fd_entry.events = events_;

#if defined ZMQ_HAVE_WINDOWS
   ......
#else
    fd_entries.push_back (fd_entry);
    FD_SET (fd_, &fds_set.error);

    if (fd_ > maxfd)
        maxfd = fd_;
#endif

    adjust_load (1); //fd数量调整,原子增减操作

    return fd_;
}

需要注意的是,add_fd并没有把fd放进fds_set.read和fds_set.write,也就是说add_fd加进去的fd并不能被select监听到读写事件.

但是删除fd时,会把fd相关的信息都从poller_t里移除掉.

void zmq::select_t::rm_fd (handle_t handle_)
{
#if defined ZMQ_HAVE_WINDOWS
    ......
#else
    fd_entries_t::iterator fd_entry_it;
    for (fd_entry_it = fd_entries.begin ();
          fd_entry_it != fd_entries.end (); ++fd_entry_it)
        if (fd_entry_it->fd == handle_) //遍历集合找到目标元素
            break;
    zmq_assert (fd_entry_it != fd_entries.end ());

    fd_entry_it->fd = retired_fd; //标记设置为移除,注意找到目标元素后并没有立即从vector里remove掉,而是标记retired为true,在select系统调用完成后统一移除.
    fds_set.remove_fd (handle_); //从select集合里去掉

    if (handle_ == maxfd) { //更新最大的fd值
        maxfd = retired_fd;
        for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end ();
              ++fd_entry_it)
            if (fd_entry_it->fd > maxfd)
                maxfd = fd_entry_it->fd;
    }

    retired = true; //标记为需要删除
#endif
    adjust_load (-1); //fd数量调整
}

poller_t对fd的读写监听是通过这几个函数来操作的:

        void set_pollin (handle_t handle_); //监听fd可读状态
        void reset_pollin (handle_t handle_); //移除fd监听可读
        void set_pollout (handle_t handle_); //监听fd可写状态
        void reset_pollout (handle_t handle_); //移除fd监听可写

poller_t继承自poller_base_t,含有定时器集合:

        //  Clock instance private to this I/O thread.
        clock_t clock;

        //  List of active timers.
        struct timer_info_t
        {
            zmq::i_poll_events *sink;
            int id;
        };
        typedef std::multimap <uint64_t, timer_info_t> timers_t;
        timers_t timers;

定时器集合timers_t是用std:multimap容器,能保证timer的重复键值并有序.处理timer事件也很简洁,从最小时间值的元素开始与当前时间戳比较一下,大于当前时间就是timer时间到来,此时执行timer事件处理,处理完后从定时器集合移除.

uint64_t zmq::poller_base_t::execute_timers ()
{
    //  Fast track.
    if (timers.empty ())
        return 0;

    //  Get the current time.
    uint64_t current = clock.now_ms ();

    //   Execute the timers that are already due.
    timers_t::iterator it = timers.begin ();
    while (it != timers.end ()) {
        if (it->first > current)
            return it->first - current;

        //  Trigger the timer.
        it->second.sink->timer_event (it->second.id);

        //  Remove it from the list of active timers.
        timers_t::iterator o = it;
        ++it;
        timers.erase (o);
    }

    //  There are no more timers.
    return 0;
}

3. I/O线程的循环函数

循环里做了三件事情:

1.执行已注册的定时器

2.对fds_set的read/write/error的fd集合进行select,并处理各个fd发生的事件.

3.从事件集合fd_entries里移除已经标记为retired_fd的事件对象.

void zmq::select_t::loop ()
{
    while (!stopping) {
        //  Execute any due timers.
        int timeout = (int) execute_timers ();

        int rc = 0;

#if defined ZMQ_HAVE_WINDOWS
       ......
#else
        fds_set_t local_fds_set = fds_set;
        rc = select (maxfd + 1, &local_fds_set.read, &local_fds_set.write,
            &local_fds_set.error, timeout ? &tv : NULL);

        if (rc == -1) {
            errno_assert (errno == EINTR);
            continue;
        }

        //  Size is cached to avoid iteration through just added descriptors.
        for (fd_entries_t::size_type i = 0, size = fd_entries.size (); i < size && rc > 0; ++i) {
            fd_entry_t& fd_entry = fd_entries [i];
            ......
            if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) {
                fd_entry.events->in_event ();
                --rc;
            }
       ......
            if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) {
                fd_entry.events->out_event ();
                --rc;
            }       ......
            if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) {
                fd_entry.events->in_event ();
                --rc;
            }
        }

        if (retired) { //等待select返回并处理完fd的事件后,再统一从fd_entries集合里移除标记为retired_fd的元素
            retired = false;
            fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (),
                is_retired_fd), fd_entries.end ());
        }
#endif
    }
}

  

时间: 2024-10-14 20:17:32

zmq笔记二: io线程和poller_t的相关文章

zmq笔记一: 对象关系

int major, minor, patch;zmq_version(&major, &minor, &patch); //4.2.0 本文主要是分析代码,方便自己日后查阅. ========================================= 1.上下文对象以及socket对象创建 void *context = zmq_ctx_new(); //创建上下文对象 void *responder = zmq_socket(context, ZMQ_REP); //创

转载:Pixhawk源码笔记二:APM线程

  转自:新浪@WalkAnt Pixhawk源码笔记一:APM代码基本结构,参见: http://blog.sina.com.cn/s/blog_402c071e0102v59r.html 这里,我们对 APM 线程进行讲解.如有问题,可以交流[email protected].新浪@WalkAnt,转载本博客文章,请注明出处,以便更大范围的交流,谢谢. 第三部分 APM线程 详细参考:http://dev.ardupilot.com/wiki/learning-ardupilot-threa

Java线程学习笔记(二) 线程的异常捕捉

线程异常的捕捉: 正常的情况下,我们在main()方法里是捕捉不到线程的异常的,例如以下代码: public class ExceptionThread implements Runnable{ @Override public void run() { throw new NullPointerException(); } public static void main(String[] args) { ExecutorService executorService = Executors.n

Spring Batch学习笔记二

此系列博客皆为学习Spring Batch时的一些笔记: Spring Batch的架构 一个Batch Job是指一系列有序的Step的集合,它们作为预定义流程的一部分而被执行: Step代表一个自定义的工作单元,它是Job的主要构件块:每一个Step由三部分组成:ItemReader.ItemProcessor.ItemWriter:这三个部分将执行在每一条被处理的记录上,ItemReader读取每一条记录,然后传递给ItemProcessor处理,最后交给ItemWriter做持久化:It

《How Tomcat Works》读书笔记(二)

<How Tomcat Works>读书笔记(二) 这是<How Tomcat Works>第一二章的读书笔记.第一张主要写了一个静态资源处理的web服务器,第二章加了对servlet的处理. 1. 概述 1.1 架构 HttpServer:表示Http服务器,与客户端通信,处理Http请求. StaticResourceProcessor:对静态资源请求进行处理. ServletProcessor:对Servlet资源请求进行处理. Request:表示Http请求,实现了Ser

zmq笔记四: tcp的connect操作

int major, minor, patch;zmq_version(&major, &minor, &patch); //4.2.0 本文主要是分析代码,方便自己日后查阅. ========================================= 本文以REQ/REP为例,分析一下tcp的connect的实现过程. void *context = zmq_ctx_new(); void *requester = zmq_socket(context, ZMQ_REQ)

Mysql 笔记二

Mysql 笔记二 Table of Contents 1. 前言 2. Master Thread 工作方式 2.1. 主循环(loop) 2.2. 后台循(backgroup loop) 2.3. 刷新循环(flush loop) 2.4. 暂停循环(suspend loop) 3. InnoDB关键特性 3.1. 插入缓冲(Insert Buffer) 3.2. 两次写(Double Write) 3.3. 自适应哈希索引(Adaptive Hash Index),默认开启 3.4. 异步

Android学习笔记二

17. 在ContentProvider中定义的getType()方法是定义URI的内容类型. 18. SQLiteDatabase类中的insert/delete/update/query方法其实也挺好用的,我在EquipmentProvider类中做了实现 19. Android专门有个单元测试项目(Android Test Project),在这个项目中,可以新建一个继承AndroidTestCase类的具体测试类来单元测试某个功能.我新建了一个AndroidTestProject项目,在

lucene学习笔记(二)

package index; import java.io.File; import java.io.IOException; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.CorruptIn