【转】Kafka producer原理 (Scala版同步producer)

转载自:http://www.cnblogs.com/huxi2b/p/4583249.html     供参考

本文分析的Kafka代码为kafka-0.8.2.1。另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本;一套是Java版的新版本。虽然Kafka社区极力推荐大家使用Java版本的producer,但目前很多已有的程序还是调用了Scala版的API。今天我们就分析一下旧版producer的代码。

producer还分为同步和异步模式,由属性producer.type指定,默认是sync,即同步发送模式。本文主要关注于同步发送的代码走读。下面以console-producer为例——console producer是Kafka自带的一个工具,它可以很方便地以键盘输入的方式接收消息并发送给指定的topic,非常适合作为我们学习的一个起点。

一、运行console-producer命令

我们的第一步是要启动一个console-producer实例。最简单的方式就是使用下面的命令:

除了绝对必要的topic, borker-list属性,我们并没有指定其他的参数。这几乎是最简单的启动方式了。

【刊误】console-producer如果不指定--sync默认应该是异步发送消息而非同步的,笔者之前说错了,所以命令应该调整为:

  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --sync

二、构建Producer配置信息

producer的第一步就是要构造producer的配置信息,比如metadata.broker.list和request.required.acks等,完整的参数列表可以查询Kafka官网,这些参数部分可以由启动console-producer时候指定,部分是有默认值的。举例来说,对于metadata.broker.list这样必须要指定的参数,在调用console-producer时候就必须传入broker-list的值给它赋值;而像request.required.acks这样的参数,虽然从名称上来看也是必要参数,但console-producer代码中提供了默认值,因此我们可以选择不显式提供request-required-acks的值,如下面代码所示:


1

2

3

4

5

val requestRequiredAcksOpt = parser.accepts("request-required-acks""The required acks of the producer requests")

     .withRequiredArg

     .describedAs("request required acks")

     .ofType(classOf[java.lang.Integer])

     .defaultsTo(0// 此处默认设置为0

三、构建JVM Shutdownhook

console-producer代码此处添加了一个JVM关闭钩子,用于确保producer的关闭。

四、发送消息

代码此处循环从键盘中接收一行文本作为消息发送。需要注意的时,默认情况下构造的消息是没有key的。由于是同步发送,每条消息都会在Producer的send方法中调用DefaultEventHandler的send方法进行发送,以下代码是ConsoleProducer.scala中消息发送部分代码:


1

2

3

4

5

do {

          message = reader.readMessage()    // 从LineMessageReader类中读取消息。该类接收键盘输入的一行文本作为消息

          if(message != null)

            producer.send(message.topic, message.key, message.message) // key默认是空,如果想要指定,需传入参数parse.key=true,默认key和消息文本之间的分隔符是‘\t‘

while(message != null// 循环接收消息,除非Ctrl+C或其他其他引发IOException操作跳出循环

下面代码是Producer.scala中的发送方法:  


1

2

3

4

5

6

7

8

9

10

11

def send(messages: KeyedMessage[K,V]*) {

    lock synchronized {

      if (hasShutdown.get) //如果producer已经关闭了抛出异常退出

        throw new ProducerClosedException

      recordStats(messages //更新producer统计信息

      sync match {

        case true => eventHandler.handle(messages) //如果是同步发送,直接使用DefaultEventHandler的handle方法发送

        case false => asyncSend(messages) // 否则,使用ayncSend方法异步发送消息——本文不考虑这种情况

      }

    }

  }

由上面的分析可以看出,真正的发送逻辑其实是由DefaultEventHandler类的handle方法来完成的。下面我们重点分析一下这个类的代码结构。

五、DefaultEventHandler与消息发送

这个类的handler方法可以同时支持同步和异步的消息发送。我们这里只考虑同步的代码路径。下面是消息发送的完整流程图:

  

以下代码是发送消息的核心逻辑:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {  // 属性message.send.max.retries指定了消息发送的重试次数,而outstandingProducerRequests就是序列化之后待发送的消息集合

      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) //将待发送消息所属topic加入到待刷新元数据的topic集合

      if (topicMetadataRefreshInterval >= 0 &&   

          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) { //查看是否已过刷新元数据时间间隔

        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) // 更新topic元数据信息

        sendPartitionPerTopicCache.clear() //如果消息key是空,代码随机选择一个分区并记住该分区,以后该topic的消息都会往这个分区里面发送。sendPartitionPerTopicCache就是这个缓存

        topicMetadataToRefresh.clear //清空待刷新topic集合

        lastTopicMetadataRefreshTime = SystemTime.milliseconds

      }

      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests) // 真正的消息发送方法

      if (outstandingProduceRequests.size > 0) { // 如果还有未发送成功的消息

        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))

        // back off and update the topic metadata cache before attempting another send operation

        Thread.sleep(config.retryBackoffMs) // 等待一段时间并重试

        // get topics of the outstanding produce requests and refresh metadata for those

        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))

        sendPartitionPerTopicCache.clear()

        remainingRetries -= 1 // 更新剩余重试次数

        producerStats.resendRate.mark()

      }

    }

下面具体说说各个子模块的代码逻辑:  

5.1 serialize方法

该方法虽然是叫序列化,但其实主要的作用就是将字节数组格式的消息体转成KeyedMessage格式。由于默认情况下我们没有指定key,因此在构造KeyedMessage时就只需要指定消息体就好了,如下面的代码所示:


1

2

3

4

5

6

serializedMessages +=

    new KeyedMessage[K,Message](

    topic = e.topic,

    key = e.key,

    partKey = e.partKey,

    message = new Message(bytes = encoder.toBytes(e.message))) // new Message时没有指定key

构建完KeyedMessage之后返回对应的消息集合即可。

5.2 更新topic元数据信息

Kafka是如何刷新某些topic的元数据信息的呢?它会向任意一个broker发送TopicMetadataRequest请求(TopicMetadataRequest是唯一一个能发给任意broker的请求API),使用获取的响应来更新连入broker的缓存。TopicMetadataRequest的响应信息包括对应topic的Leader、AR、ISR信息。

具体到代码而言,BrokerPartitionInfo的updateInfo方法就是做这件事情的,这个方法代码不多,我们逐行分析下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

def updateInfo(topics: Set[String], correlationId: Int) {

    var topicsMetadata: Seq[TopicMetadata] = Nil // TopicMetadata = topic信息+ 一组PartitionMetadata (partitionId + leader + AR + ISR)

    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId) //构造TopicMetadataRequest并随机排列所有broker,然后从第一个broker开始尝试发送请求。一旦成功就终止后面的请求发送尝试。

    topicsMetadata = topicMetadataResponse.topicsMetadata //从response中取出zookeeper中保存的对应topic元数据信息

    // throw partition specific exception

    topicsMetadata.foreach(tmd =>{

      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))

      if(tmd.errorCode == ErrorMapping.NoError) {

        topicPartitionInfo.put(tmd.topic, tmd) //更新到broker的topic元数据缓存中

      else

        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))

      tmd.partitionsMetadata.foreach(pmd =>{

        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {

          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,

            ErrorMapping.exceptionFor(pmd.errorCode).getClass))

        // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata

      })

    })

    producerPool.updateProducer(topicsMetadata)

  }

关于上面代码中的最后一行, 我们需要着重说一下。每个producer应用程序都会保存一个producer池对象来缓存每个broker上对应的同步producer实例。具体格式为brokerId -> SyncProducer。SyncProducer表示一个同步producer,其主要的方法是send,支持两种请求的发送:ProducerRequest和TopicMetadataRequest。前者是发送消息的请求,后者是更新topic元数据信息的请求。为什么需要这份缓存呢?我们知道,每个topic分区都应该有一个leader副本在某个broker上,而只有leader副本才能接收客户端发来的读写消息请求。对producer而言,即只有这个leader副本所在的broker才能接收ProducerRequest请求。在发送消息时候,我们会首先找出这个消息要发给哪个topic,然后发送更新topic元数据请求给任意broker去获取最新的元数据信息——这部分信息中比较重要的就是要获取topic各个分区的leader副本都在哪些broker上,这样我们稍后会创建连接那些broker的阻塞通道(blocking channel)去实现真正的消息发送。Kafka目前的做法就是重建所有topic分区的leader副本所属broker上对应的SyncProducer实例——虽然我觉得这样实现有线没有必要,只更新消息所属分区的缓存信息应该就够了(当然,这只是我的观点,如果有不同意见欢迎拍砖)。以下是更新producer缓存的一些关键代码:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

val newBrokers = new collection.mutable.HashSet[Broker]

    topicMetadata.foreach(tmd => {   

      tmd.partitionsMetadata.foreach(pmd => {   

        if(pmd.leader.isDefined) //遍历topic元数据信息中的每个分区元数据实例,如果存在leader副本的,添加到newBrokers中以备后面更新缓存使用

          newBrokers+=(pmd.leader.get)

      })

    })

    lock synchronized {

      newBrokers.foreach(b => { //遍历newBrokers中的每个broker实例,如果在缓存中已经存在,直接关闭掉然后创建一个新的加入到缓存中;否则直接创建一个加入

        if(syncProducers.contains(b.id)){

          syncProducers(b.id).close()

          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))

        else

          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))

      })

    }

前面说了,如果只发送一条消息的话,其实真正需要更新的分区leader副本所述broker对应的SyncProducer实例只有一个,但目前的代码中会更新所有分区,不知道Java版本的producer是否也是这样实现,这需要后面继续调研!  

5.3 发送消息

更新完topic元数据信息之后就该真正地发送消息了,这是由dispatchSerializedData方法来实现的。该方法接收一组KeyedMessage消息集合并返回发送失败的消息集合。如果返回None自然表示发送成功。该方法主要的逻辑如下图所示:

  

为了更加直观地说明上图是如何完成消息发送的,我们先对Kafka环境做一些基本的假设。假设我们的Kafka环境有5个broker,ID分别为0, 1, 2, 3, 4。我们还定义了一个topic,名字是test-topic(其实名字不重要)。该topic有3个分区,分区ID分别是0, 1, 2,并假设每个分区的leader replica都是存在的。现在假设leader与broker的对应关系假定如下:

Topic 分区 Leader副本所在的broker ID
test-topic P0 0
test-topic P1 1
test-topic P2 3

如果基于这样的配置,假定我们使用producer API一次性发送4条消息,分别是M1,M2, M3和M4。现在就可以开始分析代码了,首先从消息分组及整理开始:

5.3.1 partitionAndCollate方法

了解一个方法最简单的方式就是学习它的输入,分析它的输出。该方法接收一组待发送的消息集合——用Scala表示的话就是Seq[KeyedMessage[K, Message]],在我们的例子中很显然这个集合中有4条消息。这个方法的输出比较复杂,完整的写法是:

Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]]]

熟悉Scala语法的朋友可能会知道,这个返回值类型表示该方法可能会返回None——这表示producer代码没法对你要发送的消息按照broker进行分组或在分组过程中遇到了严重的错误,只能返回None由上层代码来处理这种情况。如果确实返回了值,这个值长的是什么样子呢?拿我们的例子来说,假定每条消息去被发送到的分区如下:(这里的对应关系是假设的,其实在partitionAndCollate方法中会为每条消息都分配它要去的分区!)

消息 要被发送到的分区ID 该分区leader副本所在broker ID
M1 P0 0
M2 P0 0
M3 P1 1
M4 P2 3

那么这个方法返回的结果就是:

0 - > {test-topic + P0 -> {M1, M2}},

1 -> {test-topic + P1 -> {M3}},

2 -> {test-topic + P2 -> {M4}} 

}

该方法的效果就是将所有待发送的消息首先按照broker进行分组,然后再按照分区进行整理。

当然了,上面我们假定了每条消息要去的分区,其实这也是在partitionAndCollate方法中被计算出来的。主要的逻辑是:

1. 首先判断每条消息的分区key是否指定,如果指定了调用默认的分区类Partitioner的partition计算目标分区就是了。

2. 如果没有指定key,就像默认使用console-producer的情况,代码会首先从缓存中判断以前是否保存该topic的信息——即该topic下所有没有key的消息默认会被发送到同一个分区下。如果存在直接找出来就好了;否则随机挑选一个返回并把它加入到缓存中,如下面代码所示:


1

2

3

val index = Utils.abs(Random.nextInt) % availablePartitions.size // 随机确定broker id

val partitionId = availablePartitions(index).partitionId

sendPartitionPerTopicCache.put(topic, partitionId) // 加入缓存中以便后续使用

5.3.2 groupMessagesToSet方法

通过上一步中将待发送消息集合按照broker和topic分区进行分组,Kafka对要发送的消息进行了分区。该操作完成之后代码就需要遍历整理过的消息数据,获取消息数据中每个broker对应的分区消息映射,也就是类似于{test-topic + P0 -> {M1, M2}}这样的数据。然后将每个映射转换为这样的格式:

{(topic + 分区,  ByteBufferMessageSet(message),  (topic + 分区, ByteBufferMessage(message) }。还是以我们的例子而言,经过groupMessageToSet之后,每个broker对应的数据变为:

{

(topic + P0, ByteBufferMessageSet(M1, M2)),

(topic + P1, ByteBufferMessageSet(M3)),

(topic + P2, ByteBufferMessageSet(M4)),

}

这个方法还考虑压缩的情况,即producer的属性compression.codec中指定的压缩策略。如果启用了压缩,追加写当前日志段的时候会先解压缩消息再写入(详见Log.scala的append方法)。

5.3.3 send发送消息

这个方法基于上一步中构造的(topic+分区, ByteBufferMessageSet)元组构造ProducerRequest发送给对应的broker,并返回发送失败的topic分区集合。具体的逻辑如下:

1. 判断要发送到的broker id是否合法,如果小于0的话(通常是-1),说明消息要发送到的分区没有leader。这种情况下直接记录一个警告信息并直接返回未发送的消息集合

2. 如果broker id是合法的,那么还需要再判断一下要发送的消息是否为空,如果为空自然也不需要做什么,直接返回空集合就好了

3. 如果上一步中的确有要发送的消息,那么就根据request.required.acks以及超时时间等配置构造一个ProducerRequest将消息封装进这个请求中。

4. 获取这个broker上的syncProducer——这个也是从producer池缓存中拿到的,如果池缓存中没有的话也只是记录为一个警告,下次重试的时候刷新一下topic元数据信息就能够创建出来了。

5. 一旦拿到目标broker上的syncProducer,就可以使用它来发送请求了,即调用syncProducer.send(producerRequest)

6. 请求被Kafka server处理之后(如何处理的下面会有详细介绍)会发送一个对应的响应(response)给eventHandler。

7. 拿到response之后需要判断一下response是否为空。这其实还要看下request.required.acks的设置。当该值是默认值0时表示producer不需要等待broker的应答(acknowledgement),这可以带来最低的延迟但持久性也最差,因为如果一个broker宕机了有可能会丢失数据。如果该值是0, 那么Kafka处理完ProducerRequest之后并不发送任何response。因此若发现response是空,那么自然表示所有数据已经被发送了,返回空集合表示没有发送失败的分区消息

8. 但倘若request.required.acks是1(其实还有两种情况,比如分区数是0等——这里不做讨论),那么就表示producer在leader副本获得数据后需要等待broker的应答。这个值的设置有更好的持久化效果。假设request.required.acks是1的话,那么Kafka处理完请求后悔发送response,因此代码还要继续解析response中的数据以确定到底有无失败消息

9. 在开始解析response代码之前,先来说说ProduceResponse的格式,如下图所示:

  

response中比较重要的信息是topic下面多个分区对应的错误码和消息待追加的第一条消息的位移。

因此,在拿到response之后,需要先判断一下response中总的分区数是否和请求中的分区数一样,如果不同的话说明在返回的response不完整,Kafka代码会抛出异常。否则,就从response中找出那些有错误的分区(即错误码不是NoError的)并返回。

至此,客户端的producer程序就已经执行完毕了。可能有些人会感到奇怪?貌似消息只是以请求的方式被发送到Kafka server上,但消息不是还要被写入到日志中吗?这部分功能又是在哪里做的呢? 下面我们来看看Kafka server是如何处理ProducerRequest的?

六、 KafkaServer处理请求

Kafka server在启动的时候会开启N个线程来处理请求。其中N是由num.io.threads属性指定,默认是8。Kafka推荐你设置该值至少是机器上磁盘数。在KafkaServer的startup方法中,如代码所示:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

def startup() {

    ...

    // 创建一个请求处理的线程池,在构造时就会开启多个线程准备接收请求

    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)

    ...

}

class KafkaRequestHandlerPool {

    ...

    for(i <- 0 until numThreads) {

        runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)

        threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))

        threads(i).start() // 启动每个请求处理线程

    }

    ...

}

KafkaRequestHandler实际上是一个Runnable,它的run核心方法中以while (true)的方式调用api.handle(request)不断地接收请求处理,如下面的代码所示:  


1

2

3

4

5

6

7

8

9

10

11

12

class KafkaRequestHandler... extends Runnable {

    ...

    def run() {

        ...

        while (true) {

            ...

            apis.handle(request) // 调用apis.handle等待请求处理

        }

        ...

    }

    ...   

}

在KafkaApis中handle的主要作用就是接收各种类型的请求。本文只关注ProducerRequest请求:  


1

2

3

4

5

6

7

8

def handle(request: RequestChannel.Request) {

    ...

    request.requestId match {

        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // 如果接收到ProducerRequest交由handleProducerOrOffsetCommitRequest处理

        case ...

    }

    ...

}

如此看来,核心的方法就是handleProducerOrOffsetCommitRequest了。这个方法之所以叫这个名字,是因为它同时可以处理ProducerRequest和OffsetCommitRequest两种请求,后者其实也是一种特殊的ProducerRequest。从Kafka 0.8.2之后kafka使用一个特殊的topic来保存提交位移(commit offset)。这个topic名字是__consumer_offsets。本文中我们关注的是真正的ProducerRequest。下面来看看这个方法的逻辑,如下图所示:

整体逻辑看上去非常简单,如下面的代码所示:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {

    ...

    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // 将消息追加写入本地提交日志

    val numPartitionsInError = localProduceResults.count(_.error.isDefined) // 计算是否存在发送失败的分区

    if(produceRequest.requiredAcks == 0) { // request.required.acks = 0时的代码路径

      if (numPartitionsInError != 0) {

        info(("Send the close connection response due to error handling produce request " +

          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")

          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))

        requestChannel.closeConnection(request.processor, request) // 关闭底层Socket以告知客户端程序有发送失败的情况

      else {

        ...

      }

    else if (produceRequest.requiredAcks == 1 || // request.required.acks = 0时的代码路径,当然还有其他两个条件

        produceRequest.numPartitions <= 0 ||

        numPartitionsInError == produceRequest.numPartitions) {

      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))

                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))

      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // 发送response给客户端

    else //  request.required.acks = -1时的代码路径

      // create a list of (topic, partition) pairs to use as keys for this delayed request

      val producerRequestKeys = produceRequest.data.keys.toSeq

      val statuses = localProduceResults.map(r =>

        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap

      val delayedRequest =  new DelayedProduce(...) // 此时需要构造延时请求进行处理,此段逻辑比较复杂,需要理解Purgatory的概念,本文暂不考虑

        ...

}

由上面代码可见,无论request.required.acks是何值,都需要首先将待发送的消息集合追加写入本地的提交日志中。此时如何按照默认值是是0的情况,那么这写入日志后需要判断下所有消息是否都已经发送成功了。如果出现了发送错误,那么就将关闭连入broker的Socket Server以通知客户端程序错误的发生。现在的关键是追加写是如何完成的?即方法appendToLocalLog如何实现的?该方法整体逻辑流程图如下图所示:

  

由于逻辑很直观,不对代码做详细分析,不过值得关注的是这个方法会捕获很多异常:

异常名称 具体含义 异常处理
KafakStorageException 这可能是不可恢复的IO错误 既然无法恢复,则终止该broker上JVM进程
InvalidTopicException 显式给__consumer_offsets topic发送消息就会有这个异常抛出,不要这么做,因为这是内部topic 将InvalidTopicException封装进ProduceResult返回
UnknownTopicOrPartitionException topic或分区不在该broker上时抛出该异常 将UnknownTopicOrPartitionException封装进ProduceResult返回
NotLeaderForPartitionException 目标分区的leader副本不在该broker上 将NotLeaderForPartitionException封装进ProduceResult返回
NotEnoughReplicasException 只会出现在request.required.acks=-1且ISR中的副本数不满足min.insync.replicas指定的最少副本数时会抛出该异常 将NotEnoughReplicasException封装进ProduceResult返回
其他 处理ProducerRequest时发生的其他异常 将对应异常封装进ProduceResult返回

okay,貌似现在我们就剩下最后一个主要的方法没说了。分析完这个方法之后整个producer发送消息的流程应该就算是完整地走完了。最后的这个方法就是Partition的appendMessagesToLeader,其主要代码如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0= {

    inReadLock(leaderIsrUpdateLock) {

      val leaderReplicaOpt = leaderReplicaIfLocal() // 判断目标分区的leader副本是否在该broker上

      leaderReplicaOpt match {

        case Some(leaderReplica) =// 如果leader副本在该broker上

          val log = leaderReplica.log.get // 获取本地提交日志文件句柄

          val minIsr = log.config.minInSyncReplicas

          val inSyncSize = inSyncReplicas.size

          // Avoid writing to leader if there are not enough insync replicas to make it safe

          if (inSyncSize < minIsr && requiredAcks == -1) { //只有request.required.acks等于-1时才会判断ISR数是否不足

            throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"

              .format(topic,partitionId,minIsr,inSyncSize))

          }

          val info = log.append(messages, assignOffsets = true// 真正的写日志操作,由于涉及Kafka底层写日志的,以后有机会写篇文章专门探讨这部分功能

          // probably unblock some follower fetch requests since log end offset has been updated

          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))

          // we may need to increment high watermark since ISR could be down to 1

          maybeIncrementLeaderHW(leaderReplica)

          info

        case None =// 如果不在,直接抛出异常表明leader不在该broker上

          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"

            .format(topic, partitionId, localBrokerId))

      }

    }

至此,一个最简单的scala版同步producer的代码走读就算正式完成了,可以发现Kafka设计的思路就是在每个broker上启动一个server不断地处理从客户端发来的各种请求,完成对应的功能并按需返回对应的response。希望本文能对希望了解Kafka producer机制的人有所帮助。

时间: 2024-10-02 14:45:34

【转】Kafka producer原理 (Scala版同步producer)的相关文章

【原创】Kafka producer原理 (Scala版同步producer)

本文分析的Kafka代码为kafka-0.8.2.1.另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本:一套是Java版的新版本.虽然Kafka社区极力推荐大家使用Java版本的producer,但目前很多已有的程序还是调用了Scala版的API.今天我们就分析一下旧版producer的代码. producer还分为同步和异步模式,由属性producer.type指定,默认是sync,即同步发送模式.本文也主要关注于同步发送的代码走读.下面以console-pr

Kafka原理与java simple producer示例

brokers和消费者使用zk来获取状态信息和追踪消息坐标. 每一个partition是一个有序的,不可变的消息序列. 只有当partition里面的file置换到磁盘文件以后,才开放给消费者来消费. 每一个partition是跨服务器地被复制到其他地方,为了容错的目的. 这个partition可以理解为hadoop中block的单位. 但是只有被选择为leader的服务器partition来服务消费者的读和生产者的写, followers只是把数据同步过去.同步状态较好的被列入ISR,这些IS

Kafka深度解析(如何在producer中指定partition)(转)

原文链接:Kafka深度解析 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输 同时支持离线数据处理和实时数据处理 为什么要用消息系统 解耦在项目启动之初来预测将来项目会碰到

kafka 0.10.2 消息生产者(producer)

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka

Kafka 详解(三)------Producer生产者

在第一篇博客我们了解到一个kafka系统,通常是生产者Producer 将消息发送到 Broker,然后消费者 Consumer 去 Broker 获取,那么本篇博客我们来介绍什么是生产者Producer. 1.生产者概览 我们知道一个系统在运行过程中会有很多消息产生,比如前面说的对于一个购物网站,通常会记录用户的活动,网站的运行度量指标以及一些日志消息等等,那么产生这些消息的组件我们都可以称为生产者. 而对于生产者产生的消息重要程度又有不同,是否都很重要不允许丢失,是否允许丢失一部分?以及是否

Kafka详细原理总结

Kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统.低延迟的实时系统.storm/Spark流式处理引擎,web/nginx日志.访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目. 1.前言 消息队列的性能好坏

Kafka详细原理

Kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统.低延迟的实时系统.storm/Spark流式处理引擎,web/nginx日志.访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目. 1.前言 消息队列的性能好坏

kafka工作原理介绍

两张图读懂kafka应用: Kafka 中的术语 broker:中间的kafka cluster,存储消息,是由多个server组成的集群. topic:kafka给消息提供的分类方式.broker用来存储不同topic的消息数据. producer:往broker中某个topic里面生产数据. consumer:从broker中某个topic获取数据. Kafka 中的术语设计: 1.Broker 中间的kafka cluster,存储消息,是由多个server组成的集群. 2.topic与消

kafka设计原理介绍

背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输 同时支持离线数据处理和实时数据处理 为什么要用消息系统 解耦在项目启动之初来预测将来项目会碰到什么需求,是极其困难的.消息队