Erlang中频繁发送远程消息要注意的问题

http://avindev.iteye.com/blog/76373

注:这篇文章可能会有争议,欢迎提出意见

在Erlang中,如果要实现两个远程节点之间的通信,就需要通过网络来实现,对于消息发送,是使用TCP。如果要在两个节点间频繁发送消息,比如每秒几百上千条,那样就要注意了。

无论是网游服务器开发的书籍,或是经验老道的工程师,都会告诉你,在发送数据包时,尽可能把小的消息组合为一个比较大的包来发送,毕竟一个TCP包的头也很大,首先是浪费带宽,其次调用底层发送的指令也是有开销的。有工程师告诉我,一般每秒大概是2W次左右。

简单测试一下,先是代码

一个接收消息并马上抛弃的Server:

Java代码  

  1. start() ->
  2. register(nullserver, self()),
  3. loop().
  4. loop() ->
  5. receive
  6. Any ->
  7. loop() %drop message and loop
  8. end.

一个在循环中向它发送消息的Client:

Java代码  

  1. start() ->
  2. start_send(100).
  3. start_send(0) ->
  4. ok;
  5. start_send(N) ->
  6. {nullserver, ‘[email protected]‘} ! hi,
  7. start_send(N-1).

然后打开截包工具,运行server和client,截取到接近200个包的发送和接收记录,其中,大部分是这样的数据:

引用

00 14 78 B9 14 BC 00 11-11 9F 91 1A 08 00 45 00 
00 45 EE 77 40 00 80 06-80 E4 C0 A8 00 CC DB E8 
ED F9 13 58 C1 C6 AA 4E-59 F2 38 CF 22 2D 50 18 
FF 19 B9 EE 00 00 00 00-00 19 70 83 68 04 61 06 
67 43 CC 00 00 00 01 00-00 00 00 02 43 05 43 BD 
83 43 BF

引用

00 14 78 B9 14 BC 00 11-11 9F 91 1A 08 00 45 00 
00 45 EE 78 40 00 80 06-80 E3 C0 A8 00 CC DB E8 
ED F9 13 58 C1 C6 AA 4E-5A 0F 38 CF 22 2D 50 18 
FF 19 B9 D1 00 00 00 00-00 19 70 83 68 04 61 06 
67 43 CC 00 00 00 01 00-00 00 00 02 43 05 43 BD 
83 43 BF

实际上,只有从 00 00-00 19 这里开始,才是TCP包的内容,前面都是底层协议的数据,就是这样的数据包发送了100次,浪费是巨大的。而且,在消息发送后,还收到同样数目类似

引用

00 11 11 9F 91 1A 00 14-78 B9 14 BC 08 00 45 00 
00 28 8C FC 40 00 32 06-30 7D DB E8 ED F9 C0 A8 
00 CC C1 C6 13 58 38 CF-22 2D AA 4E 59 F2 50 10 
19 20 D7 01 00 00 00 00-00 00 00 00

这样的响应包,也浪费着带宽。

从目前我所阅读过的文档来看,暂时没有有关如何缓存这些消息定期一并发送的参数设置。那么有什么解决办法,我自己有两种。

一种是将要发送的一批Message打包到一个list发送,接收方从list中取出所有message并处理。

另一种是通过一个Proxy,发送方不通过 {Name, Node} ! Message 这种方式来发送,而是通过一个本地的Proxy Process,代理会将所有发送到某个节点的消息累积起来,定时批量发送过去;接收方也有一个Listening Process,它接收批量的Message,遍历后发送给本地的相应进程。

这里是我初步写出来的实现,不太漂亮,仅供参考~

message_agent.erl: 实现消息的批量发送,接收和转发

Java代码  

  1. -module(message_agent).
  2. -export([listen/0, proxy/2, block_exit/1]).
  3. -export([loop_receive/0]).
  4. -define(MAX_BATCH_MESSAGE_SIZE, 50).
  5. listen() ->
  6. io:format("Message agent server start listen~n"),
  7. spawn(fun() -> register(‘MsgServerAgent‘, self()), loop_receive() end),
  8. ok.
  9. loop_receive() ->
  10. receive
  11. {forward_message, PName, Messages} ->
  12. forward_messages(PName, Messages),
  13. loop_receive();
  14. Any ->
  15. message_agent:loop_receive()
  16. end.
  17. forward_messages(PName, []) ->
  18. ok;
  19. forward_messages(PName, [H|T]) ->
  20. %io:format("Forward message ~w to process ~w~n", [H, PName]),
  21. catch PName ! H,
  22. forward_messages(PName, T).
  23. proxy(Node, PName) ->
  24. spawn_link(fun() -> handle_message_forward(Node, PName, []) end).
  25. block_exit(Agent) ->
  26. Agent ! {block_wait, self()},
  27. receive
  28. {unblock} ->
  29. ok
  30. end.
  31. handle_message_forward(Node, PName, Messages) ->
  32. receive
  33. {block_wait, Pid} ->
  34. catch send_batch(Node, PName, lists:reverse(Messages)),
  35. Pid ! {unblock};
  36. Any ->
  37. NewMessages = [Any|Messages],
  38. case length(NewMessages)>=?MAX_BATCH_MESSAGE_SIZE of
  39. true ->
  40. send_batch(Node, PName, lists:reverse(NewMessages)),
  41. handle_message_forward(Node, PName, []);
  42. false ->
  43. handle_message_forward(Node, PName, NewMessages)
  44. end
  45. after
  46. 0 ->
  47. case length(Messages)>0 of
  48. true ->
  49. catch send_batch(Node, PName, lists:reverse(Messages));
  50. false ->
  51. ok
  52. end,
  53. handle_message_forward(Node, PName, [])
  54. end.
  55. send_batch(Node, PName, Messages) ->
  56. %io:format("Send batch message, size ~p~n", [length(Messages)]),
  57. {‘MsgServerAgent‘, Node} ! {forward_message, PName, Messages}.

使用方式很简单,在接收Message的一端调用 message_agent:listen() 启动监听代理,客户端使用 register(agent, message_agent:proxy(?NODE, ‘MsgServer‘)) 的方式启动代理进程,消息发送给这个代理进程就可以了。下面是我写的简单例子:

Java代码  

  1. -module(message_server).
  2. -export([start/0]).
  3. -define(TIMEOUT_MS, 1000).
  4. start() ->
  5. io:format("Message server start~n"),
  6. register(‘MsgServer‘, self()),
  7. message_agent:listen(),
  8. loop_receive(0).
  9. loop_receive(Count) ->
  10. receive
  11. Any ->
  12. %io:format("Receive msg ~w~n", [Any]),
  13. loop_receive(Count+1)
  14. after
  15. ?TIMEOUT_MS ->
  16. if
  17. Count>0 ->
  18. io:format("Previous receive msg count: ~p~n", [Count]),
  19. loop_receive(0);
  20. true ->
  21. loop_receive(0)
  22. end
  23. end.

Java代码  

  1. -module(message_client).
  2. -define(NODE, ‘[email protected]‘).
  3. -define(COUNT, 20000).
  4. -export([start/0]).
  5. start() ->
  6. statistics(wall_clock),
  7. register(agent, message_agent:proxy(?NODE, ‘MsgServer‘)),
  8. send_loop(?COUNT).
  9. send_loop(0) ->
  10. message_agent:block_exit(agent),
  11. {_, Interval} = statistics(wall_clock),
  12. io:format("Finished ~p sends in ~p ms, exiting...~n", [?COUNT, Interval]);
  13. send_loop(Count) ->
  14. agent ! {self(), lalala},
  15. send_loop(Count-1).

这里要注意的是,消息发送端和接收端都是由一个单独的进程来处理消息。在Erlang的默认堆实现,是私有堆,本地进程间的消息发送是需要拷贝的,在数据量大的时候,该进程堆的垃圾回收会相当频繁。

时间: 2024-10-12 16:21:22

Erlang中频繁发送远程消息要注意的问题的相关文章

解决c#所有单线程单元(STA)线程都应使用泵式等待基元(如 CoWaitForMultipleHandles),并在运行时间很长的操作过程中定期发送消息。 转载

最近做一个后来程序,启动了事务后有一段操作业务,当运行一段时间后,出现这个异常 CLR 无法从 COM 上下文 0x1b1c38 转换为 COM 上下文 0x1b1da8,这种状态已持续 60 秒.拥有目标上下文/单元的线程很有可能执行的是非泵式等待或者在不发送 Windows 消息的情况下处理一个运行时间非常长的操作.这种情况通常会影响到性能,甚至可能导致应用程序不响应或者使用的内存随时间不断累积.要避免此问题,所有单线程单元(STA)线程都应使用泵式等待基元(如 CoWaitForMulti

iOS远程消息推送

iOS 推送基础知识 Apple 使用公共密钥数字证书对来自 iOS 应用程序的推送请求进行身份验证,所以您首先需要创建身份验证密钥,并向 Apple 注册它们.我将在下一节中花相当长的篇幅来直接介绍这一点. 接下来,需要确定安装该应用程序并选择接收该应用程序的推送通知的每台设备.工作顺序如下: iOS 应用程序中的一个警告对话框会请求用户的许可,以接收推送通知. 如果用户授予权限,iOS 应用程序会联系 Apple Push Notification 服务 (APNs) 获得一个 ID 字符串

Html5中的跨页面消息传输

1.如果要接受从其他的窗口那里发过来的消息,就必须对窗口对象的message事件进行监控. window.addEventListener("message",function(){},false); 使用window对象的postMessage方法向其他窗口发送消息, otherwindow.postMessage(message,targetOrigin); 参数:message为所发送的消息文本,但也可以是任何javascript对象; 第二个参数为接受消息的对象窗口的URL地址

ZeroMQ接口函数之 :zmq_msg_send – 从一个socket发送一个消息帧

ZeroMQ 官方地址 :http://api.zeromq.org/4-0:zmq_msg_send zmq_msg_send(3) ØMQ Manual - ØMQ/3.2.5 Name zmq_msg_send – 从一个socket发送一个消息帧 Synopsis int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags); Description zmq_msg_send函数和zmq_sendmsg(3)函数是完全相同的,只是z

如何解决群聊(MUC)聊天室重复存储、接收自己发送的消息的问题

CHENYILONG Blog 如何#解决方案#群聊(MUC)聊天室重复存储.接收自己发送的消息 编号 项目 描述 1 问题描述 单聊没问题,群聊会出现自动回复的问题 数据库中存储的数据出现的问题 界面上出现的问题:类似自动回复.回音壁一样一模一样地回答.  2 问题产生的原因 3 群聊基本的原理示意图 聊天内容的显示是经由从数据库进行的读取排序, 4 #解决方案# 拦截阻挡红色区域的执行  5 失败的尝试:尝试但是没有效果的方法 // AppDelegate.m中#pragma 接收消息代理监

微信发送模板消息代码示例

最近一个微信的项目里需要发送微信模板消息给卖家或者供应商等,微信开发其实也就按照微信的官方接口要求组装起来即可,下面简单介绍一下我的微信模板发送代码. 1.获取access token,至于access token是什么,大家可以自行微信接口文档看一下,这边不多说 获取access token我这边主要是用定时器没大概2分钟获取一次,每天获取的次数是100000次,用法如下: 1 #region 2 3 using System; 4 using System.Timers; 5 6 #endr

脱离微信客户端发送微信消息(二)

Python版本:使用微信API发送微信消息 本文代码借用朋友编写的成品代码,使用Python3编写,配合上一篇文章:<脱离微信客户端发送微信消息(一)>经过试验完全可以发送微信消息. 文件:BaseData.py Python3代码: 1 # -*- coding: utf-8 -*- 2 3 corpid="XXXXXXX" # 设置-权限设置-部门-查看CorpID 4 corpsecret="YYYYYYYYYY" # 设置-权限设置-部门-查看

C#开发微信.NET平台MVC微信开发 发送普通消息Demo以及收不到消息的问题

不得不说现在微信非常火,微信开放平台可以自己写程序跟用户交互,节省了前台开发成本,免去用户装客户端的烦恼.于是今天兴致来潮,想做一个试试. 首先找到了开发者文档,看了看,蛮简单的.(公众号早已申请,有兴趣可以关注看看:zyjsoft) 第一步(提供接口,供微信调用,由于是HTTP请求,于是我用MVC模式做了一个简单的接口): //认证接口 public ActionResult WeiXin(string signature, string timestamp, string nonce, st

iOS开发——远程消息推送的实现

在我们使用App的过程中.总是会收到非常多的消息推送.今天我们就要来实现这个功能.首先消息推送分为本地消息推送和远程消息推送.而当中又以远程消息最为经常使用. 可是在推送远程消息之前.有两个前提条件.你须要购买苹果的开发人员账号,也就是每年99刀:而且有一台iOS真机(模拟器不能測试推送).事实上远程推送须要有server,可是我们自己临时没有server,到时候后台使用Parse进行推送. 实现过程例如以下: (1)进入苹果的开发人员站点:https://developer.apple.com