ejabberd中的http反向推送

http的反向推送通常使用"长轮询"或"长连接"的方式。 所谓"长轮询"是指客户端发送请求给服务器,服务器发现没有数据需要发送给客户端。

http的反向推送通常使用"长轮询"或"长连接"的方式。
所谓"长轮询"是指客户端发送请求给服务器,服务器发现没有数据需要发送给客户端于是hold住不及时返回,等有数据需要发送给客户端时,进行回复,然后关闭连接,客户端收到回复后再发送新的http请求,以便服务器能有对应的请求用于消息的反向推送。
而"长连接"是在长轮询的基础上增加"keep-alive"属性,服务器收到请求后不直接回复,等有数据需要发送给客户端时再进行response,但是并不关闭连接,这样客户端收到服务器的response后在同一连接上再次发送http请求。


在ejabberd的实现中,采用了bosh技术来完成对应的工作,具体定义可参考:

英文: http://go.rritw.com/xmpp.org/extensions/xep-0124.html

中文: http://go.rritw.com/wiki.jabbercn.org/XEP-0124

大概实现原理:ejabberd收到一个客户端http请求后会为该客户端最终创建三个进程:ejabberd_http, ejabberd_http_bind, ejabberd_c2s。

ejabberd_http进程不断的从对应的socket上收客户端的请求,并转发交给对应的ejabberd_http_bind进程进行处理,然后同步等待处理结果,并将结果返回给客户端。

init() ->
    ...
    receive_headers(State).

receive_headers(#state{trail=Trail} = State) ->
    SockMod = State#state.sockmod,
    Socket = State#state.socket,
    Data = SockMod:recv(Socket, 0, 300000),
    case State#state.sockmod of
        gen_tcp ->
            NewState = process_header(State, Data),
            case NewState#state.end_of_request of
                true -> ok;
                _ -> receive_headers(NewState)
            end;
        _ ->
            case Data of
                {ok, D} ->
                    parse_headers(State#state{trail = <<Trail/binary, D/binary>>});
                {error, _} ->
                    ok
            end
    end.

process_header(State, Data) ->
    case Data of
        ...
        {ok, http_eoh} ->
            ...
            Out = process_request(State2),
            send_text(State2, Out),
            case State2#state.request_keepalive of
                true ->
                    ...
                    #state{sockmod = SockMod,
                           socket = Socket,
                           request_handlers = State#state.request_handlers};
                _ ->
                   #state{end_of_request = true,
                          request_handlers = State#state.request_handlers}

从代码中可看出,未设置keep-alive属性的时候,该进程处理完一次http请求后便自己结束(长轮询模式)。设置了keep-alive属性的时候,该进程不断的循环接收http请求,并转发接收与响应(长连接模式)。

ejabberd_http_bind进程负责hold住http请求,对于正常的客户端请求,ejabberd_http_bind进程会将请求转发给对应的ejabberd_c2s进程进行实际业务的处理,而对于空的请求(便于服务器反向推送数据),ejabberd_http_bind设置定时器,等待ejabberd_c2s进程对实际请求的响应或者是需要推送给客户端的消息。

handle_sync_event({send_xml,Packet},_From,StateName,
                  #state{http_receiver = undefined} = StateData) ->
    Output = [Packet | StateData#state.output],
    Reply = ok,
    {reply, Reply, StateName, StateData#state{output = Output}};

handle_sync_event({send_xml, Packet}, _From, StateName, StateData) ->
    Output = [Packet | StateData#state.output],
    cancel_timer(StateData#state.timer),
    Timer = set_inactivity_timer(StateData#state.pause,
                                 StateData#state.max_inactivity),
    HTTPReply = {ok, Output},
    gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
    cancel_timer(StateData#state.wait_timer),
    Rid = StateData#state.rid,
    ReqList = [#hbr{rid = Rid,key = StateData#state.key,
                    out = Output
                   } |
               [El || El <- StateData#state.req_list,
                      El#hbr.rid /= Rid ]],
    Reply = ok,
    {reply, Reply, StateName,
     StateData#state{output = [],
                     http_receiver = undefined,
                     req_list = ReqList,
                     wait_timer = undefined,
                     timer = Timer}};

handle_sync_event({http_get,Rid,Wait,Hold},From,StateName,
                  StateData) ->
    %% setup timer
    send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
    cancel_timer(StateData#state.wait_timer),
    TNow = tnow(),
    if
        (Hold > 0) and
        (StateData#state.output == []) and
        ((TNow -StateData#state.ctime<(Wait*1000*1000)) and
        (StateData#state.rid == Rid) and
        (StateData#state.input /= cancel) and
        (StateData#state.pause == 0) ->
            WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
            %% MR: Not sure we should cancel the state timer here.
            cancel_timer(StateData#state.timer),
                         {next_state,StateName,
                          StateData#state{http_receiver = From,
                                          wait_timer = WaitTimer,
                                          timer = undefined}};
        (StateData#state.input == cancel) ->
            cancel_timer(StateData#state.timer),
            Timer = set_inactivity_timer(StateData#state.pause,
                                    StateData#state.max_inactivity),
            Reply = {ok, cancel},
            {reply, Reply, StateName,
             StateData#state{input = queue:new(),
                             http_receiver = undefined,
                             wait_timer = undefined,
                             timer = Timer}};
        true ->
            cancel_timer(StateData#state.timer),
            Timer = set_inactivity_timer(StateData#state.pause,
                                     StateData#state.max_inactivity),
            Reply = {ok, StateData#state.output},
            %% save request
            ReqList = [#hbr{rid = Rid,
                            key = StateData#state.key,
                            out = StateData#state.output
                           } |
                       [El || El <- StateData#state.req_list,
                             El#hbr.rid /= Rid]
                      ],
            {reply, Reply, StateName,
             StateData#state{output = [],
                             http_receiver = undefined,
                             wait_timer = undefined,
                             timer = Timer,
                             req_list = ReqList}}
    end;

handle_info({timeout, WaitTimer, _}, StateName,
             #state{wait_timer = WaitTimer} = StateData) ->
    if
        StateData#state.http_receiver /= undefined ->
            cancel_timer(StateData#state.timer),
            Timer = set_inactivity_timer(StateData#state.pause,
                                         StateData#state.max_inactivity),
            gen_fsm:reply(StateData#state.http_receiver, {ok, empty}),
            Rid = StateData#state.rid,
            ReqList = [#hbr{rid = Rid,
                            key = StateData#state.key,
                            out = []
                           } |
                       [El || El <- StateData#state.req_list,
                              El#hbr.rid /= Rid ]
                       ],
            {next_state, StateName,
             StateData#state{http_receiver = undefined,
                             req_list = ReqList,
                             wait_timer = undefined,
                             timer = Timer}};
        true ->
            {next_state, StateName, StateData}
    end;

ejabberd_http进程最终会调用ejabberd_http_bind的http_get方法获取请求的响应结果或者是需要推送的数据,ejabberd_http_bind进程收到请求后会进行相应处理,比如有数据则直接回复,或者设置定时器。当收到ejabberd_c2s进程推送过来的数据时,停止定时器并将数据立即回复给ejabberd_http进程,如果定时器超时则回复一个空的消息。
=============================
ejabberd_c2s为客户端对应的会话进程,负责维护客户端的在线状态,联系人列表,请求处理等等。
============
服务器在创建ejabberd_http_bind进程时,会生成一个唯一的sid,用于标识该进程,该sid与ejabberd_http_bind进程pid的对应关系会存储到mnesia中,同时服务器会将sid也会告诉客户端,客户端后续的请求也都需要带上该sid。http_bind收到请求后根据sid从mnesia查找匹配的ejabberd_http_bind进程。

process_request(Data, IP) ->
    ...
    case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
        %% No existing session:
        {ok, {"", Rid, Attrs, Payload}} ->
            ...
            Sid = sha:sha(term_to_binary({now(), make_ref()})),
            case start(XmppDomain, Sid, "", IP) of
                {ok, Pid} ->
                    handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,Payload, PayloadSize, IP)
                ...
             end;
        {ok, {Sid, Rid, Attrs, Payload1}} ->
            ...
            handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
                            StreamStart, IP);

handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
                     Payload, PayloadSize, IP) ->
    ...
    mnesia:dirty_write(#http_bind{id = Sid,
                                  pid = Pid,
                                  to = {XmppDomain,XmppVersion},
                                  hold = Hold,
                                  wait = Wait,
                                  process_delay = Pdelay,
                                  version = Version}),
    handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP).

http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
    case mnesia:dirty_read({http_bind, Sid}) of
        [] ->
            {error, not_exists};
        [#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] ->
            ...
            {gen_fsm:sync_send_all_state_event(FsmRef,
                #http_put{rid = Rid, attrs = Attrs,
                          payload = Payload,
                          payload_size = PayloadSize, hold = Hold,
                          stream = NewStream, ip = IP}, 30000), Sess}
    end.
ejabberd_http_bind与ejabberd_c2s会互相记住对方的进程pid,这样每个客户端都有自己唯一的ejabberd_http_bind与ejabberd_c2s进程进行相应的请求处理。【转自】  http://fanli7.net/a/bianchengyuyan/C__/20140213/469448.html
时间: 2024-11-08 23:39:02

ejabberd中的http反向推送的相关文章

在 Asp.NET MVC 中使用 SignalR 实现推送功能 [转]

在 Asp.NET MVC 中使用 SignalR 实现推送功能 罗朝辉 ( http://blog.csdn.net/kesalin ) CC许可,转载请注明出处 一,简介 Signal 是微软支持的一个运行在 Dot NET 平台上的 html websocket 框架.它出现的主要目的是实现服务器主动推送(Push)消息到客户端页面,这样客户端就不必重新发送请求或使用轮询技术来获取消息. 可访问其官方网站:https://github.com/SignalR/ 获取更多资讯. 二,实现机制

Asp.NET MVC 中使用 SignalR 实现推送功能

一,简介 Signal 是微软支持的一个运行在 Dot NET 平台上的 html websocket 框架.它出现的主要目的是实现服务器主动推送(Push)消息到客户端页面,这样客户端就不必重新发送请求或使用轮询技术来获取消息. 可访问其官方网站:https://github.com/SignalR/ 获取更多资讯. 二.Asp.net SignalR 是个什么东东 Asp.net SignalR是微软为实现实时通信的一个类库.一般情况下,SignalR会使用JavaScript的长轮询(lo

在 Asp.NET MVC 中使用 SignalR 实现推送功能

一,简介 Signal 是微软支持的一个运行在 Dot NET 平台上的 html websocket 框架.它出现的主要目的是实现服务器主动推送(Push)消息到客户端页面,这样客户端就不必重新发送请求或使用轮询技术来获取消息. 二,实现机制 SignalR 的实现机制与 .NET WCF 或 Remoting 是相似的,都是使用远程代理来实现.在具体使用上,有两种不同目的的接口:PersistentConnection 和 Hubs,其中 PersistentConnection 是实现了长

asp.net 中长尾链接实现推送 -- comet

一般需求推送服务时,都会去第三方拿推送组件,如”极光“,”百度“,”小米"什么的,自己用.net实现推送服务端需要面对很多问题,比如C10K,但是企业内部使用往往用不了10K的链接,有个1K,2K就足够,这个时候完全可以自己实现一个推送服务,这样手机应用就不用走外网了. 使用.net实现推送服务有几个选择 1.是使用WCF 基于TCP的回调-针对.net To .net 端,经过7*24小时测试,2K左右的链接能稳定hold住,参考:http://www.cnblogs.com/wdfrog/p

django中实现微信消息推送

-公众号(不能主动给用户发消息) -认证的公众号:需要营业执照,需要交钱,可以发多篇文章 -未认证的公众号:一天只能发一篇文章 -服务号(微信推送) -需要申请,需要认证 -可以主动给用户推送消息 -能给推送的人,必须关注我的服务号 -沙箱环境 -企业号 -企业里用的: -你们所见的二维码:其实就是一个url地址 -咱们在前端通过url(https://open.weixin.qq.com/connect/oauth2.....)生成一个二维码 -注意*****修改:网页授权获取用户基本信息 一

Ejabberd作为推送服务的优化手段(转)

AVOS Cloud目前还在用Ejabberd做Android的消息推送服务.当时选择Ejabberd,是因为Ejabberd是一个发展很长时间的XMPP实现,并且基于Erlang,设想能在我们自主研发的Push Server起来之间顶上一段时间. 我们自主研发的Push Server预计本月中旬就上线了.但是Ejabberd却先顶不住了.Ejabberd做推送,本身就有劣势,比如XMPP协议的冗余,XMPP协议本来就是IM协议,对推送这个简单的场景还是太复杂了一些.Ejabberd Clust

iOS开发中的远程推送实现(最新,支持iOS9)

我的个人项目<丁丁印记>中加入了远程推送功能,按照操作说明去做还是比较容易实现的,但是学的不够不系统,因此这篇文章希望总结一下最新的iOS推送功能,因为iOS8之后的推送和致之前的版本是有所不同的,也希望想能帮助到需要的朋友.这篇文章将从零开始,向大家介绍远程推送功能的原理和使用. 什么是远程推送通知 顾名思义,就是从远程服务器推送给客户端的通知(需要联网)远程推送服务,又称为APNs(Apple Push Notification Services). 为什么程序中需要远程推送功能 1.传统

在Android应用程序中实现推送通知

几乎每一个应用程序的一个重要特性是支持推送通知的能力.使用推送通知,您可以更新用户,而不需要应用程序在任何时候运行或轮询服务器, 避免潜在的电池电量不足. 随着火力点云信息的介绍(FCM),谷歌使得在Android应用程序中实现推送通知变得容易了一点.FCM是谷歌云消息(GCM)的新版本和改进版本,您可以使用它将远程通知发送到客户机应用程序.对于将瞄准多个平台或需要利用先进的推操作(如分段推送)的应用程序,我们可以使用带有Azure通知集线器的FCM. 与GCM不同,FCM负责为您提供基本的消息

Cygwin中使用git时无法远程推送(出现DLL文件不兼容)

Cygwin中使用Git远程推送出现DLL文件不兼容 之前在Window和Linux时使用git远程推送都没有什么问题,今天在Win7中试了下Cygwin的git push是却出现如下提示: 说是不兼容的cygwin DLL文件引起段错误. 网上试了好几个办法都不行,最后突然想到GitHub分布式主要是通过公钥和私钥的原理来实现的. 因为在Win7上已经安装了Git, 也就是说私钥已经有了.于是直接把Win7上.ssh下面的文件拷贝到Cywgin home路径下的.ssh路径,发现居然可以了.