netty websocket 简单消息推送demo

今天心情很不好!!! 原因保密。

这篇是基于"netty与websocket通信demo"。

错误想法:大量客户请求,共用一个worker,来实现推送。

正确作法:应该是对Channel对应的ChannelGroup进行操作,来实现推送。

一个Channel可以划分到多个ChannelGroup中。

PushServerChannelHandler和DynMessage这两个类最重要,其实类基本没变。

package org.sl.demo.chatserver;

import java.util.List;
import java.util.Map;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;

public class PushServerChannelHandler extends SimpleChannelHandler {
	static boolean debug = true;

	@Override
	public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e){
		if(debug){
			System.out.println("channelOpen");
		}
		DynMessage.addAudience(e.getChannel());
	}

	@Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
		Channel ch = e.getChannel();
		Object msg = e.getMessage();

		if(debug){
			System.out.println("---------------");
			System.out.println("message: "+msg.getClass());
		}
		try{
			if(msg instanceof HttpRequest){
				processHttpRequest(ch, (HttpRequest)msg);
			}else if(msg instanceof WebSocketFrame){
				processWebsocketRequest(ch,(WebSocketFrame)msg);
			}else{
				//未处理的请求类型
			}
		}catch(Exception ex){
			ch.close().sync();
		}
		super.messageReceived(ctx, e);
	}

	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e){
		if(debug){
			System.out.println("channelClosed");
		}
		if(e instanceof MessageEvent){
			MessageEvent me = (MessageEvent) e;
		}
		DynMessage.removeAudience(e.getChannel());
		e.getChannel().close();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e){
		if(debug){
			System.out.println("channelClosed");
		}
		DynMessage.removeAudience(e.getChannel());
		e.getCause().printStackTrace();
		e.getChannel().close();
		try {
			super.exceptionCaught(ctx, e);
		} catch (Exception e1) {
			e1.printStackTrace();
		}
	}

	void processHttpRequest(Channel channel,HttpRequest request){
		HttpHeaders headers = request.headers();
		if(debug){
			List<Map.Entry<String,String>> ls = headers.entries();
			for(Map.Entry<String,String> i: ls){
				System.out.println("header  "+i.getKey()+":"+i.getValue());
			}
		}	

		//non-get request
		if(!HttpMethod.GET.equals(request.getMethod())){
			DefaultHttpResponse resp = new DefaultHttpResponse(
					HttpVersion.HTTP_1_1,
					HttpResponseStatus.BAD_REQUEST);
			channel.write(resp);
			channel.close();
			return;
		}

		WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory(
				"ws://"+request.headers().get(HttpHeaders.Names.HOST),
				null,false );
		WebSocketServerHandshaker wsShakerHandler = wsShakerFactory.newHandshaker(request);
		if(null==wsShakerHandler){
			//无法处理的websocket版本
			wsShakerFactory.sendUnsupportedWebSocketVersionResponse(channel);
		}else{
			//向客户端发送websocket握手,完成握手
			//客户端收到的状态是101 sitching protocol
			wsShakerHandler.handshake(channel, request);
		}
	}

	void processWebsocketRequest(Channel channel, WebSocketFrame request) throws Exception{
		if(request instanceof CloseWebSocketFrame){
			DynMessage.removeAudience(channel);
			channel.close().sync();
		}else if(request instanceof PingWebSocketFrame){
			channel.write(new PongWebSocketFrame(request.getBinaryData()));  
		}else if(request instanceof TextWebSocketFrame){
			//这个地方 可以根据需求,加上一些业务逻辑
			TextWebSocketFrame txtReq = (TextWebSocketFrame) request;
			if(debug){ System.out.println("txtReq:"+txtReq.getText());}
			if("disconnect".equalsIgnoreCase(txtReq.getText())){
				DynMessage.removeAudience(channel);
				channel.close().sync();
				return;
			}
			//把符合条件的channel添加到DynMessage的channelGroup中
			DynMessage.addAudience(channel);
		}else{
			//WebSocketFrame还有一些
		}
	}
}
package org.sl.demo.chatserver;

import java.util.Random;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
*动态产生消息,并向Channel组推送。
*/
public class DynMessage implements Runnable{
	public static ChannelGroup audiences = new DefaultChannelGroup("msg-group");

	static public void addAudience(Channel ch){
		audiences.add(ch);
	}

	static public void removeAudience(Channel ch){
		audiences.remove(ch);
	}

	static String[] names = {
		"Tom", "Jerry",
		"Terry", "Looney",
		"Merrie", "William",
		"Joseph", "Hanna",
		"Speike", "Tyke",
		"Tuffy", "Lightning",
	};
	static String message = "";

	public static String getMessage(){
		StringBuffer sb = new StringBuffer();
		sb.append("hello,my name is ");
		sb.append(names[new Random().nextInt(names.length)]);
		sb.append(".");
		return sb.toString();
//		return message;
	}

	@Override
	public void run() {
		System.out.println("DynMessage start");
		for(;;){
			String msg = getMessage();
			radiate(msg);
			try{Thread.sleep(1000); }catch(Exception ex){}
		}
	}

	void radiate(String msg){
		audiences.write(new TextWebSocketFrame(msg));
	}
}
<html>
<head>
<script src="jquery-1.9.1.js"></script>
<script src="messagepush.js"></script>
<script >
function doStop(){
	stopMsgPush();
}

function doWsStart(){
	var  r6 = generateMixed(6);
	$("#txtReq").val(r6);
	var  params = $("#txtReq").val();
	doStop();

	wsMsgPush(‘127.0.0.1‘,params,
		function(data){
			$("#txtResp").val(data);
		},
		function(){
			$("#txtResp").val("ws close...");
		} ,
		function(){
			$("#txtResp").val("ws error...");
		} );
}
</script>
</head>

<body>
 
<br/>
<br/><br/>
send: <input id="txtReq" readonly="readonly" type="text" value="" />
<input type="button" value="start" onclick="doWsStart()">
<input type="button" value="stop" onclick="doStop()"/> 
<br/>

recv: <input id="txtResp" type="text" value=""  size="50"/>
</body>
</html>
var _mp_ws = null;
var _mp_ajax_it = null;

function msgPush(url, params,onmessage,onclose,onerror){
	wsMsgPush(url,params,onmessage,onclose,onerror);
	if(!_mp_ws){
		ajaxMsgPush(url,params,10000,onmessage,onclose,onerror);
	}
}

function old_wsMsgPush(url, params,onmessage,onclose,onerror){
	var ws = new WebSocket("ws://"+url); 
	ws.onopen = function(){ws.send(‘1111‘)};
	ws.onmessage = function(evt){ onmessage(evt.data);};
}

function wsMsgPush(url, params,onmessage,onclose,onerror){
	_mp_ws = new WebSocket("ws://"+url); 
	if(!_mp_ws){ return; }

	_mp_ws.onopen = function(){ 
		_mp_ws.send(params); 
	};
	if(onmessage) _mp_ws.onmessage = function(evt){ onmessage(evt.data); }
	if(onerror) _mp_ws.onerror = function (evt){ onerror(); }
	if(onclose) _mp_ws.onclose = function (evt){ onclose(); }
}

function ajaxMsgPush(url, params,interval,onmessage,onclose,onerror){
	function __getmsg(){
		$.ajax({
			url:				url,
			data:			params,
			cache:			true,
			type:			"get",
			dataType:		"text",
			success:		function(data, textStatus, jqXHR){ 
				if(onmessage) onmessage(data);
			},
			error:			function(jqXHR, textStatus, errorThrown){
				if(onerror) onerror();
			},
			complete:		function(jqXHR, textStatus){
				if(onclose) onclose();
			}
		});
	}	

	_mp_ajax_it = setInterval("__getmsg()",interval);
}

function stopMsgPush(){
	if(_mp_ws){
		_mp_ws.send("disconnect");
		_mp_ws.close();
	}

	if(_mp_ajax_it){
		clearInterval(_mp_ajax_it);
	}
}

var chars = [‘0‘,‘1‘,‘2‘,‘3‘,‘4‘,‘5‘,‘6‘,‘7‘,‘8‘,‘9‘,‘A‘,‘B‘,‘C‘,‘D‘,‘E‘,‘F‘,‘G‘,‘H‘,‘I‘,‘J‘,‘K‘,‘L‘,‘M‘,‘N‘,‘O‘,‘P‘,‘Q‘,‘R‘,‘S‘,‘T‘,‘U‘,‘V‘,‘W‘,‘X‘,‘Y‘,‘Z‘];
function generateMixed(n) {
     var res = "";
     for(var i = 0; i < n ; i ++) {
         var id = Math.ceil(Math.random()*35);
         res += chars[id];
     }
     return res;
}
package org.sl.demo.chatserver;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.timeout.WriteTimeoutHandler;
import org.jboss.netty.util.HashedWheelTimer;

public class PushServerChannelPiplelineFactory  implements ChannelPipelineFactory{

	@Override
	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline cp = Channels.pipeline();
		cp.addLast("decoder", new HttpRequestDecoder());
		cp.addLast("encoder", new HttpResponseEncoder());
		cp.addLast("writeTimeout", new WriteTimeoutHandler(new HashedWheelTimer(),10));
		cp.addLast("handler", new PushServerChannelHandler());
		return cp;
	}

}
package org.sl.demo.chatserver;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class PushServer implements Runnable{
	int port = 80;

	public PushServer(int port){
		this.port = port;
	}

	@Override
	public void run() {
		System.out.println("ChatServer "+port);

		ServerBootstrap b = new ServerBootstrap(
				new NioServerSocketChannelFactory(
						Executors.newCachedThreadPool(),
						Executors.newCachedThreadPool()));
		b.setOption("child.tcpNoDelay", true);  
		b.setOption("child.keepAlive", true);
		b.setPipelineFactory(new PushServerChannelPiplelineFactory());
		b.bind(new InetSocketAddress(port));
	}

	public static void main(String[] args){
		Thread t = new Thread(new DynMessage(),"DynMessage");
		t.start();
		new PushServer(80).run();
	}
}
时间: 2024-12-21 16:26:00

netty websocket 简单消息推送demo的相关文章

在Spring Boot框架下使用WebSocket实现消息推送

Spring Boot的学习持续进行中.前面两篇博客我们介绍了如何使用Spring Boot容器搭建Web项目(使用Spring Boot开发Web项目)以及怎样为我们的Project添加HTTPS的支持(使用Spring Boot开发Web项目(二)之添加HTTPS支持),在这两篇文章的基础上,我们今天来看看如何在Spring Boot中使用WebSocket. 什么是WebSocket WebSocket为浏览器和服务器之间提供了双工异步通信功能,也就是说我们可以利用浏览器给服务器发送消息,

nodejs实现简单消息推送

server1.js var http = require('http'); var express = require('express'); var sio = require('socket.io'); var app = express(); app.use(express.static(__dirname + '/')); var server = http.createServer(app); server.listen(3001, '127.0.0.1'); var io = si

关于前后端通过websocket实现消息推送的总结

公司要求做了一个看板,看板数据如果每隔一段时间刷新一次,就会增加服务器负担,所以使用了webscoket实现服务器推送, 主要工作集中在后端,前端相对简单,指等待后端建立好路径后给你你就可以通过以下方法实现消息推送了,非常简单,实用,常见的网页聊天等都是这种方式,没有深入了解,参考了网上的例子https://www.runoob.com/html/html5-websocket.html,仅供参考 beforeMount: function() { if ("WebSocket" in

WebSocket与消息推送

B/S结构的软件项目中有时客户端需要实时的获得服务器消息,但默认HTTP协议只支持请求响应模式,这样做可以简化Web服务器,减少服务器的负担,加快响应速度,因为服务器不需要与客户端长时间建立一个通信链接,但不容易直接完成实时的消息推送功能,如聊天室.后台信息提示.实时更新数据等功能,但通过polling.Long polling.长连接.Flash Socket以及HTML5中定义的WebSocket能完成该功能需要. 一.Socket简介 Socket又称"套接字",应用程序通常通过

C#解惑3——WebSocket与消息推送(转)

目录 一.Socket简介 二.WebSocket简介与消息推送 三.WebSocket客户端 四.WebSocket服务器端 五.测试运行 六.小结与消息推送框架 6.1.开源Java消息推送框架 Pushlet 6.2.开源DotNet消息推送框架SignalR 七.代码下载 7.1.Java实现的服务器端代码与客户端代码下载 7.2.DotNet服务器端手动连接实现代码下载 7.3.DotNet下使用SuperWebSocket三方库实现代码下载 B/S结构的软件项目中有时客户端需要实时的

使用Websocket实现消息推送

WebSocket 上 联系客服功能在项目中很难避免,一般有下面三种实现方式: 使用http的get方式轮询 接入第三方IM系统 自己的IM系统 基于socket 基于websocket 第一种方式,最low的,实现简单,但是浪费用户流量:第二种方式,接入简单,功能强大,但是可能需要一定的成本(比如付费):第三种方式,需要一定的开发成本(服务器托管费用忽略).对于第三种情况的 socket,实现IM的文字加音视频聊天,做过的话你可以也会直接懵逼.但是,简单的文字聊天还好,不过你还是需要去定义一些

websocket 实现消息推送(转)

介绍 现很多网站为了实现即时通讯,所用的技术都是轮询(polling).轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客服端的浏览器.这种传统的HTTP request 的模式带来很明显的缺点 – 浏览器需要不断的向服务器发出请求,然而HTTP request 的header是非常长的,里面包含的数据可能只是一个很小的值,这样会占用很多的带宽. 而最比较新的技术去做轮询的效果是Comet – 用了AJAX.但这种技术虽然可达到全双

android利用WebSocket实现消息推送

1.webSocket服务端的配置与代码: (1).服务器端工程目录结构: (2).web.xml的配置 <servlet-name>webSocketServlet</servlet-name>        <servlet-class>com.cn.controller.WebSocketServletService</servlet-class>      </servlet>    <servlet-mapping>    

spring boot下WebSocket消息推送

WebSocket协议 WebSocket是一种在单个TCP连接上进行全双工通讯的协议.WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范.WebSocket API也被W3C定为标准. WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据.在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输 STOMP协议 STOMP是面向文本的消息传