netty-read

NioEventLoop是ServerSocketChannel和SocketChannel通用的EventLoop,从NioEventLoop的执行逻辑开始

 1 protected void run() {
 2   for (;;) {
 3     switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
 4       case SelectStrategy.CONTINUE:
 5         continue;
 6       case SelectStrategy.SELECT:
 7         select(wakenUp.getAndSet(false));
 8         if (wakenUp.get()) {
 9           selector.wakeup();
10         }
11       default:
12         // fallthrough
13     }
14
15     cancelledKeys = 0;
16     needsToSelectAgain = false;
17     final int ioRatio = this.ioRatio;
18     if (ioRatio == 100) {
19       try {
20         //处理IO事件
21         processSelectedKeys();
22       } finally {
23         // Ensure we always run tasks.
24         //处理队列中任务
25         runAllTasks();
26       }
27     } else {
28       final long ioStartTime = System.nanoTime();
29       try {
30         processSelectedKeys();
31       } finally {
32         // Ensure we always run tasks.
33         final long ioTime = System.nanoTime() - ioStartTime;
34         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
35       }
36     }
37   }
38 }
 1 //eventLoop持有selector,可以得到selectedKeys
 2 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
 3
 4   if (selectedKeys.isEmpty()) {
 5     return;
 6   }
 7
 8   Iterator<SelectionKey> i = selectedKeys.iterator();
 9   for (;;) {
10     final SelectionKey k = i.next();
11     //java.nio.channels.ServerSocketChannel的attachment是io.netty.channel.socket.ServerSocketChannel,
12     //java.nio.channels.SocketChannel的attachment是io.netty.channel.socket.SocketChannel,
13     final Object a = k.attachment();
14     i.remove();
15
16     if (a instanceof AbstractNioChannel) {
17       processSelectedKey(k, (AbstractNioChannel) a);
18     } else {
19       @SuppressWarnings("unchecked")
20       NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
21       processSelectedKey(k, task);
22     }
23
24     if (!i.hasNext()) {
25       break;
26     }
27
28     if (needsToSelectAgain) {
29       selectAgain();
30       selectedKeys = selector.selectedKeys();
31
32       // Create the iterator again to avoid ConcurrentModificationException
33       if (selectedKeys.isEmpty()) {
34         break;
35       } else {
36         i = selectedKeys.iterator();
37       }
38     }
39   }
40 }
 1 //涵盖了SelectionKey.OP_CONNECT、SelectionKey.OP_WRITE、SelectionKey.OP_READ、SelectionKey.OP_ACCEPT四种事件
 2 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 3   final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
 4
 5   int readyOps = k.readyOps();
 6
 7   if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
 8     int ops = k.interestOps();
 9     ops &= ~SelectionKey.OP_CONNECT;
10     k.interestOps(ops);
11
12     unsafe.finishConnect();
13   }
14
15   if ((readyOps & SelectionKey.OP_WRITE) != 0) {
16     ch.unsafe().forceFlush();
17   }
18   if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
19       //重点关注
20     unsafe.read();
21   }
22 }

下面分别对NioMessageUnsafe以及NioByteUnsafe的read操作进行分析

NioMessageUnsafe用于ServerSocketChannel,读取的是SocketChannel对象

 1 private final class NioMessageUnsafe extends AbstractNioUnsafe {
 2     //......
 3
 4     public void read() {
 5       assert eventLoop().inEventLoop();
 6       final ChannelConfig config = config();
 7       final ChannelPipeline pipeline = pipeline();
 8       final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
 9       allocHandle.reset(config);
10
11       boolean closed = false;
12       Throwable exception = null;
13
14       try {
15         do {
16             //真正的读取操作
17           int localRead = doReadMessages(readBuf);
18           //读完了
19           if (localRead == 0) {
20             break;
21           }
22           //localRead<0说明对端关闭了
23           if (localRead < 0) {
24             closed = true;
25             break;
26           }
27
28           allocHandle.incMessagesRead(localRead);
29         } while (allocHandle.continueReading());
30       } catch (Throwable t) {
31         exception = t;
32       }
33
34       int size = readBuf.size();
35       for (int i = 0; i < size; i ++) {
36         readPending = false;
37         //发送给Handler处理,即回调Handler的channelRead。
38         //对于serverSocketChannel来说,readBuf中装的是SocketChannel,
39         //fireChannelRead就是发送给ServerBootstrapAcceptor,由ServerBootstrapAcceptor注册SocketChannel
40         pipeline.fireChannelRead(readBuf.get(i));
41       }
42       readBuf.clear();
43       allocHandle.readComplete();
44       //回调Handler的channelReadComplete
45       pipeline.fireChannelReadComplete();
46
47       if (exception != null) {
48         closed = closeOnReadError(exception);
49             //回调Handler的ExceptionCaught
50         pipeline.fireExceptionCaught(exception);
51       }
52
53       if (closed) {
54         inputShutdown = true;
55         if (isOpen()) {
56             close(voidPromise());
57         }
58       }
59     }
60
61     //......
62 }
 1 protected int doReadMessages(List<Object> buf) throws Exception {
 2     //调用java.nio.channels.ServerSocketChannel.accept()
 3   SocketChannel ch = SocketUtils.accept(javaChannel());
 4
 5   try {
 6     if (ch != null) {
 7       buf.add(new NioSocketChannel(this, ch));
 8       return 1;
 9     }
10   } catch (Throwable t) {
11     try {
12       ch.close();
13     } catch (Throwable t2) {
14       logger.warn("Failed to close a socket.", t2);
15     }
16   }
17   return 0;
18 }

NioByteUnsafe用于SocketChannel,读取的是字节序列

 1 protected class NioByteUnsafe extends AbstractNioUnsafe {
 2     //......
 3     public final void read() {
 4       final ChannelConfig config = config();
 5       final ChannelPipeline pipeline = pipeline();
 6       final ByteBufAllocator allocator = config.getAllocator();
 7       final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
 8       allocHandle.reset(config);
 9
10       ByteBuf byteBuf = null;
11       boolean close = false;
12       try {
13         do {
14             //每次读取都重新分配Buffer
15           byteBuf = allocHandle.allocate(allocator);
16           //真正的读取操作
17           allocHandle.lastBytesRead(doReadBytes(byteBuf));
18           //读到0表示数据读完,-1表示对端已关闭
19           if (allocHandle.lastBytesRead() <= 0) {
20             // nothing was read. release the buffer.
21             byteBuf.release();
22             byteBuf = null;
23             //读到-1,表示对端已关闭
24             close = allocHandle.lastBytesRead() < 0;
25             break;
26           }
27
28           allocHandle.incMessagesRead(1);
29           readPending = false;
30           //将读取到的数据发送给Handler,即回调Handler的ChannelRead
31           pipeline.fireChannelRead(byteBuf);
32           byteBuf = null;
33         } while (allocHandle.continueReading());
34
35         allocHandle.readComplete();
36         //回调Handler的ChannelReadComplete
37         pipeline.fireChannelReadComplete();
38
39         if (close) {
40           closeOnRead(pipeline);
41         }
42       } catch (Throwable t) {
43         handleReadException(pipeline, byteBuf, t, close, allocHandle);//会调用pipeline.fireChannelReadComplete和pipeline.fireExceptionCaught
44       }
45     }
46     //......
47 }
1 protected int doReadBytes(ByteBuf byteBuf) throws Exception {
2   final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
3   allocHandle.attemptedBytesRead(byteBuf.writableBytes());
4   //从java.nio.channels.SocketChannel中读取allocHandle.attemptedBytesRead()个字节置byteBuf中
5   return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
6 }
时间: 2024-10-07 13:44:05

netty-read的相关文章

下载-深入浅出Netty源码剖析、Netty实战高性能分布式RPC、NIO+Netty5各种RPC架构实战演练三部曲视频教程

下载-深入浅出Netty源码剖析.Netty实战高性能分布式RPC.NIO+Netty5各种RPC架构实战演练三部曲视频教程 第一部分:入浅出Netty源码剖析 第二部分:Netty实战高性能分布式RPC 第三部分:NIO+Netty5各种RPC架构实战演练

Netty对Protocol Buffer多协议的支持(八)

Netty对Protocol Buffer多协议的支持(八) 一.背景 在上篇博文中笔者已经用代码演示了如何在netty中使用Protocol Buffer,然而细心的用户可能会发现一个明显的不足之处就是,我们的Handler只能处理一种特定的类型,而我们的项目中又不可能只有一种类型,那么这个问题该怎么解决了?多的不说,笔者直接上代码. 二.代码实现 2.1 message的编写 syntax = "proto2"; package com.rsy.netty.protobuf; op

Java Netty (1)

Netty是由JBOSS提供的一个java开源框架,本质上也是NIO,是对NIO的封装,比NIO更加高级,可以说发展的路线是IO->NIO->Netty. ServerBootstrap和ClientBootstrap是Netty中两个比较重要的类,分别用来进行服务器和客户端的初始化. 服务器: // ChannelFactory final ChannelFactory channelFactory = new NioServerSocketChannelFactory( // Boss线程

netty 解决TCP粘包与拆包问题(二)

TCP以流的方式进行数据传输,上层应用协议为了对消息的区分,采用了以下几种方法. 1.消息固定长度 2.第一篇讲的回车换行符形式 3.以特殊字符作为消息结束符的形式 4.通过消息头中定义长度字段来标识消息的总长度 一.采用指定分割符解决粘包与拆包问题 服务端 1 package com.ming.netty.nio.stickpack; 2 3 4 5 import java.net.InetSocketAddress; 6 7 import io.netty.bootstrap.ServerB

用Netty解析Redis网络协议

用Netty解析Redis网络协议 根据Redis官方文档的介绍,学习了一下Redis网络通信协议.然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下! 1.RESP协议 Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol)的网络通信协议交换数据.RESP的设计权衡了实现简单.解析快速.人类可读这三个因素.Redis客户端通过RESP序列化整数.字符串.数据等数据类型,发送字符串数组表示参数的命

tomcat 、NIO、netty 本质

tomcat 基于 web 浏览器的通信容器 nio 同步非阻塞的I/O模型 netty 通信框架,对 nio 的封装

Netty中的那些坑

Netty中的那些坑(上篇) 最近开发了一个纯异步的redis客户端,算是比较深入的使用了一把netty.在使用过程中一边优化,一边解决各种坑.儿这些坑大部分基本上是Netty4对Netty3的改进部分引起的. 注:这里说的坑不是说netty不好,只是如果这些地方不注意,或者不去看netty的代码,就有可能掉进去了. 坑1: Netty 4的线程模型转变 在Netty 3的时候,upstream是在IO线程里执行的,而downstream是在业务线程里执行的.比如netty从网络读取一个包传递给

Netty利用ChannelGroup广播消息

在Netty中提供了ChannelGroup接口,该接口继承Set接口,因此可以通过ChannelGroup可管理服务器端所有的连接的Channel,然后对所有的连接Channel广播消息. Server端: public class BroadCastServer { public static void run(int port) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioE

Netty简单应用与线上服务器部署_netty视频

Netty简单应用与线上服务器部署 课程学习地址:http://www.xuetuwuyou.com/course/198 课程出自学途无忧网:http://www.xuetuwuyou.com 一.开发环境 4.1.11.Final   jdk1.8 maven 3.2 Spring 4.3.9 二.适合人群 ①想深入学习java ClassLoader ②想在线上linux服务器上运行netty或Springboot服务 三.课程目标 ①掌控ClassLoader ②学会编写shell脚本

netty中使用IdleStateHandler来发起心跳

网络连接中,处理Idle事件是很常见的,一般情况下,客户端与服务端在指定时间内没有任何读写请求,就会认为连接是idle的.此时,客户端需要向服务端发送ping消息,来维持服务端与客户端的链接.那么怎么判断客户端在指定时间里没有任何读写请求呢?netty中为我们提供一个特别好用的IdleStateHandler来干这个苦差事!请看下面代码: public class EchoClient { private final static int readerIdleTimeSeconds = 40;/