配置 kafka 同步刷盘

之前参加 rocketmq 的 meetup,台上有人讲,kafka 不支持同步刷盘,当时没太在意,今天抽空看了下代码:

kafka 提供了配置参数来支持同步刷盘,和 rocktmq 的做法不同(4.7 的 rmq 在 sync_disk 模式,统一在 GroupCommitService 中刷盘,不阻塞 SendMessageProcessor 线程)

kafka.log.Log#append

看下代码片段就好,不深入讲

// always update the last producer id map offset so that the snapshot reflects the current offset
// even if there isn‘t any idempotent data being written
producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)

// update the first unstable offset (which is used to compute LSO)
updateFirstUnstableOffset()

trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
  s"first offset: ${appendInfo.firstOffset}, " +
  s"next offset: ${nextOffsetMetadata.messageOffset}, " +
  s"and messages: $validRecords")

// 我们所需要的同步刷盘
if (unflushedMessages >= config.flushInterval)
  flush()

appendInfo

再看下文档

public static final String FLUSH_MESSAGES_INTERVAL_CONFIG = "flush.messages";
public static final String FLUSH_MESSAGES_INTERVAL_DOC = "This setting allows specifying an interval at " +
    "which we will force an fsync of data written to the log. For example if this was set to 1 " +
    "we would fsync after every message; if it were 5 we would fsync after every five messages. " +
    "In general we recommend you not set this and use replication for durability and allow the " +
    "operating system‘s background flush capabilities as it is more efficient. This setting can " +
    "be overridden on a per-topic basis (see <a href=\"#topicconfigs\">the per-topic configuration section</a>).";

public static final String FLUSH_MS_CONFIG = "flush.ms";
public static final String FLUSH_MS_DOC = "This setting allows specifying a time interval at which we will " +
    "force an fsync of data written to the log. For example if this was set to 1000 " +
    "we would fsync after 1000 ms had passed. In general we recommend you not set " +
    "this and use replication for durability and allow the operating system‘s background " +
    "flush capabilities as it is more efficient.";

所以,可以通过 flush.messages 和 flush.ms 来配置刷盘策略

原文地址:https://www.cnblogs.com/allenwas3/p/12684443.html

时间: 2024-10-12 03:06:52

配置 kafka 同步刷盘的相关文章

Rocket重试机制,消息模式,刷盘方式

一.Consumer 批量消费(推模式) 可以通过 consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条 这里需要分为2种情况 Consumer端先启动 Consumer端后启动.   正常情况下:应该是Consumer需要先启动 注意:如果broker采用推模式的话,consumer先启动,会一条一条消息的消费,consumer后启动会才用批量消费 Consumer端先启动 1.Consumer.java package quickstart

MySQL延迟问题和数据刷盘策略

一.MySQL复制流程官方文档流程图如下: 1.绝对的延时,相对的同步 2.纯写操作,线上标准配置下,从库压力大于主库,最起码从库有relaylog的写入. 二.MySQL延迟问题分析 1.主库DML请求频繁 原因:主库并发写入数据,而从库为单线程应用日志,很容易造成relaylog堆积,产生延迟. 解决思路:做sharding,打散写请求.考虑升级到MySQL 5.7+,开启基于逻辑时钟的并行复制. 2.主库执行大事务 原因:类似主库花费很长时间更新了一张大表,在主从库配置相近的情况下,从库也

配置的同步机制是如何实现的?

配置的同步机制是如何实现的? 配置的同步涉及到两个方面:第一,对原始的配置文件实施监控并在其发生变化之后从新加载配置:第二,配置重新加载之后及时通知应用程序进而使后者能够使用最新的配置.要了解配置同步机制的实现原理,先得从认识一个名为ConfigurationReloadToken的类型开始. [ 本文已经同步到<ASP.NET Core框架揭秘>之中] 目录一.从ConfigurationReloadToken说起二.Configuration对象与配置文件的同步三.应用重新加载的配置四.同

公开课视频-《第11章 配置-Citrix-企业网盘-第12章 配置-UPM》

***************************************************************************************** <大企业云桌面部署实战>-培训班-即将开课,包学包会,欢迎咨询:3313395633(QQ) ***************************************************************************************** <大企业云桌面部署实战>-精讲课

ASP.NET Core的配置(5):配置的同步[设计篇]

本节所谓的"配置同步"主要体现在两个方面:其一,如何监控配置源并在其变化的时候自动加载其数据,其目的是让应用中通过Configuration对象承载的配置与配置源的数据同步:其二.当Configuration对象承载的配置放生变换的时候如何向应用程序发送通知,最终让应用程序使用最新的配置. 一.配置与配置源的同步 配置模型提供了三个原生ConfigurationProvider(JsonConfigrationProvider.XmlConfigurationProvider和IniC

ASP.NET Core的配置(5):配置的同步[ 实例篇]

ConfigurationBuilder在生成以Configuration对象的时候会利用注册其中的ConfigurationProvider加载原始的配置数据,那么一旦配置源中的数据发生变化,应用程序中的使用的配置信息如何与之同步呢?如果需要在应用程序中实现对配置信息的实施同步,就需要对原始配置数据的进行监控,并在数据改变的时候重新加载配置数据.除此之外,重新加载的配置需要应用到程序中,我们必然需要一种通知机制. 为了让读者朋友们对配置同步机制在具体项目中的应用有个感官认识,我们先通过一个简单

.NET Core采用的全新配置系统[10]: 配置的同步机制是如何实现的?

配置的同步涉及到两个方面:第一,对原始的配置文件实施监控并在其发生变化之后从新加载配置:第二,配置重新加载之后及时通知应用程序进而使后者能够使用最新的配置.要了解配置同步机制的实现原理,先得从认识一个名为ConfigurationReloadToken的类型开始. [ 本文已经同步到<ASP.NET Core框架揭秘>之中] 目录一.从ConfigurationReloadToken说起二.Configuration对象与配置文件的同步三.应用重新加载的配置四.同步流程总结 一.从Config

Kafka详解之二、如何配置Kafka集群

Kafka集群配置比较简单,为了更好的让大家理解,在这里要分别介绍下面三种配置 单节点:一个broker的集群 单节点:多个broker的集群 多节点:多broker集群 一.单节点单broker实例的配置 1.首先启动zookeeper服务 Kafka本身提供了启动zookeeper的脚本(在kafka/bin/目录下)和zookeeper配置文件(在kafka/config/目录下),首先进入Kafka的主目录(可通过 whereis kafka命令查找到): [[email protect

GG配置Oracle同步到SQLServer

源端目标端准备一张测试用的表. Oracle源端: drop table ggmgr.t1; create table ggmgr.t1( id int primary key, name varchar2(50), time date); Sqlserver目标端: BEGIN TRANSACTION SET QUOTED_IDENTIFIER ON SET ARITHABORT ON SET NUMERIC_ROUNDABORT OFF SET CONCAT_NULL_YIELDS_NULL