Kafka消费者——结合spring开发

Kafka消费者端

可靠性保证

作为消费端,消费数据需要考虑的是:

1、不重复消费消息

2、不缺失消费消息

自动提交 offset 的相关参数:

enable.auto.commit: 是否开启自动提交 offset 功能(true)
auto.commit.interval.ms: 自动提交 offset 的时间间隔 (1000ms = 1s)

手动提交offset 的相关参数:

enable.auto.commit: 是否开启自动提交 offset 功能(false)

异步提交也个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费 。所以,在保证数据完整性的前提下,选择同步提交同时尽量能在消费端进行消息去重的操作。

spring-kafka消费者端

spring-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">

    <context:component-scan base-package="listener" />
    <!--<context:component-scan base-package="concurrent" />-->

    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--broker集群-->
                <entry key="bootstrap.servers" value="192.168.25.10:9092,192.168.25.11:9092,192.168.25.12:9092"/>
                <!--groupid-->
                <entry key="group.id" value="group1"/>
                <!--
                earliest
                当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
                latest
                当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
                none
                topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
                -->
                <entry key="auto.offset.reset" value="earliest "/>
                <!--自动提交-->
                <entry key="enable.auto.commit" value="false"/>
                <!--自动提交重试等待时间-->
                <entry key="auto.commit.interval.ms" value="1000"/>
                <!--检测消费者故障的超时-->
                <entry key="session.timeout.ms" value="30000"/>
                <!--key反序列化-->
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <!--value反序列化-->
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!--consumer工厂-->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg  >
            <list>
                <value>topic1</value>
                <value>topic2</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerListener"/>
        <property name="pollTimeout" value="1000"/>
        <property name="AckMode" value="MANUAL"/>
    </bean>

    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" >
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>

    <!-- 并发消息监听容器,执行doStart()方法 -->
<!--    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        &lt;!&ndash;#消费监听器容器并发数&ndash;&gt;
        &lt;!&ndash;concurrency = 3&ndash;&gt;
        <property name="concurrency" value="3" />
    </bean>-->
</beans>

AckMode
RECORD每处理一条commit一次

BATCH(默认)每次poll的时候批量提交一次,频率取决于每次poll的调用频率

TIME 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)

COUNT 累积达到ackCount次的ack去commit

COUNT_TIMEackTime或ackCount哪个条件先满足,就commit

MANUAL listener负责ack,但是背后也是批量上去

MANUAL_IMMEDIATE listner负责ack,每调用一次,就立即commit

KafkaConsumerListener类

(同步提交)

@Component
public class KafkaConsumerListener implements AcknowledgingMessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> stringStringConsumerRecord, Acknowledgment acknowledgment) {
        System.out.printf("offset= %d, key= %s, value= %s,topic= %s,partition= %s\n",
                stringStringConsumerRecord.offset(),
                stringStringConsumerRecord.key(),
                stringStringConsumerRecord.value(),
                stringStringConsumerRecord.topic(),
                stringStringConsumerRecord.partition());
                acknowledgment.acknowledge();
    }
}

测试

    @Test
    public  void consumer() {
        ApplicationContext context = new ClassPathXmlApplicationContext("listener.xml");
        System.out.printf("启动listener");
        while (true) {

        }
    }

结果:

offset= 57, key= null, value= 2019-11-19 03:40:45,topic= topic1,partition= 0
offset= 4929, key= null, value= 2019-11-19 03:40:47,topic= topic2,partition= 2

kafka消费者如何才能从头开始消费某个topic的全量数据

消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):

(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);

(2)指定"auto.offset.reset"参数的值为earliest;

对应的spring-kafka消费者客户端配置参数为:

<!-- 指定消费组名 -->
<entry key="group.id" value="fg11"/>
<!-- 从何处开始消费,latest 表示消费最新消息,earliest 表示从头开始消费,none表示抛出异常,默认latest -->
<entry key="auto.offset.reset" value="earliest"/>

原文地址:https://www.cnblogs.com/luckyhui28/p/12003650.html

时间: 2024-10-07 08:39:41

Kafka消费者——结合spring开发的相关文章

Kafka生产者——结合spring开发

目录 Kafka生产者端 可靠性保证: spring-kafka生产端 Kafka生产者端 可靠性保证: producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据: 1.不丢失 2.不重复 producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性. acks 参数配置 为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledg

Spring整合kafka消费者和生产者&amp;redis的步骤

==================================================================================一.整合kafka(生产者)步骤1.导入依赖(pom.xml)2.编写配置文件,修改配置文件的ip和端口号,修改主题(producer.xml)3.如果再ssm项目中可以让spring.xml来加载这个配置文件 <import resource="classpath:XXX.xml" /> 如果是再测试类中如何加

Kafka 入门和 Spring Boot 集成

Kafka 入门和 Spring Boot 集成 标签:博客 [TOC] 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流).由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护. 应用场景 下面列举了一些kafka常见的应用场景. 消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景. 应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用

基于Spring开发的DUBBO服务接口测试

基于Spring开发的DUBBO服务接口测试 知识共享主要内容: 1. Dubbo相关概念和架构,以及dubbo服务程序开发步骤. 2. 基于Spring开发框架的dubbo服务接口测试相关配置. 3. spring test+junit和spring test+TestNG两种测试框架脚本编写方法. 一.        DUBBO与DUBBO架构 1.          什么是dubbo?DUBBO是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,是阿里巴巴SOA服务化治

利用Maven搭建Spring开发环境 【转】

一.   概要说明 最近几天在测试Spring3.0的AOP功能,在测试功能之前,首先是要搭建出Spring3.0的开发功能.开始去官网下载Spring的相关jar包,但是这些jar包中还是会需要其他的一些jar包,于是又手动的去下载其他的相关jar包.这样也可以搭建出开发环境,但是需要频繁的去下载缺少的jar包,很麻烦.这里,我们可以还有一个更好的办法,采用maven来管理我们的工程,让maven来自动为我们去下载相关版本的jar包,具体的配置如下. 二.   下载并安装maven 去网上下载

使用Spring开发第一个HelloWorld应用

http://www.importnew.com/13246.html 让我们用Spring来写第一个应用程序吧. 完成这一章要求: 熟悉Java语言 设置好Spring的环境 熟悉简单的Eclipse IDE的操作 如果你还没有设置好环境,请参考Spring开发环境的配置. 我们第一个程序是打印”Hello World”语句,这个语句通过Spring的配置文件来设置. 1 – 新建Java项目: 第一步用Eclipse IDE新建一个项目. 点击 > File > New > Java

利用Maven搭建Spring开发环境

一.   概要说明 最近几天在测试Spring3.0的AOP功能,在测试功能之前,首先是要搭建出Spring3.0的开发功能.开始去官网下载Spring的相关jar包,但是这些jar包中还是会需要其他的一些jar包,于是又手动的去下载其他的相关jar包.这样也可以搭建出开发环境,但是需要频繁的去下载缺少的jar包,很麻烦.这里,我们可以还有一个更好的办法,采用maven来管理我们的工程,让maven来自动为我们去下载相关版本的jar包,具体的配置如下. 二.   下载并安装maven 去网上下载

学习spring2--跟我一起学Spring 3(3)–使用Spring开发第一个HelloWorld应用

http://www.importnew.com/13246.html 首页 所有文章 资讯 Web 架构 基础技术 书籍 教程 我要投稿 更多频道 » - 导航条 - 首页 所有文章 资讯 Web 架构 基础技术 书籍 教程 我要投稿 更多频道 » - iOS - Python - Android - Web前端 跟我一起学Spring 3(3)–使用Spring开发第一个HelloWorld应用 2014/10/10 | 分类: 教程 | 5 条评论 | 标签: SPRING, 教程 分享到

搭建Spring开发环境并编写第一个Spring小程序

一.前面,我写了一篇Spring框架的基础知识文章,里面没讲到如何配置Spring开发环境,今天就来讲一下,如果大家不知道怎么下载Spring软件包的话,可以看我那篇文章: http://blog.csdn.net/u012561176/article/details/45971917 ,里面讲述了2种获得Spring软件包的方式. 建议大家配置Spring环境之前先了解一下IoC(控制反转)的原理,可以看我写的文章:http://blog.csdn.net/u012561176/article