Apache Kafka系列(四) 多线程Consumer方案

Apache Kafka系列(一) 起步

Apache Kafka系列(二) 命令行工具(CLI)

Apache Kafka系列(三) Java API使用

Apache Kafka系列(四) 多线程Consumer方案

本文的图片是通过PPT截图出的,读者如果修改意见请联系我

一、Consumer为何需要实现多线程

  假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息。该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用API写入消息到Kafka Cluster中,然后消息的订阅者通过Consumer读取消息,刚开始的时候系统架构图如下:

但是,随着用户数量的增多,通知的数据也会对应的增长。总会达到一个阈值,在这个点上,Producer产生的数量大于Consumer能够消费的数量。那么Broker中未消费的消息就会逐渐增多。即使Kafka使用了优秀的消息持久化机制来保存未被消费的消息,但是Kafka的消息保留机制限制(时间,分区大小,消息Key)也会使得始终未被消费的Message被永久性的删除。另一方面从业务上讲,一个消息通知系统的高延迟几乎算作是废物了。所以多线程的Consumer模型是非常有必要的。

二、多线程的Kafka Consumer 模型类别

  基于Consumer的多线程模型有两种类型:

  • 模型一:多个Consumer且每一个Consumer有自己的线程,对应的架构图如下:

  • 模型二:一个Consumer且有多个Worker线程

  两种实现方式的优点/缺点比较如下:

名称 优点 缺点
模型一
1.Consumer Group容易实现

2.各个Partition的顺序实现更容易


1.Consumer的数量不能超过Partition的数量,否则多出的Consumer永远不会被使用到

2.因没个Consumer都需要一个TCP链接,会造成大量的系统性能损耗

模型二 1.由于通过线程池实现了Consumer,横向扩展更方便
1.在每个Partition上实现顺序处理更困难。

例如:同一个Partition上有两个待处理的Message需要被线程池中的2个线程消费掉,那这两个线程必须实现同步

三、代码实现

3.1 前提

    • Kafka Broker 0.11.0
    • JDK1.8
    • IDEA
    • Maven3
    • Kafka环境搭建及Topic创建修改等请参照本系列的前几篇文章。

3.2 源码结构

其中,consumergroup包下面对应的是模型一的代码,consumerthread包下是模型二的代码。ProducerThread是生产者代码。

3.3 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.randy</groupId>
  <artifactId>kafka_multithread_consumer_model</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>kafka_multithread_consumer_model Maven Webapp</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>kafka_multithread_consumer_model</finalName>
  </build>
</project>

3.4 方案一:Consumer Group

  ProducerThread.java是一个生产者线程,发送消息到Broker

  ConsumerThread.java是一个消费者线程,由于消费消息

  ConsumerGroup.java用于产生一组消费者线程

  ConsumerGroupMain.java是入口类

3.4.1 ProducerThread.java 

package com.randy;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  11:41
 * Comment :
 */
public class ProducerThread implements Runnable {
    private final Producer<String,String> kafkaProducer;
    private final String topic;

    public ProducerThread(String brokers,String topic){
        Properties properties = buildKafkaProperty(brokers);
        this.topic = topic;
        this.kafkaProducer = new KafkaProducer<String,String>(properties);

    }

    private static Properties buildKafkaProperty(String brokers){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    @Override
    public void run() {
        System.out.println("start sending message to kafka");
        int i = 0;
        while (true){
            String sendMsg = "Producer message number:"+String.valueOf(++i);
            kafkaProducer.send(new ProducerRecord<String, String>(topic,sendMsg),new Callback(){

                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e != null){
                        e.printStackTrace();
                    }
                    System.out.println("Producer Message: Partition:"+recordMetadata.partition()+",Offset:"+recordMetadata.offset());
                }
            });
            // thread sleep 3 seconds every time
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end sending message to kafka");
        }
    }
}

3.4.2 ConsumerThread.java

package com.randy.consumergroup;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  12:03
 * Comment :
 */
public class ConsumerThread implements Runnable {
    private static KafkaConsumer<String,String> kafkaConsumer;
    private final String topic;

    public ConsumerThread(String brokers,String groupId,String topic){
        Properties properties = buildKafkaProperty(brokers,groupId);
        this.topic = topic;
        this.kafkaConsumer = new KafkaConsumer<String, String>(properties);
        this.kafkaConsumer.subscribe(Arrays.asList(this.topic));
    }

    private static Properties buildKafkaProperty(String brokers,String groupId){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    @Override
    public void run() {
        while (true){
            ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(100);
            for(ConsumerRecord<String,String> item : consumerRecords){
                System.out.println("Consumer Message:"+item.value()+",Partition:"+item.partition()+"Offset:"+item.offset());
            }
        }
    }
}

3.4.3 ConsumerGroup.java

package com.randy.consumergroup;

import java.util.ArrayList;
import java.util.List;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  14:09
 * Comment :
 */
public class ConsumerGroup {
    private final String brokers;
    private final String groupId;
    private final String topic;
    private final int consumerNumber;
    private List<ConsumerThread> consumerThreadList = new ArrayList<ConsumerThread>();

    public ConsumerGroup(String brokers,String groupId,String topic,int consumerNumber){
        this.groupId = groupId;
        this.topic = topic;
        this.brokers = brokers;
        this.consumerNumber = consumerNumber;
        for(int i = 0; i< consumerNumber;i++){
            ConsumerThread consumerThread = new ConsumerThread(brokers,groupId,topic);
            consumerThreadList.add(consumerThread);
        }
    }

    public void start(){
        for (ConsumerThread item : consumerThreadList){
            Thread thread = new Thread(item);
            thread.start();
        }
    }
}

3.4.4 ConsumerGroupMain.java  

package com.randy.consumergroup;

import com.randy.ProducerThread;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  14:18
 * Comment :
 */
public class ConsumerGroupMain {

    public static void main(String[] args){
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        Thread producerThread = new Thread(new ProducerThread(brokers,topic));
        producerThread.start();

        ConsumerGroup consumerGroup = new ConsumerGroup(brokers,groupId,topic,consumerNumber);
        consumerGroup.start();
    }
}

3.5 方案二:多线程的Consumer

  ConsumerThreadHandler.java用于处理发送到消费者的消息

  ConsumerThread.java是消费者使用线程池的方式初始化消费者线程

  ConsumerThreadMain.java是入口类

3.5.1 ConsumerThreadHandler.java

package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:29
 * Comment :
 */
public class ConsumerThreadHandler implements Runnable {
    private ConsumerRecord consumerRecord;

    public ConsumerThreadHandler(ConsumerRecord consumerRecord){
        this.consumerRecord = consumerRecord;
    }

    @Override
    public void run() {
        System.out.println("Consumer Message:"+consumerRecord.value()+",Partition:"+consumerRecord.partition()+"Offset:"+consumerRecord.offset());
    }
}

3.5.2 ConsumerThread.java

package com.randy.consumerthread;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:42
 * Comment :
 */
public class ConsumerThread {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    // Threadpool of consumers
    private ExecutorService executor;

    public ConsumerThread(String brokers, String groupId, String topic){
        Properties properties = buildKafkaProperty(brokers,groupId);
        this.consumer = new KafkaConsumer<>(properties);
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public void start(int threadNumber){
        executor = new ThreadPoolExecutor(threadNumber,threadNumber,0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true){
            ConsumerRecords<String,String> consumerRecords = consumer.poll(100);
            for (ConsumerRecord<String,String> item : consumerRecords){
                executor.submit(new ConsumerThreadHandler(item));
            }
        }
    }

    private static Properties buildKafkaProperty(String brokers, String groupId){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

}

3.5.3 ConsumerThreadMain.java

package com.randy.consumerthread;

import com.randy.ProducerThread;

/**
 * Author  : RandySun ([email protected])
 * Date    : 2017-08-20  16:49
 * Comment :
 */
public class ConsumerThreadMain {

    public static void main(String[] args){
        String brokers = "Server2:9092";
        String groupId = "group01";
        String topic = "HelloWorld";
        int consumerNumber = 3;

        Thread producerThread = new Thread(new ProducerThread(brokers,topic));
        producerThread.start();

        ConsumerThread consumerThread = new ConsumerThread(brokers,groupId,topic);
        consumerThread.start(3);

    }
}

四. 总结

  本篇文章列举了两种不同的消费者模式。两者各有利弊。所有代码都上传到了https://github.com/qizhelongdeyang/kafka_multithread_consumer_model.git ,如有疑问或者错误请指正

时间: 2024-11-03 21:12:07

Apache Kafka系列(四) 多线程Consumer方案的相关文章

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

Apache Kafka系列(三) Java API使用

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 摘要: Apache Kafka Java Client API 一.基本概念 Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如: 1.创建Topic 2.罗列出已存在的Topic 3.对已有Topic的Produce/Consume测试

apache kafka系列之Producer处理逻辑

最近研究producer的负载均衡策略,,,,我在librdkafka里边用代码实现了partition 值的轮询方法,,,但是在现场验证时,他的负载均衡不起作用,,,所以来找找原因: 下文是一篇描写kafka处理逻辑的文章,转载过来,研究一下. apache kafka系列之Producer处理逻辑 标签: Kafka ProducerKafka Producer处理逻辑kafka生产者处理逻辑apache kafka系列 2014-05-23 11:42 3434人阅读 评论(2) 收藏 举

Apache Kafka系列(二) 命令行工具(CLI)

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka命令行工具(Command Line Interface,CLI),下文简称CLI. 1. 启动Kafka 启动Kafka需要两步: 1.1. 启动ZooKeeper [[email protected] kafka_2.12-0.11.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties 1.2.

Apache Kafka系列(一)

摘要: 1.Apache Kafka基本概念 2.Kafka的安装 3.基本工具创建Topic 本文基于centos7, Apache Kafka 0.11.0 一.基本概念 Apache Kafka是一个发布/订阅的消息系统,于2009年源自Linkedin,并与2011年开源.在架构方面,Kafka类似于其他的消息系统(ActiveMQ,RabbitMQ).但是Kafka有诸多的特性使得越来越流行: Kafka本身支持分布式,很容易横向扩展 高吞吐量,高性能 高容错性,即使宕机 Kafka在

apache kafka系列之-监控指标

apache kafka中国社区QQ群:162272557 1.监控目标 1.当系统可能或处于亚健康状态时及时提醒,预防故障发生 2.报警提示 a.短信方式 b.邮件 2.监控内容 2.1 机器监控 Kafkaserver指标 CPU Load Disk IO Memory 磁盘log.dirs文件夹下数据文件大小,要有定时清除策略 2.2 JVM监控 主要监控JAVA的 GC time(垃圾回收时间).JAVA的垃圾回收机制对性能的影响比較明显 2.3 Kafka系统监控 1.Kafka整体监

Apache Shiro系列四:Shiro的架构

Shiro的设计目标就是让应用程序的安全管理更简单.更直观. 软件系统一般是基于用户故事来做设计.也就是我们会基于一个客户如何与这个软件系统交互来设计用户界面和服务接口.比如,你可能会说:“如果用户登录了我们的系统,我就给他们显示一个按钮,点击之后可以查看他自己的账户信息.如果没有登录,我就给他显示一个注册按钮.” 上述应用程序在很大程度上是为了满足用户的需求而编写的,即便这个“用户”不是人,而是一个其他的软件系统.你仍然是按照谁当前正在与你的系统交互的逻辑来编写你的逻辑代码. Shiro的设计

【Apache KafKa系列之一】KafKa安装部署

kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能. 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息. 支持通过kafka服务器和消费机集群来分区消息. 支持Hadoop并行数据加载. Kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因

Apache kafka系列之在zookeeper中存储结构

1.topic注册信息 /brokers/topics/[topic] :存储某个topic的partitions所有分配信息 Schema: { "version": "版本编号目前固定为数字1", "partitions": { "partitionId编号": [ 同步副本组brokerId列表 ], "partitionId编号": [ 同步副本组brokerId列表 ], ....... } }