Netty服务端的业务流程分析

Netty的服务端怎么和java NIO联系起来的,一直很好奇这块内容,这里跟下代码,下篇文章看下Channel相关的知识。

  1. final ChannelFuture initAndRegister() {
  2. final Channel channel = channelFactory().newChannel(); //
  3. try {
  4. init(channel);
  5. } catch (Throwable t) {
  6. channel.unsafe().closeForcibly(); //立即关闭通道且不会触发事件
  7. //因为这个通道还没有注册到EventLoop,所以我们需要强制GlobalEventExecutor的使用。
  8. return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
  9. }
  10. //注册一个EventLoop
  11. ChannelFuture regFuture = group().register(channel);
  12. //注册失败
  13. if (regFuture.cause() != null) {
  14. if (channel.isRegistered()) {
  15. channel.close();
  16. } else {
  17. channel.unsafe().closeForcibly();
  18. }
  19. }
  20. // If we are here and the promise is not failed, it‘s one of the following cases:
  21. // 程序运行到这里且promise没有失败,可能有如下几种情况
  22. // 1) If we attempted registration from the event loop, the registration has been completed at this point.
  23. // 如果试图注册到一个EventLoop,该注册完成,
  24. // i.e. It‘s safe to attempt bind() or connect() now because the channel has been registered.
  25. // 2) If we attempted registration from the other thread, the registration request has been successfully
  26. // added to the event loop‘s task queue for later execution.
  27. // 如果试图注册到其他线程,该注册已经成功,但是没有完成,添加一个事件到任务队列中,等会执行
  28. // i.e. It‘s safe to attempt bind() or connect() now:
  29. // because bind() or connect() will be executed *after* the scheduled registration task is executed
  30. // because register(), bind(), and connect() are all bound to the same thread.
  31. return regFuture;
  32. }

注意这里的Channle的类型为NioServerSocketChannel类型,group()返回NioEventLoopGroup类型,他继承MultithreadEventLoopGroup,那么看下register的实现:

  1. @Override
  2. public ChannelFuture register(Channel channel) {
  3. return next().register(channel);
  4. }

跟进去是调用了SingleThreadEventLoop类的register方法。实现如下.

  1. @Override
  2. public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
  3. if (channel == null) {
  4. throw new NullPointerException("channel");
  5. }
  6. if (promise == null) {
  7. throw new NullPointerException("promise");
  8. }
  9. channel.unsafe().register(this, promise);
  10. return promise;
  11. }

调用了NioServerSocketChannel的unsafe()的register方法。

  1. @Override
  2. public final void register(EventLoop eventLoop, final ChannelPromise promise) {
  3. if (eventLoop == null) {
  4. throw new NullPointerException("eventLoop");
  5. }
  6. if (isRegistered()) {
  7. promise.setFailure(new IllegalStateException("registered to an event loop already"));
  8. return;
  9. }
  10. if (!isCompatible(eventLoop)) {
  11. promise.setFailure(
  12. new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
  13. return;
  14. }
  15. AbstractChannel.this.eventLoop = eventLoop;
  16. if (eventLoop.inEventLoop()) {
  17. register0(promise);
  18. } else {
  19. try {
  20. eventLoop.execute(new OneTimeTask() {
  21. @Override
  22. public void run() {
  23. register0(promise);
  24. }
  25. });
  26. } catch (Throwable t) {
  27. logger.warn(
  28. "Force-closing a channel whose registration task was not accepted by an event loop: {}",
  29. AbstractChannel.this, t);
  30. closeForcibly();
  31. closeFuture.setClosed();
  32. safeSetFailure(promise, t);
  33. }
  34. }
  35. }

对这个eventloop.inEventLoop的理解不是很深刻,有点像android开发里面费时的操作不要放到主线程里面。eventLoop.inEventLoop()表示不在主线程里面。

register的最终实现在,AbstractNioChannel类里面:

  1. @Override
  2. protected void doRegister() throws Exception {
  3. boolean selected = false;
  4. for (;;) {
  5. try {
  6. selectionKey = javaChannel().register(eventLoop().selector, 0, this);
  7. return;
  8. } catch (CancelledKeyException e) {
  9. if (!selected) {
  10. // Force the Selector to select now as the "canceled" SelectionKey may still be
  11. // cached and not removed because no Select.select(..) operation was called yet.
  12. eventLoop().selectNow();
  13. selected = true;
  14. } else {
  15. // We forced a select operation on the selector before but the SelectionKey is still cached
  16. // for whatever reason. JDK bug ?
  17. throw e;
  18. }
  19. }
  20. }
  21. }

javaChannel()返回Java的Channel对象,eventLoop()返回NioEventLoop对象。里面包含一个selector对象。selectNow是个非阻塞的调用,调用此方法会清除所有以前调用 wakeup 方法所得的结果

Netty的Channel是对JDK中Channel的包装和扩展。

注册成功后就需要绑定端口了,

  1. private static void doBind0(
  2. final ChannelFuture regFuture, final Channel channel,
  3. final SocketAddress localAddress, final ChannelPromise promise) {
  4. // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
  5. // the pipeline in its channelRegistered() implementation.
  6. channel.eventLoop().execute(new Runnable() {
  7. @Override
  8. public void run() {
  9. if (regFuture.isSuccess()) {
  10. channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
  11. } else {
  12. promise.setFailure(regFuture.cause());
  13. }
  14. }
  15. });
  16. }

除了监听端口还不够,还要处理IO事件:

  1. @Override
  2. protected void run() {
  3. for (;;) {
  4. boolean oldWakenUp = wakenUp.getAndSet(false);
  5. try {
  6. if (hasTasks()) {
  7. selectNow();
  8. } else {
  9. select(oldWakenUp);
  10. // ‘wakenUp.compareAndSet(false, true)‘ is always evaluated
  11. // before calling ‘selector.wakeup()‘ to reduce the wake-up
  12. // overhead. (Selector.wakeup() is an expensive operation.)
  13. //
  14. // However, there is a race condition in this approach.
  15. // The race condition is triggered when ‘wakenUp‘ is set to
  16. // true too early.
  17. //
  18. // ‘wakenUp‘ is set to true too early if:
  19. // 1) Selector is waken up between ‘wakenUp.set(false)‘ and
  20. // ‘selector.select(...)‘. (BAD)
  21. // 2) Selector is waken up between ‘selector.select(...)‘ and
  22. // ‘if (wakenUp.get()) { ... }‘. (OK)
  23. //
  24. // In the first case, ‘wakenUp‘ is set to true and the
  25. // following ‘selector.select(...)‘ will wake up immediately.
  26. // Until ‘wakenUp‘ is set to false again in the next round,
  27. // ‘wakenUp.compareAndSet(false, true)‘ will fail, and therefore
  28. // any attempt to wake up the Selector will fail, too, causing
  29. // the following ‘selector.select(...)‘ call to block
  30. // unnecessarily.
  31. //
  32. // To fix this problem, we wake up the selector again if wakenUp
  33. // is true immediately after selector.select(...).
  34. // It is inefficient in that it wakes up the selector for both
  35. // the first case (BAD - wake-up required) and the second case
  36. // (OK - no wake-up required).
  37. if (wakenUp.get()) {
  38. selector.wakeup();
  39. }
  40. }
  41. cancelledKeys = 0;
  42. needsToSelectAgain = false;
  43. final int ioRatio = this.ioRatio;
  44. if (ioRatio == 100) {
  45. processSelectedKeys();
  46. runAllTasks();
  47. } else {
  48. final long ioStartTime = System.nanoTime();
  49. processSelectedKeys();
  50. final long ioTime = System.nanoTime() - ioStartTime;
  51. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
  52. }
  53. if (isShuttingDown()) {
  54. closeAll();
  55. if (confirmShutdown()) {
  56. break;
  57. }
  58. }
  59. } catch (Throwable t) {
  60. logger.warn("Unexpected exception in the selector loop.", t);
  61. // Prevent possible consecutive immediate failures that lead to
  62. // excessive CPU consumption.
  63. try {
  64. Thread.sleep(1000);
  65. } catch (InterruptedException e) {
  66. // Ignore.
  67. }
  68. }
  69. }
  70. }

该方法是NioEventLoop

来自为知笔记(Wiz)

时间: 2024-10-12 13:01:55

Netty服务端的业务流程分析的相关文章

Netty 服务端启动过程

在 Netty 中创建 1 个 NioServerSocketChannel 在指定的端口监听客户端连接,这个过程主要有以下  个步骤: 创建 NioServerSocketChannel 初始化并注册 NioServerSocketChannel 绑定指定端口 首先列出一个简易服务端的启动代码: 1 public void start() { 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 3 EventLoopGroup work

netty服务端启动--ServerBootstrap源码解析

netty服务端启动--ServerBootstrap源码解析 前面的第一篇文章中,我以spark中的netty客户端的创建为切入点,分析了netty的客户端引导类Bootstrap的参数设置以及启动过程.显然,我们还有另一个重要的部分--服务端的初始化和启动过程没有探究,所以这一节,我们就来从源码层面详细分析一下netty的服务端引导类ServerBootstrap的启动过程. spark中netty服务端的创建 我们仍然以spark中对netty的使用为例,以此为源码分析的切入点,首先我们看

Netty服务端启动

Netty服务端启动过程 (1)创建服务端Channel:调用JDK底层的API创建一个JDK的Channel,然后netty将其包装成自己的Channel,同时创建一些基本组件绑定在此Channel上 (2)初始化服务端Channel:初始化一些基本属性,以及添加一些逻辑处理器 (3)注册selector:Netty将JDK底层的Channel注册到事件轮询器selector上面 (4)端口绑定:最终也是调用底层JDK的API实现对本地端口的监听 bind()[用户代码入口] initAndR

Netty服务端(源码一)

首先,整理NIO进行服务端开发的步骤: (1)创建ServerSocketChannel,配置它为非阻塞模式. (2)绑定监听,配置TCP参数,backlog的大小. (3)创建一个独立的I/O线程,用于轮询多路复用器Selector. (4)创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKeyACCEPT. (5)启动I/O线程,在循环体中执行Selector.select()方法,轮训就绪的Channel. (6)当轮

Redis源码分析(三十五)--- redis.c服务端的实现分析(2)

在Redis服务端的代码量真的是比较大,如果一个一个API的学习怎么实现,无疑是一种效率很低的做法,所以我今天对服务端的实现代码的学习,重在他的执行流程上,而对于他的模块设计在上一篇中我已经分析过了,不明白的同学可以接着看上篇.所以我学习分析redis服务端的实现也是主要从main函数开始.在分析main执行流程之前,Redis的作者在这里声明了几个变量,这个我们有必要知道一下. /* Our shared "common" objects */ /* 共享的对象 */ struct

Netty系列之Netty 服务端创建

1. 背景 1.1. 原生NIO类库的复杂性 在开始本文之前,我先讲一件自己亲身经历的事:大约在2011年的时候,周边的两个业务团队同时进行新版本开发,他们都需要基于NIO非阻塞特性构建高性能.异步和高可靠性的底层通信框架. 当时两个项目组的设计师都咨询了我的意见,在了解了两个项目团队的NIO编程经验和现状之后,我建议他们都使用Netty构建业务通信框架.令人遗憾的是其中1个项目组并没有按照我的建议做,而是选择直接基于JDK的NIO类库构建自己的通信框架.在他们看来,构建业务层的NIO通信框架并

Redis源码分析(三十四)--- redis.h服务端的实现分析(1)

上次刚刚分析过了客户端的结构体分析,思路比较简答,清晰,最后学习的是服务端的实现,服务端在Redis可是重中之重,里面基本上囊括了之前模块中涉及到的所有知识点,从redis的头文件就可以看出了,redis.h代码量就已经破1000+行了,而且都还只是一些变量,宏定义的声明,和一些方法原型的声明.所以,今天的总结跟昨天一样,先不做具体的实现学习,先从全局的角度思考,服务端的整体设计思路,这从头文件的声明正好可以学习. /* ----------------------- 声明了一下所需的头文件,主

【转】C# client 与java netty 服务端的简单通信,客户端采用Unity

http://blog.csdn.net/wilsonke/article/details/24721057 近日根据官方提供的通信例子自己写了一个关于Unity(C#)和后台通信的类,拿出来和大家分享一下. 具体请参考: 1.java服务端用的apach.mina框架搭建.java服务端请参考:http://blog.9tech.cn/?c=site&m=article&id=548 2.C#环境:.NET framework 2.0 3.C#帮组文档,及Socket注解:http://

netty服务端实现心跳超时的主动拆链

一.服务器启动示例: public class MySocketServer { protected static Logger logger = LoggerFactory.getLogger(MySocketServer.class); public void start(int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopG