Jafka来源分析——Processor

Jafka Acceptor接受client而建立后的连接请求,Acceptor会将Socket连接交给Processor进行处理。Processor通过下面的处理步骤进行client请求的处理:

1. 读取client请求。

2. 依据client请求类型的不同,调用对应的处理函数进行处理。

Processor读取client请求是一个比較有意思的事情,须要考虑两个方面的事情:第一,请求规则(Processor须要依照一定的规则进行请求的解析)。第二,怎样确定一次请求的读取已经结束(由于是非堵塞连接,很有可能第一次读操作读取了请求的一部分数据,第二次到第N次读取才干把整个client请求读取完整)。以下我们具体解析一下client请求的格式。

client请求首先包括一个int,该int指明本次client请求的大小(size)。随后,请求包括一个两个byte(short)的请求类型(请求类型包括:CreaterRequest、DeleterRequest、FetchRequest、MultiFetchRequest、MultiProducerRequest、OffsetRequest和ProducerRequest。然后每种请求类型有固定的格式。下图具体说明了ProducerRequest的格式:

知道了上面的格式之后,问题二(怎样确定一次请求已经读取完毕)就非常easy攻克了。

首先为“请求长度”分配一个4byte的ByteBuffer,直到该Buffer读满,否则说明长度一直没有读取完毕。“请求长度”读取完毕后,为请求分配一个“请求长度”大小的ByteBuffer,直到该Buffer读满则说明一次请求读取完毕。读取完毕后,依据“请求类型”调用对应的处理函数(Handler)进行处理。在jafka中,上述的两个Buffer在类BoundedByteBufferReceive中进行声明和管理。Processor接收到Acceptor分配的socket连接后。会为socke连接建立一个BoundedByteBufferReceive并将其与socket连接进行绑定。每当该socket连接“可读”时。将BoundedByteBufferReceive拿出来从上次读取的基础上继续读取。直到一次请求彻底读取完毕,详细过程如以下代码(Processor.read)所看到的:

private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = channelFor(key);
Receive request = null;
request = new BoundedByteBufferReceive(maxRequestSize);
key.attach(request);
} else {
request = (Receive) key.attachment();
}
int read = request.readFrom(socketChannel);
stats.recordBytesRead(read);
if (read < 0) {
close(key);
} else if (request.complete()) {
Send maybeResponse = handle(key, request);
key.attach(null);
// if there is a response, send it, otherwise do nothing
if (maybeResponse != null) {
key.attach(maybeResponse);
key.interestOps(SelectionKey.OP_WRITE);
}
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ);
getSelector().wakeup();
if (logger.isTraceEnabled()) {
logger.trace("reading request not been done. " + request);
}
}
}

BoundedByteBufferReceive.readFrom的实现详细例如以下:主要是申请两个Buffer并不断的读取数据。

public int readFrom(ReadableByteChannel channel) throws IOException {
        expectIncomplete();
        int read = 0;
        if (sizeBuffer.remaining() > 0) {
            read += Utils.read(channel, sizeBuffer);
        }
        if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
            sizeBuffer.rewind();
            int size = sizeBuffer.getInt();
            if (size <= 0) {
                throw new InvalidRequestException(...);
            }
            if (size > maxRequestSize) {
                final String msg = "Request of length %d is not valid, it is larger than the maximum size of %d bytes.";
                throw new InvalidRequestException(format(msg, size, maxRequestSize));
            }
            contentBuffer = byteBufferAllocate(size);
        }
        //
        if (contentBuffer != null) {
            read = Utils.read(channel, contentBuffer);
            //
            if (!contentBuffer.hasRemaining()) {
                contentBuffer.rewind();
                setCompleted();
            }
        }
        return read;
    }

读取完毕后,Processor会解析“请求类型”,依据请求类型的不同调用不同的Handler处理对应于该请求。

版权声明:本文博主原创文章,博客,未经同意不得转载。

时间: 2024-11-08 03:25:32

Jafka来源分析——Processor的相关文章

Jafka源代码分析——Processor

Jafka Acceptor接收到客户端请求并建立连接后,Acceptor会将Socket连接交给Processor进行处理.Processor通过以下的处理步骤进行客户端请求的处理: 1. 读取客户端请求. 2. 根据客户端请求类型的不同,调用相应的处理函数进行处理. Processor读取客户端请求是一个比较有意思的事情,需要考虑两个方面的事情:第一,请求规则(Processor需要按照一定的规则进行请求的解析):第二,如何确定一次请求的读取已经结束(因为是非阻塞连接,非常有可能第一次读操作

Jafka来源分析——文章

Kafka它是一个分布式消息中间件,我们可以大致分为三个部分:Producer.Broker和Consumer.当中,Producer负责产生消息并负责将消息发送给Kafka:Broker能够简单的理解为Kafka集群中的每一台机器,其负责完毕消息队列的主要功能(接收消息.消息的持久化存储.为Consumer提供消息.消息清理.....).Consumer从Broker获取消息并进行兴许的操作.每一个broker会有一个ID标识,该标识由人工在配置文件里配置. Kafka中的消息隶属于topic

Jafka源代码分析——网络架构

在kafka中,每一个broker都是一个服务器.按照一般理解,服务器就是一个SocketServer,其不断接收用户的请求并进行处理.在Java中进行网络连接有两种方式一种为阻塞模式一种为非阻塞模式.Jafka采用非阻塞模式进行网络通讯.在Java的非阻塞模式中,建立socket server的一般流程如下: 1.启动ServerSocketChannel并将其绑定到特定的端口. 2.将ServerSocketChannel以及其感兴趣的操作注册到Selector,在这里感兴趣的操作是Acce

Jafka源代码分析——随笔

Kafka是一个分布式的消息中间件,可以粗略的将其划分为三部分:Producer.Broker和Consumer.其中,Producer负责产生消息并负责将消息发送给Kafka:Broker可以简单的理解为Kafka集群中的每一台机器,其负责完成消息队列的主要功能(接收消息.消息的持久化存储.为Consumer提供消息.消息清理.....):Consumer从Broker获取消息并进行后续的操作.每个broker会有一个ID标识,该标识由人工在配置文件中配置. Kafka中的消息隶属于topic

Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log).通过阅读源代码可知其具体完成的功能如下: 1. 按照预设规则对消息队列进行清理. 2. 按照预设规则对消息队列进行持久化(flush操作). 3. 连接ZooKeeper进行broker.topic.partition相关的ZooKeeper操作. 4. 管理broker上所有的Log. 下面一一对这些功能的实现进行详细的解析. 一.对于Log的管理 LogManager包

微生物来源分析

目录 微生物来源分析 写在前面 准备 微生物来源分析 rm(list = ls()) gc() 导入主函数 导入分组文件和OTU表格 Load OTU table 下面区分目标样品和来源样品. Extract the source environments and source/sink indices 对两组样品进行抽平 Estimate source proportions for each sink 就正常样品而言,我们都会测定重复,这里基于多个样品的sourceracker分析 导入主函

电子地图(gis应用)开发数据来源分析

电子地图(gis应用)开发数据哪里来 要想实现电子地图应用,除了要有开发的电子地图GIS平台还需要支撑地图信息展示的数据和地图的底图,那么现在我们市面上有很多的公司都在使用电子地图和相关的一些应用.这类的应用数据到底是如何采集的和获取呢.这边从我们开发了这么多项目和对行业的了解来给大家作答,同时也希望有这方面开发需求的客户来点咨询我们将会耐心的协助您解决问题.上海为卓信息科技有限公司经过十年的研究和发展,致力于3s行业的电子地图开发应用,专业解决企业地图开发需求.下面我们就来分析一下地图数据的具

SDL2来源分析3:渲染(SDL_Renderer)

===================================================== SDL源代码分析系列文章上市: SDL2源码分析1:初始化(SDL_Init()) SDL2源码分析2:窗体(SDL_Window) SDL2源码分析3:渲染器(SDL_Renderer) SDL2源码分析4:纹理(SDL_Texture) SDL2源码分析5:更新纹理(SDL_UpdateTexture()) SDL2源码分析6:拷贝到渲染器(SDL_RenderCopy()) SDL2

JUnit4.8.2来源分析-2 org.junit.runner.Request

JUnit4.8.2源代码,最为yqj2065兴趣是org.junit.runner.Request,现在是几点意味着它? ①封装JUnit的输入 JUnit4作为信息处理单元,它的输入是单元測试类--布满各种JUnit4的RUNTIME标注的类,但因为使用反射机制,JUnit4的输入严格地说是一个或多个(组)单元測试类的Class对象.早期版本号的JUnit主要处理一个測试或測试构成的树,在增添了对过滤/ filtering和排序/ sorting支持后,JUnit4增加了这个概念.毕竟依照1