zmq消息传输基本功能的实现、传输模式

zmq的基本功能:

(1)将消息快速高效地发送给其他节点,这里的节点可以是线程、进程、或是其他计算机;

(2)zmq为应用程序提供了一套简单的套结字API,不用考虑实际使用的协议类型(进程内、进程间、TPC、或广播);

(3)当节点调动时,zmq会自动进行链接或者重连;

(4)无论是发送消息还是接收消息,zmq都会先将消息放入队列中,并保证进程不会因为内存溢出而崩溃,适时地将消息写入磁盘;

(5)zmq会处理套接字异常;

(6)所有的I/O操作都在后台进行;

(7)zmq不会产生死锁。

zmq的使用:

在链接两个节点时,其中一个需要使用zmq_bind(),另一个则使用zmq_connect()。通常来讲,使用zmq_bind()链接的节点称之为服务端,它有着一个较为固定的网络地址;使用zmq_connect()链接的节点称为客户端,其地址不固定。每当客户端使用zmq_connect()链接上述某个端点时,服务端就会自动创建链接。zmq没有对链接数量进行限制。此外,客户端节点也可以使用一个套接字同时建立多个链接。

发送和接收消息使用的是zmq_send()和zmq_recv()这两个函数。zmq套接字可以发送消息给多个端点(扇出模型),或从多个端点中接收消息(扇入模型)。所以,向套接字写入一个消息时可能会将消息发送给很多节点,相应的,套接字又会从所有已建立的链接中接收消息。zmq_recv()方法使用了公平队列的算法来决定接收哪个链接的消息。

调用zmq_send()方法时其实并没有真正将消息发送给套接字链接。消息会在一个内存队列中保存下来,并由后台的I/O线程异步地进行发送。如果不出意外情况,这一行为是非阻塞的。所以说,即便zmq_send()有返回值,并不能代表消息已经发送。已发送消息不能重复使用。

使用举例:

//
//  管道模式 - 结构收集器 设计2
//  添加发布-订阅消息流,用以向worker发送自杀信号
//
#include "zhelpers.h"

int main (void)
{
    void *context = zmq_init (1);

    //  用于接收消息的套接字
    void *receiver = zmq_socket (context, ZMQ_PULL);
    zmq_bind (receiver, "tcp://*:5558");

    //  用以发送控制信息的套接字
    void *controller = zmq_socket (context, ZMQ_PUB);
    zmq_bind (controller, "tcp://*:5559");

    //  等待任务开始
    char *string = s_recv (receiver);
    free (string);
    //  开始计时
    int64_t start_time = s_clock ();

    //  确认100个任务处理完毕
    int task_nbr;
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        char *string = s_recv (receiver);
        free (string);
        if ((task_nbr / 10) * 10 == task_nbr)
            printf (":");
        else
            printf (".");
        fflush (stdout);
    }
    printf ("总执行时间: %d msec\n",
        (int) (s_clock () - start_time));
    //  发送自杀消息给worker
    s_send (controller, "KILL");
    //  结束
    sleep (1);              //  等待发送完毕
    zmq_close (receiver);
    zmq_close (controller);
    zmq_term (context);
    return 0;
}

zmq发送和接收消息C++代码

///接收消息:
    zmq_msg_t message; // 创建消息结构
    zmq_msg_init (&message); // 初始化空消息
    zmq_recv (socket, &message, 0); // 接收消息
    int size = zmq_msg_size (&message); // 计算消息的大小
    char *string = malloc (size + 1); // 分配string为指向size + 1大小的heap空间,那个多出来的1字节是‘\0‘的空间
    memcpy (string, zmq_msg_data (&message), size); // 通过zmq_msg_data(1)获得消息的data地址,拷贝到字符串中
    zmq_msg_close (&message); // 释放或销毁消息
    string [size] = 0; // 设置‘\0‘
    return (string); 

///发送消息:
    int rc;
    zmq_msg_t message; // 创建消息结构
    zmq_msg_init_size (&message, strlen (string)); // 以字符串长度(不包括‘\0‘)初始化成消息
    memcpy (zmq_msg_data (&message), string, strlen (string)); // 将字符串的内容(不包括‘\0‘)拷贝给消息
    rc = zmq_send (socket, &message, 0); // 发送消息
    assert (!rc);
    zmq_msg_close (&message); // 释放和销毁消息
    return (rc);   

socket types: zmq一共具有12种类型的socket,5种消息模式。

(1)请求/应答模式:ZMQ_REQ、ZMQ_REP、ZMQ_DEALER、ZMQ_ROUTER

(2)发布/订阅模式:ZMQ_PUB、ZMQ_SUB、ZMQ_XPUB、ZMQ_XSUB

(3)管道模式:ZMQ_PUSH、ZMQ_PULL

(4)本地模式:ZMQ_STREAM

请求/应答模式

ZMQ_REQ

一般用于客户端发送请求消息,此类型的socket必须严格遵循先发送後接收的顺序,如果发生异常或者当前没有可用的服务(连接),socket会阻塞,直到有可用的服务(新连接的到来),再把消息发送出去。REQ类型的socket不会丢弃信息。

ZMQ_REP发送消息时会自动在消息顶部插入一个空帧。

特点总结:

(1)可兼容的socket types: ZMQ_REP, ZMQ_ROUTER

(2)数据传输:双向

(3)发送/接收模式:发送a接收a

(4)发送路由策略:Round-robin(循环队列)

(5)接收路由策略:Last peer

(6)进入mute状态后: 阻塞。

ZMQ_REP

一般用于服务端接收消息,此类型的socket必须严格遵循先接收後发送的顺序,从客户端接收请求消息使用了公平队列,回应客户端时,所有的reply都会被路由到最后下达请求的客户端。

如果发生异常或者当前没有可用的客户端链接,所有消息都会毫无提示的被丢弃,不会发生阻塞。

特点总结:

(1)可兼容的socket types:ZMQ_REQ,ZMQ_DEALER

(2)数据传输:双向

(3)发送/接收模式:接收a发送a

(4)发送路由策略:Last peer

(5)接收路由策略:Fair-queued(公平队列)

ZMQ_DEALER

DEALER是一种用于请求/答应模式的更高级的扩展Socket,它可以自由的收发消息,没有ZMQ_REP/ZMQ_REQ那样的限制。

对于每一个连接,接收消息也是使用了公平队列,发送使用了循环队列(RR)。

ZMQ_DEALERE受ZMQ_RCVHW和ZMQ_SHDHW两个阈值影响(可通过zmq_setsockopt函数设置),一旦发送或接收消息队列达到阈值,socket就会进入mute状态,此时对DEALER的任何xmq_send操作都会阻塞,直到mute状态结束。

如果当前没有有效的链接,zmq_send操作也会阻塞,直到有新的链接到来为止。

DEALER发生阻塞并不会丢弃消息

注意:如果zmq_DEALER连接到ZMQ_REP,每一个消息包必须包含一个空帧,然后再紧跟着数据包体。

特点总结:

(1)可兼容的Socket types:ZMQ_ROUTER, ZMQ_REP, ZMQ_DEALER

(2)数据传输:双向

(3)发送/接收模式:无限制

(4)发送路由策略:Round-robin(循环队列)

(5)接收路由策略:Faie-queued(公平队列)

(6)进入mute状态後:阻塞

ZMQ_ROUTER

ZMQ_ROUTER是一种用于请求/答应模式的更高级的扩展Socket,它可以自由的收发消息。

当ZMQ_ROUTER接收到消息时,会自动在消息顶部加入来源地址标识符,接收消息使用了公平队列。当发送消息时,ZMQ_ROUTER又会自动去掉这个标识符,并且根据这个标识符路由到相应的端点。

如果此地址标识的端点不存在,默认会毫无征兆的丢弃消息,除非将ZMQ_ROUTE_MANDATORY选项设置为1。

当队列达到阈值时,socket就会进入mute状态,此时所有后续发送到此Socket的消息都会被丢弃,直到mute状态结束。同样的,如果对端的接收队列达到了阈值,消息也会被丢弃。

如果ZMQ_REQ连接到ZMQ_ROUTER,从ZMQ_ROUTER接收到ZMQ_REQ消息时,除了会在消息前加上来源地址标识符之外,还会加上一个空帧与原消息分隔。因此消息可以包含多个地址标识符和多个数据包体。地址和数据体之间必须用空帧分割;发送回应消息给ZMQ_REQ时,必须至少包含一个空帧;发送消息时,ZMQ_ROUTER会根据第一个地址标识符路由到对应的端点。

特点总结:

(1)可兼容的Sockte types:ZMQ_DEALER, ZMQ_REQ, ZMQ_ROUTER

(2)数据传输:双向

(3)发送/接收模式:无限制

(4)接收路由策略:Fair-queued(公平队列)

(5)进入mute状态後:丢弃消息

发布/订阅模式

ZMQ_PUB

ZMQ_PUB类型的Socket以发布者的身份向订阅者分发消息,消息以扇出的形式发送给所有订阅者链接。

ZMQ_PUB类型的Socket没有实现zmq_recv函数,所以不能对其调用zmq_recv函数!

当ZMQ_PUB Socket达到阈值时进入mute状态,此时后续发送的消息会被丢弃,直到mute状态结束。

对ZMQ_PUB Socket调用zmq_send永远不会发生阻塞。

特点总结:

(1)可兼容的Socket types: ZMQ_SUB, ZMQ_XSUB

(2)数据传输:单向

(3)发送/接收模式:只能发送

(4)接收路由策略:Fan out(扇出)

(5)进入mute状态後:丢弃消息

ZMQ_SUB

ZMQ_SUB类型的Socket以订阅者的身份接收消息。初始的ZMQ_SUB Socket没有订阅任何消息,可以通过设置ZMQ_SUBSRIBE选项来指定需要订阅的消息。

ZMQ_SUB Socket没有实现zmq_send函数,所以不能对其调用zmq_send函数!

特点总结:

(1)可兼容的Socket types: ZMQ_PUB, ZMQ_XPUB

(2)数据传输:单向

(3)发送/接收模式:只能接收

(4)接收路由策略:Fair-queued(公平队列)

Socket options(部分)

概要:通过zmq_setsockopt和zmq_getsocketopt函数来设置/读取指定选项

ZMQ_SNDHWM

设置指定Socket的发送消息队列在高水位标识(阈值),ZMQ会严格限制发送队列的上限数。0表示无限制。

如果达到了这个限制,socket就会进入异常状态,zmq此时会采取适当的措施——阻塞或丢弃消息,这取决于socket的类型。

Note: ZMQ不保证socket一定能接收ZMQ_SNDHWM这么多的消息,甚至可能会低60%-70%,这取决于socket上的信息流。

ZMQ_RCVHWM

设置指定Socket的接收消息队列的高水位标识(阈值),ZMQ会严格限制接收队列的上限数。0表示无限制。

如果达到了这个限制,socket就会进入异常状态,ZMQ此时会采取适当的措施——阻塞或丢弃消息,这取决于socket的类型

ZMQ_SUBSCRIBE

ZMQ_SUBSCRIBE选项会在ZMQ_SUB socket上建立一个消息过滤器。初始的ZMQ_SUB Socket会过滤掉所有的消息,因此必须设置这个选项,否则将收不到任何消息。

如果设置一个0长度的空值,ZMQ_SUB Socket会接受所有的消息。设置一个非空值将接受指定的消息。可以在同一个ZMQ_SUB Socket上设置多个过滤器,它将会接受至少一个匹配的消息。

ZMQ_UNSUBSCRIBE

此选项用来删除ZMQ_SUB Socket上通过ZMQ_SUBSCRIBE设置过的消息过滤器。如果Socket有多个实例有相同的过滤器,只删除其中一个。

ZMQ_IDENTITY

此选项用来设置Socket的身份标识,只能用于请求/答应模式。ROUTER Socket可以根据这个身份来标识路由信息。

身份标识的长度规定在1-255bytes, 由二进制零开头的标识符为ZMQ保留使用。

如果两个身份标识相同的Socket链接到同一个对端(ROUTER),结果行为是未定义的。

ZMQ_RECVTIMEO

设置Socket的receive操作的超时。如果为0,则zmq_recv会立即返回,如果没有接收到消息,会返回一个EAGAIN错误;如果为-1,Socket会阻塞到有可用消息为止;如果为其他值,Socket要么阻塞达到指定的时间还没接收到可用的消息,返回一个EAGAIN错误,要么在指定时间前接收到可用消息。

ZMQ_SENDTIMEO

设置Socket的Send操作的超时。如果为0,则zmq_send会立即返回,如果消息没有发送成功,会返回一个EAGAIN错误;如果为-1, Socket会抑制阻塞到消息发送完毕;

如果为其他值,Socket要么阻塞达到指定的时间还没发送完成,返回一个EAGAIN错误,要么在指定时间前发送完消息。

参考:https://www.kancloud.cn/wizardforcel/zmq-guide/146964

https://www.cnblogs.com/dvwei/p/3608119.html

原文地址:https://www.cnblogs.com/zhibei/p/12443218.html

时间: 2024-12-16 13:00:53

zmq消息传输基本功能的实现、传输模式的相关文章

ZeroMQ接口函数之 :zmq - 0MQ 轻量级消息传输内核

zmq(7) 0MQ Manual - 0MQ/3.2.5 Name zmq – ØMQ 轻量级消息传输内核 Synopsis #include <znq.h> cc [flags] files –lzmq [libraries] Description ØMQ轻量级消息传输内核是一个从标准socket接口的扩展而来的链接库,这些接口通常是由一些专门的传送中间设备来提供.ØMQ提供了一个步消息传送.多模式消息传送.消息过滤(订阅).对多种传输协议无缝接入的集合. 本文档呈现了ØMQ的概念,描述

WCF 客户端与服务端消息传输

WCF很多需要认证信息,保证服务的安全,可以使用消息来实现 WCF 实现消息的方式: WCF中有两个接口: IClientMessageInspector [定义一个消息检查器对象,该对象可以添加到 System.ServiceModel.Dispatcher.ClientRuntime.MessageInspectors集合来查看或修改消息] IDispatchMessageInspector  [定义一些方法,通过这些方法,可以在服务应用程序中对入站和出站应用程序消息进行自定义检查或修改.]

MQTT是IBM开发的一个即时通讯协议,构建于TCP/IP协议上,是物联网IoT的订阅协议,借助消息推送功能,可以更好地实现远程控制

最近一直做物联网方面的开发,以下内容关于使用MQTT过程中遇到问题的记录以及需要掌握的机制原理,主要讲解理论. 背景 MQTT是IBM开发的一个即时通讯协议.MQTT构建于TCP/IP协议上,面向M2M和物联网IoT的连接协议,采用轻量级发布和订阅消息传输机制.Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的短消息通信简单易用. 基本概念 [MQTT协议特点]——相比于RESTful架构的物联网系统,MQ

ZeroMQ接口函数之 :zmq_msg_init_data - 从一个指定的存储空间中初始化一个ZMQ消息对象的数据

ZeroMQ 官方地址 :http://api.zeromq.org/4-1:zmq_msg_init_data zmq_msg_init_data(3) ØMQ Manual - ØMQ/3.2.5 Name zmq_msg_init_data - 从一个指定的存储空间中初始化一个ZMQ消息对象的数据 Synopsis typedef void (zmq_free_fn) (void *data, void *hint); int zmq_msg_init_data (zmq_msg_t *m

实时消息传输协议 RTMP(Real Time Messaging Protocol)

原文: http://blog.csdn.net/defonds/article/details/17403225 译序:本文是维基百科关于 RTMP 的解释, 关于 RTMP 官方规范参见 RTMP 规范,关于 RTMP 官方规范的中文版,参见<Adobe 官方公布的 RTMP 规范>.以下是维基百科原文: 实时消息传输协议(RTMP)最初是由 Macromedia 为互联网上 Flash player 和服务器之间传输音频.视频以及数据流而开发的一个私有协议.Adobe 收购 Macrom

医疗行业多层级复杂网络环境下的消息传输(远程会诊)架构与实现

近期接手一个针对医疗系统远程会诊平台的技术改造工作,这项工作中的一些技术问题颇具代表性,我会在此记录这一工作的过程和技术细节,如果条件允许,会在 GitHub 上开源部分业务无关的纯技术实现,敬请关注.(https://github.com/iccb1013). 远程会诊平台的应用场景指的是乡镇或县卫生所,在接诊过程中,对疑难问题上报上级医疗机构,由上级医疗机构进行网络诊断并回复诊疗意见,但是这一过程,并不是简单的点对点的关系.主要特点:1)  包含多级机构:乡镇.县.区.市.省.由任意一级向上

ZeroMQ接口函数之 :zmq_msg_init_size - 使用一个指定的空间大小初始化ZMQ消息对象

ZeroMQ 官方地址 :http://api.zeromq.org/4-1:zmq_msg_init_size zmq_msg_init_size(3) ØMQ Manual - ØMQ/3.2.5 Name zmq_msg_init_size - 使用一个指定的空间大小初始化ZMQ消息对象 Synopsis int zmq_msg_init_size (zmq_msg_t *msg, size_t size); Description zmq_msg_init_size()函数会分配任何被请

高仿微信新消息提示音功能

最近公司在做一个项目,有一个切换消息提示音的功能,可以切换本应用收到消息的提示音,而不影响系统提示音.我就按照微信的那个样式进行了编程,最终得到想要的效果. 转载请注明出处,谢谢:http://blog.csdn.net/harryweasley/article/details/46408037 怕有些人不知道怎么进入微信的新消息提示音功能,我这里说下操作步骤: 打开微信----我---设置---新消息提醒---新消息提示音. 经过以上的步骤就进入了这样的界面 这个是微信的效果图. 下面是我自己

ZeroMQ接口函数之 :zmq_msg_init - 初始化一个空的ZMQ消息结构

ZeroMQ 官方地址 :http://api.zeromq.org/4-1:zmq_msg_init zmq_msg_init(3) ØMQ Manual - ØMQ/3.2.5 Name zmq_msg_init - 初始化一个空的ZMQ消息结构 Synopsis int zmq_msg_init (zmq_msg_t *msg); Description zmq_msg_init()函数会将msg参数引用的ZMQ消息对象进行初始化,使其成为一个空消息.在使用zmq_recv()函数接收消息