Netty实战十二之WebSocket

如果你有跟进Web技术的最新进展,你很可能就遇到过“实时Web”这个短语,这里并不是指所谓的硬实时服务质量(QoS),硬实时服务质量是保证计算结果将在指定的时间间隔内被递交。仅HTTP的请求/响应模式设计就使得其很难被支持。

实时Web利用技术和实践,使用户在信息的作者发布信息之后就能够立即收到信息,而不需要他们或者他们的软件周期性地检查信息源以及获取更新。

1、WebSocket简介

WebSocket协议是完全重新设计的协议,旨在为Web上的双向数据传输问题提供一个切实可行的解决方案,使得客户端和服务器之间可以在任意时刻传输信息,因此,这也就要求他们异步地处理消息回执(作为HTML5客户端API的一部分,大部分最新的浏览器都已经支持了WebSocket)

Netty对于WebSocket的支持包含了所有正在使用中的主要实现,因此在你的下一个应用程序中采用它将是简单直接的。和往常使用Netty一样,你可以完全使用该协议,而无需关心它内部的实现细节,我们将通过创建一个基于WbeSocket的实时聊天应用程序来演示。

2、WebSocket示例应用程序

为了让示例应用程序展示它的实时功能,我们将通过使用WebSocket协议来实现一个基于浏览器的聊天应用程序,就像你可能在FaceBook的文本消息功能中见到过的那样。我们将通过使用多个用户之间可以同时进行相互通信,从而更进一步。

下图说明应用逻辑:

——客户端发送一个消息

——该消息将被广播到所有其他链接的客户端 这正如你可能会预期的一个聊天室应当的工作方式:所有的人都可以和其他的人聊天。在示例中,我们将只实现服务器端,而客户端则是通过Web页面访问该聊天室的浏览器。正如同你将在接下来的几页中所看到的,WebSocket简化了编写这样的服务器的过程。

3、添加WebSocket支持

在从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此,使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。这个升级动作发生的确切时刻特定于应用程序;他可能会发生在启动时,也可能会发生在请求了某个特定的URL之后。

我们的应用程序将采用下面的约定:如果被请求的URL以/ws结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP/S。在连接已经升级完成之后,所有数据都将会使用WebSocket进行传输。下图说明了该服务器逻辑,一如在Netty中一样,它由一组ChannelHandler实现。 

4、处理HTTP请求

首先,我们将实现该处理HTTP请求的组件。这个组件将提供用于访问聊天室并显示由连接的客户端发送的消息的网页。如下代码给出了这个HttpRequestHandler对应的代码,其扩展了SimpleChannelInboundHandler以处理FullHttpRequest消息。需要注意是,channelRead0()方法的实现是如何转发任何目标URI为/ws的请求的。

//扩展SimpleChannelInboundHandler以处理FullHttpReuqest消息
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest>{
    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain()
                .getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        }catch (URISyntaxException e){
            throw new IllegalStateException("Unable to locate index.html",e);
        }
    }

    public HttpRequestHandler(String wsUri){
        this.wsUri = wsUri;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                FullHttpRequest request) throws Exception {
        //如果请求了WebSocket协议升级,则增加引用技术,并将它传递给下一个ChannelInboundHandler
        if (wsUri.equalsIgnoreCase(request.getUri())){
            ctx.fireChannelRead(request.retain());
        } else {
            //处理100Continue请求以符合HTTP1.1规范
            if (HttpHeaders.is100ContinueExpected(request)){
                send100Continue(ctx);
            }
            //读取“index.html”
            RandomAccessFile file = new RandomAccessFile(INDEX,"r");
            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(),HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE,"text/html;charset=UTF-8");
            boolean keepAlive = HttpHeaders.isKeepAlive(request);
            //如果请求了keep-alive,则添加所需要的HTTP头信息
            if (keepAlive){
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION,HttpHeaders.Values.KEEP_ALIVE);
            }
            //将HttpResponse写到客户端
            ctx.write(response);
            //将index.html写到客户端
            if (ctx.pipeline().get(SslHandler.class) == null){
                ctx.write(new DefaultFileRegion(file.getChannel(),0,file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            //写LastHttpContent并冲刷至客户端
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            if (!keepAlive){
                //如果没有请求keep-alive,则在写操作完成后关闭Channel
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx){
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

如果该HTTP请求指向了地址为/ws的URI,那么HttpRequestHandler将调用FullHttpRequest对象上的retain()方法。并通过调用fireChannelRead(msg)方法将它转发给下一个ChannelInboundHandler。之所以需要调用retain()方法,是因为调用channelRead()方法完成之后,它将调用FullHttpRequest对象上的release()方法以释放它的资源。

如果客户端发送了HTTP1.1的HTTP头信息Expect:100-continue,那么HttpRequestHandler将会发送一个100Continue响应。在该HTTP头信息被设置之后,HttpRequestHandler将会写回一个HttpResponse给客户端。这不是一个FullHttpResponse,因为它只是响应的第一部分。此外,这里也不会调用writeAndFlush()方法,在结束的时候才会调用。

如果不需要加密和压缩,那么可以通过将index.html的内容存储到DefaultFileRegion中来达到最佳效率。这将会利用零拷贝特性来进行内容的传输。为此,你可以检查一下,是否有SslHandler存在于在ChannelPipeline中。否则,你可以使用ChunkedNioFile。

HttpRequestHandler将写一个LastHttpContent来标记响应的结束。如果没有请求keep-alive,那么HttpRequestHandler将会添加一个ChannelFutureListener到最后一次写出动作的ChannelFuture,并关闭该连接。在这里,你将调用writeAndFlush()方法以冲刷所有之前写入的消息。

这部分代码代表了聊天服务器的第一个部分,它管理纯粹的HTTP请求和响应。接下来,我们将处理传输实际聊天消息的WebSocket帧。

WEBSOCKET帧:WebSocket以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。

5、处理WebSocket帧

有IETF发布的WebSocket RFC,定义了6种帧,Netty为它们都提供了一个POJO实现。

BinaryWebSocketFrame——包含了二进制数据

TextWebSocketFrame——包含了文本数据

ContinuationWebSocketFrame——包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame的文本数据或者二进制数据

CloseWebSocketFrame——表示一个CLOSE请求,包含一个关闭的状态码和关闭的原因

PingWebSocketFrame——请求传输一个PongWebSocketFrame

PongWebSocketFrame——作为一个对于PingWebSocketFrame的响应被发送

TextWebSocketFrame是我们唯一真正需要处理的帧类型。为了符合WebSocket RFC,Netty提供了WebSocketServerProtocolHandler来处理其他类型的帧。

以下代码展示了我们用于处理TextWebSocketFrame的ChannelInboundHandler,其还将在它的ChannelGroup中跟踪所有活动的WebSocket连接。

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group){
        this.group = group;
    }

    //重写userEventTriggered方法以处理自定义事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE){
            //如果该事件表示握手成功,则从该ChannelPipeline中移除HttpRequestHandler,因为将不会接收到任何HTTP消息了
            ctx.pipeline().remove(HttpRequestHandler.class);
            //通知所有已经连接的WebSocket客户端新的客户端连接上了
            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
            //将新的WebSocket Channel添加到ChannelGroup中,以便它可以接收到所有的消息
            group.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx,evt);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext,
                                TextWebSocketFrame msg) throws Exception {
        //增加消息的引用计数,并将它写到ChannelGroup中所有已经连接的客户端
        group.writeAndFlush(msg.retain());
    }
}

TextWebSocketFrameHandler只有一组非常少量的责任。当和新客户端的WebSocket握手成功完成之后,它将通过把通知消息写到ChannelGroup中的所有Channel来通知所有已经连接的客户端,然后它将把这个新Channel加入到该ChannelGroup中。

如果接收到了TextWebSocketFrame消息,TextWebSocketFrameHandler将调用TextWebSocketFrame消息上的retain()方法,并使用writeAndFlush()方法来将它传输给ChannelGroup,以便所有已经连接的WebSocket Channel都将接收到它。

和之前一样,对于retain()方法的调用时必需的。因为当ChannelRead0()方法返回时,TextWebSocketFrame的引用技术将会被减少。由于所有的操作都是异步的,因此,writeAndFlush()方法可能会在channelRead0()方法返回之后完成,而且它绝对不能访问一个已经失效的引用。

因为Netty在内部处理了大部分剩下的功能,所有现在剩下唯一需要做的事情就是为每个新创建的Channel初始化其ChannelPipeline。为此,我们需要一个ChannelInitializer。

6、初始化ChannelPipeline

以下代码展示了生成的ChatServerInitializer。

public class ChatServerInitializer extends ChannelInitializer<Channel>{
    private final ChannelGroup group;

    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler(group));
    }
}

对于initChannel()方法调用,通过安装所有必须的ChannelHandler来设置该新注册的Channel的ChannelPipeline。

Netty的WebSocketServerProtocolHandler处理了所有委托管理的WebSocket帧类型以及升级握手本身。如果握手成功,那么所需的ChannelHandler将会被添加到ChannelPipeline中,而那些不再需要的ChannelHandler则将会被移除。

WebSocket协议升级之前的ChannelPipeline的状态如下图,这代表了刚刚被ChatServerInitializer初始化之后的ChannelPipeline。 当WebSocket协议升级完成之后,WebSocketServerProtocolHandler将会把HttpRequestDecoder替换为WebSocketFrameDecoder,把HttpResponseEncoder替换为WebSocketFrameEncoder。为了性能最大化,它将移除任何不再被WebSocket连接所需要的ChannelHandler。这也包括上图所示的HttpObjectAggregator和HttpRequestHandler。

下图展示了这些操作完成之后的ChannelPipeline。需要注意的是,Netty目前支持4个版本的WebSocket协议,他们每个都具有自己的实现类。Netty将会根据客户端(这里指浏览器)所支持的版本,自动地选择正确版本的WebSocketFrameDecoder和WebSocketFrameEncoder。 

7、引导

这幅拼图最后的一部分是引导该服务器,并安装ChatSererInitializer的代码。这将有ChatServer类处理,如下代码所示。

public class ChatServer {
    //创建DefaultChannelGroup,其将保存所有已经连接的WebSocket Channel
    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture start(InetSocketAddress address){
        //引导服务器
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(createInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        channel = future.channel();
        return future;
    }

    //创建ChatServerInitializer
    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group){
        return new ChatServerInitializer(group);
    }

    //处理服务器关闭,并释放所有的资源
    public void destroy(){
        if (channel != null){
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 1){
            System.out.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);
        final ChatServer endpoint = new ChatServer();
        ChannelFuture future = endpoint.start(
                new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

8、如何进行加密

在真实世界的场景中,你将很快就会被要求向该服务器添加加密。使用Netty,这不过是将一个SslHandler添加到ChannelPipeline中,并配置它的问题。以下代码展示了如何通过扩展我们的ChatServerInitializer来创建一个SecureChatServerInitializer以完成需求。

//扩展ChatServerInitializer以添加加密
public class SecureChatServerInitializer extends ChatServerInitializer{
    private final SslContext context;
    public SecureChatServerInitializer(ChannelGroup group,SslContext context) {
        super(group);
        this.context = context;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        //调用父类的initChannel()方法
        super.initChannel(channel);
        SSLEngine engine = context.newEngine(channel.alloc());
        engine.setUseClientMode(false);
        //将SslHandler添加到ChannelPipeline中
        channel.pipeline().addFirst(new SslHandler(engine));
    }
}

最后一步是调整ChatServer以使用SecureChatServerInitializer,以便在ChannelPipeline中安装SslHandler。

public class SecureChatServer extends ChatServer{
    private final SslContext context;

    public SecureChatServer(SslContext context) {
        this.context = context;
    }

    @Override
    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
        //返回之前创建的SecureChatServerInitializer以启用加密
        return new SecureChatServerInitializer(group,context);
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 1){
            System.out.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);
        SelfSignedCertificate cert = new SelfSignedCertificate();
        SslContext context = SslContext.newServerContext(cert.certificate(),cert.privateKey());

        final SecureChatServer endpoint = new SecureChatServer(context);
        ChannelFuture future = endpoint.start(
                new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

这就是为所有的通信启用SSL/TLS加密需要做的全部。

原文地址:https://www.cnblogs.com/UncleCatMySelf/p/9190637.html

时间: 2024-11-05 22:55:52

Netty实战十二之WebSocket的相关文章

quick-cocos2d-x 学习系列之十二 关于websocket

quick-cocos2d-x 学习系列之十二 关于websocket 1.  概念 百度百科:WebSocket protocol 是HTML5一种新的协议.它实现了浏览器与服务器全双工通信(full-duplex). 在浏览器中通过http仅能实现单向的通信,comet可以一定程度上模拟双向通信,但效率较低,并需要服务器有较好的支持; flash中的socket和xmlsocket可以实现真正的双向通信,通过 flex ajax bridge,可以在javascript中使用这两项功能. 可

Netty in Action (二十三) 第十二章节 WebSocket

第三部分:网络协议 WebSocket是一个先进的网络协议,被开发用来用来提高网络的性能和web应用的响应率,我们将介绍Netty对WebSocket这两个特性的支持,同时我们也会举一个简单的实例来说明讲解这两个WebSocket的特性 在第十二章节中,你将学会如何使用WebSocket实现数据双向传输的功能,我们会写一个聊天室的方式讲解这个数据双向传输的问题,我们这个聊天室的实例是这样的:多个浏览器客户端可是实时的相互通信,你也会学会如何将普通的HTTP协议切换升级成WebSocket协议,当

Spark机器学习实战 (十二) - 推荐系统实战

0 相关源码 将结合前述知识进行综合实战,以达到所学即所用.在推荐系统项目中,讲解了推荐系统基本原理以及实现推荐系统的架构思路,有其他相关研发经验基础的同学可以结合以往的经验,实现自己的推荐系统. 1 推荐系统简介 1.1 什么是推荐系统 1.2 推荐系统的作用 1.2.1 帮助顾客快速定位需求,节省时间 1.2.2 大幅度提高销售量 1.3 推荐系统的技术思想 1.3.1 推荐系统是一种机器学习的工程应用 1.3.2 推荐系统基于知识发现原理 1.4 推荐系统的工业化实现 Apache Spa

应用程序框架实战十二:公共操作类开发技巧(初学者必读)

本文专门为初学者而写,因为很多初学者可能还不了解公共操作类的作用和封装技巧,大部分有经验的程序员都会把自己所碰到的技术问题整理封装成类,这就是公共操作类.公共操作类往往具有一些通用性,也可能专门解决某些棘手问题.公共操作类是应用程序框架的核心,主要目标是解决大部分技术问题.我将在本文介绍封装公共操作类的要点,供初学者参考. 开发公共操作类的原因 很多初学者会奇怪,.Net Framework提供的API相当易用,为何还要多此一举,进行一层封装呢.下面列举封装公共操作类的一些动机. .Net Fr

Node.js 切近实战(十二) 之Linux部署

之前的话我们的项目都是跑在windows上,今天我们要将我们的程序跑到linxu机器上.在看linux部署之前,我们先看一下node.js类似于asp.net mvc的过滤器或者叫拦截器.在app.js中我们加入如下代码 var beforeRequest = function (req, res, next) {     if (req.originalUrl == '/'          || req.originalUrl == '/login'          || req.orig

ASP.NET MVC4+BootStrap 实战(十二)

最近实在是太忙,没时间写博客,只能夜里等孩子睡着了再写.感觉最近一个月又是在浪费时间,心里万分焦急.感觉自己的技术还不行,但是却没有时间去加强.吉日嘎啦的<程序员你伤不起>一书中讲到要孩子要晚了的坏处,比如自己30岁要的孩子,自己60岁了,孩子都30岁了,可能谁也照顾不上谁.其实我想说的是生早了也同样有坏处,比如现在只能晚上12点后写博客写程序. 好了,不多说了,来点高兴的,第一次也是第一个获得组内季度之星奖杯的我,终于觉得自己这三个月来当Master并且承担Coding任务的艰辛得到了很好的

Hyperledger Fabric 实战(十二): Fabric 源码本地调试

借助开发网络调试 fabric 源码本地调试 准备工作 IDE Goland Go 1.9.7 fabric-samples 模块 chaincode-docker-devmode fabric 源码 步骤 添加本地域名 127.0.0.1 peer 127.0.0.1 orderer 用 ide 打开 $GOPATH 下的fabric源码目录 在源码目录下添加 dev-network 把 sampleconfig 下的所有文件复制到 dev-network 修改 core.yaml 中 fil

(转载)Android项目实战(三十二):圆角对话框Dialog

Android项目实战(三十二):圆角对话框Dialog 前言: 项目中多处用到对话框,用系统对话框太难看,就自己写一个自定义对话框. 对话框包括:1.圆角 2.app图标 , 提示文本,关闭对话框的"确定"按钮 难点:1.对话框边框圆角显示 2.考虑到提示文本字数不确定,在不影响美观的情况下,需要在一行内显示提示的文字信息   3.设置对话框的宽和高 技术储备: 1.安卓开发_使用AlertDialog实现对话框    知道AlertDialog有setView(view) ,Dia

Android项目实战(十二):解决OOM的一种偷懒又有效的办法

原文:Android项目实战(十二):解决OOM的一种偷懒又有效的办法 在程序的manifest文件的application节点加入android:largeHeap=“true” 即可. 对,只需要一句话! 那么这行代码的意思是什么呢? 简单的说就是使该APP获取最大可分配的内存,以便解决OOM问题. 但是.OOM问题出现的原因总得来说有两点: 1.某个手机的内存真的很少 2.代码问题,比如没有处理好Bitmap图片的大小 可以说,出现OOM的情况基本都是第二种情况,那么就需要修改代码,看看哪