Netty5源码解析

Netty5源码解析

今天让我来总结下netty5的服务端代码。

  1. 服务端(ServerBootstrap)

    示例代码如下:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * Created by yaojiafeng on 16/1/17.
     */
    public class SimpleServer {
    
        public void bind(int port) throws Exception {
            // 配置服务端的NIO线程组
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(1);
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChildChannelHandler());
                // 绑定端口,同步等待成功
                ChannelFuture f = b.bind(port).sync();
    
                // 等待服务端监听端口关闭
                f.channel().closeFuture().sync();
            } finally {
                // 优雅退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel arg0) throws Exception {
                arg0.pipeline().addLast(new SimpleServerHandler());
            }
        }
    
        /**
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            int port = 8081;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    // 采用默认值
                }
            }
            new SimpleServer().bind(port);
        }
    }

    1.1. 设置EventLoopGroup

    首先创建2个EventLoopGroup,一个parentGroup(用于接受新连接),childGroup(用于执行读写事件),NioEventLoopGroup内部根据设置的nEventLoops参数创建对应大小的NioEventLoop数组,并且每个NioEventLoop默认使用ForkJoinPool的一个线程,所以NioEventLoop称为单线程事件循环。

    1.2. 构造ServerBootstrap

    构造ServerBootstrap对象,并设置EventLoopGroup,channel(NioServerSocketChannel服务端套接字),一些option例如ChannelOption.SO_BACKLOG,childHandler(客户端连接后在管道链设置的ChannelHandler)

    1.3. 同步绑定端口

    b.bind(port).sync()

    1.3.1 validate方法

    validate方法验证parentGroup和channelFactory不能为null

    1.3.2 initAndRegister方法

    刚方法内部使用channelFactory通过反射构造NioServerSocketChannel的实例对象,NioServerSocketChannel实例对象构造内部主要包含ServerSocketChannel,DefaultChannelId(标识唯一性),Unsafe(所有IO操作都在这个类里),DefaultChannelPipeline(通道处理器管道链,自定义的ChannelHandler都在这里),NioServerSocketChannelConfig(一些配置信息)。

    构造完调用init初始化NioServerSocketChannel,包括设置自定义的ChannelHandler,ServerBootstrapAcceptor(专门用于接受客户端新连接时,初始化NioSocketChannel并注册进childGroup进行读写监听)。

    ChannelFuture regFuture = group().register(channel)

    异步注册NioServerSocketChannel到parentGroup里的NioEventLoop。

    因为注册过程是在NioEventLoop异步执行的,这里直接先分析register方法

    1.3.3 异步register

    channel.unsafe().register(this, promise)

    注册的时候会调用以上方法,委派给Unsafe的register方法,内部会给NioServerSocketChannel的字段eventLoop初始化(NioServerSocketChannel关联唯一的一个NioEventLoop),然后会调用

    eventLoop.execute(new OneTimeTask() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });

    这个会开启NioEventLoop的事件循环线程,并放task到taskQueue里,作为异步执行register0方法。

    register0方法会调用外部类(NioServerSocketChannel)的doRegister方法,

    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        ((NioEventLoop) eventLoop().unwrap()).selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }

    这里使用了NIO的API,把NioServerSocketChannel里的ServerSocketChannel注册到NioServerSocketChannel关联的NioEventLoop里的selector。

    接下来的safeSetSuccess会把Main线程设置的监听器,设置bind任务。

    1.3.4 执行bind操作

    private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }

    注册成功的情况下,执行bind操作(NioServerSocketChannel的bind方法),一路追踪到

    unsafe.bind(localAddress, promise);

    unsafe的bind方法,内部调用NioServerSocketChannel的doBind方法

    protected void doBind(SocketAddress localAddress) throws Exception {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }

    并且设置pipeline.fireChannelActive()任务,fireChannelActive任务会调用channel.read()方法,内部会调用到unsafe.beginRead()方法,最终调用的是NioServerSocketChannel的doBeginRead方法,重新设置SelectionKey的感兴趣的事件readInterestOp(NioServerSocketChannel构造的时候确定的为SelectionKey.OP_ACCEPT),开始接收新连接。

    protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            if (inputShutdown) {
                return;
            }
    
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            readPending = true;
    
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }

    1.4. NioEventLoop事件循环接受新连接

    NioEventLoop不停的通过ForkJoinPool执行它的asRunnable任务(通过每次执行任务将要完成时,重新把asRunnable设置到ForkJoinPool里)。

    从asRunnable的run方法开始,内部先执行selector的select操作,然后先调用processSelectedKeys()方法,获取到激活的selectedKeys数组,这里如果有新连接进来,那么就有一个SelectionKey,获取它的attachment(NioServerSocketChannel),然后调用processSelectedKey方法。

    private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }

    1.4.1 执行获取新连接方法

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }

    当readyOps等于SelectionKey.OP_ACCEPT调用unsafe.read(),这里调用到了AbstractNioMessageChannel的内部类NioMessageUnsafe的read方法。

    read方法会循环接受新连接,一次默认能接受16个连接,具体调用doReadMessages方法。

    protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = javaChannel().accept();
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }

    这里调用NIO的API,accept方法获取SocketChannel,并封装成NioSocketChannel(NioSocketChannel的构造字段和NioServerSocketChannel类似,只是NioSocketChannel默认的感兴趣事件为SelectionKey.OP_READ)。

    接受连接完成,循环调用pipeline.fireChannelRead()方法。

    1.4.2 ServerBootstrapAcceptor的channelRead方法

    上面的管道调用fireChannelRead方法,通过责任链方式依次调用ChannelHandler的channelRead方法,最重要的就是ServerBootstrapAcceptor的channelRead方法。

    它这个方法设置了childHandler到NioSocketChannel(新连接)的管道链里,然后又是异步注册NioSocketChannel到childGroup里的NioEventLoop里,注册过程和前面1.3章节的大体一致,也是启动了childGroup里的NioEventLoop的事件循环异步注册。只是因为是NioSocketChannel一些实现的方法不一样,执行的代码有点差别,最终注册完成也会调用pipeline的fireChannelActive()方法。

    1.4.3 fireChannelActive方法

    public ChannelPipeline fireChannelActive() {
            head.fireChannelActive();
    
            if (channel.config().isAutoRead()) {
                channel.read();
            }
    
            return this;
        }

    channel的read方法最终委派调用到unsafe.beginRead()方法,然后又是NioSocketChannel的doBeginRead方法,重新设置SelectionKey的感兴趣事件为SelectionKey.OP_READ(NioSocketChannel的默认值)。到这里连接已经建立,并且开启了客户端连接读事件的监听。

PS:上面的SimpleServerHandler代码如下:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Created by yaojiafeng on 16/1/17.
 */
public class SimpleServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        ByteBuf body = (ByteBuf) msg;
        byte[] bytes = new byte[body.readableBytes()];
        body.readBytes(bytes);
        System.out.println(new String(bytes));

        ByteBuf resp = Unpooled.copiedBuffer(bytes);
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}

原文地址:https://www.cnblogs.com/yaojf/p/8127198.html

时间: 2024-10-27 07:52:38

Netty5源码解析的相关文章

Netty5源码分析(八) -- 总结

这个系列通过七篇文章,结合Netty5的原代码 1. 分析了服务器绑定端口的过程,从整体上可以看到Netty执行的流程和主要组件 2. 分析了Netty的线程模型,解析了Reactor模式.很多人都不理解这块,被EventLoop的名称和复杂的类层次所迷惑 3. 通过比较使用Java原生的NIO API来编程的流程,再分析了Netty是如何把这些基本流程封装地,进一步地理清了Netty的封装思路 4. 分析了Netty的事件分发模型,描述了inbound,outbound事件模型,以及Pipel

ChrisRenke/DrawerArrowDrawable源码解析

转载请注明出处http://blog.csdn.net/crazy__chen/article/details/46334843 源码下载地址http://download.csdn.net/detail/kangaroo835127729/8765757 这次解析的控件DrawerArrowDrawable是一款侧拉抽屉效果的控件,在很多应用上我们都可以看到(例如知乎),控件的github地址为https://github.com/ChrisRenke/DrawerArrowDrawable

五.jQuery源码解析之jQuery.extend(),jQuery.fn.extend()

给jQuery做过扩展或者制作过jQuery插件的人这两个方法东西可能不陌生.jQuery.extend([deep],target,object1,,object2...[objectN]) jQuery.fn.extend([deep],target,object1,,object2...[objectN])这两个属性都是用于合并两个或多个对象的属性到target对象.deep是布尔值,表示是否进行深度合并,默认是false,不执行深度合并.通过这种方式可以在jQuery或jQuery.fn

eclipse中导入jdk源码、SpringMVC注解@RequestParam、SpringMVC文件上传源码解析、ajax上传excel文件

eclipse中导入jdk源码:http://blog.csdn.net/evolly/article/details/18403321, http://www.codingwhy.com/view/799.html. ------------------------------- SpringMVC注解@RequestParam:http://825635381.iteye.com/blog/2196911. --------------------------- SpringMVC文件上传源

String源码解析(一)

本篇文章内的方法介绍,在方法的上面的注释讲解的很清楚,这里只阐述一些要点. Java中的String类的定义如下: 1 public final class String 2 implements java.io.Serializable, Comparable<String>, CharSequence { ...} 可以看到,String是final的,而且继承了Serializable.Comparable和CharSequence接口. 正是因为这个特性,字符串对象可以被共享,例如下面

Flume-ng源码解析之Channel组件

如果还没看过Flume-ng源码解析之启动流程,可以点击Flume-ng源码解析之启动流程 查看 1 接口介绍 组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后是Source,在开始看组件源码之前我们先来看一下两个重要的接口,一个是LifecycleAware ,另一个是NamedComponent 1.1 LifecycleAware @[email protected] interface LifecycleAware {  public void s

Spring源码解析-applicationContext

Demo uml类图 ApplicationContext ApplicationListener 源码解析 主流程 obtainFreshBeanFactory prepareBeanFactory invokeBeanFactoryPostProcessors registerBeanPostProcessors registerListeners finishRefresh 总结 在已经有BeanFactory可以完成Ioc功能情况下,spring又提供了ApplicationContex

socketserver源码解析和协程版socketserver

来,贴上一段代码让你仰慕一下欧socketserver的魅力,看欧怎么完美实现多并发的魅力 client import socket ip_port = ('127.0.0.1',8009) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print('receive:',data.decode()) inp = input('please input:') sk

Handler机制(四)---Handler源码解析

Handler的主要用途有两个:(1).在将来的某个时刻执行消息或一个runnable,(2)把消息发送到消息队列. 主要依靠post(Runnable).postAtTime(Runnable, long).postDelayed(Runnable, long).sendEmptyMessage(int).sendMessage(Message).sendMessageAtTime(Message).sendMessageDelayed(Message, long)这些方法来来完成消息调度.p