Linux编程之自定义消息队列

我这里要讲的并不是IPC中的消息队列,我要讲的是在进程内部实现自定义的消息队列,让各个线程的消息来推动整个进程的运动。进程间的消息队列用于进程与进程之间的通信,而我将要实现的进程内的消息队列是用于有序妥当处理来自于各个线程请求,避免一窝蜂的请求而导致消息的异常丢失。想想socket编程里的listen函数吧,里面要设置一个队列长度的参数,其实来自网络的请求已经排成一个请求队列了,只是这个队列是系统帮我们做好了,我们看不到而已。如果系统不帮我们做这个等待队列的话,那就需要我们程序员在应用层实现了。

进程内的消息队列实现并不难,总的来说有以下几点:

  • 自定义消息结构,并构造队列
  • 一个线程负责依次从消息队列中取出消息,并处理该消息
  • 多个线程产生事件,并将消息放进消息队列,等待处理

长话短说,我们开始动手吧!

一、定义消息结构

先贴代码再解释:

typedef struct Msg_Hdr_s  
{  
    uint32 msg_type;  
    uint32 msg_len;  
    uint32 msg_src;  
    uint32 msg_dst;      
}Msg_Hdr_t;  
  
typedef struct Msg_s  
{  
    Msg_Hdr_t hdr;  
    uint8 data[100];  
} Msg_t;

下面是我设计的消息格式内容的解释:

  • msg_type:标记消息类型,当消息接收者看到该msg_type后就知道他要干什么事了
  • msg_len:消息长度,待扩展,暂时没用到(以后会扩展为变长消息)
  • msg_src:消息的源地址,即消息的发起者
  • msg_dst:消息的目的地,即消息的接受者
  • data[100]:消息除去消息头外可以携带的信息量,定义为100字节

由该消息数据结构可以知道,这个消息是定长的,当然也可以实现为变长消息,但现在暂不实现,今天先把定长消息实现了,以后再完善变长消息。

二、构造循环队列

队列可以由链表实现,也可以由数组实现,这里就使用数组实现的循环链表作为我们消息队列的队列模型。

typedef struct Queue_s  
{  
    int head;  
    int rear;  
    sem_t sem;  
    Msg_t data[QUEUE_SIZE];  
}Queue_t;  
  
int MsgQueueInit(Queue_t* Q)  
{  
    if(!Q)  
    {  
        printf("Invalid Queue!\n");  
        return -1;  
    }  
    Q->rear = 0;  
    Q->head = 0;  
    sem_init(&Q->sem, 0, 1);  
    return 0;      
}  
  
int MsgDeQueue(Queue_t* Q, Msg_t* msg)  
{  
    if(!Q)  
    {  
        printf("Invalid Queue!\n");  
        return -1;  
    }  
    if(Q->rear == Q->head) //only one consumer,no need to lock head  
    {  
        printf("Empty Queue!\n");  
        return -1;  
    }  
    memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t));  
    Q->head = (Q->head+1)%QUEUE_SIZE;  
    return 0;         
  
}  
  
int MsgEnQueue(Queue_t* Q, Msg_t* msg)  
{  
    if(Q->head == (Q->rear+1)%QUEUE_SIZE)  
    {  
        printf("Full Queue!\n");  
        return -1;  
    }  
    sem_wait(&Q->sem);  
    memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t));  
    Q->rear = (Q->rear+1)%QUEUE_SIZE;  
    sem_post(&Q->sem);  
    return 0;  
}

循环队列的实现想必大家都比较熟悉,但这里需要提示的几点是:

  • 队列中应加入信号量或锁来保证进队时的互斥访问,因为多个消息可能同时进队,互相覆盖其队列节点
  • 这里的信号量仅用于进队而没用于出队,理由是消息处理者只有一个,不存在互斥的情形

三、构造消息处理者

if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL))  
{  
    printf("create handler thread fail!\n");  
    return -1;          
}  
  
void msg_printer(Msg_t* msg)  
{  
    if(!msg)  
    {  
        return;  
    }  
    printf("%s: I have recieved a message!\n", __FUNCTION__);  
    printf("%s: msgtype:%d   msg_src:%d  dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst);  
  
}  
  
void msg_handler()  
{  
    sleep(5);  //let‘s wait 5s when starts  
    while(1)  
    {  
        Msg_t msg;  
        memset(&msg, 0 ,sizeof(Msg_t));  
        int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg);  
        if(res != 0)  
        {  
            sleep(10);  
            continue;  
        }  
        msg_printer(&msg);  
        sleep(1);  
    }  
}

我在进程里create了一个线程作为消息处理者(handler)来处理消息队列的消息,甘进入该线程时先等个5秒钟来让生产者往队列里丢些消息,然后再开始消息处理。当队列没消息可取时,就休息十秒,再去取消息。

这里的消息处理很简单,我只是简单地将受到的消息打印一下,证明受到的消息正是其他线程发给我的。当然,你也可以在这里扩展功能,根据受到的消息类型进一步决定该做什么事。比如:

enum MSG_TYPE  
{  
    GO_HOME,  
    GO_TO_BED,  
    GO_TO_LUNCH,  
    GO_TO_CINAMA,  
    GO_TO_SCHOOL,  
    GO_DATEING,  
    GO_TO_WORK,//6  
};  
  
void handler()  
{  
    switch(msgtype)  
    {  
        case GO_HOME: go_home(); break;  
        case GO_TO_BED:  go_to_bed(); break;  
        .......  
    }  
}

这里的handler就是一个简单的状态机了,根据给定的消息类型(事件)去做特定的事,推动状态机的转动。

四、构造消息生产者

if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL))  
{  
    printf("create thread1 fail!\n");  
    return -1;  
}  
  
if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL))  
{  
    printf("create thread2 fail!\n");  
    return -1;  
}      
  
if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL))  
{  
    printf("create thread3 fail!\n");  
    return -1;  
}      
  
  
void msg_sender1()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD1;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread1 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}  
  
void msg_sender2()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD2;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread2 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}  
  
void msg_sender3()  
{  
    int i = 0;  
    while(1)  
    {  
        if(i > 10)  
        {  
            i = 0;  
        }  
        Msg_t msg;  
        msg.hdr.msg_type = i++;  
        msg.hdr.msg_src = THREAD3;  
        msg.hdr.msg_dst = HANDLER;  
        MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
        printf("%s: Thread3 send a message!\n",__FUNCTION__);  
        sleep(1);  
    }  
}

这里我create了三个线程来模拟消息生产者,每个生产者每隔1秒往消息队列里写消息。

五、跑起来看看

先贴完整的代码:

msg_queue.c:

  1 #include <stdio.h>  
  2 #include <pthread.h>  
  3 #include <semaphore.h>  
  4 #include <unistd.h>  
  5 #include <string.h>  
  6 #include "msg_def.h"  
  7   
  8 Queue_t MsgQueue;  
  9   
 10 int main(int argc, char* argv[])  
 11 {  
 12     int ret;  
 13     pthread_t thread1_id;  
 14     pthread_t thread2_id;  
 15     pthread_t thread3_id;  
 16     pthread_t handler_thread_id;  
 17   
 18     ret = MsgQueueInit((Queue_t*)&MsgQueue);  
 19     if(ret != 0)  
 20     {  
 21         return -1;  
 22     }  
 23   
 24     if(pthread_create(&handler_thread_id, NULL, (void*)msg_handler, NULL))  
 25     {  
 26         printf("create handler thread fail!\n");  
 27         return -1;          
 28     }  
 29   
 30   
 31     if(pthread_create(&thread1_id, NULL, (void*)msg_sender1, NULL))  
 32     {  
 33         printf("create thread1 fail!\n");  
 34         return -1;  
 35     }  
 36   
 37     if(pthread_create(&thread2_id, NULL, (void*)msg_sender2, NULL))  
 38     {  
 39         printf("create thread2 fail!\n");  
 40         return -1;  
 41     }      
 42   
 43     if(pthread_create(&thread3_id, NULL, (void*)msg_sender3, NULL))  
 44     {  
 45         printf("create thread3 fail!\n");  
 46         return -1;  
 47     }      
 48   
 49   
 50     while(1)  
 51     {      
 52         sleep(1);  
 53     }  
 54   
 55     return 0;  
 56 }  
 57   
 58   
 59   
 60   
 61 int MsgQueueInit(Queue_t* Q)  
 62 {  
 63     if(!Q)  
 64     {  
 65         printf("Invalid Queue!\n");  
 66         return -1;  
 67     }  
 68     Q->rear = 0;  
 69     Q->head = 0;  
 70     sem_init(&Q->sem, 0, 1);  
 71     return 0;      
 72 }  
 73   
 74 int MsgDeQueue(Queue_t* Q, Msg_t* msg)  
 75 {  
 76     if(!Q)  
 77     {  
 78         printf("Invalid Queue!\n");  
 79         return -1;  
 80     }  
 81     if(Q->rear == Q->head) //only one cosumer,no need to lock head  
 82     {  
 83         printf("Empty Queue!\n");  
 84         return -1;  
 85     }  
 86     memcpy(msg, &(Q->data[Q->head]), sizeof(Msg_t));  
 87     Q->head = (Q->head+1)%QUEUE_SIZE;  
 88     return 0;         
 89   
 90 }  
 91   
 92 int MsgEnQueue(Queue_t* Q, Msg_t* msg)  
 93 {  
 94     if(Q->head == (Q->rear+1)%QUEUE_SIZE)  
 95     {  
 96         printf("Full Queue!\n");  
 97         return -1;  
 98     }  
 99     sem_wait(&Q->sem);  
100     memcpy(&(Q->data[Q->rear]), msg, sizeof(Msg_t));  
101     Q->rear = (Q->rear+1)%QUEUE_SIZE;  
102     sem_post(&Q->sem);  
103     return 0;  
104 }  
105   
106 void msg_printer(Msg_t* msg)  
107 {  
108     if(!msg)  
109     {  
110         return;  
111     }  
112     printf("%s: I have recieved a message!\n", __FUNCTION__);  
113     printf("%s: msgtype:%d   msg_src:%d  dst:%d\n\n",__FUNCTION__,msg->hdr.msg_type,msg->hdr.msg_src,msg->hdr.msg_dst);  
114   
115 }  
116   
117 int msg_send()  
118 {  
119   
120     Msg_t msg;  
121     msg.hdr.msg_type = GO_HOME;  
122     msg.hdr.msg_src = THREAD1;  
123     msg.hdr.msg_dst = HANDLER;  
124     return MsgEnQueue((Queue_t*)&MsgQueue, &msg);      
125   
126 }  
127   
128 void msg_handler()  
129 {  
130     sleep(5);  //let‘s wait 5s when starts  
131     while(1)  
132     {  
133         Msg_t msg;  
134         memset(&msg, 0 ,sizeof(Msg_t));  
135         int res = MsgDeQueue((Queue_t*)&MsgQueue, &msg);  
136         if(res != 0)  
137         {  
138             sleep(10);  
139             continue;  
140         }  
141         msg_printer(&msg);  
142         sleep(1);  
143     }  
144 }  
145   
146   
147 void msg_sender1()  
148 {  
149     int i = 0;  
150     while(1)  
151     {  
152         if(i > 10)  
153         {  
154             i = 0;  
155         }  
156         Msg_t msg;  
157         msg.hdr.msg_type = i++;  
158         msg.hdr.msg_src = THREAD1;  
159         msg.hdr.msg_dst = HANDLER;  
160         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
161         printf("%s: Thread1 send a message!\n",__FUNCTION__);  
162         sleep(1);  
163     }  
164 }  
165   
166 void msg_sender2()  
167 {  
168     int i = 0;  
169     while(1)  
170     {  
171         if(i > 10)  
172         {  
173             i = 0;  
174         }  
175         Msg_t msg;  
176         msg.hdr.msg_type = i++;  
177         msg.hdr.msg_src = THREAD2;  
178         msg.hdr.msg_dst = HANDLER;  
179         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
180         printf("%s: Thread2 send a message!\n",__FUNCTION__);  
181         sleep(1);  
182     }  
183 }  
184   
185 void msg_sender3()  
186 {  
187     int i = 0;  
188     while(1)  
189     {  
190         if(i > 10)  
191         {  
192             i = 0;  
193         }  
194         Msg_t msg;  
195         msg.hdr.msg_type = i++;  
196         msg.hdr.msg_src = THREAD3;  
197         msg.hdr.msg_dst = HANDLER;  
198         MsgEnQueue((Queue_t*)&MsgQueue, &msg);  
199         printf("%s: Thread3 send a message!\n",__FUNCTION__);  
200         sleep(1);  
201     }  
202 }

msg_def.h:

 1 #include <stdio.h>  
 2 #include <pthread.h>  
 3 #include <semaphore.h>  
 4   
 5 typedef unsigned char uint8;  
 6 typedef unsigned short unit16;  
 7 typedef unsigned int uint32;  
 8   
 9 #define QUEUE_SIZE 1000  
10   
11 typedef struct Msg_Hdr_s  
12 {  
13     uint32 msg_type;  
14     uint32 msg_len;  
15     uint32 msg_src;  
16     uint32 msg_dst;      
17 }Msg_Hdr_t;  
18   
19 typedef struct Msg_s  
20 {  
21     Msg_Hdr_t hdr;  
22     uint8 data[100];  
23 } Msg_t;  
24   
25 typedef struct Queue_s  
26 {  
27     int head;  
28     int rear;  
29     sem_t sem;  
30     Msg_t data[QUEUE_SIZE];  
31 }Queue_t;  
32   
33 typedef struct Queue_s QueueNode;  
34   
35 enum MSG_TYPE  
36 {  
37     GO_HOME,  
38     GO_TO_BED,  
39     GO_TO_LUNCH,  
40     GO_TO_CINAMA,  
41     GO_TO_SCHOOL,  
42     GO_DATEING,  
43     GO_TO_WORK,//6  
44 };  
45   
46 enum SRC_ADDR  
47 {  
48     THREAD1,  
49     THREAD2,  
50     THREAD3,  
51     HANDLER,  
52 };  
53   
54   
55 int MsgQueueInit(Queue_t* Q);  
56 int MsgDeQueue(Queue_t* Q, Msg_t* msg);  
57 int MsgEnQueue(Queue_t* Q, Msg_t* msg);  
58 void msg_handler();  
59 void msg_sender1();  
60 void msg_sender2();  
61 void msg_sender3();  
62 void msg_printer(Msg_t* msg);  
63 int msg_send();

看看跑起来的现象:

Finish!

现在这套进程内的消息队列的架构在实际工程中非常实用(当然实际工程的框架会复杂健壮得多),很多工程都需要这种基于事件推动的思想来保证每条请求都可以有条不絮地执行,所以这个框架也是有用武之地的,尤其配合状态机非常适合!

时间: 2024-12-28 11:41:45

Linux编程之自定义消息队列的相关文章

【Linux编程】XSI IPC

三种IPC被称作XSI IPC,分别是: 消息队列 信号量 共享存储器 下面分别介绍三种IPC的用法. 1.消息队列 消息队列是消息的链接表,具有如下函数接口: msgget:创建一个新队列或打开一个现存的队列. msgsnd:将消息添加到队列尾端. msgrcv:从队列中取消息. 我们可以自行定义一个表示消息的结构体,它由类型字段和实际数据组成: struct mest_t { long type; // 消息类型 char text[512]; // 消息内容 }; 有了消息类型,当我们用m

Linux编程---线程

首先说一下线程的概念.其实就是运行在进程的上下文环境中的一个执行流.普通进程只有一条执行流,但是线程提供了多种执行的路径并行的局面. 同时,线程还分为核心级线程和用户级线程.主要区别在属于核内还是核外. 核心级线程,地位基本和进程相当,由内核调度.也就是说这种系统时间片是按线程来分配的.这种线程的好处就是可以适当的运用SMP,即针对多核CPU进行调度. 用户级线程,在用户态来调度.所以相对来说,切换的调度时间相对核心级线程来说要快不少.但是不能针对SMP进行调度. 对于现在的系统来说,纯粹的用户

笔记整理--Linux编程

linux c编程open() read() write()函数的使用方法及实例 | 奶牛博客 - Google Chrome (2013/8/31 17:56:10) 今天把文件IO操作的一些东东整理下.基本的,对于锁机制下次再整理.常用的文件IO函数有标题的三个open() read() write() .首先打开一个文件使用open()函数,然后可以获取到一个文件描述符,这个就是程序中调用这个打开文件的一个链接,当函数要求到文件描述符fd的时候就把这个返回值给函数即可.read跟write

Linux编程---套接字

网络相关的东西几乎都是建立在套接字之上.所以这个内容对于程序员来说还是蛮重要的啊. 其实套接字也就是一个特殊的设备文件而已,我始终不能明白为什么要叫套接字.这么个奇怪的名字.不过还是就这样算了吧.关键还是编程上.由于其重要性,我还是写的详细一点吧. 一.套接字 核心函数: int  socket(int domain,int type,int protocol); 这个函数在通信域domain中创建一个类型为type,使用协议protocol的套接字.并且返回一个描述字,也就是相当于打开了一个特

Linux编程---进程通信

Linux的通信方式主要有分类有下面几种: -匿名管道和FIFO有名管道 -消息队列,信号量和共享存储 -套接字 .对于套接字的进程通信,我就留在套接字的文章中再写了. 一.管道 管道是最古老的进程通信机制了.提供进程间的单向通信. 1.创建管道 int pipe(int fdes[2]); 实际上管道通过参数返回读和写的两个文件描述符.相当于是打开了两个文件吧.但是这个文件是特殊的pipe文件.fdes[0]表示的是输入,fdes[2]表示的是输出.注意,这个函数只创建一个文件,而不是创建两个

PHP高级编程之消息队列

PHP高级编程之消息队列 http://netkiller.github.io/journal/php.mq.html Mr. Neo Chen (陈景峰), netkiller, BG7NYT 中国广东省深圳市龙华新区民治街道溪山美地 518131 +86 13113668890 +86 755 29812080 <[email protected]> 版权声明 转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明. 文档出处: http://netkiller.github.i

【转】牛人整理分享的面试知识:操作系统、计算机网络、设计模式、Linux编程,数据结构总结

基础篇:操作系统.计算机网络.设计模式 一:操作系统 1. 进程的有哪几种状态,状态转换图,及导致转换的事件. 2. 进程与线程的区别. 3. 进程通信的几种方式. 4. 线程同步几种方式.(一定要会写生产者.消费者问题,完全消化理解) 5. 线程的实现方式. (也就是用户线程与内核线程的区别) 6. 用户态和核心态的区别. 7. 用户栈和内核栈的区别. 8. 内存池.进程池.线程池.(c++程序员必须掌握) 9. 死锁的概念,导致死锁的原因. 10. 导致死锁的四个必要条件. 11. 处理死锁

转 Linux kernel 无锁队列

struct kfifo { unsigned char *buffer;    /* the buffer holding the data */ unsigned int size;    /* the size of the allocated buffer */ unsigned int in;    /* data is added at offset (in % size) */ unsigned int out;    /* data is extracted from off. 

I/O重定向和管道——《Unix/Linux编程实践教程》读书笔记(第10章)

1.I/O重定向的概念与原因 及 标准输入.输出的标准错误的定义 所以的Unix I/O重定向都基于标准数据流的原理.三个数据了分别如下: 1)标准输入--需要处理的数据流 2)标准输出--结果数据流 3)标准错误输出--错误消息流 概念:所以的Unix工具都使用文件描述符0.1和2.标准输入文件的描述符是0,标准输出的文件描述符是1,而标准错误输出的文件描述符则是2.Unix假设文件描述符0.1.2已经被打开,可以分别进行读写操作. 通常通过shell命令行运行Unix系统工具时,stdin.