【转】解决Maxwell发送Kafka消息数据倾斜问题

最近用Maxwell解析MySQL的Binlog,发送到Kafka进行处理,测试的时候发现一个问题,就是Kafka的Offset严重倾斜,三个partition,其中一个的offset已经快200万了,另外两个offset才不到两百。
Kafka数据倾斜的问题一般是由于生产者使用的Partition接口实现类对分区处理的问题,一般是对key做hash之后,对分区数取模。当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势(参考Apache Kafka 0.10 技术内幕:数据倾斜详解)。
而使用Maxwell解析MySQL的Binlog发送到Kafka的时候,生产者是Maxwell,那么数据倾斜的问题明细就是Maxwell引起的了。

Maxwell官网查文档(Producers:kafka-partitioning Maxwell’s Daemon)得知,在Maxwell没有配置的情况下,默认使用数据库名作为计算分区的key,并使用Java默认的hashcode算法进行计算:

  1. A binlog event‘s partition is determined by the selected hash function and hash string as follows
  2. | HASH_FUNCTION(HASH_STRING) % TOPIC.NUMBER_OF_PARTITIONS
  3. The HASH_FUNCTION is either java‘s hashCode or murmurhash3. The default HASH_FUNCTION
  4. is hashCode. Murmurhash3 may be set with the kafka_partition_hash option.
  5. …………
  6. The HASH_STRING may be (database, table, primary_key, column). The default HASH_STRING
  7. is the database. The partitioning field can be configured using the
  8. producer_partition_by option.

而在很多业务系统中,不同数据库的活跃度差异是很大的,主体业务的数据库操作频繁,产生的Binlog也就很多,而Maxwell默认使用数据库作为key进行hash,那么显而易见,Binglog的操作经常都被分到同一个分区里面了。

于是我们在Maxwell启动命令中加入对应参数即可,这里我选择了Rowkey作为分区key,同时选用murmurhash3
哈希算法,以获得更好的效率和分布:

nohup /opt/maxwell-1.11.0/bin/maxwell --user=‘maxwell‘ --password=‘***‘ --host=‘***‘
--exclude_dbs=‘/^(mysql|maxwell|test)/‘ --producer=kafka --kafka.bootstrap.servers=***
--kafka_partition_hash=murmur3 --producer_partition_by=primary_key >> /root/maxwell.log &

用此命令重新启动Maxwell之后,观察Offset的变化,隔一段时间之后,各分区Offset的增量基本一致,问题解决!

Reference:

https://leibnizhu.gitlab.io/2018/01/03/%E8%A7%A3%E5%86%B3Maxwell%E5%8F%91%E9%80%81Kafka%E6%B6%88%E6%81%AF%E6%95%B0%E6%8D%AE%E5%80%BE%E6%96%9C%E9%97%AE%E9%A2%98/index.html 转发此文 解决Maxwell发送Kafka消息数据倾斜问题

http://ningg.top/apache-kafka-10-best-practice-tips-data-skew-details/  Apache Kafka 0.10 技术内幕:数据倾斜详解

原文地址:https://www.cnblogs.com/piperck/p/9531232.html

时间: 2024-07-31 13:00:09

【转】解决Maxwell发送Kafka消息数据倾斜问题的相关文章

解决spark中遇到的数据倾斜问题

一. 数据倾斜的现象 多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败. 二. 数据倾斜的原因 常见于各种shuffle操作,例如reduceByKey,groupByKey,join等操作. 数据问题 key本身分布不均匀(包括大量的key为空) key的设置不合理 spark使用问题 shuffle时的并发度不够 计算方式有误 三. 数据倾斜的后果 spark中一个stage的执行时间受限于最后那个执行完的task,因此运行缓慢的任务会拖累整个

Spark 数据倾斜及其解决方案

本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/lqMu6lfk-Ny1ZHYruEeBdA 作者简介:郑志彬,毕业于华南理工大学计算机科学与技术(双语班).先后从事过电子商务.开放平台.移动浏览器.推荐广告和大数据.人工智能等相关开发和架构.目前在vivo智能平台中心从事 AI中台建设以及广告推荐业务.擅长各种业务形态的业务架构.平台化以及各种业务解决方案. 本文从数据倾斜的危害.现象.原因等方面,由浅入深阐述Spark数据倾斜及其解决方案.

hive大数据倾斜总结

在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显.主要是因为在Job完成后的所得到的 Counters是整个Job的总和,优化是基于这些Counters得出的平均值,而由于数据倾斜的原因造成map处理数据量的差异过大,使得这些平均 值能代表的价值降低.Hive的执行是分阶段的,map处理数据量的差异取决于上一个stage的reduce输出,所以如何将数据均匀的分配到各个 reduce中,就是解决数据倾斜的根本所在.规避错误来更好的运行比解决错误更高效.在

关于数据倾斜

参考:http://www.cnblogs.com/ggjucheng/archive/2013/01/03/2842860.html 在做Shuffle阶段的优化过程中,遇到了数据倾斜的问题,造成了对一些情况下优化效果不明显.主要是因为在Job完成后的所得到的Counters是整个Job的总和,优化是基于这些Counters得出的平均值,而由于数据倾斜的原因造成map处理数据量的差异过大,使得这些平均值能代表的价值降低.Hive的执行是分阶段的,map处理数据量的差异取决于上一个stage的r

Hive学习之路 (十九)Hive的数据倾斜

1.什么是数据倾斜? 由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点 2.Hadoop 框架的特性 A.不怕数据大,怕数据倾斜 B.Jobs 数比较多的作业运行效率相对比较低,如子查询比较多 C. sum,count,max,min 等聚集函数,通常不会有数据倾斜问题 3.主要表现 任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大. 单一 reduce 处理的记录数和平均记

hive高级操作(优化,数据倾斜优化)

2019/2/21 星期四 hive高级操作(优化,数据倾斜优化) 分区表/桶表应用,skew,map-join //见hive的基本语法行列转换 hive 优化hive 优化思想Explain 的使用经典案例(distinct count) 数据倾斜的原因操作:关键词 情形 后果1.Join 其中一个表较小,但是key 集中分发到某一个或几个Reduce 上的数据远高于平均值 :2.大表与大表,但是分桶的判断字段0 值或空值过多这些空值都由一个reduce 处理,非常慢:3.group by

数据倾斜的原因和解决方案

MapReduce简介MapReduce是面向大数据并行处理的计算模型.框架和平台,它隐含了以下三层含义: 1)MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure).它允许用市场上普通的商用服务器构成一个包含数十.数百至数千个节点的分布和并行计算集群. 2)MapReduce是一个并行计算与运行软件框架(Software Framework).它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,

分布式场景下Kafka消息顺序性的思考

如果业务中,对于kafka发送消息异步消费的场景,在业务上需要实现在消费时实现顺序消费, 利用kafka在partition内消息有序的特点,消息消费时的有序性. 1.在发送消息时,通过指定partition hash 2.consumer 消费消息时,需要使用亲缘性线程池进行消费,才能实现消息的基本有序.否则即使通过发送时指定partition,在消费端由于线程池的异步消费,消息之间的处理都是并发进行的,消息就会被打乱. 上面的方式基本可以实现消息的消费顺序性,除了在极端场景下,比如: 1.进

kafka通过控制台模拟消息发送和消息接收正常,但是通过javaAPI操作生产者发送消息不成功 消费者接收不到数据解决方案?

通过命令行工具(kafka-console-producer.sh和kafka-console-consumer.sh)是能够相互通信的,producer发布的信息consumer能够接收到. 但是java通过kafka-client的API写的代码始终不能跟kafka通信:java producer的消息发不出去, java comsumer也收不到任何消息.仔细检查了下代码中IP.端口都没有写错. 解决办法将kafka/config/server.properties文件中advertise