SparkStreaming

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等

和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。

离线数据:不可改变数据;实时数据:改变对数据;  流式处理批量处理

批量(微批次,不是流式处理)

SparkStreaming架构

WordCount案例

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

StreamingContext中有这个构造方法: def this(conf: SparkConf, batchDuration: Duration)
//测试Spark实时计算
object StreamWordCount {
  def main(args: Array[String]): Unit = {

    //创建配置对象
    val conf: SparkConf = new SparkConf().setAppName("Streaming").setMaster("local[*]")
    val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))
    //通过监控端口创建DStream,读进来的数据为一行行
    val socket: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)
    //将每一行数据做切分,形成一个个单词  读取是按一行一行来读 line ==> word
    val dsTream: DStream[String] = socket.flatMap(_.split(" "))
    //将单词映射成元组(word,1)
    val word: DStream[(String, Int)] = dsTream.map((_, 1))
    //reduceByKey
    val wordCount: DStream[(String, Int)] = word.reduceByKey(_+_)
      //打印
    wordCount.print()
    //启动采集器
    streamContext.start()
    //Driver不能停止,等待采集器的结束
    streamContext.awaitTermination()
  }
[[email protected] ~]$ nc -lk 9999
Hello world
Hello
Hello java
Hello spark

如果程序运行时,log日志太多,可以将spark conf目录下的log4j文件放中resources里边,日志级别改成ERROR

DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据

1. 文件数据源

文件数据流:能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取,Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,记住目前不支持嵌套目录。

streamingContext.textFileStream(dataDirectory), 其他与上边代码相同;

注意事项:

1)文件需要有相同的数据格式;

2)文件进入 dataDirectory的方式需要通过移动或者重命名来实现;

3)一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;

2. 自定义数据源

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。  自定义数据源,实现监控某个端口号,获取该端口号内容。

自定义数据采集器:

// 自定义数据采集器
class CustomerReceive(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ //有一个构造方法
  var socket: Socket = null
  //读数据并将数据发送给Spark
  def receive(): Unit = {
    //创建一个Socket
    val socket = new Socket(host, port)
    //字节流 ---->字符流
    val inputStream: InputStream = socket.getInputStream //字节流
    //字符流
    val bufferedReader: BufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"))
    var line: String = null
    while ((line = bufferedReader.readLine()) != null){
      if (!"--END--".equals(line)){
        store(line) //存储到这里边
      }else{
        return
      }
    }
  }
  //启动采集器
  //最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
  override def onStart(): Unit = {
    new Thread(new Runnable{
      override def run(): Unit = {
        receive()
      }
    }).start()
  }
  //关闭采集器
  override def onStop(): Unit = {
    if (socket != null){
      socket.close()
      socket = null
    }
  }

}
//测试:
object FileStream {
  def main(args: Array[String]): Unit = {
    // 创建流式处理环境对象
    // 创建对象时,需要传递采集数据的周期(时间)
    val conf: SparkConf = new SparkConf().setAppName("Streaming").setMaster("local[*]")
    val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))

    // 从端口号获取数据
    val socketDStream: ReceiverInputDStream[String] = streamContext.receiverStream(new CustomerReceive("hadoop101", 9999))
    // 一行一行的数据 line ==> word
    val wordDStream: DStream[String] = socketDStream.flatMap(_.split(" "))
    // word ==> (word, 1)
    val wordToCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
    // reduceByKey
    val wordToSumDStream: DStream[(String, Int)] = wordToCountDStream.reduceByKey(_ + _)
    //打印数据
    wordToSumDStream.print()
    // TODO 启动采集器
    streamContext.start()
    // TODO Driver不能停止,等待采集器的结束
    // wait, sleep
    streamContext.awaitTermination()

  }
}

3. Kafka数据源(重点)

KafkaUtils 对象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息创建出 DStream。由于 KafkaUtils 可以订阅多个主题,因此它创建出的 DStream 由成对的主题和消息组成。要创建出一个流数据,需要使用 StreamingContext 实例、一个由逗号隔开的 ZooKeeper 主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用 createStream() 方法。

//监听kafka消息
object KafkaStreaming {
  def main(args: Array[String]): Unit = {
    // 创建配置对象
    val sparkConf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]")
    // 创建流式处理环境对象
    // 创建对象时,需要传递采集数据的周期(时间)
    val socket: StreamingContext = new StreamingContext(sparkConf, Seconds(5))

    // 一个类如果创建SparkContext,那么这个类我们称之为Driver类
    // 从Kafka集群中获取数据
    //定义kafka参数
    val kafkaParams = Map[String, String](
      "group.id" -> "kris",
      "zookeeper.connect" -> "hadoop101:2181",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->"org.apache.kafka.common.serialization.StringDeserializer",//StringDeserializer的全类名,StringDeserializer implements Deserializer<String>
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
    ) //别导错包流,是kafka.clients.consumer里对
    //定义topic参数
    val topicMap = Map("thrid" -> 3)

    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      socket,
      kafkaParams,
      topicMap,
      StorageLevel.MEMORY_ONLY) //StorageLevel别导错包流
    val wordToCountDStream = kafkaDStream.map {
      case (k, v) => {(v, 1)}
    }
    val wordToSumDStream: DStream[(String, Int)] = wordToCountDStream.reduceByKey(_ + _)
    //打印数据
    wordToSumDStream.print()
    //启动采集器
    socket.start()
    //Driver不能停,等待采集器对结束
    socket.awaitTermination()
  }
}

启动kafka并中控制台启动一个生产者

[[email protected] kafka]$ bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic thrid

打印:

-------------------------------------------
Time: 1555065970000 ms
-------------------------------------------
(Hello world,1)

-------------------------------------------
Time: 1555065975000 ms
-------------------------------------------
(Hello,1)

-------------------------------------------
Time: 1555065980000 ms
-------------------------------------------
(Hello,1)
(java,1)

-------------------------------------------
Time: 1555065985000 ms
-------------------------------------------
(spark,1)

-------------------------------------------

DStream转换

DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

4. 有状态转化操作(重点)

UpdateStateByKey

UpdateStateByKey原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。

updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。

updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步: 
1. 定义状态,状态可以是一个任意的数据类型。 
2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。(key只要相同它的状态就会更新)

key单词相同了就会形成一个数量对集合,Seq[Int]就是那个数量(比如Hello, 1; Hello, 1;Seq即1 1 1);Option只有两个值(some有值和none没值),为了解决空指针出现对类,不用判断当前对象是否为空,可直接使用option

把数据保持中CheckPoint,buffer临时对缓冲

//SparkStreaming有状态转换操作
object DStreamState {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Stream").setMaster("local[*]")
    val streamContext: StreamingContext = new StreamingContext(conf, Seconds(5))
    //设置Checkpoints的目录
    streamContext.sparkContext.setCheckpointDir("cp")
    val socketDStream: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)

    val wordDStream: DStream[String] = socketDStream.flatMap(_.split(" "))

    val wordToCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
//    进行有状态的转换操作
    val resultDStream: DStream[(String, Long)] = wordToCountDStream.updateStateByKey {// 要加范型
      case (seq, buffer) => {  //seq序列当前周期中单词对数量对集合, buffer表缓冲当中的值,所谓的checkPoint
        val sumCount = seq.sum + buffer.getOrElse(0L)
        Option(sumCount) //表往缓存里边更新对值  它需要返回一个Option
      }
    }
    resultDStream.print()
    streamContext.start()
    streamContext.awaitTermination()

  }
}

打印:

有状态转换操作
-------------------------------------------
Time: 1555070600000 ms
-------------------------------------------
(Hello,1)
(world,1)

-------------------------------------------
Time: 1555070605000 ms
-------------------------------------------
(Hello,2)
(world,2)

-------------------------------------------
Time: 1555070610000 ms
-------------------------------------------
(Hello,3)
(java,1)
(world,2)

-------------------------------------------
Time: 1555070615000 ms
-------------------------------------------
(Hello,3)
(java,1)
(world,2)

Window Operations

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

窗口数据是指一段时间范围内对数据作为一个整体使用,随着时间对推移,窗口数据也会发生变化,这样对函数称为窗口函数,并且这个窗口可以发生变化,也称为滑动窗口

object DStreamWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Stream").setMaster("local[*]")
    val streamContext: StreamingContext = new StreamingContext(conf, Seconds(3))

    val socketDStream: ReceiverInputDStream[String] = streamContext.socketTextStream("hadoop101", 9999)

    // 设定数据窗口:window
    // 第一个参数表示窗口的大小(时间的范围,应该为采集周期的整数倍)
    // 第二个参数表示窗口的滑动的幅度(时间的范围,应该为采集周期的整数倍)
    val windowDStream: DStream[String] = socketDStream.window(Seconds(6), Seconds(3))
    val wordDStream: DStream[String] = windowDStream.flatMap(_.split(" "))
    val wordCountDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
    val wordSumDStream: DStream[(String, Int)] = wordCountDStream.reduceByKey(_+_)

    wordSumDStream.print()
    streamContext.start()
    streamContext.awaitTermination()
  }
}

Transform

Transform原语允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

Transform和map对区别:

    // TODO XXXXXX (Drvier) * 1
    wordSumDStream.map{
      case(word, sum) => {
        // TODO YYYYYY (Executor) * N
        (word, 1)
      }
    }
    // transform可以将DStream包装好的RDD抽取出来进行转换操作
    // transform可以在每一个采集周期对rdd进行操作
    // TODO AAAAAA (Driver) * 1
    wordSumDStream.transform{
      rdd => {
        // TODO BBBBBBB (Driver) * N
        rdd.map{
          case (word, sum) => {
            // TODO CCCCCC (Executor) * N
            (word, 1)
          }
        }
      }
    }

DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。

输出操作如下:

(1)print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。

(2)saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”.

(3)saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。

(4)saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。
Python API Python中目前不可用。

(5)foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。

通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。

比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。 注意:

(1)连接不能写在driver层面;

(2)如果写在foreach则每个RDD都创建,得不偿失;

(3)增加foreachPartition,在分区创建。

原文地址:https://www.cnblogs.com/shengyang17/p/10699511.html

时间: 2024-08-02 17:52:28

SparkStreaming的相关文章

spark-streaming读kafka数据到hive遇到的问题

在项目中使用spark-stream读取kafka数据源的数据,然后转成dataframe,再后通过sql方式来进行处理,然后放到hive表中, 遇到问题如下,hive-metastor在没有做高可用的情况下,有时候会出现退出,这个时候,spark streaminG的微批作业就会失败, 然后再启重动hive-metastore进程后,作业继续正常执行,数据就有丢失. 分析如下: 第一步,观察日志发现, 我原来的代码这么写的: xx.foreachRdd(rdd=> processRdd(rdd

基于spark和sparkstreaming的word2vec

概述 Word2vec是一款由谷歌发布开源的自然语言处理算法,其目的是把words转换成vectors,从而可以用数学的方法来分析words之间的关系.Spark其该算法进行了封装,并在mllib中实现. 整体流程是spark离线训练模型,可以是1小时1训练也可以1天1训练,根据具体业务来判断,sparkstreaming在线分析. 由于历史问题,spark还在用1.5.0,接口上和2.1还是有点区别,大概看了下文档,流程上差不多 spark离线训练 如下代码,通过word2vec训练出一个模型

Spark之SparkStreaming案例

一.Spark Streaming的介绍 ??Spark Streaming是Spark 核心API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理. 数据可以从诸如Kafka,Flume,Kinesis或TCP套接字的许多来源中获取,并且可以使用由高级功能(如map,reduce,join和window)表达的复杂算法进行处理. 最后,处理后的数据可以推送到文件系统,数据库和实时仪表板. 事实上,您可以在数据流上应用Spark的机器学习和图形处理算法. ??在内部,它的工作原理如下. S

python3+spark2.1+kafka0.8+sparkStreaming

python代码: import time from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils from operator import add sc = SparkContext(master="local[1]",appName="PythonSparkStreamingR

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

一.Hadoop配置安装 注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库, 所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译 1.修改Linux主机名 2.修改IP 3.修改主机名和IP的映射关系 ######注意######如果你们公司是租用的服务器或是使用的云主机(如华为用主机.阿里云主机等) /etc/hosts里面要配置的是内网IP地址和主机名的映射关系 4.关闭防火墙 5.s

Spark版本定制第3天:通过案例对SparkStreaming透彻理解之三

本期内容: 1 解密Spark Streaming Job架构和运行机制 2 解密Spark Streaming 容错架构和运行机制 一切不能进行实时流处理的数据都是无效的数据.在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下. Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应

第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密

一:Spark集群开发环境准备 启动HDFS,如下图所示: 通过web端查看节点正常启动,如下图所示: 2.启动Spark集群,如下图所示: 通过web端查看集群启动正常,如下图所示: 3.启动start-history-server.sh,如下图所示: 二:HDFS的SparkStreaming案例实战(代码部分) package com.dt.spark.SparkApps.sparkstreaming; import org.apache.spark.SparkConf; import o

Spark版本定制第2天:通过案例对SparkStreaming透彻理解之二

本期内容: 1 解密Spark Streaming运行机制 2 解密Spark Streaming架构 一切不能进行实时流处理的数据都是无效的数据.在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下. Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序.如果可以掌握Spark

SparkStreaming与Kafka整合遇到的问题及解决方案

前言 最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志. 实现 Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式. 一. 基于Receiver方式

Spark源码定制第一课:通过案例对SparkStreaming透彻理解三板斧之一

第一课:通过案例对SparkStreaming透彻理解三板斧之一:解密SparkStreaming另类实验及SparkStreaming本质解析 本期导读: 1 Spark源码定制选择从SparkStreaming入手: 2 Spark Streaming另类在线实验: 3 瞬间理解SparkStreaming本质. 1.    从Spark Streaming入手开始Spark源码版本定制之路 1.1           从Spark Streaming入手Spark源码版本定制之路的理由 从