【转】Netty源码的阅读学习

Netty 源码阅读学习

博客分类:

背景 

最忌工作中接触到Netty相关应用场景,之前看过mima的部分源码,所以最近看了Netty的部分源码和学习其设计思想,做个简单的分享(学习代码为:Netty:3.6.3.FINALE)。

Netty概述

官方:Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

特点:和Mina,Grizzly一样都是基于nio的通信框架,事件驱动,异步非阻塞,高性能,高可靠性。

Mina和Neety的主导作者都是Trustin Lee. Mina是apache社区提供的一个基于NIO的高性能网络应用程序框架,Trustin Lee离开apache后加入redhat开启了Netty,所以两者的设计思路基本一致,目前Netty的更新更加频繁,文档也比较全。下图为 netty官网上的一张体系结构图。


 

源码阅读:

先从Netty的ServerBootstrap初始化到bind()过程梳理

Java代码  

  1. bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(serverBossTF),
  2. Executors.newCachedThreadPool(serverWorkerTF)));
  3. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  4. public ChannelPipeline getPipeline() throws Exception {
  5. ChannelPipeline pipeline = new DefaultChannelPipeline();
  6. pipeline.addFirst("firewall", firewall);
  7. pipeline.addLast("decoder", new NettyProtocolDecoder());
  8. pipeline.addLast("encoder", new NettyProtocolEncoder());
  9. pipeline.addLast("handler", serverHandler);
  10. return pipeline;
  11. }
  12. });
  13. bootstrap.bind(new InetSocketAddress(bindHost, listenPort));

以上三步为:

1. 初始化ServerBootstrap,指定其ChannelFactory为NioServerSocketChannelFactory(用来创建NioServerSocketChannel)

2. 为bootstrap指定了PipelineFactory,所有Accepted Channel在创建时会使用该工厂创建一个pipeline处理事件。最后的serverHandler为自定义实现的Handler,最终负责处理请求相关业务逻辑等。

3. 绑定一个端口监听请求

从上面三步可以看到Netty的使用还是比较简单,可以自定义的扩展pipeline和实现自己的Handler。

整个过程屏蔽复杂的nio和多线程并发处理的细节,所以最近通过阅读源码了解Netty的部分实现。

NioServerSocketChannelFactory关键构造方法:

Java代码  

  1. public NioServerSocketChannelFactory(BossPool<NioServerBoss> bossPool, WorkerPool<NioWorker> workerPool) {
  2. if (bossPool == null) {
  3. throw new NullPointerException("bossExecutor");
  4. }
  5. if (workerPool == null) {
  6. throw new NullPointerException("workerPool");
  7. }
  8. this.bossPool = bossPool;
  9. this.workerPool = workerPool;
  10. sink = new NioServerSocketPipelineSink();
  11. }

bossPool 通过new NioServerBossPool(bossExecutor, bossCount,
null)创建,这里bossCount默认为1,这里的boss线程通过new NioServerBoss(executor,
determiner)创建,在这里完成Selector.open();主要会处理accept事件。

workerPool 通过new NioWorkerPool(workerExecutor,
workerCount)创建,workerCount为当前可以cpu数
×2(Runtime.getRuntime().availableProcessors() * 2 ),通过new
NioWorker(executor, determiner)创建,每个worker持有一个Selector;主要处理数据读写操作。

sink NioServerSocketPipelineSink 接收并处理下行(downstream)末端的ChannelEvent,主要负责执行最终ServerSocketChannel绑定以及注册OP_ACCEPT到Selector上。

接下来看bootstrap.bind()

Java代码  

  1. // ServerBootstrap.java --> bindAsync()
  2. // 初始一个Binder,Binder继承自SimpleChannelUpstreamHandler,接收并处理上行的ChannelEvent
  3. Binder binder = new Binder(localAddress);
  4. // 初始化一个bossPipeline,之前初始化那个pipeline是工作线程中用的
  5. ChannelPipeline bossPipeline = pipeline();
  6. // 将binder放入pipline中
  7. bossPipeline.addLast("binder", binder);
  8. ... ...
  9. // 获取ServerBootstrap初始化时候初始的NioServerSocketChannelFactory来完成服务端channel创建,创建出的是一个NioServerSocketChannel
  10. Channel channel = getFactory().newChannel(bossPipeline);

NioServerSocketChannel

Java代码  

  1. // 这是真正java nio的Channel,Netty包装了一层channel的概念
  2. socket = ServerSocketChannel.open();
  3. // 设置为非阻塞模式
  4. socket.configureBlocking(false);
  5. // 将open事件作为一个上行事件丢到pipeline中(由对应的handler来handle事件)
  6. // 这里的handler从前面的初始化来看就是交给binder来处理,binder继承自SimpleChannelUpstreamHandler,可以处理上行事件
  7. fireChannelOpen(this);

Binder --> channelOpen()

Java代码  

  1. // 执行Channel的bind, 在pipeline中发送一个BOUND的下行事件
  2. evt.getChannel().bind(localAddress)

DefaultChannelPipeline

Java代码  

  1. // 这里的sink是初始化时上面提到的NioServerSocketPipelineSink
  2. getSink().eventSunk(this, e);

NioServerSocketPipelineSink --> eventSunk() --> handleServerSocket()

Java代码  

  1. // 这里的boss是初始化时设置的 NioServerBoss
  2. ((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);

在boss的bind方法中new了一个内部的RegisterTask来完成真正的ServerSocket.bind()及注册到selector上,并唤醒Selector

NioServerBoss --> RegisterTask

Java代码  

  1. //java NIO ServerSocketChannel.bind()操作
  2. channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
  3. bound = true;
  4. // 更新future
  5. future.setSuccess();
  6. // 出发一个绑定成功的事件
  7. fireChannelBound(channel, channel.getLocalAddress());
  8. // 向selector中注册OP_ACCEPT事件
  9. channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

接下来boss线程和worker线程就要开始干活接客了

NioServerBoss 继承自 AbstractNioSelector,其run方法:

Java代码  

  1. public void run() {
  2. ...
  3. for (;;) {
  4. ...
  5. // 执行 selector.select(SELECT_TIMEOUT)
  6. // 阻塞等待客户端连接
  7. int selected = select(selector);
  8. ....
  9. // 当请求到达唤醒select,处理具体的事件
  10. // 该抽象方法由NioServerBoss实现
  11. process(selector);
  12. }
  13. ...
  14. }
  15. protected void process(Selector selector) {
  16. // 获取select出来的SelectionKey,逐个accept
  17. Set<SelectionKey> selectedKeys = selector.selectedKeys();
  18. for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
  19. NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();
  20. // accept connections in a for loop until no new connection is ready
  21. for (;;) {
  22. // NIO 的accept,建立和客户端之间的socket连接
  23. SocketChannel acceptedSocket = channel.socket.accept();
  24. ....
  25. registerAcceptedChannel(channel, acceptedSocket, thread);
  26. }
  27. ....
  28. }
  29. }
  30. //将accept成功的SocketChannel交给工作线程处理读写操作
  31. private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
  32. Thread currentThread) {
  33. ....
  34. ChannelSink sink = parent.getPipeline().getSink();
  35. ChannelPipeline pipeline =
  36. parent.getConfig().getPipelineFactory().getPipeline();
  37. NioWorker worker = parent.workerPool.nextWorker();
  38. worker.register(new NioAcceptedSocketChannel(
  39. parent.getFactory(), pipeline, parent, sink
  40. , acceptedSocket,
  41. worker, currentThread), null);
  42. ....
  43. }

总结

以上过程梳理了Netty初始化的主流程,以及NIO服务端的初始化过程(以上红色部分);顺序梳理完一遍源码,反过来总结下Netty的设计和结构。

代码梳理过程中涉及的核心接口及作用

Bootstrap:Netty框架启动的工具类,分client,server和udp3种,其核心功能是初始化主channel和pipeline。

ChannelFactory:创建Channel的工厂。针对不同场景,netty提供了各种ChannelFactory,比如ServerChannelFactory用来创建server端的Channel。

Channel:封装了java.nio.channels包中Channel的相关实现,封装了各种IO操作。

ChannelEvent:Netty中几乎所有的操作,都是以事件(ChannelEvent)驱动。事件处理都是通过Channels类的静态
方法调用开始的,将事件、channel传递给 channel持有的Pipeline进行处理。ChannelEvent分为两种,upstream
events和downstream events。Upstream
events指由底层传到高层的事件,一般指socket层发生某些事件通知到应用程序,比如当socket接受到数据后会发送
UpstreamMessageEvent事件,交给ChannelHandler来处理。Downstream
events指由高层传到底层的事件,一般指用户主动发起的操作,比如用户调用channel.write(buffer),则会生成
DownstreamMessageEvent事件,经过ChannelHandler处理后,交给socket层发起通信。

ChannelHandler:真正的处理类,一般由应用程序自己实现。主要分2个子接口ChannelUpstreamHandler和ChannelDownstreamHandler,分别处理UpstreamEvent和DownstreamEvent。

ChannelPipeline:就是由ChannelHandler(ChannelHanlderContext)组成的双向链表。如果是
upstream
events,则经过ChannelPipeline中由前往后的所有ChannelUpstreamHandler依次处理,最后事件被丢弃。如果是
downstream
events,则经过ChannelPipeline中由后往前的所有ChannelDownstreamHandler依次处理,最后事件被转交给
ChannelSink。

ChannelSink:所有的downstream events经过ChannelPipeline处理后,最后由ChannelSink的eventSunk方法处理。它是管理IO线程和事件处理的桥梁。

ChannelPipeline的处理流程(源码注释中的流程图)

服务端初始化和接收请求抽象流程

Netty中的Reactor模式

使用Doug Lea大师在Scalable IO in Java中的一张图来说明:

这里的mainReactor对应了Netty中的NioServerBoss(AbstractNioSelector.run()),被分配到bossPool中执行 ,selector.select(SELECT_TIMEOUT)阻塞。

有客户端连接请求被唤醒后执行accept,然后通过worker.register()
交给subReactor的ThreadPool执行(注册OP_READ到NioWorker的Selector中);在netty里为
workerPool,其中Worker的线程数由WorkerPoo大小决定,从上面初始化代码中看到默认是可用cpu个数*2。

当客户端有数据写入时,NioWorker持有的Selector被唤醒执行read事件,后会触发messageReceived事件,交给pipeline中的handler执行。

这里事件驱动的关键Selector在不同的操作系统中有不同的实现,系统级的select和pool原理都是通过轮询内核中等待中的FD,轮询到
就绪的FD后写用户空间,然后唤醒阻塞的Selector;java1.5版本开始支持epoll,epoll则是在等待的FD上注册回调函数(真正的事
件驱动),根本的提升了NIO的性能。

参考资源

Netty官网:http://netty.io/

Mina官网:http://mina.apache.org/

Trustin Lee :LinkedIn: http://www.linkedin.com/in/trustin

Doug Lea: Scalable IO in Java: http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Netty高性能: http://www.infoq.com/cn/articles/netty-high-performance

Netty线程模型:http://www.infoq.com/cn/articles/netty-threading-model

Netty-Mina:http://ifeve.com/netty-mina-in-depth-1/

Select,poll,epoll: http://www.cnblogs.com/xuxm2007/archive/2011/08/15/2139809.html

时间: 2024-10-24 12:20:23

【转】Netty源码的阅读学习的相关文章

Netty源码阅读(一) ServerBootstrap启动

Netty源码阅读(一) ServerBootstrap启动 转自我的Github Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序.本文讲会对Netty服务启动的过程进行分析,主要关注启动的调用过程,从这里面进一步理解Netty的线程模型,以及Reactor模式. 这是我画的一个Netty启动过程中使用到的主要的类的概要类图,当然是用到的类比这个多得多,而且我也忽略了各个类的继承关系

Netty源码学习——Included transports(传输方式)

Transport API的核心: Channel接口 类图表示Channel含有Pipeline和Config接口,pipeline上一节有所介绍. Channel是线程安全的,这表示在多线环境下操作同一个Channel,不会有数据问题. final Channel channel = null; final ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); // #1 Runnable writ

Netty源码学习——ChannelPipeline模型分析

参考Netty API io.netty.channel.ChannelPipeline A list of ChannelHandlers which handles or intercepts inbound events and outbount operations of aChannel.ChannelPipeline implements an advanced form of theIntercepting Filter pattern to give a user full co

Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析

类结构图: 不了解Executor接口原理的可以查看concurrent包中的api介绍,这里只介绍Netty中EventExecutorGroup的主要功能! 从类的结构图中可以看到EventExecutorGroup是直接继承ScheduledExecutorService这个接口的,为了说明白Group的原理这里顺便提一下ScheduledExecutorService的用途! java.util.concurrent.ScheduledExecutorService An Executo

Netty源码解读(一)概述

Netty和Mina是Java世界非常知名的通讯框架.它们都出自同一个作者,Mina诞生略早,属于Apache基金会,而Netty开始在Jboss名下,后来出来自立门户netty.io.关于Mina已有@FrankHui的Mina系列文章,我正好最近也要做一些网络方面的开发,就研究一下Netty的源码,顺便分享出来了. Netty目前有两个分支:4.x和3.x.4.0分支重写了很多东西,并对项目进行了分包,规模比较庞大,入手会困难一些,而3.x版本则已经被广泛使用.本系列文章针对netty 3.

Netty源码分析第3章(客户端接入流程)----&gt;第3节: NioSocketChannel的创建

Netty源码分析第三章: 客户端接入流程 第三节: NioSocketChannel的创建 回到上一小结的read()方法: public void read() { //必须是NioEventLoop方法调用的, 不能通过外部线程调用 assert eventLoop().inEventLoop(); //服务端channel的config final ChannelConfig config = config(); //服务端channel的pipeline final ChannelPi

DICOM医学图像处理:storescp.exe与storescu.exe源码剖析,学习C-STORE请求

背景: 上一篇专栏博文中针对PACS终端(或设备终端,如CT设备)与RIS系统之间worklist查询进行了介绍,并着重对比分析了DICOM3.0中各部分对DICOM网络通讯服务的定义.此次通过结合早些时间的博文DICOM医学图像处理:基于DCMTK工具包学习和分析worklist,对DCMTK开源库中提供的storescp.exe和storescu.exe工具的源码进行剖析,从底层深入了解C-STORE服务的触发及响应. 分析思路: storescp.exe和storescu.exe分别充当着

netty源码分析之揭开reactor线程的面纱(二)

如果你对netty的reactor线程不了解,建议先看下上一篇文章netty源码分析之揭开reactor线程的面纱(一),这里再把reactor中的三个步骤的图贴一下 reactor线程 我们已经了解到netty reactor线程的第一步是轮询出注册在selector上面的IO事件(select),那么接下来就要处理这些IO事件(process selected keys),本篇文章我们将一起来探讨netty处理IO事件的细节 我们进入到reactor线程的 run 方法,找到处理IO事件的代

netty源码解析

最近在看netty的源码,本来想写一些东西的,但是无意间看到了一个牛人写的一些有关netty的博客,感觉写得太好了,故对他的博客中有关netty的部分整理了一下放入了我的印象笔记中,现在把链接公开出来,希望对想学习netty的同学有所帮助: https://app.yinxiang.com/pub/topxiall/netty netty源码解析