简单的线程消息队列实现

1. 线程使用场景
(1)流水线方式。根据业务特点,将一个流程的处理分割成多个线程,形成流水线的处理方式。产生的结果:延长单一流程的处理时间,提高系统整体的吞吐能力。
(2)线程池方式。针对处理时间比较长且没有内蕴状态的线程,使用线程池方式分流消息,加快对线程消息的处理,避免其成为系统瓶颈。
线程使用的关键是线程消息队列、线程锁、智能指针的使用。其中以线程消息队列最为重要。

2. 线程消息队列描述
所谓线程消息队列,就是一个普通的循环队列加上“多生产者-单(多)消费者的存/取操作”。流水线方式中的线程是单消费者,线程池方式中的线程是多消费者。
为了后文更好的描述问题,作如下说明:
(1)假定循环队列queue中, 入队操作put_queue, 出队操作get_queue。
(2)生产者消费者:生产者线程生产消息,放在一个空缓冲区中,供消费者线程消费,生产者生产消息(put_queue),如果缓冲区满,则被阻塞,消费者消费消息(get_queue),如果缓冲区空,则被阻塞。线程消息队列就是生产者消费者问题中的缓冲区,而它的生产者是不限定的,任何线程都可以作为生产者向其中进行put_queue操作,消费线程则可能是一个,也可能是多个。因此对循环队列的任何操作都要加锁,以保证线程安全。

3. 线程相关的操作
(1)pthread_t类型的创建、属性创建设置等。
这类具体可以: man pthread_creat; man pthread_attr_init; man pthread_detach; man pthread_join等查看
(2)pthread_mutex_t类型的操作。
这类具体可以: man pthread_mutex_init可以看到所有相关的操作。
(3)pthread_cond_t类型的操作。man pthread_cond_init。pthread_cond_t的wait和signal操作一定要和pthread_mutex_t的lock、unlock配合使用。类似于此:

4. linux的线程库
2.6之后的内核的默认使用的是redhat公司的NPTL(原生posix线程库),以前内核使用的是LinuxThreads库,两者的简单介绍可以看http://www.ibm.com/developerworks/cn/linux/l-threading.html。不过对于应用者,分析两者的区别和优劣也没什么大意义。这里特别提下NPTL的futex机制。借助该机制,pthread_mutex的性能大大提高,只要不进入竞争态,进程就不会陷入内核态。这点可以自己写示例程序,通过strace -c 跟踪进程的系统调用,另外还可以证实总是进入内核态的操作有pthread_cond_signal和sem_post。

5. 通过上面的分析,我们可以有如下结论:
(1)减少pthread_cond_signal和sem_post的调用,只在有必要的时候调用;
(2)尽量避免pthread_mutex进入竞争态。增大消息队列的大小,可以有效减少竞态条件的出现。

6. 实用的线程消息队列实现(msg_queue.h)

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

pthread_mutex_t mux;
pthread_cond_t cond_get, cond_put;

struct msg_queue {
    void** buffer; // 缓冲数据, .buffer = msg
    int size; // 队列大小,使用的时候给出稍大的size,可以减少进入内核态的操作
    int lget; // 取队列数据的偏移量
    int lput; // 放队列数据的偏移量
    int nData; // 队列中数据的个数,用来判断队列满/空
    int nFullThread; // 由于队列满而阻塞在put_queue的线程个数
    int nEmptyThread; // 由于队列空而阻塞在get_queue的线程个数
};

void* get_queue(struct msg_queue *q){
    void* data = NULL;
    pthread_mutex_lock(&mux);
    while(q->lget == q->lput && 0 == q->nData){
        // 此处循环判断的原因是:假设2个消费者线程在get_queue阻塞,然后两者都被激活,
        // 而其中一个线程运行比较块,快速消耗了2个数据,另一个线程醒来的时候已
        // 经没有新数据可以消耗了。这种情况是有可能的:比如,其它生产者线程快速
        // 调用put_queue两次,如果有2个线程在get_queue处阻塞,就会被同时激活,
        // 而完全有可能,其中一个被激活的线程获取到了cpu,快速处理了2个消息。

        // 对于循环队列,如果lget与lput相等,那么只有两种情况,
        // 1:nData不为0,队列满
        // 2:nData为0,队列空
        q->nEmptyThread++;
        pthread_cond_wait(&cond_get, &mux);
        q->nEmptyThread--;
    }
#ifdef DEBUG
    printf("get data! lget:%d", q->lget);
#endif
    data = (q->buffer)[q->lget++];
    if(q->lget == q->size){
        // queue用作循环队列
        q->lget = 0;
    }
    q->nData--;
#ifdef DEBUG
    printf(" nData:%d\n", q->nData);
#endif
    if(q->nFullThread){
        // 仅在必要时才调用pthread_cond_signal, 尽量少陷入内核态
        pthread_cond_signal(&cond_put);
    }
    pthread_mutex_unlock(&mux);
    return data;
}

void put_queue(struct msg_queue *q, void* data){
    pthread_mutex_lock(&mux);
    while(q->lget == q->lput && q->nData){
        q->nFullThread++;
        pthread_cond_wait(&cond_put, &mux);
        q->nFullThread--;
    }
#ifdef DEBUG
    printf("put data! lput:%d", q->lput);
#endif
    (q->buffer)[q->lput++] = data;
    if(q->lput == q->size){
        q->lput = 0;
    }
    q->nData++;
#ifdef DEBUG
    printf(" nData:%d\n", q->nData);
#endif
    if(q->nEmptyThread){
        pthread_cond_signal(&cond_get);
    }
    pthread_mutex_unlock(&mux);
}

7. demo程序(msg_queue.c)

#include "msg_queue.h"
struct msg_queue queue = {NULL, 10, 0, 0, 0, 0, 0};

void * produce(void * arg)
{
    pthread_detach(pthread_self());
    int i=0;
    while(1){
        put_queue(&queue, (void*)i++);
    }
}

void *consume(void *arg)
{
    int data;
    while(1){
        data = (int)(get_queue(&queue));
    }
}

int main()
{
    pthread_t pid;
    int i=0;

    pthread_mutex_init(&mux, 0);
    pthread_cond_init(&cond_get, 0);
    pthread_cond_init(&cond_put, 0);

    queue.buffer = malloc(queue.size * sizeof(void*));
    if(queue.buffer == NULL){
        printf("malloc failed!\n");
        exit(-1);
    }

    pthread_create(&pid, 0, produce, 0);
    pthread_create(&pid, 0, produce, 0);
    pthread_create(&pid, 0, produce, 0);
    pthread_create(&pid, 0, consume, 0);
    pthread_create(&pid, 0, consume, 0);
    pthread_create(&pid, 0, consume, 0);

    sleep(60);

    free(queue.buffer);
    pthread_mutex_destroy(&mux);
    pthread_cond_destroy(&cond_get);
    pthread_cond_destroy(&cond_put);
}

Reference: http://www.cppblog.com/CppExplore/archive/2008/01/15/41175.html

时间: 2024-10-10 10:37:38

简单的线程消息队列实现的相关文章

强化一下概念:程序自己不去取消息,消息不会自己跑过来运行(针对线程消息队列里的消息,也是绝大多数消息)

刚才看这段代码的时候: procedure TControl.SetBounds(ALeft, ATop, AWidth, AHeight: Integer); begin if CheckNewSize(AWidth, AHeight) and ((ALeft <> FLeft) or (ATop <> FTop) or (AWidth <> FWidth) or (AHeight <> FHeight)) then begin InvalidateCon

php-resque 简单的php消息队列

摘要: 消息队列是个好东西,各种×××MQ很多.然而看一下它们的文档,你得吓尿,什么鬼,我只是想用它触发个短信接口而已. 幸好也有简单的.这次是php-resque 安装 首先这货需要在linux下跑,非得用windows就别看了,也不是不能装,费劲且性能渣 得有composer,嫌慢的也拉倒吧,别看了,本文不介绍没有composer怎么办 安装composer也不是本文要点,在ubuntu(其实为了不折腾QQ我装的是deepin)中就是一行命令 php版本要高一点,5.4以下也别看了 该环境p

windows 窗口过程 线程消息队列

message loop window procedure message loop: a for loop in thread or winmain  GetMessage, TranslateMessage, and DispatchMessage GetMessage: get the message DispatchMessage: Dispatches a message to a window procedure. ----- winproc WindowProc callback

细说UI线程和Windows消息队列

在 Windows应用程序中,窗体是由一种称为" UI线程( User Interface Thread)"的特殊类型的线程创建的. 首先, UI线程是一种"线程",所以它具有一个线程应该具有的所有特征,比如有一个线程函数和一个线程 ID. 其次," UI线程"又是"特殊"的,这是因为 UI线程的线程函数中会创建一种特殊的对象--窗体,同时,还一并负责创建窗体上的各种控件. 窗体和控件大家都很熟悉了,这些对象具有接收用户操作的

【详解】消息队列和线程关系

1.进程-线程-消息队列 简单的来说,什么是进程?什么是线程?打个比方,你的程序要执行,操作系统就会把你的exe文件加载到内存中,那就生成一个进程了(当然还包含分配到的资源等):对于线程,你可以理解成是一个程序里的不同部分,这有点类似函数,所不同的是各个线程是同时执行的. 例如,你的主线程创建了另一个副线程,那么这两个线程是同时在工作的,不存在调用 - 返回的概念. 一个进程里可以有多个线程在执行,称为执行实例. shining:因为线程的资源是从进程资源中分配出来的,因此同一个进程中的多个线程

消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka). 四.JMS消息服务 讲消息队列就不得不提JMS .JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 在EJB架构中,有消息bean可以无缝的与JM消息服务集成.在J2EE架构模

大型网站架构系列:消息队列

出处:ITFLY8 网址:http://www.cnblogs.com/itfly8/p/5156155.html 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异

大型网站架构系列:消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka).[第二篇的内容大部分为网络资源的整理和汇总,供大家学习总结使用,最后有文章来源] 本次分享大纲 消息队列概述(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息队列应用场景(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息中间件示例(见第一篇:大型网站架构系列:分布式消息队列(一)) JMS消息服务 常用消息队列 参考(推荐)资料 本

【翻译】DotNetMQ: 一个.NET版完整的消息队列系统

在一个大型的分布式系统中,消息队列是不可缺少的中间件,能很好的解决异步消息.应用解耦.均衡并发等问题.在.net中,偶然发现一个效率不错.安全可靠.功能齐全的消息组件,忍不住翻译过来,供大家快速预览. 注:原作者用windows服务启动消息队列服务,但是本人在win10上测试出错,可自行改成控制台启动消息队列服务,然后用第三方工具注册服务(如:SrvanyUI) 原文:http://www.codeproject.com/Articles/193611/DotNetMQ-A-Complete-M