Analyzing Netty Components from netty-example (Continued)

In the previous article we analyzed Netty's components through the Discard server example in netty-example. Today we use another simple example, the Echo client, to examine the Netty components that did not appear last time.

1. Connection handling and read/write processing

The Echo client code:

/**
 * Sends one message when a connection is open and echoes back any received
 * data to the server.  Simply put, the echo client initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

As the code above shows, the Discard server code and the Echo client code are broadly similar; the main difference is that the server uses ServerBootstrap while the client uses Bootstrap. A minimal server-side sketch follows for comparison, after which we trace the connection process.
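Here is a condensed, hedged sketch of the server-side setup (EchoServerHandler stands in for whatever handler the server installs; the port is illustrative). Note the two event loop groups and the childHandler, which is applied to each accepted connection, versus the single group and plain handler of the client's Bootstrap above.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts incoming connections
EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles I/O for accepted channels
try {
    ServerBootstrap sb = new ServerBootstrap();
    sb.group(bossGroup, workerGroup)
      .channel(NioServerSocketChannel.class)           // server channel, not NioSocketChannel
      .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(SocketChannel ch) {
              ch.pipeline().addLast(new EchoServerHandler()); // handler for each accepted channel
          }
      });
    sb.bind(8007).sync().channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}

With that contrast in mind, let's trace the client's connection process.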

How NioEventLoop processes a selection key:

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

1.1 Connection flow

For OP_CONNECT, processSelectedKey() calls finishConnect() on the channel's NioUnsafe, implemented in AbstractNioChannel.AbstractNioUnsafe:

        @Override
        public final void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();

            try {
                boolean wasActive = isActive();
                doFinishConnect();
                fulfillConnectPromise(connectPromise, wasActive);
            } catch (Throwable t) {
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See https://github.com/netty/netty/issues/1770
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }
        }

fulfillConnectPromise() completes the connect promise and fires channelActive:

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }

            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            if (!wasActive && isActive()) {
                pipeline().fireChannelActive();
            }

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }
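User code observes this same connect promise through the ChannelFuture returned by Bootstrap.connect(). A hedged sketch (the listener body is illustrative):

ChannelFuture f = b.connect(HOST, PORT);
f.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            // fulfillConnectPromise() called trySuccess(); channelActive fires on the pipeline
            System.out.println("connected to " + future.channel().remoteAddress());
        } else {
            // the promise was failed with the annotated connect exception
            future.cause().printStackTrace();
        }
    }
});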

1.2 Read flow

The read itself is implemented in AbstractNioByteChannel's NioByteUnsafe.read() method.

  The typical autoRead flow is as follows (a sketch that drives the same flow manually, with autoRead disabled, appears after this list):

  1. When the socket connection is established, Netty fires the inbound channelActive event and then submits a read() request to itself (see DefaultChannelPipeline.fireChannelActive()).

  2. On receiving the read() request, Netty reads messages from the socket.

  3. Each time a message is read, Netty fires channelRead().

  4. When no more messages can be read, Netty fires channelReadComplete().

  5. Netty submits another read() request to continue reading from the socket.
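A minimal sketch (not part of the example) of taking manual control: disable autoRead with b.option(ChannelOption.AUTO_READ, false) when building the Bootstrap, then issue the read() requests yourself. The handler name is illustrative.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class ManualReadHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.read(); // step 1 by hand: submit the first read() request
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ReferenceCountUtil.release(msg); // consume the message (processing omitted)
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.read(); // step 5 by hand: request the next read only when ready
    }
}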

@Override
        public final void read() {
            final ChannelConfig config = config();
            if (!config.isAutoRead() && !isReadPending()) {
                // ChannelConfig.setAutoRead(false) was called in the meantime
                removeReadOp();
                return;
            }

            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            try {
                boolean needReadPendingReset = true;
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    if (needReadPendingReset) {
                        needReadPendingReset = false;
                        setReadPending(false);
                    }
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (allocHandle.lastBytesRead() < 0) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!config.isAutoRead() && !isReadPending()) {
                    removeReadOp();
                }
            }
        }

fireChannelRead() passes the message to the next inbound handler in the pipeline:

    @Override
    public ChannelHandlerContext fireChannelRead(Object msg) {
        AbstractChannelHandlerContext next = findContextInbound();
        next.invoker().invokeChannelRead(next, pipeline.touch(msg, next));
        return this;
    }

Once the socket is drained, the completion event is fired. Note that DefaultChannelPipeline.fireChannelReadComplete() issues another read() when autoRead is enabled:

    @Override
    public ChannelPipeline fireChannelReadComplete() {
        head.fireChannelReadComplete();
        if (channel.config().isAutoRead()) {
            read();
        }
        return this;
    }

   @Override
    public ChannelHandlerContext fireChannelReadComplete() {
        AbstractChannelHandlerContext next = findContextInbound();
        next.invoker().invokeChannelReadComplete(next);
        return this;
    }

1.3 Write flow

Writes are flushed through AbstractChannel.AbstractUnsafe.flush0():

 @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            inFlush0 = true;

            // Mark all pending write requests as failure if the channel is inactive.
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
                    } else {
                        // Do not trigger channelWritabilityChanged because the channel is closed already.
                        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    /**
                     * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                     * failing all flushed messages and also ensure the actual close of the underlying transport
                     * will happen before the promises are notified.
                     *
                     * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                     * may still return {@code true} even if the channel should be closed as result of the exception.
                     */
                    close(voidPromise(), t, false);
                } else {
                    outboundBuffer.failFlushed(t, true);
                }
            } finally {
                inFlush0 = false;
            }
        }
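Because flush0() fails pending writes by notifying their promises (via failFlushed), user code can react per message by listening on the write future. A small illustrative sketch:

ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
        if (!future.isSuccess()) {
            // e.g. the channel was not yet connected, or closed before the flush
            future.cause().printStackTrace();
        }
    }
});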

The concrete write implementation, taking NioSocketChannel as an example:

 @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        for (;;) {
            int size = in.size();
            if (size == 0) {
                // All written so clear OP_WRITE
                clearOpWrite();
                break;
            }
            long writtenBytes = 0;
            boolean done = false;
            boolean setOpWrite = false;

            // Ensure the pending writes are made of ByteBufs only.
            ByteBuffer[] nioBuffers = in.nioBuffers();
            int nioBufferCnt = in.nioBufferCount();
            long expectedWrittenBytes = in.nioBufferSize();
            SocketChannel ch = javaChannel();

            // Always use nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                    // We have something else beside ByteBuffers to write so fallback to normal writes.
                    super.doWrite(in);
                    return;
                case 1:
                    // Only one ByteBuf so use non-gathering write
                    ByteBuffer nioBuffer = nioBuffers[0];
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final int localWrittenBytes = ch.write(nioBuffer);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
                default:
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                        final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                        if (localWrittenBytes == 0) {
                            setOpWrite = true;
                            break;
                        }
                        expectedWrittenBytes -= localWrittenBytes;
                        writtenBytes += localWrittenBytes;
                        if (expectedWrittenBytes == 0) {
                            done = true;
                            break;
                        }
                    }
                    break;
            }

            // Release the fully written buffers, and update the indexes of the partially written buffer.
            in.removeBytes(writtenBytes);

            if (!done) {
                // Did not write all buffers completely.
                incompleteWrite(setOpWrite);
                break;
            }
        }
    }
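Both branches above bound the write loop by config().getWriteSpinCount(); when the kernel accepts zero bytes, setOpWrite registers interest in OP_WRITE instead of spinning. The spin count can be tuned with a channel option; a hedged one-liner (16 is the default):

// A larger spin count trades CPU for fewer OP_WRITE selector wakeups on slow peers.
b.option(ChannelOption.WRITE_SPIN_COUNT, 16); // 16 is the default value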

2. ChannelInboundHandler and ChannelOutboundHandler

The Echo client handler code:

/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

The code above introduces two important Netty components: ChannelInboundHandlerAdapter and ByteBuf. ByteBuf was covered in an earlier article; here we focus on ChannelInboundHandlerAdapter and its related classes.

  ChannelInboundHandlerAdapter implements ChannelInboundHandler; its default behavior is to forward each operation to the next ChannelHandler in the ChannelPipeline, and subclasses override individual methods to change that. Note that a message is not released automatically after channelRead(ChannelHandlerContext, Object) returns. If you want a ChannelInboundHandler implementation that releases received messages automatically, consider SimpleChannelInboundHandler, sketched below.
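A minimal sketch of that alternative (the class name is illustrative); SimpleChannelInboundHandler releases the message once channelRead0() returns:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class AutoReleaseHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
        System.out.println("received " + msg.readableBytes() + " bytes");
        // no explicit release needed: the base class releases msg after this returns
        // (to keep msg or write it back, call msg.retain() first)
    }
}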

  ChannelOutboundHandlerAdapter implements ChannelOutboundHandler and simply forwards each method call to the next handler via the ChannelHandlerContext.

  A ChannelInboundHandler handles inbound events, which are generated by external sources, such as data received from a socket.

  A ChannelOutboundHandler handles outbound operations issued by your own application (read, write, connect, and so on).

 2.1 ChannelInboundHandler.channelActive() 

From the source, here is how Netty fires the inbound channelActive event (taking LoggingHandler as an example):

   @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "ACTIVE"));
        }
        ctx.fireChannelActive();
    }

The event is propagated as follows:

     @Override
    public ChannelHandlerContext fireChannelActive() {
        AbstractChannelHandlerContext next = findContextInbound();
        next.invoker().invokeChannelActive(next);
        return this;
    }

   private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

The invokeChannelActive() implementation:

    @Override
    public void invokeChannelActive(final ChannelHandlerContext ctx) {
        if (executor.inEventLoop()) {
            invokeChannelActiveNow(ctx);
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    invokeChannelActiveNow(ctx);
                }
            });
        }
    }
    public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
        try {
            ((ChannelInboundHandler) ctx.handler()).channelActive(ctx);
        } catch (Throwable t) {
            notifyHandlerException(ctx, t);
        }
    }
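This inEventLoop() check is Netty's general threading pattern: execute directly when already on the channel's event loop, otherwise schedule a task onto it. A sketch of the same pattern in user code, assuming f is the ChannelFuture from an earlier connect(); writeAndFlush() already performs this hop internally, so the sketch is purely illustrative:

final Channel ch = f.channel();
if (ch.eventLoop().inEventLoop()) {
    ch.writeAndFlush(Unpooled.copiedBuffer("ping", CharsetUtil.UTF_8));
} else {
    ch.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            // runs later on the channel's event loop thread
            ch.writeAndFlush(Unpooled.copiedBuffer("ping", CharsetUtil.UTF_8));
        }
    });
}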

2.2 ChannelOutboundHandler.read()

The read() request travels outbound through the pipeline:

    @Override
    public ChannelHandlerContext read() {
        AbstractChannelHandlerContext next = findContextOutbound();
        next.invoker().invokeRead(next);
        return this;
    }

Finding the next outbound context:

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

Invoking the read operation on the event loop:

    @Override
    public void invokeRead(final ChannelHandlerContext ctx) {
        if (executor.inEventLoop()) {
            invokeReadNow(ctx);
        } else {
            AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
            Runnable task = dctx.invokeReadTask;
            if (task == null) {
                dctx.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        invokeReadNow(ctx);
                    }
                };
            }
            executor.execute(task);
        }
    }

2.3 ChannelOutboundHandler.write()

Taking the LoggingHandler implementation as an example:

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "WRITE", msg));
        }
        ctx.write(msg, promise);
    }

The context-level implementation:

    @Override
    public ChannelFuture write(Object msg, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise);
        return promise;
    }

How the write is triggered:

    @Override
    public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
        if (!validatePromise(ctx, promise, true)) {
            // promise cancelled
            ReferenceCountUtil.release(msg);
            return;
        }

        if (executor.inEventLoop()) {
            invokeWriteNow(ctx, msg, promise);
        } else {
            safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);
        }
    }

Immediate invocation when already on the event loop:

    public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

Summary:

  In Netty, multiple handlers can be registered on a single pipeline. ChannelInboundHandlers execute in the order they were registered, while ChannelOutboundHandlers execute in the reverse of their registration order. A sketch of the resulting execution order for a request follows.
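A hedged sketch standing in for the original ordering diagram; the handler class names are hypothetical:

ChannelPipeline p = ch.pipeline();
p.addLast("in1",  new InboundHandler1());   // hypothetical ChannelInboundHandlers
p.addLast("in2",  new InboundHandler2());
p.addLast("out1", new OutboundHandler1());  // hypothetical ChannelOutboundHandlers
p.addLast("out2", new OutboundHandler2());

// Inbound events (e.g. channelRead) travel:  head -> in1 -> in2 -> tail
// Outbound operations (e.g. write) travel:   tail -> out2 -> out1 -> head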

