KafkaSpout分析:配置

public KafkaSpout(SpoutConfig spoutConf) {
        _spoutConfig = spoutConf;}

SpoutConfig继承自KafkaConfig。由于SpoutConfig和KafkaConfig所有的instance field全是public, 因此在使用构造方法后,可以直接设置各个域的值。

public class SpoutConfig extends KafkaConfig implements Serializable {
    public List<String> zkServers = null; //记录Spout读取进度所用的zookeeper的host
    public Integer zkPort = null;//记录进度用的zookeeper的端口
    public String zkRoot = null;//进度信息记录于zookeeper的哪个路径下
    public String id = null;//进度记录的id,想要一个新的Spout读取之前的记录,应把它的id设为跟之前的一样。
    public long stateUpdateIntervalMs = 2000;//用于metrics,多久更新一次状态。

    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }
}
public class KafkaConfig implements Serializable {

    public final BrokerHosts hosts; //用以获取Kafka broker和partition的信息
    public final String topic;//从哪个topic读取消息
    public final String clientId; // SimpleConsumer所用的client id

    public int fetchSizeBytes = 1024 * 1024; //发给Kafka的每个FetchRequest中,用此指定想要的response中总的消息的大小
    public int socketTimeoutMs = 10000;//与Kafka broker的连接的socket超时时间
    public int fetchMaxWait = 10000;   //当服务器没有新消息时,消费者会等待这些时间
    public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的读缓冲区大小
    public MultiScheme scheme = new RawMultiScheme();//从Kafka中取出的byte[],该如何反序列化
    public boolean forceFromStart = false;//是否强制从Kafka中offset最小的开始读起
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//从何时的offset时间开始读,默认为最旧的offset
    public long maxOffsetBehind = 100000;//KafkaSpout读取的进度与目标进度相差多少,相差太多,Spout会丢弃中间的消息
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所请求的offset对应的消息在Kafka中不存在,是否使用startOffsetTime
    public int metricsTimeBucketSizeInSecs = 60;//多长时间统计一次metrics

    public KafkaConfig(BrokerHosts hosts, String topic) {
        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
    }

    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        this.hosts = hosts;
        this.topic = topic;
        this.clientId = clientId;
    }

}

对Zookeeper的使用

KafkaSpout的配置中有两个地方可以用到Zookeeper

  1. 用Zookeeper来记录KafkaSpout的处理进度,在topology重新提交或者task重启后继续之前的处理进度。在SpoutConfig中的zkServers, zkPort和zkRoot与此相关。如果zkServer和zkPort没有设置,那么KafkaSpout会使用Storm集群所用的Zookeeper记录这些信息。
  2. 用Zookeeper来获取Kafka中一个topic的所有partition,和每个partition的leader。这需要实现BrokerHosts的子类ZkHosts.但是,这个Zookeepr是可选的。如果使用BrokerHosts的另一个子类StaticHosts,把partition和leader的对应关系硬编码,则不需要Zookeeper来提供此功能。KafkaSpout会从Kafka集群使用的Zookeeper中提取partition和leader的对应关系。而且:
    • 如果使用StatisHosts,那么KafkaSpout会使用StaticCoordinator,这个coordinator不能响应partition leader的变化。
    • 如果使用ZkHosts,那么KafkaSpout会使用ZkCoordinator, 当其refresh()方法被调用后,这个cooridnator会检查发生leader变更的partition,并为之生成新的PartitionManager.从而能够在leader变更后,继续读取消息。

影响初始读取进度的配置项

在一个topology上线后,它从哪个offset开始读取消息呢?有一些配置项对此有影响:

  1. SpoutConfig中的id字段。如果想要一个topology从另一个topology之前的处理进度继续处理,它们需要有相同的id。
  2. KafkaConfig的forceFromStart字段。如果此字段设为true, 那么它一个topology上线后,它会忽略之前相同id的topology的进度,并且从Kafka中最早的消息开始处理。
  3. KafkaConfig的startOffsetTime字段。默认为kafka.api.OffsetRequest.EarliestTime()开始读,也就是从Kafka中最早的消息开始处理。也可以设成kafka.api.OffsetRequest.LatestOffset,也就是最早的消息开始读。也可以自己指定具体的值。
  4. KafkaConfig的maxOffsetBehind字段。这个字段对于KafkaSpout的多个处理流程都有影响。当提交一个新topology时,如果没有forceFromStart, 当KafkaSpout对某个partition的处理进度落后startOffsetTime对应的offset多于此值时,KafkaSpout会丢弃中间的消息,从而强制赶上目标进度.比如,如果startOffsetTime设成了lastestTime,那么如果进度落后超过maxOffsetBehind,KafkaSpout会直接从latestTime对应的offset开始处理。如果设成了froceFromStart,则在提交新任务时,始终会从EarliestTime开始读。
  5. KafkaSpout的userStartOffsetTimeIfOffsetOutOfRange字段。如果设成true,那么当fetch消息时出错,且FetchResponse显示的出错原因是OFFSET_OUT_OF_RANGE,那么就会尝试从KafkaSpout指定的startOffsetTime对应的消息开始读。例如,如果有一批消息因为超过了保存期限被Kafka删除,并且zk里记录的消息在这批被删除的消息里。如果KafkaSpout试图从zk的记录继续读,那么就会出现OFFSET_OUT_OF_RANGE的错误,从而触发这个配置。

实际上maxOffsetBehind有时候有点名不符实。当startOffsetTime为A, zk里的进度为B, A - B > maxOffsetBehind时,应该从A - maxOffsetBehind除开始读或许更好一些,而不是直接跳到startOffsetTime。此处的逻辑参见PartitionManager的实现。

附:其中KafkaConfig的maxWait的意义请参见这篇文章 《卡夫卡的炼狱》

卡夫卡的炼

时间: 2024-11-05 20:30:17

KafkaSpout分析:配置的相关文章

nginx源码分析--配置信息的继承&amp;合并

这里只讲述http{}模块下的配置: 在ngx_http_block()函数内(这个函数别调用时在ngx_inti_cycle内的ngx_conf_parse函数,这个函数遇到http命令时 回调ngx_http_block,开启http{}配置块的解读工作),针对每一个http模块,调用init_conf之后,有调用了ngx_http_merge_servers().这是为何! 首先明确几点:一个http{}配置块内可以包含多个server{}配置块,每个server{}配置块可以包含多个lo

[nginx源码分析]配置解析1

整个配置解析主要是函数ngx_init_cycle(&init_cycle)进行处理. ngx_init_cycle(&init_cycle) ngx_time_update()//时间更新,也是在main函数里面讲过 /* * 通过加锁和解锁,来更新如下时间 ngx_cached_time = tp; ngx_cached_http_time.data = p0; ngx_cached_err_log_time.data = p1; ngx_cached_http_log_time.da

[nginx源码分析]配置解析(http作用域)

分析完events后,开始分析http.sever.location模块. http上下文主要是创建三个用于保存每一个模块设置的变量结构,每个模块都可以保存变量到http三个数组指针中(main.server.location),每个模块的索引是模块的ctx_index变量指定在三个数组中的位置 核心代码: ctx = ngx_pcalloc(cf->pool, sizeof(ngx_http_conf_ctx_t)); //首先分配一个http上下文变量,里面是指针的指针,因为后面分配数组都是

Sonar6.0应用之五:Sonar web分析配置

一.排除不用扫描的代码目录及文件 1.以管理员登陆Sonar,进入配置-->通用设置-->排除-->排除的源文件 二.安装需要的语言代码扫描规则 1.进入配置-->系统-->更新中心->Available 2.针对Jave开发的漏洞.违规.BUG扫描的规则可以是如下 三.配置代码扫描规则 1.进入配置-->通用设置->Java.JaveScript.SCM.技术债务根据需要进行修改,一般默认就可以. 四.质量配置,设置各个语言默认的扫描规则 五.代码规则设置

MyBatis 源码分析——配置信息

MyBatis框架的启动前期需要加载相关的XML配置信息.从官网上我们可以了解到他具有十几个节点.其中笔者认为比较重要的节点是settings节点.properties节点.environments节点. transactionManager节点.至于相关的作用到官网看过一篇都会明白. MyBatis框架的XML配置到底有几个节点,这个必须去查看一下相关的DTD文件才行.DTA文件就在包名为org.apache.ibatis.builder.xml的下面.由于显示的问题笔者只复制出部分来. <?

webpack源码分析——配置调试环境

无论是阅读webpack源码,还是编写webpack的plugin和loader,配置调试环境都是很有必要的.weabpack的运行环境是nodejs,调试webpack就是调试nodejs程序.我们平时使用的IDE如eclipse.webstorm都支持nodejs的调试.本文以eclipse(Version: Oxygen.1a Release (4.7.1a))为例,进行讲解. 在这个例子里面,我们使用webpack <entry> [<entry>] <output&

web_profile(网站分析)配置

web_profiler: # DEPRECATED, it is not useful anymore and can be removed # safely from your configuration verbose: true # 开关页面显示能分析页面信息的工具条 toolbar: false position: bottom # 开关是否在页面重定向之前显示页面信息 intercept_redirects: false # Exclude AJAX requests in the

通过Spring Boot整合Mybatis分析自动配置详解

前言 SpringBoot凭借"约定大于配置"的理念,已经成为最流行的web开发框架,所以有必须对其进行深入的了解:本文通过整合Mybatis类来分析SpringBoot提供的自动配置(AutoConfigure)功能,在此之前首先看一个整合Mybatis的实例. SpringBoot整合Mybatis 提供SpringBoot整合Mybatis的实例,通过Mybatis实现简单的增删改查功能: 1.表数据 CREATE TABLE `role` (  `note` varchar(2

Spring Security4实战与原理分析视频课程( 扩展+自定义)

Spring Security概述与课程概要介绍 Spring Security快速入门(基于XML) Spring Security快速入门(基于XML) URL匹配详解 自定义登陆 配置退出 Ajax登陆退出 JDBC认证 层级角色关系 认证体系介绍 自定义认证 匿名认证 认证流程分析 配置权限 授权体系介绍 自定义授权 自定义JDBC授权 表达式权限原理分析 表达式权限扩展 自定义异常处理 过滤器分析 过滤器应用 FilterChainProxy初始化流程分析 授权流程分析 Spring