SBT 构建 spark streaming集成kafka (scala版本)

前言:

    
     最近在研究spark 还有 kafka , 想通过kafka端获取的数据,利用spark streaming进行一些计算,但搭建整个环境着实不易,故特此写下该过程,分享给大家,希望大家可以少走点弯路,能帮到大家!

环境准备:

        操作系统 : ubuntu14.04 LTS

      hadoop 2.7.1   伪分布式搭建

      sbt-0.13.9

      kafka_2.11-0.8.2.2

      spark-1.3.1-bin-hadoop2.6

      scala 版本 : 2.10.4

     

      注: 请重视版本问题,之前作者用的是spark-1.4.1 ,scala版本是2.11.7  结果作业提交至spark-submit 总是失败,所以大家这点注意下!

     

hadooop 2.7.1 伪分布式搭建 大家可以参照 http://www.wjxfpf.com/2015/10/517149.html

    kafka安装与测试:        

  1. 到官网http://kafka.apache.org/downloads.html 下载 kafka_2.11-0.8.2.2.tgz    
  2. 进入下载目录,打开终端,输入以下命令,将其解压至 /usr/local 目录: sudo tar -xvzf   kafka_2.11-0.8.2.2.tgz -C /usr/local
  3. 敲入用户密码后,kafka 成功解压,继续输入以下命令:
    1. cd  /usr/local    跳转至/usr/local/ 目录;
    2. sudo  chmod 777 -R  kafka_2.11-0.8.2.2   获得该目录的所有执行权;  gedit  ~/.bashrc  打开个人配置 末尾添加 export KAFKA_HOME=/usr/local/kafka_2.11-0.8.2.2 
      export PATH=$PATH:$KAFKA_HOME/bin
    3. 保存,终端输入 source ~/.bashrc

kafka 有其自带默认的zookeeper 所以省去了我们一些功夫,现在可以开始测试下kafka:

  • 新建终端输入 cd $KAFKA_HOME 进入kafka 目录    (为了方便,我们称此终端为1号终端)
  • bin/zookeeper-server-start.sh config/zookeeper.properties &   后台运行zookeeper
  • bin/kafka-server-start.sh  config/server.properties & 后台启动kafka-server 
  • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test    新建一个叫test的topic
  • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test   Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
  • 开启一个新的终端(为了方便,我们称此终端为2号终端),并进入kafka 目录 ,输入:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
  • 现在,在1号终端输入HAHA,如果2号终端能输出HAHA,说明kafka测试成功!

SBT构建一个关于单词计数的scala程序

  • 新建一个文件夹,命名为spark_kafka
  • 进入spark_kafka,按/src/main/scala/KafkaDemo.scala层级目录   新建KafkaDemo.scala
  • 在spark_kafka目录下 新建project 目录 在project下新建plugins.sbt
  • 在spark_kafka目录下新建assembly.sbt
  • 最后,你所看到的目录结构如下 
      • spark_kafka/
      • spark_kafka/src
      • spark_kafka/src/main
      • spark_kafka/src/main/scala
      • spark_kafka/src/main/scala/KafkaDemo.scala
      • spark_kafka/project
      • spark_kafka/project/plugins.sbt
      • spark_kafka/assembly.sbt

其中,KafkaDemo.scala 代码如下

import java.util.Properties
import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object KafkaDemo {
    def main(args: Array[String]) {
  val zkQuorum = "127.0.0.1:2181"
  val group = "test-consumer-group"
  val topics = "test"
  val numThreads = 2
  val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
  val ssc =  new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint("checkpoint")

  val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
  val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
  val words = lines.flatMap(_.split(" "))
  val pairs = words.map(word => (word, 1))
  val wordCounts = pairs.reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
  }
}

assmebly.sbt 代码如下

name := "KafkaDemo"
version := "1.0"
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-core" % "1.3.1" % "provided")
)

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0"

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("org", "apache", xs @ _*)         => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
    case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
    case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
    case x => old(x)
  }
}

resolvers += "OSChina Maven Repository" at "http://maven.oschina.net/content/groups/public/"

externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)

  

plugins.sbt 内容如下:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

请大家注意 :

  

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "axiom.xml" => MergeStrategy.filterDistinctLines
case PathList(ps @ _*) if ps.last endsWith "Log$Logger.class" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith "ILoggerFactory.class" => MergeStrategy.first
case x => old(x)
}
}

这段代码只是针对我本机的解决依赖冲突的方法,如果没有这段代码,那么我打包的时候会有依赖冲突的发生,原因是不同包下有相同的类,解决的方法是合并依赖,下面是贴出没有这段代码的错误:

[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /home/hadoop/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.0.jar:org/apache/spark/unused/UnusedStubClass.class
[error] /home/hadoop/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class

大家注意红色高亮的代码,当大家发生其他依赖冲突的时候,可以照猫画虎,解决依赖冲突

接下来,就是在较好的网络环境下进行打包,终端进入spark_kafka 目录 ,敲入sbt assembly , 耐心等代下载打包

spark streaming 对接 kafka 生产消息端口

  • 启动hadoop
  • 后台启动kafka zookeeper 和 server 端
  • 启动producer 命令行(后续通过输入字符串,spark对其进行单词计数处理)
  • 新建终端进入spark_kafka 目录,输入

    $SPARK_HOME/bin/spark-submit --class "KafkaDemo" target/scala-2.10/KafkaDemo-assembly-1.0.jar

    (打包成功的话,会有一个target 目录,而且target下有scala-2.10/KafkaDemo-assembly-1.0.jar ) 。

  • 然后在producer 输入一系列字符串,spark streaming会进行处理

如果能看到该结果,那就恭喜你了。

弄这个其实弄了有一段时间,主要问题是依赖的解决,以及版本的问题。如果大家在做的过程发现出现有scala :no such method...    等问题的时候,说明是scala版本不符合了

其他的问题大家可以谷歌,此外强调一点,以上命令跟我个人目录环境有关,比如$SPARK_HOME代表我自己的spark 路径,如果你的目录跟我不一样,自己要换一换;

此文是面向有linux基础的同学,懂基本环境配置,这是最起码的要求!此文也给自己,毕竟确实辛苦!

时间: 2024-12-25 11:45:00

SBT 构建 spark streaming集成kafka (scala版本)的相关文章

spark streaming集成kafka

Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spark等都支持与Kafka集成. Spark streaming集成kafka是企业应用中最为常见的一种场景. 一.安装kafka 参考文档: http://kafka.apache.org/quickstart#quickstart_createtopic 1.安

【转】Spark Streaming和Kafka整合开发指南

基于Receivers的方法 这个方法使用了Receivers来接收数据.Receivers的实现使用到Kafka高层次的消费者API.对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据. 然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志,这是在Spark 1.2.0才引入的功能,这使得我们可以将接收到的数据保存到WA

Spark 系列(十六)—— Spark Streaming 整合 Kafka

一.版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher AP 状态 Deprecated从 Spark 2.3.0 版本开始,Kafka 0.8 支持已被弃用

第89课:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

Spark Streaming和Kafka整合开发指南(一)

Apache Kafka是一个分布式的消息发布-订阅系统.可以说,任何实时大数据处理工具缺少与Kafka整合都是不完整的.本文将介绍如何使用Spark Streaming从Kafka中接收数据,这里将会介绍两种方法:(1).使用Receivers和Kafka高层次的API:(2).使用Direct API,这是使用低层次的KafkaAPI,并没有使用到Receivers,是Spark 1.3.0中开始引入的.这两种方法有不同的编程模型,性能特点和语义担保.下文将会一一介绍. 基于Receiver

第89讲:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

Spark Streaming使用Kafka保证数据零丢失

来自: https://community.qingcloud.com/topic/344/spark-streaming使用kafka保证数据零丢失 spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件: 数据输入需要可靠的sources和可靠的receivers 应用metadata必须通过应用driver checkpoint WAL(write ahead log) 可靠的sources和receivers spark streaming可以通过

Spark Streaming 交互 Kafka的两种方式

一.Spark Streaming连Kafka(重点) 方式一:Receiver方式连:走磁盘 使用High Level API(高阶API)实现Offset自动管理,灵活性差,处理数据时,如果某一时刻数据量过大就会磁盘溢写,通过WALS(Write Ahead Logs)进行磁盘写入,0.10版本之后被舍弃, 相当于一个人拿着一个水杯去接水,水龙头的速度不定,水杯撑不下就会往盆(磁盘)中接. zookeeper自动管理偏移量 Receiver方式说明:Receiver会以固定的时间向kafka

Spark Streaming和Kafka整合保证数据零丢失

当我们正确地部署好Spark Streaming,我们就可以使用Spark Streaming提供的零数据丢失机制.为了体验这个关键的特性,你需要满足以下几个先决条件: 1.输入的数据来自可靠的数据源和可靠的接收器: 2.应用程序的metadata被application的driver持久化了(checkpointed ); 3.启用了WAL特性(Write ahead log). 下面我将简单地介绍这些先决条件. 可靠的数据源和可靠的接收器 对于一些输入数据源(比如Kafka),Spark S