Spark createDirectStream 维护 Kafka offset(Scala)

createDirectStream方式需要自己维护offset,使程序可以实现中断后从中断处继续消费数据。

KafkaManager.scala

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

import scala.reflect.ClassTag

/**
  * Created by knowpigxia on 15-8-5.
  */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
    * 创建数据流
    * @param ssc
    * @param kafkaParams
    * @param topics
    * @tparam K
    * @tparam V
    * @tparam KD
    * @tparam VD
    * @return
    */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
                                                                                                            ssc: StreamingContext,
                                                                                                            kafkaParams: Map[String, String],
                                                                                                            topics: Set[String]): InputDStream[(K, V)] =  {
    val groupId = kafkaParams.get("group.id").get
    // 在zookeeper上读取offsets前先根据实际情况更新offsets
    setOrUpdateOffsets(topics, groupId)

    //从zookeeper上读取offset开始消费message
    val messages = {
      val partitionsE = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /**
    * 创建数据流前,根据实际消费情况更新消费offsets
    * @param topics
    * @param groupId
    */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) {// 消费过
        /**
          * 如果streaming程序执行的时候出现kafka.common.OffsetOutOfRangeException,
          * 说明zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除。
          * 针对这种情况,只要判断一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
          * 如果consumerOffsets比earliestLeaderOffsets还小的话,说明consumerOffsets已过时,
          * 这时把consumerOffsets更新为earliestLeaderOffsets
          */
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get

        // 可能只是存在部分分区consumerOffsets过时,所以只更新过时分区的consumerOffsets为earliestLeaderOffsets
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case(tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets已经过时,更新为" + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        })
        if (!offsets.isEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else {// 没有消费过
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  /**
    * 更新zookeeper上的消费offsets
    * @param rdd
    */
  def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}

  主程序中

def initKafkaParams = {
    Map[String, String](
      "metadata.broker.list" -> Constants.KAFKA_BROKERS,
      "group.id " -> Constants.KAFKA_CONSUMER_GROUP,
      "fetch.message.max.bytes" -> "20971520",
      "auto.offset.reset" -> "smallest"
    )
  } 

// kafka参数
val kafkaParams = initKafkaParams
val manager = new KafkaManager(kafkaParams)
val messageDstream = manager.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic)) 

// 更新offsets
manager.updateZKOffsets(rdd)

  

原文地址:https://www.cnblogs.com/zhangtianyuan/p/8483082.html

时间: 2024-10-29 15:49:33

Spark createDirectStream 维护 Kafka offset(Scala)的相关文章

SBT 构建 spark streaming集成kafka (scala版本)

前言: 最近在研究spark 还有 kafka , 想通过kafka端获取的数据,利用spark streaming进行一些计算,但搭建整个环境着实不易,故特此写下该过程,分享给大家,希望大家可以少走点弯路,能帮到大家! 环境准备:    操作系统 : ubuntu14.04 LTS hadoop 2.7.1   伪分布式搭建 sbt-0.13.9 kafka_2.11-0.8.2.2 spark-1.3.1-bin-hadoop2.6 scala 版本 : 2.10.4 注: 请重视版本问题,

spark streaming从指定offset处消费Kafka数据

spark streaming从指定offset处消费Kafka数据 2017-06-13 15:19 770人阅读 评论(2) 收藏 举报 分类: spark(5) 原文地址:http://blog.csdn.net/high2011/article/details/53706446 首先很感谢原文作者,看到这篇文章我少走了很多弯路,转载此文章是为了保留一份供复习用,请大家支持原作者,移步到上面的连接去看,谢谢 一.情景:当Spark streaming程序意外退出时,数据仍然再往Kafka中

scala spark-streaming整合kafka (spark 2.3 kafka 0.10)

Maven组件如下: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version></dependency> 官网代码如下: pasting /* * Licensed to the Apache Software

【转】Spark Streaming和Kafka整合开发指南

基于Receivers的方法 这个方法使用了Receivers来接收数据.Receivers的实现使用到Kafka高层次的消费者API.对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据. 然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志,这是在Spark 1.2.0才引入的功能,这使得我们可以将接收到的数据保存到WA

Spark 系列(十六)—— Spark Streaming 整合 Kafka

一.版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher AP 状态 Deprecated从 Spark 2.3.0 版本开始,Kafka 0.8 支持已被弃用

spark streaming 对接kafka记录

spark streaming 对接kafka 有两种方式: 参考: http://group.jobbole.com/15559/ http://blog.csdn.net/kwu_ganymede/article/details/50314901 Approach 1: Receiver-based Approach 基于receiver的方案: 这种方式使用Receiver来获取数据.Receiver是使用Kafka的高层次Consumer API来实现的.receiver从Kafka中获

spark streaming集成kafka

Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark等都支持与Kafka集成. Spark streaming集成kafka是企业应用中最为常见的一种场景. 一.安装kafka 参考文档: http://kafka.apache.org/quickstart#quickstart_createtopic 1.安

Spark Streaming 交互 Kafka的两种方式

一.Spark Streaming连Kafka(重点) 方式一:Receiver方式连:走磁盘 使用High Level API(高阶API)实现Offset自动管理,灵活性差,处理数据时,如果某一时刻数据量过大就会磁盘溢写,通过WALS(Write Ahead Logs)进行磁盘写入,0.10版本之后被舍弃, 相当于一个人拿着一个水杯去接水,水龙头的速度不定,水杯撑不下就会往盆(磁盘)中接. zookeeper自动管理偏移量 Receiver方式说明:Receiver会以固定的时间向kafka

Spark Streaming整合Kafka

0)摘要 主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式.这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html). 1)Kafka准备 启动zookeeper ./zkServer.sh start 启动kafka ./kafka-server-star