网络通信 --> 消息队列

消息队列

  消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。

  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构。可以通过发送消息来避免命名管道的同步和阻塞问题。但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制。Linux用宏MSGMAX和MSGMNB来限制一条消息的最大长度和一个队列的最大长度。

msgget函数

  该函数用来创建和访问一个消息队列。

int msgget(key_t, key, int msgflg); 

  key: 与其他的IPC机制一样,程序必须提供一个键来命名某个特定的消息队列。

  msgflg: 是一个权限标志,表示消息队列的访问权限,它与文件的访问权限一样。msgflg可以与IPC_CREAT做或操作,表示当key所命名的消息队列不存在时创建一个消息队列,如果key所命名的消息队列存在时,IPC_CREAT标志会被忽略,而只返回一个标识符。

  它返回一个以key命名的消息队列的标识符(非零整数),失败时返回-1.

msgsnd函数

  该函数用来把消息添加到消息队列中。它的原型为:

int msgsend(int msgid, const void *msg_ptr, size_t msg_sz, int msgflg);

  msgid:是由msgget函数返回的消息队列标识符。

  msg_ptr:是一个指向准备发送消息的指针,但是消息的数据结构却有一定的要求,指针msg_ptr所指向的消息结构一定要是以一个长整型成员变量开始的结构体,接收函数将用这个成员来确定消息的类型。

  msg_sz:是msg_ptr指向的消息的长度,而不是整个结构体的长度,也就是说msg_sz是不包括长整型消息类型成员变量的长度。

  msgflg:用于控制当前消息队列满或队列消息到达系统范围的限制时将要发生的事情。

  消息结构要定义成这样:

    struct my_message{
        long int message_type;
        /* The data you wish to transfer*/
    };  

  如果调用成功,消息数据的一分副本将被放到消息队列中,并返回0,失败时返回-1.

msgrcv函数

  该函数用来从一个消息队列获取消息

int msgrcv(int msgid, void *msg_ptr, size_t msg_st, long int msgtype, int msgflg); 

  msgid, msg_ptr, msg_st的作用也函数msgsnd函数的一样。

  msgtype:可以实现一种简单的接收优先级。如果msgtype为0,就获取队列中的第一个消息。如果它的值大于零,将获取具有相同消息类型的第一个信息。如果它小于零,就获取类型等于或小于msgtype的绝对值的第一个消息。

  msgflg:用于控制当队列中没有相应类型的消息可以接收时将发生的事情。

  调用成功时,该函数返回放到接收缓存区中的字节数,消息被复制到由msg_ptr指向的用户分配的缓存区中,然后删除消息队列中的对应消息。失败时返回-1.

msgctl函数

  该函数用来控制消息队列,它与共享内存的shmctl函数相似

int msgctl(int msgid, int command, struct msgid_ds *buf); 

  command:是将要采取的动作,它可以取3个值,

     IPC_STAT:把msgid_ds结构中的数据设置为消息队列的当前关联值,即用消息队列的当前关联值覆盖msgid_ds的值。

    IPC_SET:如果进程有足够的权限,就把消息列队的当前关联值设置为msgid_ds结构中给出的值

    IPC_RMID:删除消息队列

  buf:是指向msgid_ds结构的指针,它指向消息队列模式和访问权限的结构。

msgid_ds结构至少包括以下成员:

struct msgid_ds
{
    uid_t shm_perm.uid;
    uid_t shm_perm.gid;
    mode_t shm_perm.mode;
}; 

成功时返回0,失败时返回-1.

示例

接收信息:

    #include <unistd.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <errno.h>
    #include <sys/msg.h>  

    struct msg_st
    {
        long int msg_type;
        char text[BUFSIZ];
    };  

    int main()
    {
        int running = 1;
        int msgid = -1;
        struct msg_st data;
        long int msgtype = 0; //注意1  

        //建立消息队列
        msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
        if(msgid == -1)
        {
            fprintf(stderr, "msgget failed with error: %d\n", errno);
            exit(EXIT_FAILURE);
        }
        //从队列中获取消息,直到遇到end消息为止
        while(running)
        {
            if(msgrcv(msgid, (void*)&data, BUFSIZ, msgtype, 0) == -1)
            {
                fprintf(stderr, "msgrcv failed with errno: %d\n", errno);
                exit(EXIT_FAILURE);
            }
            printf("You wrote: %s\n",data.text);
            //遇到end结束
            if(strncmp(data.text, "end", 3) == 0)
                running = 0;
        }
        //删除消息队列
        if(msgctl(msgid, IPC_RMID, 0) == -1)
        {
            fprintf(stderr, "msgctl(IPC_RMID) failed\n");
            exit(EXIT_FAILURE);
        }
        exit(EXIT_SUCCESS);
    }  

发送信息:

    #include <unistd.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/msg.h>
    #include <errno.h>  

    #define MAX_TEXT 512
    struct msg_st
    {
        long int msg_type;
        char text[MAX_TEXT];
    };  

    int main()
    {
        int running = 1;
        struct msg_st data;
        char buffer[BUFSIZ];
        int msgid = -1;  

        //建立消息队列
        msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
        if(msgid == -1)
        {
            fprintf(stderr, "msgget failed with error: %d\n", errno);
            exit(EXIT_FAILURE);
        }  

        //向消息队列中写消息,直到写入end
        while(running)
        {
            //输入数据
            printf("Enter some text: ");
            fgets(buffer, BUFSIZ, stdin);
            data.msg_type = 1;    //注意2
            strcpy(data.text, buffer);
            //向队列发送数据
            if(msgsnd(msgid, (void*)&data, MAX_TEXT, 0) == -1)
            {
                fprintf(stderr, "msgsnd failed\n");
                exit(EXIT_FAILURE);
            }
            //输入end结束输入
            if(strncmp(buffer, "end", 3) == 0)
                running = 0;
            sleep(1);
        }
        exit(EXIT_SUCCESS);
    } 

结果:

?  jiangtf  ./send
Enter some text: abs
You wrote: abs

Enter some text: tjdf
You wrote: tjdf

Enter some text: end
You wrote: end

[1]  + 59241 done       ./a.out

消息类型详解

  注意接收文件中的变量msgtype(注释为注意1),它作为 msgrcv函数的接收信息类型参数的值,其值为0,表示获取队列中第一个可用的消息。再来看看发送文件中while循环中的语句 data.msg_type = 1(注释为注意2),它用来设置发送的信息的信息类型,即其发送的信息的类型为1。所以程序msgreceive能够接收到程序msgsend发送的信息。

  如果把注意1,即long int msgtype = 0;改变为long int msgtype = 2;msgreceive将不能接收到程序msgsend发送的信息。因为在调用msgrcv函数时,如果msgtype(第四个参数)大于零,则将只获取具有相同消息类型的第一个消息,修改后获取的消息类型为2,而msgsend发送的消息类型为1,所以不能被msgreceive程序 接收。重新编译msgreceive.c文件并再次执行,其结果如下:

?  jiangtf  ./send
Enter some text: asd
Enter some text: intad
Enter some text: end

消息队列与命名管道的区别

  在命名管道中,发送数据用write,接收数据用read,则在消息队列中,发送数据用msgsnd,接收数据用msgrcv。而且它们对每个数据都有一个最大长度的限制。与命名管道相比,消息队列的优势在于:

  1、消息队列也可以独立于发送和接收进程而存在,从而消除了在同步命名管道的打开和关闭时可能产生的困难。

  2、同时通过发送消息还可以避免命名管道的同步和阻塞问题,不需要由进程自己来提供同步方法。

  3、接收程序可以通过消息类型有选择地接收数据,而不是 像命名管道中那样,只能默认地接收。

参考:http://www.cnblogs.com/lidabo/p/4323807.html

时间: 2024-12-15 01:53:59

网络通信 --> 消息队列的相关文章

消息队列库——ZeroMQ

消息队列库--ZeroMQ ZeroMQ(简称ZMQ)是一个基于消息队列的多线程网络库,其对套接字类型.连接处理.帧.甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字. ZMQ是网络通信中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间. ZMQ不是单独的服务,而是一个嵌入式库,它封装了网络通信.消息队列.线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信. 主线程与I/O线程: I

对分布式事务、消息队列的重新认识

本质上问题可以抽象为:当一个表数据更新后,怎么保证另一个表的数据也必须要更新成功.若两张表在同一个数据库实例中,则使用本地事务就好了.否则可以采用分布式事务,或者消息队列. 前阵子从支付宝转账1万块钱到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加1万,数据就会出现不一致状况了. 上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品

分布式事务消息队列的应用

前阵子从支付宝转账1万块钱到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加1万,数据就会出现不一致状况了. 上述场景在各个类型的系统中都能找到相似影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量必须减1吧,怎么保证?!在搜索广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外,还得去商家账户表中找到这个商家并扣除广告费吧,怎么保证?!等等,相信大家或

Netty构建分布式消息队列实现原理浅析

在本人的上一篇博客文章:Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇 中,重点向大家介绍了AvatarMQ主要构成模块以及目前存在的优缺点.最后以一个生产者.消费者传递消息的例子,具体演示了AvatarMQ所具备的基本消息路由功能.而本文的写作目的,是想从开发.设计的角度,简单的对如何使用Netty,构建分布式消息队列背后的技术细节.原理,进行一下简单的分析和说明. 首先,在一个企业级的架构应用中,究竟何时需引入消息队列呢?本人认为,最经常的情况,无非这几种:做业务解耦.事件

(转)如何用消息队列来代替分布式事务

转自:http://blog.csdn.net/ss123465/article/details/7964184 由于数据量的巨大,大部分Web应用都需要部署很多个数据库实例.这样,有些用户操作就可能需要去修改多个数据库实例中的数据.传统的解决方法是使用分布式事务保证数据的全局一致性,经典的方法是使用两阶段提交协议. 长期以来,分布式事务提供的优雅的全局ACID保证麻醉了应用开发者的心灵,很多人都不敢越雷池一步,想像没有分布式事务的世界会是怎样.如今就如MySQL和PostgreSQL这类面向低

[转]【收藏】用消息队列和消息应用状态表来消除分布式事务

真正有了这样的需求,可以借鉴. 转自:http://csrd.aliapp.com/?p=671 由于数据量的巨大,大部分Web应用都需要部署很多个数据库实例.这样,有些用户操作就可能需要去修改多个数据库实例中的数据.传统的解决方法是使用分布式事务保证数据的全局一致性,经典的方法是使用两阶段提交协议. 长期以来,分布式事务提供的优雅的全局ACID保证麻醉了应用开发者的心灵,很多人都不敢越雷池一步,想像没有分布式事务的世界会是怎样.如今就如MySQL和PostgreSQL这类面向低端用户的开源数据

系统学习消息队列分享(一) 怎样系统学习消息队列?

从系统之间有通信需求开始呢,就产生了消息队列,它也是最古老的中间件之一.它的应用场景非常广泛,分布式系统中的很多进程间通信问题,都可以用消息队列来解决.可以说消息队列是所有后端程序员的必备技能.但是,想要系统.深入地学习消息队列,却并不容易. 要了解消息队列的完整知识体系,想深度进阶为消息队列达人,从理论到实践,从基础到进阶,从深度到广度,全方位吃透消息队列. 哪些人适合学消息队列? 后端开发者:消息队列几乎是每个后端程序员都会用到的中间件,无论你是开发微服务,实时计算,还是机器学习程序,都需要

消息队列高手课

<消息队列高手课>从源码角度全面解析 MQ 的设计与实现 消息队列中间件的使用并不复杂,但如果你对消息队列不熟悉,很难构建出健壮.稳定并且高性能的企业级系统,你会面临很多实际问题: 如何选择最适合系统的消息队列产品? 如何保证消息不重复.不丢失? 如何做到水平扩展? 如果你掌握了消息队列的底层技术,无论使用哪种消息队列产品,你都可以从原理层面来分析问题,再简单看一下它的 API 和相关配置项,就能很快知道该如何配置消息队列,写出高性能并且可靠的程序. 1.基础篇 以讲解消息队列的使用方法和最佳

使用消息队列规避分布式事务问题

前阵子从支付宝转账10000元到余额宝,这是日常生活的一件普通小事,但作为互联网研发人员的职业病,我就思考支付宝扣除1万之后,如果系统挂掉怎么办,这时余额宝账户并没有增加10000,数据就会出现不一致状况了.这样的场景在各个类型的系统中都能找到相似的影子,比如在电商系统中,当有用户下单后,除了在订单表插入一条记录外,对应商品表的这个商品数量也必须减1:在搜索广告系统中,当用户点击某广告后,除了在点击事件表中增加一条记录外,还得去商家账户表中找到这个商家并扣除广告费等等,相信大家或多或多少都能碰到