Kafka consumer在项目中的多线程处理方式

对于KafkaConsumer而言,它不像KafkaProducer,不是线程安全的,状态是在consumer中维护的,所以实现时要注意多线程的使用,一般有2种使用方法:

1:每个Consumer有自己的线程,consumer去拉取数据,并对数据处理,这种方式比较简单,易于实现,容易顺序处理消息

2:消费者处理者方式,创建一个线程池,在consumer拉取数据后,由线程池来中的线程来处理数据,把拉取数据与处理数据解耦,但数据处理有可能破坏partition的消息顺序

从Kafka 文档中我们也可以查到有关consumer多线程的处理方式

项目实践:

下图是项目中对consumer的具体应用,虽然也使用了线程池,但其实还是上述第一种方式,线程池在此只是用于启动consumer的运行:

描述:

ConsumerGroup类:这对应于消费组,在上面第1步将创建一个监听对象,其将被传入到ConsumerGroup对象的创建过程中,在cg中将创建一个RunnableConsumer的对象列表(list),也就是上图第3步,列表中的consumer对象的数量将对应所期望的在Group组中consumer的数量。同时创建一个线程池对象executor,此处线程池的数量和consumer的数量一致

RunnableConsumer类:这是一个线程类,实现了Runnable接口,里面创建了一个KafkaConsumer对象,线程启动程序中执行对topic的订阅,并拉取消息

public class RunnableConsumer<K,V> implements Runnable {
    private Consumer<K,V> consumer;
    private final IConsumerListener<ConsumerRecords<K,V>> listener;

    private RunnableConsumer(final IConsumerListener<ConsumerRecords<K,V>> listener, Properties... props) {

        this.consumer = new KafkaConsumer<>(props, keyDeserClass, valueDeserClass);
        this.listener = listener;
    }

    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<K,V> records = null;
                try {

                    //now handle any new record(s)
                    records = consumer.poll(1000);
                    if(records != null && records.count() > 0) {
                        listener.notify(records);
                    }
                } catch(WakeupException wex) {
                    LOGGER.trace("Got a WakeupException. Doing nothing. Exception Details:",wex);
                } 

            }
        } catch (Throwable e) {
              // ignore as we are waking up from the poll, we need to cleanly shutdown our consumer
            LOGGER.error("getting non-recoverable throwable: ", e);
            throw e;
        } finally {
            //TODO: need to check on consumer closing that any outstanding offset commit is done.
            //otherwise we need to manually do it here.
            processCommit(SyncMode.SYNC);
            LOGGER.info("Trying to close Kafka consumer, ConsumerGroup.isRunning: {}", ConsumerGroup.this.isRunning);
            consumer.close();
        }
    }
}

Listener类:这是一个监听类,用于实际处理某一topic消息,先创建监听对象,在创建cg时,注册到RunnerConsumer类中,如果consumer拉取到消息,则将消息通知监听类去具体处理,不同的业务需要定义不同的业务监听类

修改为第二种方式

如果想使用第二种方式,将数据的处理从consumer中解耦出来,可以将上面的listener修改为一个线程类,在consumer中有拉取到消息,则从线程池中取出线程处理数据,这种方式的一个最大的问题,就是如何保证消息是按顺序处理的,例如,如果一个partition中先后有2条消息,当consumer poll到消息后,将提交到2个线程处理,这就无法保证顺序处理,需要额外的线程同步处理机制。同时因为不需要在consumer中对数据进行处理,consumer的性能也提高了,而且避免了数据处理超时,consumer Rebalance等潜在问题

records = consumer.poll(1000);
if(records != null && records.count() > 0) {
       executor.submit(new listener(records));
}

参考:

http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded

https://howtoprogram.xyz/2016/05/29/create-multi-threaded-apache-kafka-consumer/

原文地址:https://www.cnblogs.com/benfly/p/9242294.html

时间: 2024-10-10 16:17:55

Kafka consumer在项目中的多线程处理方式的相关文章

kafka在Maven项目中的使用

由于只是简单地了解和使用了kafka,所以对底层的东西并不做深入的分析,网上有很多资料介绍 kafka的安装以及它的配置,包括zookeeper集群的搭建.本文是在环境搭建好的情况下,介绍kafka在Maven项目中该如何使用. 1.kafka的配置文件 如果生产者和消费者都在一个模块里,那就只需要一个配置文件就行啦,如果在不同模块里的话就是两个(当然看你用什么环境,一个环境下一个配置文件,自己根据不同的环境进行配置,开发.测试.生产环境除了连接地址不一样外,其它可以配置成一样的).本文的生产者

.Net项目中通过ODAC方式链接Oracle数据库相关配置

.Net项目中通过ODAC方式链接Oracle数据库相关配置 一.开发环境 1.关于ODAC的链接方式 ODAC 直接使用 Oracle 调用接口 (OCI). OCI 是一种允许应用程序开发人员使用第三方开发语言存取Oracle数据服务器的过程或函数以及控制所有的SQL语句执行状态的应用程序接口. 2.使用ODAC方式的优点 使用ODAC的Net选项而无需在客户机上安装Oracle客户端来连接到Oracle.在这种情况下,ODAC仅需要TCP/IP协议的支持,从而可以创建真正的.最小的数据库应

node.js项目中使用coffeescript的方式汇总

Coffeescript作为Javascript低调的小弟实在是有过人之处,使用它可以增进开发效率,减少代码错误, 关键是能大幅提升开发愉悦感.我越来越觉得只要可能就在自己的项目中把coffee用起来. 然而也许你和我一样,在了解完coffeescript的语法后准备一试身手的时候,却面对如何把它引入项目而犯起愁来. 其实coffeescript这种语言因其可以一对一地翻译为javascript的特性,使用起来其实非常灵活. 将其引入项目的方式也不止一个.这里,我先就node项目引入coffee

在基于MVC的Web项目中使用Web API和直接连接两种方式混合式接入

在我之前介绍的混合式开发框架中,其界面是基于Winform的实现方式,后台使用Web API.WCF服务以及直接连接数据库的几种方式混合式接入,在Web项目中我们也可以采用这种方式实现混合式的接入方式,虽然Web API或者WCF方式的调用,相对直接连接数据库方式,响应效率上略差一些,不过扩展性强,也可以调动更多的设备接入,包括移动应用接入,网站接入,Winfrom客户端接入,这样可以使得服务逻辑相对独立,负责提供接口即可.这种方式中最有代表性的就是当前Web API的广泛应用,促进了各个接入端

(21)项目中Hibernate Session的管理方式

1.openSession和getCurrentSession的区别 package com.rk.hibernate.cache; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.hibernate.cfg.Configuration; import org.junit.Test; public class App_SessionInProject { private static Se

项目中遇到的进程间通信方式

1)socket:经常遇到,不讲了 2)信号:使用kill发送信号,signal,settimer等系统调用都能对另一个进程发送信号,达到了进程间通信的目的. kill(p1,16);    /*向进程号为p1的进程 发中断信号16*/ signal(SIGINT,go); /*接收到SIGINT信号后,转go函数去处理它*/ 3)共享内存:使用mmap系统调用能够做到共享内存,mmap的使用方式是以fd为入参,两个进程都打开一个文件名,并用mmap将这个fd映射到各自的进程环境,使用mmap反

web项目中实现页面跳转的两种方式

<a href="javascript:"></a>跳转在网页本身,URL不改变 <a href="#"></a> 跳转在网页本身,URL 改变 java web项目中实现页面跳转的主要方式有两种:第一种,<% response.sendRedirect("index.jsp");%>第二种<jsp:forward page="index.jsp"/>我做

Spring依赖注入——java项目中使用spring注解方式进行注入

注解注入顾名思义就是通过注解来实现注入,Spring和注入相关的常见注解有Autowired.Resource.Qualifier.Service.Controller.Repository.Component. Autowired是自动注入,自动从spring的上下文找到合适的bean来注入 Resource用来指定名称注入 Qualifier和Autowired配合使用,指定bean的名称 Service,Controller,Repository分别标记类是Service层类,Contro

开发技巧----------项目中常量类的定义方式

问题: 有开发经验的同学都知道,常量类是一个最常用的定义数据字典的方式.但是随着项目的开发时间和开发团队的变化经常会出现2中特别苦逼的情况.第一种情况是项目中到处都能看到各种各样的常量类:第二种情况是一个常量类里定义非常多的常量,甚至有的超过100了.这两种情况的缺点估计大家都非常的清楚,第一种代码离散.冗余.维护难:第二种也是维护难,更痛苦的时候用ide的时候很难找到自己需要的常量. 解决办法: 1.使用静态内部类对常量进行分组(可以多级分组,但是建议最多3级) 2.外部文件(这里不讨论) 3