【原创】Kakfa network包源代码分析

kafka.network包主要为kafka提供网络服务,通常不包含具体的逻辑,都是一些最基本的网络服务组件。其中比较重要的是Receive、Send和Handler。Receive和Send封装了底层的入站(inbound)和出站(outbound)字节传输请求,而Handler在此二者间做了一个映射。一个handler就代表了一个函数,该函数接收Receive类型的对象并返回Send类型的对象。用户可以处理过冲中添加逻辑代码,并且需要自行捕获传输读写过程中的异常,将其序列化之后发送给客户端。基本上, 如果你想要研究透这个包的代码,你必须要很了解Java NIO的原理。下面我们对包中的代码进行逐个分析:

一、Transmission.scala

Transmission trait表示一个有状态的网络数据传输。之所以说是有状态的是因为传输分为未完成状态和已完成状态。Transmission提供了一个抽象方法complete表示传输是否完成,同时还定义了2个无返回值的方法: expectIncomplete和expecteComplete分别检测此次传输是否未完成以及此次传输是否已完成——如果检测不通过直接抛出异常。

Receive trait继承了Transmission,Receive表示的是一次传输是从channel读取的,定义了两个抽象方法和一个具体方法:

1. buffer: 抽象方法,此次传输读取字节到一个ByteBuffer

2. readFrom: 抽象方法,从一个ReadableByteChannel中读取字节,并返回读取的字节数

3. readCompletely: 具体方法,完整地执行完此次读取传输请求并返回总读取字节数

Send trait也继承自Transmission,表示的是此次传输是将字节发送到指定的channel上。Send定义了一个抽象方法和一个具体方法:

1. writeTo: 抽象方法,将待传输的字节发送至指定的channel上,并返回写入的字节数

2. writeCompletely: 具体方法,完整地执行完此次写传输请求并返回总写入字节数

除了上述三个trait, 该文件还定义了一个抽象类MultiSend接收一组Send,顺序地执行发送请求。MultiSend是一个泛型类型,接收Send或所有Send的子类,同时构造函数还接收一个列表。该抽象类有三个字段分别保存期望写入的字节数,当前的待发送列表以及总的写入字节数。因为MultiSend继承自Send,它就必须要实现writeTo方法和complete方法:

1. complete: 判断当前的待发送列表中是否为空——如果不为空自然返回false,说明还未完成发送;如果是的话还需要比较总的发送字节数与期望的发送字节数比较,如果不匹配的话直接报错。不返回true的情况统称为不完整的发送(incomplete write)

2. writeTo: 这方法会持续地将待发送的字节发送到channel中直到出现不完整发送。一旦出现的话,该方法会立即返回本次调用中写入的字节数。

二、SocketServer.scala

又是超长的一个文件!! 首先定义了一个SocketServer类,就是提供NIO socket服务的,它是一个多线程模型,由下面三部分组成:

1. 1个接收者线程负责处理新的连接请求

2. N个处理者线程,每一个都有自己的选择器(java.nio.Selector)用于从socket中读取请求

3. M个handler线程,用于处理请求并返回response给处理者线程以便让其进行写入。

由于其构造函数非常复杂,我打算就其参数一个一个说:

1. brokerId: Kafka Broker Id

2. host, port: Socket的host和port

3. numProcessorThreads: 处理者线程数

4. maxQueuedRequest: 队列中最大请求数

5. sendBufferSize: 设置SO_SNDBUF值

6. recvBufferSize: 设置SO_RCVBUF值

7. maxRequestSize: 一个请求的最大长度(单位:字节)

8. maxConnectionsPerIp: 每个IP发起的最大连接数

9. connectionsMaxIdleMs: 每个连接的最大空闲时间(单位:毫秒)

10. maxConnectionsPerIpOverrides: 保存了每个IP的当前连接数,以Map[String, Int]格式保存,比如"127.0.0.1" -> 100

在详细展开SocketServer构造函数之前,我们先介绍本文件中其他的类:

ConnectionQuotas类

顾名思义,这个类应该是管理连接配额方面的事情。定义的两个字段也很直观:overrides保存的是Map[String, Int]类型的的map,具体含义是[IP地址,该IP当前连接数];而counts创建了一个可变Map保存该IP的InetAddress对象 ->连接数的映射

该类提供两个方法分别用于增减某个IP的连接数

1. inc: 以同步的方式为该IP增加一个连接,只要没有超过为其分配的最大连接数限制即可,当然了,如果超过了的话抛出异常TooManyConnectionsException。

2. dec: 以同步的方式执行与inc相反的操作,减少某个IP的当前连接数1个。如果操作前连接数就是1 ,直接把该ip记录从map中移除。

AbstractServerThread类

说完了ConnectionQuotas之后,我们就能学习AbstractServerThread类了。该类继承自Runnable,同时还是抽象类。前面说的处理者线程和接收者线程都继承了这个抽象类。AbstractServerThread是它们的基类,同时提提供了很多有用的变量和方法。

AbstractServerThread的构造函数接收一个ConnectionQuotas类在接收连接和关闭线程的时候都会用到ConnectionQuotas。在AbstractServerThread的构造函数中初始化了一个java.nio.Selector来管理通道,2个CountDownLatch变量分别表示启动阀门和关闭阀门,最后定义了一个线程安全的AtomicBoolean的字段表征线程当前是否是alive。定义好了这些之后我们再来看它顶一个的方法:

1. shutdown: 发起一个正常的关闭请求,具体方法是将alive设置为false表明该线程已不在处于存活状态,然后调用Selector.wakeup方法唤醒阻塞在select方法上的线程然后一直等待关闭阀门关闭(即shutdownLatch减少变为0)。

2. awaitStartup: 等待线程完全启动——具体方法很简单,就是等启动阀门开启(这里的开启其实是指startupLatch减少变为0)

3. startupComplete: 开启启动阀门——这里的开启其实是指startupLatch减少变为0

4. shutdownComplete: 开启关闭阀门————这里的开启是指将shutdownLatch减少为0

5. isRunning: 判断线程是否依然存活

6. wakeup: 主要是为了唤醒被阻塞在select上的线程——linux上面很常见的做法。通过管道写数据唤醒线程,MemCached的多线程也是这么实现的。

7. close: 接收一个SelectionKey类型,如果不为空的话,去除掉当前attached的对象,然后调用close(channel)的方法关闭socket,关闭channel,并忽略任何异常。

8. closeAll: 关闭所有打开的连接——具体方法就是遍历selector上的所有key,然后直接调用close方法

9. countInterestOps: 遍历所有SelectionKey的所有interest集合

Acceptor类

它是一个接收者线程,负责接收并配置新的连接请求。其构造函数参数如下:

1. host/port: Socket的host和port

2. processors: 接收者线程需要保存处理者线程数组

3. sendBufferSize: 设置Socket的SO_SNDBUF

4. recvBufferSize: 设置Socket的SO_RCVBUF

5. connectionQuotas: 持有ConnectionQuotas对象管理每个IP的连接数

Acceptor提供了三个方法:

1. openServerSocket: 顾名思义,就是创建一个socket套接字用于监听入站连接,并返回一个ServerSocketChannel。这个channel被设置成非阻塞模式。

2. accept: 就像这个方法的名字说的,这个方法就是接收一个新的连接请求。需要传入2个参数:SelectionKey和Processor线程。首先从传入的SelectionKey对象中获取一个channel,然后调用accept方法接受新的连接,之后为该套接字的IP地址增加一个连接数,并将该套接字设置为非阻塞模式并且关闭nagle算法且不使用缓存。最后调用处理者线程的accept方法返回。如果连接数已达最大直接打印异常信息,并关闭channel

3. run: 只要线程一直处于运行状态,该方法就会循环检查是否需要建立新连接。首先要注册OP_ACCEPT事件准备好接受客户端连接,然后开启线程。之后不断地检查线程是否存活,如果不在存活关闭socket,channel然后关闭线程。如果一直存活的话首先检查所有准备执行IO操作的key的数目,因为是blocking方法,所以设置了500毫秒的超时。如果存在的key数目大于0,遍历每个key检测其是否准备好接收新的Socket连接,如果准好了直接调用accept接收此连接,否则直接抛出异常。选择processor时候采用了轮询的方式,即顺序循环地指定processor号。

Processos类

处理来自单个连接的入站请求。可以并行运行多个Processor线程,每个线程都有自己的selector。还是同样的思路,我们先看构造函数参数:

1. id: 处理者线程id号

2. time: 时间戳

3. maxRequestSize: 一次Socket请求的最大字节数,默认是100MB

4. idleMeter/aggregateIdleMeter: 代码会计算每次Selector.select的时间(最多300ms)并使用idelMeter进行标记(通过mark方法),并调用aggregatedIdleMeter.mark(idleMeter/处理网络请求的总线程数)进行标记——即摊还idleTime到每个线程上。

5. totalProcessorThreads: 用于处理网络请求的线程数,配置文件中默认是2个线程

6. requestChannel: Processor处理请求所使用的channel对象

7. connectionQuotas: 管理连接数的对象,主要是父类构造函数需要这个对象,并没有在该类中做一些特别的操作

8. connectionMaxIdleMs: 设置服务器socket processor线程的空闲超时时间,默认是10分钟

该类开始会新建一个并发非阻塞队列保存SocketChannel连接,并创建了一个时间戳保存Processor创建时间,同时还初始化了一个LinkedHashMap以LRU方式管理连接。Processor定义了9个方法,分别是:

1. maybeCloseOldestConnection: 以LRU方法关闭最久的连接。判断方法:如果关闭时刻的时间已经过了当初创建时定好的检查空闲连接窗口,就判断一下那个LinkedHashMap是不是空——如果为空,很简单直接重设下一个检查窗口;如果不为空,直接获取该hashmap的第一个连SelectionKey(LinkedHashMap是有顺序的,所以第一个元素必定是当前map中最先进入的,也就是最久的)并取得该连接的最近一次使用的时间。把该时间往后推设置的最大空闲超时时间(默认是10分钟)并更新到下一次空闲检查的时间戳。如果此时的时间还是晚于已更新的检查时间点,说明一定要关闭这个空闲连接了,直接调用close方法断开这个连接。其实说了那么多,就是一点:获取最久的那个连接的最近一次使用时间点,如果当前时间晚于加上超时时间后的时间点,就关闭那个连接。

2. close: 从LRU map中移除给定的SelectionKey并调用父类的close方法关闭对应的socket

3. channelFor: 根据给定的SelectionKey返回SocketChannel

4. configureNewConnection: 将newConnections队列中所有SocketChannel注册读事件到selector。

5. accept: 将传入的SocketChannel入队列并唤醒sleep的线程

6. processNewResponses: 根据Processor Id获取一个response,如果不为空,从这个channel中拿到SelectionKey,并判断response的类型以采取不同的措施

7. read: 从就绪的channel中读取数据

8. write: 写入数据到就绪的socket上

9. run: 开启线程,只要一直处于运行状态,则配置链接并注册新的repsonse用于写操作。如果发现有准备就绪的,直接获取key和iterator对象遍历key的模式并执行相应的操作,最后关闭线程。

三、Handler.scsala

一个很简单的object,定义了2个Handler类型:一个是函数,接收一个Receive并输出到Send对象;另一个也是函数,接收一个(Short, Receive)元组并返回一个Handler

四、ConnectionConfig.scala

连接配置类,主要参数有host, port, sendBufferSize, receiveBufferSize,tcpNoDelay和keepAlive

五、ByteBufferSend.scala

底层保有一个ByteBuffer缓冲区。只提供了一个方法:

writeTo: 就是讲channel待发送的数据发的送到bufer中。

六、BlockingChannel.scala

以伴生对象的方式提供了一个带超时的阻塞式通道。object中定义了默认的buffer大小是-1,主要是用于初始化。BlockingChannel的构造函数接收一个host,一个port,2个buffersize,分别设置SO_SNDBUF和SO_RCVBUF,另外还提供了一个毫秒级的timeout。

该类在内部维护了一个boolean的字段表明是否连接上该channel,还有3个channel,一个SocketChannel,一个ReadableByteChannel和一个GatheringByteChannel,同时还提供了一个锁对象和连接超时字段。下面我们具体地分析一下BlockingChannel提供的方法:

1. isConnected: 表明该channel是否连接上了

2. receive: 判断连接状态,如果未连接直接抛出异常;否则创建一个BoundedByteBufferReceive对象(后面会说到这个对象)主要用于从该channel中读取字节并返回成Receive对象。

3. connect: 以同步的方式连接,如果已连接直接退出,否则直接打开channel,分别设置SO_SNDBUF和SO_RCVBUF, 并设置阻塞模式为true——这也是为什么叫BlockingChannel,并设置超时时间SO_TIMEOUT、SO_KEEPALIVE和TCP_NODELAY,之后连接Socket。连接成功后,将该channel赋值给writeChannel,并新建一个读channel给readChannel——所有这些事情做完之后将connected设置为true,表明此时连接成功。但这些字段都是var,也就是可变的共享对象,因此BlockingChannel不是线程安全的。

4. disconnect: 断开连接。如果channel本身是null的话,writeChannel肯定也是null, readChannel有可能不是null,所以需要单独关闭。最后将connected设置为false表明连接已关闭。

5. send: 如果没有连接自然要抛出异常,否则根据传入的request构建一个BoundedByteBufferSend写入channel,并返回写入的字节数

七、BoundedByteBufferReceive.scala

表示客户端与服务器的连接,继承了Receive类。从名字来看,这应该是一个有界的buffer,构造函数接收的就是buffer大小。提供了三个方法:

1. byteBufferAllocate: 创建一个size大小的ByteBuffer

2. readFrom:sizeBuffer是表示请求大小的buffer,总共4个字节。这个方法就是先要读取sizeBuffer中的大小,即整个请求的大小size,然后分配size大小的contentBuffer,最后返回总的buffer字节数,同时将complete设置为true表明读取完毕

3. buffer: 返回请求内容的buffer

八、BoundedByteBufferSend.scala

与BoundedByteBufferReceive对应的,只是执行channel写入的,就不赘述了。

九、Request.scala

该scala中有两组伴生对象RequestChannel和RequestMetrics。先说RequestMetrics一组,主要是处理与请求相关的度量,比如请求速率(每秒多少个请求)、计算请求在队列中的时间、计算请求在本地broker处理时间等。而RequestChannel内部维护了一个request queue和response queue都是阻塞方式的,并且提供了sendRequest和sendResponse方法分别将请求加入到各自的队列中。同时,还提供了receiveRequest和receiveResponse分别从各自queue中获取request和response

时间: 2024-10-23 01:27:39

【原创】Kakfa network包源代码分析的相关文章

【原创】Kakfa cluster包源代码分析

kafka.cluster包定义了Kafka的基本逻辑概念:broker.cluster.partition和replica——这些是最基本的概念.只有弄懂了这些概念,你才真正地使用kakfa来帮助完成你的需求.因为scala文件不多,还是老规矩,我们一个一个分析. 一.Broker.scala broker可以说是Kafka最基础的概念了,没有broker就没有kafka集群,更不用提负责解耦生产者和消费者了.Kafka使用了一个case class来定义Broker类.一个Broker的属性

【原创】Kakfa log包源代码分析(一)

Kafka日志包是提供的是日志管理系统.主要的类是LogManager——该类负责处理所有的日志,并根据topic/partition分发日志.它还负责flush策略以及日志保存策略.Kafka日志本身是由多个日志段组成(log segment).一个日志是一个FileMessageSet,它包含了日志数据以及OffsetIndex对象,该对象使用位移来读取日志数据. 下面我们一个一个地分析,先说最简单的: 一.LogConfig.scala 该scala定义了Defaults object,里

【原创】Kakfa message包源代码分析

笔者最近在研究Kafka的message包代码,有了一些心得,特此记录一下.其实研究的目的从来都不是只是看源代码,更多地是想借这个机会思考几个问题:为什么是这么实现的?你自己实现方式是什么?比起人家的实现方式,你的方案有哪些优缺点? 任何消息引擎系统最重要的都是定义消息,使用什么数据结构来保存消息和消息队列?刚刚思考这个问题的时候,我自己尝试实现了一下Message的定义: public class Message implements Serializable { private CRC32

【原创】Kakfa common包源代码分析

初一看common包的代码吓了一跳,这么多scala文件!后面仔细一看大部分都是Kafka自定义的Exception类,简直可以改称为kafka.exceptions包了.由于那些异常类的名称通常都定义得很直观,笔者就不在一一赘述了.现在我们说说common包中其他的代码. 一.AppInfo.scala 这是一个object,实现了KafkaMetricsGroup trait.后者可以认为是一个创建各种度量元的工厂类.主要利用Yammer Metrics框架创建各种度量元,比如guage,m

【原创】Kakfa serializer包源代码分析

这个包很简单,只有两个scala文件: decoder和encoder,就是提供序列化/反序列化的服务.我们一个一个说. 一.Decoder.scala 首先定义了一个trait: Decoder[T].在这个trait中定义了一个抽象方法fromBytes,用于将一个字节数组转换成一个类型T的对象.实现此trait的子类的构造函数中必须要接收一个VerifiableProperties. Kafka还定义了两个实现了 Decoder的子类: DefaultDecoder和StringDecod

【原创】Kakfa metrics包源代码分析

这个包主要是与Kafka度量相关的. 一.KafkaTimer.scala 对代码块的运行进行计时.仅提供一个方法: timer——在运行传入函数f的同时为期计时 二.KafkaMetricsConfig.scala 指定reporter类,以逗号分隔的reporter类,比如kafka.metrics.KafkaCSVMetricsReporter,这些类必须要在claasspath中指定.另外指定了度量的轮询间隔,默认是10秒. 三.KafkaMetricsReporter.scala Ka

【原创】Kakfa api包源代码分析

既然包名是api,说明里面肯定都是一些常用的Kafka API了. 一.ApiUtils.scala 顾名思义,就是一些常见的api辅助类,定义的方法包括: 1. readShortString: 从一个ByteBuffer中读取字符串长度和字符串.这个ByteBuffer的格式应该是:2个字节的字符串长度值N+N个字节的字符串 2. writeShortString: 与readShortString相反,先写入2个字节的长度N,然后写入N个字节到ByteBuffer中 3. shortStr

【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包. 一.ConsumerConfig.scala Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网. 二.ConsumerIterator.scala KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态.这个迭代器还提供了一个shutdownCommand对象可作为一个

【原创】kafka server源代码分析(一)

这个是Kafka server的核心包,里面的类也很多,我们还是一个一个分析 一.BrokerStates.scala 定义了目前一个kafka broker的7中状态 —— 1. NotRunning:未运行 2. Starting:启动中 3. RecoveringFromUncleanShutdown:从上次异常恢复中 4. RunningAsBroker:已启动 5. RunningAsController:作为Controller运行 6. PendingControlledShutd