第15课:Spark Streaming源码解读之No Receivers彻底思考

本期内容:

  • Direct Access
  • Kafka

前面有几期我们讲了带Receiver的Spark Streaming 应用的相关源码解读。但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势:

1. 更强的控制自由度

2. 语义一致性

其实No Receivers的方式更符合我们读取数据,操作数据的思路的。因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式。 如果要操作数据来源,肯定要有一个封装器,这个封装器一定是RDD类型。 以直接访问Kafka中的数据为例:

object DirectKafkaWordCount {  def main(args: Array[String]) {    val Array(brokers, topics) = args    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")    val ssc = new StreamingContext(sparkConf, Seconds(2))    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)    val words = lines.flatMap(_.split(" "))    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

Spark Streaming会封装一个KafkaRDD:

/** * A batch-oriented interface for consuming from Kafka. * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD * @param messageHandler function for translating each message into the desired type */private[kafka]class KafkaRDD[  K: ClassTag,  V: ClassTag,  U <: Decoder[_]: ClassTag,  T <: Decoder[_]: ClassTag,  R: ClassTag] private[spark] (    sc: SparkContext,    kafkaParams: Map[String, String],    val offsetRanges: Array[OffsetRange],    leaders: Map[TopicAndPartition, (String, Int)],    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }
...  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))    if (part.fromOffset == part.untilOffset) {
      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")      Iterator.empty
    } else {      new KafkaRDDIterator(part, context)
    }
  }

RDD中重要的方法 getPartitions 和 compute 其中compute中返回了一个 KafkaRDDIterator:

private class KafkaRDDIterator(      part: KafkaRDDPartition,      context: TaskContext) extends NextIterator[R] {    val kc = new KafkaCluster(kafkaParams)

...    private def fetchBatch: Iterator[MessageAndOffset] = {      val req = new FetchRequestBuilder()
        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
        .build()      val resp = consumer.fetch(req)
      handleFetchErr(resp)      // kafka may return a batch that starts before the requested offset
      resp.messageSet(part.topic, part.partition)
        .iterator
        .dropWhile(_.offset < requestOffset)
    }    override def close(): Unit = {      if (consumer != null) {
        consumer.close()
      }
    }    override def getNext(): R = {      if (iter == null || !iter.hasNext) {
        iter = fetchBatch
      }      if (!iter.hasNext) {
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {        val item = iter.next()        if (item.offset >= part.untilOffset) {
          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset
          messageHandler(new MessageAndMetadata(
            part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
      }
    }
  }

其中会调用KafkaCluster的connect方法:

org/apache/spark/streaming/kafka/KafkaCluster.scala  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

KafkaCluster的connect方法返回了一个 SimpleConsumer,如果想自定义控制kafka消息的消费,则可自定义Kafka的consumer。

我们再回过头看看:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

实际生成了什么:

 def createDirectStream[    K: ClassTag,    V: ClassTag,    KD <: Decoder[K]: ClassTag,    VD <: Decoder[V]: ClassTag,    R: ClassTag] (      ssc: StreamingContext,      kafkaParams: Map[String, String],      fromOffsets: Map[TopicAndPartition, Long],      messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {    val cleanedHandler = ssc.sc.clean(messageHandler)    new DirectKafkaInputDStream[K, V, KD, VD, R]
      ssc, kafkaParams, fromOffsets, cleanedHandler)
  }

生成了一个DirectKafkaInputDStream:

org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }    val description = offsetRanges.filter { offsetRange =>
      // Don‘t display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(      "offsets" -> offsetRanges.toList,      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    Some(rdd)
  }

这里面即产生了KafkaRDD实例。

我们再重新思考有Receiver和No Receiver的Spark Streaming应用 Direct访问的好处:

1. 不需要缓存,不会出现OOM等问题(数据缓存在Kafka中)

2. 如果采用Receiver的方式,Receiver和Worker上Executor绑定了,不方便做分布式(配置一下也可以做)。如果采用Direct的方式,直接是RDD操作,数据默认分布在多个Executor上,天然就是分布式的。

3. 数据消费的问题,在实际操作的时候,如果采用Receiver的方式,如果数据操作来不及消费,Delay多次之后,Spark Streaming程序有可能崩溃。如果是Direct的方式,就不会。

4. 完全的语义一致性,不会重复消费,且只被消费一次。

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

时间: 2024-10-05 11:22:34

第15课:Spark Streaming源码解读之No Receivers彻底思考的相关文章

15、Spark Streaming源码解读之No Receivers彻底思考

在前几期文章里讲了带Receiver的Spark Streaming 应用的相关源码解读,但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式

(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考

hu本期内容: 1.Kafka解密 背景: 目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性.No Receivers是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是RDD类型的. 所以Spark Streaming就产生了自定义RDD –> KafkaRDD. 源码分析: 1.KafkaRDD源码 private[kafka]class KafkaRDD[K: ClassTag,V: ClassTag,U <: Decode

Spark Streaming源码解读之No Receivers彻底思考

本期内容 : Direct Acess Kafka Spark Streaming接收数据现在支持的两种方式: 01. Receiver的方式来接收数据,及输入数据的控制 02. No Receiver的方式 以上两种方式中,No Receiver的方式更符合读取.操作数据的思路,Spark作为一个计算框架他的底层有数据来源,也就是直接操作数据来源中的数据, 如果操作数据来源的话肯定需要一个封装器,这个封装的类型一定是RDD的封装类型,Spark Streaming为了封装类型推出了自定义的RD

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

本讲内容: a. Receiver启动的方式设想 b. Receiver启动源码彻底分析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们给大家具体分析了RDD的物理生成和逻辑生成过程,彻底明白DStream和RDD之间的关系,及其内部其他有关类的具体依赖等信息: a. DStream是RDD的模板,其内部generatedRDDs 保存了每个BatchDuration时间生成的RDD对象实例.DStream的依赖构成了RDD

Spark发行版笔记10:Spark Streaming源码解读之流数据不断接收和全生命周期彻底研究和思考

本节的主要内容: 一.数据接受架构和设计模式 二.接受数据的源码解读 Spark Streaming不断持续的接收数据,具有Receiver的Spark 应用程序的考虑. Receiver和Driver在不同进程,Receiver接收数据后要不断给Deriver汇报. 因为Driver负责调度,Receiver接收的数据如果不汇报给Deriver,Deriver调度时不会把接收的数据计算入调度系统中(如:数据ID,Block分片). 思考Spark Streaming接收数据: 不断有循环器接收

(版本定制)第6课:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 本节课主要是针对Job如何产生进行阐述 在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler: /** * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate * the jobs and runs them using a

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

本期内容: 1.updateStateByKey解密 2.mapWithState解密 背景:整个Spark Streaming是按照Batch Duractions划分Job的.但是很多时候我们需要算过去的一天甚至一周的数据,这个时候不可避免的要进行状态管理,而Spark Streaming每个Batch Duractions都会产生一个Job,Job里面都是RDD, 所以此时面临的问题就是怎么对状态进行维护?这个时候就需要借助updateStateByKey和mapWithState方法完成

(版本定制)第9课:Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

本期内容: 1.Receiver启动方式的设想 2.Receiver启动源码彻底分析 一:Receiver启动方式的设想 1. Spark Streaming通过Receiver持续不断的从外部数据源接收数据,并把数据汇报给Driver端,由此每个Batch Durations就可以根据汇报的数据生成不同的Job. 2. Receiver是在Spark Streaming应用程序启动时启动的,那么我们找Receiver在哪里启动就应该去找Spark Streaming的启动. 3. Receiv

(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: 1.Spark Streaming元数据清理详解 2.Spark Streaming元数据清理源码解析 一.如何研究Spark Streaming元数据清理 操作DStream的时候会产生元数据,所以要解决RDD的数据清理工作就一定要从DStream入手.因为DStream是RDD的模板,DStream之间有依赖关系. DStream的操作产生了RDD,接收数据也靠DStream,数据的输入,数据的计算,输出整个生命周期都是由DStream构建的.由此,DStream负责RDD的整个