apache kafka源码分析走读-Producer分析

apache kafka中国社区QQ群:162272557

producer的发送方式剖析

Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式。

sync架构图

async架构图

调用流程如下:

代码流程如下:

Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer、DefaultEventHandler。在创建的同时,会默认new一个ProducerPool,即我们每new一个java的Producer类,就会有创建Producer、EventHandler和ProducerPool,ProducerPool为连接不同kafka broker的池,初始连接个数有broker.list参数决定。

调用producer.send方法流程:

当应用程序调用producer.send方法时,其内部其实调的是eventhandler.handle(message)方法,eventHandler会首先序列化该消息,

eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()

调用逻辑解释:当客户端应用程序调用producer发送消息messages时(既可以发送单条消息,也可以发送List多条消息),调用eventhandler.serialize首先序列化所有消息,序列化操作用户可以自定义实现Encoder接口,下一步调用partitionAndCollate根据topics的messages进行分组操作,messages分配给dataPerBroker(多个不同的Broker的Map),根据不同Broker调用不同的SyncProducer.send批量发送消息数据,SyncProducer包装了nio网络操作信息。

Producer的sync与async发送消息处理,大家看以上架构图一目了然。

partitionAndCollate方法详细作用:获取所有partitions的leader所在leaderBrokerId(就是在该partiionid的leader分布在哪个broker上),

创建一个HashMap<int, Map<TopicAndPartition, List<KeyedMessage<K,Message>>>>,把messages按照brokerId分组组装数据,然后为SyncProducer分别发送消息作准备工作。

名称解释:partKey:分区关键字,当客户端应用程序实现Partitioner接口时,传入参数key为分区关键字,根据key和numPartitions,返回分区(partitions)索引。记住partitions分区索引是从0开始的。

Producer平滑扩容机制

如果开发过producer客户端代码,会知道metadata.broker.list参数,它的含义是kafak broker的ip和port列表,producer初始化时,就连接这几个broker,这时大家会有疑问,producer支持kafka cluster新增broker节点?它又没有监听zk broker节点或从zk中获取broker信息,答案是肯定的,producer可以支持平滑扩容broker,他是通过定时与现有的metadata.broker.list通信,获取新增broker信息,然后把新建的SyncProducer放入ProducerPool中。等待后续应用程序调用。

DefaultEventHandler类中初始化实例化BrokerPartitionInfo类,然后定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分代码如下:

  def handle(events: Seq[KeyedMessage[K,V]]) {
    ......
    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
      if (topicMetadataRefreshInterval >= 0 &&
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
      if (outstandingProduceRequests.size > 0) {
        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
        //休眠时间,多长时间刷新一次
        Thread.sleep(config.retryBackoffMs)
        // 生产者定期请求刷新最新topics的broker元数据信息
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
        .....
      }
    }

  }

BrokerPartitionInfo的updateInfo方法代码如下:

  def updateInfo(topics: Set[String], correlationId: Int) {
    var topicsMetadata: Seq[TopicMetadata] = Nil
    //根据topics列表,meta.broker.list,其他配置参数,correlationId表示请求次数,一个计数器参数而已
    //创建一个topicMetadataRequest,并随机的选取传入的broker信息中任何一个去取metadata,直到取到为止
    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
    topicsMetadata = topicMetadataResponse.topicsMetadata
    // throw partition specific exception
    topicsMetadata.foreach(tmd =>{
      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
      if(tmd.errorCode == ErrorMapping.NoError) {
        topicPartitionInfo.put(tmd.topic, tmd)
      } else
        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
      tmd.partitionsMetadata.foreach(pmd =>{
        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
      })
    })
    producerPool.updateProducer(topicsMetadata)
  }

ClientUtils.fetchTopicMetadata方法代码:

  def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
    var fetchMetaDataSucceeded: Boolean = false
    var i: Int = 0
    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
    var topicMetadataResponse: TopicMetadataResponse = null
    var t: Throwable = null
    val shuffledBrokers = Random.shuffle(brokers) //生成随机数
    while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
      //对随机选到的broker会创建一个SyncProducer
      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
      info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
      try {  //发送topicMetadataRequest到该broker去取metadata,获得该topic所对应的所有的broker信息
        topicMetadataResponse = producer.send(topicMetadataRequest)
        fetchMetaDataSucceeded = true
      }
      catch {
        ......
      }
    }
    if(!fetchMetaDataSucceeded) {
      throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
    } else {
      debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
    }
    return topicMetadataResponse
  }

ProducerPool的updateProducer

def updateProducer(topicMetadata: Seq[TopicMetadata]) {
    val newBrokers = new collection.mutable.HashSet[Broker]
    topicMetadata.foreach(tmd => {
      tmd.partitionsMetadata.foreach(pmd => {
        if(pmd.leader.isDefined)
          newBrokers+=(pmd.leader.get)
      })
    })
    lock synchronized {
      newBrokers.foreach(b => {
        if(syncProducers.contains(b.id)){
          syncProducers(b.id).close()
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
        } else
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      })
    }
  }

当我们启动kafka broker后,并且大量producer和consumer时,经常会报如下异常信息。

[email protected]:/opt/soft$ Closing socket connection to 192.168.11.166

笔者也是经常很长时间看源码分析,才明白了为什么ProducerConfig配置信息里面并不要求使用者提供完整的kafka集群的broker信息,而是任选一个或几个即可。因为他会通过您选择的broker和topics信息而获取最新的所有的broker信息。

值得了解的是用于发送TopicMetadataRequest的SyncProducer虽然是用ProducerPool.createSyncProducer方法建出来的,但用完并不还回ProducerPool,而是直接Close.

重难点理解:

刷新metadata并不仅在第一次初始化时做。为了能适应kafka broker运行中因为各种原因挂掉、paritition改变等变化,

eventHandler会定期的再去刷新一次该metadata,刷新的间隔用参数topic.metadata.refresh.interval.ms定义,默认值是10分钟。

这里有三点需要强调:

  • 客户端调用send, 才会新建SyncProducer,只有调用send才会去定期刷新metadata
  • 在每次取metadata时,kafka会新建一个SyncProducer去取metadata,逻辑处理完后再close。
  • 根据当前SyncProducer(一个Broker的连接)取得的最新的完整的metadata,刷新ProducerPool中到broker的连接.
  • 每10分钟的刷新会直接重新把到每个broker的socket连接重建,意味着在这之后的第一个请求会有几百毫秒的延迟。如果不想要该延迟,把topic.metadata.refresh.interval.ms值改为-1,这样只有在发送失败时,才会重新刷新。Kafka的集群中如果某个partition所在的broker挂了,可以检查错误后重启重新加入集群,手动做rebalance,producer的连接会再次断掉,直到rebalance完成,那么刷新后取到的连接着中就会有这个新加入的broker。

说明:每个SyncProducer实例化对象会建立一个socket连接

特别注意:

在ClientUtils.fetchTopicMetadata调用完成后,回到BrokerPartitionInfo.updateInfo继续执行,在其末尾,pool会根据上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)

在ProducerPool中,SyncProducer的数目是由该topic的partition数目控制的,即每一个SyncProducer对应一个broker,内部封了一个到该broker的socket连接。每次刷新时,会把已存在SyncProducer给close掉,即关闭socket连接,然后新建SyncProducer,即新建socket连接,去覆盖老的。

如果不存在,则直接创建新的。

apache kafka源码分析走读-Producer分析

时间: 2024-12-22 15:13:32

apache kafka源码分析走读-Producer分析的相关文章

apache kafka源码project环境搭建(IDEA)

1.gradle安装 gradle安装 2.下载apache kafka源码 apache kafka下载 3.用gradle构建产生IDEAproject文件 先装好idea的scala插件,不然构建时就会自己主动下载,因为没有国内镜像.速度会非常慢. [email protected]:~/Downloads/kafka_2.10-0.8.1$ gradle idea 假设是eclipseproject,运行:gradle eclipse 生成IDEAproject文件例如以下: 4.项目导

apache kafka源码分析-Producer分析---转载

原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读1.Kafka提供了Producer类作为java producer的api,此类有几种发送方式?2.总结调用producer.send方法包含哪些流程?3.Producer难以理解的在什么地方? producer的发送方式剖析Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式.sync架构图 async架构图 调用流程如下:

Apache Kafka源码分析 - KafkaApis

kafka apis反映出kafka broker server可以提供哪些服务,broker server主要和producer,consumer,controller有交互,搞清这些api就清楚了broker server的所有行为 handleOffsetRequest 提供对offset的查询的需求,比如查询earliest,latest offset是什么,或before某个时间戳的offset是什么 try { // ensure leader exists // 确定是否是lead

Apache Kafka源码分析 - autoLeaderRebalanceEnable

在broker的配置中,auto.leader.rebalance.enable (false) 那么这个leader是如何进行rebalance的? 首先在controller启动的时候会打开一个scheduler, if (config.autoLeaderRebalanceEnable) { //如果打开outoLeaderRebalance,需要把partiton leader由于dead而发生迁徙的,重新迁徙回去 info("starting the partition rebalan

Kafka源码中的Producer Record定义

1.ProducerRecord 含义: 发送给Kafka Broker的key/value 值对 2.内部数据结构: -- Topic (名字) -- PartitionID ( 可选) -- Key[( 可选 ) -- Value 3.生产者记录(简称PR)的发送逻辑: <1> 若指定Partition ID,则PR被发送至指定Partition <2> 若未指定Partition ID,但指定了Key, PR会按照hasy(key)发送至对应Partition <3&g

Apache Spark源码走读之5 -- DStream处理的容错性分析

欢迎转载,转载请注明出处,徽沪一郎,谢谢. 在流数据的处理过程中,为了保证处理结果的可信度(不能多算,也不能漏算),需要做到对所有的输入数据有且仅有一次处理.在Spark Streaming的处理机制中,不能多算,比较容易理解.那么它又是如何作到即使数据处理结点被重启,在重启之后这些数据也会被再次处理呢? 环境搭建 为了有一个感性的认识,先运行一下简单的Spark Streaming示例.首先确认已经安装了openbsd-netcat. 运行netcatnc -lk 9999 运行spark-s

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的. Standalone部署的节点组成 介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多. 在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外

Kafka源码分析-序列2 -Producer -Metadata的数据结构与读取、更新策略

在上一篇,我们从使用方式和策略上,对消息队列做了一个宏观描述.从本篇开始,我们将深入到源码内部,仔细分析Kafka到底是如何实现一个分布式消息队列.我们的分析将从Producer端开始. 从Kafka 0.8.2开始,发布了一套新的Java版的client api, KafkaProducer/KafkaConsumer,替代之前的scala版的api.本系列的分析将只针对这套Java版的api. 多线程异步发送模型 下图是经过源码分析之后,整理出来的Producer端的架构图: 在上一篇我们讲

Apache Spark源码走读之7 -- Standalone部署方式分析

欢迎转载,转载请注明出处,徽沪一郎. 楔子 在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解. 没有HA的Standalone运行模式 先从比较简单的说起,所谓的没有ha是指master节点没有ha. 组成cluster的两大元素即Master和Worker.slave worker可以有1到