Netty学习摘记 —— 简单WEB聊天室开发

本文参考

本篇文章是对《Netty In Action》一书第十二章"WebSocket"的学习摘记,主要内容为开发一个基于广播的WEB聊天室

聊天室工作过程

请求的 URL 以/ws 结尾时,通过升级握手的机制把该协议升级为 WebSocket,之后客户端发送一个消息,这个消息会被广播到所有其它连接的客户端

当有新的客户端连入时,其它客户端也能得到通知

处理HTTP请求

首先实现该处理 HTTP 请求的组件,当请求的url没有指定的WebSocket连接的后缀时(如后缀/ws),这个组件将提供聊天室网页页面的http response响应

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
  private final String wsUri;
  private static final File INDEX;

  static {
    URL location = HttpRequestHandler.class
      .getProtectionDomain()
      .getCodeSource().getLocation();
    try {
      String path = location.toURI() + "index.html";
      path = !path.contains("file:") ? path : path.substring(6);
      INDEX = new File(path);
    } catch (URISyntaxException e) {
      throw new IllegalStateException("Unable to locate index.html", e);
    }
  }

  public HttpRequestHandler(String wsUri) {
    this.wsUri = wsUri;
  }

@Override
  public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
  throws Exception {
    //(1) 如果请求了 WebSocket 协议升级,则增加引用计数(调用 retain()方法)
    // 并将它传递给下一个
ChannelInboundHandler
    if
(wsUri.equalsIgnoreCase(request.uri())) {
      ctx.fireChannelRead(request.retain());
    } else {
      //(2) 处理 100 Continue 请求以符合 HTTP 1.1 规范
      if (HttpUtil.is100ContinueExpected(request)) {
        send100Continue(ctx);
      }
      //读取 index.html
      RandomAccessFile file = new RandomAccessFile(INDEX, "r");
      HttpResponse response = new DefaultHttpResponse(
          request.protocolVersion(), HttpResponseStatus.OK);
      response.headers().set(
          HttpHeaderNames.CONTENT_TYPE,"text/html; charset=UTF-8");
      boolean keepAlive = HttpUtil.isKeepAlive(request);
      //如果请求了keep-alive,则添加所需要的 HTTP 头信息
      if (keepAlive) {
        response.headers().set(
           HttpHeaderNames.CONTENT_LENGTH,
           file.length());
        response.headers().set(
           HttpHeaderNames.CONNECTION,
           HttpHeaderValues.KEEP_ALIVE);
      }
      //(3) HttpResponse 写到客户端
      // 只是设置了响应的头信息,因此没有进行flush冲刷
      ctx.write(response);
      //(4) index.html 写到客户端,也不进行flush冲刷
      if (ctx.pipeline().get(SslHandler.class) == null) {
        // 如果不需要进行加密,则利用零拷贝特性达到最佳效率
        ctx.write(new DefaultFileRegion(
        file.getChannel(), 0, file.length()));
      } else {
        ctx.write(new ChunkedNioFile(
        file.getChannel()));
      }
      //(5) LastHttpContent 标记响应的结束并冲刷至客户端
      ChannelFuture future = ctx.writeAndFlush(
          LastHttpContent.EMPTY_LAST_CONTENT);
      //(6)如果没有请求keep-alive,则在写操作完成后关闭 Channel
      if
(!keepAlive) {
        future.addListener(ChannelFutureListener.CLOSE);
      }
    }
  }

  private static void send100Continue(ChannelHandlerContext ctx) {
    FullHttpResponse response = new DefaultFullHttpResponse(
        HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
    ctx.writeAndFlush(response);
  }

@Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
  throws Exception {
    cause.printStackTrace();
    ctx.close();
  }
}

  • 如果该 HTTP 请求指向了地址为/ws的URI,那么HttpRequestHandler将调用FullHttpRequest对象上的retain()方法,并通过调用fireChannelRead(msg)方法将它转发给下一 个ChannelInboundHandler。对于retain()方法的调用是必需的,因为当channelRead0()方法返回时, FullHttpRequest的引用计数将会被减少。由于所有的操作都是异步的,因此,fireChannelRead ()方法可能会在channelRead0()方法返回之后完成
  • 如果客户端发送了 HTTP 1.1 的 HTTP 头信息 Expect: 100-continue,那么 HttpRequestHandler将会发送一个100 Continue 响应
  • 在该HTTP头信息被设置之后,HttpRequestHandler 将会写回一个 HttpResponse 给客户端,因为这不是一个 FullHttpResponse,只是响应的第一个部分,所以不调用 writeAndFlush()方法,在response响应设置完成后才会调用
  • 通过检查是否有SslHandler存在于在ChannelPipeline中,判断是否有加密传输,如果不需要加密和压缩,那么可以通过零拷贝特性将 index.html 的内容存储到 DefaultFileRegion中来达到最佳效率,否则,使用ChunkedNioFile
  • HttpRequestHandler 将写一个 LastHttpContent 来标记响应的结束,因此可以调用writeAndFlush()方法
  • 如果没有请求 keep-alive,那么 HttpRequestHandler 将会添加一个 ChannelFutureListener 到最后一次写出动作的ChannelFuture,并关闭该连接

处理 WebSocket 帧

由 IETF 发布的 WebSocket RFC,定义了 6 种帧,Netty 为它们每种都提供了一个 POJO 实现

TextWebSocketFrame 是我们唯一真正需要处理的帧类型,下面展示了处理代码

public class TextWebSocketFrameHandler
  extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  private final ChannelGroup group;

  public TextWebSocketFrameHandler(ChannelGroup group) {
    this.group = group;
  }

  //重写 userEventTriggered()方法以处理自定义事件
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
  throws Exception {
    //如果该事件表示握手成功,则从该 ChannelPipeline 中移除HttpRequest-Handler
    //因为将不会接收到任何HTTP消息了
    if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
      ctx.pipeline().remove(HttpRequestHandler.class);
      //(1) 通知所有已经连接的WebSocket 客户端新的客户端已经连接上了
      group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
      //(2) 将新的 WebSocket Channel 添加到 ChannelGroup 中,以便它可以接收到所有的消息
      group.add(ctx.channel());
      System.out.println("a new channel added to group");
    } else {
      super.userEventTriggered(ctx, evt);
    }
  }

@Override
  public void channelRead0(ChannelHandlerContext ctx, final TextWebSocketFrame msg)
  throws Exception {
    //(3) 增加消息的引用计数,并将它写到 ChannelGroup中所有已经连接的客户端
    group.writeAndFlush(msg.retain());
  }
}

  • 当和新客户端的WebSocket 握手成功完成之后,会触发一个WebSocketServerProtocolHandler.HandshakeComplete事件,可以通过instanceof运算符进行判断

To know once a handshake was done you can intercept the ChannelInboundHandler.userEventTriggered(ChannelHandlerContext, Object) and check if the event was instance of WebSocketServerProtocolHandler.HandshakeComplete, the event will contain extra information about the handshake such as the request and selected subprotocol.

  • 握手成功事件触发后,会把通知消息写到 ChannelGroup 中的所有 Channel 来通知所有已经连接的客户端,然后它将把这个新Channel加入到该ChannelGroup中
  • ChannelGroup可以根据我们的具体需求添加相应的Channel,一个Channel可以添加到多个ChannelGroup,当Channel关闭时,会自动从ChannelGroup移除

A thread-safe Set that contains open Channels and provides various bulk operations on them. Using ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state basis.) A closed Channel is automatically removed from the collection, so that you don‘t need to worry about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.

  • 如果接收到了 TextWebSocketFrame 消息 ,TextWebSocketFrameHandler 将调用 TextWebSocketFrame 消息上的 retain()方法,并使用 writeAndFlush()方法来将它传输给ChannelGroup,以便所有已经连接的 WebSocket Channel都将接收到它,由此实现消息的广播

初始化ChannelPipline

将所有需要的ChannelHandler添加到ChannelPipeline

public class ChatServerInitializer extends ChannelInitializer<Channel> {
  private final ChannelGroup group;

  public ChatServerInitializer(ChannelGroup group) {
    this.group = group;
  }

@Override
  //将所有需要的ChannelHandler 添加到 ChannelPipeline 中
  protected void initChannel(Channel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new ChunkedWriteHandler());
    pipeline.addLast(new HttpObjectAggregator(64 * 1024));
    pipeline.addLast(new HttpRequestHandler("/ws"));
    pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
    pipeline.addLast(new TextWebSocketFrameHandler(group));
  }
}

各个ChannelHandler的作用如下

这里有一个我们从未接触过的ChannelHandler —— WebSocketServerProtocolHandler,它能够帮我们处理如"升级握手",以及Close、Ping、Pong三种控制帧等繁重的工作,Text和Binary两种数据帧会被发送到下一个ChannelHandler,能够方便我们将工作重点落在实际的数据处理上

This handler does all the heavy lifting for you to run a websocket server. It takes care of websocket handshaking as well as processing of control frames (Close, Ping, Pong). Text and Binary data frames are passed to the next handler in the pipeline (implemented by you) for processing.

WebSocket 协议升级之前的 ChannelPipeline 的状态如图所示。这代表了刚刚被 ChatServerInitializer初始化之后的ChannelPipeline

当 WebSocket 协议升级完成之后,WebSocketServerProtocolHandler 将会把 Http- RequestDecoder 替换为 WebSocketFrameDecoder,把 HttpResponseEncoder 替换为 WebSocketFrameEncoder。为了性能最大化,我们移除了不再被 WebSocket 连接所需要的 HttpRequestHandler

引导

将各组件组合到一起

public class ChatServer {
  //创建 DefaultChannelGroup,其将保存所有已经连接的 WebSocket Channel
  private final ChannelGroup
channelGroup =
      new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
  private final EventLoopGroup bossGroup = new NioEventLoopGroup();
  private final EventLoopGroup workGroup = new NioEventLoopGroup();
  private Channel channel;

  public ChannelFuture start(InetSocketAddress address) {
    //引导服务器
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(createInitializer(channelGroup));
    ChannelFuture future = bootstrap.bind(address);
    future.syncUninterruptibly();
    channel = future.channel();
    return future;
  }

  //创建 ChatServerInitializer
  protected
ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
    return new ChatServerInitializer(group);
  }

  //处理服务器关闭,并释放所有的资源
  public void destroy() {
    if (channel != null) {
      channel.close();
    }
    channelGroup.close();
    bossGroup.shutdownGracefully();
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.err.println("Please give port as argument");
      System.exit(1);
    }
    int port = Integer.parseInt(args[0]);
    final ChatServer endpoint = new ChatServer();
    ChannelFuture future = endpoint.start(new InetSocketAddress(port));
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        endpoint.destroy();
      }
    });
    future.channel().closeFuture().syncUninterruptibly();
  }
}

运行结果

当请求的URI不是以/ws结尾时,返回index.html页面内容,可见页面内容的长度为3985字节

下面是聊天功能的展示

WebSocket的加密

我们需要将SslHandler添加到ChannelPipeline的首部

public class SecureChatServerInitializer extends ChatServerInitializer {
  private final SslContext context;

  public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
    super(group);
    this.context = context;
  }

@Override
  protected void initChannel(Channel ch) throws Exception {
    //调用父类的 initChannel() 方法
    super.initChannel(ch);
    SSLEngine engine = context.newEngine(ch.alloc());
    engine.setUseClientMode(false);
    // SslHandler 添加到 ChannelPipeline 中
    ch.pipeline().addFirst(new SslHandler(engine));
  }
}

"引导"处的代码也要做相应调整

public class SecureChatServer extends ChatServer {
  private final SslContext context;

  public SecureChatServer(SslContext context) {
    this.context = context;
  }

@Override
  protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
    //返回之前创建的 SecureChatServerInitializer 以启用加密
    return new SecureChatServerInitializer(group, context);
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.err.println("Please give port as argument");
      System.exit(1);
    }
    int port = Integer.parseInt(args[0]);

SelfSignedCertificate cert = new SelfSignedCertificate();
    SslContext context = SslContextBuilder.forServer(
    cert.certificate(), cert.privateKey()).build();

    final SecureChatServer endpoint = new SecureChatServer(context);
    ChannelFuture future = endpoint.start(new InetSocketAddress(port));
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        endpoint.destroy();
      }
    });
    future.channel().closeFuture().syncUninterruptibly();
  }
}

注意要用HTTPS连接进行测试

原文地址:https://www.cnblogs.com/kuluo/p/12687942.html

时间: 2024-11-09 16:16:04

Netty学习摘记 —— 简单WEB聊天室开发的相关文章

WEB聊天室开发心得体会

花了5天时间做了一个WEB版聊天室程序,前端使用div+css模仿微信的一个界面,后端使用nodejs写服务器,采用websocket协议进行通信. 开发过程中因为不很了解websocket,所以查看了一些demo.具体说一下对websocket的一些认识. websocket协议是基于TCP协议而产生了,解决了http协议只能由客户端先发送信息的一些局限性.以前解决这个问题需要采用问询机制,客户端每隔一段时间就看有没有服务器端发送的数据,这种做法非常耗费客户端也就是浏览器的性能.websock

ASP.NET SignalR 与 LayIM2.0 配合轻松实现Web聊天室(一) 之 基层数据搭建,让数据活起来(数据获取)

大家好,本篇是接上一篇 ASP.NET SignalR 与 LayIM2.0 配合轻松实现Web聊天室(零) 前言  ASP.NET SignalR WebIM系列第二篇.本篇会带领大家将 LayIM界面中的数据动态化.当然还不涉及即时消息通讯,如果你已经搞定了数据界面,那么本文您可以简单的看一下,或者略过. 进入正题,layim帮我们定义好了数据规则,我们只要写一个接口实现那个json规范就可以了,剩下的事情就交给layim去做,看一下json格式.(对应文件夹:demo/json/getLi

ASP.NET SignalR 与 LayIM2.0 配合轻松实现Web聊天室(十四)之漏掉的客服消息

前言 不知不觉已经十四篇了,其实已经没有什么可写了.但是突然发现layim中带的客服功能没有用到.于是乎,抽点时间完成吧.其实之前的工作已经把客服功能完成了一大半,剩下的我们稍微调整即可.今天的演示我们放在后边,直接进入讲解. 客服思路讲解 大家去一些公司网站都会发现,网页侧面或者自动弹出一些客服聊天框,人家很热情的和你交谈.我们也可以用layim来实现.首先,页面添加一个按钮,点击按钮触发客服模式. <a onclick="javascript:global.other.kefu(148

ASP.NET SignalR 与 LayIM2.0 配合轻松实现Web聊天室(七) 之 历史记录查询(时间,关键字,图片,文件),关键字高亮显示。

前言 上一篇讲解了如何自定义右键菜单,都是前端的内容,本篇内容就一个:查询.聊天历史纪录查询,在之前介绍查找好友的那篇博客里已经提到过 Elasticsearch,今天它又要上场了.对于Elasticsearch不感冒的同学呢,本篇可以不用看啦. from baidu: ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口.Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,

ASP.NET SignalR 与 LayIM2.0 配合轻松实现Web聊天室 实战系列(不断更新中)

项目简介 利用ASP.NET SignalR技术与Layim前端im框架实现的一个简单的web聊天室,包括单聊,群聊,加好友,加群,好友搜索,管理,群组管理,好友权限设置等功能.涉及技术: ElasticSearch 搜索,支持各种条件搜索,效率高,速度快,稳准狠. Redis缓存,统计在线好友,登录token等 RabbitMQ消息队列,发送消息通过队列降低数据库访问压力,或者延迟执行任务. ASP.NET MVC,UI架构. 以及普通的三层架构等.CRUD 当然其中的这些技术也是纯粹为了使用

独家分享Asp聊天室开发背景

Asp聊天室开发背景其实可以分为4个方面来为大家讲述: 一.添加Global.asa文件里面的代码.这部分代码主要处理Application_onStart事件,在此事件中,定义了一个有15个元素的数据,并把它赋给了一个Application对象的属性.Global.asa文件的内容如下. <SCRIPT LANGUAGE="VBScript" RUNAT="Server"> SUB Application_OnStart dim maChats(15)

ASP.NET SignalR 与 LayIM2.0 配合轻松实现Web聊天室(十一) 代码重构使用反射工厂解耦

前言 自从此博客发表以及代码开源以来,得到了许多人的关注.也没许多吧,反正在我意料之外的.包括几位大牛帮我做订阅号推广,真的很感谢他们.另外,还有几个高手给我提了一些架构上的问题.其实本身这个项目是没有做什么架构设计的.只是简单分了分层.不过我在经过仔细思考之后决定对项目架构做些调整,当然在我的技术范围之内,我相信还会有第二次,第三次甚至更多重构,我希望把他变得更加完美. 重构思路 对于重构思路,我首先想到的是,让程序能够支持多种数据库,比如我现在用的是SQLServer,而好多朋友用MySQL

基于Node的Web聊天室

1 项目名称 Web聊天室(<这是NodeJs实战>第二章的一个案例,把整个开发过程记录下来) 2 项目描述 该项目是一个简单的在线聊天程序.打开聊天页面,程序自动给用户分配一个昵称,进入默认的Lobby聊天室.用户可以发送消息,也可以使用聊天命令(聊天命令以/开头)修改自己的昵称或者加入已有的聊天室(聊天室不存在时,创建新的聊天室).在加入或创建聊天室时,新聊天室的名称会出现在聊天程序顶端的水平条上,也会出现在聊天消息区域右侧的可用房间列表中.在用户换到新房间后,系统会显示信息以确认这一变化

ASP.NET SignalR 与 LayIM2.0 配合轻松实现Web聊天室(零) 前言

前端时间听一个技术朋友说 LayIM 2.0 发布了,听到这个消息抓紧去官网看了一下.(http://layim.layui.com/)哎呀呀,还要购买授权[大家支持一下哦],果断买了企业版,喜欢钻研的我没有源码怎么行,说来也惭愧,发布好久了我才知道.之前写过一系列的博客,当时是ASP.NET SignalR 结合 LayIM 1.0 的一个小程序.看了一下最新版本的LayIM,太赞了.我电脑里的VS已经蠢蠢欲动了.话不多说,先预览一下效果. 主聊天界面: 好友列表界面:        以及自定