Kafka Consumer: Usage and Advanced Applications
PS: This post is for learning, summarizing, and discussion only. It draws on the following blogs & materials:
1.http://kafka.apache.org/intro.html
2.https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
3.http://www.cnblogs.com/luotianshuai/p/5206662.html
4.http://www.cnblogs.com/fxjwind/p/3794255.html
5.https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
1. Background: Why Use Kafka
When we make heavy use of distributed databases and distributed compute clusters, we run into problems like these:
- We want to analyze user behavior (page views) so that we can design better ad placements
- We want to aggregate users' search keywords and spot current trends
- Some data is wasteful to keep in a database, yet writing it straight to disk makes it inefficient to consume
These scenarios share one trait: data is produced by an upstream module, and a downstream module uses that data for computation, statistics, and analysis. That is exactly where a messaging system, especially a distributed one, fits.
2. Kafka Overview
Kafka is a high-throughput distributed publish-subscribe messaging system that can handle all the activity-stream data of a consumer-scale website. Such activity (page views, searches, and other user actions) is a key ingredient of many social features on the modern web. Because of the throughput involved, this data has traditionally been handled by log processing and log aggregation. For log-like data that feeds Hadoop-style offline analytics systems but also needs real-time processing, Kafka is a viable solution. Kafka aims to unify online and offline message processing through Hadoop's parallel loading mechanism, and to serve real-time consumption across a cluster.
2.1 Features
Kafka is a high-throughput distributed publish-subscribe messaging system with the following characteristics:
- Message persistence through an O(1) disk data structure whose performance stays stable over long periods, even with terabytes of stored messages
- High throughput: even on very ordinary hardware, Kafka can support millions of messages per second
- Messages can be partitioned across Kafka brokers and consumed by clusters of consumer machines
- Supports parallel data loading into Hadoop
2.2 Kafka Terminology
- Broker: a Kafka cluster consists of one or more servers, each called a broker
- Topic: every message published to a Kafka cluster belongs to a category called a Topic. (Physically, messages of different Topics are stored separately; logically, a Topic's messages may live on one or more brokers, but a user only needs to specify the Topic to produce or consume data, without caring where it is stored.)
- Partition: a physical concept; each Topic contains one or more Partitions.
- Producer: publishes messages to Kafka brokers
- Consumer: a client that reads messages from Kafka brokers.
- Consumer Group: each Consumer belongs to a specific Consumer Group (a group name can be specified per Consumer; if none is given, the Consumer belongs to the default group)
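To make these terms concrete, here is a minimal producer sketch (not from the original post) written against the same Kafka 0.8 Java client API used by the consumer examples below; the broker list and topic name are placeholders borrowed from those examples:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Brokers used for bootstrap metadata (placeholder hosts taken from the consumer examples below).
        props.put("metadata.broker.list", "cna3:9092,cna4:9092,cna5:9092");
        // How message values and keys are serialized.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // Every message is published to a Topic; the key is hashed to choose the Partition it lands in.
        producer.send(new KeyedMessage<String, String>("topic1", "some-key", "hello kafka"));
        producer.close();
    }
}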
3. Kafka Installation & Configuration: see http://www.cnblogs.com/denghongfu/p/6085685.html
4. Kafka Consumer APIs
Kafka's consumer API comes in two flavors:
A. High-level: simpler to use; you do not manage offsets yourself, since the Consumer group's last offset is read automatically from ZooKeeper
B. Low-level: the SimpleConsumer Example published on the official wiki
A few notes:
1. Having more consumers than partitions is wasteful: Kafka does not allow concurrent consumption within a single partition, so the number of consumers should not exceed the number of partitions.
2. If there are fewer consumers than partitions, one consumer maps to several partitions. Balance the consumer and partition counts carefully, otherwise data will be pulled unevenly across partitions. Ideally the partition count is an integer multiple of the consumer count, which is why the partition count matters; choosing 24, for example, makes it easy to pick a consumer count.
3. If a consumer reads from multiple partitions, ordering across them is not guaranteed; Kafka only guarantees ordering within a single partition, so across partitions the result depends on the order in which you read.
4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to a consumer may change.
5. With the high-level API, reads block when no message is available (see the sketch right after this list for one way to bound that wait).
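On point 5: with the old high-level consumer, the blocking can be bounded with consumer.timeout.ms, which makes the stream iterator's hasNext() throw ConsumerTimeoutException instead of waiting forever. A minimal sketch (the ZooKeeper address, group id, and topic are placeholders, not from the original post):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BoundedBlockingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "cna3:2181");   // placeholder ZooKeeper address
        props.put("group.id", "test-consumer-group");
        props.put("consumer.timeout.ms", "5000");      // give up after 5 s of no data instead of blocking forever

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("topic1", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("topic1").get(0).iterator();
        try {
            while (it.hasNext()) {                     // blocks at most consumer.timeout.ms
                System.out.println(new String(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            System.out.println("No message for 5 s, stopping");
        } finally {
            connector.shutdown();
        }
    }
}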
4.1 High-level consumer
If your test flow is to produce some data first and then read it back with a consumer, remember the first property set in the code below (auto.offset.reset), because the initial offset is invalid by default. The property controls how an invalid offset is corrected; the default is largest (i.e., newest), so without it you will not see the data you produced earlier. Also note that once the offset has become valid, adding smallest afterwards has no effect, because the offset will no longer be corrected; at that point you have to reset it manually or with a tool (see the tools further below).
package com.tydic.kafka.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringEncoder;

/**
 * Description: high-level consumer example.
 * Created by hadoop on 2016/11/9.
 */
public class ConsumerKafka {
    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public ConsumerKafka(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        Properties prop = new Properties();
        prop.put("auto.offset.reset", "smallest"); // must be set if you want to read previously produced data
        prop.put("zookeeper.connect", "cna3:2181,cna4:2181,cna5:2181");
        prop.put("serializer.class", StringEncoder.class.getName());        // producer-side property, not needed by the consumer
        prop.put("metadata.broker.list", "cna3:9092,cna4:9092,cna5:9092");  // producer-side property, not needed by the consumer
        prop.put("group.id", "test-consumer-group");
        config = new ConsumerConfig(prop);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            // ignore
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                executor.execute(new String(item.message())); // UTF-8
            }
        }
    }

    interface MessageExecutor {
        public void execute(String message);
    }

    public static void main(String[] args) {
        ConsumerKafka consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {
                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new ConsumerKafka("topic1", 3, executor);
            consumer.start();
            Thread.sleep(60000); // let the consumer threads run for a while; otherwise the finally block shuts the consumer down immediately
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
Two handy tools when working with the high-level consumer:
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
This shows the current offset status of a group; here we look at group pv, which has 3 partitions:
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
The key columns are Offset, logSize, and Lag.
Everything has already been read here, so Offset = logSize and Lag = 0.
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
It takes three arguments:
- [earliest | latest]: where to reset the offset to
- consumer.properties: the path to the consumer configuration file
- topic: the topic name, here page_visits
After running this against the pv group above, checking the group offsets again gives:
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
The offsets have been reset to 0, and Lag = logSize.
Complete code for a multi-threaded consumer (the per-thread ConsumerTest class it submits follows right after this snippet):
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        // create the connector; note how the config is built below
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) {
        // create the concurrent consumers
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads)); // which topic to read and how many threads to read it with
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap); // create the streams
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // each thread gets one KafkaStream

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber)); // start a consumer thread
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);

        try {
            Thread.sleep(10000); // let the consumer threads run for 10 s
        } catch (InterruptedException ie) {
        }
        example.shutdown();
    }
}
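The ConsumerTest runnable submitted above is not included in that snippet; the per-thread class below follows the official Consumer Group Example (reference 5), with comments added:

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

// Per-thread consumer: each instance drains one KafkaStream, i.e. one share of the topic's partitions.
public class ConsumerTest implements Runnable {
    private final KafkaStream m_stream;
    private final int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {   // blocks until a message arrives (or consumer.timeout.ms expires, if set)
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}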
4.2 Low-level consumer (SimpleConsumer Example)
package com.tydic.kafka.client;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.tydic.kafka.util.Utils;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * Description: Kafka consumer implementation that tracks its own consumption offset.
 * Created by hadoop on 2016/11/18.
 */
public class KafkaSimpleConsumer {

    public static void main(String[] arg) {
        KafkaSimpleConsumer example = new KafkaSimpleConsumer();
        List<String> seeds = new ArrayList<>();
        int maxReads = 0;
        int partition = 0;
        int port = 0;
        String topic = "";
        try {
            System.out.println(JSON.toJSONString(arg, true));
            Map<String, Object> paramMap = Utils.parseParam(arg);
            System.out.println(JSON.toJSONString(paramMap, true));
            if (null == paramMap || paramMap.size() == 0) {
                return;
            }
            maxReads = (int) paramMap.get("consumer.maxReads");
            partition = (int) paramMap.get("consumer.partition");
            List<String> seedStr = (List<String>) paramMap.get("consumer.seedBrokers");
            for (String s : seedStr) {
                seeds.add(s);
            }
            port = (int) paramMap.get("consumer.port");
            topic = (String) paramMap.get("consumer.topic");
            System.out.println("maxReads=" + maxReads + " partition=" + partition + " seedStr=" + seedStr
                    + " port=" + port + " topic=" + topic);
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<>();

    public KafkaSimpleConsumer() {
        m_replicaBrokers = new ArrayList<>();
    }

    /**
     * @param a_maxReads    Maximum number of messages to read (so we don't loop forever)
     * @param a_topic       Topic to read from
     * @param a_partition   Partition to read from
     * @param a_seedBrokers Brokers to use for the metadata lookup
     * @param a_port        Port the brokers listen on
     */
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port)
            throws Exception {
        // find the metadata for the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.LatestTime(), clientName);
        System.out.println("readOffset=" + readOffset + " latestTime=" + kafka.api.OffsetRequest.LatestTime());

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                System.out.println("error");
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                // handle an invalid offset by falling back to the latest offset
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case, ask for the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                // find the new leader broker
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                System.out.println("currentOffset=" + currentOffset + " readOffset=" + readOffset);
                // necessary check: for compressed messages the whole block is returned,
                // so it may contain messages older than the requested offset
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                // advance to the next offset to read
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
        if (consumer != null) consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        // build the offset fetch request info
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        // fetch the offsets
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching offset data from the Broker. Reason: "
                    + response.errorCode(topic, partition));
            return 0;
        }
        // the returned set of offsets; start reading from the first one
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * Find a (possibly new) lead broker for a Topic and Partition after a failure: walk the known replica
     * brokers, fetch the topic's metadata, then walk each partition's metadata until we find the partition
     * we want; the leader host comes from PartitionMetadata.leader().host().
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give ZooKeeper a second to recover;
                // second time, assume the broker did recover before failover, or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {               // walk every seed broker
            SimpleConsumer consumer = null;
            try {
                // create a SimpleConsumer just for the metadata lookup
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                // send the TopicMetadata request and read back the topic's metadata
                TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    // walk each partition's metadata until we hit the partition we want
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;               // found it
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for ["
                        + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
That's all.