Apache Kafka Series (1): Getting Started
Apache Kafka Series (2): Command Line Tools (CLI)
Apache Kafka Series (3): Using the Java API
Apache Kafka Series (4): Multithreaded Consumer Designs
The figures in this post are screenshots taken from PowerPoint slides; if you have suggestions for improving them, please contact me.
1. Why the Consumer needs to be multithreaded
Suppose we are building a message notification module that lets users subscribe to notifications/messages sent by other users, and that the module is built on Apache Kafka. The overall architecture is then: publishers write messages into the Kafka cluster through the Producer API, and subscribers read them through a Consumer. The initial system architecture looks like this:
However, as the number of users grows, so does the volume of notifications. Sooner or later a threshold is reached where the Producer writes messages faster than a single Consumer can consume them, and unconsumed messages pile up on the Broker. Even though Kafka persists unconsumed messages very efficiently, its retention mechanisms (time-based retention, size-based retention, and key-based log compaction) mean that messages which are never consumed will eventually be deleted for good. And from a business point of view, a notification system with high latency is close to useless anyway. So a multithreaded Consumer model is well worth having.
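The retention limits mentioned above correspond to the topic-level configs `retention.ms` (time), `retention.bytes` (size) and `cleanup.policy` (key-based compaction). Purely as an illustration, not part of the project in this post, the sketch below reads those settings with the AdminClient that ships with kafka-clients 0.11; the class name and the broker/topic values are just the ones used later in the examples.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

// Sketch: print the retention-related settings of a topic (names and values are illustrative)
public class TopicRetentionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "Server2:9092");
        AdminClient admin = AdminClient.create(props);
        try {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "HelloWorld");
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            // time-based, size-based and key-based (compaction) retention
            System.out.println("retention.ms    = " + config.get("retention.ms").value());
            System.out.println("retention.bytes = " + config.get("retention.bytes").value());
            System.out.println("cleanup.policy  = " + config.get("cleanup.policy").value());
        } finally {
            admin.close();
        }
    }
}
```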
2. Types of multithreaded Kafka Consumer models
There are two common multithreaded Consumer models:
- Model 1: multiple Consumers, each running in its own thread; the corresponding architecture diagram is shown below:
- Model 2: a single Consumer with multiple worker threads.
The pros and cons of the two approaches are compared below:

Model | Pros | Cons |
---|---|---|
Model 1 | 1. Consumer groups are easy to implement. 2. Per-partition ordering is easier to preserve. | 1. The number of Consumers cannot exceed the number of partitions, otherwise the extra Consumers are never used. 2. Every Consumer needs its own TCP connection, which costs a noticeable amount of system resources. |
Model 2 | 1. Because consumption is handled by a thread pool, horizontal scaling is easier. | 1. Preserving ordering within a partition is harder. For example, if two messages from the same partition are handed to two different pool threads, those two threads have to synchronize with each other (one possible workaround is sketched right after this table). |
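For Model 2, one common way to soften the ordering problem, shown here only as a sketch and not as part of the project code below, is to always route a given partition to the same single-threaded executor. Records of one partition are then processed strictly in offset order while different partitions still run in parallel; the class and method names here are illustrative.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: same partition -> same single-threaded executor, so per-partition order is kept
public class PartitionOrderedDispatcher {
    private final ExecutorService[] workers;

    public PartitionOrderedDispatcher(int threadNumber) {
        workers = new ExecutorService[threadNumber];
        for (int i = 0; i < threadNumber; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void dispatch(ConsumerRecords<String, String> records) {
        for (TopicPartition partition : records.partitions()) {
            // all records of this partition from the current poll, already in offset order
            final List<ConsumerRecord<String, String>> batch = records.records(partition);
            // stable mapping: a partition is always handled by the same worker
            ExecutorService worker = workers[(partition.hashCode() & 0x7fffffff) % workers.length];
            worker.submit(new Runnable() {
                @Override
                public void run() {
                    for (ConsumerRecord<String, String> record : batch) {
                        System.out.println("Partition:" + record.partition() + ",Offset:" + record.offset());
                    }
                }
            });
        }
    }
}
```

Section 3.5 shows the simpler variant from the repository, which submits each record to a shared pool and therefore gives up this ordering guarantee.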
3. Implementation
3.1 Prerequisites
- Kafka Broker 0.11.0
- JDK1.8
- IDEA
- Maven3
- For setting up the Kafka environment and creating/modifying topics, please refer to the earlier posts in this series.
3.2 Source code structure
The consumergroup package contains the code for Model 1, the consumerthread package contains the code for Model 2, and ProducerThread is the producer code.
3.3 pom.xml
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.randy</groupId>
    <artifactId>kafka_multithread_consumer_model</artifactId>
    <packaging>war</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>kafka_multithread_consumer_model Maven Webapp</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>kafka_multithread_consumer_model</finalName>
    </build>
</project>
```
3.4 Approach 1: Consumer Group
- ProducerThread.java is a producer thread that sends messages to the Broker.
- ConsumerThread.java is a consumer thread, used to consume messages.
- ConsumerGroup.java creates a group of consumer threads.
- ConsumerGroupMain.java is the entry point.
3.4.1 ProducerThread.java
```java
package com.randy;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Author : RandySun ([email protected])
 * Date : 2017-08-20 11:41
 * Comment :
 */
public class ProducerThread implements Runnable {
    private final Producer<String, String> kafkaProducer;
    private final String topic;

    public ProducerThread(String brokers, String topic) {
        Properties properties = buildKafkaProperty(brokers);
        this.topic = topic;
        this.kafkaProducer = new KafkaProducer<String, String>(properties);
    }

    private static Properties buildKafkaProperty(String brokers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    @Override
    public void run() {
        System.out.println("start sending message to kafka");
        int i = 0;
        while (true) {
            String sendMsg = "Producer message number:" + String.valueOf(++i);
            kafkaProducer.send(new ProducerRecord<String, String>(topic, sendMsg), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        // recordMetadata is null when the send failed, so only read it on success
                        System.out.println("Producer Message: Partition:" + recordMetadata.partition()
                                + ",Offset:" + recordMetadata.offset());
                    }
                }
            });
            // thread sleep 3 seconds every time
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sending message to kafka");
        }
    }
}
```
3.4.2 ConsumerThread.java
```java
package com.randy.consumergroup;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Author : RandySun ([email protected])
 * Date : 2017-08-20 12:03
 * Comment :
 */
public class ConsumerThread implements Runnable {
    // one KafkaConsumer per thread: KafkaConsumer is not thread-safe, so this field
    // must not be static (a static field would make every thread share the consumer
    // that was created last)
    private final KafkaConsumer<String, String> kafkaConsumer;
    private final String topic;

    public ConsumerThread(String brokers, String groupId, String topic) {
        Properties properties = buildKafkaProperty(brokers, groupId);
        this.topic = topic;
        this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties buildKafkaProperty(String brokers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                System.out.println("Consumer Message:" + item.value() + ",Partition:" + item.partition()
                        + ",Offset:" + item.offset());
            }
        }
    }
}
```
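ConsumerThread relies on auto-commit (`enable.auto.commit=true`), so an offset can be committed before the record has actually been handled. A possible manual-commit variant of `run()` is sketched below; it is not part of the repository and assumes `buildKafkaProperty()` is changed to set `enable.auto.commit` to "false".

```java
// Sketch of a manual-commit run() (assumes enable.auto.commit=false in the consumer properties)
@Override
public void run() {
    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                System.out.println("Consumer Message:" + item.value()
                        + ",Partition:" + item.partition() + ",Offset:" + item.offset());
            }
            if (!consumerRecords.isEmpty()) {
                // commit only after the whole batch has been processed
                kafkaConsumer.commitSync();
            }
        }
    } finally {
        // release the connection and leave the group promptly if the thread dies
        kafkaConsumer.close();
    }
}
```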
3.4.3 ConsumerGroup.java
```java
package com.randy.consumergroup;

import java.util.ArrayList;
import java.util.List;

/**
 * Author : RandySun ([email protected])
 * Date : 2017-08-20 14:09
 * Comment :
 */
public class ConsumerGroup {
    private final String brokers;
    private final String groupId;
    private final String topic;
    private final int consumerNumber;
    private List<ConsumerThread> consumerThreadList = new ArrayList<ConsumerThread>();

    public ConsumerGroup(String brokers, String groupId, String topic, int consumerNumber) {
        this.groupId = groupId;
        this.topic = topic;
        this.brokers = brokers;
        this.consumerNumber = consumerNumber;
        for (int i = 0; i < consumerNumber; i++) {
            ConsumerThread consumerThread = new ConsumerThread(brokers, groupId, topic);
            consumerThreadList.add(consumerThread);
        }
    }

    public void start() {
        for (ConsumerThread item : consumerThreadList) {
            Thread thread = new Thread(item);
            thread.start();
        }
    }
}
```
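As written, neither ConsumerThread nor ConsumerGroup can be stopped cleanly; the threads poll forever and the consumers are never closed. A common pattern, sketched here as possible additions rather than code from the repository, is to add a `shutdown()` method that calls `KafkaConsumer.wakeup()` (the one KafkaConsumer method that is safe to call from another thread) and to catch the resulting WakeupException in the poll loop.

```java
// Sketch of additions to ConsumerThread
// (requires: import org.apache.kafka.common.errors.WakeupException;)
public void shutdown() {
    // makes a blocked or subsequent poll() throw WakeupException
    kafkaConsumer.wakeup();
}

@Override
public void run() {
    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                System.out.println("Consumer Message:" + item.value()
                        + ",Partition:" + item.partition() + ",Offset:" + item.offset());
            }
        }
    } catch (WakeupException e) {
        // expected during shutdown, nothing to do
    } finally {
        kafkaConsumer.close();
    }
}
```

ConsumerGroup would then keep the started Thread objects, call shutdown() on every ConsumerThread and join() the threads.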
3.4.4 ConsumerGroupMain.java
```java
package com.randy.consumergroup;

import com.randy.ProducerThread;

/**
 * Author : RandySun ([email protected])
 * Date : 2017-08-20 14:18
 * Comment :
 */
public class ConsumerGroupMain {
    public static void main(String[] args) {
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        Thread producerThread = new Thread(new ProducerThread(brokers, topic));
        producerThread.start();

        ConsumerGroup consumerGroup = new ConsumerGroup(brokers, groupId, topic, consumerNumber);
        consumerGroup.start();
    }
}
```
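One thing to keep in mind before running ConsumerGroupMain: with `consumerNumber = 3`, the HelloWorld topic needs at least three partitions, otherwise the extra consumers in the group stay idle (see the table in section 2). The earlier posts in this series create topics with the CLI; purely as an alternative sketch, the AdminClient from kafka-clients 0.11 can do the same programmatically (the class name and replication factor below are illustrative).

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

// Sketch: create the HelloWorld topic with 3 partitions so all 3 consumers get an assignment
public class CreateTopicMain {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "Server2:9092");
        AdminClient admin = AdminClient.create(props);
        try {
            NewTopic helloWorld = new NewTopic("HelloWorld", 3, (short) 1);
            admin.createTopics(Collections.singleton(helloWorld)).all().get();
        } finally {
            admin.close();
        }
    }
}
```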
3.5 Approach 2: Multithreaded Consumer
- ConsumerThreadHandler.java processes the messages delivered to the consumer.
- ConsumerThread.java is the consumer; it uses a thread pool to process the records it polls.
- ConsumerThreadMain.java is the entry point.
3.5.1 ConsumerThreadHandler.java
```java
package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Author : RandySun ([email protected])
 * Date : 2017-08-20 16:29
 * Comment :
 */
public class ConsumerThreadHandler implements Runnable {
    private final ConsumerRecord<String, String> consumerRecord;

    public ConsumerThreadHandler(ConsumerRecord<String, String> consumerRecord) {
        this.consumerRecord = consumerRecord;
    }

    @Override
    public void run() {
        System.out.println("Consumer Message:" + consumerRecord.value() + ",Partition:" + consumerRecord.partition()
                + ",Offset:" + consumerRecord.offset());
    }
}
```
3.5.2 ConsumerThread.java
```java
package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Author : RandySun ([email protected])
 * Date : 2017-08-20 16:42
 * Comment :
 */
public class ConsumerThread {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    // Threadpool of consumers
    private ExecutorService executor;

    public ConsumerThread(String brokers, String groupId, String topic) {
        Properties properties = buildKafkaProperty(brokers, groupId);
        this.consumer = new KafkaConsumer<>(properties);
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public void start(int threadNumber) {
        executor = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String, String> item : consumerRecords) {
                executor.submit(new ConsumerThreadHandler(item));
            }
        }
    }

    private static Properties buildKafkaProperty(String brokers, String groupId) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }
}
```
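Because `start()` keeps auto-commit enabled while the actual processing happens asynchronously in the pool, offsets can be committed for records that no worker has finished yet; if the process crashes at that moment, those records are lost for this group. A possible variant, sketched below and not part of the repository, disables auto-commit (set `enable.auto.commit` to "false" in `buildKafkaProperty()`), waits for all tasks submitted for the current poll, and only then commits.

```java
// Sketch of a start() variant that commits only after every task of the current poll has finished.
// Additional imports needed: java.util.ArrayList, java.util.List, java.util.concurrent.Future
public void start(int threadNumber) {
    executor = new ThreadPoolExecutor(threadNumber, threadNumber, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (ConsumerRecord<String, String> item : consumerRecords) {
            futures.add(executor.submit(new ConsumerThreadHandler(item)));
        }
        for (Future<?> future : futures) {
            try {
                future.get(); // block until the worker has processed this record
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (!consumerRecords.isEmpty()) {
            consumer.commitSync(); // safe now: the whole batch has been processed
        }
    }
}
```

Waiting for the batch obviously caps throughput at the pool's speed, which is the usual trade-off between at-least-once delivery and parallelism.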
3.5.3 ConsumerThreadMain.java
```java
package com.randy.consumerthread;

import com.randy.ProducerThread;

/**
 * Author : RandySun ([email protected])
 * Date : 2017-08-20 16:49
 * Comment :
 */
public class ConsumerThreadMain {
    public static void main(String[] args) {
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        Thread producerThread = new Thread(new ProducerThread(brokers, topic));
        producerThread.start();

        ConsumerThread consumerThread = new ConsumerThread(brokers, groupId, topic);
        consumerThread.start(consumerNumber);
    }
}
```
4. Summary
This post walked through two different consumer models, each with its own trade-offs. All of the code has been pushed to https://github.com/qizhelongdeyang/kafka_multithread_consumer_model.git ; if you have questions or spot a mistake, please let me know.