Kafka生产者——结合spring开发

目录

  • Kafka生产者端

    • 可靠性保证:
    • spring-kafka生产端

Kafka生产者端

可靠性保证:

producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据:

1、不丢失

2、不重复

producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性。

acks 参数配置

为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到 ack,就会进行下一轮的发送,否则重新发送数据。

0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;

1: producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;

-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前leader 发生故障,那么会造成数据重复。

Exactly Once 语义

当ack级别设置为-1的时候,可以保证producer到broker之间不会丢失数据,即At
Least Once 语义 。相对的,将服务器ack级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once 语义 。

At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。

对于一些重要信息,我们要求既不能重复也不能丢失,这时我们需要使用Exactly Once 语义 。0.11 版本的 Kafka,引入了一项重大特性:幂等性。 所谓幂等性就是producer无论向broker发送了多少次重复数据,broker都只会持久化一条。幂等性结合At Least Once语义,就结合成了Kafka的Exactly Once语义。
At Least Once + 幂等性 = Exactly Once
启动幂等性,只需要将Producer的参数enable.idompotence 设置为true,ack设置为-1即可。

开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个分区的消息会附带Sequence Number(自动增长)。Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息提交的时候,broker只会持久化一条消息。
msg1<PID:1,Partition:1,SeqNumber:0,message : a >
msg2<PID:1,Partition:1,SeqNumber:1,message : b >
msg2<PID:1,Partition:1,SeqNumber:2,message : c >

但是,PID重启就会变化,同时不同分区也会有不同主键,所以幂等性无法保证跨分区跨会话。这里我们就需要引进kafka事务。

事务

Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败 。为了实现跨分区跨会话事务,引入一个全局唯一的Transaction id ,将pproducer的pid和Transaction id进行绑定,这样,当producer重启后,就可以通过Transaction ID 获得原来的 PID。这个参数通过客户端程序来进行设定 。

我们使用kafka消息事务的场景有以下两种:

  1. 在一次业务中,存在消费消息,又存在生产消息。此时如果消息生产失败,那么消费者需要回滚。这种情况称为consumer-transform-producer
  2. 在一次业务中,存在多次生产消息,其中后续生产的消息抛出异常,前置生产的消息需要回滚。

事务要求生产者开启幂等性特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时
需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为true),如果显示设
置为false,则会抛出异常。



以上是保证producer发送数据可靠性保证的相关参数,结合spring-kafka的具体使用如下。

spring-kafka生产端

spring-kafkaProducer.xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
    <context:component-scan base-package="producer" />
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--broker集群地址-->
                <entry key="bootstrap.servers" value="192.168.25.10:9092,192.168.25.11:9092,192.168.25.12:9092"/>
                <!--acks 参数配置-->
                <entry key="acks" value="all"/>
                <!--发送失败重试次数-->
                <entry key ="retries" value="3"/>
                <!--批次发送大小的内存阀值-->
                <entry key="batch.size" value="16384"/>
                <!--批处理延迟时间上限-->
                <entry key="linger.ms" value="1"/>
                <!--开启幂等性-->
                <entry key="enable.idempotence" value="true"/>
                <!--批处理缓冲区-->
                <entry key="buffer.memory" value="33554432"/>
                <!--key序列化器-->
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <!--value序列化器-->
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!--配置一个生产者监听器,在该类写发送成功或失败的回调方法-->
    <bean id="producerLisener" class="producer.KafkaSendResultHandler"></bean>
    <!--springkafka提供的发送类,对kafka发送方法进行加强性的封装-->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="myTopic"/>
        <property name="producerListener" ref="producerLisener"></property>
    </bean>
    <!--producer工厂-->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>
</beans>

部分重要参数详解:

acks:

? 这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功
的。

  • ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知
    不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最
    大速度发送消息,从而达到很高的吞吐量。
  • ack=1,默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响
    应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收
    到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,
    如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导
    致数据丢失。
  • ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,
    这种模式是最安全的,它可以保证不止一个服务器收到消息。

    注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常

retries :

? 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到
了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待
100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。

batch.size :

? 当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可
以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出
去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能
被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置
的太小,生产者会因为频繁发送消息而增加一些额外的开销。

max.request.size :

? 该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里
所有消息的总大小。 broker 对可接收的消息最大值也有自己的限制( message.max.size ),所以两
边的配置最好匹配,避免生产者发送的消息被 broker 拒绝。

linger.ms:批处理延迟时间上限

buffer.memory:批处理缓冲区

enable.idempotence:是否开启幂等性

ProducerListener类

消息发送后的回调方法,注意的是,这里的监听回显的数据时要发送的数据,不是返回的数据,可以通过日志来观察发送数据是否正确。

public class KafkaSendResultHandler implements ProducerListener {
   private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        log.info("kafka message send successful : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value+"---RecordMetadata:"+recordMetadata);
    }

    public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
        log.error("kafka message send fail : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value);
        e.printStackTrace();
    }

    public boolean isInterestedInSuccess() {
        log.info("ProducerListener started");
        return true;
    }
}

ProducerClient类

对kafkaTemplate的再一次封装,kafka在消息发送的时候发送方式可以分为同步发送和异步发送。

同步发送:

? 同步发送的意思就是,一条消息发送之后,会阻塞当前线程, 直至返回 ack。

  //同步发送
   public void syncSend(){
    testTemplate.send("topic",result.toString()).get(10, TimeUnit.SECONDS);
   }
    

异步发送:

//异步发送
   public void asyncSend() {

      ListenableFuture<SendResult<Integer, String>> future = testTemplate.send("topic",result.toString());

      future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
                    @Override
                    public void onSuccess(SendResult<Integer, String> result) {
                       log.info("success");
                    }
                    @Override
                    public void onFailure(Throwable ex) {
                       log.error("failure");
                    }
                });
}

ProducerClient对kafkaTemplate的封装(不带事务)

这里只封装了最简单的发送方法,同时可对其他发送方法进行封装,只需要修改传参即可。

public class ProducerClient {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    /*同步发送*/
    //轮询方式发送
    public void sendMessage(String topicName,String message){
        Map<String,Object> m = new HashMap<String,Object>();
        SendResult<String, String> sendResult = null;
        try {
            sendResult = kafkaTemplate.send(topicName, message).get();
            /*检查recordMetadata的offset数据,不检查producerRecord*/
            if(sendResult!=null) {
                Long offsetIndex = sendResult.getRecordMetadata().offset();
                if (offsetIndex != null && offsetIndex >= 0) {
                    m.put("code", KafkaMesConstant.SUCCESS_CODE);
                    m.put("message", KafkaMesConstant.SUCCESS_MES);
                } else {
                    m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
                    m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
                }
            }else {
                m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
                m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
            }
        }  catch (InterruptedException e) {
            e.printStackTrace();
            m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        } catch (ExecutionException e) {
            e.printStackTrace();
            m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
            m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
        }
        System.out.println("kafkaServers response : "+m);
    }
}
public class KafkaMesConstant {
    public static final String SUCCESS_CODE = "00000";
    public static final String SUCCESS_MES = "成功";

    /*kakfa-code*/
    public static final String KAFKA_SEND_ERROR_CODE = "30001";
    public static final String KAFKA_NO_RESULT_CODE = "30002";
    public static final String KAFKA_NO_OFFSET_CODE = "30003";

    /*kakfa-mes*/
    public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系liuhui";
    public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系liuhui";
    public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系liuhui";
}

测试一下

public class excuter {

    @Test
    public void producer() throws InterruptedException {
        ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
        ProducerClient producerClient = (ProducerClient) context.getBean("producerClient");
        producerClient.sendMessage("topic2", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()).toString());
        Thread.sleep(1000);
    }

}

控制台结果:(我这里没有使用日志输出,在实际开发中需要使用日志开发)

ProducerListener started
kafka message send successful : ---topic:topic2---partition:null---key:null---value:2019-11-19 02:57:07---RecordMetadata:[email protected]
kafkaServers response : {code=00000, message=成功}

原文地址:https://www.cnblogs.com/luckyhui28/p/12003641.html

时间: 2024-08-30 06:46:01

Kafka生产者——结合spring开发的相关文章

Kafka消费者——结合spring开发

Kafka消费者端 可靠性保证 作为消费端,消费数据需要考虑的是: 1.不重复消费消息 2.不缺失消费消息 自动提交 offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(true) auto.commit.interval.ms: 自动提交 offset 的时间间隔 (1000ms = 1s) 手动提交offset 的相关参数: enable.auto.commit: 是否开启自动提交 offset 功能(false) 异步提交也个缺点,那就

Kafka 入门和 Spring Boot 集成

Kafka 入门和 Spring Boot 集成 标签:博客 [TOC] 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流).由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护. 应用场景 下面列举了一些kafka常见的应用场景. 消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景. 应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用

基于Spring开发的DUBBO服务接口测试

基于Spring开发的DUBBO服务接口测试 知识共享主要内容: 1. Dubbo相关概念和架构,以及dubbo服务程序开发步骤. 2. 基于Spring开发框架的dubbo服务接口测试相关配置. 3. spring test+junit和spring test+TestNG两种测试框架脚本编写方法. 一.        DUBBO与DUBBO架构 1.          什么是dubbo?DUBBO是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,是阿里巴巴SOA服务化治

利用Maven搭建Spring开发环境 【转】

一.   概要说明 最近几天在测试Spring3.0的AOP功能,在测试功能之前,首先是要搭建出Spring3.0的开发功能.开始去官网下载Spring的相关jar包,但是这些jar包中还是会需要其他的一些jar包,于是又手动的去下载其他的相关jar包.这样也可以搭建出开发环境,但是需要频繁的去下载缺少的jar包,很麻烦.这里,我们可以还有一个更好的办法,采用maven来管理我们的工程,让maven来自动为我们去下载相关版本的jar包,具体的配置如下. 二.   下载并安装maven 去网上下载

使用Spring开发第一个HelloWorld应用

http://www.importnew.com/13246.html 让我们用Spring来写第一个应用程序吧. 完成这一章要求: 熟悉Java语言 设置好Spring的环境 熟悉简单的Eclipse IDE的操作 如果你还没有设置好环境,请参考Spring开发环境的配置. 我们第一个程序是打印”Hello World”语句,这个语句通过Spring的配置文件来设置. 1 – 新建Java项目: 第一步用Eclipse IDE新建一个项目. 点击 > File > New > Java

利用Maven搭建Spring开发环境

一.   概要说明 最近几天在测试Spring3.0的AOP功能,在测试功能之前,首先是要搭建出Spring3.0的开发功能.开始去官网下载Spring的相关jar包,但是这些jar包中还是会需要其他的一些jar包,于是又手动的去下载其他的相关jar包.这样也可以搭建出开发环境,但是需要频繁的去下载缺少的jar包,很麻烦.这里,我们可以还有一个更好的办法,采用maven来管理我们的工程,让maven来自动为我们去下载相关版本的jar包,具体的配置如下. 二.   下载并安装maven 去网上下载

学习spring2--跟我一起学Spring 3(3)–使用Spring开发第一个HelloWorld应用

http://www.importnew.com/13246.html 首页 所有文章 资讯 Web 架构 基础技术 书籍 教程 我要投稿 更多频道 » - 导航条 - 首页 所有文章 资讯 Web 架构 基础技术 书籍 教程 我要投稿 更多频道 » - iOS - Python - Android - Web前端 跟我一起学Spring 3(3)–使用Spring开发第一个HelloWorld应用 2014/10/10 | 分类: 教程 | 5 条评论 | 标签: SPRING, 教程 分享到

搭建Spring开发环境并编写第一个Spring小程序

一.前面,我写了一篇Spring框架的基础知识文章,里面没讲到如何配置Spring开发环境,今天就来讲一下,如果大家不知道怎么下载Spring软件包的话,可以看我那篇文章: http://blog.csdn.net/u012561176/article/details/45971917 ,里面讲述了2种获得Spring软件包的方式. 建议大家配置Spring环境之前先了解一下IoC(控制反转)的原理,可以看我写的文章:http://blog.csdn.net/u012561176/article

kafka生产者

1.kafka生产者是线程安全的,她允许多个线程共享一个kafka实例 2.kafka管理一个简单的后台线程,所有的IO操作以及与每个broker的tcp连接通信,如果没有正确的关闭生产者可能会造成资源泄露. kafka总共有以下的这些生产者实例   KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)           A producer is instantiated by providing a