Mastering the Three Kafka Consumer Delivery Semantics in One Article

This article uses the Kafka 0.9 client as its example and walks through how to use the Kafka client, covering the three consumer delivery semantics (at-most-once, at-least-once, and exactly-once) as well as basic producer usage.

(1) Create the topic

bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1
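
The command above uses the 0.9-era CLI, which talks to ZooKeeper directly. As a side note, on newer clients (0.11 and later, so not the 0.9 client this article targets) the same topic can also be created programmatically with AdminClient; a minimal sketch, assuming a broker at localhost:9092:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Same topic name, partition count and replication factor as the CLI command above.
            NewTopic topic = new NewTopic("normal-topic", 2, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}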

(2) Producer

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, String> producer = createProducer();
        sendMessages(producer);
        // Allow the producer to finish sending the messages before the program exits.
        Thread.sleep(20);
        // close() flushes any buffered records and releases the producer's resources.
        producer.close();
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        // Controls how many bytes the sender batches up before publishing to Kafka.
        props.put("batch.size", 10);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    private static void sendMessages(Producer<String, String> producer) {
        String topic = "normal-topic";
        int partition = 0;
        long record = 1;
        for (int i = 1; i <= 10; i++) {
            producer.send(new ProducerRecord<String, String>(
                    topic, partition, Long.toString(record), Long.toString(record++)));
        }
    }
}

(3) Consumer

There are several ways for a consumer to register with Kafka:

subscribe: with this approach, whenever topics or partitions are added, or consumers join or leave the group, the partitions are rebalanced across the consumers in the group.

assign: consumers registered this way do not take part in rebalancing.

Both approaches can be used to implement all three delivery semantics; the concrete API usage is shown below.
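
A minimal sketch of the two registration styles (the full, runnable examples follow in the next sections). It assumes a KafkaConsumer<String, String> consumer plus the usual imports (java.util.Arrays, org.apache.kafka.common.TopicPartition); a given consumer instance uses one style or the other, never both:

// Dynamic assignment: the group coordinator assigns partitions and rebalances them
// when topics/partitions or group members change.
consumer.subscribe(Arrays.asList("normal-topic"));

// Static assignment: the consumer takes specific partitions directly and is never rebalanced.
consumer.assign(Arrays.asList(new TopicPartition("normal-topic", 0)));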

1. At-most-once Kafka Consumer

At-most-once is the default behaviour of the Kafka consumer. The easiest way to configure a consumer for it is:

1). Set enable.auto.commit to true.

2). Set auto.commit.interval.ms to a fairly low value.

3). Do not call consumer.commitSync().

With this configuration, a background thread in the Kafka client commits the offsets at the configured interval.

However, this setup can actually yield either of two delivery semantics, depending on when the consumer crashes:

a. At-most-once

The offsets have already been committed but the messages are still being processed when the consumer crashes. On restart, it resumes from the last committed offset, so the messages that were still in flight are lost.

b. At-least-once

The consumer has finished processing the messages but crashes before the offsets are committed. On restart it consumes and processes those messages again. Because auto.commit.interval.ms is set to a low value, the window in which this can happen is small.

The code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtMostOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions of the topic. 'assign' could be used here
        // instead of 'subscribe' to listen to specific partitions.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Enable auto commit.
        props.put("enable.auto.commit", "true");
        // Auto commit interval; Kafka commits offsets at this interval.
        props.put("auto.commit.interval.ms", "101");
        // Caps the data fetched per partition per poll, which indirectly limits
        // the number of records returned by each poll.
        props.put("max.partition.fetch.bytes", "135");
        // Set this if you always want to read from the beginning.
        // props.put("auto.offset.reset", "earliest");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%noffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
        }
    }

    private static void process() throws InterruptedException {
        // Simulate message processing with a small delay.
        Thread.sleep(20);
    }
}

2. At-least-once Kafka Consumer

Implementing at-least-once semantics is also straightforward:

1). Set enable.auto.commit to false.

2). Call consumer.commitSync() manually after the messages have been processed.

In other words, after the records returned by each poll have been processed, you commit the offsets yourself with the synchronous commit call consumer.commitSync(). It is recommended to make the processing idempotent, so that reprocessing a message does not produce duplicate results. Duplicates occur when the consumer has processed the messages (possibly only some of them) and written the results to the sink, but crashes before the offsets are committed; on restart those messages are consumed and processed again. A minimal idempotency sketch follows.
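
For instance, if the results land in a relational database, processing can be made idempotent by keying the result on the message coordinates and using an upsert. The sketch below is illustrative only: the table name, the columns, and the ON CONFLICT clause (PostgreSQL syntax) are assumptions, not part of the original example.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative sketch: deduplicate by (topic, partition, offset) with an upsert, so that
// reprocessing a record after a crash hits the same row instead of creating a duplicate.
public class IdempotentWriter {

    private final Connection conn;

    public IdempotentWriter(Connection conn) {
        this.conn = conn;
    }

    public void write(ConsumerRecord<String, String> record) throws SQLException {
        String sql = "INSERT INTO results (topic, kafka_partition, kafka_offset, payload) "
                   + "VALUES (?, ?, ?, ?) "
                   + "ON CONFLICT (topic, kafka_partition, kafka_offset) DO NOTHING";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, record.topic());
            ps.setInt(2, record.partition());
            ps.setLong(3, record.offset());
            ps.setString(4, record.value());
            ps.executeUpdate();
        }
    }
}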

The consumer code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtLeastOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions of the topic. 'assign' could be used here
        // instead of 'subscribe' to listen to specific partitions.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Turn auto commit off; offsets are committed explicitly via consumer.commitSync()
        // after the messages have been processed.
        props.put("enable.auto.commit", "false");
        // Caps the data fetched per partition per poll, which indirectly limits
        // the number of records returned by each poll.
        props.put("max.partition.fetch.bytes", "135");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%noffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
            // This call controls the offset commit. Do it only after the business
            // processing of the polled records has finished.
            consumer.commitSync();
        }
    }

    private static void process() throws InterruptedException {
        // Simulate record processing with a small delay.
        Thread.sleep(20);
    }
}
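
As a side note, commitSync() can also be given explicit per-partition offsets instead of committing everything returned by the last poll. A minimal sketch, reusing consumer and record from the example above (note that the committed offset is the offset of the next message to read, hence the + 1):

// Requires: java.util.Collections, org.apache.kafka.clients.consumer.OffsetAndMetadata,
// org.apache.kafka.common.TopicPartition
// Commit only the partition this record came from, up to and including this record.
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));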

3. Exactly-once using subscribe

Implementing exactly-once with subscribe is straightforward. The idea is:

1). Set enable.auto.commit to false.

2). Do not call consumer.commitSync().

3). Subscribe to the topic with subscribe().

4). Implement a ConsumerRebalanceListener and, inside the listener, call consumer.seek(topicPartition, offset) so that consumption starts from the offset stored for that topic/partition.

5). While processing the messages, keep track of the offset of every message, and store the offset together with the processing result in one atomic transaction. With a traditional relational database this is easy. For stores without transactions, such as HDFS or NoSQL databases, the usual way to achieve it is to keep the offset in the same row as the result (see the sketch after this list).

6). Implement idempotency as an extra safety net.
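
Step 5 is the heart of exactly-once. Below is an illustrative sketch of what "offset and result in one atomic transaction" can look like with a JDBC database; the table and column names are assumptions, and the offsets table plays the same role as the file-based OffsetManager used in the full example that follows.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative sketch: write the processing result and the consumed offset in one
// database transaction, so either both are persisted or neither is.
public class TransactionalSink {

    private final Connection conn;

    public TransactionalSink(Connection conn) throws SQLException {
        this.conn = conn;
        this.conn.setAutoCommit(false);
    }

    public void save(ConsumerRecord<String, String> record, String result) throws SQLException {
        try (PreparedStatement insertResult = conn.prepareStatement(
                     "INSERT INTO results (msg_key, result) VALUES (?, ?)");
             PreparedStatement updateOffset = conn.prepareStatement(
                     // Assumes one row per (topic, partition) was inserted when the table was set up.
                     "UPDATE offsets SET last_offset = ? WHERE topic = ? AND kafka_partition = ?")) {
            insertResult.setString(1, record.key());
            insertResult.setString(2, result);
            insertResult.executeUpdate();

            updateOffset.setLong(1, record.offset());
            updateOffset.setString(2, record.topic());
            updateOffset.setInt(3, record.partition());
            updateOffset.executeUpdate();

            conn.commit();    // both writes become visible atomically
        } catch (SQLException e) {
            conn.rollback();  // neither the result nor the offset is persisted
            throw e;
        }
    }
}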

The full consumer code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExactlyOnceDynamicConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting ExactlyOnceDynamicConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Offsets are controlled manually, but the consumer still subscribes to the topic
        // to get dynamically assigned partitions. Inside MyConsumerRebalancerListener,
        // consumer.seek(topicPartition, offset) controls from which offset messages are read.
        consumer.subscribe(Arrays.asList("normal-topic"),
                new MyConsumerRebalancerListener(consumer));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";
        props.put("group.id", consumeGroup);
        // Key setting: turn off auto commit.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Caps the data fetched per partition per poll; make sure this value is larger
        // than the maximum size of a single message.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                // Save the processed offset in external storage.
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalancerListener implements ConsumerRebalanceListener {

    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Before partitions are taken away, persist the current position of each one.
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    // After partitions are assigned, seek each one to the offset stored externally.
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }
}

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * The partition offsets are stored in external storage; in this case, in the local
 * file system of the machine where the program runs.
 */
public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic partition in external storage.
     *
     * @param topic     Topic name.
     * @param partition Partition of the topic.
     * @param offset    Offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        try {
            FileWriter writer = new FileWriter(storageName(topic, partition), false);
            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the last stored offset + 1 for the given topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {
        try {
            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }
}

4. Exactly-once using assign

Implementing exactly-once with assign is just as simple. The idea is:

1). Set enable.auto.commit to false.

2). Do not call consumer.commitSync().

3). Register the consumer with Kafka by calling assign().

4). On initial startup, call consumer.seek(topicPartition, offset) to position the consumer at the stored offset.

5). While processing the messages, keep track of the offset of every message and store it together with the processing result in one atomic transaction, exactly as in step 5 of the subscribe approach above.

6). Implement idempotency as an extra safety net.

The code is as follows:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceStaticConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage1");

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ExactlyOnceStaticConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException, IOException {
        KafkaConsumer<String, String> consumer = createConsumer();
        String topic = "normal-topic";
        int partition = 1;
        TopicPartition topicPartition =
                registerConsumerToSpecificPartition(consumer, topic, partition);
        // Read the offset for the topic and partition from external storage.
        long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
        // Seek to the exact offset for that topic and partition.
        consumer.seek(topicPartition, offset);
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg2";
        props.put("group.id", consumeGroup);
        // Key setting: turn off auto commit.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Caps the data fetched per partition per poll; make sure this value is larger
        // than the maximum size of a single message.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<String, String>(props);
    }

    /**
     * Manually listens to a specific topic partition. For an example of listening to
     * dynamically assigned partitions while still controlling offsets manually, see
     * ExactlyOnceDynamicConsumer above.
     */
    private static TopicPartition registerConsumerToSpecificPartition(
            KafkaConsumer<String, String> consumer, String topic, int partition) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> partitions = Arrays.asList(topicPartition);
        consumer.assign(partitions);
        return topicPartition;
    }

    /**
     * Processes data and stores the offset in the external store. Best practice is to
     * perform these two operations atomically.
     */
    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

[End]

Original source: http://blog.51cto.com/13952975/2318040


[精通Objective-C]三种实现并发编程的方式 参考书籍:<精通Objective-C>[美] Keith Lee 目录 精通Objective-C三种实现并发编程的方式 目录 线程 隐式创建并启动线程 显示创建并启动线程 操作和操作队列 用操作类实现并发 用操作队列实现并发 分派队列GCD 三种方式的比较 线程 隐式创建并启动线程 使用NSObject类中的performSelectorInBackground: withObject:方法可以隐式地创建和启动用于执行对象中方法的新线程