spark streaming 接收kafka消息之二 -- 运行在driver端的receiver

先从源码来深入理解一下 DirectKafkaInputDStream 的将 kafka 作为输入流时,如何确保 exactly-once 语义。

val stream: InputDStream[(String, String, Long)] = KafkaUtils.createDirectStream
      [String, String, StringDecoder, StringDecoder, (String, String, Long)](
        ssc, kafkaParams, fromOffsets,
        (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message(), mmd.offset))

对应的源码如下:

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {
    val cleanedHandler = ssc.sc.clean(messageHandler)
    new DirectKafkaInputDStream[K, V, KD, VD, R](
      ssc, kafkaParams, fromOffsets, cleanedHandler)
  }

DirectKafkaInputDStream 的类声明如下:

A stream of org.apache.spark.streaming.kafka.KafkaRDD where each given Kafka topic/partition corresponds to an RDD partition. The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number of messages per second that each partition will accept. Starting offsets are specified in advance, and this DStream is not responsible for committing offsets, so that you can control exactly-once semantics. For an easy interface to Kafka-managed offsets, see org.apache.spark.streaming.kafka.KafkaCluster

简言之,Kafka RDD 的一个流,每一个指定的topic 的每一个 partition 对应一个 RDD partition

在父类 InputDStream 中,对 compute 方法的解释如下:

Method that generates a RDD for the given time
对于给定的时间,生成新的Rdd

这就是生成RDD 的入口:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  // 1. 先获取这批次数据的 until offsets
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
// 2. 生成KafkaRDD 实例
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
// 获取 该批次 的 offset 的范围
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp) // 获取 until offset
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
//3. 将当前批次的metadata和offset 的信息报告给 InputInfoTracker
  val description = offsetRanges.filter { offsetRange =>
    // Don‘t display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
  // 4. 更新当前的 offsets
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}

1.先获取这批次数据的 until offsets

详细分析 获取 leaderOffset 的步骤,即 latestLeaderOffsets 方法:

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {

  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) { // left 代表 error
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      log.error(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else { // right 代表结果
    o.right.get
  }
}

分析 kc.getLatestLeaderOffsets(currentOffsets.keySet)
字段赋值语句:protected val kc = new KafkaCluster(kafkaParams)
即调用了 KafkaCluster的getLatestLeaderOffsets
调用栈如下:

def getLatestLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
  getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
// 调用了下面的方法:
def getLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    before: Long
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
  getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
    r.map { kv =>
      // mapValues isnt serializable, see SI-7005
      kv._1 -> kv._2.head
    }
  }
}
// getLeaderOffsets 调用了下面的方法,用于获取leader 的offset,现在是最大的offset:
def getLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    before: Long,
    maxNumOffsets: Int
  ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
// 获取所有的partition 的leader的 host和 port 信息
  findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
    // tp -> (l.host -> l.port) ==> (l.host -> l.port) ->seq[tp]
val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
// 所有的leader 的 连接方式
    val leaders = leaderToTp.keys
    var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
    val errs = new Err
// 通过leader 获取每一个 leader的offset,现在是最大的 offset
    withBrokers(leaders, errs) { consumer =>
      val partitionsToGetOffsets: Seq[TopicAndPartition] =
        leaderToTp((consumer.host, consumer.port))
      val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
        tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
      }.toMap
      val req = OffsetRequest(reqMap)
      val resp = consumer.getOffsetsBefore(req)
      val respMap = resp.partitionErrorAndOffsets
      partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
          if (por.error == ErrorMapping.NoError) {
            if (por.offsets.nonEmpty) {
              result += tp -> por.offsets.map { off =>
                LeaderOffset(consumer.host, consumer.port, off)
              }
            } else {
              errs.append(new SparkException(
                s"Empty offsets for ${tp}, is ${before} before log beginning?"))
            }
          } else {
            errs.append(ErrorMapping.exceptionFor(por.error))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn‘t find leader offsets for ${missing}"))
    Left(errs)
  }
}
// 根据 TopicAndPartition 获取partition leader 的 host 和 port 信息
def findLeaders(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
  val topics = topicAndPartitions.map(_.topic)
// 获取给定topics集合的所有的partition 的 metadata信息
  val response = getPartitionMetadata(topics).right
// 获取所有的partition 的 leader 的 host 和port 信息
  val answer = response.flatMap { tms: Set[TopicMetadata] =>
    val leaderMap = tms.flatMap { tm: TopicMetadata =>
      tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
        val tp = TopicAndPartition(tm.topic, pm.partitionId)
        if (topicAndPartitions(tp)) {
          pm.leader.map { l =>
            tp -> (l.host -> l.port)
          }
        } else {
          None
        }
      }
    }.toMap

    if (leaderMap.keys.size == topicAndPartitions.size) {
      Right(leaderMap)
    } else {
      val missing = topicAndPartitions.diff(leaderMap.keySet)
      val err = new Err
      err.append(new SparkException(s"Couldn‘t find leaders for ${missing}"))
      Left(err)
    }
  }
  answer
}
// 获取给定的 topic集合的所有partition 的metadata 信息
def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
// 创建TopicMetadataRequest对象
  val req = TopicMetadataRequest(
    TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
  val errs = new Err
// 随机打乱 broker-list的顺序
  withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
    val resp: TopicMetadataResponse = consumer.send(req)
    val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)

    if (respErrs.isEmpty) {
      return Right(resp.topicsMetadata.toSet)
    } else {
      respErrs.foreach { m =>
        val cause = ErrorMapping.exceptionFor(m.errorCode)
        val msg = s"Error getting partition metadata for ‘${m.topic}‘. Does the topic exist?"
        errs.append(new SparkException(msg, cause))
      }
    }
  }
  Left(errs)
}
// Try a call against potentially multiple brokers, accumulating errors
private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
  (fn: SimpleConsumer => Any): Unit = {
//这里虽然是一个 foreach循环,但一旦获取到metadata,就返回,之所以使用一个foreach循环,是为了增加重试次数,// 防止kafka cluster 的单节点宕机,除此之外,还设计了 单节点的多次重试机制。只不过是循环重试,即多个节点都访问完后,// 再sleep 200ms(默认),然后再进行下一轮访问,可以适用于节点瞬间服务不可用情况。
  brokers.foreach { hp =>
    var consumer: SimpleConsumer = null
    try {
// 获取SimpleConsumer 的连接
      consumer = connect(hp._1, hp._2)
      fn(consumer) // 发送请求并获取到partition 的metadata
/* fn 即 后面定义的
consumer =>
    val resp: TopicMetadataResponse = consumer.send(req)
    val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)

    if (respErrs.isEmpty) {
      return Right(resp.topicsMetadata.toSet)
    } else {
      respErrs.foreach { m =>
        val cause = ErrorMapping.exceptionFor(m.errorCode)
        val msg = s"Error getting partition metadata for ‘${m.topic}‘. Does the topic exist?"
        errs.append(new SparkException(msg, cause))
      }
    }
  }
  Left(errs)
*/
    } catch {
      case NonFatal(e) =>
        errs.append(e)
    } finally {
      if (consumer != null) {
        consumer.close()
      }
    }
  }
}

private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
  m.groupBy(_._2).map { kv =>
    kv._1 -> kv._2.keys.toSeq
  }

然后,根据获取的 每一个 partition的leader 最大 offset 来,确定每一个partition的 until offset,即clamp 函数的功能:

// limits the maximum number of messages per partition
protected def clamp(
  leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
  maxMessagesPerPartition.map { mmp =>
    leaderOffsets.map { case (tp, lo) =>
// 评估的until offset = 当前offset + 评估速率
// 从 每一个topic partition leader 的最大offset 和 评估的 until offset 中选取较小值作为 每一个 topic partition 的 until offset
      tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
    }
  }.getOrElse(leaderOffsets) // 如果是第一次获取数据,并且没有设置spark.streaming.kafka.maxRatePerPartition 参数,则会返回 每一个 leader 的最大大小
}

protected def maxMessagesPerPartition: Option[Long] = {
// rateController 是负责评估流速的
  val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
// 所有的 topic 分区数
  val numPartitions = currentOffsets.keys.size
  // 获取当前的流处理速率
  val effectiveRateLimitPerPartition = estimatedRateLimit
    .filter(_ > 0) // 过滤掉非正速率
    .map { limit =>
// 通过spark.streaming.kafka.maxRatePerPartition设置这个参数,默认是0
      if (maxRateLimitPerPartition > 0) {
// 从评估速率和设置的速率中取一个较小值
        Math.min(maxRateLimitPerPartition, (limit / numPartitions))
      } else { // 如果没有设置,评估速率 / 分区数
        limit / numPartitions
      }
    }.getOrElse(maxRateLimitPerPartition) // 如果速率评估率不起作用时,使用设置的速率,如果不设置是 0

  if (effectiveRateLimitPerPartition > 0) { // 如果每一个分区的有效速率大于0
    val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
// 转换成每ms的流速率
    Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
  } else {
    None
  }
}

第二步: 生成KafkaRDD

KafkaRDD 伴生对象的 apply 方法:

def apply[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag](
    sc: SparkContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    untilOffsets: Map[TopicAndPartition, LeaderOffset],
    messageHandler: MessageAndMetadata[K, V] => R
  ): KafkaRDD[K, V, U, T, R] = {
// 从 untilOffsets 中获取 TopicAndPartition 和 leader info( host, port) 的映射关系
  val leaders = untilOffsets.map { case (tp, lo) =>
      tp -> (lo.host, lo.port)
  }.toMap

  val offsetRanges = fromOffsets.map { case (tp, fo) =>
// 根据 fromOffsets 和 untilOffset ,拼接成OffsetRange 对象
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }.toArray
  // 返回 KafkaRDD class 的实例
  new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}

先看KafkaRDD 的解释:

A batch-oriented interface for consuming from Kafka.
Starting and ending offsets are specified in advance,
so that you can control exactly-once semantics.
从kafka 消费的针对批处理的API,开始和结束 的 offset 都提前设定了,所以我们可以控制exactly-once 的语义。

重点看 KafkaRDD 的 compute 方法,它以分区作为参数:

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) { // 如果 from offset == until offset,返回一个空的迭代器对象
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}

KafkaRDDIterator的源码如下,首先这个类比较好理解,因为只重写了两个非private 方法,close和 getNext, close 是用于关闭 SimpleConsumer 实例的(主要用于关闭socket 连接和 用于读response和写request的blockingChannel),getNext 是用于获取数据的

类源码如下:

private class KafkaRDDIterator(
    part: KafkaRDDPartition,
    context: TaskContext) extends NextIterator[R] {

  context.addTaskCompletionListener{ context => closeIfNeeded() }

  log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
    s"offsets ${part.fromOffset} -> ${part.untilOffset}")
 // KafkaCluster 是与 kafka cluster通信的client API
  val kc = new KafkaCluster(kafkaParams)
// kafka 消息的 key 的解码器
// classTag 是scala package 下的 package object – reflect定义的一个classTag方法,该方法返回一个 ClassTag 对象,// 该对象中 runtimeClass 保存了运行时被擦除的范型Class对象, Decoder 的实现类都有一个 以VerifiableProperties // 变量作为入参的构造方法。获取到构造方法后,利用反射实例化具体的Decoder实现对象,然后再向上转型为 Decoder
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[K]]
// kafka 消息的 value 的解码器
  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[V]]
  val consumer = connectLeader
  var requestOffset = part.fromOffset
  var iter: Iterator[MessageAndOffset] = null

  // The idea is to use the provided preferred host, except on task retry atttempts,
  // to minimize number of kafka metadata requests
  private def connectLeader: SimpleConsumer = {
    if (context.attemptNumber > 0) {
// 如果重试次数大于 0, 则允许重试访问--bootstrap-server 列表里的所有 broker,一旦获取到 topic 的partition 的leader 信息,则马上返回
      kc.connectLeader(part.topic, part.partition).fold(
        errs => throw new SparkException(
          s"Couldn‘t connect to leader for topic ${part.topic} ${part.partition}: " +
            errs.mkString("\n")),
        consumer => consumer
      )
    } else {
      kc.connect(part.host, part.port)
    }
  }
 // 在fetch数据失败时所做的操作,无疑,这是一个hook 函数
  private def handleFetchErr(resp: FetchResponse) {
    if (resp.hasError) {
      val err = resp.errorCode(part.topic, part.partition)
      if (err == ErrorMapping.LeaderNotAvailableCode ||
        err == ErrorMapping.NotLeaderForPartitionCode) {
        log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
          s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
      }
      // Let normal rdd retry sort out reconnect attempts
      throw ErrorMapping.exceptionFor(err)
    }
  }
  //注意此时的 返回结果是MessageAndOffset(Message(ByteBuffer)和 offset) 的迭代器
  private def fetchBatch: Iterator[MessageAndOffset] = {
// 首先,见名之意,这是一个builder,作用就是构建一个FetchRequest 对象
    val req = new FetchRequestBuilder()
      .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
      .build()
// 调用 SimpleConsumer 的 fetch 方法,发送 FetchRequest 请求并获取返回的 topic 消息
    val resp = consumer.fetch(req)
// 查看是否有错误,如果有,则抛出一场,否则继续处理返回的消息
    handleFetchErr(resp)
    // kafka may return a batch that starts before the requested offset
// 因为网络延迟等原因,可能会获取到之前的发送的请求结果,此时的 offset 是小于当前的 offset 的,需要过滤掉
    resp.messageSet(part.topic, part.partition)
      .iterator
      .dropWhile(_.offset < requestOffset)
  }

  override def close(): Unit = {
    if (consumer != null) {
      consumer.close()
    }
  }
 // 我们重点看getNext 方法, 它的返回值 为R, 从KafkaUtils类中的初始化KafkaRDD 方法可以看出 R 其实是 <K,V>, 即会返回一个key 和 value的pair
  override def getNext(): R = {
    if (iter == null || !iter.hasNext) { // 第一次或者是已经消费完了
      iter = fetchBatch // 调用 fetchBatch 方法,获取得到MessageAndOffset的迭代器
    }
    if (!iter.hasNext) { // 如果本批次没有数据需要处理或者本批次内还有所有数据均被处理,直接修改标识位,返回null
      assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
      finished = true
      null.asInstanceOf[R]
    } else {
      val item = iter.next() // 获取下一个 MessageAndOffset 对象
      if (item.offset >= part.untilOffset) { // 如果返回的消息大于等于本批次的until offset,则会返回 null
        assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
        finished = true
        null.asInstanceOf[R]
      } else { // 获取的 MessageAndOffse的Offset 大于等于 from offset并且小于 until offset
        requestOffset = item.nextOffset // 需要请求 kafka cluster 的消息是本条消息的下一个offset对应的消息
// MessageAndMetadata 是封装了单条消息的相关信息,包括 topic, partition, 对应的消息ByteBuffer,消息的offset,key解码器,value解码类
// messageHandler 是一个回调方法, 对应了本例中的(mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message(), mmd.offset)代码
        messageHandler(new MessageAndMetadata(
          part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
      }
    }
  }
}
有如下问题:
1.这个类是如何接收 kafka 的消息的?
通过KafkaRDD来获取单批次的数据的,KafkaRDD的compute方法返回一个迭代器,这个迭代器封装了kafka partition数据的批量抓取以及负责调用传入的消息处理回调函数并将单条处理结果返回。
其中,spark streaming 的exactly-once 消费机制是通过 KafkaRDD 来保证的,在创建KafkaRDD之前,就已经通过 currentOffset和 估算出的速率,以及每个分区的自定义最大抓取速率,和从partition的leader获取的最大offset,确定分区untilOffset的值,最终fromOffset和untilOffset构成OffsetRange,在KafkaRDD中生成的迭代器中会丢弃掉offset不在该OffsetRange内的数据,最终调用用户传入的消息处理函数,处理数据成用户想要的数据格式。
2.这个类是如何将单个partition的消息转换为 RDD单个partition的数据的?
KafkaRDD 的compute 方法 以 partition 作为参数,这个partition是 KafkaRDDPartition 的实例, 包含了分区消息的 offset range,topic, partition 等信息,该方法会返回一个KafkaRDDIterat,该类提供了访问 该分区内kafka 数据的 数据,内部通过SimpleConsumer 来从leader 节点来批量获取数据,然后再从批量数据中获取我们想要的数据(由offset range来保证)。
3.这个类是如何估算 kafka 消费速率的?
提供了 PIDRateEstimator 类, 该类通过传入batch 处理结束时间,batch 处理条数, 实际处理时间和 batch 调度时间来估算速率的。
4.这个类是如何做WAL 的?这个类做不了 WAL


原文地址:https://www.cnblogs.com/johnny666888/p/11080499.html

时间: 2024-11-09 09:40:44

spark streaming 接收kafka消息之二 -- 运行在driver端的receiver的相关文章

spark streaming 接收kafka消息之一 -- 两种接收方式

源码分析的spark版本是1.6. 首先,先看一下 org.apache.spark.streaming.dstream.InputDStream 的 类说明: This is the abstract base class for all input streams. This class provides methods start() and stop() which is called by Spark Streaming system to start and stop receivi

spark streaming 接收 kafka 数据java代码WordCount示例

1. 首先启动zookeeper 2. 启动kafka 3. 核心代码 生产者生产消息的java代码,生成要统计的单词 package streaming; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyProducer { pu

Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例

场景:使用Spark Streaming接收Kafka发送过来的数据与关系型数据库中的表进行相关的查询操作: Kafka发送过来的数据格式为:id.name.cityId,分隔符为tab 1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3 MySQL的表city结构为:id int, name varchar 1 bj 2 sz 3 sh 本案例的结果为:select s.id, s.name, s.cityId, c.name from student s

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

官网文档:<http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example> Spark Streaming官网的例子reduceByKeyAndWindow 简单的介绍了spark streaming接收socket流的数据,并把接收到的数据进行windows窗口函数对数据进行批量处理. import java.util.Arrays; import org.apache.spark.S

第89课:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

第89讲:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

Spark 系列(十六)—— Spark Streaming 整合 Kafka

一.版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher AP 状态 Deprecated从 Spark 2.3.0 版本开始,Kafka 0.8 支持已被弃用

Spark Streaming和Kafka整合保证数据零丢失

当我们正确地部署好Spark Streaming,我们就可以使用Spark Streaming提供的零数据丢失机制.为了体验这个关键的特性,你需要满足以下几个先决条件: 1.输入的数据来自可靠的数据源和可靠的接收器: 2.应用程序的metadata被application的driver持久化了(checkpointed ); 3.启用了WAL特性(Write ahead log). 下面我将简单地介绍这些先决条件. 可靠的数据源和可靠的接收器 对于一些输入数据源(比如Kafka),Spark S

Spark Streaming和Kafka整合开发指南(一)

Apache Kafka是一个分布式的消息发布-订阅系统.可以说,任何实时大数据处理工具缺少与Kafka整合都是不完整的.本文将介绍如何使用Spark Streaming从Kafka中接收数据,这里将会介绍两种方法:(1).使用Receivers和Kafka高层次的API:(2).使用Direct API,这是使用低层次的KafkaAPI,并没有使用到Receivers,是Spark 1.3.0中开始引入的.这两种方法有不同的编程模型,性能特点和语义担保.下文将会一一介绍. 基于Receiver