【pushlet学习】具体实战

业务需求


1. 前端界面需要实时显示空调、照明等设备的状态, 如:空调电压、空调电流、光照强度等,这些量每一个称作一个测点;

2. 不同的用户登录系统后,用户只能看到自己设备的运行状态,而看不到其他人设备的运行状态;

3. 由于每个用户的设备类型、种类、个数等都不相同,因此每个用户需要查询测点也不相同;

4. 当多个用户同时登陆系统时,其实就是在多个浏览器上打开多个浏览界面,去查看自己设备运行状态,

即:多个浏览器上的多个界面对后台请求的测点是不同的,例如:

用户1:<测点1,测点2,测点3,测点4,.....>;

用户2:<测点21,测点22,测点23,测点24,.....>;

用户3:<测点31,测点32,测点33,测点34,.....>;

用户n:<测点n1,测点n2,测点n3,测点n4,.....>;


功能需求:


采用传统的“请求/响应”方式,很难达到前台界面实时显示最新数据,为达到实时显示最新数据,我们采用一种“服务器推”的技术comet,而pushlet是“服务器推”技术的一种实现,这里我们采用pushlet技术来实现上述业务需求。

1. 假设后台的测点数据是实时变化的,且保存在HashMap中,即:HashMap<测点名,测点值>;当测点值发生变化时,后台就会实时更新Hashmap中对应的测点值,即Hashmap中保存的始终是实时数据;

2. 每隔固定的分钟数(如:5分钟),后台就会向前台推送最新数据,前台界面实时更新显示;

具体实现:


1. 前台界面打开时,会将测点名称集合以及主题名传递到后台,格式形如:{[测点1,测点2,测点3,....],subject};并在前台开启对此主题的监听;(注:主题名是动态随机生成的,每个界面的主题名都保证不相同)

2. 后台解析从前台传递来的{测点名称集合+主题名},并根据主题名创建响应的事件源(EventPollSource);需要说明的是:每个事件源(主题名)对应一个Thread,即前端打开m个界面,后台就会有m个对应的Thread,每个Thread处理一个事件源。

3. 当前端浏览器界面关闭时,后台能检测到关闭的界面对应的事件源(Thread),并将对应的Thread释放掉。


示例程序



环境如下:


TestServlet.java


主要功能:

1. 获取前台传递过来的测点名称数组ArrayList<Object> keyList和主题名称aSubject,

2. 根据测点名称数组主题名称开启一个新Thread,在Thread中处理业务;

PushThread pushThread = new PushThread(aSubject, keyList);

3. 每个session(或界面)对应一个Thread;


  1. package com.guoguo;
  2. import java.io.BufferedInputStream;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.util.ArrayList;
  6. import javax.servlet.ServletException;
  7. import javax.servlet.http.HttpServlet;
  8. import javax.servlet.http.HttpServletRequest;
  9. import javax.servlet.http.HttpServletResponse;
  10. import org.stringtree.json.JSONReader;
  11. import org.stringtree.json.JSONValidatingReader;
  12. public class TestServlet extends HttpServlet {
  13. private static final long serialVersionUID = 1L;
  14. public TestServlet() {
  15. super();
  16. }
  17. /**
  18. * get/post方法的处理函数
  19. */
  20. protected void service(HttpServletRequest request,
  21. HttpServletResponse response) throws ServletException, IOException {
  22. // 读取请求报文数据
  23. request.setCharacterEncoding("UTF-8");
  24. // 获取请求的数据
  25. StringBuffer reqData = new StringBuffer();
  26. InputStream in = request.getInputStream();
  27. BufferedInputStream buf = new BufferedInputStream(in);
  28. byte[] buffer = new byte[1024];
  29. int iRead;
  30. while ((iRead = buf.read(buffer)) != -1) {
  31. reqData.append(new String(buffer, 0, iRead, "UTF-8"));
  32. }
  33. // 获取请求的测点名称数组
  34. JSONReader r = new JSONValidatingReader();
  35. @SuppressWarnings("unchecked")
  36. ArrayList<Object> keyList = (ArrayList<Object>) r.read(reqData
  37. .toString());
  38. // 获取订阅主题名称
  39. String aSubject = request.getParameter("subject");
  40. System.out.println("请求的测点:" + reqData.toString() + " , 主题名:" + aSubject);
  41. // 启动一个线程,实现创建Pushlet事件、做业务、向前台推送数据等功能
  42. PushThread pushThread = new PushThread(aSubject, keyList);
  43. pushThread.start();
  44. }
  45. }


PushThread.java



主要功能:

1. 这是一个Thread类,该类有两个属性:主题String aSubject和关键字列表 ArrayList<Object> keyList;

2. 线程首先根据主题名创建事件源:Event event = Event.createDataEvent(aSubject);

3. 线程运行的时候,会监测会话状态以及事件订阅情况,并基于此判断线程是否需要结束。

若session关闭或浏览器关闭,则线程退出;

若有会话订阅该事件源,线程则进行业务处理,处理过程如下:

  1. 获取各测点的值;
  2. 将各测点的值组装成字符串;
  3. 将该字符串设置为事件源的属性。

4.然后以广播的形式将事件发送出去,Dispatcher.getInstance().multicast(event);

5. 经测试:Thread的run()函数执行结束后,Thread就自动退出了,系统会释放该Thread的资源。


  1. package com.guoguo;
  2. import java.text.SimpleDateFormat;
  3. import java.util.ArrayList;
  4. import java.util.Date;
  5. import java.util.HashMap;
  6. import java.util.Random;
  7. import org.stringtree.json.JSONValidatingWriter;
  8. import nl.justobjects.pushlet.core.Dispatcher;
  9. import nl.justobjects.pushlet.core.Event;
  10. import nl.justobjects.pushlet.core.Session;
  11. import nl.justobjects.pushlet.core.SessionManager;
  12. public class PushThread extends Thread {
  13. // 主题
  14. public String aSubject; // 客户端传递过来
  15. // 关键字列表
  16. public ArrayList<Object> keyList; // 客户端传递过来
  17. /**
  18. * 构造函数
  19. *
  20. * @param aSubject
  21. * @param keyList
  22. */
  23. public PushThread(String aSubject, ArrayList<Object> keyList) {
  24. this.aSubject = aSubject;
  25. this.keyList = keyList;
  26. }
  27. @Override
  28. public void run() {
  29. Event event = Event.createDataEvent(aSubject);
  30. int i = 0;
  31. while (true) {
  32. try {
  33. Thread.sleep(5000);
  34. } catch (InterruptedException e) {
  35. // 线程阻塞,结束线程
  36. System.out.println("=========>sleep异常 --->" + "线程"
  37. + Thread.currentThread().getId() + "关闭");
  38. break;
  39. }
  40. System.out.println("\n-----Thread ID: "
  41. + Thread.currentThread().getId());
  42. // 判断当前连接的会话个数,没有会话,则线程退出
  43. Session[] sessions = SessionManager.getInstance().getSessions();
  44. // 当前无会话,结束线程
  45. if (0 == sessions.length) {
  46. System.out.println("=========>无sessions --->" + "线程"
  47. + Thread.currentThread().getId() + "关闭");
  48. break;
  49. }
  50. // 判断当前会话中是否存在订阅该主题的订阅者,不存在则结束线程
  51. boolean if_exist_subscriber = true;
  52. // 遍历所有session
  53. for (int j = 0; j < sessions.length; j++) {
  54. System.out
  55. .println(sessions[j].getSubscriber().match(event) == null ? "Session"
  56. + j + ": 未订阅该事件 "
  57. : "Session" + j + ":订阅了该事件 ");
  58. if (null != sessions[j].getSubscriber().match(event)) {
  59. if_exist_subscriber = false;
  60. }
  61. }
  62. if (if_exist_subscriber) {
  63. System.out.println("=========>无"+aSubject+"订阅者 --->" + "线程" + Thread.currentThread().getId() + "关闭");
  64. break;
  65. }
  66. // 模拟业务处理:获取各测点的值
  67. HashMap<Object, Object> ret_value = new HashMap<Object, Object>();
  68. for (Object keyStr : keyList) {
  69. SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");// 设置日期格式
  70. String currTm = df.format(new Date());
  71. ret_value.put(keyStr, currTm);
  72. // ret_value.put(keyStr, (10*(new Random().nextFloat())));
  73. }
  74. // 将返回值封装为json数据形式
  75. String ret_string = "[";
  76. ret_string += new JSONValidatingWriter().write(ret_value);
  77. ret_string += "]";
  78. event.setField("key1", ret_string);
  79. // 推送消息
  80. Dispatcher.getInstance().multicast(event); // 向所有和event名称匹配的事件推送
  81. }
  82. }
  83. }



sources.properties 本测试程序不需要在该文件中配置任何东西


  1. #
  2. # Properties file for EventSource objects to be instantiated.
  3. #
  4. # Place this file in the CLASSPATH (e.g. WEB-INF/classes) or directly under WEB-INF.
  5. #
  6. # $Id: sources.properties,v 1.2 2007/11/10 14:12:16 justb Exp $
  7. #
  8. # Each EventSource is defined as <key>=<classname>
  9. # 1. <key> should be unique within this file but may be any name
  10. # 2. <classname> is the full class name
  11. #
  12. #
  13. # Define Pull Sources here. These classes must be derived from
  14. # nl.justobjects.pushlet.core.EventPullSource
  15. # Inner classes are separated with a $ sign from the outer class.
  16. source1=nl.justobjects.pushlet.test.TestEventPullSources$TemperatureEventPullSource
  17. source2=nl.justobjects.pushlet.test.TestEventPullSources$SystemStatusEventPullSource
  18. source3=nl.justobjects.pushlet.test.TestEventPullSources$PushletStatusEventPullSource
  19. source4=nl.justobjects.pushlet.test.TestEventPullSources$AEXStocksEventPullSource
  20. source5=nl.justobjects.pushlet.test.TestEventPullSources$WebPresentationEventPullSource
  21. source6=nl.justobjects.pushlet.test.TestEventPullSources$PingEventPullSource
  22. source7=nl.justobjects.pushlet.test.TestEventPullSources$MyEventPullSource
  23. source8=nl.justobjects.pushlet.test.TestEventPullSources$SpEventPullSource
  24. # TO BE DONE IN NEXT VERSION
  25. # define Push Sources here. These must implement the interface
  26. # nl.justobjects.pushlet.core.EventSource




web.xml


这里主要是配置servlet:TestServlet,其他servlet用不到


  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <web-app>
  3. <!-- Define the pushlet servlet -->
  4. <servlet>
  5. <servlet-name>pushlet</servlet-name>
  6. <servlet-class>nl.justobjects.pushlet.servlet.Pushlet</servlet-class>
  7. <load-on-startup>1</load-on-startup>
  8. </servlet>
  9. <servlet-mapping>
  10. <servlet-name>pushlet</servlet-name>
  11. <url-pattern>/pushlet.srv</url-pattern>
  12. </servlet-mapping>
  13. <servlet>
  14. <display-name>ChatServlet</display-name>
  15. <servlet-name>ChatServlet</servlet-name>
  16. <servlet-class>com.guoguo.ChatServlet</servlet-class>
  17. </servlet>
  18. <servlet-mapping>
  19. <servlet-name>ChatServlet</servlet-name>
  20. <url-pattern>/ChatServlet</url-pattern>
  21. </servlet-mapping>
  22. <servlet>
  23. <display-name>TestServlet</display-name>
  24. <servlet-name>TestServlet</servlet-name>
  25. <servlet-class>com.guoguo.TestServlet</servlet-class>
  26. </servlet>
  27. <servlet-mapping>
  28. <servlet-name>TestServlet</servlet-name>
  29. <url-pattern>/TestServlet</url-pattern>
  30. </servlet-mapping>
  31. </web-app>



receive.jsp


前台界面,主要功能如下:

1. 负责生成随机主题名测点集合

2. 页面初始化时,将主题名测点集合发送到后台,并开起pushlet监听;

PL._init();

PL.joinListen(aSubject);

3. 编写onData()函数,用于处理“服务器推”送到前台的数据,并显示在页面上;

4. 同时编写了“取消订阅”按钮,点击时,前台主动“取消订阅”,后台服务器接收到后,就会释放与此对应的Thread。


  1. <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
  2. <%
  3. String path = request.getContextPath();
  4. String basePath = request.getScheme() + "://"
  5. + request.getServerName() + ":" + request.getServerPort()
  6. + path + "/";
  7. %>
  8. <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
  9. <html>
  10. <head>
  11. <base href="<%=basePath%>">
  12. <meta http-equiv="pragma" content="no-cache">
  13. <meta http-equiv="cache-control" content="no-cache">
  14. <meta http-equiv="expires" content="0">
  15. <meta http-equiv="keywords" content="keyword1,keyword2,keyword3">
  16. <meta http-equiv="description" content="This is my page">
  17. <script type="text/javascript"
  18. src="<%=basePath%>js/pushlet_js/ajax-pushlet-client.js"></script>
  19. <script type="text/javascript">
  20. var subscriptionId = null;
  21. window.onload = onInit;
  22. window.onbeforeunload = onUnsubscribe;
  23. // 监听后台返回的数据信息,更新页面
  24. function onData(event) {
  25. // 保存订阅编号,用于页面关闭时进行退订
  26. subscriptionId = event.get(‘p_sid‘);
  27. // 更新页面
  28. document.dataEventDisplay.event.value = event.get("key1");
  29. // ------实际案例处理(test)---------
  30. /* var respData = decodeURIComponent(event.get("key1"));
  31. var respObj = eval(respData);
  32. //var respActionNum = respObj.length;
  33. var obj = respObj[0];
  34. var str = "";
  35. for(var p in obj){
  36. if(typeof(obj[p]) != "function"){
  37. str += p + "=" + obj[p] + ", " ;
  38. }
  39. }
  40. alert(str); */
  41. }
  42. // 页面关闭时,取消订阅
  43. function onUnsubscribe() {
  44. if (subscriptionId != null) {
  45. PL.unsubscribe(subscriptionId);
  46. }
  47. }
  48. // 页面加载完,初始化请求、监听
  49. function onInit() {
  50. var aSubject = _getRandomString(6); //主题名
  51. var httpRequest = getXMLHttpRequest();
  52. if (httpRequest) {
  53. var reqData = getData();
  54. httpRequest.onreadystatechange = function() {
  55. if (httpRequest.readyState == 4) {
  56. if (httpRequest.status == 200) {
  57. // 请求成功,起pushlet监听
  58. PL._init();
  59. PL.joinListen(aSubject);
  60. } else {
  61. alert("实时请求失败!\n" + httpRequest.statusText);
  62. }
  63. }
  64. }
  65. url = ‘<%=request.getContextPath()%>‘ + ‘/TestServlet‘
  66. + ‘?subject=‘ + aSubject;
  67. httpRequest.open("POST", url, true);
  68. httpRequest.send(reqData);
  69. }
  70. }
  71. // 请求关键字
  72. function getData() {
  73. var reqData = "[30200000001010, 30200000001012, 30800000003009, 30800000006009]";
  74. return reqData;
  75. }
  76. // 获取http请求
  77. function getXMLHttpRequest() {
  78. req = false;
  79. //本地XMLHttpRequest对象
  80. if (window.XMLHttpRequest) {
  81. try {
  82. req = new XMLHttpRequest();
  83. } catch (e) {
  84. req = false;
  85. }
  86. //IE/Windows ActiveX版本
  87. } else if (window.ActiveXObject) {
  88. try {
  89. req = new ActiveXObject("Msxml2.XMLHTTP");
  90. } catch (e) {
  91. try {
  92. req = new ActiveXObject("Microsoft.XMLHTTP");
  93. } catch (e) {
  94. req = false;
  95. }
  96. }
  97. }
  98. return req;
  99. }
  100. // 获取长度为len的随机字符串
  101. function _getRandomString(len) {
  102. var len = len || 32;
  103. var chars = ‘ABCDEFGHJKMNPQRSTWXYZabcdefhijkmnprstwxyz‘;
  104. var maxPos = chars.length;
  105. var pwd = ‘‘;
  106. for (i = 0; i < len; i++) {
  107. pwd += chars.charAt(Math.floor(Math.random() * maxPos));
  108. }
  109. return pwd;
  110. }
  111. </script>
  112. </head>
  113. <body>
  114. <form name="dataEventDisplay">
  115. <table border="2" bordercolor="white" cellpadding="0" cellspacing="0">
  116. <tr>
  117. <td><textarea cols="60" rows="10" name="event">没有消息 </textarea></td>
  118. </tr>
  119. </table>
  120. </form>
  121. <button onclick="onUnsubscribe()">取消订阅</button>
  122. </body>
  123. </html>

测试结果分析:



运行程序,浏览器中输入:http://localhost:8080/pushletTest/receive.jsp

如下是运行界面:(只开启了一个session,即一个界面)

这是前台显示的界面:这些数据是服务器实时推送过来的。

当在前端打开多个不同的界面,如3个:

前台打开了3个界面,后台会自动开启3个对应的Thread,在各个Thread中分别处理每个session对应的业务。

我们来看一下后台的线程数:

我们再开启一个界面,现在总共有4个界面开启,见下图:

看一下对应的后台:

从上图可以看到,总共有4个线程在运行,对应4个session;在看一下线程数:

与前一个对比:

能看到线程由47个变为了48个。

接下来我们关掉其中的3个界面,只留下一个界面,按照之前的分析,应该只剩下1个Thread、1个session,

且后台的线程数应该由48个变为45个,下面我们看下截图:(现在只有一个界面留下,其他3个都关闭了):

后台的javaw.exe的线程数还是48个;

此时前台浏览器关闭了,但是Thread并没有立即关闭;

等待一会,大约半分钟(等待时间不会太长),再次出现下面的界面:

看一下后台:

从上面的分析可以看到,前台界面关闭后,稍后延时一会,后台会将其对应的Thread释放掉。

(多次测试后,发现关闭浏览器界面后,很快(大约2~3)后台线程j就释放掉了)

如果不关闭界面,而是直接点击“取消订阅”按键,会发现后台会立即将其对应的Thread释放掉。(这个已测试过);

所以,为了在关闭浏览器界面时,立即释放掉后台对应的Thread,我们可以在页面关闭时,自动执行“取消订阅”函数,

这样,在每次关闭浏览器界面时,后台检测到就会立即释放对应的线程。


还有一点需要注意,Thread释放了,但是我们发现session的个数并没有立即减少;

继续等待,大约几分钟过后,session的个数会减少到当前打开的界面个数。

这个还没有研究出为什么,需要继续研究。

来自为知笔记(Wiz)

附件列表

时间: 2024-11-02 12:50:56

【pushlet学习】具体实战的相关文章

NGUI 学习笔记实战——制作商城

Unity3D的uGUI听说最近4.6即将推出,但是目前NGUI等UI插件大行其道并且已经非常成熟,所以我们还是先看眼前吧. 一.实现思想 商城的功能是很多游戏都拥有的,按下一个界面按钮,弹出一个窗体. 然后是商城中的商品可以拖动,既可以用手,也可以用滑条等等,至于点击购买就不单单是UI层的事了.等到实现NDate的时候再进行讨论. 二.实现背景 1.NGUI->Open->Prefab tool bar ,拖一个black widget进去 2.之后布局如下,这些应该没什么难度,弄好锚点,d

深度学习Caffe实战笔记(19)Windows平台 Faster-RCNN 制作自己的数据集

"-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> 深度学习Caffe实战笔记(19)Windows平台 Faster-RCNN 制作自己的数据集 - gybheroin的博客 - 博客频道 - CSDN.NET gybheroin的博客 目录视图 摘要视图 订阅 [活动]2017 CSDN博客专栏评选 &nbsp [

Flask框架的学习与实战(一):开发环境搭建

Flask是一个使用 Python 编写的轻量级 Web 应用框架.其 WSGI 工具箱采用 Werkzeug ,模板引擎则使用 Jinja2.很多功能的实现都参考了django框架.由于项目需要,在此记录下学习的过程及心得. 工欲善其事,必先利其器.就从搭建一套flask开发环境开始flask之旅吧. 一.平台说明 操作系统:window 7  64bit  数据库:mysql5.6  python:v2.7  开发集成软件:PyCharm5.0 二.开发环境搭建 1.安装flask框架包 1

进阶学习项目实战链接

进阶学习目录 Python Django Ansible Playbook自动化运维项目实战 https://pan.baidu.com/s/1MAz_sNypeDSySQCdLiOuDw 顶级资深工程师深度讲解Go语言开发入门到精通 Go编程爬虫实战视频 https://pan.baidu.com/s/1nx82k7mOn8ErlPSsCdLfTw Zabbix监控系统深度实践 https://pan.baidu.com/s/1v3bAFqhk890KokIsmu3CMQ Spring Boo

蓝懿IOS学习UICollectionView实战轮播图

今天刘国斌老师讲了关于JSON数据源的获取与利用,通过微博的实战项目进行练习,获取的数据都是网络上请求的真实数据,这种方式学起来很轻松,很容易理解. 刘国斌老师把今天做的练习题UICollectionView轮播图实现功能的方法步骤都下了下来,我们学起来很方便.   实现轮播图 效果的步骤: 1.创建layout (UICollectionViewFlowLayout) 2.设置layout的方向 默认上下 3.创建UICollectionView 4.设置delegate dataSource

今天是项目经理指导学习PHP实战项目的第二天

最近比较浮躁不安,一直抗拒学习PHP,在身边的朋友悉心引导下, 慢慢静下心来.我好开心,能有这么一个好的契机一边上班 一边学习. <tr> <td width="100">品牌</td> <td> <?php echo SqlSelect('goods where 1=1','brand','select','brand','-请选择品牌-'); ?> </td></tr> /**********数据库

Php廖雪峰教程学习与实战

https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000 目录 Python教程 Python简介 安装Python 第一个Python程序 Python基础 函数 高级特性 函数式编程 模块 面向对象编程 面向对象高级编程 错误.调试和测试 IO编程 进程和线程 正则表达式 常用内建模块 常用第三方模块 virtualenv 图形界面 网络编程 电子邮件 访问数据库 Web开发 异步I

Pytorch学习--编程实战:猫和狗二分类

Pytorch学习系列(一)至(四)均摘自<深度学习框架PyTorch入门与实践>陈云 目录: 1.程序的主要功能 2.文件组织架构 3. 关于`__init__.py` 4.数据处理 5.模型定义 6.工具函数 7.配置文件 8.main.py 9.使用 1.程序的主要功能: 模型定义    数据加载    训练和测试 2.文件组织架构: ```├── checkpoints/├── data/│   ├── __init__.py│   ├── dataset.py│   └── get_

keytool 学习与实战

1.生成属于自己的私钥/公钥对,这可以通过 keytool -genkeypairs -alias xxx 命令得到.创建密钥对的时候, keytool 会在 keystore 中生成一个新的条目, -alias xxx 选项就是对该条目进行命名.生成密钥对之后,私钥是以原始数据直接储存在 keystore 中的,而公钥是要发布出去的,所以它被封装在一个 X.509 格式的自签名证书中.换句话说,创建密钥对的时候,同时就创建了一个自签名的证书. 将自己假想为一个认证机构,或者说一个只对我自己签发