从源码分析如何优雅的使用 Kafka 生产者

前言

在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢?

正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。

内容较多,对源码感兴趣的朋友请系好安全带??(源码基于 v0.10.0.0 版本分析)。同时最好是有一定的 Kafka 使用经验,知晓基本的用法。

简单的消息发送

在分析之前先看一个简单的消息发送是怎么样的。(以下代码基于 SpringBoot 构建。)

首先创建一个 org.apache.kafka.clients.producer.Producer 的 bean。

主要关注 bootstrap.servers,它是必填参数。指的是 Kafka 集群中的 broker 地址,例如 127.0.0.1:9094。

接着注入这个 bean 即可调用它的发送函数发送消息。

这里我给某一个 Topic 发送了 10W 条数据,运行程序消息正常发送。

但这仅仅只是做到了消息发送,对消息是否成功送达完全没管,等于是纯异步的方式。

同步

那么我想知道消息到底发送成功没有该怎么办呢?

其实 Producer 的 API 已经帮我们考虑到了,发送之后只需要调用它的 get() 方法即可同步获取发送结果。

发送结果:

这样的发送效率其实是比较低下的,因为每次都需要同步等待消息发送的结果。

异步

为此我们应当采取异步的方式发送,其实 send() 方法默认则是异步的,只要不手动调用 get() 方法。

但这样就没法获知发送结果。

所以查看 send() 的 API 可以发现还有一个参数。

Future<RecordMetadata> send(ProducerRecord<K, V> producer, Callback callback);

Callback 是一个回调接口,在消息发送完成之后可以回调我们自定义的实现。

执行之后的结果:

同样的也能获取结果,同时发现回调的线程并不是上文同步时的主线程,这样也能证明是异步回调的。

同时回调的时候会传递两个参数:
RecordMetadata 和上文一致的消息发送成功后的元数据。
Exception 消息发送过程中的异常信息。

但是这两个参数并不会同时都有数据,只有发送失败才会有异常信息,同时发送元数据为空。

所以正确的写法应当是:

源码分析

现在只掌握了基本的消息发送,想要深刻的理解发送中的一些参数配置还是得源码说了算。

首先还是来谈谈消息发送时的整个流程是怎么样的,Kafka 并不是简单的把消息通过网络发送到了 broker 中,在 Java 内部还是经过了许多优化和设计。

发送流程

为了直观的了解发送的流程,简单的画了几个在发送过程中关键的步骤。

从上至下依次是:

  • 初始化以及真正发送消息的 kafka-producer-network-thread IO 线程。
  • 将消息序列化。
  • 得到需要发送的分区。
  • 写入内部的一个缓存区中。
  • 初始化的 IO 线程不断的消费这个缓存来发送消息。

初始化

调用该构造方法进行初始化时,不止是简单的将基本参数写入 KafkaProducer。比较麻烦的是初始化 Sender 线程进行缓冲区消费。

初始化 IO 线程处:

可以看到 Sender 线程有需要成员变量,比如:acks,retries,requestTimeout

序列化消息

在调用 send() 函数后其实第一步就是序列化,毕竟我们的消息需要通过网络才能发送到 Kafka。

其中的 valueSerializer.serialize(record.topic(), record.value()); 是一个接口,我们需要在初始化时候指定序列化实现类。

我们也可以自己实现序列化,只需要实现 org.apache.kafka.common.serialization.Serializer 接口即可。

路由分区

接下来就是路由分区,通常我们使用的 Topic 为了实现扩展性以及高性能都会创建多个分区。

如果是一个分区好说,所有消息都往里面写入即可。

但多个分区就不可避免需要知道写入哪个分区。

通常有三种方式。

指定分区

可以在构建 ProducerRecord 为每条消息指定分区。

这样在路由时会判断是否有指定,有就直接使用该分区。

这种一般在特殊场景下会使用。

自定义路由策略

如果没有指定分区,则会调用 partitioner.partition 接口执行自定义分区策略。

而我们也只需要自定义一个类实现 org.apache.kafka.clients.producer.Partitioner 接口,同时在创建 KafkaProducer 实例时配置 partitioner.class 参数。

通常需要自定义分区一般是在想尽量的保证消息的顺序性。

或者是写入某些特有的分区,由特别的消费者来进行处理等。

默认策略

最后一种则是默认的路由策略,如果我们啥都没做就会执行该策略。

该策略也会使得消息分配的比较均匀。

来看看它的实现:

简单的来说分为以下几步:

获取 Topic 分区数。

  • 将内部维护的一个线程安全计数器 +1。
  • 与分区数取模得到分区编号。
  • 其实这就是很典型的轮询算法,所以只要分区数不频繁变动这种方式也会比较均匀。

写入内部缓存

在 send() 方法拿到分区后会调用一个 append() 函数:

该函数中会调用一个 getOrCreateDeque() 写入到一个内部缓存中 batches。

消费缓存

在最开始初始化的 IO 线程其实是一个守护线程,它会一直消费这些数据。

通过图中的几个函数会获取到之前写入的数据。这块内容可以不必深究,但其中有个 completeBatch 方法却非常关键。

调用该方法时候肯定已经是消息发送完毕了,所以会调用 batch.done() 来完成之前我们在 send() 方法中定义的回调接口。

从这里也可以看出为什么之前说发送完成后元数据和异常信息只会出现一个。

Producer 参数解析

发送流程讲完了再来看看 Producer 中比较重要的几个参数。

acks

acks 是一个影响消息吞吐量的一个关键参数。

主要有 [all、-1, 0, 1] 这几个选项,默认为 1。

前提是 Topic 配置副本数量 replica > 1。

当 acks = all/-1 时:
意味着会确保所有的 follower 副本都完成数据的写入才会返回。
这样可以保证消息不会丢失!
但同时性能和吞吐量却是最低的。

当 acks = 0 时:
producer 不会等待副本的任何响应,这样最容易丢失消息但同时性能却是最好的!

当 acks = 1 时:
这是一种折中的方案,它会等待副本 Leader 响应,但不会等到 follower 的响应。

一旦 Leader 挂掉消息就会丢失。但性能和消息安全性都得到了一定的保证。

batch.size

这个参数看名称就知道是内部缓存区的大小限制,对他适当的调大可以提高吞吐量。

但也不能极端,调太大会浪费内存。小了也发挥不了作用,也是一个典型的时间和空间的权衡。

retries

retries 该参数主要是来做重试使用,当发生一些网络抖动都会造成重试。

这个参数也就是限制重试次数。

但也有一些其他问题。

因为是重发所以消息顺序可能不会一致,这也是上文提到就算是一个分区消息也不会是完全顺序的情况。
还是由于网络问题,本来消息已经成功写入了但是没有成功响应给 producer,进行重试时就可能会出现消息重复。这种只能是消费者进行幂等处理。

高效的发送方式

如果消息量真的非常大,同时又需要尽快的将消息发送到 Kafka。一个 producer 始终会收到缓存大小等影响。

那是否可以创建多个 producer 来进行发送呢?
配置一个最大 producer 个数。

发送消息时首先获取一个 producer,获取的同时判断是否达到最大上限,没有就新建一个同时保存到内部的 List 中,保存时做好同步处理防止并发问题。

获取发送者时可以按照默认的分区策略使用轮询的方式获取(保证使用均匀)。这样在大量、频繁的消息发送场景中可以提高发送效率减轻单个 producer 的压力。

关闭 Producer

最后则是 Producer 的关闭,Producer 在使用过程中消耗了不少资源(线程、内存、网络等)因此需要显式的关闭从而回收这些资源。

默认的 close() 方法和带有超时时间的方法都是在一定的时间后强制关闭。

但在过期之前都会处理完剩余的任务。

所以使用哪一个得视情况而定。

转自:
https://segmentfault.com/a/1190000016643285

原文地址:https://www.cnblogs.com/chen-chen-chen/p/12272403.html

时间: 2024-11-05 18:28:22

从源码分析如何优雅的使用 Kafka 生产者的相关文章

keystone源码分析(一)——Paste Deploy的应用

本keystone源码分析系列基于Juno版Keystone,于2014年10月16日随Juno版OpenStack发布. Keystone作为OpenStack中的身份管理与授权模块,主要实现系统用户的身份认证.基于角色的授权管理.其他OpenStack服务的地址发现和安全策略管理等功能.Keystone作为开源云系统OpenStack中至关重要的组成部分,与OpenStack中几乎所有的其他服务(如Nova, Glance, Neutron等)都有着密切的联系.同时,Keystone作为开源

轻量级控件SnackBar应用&amp;源码分析

前言 SnackBar是Android Support Design Library库支持的一个控件,它在使用的时候经常和CoordinatorLayout一起使用,它是介于Toast和Dialog之间的产物,属于轻量级控件很方便的提供提示和动作反馈,有时候我们需要这样的控件,和Toast一样显示便可以消失,又想这个消息提示上进行用户的反馈.然而写Dialog只能通过点击去取消它,所以SnackBar的出现更加让界面优雅. Part 1.SnackBar的常规使用 Snackbar snackb

[Android]Fragment源码分析(三) 事务

Fragment管理中,不得不谈到的就是它的事务管理,它的事务管理写的非常的出彩.我们先引入一个简单常用的Fragment事务管理代码片段: FragmentTransaction ft = this.getSupportFragmentManager().beginTransaction(); ft.add(R.id.fragmentContainer, fragment, "tag"); ft.addToBackStack("<span style="fo

Spring源码分析——BeanFactory体系之抽象类、类分析(一)

上一篇介绍了BeanFactory体系的所有接口——Spring源码分析——BeanFactory体系之接口详细分析,本篇就接着介绍BeanFactory体系的抽象类和接口. 一.BeanFactory的基本类体系结构(类为主): 上图可与 Spring源码分析——BeanFactory体系之接口详细分析 的图结合分析,一个以接口为主,一个以类为主(PS:Spring的体系结构要分析清楚,不得不曲线救国啊!不然27寸屏幕给我画估计都装不下.). 具体: 1.7层的类体系继承. 2.Abstrac

leveldb源码分析--BloomFilter

bloomfilter是leveldb中的一大性能利器,所以为了文章的表现完整性这里新启这么一篇文章.leveldb中的bloomfilter的实现在bloom.cc中,是一个较为简单的实现,所以就不再具体进行分析.本文列出两个参考地址: 那些优雅的数据结构(1) : BloomFilter——大规模数据处理利器 是一个简单的bloomfilter的介绍乐和实现 Bloom Filter  则是一位专业的bloomfilter的研究人士的博客,内容及其的牛逼,对此感兴趣的阅读以下其文章应该有不小

java-通过 HashMap、HashSet 的源码分析其 Hash 存储机制

通过 HashMap.HashSet 的源码分析其 Hash 存储机制 集合和引用 就像引用类型的数组一样,当我们把 Java 对象放入数组之时,并非真正的把 Java 对象放入数组中.仅仅是把对象的引用放入数组中,每一个数组元素都是一个引用变量. 实际上,HashSet 和 HashMap 之间有非常多相似之处,对于 HashSet 而言.系统採用 Hash 算法决定集合元素的存储位置,这样能够保证能高速存.取集合元素:对于 HashMap 而言.系统 key-value 当成一个总体进行处理

《极简笔记》源码分析(二)

0. 介绍 此文将对Github上lguipeng大神所开发的 极简笔记 v2.0 (点我下载源码)代码进行分析学习. 通过此文你将学到: 应用源码的研读方法 MVP架构模式 Application的应用 Degger2依赖注入框架 搜索控件的使用 ButterKnife库的使用 Material主题 RecyclerView等新控件的用法 Lambda表达式 Java自定义注解 aFinal框架 RxJava框架 EventBus消息框架 布局文件常用技巧 PreferenceFragment

Java多线程 -- JUC包源码分析11 -- ThreadPoolExecutor源码分析

在JUC包中,线程池部分本身有很多组件,可以说是前面所分析的各种技术的一个综合应用.从本文开始,将综合前面的知识,逐个分析线程池的各个组件. -Executor/Executors -ThreadPoolExecutor使用介绍 -ThreadPoolExecutor实现原理 –ThreadPoolExecutor的中断与优雅关闭 shutdown + awaitTermination –shutdown的一个误区 Executor/Executors Executor是线程池框架最基本的几个接

第五篇:Spark SQL Catalyst源码分析之Optimizer

/** Spark SQL源码分析系列文章*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识. Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化