Spark Streaming: consuming Kafka data from a specified offset


Original article: http://blog.csdn.net/high2011/article/details/53706446

      First of all, many thanks to the original author; this article saved me a lot of detours. I am reposting it to keep a copy for my own review. Please support the original author and read the post at the link above. Thank you.

I. Scenario: When a Spark Streaming application exits unexpectedly, data is still being pushed into Kafka. Because Kafka consumers start from the latest offset by default, the records produced while the job was down are skipped on restart, which means data loss. To avoid this, we need to record the offset of every batch we consume, so that the next run can check the saved position and resume reading from that exact offset.
II. Environment: kafka-0.9.0, Spark-1.6.0, JDK 1.7, Scala 2.10.5, IntelliJ IDEA 16
III. Implementation:
      1. Add the Spark and Kafka dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>  

    <groupId>com.ngaa</groupId>
    <artifactId>test-my</artifactId>
    <version>1.0-SNAPSHOT</version>
    <inceptionYear>2008</inceptionYear>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--add  maven release-->
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!--Scala version-->
        <scala.version>2.10.5</scala.version>
        <!--Scala version on the test machine-->
        <test.scala.version>2.11.7</test.scala.version>

        <jackson.version>2.3.0</jackson.version>
        <!--slf4j version-->
        <slf4j-version>1.7.20</slf4j-version>
        <!--cdh-spark-->
        <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>
        <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>
        <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>
        <!--cdh-hadoop-->
        <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>
        <!--httpclient must be compatible with the Hadoop version shipped with CDH (cd /opt/cloudera/parcels/CDH/lib/hadoop/lib)-->
        <httpclient.version>4.2.5</httpclient.version>

        <!--httpcore-->
        <httpcore.version>4.2.5</httpcore.version>
        <!--fastjson-->
        <fastjson.version>1.1.39</fastjson.version>

    </properties>  

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <!--Repository used to resolve the CDH-specific jars-->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>  

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>  

    <dependencies>  

        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!--httpclient-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>  

        <!--http core-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>${httpcore.version}</version>
        </dependency>  

        <!--slf4j-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j-version}</version>
        </dependency>
        <!--hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--spark scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>  

        <!--Spark Streaming and Kafka integration packages-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.streaming.cdh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${kafka.spark.cdh.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>  

        <!--Spark assembly jar loaded from a local path on Windows-->
        <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-assembly_2.10</artifactId>
        <version>${spark.cdh.version}</version>
        <scope>system</scope>
        <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>
        </dependency>  

        <!--Spark assembly jar from the local library on the Linux test environment-->
        <!--<dependency>-->
            <!--<groupId>org.apache.spark</groupId>-->
            <!--<artifactId>spark-assembly_2.10</artifactId>-->
            <!--<version>${spark.cdh.version}</version>-->
            <!--<scope>system</scope>-->
            <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->
            <!--</systemPath>-->
        <!--</dependency>-->  

        <!--Spark assembly from the central repository-->
        <!--<dependency>-->
        <!--<groupId>org.apache.spark</groupId>-->
        <!--<artifactId>spark-assembly_2.10</artifactId>-->
        <!--<version>${spark.cdh.version}</version>-->
        <!--</dependency>-->  

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-web-proxy</artifactId>
            <version>2.6.0-cdh5.8.0</version>
        </dependency>  

    </dependencies>  

    <!--Maven packaging-->
    <build>
        <finalName>test-my</finalName>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.7</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>  

</project>  

 2. Create the test class
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory  

/**
  * Created by yangjf on 2016/12/18
  * Update date:
  * Time: 11:10
  * Description: read Kafka data starting from a specified offset
  * Result of Test:
  * Command:
  * Email: [email protected]
  */
object ReadBySureOffsetTest {
  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)  

  def main(args: Array[String]) {
    //Set log levels
    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    logger.info("Starting the job that consumes Kafka from a specified offset")
    if (args.length < 1) {
      System.err.println("Your arguments were " + args.mkString(","))
      logger.info("Missing checkpoint directory argument, exiting") // log before exiting, otherwise this line is never reached
      System.exit(1)
    }
    //e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint
    val Array(checkpointDirectory) = args
    logger.info("Checkpoint directory: " + checkpointDirectory)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory)
      })
    logger.info("Starting the StreamingContext")
    ssc.start()
    ssc.awaitTermination()
  }  

  def createContext(checkpointDirectory: String): StreamingContext = {
    //Configuration
    val brokers = "hadoop3:9092,hadoop4:9092"
    val topics = "20161218a"  

    //Batch interval in seconds
    val split_rdd_time = 8
    // Create the streaming context
    val sparkConf = new SparkConf()
      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
      .set("spark.app.id", "streaming_kafka")  

    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))  

    ssc.checkpoint(checkpointDirectory)  

    // Create a direct Kafka stream against the given brokers and topics
    val topicsSet: Set[String] = topics.split(",").toSet
    //Kafka configuration parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "apple_sample",
      "serializer.class" -> "kafka.serializer.StringEncoder"
//      "auto.offset.reset" -> "largest"   //自动将偏移重置为最新偏移(默认)
//      "auto.offset.reset" -> "earliest"  //自动将偏移重置为最早的偏移
//      "auto.offset.reset" -> "none"      //如果没有为消费者组找到以前的偏移,则向消费者抛出异常
    )
    /**
      * Read Kafka data starting from the specified offsets.
      * Note: the direct approach maps each Kafka partition to an RDD partition and tracks offsets itself,
      *       so records are not re-read as long as offsets are managed correctly; end-to-end exactly-once
      *       still depends on how the output is written.
      *       With fromOffsets supplied, reading resumes from where the previous run stopped.
      */
    val offsetList = List((topics, 0, 22753623L),(topics, 1, 327041L))                           // (topic, partition, offset) triples
    val fromOffsets = setFromOffsets(offsetList)     // build the fromOffsets map
    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) // extract (topic, message) from MessageAndMetadata
    // Consume from the specified offsets using the direct (no-receiver) API; for details see
    // "http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
    val messages: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)  

    //Process the data
    messages.foreachRDD(mess => {
      //Get the offset ranges for this batch
      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
      mess.foreachPartition(lines => {
        lines.foreach(line => {
          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
          logger.info("++++++++++++++++++++++++++++++ record the offsets here +++++++++++++++++++++++++++++++++++++++")
          logger.info(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}")
          logger.info("+++++++++++++++++++++++++++++++ process the consumed record here ++++++++++++++++++++++++++++++++++++++")
          logger.info("The kafka line is " + line)
        })
      })
    })
    ssc
  }  

  //Build the Map[TopicAndPartition, Long] expected by createDirectStream
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
    var fromOffsets: Map[TopicAndPartition, Long] = Map()
    for (offset <- list) {
      val tp = TopicAndPartition(offset._1, offset._2) // topic and partition
      fromOffsets += (tp -> offset._3)                 // starting offset
    }
    fromOffsets
  }
}  
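
      3. (Optional) Persisting the offsets
      The code above hard-codes the starting offsets in offsetList and only logs the offsetRanges of each batch. To actually implement the idea from section I of recording the consumed offsets and resuming from them, the ranges have to be stored somewhere and loaded on the next start. The sketch below is a minimal illustration under my own assumptions: it writes the offsets to a local CSV file, and the OffsetStore name, the file path and the wiring are not from the original post; in production ZooKeeper, HBase or a database would be more common.

import java.io.{File, PrintWriter}
import scala.io.Source
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.OffsetRange

object OffsetStore {
  // Illustrative location; replace with ZooKeeper/HBase/a database in a real deployment
  private val offsetFile = "/tmp/kafka_offsets.csv"

  // Save the untilOffset of every partition after a batch has been processed
  def save(ranges: Array[OffsetRange]): Unit = {
    val writer = new PrintWriter(new File(offsetFile))
    try {
      ranges.foreach(r => writer.println(s"${r.topic},${r.partition},${r.untilOffset}"))
    } finally {
      writer.close()
    }
  }

  // Load the previously saved offsets; returns an empty map if nothing has been saved yet
  def load(): Map[TopicAndPartition, Long] = {
    val f = new File(offsetFile)
    if (!f.exists()) Map()
    else Source.fromFile(f).getLines().map { line =>
      val Array(topic, partition, offset) = line.split(",")
      TopicAndPartition(topic, partition.toInt) -> offset.toLong
    }.toMap
  }
}

      With a helper like this, createContext could build fromOffsets from OffsetStore.load() (falling back to setFromOffsets(offsetList) when the map is empty), and the foreachRDD block could call OffsetStore.save(offsetsList) on the driver after mess.foreachPartition(...) returns, so that the next run resumes from the last processed batch.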

IV. References:
    1. Spark API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
    2. Kafka configuration reference: http://kafka.apache.org/documentation.html#configuration
    3. Kafka SimpleConsumer example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
    4. Spark Streaming Kafka integration (offset handling): http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html
    5. Kafka consumer API docs: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Note: the code above has been tested; adjust it as needed. If you have questions, please leave a comment.