消息队列库——ZeroMQ

消息队列库——ZeroMQ

ZeroMQ(简称ZMQ)是一个基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。

ZMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。

ZMQ不是单独的服务,而是一个嵌入式库,它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。

主线程与I/O线程:

I/O线程,ZMQ根据用户调用zmq_init函数时传入的参数,创建对应数量的I/O线程。每个I/O线程都有与之绑定的Poller,Poller采用经典的Reactor模式实现。

Poller根据不同操作系统平台使用不同的网络I/O模型(select、poll、epoll、devpoll、kequeue等),所有的I/O操作都是异步的,线程不会被阻塞。。

主线程与I/O线程通过Mail Box传递消息来进行通信。

Server,在主线程创建zmq_listener,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_listener添加到Poller中用以侦听读事件。

Client,在主线程中创建zmq_connecter,通过Mail Box发消息的形式将其绑定到I/O线程,I/O线程把zmq_connecter添加到Poller中用以侦听写事件。

Client与Server第一次通信时,会创建zmq_init来发送identity,用以进行认证。认证结束后,双方会为此次连接创建Session,以后双方就通过Session进行通信。

每个Session都会关联到相应的读/写管道, 主线程收发消息只是分别从管道中读/写数据。Session并不实际跟kernel交换I/O数据,而是通过plugin到Session中的Engine来与kernel交换I/O数据。

ZMQ将消息通信分成4种模型:

  1. 一对一结对模型(Exclusive-Pair),可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。数据可以双向流动,这点不同于后面的请求回应模型。
  2. 请求回应模型(Request-Reply),由Client发起请求,并由Server响应,跟一对一结对模型的区别在于可以有多个Client。
  3. 发布订阅模型(Publish-Subscribe),Publish端单向分发数据,且不关心是否把全部信息发送给Subscribe端。如果Publish端开始发布信息时,Subscribe端尚未连接进来,则这些信息会被直接丢弃。Subscribe端只能接收,不能反馈,且在Subscribe端消费速度慢于Publish端的情况下,会在Subscribe端堆积数据。
  4. 管道模型(Push-Pull),从 PUSH 端单向的向 PULL 端单向的推送数据流。如果有多个PULL端同时连接到PUSH端,则PUSH端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到PULL端上。与发布订阅模型相比,管道模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。

这4种模型总结出了通用的网络通信模型,在实际中可以根据应用需要,组合其中的2种或多种模型来形成自己的解决方案。

ZMQ提供进程内(inproc://)、进程间(ipc://)、机器间(tcp://)、广播(pgm://)等四种通信协议。


ZMQ API

ZMQ提供的所有API均以zmq_开头,

#include <zmq.h>

gcc [flags] files -lzmq [libraries]

例如,返回当前ZMQ库的版本信息

void zmq_version (int *major, int *minor, int *patch);

Context

在使用任何ZQM库函数之前,必须首先创建ZMQ context(上下文),程序终止时,也需要销毁context。

创建context

void *zmq_ctx_new ();

ZMQ context是线程安全的,可以在多线程环境使用,而不需要程序员对其加/解锁。

在一个进程中,可以有多个ZMQ context并存。

设置context选项

int zmq_ctx_set (void *context, int option_name, int option_value);
int zmq_ctx_get (void *context, int option_name);

销毁context

int zmq_ctx_term (void *context);

Sockets

ZMQ Sockets 是代表异步消息队列的一个抽象,注意,这里的ZMQ socket和POSIX套接字的socket不是一回事,ZMQ封装了物理连接的底层细节,对用户不透明。

传统的POSIX套接字只能支持1对1的连接,而ZMQ socket支持多个Client的并发连接,甚至在没有任何对端(peer)的情况下,ZMQ sockets上也能放入消息;

ZMQ sockets不是线程安全的,因此,不要在多个线程中并行操作同一个sockets。

创建ZMQ  Sockets

void *zmq_socket (void *context, int type);

注意,ZMQ socket在bind之前还不能使用。

type参数含义

pattern


type


description


一对一结对模型


ZMQ_PAIR


请求回应模型


ZMQ_REQ


client端使用


ZMQ_REP


server端使用


ZMQ_DEALER


将消息以轮询的方式分发给所有对端(peers)


ZMQ_ROUTER


发布订阅模型


ZMQ_PUB


publisher端使用


ZMQ_XPUB


ZMQ_SUB


subscriber端使用


ZMQ_XSUB


管道模型


ZMQ_PUSH


push端使用


ZMQ_PULL


pull端使用


原生模型


ZMQ_STREAM

设置socket选项

int zmq_getsockopt (void *socket, int option_name, void *option_value, size_t *option_len);
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);

关闭socket

int zmq_close (void *socket);

创建一个消息流

int zmq_bind (void *socket, const char *endpoint);
int zmq_connect (void *socket, const char *endpoint);

bind函数是将socket绑定到本地的端点(endpoint),而connect函数连接到指定的peer端点。

endpoint支持的类型:


transports


description


uri example


zmp_tcp


TCP的单播通信


tcp://*:8080


zmp_ipc


本地进程间通信


ipc://


zmp_inproc


本地线程间通信


inproc://


zmp_pgm


PGM广播通信


pgm://

收发消息

int zmq_send (void *socket, void *buf, size_t len, int flags);
int zmq_recv (void *socket, void *buf, size_t len, int flags);
int zmq_send_const (void *socket, void *buf, size_t len, int flags);

zmq_recv()函数的len参数指定接收buf的最大长度,超出部分会被截断,函数返回的值是接收到的字节数,返回-1表示出错;

zmq_send()函数将指定buf的指定长度len的字节写入队列,函数返回值是发送的字节数,返回-1表示出错;

zmq_send_const()函数表示发送的buf是一个常量内存区(constant-memory),这块内存不需要复制、释放。

socket事件监控

int zmq_socket_monitor (void *socket, char * *addr, int events);

zmq_socket_monitor()函数会生成一对sockets,publishers端通过inproc://协议发布 sockets状态改变的events;
消息包含2帧,第1帧包含events id和关联值,第2帧表示受影响的endpoint。

监控支持的events:

ZMQ_EVENT_CONNECTED: 建立连接
ZMQ_EVENT_CONNECT_DELAYED: 连接失败
ZMQ_EVENT_CONNECT_RETRIED: 异步连接/重连
ZMQ_EVENT_LISTENING: bind到端点
ZMQ_EVENT_BIND_FAILED: bind失败
ZMQ_EVENT_ACCEPTED: 接收请求
ZMQ_EVENT_ACCEPT_FAILED: 接收请求失败
ZMQ_EVENT_CLOSED: 关闭连接
ZMQ_EVENT_CLOSE_FAILED: 关闭连接失败
ZMQ_EVENT_DISCONNECTED: 会话(tcp/ipc)中断

I/O多路复用

int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);

对sockets集合的I/O多路复用,使用水平触发。

与epoll类似,items参数指定一个结构体数组(结构体定义如下),nitems指定数组的元素个数,timeout参数是超时时间(单位:ms,0表示不等待立即返回,-1表示阻塞等待)。

typedef struct
{
    void *socket;
    int fd;
    short events;
    short revents;
} zmq_pollitem_t;

对于每个zmq_pollitem_t元素,ZMQ会同时检查其socket(ZMQ套接字)和fd(原生套接字)上是否有指定的events发生,且ZMQ套接字优先。

events指定该sockets需要关注的事件,revents返回该sockets已发生的事件,它们的取值为:

  • ZMQ_POLLIN,可读;
  • ZMQ_POLLOUT,可写;
  • ZMQ_POLLERR,出错;

Messages

一个ZMQ消息就是一个用于在消息队列(进程内部或跨进程)中进行传输的数据单元,ZMQ消息本身没有数据结构,因此支持任意类型的数据,这完全依赖于程序员如何定义消息的数据结构。

一条ZMQ消息可以包含多个消息片(multi-part messages),每个消息片都是一个独立zmq_msg_t结构。

ZMQ保证以原子方式传递消息,要么所有消息片都发送成功,要么都不成功。

初始化消息

typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init (zmq_msg_t *msg);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);

zmq_msg_init()函数初始化一个消息对象zmq_msg_t ,不要直接访问zmq_msg_t对象,可以通过zmq_msg_* 函数来访问它。
zmq_msg_init()、zmq_msg_init_data()、zmq_msg_init_size() 三个函数是互斥的,每次使用其中一个即可。

设置消息属性

int zmq_msg_get (zmq_msg_t *message, int property);
int zmq_msg_set (zmq_msg_t *message, int property, int value);

释放消息

int zmq_msg_close (zmq_msg_t *msg);

收发消息

int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags);
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);

其中,flags参数如下:

ZMQ_DONTWAIT,非阻塞模式,如果没有可用的消息,将errno设置为EAGAIN;
ZMQ_SNDMORE,发送multi-part messages时,除了最后一个消息片外,其它每个消息片都必须使用 ZMQ_SNDMORE 标记位。

获取消息内容

void *zmq_msg_data (zmq_msg_t *msg);
int zmq_msg_more (zmq_msg_t *message);
size_t zmq_msg_size (zmq_msg_t *msg);

zmq_msg_data()返回指向消息对象所带内容的指针;
zmq_msg_size()返回消息的字节数;
zmq_msg_more()标识该消息片是否是整个消息的一部分,是否还有更多的消息片待接收;

控制消息

int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);

zmq_msg_copy()函数实现的是浅拷贝;
zmq_msg_move()函数中,将dst指向src消息,然后src被置空。

eg,接收消息的代码示例:

zmq_msg_t part;
while (true) {
    //  Create an empty ?MQ message to hold the message part
    int rc = zmq_msg_init (&part);
    assert (rc == 0);
    //  Block until a message is available to be received from socket
    rc = zmq_msg_recv (socket, &part, 0);
    assert (rc != -1);
    if (zmq_msg_more (&part))
        fprintf (stderr, "more\n");
    else {
        fprintf (stderr, "end\n");
        break;
    }
    zmq_msg_close (&part);
}

代理

ZMQ提供代理功能,代理可以在前端socket和后端socket之间转发消息。

int zmq_proxy (const void *frontend, const void *backend, const void *capture);
int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);

共享队列(shared queue),前端是ZMQ_ROUTER socket,后端是ZMQ_DEALER socket,proxy会把clients发来的请求,公平地分发给services;
转发队列(forwarded),前端是ZMQ_XSUB socket, 后端是ZMQ_XPUB socket, proxy会把从publishers收到的消息转发给所有的subscribers;
流(streamer),前端是ZMQ_PULL socket, 后端是ZMQ_PUSH socket.

proxy使用的一个示例:

//  Create frontend and backend sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);

//  Bind both sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);

//  Start the queue proxy, which runs until ETERM zmq_proxy frontend, backend, NULL);

错误处理

ZMQ库使用POSIX处理函数错误,返回NULL指针或者负数时表示调用出错。

int zmq_errno (void);
const char *zmq_strerror (int errnum);

zmq_errno()函数返回当前线程的错误码errno变量的值;

zmq_strerror()函数将错误映射成错误字符串。

加密传输

ZQM可以为IPC和TCP连接提供安全机制:

  • 不加密,zmq_null
  • 使用用户名/密码授权,zmq_plain
  • 椭圆加密,zmq_curve

这些通过 zmq_setsockopt()函数设置socket选项的时候配置。

总结:

1、仅仅提供24个API接口,风格类似于BSD Socket。

2、处理了网络异常,包括连接异常中断、重连等。

3、改变TCP基于字节流收发数据的方式,处理了粘包、半包等问题,以msg为单位收发数据,结合Protocol Buffers,可以对应用层彻底屏蔽网络通信层。

4、对大数据通过SENDMORE/RECVMORE提供分包收发机制。

5、通过线程间数据流动来保证同一时刻任何数据都只会被一个线程持有,以此实现多线程的“去锁化”。

6、通过高水位HWM来控制流量,用交换SWAP来转储内存数据,弥补HWM丢失数据的缺陷。

7、服务器端和客户端的启动没有先后顺序。

时间: 2024-11-14 04:53:55

消息队列库——ZeroMQ的相关文章

消息通信库ZeroMQ 4.0.4安装指南

消息通信库ZeroMQ 4.0.4安装指南 一.ZeroMQ介绍 ZeroMQ是一个开源的消息队列系统,按照官方的定义,它是一个消息通信库,帮助开发者设计分布式和并行的应用程序. 首先,我们需要明白,ZeroMQ不是传统的消息队列系统(比如ActiveMQ.WebSphereMQ.RabbitMQ等).ZeroMQ可以帮助我们建立自己的消息队列系统,它只是一个库.ZeroMQ可以运行于带x86处理器或ARM处理器的机器上,支持40多种编程语言. 消息队列,从技术的角度来讲,是以先进先出FIFO算

消息队列ZeroMq的安装和调试

最近几天,开发到进程间通信,找来找去,感觉最合适的还是消息队列,下面是有关消息队列zeromq的安装调试方法. 一.ZeroMQ 介绍 引用官方的说法: "ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 socket library,他使得 Socket 编程更加简单.简洁和性能更高.是一个消息处理队列库,可在多个线程.内核和主机盒之间弹性伸缩.ZMQ 的明确目标是"成为标准网络协议栈的一部分,之后进入 Linux 内核".现在还未看到它

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

Java消息队列总结只需一篇解决ActiveMQ、RabbitMQ、ZeroMQ、Kafka

一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式:2.并行方式 a.串行

Centos zeromq 消息队列 安装

一,什么是zeromq 这是个类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接.销毁连接.选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单.ZMQ用于node与node间的通信,node可以是主机或者是进程. 引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样

消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka). 四.JMS消息服务 讲消息队列就不得不提JMS .JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 在EJB架构中,有消息bean可以无缝的与JM消息服务集成.在J2EE架构模

大型网站架构系列:消息队列

出处:ITFLY8 网址:http://www.cnblogs.com/itfly8/p/5156155.html 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异

大型网站架构系列:消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka).[第二篇的内容大部分为网络资源的整理和汇总,供大家学习总结使用,最后有文章来源] 本次分享大纲 消息队列概述(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息队列应用场景(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息中间件示例(见第一篇:大型网站架构系列:分布式消息队列(一)) JMS消息服务 常用消息队列 参考(推荐)资料 本

消息队列一览

谈谈互联网后端基础设施 http://www.rowkey.me/blog/2016/08/27/server-basic-tech-stack/ service之间的调用方式可以分为同步调用以及异步调用.异步调用是怎么进行的?一种很常见的方式就是使用消息队列,调用方把请求放到队列中即可返回,然后等待服务提供方去队列中去获取请求进行处理,然后把结果返回给调用方即可(可以通过回调). 异步调用就是消息中间件一个非常常见的应用场景.此外,消息队列的应用场景还有以下: 解耦:一个事务,只关心核心的流程