kafka 0.8.2 消息消费者 consumer

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hashleaf</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.2.1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <artifactId>jmxtools</artifactId>
                    <groupId>com.sun.jdmk</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jmxri</artifactId>
                    <groupId>com.sun.jmx</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jms</artifactId>
                    <groupId>javax.jms</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>mail</artifactId>
                    <groupId>javax.mail</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
package com.hashleaf.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

/**
 * 自定义消息消费者
 * @author xiaojf [email protected]
 * @since 2015-7-15 下午11:10:28
 */
public class MyConsumer {

    private final ConsumerConnector consumer;

    public MyConsumer(){
        Properties originalProps = new Properties();

        //zookeeper 配置,通过zk 可以负载均衡的获取broker
        originalProps.put("zookeeper.connect", "192.168.66.2:2181,192.168.66.3:2181,192.168.66.4:2181");

        //group 代表一个消费组
        originalProps.put("group.id", "hashleaf-group");

        //zk连接超时时间
        originalProps.put("zookeeper.session.timeout.ms", "10000");
        //zk同步时间
        originalProps.put("zookeeper.sync.time.ms", "200");
        //自动提交间隔时间
        originalProps.put("auto.commit.interval.ms", "1000");
        //消息日志自动偏移量,防止宕机后数据无法读取
        originalProps.put("auto.offset.reset", "smallest");
        //序列化类
        originalProps.put("serializer.class", "kafka.serializer.StringEncoder");

        //构建consumer connection 对象
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(originalProps));
    }

    public void consume(){
        //指定需要订阅的topic
        Map<String ,Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(MyProducer.HASHLEAF_KAFKA_TOPIC, new Integer(5));

        //指定key的编码格式
        Decoder<String> keyDecoder = new kafka.serializer.StringDecoder(new VerifiableProperties());
        //指定value的编码格式
        Decoder<String> valueDecoder = new kafka.serializer.StringDecoder(new VerifiableProperties());

        //获取topic 和 接受到的stream 集合
        Map<String, List<KafkaStream<String, String>>> map = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        //根据指定的topic 获取 stream 集合
        List<KafkaStream<String, String>> kafkaStreams = map.get(MyProducer.HASHLEAF_KAFKA_TOPIC);

        ExecutorService executor = Executors.newFixedThreadPool(4);

        //因为是多个 message组成 message set , 所以要对stream 进行拆解遍历
        for(final KafkaStream<String, String> kafkaStream : kafkaStreams){

            executor.submit(new Runnable() {

                @Override
                public void run() {
                  //拆解每个的 stream
                    ConsumerIterator<String, String> iterator = kafkaStream.iterator();

                    while (iterator.hasNext()) {

                        //messageAndMetadata 包括了 message , topic , partition等metadata信息
                        MessageAndMetadata<String, String> messageAndMetadata = iterator.next();

                        System.out.println("message : " + messageAndMetadata.message() + "  partition :  " + messageAndMetadata.partition());

                    }
                }
            });

        }
    }

    public static void main(String[] args) {
        new MyConsumer().consume();
    }

}
时间: 2024-10-20 01:26:09

kafka 0.8.2 消息消费者 consumer的相关文章

kafka 0.10.2 消息消费者

package cn.xiaojf.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Kaf

kafka 0.8.2 消息生产者 KafkaProducer

package com.hashleaf.kafka; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerReco

kafka 0.8.2 消息生产者 producer

package com.hashleaf.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 消息生产者 * @author xiaojf [email protected] * @since 2015-7-15 下午10:50:01 */

kafka 0.10.2 消息生产者(producer)

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka

kafka 0.10.2 消息生产者

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * Created by 肖建锋 on

【原创】Kafka 0.11消息设计

Kafka 0.11版本增加了很多新功能,包括支持事务.精确一次处理语义和幂等producer等,而实现这些新功能的前提就是要提供支持这些功能的新版本消息格式,同时也要维护与老版本的兼容性.本文将详细探讨Kafka 0.11新版本消息格式的设计,其中会着重比较新旧两版本消息格式在设计上的异同.毕竟只有深入理解了Kafka的消息设计,我们才能更好地学习Kafka所提供的各种功能. 1. Kafka消息层次设计 不管是0.11版本还是之前的版本,Kafka的消息层次都是分为两层:消息集合(messa

Kafka 0.10问题点滴

15.如何消费内部topic: __consumer_offsets 主要是要让它来格式化:GroupMetadataManager.OffsetsMessageFormatter 最后用看了它的源码,把这部分挑选出来,自己解析了得到的byte[].核心代码如下: // com.sina.mis.app.ConsumerInnerTopic             ConsumerRecords<byte[], byte[]> records = consumer.poll(512);    

Kafka 0.10 KafkaConsumer流程简述

ConsumerConfig.scala 储存Consumer的配置 按照我的理解,0.10的Kafka没有专门的SimpleConsumer,仍然是沿用0.8版本的. 1.从poll开始 消费的规则如下: 一个partition只能被同一个ConsumersGroup的一个线程所消费. 线程数小于partition数,某些线程会消费多个partition. 线程数等于partition数,一个线程正好消费一个线程. 当添加消费者线程时,会触发rebalance,partition的分配发送变化

Kafka 0.8协议

介绍 概述 预备知识 网络 分区和引导 分区策略 批量处理 版本控制和兼容性 协议 Protocol Primitive Types Notes on reading the request format grammars Common Request and Response Structure Requests Responses Message sets Compression The APIs Metadata API Metadata Request Metadata Response