玩转Netty – 从Netty3升级到Netty4

这篇文章主要和大家分享一下,在我们基础软件升级过程中遇到的经典Netty问题。当然,官方资料也许是一个更好的补充。另外,大家如果对Netty及其Grizzly架构以及源码有疑问的,欢迎交流。后续会为大家奉献我们基于Grizzly和Netty构建的RPC框架的压测分析,希望大家能够喜欢!

  好了,言归正传~

依赖

  Netty小组大概从3.3.0开始,将依赖坐标从

<dependency>
    <groupId>org.jboss.netty</groupId>
    <artifactId>netty</artifactId>
    <version>3.2.10.Final</version>
</dependency>

  改成了(Netty作者离开了Jboss公司)

<dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty</artifactId>
                <version>3.3.0.Final</version>
        </dependency>

  这样,将其替换为Netty4,只需要替换一下版本就ok了,如替换成最新稳定版本:

  

<dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.0.23.Final</version>
          </dependency>

  但请注意,从4开始,Netty团队做了模块依赖的优化,像Grizzly一样,分离出很多功能独立的Package。比方说,你希望使用Netty的buffer组件,只需简单依赖这个包就好了。还是让我们来看下netty-all里面都有哪些依赖吧,如:

 

   <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-buffer</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
          </dependency>
          <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-codec</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
           </dependency>
           <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-codec-http</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-codec-socks</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-common</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-handler</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-transport</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-transport-rxtx</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-transport-sctp</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>netty-transport-udt</artifactId>
                <version>${project.version}</version>
                <scope>compile</scope>
                <optional>true</optional>
            </dependency>

  每个包都代表什么呢?描述如下:

Artifact ID  Description
netty-parent  Maven parent POM
netty-common Utility classes and logging facade
netty-buffer ByteBuf API that replaces java.nio.ByteBuffer
netty-transport  Channel API and core transports
netty-transport-rxtx Rxtx transport
netty-transport-sctp  SCTP transport
netty-transport-udt UDT transport
netty-handler Useful ChannelHandler implementations
netty-codec Codec framework that helps write an encoder and a decoder
netty-codec-http Codecs related with HTTP, Web Sockets, SPDY, and RTSP
netty-codec-socks Codecs related with SOCKS protocol
netty-all All-in-one JAR that combines all artifacts above
netty-tarball  Tarball distribution
netty-example Examples
netty-testsuite-* A collection of integration tests
netty-microbench Microbenchmarks

  通过依赖分析,最终我选择了精简依赖,如下:

 <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-handler</artifactId>
            <version>4.0.23.Final</version>
        </dependency>
    </dependencies>

为什么?因为 netty-handler依赖了 netty-codec, netty-transport, netty-buffer等,所以我的依赖最终可以瘦身到只依赖这个包。顺便说一下,在版本4中,针对Linux平台做了AIO的优化实现,如:

 <dependency>
        <groupId>${project.groupId}</groupId>
        <artifactId>netty-transport-native-epoll</artifactId>
        <version>${project.version}</version>
        <classifier>${os.detected.classifier}</classifier>
        <scope>compile</scope>
        <optional>true</optional>
    </dependency>

更多的细节,可以参看这里

顺便说一句,Netty3和Netty4是可以共存的,其根本原因在于Netty小组为3和4分别设计了不同的基础package名(org.jboss.netty与io.netty)。就像我的工程,服务发现依赖了Curator,而它依赖了ZK,依赖了Netty3,而我的RPC部分仅仅依赖Netty4。

线程模型

  Netty3只保证 upstream事件在IO线程里执行,但是所有的downstream事件会被调用线程处理,它可能是IO线程,也可能是用户自定义线程,这就带来了一个问题,用户需要小心地处理同步操作。除此之外,还会面临线程上下文切换的风险,设想一下,你在write的时候遇到了异常,转而触发exceptionCaught,但这是一个upstream事件,怎么办?

  Netty4的线程模型则不存在此类问题,因为所有的操作都被保证在同一个EventLoop里的同一个Thread完成。也就是说Netty4不存在并发访问 ChannelHandler,当然这个前提是你没有给该handler打上Sharable注解。同时它也能保证 happens-before关系,所以你也没必要在 ChannelHandler声明volatile field。

  用户可以指定自己的 EventExecutor来执行特定的 handler。通常情况下,这种EventExecutor是单线程的,当然,如果你指定了多线程的 EventExecutor或者 EventLoop,线程sticky特性会保证,除非出现 deregistration,否则其中的一个线程将一直占用。如果两个handler分别注册了不同的EventExecutor,这时就要注意线程安全问题了。

  Netty4的线程模型还是有很多可以优化的地方,比方说目前Eventloop对channel的处理不均等问题,而这些问题都会在Netty 5里面优化掉,感兴趣的朋友可以参看官方Issues

  

Channel状态模型

  先来看两幅图,第一幅图是Netty3的Channel状态模型,第二附图是Netty4优化过的模型。可以看到,channelOpen,channelBound,和channelConnected 已经被channelActive替代。channelDisconnected,channelUnbound和channelClosed 也被 channelInactive替代。

Netty 3

Netty 4

  这里就产生了两个问题:

其一,channelRegistered and channelUnregistered 不等价于 channelOpen and channelClosed,它是Netty4新引入的状态为了实现Channel的dynamic registration, deregistration, and re-registration。

第二, 既然是合并,那原先针对channelOpen的方法如何迁移?简单来做,可以直接迁移到替代方法里面。

Codec和Handler

1. ChannelPipelineFactory ---->  ChannelInitializer

这里需要注意的是,ChannelPipeline的创建方式发生了变化,原先是这么玩的,

ChannelPipeline cp = Channels.pipeline();

现在得这么玩

ChannelPipeline cp = ch.pipeline();

用Netty小组的话来说就是:

“Please note that you don‘t create a new ChannelPipeline by yourself. After observing many use cases reported so far, the Netty project team concluded that it has no benefit for a user to create his or her own pipeline implementation
or to extend the default implementation. Therefore, ChannelPipeline is not created by a user anymore. ChannelPipeline is automatically created by a Channel.”

 

  2. SimpleChannelHandler ----> ChannelDuplexHandler

之前是这么玩的

 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception
    {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent cse = (ChannelStateEvent) e;
            switch (cse.getState()) {
                case OPEN:
                    if (Boolean.TRUE.equals(cse.getValue())) {
                        // connect
                        channelCount.incrementAndGet();
                        allChannels.add(e.getChannel());
                    }
                    else {
                        // disconnect
                        channelCount.decrementAndGet();
                        allChannels.remove(e.getChannel());
                    }
                    break;
                case BOUND:
                    break;
            }
        }

        if (e instanceof UpstreamMessageEvent) {
            UpstreamMessageEvent ume = (UpstreamMessageEvent) e;
            if (ume.getMessage() instanceof ChannelBuffer) {
                ChannelBuffer cb = (ChannelBuffer) ume.getMessage();
                int readableBytes = cb.readableBytes();
                //  compute stats here, bytes read from remote
                bytesRead.getAndAdd(readableBytes);
            }
        }
        ctx.sendUpstream(e);
    }

    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception
    {
        if (e instanceof DownstreamMessageEvent) {
            DownstreamMessageEvent dme = (DownstreamMessageEvent) e;
            if (dme.getMessage() instanceof ChannelBuffer) {
                ChannelBuffer cb = (ChannelBuffer) dme.getMessage();
                int readableBytes = cb.readableBytes();
                // compute stats here, bytes written to remote
                bytesWritten.getAndAdd(readableBytes);
            }
        }
        ctx.sendDownstream(e);
    }

改成ChannelDuplexHandler之后,我只需要重写read和write方法,来完成同样的功能。

其它

1. 通过下面的代码来完成Channel的限流

 ctx.channel().setReadable(false);//Before
            ctx.channel().config().setAutoRead(false);//After

2.  TCP参数优化

 

// Before:
cfg.setOption("tcpNoDelay", true);
   cfg.setOption("tcpNoDelay", 0);  // Runtime ClassCastException
   cfg.setOption("tcpNoDelays", true); // Typo in the option name - ignored silently

   // After:
   cfg.setOption(ChannelOption.TCP_NODELAY, true);
   cfg.setOption(ChannelOption.TCP_NODELAY, 0); // Compile error

3. 单元测试经常用到的CodecEmbedder类已经变名为EmbeddedChannel

@Test
      public void testMultipleLinesStrippedDelimiters() {
          EmbeddedChannel ch = new EmbeddedChannel(new DelimiterBasedFrameDecoder(8192, true,
                Delimiters.lineDelimiter()));
          ch.writeInbound(Unpooled.copiedBuffer("TestLine\r\ng\r\n", Charset.defaultCharset()));
          assertEquals("TestLine", releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
          assertEquals("g", releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
          assertNull(ch.readInbound());
          ch.finish();
      }

4. 简化的关闭操作,以前我是这么玩stop的

 if (serverChannel != null) {
            log.info("stopping transport {}:{}",getName(), port);
            // first stop accepting
            final CountDownLatch latch = new CountDownLatch(1);
            serverChannel.close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // stop and process remaining in-flight invocations
                    if (def.getExecutor() instanceof ExecutorService) {
                        ExecutorService exe = (ExecutorService) getExecutor();
                        ShutdownUtil.shutdownExecutor(exe, "dispatcher");
                    }
                    latch.countDown();
                }
            });
            latch.await();
            serverChannel = null;
        }

        // If the channelFactory was created by us, we should also clean it up. If the
        // channelFactory was passed in by Bootstrap, then it may be shared so don't clean  it up.
        if (channelFactory != null) {
            ShutdownUtil.shutdownChannelFactory(channelFactory, bossExecutor, ioWorkerExecutor,allChannels);
            }
}

现在我得这么玩

 public void stop() throws InterruptedException {
        // Wait until the server socket is closed.
        channelFuture.channel().closeFuture().syncUninterruptibly();

        bossGroup.shutdownGracefully().syncUninterruptibly();
        workerGroup.shutdownGracefully().syncUninterruptibly();
    }

5. 编解码命名改变

FrameDecoder ----> ByteToMessageDecoder

OneToOneEncoder  ----> MessageToMessageEncoder

OneToOneDecoder ----> MessageToMessageDecoder

6. 心跳逻辑优化,之前我是这么玩的

 cp.addLast("idleTimeoutHandler", new IdleStateHandler(getTimer(),
                                                                          getClientIdleTimeout().toMillis(),
                                                                          NO_WRITER_IDLE_TIMEOUT,
                                                                          NO_ALL_IDLE_TIMEOUT,
                                                                          TimeUnit.MILLISECONDS));
  cp.addLast("heartbeatHandler", new HeartbeatHandler());

其中HeartbeatHandler 继承了IdleStateAwareChannelHandler。在Netty4里,IdleStateAwareChannelHandler已经去除,但 IdleStateHandler类还存在,所以我会这么玩

   cp.addLast("idleTimeoutHandler", new IdleStateHandler(
                                NO_WRITER_IDLE_TIMEOUT, NO_WRITER_IDLE_TIMEOUT,
                                NO_ALL_IDLE_TIMEOUT, TimeUnit.MILLISECONDS));

  cp.addLast("heartbeatHandler", new HeartbeatHandler());

其中,HeartbeatHandler 继承了ChannelInboundHandlerAdapter。具体的实现逻辑这里就不贴出来了。再啰嗦几句,很多同学喜欢自己启线程去做心跳逻辑,根据经验,这里是不推荐这种方式的。利用Netty的链路空闲检测机制可以很好的完成这个功能,能更好地配合Netty线程模型和异常捕获机制。自己定制,处理不好,会带来很大的线上隐患。

小结

这篇文章简单记录了升级过程中遇到的一些比较higher的话题,配上代码,希望能更好的重现整个升级思路和过程,也希望能给大家带来帮助。如果你在升级过程中遇到了问题,欢迎留言交流。最后,祝玩的开心~

参考文档

1. http://www.infoq.com/news/2013/11/netty4-twitter

2. http://netty.io/wiki/all-documents.html

3. http://netty.io/wiki/index.html

时间: 2024-10-22 05:26:56

玩转Netty – 从Netty3升级到Netty4的相关文章

Netty版本升级血泪史之线程篇

1. 背景 1.1. Netty 3.X系列版本现状 根据对Netty社区部分用户的调查,结合Netty在其它开源项目中的使用情况,我们可以看出目前Netty商用的主流版本集中在3.X和4.X上,其中以Netty 3.X系列版本使用最为广泛. Netty社区非常活跃,3.X系列版本从2011年2月7日发布的netty-3.2.4 Final版本到2014年12月17日发布的netty-3.10.0 Final版本,版本跨度达3年多,期间共推出了61个Final版本. 1.2. 升级还是坚守老版本

新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析(阿里)

1.引言 Netty 是一个广受欢迎的异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端. 本文基于 Netty 4.1 展开介绍相关理论模型,使用场景,基本组件.整体架构,知其然且知其所以然,希望给大家在实际开发实践.学习开源项目方面提供参考. 本文作者的另两篇<高性能网络编程(五):一文读懂高性能网络编程中的I/O模型>.<高性能网络编程(六):一文读懂高性能网络编程中的线程模型>也写的很好,有兴趣的读者可以一并看看. 关于作者: 陈彩华(

netty总结

一. netty是什么 java有提供的操作网络的接口有两种IO和NIO,JDK7还有NIO2,NIO就是非阻塞的IO,NIO是一个线程对应多个客户端,IO则是一个SOCKET对一个线程,两者的模型图如下 Netty则是一个封装了io和nio的高性能高可用的网络框架,通过netty简化又丰富了原生的NIO操作,并且避免了NIO的原生缺陷. 二. netty设计思想 一个简单的nettydemo最少需要配置以下这些东西:Bootstrap ,然后在Bootstrap中分别指定EventLoopGr

Netty简单应用与线上服务器部署_netty视频

Netty简单应用与线上服务器部署 课程学习地址:http://www.xuetuwuyou.com/course/198 课程出自学途无忧网:http://www.xuetuwuyou.com 一.开发环境 4.1.11.Final   jdk1.8 maven 3.2 Spring 4.3.9 二.适合人群 ①想深入学习java ClassLoader ②想在线上linux服务器上运行netty或Springboot服务 三.课程目标 ①掌控ClassLoader ②学会编写shell脚本

【转】Netty那点事(三)Channel中的Pipeline

[原文]https://github.com/code4craft/netty-learning/blob/master/posts/ch3-pipeline.md Channel是理解和使用Netty的核心.Channel的涉及内容较多,这里我使用由浅入深的介绍方法.在这篇文章中,我们主要介绍Channel部分中Pipeline实现机制.为了避免枯燥,借用一下<盗梦空间>的“梦境”概念,希望大家喜欢. 一层梦境:Channel实现概览 在Netty里,Channel是通讯的载体,而Chann

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

细数那些年我们一起玩过的Unity3D游戏(unity开发的游戏有哪些)

经典重现<新仙剑OL> <新仙剑OL>采用跨平台Unity3D引擎,耗资数千万,历时三年多,由台湾大宇正版授权,"仙剑之父"姚壮宪监制的全球首款Unity3D航母级双端(网页和客户端)中国风MMORPG网络游戏巨作.主打温情牌并且延续了仙剑系列的国风雅韵,人物塑造细腻唯美,场景构建精致逼真. <蒸汽之城>(City of Steam) 由国内游戏公司参与开发的Unity3D页游<蒸汽之城>(City of Steam)在北美地区呼声颇高,

填坑netty io.netty.util.internal.OutOfDirectMemoryError

我们有个与外部交互的接口是采用netty http,具体版本netty-4.1.18,为什么使用这个版本,我也不知道,历史原因. 由于netty都是异步请求,所以与外部交互总有些唯一的业务标识需要保存,以便前后数据可以勾兑. 这里先说明下,netty里的ByteBuf在读取channelRead未进行写write操作时,需要自己释放release.这和本次Error关系不大,继续说重点. 查看日志,首先发现了OutOfDirectMemoryError错误,这个错误也是间断性的出现,显然是内存不

一文读懂高性能网络编程中的I/O模型

1.前言 随着互联网的发展,面对海量用户高并发业务,传统的阻塞式的服务端架构模式已经无能为力.本文(和下篇<高性能网络编程(六):一文读懂高性能网络编程中的线程模型>)旨在为大家提供有用的高性能网络编程的I/O模型概览以及网络服务进程模型的比较,以揭开设计和实现高性能网络架构的神秘面纱. 限于篇幅原因,请将本文与<高性能网络编程(六):一文读懂高性能网络编程中的线程模型>连起来读,这样会让知识更连贯. 学习交流: - 即时通讯开发交流3群:185926912[推荐] - 移动端IM