SpringKafka消费端配置类ConsumerConfig.java源码

  1  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
  2 package org.apache.kafka.clients.consumer;
  3
  4 import org.apache.kafka.clients.CommonClientConfigs;
  5 import org.apache.kafka.common.config.AbstractConfig;
  6 import org.apache.kafka.common.config.ConfigDef;
  7 import org.apache.kafka.common.config.ConfigDef.Importance;
  8 import org.apache.kafka.common.config.ConfigDef.Type;
  9 import org.apache.kafka.common.serialization.Deserializer;
 10
 11 import java.util.HashMap;
 12 import java.util.Map;
 13 import java.util.Properties;
 14
 15 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 16 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 17
 18 /**
 19  * The consumer configuration keys
 20  */
 21 public class ConsumerConfig extends AbstractConfig {
 22     private static final ConfigDef CONFIG;
 23
 24     /*
 25      * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
 26      * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
 27      */
 28
 29     /**
 30      * <code>group.id</code>
 31      */
 32     public static final String GROUP_ID_CONFIG = "group.id";
 33     private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
 34
 35     /**
 36      * <code>session.timeout.ms</code>
 37      */
 38     public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
 39     private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka‘s group management facilities.";
 40
 41     /**
 42      * <code>heartbeat.interval.ms</code>
 43      */
 44     public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
 45     private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer coordinator when using Kafka‘s group management facilities. Heartbeats are used to ensure that the consumer‘s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
 46
 47     /**
 48      * <code>bootstrap.servers</code>
 49      */
 50     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 51
 52     /**
 53      * <code>enable.auto.commit</code>
 54      */
 55     public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
 56     private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer‘s offset will be periodically committed in the background.";
 57
 58     /**
 59      * <code>auto.commit.interval.ms</code>
 60      */
 61     public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
 62     private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.";
 63
 64     /**
 65      * <code>partition.assignment.strategy</code>
 66      */
 67     public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
 68     private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used";
 69
 70     /**
 71      * <code>auto.offset.reset</code>
 72      */
 73     public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
 74     private static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer‘s group</li><li>anything else: throw exception to the consumer.</li></ul>";
 75
 76     /**
 77      * <code>fetch.min.bytes</code>
 78      */
 79     public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
 80     private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
 81
 82     /**
 83      * <code>fetch.max.wait.ms</code>
 84      */
 85     public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
 86     private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn‘t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
 87
 88     /** <code>metadata.max.age.ms</code> */
 89     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
 90
 91     /**
 92      * <code>max.partition.fetch.bytes</code>
 93      */
 94     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
 95     private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
 96
 97     /** <code>send.buffer.bytes</code> */
 98     public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
 99
100     /** <code>receive.buffer.bytes</code> */
101     public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
102
103     /**
104      * <code>client.id</code>
105      */
106     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
107
108     /**
109      * <code>reconnect.backoff.ms</code>
110      */
111     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
112
113     /**
114      * <code>retry.backoff.ms</code>
115      */
116     public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
117
118     /**
119      * <code>metrics.sample.window.ms</code>
120      */
121     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
122
123     /**
124      * <code>metrics.num.samples</code>
125      */
126     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
127
128     /**
129      * <code>metric.reporters</code>
130      */
131     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
132
133     /**
134      * <code>check.crcs</code>
135      */
136     public static final String CHECK_CRCS_CONFIG = "check.crcs";
137     private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
138
139     /** <code>key.deserializer</code> */
140     public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
141     public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
142
143     /** <code>value.deserializer</code> */
144     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
145     public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
146
147     /** <code>connections.max.idle.ms</code> */
148     public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
149
150     /** <code>request.timeout.ms</code> */
151     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
152     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
153
154
155     static {
156         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
157                                         Type.LIST,
158                                         Importance.HIGH,
159                                         CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
160                                 .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
161                                 .define(SESSION_TIMEOUT_MS_CONFIG,
162                                         Type.INT,
163                                         30000,
164                                         Importance.HIGH,
165                                         SESSION_TIMEOUT_MS_DOC)
166                                 .define(HEARTBEAT_INTERVAL_MS_CONFIG,
167                                         Type.INT,
168                                         3000,
169                                         Importance.HIGH,
170                                         HEARTBEAT_INTERVAL_MS_DOC)
171                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
172                                         Type.LIST,
173                                         RangeAssignor.class.getName(),
174                                         Importance.MEDIUM,
175                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
176                                 .define(METADATA_MAX_AGE_CONFIG,
177                                         Type.LONG,
178                                         5 * 60 * 1000,
179                                         atLeast(0),
180                                         Importance.LOW,
181                                         CommonClientConfigs.METADATA_MAX_AGE_DOC)
182                                 .define(ENABLE_AUTO_COMMIT_CONFIG,
183                                         Type.BOOLEAN,
184                                         true,
185                                         Importance.MEDIUM,
186                                         ENABLE_AUTO_COMMIT_DOC)
187                                 .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
188                                         Type.LONG,
189                                         5000,
190                                         atLeast(0),
191                                         Importance.LOW,
192                                         AUTO_COMMIT_INTERVAL_MS_DOC)
193                                 .define(CLIENT_ID_CONFIG,
194                                         Type.STRING,
195                                         "",
196                                         Importance.LOW,
197                                         CommonClientConfigs.CLIENT_ID_DOC)
198                                 .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
199                                         Type.INT,
200                                         1 * 1024 * 1024,
201                                         atLeast(0),
202                                         Importance.HIGH,
203                                         MAX_PARTITION_FETCH_BYTES_DOC)
204                                 .define(SEND_BUFFER_CONFIG,
205                                         Type.INT,
206                                         128 * 1024,
207                                         atLeast(0),
208                                         Importance.MEDIUM,
209                                         CommonClientConfigs.SEND_BUFFER_DOC)
210                                 .define(RECEIVE_BUFFER_CONFIG,
211                                         Type.INT,
212                                         32 * 1024,
213                                         atLeast(0),
214                                         Importance.MEDIUM,
215                                         CommonClientConfigs.RECEIVE_BUFFER_DOC)
216                                 .define(FETCH_MIN_BYTES_CONFIG,
217                                         Type.INT,
218                                         1,
219                                         atLeast(0),
220                                         Importance.HIGH,
221                                         FETCH_MIN_BYTES_DOC)
222                                 .define(FETCH_MAX_WAIT_MS_CONFIG,
223                                         Type.INT,
224                                         500,
225                                         atLeast(0),
226                                         Importance.LOW,
227                                         FETCH_MAX_WAIT_MS_DOC)
228                                 .define(RECONNECT_BACKOFF_MS_CONFIG,
229                                         Type.LONG,
230                                         50L,
231                                         atLeast(0L),
232                                         Importance.LOW,
233                                         CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
234                                 .define(RETRY_BACKOFF_MS_CONFIG,
235                                         Type.LONG,
236                                         100L,
237                                         atLeast(0L),
238                                         Importance.LOW,
239                                         CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
240                                 .define(AUTO_OFFSET_RESET_CONFIG,
241                                         Type.STRING,
242                                         "latest",
243                                         in("latest", "earliest", "none"),
244                                         Importance.MEDIUM,
245                                         AUTO_OFFSET_RESET_DOC)
246                                 .define(CHECK_CRCS_CONFIG,
247                                         Type.BOOLEAN,
248                                         true,
249                                         Importance.LOW,
250                                         CHECK_CRCS_DOC)
251                                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
252                                         Type.LONG,
253                                         30000,
254                                         atLeast(0),
255                                         Importance.LOW,
256                                         CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
257                                 .define(METRICS_NUM_SAMPLES_CONFIG,
258                                         Type.INT,
259                                         2,
260                                         atLeast(1),
261                                         Importance.LOW,
262                                         CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
263                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
264                                         Type.LIST,
265                                         "",
266                                         Importance.LOW,
267                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
268                                 .define(KEY_DESERIALIZER_CLASS_CONFIG,
269                                         Type.CLASS,
270                                         Importance.HIGH,
271                                         KEY_DESERIALIZER_CLASS_DOC)
272                                 .define(VALUE_DESERIALIZER_CLASS_CONFIG,
273                                         Type.CLASS,
274                                         Importance.HIGH,
275                                         VALUE_DESERIALIZER_CLASS_DOC)
276                                 .define(REQUEST_TIMEOUT_MS_CONFIG,
277                                         Type.INT,
278                                         40 * 1000,
279                                         atLeast(0),
280                                         Importance.MEDIUM,
281                                         REQUEST_TIMEOUT_MS_DOC)
282                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
283                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
284                                         Type.LONG,
285                                         9 * 60 * 1000,
286                                         Importance.MEDIUM,
287                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
288
289                                 // security support
290                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
291                                         Type.STRING,
292                                         CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
293                                         Importance.MEDIUM,
294                                         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
295                                 .withClientSslSupport()
296                                 .withClientSaslSupport();
297
298     }
299
300     public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
301                                                               Deserializer<?> keyDeserializer,
302                                                               Deserializer<?> valueDeserializer) {
303         Map<String, Object> newConfigs = new HashMap<String, Object>();
304         newConfigs.putAll(configs);
305         if (keyDeserializer != null)
306             newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
307         if (valueDeserializer != null)
308             newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
309         return newConfigs;
310     }
311
312     public static Properties addDeserializerToConfig(Properties properties,
313                                                      Deserializer<?> keyDeserializer,
314                                                      Deserializer<?> valueDeserializer) {
315         Properties newProperties = new Properties();
316         newProperties.putAll(properties);
317         if (keyDeserializer != null)
318             newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
319         if (valueDeserializer != null)
320             newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
321         return newProperties;
322     }
323
324     ConsumerConfig(Map<?, ?> props) {
325         super(CONFIG, props);
326     }
327
328     public static void main(String[] args) {
329         System.out.println(CONFIG.toHtmlTable());
330     }
331
332 }
时间: 2024-12-11 11:09:00

SpringKafka消费端配置类ConsumerConfig.java源码的相关文章

spring security4.2.2的maven配置+spring-security配置详解+java源码+数据库设计

最近项目需要添加权限拦截,经讨论决定采用spring security4.2.2!废话少说直接上干货! 若有不正之处,请谅解和批评指正,不胜感激!!!!! spring security 4.2.2文档:http://docs.spring.io/spring-security/site/docs/4.2.2.RELEASE/reference/htmlsingle/#el-access-web spring security 3 中文2文档:http://www.mossle.com/docs

红叶倾城一键网页游戏服务端+客户端(无须安装Java)源码

倾城的一键安装版服务端(包括客户端) , 无须安装Java,直接启动服务端就可进入游戏!!!新开服添加首个GM的方法:注册帐号并登陆游戏创建角色名称后,再服务端控制台中输入gmGrade 玩家角色名称 255即成功升级该角色为GM(注意输入的命令的大小写)//////////////////////////////////////////changeHeroLevel 玩家角色名称 级别/addPlayerTreasure 玩家角色名称 数量                /addGoods 物

Java面试准备之String类专项突破+源码分析

String的源码中有很多Arrays的方法使用,建议先参阅Arrays的类库 基本介绍: String是一个比较特殊的类,有很多种建立的方法. 如果使用传统的构造方法比如 String s = new String("字符串");这时的对象会在堆上分配,这时候比较两个字符串地址就不相等,而用""双引号包起来的内容会在常量池中做停留,这时如果有两个内容一样的地址就一样了. 因此,使用==来比较字符串是不靠谱的. String类还实现了三个接口:Serializabl

如何阅读Java源码 阅读java的真实体会

刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧,如果你从来没有学过Java,或是任何一门编程语言如C++,一开始去啃<Core Java>,你是很难从中吸收到营养的,特别是<深入Java虚拟机>这类书,别人觉得好,未必适合现在的你. 虽然Tomcat的源码很漂亮,但我绝不建议你一开始就读它.我文中会专门谈到这个,暂时不展开. 强烈

Java源码阅读的真实体会

原文:http://zwchen.iteye.com/blog/1154193 刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧,如果你从来没有学过Java,或是任何一门编程语言如C++,一开始去啃<Core Java>,你是很难从中吸收到营养的,特别是<深入Java虚拟机>这类书,别人觉得好,未必适合现在的你. 虽然Tomcat的

[收藏] Java源码阅读的真实体会

收藏自http://www.iteye.com/topic/1113732 刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧,如果你从来没有学过Java,或是任何一门编程语言如C++,一开始去啃<Core Java>,你是很难从中吸收到营养的,特别是<深入Java虚拟机>这类书,别人觉得好,未必适合现在的你. 虽然Tomcat的源码

Android源码开发利器——Java源码调试(基于4.1.2)

原文地址:http://blog.csdn.net/jinzhuojun/article/details/8868038 调试Android Java源码 草帽的后花园--Neo 写在之前的话:这里主要是以调试Java源码为主,应该说是在system_process之后的源码,这对于调试和修改frameworks层的人来说真是一个利器,但至于为什么在system_process之后,我还在分析,如果有结果我会更新此文章,并正在尝试调试C++的代码,就是native中的代码,如果这个可行那将会大大

解密随机数生成器(二)——从java源码看线性同余算法

Random Java中的Random类生成的是伪随机数,使用的是48-bit的种子,然后调用一个linear congruential formula线性同余方程(Donald Knuth的编程艺术的3.2.1节) 如果两个Random实例使用相同的种子,并且调用同样的函数,那么生成的sequence是相同的 也可以调用Math.random()生成随机数 Random实例是线程安全的,但是并发使用Random实例会影响效率,可以考虑使用ThreadLocalRandom变量. Random实

Java源码之LinkedList

Java源码之LinkedList 转载请注意出处:http://blog.csdn.net/itismelzp/article/details/51620311 一.LinkedList概述 本文采用jdk1.8进行分析. LinkedList实现了List,Deque接口的双向链表,实现了链表的所有可选操作,并且可有null值.查找某个值的时候必须从头到尾的遍历链表.它是非线程安全的,当多个线程结构化修改同一链表时需要加上同步处理.(程结构化修改包括:添加.删除,不包括:修改值)可使用Col