Netty:EventLoopGroup

Group:群组,Loop:循环,Event:事件,这几个东西联在一起,相比大家也大概明白它的用途了。

Netty内部都是通过线程在处理各种数据,EventLoopGroup就是用来管理调度他们的,注册Channel,管理他们的生命周期,下面就来看看EventLoopGroup是怎样工作的。

Netty框架初探中,当我们启动客户端或者服务端时,都要声明一个Group对象

EventLoopGroup group = new NioEventLoopGroup();  

这里我们就以NioEventLoopGroup来说明。先看一下它的继承关系

NioEventLoopGroup extends MultithreadEventLoopGroup extends MultithreadEventExecutorGroup

看看NioEventLoopGroup的构造函数

public NioEventLoopGroup() {
	this(0);
}
//他会连续调用内部的构造函数,直到用下面的构造去执行父类的构造
//nThreads此时为0,马上就会提到这个参数的用处
public NioEventLoopGroup(
		int nThreads, Executor executor, final SelectorProvider selectorProvider) {
	super(nThreads, executor, selectorProvider);
}

基类MultithreadEventLoopGroup的构造

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
	super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
//nThreads:内部线程数,如果为0,就取默认值,通常我们会设置为处理器个数*2
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
		"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

继续调用再上一级的MultithreadEventExecutorGroup的构造

//这里会根据nThreads创建执行者数组
private final EventExecutor[] children;

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
	if (nThreads <= 0) {
		throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
	}

	if (executor == null) {
		executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
	}
//这里创建EventExecutor数组对象
	children = new EventExecutor[nThreads];
	if (isPowerOfTwo(children.length)) {
		chooser = new PowerOfTwoEventExecutorChooser();
	} else {
		chooser = new GenericEventExecutorChooser();
	}
//此处循环children数组,来创建内部的NioEventLoop对象
	for (int i = 0; i < nThreads; i ++) {
		boolean success = false;
		try {
			//newChild是abstract方法,运行期会执行具体的实例对象的重载
			children[i] = newChild(executor, args);
			success = true;
		} catch (Exception e) {
			// TODO: Think about if this is a good exception type
			throw new IllegalStateException("failed to create a child event loop", e);
		} finally {
			//如果没有成功,做关闭处理
			if (!success) {
				for (int j = 0; j < i; j ++) {
					children[j].shutdownGracefully();
				}

				for (int j = 0; j < i; j ++) {
					EventExecutor e = children[j];
					try {
						while (!e.isTerminated()) {
							e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
						}
					} catch (InterruptedException interrupted) {
						// Let the caller handle the interruption.
						Thread.currentThread().interrupt();
						break;
					}
				}
			}
		}
	}

	......
}

因为我们最初创建的是NioEventLoopGroup对象,所以newChild会执行NioEventLoopGroup的newChild方法,创建NioEventLoop对象。

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
    }

看看NioEventLoop的继承关系

NioEventLoop extends SingleThreadEventLoop extends SingleThreadEventExecutor

NioEventLoop通过自己的构造行数,一直调用到SingleThreadEventExecutor的构造

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
        super(parent);

        if (executor == null) {
            throw new NullPointerException("executor");
        }

        this.addTaskWakesUp = addTaskWakesUp;
        this.executor = executor;
        taskQueue = newTaskQueue();
    }

至此,Group和内部的Loop对象以及Executor就创建完毕,那么他们是什么时候被调用的呢?就是在服务端bind和客户端connect时。

服务端的bind方法执行的是AbstractBootstrap的bind方法,bind方法中先会调用validate()方法检查是否有group

    @SuppressWarnings("unchecked")
    public B validate() {
        if (group == null) {
            throw new IllegalStateException("group not set");
        }
        if (channelFactory == null) {
            throw new IllegalStateException("channel or channelFactory not set");
        }
        return (B) this;
    }

所以,假如初始时,没有设置Bootstrap的group的话,就会报错。

最终bind调用doBind,然后调用doBind0,启动一个Runnable线程

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

我们这里,channel.eventLoop()得到的是NioEventLoop对象,所以执行NioEventLoop的run方法,开始线程运行。

我们在创建Bootstrap初期,会调用它的group方法,绑定一个group,这样,一个循环就开始运行了。

时间: 2024-07-30 18:18:25

Netty:EventLoopGroup的相关文章

Netty:Channel

Channel是客户端和服务端得以传输信息的通道,它维护这套接字或者可进行IO操作的组件. 下面我们以客户端及NioSocketChannel为例,看看他是怎么和socket发生联系的. 当客户端初始化时,首先要进行这样的操作 Bootstrap b = new Bootstrap(); ...... b.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true); 通过channel方法去创建通道,流程如下:

常见面试问题总结

[Java] 1. 容器 1)队列(queue): 若Blocking:阻塞put/take  offer/peek poll 异常element/remove a. PriorityQueue:comparator  堆排序 b. Deque:双向队列  linkedList  addFirst() c. DelayQueue:类似timeTask,延迟期满才能提取元素 d. ConcurrentLinkedQueue:线程安全 2)数组(Array):注意左右边界校验,如arr!=null&

Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析

类结构图: 不了解Executor接口原理的可以查看concurrent包中的api介绍,这里只介绍Netty中EventExecutorGroup的主要功能! 从类的结构图中可以看到EventExecutorGroup是直接继承ScheduledExecutorService这个接口的,为了说明白Group的原理这里顺便提一下ScheduledExecutorService的用途! java.util.concurrent.ScheduledExecutorService An Executo

[编织消息框架][netty源码分析]5 EventLoopGroup 实现类NioEventLoopGroup职责与实现

分析NioEventLoopGroup最主有两个疑问 1.next work如何分配NioEventLoop 2.boss group 与child group 是如何协作运行的 从EventLoopGroup接口约定通过register方法从channel或promise转换成ChannelFuture对象 next方法就是用来分配NioEventLoop public interface EventLoopGroup extends EventExecutorGroup { @Overrid

Netty源代码学习——EventLoopGroup原理:NioEventLoopGroup分析

类结构图: watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvd29ya2luZ19icmFpbg==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" > 不了解Executor接口原理的能够查看concurrent包中的api介绍.这里仅仅介绍Netty中EventExecutorGroup的主要功能. 从类的结构图中能够看到EventExecu

005——Netty之EventLoop与EventLoopGroup

Netty解决的事情 Netty主要解决两个相应关注领域.(1)异步和事件驱动的实现.(2)一组设计模式,将应用逻辑与网络层解耦. EventLoop接口 用于处理连接的生命周期中所发生的事件. 一个EventLoopGroup包含一个或者多个EventLoop 一个EventLoop在它的生命周期内只和一个Thread绑定 所有由EventLoop处理的I/O事件都将在它专有的Thread上被处理 一个Channel在它的生命周期内只注册于一个EventLoop 一个EventLoop可能会被

netty源码解解析(4.0)-6 线程模型-IO线程EventLoopGroup和NIO实现(一)

接口定义 io.netty.channel.EventLoopGroup extends EventExecutorGroup 方法 说明 ChannelFuture register(Channel channel) 把一个channel注册到一个EventLoop ChannelFuture register(Channel channel, ChannelPromise promise); 同上 io.netty.channel.EventLoop extends OrderedEvent

6. Netty源码分析之EventLoop与EventLoopGroup

一.NioEventLoop与NioEventLoopGroup的关系 二.NioEventLoop 1. 设计原理 1. 负责IO读写 2. 执行task.通过调用NioEventLoop的execute(Runnable task)方法实现.我们知道,为了防止资源竞争和并发操作,我们经常会判断当前操作线程是否为EventLoop线程,如果不是,则将操作封装成task放进NioEventLoop的执行队列中,这样就实现了局部无锁化. 3. 定时任务.通过调用NioEventLoop的sched

Netty学习之核心组件(EventLoop、EventLoopGroup)

一.EventLoop.EventLoopGroup概述 由下图所示,NioEventLop是EventLoop的一个具体实现,EventLoop是EventLoopGroup的一个属性,NioEventLoopGroup是EventLoopGroup的具体实现,都是基于ExecutorService进行的线程池管理,因此EventLoop.EventLoopGroup组件的核心作用就是进行Selector的维护以及线程池的维护. 其中EventLoop进行的是Selector的维护,如下图左: