Kafka 温故(五):Kafka的消费编程模型

Kafka的消费模型分为两种:

1.分区消费模型

2.分组消费模型

一.分区消费模型

二、分组消费模型

Producer :

package cn.outofmemory.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Hello world!
 *
 */
public class KafkaProducer
{
    private final Producer<String, String> producer;
    public final static String TOPIC = "TEST-TOPIC";

    private KafkaProducer(){
        Properties props = new Properties();
        //此处配置的是kafka的端口
        props.put("metadata.broker.list", "192.168.193.148:9092");

        //配置value的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //配置key的序列化类
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        //request.required.acks
        //0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
        //1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
        //-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
        props.put("request.required.acks","-1");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1000;
        final int COUNT = 10000;

        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key ,data));
            System.out.println(data);
            messageNo ++;
        }
    }

    public static void main( String[] args )
    {
        new KafkaProducer().produce();
    }

}
Consumer
package cn.outofmemory.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        //zookeeper 配置
        props.put("zookeeper.connect", "192.168.193.148:2181");

        //group 代表一个消费组
        props.put("group.id", "jd-group");

        //zk连接超时
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        //序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        //获取到的输入流
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        //输出接受到的消息
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}

kafka 学习告一段落,后面进入的为Spring 温习。

原文地址:https://www.cnblogs.com/pony1223/p/9807749.html

时间: 2024-10-08 13:28:38

Kafka 温故(五):Kafka的消费编程模型的相关文章

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

Kafka详解五、Kafka Consumer的底层API- SimpleConsumer

Kafka提供了两套API给Consumer The high-level Consumer API The SimpleConsumer API 第一种高度抽象的Consumer API,它使用起来简单.方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情 一个消息读取多次 在一个处理过程中只消费Partition其中的一部分消息 添加事务管理机制以保证消息被处理且仅被处理一次 使用SimpleConsumer有哪些弊端呢? 必须在程序

Kafka 温故(二):Kafka的基本概念和结构

一.Kafka中的核心概念 Producer: 特指消息的生产者Consumer :特指消息的消费者Consumer Group :消费者组,可以并行消费Topic中partition的消息Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为 broker.Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类.Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队

Kafka 通过python简单的生产消费实现

使用CentOS6.5.python3.6.kafkaScala 2.10  - kafka_2.10-0.8.2.2.tgz (asc, md5) 一.下载kafka 下载地址 https://kafka.apache.org/downloads 里面包含zookeeper 二.安装Kafka 1.安装zookeeper mkdir /root/kafka/ tar -vzxf kafka_2.10-0.8.2.2 cd /root/kafka/kafka_2.10-0.8.2.2 cat 

Kafka创建&amp;查看topic,生产&amp;消费指定topic消息

启动zookeeper和Kafka之后,进入kafka目录(安装/启动kafka参考前面一章:https://www.cnblogs.com/cici20166/p/9425613.html) 1.创建Topic 1)运行命令: ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1 2181 是zookeeper 端口 图示为创建成

kafka入门二:Kafka的设计思想、理念

本节主要从整体角度介绍Kafka的设计思想,其中的每个理念都可以深入研究,以后我可能会发专题文章做深入介绍,在这里只做较概括的描述以便大家更好的理解Kafka的独特之处.本节主要涉及到如下主要内容: Kafka设计基本思想 Kafka中的数据压缩 Kafka消息转运过程中的可靠性 Kafka集群镜像复制 Kafka 备份机制 一.kafka由来 由于对JMS日常管理的过度开支和传统JMS可扩展性方面的局限,LinkedIn(www.linkedin.com)开发了Kafka以满足他们对实时数据流

Kafka系列之1—Kafka的总体认识

Kafka的总体认识 1.非中心的架构模型 2.基于TCP的一套Kafka通信协议 3.消息中间件&存储系统 4.存储逻辑层的高并发保证 5.isr机制降低了保证分布式一致性的代价 1. 非中心的架构模型 我们知道,在分布式系统的架构类型里,既有主从式的架构,也有非中心式的架构,像hadoop和hbase都采用了主从式的架构模型,主从式的架构优点有很多,但是主从式下为了避免单点故障而采取的各种策略使得主从式架构的优点并不那么理想,kafka作为一个分布式的消息系统,它采用了非中心式的架构模型,每

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

kafka学习总结之kafka简介

kafka是一个分布式,基于subscribe-publish的消息系统 特性:高吞吐量.低延迟.可扩展性.持久性(消息持久化到本地磁盘).可靠性.容错性(n个副本,允许n-1个节点失败).高并发(支持数千个客户端同时读写) 设计思想: (1)       Consumer group:多个consumer可以组成一个group(group-id区分),每个消息只能被组中的一个consumer消费,同时消费一个消息的consumer一定不在同一组: (2)       消息状态:kafka中,消