我们从框架的应用层面来分析,NioEventLoopGroup在netty中的使用。
这是我们需要配置的地方。
紧接着我们进入netty的运行中。ServerBootstrap.bind(PORT);
这是一个bind操作。我们来看一下NioEventLoopGroup在这个操作中的使用。
ChannelFuture regFuture = config().group().register(channel);
config()返回在ServerBootstrap中内部属性:private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
注意这里传入的是this即ServerBootstrap对象。
这个类主要是用来暴露我们的配置的,直接把我们配置的ServerBootstrap传递进来。
我们继续ChannelFuture regFuture = config().group().register(channel);
这里的config()返回一个ServerBootstrapConfig,然后调用他的group()。
这个类是ServerBootstrapConfig的父类。调用了ServerBootstrap.group().方法。注意因为group方法是在弗雷AbstractBootstrap中定义的,所以这里进入父类的group()方法中来。
这个方法直接返回EventLoopGroup对象。
我们在回过头来看我们最初的配置,来发现这个group属性究竟是什么。
我们配置了两个NioEventLoopGroup,一个一个线程的传递给了父类AbstractBootstrap.一个初始化给当前的ServerBootstrap的内部属性childGroup.
我们在回到这里ChannelFuture regFuture = config().group().register(channel);
现在通过AbstractBootstrap.group()方法返回了一个NioEventLoopGroup对象,即我们配置的第一个单线程的NioEventLoopGroup对象。
现在进入他的register(channel)方法。由于这个方法是在他的父类中定义的,所以我们进入他的父类MultithreadEventLoopGroup()中。
next()方法返回一个EventLoop对象。
下面重点分析这个chooser。(在=======以内,然后接着分析next()方法)
====================================================================
这个chooser是我们在初始化NioEventLoopGroup的时候初始化的。
回到我们初始化NioEventLoopGroup的地方:
还是这里。
下面就到了初始化chooser的地方了:
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
这个构造方法内容比较多。别的先不管,我们来看:chooser = chooserFactory.newChooser(children);
这个EventExecutorChooserFactory chooserFactory是我们上个构造方法中传过来的:
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
是一个DefaultEventExecutorChooserFactory.INSTANCE。
然后我们看他的newChooser(EventExecutor[] executors)方法。
@SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } }
这里有个判断就是:判断是否为2的次方。
private static boolean isPowerOfTwo(int val) { return (val & -val) == val; }
然后根据是否为2的次方分别进行两个构造方法。分别实现了EventExecutorChooser的next()方法。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } }
这两个实现类只是对于next的方法有不同的实现。但是都是遍历每一个executors。
为什么要这样呢?
原因是位操作& 比 % 操作要高效。netty为了提高效率也是拼了。
总的来说这个DefaultEventExecutorChooserFactory非常简单,就上面这些内容。现在我们也知道了chooser是什么了,就是一个时限了遍历所有EventExecutor的next()方法的对象。功能非常简单,只有一个方法next。遍历每一个EventExecutor。
=====================================================================