zmq笔记三:socket和mailbox

int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0

本文主要是分析代码,方便自己日后查阅.

=========================================

1. socket类型

每个socket类型有一个类与之对应. 所有的这些类都继承于socket_base_t.各子类的继承关系图请查看笔记一.

   class socket_base_t :
        public own_t,
        public array_item_t <>,
        public i_poll_events,
        public i_pipe_events
    {
        friend class reaper_t;

    public:

        ......
        int send (zmq::msg_t *msg_, int flags_);
        int recv (zmq::msg_t *msg_, int flags_);
        int add_signaler (signaler_t *s);
        int remove_signaler (signaler_t *s);
        int close ();

        //  These functions are used by the polling mechanism to determine
        //  which events are to be reported from this socket.
        bool has_in ();
        bool has_out ();
        ......
        //  i_poll_events implementation. This interface is used when socket
        //  is handled by the poller in the reaper thread.
        void in_event ();
        void out_event ();
        void timer_event (int id_);
         ......
    protected:
        socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_ = false);
        virtual ~socket_base_t ();
        .....

        //  The default implementation assumes that send is not supported.
        virtual bool xhas_out ();
        virtual int xsend (zmq::msg_t *msg_);

        //  The default implementation assumes that recv in not supported.
        virtual bool xhas_in ();
        virtual int xrecv (zmq::msg_t *msg_);
        ......
    private:
        //  Creates new endpoint ID and adds the endpoint to the map.
        void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);

        //  Map of open endpoints.
        typedef std::pair <own_t *, pipe_t*> endpoint_pipe_t;
        typedef std::multimap <std::string, endpoint_pipe_t> endpoints_t;
        endpoints_t endpoints;

        //  Map of open inproc endpoints.
        typedef std::multimap <std::string, pipe_t *> inprocs_t;
        inprocs_t inprocs;

        //  Moves the flags from the message to local variables,
        //  to be later retrieved by getsockopt.
        void extract_flags (msg_t *msg_);
        ......
        int process_commands (int timeout_, bool throttle_);
        //  Socket‘s mailbox object.
        i_mailbox *mailbox;

        //  List of attached pipes.
        typedef array_t <pipe_t, 3> pipes_t;
        pipes_t pipes;

        //  Reaper‘s poller and handle of this socket within it.
        poller_t *poller;
        poller_t::handle_t handle;
        ......
    };

socket_base_t这个父类做了大部分逻辑,子类再按需实现函数重载. 拿req_t为例, req_t继承dealer_t,dealer_t继承socket_base_t. 子类以实现xsend/xrecv等带x前缀的重载函数为主,而父类socket_base_t对外暴露的是不带前缀x的函数.

   class req_t : public dealer_t
    {
    public:

        req_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
        ~req_t ();

        //  Overrides of functions from socket_base_t.
        int xsend (zmq::msg_t *msg_);
        int xrecv (zmq::msg_t *msg_);
        bool xhas_in ();
        bool xhas_out ();
        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
        void xpipe_terminated (zmq::pipe_t *pipe_);

    protected:
        ......
    private:
        ......
        //  The pipe the request was sent to and where the reply is expected.
        zmq::pipe_t *reply_pipe;
        ......
        req_t (const req_t&);
        const req_t &operator = (const req_t&);
    };

2.mailbox

基类socket_base_t有一个成员变量 i_mailbox *mailbox. 这就是socket的邮箱了,所有投递给socket的命令消息command_t都会放到这个邮箱的队列里.

zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
    own_t (parent_, tid_),
    ......
    thread_safe (thread_safe_),
    reaper_signaler (NULL)
{
    options.socket_id = sid_;
    options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
    options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;

    if (thread_safe)
        mailbox = new mailbox_safe_t(&sync);
    else {
        mailbox_t *m = new mailbox_t();
        if (m->get_fd () != retired_fd)
            mailbox = m;
        else {
            LIBZMQ_DELETE (m);
            mailbox = NULL;
        }
    }
}

由构造函数可知,mailbox是有线程安全的分别的, mailbox_safe_t和mailbox_t都是mutex_t sync作为访问互斥. 这是因为mailbox的消息队列 ypipe_t是无锁链表,读写需要同步,ypipe_t更详细的实现和分析可参考这篇博客.

        //  The pipe to store actual commands.
        typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t;
        cpipe_t cpipe;

邮箱的sync在mailbox_safe_t是以socket_base_t的sync指针来初始化的,而mailbox_t则是独立于socket本身的.

对于mailbox_t来说,任意时刻只能有一个线程去读它的命令消息队列,读消息不用加锁,并只需要一个signaler去通知读线程; 而写入消息队列时,却可能有多个线程写,所以需要在写入队列时加锁互斥.

        //  Signaler to pass signals from writer thread to reader thread.
        signaler_t signaler;

对于mailbox_safe_t则是根据对socket本身的互斥访问来读写它的命令消息队列,并且有多个signaler来通知可读状态.

std::vector <zmq::signaler_t* > signalers;

实际上读的时候它使用的是pthread_cond_wait和pthread_cond_broadcast的组合来获得锁.

void zmq::mailbox_safe_t::send (const command_t &cmd_)
{
    sync->lock ();
    cpipe.write (cmd_, false);
    const bool ok = cpipe.flush ();

    if (!ok) {
        cond_var.broadcast (); //调用pthread_cond_broadcast唤醒正在等待pthread_cond_wait返回的读线程
        for (std::vector<signaler_t*>::iterator it = signalers.begin(); it != signalers.end(); ++it){
            (*it)->send(); //唤醒各个reader有消息可读
        }
    }

    sync->unlock ();
}

int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
{
    //  Try to get the command straight away.
    if (cpipe.read (cmd_)) //无锁队列,能获取消息则必定由一个线程取出,compare_and_swap原子操作
        return 0;

    //  Wait for signal from the command sender.
    int rc = cond_var.wait (sync, timeout_); //获取sync的锁,并休眠等待pthread_cond_broadcast信号唤醒; 注意,pthread_cond_wait返回后,其实同时也获得了sync的锁
    if (rc == -1) {
        errno_assert (errno == EAGAIN || errno == EINTR);
        return -1;
    }

    //  Another thread may already fetch the command
    const bool ok = cpipe.read (cmd_);

    if (!ok) {
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

笔者的分析是基于mailbox_t而不是mailbox_safe_t,所以对mailbox_safe_t的使用场合并没有经验研究.

3.signaler

邮箱是否有可待读取的命令消息,依靠signaler来通知.先来看一下这个类结构:

    class signaler_t
    {
    public:

        signaler_t ();
        ~signaler_t ();

        fd_t get_fd () const;
        void send ();
        int wait (int timeout_);
        void recv ();
        int recv_failable ();
        ......
    private:

        //  Creates a pair of file descriptors that will be used
        //  to pass the signals.
        static int make_fdpair (fd_t *r_, fd_t *w_);

        //  Underlying write & read file descriptor
        //  Will be -1 if we exceeded number of available handles
        fd_t w;
        fd_t r;

        ......
    };

signaler类主要是提供一对socket句柄(w/r).在支持socketpair的平台下(*nix),可直接调用返回;而在windows平台下,是通过打通w/r两个socket句柄的通信.当有写线程给mailbox发送命令消息时,判断如果持有mailbox的读线程挂起了,就调用mailbox的signaler->send():

void zmq::signaler_t::send ()
{
#if defined HAVE_FORK
    if (unlikely (pid != getpid ())) {
        //printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
        return; // do not send anything in forked child context
    }
#endif
#if defined ZMQ_HAVE_EVENTFD
    ......
#elif defined ZMQ_HAVE_WINDOWS
    unsigned char dummy = 0;
    int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
    wsa_assert (nbytes != SOCKET_ERROR);
    zmq_assert (nbytes == sizeof (dummy));
#else
    unsigned char dummy = 0;
    while (true) {
        ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
        if (unlikely (nbytes == -1 && errno == EINTR))
            continue;
#if defined(HAVE_FORK)
        if (unlikely (pid != getpid ())) {
            //printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
            errno = EINTR;
            break;
        }
#endif
        zmq_assert (nbytes == sizeof dummy);
        break;
    }
#endif
}

给w发送消息,这样r变成可读状态,挂起的select阻塞调用立即返回. mailbox.get_fd()返回的其实就是mailbox.signaler.r. *请注意*, signaler的w/r套接字句柄是可阻塞的.

对于非线程安全的mailbox_t,对于socket类对象,它们本身并没有I/O线程的loop()轮询函数,那么它的mailbox的可读消息状态是由signaler的r句柄通知,由signaler.wait()函数对r进行select调用,而signaler.wait()是一般是通过socket_base_t:process_commands() -> mailbox_t:recv () -> signaler:wait () 调用链.当mailbox的命令队列为空,r也没可读状态时,signaler:wait (int timeout) ,传入的timeout=-1,由于signaler的w/r是可阻塞的,这时调用process_commands()的线程将会阻塞在wait()的select调用.当然,context的I/O线程依然会继续loop()轮询.

那么阻塞了socket的线程如何被唤醒? 答案是通过给socket的mailbox发送消息.

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_){    ......    //  Create the socket and register its mailbox.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
        return NULL;
    }
    sockets.push_back (s);
    slots [slot] = s->get_mailbox ();    ......}

在create_socket这个函数里,为context新增一个socket时,socket的mailbox就加入了slots的数组管理器里. 当I/O线程(或其他知道该scoket的mailbox对应的slot id的实例)给对应的mailbox发送消息,就会唤醒正在阻塞的socket了.

时间: 2024-10-17 16:34:30

zmq笔记三:socket和mailbox的相关文章

zmq笔记四: tcp的connect操作

int major, minor, patch;zmq_version(&major, &minor, &patch); //4.2.0 本文主要是分析代码,方便自己日后查阅. ========================================= 本文以REQ/REP为例,分析一下tcp的connect的实现过程. void *context = zmq_ctx_new(); void *requester = zmq_socket(context, ZMQ_REQ)

Caliburn.Micro学习笔记(三)----事件聚合IEventAggregator和 Ihandle&lt;T&gt;

Caliburn.Micro学习笔记(三)----事件聚合IEventAggregator和 Ihandle<T> 今天 说一下Caliburn.Micro的IEventAggregator和IHandle<T>分成两篇去讲这一篇写一个简单的例子 看一它的的实现和源码 下一篇用它们做一个多语言的demo 这两个是事件的订阅和广播,很强大,但用的时候要小心发生不必要的冲突. 先看一下它的实现思想 在Caliburn.Micro里EventAggregator要以单例的形式出现这样可以

构建之法阅读笔记三—结对编程

构建之法阅读笔记三——结对编程 何谓结对编程,结对编程就是程序员肩并肩,平等的,互补的进行开发工作,他们使用同一台电脑,编写同样的程序,一起分析,一起设计,一块交流想法. 然而我以前却并不是这样做的,我以前喜欢在没人打扰的环境下写代码,我觉得有人在我身边看着,会影响我的思路,还有我个人自尊心比较强,不太喜欢被人指指点点,所以每次都是,我写完代码之后,自己先找自己的bug,每当自己实在找不到之后,才会请教大神,但是有时候可能由于自己的能力不足,往往一个很简单的问题,我自己发现就会花费很久的时间,让

3. 蛤蟆的计算机组成原理笔记三系统总线

3. 蛤蟆的计算机组成原理笔记三系统总线 本篇名言:"公正,一定会打倒那些说假话和假作证的人. --赫拉克利特" 欢迎转载,转载请标明出处:http://blog.csdn.net/notbaron/article/details/47988545 1.  总线 总线是连接各个部件的信息传输线,是 各个部件共享的传输介质. 1.1             面向CPU 的双总线结构框图 1.2             单总线结构框图 1.3             以存储器为中心的双总线

OpenCV for Python 学习笔记 三

给源图像增加边界 cv2.copyMakeBorder(src,top, bottom, left, right ,borderType,value) src:源图像 top,bottem,left,right: 分别表示四个方向上边界的长度 borderType: 边界的类型 有以下几种: BORDER_REFLICATE # 直接用边界的颜色填充, aaaaaa | abcdefg | gggg BORDER_REFLECT # 倒映,abcdefg | gfedcbamn | nmabcd

NFC学习笔记——三(在windows操作系统上安装libnfc)

本篇翻译文章: 这篇文章主要是说明如何在windows操作系统上安装.配置和使用libnfc. 一.基本信息 1.操作系统: Windows Vista Home Premium SP 2 2.硬件信息: System: Dell Inspiron 1720 Processor: Intel Core 2 Duo CPU T9300 @ 2.5GHz 2.5GHz System type: 32-bit Operating System 3.所需软件: 在windows操作系统上安装软件需要下列

swift学习笔记(三)关于拷贝和引用

在swift提供的基本数据类型中,包括Int ,Float,Double,String,Enumeration,Structure,Dictionary都属于值拷贝类型. 闭包和函数同属引用类型 捕获则为拷贝.捕获即定义这些常量和变量的原作用域已不存在,闭包仍然可以在闭包函数体内引用和修改这些值 class属于引用类型. Array的情况稍微复杂一些,下面主要对集合类型进行分析: 一.关于Dictionary:无论何时将一个字典实例赋给一个常量,或者传递给一个函数方法时,在赋值或调用发生时,都会

《你必须知道的.NET》读书笔记三:体验OO之美

一.依赖也是哲学 (1)本质诠释:"不要调用我们,我们会调用你" (2)依赖和耦合: ①无依赖,无耦合: ②单向依赖,耦合度不高: ③双向依赖,耦合度较高: (3)设计的目标:高内聚,低耦合. ①低耦合:实现最简单的依赖关系,尽可能地减少类与类.模块与模块.层次与层次.系统与系统之间的联系: ②高内聚:一方面代表了职责的统一管理,一方面又代表了关系的有效隔离: (4)控制反转(IoC):代码的控制器交由系统控制而不是在代码内部,消除组件或模块间的直接依赖: (5)依赖注入(DI): ①

老男孩培训视频听课笔记三(在51cto上听的)

SSH 连接Linux工具CRT SSH概念: 现在有两个版本的SSH1/SSH2,建议选择SSH2 查看服务端启动情况:$netstat -lntup | grep 22 自己加的:现在CRT工具很多:crt xshell putty ,现在我使用的是xshell           另外在在centos系统里可以安装lrzsz的软件包,可以实现在crt里利用rz/sz上传/下载小文件,大文件容易出错           CRT连接经常出现的问题:              ·超时问题:利用p