进程间通信-队列



消息队列

消息队列是通过标识来引用,消息队列类似于一个消息链表,通过队列标识来引用,标识通过key来获取msgget(key_t key, int flag)。

创建队列过程:

1.确定队列的key,key可以自定义,也可以利用ftok来获取一个key

①利用ftok来得到一个key值ftok(const char * path, int id)路径必须是一个存在的文件路径,否则一直阻塞在那里id取值范围0-255

②key自定义,然后把key值放在公共头文件里,方便包含该头文件的其他文件使用

③key自定义,然后把key放在一个文件中,不同进程都可以通过访问该文件来访问这个key

2.通过key来创建消息队列并获取队列标识

3.向队列发送数据msgsnd或读取数据msgrcv来进行通信

读取消息队列中的数据是先进先出或者按照设置的读取的数据类型来随机读取

创建消息队列的进程结束,但是消息队列以及队列中的数据还是依然存在,除非显示的删除:

1.由其他进程读或者删除消息队列

2.由系统删除

函数msgget

msgget - get a message queue identifier

获取消息队列标识

函数概述

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

int msgget(key_t key, int msgflg);

返回与key相关的队列的标识,如果key的值为IPC_PRIVATE或者key的值不等于其他已存在的队列的key,而且msgflg指定了IPC_CREAT,则创建一个新的队列

如果msgflg设置为IPC_CREAT|IPC_EXCL,则如果对应的key已经存在于已存在的队列,则创建失败,errno为EEXIST。

在创建队列的时候,参数msgflg最低有效位是定义队列权限的,这些权限位和open的参数mode用法一样除可执行权限不能用

如果一个队列被创建,那么与它相关的结构msqid_ds 会被初始化:

msg_perm.cuid and msg_perm.uid会被设置成引用队列的进程的用户id

msg_perm.cgid and msg_perm.gid会被设置成引用队列的进程的组id

msg_perm.mode低9位与msgflg的低9位对应

msg_qnum, msg_lspid, msg_lrpid, msg_stime and msg_rtime初始化为0

msg_ctime 设为当前时间

msg_qbytes 会初始化为系统限制队列的最大字节数

如果要创建的队列已经存在而且具有读写权限,则检查是否成功可以查看返回值

成功返回队列的id(非负数),失败返回-1

失败则会设置errno,值描述如下:

EACCES  参数key对应的队列已经存在,且调用进程对队列没有权限,也没有CAP_IPC_OWNER(绕过检查系统对对象的权限)

EEXIST  参数key对应的队列已经存在,而且msgflg设置为  IPC_CREAT and IPC_EXCL

ENOMEM  一个新的队列会被创建,但是系统没有更多的内存了

ENOSPC  超过了系统允许创建的队列中最大存储数据(65536)

注意

1.IPC_PRIVATE 类型是key_t,不是一个标志字段,当用于参数key调用时,系统会忽略其他参数除msgflg的低9位的权限位,关于megget调用相关联的系统限制:MSGMNI 系统允许的队列最大个数,与系统相关(linux 限制值可通过/proc/sys/kernel/msgmni修改或获取 31683)

2.在linux2.3.20版本以前,当调用msgget()而队列已被删除时会返回EIDRM

3.创建队列时使用IPC_PRIVATE会有很多不确定因素,建议使用IPC_NEW

#define FUN1_ID			10001
#define FUN2_ID			10002
#define usr_path		"/usr/test/ok/test"
#define usr_id			109870

typedef struct _ipc_data_info_
{
	long	s_type;
	char	s_data[70000];
}ipc_data_info;

int ipc_msg_fun1(pid_t a_id)
{
	key_t				b_msg_key;
	int					b_msg_id;
	int					b_ret, b_loop, b_id;
	char				b_tmp_buf[1024] = {0};
	char			  *	b_tmp_ptr = (char *)malloc(65530);
	ipc_data_info		b_ipc_snd = {0}, b_ipc_rcv = {0};
	struct msqid_ds		b_ipc_buf = {0};

	//msgctl();

	//b_msg_key = ftok(usr_path, usr_id);
	b_msg_key = ftok(usr_path, usr_id);
	if(b_msg_key < 0)
	{
		printf("ftok fail %d, line = %d, fun = %s, file = %s\n", 			b_msg_key, __LINE__, __func__, __LINE__);
		return -1;
	}

	b_msg_id = msgget(b_msg_key, IPC_CREAT|0666);
	if(b_msg_id < 0)
	{
		printf("msgget fail %d, line = %d, fun = %s, file = %s\n", 			b_msg_id, __LINE__, __func__, __LINE__);
		return -1;
	}

	memset(&b_ipc_buf, 0, sizeof(b_ipc_buf));
	msgctl(b_msg_id, IPC_STAT, &b_ipc_buf);

	printf("id = %d, key = %d, limit = %d\n", b_msg_id, b_msg_key, b_ipc_buf.msg_qbytes);
	//msgctl(b_msg_id, IPC_RMID, &b_ipc_buf);

	b_ipc_snd.s_type = FUN1_ID;
	b_ipc_rcv.s_type = FUN2_ID;
	b_loop = 0;
	while(1)
	{
		memset(b_tmp_ptr, 0, sizeof(b_tmp_ptr));
		sprintf(b_tmp_ptr, "test_data %d", b_loop);
		memset(b_ipc_snd.s_data, 0, sizeof(b_ipc_snd.s_data));
		memcpy(b_ipc_snd.s_data, b_tmp_ptr, strlen(b_tmp_ptr));
		b_ret = msgsnd(b_msg_id, &b_ipc_snd, 1024, 0);
		if(b_ret < -1)
		{
			printf("msgsnd fail %d, line = %d, fun = %s, file = %s\n", b_ret, __LINE__, __func__, __FILE__);
			return -1;
		}
		printf("send data success data = %s\n", b_ipc_snd.s_data);
		#if 0
		b_ret = msgrcv(b_msg_id, &b_ipc_rcv, 1024, b_ipc_rcv.s_type, 0);
		if(b_ret < -1)
		{
			printf("msgsnd fail %d, line = %d, fun = %s, file = %s\n", b_ret, __LINE__, __func__, __FILE__);
			return -1;
		}
		printf("recv data = %s\n", b_ipc_rcv.s_data);
		#endif
		b_loop++;
		if(b_loop > 10000)
		{
			b_loop = 0;
		}
		sleep(2);
	}

	return 0;
}

int ipc_msg_fun2(pid_t a_id)
{
	key_t				b_msg_key;
	int					b_msg_id;
	int					b_ret, b_loop;
	char				b_tmp_buf[1024] = {0};
	ipc_data_info		b_ipc_snd = {0}, b_ipc_rcv = {0};

	b_msg_key = ftok(usr_path, usr_id);
	if(b_msg_key < 0)
	{
		printf("ftok fail %d, line = %d, fun = %s, file = %s\n", 			b_msg_key, __LINE__, __func__, __LINE__);
		return -1;
	}

	b_msg_id = msgget(b_msg_key, IPC_CREAT|0666);
	if(b_msg_id < 0)
	{
		printf("msgget fail %d, line = %d, fun = %s, file = %s\n", 			b_msg_id, __LINE__, __func__, __LINE__);
		return -1;
	}

	b_ipc_snd.s_type = FUN2_ID;
	b_ipc_rcv.s_type = FUN1_ID;
	b_loop = 0;
	while(1)
	{
		#if 0
		memset(b_tmp_buf, 0, sizeof(b_tmp_buf));
		sprintf(b_tmp_buf, "test_data1 %d", b_loop);
		memset(b_ipc_snd.s_data, 0, sizeof(b_ipc_snd.s_data));
		memcpy(b_ipc_snd.s_data, b_tmp_buf, strlen(b_tmp_buf));
		b_ret = msgsnd(b_msg_id, &b_ipc_snd, strlen(b_ipc_snd.s_data), 0);
		if(b_ret < -1)
		{
			printf("msgsnd fail %d, line = %d, fun = %s, file = %s\n", b_ret, __LINE__, __func__, __FILE__);
			return -1;
		}
		printf("send data success\n");
		#endif
		memset(&b_ipc_rcv, 0, sizeof(b_ipc_rcv));
		b_ret = msgrcv(b_msg_id, &b_ipc_rcv, 1024, b_ipc_rcv.s_type, 0);
		if(b_ret < -1)
		{
			printf("msgsnd fail %d, line = %d, fun = %s, file = %s\n", b_ret, __LINE__, __func__, __FILE__);
			return -1;
		}
		printf("recv data = %s\n", b_ipc_rcv.s_data);
		#if 0
		b_loop++;
		if(b_loop > 10000)
		{
			b_loop = 0;
		}
		#endif
		sleep(1);
	}

	return 0;
}

int main()
{
	/*============================================*/
	/*队列*/
	int			b_loop = 0, b_ret;
	pid_t		b_id[10];

	b_id[b_loop] = fork();
	if(b_id[b_loop] < 0)
	{
		printf("fork fail %d, line = %d, fun = %s, file = %s\n", b_id, __LINE__, __func__, __FILE__);
		return -1;
	}
	else if(0 == b_id[b_loop])
	{
		/*子进程*/
		ipc_msg_fun1(b_id[b_loop]);
	}
	b_loop++;

	b_id[b_loop] = fork();
	if(b_id[b_loop] < 0)
	{
		printf("fork fail %d, line = %d, fun = %s, file = %s\n", b_id, __LINE__, __func__, __FILE__);
		return -1;
	}
	else if(0 == b_id[b_loop])
	{
		/*子进程*/
		ipc_msg_fun2(b_id[b_loop]);
	}
	b_loop++;

	while(1)
	{
		b_ret = wait(NULL);
		if(b_ret < -1)
		{
			printf("son process over %d, line = %d, fun = %s, file = %s\n", b_ret, __LINE__, __func__, __FILE__);
			break;
		}
	}

	return 0;
	/*============================================*/
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2025-01-05 08:52:29

进程间通信-队列的相关文章

Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型

一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例子 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

multiprocess模块补充

进程间通信--队列和管道(multiprocess.Queue.multiprocess.Pipe) 在电脑的多个进程中,他们之间要想通讯需要使用IPC协议(Inter-Process Communication) Queue([maxsize]) 创建共享的进程队列. 参数 :maxsize是队列中允许的最大项数.如果省略此参数,则无大小限制. 底层队列使用管道和锁定实现. Queue([maxsize]) 创建共享的进程队列.maxsize是队列中允许的最大项数.如果省略此参数,则无大小限制

python之路——进程

阅读目录 理论知识 操作系统背景知识 什么是进程 进程调度 进程的并发与并行 同步\异步\阻塞\非阻塞 进程的创建与结束 在python程序中的进程操作 multiprocess模块 进程的创建和multiprocess.Process 进程同步控制 -- 锁\信号量\事件 (multiprocess.Lock.multiprocess.Semaphore.multiprocess.Event) 进程间通信 -- 队列和管道(multiprocess.Queue.multiprocess.Pip

Python并发编程之进程

一.理论概念 1.定义 进程(Process 也可以称为重量级进程)是程序的一次执行.在每个进程中都有自己的地址空间.内存.数据栈以及记录运行的辅助数据,它是系统进行资源分配和调度的一个独立单位. 2.并行和并发 并行:并行是指多个任务同一时间执行: 并发:是指在资源有限的情况下,两个任务相互交替着使用资源: 3.同步和异常 同步是指多个任务在执行时有一个先后的顺序,必须是一个任务执行完成另外一个任务才能执行: 异步是指多个任务在执行时没有先后顺序,多个任务可以同时执行: 4.同步/异步/阻塞/

铁乐学python_Day39_多进程和multiprocess模块2

锁 -- multiprocess.Lock (进程同步) 之前我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理, 但是它们之间的运行没有顺序,一旦开启也不受我们控制. 尽管并发编程能让我们更加充分的利用IO资源,但是也会带来新的问题. 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题. 遇到数据.安全比速度重要的场景,我们就需要将进程变回受同步控制. 例: #!/usr/bin/env python # _*_ coding: utf-8 _*_ impo

python进程-进阶

进程同步(multiprocessing.Lock(锁机制).multiprocessing.Semaphore(信号量机制).multiprocessing.Event(事件机制)) 在计算机中,有一些硬件和软件,例如处理器.打印机等,都属于竞争类资源,当有需求时,很多进程都要争抢这些资源,而对于这类资源,就属于临界资源.当多进程共同处理某一个数据时,这个数据也就属于一个临界资源.操作系统对计算机内各种资源都使其在竞争中有序化,但是对于数据来说,尤其是用户动态产生的数据,当处理时就变成了临界资

并发编程小结

目录 多道技术 并发与并行 进程 程序与进程 进程调度 进程的状态 同步异步阻塞非阻塞 创建进程的两种方式 回收进程资源的两种方式 僵尸进程.孤儿进程.守护进程 进程互斥锁 进程间通信 队列 堆栈 生产者与消费者模型 线程 进程与线程的优缺点 线程间数据是共享的 GIL全局解释器锁 死锁与递归锁 死锁 递归锁 信号量 Event事件 线程队列 进程池与线程池 协程 gevent IO模型 多道技术 单道:一台哦到 多道: 时间上复用, 遇到IO操作就会切换,程序占用CPU时间过长就会切换 空间上

Linux进程间通信 -- 消息队列 msgget()、msgsend()、msgrcv()、msgctl()

下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处.有关命名管道的更多内容可以参阅我的另一篇文章:Linux进程间通信 -- 使用命名管道 一.什么是消息队列 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法.  每个数据块都被认为含有一个类型,接收进程可以独立地接收含有不同类型的数据结构.我们可以通过发送消息来避免命名管道的同步和阻塞问题.但是消息队列与命名管道一样,每个数据块都有一个最大长度的限制. Linux用宏MSGMAX和MSGMNB来限制一条

进程间通信(6) - 消息队列posix

1.前言 本篇文章的所有例子,基于RHEL6.5平台(linux kernal: 2.6.32-431.el6.i686). 2.介绍 消息队列是先进先出FIFO原则. 消息队列就是一个消息的链表.可以把消息看作一个记录,具有特定的格式以及特定的优先级.对消息队列有写权限的进程可以向其中按照一定的规则添加新消息:对消息队列有读权限的进程则可以从消息队列中读走消息.消息队列是随内核持续的. 目前主要有两种类型的消息队列:POSIX消息队列以及System V消息队列,System V消息队列目前被