关于Flink slot 和kafka topic 分区关系的说明

今天又有小伙伴在群里问 slot 和 kafka topic 分区(以下topic,默认为 kafka 的 topic )的关系,大概回答了一下,这里整理一份

首先必须明确的是,Flink Task Manager 的 slot 数 和 topic 的分区数是没有直接关系的,而这个问题其实是问的是: 任务的并发数与 slot 数的关系

最大并发数 =  slot 数

这里有两个原因:每个算子的不同并行不能在同一slot,不同的算子可以共享 slot ,所以最大并行度 就等于 slot 数。

这样就有了slot数和topic 分区数的间接关系在:我们可能会根据 kafka 的分区数配置我们 source (和后续的其他算子)算子的 并行度,而算子的 最大并行度决定 slot 数据(TM 的数量由 slot 的数量反向计算)

看一张官网的图:

说明:

第一个图:3 个 Task Manager,每个 3 个 slot,总共 9 个 slot

第二个图:Example 1 ,wordcount 案例,1 个并发,算子chain 在一起,只占一个 slot

第三个图:Examlple 2,wordcount 案例,2 个并发,占2 slot 。三种设置并行度的方式:

flink-conf.yaml 参数 parallelism.default: 2
flink -p 2  # 启动时加 -p 参数指定
env.setParallelism(2)

第四个图:Example 3,wordcount 案例,9 个并发,占 9  slot

第五个图:Example 3,wordcount 案例,source 9 个并发,sink 1 个并发,占 9 个slot(sink 和其中一个 source chain 在一起了)

看一个具体的任务:  

我们要读的 topic 有 2 个 partition,我们设置 source 算子的并行度为 2,那我们最小就需要 4 个 slot,Task Manager 配置的 slot 数为2, 那最少就需要 2 个 TM 任务才能正常运行(不考虑其他算子)。

关键代码:

env.setParallelism(2)
env.addSource(source).addSink(sink)

提交到yarn 上

上面说明了算子的并发度与TM 的 slot 数的关系。

下面看下,kafka 分区数与 source 算子的并行度关系。

在不修改 kafka consumer 的分区分配策略的情况下,soure 的并行度与 topic 分区数在不同情况下,会有不同的表现,如下:

1、source 并行度 =  topic 分区数,正好的情况,一个 并行度,读一个分区的数据

2、source 并行读  < topic 分区数, 会出现部分 并行度读多个 分区的情况,具体可见:flink 读取kafka 数据,partition分配

3、source 并行度 > topic 分区数,会出现部分并行度没有数据的情况

总结下问题:slot 数和 topic 的分区数并没有直接关系,以kafka 做 source 的情况最多,而 kafka topic 的分区数一般又是 Flink source 的并行度,又是 Flink 任务的最大并发度,一般情况下又是 slot 的数量,所以会有一种 slot 数 和 topic 分区数 有直接关系的假象。

注:Task Manager 的 slot 数在 flink-conf.yaml 中配置 参数:

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 2 # 默认值为1

官网 slot 配置说明:https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#configuring-taskmanager-processing-slots (slot 数量推荐是在只有一个任务的情况下,具体配置要看实际情况)

 

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

原文地址:https://www.cnblogs.com/Springmoon-venn/p/12023350.html

时间: 2024-08-04 08:42:11

关于Flink slot 和kafka topic 分区关系的说明的相关文章

kafka集群扩容后的topic分区迁移

kafka集群扩容后的topic分区迁移 ./bin/kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181  --alter --topic dftt --partitions 4 kafka集群扩容后,新的broker上面不会数据进入这些节点,也就是说,这些节点是空闲的:它只有在创建新的topic时才会参与工作.除非将已有的partition迁移到新的服务器上面:所以需要将一些topic的分区迁移到新的broker上. kaf

kafka的分区模式?

当别人问这个问题的时候,别人肯定是想你是否看过源码.是否针对不同场景改过kafka的分区模式 这是别人最想知道的是,你的message如何负载均衡的发送给topic的partition 我们用kafka的时候,可以动态指定partition,也可以不指定partition 当我们动态指定了partition的时候,kafka会将消息发送到指定的partition 如果没有指定partition 这就是关键了, 如何让这些消息,均衡的发送给每个partition 先看看发送消息的方式 kafka首

kafka topic制定规则

kafka topic的制定,我们要考虑的问题有很多,比如生产环境中用几备份.partition数目多少合适.用几台机器支撑数据量,这些方面如何去考量?笔者根据实际的维护经验,写一些思考,希望大家指正. 1.replicas数目 可以从上图看到,备份越多,性能越低,因为kafka的写入只写入主分区,备份相当于消费者从主分区pull数据,这样势必会造成性能的损耗,故建议在生产环境中使用一主一备即可. 2. partition数量 (1)设置partition数量的时候我们需要注意:kafka的pa

Kafka Topic Partition Replica Assignment实现原理及资源隔离方案

本文共分为三个部分: Kafka Topic创建方式 Kafka Topic Partitions Assignment实现原理 Kafka资源隔离方案 1. Kafka Topic创建方式 Kafka Topic创建方式有以下两种表现形式: (1)创建Topic时直接指定Topic Partition Replica与Kafka Broker之间的存储映射关系 /usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeep

手动删除Kafka Topic

一.删除Kafka topic 运行./bin/kafka-topics  --delete --zookeeper [zookeeper server]  --topic [topic name]:如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion 可以通过命令:./bin/kafka-topics --zookeeper

Flink批处理优化器之范围分区重写

为最终计划应用范围分区重写 Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下: final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); final DataSet<Tuple2<Integer, String>> ds = getTupleDataSet(env); ds.p

u-boot中分区和内核MTD分区关系

一.u-boot中环境变量与uImage中MTD的分区关系 分区只是内核的概念,就是说A-B地址放内核,C-D地址放文件系统,(也就是规定哪个地址区间放内核或者文件系统)等等. 一般我们只需要分3-4个区,第一个为boot区,一个为boot参数区(传递给内核的参数),一个为内核区,一个为文件系统区.(但是有的内核就会有很多分区,比如内核参数会有两个,还有会Logo的地址) 而对于bootloader中只要能将内核下载到A~B区的A地址开始处就可以,C~D区的C起始地址下载文件系统…….这些起始地

用canal同步binlog到kafka,spark streaming消费kafka topic乱码问题

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有kafka和RocketMQ. 在投递的时候我们使用的是非压平的消息模式(canal.mq.flatMessage =false //是否为flat json格式对象),然后消费topic的时候就一直无法正常显示和序列化,通过kafka-console-consumer.sh命令收到的消息如下图 在github上也能找到相关问题 canal-kafka 数据同步到

Flink生产数据到Kafka频繁出现事务失效导致任务重启

在生产中需要将一些数据发到kafka,而且需要做到EXACTLY_ONCE,kafka使用的版本为1.1.0,flink的版本为1.8.0,但是会很经常因为提交事务引起错误,甚至导致任务重启 kafka producer的配置如下 def getKafkaProducer(kafkaAddr: String, targetTopicName: String, kafkaProducersPoolSize: Int): FlinkKafkaProducer[String] = { val prop