java提供了方便的API进行kafka消息处理。简单总结一下:
学习参考:http://www.itnose.net/st/6095038.html
POM配置(关于LOG4J的配置参看 http://www.cnblogs.com/huayu0815/p/5341712.html)
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> <version>1.7.7</version> </dependency> </dependencies>
PRODUCER
import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties; public class KafkaProducer { Producer<String, String> producer ; /*#指定kafka节点列表,用于获取metadata,不必全部指定 metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092 # 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区 #partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner # 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。 compression.codec=none # 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默认为kafka.serializer.DefaultEncoder,即byte[] serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder # serializer.class=kafka.serializer.DefaultEncoder # serializer.class=kafka.serializer.StringEncoder # 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。 #compressed.topics= ########### request ack ############### # producer接收消息ack的时机.默认为0. # 0: producer不会等待broker发送ack # 1: 当leader接收到消息之后发送ack # 2: 当所有的follower都同步消息成功后发送ack. request.required.acks=0 # 在向producer发送ack之前,broker允许等待的最大时间 # 如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种 # 原因未能成功(比如follower未能同步成功) request.timeout.ms=10000 ########## end ##################### # 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量, # 也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息 producer.type=sync ############## 异步发送 (以下四个异步参数可选) #################### # 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms # 此值和batch.num.messages协同工作. queue.buffering.max.ms = 5000 # 在async模式下,producer端允许buffer的最大消息量 # 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积 # 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000 queue.buffering.max.messages=20000 # 如果是异步,指定每次批量发送数据量,默认为200 batch.num.messages=500 # 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后 # 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息) # 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间 # -1: 无阻塞超时限制,消息不会被抛弃 # 0:立即清空队列,消息被抛弃 queue.enqueue.timeout.ms=-1 ################ end ############### # 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数 # 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失) # 有可能导致broker接收到重复的消息,默认值为3. message.send.max.retries=3 # producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况 # 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新 # (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000 topic.metadata.refresh.interval.ms=60000*/ public Producer<String, String> getClient() { if (producer == null) { Properties props = new Properties() ; //此处配置的是kafka的端口 props.put("metadata.broker.list", "xxx.xxx.xxx.xxx:9092"); //配置value的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "async"); //配置key的序列化类 props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("request.required.acks", "0"); ProducerConfig config = new ProducerConfig(props) ; producer = new Producer<>(config) ; } return producer ; } public void shutdown(){ if (producer != null) { producer.close(); } } public static void main(String[] args) throws CloneNotSupportedException { KafkaProducer kafkaProducer = new KafkaProducer() ; for (int i=0 ; i< 10; i ++) { kafkaProducer.getClient().send(new KeyedMessage<String, String>("topic1","topic1_" + i + "_测试")); kafkaProducer.getClient().send(new KeyedMessage<String, String>("topic2","topic2_" + i + "_测试")); } kafkaProducer.shutdown(); } }
总结:
1、producer每次new的时候,会自动创建线程池
2、producer在调用send方法时候,才会真正建立socket连接。
连接过程如下:
1>、通过metadata.broker.list获取对应的brokers全量信息(metadata.broker.list给的broker的ip和端口只要保证一个是可用的即可,无需全部列出。不过开发过程中,一般全部列出)。
2>、根据zookeeper的注册信息获取topic的分区信息
3>、建立client和broker的socket连接
3、send结束后,直接关闭socket连接。
4、每次send会重新建立连接
5、client会自动获取topic的分区信息,因此kafka rebalance的时候,是不受影响的
CONSUMER
consumer api官方有两种,一般称为:high-level Consumer API 和 SimpleConsumer API 。
第一种高度抽象的Consumer API,它使用起来简单、方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,先简单介绍下第二种API能够帮助我们做哪些事情
- 一个消息读取多次
- 在一个处理过程中只消费Partition其中的一部分消息
- 添加事务管理机制以保证消息被处理且仅被处理一次
使用第二种的弊端:
- 必须在程序中跟踪offset值
- 必须找出指定Topic Partition中的lead broker
- 必须处理broker的变动
我主要尝试了一下第一种也是大多数情况下使用的API。
使用high-level Consumer api,有两种用法:单个消费者和多个消费者
单消费者:
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; public class KafkaSingleConsumer { /** * # zookeeper连接服务器地址,此处为线下测试环境配置(kafka消息服务-->kafka broker集群线上部署环境wiki) # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka # zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉,当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡 zookeeper.session.timeout.ms=5000 zookeeper.connection.timeout.ms=10000 # 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息 zookeeper.sync.time.ms=2000 #指定消费组 group.id=xxx # 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息 # 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true auto.commit.enable=true # 自动更新时间。默认60 * 1000 auto.commit.interval.ms=1000 # 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察 conusmer.id=xxx # 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生 client.id=xxxx # 最大取多少块缓存到消费者(默认10) queued.max.message.chunks=50 # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 # 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 # "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, # 此值用于控制,注册节点的重试次数. rebalance.max.retries=5 # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk # 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存 fetch.min.bytes=6553600 # 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、 # anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest auto.offset.reset=smallest # 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默认为kafka.serializer.DefaultDecoder,即byte[] derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder */ public static void main(String args[]) { String topic = "topic1" ; Properties props = new Properties(); props.put("zookeeper.connect", "xxx.xxx.xxx:2181"); props.put("group.id", "testgroup"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config) ; Map<String, Integer> topicMap = new HashMap<>(); // Define single thread for topic topicMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); for (KafkaStream<byte[], byte[]> stream : streamList) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message())); } if (consumer != null) consumer.shutdown(); } }
多消费者
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class KafkaMultiConsumer { /** * # zookeeper连接服务器地址,此处为线下测试环境配置(kafka消息服务-->kafka broker集群线上部署环境wiki) # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka # zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉,当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡 zookeeper.session.timeout.ms=5000 zookeeper.connection.timeout.ms=10000 # 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息 zookeeper.sync.time.ms=2000 #指定消费组 group.id=xxx # 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息 # 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true auto.commit.enable=true # 自动更新时间。默认60 * 1000 auto.commit.interval.ms=1000 # 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察 conusmer.id=xxx # 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生 client.id=xxxx # 最大取多少块缓存到消费者(默认10) queued.max.message.chunks=50 # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 # 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 # "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, # 此值用于控制,注册节点的重试次数. rebalance.max.retries=5 # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk # 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存 fetch.min.bytes=6553600 # 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、 # anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest auto.offset.reset=smallest # 指定序列化处理类(mafka client API调用说明-->3.序列化约定wiki),默认为kafka.serializer.DefaultDecoder,即byte[] derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder */ public static void main(String args[]) { String topic = "topic1" ; int threadCount = 3; Properties props = new Properties(); props.put("zookeeper.connect", "xxx.xxx.xxx.xxx:2181"); props.put("group.id", "testgroup"); props.put("zookeeper.session.timeout.ms", "500"); props.put("zookeeper.sync.time.ms", "250"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config) ; Map<String, Integer> topicMap = new HashMap<>(); // Define single thread for topic topicMap.put(topic, 3); ExecutorService executor = Executors.newFixedThreadPool(threadCount); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); int count = 0; for (final KafkaStream<byte[], byte[]> stream : streamList) { final String threadNumber = "Thread" + count ; executor.execute(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) { System.out.println("Thread Number " + threadNumber + ": " + new String(consumerIte.next().message())); } } }); count++ ; } } }
总结:
1、KAFKA允许多个consumer group,每个group允许多个consumer。不同group之间共享信息(类似发布-订阅模式),同一个group之间的多个consumer只会消费消息一次(类似生产-消费者模式)。
2、对同一个topic启动多个java consumer线程,在zookeeper上可以看到多个信息:
[zk:xxx.xxx.xxx.xxx:2181(CONNECTED) 120] ls /consumers/testgroup/ids [testgroup_xxx-1459926903849-fea50e90, testgroup_xxx-1459926619712-8d1caf90]
3、如果多线程方式启动consumer,可以看到不同的consumer绑定到不同的topic patition上
[zk: xxx.xxx.xxx.xxx:2181(CONNECTED) 121] get /consumers/testgroup/owners/topic1/1 testgroup_xxx-1459926619712-8d1caf90-1 cZxid = 0x2000006e2 ctime = Wed Apr 06 03:15:04 EDT 2016 mZxid = 0x2000006e2 mtime = Wed Apr 06 03:15:04 EDT 2016 pZxid = 0x2000006e2 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x153413bc26e007e dataLength = 44 numChildren = 0 [zk: xxx.xxx.xxx.xxx:2181(CONNECTED) 122] get /consumers/testgroup/owners/topic1/0 testgroup_xxx-1459926619712-8d1caf90-0 cZxid = 0x2000006e3 ctime = Wed Apr 06 03:15:04 EDT 2016 mZxid = 0x2000006e3 mtime = Wed Apr 06 03:15:04 EDT 2016 pZxid = 0x2000006e3 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x153413bc26e007e dataLength = 44 numChildren = 0
4、对于启动多个consumer进程或是以多线程方式启动单个consumer进程,区别仅仅在与zookeeper上注册的consumer信息是多个或是一个“ls /consumers/testgroup/ids ”,但是对于消息的消费而言,都遵守消费只消费一次,同一个分区只会绑定一个consumer信息。
5、如果某个消费者挂掉的话,consumer和partition的绑定信息会重新分配,尽可能的保证负载平衡
6、如果consumer的数量大于分区数量,会造成多余的那部分线程无法获取消息,不断 Got ping response for sessionid: 0x153413bc26e0082 after 2ms。是一种资源的浪费
如果多台服务器都启动consumer进程,最好根据分区数合理分配consumer进程中,消费线程的数量
更底层的细节问题,后期遇到再继续调研,先会用,明白大致原理!