Spark学习之路 (二十三)SparkStreaming的官方文档

一、SparkCore、SparkSQL和SparkStreaming的类似之处

二、SparkStreaming的运行流程

2.1 图解说明

2.2 文字解说

1、我们在集群中的其中一台机器上提交我们的Application Jar,然后就会产生一个Application,开启一个Driver,然后初始化SparkStreaming的程序入口StreamingContext;

2、Master会为这个Application的运行分配资源,在集群中的一台或者多台Worker上面开启Excuter,executer会向Driver注册;

3、Driver服务器会发送多个receiver给开启的excuter,(receiver是一个接收器,是用来接收消息的,在excuter里面运行的时候,其实就相当于一个task任务)

4、receiver接收到数据后,每隔200ms就生成一个block块,就是一个rdd的分区,然后这些block块就存储在executer里面,block块的存储级别是Memory_And_Disk_2;

5、receiver产生了这些block块后会把这些block块的信息发送给StreamingContext;

6、StreamingContext接收到这些数据后,会根据一定的规则将这些产生的block块定义成一个rdd;

三、SparkStreaming的3个组成部分

四、 离散流(DStream)

五、小栗子

5.1 简单的单词计数

Scala代码

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object NetWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sparkContext = new SparkContext(conf)
    val sc = new StreamingContext(sparkContext,Seconds(2))
    /**
      * 数据的输入
      * */
    val inDStream: ReceiverInputDStream[String] = sc.socketTextStream("bigdata",9999)
    inDStream.print()
    /**
      * 数据的处理
      * */
    val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
    /**
      * 数据的输出
      * */
    resultDStream.print()

    /**
      *启动应用程序
      * */
    sc.start()
    sc.awaitTermination()
    sc.stop()
  }
}

在Linux上执行以下命令

运行结果

5.2 监控HDFS上的一个目录

HDFS上的目录需要先创建

Scala代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HDFSWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val sc = new StreamingContext(conf,Seconds(2))

    val inDStream: DStream[String] = sc.textFileStream("hdfs://hadoop1:9000/streaming")
    val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
    resultDStream.print()

    sc.start()
    sc.awaitTermination()
    sc.stop()
  }
}

Linux上的命令

student.txt

95002,刘晨,女,19,IS
95017,王风娟,女,18,IS
95018,王一,女,19,IS
95013,冯伟,男,21,CS
95014,王小丽,女,19,CS
95019,邢小丽,女,19,IS

运行结果,默认展示的10条

5.3 第二次运行的时候更新原先的结果

Scala代码

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    System.setProperty("HADOOP_USER_NAME","hadoop")
    val sparkContext = new SparkContext(conf)

    val sc = new StreamingContext(sparkContext,Seconds(2))

    sc.checkpoint("hdfs://hadoop1:9000/streaming")
    val inDStream: ReceiverInputDStream[String] = sc.socketTextStream("hadoop1",9999)

    val resultDStream: DStream[(String, Int)] = inDStream.flatMap(_.split(","))
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        val currentCount: Int = values.sum
        val lastCount: Int = state.getOrElse(0)
        Some(currentCount + lastCount)
      })
    resultDStream.print()

    sc.start()
    sc.awaitTermination()
    sc.stop()
  }
}

Linux运行命令

运行结果

5.4 DriverHA

5.3的代码一直运行,结果可以一直累加,但是代码一旦停止运行,再次运行时,结果会不会接着上一次进行计算,上一次的计算结果丢失了,主要原因上每次程序运行都会初始化一个程序入口,而2次运行的程序入口不是同一个入口,所以会导致第一次计算的结果丢失,第一次的运算结果状态保存在Driver里面,所以我们如果想用上一次的计算结果,我们需要将上一次的Driver里面的运行结果状态取出来,而5.3里面的代码有一个checkpoint方法,它会把上一次Driver里面的运算结果状态保存在checkpoint的目录里面,我们在第二次启动程序时,从checkpoint里面取出上一次的运行结果状态,把这次的Driver状态恢复成和上一次Driver一样的状态

原文地址:https://www.cnblogs.com/qingyunzong/p/9026429.html

时间: 2024-11-08 14:31:38

Spark学习之路 (二十三)SparkStreaming的官方文档的相关文章

Spark学习之路 (二十三)SparkStreaming的官方文档[转]

SparkCore.SparkSQL和SparkStreaming的类似之处 SparkStreaming的运行流程 1.我们在集群中的其中一台机器上提交我们的Application Jar,然后就会产生一个Application,开启一个Driver,然后初始化SparkStreaming的程序入口StreamingContext: 2.Master会为这个Application的运行分配资源,在集群中的一台或者多台Worker上面开启Excuter,executer会向Driver注册: 3

Spark学习之路 (十三)SparkCore的调优之资源调优JVM的基本架构

讨论QQ:1586558083 目录 一.JVM的结构图 1.1 Java内存结构 1.2 如何通过参数来控制各区域的内存大小 1.3 控制参数 1.4 JVM和系统调用之间的关系 二.JVM各区域的作用 2.1 Java堆(Heap) 2.2 方法区(Method Area) 2.3 程序计数器(Program Counter Register) 2.4 JVM栈(JVM Stacks) 2.5 本地方法栈(Native Method Stacks) 正文 回到顶部 一.JVM的结构图 1.1

嵌入式Linux驱动学习之路(二十三)NAND FLASH驱动程序

NAND FLASH是一个存储芯片. 在芯片上的DATA0-DATA7上既能传输数据也能传输地址. 当ALE为高电平时传输的是地址. 当CLE为高电平时传输的是命令. 当ALE和CLE都为低电平时传输的是数据. 将数据发给nand Flash后,在发送第二次数据之前还要判断芯片是否处于空闲状态.一般是通过引脚RnB来判断,一般是高电平代表就绪,低电平代表正忙. 操作Nand Flash的一般步骤是: 1. 发命令 选中芯片 CLE设置为高电平 在DATA0-DATA7上输出命令值 发出一个写脉冲

winform学习日志(二十三)---------------socket(TCP)发送文件

一:由于在上一个随笔的基础之上拓展的所以直接上代码,客户端: using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using System.Net.Sockets; using Sys

Android学习路线(二十三)运用Fragment构建动态UI——Fragment间通讯

先占个位置,下次翻译 :p In order to reuse the Fragment UI components, you should build each as a completely self-contained, modular component that defines its own layout and behavior. Once you have defined these reusable Fragments, you can associate them with

java痛苦学习之路[二] ---JSONObject使用

一.Strut2必须引入的包 要使程序可以运行必须引入JSON-lib包,JSON-lib包同时依赖于以下的JAR包: 1.commons-lang.jar 2.commons-beanutils.jar 3.commons-collections.jar 4.commons-logging.jar 5.ezmorph.jar 6.json-lib-2.2.2-jdk15.jar 当然除了这些包,strut2基础包也得引入 struts2-core-2.1.6.jar freemarker-2.

Spring 4 官方文档学习(十二)View技术

1.介绍 Spring 有很多优越的地方,其中一个就是将view技术与MVC框架的其他部分相隔离.例如,在JSP存在的情况下使用Groovy Markup Templates 还是使用Thymeleaf,仅仅是一个配置问题. 本章覆盖了主要的view技术,嗯嗯,可以与Spring结合的那些,并简明的说明了如何增加新的view技术. 本章假定你已经熟悉了Spring 4 官方文档学习(十一)Web MVC 框架之resolving views 解析视图 -- 它覆盖了views如何耦合到MVC框架

2DToolkit官方文档中文版打地鼠教程(二):设置摄像机

这是2DToolkit官方文档中 Whack a Mole 打地鼠教程的译文,为了减少文中过多重复操作的翻译,以及一些无必要的句子,这里我假设你有Unity的基础知识(例如了解如何新建Sprite等).当前2D Toolkit版本为2.4. 这是一篇系列教程,全文共13节(官方文档为4章,不过为了每节有明确目的,我根据官方文档的标题拆成了13节),下面是本系列教程的所有链接: 2DToolkit官方文档中文版打地鼠教程(一):初始设置 2DToolkit官方文档中文版打地鼠教程(二):设置摄像机

Jinja2学习笔记暨官方文档的翻译

http://blog.csdn.net/lgg201/article/details/4647471 呵呵, 刚刚看完Python模板引擎Jinja2的文档, 感觉很好, 觉得动态语言真是很好.  模板引擎竟然可以做的如此灵活....真是不错.... 下面直接把看文档过程的笔记发布出来, 呵呵, 基本上就是翻译, 加了不多的一点自己的解释......希望可以帮到大家 补充: 1. 在模板中设置自定义变量: {% set variable_name = value %} 比如设置{% set u