SparkStreaming的实战案例

废话不多说,直接上干货!!!
相关依赖

  <properties>
    <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.3.2</spark.version>
    <hadoop.version>2.7.6</hadoop.version>
    <scala.compat.version>2.11</scala.compat.version>
  </properties>
      <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- sparkStreaming -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.scalikejdbc/scalikejdbc -->
    <dependency>
      <groupId>org.scalikejdbc</groupId>
      <artifactId>scalikejdbc_2.11</artifactId>
      <version>3.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

(1)spark streaminging无状态计算的WordCount

编程架构

在某个节点上中启动nc -lk 9999,然后用作数据源。编写程序实现网络的wordcount。
代码实现

object NetWordCount {
    /**
      * 编程套路:
      * 1.获取编程入口,StreamingContext
      * 2.通过StreamingContext构建第一个DStream
      * 3.对DStream进行各种的transformation操作
      * 4.对于数据结果进行output操作
      * 5.提交sparkStreaming应用程序
      */
    def main(args: Array[String]): Unit = {
        //屏蔽日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hive").setLevel(Level.WARN)

        //1.获取编程入口,StreamingContext
       val conf= new SparkConf().setMaster("local[2]")
            .setAppName("NetWordCount")
        //第二个参数,表示批处理时长
        val ssc=new StreamingContext(conf,Seconds(2))

        /**
          * 2.通过StreamingContext构建第一个DStream(通过网络去读数据)
          * 第一个参数:主机名
          * 第二个参数:端口号
          */
        val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test",9999)

        //3.对DStream进行各种的transformation操作
        val wordDS: DStream[String] = ReceiverInputDStream.flatMap(msg => {
            msg.split("\\s+")
        })
        val wordCountDS: DStream[(String, Int)] = wordDS.map(word=>(word,1)).reduceByKey(_+_)
        //4.对于数据结果进行output操作,这里是打印输出
        wordCountDS.print()
        //5.提交sparkStreaming应用程序
        ssc.start()
        ssc.awaitTermination()
    }
}

使用nc -lk 9999在相应的节点上发出消息(每隔一个批处理时间发送一次),查看控制台打印:
batch1

batch2

结果发现:由于现在的操作时无状态的,所以每隔2s处理一次,但是每次的单词数不会统计,也就是说,只会统计当前批处理的单词,之前输入的则不会统计。


(2)spark streaminging有状态计算的WordCount

同样是wordCounte,这次要实现的效果是:到现在为止,统计过去时间段内的所有单词的个数。
代码

object UpdateStateBykeyWordCount {
    /**
      * 编程套路:
      * 1.获取编程入口,StreamingContext
      * 2.通过StreamingContext构建第一个DStream
      * 3.对DStream进行各种的transformation操作
      * 4.对于数据结果进行output操作
      * 5.提交sparkStreaming应用程序
      */
    def main(args: Array[String]): Unit = {
        //屏蔽日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hive").setLevel(Level.WARN)

        //1.获取编程入口,StreamingContext
        val conf = new SparkConf().setMaster("local[2]")
            .setAppName("NetWordCount")
        //第二个参数,表示批处理时长
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
        /**
          * 2.通过StreamingContext构建第一个DStream(通过网络去读数据)
          * 第一个参数:主机名
          * 第二个参数:端口号
          */
        val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test", 9999)

        //3.对DStream进行各种的transformation操作
        val wordDS: DStream[(String,Int)] = ReceiverInputDStream.flatMap(msg => {
            msg.split("\\s+")
        }).map(word=>(word,1))
        /**
          * updateStateByKey是状态更新函数,
          * updateFunc: (Seq[V], Option[S]) => Option[S]
          * (U,C)=>C
          * values:Seq[Int],state:Option[Int]==>Option[Int]
 *
          * @param values :新值
          * @param state :状态值
          * @return
          */
        val updateDS: DStream[(String, Int)] = wordDS.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
            Option(values.sum + state.getOrElse(0))
        })
        //4.对于数据结果进行output操作,这里是打印输出
        updateDS.print()
        //5.提交sparkStreaming应用程序
        ssc.start()
        ssc.awaitTermination()
    }
}

使用 nc -kl 9999

观察控制台:
batch1

batch2

发现:两次批处理的结果,进行了聚合,也就是所谓的有状态的计算。
注意
ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
上面这句代码一定要加,他会将上一次的批处理计算的结果保存起来,如果不加:
错误:requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().


(2)spark streaminging的HA

?? 在上述的updateStateByKey代码中如果当前程序运行异常时,会丢失数据(重启之后,找不回原来计算的数据),因为编程入口StreamingContext在代码重新运行的时候,是重新生成的,为了使程序在异常退出的时候,在下次启动的时候,依然可以获得上一次的StreamingContext对象,保证计算数据不丢失,此时就需要将StreamingContext对象存储在持久化的系统中。也就是说需要制作StreamingContext对象的HA。
代码

object WC_DriverHA {
    def main(args: Array[String]): Unit = {
        //屏蔽日志
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
        /**
          * StreamingContext.getOrCreate()
          * 第一个参数:checkpointPath,和下面方法中的checkpointPath目录一致
          * 第二个参数:creatingFunc: () => StreamingContext:用于创建StreamingContext对象
          * 最终使用StreamingContext.getOrCreate()可以实现StreamingContext对象的HA,保证在程序重新运行的时候,之前状态仍然可以恢复
          */

       val ssc= StreamingContext.getActiveOrCreate("C:\\z_data\\checkPoint\\checkPoint_HA",functionToCreateContext)
        ssc.start()
        ssc.awaitTermination()
    }
    def functionToCreateContext():StreamingContext={
        //1.获取编程入口,StreamingContext
        val conf = new SparkConf().setMaster("local[2]")
            .setAppName("NetWordCount")
        //第二个参数,表示批处理时长
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_HA")
        /**
          * 2.通过StreamingContext构建第一个DStream(通过网络去读数据)
          * 第一个参数:主机名
          * 第二个参数:端口号
          */
        val ReceiverInputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test", 9999)

        //3.对DStream进行各种的transformation操作
        val wordDS: DStream[(String,Int)] = ReceiverInputDStream.flatMap(msg => {
            msg.split("\\s+")
        }).map(word=>(word,1))
        val updateDS: DStream[(String, Int)] = wordDS.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
            Option(values.sum + state.getOrElse(0))
        })
        //4.对于数据结果进行output操作,这里是打印输出
        updateDS.print()
        //5.提交sparkStreaming应用程序
        ssc.start()
        ssc.awaitTermination()
        ssc
    }
}

测试
?? - 先正常运行一段时间,计算出结果
?? - 停止程序
?? - 再次启动
?? - 验证再次启动的程序,是否能够拿回停止前计算得到的结果
原理
??如果是第一次执行,那么在在这个checkpointDriectory目录中是不存在streamingContext对象的,所以要创建,第二次运行的时候,就不会在创建,则是从checkpointDriectory目录中读取进行恢复。
注意
??正常情况下,使用这种方式的HA,只能持久状态数据到持久化的文件中,默认情况是不会持久化StreamingContext对象到CheckPointDriectory中的。


(3)对checkpoint的总结:

?1)checkpoint的介绍:

??从故障中恢复checkpoint中有两种类型
???- Metadata checkpointing:driver节点中的元数据信息
?????- Configuration:用于创建流式应用程序的配置
?????- DStream:定义streaming程序的DStream操作
?????- Incomplete batches:批量的job排队但尚未完成。(程序上次运行到的位置)
???- Data checkpointing:将生成的RDD保存到可靠的存储
?????- 计算之后生成的RDD
?????- 在receiver接收到数据,转化的RDD

?2)checkpoint的启动时机:

?? - 从运行应用程序的driver的故障中恢复-元数据,(driver的HA)
?? - 使用有状态计算的时候启动checkPoint:updateStateByKey或者reduceByKeyAndWindow…

?3)checkpoint的配置:

?? - 有状态计算的时候:
?? ?ssc.checkpoint("C:\\z_data\\checkpoint")
?? - driver的HA的时候:

ssc.checkpoint("C:\\z_data\\checkpoint")
ssc =StreamingContext.getOrCreate("C:\\z_data\\checkpoint"
,functionToCreateContext)

(4)Spark Streaming 的 transform 操作

??在使用transform操作的时候介绍两个重要的概念:
??黑名单:如果允许的操作比不允许的操作多,那么将不允许的操作加入黑名单
??白名单:如果允许的操作比不允许的操作少,那么将允许的操作加入白名单
代码

object _1Streaming_tranform {
    def main(args: Array[String]): Unit = {
        //定义黑名单
        val black_list=List("@","#","$","%")
        Logger.getLogger("org.apache.hadoop").setLevel(Level.ERROR)
        Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN)
        Logger.getLogger("org.apache.hive").setLevel(Level.WARN)
        //1.获取编程入口,StreamingContext
        val conf = new SparkConf().setMaster("local[2]").setAppName("_1Streaming_tranform")
        val ssc=new StreamingContext(conf,Seconds(2))
        //2.从对应的网络端口读取数据
        val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream("test",9999)
        //2.1将黑名单广播出去
        val bc = ssc.sparkContext.broadcast(black_list)
        //2.2设置checkpoint
        ssc.checkpoint("C:\\z_data\\checkPoint\\checkPoint_1")
        //3业务处理
        val wordDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
        //transform方法表示:从DStream拿出来一个RDD,经过transformsFunc计算之后,返回一个新的RDD
        val fileterdDStream: DStream[String] = wordDStream.transform(rdd=>{
            //过滤掉黑名单中的数据
            val blackList: List[String] = bc.value
            rdd.filter(word=>{
                !blackList.contains(word)
            })
        })
        //3.2统计相应的单词数
        val resultDStream = fileterdDStream.map(msg => (msg, 1))
            .updateStateByKey((values: Seq[Int], stats: Option[Int]) => {
                Option(values.sum + stats.getOrElse(0))
            })
        //4打印output
        resultDStream.print()
        //5.开启streaming流
        ssc.start()
        ssc.awaitTermination()
    }
}

黑名单中的数据会被过滤:


(5)Spark Streaming 的 window 操作


注意
在做window操作时:
  - 窗口覆盖的数据流的时间长度,必须是批处理时间间隔的倍数
  - 前一个窗口到后一个窗口所经过的时间长度,必须是批处理时间间隔的倍数。
伪代码

 //1.获取编程入口,StreamingContext
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount_Window")
    val ssc=new StreamingContext(conf,Seconds(batchInvail.toLong))
    //2.从对应的网络端口读取数据
    val inputDStream: ReceiverInputDStream[String] = ssc.socketTextStream(hostname,port.toInt)
    val lineDStream: DStream[String] = inputDStream.flatMap(_.split(" "))
    val wordDStream: DStream[(String, Int)] = lineDStream.map((_,1))

    /**
      * 每隔4秒,算过去6秒的数据
      * reduceFunc:数据合并的函数
      * windowDuration:窗口的大小(过去6秒的数据)
      * slideDuration:窗口滑动的时间(每隔4秒)
      */
    val resultDStream: DStream[(String, Int)] = wordDStream.reduceByKeyAndWindow((kv1:Int, kv2:Int)=>kv1+kv2,
      Seconds(batchInvail.toLong * 3),
      Seconds(batchInvail.toLong * 2))
    resultDStream.print()

    ssc.start()
    ssc.awaitTermination()

(6)Spark Streaming 的ForeachRDD 操作

概念

  • foreach: 遍历一个分布式集合(rdd)中的每一个元素
  • foreachPartition:遍历一个分布式集合(rdd)中的每一个分区
  • foreachRDD:遍历一个分布式集合(DStream)中的每一个RDD
    这个算子用的好,通常程序的性能会提升很多。
    伪代码
    //这个方法表示遍历DStream中的每一个rdd
        windowDS.foreachRDD(rdd=>{
            if(!rdd.isEmpty()){
                rdd.mapPartitions(ptn=>{
                    if(!ptn.isEmpty){
                        ptn.foreach(msg=>{
                            //在这里做相应的操作
                        })
                    }
                })
            }
        })

原文地址:http://blog.51cto.com/14048416/2339589

时间: 2024-10-29 12:42:38

SparkStreaming的实战案例的相关文章

运维实战案例之“Argument list too long”错误与解决方法

作为一名运维人员来说,这个错误并不陌生,在执行rm.cp.mv等命令时,如果要操作的文件数很多,可能会使用通配符批量处理大量文件,这时就可能会出现"Argument list too long"这个问题了. 1.错误现象 这是一台Mysql数据库服务器,在系统中运行了很多定时任务,今天通过crontab命令又添加了一个计划任务,退出时发生了如下报错: #crontab -e 编辑完成后,保存退出,就出现下面如下图所示错误: 2.解决思路 根据上面报错的提示信息,基本判定是磁盘空间满了,

Linux系统shell脚本编程——生产实战案例

Linux系统shell脚本编程--生产实战案例     在日常的生产环境中,可能会遇到需要批量检查内网目前在线的主机IP地址有哪些,还可能需要检查这些在线的主机哪些端口是开放状态,因此依靠手工来检查是可以实现,但比较费时费力,所以需要结合shell脚本来实现批量检查的功能,那么今天就来做个小小的实验. 1.开发脚本前准备 一般大家都知道,测试主机是否在线,常用的命令无非就是ping.nmap,因此,首先找一个地址来测试下ping命令的效果 [[email protected] scripts]

运维实战案例之文件已删除但空间不释放问题解析

1.错误现象 运维的监控系统发来通知,报告一台服务器空间满了,登陆服务器查看,根分区确实没有空间了,如下图所示: 这里首先说明一下服务器的一些删除策略,由于Linux没有回收站功能,我们的线上服务器所有要删除的文件都会首先移动到系统/tmp目录下,然后定期清除/tmp目录下的数据.这个策略本身没有问题,但是通过检查发现这台服务器的系统分区中并没有单独划分/tmp分区,这样/tmp下的数据其实是占用了根分区的空间.既然找到了问题,那么删除/tmp目录下一些大数据即可,执行如下命令,检查/tmp下最

《Web渗透技术及实战案例解析》pdf

下载地址:网盘下载 内容简介 编辑 本书从Web渗透的专业角度,结合网络安全中的实际案例,图文并茂地再现Web渗透的精彩过程.本书共分7章,由浅入深地介绍和分析了目前网络流行的Web渗透攻击方法和手段,并结合作者多年的网络安全实践经验给出了相对应的安全防范措施,对一些经典案例还给出了经验总结和技巧,通过阅读本书可以快速掌握目前Web渗透的主流技术.本书最大的特色就是实用和实战性强,思维灵活.内容主要包括Web渗透必备技术.Google黑客技术.文件上传渗透技术.SQL注入.高级渗透技术.0day

Storm容错机制Acker详解和实战案例

Storm中有个特殊的Executor叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树.当acker发现一个Tuple树已经处理完成了,它会告诉框架回调Spout的ack(),否则回调Spout的fail(). Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪. 我们期望的是,如果某个Tuple被Bolt执行失败了,则Spout端可以重新发送该Tuple.但很遗憾的是,框架不会自动重新发送,需要我们自己手工编

009.实战案例::产品设计实例精解

1.实战案例1 2.实战案例2 3.实战案例3 4.实战案例4 5.实战案例5 6.实战案例6 7.实战案例7 8.实战案例8 9.实战案例9 10.实战案例10 11.实战案例11 12.实战案例12 13.实战案例13 14.实战案例14 15.实战案例15 16.实战案例16 17.实战案例17 18.实战案例18 19.实战案例19:工兵铲 20.实战案例20 21.实战案例21 22.实战案例22 23.实战案例23 24.实战案例24 25.实战案例25 25.实战案例:25V型带轮

安卓实战案例——TextView的使用

上一博客 怎么新建一个安卓项目和运行了建好后自带的HelloWorld程序,各位看官是否觉得学习安卓不是很难呢. 这次本博客主要讲解安卓中文字标签的使用 TextView 组件 我们可以先看下 安卓API 由上图 我们可以知道TextView 静态文本显示的组件,并且我们可以对文本进行一定的编辑. TextView 在android.widget包中(我们在Aandroid中使用的具体视图组件,都是在android.widget包中的),而android.widget.TextView是andr

Vue2.0入坑教程— 实战案例

前言:下面我们将一起来学习制作一个简单vue的实战案例. 说明:默认我们已经用vue-cli(vue脚手架或称前端自动化构建工具)创建好项目了 一. 项目说明 ps:这个简单小项目只提供一个小小小的骨架,需要向"它"身上具体加多少"肉",需要大家考虑好功能和布局后进行完善. 1.首先看下主页效果:如下图 主页分析:大体上分为上(header).中(body或content).下(footer)三部分,中间body部分是由若干个相同的li组成的"列表&quo

Vue2.0史上最全入坑教程(下)—— 实战案例

前言:经过前两节的学习,我们已经可以创建一个vue工程了.下面我们将一起来学习制作一个简单的实战案例. 说明:默认我们已经用vue-cli(vue脚手架或称前端自动化构建工具)创建好项目了 一. 项目说明 ps:这个简单小项目只提供一个小小小的骨架,需要向"它"身上具体加多少"肉",需要大家考虑好功能和布局后进行完善. 1.首先看下主页效果:如下图 主页分析:大体上分为上(header).中(body或content).下(footer)三部分,中间body部分是由