Erlang点滴--当广播卡住时

很多时候游戏服务器总避免不了要向玩家广播一些消息,我所遇到的问题是假如这个广播操作由一个进程来对待广播玩家逐个进行的话,很容易让该进程卡住(尤其是在网络状况不好或者玩家数量太多的情况下)。

查了查大部分情况是卡在了fun prim_inet:send/3上,这其实是由于之前的广播直接调用了fun gen_tcp:send/2,而这其实是一个同步调用,一层一层剥代码就可以知道是怎么回事儿了。

首先这是fun gen_tcp:send/2的源码:

1 send(S, Packet) when is_port(S) ->
2     case inet_db:lookup_socket(S) of
3     {ok, Mod} ->
4         Mod:send(S, Packet);
5     Error ->
6         Error
7     end.

这里面会调用fun inet_tcp:send/2,所以再看看它的源码:

1 send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts).
2 send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).

最终事实上调用了fun prim_inet:send/3,这也就是最终卡住的那个函数,看看它的源码:

 1 send(S, Data, OptList) when is_port(S), is_list(OptList) ->
 2     ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
 3     try erlang:port_command(S, Data, OptList) of
 4     false -> % Port busy and nosuspend option passed
 5         ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
 6         {error,busy};
 7     true ->
 8         receive
 9         {inet_reply,S,Status} ->
10             ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
11             Status
12         end
13     catch
14     error:_Error ->
15         ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
16          {error,einval}
17     end.

这里可以看出它在调用fun erlang:port_command/3之后会调用一个receive来接收inet_reply消息。当然这里可以看出,假如这里调用fun erlang:port_command/3时传入了nosuspend参数,当端口忙时就不会再调用receive而卡住了。似乎这样子就可以解决问题了,但最后发现还是有可能卡住,尽管几率小了许多。再卡住那只可能是fun erlang:port_command/3了,这是它的源码:

1 port_command(Port, Data, Flags) ->
2     case case erts_internal:port_command(Port, Data, Flags) of
3          Ref when erlang:is_reference(Ref) -> receive {Ref, Res} -> Res end;
4          Res -> Res
5      end of
6     Bool when Bool == true; Bool == false -> Bool;
7     Error -> erlang:error(Error, [Port, Data, Flags])
8     end.

不难看出原来这里也有一个同步调用,亦即当调用了fun erts_internal:port_command/3之后返回的结果是一个ref时就是调用receive。那好吧,那么fun erts_internal:port_command/3还会不会卡住呢?这玩意儿就只有C语言实现了:

 1 BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3)
 2 {
 3     BIF_RETTYPE res;
 4     Port *prt;
 5     int flags = 0;
 6     Eterm ref;
 7
 8     if (is_not_nil(BIF_ARG_3)) {
 9     Eterm l = BIF_ARG_3;
10     while (is_list(l)) {
11         Eterm* cons = list_val(l);
12         Eterm car = CAR(cons);
13         if (car == am_force)
14         flags |= ERTS_PORT_SIG_FLG_FORCE;
15         else if (car == am_nosuspend)
16         flags |= ERTS_PORT_SIG_FLG_NOSUSPEND;
17         else
18         BIF_RET(am_badarg);
19         l = CDR(cons);
20     }
21     if (!is_nil(l))
22         BIF_RET(am_badarg);
23     }
24
25     prt = sig_lookup_port(BIF_P, BIF_ARG_1);
26     if (!prt)
27     BIF_RET(am_badarg);
28
29     if (flags & ERTS_PORT_SIG_FLG_FORCE) {
30     if (!(prt->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY))
31         BIF_RET(am_notsup);
32     }
33
34 #ifdef DEBUG
35     ref = NIL;
36 #endif
37
38     switch (erts_port_output(BIF_P, flags, prt, prt->common.id, BIF_ARG_2, &ref)) {
39     case ERTS_PORT_OP_CALLER_EXIT:
40     case ERTS_PORT_OP_BADARG:
41     case ERTS_PORT_OP_DROPPED:
42      ERTS_BIF_PREP_RET(res, am_badarg);
43     break;
44     case ERTS_PORT_OP_BUSY:
45     ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));
46     if (flags & ERTS_PORT_SIG_FLG_NOSUSPEND)
47         ERTS_BIF_PREP_RET(res, am_false);
48     else {
49         erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, prt);
50         ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_erts_internal_port_command_3],
51                  BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
52     }
53     break;
54     case ERTS_PORT_OP_BUSY_SCHEDULED:
55     ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));
56     /* Fall through... */
57     case ERTS_PORT_OP_SCHEDULED:
58     ASSERT(is_internal_ref(ref));
59     ERTS_BIF_PREP_RET(res, ref);
60     break;
61     case ERTS_PORT_OP_DONE:
62     ERTS_BIF_PREP_RET(res, am_true);
63     break;
64     default:
65     ERTS_INTERNAL_ERROR("Unexpected erts_port_output() result");
66     break;
67     }
68
69     if (ERTS_PROC_IS_EXITING(BIF_P)) {
70     KILL_CATCHES(BIF_P);    /* Must exit */
71     ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_ERROR);
72     }
73
74     return res;
75 }

从这段代码可以看出,如果调用fun erts_internal:port_command/3时传入了nosuspend参数的话,这个函数是不会再被卡住的了。

回过头来再说游戏里的广播消息,这东西很多时候并不需要太多的可靠性保证,万一没发给玩家那就没发嘛,这总比把我的进程卡死要好啊。因此在广播消息的时候可以考虑将fun gen_tcp:send/2改成最底层的fun erts_internal:port_command/3并且传入nosuspend参数,这样就可以保证广播的时候畅通无阻了。不过值得注意的是fun erts_internal:port_command/3的调用会造成进程可能会收到{inet_reply,S,Status}和{Ref, Res}这两类消息,需要对其作适当的处理。

时间: 2024-11-05 12:27:29

Erlang点滴--当广播卡住时的相关文章

Erlang点滴--try语句里的尾递归

像Erlang这种函数式编程语言,尾递归的使用对于减少栈开销是很重要的.尽管Erlang并不提倡防御式编程,但仍然提供了 try ... of ... catch ... after ... end 语句.这里有个需要注意的地方:try 语句模块里面的函数调用有可能无法形成尾递归. 下面是一个小实验: 1 -module(test). 2 -compile(export_all). 3 4 -define(LOOP_CNT, 10000000). 5 6 start_loop() -> 7 ti

Erlang点滴--杀死gen_server

前天同事碰到了一个问题:他为游戏写了一个模拟客户端的机器人程序,用的是gen_server行为.但是他启动这些机器人时并没有通过监控树,而是直接在Shell下启动了若干个.然后他就发现如果其中一个机器人进程挂掉的话,所有的机器人都会跟着挂掉. 当他把问题告诉我时我第一反应就是Shell挂掉了,因为所有的机器人都是在Shell下用start_link启动的,也就是说所有的机器人进程都和Shell进程建立了连接.因此如果其中一个机器人挂掉,它会向Shell发送EXIT消息导致Shell挂掉,Shel

某页游erlang服务端广播算法效率好差,应该算是一个bug了吧

偶然得到一份erlang网页服务端的代码 不得不说写的非常优雅,文档也非常不错,但我看到他的场景管理中的广播算法,不得不说写的很漂亮,但效率非常低. 每次都全局遍历全部在线玩家在某个场景的玩具ID,这样的算法,我不知道一个服能撑起多少人,优化一下,场景服务管理在他服务中的玩家,NPC,MON,效率会提升好几倍,只是代码会稍微复杂一点,没他写的优雅了!! %% 获取要广播的范围用户ID get_broadcast_id(Q, X0, Y0) -> AllUser = ets:match(?ETS_

[Erlang危机](5.0)执行时指标

原创文章.转载请注明出处:server非业余研究http://blog.csdn.net/erlib 作者Sunface 联系邮箱:[email protected] Chapter 5 Runtime Metrics 执行时指标(Runtime Metrics) One of the best selling points of the Erlang VM for production use is how transparent it can be for all kinds of intr

java点滴之MulticastSocket的使用

一基本概念 该类恰是上文介绍的DatagramSocket的子类. DatagramSocket仅仅同意数据报发送给指定的目标地址,而MulticastSocket能够将数据报以广播的方式发送到多个client 若要使用多点广播,则须要让一个数据报标有一组目标主机地址,当数据报发出后,整个组的全部全部主机都能收到该数据报.IP多点广播(或多点发送)实现了将单一信息发送到多个接受者的广播,其思想是设置一组特殊网络地址作为多点广播地址,每个多点广播地址都被看做一个组,当client须要发送.接收广播

冲突域 广播域

一.概念理解: 1.冲突域(物理分段): 连接在同一导线上的所有工作站的集合,或者说是同一物理网段上所有节点的集合或以太网上竞争同一带宽的节点集合.这个域代表了冲突在其中发生并传播的区域,这个区域可以被认为是共享段.在OSI模型中,冲突域被看作是第一层的概念,连接同一冲突域的设备有Hub,Reperter或者其他进行简单复制信号的设备.也就是说,用Hub或者Repeater连接的所有节点可以被认为是在同一个冲突域内,它不会划分冲突域.而第二层设备(网桥,交换机)第三层设备(路由器)都可以划分冲突

品茗论道说广播(Broadcast内部机制讲解)(上)

侯 亮 1 概述 我们在编写Android程序时,常常会用到广播(Broadcast)机制.从易用性的角度来说,使用广播是非常简单的.不过,这个不是本文关心的重点,我们希望探索得再深入一点儿.我想,许多人也不想仅仅停留在使用广播的阶段,而是希望了解一些广播机制的内部机理.如果是这样的话,请容我斟一杯红茶,慢慢道来. 简单地说,Android广播机制的主要工作是为了实现一处发生事情,多处得到通知的效果.这种通知工作常常要牵涉跨进程通讯,所以需要由AMS(Activity Manager Servi

Android广播错误.MainActivity$MyReceiver; no empty constructor

广播的定义,如果是内部类,必须为静态类 http://blog.csdn.net/chdjj/article/details/19496567 下面总结一下作为内部类的广播接收者在注册的时候需要注意的地方: 1.清单文件注册广播接收者时,广播接收者的名字格式需要注意.因为是内部类,所以需要在内部类所在的类与内部类之间加上$符号: android:name="com.example.brocastdemo.MainActivity$MyReceiver" 2.内部类在声明时一定要写成静态

品茗论道说广播(Broadcast内部机制讲解)(下)

下面我们来看,递送广播动作中最重要的processNextBroadcast(). 3.2 最重要的processNextBroadcast() 从processNextBroadcast()的代码,我们就可以看清楚前面说的“平行广播”.“有序广播”和“动态receiver”.“静态receiver”之间的关系了. 我们在前文已经说过,所有的静态receiver都是串行处理的,而动态receiver则会按照发广播时指定的方式,进行“并行”或“串行”处理.能够并行处理的广播,其对应的若干recei