(二)storm-kafka源码走读之Config相关类走读

Config就是配置相关信息,下面是KafkaConfig的源码及小弟的相关注释,有错误的地方还望指出

public class KafkaConfig implements Serializable {

	/** 一个借口,实现类有ZkHosts,和StatisHosts	 **/
    public final BrokerHosts hosts;
    public final String topic; // kafka topic name
    public final String clientId; // 自己取一个唯一的ID吧

    public int fetchSizeBytes = 1024 * 1024; // 每次从kafka读取的byte数,这个变量会在KafkaUtils的fetchMessage方法中看到
    public int socketTimeoutMs = 10000; //  Consumer连接kafka server超时时间
    public int fetchMaxWait = 10000;
    public int bufferSizeBytes = 1024 * 1024;   //Consumer端缓存大小
    public MultiScheme scheme = new RawMultiScheme(); // 数据发送的序列化和反序列化定义的Scheme,后续会专门有一篇介绍
    public boolean forceFromStart = false;	// 和startOffsetTime,一起用,默认情况下,为false,一旦startOffsetTime被设置,就要置为true
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // -2 从kafka头开始  -1 是从最新的开始 0 =无 从ZK开始
    public long maxOffsetBehind = Long.MAX_VALUE;  // 每次kafka会读取一批offset存放在list中,当zk offset比当前本地保存的commitOffse相减大于这个值时,重新设置commitOffset为当前zk offset,代码见PartitionManager
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60;

    public KafkaConfig(BrokerHosts hosts, String topic) {
        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
    }

    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        this.hosts = hosts;
        this.topic = topic;
        this.clientId = clientId;
    }

}

SpoutConfig继承了KafkaConfig

public class SpoutConfig extends KafkaConfig implements Serializable {
    public List<String> zkServers = null; // zk hosts 列表,格式就是简单ip:xxx.xxx.xxx.xxx,作为zkserver ,后续leader election用
    public Integer zkPort = null;	// zk端口,一般是2181
    public String zkRoot = null;  // 该参数是Consumer消费的meta信息,保存在zk的路径,自己指定
    public String id = null;		// 唯一id
    public long stateUpdateIntervalMs = 2000; // commit 消费的offset到zk的时间间隔

    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }
}

config的参数,在后续的class中都会用,要创建一个KafkaSpout,需要的构造参数就是SpoutConfig

下一节讲述如何构建一个KafkaSpout

时间: 2024-10-10 04:57:35

(二)storm-kafka源码走读之Config相关类走读的相关文章

apache kafka源码分析走读-Producer分析

apache kafka中国社区QQ群:162272557 producer的发送方式剖析 Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式. sync架构图 async架构图 调用流程如下: 代码流程如下: Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer.DefaultEventHandler.在创建的同时,会默认new一个Prod

kafka源码走读-controller (创建topic过程)

晚上刚刚被媳妇骂,难过之余,还是要坚持继续写一篇kafka源码走读的博客,心情难过,原谅我开头发下牢骚... 源码版本依然是0.10.2.1,我们都知道,kafka在0.8版本前没有提供Partition的Replication机制,一旦Broker宕机,其上的所有Partition就都无法提供服务,而Partition又没有备份数据,数据的可用性就大大降低了,所以0.8后提供了Replication机制来保证Broker的failover,而controller则是实现副本机制的核心. con

Idea下Kafka源码阅读编译环境搭建

Kafka源码编译阅读环境搭建 开发环境: Oracle Java 1.7.0_25 + Idea + Scala 2.10.5 +Gradle 2.1 + Kafka 0.9.0.1 一.Gradle安装配置 Kafka代码自0.8.x之后就使用Gradle来进行编译和构建了,因此首先需要安装Gradle.Gradle集成并吸收了Maven主要优点的同时还克服了Maven自身的一些局限性--你可以访问https://www.gradle.org/downloads/ 下载最新的Gradle版本

kafka源码分析之一server启动分析

1. 分析kafka源码的目的 深入掌握kafka的内部原理 深入掌握scala运用 2. server的启动 如下所示(本来准备用时序图的,但感觉时序图没有思维图更能反映,故采用了思维图): 2.1 启动入口Kafka.scala 从上面的思维导图,可以看到Kafka的启动入口是Kafka.scala的main()函数: def main(args: Array[String]): Unit = { try { val serverProps = getPropsFromArgs(args)

搭建kafka源码开发环境时使用&quot;gradle idea&quot;命令构建源码失败

我的环境: JDK: 1.8.0_131 Gradle: Gradle 3.1 Kafka源码包: kafka-0.10.0.1-src.tgz Zookeeper安装包: zookeeper-3.4.6.tar.gz Scala版本: 2.10.6 提示在 D:\soft\kafka-0.10.0.1-src\build.gradle文件的230行有问题.解决办法: 打开build.gradle文件:在开头添加如下内容: ScalaCompileOptions.metaClass.daemon

windows下IntelliJ IDEA搭建kafka源码环境

于kafka核心原理的资料,网上有很多,但是如果不自己研究其源码,永远是知其然而不知所以然.下面就来演示如何在windows环境下来编译kafka源码,并通过IntelliJ IDEA开发工具搭建kafka的源码环境,以方便在本地通过debug调试来研究kafka的内部实现机制. 具体步骤: (1)安装jdk,版本为1.8.0_131,配置JAVA_HOME: (2)安装scala,版本为 2.10.6,配置SCALA_HOME: (3)安装Gradle,版本为 3.1,配置GRADLE_HOM

idea 编译 kafka 源码

1. 从 GitHub 网站,git clone kafka 源码 2. 下载安装好 gradle,scala 3. 进入 kafka 项目目录,依次执行 gradle wrapper,gradle idea 4. 将工程导入到 idea 4.1 启动主类 kafka.Kafka 4.2 program arguments:server.properties 文件路径 4.3 把 config 目录下的 log4j.properties 拷贝到 core/src/main/resources 目

Linux Kafka源码环境搭建

本文主要讲述的是如何搭建Kafka的源码环境,主要针对的Linux操作系统下IntelliJ IDEA编译器,其余操作系统或者IDE可以类推. 1.安装和配置JDK确认JDK版本至少为1.7,最好是1.8及以上.使用java -version命令来查看当前JDK的版本,示例如下: [email protected]:~/workspace/software/hadoop-2.7.3/bin$ java -version java version "1.8.0_191" Java(TM)

apache kafka源码project环境搭建(IDEA)

1.gradle安装 gradle安装 2.下载apache kafka源码 apache kafka下载 3.用gradle构建产生IDEAproject文件 先装好idea的scala插件,不然构建时就会自己主动下载,因为没有国内镜像.速度会非常慢. [email protected]:~/Downloads/kafka_2.10-0.8.1$ gradle idea 假设是eclipseproject,运行:gradle eclipse 生成IDEAproject文件例如以下: 4.项目导