zmq笔记一: 对象关系

int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0

本文主要是分析代码,方便自己日后查阅.

=========================================

1.上下文对象以及socket对象创建

void *context = zmq_ctx_new(); //创建上下文对象
void *responder = zmq_socket(context, ZMQ_REP); //创建socket类型的对象 zmq::socket_base_t *s = ctx->create_socket (type_);
int rc = zmq_bind(responder, "tcp://*:6666"); //绑定端口

一般一个进程只有一个context对象,它管理着所有socket对象. context是线程安全的,可以在多线程之间传递使用,但是socket对象不是线程安全.

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
    slot_sync.lock ();
    if (unlikely (starting)) {

        starting = false;
        //  Initialise the array of mailboxes. Additional three slots are for
        //  zmq_ctx_term thread and reaper thread.
        opt_sync.lock ();
        int mazmq = max_sockets;//默认值
        int ios = io_thread_count; //最大io线程数量,默认只有1个
        opt_sync.unlock ();
        slot_count = mazmq + ios + 2; //slot_count决定了邮箱数量, 2这个数字是指下面的 term_tid + reaper_tid 两个mailbox的数组占坑位置
        slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
        alloc_assert (slots);

        //  Initialise the infrastructure for zmq_ctx_term thread.
        slots [term_tid] = &term_mailbox; //zmq终结处理时的唯一的一个邮箱

        //  Create the reaper thread.
        reaper = new (std::nothrow) reaper_t (this, reaper_tid); //收割线程,回收结束使用的socket
        alloc_assert (reaper);
        slots [reaper_tid] = reaper->get_mailbox ();//把线程的邮箱放在全局邮箱管理
        reaper->start ();

        //  Create I/O thread objects and launch them.
        for (int i = 2; i != ios + 2; i++) {//除终结处理和回收线程邮箱外,还要开启一共ios个io线程
            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
            alloc_assert (io_thread);
            io_threads.push_back (io_thread); //将所有开启的io线程统一管理
            slots [i] = io_thread->get_mailbox (); //线程的邮箱放在全局邮箱管理
            io_thread->start ();
        }

        //  In the unused part of the slot array, create a list of empty slots.
        for (int32_t i = (int32_t) slot_count - 1;
              i >= (int32_t) ios + 2; i--) {
            empty_slots.push_back (i); //还没被使用的邮箱占位
            slots [i] = NULL;
        }
    }

    //  Once zmq_ctx_term() was called, we can‘t create new sockets.
    if (terminating) {
        slot_sync.unlock ();
        errno = ETERM;
        return NULL;
    }

    //  If max_sockets limit was reached, return error.
    if (empty_slots.empty ()) {
        slot_sync.unlock ();
        errno = EMFILE;
        return NULL;
    }

    //  Choose a slot for the socket.
    uint32_t slot = empty_slots.back (); //当前新建的socket类型占一个邮箱位置
    empty_slots.pop_back ();

    //  Generate new unique socket ID.
    int sid = ((int) max_socket_id.add (1)) + 1; //原子递增的socket id

    //  Create the socket and register its mailbox.
    socket_base_t *s = socket_base_t::create (type_, this, slot, sid); //创建当前类型socket对象,并初始化它对应的邮箱
    if (!s) {
        empty_slots.push_back (slot);
        slot_sync.unlock ();
        return NULL;
    }
    sockets.push_back (s); //所有创建的socket对象统一管理
    slots [slot] = s->get_mailbox (); //对应占坑

    slot_sync.unlock ();
    return s;
}

每一种socket都有一个与之对应的对象类,创建时需要绑定context,邮箱位置,socket id 等.

zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
    uint32_t tid_, int sid_)
{
    socket_base_t *s = NULL;
    switch (type_) {
        case ZMQ_PAIR:
            s = new (std::nothrow) pair_t (parent_, tid_, sid_);
            break;
        case ZMQ_PUB:
            s = new (std::nothrow) pub_t (parent_, tid_, sid_);
            break;
        case ZMQ_SUB:
            s = new (std::nothrow) sub_t (parent_, tid_, sid_);
            break;
        case ZMQ_REQ:
            s = new (std::nothrow) req_t (parent_, tid_, sid_);
            break;
        case ZMQ_REP:
            s = new (std::nothrow) rep_t (parent_, tid_, sid_);
            break;
      ......
        case ZMQ_GATHER:
            s = new (std::nothrow) gather_t (parent_, tid_, sid_);
            break;
        case ZMQ_SCATTER:
            s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
            break;
        default:
            errno = EINVAL;
            return NULL;
    }

    alloc_assert (s);

    if (s->mailbox == NULL) {
        s->destroyed = true;
        LIBZMQ_DELETE(s);
        return NULL;
    }

    return s;
}

object_t 类对象继承关系图:( zmq版本:4.2.0, 生成工具:doxygen,graphviz )

2.线程与通信

I / O线程(io_thread_t)是ZMQ异步处理网络IO的后台线程。io_thread_t实现继承object_t ,并实现 i_poll_events 接口,其内部包含一个邮箱(mailbox_t)和一个poller对象(poller_t)。

class io_thread_t : public object_t, public i_poll_events{
 ...
}
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_)
{
    poller = new (std::nothrow) poller_t (*ctx_); //注意,poller_t是根据操作系统来定义的,typedef select_t/poll_t/epoll_t, 本文在windows操作系统分析,用的是select_t,基本逻辑大同小异
    alloc_assert (poller);

    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);
}
void zmq::io_thread_t::start ()
{
    //  Start the underlying I/O thread.
    poller->start ();
}
void zmq::select_t::start ()
{
    ctx.start_thread (worker, worker_routine, this);
}
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
    thread_.start(tfn_, arg_);
    thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
}
void zmq::select_t::worker_routine (void *arg_)
{
    ((select_t*) arg_)->loop ();
}
void zmq::select_t::loop () //io线程循环是在poller_t(typedef select_t poller_t)的loop函数
{
     while (!stopping) {
     ....
   }
}

  

poller_t 是从不同操作系统提供的事件通知机制中抽象出来的概念,用来通知描述符和计时器事件,poller_t 通过 typedef定义为操作系统首选的通知机制(select_t/poll_t/epoll_t 等)。所有运行在 io_thread_t上的对象都继承自辅助类 io_object_t,该类实现了向io_thread_t注册/删除文件描述符 (add_fd/rm_fd)和计时器(add_timer/cancel_timer)事件的功能,同时io_object_t 还继承了 i_poll_events 接口来实现事件回调功能。i_poll_events 接口定义了文件描述符和计时器事件就绪时的回调处理函数(in_event/out_event/timer_event)。io_thread_t 实现此接口(in_event)来处理来自mailbox的事件。

继承object_t使得io_thread_t能够发送和接收command,mailbox_t 用来存储发送给任何居住在io_thread_t 上的object_t 的命令,每个io_thread_t 上有多个对象,这些对象公用同一个邮箱,邮箱的收件人就是对象。mailbox_t本质是一个具有就绪通知功能的存储命令的队列。就绪通知机制由signaler_t提供的文件描述符实现。队列是由ypipe_t实现的无锁无溢出队列。当mailbox_t事件触发时,io线程从mailbox中获取命令,并让命令的接收者进行处理。

io线程之间的通信是通过发命令消息(command_t)到对方的mailbox,而socket_base_t实例与session的消息通信则通过发送消息对象msg_t.

ZMQ内部使用两种不同类型的线程(拥有邮箱的对象):I/O线程(io_thread_t)和socket(socket_base_t). 其中io线程,像reaper_t、io_thread_t都属于这一类,这类线程的特点就是内含一个轮询器poller及mailbox_t,通过poller可以监听激活mailbox_t的信号 ;另一类是zmq的socket,所有socket_base_t实例化的对象都可以看做一个单独的线程,这类线程不含poller,但同样含有一个mailbox_t,可以用于收发命令; 由于不含poller,只能在每次使用socket_base_t实例的时候先处理一下mailbox_t,看是否有命令需要处理(process_commands函数), 例如:

int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
    ENTER_MUTEX ();

    ......

    //  Process pending commands, if any.
    int rc = process_commands (0, true); //处理一次邮箱里的命令
    if (unlikely (rc != 0)) {
        EXIT_MUTEX ();
        return -1;
    }

    //  Clear any user-visible flags that are set on the message.
    msg_->reset_flags (msg_t::more);

    //  At this point we impose the flags on the message.
    if (flags_ & ZMQ_SNDMORE)
        msg_->set_flags (msg_t::more);

    msg_->reset_metadata ();

    //  Try to send the message using method in each socket class
    rc = xsend (msg_);
    ......
    while (true) {
        if (unlikely (process_commands (timeout, false) != 0)) {//再次处理邮箱里的命令
            EXIT_MUTEX ();
            return -1;
        }
        rc = xsend (msg_);
        ......
    return 0;
}

i_poll_events 类对象继承关系图:( zmq版本:4.2.0, 生成工具:doxygen,graphviz )

参考文章:

http://www.cnblogs.com/zengzy/p/5132437.html

http://watter1985.iteye.com/blog/1736023

时间: 2024-10-15 04:25:37

zmq笔记一: 对象关系的相关文章

[原创]java WEB学习笔记81:Hibernate学习之路--- 对象关系映射文件(.hbm.xml):hibernate-mapping 节点,class节点,id节点(主键生成策略),property节点

本博客的目的:①总结自己的学习过程,相当于学习笔记 ②将自己的经验分享给大家,相互学习,互相交流,不可商用 内容难免出现问题,欢迎指正,交流,探讨,可以留言,也可以通过以下方式联系. 本人互联网技术爱好者,互联网技术发烧友 微博:伊直都在0221 QQ:951226918 -----------------------------------------------------------------------------------------------------------------

重构笔记——隐藏“委托关系”

本文是在学习中的总结,欢迎转载但请注明出处:http://blog.csdn.net/pistolove/article/details/43769929         在上一篇文章中介绍了"将类内联化".本文将介绍"隐藏委托关系"这种重构手法.         下面让我们来学习这种重构手法吧. 开门见山         发现:客户通过一个委托关系来调用另一个对象. 解决:在服务类上建立客户所需的所有函数,用以隐藏委托关系. 动机 我们都知道,"封装&q

Effective c++(笔记)之继承关系与面向对象设计

1.公有继承(public inheritance) 意味着"是一种"(isa)的关系 解析:一定要深刻理解这句话的含义,不要认为这大家都知道,本来我也这样认为,当我看完这章后,就不这样认为了. 公有继承可以这样理解,如果令class D以public 的形式继承了class B ,那么可以这样认为,每一个类型为D的对象同时也可以认为是类型为B的对象,但反过来是不成立的,对象D是更特殊化更具体的的概念,而B是更一般化的概念,每一件事情只要能够施行于基类对象身上,就一定可以应用于派生类对

《python源码剖析》笔记 python对象初探

本文为senlie原创,转载请保留此地址:http://blog.csdn.net/zhengsenlie 1.      在python中,对象就是为C中的结构体在堆上申请的一块内存.对象不能被静态初始化,也不能在栈空间生存.但内建的类型对象都是被静态初始化的. 2.      对象创建后大小不变.可变对象有一指针指向可变大小的内存区域. 3.      对象机制的基石:PyObject 定长对象: typedef struct _object{ PyObject_HEAD }PyObject

JavaScript语言精粹读书笔记- JavaScript对象

JavaScript 对象 除了数字.字符串.布尔值.null.undefined(都不可变)这5种简单类型,其他都是对象. JavaScript中的对象是可变的键控集合(keyed collections). 对象是属性的容器,其中每个属性都拥有名字和值. JavaScript中的对象是无类别的(class-free)的.它对新属性的名字和值没有约束. JavaScript包括一个原型链特性,允许对象继承另一对象的属性. 对象的检索: stooge[“first-name”]或者stooge.

对象关系分析(非继承关系)

现实环境存在各种对象关系,可以使用特殊的关系类型描述词来形容这些关系. 例如: 1. 方形是属于形状的一种,可以用"is-a"来描述方形与形状的关系. 2. 汽车都有方向盘,可以用"has-a"来描述汽车与方向盘的关系. 3. 程序员使用键盘编辑代码,可以用"uses-a"来描述程序员与键盘的关系. 4. 花朵依赖蜜蜂传播花粉,可以使用"depends-on"来描述花朵与蜜蜂的关系. 5. 学生是一个班级的成员,可以使用&qu

Hibernate(开放源代码的对象关系映射框架)

Hibernate是一个开放源代码的对象关系映射框架,它对JDBC进行了非常轻量级的对象封装,它将POJO与数据库表建立映射关系,是一个全自动的orm框架,hibernate可以自动生成SQL语句,自动执行,使得Java程序员可以随心所欲的使用对象编程思维来操纵数据库. Hibernate可以应用在任何使用JDBC的场合,既可以在Java的客户端程序使用,也可以在Servlet/JSP的Web应用中使用,最具革命意义的是,Hibernate可以在应用EJB的J2EE架构中取代CMP,完成数据持久

Hibernate (开放源代码的对象关系映射框架)

Hibernate是一个开放源代码的对象关系映射框架,它对JDBC进行了非常轻量级的对象封装,它将POJO与数据库表建立映射关系,是一个全自动的orm框架,hibernate可以自动生成SQL语句,自动执行,使得Java程序员可以随心所欲的使用对象编程思维来操纵数据库. Hibernate可以应用在任何使用JDBC的场合,既可以在Java的客户端程序使用,也可以在Servlet/JSP的Web应用中使用,最具革命意义的是,Hibernate可以在应用EJB的J2EE架构中取代CMP,完成数据持久

ORM : 对象关系映射

ORM 对象关系映射(英语:Object Relational Mapping,简称ORM,或O/RM,或O/R mapping),是一种程序技术,用于实现面向对象编程语言里不同类型系统的数据之间的转换.从效果上说,它其实是创建了一个可在编程语言里使用的“虚拟对象数据库”.