ZeroMQ源码分析之Message

使用ZeroMQ创建消息时的代码通常如下:

zmq_msg_tmsgName;	//1
zmq_msg_init(&msgName);	//2

这两条代码做了什么呢?

首先对第1行代码,在zmq.h中有如下定义:

typedef structzmq_msg_t {unsigned char _ [32];} zmq_msg_t;

what?消息体就这样定义?也许这不是它的真面目。

看第2行代码,在zmq.cpp找到zmq_msg_init的实现方式。

<p style="margin-bottom: 0in; line-height: 100%">int zmq_msg_init(zmq_msg_t *msg_)</p><p style="margin-bottom: 0in; line-height: 100%">{</p><p style="margin-bottom: 0in; line-height: 100%">    return((zmq::msg_t*) msg_)->init ();</p><p style="margin-bottom: 0in; line-height: 100%">}</p>

晕,居然强制类型转换,把原本指向32字节空间的zmq_msg_t类型指针转换成了指向msg_t类型的指针。

现在来看一下这个神秘的msg_t,也就是ZeroMQ的消息类。

首先看msg.cpp中init()的实现

<p style="margin-bottom: 0in; line-height: 100%">int zmq::msg_t::init()</p><p style="margin-bottom: 0in; line-height: 100%">{</p><p style="margin-bottom: 0in; line-height: 100%">    u.vsm.type =type_vsm;</p><p style="margin-bottom: 0in; line-height: 100%">    u.vsm.flags =0;</p><p style="margin-bottom: 0in; line-height: 100%">    u.vsm.size = 0;</p><p style="margin-bottom: 0in; line-height: 100%">    return 0;</p><p style="margin-bottom: 0in; line-height: 100%">}</p>

可以大致看出就是初始化了消息的类型、标志和消息内容大小。对应的看一下在msg.hpp是如何定义这个消息结构体的:

在msg.hpp的msg_t类中有:

union {
            struct {
                unsigned char unused [max_vsm_size + 1];
                unsigned char type;
                unsigned char flags;
            } base;
            struct {
                unsigned char data [max_vsm_size];
                unsigned char size;
                unsigned char type;
                unsigned char flags;
            } vsm;
            struct {
                content_t *content;
                unsigned char unused [max_vsm_size + 1 - sizeof (content_t*)];
                unsigned char type;
                unsigned char flags;
            } lmsg;
            struct {
                void* data;
                size_t size;
                unsigned char unused
                    [max_vsm_size + 1 - sizeof (void*) - sizeof (size_t)];
                unsigned char type;
                unsigned char flags;
            } cmsg;
            struct {
                unsigned char unused [max_vsm_size + 1];
                unsigned char type;
                unsigned char flags;
            } delimiter;
        } u;

可见这里利用union来压缩空间。union维护足够的空间来置放多个数据成员中的“一种”,而不是为每一个数据成员配置空间,在union中所有的数据成员共用一个空间,同一时间只能储存其中一个数据成员,所有的数据成员具有相同的起始地址。

从这里可以看出ZeroMQ的几种消息类型:vsm(verysmall
message?), lmsg(long message?), cmsg(constant message)
和delimiter .

每个struct人为地控制为等长,其中unused数组就是用来控制每个struct的长度,使得后面的type和flags在每个struct中的存储位置是一样的.这样就可以做到,无论该消息是vsm或者lmsg或其他类型,只要调用u.base.type就能获取到这个消息的类型了.

enum {max_vsm_size =29};

通过vsm类型和lmsg类型的对比可以知道,ZeroMQ对短消息和长消息是区别对待的.对于短的消息,即不超过29字节的消息,直接复制赋值;而对于长消息,则需要在内存中分配空间,如下面代码所示:

//初始化消息大小

int zmq::msg_t::init_size (size_t size_)
{
    if (size_ <= max_vsm_size) {
	//当消息为小消息时
        u.vsm.type = type_vsm;
        u.vsm.flags = 0;
        u.vsm.size = (unsigned char) size_;
    }
    else {

        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content =
            (content_t*) malloc (sizeof (content_t) + size_);
		//消息为长消息,需要分配内存空间
        if (unlikely (!u.lmsg.content)) {
            errno = ENOMEM;
            return -1;
        }

        u.lmsg.content->data = u.lmsg.content + 1;
			//指向在内存空间中分配的消息内容的地址
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = NULL;
        u.lmsg.content->hint = NULL;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
    }
    return 0;
}

//初始化消息内容

int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
    void *hint_)
{
    //  If data is NULL and size is not 0, a segfault
    //  would occur once the data is accessed
    assert (data_ != NULL || size_ == 0);

    //  Initialize constant message if there's no need to deallocate
    if(ffn_ == NULL) {
	//如果销毁函数为空,则该消息为常量消息
        u.cmsg.type = type_cmsg;
        u.cmsg.flags = 0;
        u.cmsg.data = data_;
        u.cmsg.size = size_;
    }
    else {
        u.lmsg.type = type_lmsg;
        u.lmsg.flags = 0;
        u.lmsg.content = (content_t*) malloc (sizeof (content_t));
        if (!u.lmsg.content) {
            errno = ENOMEM;
            return -1;
        }

        u.lmsg.content->data = data_;
        u.lmsg.content->size = size_;
        u.lmsg.content->ffn = ffn_;
        u.lmsg.content->hint = hint_;
        new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
		//placement new 的用法,后面说明
    }
    return 0;

}

这里有必要看一下上面出现的content的结构:

 struct content_t
        {
            void *data;
            size_t size;
            msg_free_fn *ffn;
            void *hint;
            zmq::atomic_counter_t refcnt;
        };

其中ffn为销毁消息时使用的函数指针,而refcnt则是该消息被共享次数的计数器,当该计算器计数为0,即该消息以及没有被使用时,则该消息销毁.

在上面msg_t::init_data()中出现了这么一行:

new(&u.lmsg.content->refcnt) zmq::atomic_counter_t ();

使用了placementnew的写法.placementnew是用来实现定位构造的,也就是在取得了一块可以容纳指定类型对象的内存后,在这块内存上构造一个对象.对new的深入了解,可以参考这个博客:http://blog.csdn.net/songthin/article/details/1703966.

再回过头来看最开头的地方,好像还有一个问题没解决:

typedef structzmq_msg_t {unsigned char _ [32];} zmq_msg_t;
int zmq_msg_init (zmq_msg_t *msg_)
{
    return ((zmq::msg_t*) msg_)->init ();
}

这里做的强制类型转换,把原本指向32字节空间的zmq_msg_t类型指针转换成了指向msg_t类型的指针,为什么是32位呢,通过下面代码,对消息结构进行字节计算,不难发现每个消息结构就是占了32个字节的.只不过长消息中使用了指针指向了用于存储长消息数据的内存空间而已.所以不要被外表所蒙骗,要看到内在,才知道她的心是怎样的.

struct {
                unsigned char data [max_vsm_size];
                unsigned char size;
                unsigned char type;
                unsigned char flags;
            } vsm;
struct {
                content_t *content;
                unsigned char unused [max_vsm_size + 1 - sizeof (content_t*)];
                unsigned char type;
                unsigned char flags;
            } lmsg;
enum {max_vsm_size =29}; 
时间: 2024-10-12 08:34:37

ZeroMQ源码分析之Message的相关文章

zeromq源码分析笔记之线程间收发命令(2)

在zeromq源码分析笔记之架构说到了zmq的整体架构,可以看到线程间通信包括两类,一类是用于收发命令,告知对象该调用什么方法去做什么事情,命令的结构由command_t结构体确定:另一类是socket_base_t实例与session的消息通信,消息的结构由msg_t确定.命令的发送与存储是通过mailbox_t实现的,消息的发送和存储是通过pipe_t实现的,这两个结构都会详细说到,今天先说一下线程间的收发命令. zeromq的线程可分为两类,一类是io线程,像reaper_t.io_thr

RocketMQ 源码分析(二) —— Message 存储

CommitLog 结构 CommitLog.MappedFileQueue.MappedFile 的关系如下: CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N. 反应到系统文件如下: ··· Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd /Users/yunai/store/commitlog Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l t

zeromq源码分析笔记之准备篇

zeromq这个库主要用于进程通信,包括本地进程.网络通信,涉及到一些基础知识,主要包括管道通信,socket编程的内容,反应器模式(使用IO多路复用实现),无锁队列这几块比较重要的部分,下面的几个链接是这几块内容的学习笔记,有了这些知识,能比较好的理解这个开源库 1.socket原理详解 2.I/O多路复用之select 3.I/O多路复用之poll 4.I/O多路复用之epoll

转:Mongodb源码分析之Replication模式

原文出处:http://www.cnblogs.com/daizhj/archive/2011/06/13/mongodb_sourcecode_rep mongodb中提供了复制(Replication)机制,通过该机制可以帮助我们很容易实现读写分离方案,并支持灾难恢复(服务器断电)等意外情况下的数据安全. 在老版本(1.6)中,Mongo提供了两种方式的复制:master-slave及replica pair模式(注:mongodb最新支持的replset复制集方式可看成是pair的升级版,

Android异步消息处理 Handler Looper Message关系源码分析

# 标签: 读博客 对于Handler Looper Message 之前一直只是知道理论,知其然不知所以然,看了hongyang大神的源码分析,写个总结帖. 一.概念.. Handler . Looper .Message 这三者都与Android异步消息处理线程相关的概念. 异步消息处理线程启动后会进入一个无限的循环体之中,每循环一次,从其内部的消息队列中取出一个消息,然后回调相应的消息处理函数,执行完成一个消息后则继续循环.若消息队列为空,线程则会阻塞等待. 说了这一堆,那么和Handle

Android -- 消息处理机制源码分析(Looper,Handler,Message)

android的消息处理有三个核心类:Looper,Handler和Message.其实还有一个Message Queue(消息队列),但是MQ被封装到Looper里面了,我们不会直接与MQ打交道,因此我没将其作为核心类.下面一一介绍: Looper Looper的字面意思是“循环者”,它被设计用来使一个普通线程变成Looper线程.所谓Looper线程就是循环工作的线程.在程序开发中(尤其是GUI开发中),我们经常会需要一个线程不断循环,一旦有新任务则执行,执行完继续等待下一个任务,这就是Lo

supervisor启动worker源码分析-worker.clj

supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见"storm启动supervisor源码分析-supervisor.clj".sync-processes函数代码片段如下: sync-processes函数代码片段 ;; sync-processes函数用于管理workers, 比如处理不正常的worker或dead worker, 并创建新的workers;; supervisor标识supervis

RocketMQ 源码分析

RocketMQ 源码分析 RocketMQ 的设计思想来自于Kafka,在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比(18项差异).接下来记录下自己阅读源码的一些探索. RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上等; Broker具体处理消息的存储和服务:生产者和消费者是消息的源头和归宿. 在知道各个角色的基本位置后,就该让程序跑

keystone源码分析(一)——Paste Deploy的应用

本keystone源码分析系列基于Juno版Keystone,于2014年10月16日随Juno版OpenStack发布. Keystone作为OpenStack中的身份管理与授权模块,主要实现系统用户的身份认证.基于角色的授权管理.其他OpenStack服务的地址发现和安全策略管理等功能.Keystone作为开源云系统OpenStack中至关重要的组成部分,与OpenStack中几乎所有的其他服务(如Nova, Glance, Neutron等)都有着密切的联系.同时,Keystone作为开源