skynet源码分析3:消息调度

消息调度在框架中分为两个层次,一个c层的分配,一个是lua层的分发。本文阐述的是c层,从两个方面来说:

  1. 工作线程的控制
  2. 信箱的调度

与调度相关的代码实现在/skynet-src/skynet_mq.c,/skynet-src/skynet_start.c,/skynet-src/skynet_server.c三个文件中,整体上是一个m:n的调度器。



工作线程的控制


框架运行后,会启动固定的线程来轮流调度sc(skynet_context),线程数由配置文件中的thread字段定义,默认是4个。那框架中是如何控制这些线程的呢?具体实现在/skynet-src/skynet_start.c中。

在208行,启动了工作线程:

static int weight[] = {
        -1, -1, -1, -1, 0, 0, 0, 0,
        1, 1, 1, 1, 1, 1, 1, 1,
        2, 2, 2, 2, 2, 2, 2, 2,
        3, 3, 3, 3, 3, 3, 3, 3, };
    struct worker_parm wp[thread];
    for (i=0;i<thread;i++) {
        wp[i].m = m;
        wp[i].id = i;
        if (i < sizeof(weight)/sizeof(weight[0])) {
            wp[i].weight= weight[i];
        } else {
            wp[i].weight = 0;
        }
        create_thread(&pid[i+3], thread_worker, &wp[i]);
    }

直接来看线程函数thread_worker把,在152行:

 1 static void *
 2 thread_worker(void *p) {
 3     struct worker_parm *wp = p;
 4     int id = wp->id;
 5     int weight = wp->weight;
 6     struct monitor *m = wp->m;
 7     struct skynet_monitor *sm = m->m[id];
 8     skynet_initthread(THREAD_WORKER);
 9     struct message_queue * q = NULL;
10     while (!m->quit) {
11         q = skynet_context_message_dispatch(sm, q, weight);
12         if (q == NULL) {
13             if (pthread_mutex_lock(&m->mutex) == 0) {
14                 ++ m->sleep;
15                 // "spurious wakeup" is harmless,
16                 // because skynet_context_message_dispatch() can be call at any time.
17                 if (!m->quit)
18                     pthread_cond_wait(&m->cond, &m->mutex);
19                 -- m->sleep;
20                 if (pthread_mutex_unlock(&m->mutex)) {
21                     fprintf(stderr, "unlock mutex error");
22                     exit(1);
23                 }
24             }
25         }
26     }
27     return NULL;
28 }

控制这种生命周期与进程一致的工作线程,主要有两个细节:1、均匀不重复的分配任务。2、不空转、最小时延。前者处理线程同步就好。来看看skynet是如何处理后者的吧:

它用得是条件变量来处理空转的,用条件变量有两点好处:1、让出cpu时间片.2、由外部决定何时唤醒,这样可以在有任务时再唤醒,既能最大化的不空转,又能减小处理任务的时延。

具体实现是条件变量的标准应用了,和《unix高级编程》条件变量的例子几乎一样。这里还有一个sleep的计数,有什么用呢?用来判断要不要调用pthread_cond_signal的。

最后还有一个问题,等待的线程是在哪里被唤醒的呢?在socket线程和timer线程里唤醒的,前者有socket消息时会调用一次,后者每个刷新时间会唤醒一次。



信箱的调度

上一篇时,在sc里我们看到过一个message_queue类型的字段,这就是信箱。skynet中用了两种队列来存储消息并完成调度,下面称为12级队列,1级队列是一个单链表,每个节点是2级队列,2级队列(message_queue)是一个自动扩展的循环队列,用来存储消息。这两个队列实现在/skynet-src/skynet_mq.c中,实现的很简单,并没有用复杂的无锁结构,而是自旋锁保证线程安全的链表,循环队列。

信箱的调度就是12级队列的调度,整体结构描述如下:

while(1){

  1级队列出队;

  调度2级队列;

1级队列入队;

}

这部分实现在/skynet-src/skynet_server的275行skynet_context_message_dispatch()中:

 1 struct message_queue *
 2 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
 3     if (q == NULL) {
 4         q = skynet_globalmq_pop();
 5         if (q==NULL)
 6             return NULL;
 7     }
 8
 9     uint32_t handle = skynet_mq_handle(q);
10
11     struct skynet_context * ctx = skynet_handle_grab(handle);
12     if (ctx == NULL) {
13         struct drop_t d = { handle };
14         skynet_mq_release(q, drop_message, &d);
15         return skynet_globalmq_pop();
16     }
17
18     int i,n=1;
19     struct skynet_message msg;
20
21     for (i=0;i<n;i++) {
22         if (skynet_mq_pop(q,&msg)) {
23             skynet_context_release(ctx);
24             return skynet_globalmq_pop();
25         } else if (i==0 && weight >= 0) {
26             n = skynet_mq_length(q);
27             n >>= weight;
28         }
29         int overload = skynet_mq_overload(q);
30         if (overload) {
31             skynet_error(ctx, "May overload, message queue length = %d", overload);
32         }
33
34         skynet_monitor_trigger(sm, msg.source , handle);
35
36         if (ctx->cb == NULL) {
37             skynet_free(msg.data);
38         } else {
39             dispatch_message(ctx, &msg);
40         }
41
42         skynet_monitor_trigger(sm, 0,0);
43     }
44
45     assert(q == ctx->queue);
46     struct message_queue *nq = skynet_globalmq_pop();
47     if (nq) {
48         // If global mq is not empty , push q back, and return next queue (nq)
49         // Else (global mq is empty or block, don‘t push q back, and return q again (for next dispatch)
50         skynet_globalmq_push(q);
51         q = nq;
52     }
53     skynet_context_release(ctx);
54
55     return q;
56 }

这个函数的作用是,调度传入的2级队列,并返回下一个可调度的2级队列。在上面的实现中,有四个细节之处:

1、22-24行,当2级队列为空时并没有将其压入1级队列,那它从此就消失了吗?不,这样做是为了减少空转1级队列,那这个2级队列是什么时候压回的呢?在message_queue中,有一个

in_global标记是否在1级队列中,当2级队列的出队(skynet_mq_pop)失败时,这个标记就会被置0,在2级队列入队时(skynet_mq_push)会判断这个标记,如果为0,那么就会将自己压入1级队列。(skynet_mq_mark_release也会判断)所以这个2级队列在下次入队时会压回。

2、25-27,修改了for循环的次数,也就是每次调度处理多少条消息。这个次数与传入的weight有关,我们回过头来看这个weight是从哪里来的,源头在工作线程创建时:

static int weight[] = {
        -1, -1, -1, -1, 0, 0, 0, 0,
        1, 1, 1, 1, 1, 1, 1, 1,
        2, 2, 2, 2, 2, 2, 2, 2,
        3, 3, 3, 3, 3, 3, 3, 3, };
    struct worker_parm wp[thread];
    for (i=0;i<thread;i++) {
        wp[i].m = m;
        wp[i].id = i;
        if (i < sizeof(weight)/sizeof(weight[0])) {
            wp[i].weight= weight[i];
        } else {
            wp[i].weight = 0;
        }
        create_thread(&pid[i+3], thread_worker, &wp[i]);
    }

再来看看 n >>= weight,嗯,大致就是:把工作线程分为组,前四组每组8个,超过的归入第5组,AE组每次调度处理一条消息,B组每次处理(n/2)条,C组每次处理(n/4)条,D组每次处理(n/8)条。是为了均匀的使用多核。

3、29-32做了一个负载判断,负载的阀值是1024。不过也仅仅是输出一条log提醒一下而以.

4、34、42触发了一下monitor,这个监控是用来检测消息处理是否发生了死循环,不过也仅仅只是输出一条log提醒一下。这个检测是放在一个专门的监控线程里做的,判断死循环的时间是5秒。具体机制这里就不说了,其实现在/skynet-src/skynet_monitor.c中

时间: 2024-10-05 16:37:02

skynet源码分析3:消息调度的相关文章

skynet源码分析:服务

skynet是为多人在线游戏打造的轻量级服务端框架,使用c+lua实现.使用这套框架的一个好处就是,基本只需要lua,很少用到c做开发,一定程度上提高了开发效率. skynet的例子是怎么调用的 服务器: simpledb.lua: skynet.register "SIMPLEDB" 向skynet里注册一个服务 agent.lua: skynet.call("SIMPLEDB", "text", text) 调用相应的服务 main.lua:

源码分析 Kafka 消息发送流程(文末附流程图)

温馨提示:本文基于 Kafka 2.2.1 版本.本文主要是以源码的手段一步一步探究消息发送流程,如果对源码不感兴趣,可以直接跳到文末查看消息发送流程图与消息发送本地缓存存储结构. 从上文 初识 Kafka Producer 生产者,可以通过 KafkaProducer 的 send 方法发送消息,send 方法的声明如下: Future<RecordMetadata> send(ProducerRecord<K, V> record) Future<RecordMetada

skynet源码分析1:开篇明义

skynet是云风基于actor模型实现的一个服务器框架,核心七千多行c代码,并提供了一个lua binding.写得比较简明,用起来比较爽快,很合我的胃口,再加之决定在公司最近的一个项目上skynet,所以就决定精读一遍源码,将所思所想所得记录于此,以便用起来心安理得. skynet的实现基于如下几条actor行为: 能创建,销毁其它actor,actor间为平级关系. 能发送消息给其它actor,能接收其它actor的消息. actor间的交互只能通过消息传递. 能处理自己的消息. 在sky

飞鸽传书源码分析二消息机制

转载请注明出处:http://blog.csdn.net/mxway/article/details/40225725 本篇文章是在飞鸽传书2.06源码的基础的分析的. 飞鸽传书的消息大致可分为三类:普通窗口类(后面以TMainWin为例进行分析)消息,对话框类(后面以TSendDlg为例进行分析)消息,对话框控件(后面以TEditSub为例进行分析)消息.这三类消息先合后分,这三类窗口设置的消息处理函数都是TApp::WinProc,在TApp::WinProc函数中再分发给各自的消息处理函数

Akka源码分析-Remote-发消息

上一篇博客我们介绍了remote模式下Actor的创建,其实与local的创建并没有太大区别,一般情况下还是使用LocalActorRef创建了Actor.那么发消息是否意味着也是相同的呢? 既然actorOf还是委托给了LocalActorRef,那么在本地创建的Actor发消息还是跟以前一样的,那么如果如何给远程的Actor发消息呢?我们一般是通过actorSelection或者给远程Actor发送一个Identify消息,来接收对应的ActorRef,然后再发消息.我们来分析一下这两者的区

Akka源码分析-Remote-收消息

上一遍博客中,我们分析了网络链接建立的过程,一旦建立就可以正常的收发消息了.发送消息的细节不再分析,因为对于本地的actor来说这个过程相对简单,它只是创立链接然后给指定的netty网路服务发送消息就好了.接收消息就比较麻烦了,因为这对于actor来说是透明的,netty收到消息后如何把消息分发给指定的actor呢?这个分发的过程值得研究研究. 之前分析过,在监听创立的过程中,有一个对象非常关键:TcpServerHandler.它负责链接建立.消息收发等功能.TcpServerHandler继

android smack源码分析——接收消息以及如何解析消息

在android里面用的smack包其实叫做asmack,该包提供了两种不同的连接方式:socket和httpclient.该并且提供了很多操作xmpp协议的API,也方便各种不同自定义协议的扩展.我们不需要自己重新去定义一套接收机制来扩展新的协议,只需继承然后在类里处理自己的协议就可以了.而本文今天主要说两点,一点就是消息是如何接收的,另一点就是消息是如何通知事件的. 总的思路 1.使用socket连接服务器 2.将XmlPullParser的数据源关联到socket的InputStream

rocketmq源码分析3-consumer消息获取

使用rocketmq的大体消息发送过程如下: 在前面已经分析过MQ的broker接收生产者客户端发过来的消息的过程,此文主要讲述订阅者获取消息的过程,或者说broker是怎样将消息传递给消费者客户端的,即上面时序图中拉取消息(pull message)动作.. 1. 如何找到入口(MQ-broker端) 分析一个机制或者功能时,我们首先希望的是找到入口,前一篇我们是通过端口号方式顺藤摸瓜的方式找到了入口.但是此篇略微不同,涉及到consumer客户端与broker的两边分析,最终发现逻辑还是比较

skynet源码分析5:lua绑定之地基

前面四篇已经涵盖了skynet的c层核心,剩下的timer,socket模块本身和actor模型没什么关系,且比较独立,最后再看吧.光用skynet的c接口,是很难在这上面写业务逻辑的,所以要找一种更爽快的方式来使用.官方推荐的是lua,利用lua的协程对skynet的消息分发做了封装,使得actor之间的异步消息通信有同步一样的操作感,并且做了一些的扩展模块来方便使用.lua简洁实用的风格我个人也很钟意. 要想做一个lua binding来使用,要有两个必要条件: 根据skynet的模块契约实