/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership.
 *
 * NOTE(review): the remainder of the license header is missing from this copy of the file; restore it from the
 * upstream source before redistribution.
 */
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;

/**
 * The consumer configuration keys.
 * <p>
 * Each key is exposed as a public <code>*_CONFIG</code> constant. The static initializer registers every key
 * with the shared {@link ConfigDef}, together with its type, default value (where one is given), validator
 * (where one is given), importance and documentation string.
 */
public class ConsumerConfig extends AbstractConfig {
    private static final ConfigDef CONFIG;

    /*
     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
     */

    /**
     * <code>group.id</code>
     */
    public static final String GROUP_ID_CONFIG = "group.id";
    private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

    /**
     * <code>session.timeout.ms</code>
     */
    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";

    /**
     * <code>heartbeat.interval.ms</code>
     */
    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";

    /**
     * <code>bootstrap.servers</code>
     */
    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;

    /**
     * <code>enable.auto.commit</code>
     */
    public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
    private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";

    /**
     * <code>auto.commit.interval.ms</code>
     */
    public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
    private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.";

    /**
     * <code>partition.assignment.strategy</code>
     */
    public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used";

    /**
     * <code>auto.offset.reset</code>
     */
    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
    // Every <li> is explicitly closed so the generated HTML documentation is well-formed.
    private static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset</li><li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";

    /**
     * <code>fetch.min.bytes</code>
     */
    public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";

    /**
     * <code>fetch.max.wait.ms</code>
     */
    public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
    private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";

    /** <code>metadata.max.age.ms</code> */
    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;

    /**
     * <code>max.partition.fetch.bytes</code>
     */
    public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";

    /** <code>send.buffer.bytes</code> */
    public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;

    /** <code>receive.buffer.bytes</code> */
    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;

    /**
     * <code>client.id</code>
     */
    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;

    /**
     * <code>reconnect.backoff.ms</code>
     */
    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;

    /**
     * <code>retry.backoff.ms</code>
     */
    public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;

    /**
     * <code>metrics.sample.window.ms</code>
     */
    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;

    /**
     * <code>metrics.num.samples</code>
     */
    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;

    /**
     * <code>metric.reporters</code>
     */
    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;

    /**
     * <code>check.crcs</code>
     */
    public static final String CHECK_CRCS_CONFIG = "check.crcs";
    private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";

    /** <code>key.deserializer</code> */
    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
    public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";

    /** <code>value.deserializer</code> */
    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
    public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";

    /** <code>connections.max.idle.ms</code> */
    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;

    /** <code>request.timeout.ms</code> */
    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
    private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;


    static {
        // Keys defined without a default value (bootstrap.servers, key.deserializer, value.deserializer)
        // use the define(name, type, importance, doc) overload.
        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                        Type.LIST,
                                        Importance.HIGH,
                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
                                .define(SESSION_TIMEOUT_MS_CONFIG,
                                        Type.INT,
                                        30000,
                                        Importance.HIGH,
                                        SESSION_TIMEOUT_MS_DOC)
                                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
                                        Type.INT,
                                        3000,
                                        Importance.HIGH,
                                        HEARTBEAT_INTERVAL_MS_DOC)
                                .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                        Type.LIST,
                                        RangeAssignor.class.getName(),
                                        Importance.MEDIUM,
                                        PARTITION_ASSIGNMENT_STRATEGY_DOC)
                                .define(METADATA_MAX_AGE_CONFIG,
                                        Type.LONG,
                                        5 * 60 * 1000,
                                        atLeast(0),
                                        Importance.LOW,
                                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
                                .define(ENABLE_AUTO_COMMIT_CONFIG,
                                        Type.BOOLEAN,
                                        true,
                                        Importance.MEDIUM,
                                        ENABLE_AUTO_COMMIT_DOC)
                                .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
                                        Type.LONG,
                                        5000,
                                        atLeast(0),
                                        Importance.LOW,
                                        AUTO_COMMIT_INTERVAL_MS_DOC)
                                .define(CLIENT_ID_CONFIG,
                                        Type.STRING,
                                        "",
                                        Importance.LOW,
                                        CommonClientConfigs.CLIENT_ID_DOC)
                                .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
                                        Type.INT,
                                        1 * 1024 * 1024,
                                        atLeast(0),
                                        Importance.HIGH,
                                        MAX_PARTITION_FETCH_BYTES_DOC)
                                .define(SEND_BUFFER_CONFIG,
                                        Type.INT,
                                        128 * 1024,
                                        atLeast(0),
                                        Importance.MEDIUM,
                                        CommonClientConfigs.SEND_BUFFER_DOC)
                                .define(RECEIVE_BUFFER_CONFIG,
                                        Type.INT,
                                        32 * 1024,
                                        atLeast(0),
                                        Importance.MEDIUM,
                                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
                                .define(FETCH_MIN_BYTES_CONFIG,
                                        Type.INT,
                                        1,
                                        atLeast(0),
                                        Importance.HIGH,
                                        FETCH_MIN_BYTES_DOC)
                                .define(FETCH_MAX_WAIT_MS_CONFIG,
                                        Type.INT,
                                        500,
                                        atLeast(0),
                                        Importance.LOW,
                                        FETCH_MAX_WAIT_MS_DOC)
                                .define(RECONNECT_BACKOFF_MS_CONFIG,
                                        Type.LONG,
                                        50L,
                                        atLeast(0L),
                                        Importance.LOW,
                                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
                                .define(RETRY_BACKOFF_MS_CONFIG,
                                        Type.LONG,
                                        100L,
                                        atLeast(0L),
                                        Importance.LOW,
                                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
                                .define(AUTO_OFFSET_RESET_CONFIG,
                                        Type.STRING,
                                        "latest",
                                        in("latest", "earliest", "none"),
                                        Importance.MEDIUM,
                                        AUTO_OFFSET_RESET_DOC)
                                .define(CHECK_CRCS_CONFIG,
                                        Type.BOOLEAN,
                                        true,
                                        Importance.LOW,
                                        CHECK_CRCS_DOC)
                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
                                        Type.LONG,
                                        30000,
                                        atLeast(0),
                                        Importance.LOW,
                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
                                .define(METRICS_NUM_SAMPLES_CONFIG,
                                        Type.INT,
                                        2,
                                        atLeast(1),
                                        Importance.LOW,
                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
                                .define(METRIC_REPORTER_CLASSES_CONFIG,
                                        Type.LIST,
                                        "",
                                        Importance.LOW,
                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                .define(KEY_DESERIALIZER_CLASS_CONFIG,
                                        Type.CLASS,
                                        Importance.HIGH,
                                        KEY_DESERIALIZER_CLASS_DOC)
                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,
                                        Type.CLASS,
                                        Importance.HIGH,
                                        VALUE_DESERIALIZER_CLASS_DOC)
                                .define(REQUEST_TIMEOUT_MS_CONFIG,
                                        Type.INT,
                                        40 * 1000,
                                        atLeast(0),
                                        Importance.MEDIUM,
                                        REQUEST_TIMEOUT_MS_DOC)
                                /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                        Type.LONG,
                                        9 * 60 * 1000,
                                        Importance.MEDIUM,
                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)

                                // security support
                                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                        Type.STRING,
                                        CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                                        Importance.MEDIUM,
                                        CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                .withClientSslSupport()
                                .withClientSaslSupport();

    }

    /**
     * Returns a copy of {@code configs} in which the deserializer entries are overridden by the classes of the
     * supplied deserializer instances. The input map is not modified.
     *
     * @param configs the original configuration; must not be null
     * @param keyDeserializer if non-null, its {@link Class} is stored under <code>key.deserializer</code>
     * @param valueDeserializer if non-null, its {@link Class} is stored under <code>value.deserializer</code>
     * @return a new map containing every entry of {@code configs} plus the overrides
     */
    public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
                                                              Deserializer<?> keyDeserializer,
                                                              Deserializer<?> valueDeserializer) {
        Map<String, Object> newConfigs = new HashMap<String, Object>(configs);
        if (keyDeserializer != null) {
            newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
        }
        if (valueDeserializer != null) {
            newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
        }
        return newConfigs;
    }

    /**
     * Returns a copy of {@code properties} in which the deserializer entries are overridden by the class names
     * of the supplied deserializer instances. The input properties are not modified.
     *
     * @param properties the original configuration; must not be null
     * @param keyDeserializer if non-null, its class name is stored under <code>key.deserializer</code>
     * @param valueDeserializer if non-null, its class name is stored under <code>value.deserializer</code>
     * @return a new {@link Properties} containing every entry of {@code properties} plus the overrides
     */
    public static Properties addDeserializerToConfig(Properties properties,
                                                     Deserializer<?> keyDeserializer,
                                                     Deserializer<?> valueDeserializer) {
        Properties newProperties = new Properties();
        newProperties.putAll(properties);
        if (keyDeserializer != null) {
            newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
        }
        if (valueDeserializer != null) {
            newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
        }
        return newProperties;
    }

    /**
     * Creates a consumer configuration from the given properties, using the definitions registered in
     * {@link #CONFIG}.
     *
     * @param props the user-supplied configuration entries
     */
    ConsumerConfig(Map<?, ?> props) {
        super(CONFIG, props);
    }

    /**
     * Prints the HTML table rendering of the consumer configuration definition to standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println(CONFIG.toHtmlTable());
    }

}
// Time: 2024-10-03 21:54:18