线程邮箱

在多线程开发中,消息队列是一种有效的线程间通讯方式.我在开发KendyNet的过程中一直在寻找一种高效而易用的消息队列实现.

期间使用过的一种实现可参考message queue的设计。这个实现的消息队列是相当高效的,但其存在的一个问题是,如果发送方相

对较慢,则需要一个定时机制以固定间隔将本线程中缓存的待发送消息同步到共享队列中,这也导致了消息有一定的延时.

然后我还考虑过无锁实现的队列,但无锁队列有一个问题,就是当队列为空的时候,不能给消息消费者提供一种可被唤醒的休眠手段.

下面的示例代码是我在很多线程的网络程序中看到的一种不大合理的处理流程:

while(msg = get_send_packet()){
    process_send(msg)
}
sleep(1)

我的KendyNet希望提供一种手段,将网络消息,redis回调,消息队列消息的到达,标准输入,定时器等事件统一到一个消息分发器中

(有点类似libevent,只是不提供信号的处理但增加了消息队列的处理).因此,我选择了链表+管道通知的方式去实现消息队列.

在实现方案确定之后,应该考虑如何提供简洁的接口和保证多线程安全的访问消息队列.

我的设计是这样的,每个线程设置一个唯一的mailbox用于存放来自其它线程的消息

typedef void (*cb_on_mail)(kn_thread_mailbox_t *from,void *);

void kn_setup_mailbox(engine_t,int mode,cb_on_mail);

通过调用kn_setup_mailbox可以设置当前线程的mailbox,其中的engine_t是消息分发器,当有消息到达时由它负责弹出消息并调用用

户传进的cb_on_mail以处理消息.调用kn_setup_mailbox 其它线程就可以向它发送消息了.

关于第二个参数mode,可选的参数是MODE_FASTMODE_FAIR,具体的含义在后面解释.

int  kn_send_mail(kn_thread_mailbox_t,void *msg,void (*fn_destroy)(void*));

通过kn_send_mail可以向一个线程邮箱发送消息,fn_destroy消息的自动销毁,如果消息没有销毁需要可以传NULL.

还有一个问题,如果获得其它线程的邮箱:

kn_thread_mailbox_t kn_query_mailbox(pthread_t);

当一个线程调用kn_setup_mailbox之后,任何线程都可以使用kn_query_mailbox,通过传入目标线程的tid来获得它的线程邮箱.

还有一个关键的问题.邮箱的生存期如何控制.显然它的生存期应该和线程的生存期一致.请看下面的场景.

A 向 B发送消息, B 收到消息后给 A回消息,这个时候A被销毁,也就是说它的邮箱也被销毁.那么B在往 A的邮箱发送消息时就

可能访问已经销毁的对象.

为了处理这个问题,kn_thread_mailbox_t被定义成ident类型,实现在访问真实的对象之前首先会尝试将ident转换成线程邮箱的

指针,如果邮箱被销毁,则返回的是空指针.具体原理可以参考处理由引用计数引起的泄漏 。

现在解释下MODE_FASTMODE_FAIR,MODE_FAST意味着快速处理消息模式,在此模式下邮箱消息将被优先处理.而MODE_FAIR模式意味

着公平模式,也就是消息处理的优先级与其它事件是一样的.看下实现代码就可以知道两种模式的区别:

MODE_FAST

static inline  struct mail* kn_getmail(kn_thread_mailbox *mailbox){
    struct  mail *mail = (struct mail*)kn_list_pop(&mailbox->private_queue);
    if(mail) return mail;
    LOCK(mailbox->mtx);
    if(!kn_list_size(&mailbox->global_queue)){
        while(TEMP_FAILURE_RETRY(read(mailbox->notifyfd,buf,4096)) > 0);
        mailbox->wait = 1;
        UNLOCK(mailbox->mtx);
        return NULL;
    }else{
        kn_list_swap(&mailbox->private_queue,&mailbox->global_queue);
    }
    UNLOCK(mailbox->mtx);
    return (struct mail*)kn_list_pop(&mailbox->private_queue);
}

static void on_events_fast(handle_t h,int events){
    kn_thread_mailbox *mailbox = (kn_thread_mailbox*)h;
    struct mail *mail;
    int n = 65536;//关键参数
    while((mail = kn_getmail(mailbox)) != NULL && n > 0){
        kn_thread_mailbox_t *sender = NULL;
        if(mail->sender.ptr) sender = &mail->sender;
        mailbox->cb_on_mail(sender,mail->data);
        if(mail->fn_destroy) mail->fn_destroy(mail->data);
        free(mail);
        --n;
    }
}

MODE_FAIR

static void on_events_fair(handle_t h,int events){
    kn_thread_mailbox *mailbox = (kn_thread_mailbox*)h;
    struct mail *mail = NULL;
    do{
        mail = (struct mail*)kn_list_pop(&mailbox->private_queue);
        if(mail) break;
        LOCK(mailbox->mtx);
        kn_list_swap(&mailbox->private_queue,&mailbox->global_queue);
        mail = (struct mail*)kn_list_pop(&mailbox->private_queue);
        if(mail){
            UNLOCK(mailbox->mtx);
            break;
        }
        while(TEMP_FAILURE_RETRY(read(mailbox->notifyfd,buf,4096)) > 0);
        mailbox->wait = 1;
        UNLOCK(mailbox->mtx);
    }while(0);

    if(mail){
        kn_thread_mailbox_t *sender = NULL;
        if(mail->sender.ptr) sender = &mail->sender;
        mailbox->cb_on_mail(sender,mail->data);
        if(mail->fn_destroy) mail->fn_destroy(mail->data);
        free(mail);
    }
}

MODE_FAST下,on_events_fast会尽量多的从线程邮箱中弹出消息并立即调用器消息回调.这意味着 如果邮箱消息很多,其它事件

的处理就会被延后.而MODE_FAIR下,on_events_fair每次仅从线程邮箱中弹出一个消息并处理.之后重新回到消息分发器中,让其它

事件有执行的机会.

完整的源码请查看:https://github.com/sniperHW/KendyNet/blob/master/refactoring/src/knthreadmailbox.c

线程邮箱,布布扣,bubuko.com

时间: 2024-12-28 08:58:03

线程邮箱的相关文章

zmq笔记一: 对象关系

int major, minor, patch;zmq_version(&major, &minor, &patch); //4.2.0 本文主要是分析代码,方便自己日后查阅. ========================================= 1.上下文对象以及socket对象创建 void *context = zmq_ctx_new(); //创建上下文对象 void *responder = zmq_socket(context, ZMQ_REP); //创

各科基础详实

一. Java基础部分 1. JAVA的基本数据类型有哪些 ?  String 是不是基本数据类型 ? 2. 一个".java"源文件中是否可以包括多个类(不是内部类)?有什么限制? 3. Java有没有goto? 7 4. 说说&和&&的区别. 7 5. 在JAVA中如何跳出当前的多重嵌套循环? 7 6. switch语句能否作用在byte上,能否作用在long上,能否作用在String上? 8 7. short s1 = 1; s1 = s1 + 1;有什么

用户注册的邮箱激活模块的设计与实现

----------------------------------------------------------------------------------------------[版权申明:本文系作者原创,转载请注明出处] 文章出处:http://blog.csdn.net/sdksdk0/article/details/52144698作者:朱培      ID:sdksdk0      邮箱: [email protected] --------------------------

在Maven项目中关于SSM框架中邮箱验证登陆

1.你如果要在maven项目中进行邮箱邮箱验证,你首先要先到pom.xml文件中配置mail.jar,activation.jar包 <dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>1.4</version> </dependency> <dependency> <groupId

C#高级知识点概要(2) - 线程和并发

原文地址:http://www.cnblogs.com/Leo_wl/p/4192935.html 我也想过跳过C#高级知识点概要直接讲MVC,但经过前思后想,还是觉得有必要讲的.我希望通过自己的经验给大家一些指引,带着大家一起走上ASP.NET MVC大牛之路,少走弯路.同时也希望能和大家一起交流,这样也能发现我自己的不足,对我自己的帮助也是非常大的. 建议大家对C#撑握的不错的时候,可以去看一些开源项目.走技术这条路,就要耐得住寂寞(群里双休日说要让群主找妹子进群的人必须反思),练好内功.不

【Python之路Day11】网络篇之线程、进程、协程

目录: 基本概念 线程 进程 协程  一. 基本概念 现在的操作系统,如Unix.Linux.Windows.Mac OS X等,都是支持“多任务”的操作系统. 什么叫”多任务“呢?简单理解,就是我们可以一般上网浏览某车之家的网页,看看喜欢的车型信息:一边打开某易云音乐听听好歌:一边打开某软件股市行情图,不安的盯着曲线图...卧槽,又尼玛跌了!  这就是多任务喽. 多核心的CPU已经很普及了,但是,就是在过去的单核心CPU上,也可以执行多任务. PS: CPU是分时间片的,假设任务1执行0.01

Spring集成JavaMail并利用线程池发送邮件

我们系统存在大量发送邮件的需求,项目使用的是Spring框架而JavaMail也能很好的跟Spring进行集成,由于发送邮件最好还是使用异步进行发送,所以这里就采用线程池+JavaMail进行邮件发送,下面看具体代码实现: Step1.引入JavaMail <mail.version>1.4.7</mail.version> <dependency> <groupId>javax.mail</groupId> <artifactId>

小程序[邮箱提取器-EmailSplider]总结

1.背景情况 学东西做快的是付诸实践,写这个小程序的目的就是为了综合运用各个知识点,从而提升学习的效果. 2.涉及知识 A.Swing 的布局 B.Swing中,线程访问UI C.URLConnection 读取网页源码 D.IO流的基本操作 E.正则表达式的基本使用 F.Window Builder插件的发现和使用 G.jar包的制作和双击jar运行的修复 H.jdk1.8的新特性,优雅的 lambda 语法 3.效果图 1.windows上运行效果 ↓ 2.linux上运行效果 ↓ 4.源代

Java与邮件系统交互之使用Socket验证邮箱是否存在

最近遇到一个需求:需要验证用户填写的邮箱地址是否真实存在,是否可达.和普通的正则表达式不同,他要求尝试链接目标邮箱服务器并请求校验目标邮箱是否存在. 先来了解 DNS之MX记录 对于DNS不了解的,请移步百度搜索. DNS中除了A记录(域名-IP映射)之外,还有MX记录(邮件交换记录),CNAME记录(别名,咱不管). MX记录就是为了在发送邮件时使用友好域名规则,比如我们发送到QQ邮箱[email protected].我们填写地址是到“qq.com”,但实际上可能服务器地址千奇百怪/而且许有