Consuming Kafka Data from a Specified Offset with Spark Streaming

2017-06-13 15:19

Original article: http://blog.csdn.net/high2011/article/details/53706446

Many thanks to the original author; reading that article saved me a lot of detours. I am reposting it only to keep a copy for my own review. Please support the original author by reading it at the link above. Thank you.

I. Scenario: When a Spark Streaming program exits unexpectedly, data keeps being pushed to Kafka. Because the consumer starts reading from the latest offset by default, the records that arrived while the program was down are skipped, which amounts to data loss. To avoid this, we record the offsets consumed in each batch so that on the next start we can check them and resume reading from the specified offset.

II. Environment: kafka-0.9.0, Spark-1.6.0, jdk-1.7, Scala-2.10.5, idea16

III. Implementation:

1. Add the Spark and Kafka dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ngaa</groupId>
    <artifactId>test-my</artifactId>
    <version>1.0-SNAPSHOT</version>
    <inceptionYear>2008</inceptionYear>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--add maven release-->
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!--Scala version-->
        <scala.version>2.10.5</scala.version>
        <!--Scala version on the test machine-->
        <test.scala.version>2.11.7</test.scala.version>
        <jackson.version>2.3.0</jackson.version>
        <!--slf4j version-->
        <slf4j-version>1.7.20</slf4j-version>
        <!--cdh-spark-->
        <spark.cdh.version>1.6.0-cdh5.8.0</spark.cdh.version>
        <spark.streaming.cdh.version>1.6.0-cdh5.8.0</spark.streaming.cdh.version>
        <kafka.spark.cdh.version>1.6.0-cdh5.8.0</kafka.spark.cdh.version>
        <!--cdh-hadoop-->
        <hadoop.cdh.version>2.6.0-cdh5.8.0</hadoop.cdh.version>
        <!--httpclient must be compatible with the Hadoop version shipped with CDH (cd /opt/cloudera/parcels/CDH/lib/hadoop/lib)-->
        <httpclient.version>4.2.5</httpclient.version>
        <!--http core-->
        <httpcore.version>4.2.5</httpcore.version>
        <!--fastjson-->
        <fastjson.version>1.1.39</fastjson.version>
    </properties>
    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
        <!--Repository used to resolve the CDH jars-->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>
    <dependencies>
        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!--httpclient-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>${httpclient.version}</version>
        </dependency>
        <!--http core-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>${httpcore.version}</version>
        </dependency>
        <!--slf4j-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j-version}</version>
        </dependency>
        <!--hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.cdh.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--spark scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <!--Spark Streaming and Kafka integration packages-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>${spark.streaming.cdh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>${kafka.spark.cdh.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!--Spark assembly jar taken from the local Windows filesystem-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-assembly_2.10</artifactId>
            <version>${spark.cdh.version}</version>
            <scope>system</scope>
            <systemPath>D:/crt_send_document/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar</systemPath>
        </dependency>
        <!--Spark jar taken from the local Linux filesystem of the test environment-->
        <!--<dependency>-->
        <!--<groupId>org.apache.spark</groupId>-->
        <!--<artifactId>spark-assembly_2.10</artifactId>-->
        <!--<version>${spark.cdh.version}</version>-->
        <!--<scope>system</scope>-->
        <!--<systemPath>/opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar-->
        <!--</systemPath>-->
        <!--</dependency>-->
        <!--Spark jar taken from the central repository-->
        <!--<dependency>-->
        <!--<groupId>org.apache.spark</groupId>-->
        <!--<artifactId>spark-assembly_2.10</artifactId>-->
        <!--<version>${spark.cdh.version}</version>-->
        <!--</dependency>-->
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-web-proxy -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-web-proxy</artifactId>
            <version>2.6.0-cdh5.8.0</version>
        </dependency>
    </dependencies>
    <!--Maven packaging-->
    <build>
        <finalName>test-my</finalName>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.7</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>
</project>

2. Create the test class

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
  * Created by yangjf on 2016/12/18
  * Update date:
  * Time: 11:10
  * Description: read Kafka data from a specified offset
  * Result of Test:
  * Command:
  * Email: [email protected]
  */
object ReadBySureOffsetTest {
  val logger = LoggerFactory.getLogger(ReadBySureOffsetTest.getClass)

  def main(args: Array[String]) {
    // Set the log levels
    Logger.getLogger("org.apache.kafka").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    logger.info("Main program for consuming Kafka from a specified offset is starting")
    if (args.length < 1) {
      System.err.println("Your arguments were " + args.mkString(","))
      // Log before exiting: a statement placed after System.exit never runs
      logger.info("Main program exited unexpectedly")
      System.exit(1)
    }
    // e.g. hdfs://hadoop1:8020/user/root/spark/checkpoint
    val Array(checkpointDirectory) = args
    logger.info("Checkpoint directory: " + checkpointDirectory)
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(checkpointDirectory)
      })
    logger.info("Streaming is starting")
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    // Configuration
    val brokers = "hadoop3:9092,hadoop4:9092"
    val topics = "20161218a"
    // Batch interval in seconds (5 by default)
    val split_rdd_time = 8
    // Create the context
    val sparkConf = new SparkConf()
      .setAppName("SendSampleKafkaDataToApple").setMaster("local[2]")
      .set("spark.app.id", "streaming_kafka")
    val ssc = new StreamingContext(sparkConf, Seconds(split_rdd_time))
    ssc.checkpoint(checkpointDirectory)
    // Topics for the direct Kafka stream
    val topicsSet: Set[String] = topics.split(",").toSet
    // Kafka parameters
    val kafkaParams: Map[String, String] = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "apple_sample",
      "serializer.class" -> "kafka.serializer.StringEncoder"
      // "auto.offset.reset" -> "largest"   // reset to the latest offset (default)
      // "auto.offset.reset" -> "smallest"  // reset to the earliest offset
      // The Kafka 0.8 consumer API used by spark-streaming-kafka accepts "smallest"/"largest";
      // "earliest"/"latest"/"none" belong to the newer consumer API.
      // The setting is not consulted when fromOffsets is passed explicitly, as below.
    )
    /**
      * Read Kafka data starting from a specified position.
      * Note: the direct stream tracks offsets itself, so each record is read exactly once per batch;
      * end-to-end exactly-once results additionally require idempotent or transactional output.
      * With the starting offsets set to the values recorded when the Streaming program last stopped,
      * reading resumes from that point.
      */
    val offsetList = List((topics, 0, 22753623L), (topics, 1, 327041L)) // topic, partition_no, offset
    val fromOffsets = setFromOffsets(offsetList) // build the fromOffsets argument
    // Maps each MessageAndMetadata to a (topic, message) pair
    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message())
    // Use KafkaUtils.createDirectStream to start consuming from the specified offsets. For details see
    // "http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$"
    val messages: InputDStream[(String, String)] =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    // Process the data
    messages.foreachRDD(mess => {
      // Offset ranges of this batch
      val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
      mess.foreachPartition(lines => {
        // RDD partition i of a direct stream corresponds to offsetsList(i), so look it up once per partition
        val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
        logger.info("++++++++++++++++++++++++++++++ record the offset here +++++++++++++++++++++++++++++++++++++++")
        logger.info(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        lines.foreach(line => {
          logger.info("+++++++++++++++++++++++++++++++ consume the data here ++++++++++++++++++++++++++++++++++++++")
          logger.info("The kafka line is " + line)
        })
      })
    })
    ssc
  }

  // Build the Map of (topic, partition) -> starting offset
  def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
    list.map { case (topic, partition, offset) =>
      TopicAndPartition(topic, partition) -> offset
    }.toMap
  }
}

IV. References:
1. Spark API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
2. Kafka official configuration reference: http://kafka.apache.org/documentation.html#configuration
3. Kafka SimpleConsumer example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
4. Offset handling when consuming with Spark Streaming: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html
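5. Kafka official API documentation: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Note: the code above has been tested and works; adapt it as needed. If you have any questions, please leave a comment!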
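The scenario calls for recording the consumed offsets and reading them back on the next start, while the test class only logs them and hard-codes the starting list. A minimal sketch of the missing step follows, assuming the offsets are kept in a small local text file. `OffsetFileStore`, its methods, and the file layout are hypothetical names introduced here for illustration; they are not part of Spark or Kafka, and on a cluster a shared store such as HDFS, ZooKeeper, or a database would be the more likely choice.

```scala
import java.io.{File, PrintWriter}
import scala.io.Source

// Hypothetical helper (not a Spark or Kafka API): stores the last processed
// offsets in a text file, one "topic,partition,offset" entry per line.
object OffsetFileStore {

  // Overwrite the file with the latest offsets.
  def save(path: String, offsets: List[(String, Int, Long)]): Unit = {
    val writer = new PrintWriter(new File(path), "UTF-8")
    try {
      offsets.foreach { case (topic, partition, offset) =>
        writer.println(s"$topic,$partition,$offset")
      }
    } finally {
      writer.close()
    }
  }

  // Read the offsets back; a missing file means no position has been saved yet.
  def load(path: String): List[(String, Int, Long)] = {
    val file = new File(path)
    if (!file.exists()) {
      Nil
    } else {
      val source = Source.fromFile(file, "UTF-8")
      try {
        source.getLines().filter(_.trim.nonEmpty).map { line =>
          // A malformed line fails with a MatchError instead of being skipped silently
          line.trim.split(",") match {
            case Array(topic, partition, offset) => (topic, partition.toInt, offset.toLong)
          }
        }.toList
      } finally {
        source.close()
      }
    }
  }
}
```

In the test class, the hard-coded `offsetList` could then be replaced by `OffsetFileStore.load(path)`, falling back to the literal list when it returns `Nil`, since an empty `fromOffsets` map would leave the stream with nothing to read. Inside `foreachRDD`, once the batch's output has succeeded, the driver could call `OffsetFileStore.save(path, offsetsList.map(o => (o.topic, o.partition, o.untilOffset)).toList)`. Note that when `StreamingContext.getOrCreate` finds an existing checkpoint it restores the context from it and does not call `createContext`, so in that case the stream continues from the checkpointed offsets rather than from the saved file.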