使用jetty的continuations实现"服务器推"

在实际的开发中,我们可能会有这样的场景:许多客户端都连接到服务器端,当有某个客户端的消息的时候,服务器端会主动"推"消息给客户端,手机app的推送是一个典型的场景(IOS的推送都是要经过苹果的服务器的,一般是通过苹果的APNS服务来实现,不需要做过多的开发,安卓的推送就需要我们自己来实现了)

我们可选的技术方案实际上是很多的,使用netty这样的异步的网络通信框架或者servlet容器提供的异步的方案都是可以实现的,它们的理念都是一样的,异步和事件驱动,客户端请求服务器,当服务器没有需要推送的数据(或者是需要执行很长时间的IO操作)的时候,请求会被挂起,当服务器端的数据准备好的时候(例如需要向客户端推送一个消息的时候,或者是服务器端IO操作执行完毕了)请求会被重新激活,数据返回客户端.

使用jetty的continuations或者是netty来实现这两种是我觉得比较好的实现方案,今天介绍一下如何使用jetty的continuations来实现一个服务器推的原型,和正式环境中向安卓手机的推送的实现方法是完全一样的

continuations介绍:jetty的continuations是jetty实现的实现异步请求和事件驱动的组件,从jetty7起,continuations不止在jetty中可以使用,任何支持servlet3.0规范的servlet容器都可以使用continuations来实现异步和事件驱动,相比servlet3.0规范中的异步servlet,continuations提供了更加简化的编程模型.

目标:用浏览器请求服务器的一个URL(用浏览器来模拟我们的客户端),实现任何时候当服务器需要推送数据的时候,浏览器能够立即显示出来

我们需要提供两个接口:提供给客户端做长连接的接口,向客户端发送数据的接口

提供给客户端连接的servlet:

package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: [email protected]
  * Date: 2016/10/23
  * Time: 23:52
  * To change this template use File | Settings | Editor | File and Code Templates
 */

import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationListener;
import org.eclipse.jetty.continuation.ContinuationSupport;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Map;

@WebServlet(urlPatterns = "/pull", asyncSupported = true)
public class ContinuationServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        String user = req.getParameter("user");
        Map<String, PushAgent> pushAgentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap");
        if (pushAgentMap.containsKey(user)) {
            PushAgent pushAgent = pushAgentMap.get(user);
            Continuation continuation = ContinuationSupport.getContinuation(req);
            continuation.setTimeout(90000000);
            //第一次请求进来
            if (continuation.isInitial()) {
                resp.setContentType("text/evf;charset=utf-8");
                resp.setHeader("Connection", "keep-alive");
                resp.setHeader("Keep-Alive", "timeout=2000");
                PushAdapter pushAdapter = new PushAdapter(continuation, pushAgent);
                continuation.setAttribute("adapter", pushAdapter);
                continuation.addContinuationListener(new ContinuationListener() {
                    @Override
                    public void onComplete(Continuation continuation) {
                        PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter");
                        if (null != adapter) {
                            continuation.setAttribute("adapter", null);
                        }
                    }

                    @Override
                    public void onTimeout(Continuation continuation) {
                        onComplete(continuation);
                    }

                });
                resp.flushBuffer();
            }
            if (continuation.isExpired()) {
                return;
            }
            Writer writer = getWriter(resp);
            PushAdapter adapter = (PushAdapter) continuation.getAttribute("adapter");
            Message message;
            while (true) {
                message = adapter.getPushAgent().pull();
                if (null == message)
                    break;
                try {
                    writer.write(message.getContent());
                    writer.flush();
                    writer.write("\r\n");
                    writer.flush();
                    resp.flushBuffer();
                } catch (Exception e) {
                    throw e;

                }
            }
            //若没有该客户端的消息,则请求被挂起
            continuation.suspend();
        }

    }

    private Writer getWriter(HttpServletResponse response) throws IOException {
        OutputStream os = response.getOutputStream();
        return new OutputStreamWriter(os, "UTF-8");
    }

}

向客户端推送消息的servlet:

package com.jiaoyiping.websample.asyncServlet.jetty;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;

/*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: [email protected]
  * Date: 2016/10/25
  * Time: 23:46
  * To change this template use File | Settings | Editor | File and Code Templates
 */
@WebServlet(urlPatterns = "/send")
public class MesssageSendServlet extends HttpServlet {
    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        //不要在自己实现的servlet中调用 super.doGet(0)或者是super.doPost()
        //因为在tomcat它们的默认实现是报405(HTTP1.1)或者400(其他版本的HTTP)
        //        super.doPost(req, resp);

        String target = req.getParameter("target");
        String messageStr = req.getParameter("message");

        Map<String, PushAgent> agentMap = (Map<String, PushAgent>) req.getServletContext().getAttribute("agentmap");
        if (agentMap.keySet().contains(target)) {
            Message message = new Message();
            message.setTarget(target);
            message.setContent(messageStr);
            if (agentMap.get(target).isInited()) {
                agentMap.get(target).onEvent(message);
            }
            agentMap.get(target).send(message);
            PrintWriter out = resp.getWriter();
            out.print("发送成功");
            out.flush();
            out.close();
        }

    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doPost(req, resp);
    }
}

推送代理:就是可以拿到客户端相关信息,并且维护客户端消息队列的类,在这个推送代理中,我们可以加入一个监听器,当有数据需要推送的时候,激活请求

package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: [email protected]
  * Date: 2016/10/25
  * Time: 22:56
  * To change this template use File | Settings | Editor | File and Code Templates
 */

public interface PushAgent {

    Terminal getTerminal();

    String getAddress();

    String getToken();

    Message send(Message message);

    Message pull();

    void addListener(MessageListener messageListener);

    void onEvent(Message message);

    boolean isInited();
}

默认实现(每个需要接受推送的用户对应一个PushAgent,和用户端保持长连接的线程从queue里读取mesage对象,向某个用户推送的时候将message对象放到该用户对应的PushAgent的queue里,这里是一个生产者-消费者模式):

package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: [email protected]
  * Date: 2016/10/25
  * Time: 23:17
  * To change this template use File | Settings | Editor | File and Code Templates
 */

import java.util.PriorityQueue;
import java.util.Queue;

public class DefaultPushAgent implements PushAgent {

    private Terminal terminal;
    //客户端通过长连接连接到服务器时,服务器不断地从该队列poll(),若果拿到新的消息,则返回给客户端
    private Queue<Message> messages = new PriorityQueue<>();
    private MessageListener messageListener;

    @Override
    public Terminal getTerminal() {
        return this.terminal;
    }

    @Override
    public String getAddress() {
        return null;
    }

    @Override
    public String getToken() {
        return null;
    }

    @Override
    public Message send(Message message) {
        synchronized (message) {
            messages.add(message);
        }

        return message;
    }

    @Override
    public Message pull() {
        synchronized (messages) {
            return messages.poll();
        }

    }

    @Override
    public void addListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

    @Override
    public void onEvent(Message message) {
        this.messageListener.onMessage(message);
    }

    @Override
    public boolean isInited() {
        return this.messageListener != null;
    }

    public DefaultPushAgent(Terminal terminal) {
        this.terminal = terminal;
    }
}
PushAdapter的实现(用户将Continuation和PushAgent关联起来):
package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: [email protected]
  * Date: 2016/10/25
  * Time: 23:37
  * To change this template use File | Settings | Editor | File and Code Templates
 */

import org.eclipse.jetty.continuation.Continuation;

public class PushAdapter {
    private Continuation continuation;
    private PushAgent pushAgent;

    public PushAdapter(Continuation continuation, PushAgent pushAgent) {
        this.continuation = continuation;
        this.pushAgent = pushAgent;
        this.pushAgent.addListener(message -> {
            if (PushAdapter.this.continuation.isSuspended()) {
                PushAdapter.this.continuation.resume();
            }
        });
    }

    public Continuation getContinuation() {
        return continuation;
    }

    public void setContinuation(Continuation continuation) {
        this.continuation = continuation;
    }

    public PushAgent getPushAgent() {
        return pushAgent;
    }

    public void setPushAgent(PushAgent pushAgent) {
        this.pushAgent = pushAgent;
    }
}

MessageListener的实现(监听需要推送消息的事件,这里为了做演示,并没有实现一个完整的观察者模式,只是在需要推送消息的时候,手工调用 onMessage()):

package com.jiaoyiping.websample.asyncServlet.jetty;
 /*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: [email protected]
  * Date: 2016/10/26
  * Time: 2:03
  * To change this template use File | Settings | Editor | File and Code Templates
 */

public interface MessageListener {
    void onMessage(Message message);
}    

测试数据:使用一个listener在应用初始化的时候,初始化一些数据做为测试数据

package com.jiaoyiping.websample.asyncServlet.jetty;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import java.util.HashMap;
import java.util.Map;

/*
  * Created with Intellij IDEA
  * USER: 焦一平
  * Mail: [email protected]
  * Date: 2016/10/25
  * Time: 23:55
  * To change this template use File | Settings | Editor | File and Code Templates
 */
@WebListener
public class PushListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        Map<String, PushAgent> agentMap = new HashMap<>();
        agentMap.put("zhangsan", new DefaultPushAgent(new Terminal() {{
            setAddress("zhangsan");
            setToken("zhangsan_token");
        }}));
        agentMap.put("lisi", new DefaultPushAgent(new Terminal() {{
            setAddress("lisi");
            setToken("lisi_token");
        }}));

        sce.getServletContext().setAttribute("agentmap",agentMap);
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}

最终的效果是这样的,我截了一个git图:

时间: 2025-01-04 15:03:16

使用jetty的continuations实现"服务器推"的相关文章

HTML5 服务器推送事件(Server-sent Events)实战开发

转自:http://www.ibm.com/developerworks/cn/web/1307_chengfu_serversentevent/ http://www.ibm.com/developerworks/cn/web/wa-lo-comet/     --comet长连接 服务器推送事件(Server-sent Events)是 HTML 5 规范中的一个组成部分,可以用来从服务端实时推送数据到浏览器端.相对于与之类似的 COMET 和 WebSocket 技术来说,服务器推送事件的

服务器推送

WebSocket是HTML5开始提供的一种在单个 TCP 连接上进行全双工通讯的协议.WebSocket通信协议于2011年被IETF定为标准RFC 6455,WebSocketAPI被W3C定为标准. 在WebSocket API中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道.两者之间就直接可以数据互相传送. 背景 现在,很多网站为了实现推送技术,所用的技术都是轮询.轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP request,然

Web端服务器推送技术原理分析

1 背景 "服务器推送技术"(ServerPushing)是最近Web技术中最热门的一个流行术语.它是继"Ajax"之后又一个倍受追捧的Web技术."服务器推送技术"最近的流行跟"Ajax "有着密切的关系. 随着 Ajax技术的兴起,让广大开发人员又一次看到了使用浏览器来替代桌面应用的机会,并且这次机会非常大.Ajax将整个页面的刷新变成页面局部的刷新,并且数据的传送是以异步方式进行,这使得网络延迟带来的视觉差异将会消失.

Web端服务器推送技术原理分析及dwr框架简单的使用

1 背景 “服务器推送技术”(ServerPushing)是最近Web技术中最热门的一个流行术语.它是继“Ajax”之后又一个倍受追捧的Web技术.“服务器推送技术”最近的流行跟“Ajax ”有着密切的关系. 随着 Ajax技术的兴起,让广大开发人员又一次看到了使用浏览器来替代桌面应用的机会,并且这次机会非常大.Ajax将整个页面的刷新变成页面局部的刷新,并且数据的传送是以异步方式进行,这使得网络延迟带来的视觉差异将会消失. 但是,在浏览器中的 Ajax应用中存在一个致命的缺陷无法满足传统桌面系

SSE技术详解:一种全新的HTML5服务器推送事件技术

前言 一般来说,Web端即时通讯技术因受限于浏览器的设计限制,一直以来实现起来并不容易,主流的Web端即时通讯方案大致有4种:传统Ajax短轮询.Comet技术.WebSocket技术.SSE(Server-sent Events).关于这4种技术方式的优缺点,请参考<Web端即时通讯技术盘点:短轮询.Comet.Websocket.SSE>.本文将专门讲解SSE技术. 服务器推送事件(Server-sent Events),简称SSE,是 HTML 5 规范中的一个组成部分,可以用来从服务端

Web端服务器推送技术原理分析及dwr框架简单的使用 转载

1 背景 “服务器推送技术”(ServerPushing)是最近Web技术中最热门的一个流行术语.它是继“Ajax”之后又一个倍受追捧的Web技术.“服务器推送技术”最近的流行跟“Ajax ”有着密切的关系. 随着 Ajax技术的兴起,让广大开发人员又一次看到了使用浏览器来替代桌面应用的机会,并且这次机会非常大.Ajax将整个页面的刷新变成页面局部的刷新,并且数据的传送是以异步方式进行,这使得网络延迟带来的视觉差异将会消失. 但是,在浏览器中的 Ajax应用中存在一个致命的缺陷无法满足传统桌面系

Spring之WebSocket网页聊天以及服务器推送

Spring之WebSocket网页聊天以及服务器推送 转自:http://www.xdemo.org/spring-websocket-comet/ /Springframework /Spring之WebSocket网页聊天以及服务器推送 1. WebSocket protocol 是HTML5一种新的协议.它实现了浏览器与服务器全双工通信(full-duplex). 2. 轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客服端

Comet:基于 HTTP 长连接的“服务器推”技术

“服务器推”技术的应用 传统模式的 Web 系统以客户端发出请求.服务器端响应的方式工作.这种方式并不能满足很多现实应用的需求,譬如: 监控系统:后台硬件热插拔.LED.温度.电压发生变化: 即时通信系统:其它用户登录.发送信息: 即时报价系统:后台数据库内容发生变化: 这些应用都需要服务器能实时地将更新的信息传送到客户端,而无须客户端发出请求.“服务器推”技术在现实应用中有一些解决方案,本文将这些解决方案分为两类:一类需要在浏览器端安装插件,基于套接口传送信息,或是使用 RMI.CORBA 进

【转】Comet:基于 HTTP 长连接的“服务器推”技术

原文链接:http://www.ibm.com/developerworks/cn/web/wa-lo-comet/ 很多应用譬如监控.即时通信.即时报价系统都需要将后台发生的变化实时传送到客户端而无须客户端不停地刷新.发送请求.本文首先介绍.比较了常用的“服务器推”方案,着重介绍了 Comet - 使用 HTTP 长连接.无须浏览器安装插件的两种“服务器推”方案:基于 AJAX 的长轮询方式:基于 iframe 及 htmlfile 的流方式.最后分析了开发 Comet 应用需要注意的一些问题