消息队列的实现

消息队列  ----  双向通信(读取不一定先入先出)

1、 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认

为是有一个类型,接收者进程接收的数据块可以有不同的类型值。我们可以通过发送消息

来避免命名管道的同步和阻塞问题。

消息队列与管道不同的是,消息队列是基于消息的,而管道是基于字节流的,且消息队列的读取不一定是先入先出。

2、消息队列的结构:

struct ipc_perm 
{
    key_t      _key;         /* Key supplied to xxxget(2) */
    uid_t      uid;          /* Effective UID of owner */
    gid_t      gid;          /* Effective GID of owner */
    uid_t      cuid;         /* Effective UID of creator */
    gid_t      cgid;         /* Effective GID of creator */
    unsigned short mode;     /* Permissions */
    unsigned short _seq;     /* Sequence number */
};

3、消息队列的实现:

头文件:

#include<sys/types.h>

#include<sys/ipc.h>

#include<sys/msg.h>

--------------------------------------------------------------------

(1)创建 新 消息队列 或 取得 已存在的消息队列

 int msgget(key_t key, int msgflg);//创建新消息队列或取得已存在消息队列

key:可以认为是一个端口号,也可以由函数ftok()生成。

msgflg:

IPC_CREAT  如果IPC不存在,则创建一个IPC资源,否则打开操作。

IPC_EXCL:只有在共享内存不存在的时候,新的共享内存才建立,否则就产生错误。

(2)向 队列 读/写 消息

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);//从队列中读消息
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);//将数据写入消息队列中

msgflg:指明核心程序在队列没有数据的情况下所应采取的行动。

如果msgflg和常数IPC_NOWAIT合用,则:1.在msgsnd()执行时若是消息队列已满,则msgsnd()将不会阻塞,而会立即返回-1;2.如果执行的是msgrcv(),则在消息队列呈空时,不做等待马上返回-1,并设定错误码为ENOMSG。

当msgflg为0时,msgsnd()及msgrcv()在队列呈满或呈空的情形时,采取

阻塞等待的处理模式。

(3)设置 消息队列 属性

int msgctl ( int msgqid, int cmd, struct msqid_ds *buf );//设置消息队列属性

 cmd 操作: IPC_STAT , IPC_SET , IPC_RMID

IPC_STAT : 获取消息队列对应的 msqid_ds 数据结构,并将其保存到 buf 指定的地址空间。

IPC_SET : 设置消息队列的属性,要设置的属性存储在buf中。

IPC_RMID : 从内核中删除 msqid 标识的消息队列。

4、实例:client server 双向通信

//comm.h

#ifndef _MSG_
#define _MSG_

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<errno.h>

#define _PATH_ "."
#define _PROC_ID_ 0x6666

#define _CLIENT_ID_ 1
#define _SERVER_ID_ 2
#define _BLOCK_SIZE_ 256
struct msg_buf
{
	long mtype;
	char mtext[_BLOCK_SIZE_];
};
int comm_msg_queue(int flag);
int create_msg_queue();
int get_msg_queue();
int msg_send(int msg_id,int _mtype,const char *_info);
int msg_recv(int msg_id,int recv_type,char buf[]);
int destroy_msg_queue(int msg_id);

#endif

//comm.c

#include"comm.h"

int comm_msg_queue(int flag)
{
	key_t _key=ftok(_PATH_,_PROC_ID_);
	if(_key < 0)
	{
		perror("ftok");
		return -1;
	}
	int msg_id=msgget(_key,flag);
	if(msg_id < 0)
	{
		perror("msgget");
		return -1;
	}
	return msg_id;
}
int create_msg_queue()
{
	int flag = (IPC_CREAT |IPC_EXCL |0666); //0666 :set permission 
	return comm_msg_queue(flag);
}

int get_msg_queue()
{
	int flag = IPC_CREAT;
	return comm_msg_queue(flag);
}

int msg_send(int msg_id,int _mtype,const char *_info)
{
	struct msg_buf _msg;
	_msg.mtype=_mtype;
	memset((void*)_msg.mtype,‘\0‘,sizeof(_msg.mtype));
	strcpy(_msg.mtext,_info);
	if(msgsnd(msg_id,&_msg,sizeof(_msg.mtext),IPC_NOWAIT) < 0)//non-blocking wait
	{
		perror("msgsnd");
		return -1;
	} 
	return 0;
}

int msg_recv(int msg_id,int recv_type,char buf[])
{
	struct msg_buf _msg;
	if(msgrcv(msg_id,&_msg,sizeof(_msg.mtext),recv_type,0) < 0)//blocking wait
	{
		perror("msgrcv");
		return -1;
	}
	strcpy(buf,_msg.mtext);
	return 0;
}

int destroy_msg_queue(int msg_id)
{
	if(msgctl(msg_id,IPC_RMID,NULL) < 0)//release
	{
		perror("msgctl");
		return -1;
	}
	else
	{
		printf("remove msg queue done ...\n");
	}
	return 0;
}

//server.c

#include"comm.h"

int main()
{
	int queue_id=create_msg_queue();
	if(queue_id < 0)
	{
		perror("create_msg_queue");
		exit(1);
	}
	char buf[_BLOCK_SIZE_];
	while(1)
	{
		memset(buf,‘\0‘,sizeof(buf));
		msg_recv(queue_id,_CLIENT_ID_,buf);//server receives info from client
		printf("Client #%s\n",buf);
		printf("Please Enter:");
		fflush(stdout);
		fgets(buf,sizeof(buf),stdin);
		msg_send(queue_id,_SERVER_ID_,buf);//server sends info to client
	}
	destroy_msg_queue(queue_id);
	return 0;
}

//client.c

#include"comm.h"

int main()
{
	int queue_id=get_msg_queue();
	if(queue_id < 0)
	{
		perror("get_msg_queue");
		exit(1);
	}
	char buf[_BLOCK_SIZE_];
	while(1)
	{
		printf("Please Enter:");//input info
		fflush(stdout);
		fgets(buf,sizeof(buf),stdin);
		msg_send(queue_id,_CLIENT_ID_,buf);//client sends info to server
		msg_recv(queue_id,_SERVER_ID_,buf);//client receives info from server
		printf("Server #%s\n",buf);
	}
	destroy_msg_queue(queue_id);
	return 0;
}
时间: 2024-10-08 03:47:47

消息队列的实现的相关文章

Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执

上篇博文中我们介绍了Azure Messaging的重复消息机制.At most once 和At least once. Azure Messaging-ServiceBus Messaging消息队列技术系列5-重复消息:at-least-once at-most-once 本文中我们主要研究并介绍Azure Messaging的消息回执机制:实际应用场景: 同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站

【转】MSMQ 微软消息队列 简单 示例

MSMQ它的实现原理是:消息的发送者把自己想要发送的信息放入一个容器中(我们称之为Message),然后把它保存至一个系统公用空间的消息队列(Message Queue)中:本地或者是异地的消息接收程序再从该队列中取出发给它的消息进行处理. 我个人的理解,你可以把他当做一种,把数据打包后,发送到一个地方,程序也可以去取到这个打包的程序,队列的机制就不讲了,并发问题荡然无存.呵呵. 上代码: 首先 using System.Messaging; public class MsmqManagerHe

消息队列(msg)

一.消息队列:从一个进程向另一个进程发送数据块,读取不一定是先入先出. 管道与消息队列区别:管道基于字节流的,消息队列基于消息: 管道只能发送字符串,消息队列有类型: 管道随进程,消息队列随内核. 二.创建函数原型:int msgget(key_t key, int msgflg);    //key由ftok生成,IPC_CREAT|IPC_EXCL 接收消息:ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, in

第15章 进程间通行 15.6 XSI IPC 15.7 消息队列

15.6 XSI IPC (1)3种称作XSI IPC的IPC是: 1)消息队列 2)信号量 3)共享存储器 (2)标识符和键 1)标识符:是一个非负整数,用于引用IPC结构.是IPC对象的内部名. 2)键:IPC对象的外部名.可使多个合作进程能够在同一IPC对象上汇聚. (3)IPC_PRIVATE键: 用于创建一个新的IPC结构.不能指定此键来引用一个现有的IPC结构. (4)ftok函数: 由一个路径名和项目ID产生一个键. (5)ipc_perm结构体 规定了ipc结构的权限和所有者.

XSI进程间通信-----消息队列

1. 基本特点 1) 消息队列是一个由系统内核负责存储和管理,并通过消息队列标识引用的数据链表,消息队列 和有名管道fifo的区别在: 后者一次只能放一个包,而前者则可以放很多包,这样就能处理发包快,哪包慢的问题 2) 可以通过msgget函数创建一个新的消息队列, 或获取一个已有的消息队列. 通过msgsnd函数 (send)向消息队列的后端追加消息, 通过msgrcv(receive)函数从消息队列的前端提取消息. 3) 消息队列中的每个消息单元除包含消息数据外,还包含消息类型和数据长度.消

android 中使用View的消息队列api更新数据

基本上只要继承自View的控件,都具有消息队列或者handler的一些处理方法,下面是一些handler方法以及被View封装了的方法,其底层用的基本都是handler的api. 我么开一下postDelay的定义 android.view.View  public boolean postDelayed(Runnable action, long delayMillis) {         final AttachInfo attachInfo = mAttachInfo;         

消息队列实现订单异步提交

what MSMQ(Microsoft Message Queue),微软消息队列,用于应用程序之间相互通信的一种异步传输模式.应用程序可以分布在同台机器上,也可以分布于互联的网络中的任意位置.基本原理:消息发送者把要发送的消息放入容器,也就是Message(消息),然后保存到系统公用空间的消息队列中(Message Queue)中,本地或互联位置上的消息接收程序再从队列中取出发给它的消息进行处理.消息类型可以是文本,图像,自定义对象等.消息队列分为公共队列和私有队列. why 一.用于进程间的

Windows消息队列

一 Windows中有一个系统消息队列,对于每一个正在执行的Windows应用程序,系统为其建立一个"消息队列",即应用程序队列,用来存放该程序可能 创建的各种窗口的消息.应用程序中含有一段称作"消息循环"的代码,用来从消息队列中检索这些消息并把它们分发到相应的窗口函数中.  二 Windows为当前执行的每个Windows程序维护一个「消息队列」.在发生输入事件之后,Windows将事件转换为一个「消息」并将消息放入程序的消息队列中.程序通过执行一块称之为「消息循

消息队列编程

消息队列:就是一个消息的链表.而一条消息则可看作一个记录,具有特定的格式.进程可以向中按照一定的规则添加新消息:另一些进程则可以从消息队列中读走消息 发送消息队列: #include<sys/types.h>#include<sys/msg.h>#include<sys/ipc.h>#include<stdio.h> struct msgt{ long msgtype; char msgtext[1024]; };int msg_type;char str[

Freertos-事件标志组,消息队列,信号量,二值信号量,互斥信号量

任务间的通信和同步机制  在裸机编程时,使用全局变量的确比较方便,但是在加上 RTOS 后就是另一种情况了. 使用全局变量相比事件标志组主要有如下三个问题: 1.使用事件标志组可以让 RTOS 内核有效地管理任务,而全局变量是无法做到的,任务的超时等机制需要用户自己去实现.2.使用了全局变量就要防止多任务的访问冲突,而使用事件标志组则处理好了这个问题,用户无需担心.3.使用事件标志组可以有效地解决中断服务程序和任务之间的同步问题. 事件标志组:事件标志组是实现多任务同步的有效机制之一. 每创建一