在多线程开发中,消息队列是一种有效的线程间通讯方式.我在开发KendyNet的过程中一直在寻找一种高效而易用的消息队列实现.
期间使用过的一种实现可参考message queue的设计。这个实现的消息队列是相当高效的,但其存在的一个问题是,如果发送方相
对较慢,则需要一个定时机制以固定间隔将本线程中缓存的待发送消息同步到共享队列中,这也导致了消息有一定的延时.
然后我还考虑过无锁实现的队列,但无锁队列有一个问题,就是当队列为空的时候,不能给消息消费者提供一种可被唤醒的休眠手段.
下面的示例代码是我在很多线程的网络程序中看到的一种不大合理的处理流程:
while(msg = get_send_packet()){ process_send(msg) } sleep(1)
我的KendyNet希望提供一种手段,将网络消息,redis回调,消息队列消息的到达,标准输入,定时器等事件统一到一个消息分发器中
(有点类似libevent,只是不提供信号的处理但增加了消息队列的处理).因此,我选择了链表+管道通知的方式去实现消息队列.
在实现方案确定之后,应该考虑如何提供简洁的接口和保证多线程安全的访问消息队列.
我的设计是这样的,每个线程设置一个唯一的mailbox用于存放来自其它线程的消息
typedef void (*cb_on_mail)(kn_thread_mailbox_t *from,void *); void kn_setup_mailbox(engine_t,int mode,cb_on_mail);
通过调用kn_setup_mailbox
可以设置当前线程的mailbox,其中的engine_t
是消息分发器,当有消息到达时由它负责弹出消息并调用用
户传进的cb_on_mail
以处理消息.调用kn_setup_mailbox
其它线程就可以向它发送消息了.
关于第二个参数mode,可选的参数是MODE_FAST
和MODE_FAIR
,具体的含义在后面解释.
int kn_send_mail(kn_thread_mailbox_t,void *msg,void (*fn_destroy)(void*));
通过kn_send_mail
可以向一个线程邮箱发送消息,fn_destroy
消息的自动销毁,如果消息没有销毁需要可以传NULL.
还有一个问题,如果获得其它线程的邮箱:
kn_thread_mailbox_t kn_query_mailbox(pthread_t);
当一个线程调用kn_setup_mailbox
之后,任何线程都可以使用kn_query_mailbox
,通过传入目标线程的tid来获得它的线程邮箱.
还有一个关键的问题.邮箱的生存期如何控制.显然它的生存期应该和线程的生存期一致.请看下面的场景.
A 向 B发送消息, B 收到消息后给 A回消息,这个时候A被销毁,也就是说它的邮箱也被销毁.那么B在往 A的邮箱发送消息时就
可能访问已经销毁的对象.
为了处理这个问题,kn_thread_mailbox_t
被定义成ident
类型,实现在访问真实的对象之前首先会尝试将ident
转换成线程邮箱的
指针,如果邮箱被销毁,则返回的是空指针.具体原理可以参考处理由引用计数引起的泄漏 。
现在解释下MODE_FAST
和MODE_FAIR
,MODE_FAST
意味着快速处理消息模式,在此模式下邮箱消息将被优先处理.而MODE_FAIR
模式意味
着公平模式,也就是消息处理的优先级与其它事件是一样的.看下实现代码就可以知道两种模式的区别:
MODE_FAST
static inline struct mail* kn_getmail(kn_thread_mailbox *mailbox){ struct mail *mail = (struct mail*)kn_list_pop(&mailbox->private_queue); if(mail) return mail; LOCK(mailbox->mtx); if(!kn_list_size(&mailbox->global_queue)){ while(TEMP_FAILURE_RETRY(read(mailbox->notifyfd,buf,4096)) > 0); mailbox->wait = 1; UNLOCK(mailbox->mtx); return NULL; }else{ kn_list_swap(&mailbox->private_queue,&mailbox->global_queue); } UNLOCK(mailbox->mtx); return (struct mail*)kn_list_pop(&mailbox->private_queue); } static void on_events_fast(handle_t h,int events){ kn_thread_mailbox *mailbox = (kn_thread_mailbox*)h; struct mail *mail; int n = 65536;//关键参数 while((mail = kn_getmail(mailbox)) != NULL && n > 0){ kn_thread_mailbox_t *sender = NULL; if(mail->sender.ptr) sender = &mail->sender; mailbox->cb_on_mail(sender,mail->data); if(mail->fn_destroy) mail->fn_destroy(mail->data); free(mail); --n; } }
MODE_FAIR
static void on_events_fair(handle_t h,int events){ kn_thread_mailbox *mailbox = (kn_thread_mailbox*)h; struct mail *mail = NULL; do{ mail = (struct mail*)kn_list_pop(&mailbox->private_queue); if(mail) break; LOCK(mailbox->mtx); kn_list_swap(&mailbox->private_queue,&mailbox->global_queue); mail = (struct mail*)kn_list_pop(&mailbox->private_queue); if(mail){ UNLOCK(mailbox->mtx); break; } while(TEMP_FAILURE_RETRY(read(mailbox->notifyfd,buf,4096)) > 0); mailbox->wait = 1; UNLOCK(mailbox->mtx); }while(0); if(mail){ kn_thread_mailbox_t *sender = NULL; if(mail->sender.ptr) sender = &mail->sender; mailbox->cb_on_mail(sender,mail->data); if(mail->fn_destroy) mail->fn_destroy(mail->data); free(mail); } }
在MODE_FAST
下,on_events_fast
会尽量多的从线程邮箱中弹出消息并立即调用器消息回调.这意味着 如果邮箱消息很多,其它事件
的处理就会被延后.而MODE_FAIR
下,on_events_fair
每次仅从线程邮箱中弹出一个消息并处理.之后重新回到消息分发器中,让其它
事件有执行的机会.
完整的源码请查看:https://github.com/sniperHW/KendyNet/blob/master/refactoring/src/knthreadmailbox.c