Spring+Netty+WebSocket实例

比较贴近生产,详见注释

一、pom.xml

具体太长,详见源码

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.2.Final</version>
    </dependency>

二、目录结构

三、AfterSpringBegin

继承了AfterSpringBegin的子类在spring加载成功后,会自动启动

package com.netty.init;

import java.util.Timer;
import java.util.TimerTask;

import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
/**
 * Base class for tasks that should be started once the Spring application context has loaded.
 *
 * <p>A subclass supplies the work in {@link #run()} (inherited from {@link TimerTask}); this class
 * listens for {@link ContextRefreshedEvent} and hands itself to a {@link Timer}.
 */
public abstract class AfterSpringBegin extends TimerTask implements ApplicationListener<ContextRefreshedEvent> {

    /**
     * Schedules this task with zero delay on a newly created {@link Timer} when the refreshed
     * context has no parent.
     *
     * @param event the context-refreshed event published by Spring
     */
    public void onApplicationEvent(ContextRefreshedEvent event) {
        boolean hasParentContext = event.getApplicationContext().getParent() != null;
        if (hasParentContext) {
            // Contexts that have a parent are ignored — presumably so that a child context
            // (e.g. the web MVC one) does not start the task a second time.
            return;
        }

        // The task body therefore executes on the timer's thread, not on the thread publishing the event.
        new Timer().schedule(this, 0);
    }

}

四、Constant

存放了websocket相关信道

package com.netty.constant;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Shared, process-wide registries of the WebSocket connections.
 *
 * <p>Both references are {@code final}: the containers themselves are mutated (entries added and
 * removed), but the fields must never be re-pointed at another container, otherwise handlers
 * holding the old one would silently diverge.
 */
public class Constant {
    /** Registered {@link ChannelHandlerContext}s, looked up by the string key they were stored under. */
    public static final Map<String, ChannelHandlerContext> pushCtxMap = new ConcurrentHashMap<String, ChannelHandlerContext>() ;

    /** Channels belonging to one category, kept in a group so they can be written to together. */
    public static final ChannelGroup aaChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}

五、WebSocketServer

启动服务

package com.netty.server;

import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;

import com.netty.init.AfterSpringBegin;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Configures and starts the Netty WebSocket server.
 *
 * <p>The class extends {@link AfterSpringBegin}; its {@link #run()} method is the entry point that
 * binds the configured port and then blocks until the server channel is closed.
 */
public class WebSocketServer extends AfterSpringBegin{

    /** Event loop group passed as the parent (accepting) group to the bootstrap. */
    @Autowired
    private EventLoopGroup bossGroup;

    /** Event loop group passed as the child (I/O) group to the bootstrap. */
    @Autowired
    private EventLoopGroup workerGroup;

    /** Netty helper used to configure and bind the server. */
    @Autowired
    private ServerBootstrap serverBootstrap;

    /** Handler installed on every accepted connection. */
    private ChannelHandler childChannelHandler;

    /** Future of the bind operation; set once the port has been bound. */
    private ChannelFuture channelFuture;

    /** Port the server listens on. */
    private int port;

    public WebSocketServer(){

        System.out.println("初始化");
    }

    public EventLoopGroup getBossGroup() {
        return bossGroup;
    }

    public void setBossGroup(EventLoopGroup bossGroup) {
        this.bossGroup = bossGroup;
    }

    public EventLoopGroup getWorkerGroup() {
        return workerGroup;
    }

    public void setWorkerGroup(EventLoopGroup workerGroup) {
        this.workerGroup = workerGroup;
    }

    public ServerBootstrap getServerBootstrap() {
        return serverBootstrap;
    }

    public void setServerBootstrap(ServerBootstrap serverBootstrap) {
        this.serverBootstrap = serverBootstrap;
    }

    public ChannelHandler getChildChannelHandler() {
        return childChannelHandler;
    }

    public void setChildChannelHandler(ChannelHandler childChannelHandler) {
        this.childChannelHandler = childChannelHandler;
    }

    public ChannelFuture getChannelFuture() {
        return channelFuture;
    }

    public void setChannelFuture(ChannelFuture channelFuture) {
        this.channelFuture = channelFuture;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Starts the server on the configured {@link #port}. Blocks until the server channel closes.
     */
    @Override
    public void run() {
        try {
            bulid(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Configures the bootstrap, binds {@code port} and waits until the server channel is closed.
     *
     * <p>If binding or waiting fails, the failure is printed and both event loop groups are shut
     * down; the exception is not rethrown.
     *
     * @param port TCP port to listen on
     * @throws Exception kept in the signature for compatibility with existing callers
     */
    public void bulid(int port) throws Exception{

        try {
            // (1) boss group accepts connections, worker group handles their reads and writes
            // (2) NIO server socket channel as the listening channel type
            // (3) SO_BACKLOG: length of the pending-connection queue of the listening socket
            // (4) TCP_NODELAY=true disables Nagle's algorithm for lower latency. It is a property of
            //     the accepted connections, so it must be a childOption; as a plain option it would
            //     be applied to the listening channel, which does not support it.
            // (5) SO_KEEPALIVE enables TCP keep-alive probes on idle established connections
            // (6) FixedRecvByteBufAllocator: fixed-size receive buffer (the alternative is the
            //     adaptive allocator, which resizes it dynamically)
            // (7) childHandler: per-connection handler supplied through setChildChannelHandler
            serverBootstrap.group(bossGroup,workerGroup)
                           .channel(NioServerSocketChannel.class)
                           .option(ChannelOption.SO_BACKLOG, 1024)
                           .childOption(ChannelOption.TCP_NODELAY, true)
                           .childOption(ChannelOption.SO_KEEPALIVE, true)
                           .childOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(592048))
                           .childHandler(childChannelHandler);

            channelFuture = serverBootstrap.bind(port).sync();
            // Report success only after the bind has really completed.
            System.out.println("成功");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            shutdownGroups();
        } catch (Exception e) {
            // Do not hide the reason the server failed to start (e.g. port already in use).
            e.printStackTrace();
            shutdownGroups();
        }

    }

    /** Shuts both event loop groups down when the bean is destroyed. */
    @PreDestroy
    public void close(){
        shutdownGroups();
    }

    /** Gracefully shuts down whichever event loop groups have been set. */
    private void shutdownGroups() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}

六、WebSocketChildChannelHandler

第五节中的 private ChannelHandler childChannelHandler; 注入的就是这个类;注入配置见后面的 xml,其用途见第五节代码中的注释。

package com.netty.server;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Channel initializer that assembles the handler pipeline of each {@link SocketChannel} it is
 * given.
 */
@Component
public class WebSocketChildChannelHandler extends ChannelInitializer<SocketChannel>{

    /** Business handler appended at the end of every pipeline; injected by bean name. */
    @Resource(name = "webSocketServerHandler")
    private ChannelHandler webSocketServerHandler;

    /**
     * Appends, in order: the HTTP codec, an aggregator limited to 65536 bytes, the chunked-write
     * handler and finally the injected business handler.
     *
     * @param ch the channel whose pipeline is populated
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
          .addLast("http-codec", new HttpServerCodec())
          .addLast("aggregator", new HttpObjectAggregator(65536))
          .addLast("http-chunked", new ChunkedWriteHandler())
          .addLast("handler", webSocketServerHandler);
    }

}

七、WebSocketServerHandler

websocket具体的业务处理;第六节中的 private ChannelHandler webSocketServerHandler; 注入的就是这个类

package com.netty.server;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;
import com.netty.constant.Constant;
import com.netty.manage.ManageMessage;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;

/**
 * WebSocket business handler: performs the HTTP-to-WebSocket handshake, answers control frames
 * and registers clients that send a JSON text frame carrying {@code id} and {@code type}.
 *
 * <p>The handler is {@code @Sharable} and a single Spring bean, so one instance serves every
 * connection. It therefore keeps no per-connection state in instance fields; the handshaker of
 * each connection is stored as an attribute of that connection's channel.
 */
@Component
@Sharable
public class WebSocketServerHandler extends BaseWebSocketServerHandler{

    /** Channel attribute holding the handshaker that upgraded that particular connection. */
    private static final AttributeKey<WebSocketServerHandshaker> HANDSHAKER_KEY =
            AttributeKey.valueOf("webSocketServerHandshaker");

    /**
     * Called when the TCP connection becomes active.
     *
     * <p>No WebSocket frame is written here: at this point the connection has not been upgraded
     * yet, so a text frame could not be encoded. The "connected" message is sent from
     * {@link #handleHttpRequest} once the handshake has completed.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * Removes every registry entry that points at the disconnected context.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Removing while iterating is safe here because the registry is a ConcurrentHashMap.
        for (Map.Entry<String, ChannelHandlerContext> entry : Constant.pushCtxMap.entrySet()) {

            if (ctx.equals(entry.getValue())) {
                String key = entry.getKey();
                System.out.println(Constant.pushCtxMap.size());
                System.out.println("剔除"+key);
                Constant.pushCtxMap.remove(key);
                System.out.println(Constant.pushCtxMap.size());
            }

        }
    }

    /** Flushes whatever was written while handling the current batch of reads. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Dispatches an inbound message: full HTTP requests go to the handshake, WebSocket frames to
     * the frame handler; anything else is ignored.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {

        if (msg instanceof FullHttpRequest) {
            // http://xxxx
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            // ws://xxxx
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }

    }

    /**
     * Handles one WebSocket frame.
     *
     * <ul>
     *   <li>Close frame: completes the closing handshake.</li>
     *   <li>Ping frame: answers with a pong carrying the same payload.</li>
     *   <li>Text frame: parsed as JSON; when it has non-empty {@code id} and {@code type} the
     *       connection is added to the registries in {@code Constant}.</li>
     * </ul>
     *
     * @param ctx   context of the connection the frame arrived on
     * @param frame the received frame
     * @throws UnsupportedOperationException for any other frame type (only text is supported)
     */
    public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception{

        // Close request
        if (frame instanceof CloseWebSocketFrame) {
            WebSocketServerHandshaker handshaker = ctx.channel().attr(HANDSHAKER_KEY).get();
            if (handshaker == null) {
                // No handshake recorded for this channel; just drop the connection.
                ctx.close();
            } else {
                // retain(): the frame is released by the base class after this method returns.
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            }
            return;
        }

        // Ping request
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }

        // Only text frames are supported, binary messages are not
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("仅支持文本格式");
        }

        // Message sent by the client
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("服务端收到:" + request);

        JSONObject jsonObject = parseRequest(request);
        if (jsonObject == null) {
            return;
        }

        // getString converts non-string JSON values instead of failing with a ClassCastException
        String id = jsonObject.getString("id");
        String type = jsonObject.getString("type");

        // Decide from the id whether the user is logged in / authorised, etc.
        boolean hasId = id != null && !"".equals(id);
        boolean hasType = type != null && !"".equals(type);
        if (hasId && hasType) {

            // Whether the user has permission (placeholder: always granted)
            boolean idAccess = true;
            // Whether the type is one of the defined ones (placeholder: always accepted)
            boolean typeAccess = true;

            if (idAccess && typeAccess) {
                System.out.println("添加到连接池:"+request);
                // NOTE(review): the registry is keyed by the whole raw message, not by id —
                // confirm this is what the code reading pushCtxMap expects.
                Constant.pushCtxMap.put(request, ctx);
                // A real implementation would pick the group by type; this demo uses one group
                // for everything so that broadcasting is easy.
                Constant.aaChannelGroup.add(ctx.channel());
            }

        }

    }

    /**
     * Parses a text message as a JSON object.
     *
     * @return the parsed object, or {@code null} when the text is not a JSON object
     */
    private static JSONObject parseRequest(String request) {
        try {
            JSONObject parsed = JSONObject.parseObject(request);
            if (parsed != null) {
                System.out.println(parsed.toJSONString());
            }
            return parsed;
        } catch (RuntimeException e) {
            // Malformed input from a client must not tear the connection down; report and skip it.
            System.out.println("忽略无法解析的消息:" + e.getMessage());
            return null;
        }
    }

    /**
     * Handles the initial HTTP request, which must be a WebSocket upgrade request.
     *
     * <p>Undecodable requests and requests without {@code Upgrade: websocket} are answered with
     * 400. On a successful handshake the handshaker is stored on the channel and the
     * "connected" message is pushed to the client.
     *
     * @param ctx context of the connection
     * @param req the aggregated HTTP request
     */
    public void handleHttpRequest(final ChannelHandlerContext ctx, FullHttpRequest req){

        boolean upgradeRequested = "websocket".equalsIgnoreCase(req.headers().get("Upgrade"));
        if (!req.decoderResult().isSuccess() || !upgradeRequested) {

            sendHttpResponse(ctx,req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // Build the advertised URL from the Host header rather than from Channel.toString().
        String webSocketUrl = "ws://" + req.headers().get("Host") + "/websocket";
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(webSocketUrl,null,false);
        WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);

        if (handshaker == null) {
            // Protocol version not supported
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            return;
        }

        ctx.channel().attr(HANDSHAKER_KEY).set(handshaker);
        handshaker.handshake(ctx.channel(), req).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // The connection now speaks WebSocket, so the welcome frame can be encoded.
                    push(ctx, "连接成功");
                } else {
                    future.cause().printStackTrace();
                    ctx.close();
                }
            }
        });

    }

    /**
     * Writes an HTTP response to the client.
     *
     * <p>For non-200 responses the status line is used as the body. The connection is closed
     * after the write unless the response is 200 and the request asked for keep-alive.
     *
     * @param ctx context of the connection
     * @param req the request being answered
     * @param res the response to send
     */
    public static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,DefaultFullHttpResponse res){

        boolean ok = res.status().code() == 200;
        if (!ok) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // Without a length a client on a kept-alive connection cannot tell where the body ends.
        HttpUtil.setContentLength(res, res.content().readableBytes());

        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || !ok) {
            f.addListener(ChannelFutureListener.CLOSE);
        }

    }

    /** Tells whether the request asks for the connection to be kept open. */
    private static boolean isKeepAlive(FullHttpRequest req)
    {
        return HttpUtil.isKeepAlive(req);
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

八、BaseWebSocketServerHandler

把推送方法单独抽象出来,方便调用

package com.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * Base handler that provides the message-sending helpers, so subclasses and other callers can
 * push text to clients through one place.
 */
public abstract class BaseWebSocketServerHandler extends SimpleChannelInboundHandler<Object>{

    /**
     * Sends a text frame to a single connection and flushes it.
     *
     * @param ctx     context of the target connection
     * @param message text to send
     */
    public static final void push(final ChannelHandlerContext ctx,final String message){
        ctx.channel().writeAndFlush(new TextWebSocketFrame(message));
    }

    /**
     * Sends a text frame to a channel group and flushes it.
     *
     * @param ctxGroup group the frame is written to
     * @param message  text to send
     */
    public static final void push(final ChannelGroup ctxGroup,final String message){
        ctxGroup.writeAndFlush(new TextWebSocketFrame(message));
    }
}

九、配置

application-netty.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Netty wiring: declares the two event loop groups, the bootstrap and the WebSocket server bean. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:cache="http://www.springframework.org/schema/cache"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd">

    <!-- Two separate NIO event loop groups; the ids match the field names autowired in WebSocketServer. -->
    <bean id="bossGroup" class="io.netty.channel.nio.NioEventLoopGroup"></bean>
    <bean id="workerGroup" class="io.netty.channel.nio.NioEventLoopGroup"></bean>
    <!-- Prototype scope: every injection point receives its own ServerBootstrap instance. -->
    <bean id="serverBootstrap" class="io.netty.bootstrap.ServerBootstrap" scope="prototype"></bean>
    <!-- The server bean; port comes from the websocket.server.port property. -->
    <bean id="webSocketServer" class="com.netty.server.WebSocketServer">

        <property name="port" value="${websocket.server.port}"></property>
        <!-- webSocketChildChannelHandler is not declared in this file; presumably it is the
             @Component-annotated WebSocketChildChannelHandler picked up by component scanning. -->
        <property name="childChannelHandler" ref="webSocketChildChannelHandler" />
    </bean>
</beans>

application-beans.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Core bean configuration: annotation processing, component scanning and property loading. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd">

     <!-- Explicit post-processor for @Autowired, declared in addition to annotation-config below. -->
     <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>

     <context:annotation-config />

     <!-- Scans com.netty for annotated components, leaving @Controller classes out. -->
     <context:component-scan base-package="com.netty">
          <!-- Exclude @Controller classes from this scan (the original note refers to service injection of Controllers under the vst.back directory) -->
         <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
    </context:component-scan>

    <!-- Loads conf/websocket.properties from the classpath into a Properties bean. -->
    <bean id="configProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="locations">
            <list>
                <value>classpath*:conf/websocket.properties</value>
            </list>
        </property>
    </bean>

   <!-- Placeholder configurer fed with the properties above; presumably this is what resolves
        ${...} placeholders such as websocket.server.port in the other context files. -->
   <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer">
        <property name="properties" ref="configProperties" />
   </bean> 

</beans>

springmvc.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring MVC configuration: message conversion, annotation-driven MVC and controller scanning. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">

    <description>Spring-web MVC配置</description>

    <!-- Handler adapter with a String converter that declares text/html in UTF-8 as its media type. -->
    <bean
        class="org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter">
        <property name="messageConverters">
            <list>
                <bean
                    class="org.springframework.http.converter.StringHttpMessageConverter">
                    <property name="supportedMediaTypes">
                        <list>
                            <value>text/html;charset=UTF-8</value>
                        </list>
                    </property>
                </bean>

            </list>
        </property>
    </bean>

    <mvc:annotation-driven />

    <!-- Scans com.netty.controller: @Controller classes are included, @Service classes excluded. -->
    <context:component-scan base-package="com.netty.controller">
        <context:include-filter type="annotation"
            expression="org.springframework.stereotype.Controller" />
        <context:exclude-filter type="annotation"
            expression="org.springframework.stereotype.Service" />
    </context:component-scan>

</beans>

websocket.properties

websocket.server.port=7397

十、客户端

用的jsp页面,具体连接逻辑什么的看需要写

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title></title>
  <!-- Demo client: opens a WebSocket to the server, logs in on open and prints every message received. -->
  <script type="text/javascript">
  var socket;
  // In production the id would typically be the user id taken from the session
  var id  = Math.random().toString(36).substr(2);
  // Fall back to the vendor-prefixed constructor when the standard one is missing
  if(!window.WebSocket){
      window.WebSocket = window.MozWebSocket;
  }

  if(window.WebSocket){
      socket = new WebSocket("ws://localhost:7397");

      socket.onmessage = function(event){
            appendln("receive:" + event.data);
      };

      socket.onopen = function(event){
            appendln("WebSocket is opened");
            login();
      };

      socket.onclose = function(event){
            appendln("WebSocket is closed");
      };
  }else{
        alert("WebSocket is not support");
  }

  // Appends one line to the output text area
  function appendln(text) {
    var ta = document.getElementById('responseText');
    ta.value += text + "\r\n";
  }

  // Sends the login message (id + type) as JSON
  function login(){
      console.log("aaaaaa");
      var date={"id":id,"type":"aa"};
      var login = JSON.stringify(date);
      socket.send(login);

  }

  </script>
</head>
  <body>
    <form onSubmit="return false;">
        <input type = "text" name="message" value="hello"/>
        <br/><br/>

        <textarea id="responseText" style="width: 800px;height: 300px;"></textarea>
    </form>
  </body>
</html>

十一、源码

源码

时间: 2024-10-08 09:31:55

Spring+Netty+WebSocket实例的相关文章

Spring Chapter4 WebSocket 胡乱翻译 (二)

书接上文,Spring Chapter4 WebSocket 胡乱翻译 (一) 4.4.4. 消息流 一旦暴露了STOMP端点,Spring应用程序就成为连接客户端的STOMP代理. 本节介绍服务器端的消息流. Spring-messaging模块包含对源自Spring Integration的消息传递应用程序的基础支持,后来被提取并整合到Spring Framework中,以便在许多Spring项目和应用程序场景中得到更广泛的使用. 下面列出了一些可用的消息传递抽象: Message - 包含

Spring Netty (实战)

Spring Netty 实战 疯狂创客圈 死磕Netty 系列之11 主要介绍的是SpringBoot整合Netty.在使用Netty之前,建议先了解Netty的基本原理,请参阅疯狂创客圈. 这里仅仅是使用Netty的第一步,这里介绍一个最简单的Demo--EchoServer,也就是回写服务器.就是无论客户端发啥字符串到服务器端,服务器端接收字符串后直接回写到客户端. 本篇内容纲要 环境要求 Spring +netty 服务器端 Spring +netty 客户端 Spring读取配置文件中

玩转spring boot——websocket

前言 QQ这类即时通讯工具多数是以桌面应用的方式存在.在没有websocket出现之前,如果开发一个网页版的即时通讯应用,则需要定时刷新页面或定时调用ajax请求,这无疑会加大服务器的负载和增加了客户端的流量.而websocket的出现,则完美的解决了这些问题. spring boot对websocket进行了封装,这对实现一个websocket网页即时通讯应用来说,变得非常简单. 一.准备工作 pom.xml引入 <dependency> <groupId>org.springf

Spring之WebSocket网页聊天以及服务器推送

Spring之WebSocket网页聊天以及服务器推送 转自:http://www.xdemo.org/spring-websocket-comet/ /Springframework /Spring之WebSocket网页聊天以及服务器推送 1. WebSocket protocol 是HTML5一种新的协议.它实现了浏览器与服务器全双工通信(full-duplex). 2. 轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发出HTTP request,然后由服务器返回最新的数据给客服端

互联网架构师视频课程 Dubbo ActiveMQ Spring Netty MongoDB Jvm

互联网架构师视频课程 Dubbo ActiveMQ spring Netty MongoDB Jvm =================================================================== 2016年netty/mina/java nio视频教程java游戏服务器设计教程 互联网架构师教程:http://blog.csdn.net/pplcheer/article/details/71887910 需要的加qq:1225462853,备注:程序员学

Spring+EhCache缓存实例(详细讲解+源码下载)(转)

一.ehcahe的介绍 EhCache 是一个纯Java的进程内缓存框架,具有快速.精干等特点,是Hibernate中默认的CacheProvider.Ehcache是一种广泛使用的开源Java分布式缓存.主要面向通用缓存,Java EE和轻量级容器.它具有内存和磁盘存储,缓存加载器,缓存扩展,缓存异常处理程序,一个gzip缓存servlet过滤器,支持REST和SOAP api等特点. 优点: 1. 快速 2. 简单 3. 多种缓存策略 4. 缓存数据有两级:内存和磁盘,因此无需担心容量问题 

Spring.Net 简单实例-02(属性注入)

说明:接续Spring.Net 简单实例-01(IOC) 话不多说看操作 1:为UserInfo添加属性 2: 修改App.config中代码 <?xml version="1.0" encoding="utf-8" ?> <configuration> <configSections> <sectionGroup name="spring"> <section name="cont

Spring中AOP实例详解

Spring中AOP实例详解 需要增强的服务 假如有以下service,他的功能很简单,打印输入的参数并返回参数. @Service public class SimpleService { public String getName(String name) { System.out.println("get name is: " + name); return name; } } 定义切面和切点 @Component @Aspect public class LogAspect { // 定义切

netty入门实例

TimeServer.java package netty.timeserver.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGro