spark streaming (二)

一、基础核心概念

1、StreamingContext详解 (一)

有两种创建StreamingContext的方式:
             val conf = new SparkConf().setAppName(appName).setMaster(master);
             val ssc = new StreamingContext(conf, Seconds(1));

StreamingContext, 还可以使用已有的SparkContext来创建
              val sc = new SparkContext(conf)
              val ssc = new StreamingContext(sc, Seconds(1));

appName, 是用来在Spark UI上显示的应用名称。 master, 是一个Spark、 Mesos或者Yarn集群的URL, 或者是local[*]。

2、StreamingContext详解 (二)

一个StreamingContext定义之后, 必须做以下几件事情:
                  1、 通过创建输入DStream来创建输入数据源。
                  2、 通过对DStream定义transformation和output算子操作, 来定义实时计算逻辑。
                  3 、 调用StreamingContext的start()方法, 来开始实时处理数据。
                  4、 调用StreamingContext的awaitTermination()方法, 来等待应用程序的终止。 可以使用CTRL+C手动停止,或者就是让它持续不断的运行进行计算。
                  5、 也可以通过调用StreamingContext的stop()方法, 来停止应用程序。

需要注意的要点:
                  1、 只要一个StreamingContext启动之后, 就不能再往其中添加任何计算逻辑了。 比如执行start()方法之后, 还给某个DStream执行一个算子。
                  2、 一个StreamingContext停止之后, 是肯定不能够重启的。 调用stop()之后, 不能再调用start()
                  3、 一个JVM同时只能有一个StreamingContext启动。 在你的应用程序中, 不能创建两个StreamingContext。
                  4、 调用stop()方法时, 会同时停止内部的SparkContext, 如果不希望如此, 还希望后面继续使用SparkContext创建其他类型的Context, 比如SQLContext, 那么就用stop(false)。
                  5、 一个SparkContext可以创建多个StreamingContext, 只要上一个先用stop(false)停止, 再创建下一个即可。

3、输入DStream和Receiver详解(一)

输入DStream代表了来自数据源的输入数据流。 在之前的wordcount例子中, lines就是一个输入 DStream( JavaReceiverInputDStream) , 代表了从netcat( nc) 服务接收到的数据流。 除了 文件数据流之外, 所有的输入DStream都会绑定一个Receiver对象,

该对象是一个关键的组件, 用来从数据源接收数据, 并将其存储在Spark的内存中, 以供后续处理。 
                 Spark Streaming提供了三种内置的数据源支持;
                              1、 基础数据源: StreamingContext API中直接提供了对这些数据源的支持, 比如文件、 socket、 Akka Actor等。
                              2、 高级数据源: 诸如Kafka、 Flume、 Kinesis、 Twitter等数据源, 通过第三方工具类提供支持。 这些数据源的使用, 需要引用其依赖。
                              3、 自定义数据源: 我们可以自己定义数据源, 来决定如何接受和存储数据。

4、输入DStream和Receiver详解(二)

如果你想要在实时计算应用中并行接收多条数据流, 可以创建多个输入DStream。 这样就会创建多个 Receiver, 从而并行地接收多个数据流。 但是要注意的是, 一个Spark Streaming Application的 Executor, 是一个长时间运行的任务, 因此,

它会独占分配给Spark Streaming Application的cpu core。从而只要Spark Streaming运行起来以后, 这个节点上的cpu core, 就没法给其他应用使用了。

使用本地模式, 运行程序时, 绝对不能用local或者local[1], 因为那样的话, 只会给执行输入DStream的 executor分配一个线程。 而Spark Streaming底层的原理是, 至少要有两条线程, 一条线程用来分配给 Receiver接收数据, 一条线程用来处理接收到的数据。

因此必须使用local[n], n>=2的模式。     (n不能大于当前节点的CPU核数)
                    如果不设置Master, 也就是直接将Spark Streaming应用提交到集群上运行, 那么首先, 必须要求集群 节点上, 有>1个cpu core, 其次, 给Spark Streaming的每个executor分配的core, 必须>1, 这样, 才能保证分配到executor上运行的输入DStream,

两条线程并行, 一条运行Receiver, 接收数据; 一条处 理数据。 否则的话, 只会接收数据, 不会处理数据。

总结:Receiver接收器

Receiver接收器,可以接收外部数据源中的数据,并将其保存到
内存中,以供后续使用。
在Spark中支持三大类型的数据源:
1、基础数据源:比如文件、Socket、Akka中的数据。
2、高级数据源,比如Flume、Kafka、推特中的数据。
3、自定义数据源。
补充:在Spark Streaming中,可以通过两种方式操作Kafka的数据。
一种是通过Receiver的方式,另一种Direct直接读取的方式。

5、输入DStream之基础数据源
               (1)Socket: 之前的wordcount例子,

object WordCoundStreaming {
  def main(args: Array[String]): Unit = {
    /**
      * 处理Spakr Streaming程序至少需要2个线程,其中,
      * 一个线程负责接收输入的数据
      * 另一个线程负责处理接收的数据
      * local[N] N>=2
      */
    val conf=new SparkConf().setAppName("WordCoundStreaming")
              .setMaster("local[2]")
    /**
      * SparkContext是用户与Spark集群交互的唯一接口,所以SparkContext是必需的。
      * 在创建StreamingContext的过程中,Spark会在源码中自动创建一个SparkContext对象。
      * 注意第二个参数Seconds(*),表示实时流数据中每批数据的时间间隔,
      * 也就是说,在DStream离散流中的每个RDD包含相应时间间隔的数据。
      */
    val ssc=new StreamingContext(conf,Seconds(5))
    /**
      * 通过socketTextStream()获取nc服务器中的数据
      * 需要指明获取nc服务器的节点名称和端口,这是的端口要和运行nc服务器的端口一致。
      * 此时产生的lines不是RDD,而是一个DStream离散流。
      */
    val lines=ssc.socketTextStream("tgmaster",9999)
    /**
      * 对离散流lines进行flatMap转换操作,实际上是对lines离散流中的每个RDD都进行
      * flatMap操作,从而产生了新的RDD,多个新的RDD构成了新的离散流words。
      */
    val words=lines.flatMap(_.split(" "))
    val pairs=words.map((_,1))
    val result=pairs.reduceByKey(_+_)

    //在控制台输出内容
    result.print()
    //启动StreamingContext
    ssc.start()
    //等待程序停止
    ssc.awaitTermination()
  }
}

    (2)基于HDFS文件的实时计算, 其实就是, 监控一个HDFS目录, 只要其中有新文件出现, 就实时处理。相当于处理实时的文件流。 
               streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
               streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
               Spark Streaming会监视指定的HDFS目录, 并且处理出现在目录中的文件。
                要注意的是, 所有放入HDFS目录中的文件, 都必须有相同的格式; 必须使用移动或者重命名的方式,将文件移入目录; 一旦处理之后, 文件的内容即使改变, 也不会再处理了; 基于HDFS文件的数据源是没有Receiver的, 因此不会占用一个cpu core。

 def main(args: Array[String]): Unit = {
    val conf =new SparkConf().setAppName("word").setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))
    val lines=ssc.textFileStream("hdfs://liuwei1:9000/homework")
    val result=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }

二、DStream的transformation

总结:DStream中Transformation类型的算子有以下三个最重要
1)updateStateByKey
可以为每个Key保留一份状态,并更新状态的值。
案例:在全局范围内统计每个单词出现的次数。
2)transform
可以执行RDD到RDD的操作,相当于对DStream API的一个补充。
3)window
滑动窗口操作,需要指明两个参数,一个是窗口的长度,另一个是
窗口滑动的时间间隔。

三、updateStateBykey  (全局范围之内处理数据,而不是一批一批的)
      updateStateByKey
    updateStateByKey操作, 可以让我们为每个key维护一份state, 并持续不断的更新该state。
          1、 首先, 要定义一个state, 可以是任意的数据类型;
          2、 其次, 要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。
    对于每个batch, Spark都会为每个之前已经存在的key去应用一次state更新函数, 无论这个key在batch中是否有新的数据。 如果state更新函数返回none, 那么key
对应的state就会被删除。 当然, 对于每个新出现的key, 也会执行state更新函数。
注意, updateStateByKey操作, 要求必须开启Checkpoint机制。
案例: 基于缓存的实时wordcount程序( 在实际业务场景中, 这个是非常有用的)

object updateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("updateStateByKey")
            .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))
    //updateStateByKey操作,要求必须开启Checkpoint机制。
    ssc.checkpoint("hdfs://tgmaster:9000/in/ch")

    val lines=ssc.socketTextStream("tgmaster",9999)
    val pairs=lines.flatMap(_.split(" "))
              .map((_,1))
    /**
      * values:多个新值的集合
      * state:是一个Option类型的状态,
      * 可以通过state.getOrElse(0)为newValue设置初始值
      */
    val result=pairs.updateStateByKey((values:Seq[Int],state:Option[Int])=>{
      //创建newValue并设初始值0
      var newValue=state.getOrElse(0)
      /**
        * 遍历values集合的新值,用以改变原先的旧值
        */
      for(value <- values){
        newValue+=value
      }
      /**
        * updateStateByKey算子需要一个Option类型的返回值,
        * 所以将更新之后的新值作为Option返回。
        */
      Option(newValue)
    })

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

四、transform

transform操作, 应用在DStream上时, 可以用于执行任意的RDD到RDD的转换操作。 它可以用于实现, DStream API中所没有提供的操作。 比如说, DStream API中, 并没有提 供将一个DStream中的每个batch, 与一个特定的RDD进行join的操作。

但是我们自己就 可以使用transform操作来实现该功能。
     DStream.join(), 只能join其他DStream。 在DStream每个batch的RDD计算出来之后, 会 去跟其他DStream的RDD进行join。
案例: 广告计费日志实时黑名单过滤

/**
  * Created by tg on 3/29/17.
  * 实时黑名单过滤
  */
object transformDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("transformDemo")
                .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))

    //模拟数据,创建黑名单RDD,(String,Boolean) (name,Boolean)
    val blackRDD=ssc.sparkContext.parallelize(Array(("tom",true)))
    //从nc服务器中获取数据,nc服务器中的数据格式:time name,比如: 1101 jack
    val linesDStream=ssc.socketTextStream("tgmaster",9999)
    val mapDStream=linesDStream.map(line=>{
      val log=line.split(" ")
      (log(1),line) //(name,time name)
    })
    /**
      * tansform()算子可以执行RDD到RDD的转换操作
      */
    mapDStream.transform(adsRDD=>{
      //让adsRDD与blackRDD进行leftOuterJoin左外连接操作,操作之后的数据包含所有的用户
     // (String,(String,Boolean))
      val joinRDD=adsRDD.leftOuterJoin(blackRDD)
      val filterRDD=joinRDD.filter(x=>{
        /**
          * 当if条件成立时,意味着是黑名单人员,返回false将其删除。
          * 当if条件不成立时,意味着不是黑名单人员,返回true将其保留。
          */
        if(x._2._2.getOrElse(false)) false else true
      })
      filterRDD.map(x=>{
          x._2._1
      })
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

五、window滑动窗口

Spark Streaming提供了滑动窗口操作的支持, 从而让我们可以对一个滑动窗口内的数据执行计算 操作。
     每次掉落在窗口内的RDD的数据, 会被聚合起来执行计算操作, 然后生成的RDD, 会作为window DStream的一个RDD。 比如下图中, 就是对每三秒钟的数据执行一次滑动窗口计算, 这3秒内的3个 RDD会被聚合起来进行处理, 然后过了两秒钟, 又会对最近三秒内的数据执行滑动窗口计算。 所以 每个滑动窗口操作, 都必须指定两个参数, 窗口长度以及滑动间隔, 而且这两个参数值都必须是 batch间隔的整数倍。 ( Spark Streaming对滑动窗口的支持, 是比Storm更加完善和强大的) 

window滑动窗口操作

案例:
      热点搜索词滑动统计, 每隔5秒钟, 统计最近20秒钟的搜索词的搜索频次, 并打印 出排名最靠前的3个搜索词以及出现次数

package SparkCore.day1

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by tg on 3/29/17.
  * 热点搜索词滑动统计,每隔5秒钟,统计最近20秒钟的搜索词的搜索频次,
  * 并打印出排名最靠前的3个搜索词以及出现次数。
  */
object reduceByKeyAndWindowDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("reduceByKeyAndWindowDemo")
              .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(1))
    //从nc服务器中获取数据,数据格式:name keyword,比如:张三 Spark
    val linesDStream=ssc.socketTextStream("tgmaster",9999)
    //取出每行的搜索关键词
    val wordsDStream=linesDStream.map(x=>x.split(" ")(1))
    val pairsDStream=wordsDStream.map(x=>(x,1))
    /**
      * 通过滑动窗口进行统计,每隔5秒钟,统计最近20秒钟的搜索词出现次数。
      * reduceByKeyAndWindow()算子中,
      * 第一部分v1+v2用于计算搜索词出现次数
      * 第二部分Seconds(20)用于设置窗口的长度
      * 第三部分Seconds(5)用于设置窗口的时间间隔
      */
    val resultDStream=pairsDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=>
    v1+v2,Seconds(20),Seconds(5))

    resultDStream.transform(itemRDD=>{
      val result=itemRDD.map(x=>(x._2,x._1))
        .sortByKey(false).take(3)
        .map(x=>(x._2,x._1))

      val resultRDD=ssc.sparkContext.parallelize(result)
      resultRDD
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

  

时间: 2024-11-08 01:31:22

spark streaming (二)的相关文章

Spark Streaming源码解读之Job动态生成和深度思考

本博文主要包含以下内容: 1. Spark Streaming Job 生成深度思考 2 .Spark Streaming Job 生成源码解析 一 :Spark Streaming Job 生成深度思考 输入的DStream有很多来源Kafka.Socket.Flume,输出的DStream其实是逻辑级别的Action,是Spark Streaming框架提出的,其底层翻译成为物理级别的Action,是RDD的Action,中间是处理过程是transformations,状态转换也就是业务处理

83课:Scala和Java二种方式实战Spark Streaming开发

一.Java方式开发 1.开发前准备:假定您以搭建好了Spark集群. 2.开发环境采用eclipse maven工程,需要添加Spark Streaming依赖. 3.Spark streaming 基于Spark Core进行计算,需要注意事项: 设置本地master,如果指定local的话,必须配置至少二条线程,也可通过sparkconf来设置,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接收的数据(否则的话无法有

spark streaming测试之二使用网络数据源

测试思路: 首先,创建网络数据源数据发送器(程序一): 其次,创建spark接收数据程序(程序二): 接着,将程序一打包,放在服务器上执行.这里有三个参数分别是:所要发送的数据文件,通过哪个端口号发送,每隔多少毫秒发送一次数据: 最后,运行spark程序,这里每隔5秒处理一次数据.有两个参数:监听的端口号,每隔多少毫秒接收一次数据. 观察效果. 程序一: sparkStreaming import java.io.PrintWriter import java.net.ServerSocket

第83课:Scala和Java二种方式实战Spark Streaming开发

一.Java方式开发 1.开发前准备:假定您以搭建好了Spark集群. 2.开发环境采用eclipse maven工程,需要添加Spark Streaming依赖. 3.Spark streaming 基于Spark Core进行计算,需要注意事项: 设置本地master,如果指定local的话,必须配置至少二条线程,也可通过sparkconf来设置,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接收的数据(否则的话无法有

第83讲:Scala和Java二种方式实战Spark Streaming开发

一.Java方式开发 1.开发前准备:假定您以搭建好了Spark集群. 2.开发环境采用eclipse maven工程,需要添加Spark Streaming依赖. 3.Spark streaming 基于Spark Core进行计算,需要注意事项: 设置本地master,如果指定local的话,必须配置至少二条线程,也可通过sparkconf来设置,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接收的数据(否则的话无法有

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120 master 192.168.0.121 slave1 192.168.0.122 slave2 具体请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 2. 安装zookeeper分布式集群具体请参考<Kafka:ZK+Kafka+Spark Streaming集群

Spark 定制版~Spark Streaming(二)

本讲内容: a. 解密Spark Streaming运行机制 b. 解密Spark Streaming架构 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾: 上节课谈到技术界的寻龙点穴,Spark就是大数据的龙脉,而Spark Streaming就是Spark的穴位.假如要构建一个强大的Spark应用程序 ,Spark Streaming 是一个值得借鉴的参考,Spark Streaming涉及多个job交叉配合,几乎可以包括spark的所

通过案例对 spark streaming 透彻理解三板斧之二:spark streaming运行机制

本期内容: 1. Spark Streaming架构 2. Spark Streaming运行机制 Spark大数据分析框架的核心部件: spark Core.spark  Streaming流计算.GraphX图计算.MLlib机器学习.Spark SQL.Tachyon文件系统.SparkR计算引擎等主要部件. Spark Streaming 其实是构建在spark core之上的一个应用程序,要构建一个强大的Spark应用程序 ,spark  Streaming是一个值得借鉴的参考,spa

Spark定制班第2课:通过案例对Spark Streaming透彻理解三板斧之二:解密Spark Streaming运行机制和架构

本期内容: 1 解密Spark Streaming运行机制 2 解密Spark Streaming架构 1 解密Spark Streaming运行机制 我们看看上节课仍没有停下来的Spark Streaming程序运行留下的信息. 这个程序仍然在不断地循环运行.即使没有接收到新数据,日志中也不断循环显示着JobScheduler.BlockManager.MapPartitionsRDD.ShuffledRDD等等信息.这些都是Spark Core相关的信息.其循环的依据,也就是时间这个维度.