目录
- Kafka生产者端
- 可靠性保证:
- spring-kafka生产端
Kafka生产者端
可靠性保证:
producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据:
1、不丢失
2、不重复
producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性。
acks 参数配置
为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到 ack,就会进行下一轮的发送,否则重新发送数据。
0: producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟, broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据;
1: producer 等待 broker 的 ack, partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;
-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后, broker 发送 ack 之前leader 发生故障,那么会造成数据重复。
Exactly Once 语义
当ack级别设置为-1的时候,可以保证producer到broker之间不会丢失数据,即At
Least Once 语义 。相对的,将服务器ack级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once 语义 。
At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的, At Least Once可以保证数据不重复,但是不能保证数据不丢失。
对于一些重要信息,我们要求既不能重复也不能丢失,这时我们需要使用Exactly Once 语义 。0.11 版本的 Kafka,引入了一项重大特性:幂等性。 所谓幂等性就是producer无论向broker发送了多少次重复数据,broker都只会持久化一条。幂等性结合At Least Once语义,就结合成了Kafka的Exactly Once语义。
At Least Once + 幂等性 = Exactly Once
启动幂等性,只需要将Producer的参数enable.idompotence 设置为true,ack设置为-1即可。
开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个分区的消息会附带Sequence Number(自动增长)。Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息提交的时候,broker只会持久化一条消息。
msg1<PID:1,Partition:1,SeqNumber:0,message : a >
msg2<PID:1,Partition:1,SeqNumber:1,message : b >
msg2<PID:1,Partition:1,SeqNumber:2,message : c >
但是,PID重启就会变化,同时不同分区也会有不同主键,所以幂等性无法保证跨分区跨会话。这里我们就需要引进kafka事务。
事务
Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败 。为了实现跨分区跨会话事务,引入一个全局唯一的Transaction id ,将pproducer的pid和Transaction id进行绑定,这样,当producer重启后,就可以通过Transaction ID 获得原来的 PID。这个参数通过客户端程序来进行设定 。
我们使用kafka消息事务的场景有以下两种:
- 在一次业务中,存在消费消息,又存在生产消息。此时如果消息生产失败,那么消费者需要回滚。这种情况称为consumer-transform-producer
- 在一次业务中,存在多次生产消息,其中后续生产的消息抛出异常,前置生产的消息需要回滚。
事务要求生产者开启幂等性特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时
需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为true),如果显示设
置为false,则会抛出异常。
以上是保证producer发送数据可靠性保证的相关参数,结合spring-kafka的具体使用如下。
spring-kafka生产端
spring-kafkaProducer.xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
<context:component-scan base-package="producer" />
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<!--broker集群地址-->
<entry key="bootstrap.servers" value="192.168.25.10:9092,192.168.25.11:9092,192.168.25.12:9092"/>
<!--acks 参数配置-->
<entry key="acks" value="all"/>
<!--发送失败重试次数-->
<entry key ="retries" value="3"/>
<!--批次发送大小的内存阀值-->
<entry key="batch.size" value="16384"/>
<!--批处理延迟时间上限-->
<entry key="linger.ms" value="1"/>
<!--开启幂等性-->
<entry key="enable.idempotence" value="true"/>
<!--批处理缓冲区-->
<entry key="buffer.memory" value="33554432"/>
<!--key序列化器-->
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<!--value序列化器-->
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>
<!--配置一个生产者监听器,在该类写发送成功或失败的回调方法-->
<bean id="producerLisener" class="producer.KafkaSendResultHandler"></bean>
<!--springkafka提供的发送类,对kafka发送方法进行加强性的封装-->
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
<property name="defaultTopic" value="myTopic"/>
<property name="producerListener" ref="producerLisener"></property>
</bean>
<!--producer工厂-->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties"/>
</constructor-arg>
</bean>
</beans>
部分重要参数详解:
acks:
? 这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功
的。
- ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知
不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最
大速度发送消息,从而达到很高的吞吐量。 - ack=1,默认值为1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响
应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收
到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,
如果收到写成功通知,此时首领节点还没来的及同步数据到follower节点,首领节点崩溃,就会导
致数据丢失。 - ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,
这种模式是最安全的,它可以保证不止一个服务器收到消息。注意:acks参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出异常
retries :
? 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到
了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待
100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。
batch.size :
? 当有多个消息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可
以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消息会被发送出
去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也可能
被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置
的太小,生产者会因为频繁发送消息而增加一些额外的开销。
max.request.size :
? 该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里
所有消息的总大小。 broker 对可接收的消息最大值也有自己的限制( message.max.size ),所以两
边的配置最好匹配,避免生产者发送的消息被 broker 拒绝。
linger.ms:批处理延迟时间上限
buffer.memory:批处理缓冲区
enable.idempotence:是否开启幂等性
ProducerListener类
消息发送后的回调方法,注意的是,这里的监听回显的数据时要发送的数据,不是返回的数据,可以通过日志来观察发送数据是否正确。
public class KafkaSendResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
log.info("kafka message send successful : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value+"---RecordMetadata:"+recordMetadata);
}
public void onError(String topic, Integer partition, Object key, Object value, Exception e) {
log.error("kafka message send fail : "+"---topic:"+topic+"---partition:"+partition+"---key:"+key+"---value:"+value);
e.printStackTrace();
}
public boolean isInterestedInSuccess() {
log.info("ProducerListener started");
return true;
}
}
ProducerClient类
对kafkaTemplate的再一次封装,kafka在消息发送的时候发送方式可以分为同步发送和异步发送。
同步发送:
? 同步发送的意思就是,一条消息发送之后,会阻塞当前线程, 直至返回 ack。
//同步发送
public void syncSend(){
testTemplate.send("topic",result.toString()).get(10, TimeUnit.SECONDS);
}
异步发送:
//异步发送
public void asyncSend() {
ListenableFuture<SendResult<Integer, String>> future = testTemplate.send("topic",result.toString());
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
log.info("success");
}
@Override
public void onFailure(Throwable ex) {
log.error("failure");
}
});
}
ProducerClient对kafkaTemplate的封装(不带事务)
这里只封装了最简单的发送方法,同时可对其他发送方法进行封装,只需要修改传参即可。
public class ProducerClient {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/*同步发送*/
//轮询方式发送
public void sendMessage(String topicName,String message){
Map<String,Object> m = new HashMap<String,Object>();
SendResult<String, String> sendResult = null;
try {
sendResult = kafkaTemplate.send(topicName, message).get();
/*检查recordMetadata的offset数据,不检查producerRecord*/
if(sendResult!=null) {
Long offsetIndex = sendResult.getRecordMetadata().offset();
if (offsetIndex != null && offsetIndex >= 0) {
m.put("code", KafkaMesConstant.SUCCESS_CODE);
m.put("message", KafkaMesConstant.SUCCESS_MES);
} else {
m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
}
}else {
m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
}
} catch (InterruptedException e) {
e.printStackTrace();
m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
} catch (ExecutionException e) {
e.printStackTrace();
m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
}
System.out.println("kafkaServers response : "+m);
}
}
public class KafkaMesConstant {
public static final String SUCCESS_CODE = "00000";
public static final String SUCCESS_MES = "成功";
/*kakfa-code*/
public static final String KAFKA_SEND_ERROR_CODE = "30001";
public static final String KAFKA_NO_RESULT_CODE = "30002";
public static final String KAFKA_NO_OFFSET_CODE = "30003";
/*kakfa-mes*/
public static final String KAFKA_SEND_ERROR_MES = "发送消息超时,联系liuhui";
public static final String KAFKA_NO_RESULT_MES = "未查询到返回结果,联系liuhui";
public static final String KAFKA_NO_OFFSET_MES = "未查到返回数据的offset,联系liuhui";
}
测试一下
public class excuter {
@Test
public void producer() throws InterruptedException {
ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
ProducerClient producerClient = (ProducerClient) context.getBean("producerClient");
producerClient.sendMessage("topic2", new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date()).toString());
Thread.sleep(1000);
}
}
控制台结果:(我这里没有使用日志输出,在实际开发中需要使用日志开发)
ProducerListener started
kafka message send successful : ---topic:topic2---partition:null---key:null---value:2019-11-19 02:57:07---RecordMetadata:[email protected]
kafkaServers response : {code=00000, message=成功}
原文地址:https://www.cnblogs.com/luckyhui28/p/12003641.html