zeromq源码分析笔记之线程间收发命令(2)

zeromq源码分析笔记之架构说到了zmq的整体架构,可以看到线程间通信包括两类,一类是用于收发命令,告知对象该调用什么方法去做什么事情,命令的结构由command_t结构体确定;另一类是socket_base_t实例与session的消息通信,消息的结构由msg_t确定。命令的发送与存储是通过mailbox_t实现的,消息的发送和存储是通过pipe_t实现的,这两个结构都会详细说到,今天先说一下线程间的收发命令。

zeromq的线程可分为两类,一类是io线程,像reaper_t、io_thread_t都属于这一类,这类线程的特点就是内含一个轮询器poller及mailbox_t,通过poller可以监听mailbox_t的信件到来 ;另一类是zmq的socket,所有socket_base_t实例化的对象都可以看做一个单独的线程,这类线程不含poller,但同样含有一个mailbox_t,可以用于收发命令,由于不含poller,只能在每次使用socket_base_t实例的时候先处理一下mailbox_t,看是否有命令需要处理,代码上来看就是每次先调用下面这个函数接收并处理一下命令:

int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)

另外,两类线程发送命令的方式是一致的。下面,就详细的说一下命令结构、如何发送命令、两类线程如何接收命令

1、命令

先看一下命令结构(详细的结构参见源码Command.hpp):

//  This structure defines the commands that can be sent between threads.
    struct command_t
    {
        //  Object to process the command.
        zmq::object_t *destination;

        enum type_t
        {
           ...
        } type;

        union {
           ...
        } args;
    };

可以看到,命令由三部分构成,分别是发往的目的地destination,命令的类型type,命令的参数args。所谓的命令就是一个对象交代另一个对象去做某件事情,说白了就是告诉令一个对象应该调用哪个方法,命令的发出者是一个对象,而接收者是一个线程,线程接收到命令后,根据目的地派发给相应的对象做处理。可以看到命令的destination属性是object_t类型的,在上节介绍类的层次结构图时,说到object_t及其子类都具有发送和处理命令的功能(没有收命令的功能),所以有必要弄清楚一件事,对象、object_t、poller、线程、mailbox_t、命令是什么关系?

  • 在zmq中,每个线程都会拥有一个信箱,命令收发功能底层都是由信箱实现的
  • zmq提供了object_t类,用于使用线程信箱发送命令的功能(object_t类还有其他的功能),object_t还有处理命令的功能。
  • 线程内还有一个poller用于监听命令的到来,线程收到命令后把命令交由object_t处理

简单来说就是,object_t发命令,poller监听命令到来告知线程收命令,交给object_t处理,无论是object_t还是poller其实都操作mailbox_t,而这三者都在绑定在同一个线程上。下面就来看看具体的如何发送命令

2、发命令

一个对象想使用线程的发命令功能,其类就得继承自object_t(源码在Object.hpp/.cpp):

    class object_t
    {
    public:
        object_t (zmq::ctx_t *ctx_, uint32_t tid_);
        void process_command (zmq::command_t &cmd_);
        ...
    protected:
        ...
    private:
        zmq::ctx_t *ctx;//  Context provides access to the global state.
        uint32_t tid;//  Thread ID of the thread the object belongs to.
        void send_command (command_t &cmd_);
    }

可以看到,object_t内含一个tid,含义就是,该object_t对象要使用哪个线程的mailbox_t。关于zmq::ctx_t,在zmq中被称为上下文语境,上下文语境简单来说就是zmq的存活环境,里面存储是一些全局对象,zmq中所有的线程都可以使用这些对象。zmq线程中的mailbox_t对象会被zmq存储在ctx_t对象中。zmq的做法就是,在上下文语境中使用一个容器slots装载线程的mailbox,在新建线程的时候,给线程分配一个线程标志tid和mailbox,把mailbox放入容器的tid那个位置,代码来说就是slots[tid]=mailbox。有了这个基础,线程A给线程B发命令就只要往slots[B.tid]写入命令就可以了:

void zmq::object_t::send_command (command_t &cmd_)
{
    ctx->send_command (cmd_.destination->get_tid (), cmd_);
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
    slots [tid_]->send (command_);
}
void zmq::mailbox_t::send (const command_t &cmd_)
{
    sync.lock();
    cpipe.write (cmd_, false);
    bool ok = cpipe.flush ();
    sync.unlock ();
    if (!ok)
        signaler.send ();
}

3、io线程收命令

前面说过,每个io线程都含有一个poller,io线程的结构如下(源码在Io_thread_t.hpp/.cpp):

class io_thread_t : public object_t, public i_poll_events
    {
    public:
        io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
        ~io_thread_t ();
        void start (); //  Launch the physical thread.
        void stop ();//  Ask underlying thread to stop.
        ...
    private:
        mailbox_t mailbox;//  I/O thread accesses incoming commands via this mailbox.
        poller_t::handle_t mailbox_handle;//  Handle associated with mailbox‘ file descriptor.
        poller_t *poller;//  I/O multiplexing is performed using a poller object.
    }

zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_)
{
    poller = new (std::nothrow) poller_t;
    alloc_assert (poller);

    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);
}

构造函数中把mailbox_t句柄加入poller中,让poller监听其读事件,所以,如果有命令发过来,poller会被唤醒,并调用io_thread_t的in_event:

void zmq::io_thread_t::in_event ()
{
    //  TODO: Do we want to limit number of commands I/O thread can
    //  process in a single go?

    command_t cmd;
    int rc = mailbox.recv (&cmd, 0);

    while (rc == 0 || errno == EINTR) {//如果读管道中有内容或者等待信号的时候被中断,将一直读取
        if (rc == 0)
            cmd.destination->process_command (cmd);
        rc = mailbox.recv (&cmd, 0);
    }

    errno_assert (rc != 0 && errno == EAGAIN);
}

可以看到,in_event使用了mailbox_t的接收命令的功能。接收到命令之后,调用destination处理命令的功能去处理命令。

4、socket_base_t线程收命令

上一节说过socket_base_t的每个实例都可以看成一个zmq线程,但是比较特殊,并没有使用poller,而是在使用到socket的下面几个方法的时候去检查是否有未处理的命令:

int zmq::socket_base_t::getsockopt (int option_, void *optval_,size_t *optvallen_)
int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::term_endpoint (const char *addr_)
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
void zmq::socket_base_t::in_event ()//这个函数只有在销毁socke的时候会被用到,在后面讲zmq_close的时候会说到

检查的手段就是调用process_commands方法:

int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
{
    int rc;
    command_t cmd;
    if (timeout_ != 0) {
        //  If we are asked to wait, simply ask mailbox to wait.
        rc = mailbox.recv (&cmd, timeout_);
    }
    else {
        some code
        rc = mailbox.recv (&cmd, 0);
    }
    //  Process all available commands.
    while (rc == 0) {
        cmd.destination->process_command (cmd);
        rc = mailbox.recv (&cmd, 0);
    }
    some code
}

可见,最终都是使用mailbox_t的接收命令的功能。

这里有一个值得思考的问题,为什么socket_base_t实例对应的这个线程不使用poller呢?每次使用上面那些方法的时候去检查不是很麻烦吗?

有关mailbox_t的实现,在下节介绍

时间: 2024-11-02 20:35:08

zeromq源码分析笔记之线程间收发命令(2)的相关文章

zeromq源码分析笔记之准备篇

zeromq这个库主要用于进程通信,包括本地进程.网络通信,涉及到一些基础知识,主要包括管道通信,socket编程的内容,反应器模式(使用IO多路复用实现),无锁队列这几块比较重要的部分,下面的几个链接是这几块内容的学习笔记,有了这些知识,能比较好的理解这个开源库 1.socket原理详解 2.I/O多路复用之select 3.I/O多路复用之poll 4.I/O多路复用之epoll

《Java源码分析》:线程池 ThreadPoolExecutor

<Java源码分析>:线程池 ThreadPoolExecutor ThreadPoolExecutor是ExecutorService的一张实现,但是是间接实现. ThreadPoolExecutor是继承AbstractExecutorService.而AbstractExecutorService实现了ExecutorService接口. 在介绍细节的之前,先介绍下ThreadPoolExecutor的结构 1.线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行

GNU GRUB 2.00 源码分析笔记,持续更新

前言 很多运维类书籍或文章仅从系统管理者的角度讲解了 grub 的安装以及使用, 本篇博文则从 gnu grub 2.00 的源码入手,从开发者,以及系统底层运行机制的角度,分析 grub 是如何作为跨平台的"全面统一的引导加载程序",来引导操作系统,加载 Linux 内核的过程等等, 部分内容参考了<深度探索 Linux 操作系统>一书中相关的内容(ISBN 978-7-11143901-1 )以及 gnu grub 项目官方站点的文档,并且加入自己分析源码时的笔记. (

简单看看ThreadPool的源码以及从中看出线程间传值的另一种方法

这几天太忙没时间写博客,今天回家就简单的看了下ThreadPool的源码,发现有一个好玩的东西,叫做”执行上下文“,拽名叫做:”ExecutionContext“. 一:ThreadPool的大概流程. 第一步:它会调用底层一个helper方法. 第二步:走进这个helper方法,我们会发现有一个队列,并且这个队列的item必须是QueueUserWorkItemCallback的实例,然后这就激发了我的 兴趣,看看QueueUserWorkItemCallback到底都有些什么? 第三步:走到

jQuery源码分析笔记

jquery-2.0.3.js版本源码分析 (function(){ (21,94) 定义了一些变量和函数 jQuery = function(){}; (96,283) 给JQ对象,添加一些方法和属性 (285,347) extend : JQ的继承方法 (349,817) jQuery.extend():扩展一些工具方法 (877,2856) Sizzle :复杂选择器的实现   在jquery的官网可以直接下载它,它可以是独立的 (2880,3042) Callbacks : 回调对象 :

TCP内核源码分析笔记

Table of Contents 1 术语 1.1 ABC 1.2 SACK 1.3 D-SACK 1.4 F-RTO 1.5 template 2 tcp_v4_connect() 3 sys_accept() 3.1 tcp_accept() 4 三次握手 4.1 客户端发送SYN段 4.2 服务端接收到SYN段后,发送SYN/ACK处理 4.3 客户端回复确认ACK段 4.3.1 tcp_rcv_synsent_state_process() 4.4 服务端收到ACK段 5 数据传输 5

wifidog源码分析 - 客户端检测线程

引言 当wifidog启动时,会启动一个线程(thread_client_timeout_check)维护客户端列表,具体就是wifidog必须定时检测客户端列表中的每个客户端是否在线,而wifidog是通过两种方式进行检测客户端在线情况,一种是定时通过iptables获取客户端出入总流量更新客户端时间,通过最近更新时间进行判断(有新的出入流量则更新客户端时间,之后使用最新客户端时间与当前时间判断),一种是查询认证服务器,通过认证服务器的返回信息进行判断(将客户端IP和状态请求发送给认证服务器,

ZeroMQ源码分析之Message

使用ZeroMQ创建消息时的代码通常如下: zmq_msg_tmsgName; //1 zmq_msg_init(&msgName); //2 这两条代码做了什么呢? 首先对第1行代码,在zmq.h中有如下定义: typedef structzmq_msg_t {unsigned char _ [32];} zmq_msg_t; what?消息体就这样定义?也许这不是它的真面目. 看第2行代码,在zmq.cpp找到zmq_msg_init的实现方式. <p style="margi

Lighttpd1.4.20源码分析 笔记 状态机之response

在CON_STATE_RESPONSE_START状态中,服务器开始准备给客户端的response: case CON_STATE_RESPONSE_START: /* * the decision is done * - create the HTTP-Response-Header * */ if (srv->srvconf.log_state_handling) { log_error_write(srv, __FILE__, __LINE__, "sds", "