Spark Dstream 创建

第 3 章 Dstream 创建

  Spark Streaming 原生支持一些不同的数据源。一些“核心”数据源已经被打包到 Spark

Streaming 的 Maven 工件中,而其他的一些则可以通过 spark-streaming-kafka 等附加工件获取。

每个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,因此会占据分配给应用

的 CPU 核心。此外,我们还需要有可用的 CPU 核心来处理数据。这意味着如果要运行多个接

收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所需要的核心数。例如,

如果我们想要在流计算应用中运行 10 个接收器,那么至少需要为应用分配 11 个 CPU 核心。

所以如果在本地模式运行,不要使用 local 或者 local[1]。

3.1 文件数据源

3.1.1 用法及说明

  文件数据流:能够读取所有 HDFS API 兼容的文件系统文件,通过 fileStream 方法进行读取,

Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,记住目前不支持嵌套目

录。

streamingContext.textFileStream(dataDirectory)

注意事项:

  1)文件需要有相同的数据格式;

  2)文件进入 dataDirectory 的方式需要通过移动或者重命名来实现;

  3)一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;

3.1.2 案例实操

(1)在 HDFS 上建好目录

[[email protected] spark]$ hadoop fs -mkdir /fileStream

(2)在/opt/module/data 创建三个文件

[[email protected] data]$ touch a.tsv
[[email protected] data]$ touch b.tsv
[[email protected] data]$ touch c.tsv
添加如下数据:
Helloatguigu
Hellospark

(3)编写代码

package com.lxl
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
object FileStream {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    val sparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("StreamWordCount")
    //2.初始化 SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    //3.监控文件夹创建 DStream
    val dirStream = ssc.textFileStream("hdfs://hadoop102:9000/fileStream")
    //4.将每一行数据做切分,形成一个个单词
    val wordStreams = dirStream.flatMap(_.split("\t"))
    //5.将单词映射成元组(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))
    //6.将相同的单词次数做统计
    val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
    //7.打印
    wordAndCountStreams.print()
    //8.启动 SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}

(4)启动程序并向 fileStream 目录上传文件

[[email protected] data]$ hadoop fs -put ./a.tsv /fileStream
[[email protected] data]$ hadoop fs -put ./b.tsv /fileStream
[[email protected] data]$ hadoop fs -put ./c.tsv /fileStream

(5)获取计算结果

-------------------------------------------
Time: 1539073810000 ms
-------------------------------------------
-------------------------------------------
Time: 1539073815000 ms
-------------------------------------------
(Hello,4)
(spark,2)
(atguigu,2)
-------------------------------------------
Time: 1539073820000 ms
-------------------------------------------
(Hello,2)
(spark,1)
(atguigu,1)
-------------------------------------------
Time: 1539073825000 ms
-------------------------------------------

3.2 RDD 队列

3.2.1 用法及说明

测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到

这个队列中的 RDD,都会作为一个 DStream 处理。

3.2.2 案例实操

1)需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount

2)编写代码

package com.atguigu
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object RDDStream {
  def main(args: Array[String]) {
    //1.初始化 Spark 配置信息
    val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
    //2.初始化 SparkStreamingContext
    val ssc = new StreamingContext(conf, Seconds(4))
    //3.创建 RDD 队列
    val rddQueue = new mutable.Queue[RDD[Int]]()
    //4.创建 QueueInputDStream
    val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
    //5.处理队列中的 RDD 数据
    val mappedStream = inputStream.map((_,1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    //6.打印结果
    reducedStream.print()
    //7.启动任务
    ssc.start()
    //8.循环创建并向 RDD 队列中放入 RDD
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }
}

3)结果展示

-------------------------------------------
Time: 1539075280000 ms
-------------------------------------------
(4,60)
(0,60)
(6,60)
(8,60)
(2,60)
(1,60)
(3,60)
(7,60)
(9,60)
(5,60)
-------------------------------------------
Time: 1539075284000 ms
-------------------------------------------
(4,60)
(0,60)
(6,60)
(8,60)
(2,60)
(1,60)
(3,60)
(7,60)
(9,60)
(5,60)
-------------------------------------------
Time: 1539075288000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)
-------------------------------------------
Time: 1539075292000 ms
-------------------------------------------

3.3 自定义数据源

3.3.1 用法及说明

需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

3.3.2 案例实操

1)需求:自定义数据源,实现监控某个端口号,获取该端口号内容。

2)自定义数据源

package com.lxl
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
class CustomerReceiver(host: String, port: Int) extends
  Receiver[String](StorageLevel.MEMORY_ONLY) {
  //最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }
  //读数据并将数据发送给 Spark
  def receive(): Unit = {
    //创建一个 Socket
    var socket: Socket = new Socket(host, port)
    //定义一个变量,用来接收端口传过来的数据
    var input: String = null
    //创建一个 BufferedReader 用于读取端口传来的数据
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,
      StandardCharsets.UTF_8))
    //读取数据
    input = reader.readLine()
    //当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Spark
    while (!isStopped() && input != null) {
      store(input)
      input = reader.readLine()
    }
    //跳出循环则关闭资源
    reader.close()
    socket.close()
    //重启任务
    restart("restart")
  }
  override def onStop(): Unit = {}
}
 

3)使用自定义的数据源采集数据

package com.atguigu
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
object FileStream {
  def main(args: Array[String]): Unit = {
    //1.初始化 Spark 配置信息
    Val sparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("StreamWordCount")
    //2.初始化 SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    //3.创建自定义 receiver 的 Streaming
    val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))
    //4.将每一行数据做切分,形成一个个单词
    val wordStreams = lineStream.flatMap(_.split("\t"))
    //5.将单词映射成元组(word,1)
    val wordAndOneStreams = wordStreams.map((_, 1))
    //6.将相同的单词次数做统计
    val wordAndCount = wordAndOneStreams.reduceByKey(_ + _)
    //7.打印
    wordAndCountStreams.print()
    //8.启动 SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }
}

3.4 Kafka 数据源

3.4.1 用法及说明

在工程中需要引入 Maven 工件 spark- streaming-kafka_2.10 来使用它。包内提供的

KafkaUtils 对象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息创建出

DStream。由于 KafkaUtils 可以订阅多个主题,因此它创建出的 DStream 由成对的主题和消息

组成。要创建出一个流数据,需要使用 StreamingContext 实例、一个由逗号隔开的 ZooKeeper

主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数

的映射表来调用 createStream() 方法。

3.4.2 案例实操

1)需求 1:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计

算(WordCount),最终打印到控制台。

(1)导入依赖

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.11</artifactId>
  <version>1.6.3</version>
</dependency>

(2)编写代码

package com.lxl
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaSparkStreaming {
  def main(args: Array[String]): Unit = {
    //1.创建 SparkConf 并初始化 SSC
    val sparkConf: SparkConf = new
        SparkConf().setMaster("local[*]").setAppName("KafkaSparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    //2.定义 kafka 参数
    val zookeeper = "hadoop102:2181,hadoop103:2181,hadoop104:2181"
    val topic = "source"
    val consumerGroup = "spark"
    //3.将 kafka 参数映射为 map
    val kafkaParam: Map[String, String] = Map[String, String](
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
        "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.GROUP_ID_CONFIG -> consumerGroup,
      "zookeeper.connect" -> zookeeper
    )
    //4.通过 KafkaUtil 创建 kafkaDSteam
    val kafkaDSteam: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParam,
      Map[String, Int](topic -> 3),
      StorageLevel.MEMORY_ONLY
    )
    //5.对 kafkaDSteam 做计算(WordCount)
    kafkaDSteam.foreachRDD {
      rdd => {
        val word: RDD[String] = rdd.flatMap(_._2.split(" "))
        val wordAndOne: RDD[(String, Int)] = word.map((_, 1))
        val wordAndCount: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
        wordAndCount.collect().foreach(println)
      }
    }
    //6.启动 SparkStreaming
    ssc.start()
    ssc.awaitTermination()
  }
}

原文地址:https://www.cnblogs.com/LXL616/p/11159239.html

时间: 2024-10-12 03:35:45

Spark Dstream 创建的相关文章

spark sql 创建dataframes

Table of Contents 1. spark sql 2. SQLContext 2.1. sql context是spark sql的所有功能入口点 2.2. 通过spark context创建sql context 2.3. hive context功能上多于sql context,未来sql context也会增加功能 3. DataFrames 3.1. 功能 3.2. 创建DataFrames 3.3. DSL 1 spark sql 是spark的一个模块 处理结构化数据 提

Spark DStream 转换

第 4 章 DStream 转换 DStream 上的原语与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输 出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey().transform()以及 各种 Window 相关的原语. 4.1 无状态转化操作 无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的 每一个 RDD.部分无状态转化操作列在了下表中.注意,针对键值

Java接入Spark之创建RDD的两种方式和操作RDD

首先看看思维导图,我的spark是1.6.1版本,jdk是1.7版本 spark是什么? Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark 部署在大量廉价硬件之上,形成集群. 下载和安装 可以看我之前发表的博客 Spark安装 安装成功后运行示例程序 在spark安装目录下examples/src/main目录中. 运行的一个Java或Scala示例程序,使用bin/run-examp

Spark 基础 —— 创建 DataFrame 的三种方式

1.自定义 schema(Rdd[Row] => DataSet[Row]) import org.apache.spark.sql.types._ val peopleRDD = spark.sparkContext.textFile("README.md") val schemaString = "name age" val fields = schemaString.split(" ") .map(fieldName => St

Spark修炼之道(高级篇)——Spark源码阅读:第二节 SparkContext的创建

博文推荐:http://blog.csdn.net/anzhsoft/article/details/39268963,由大神张安站写的Spark架构原理,使用Spark版本为1.2,本文以Spark 1.5.0为蓝本,介绍Spark应用程序的执行流程. 本文及后面的源码分析都以下列代码为样板 import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount{ def main(args: Array[String])

spark源码阅读笔记RDD(七) RDD的创建、读取和保存

Spark支持很多输入和输出源,同时还支持内建RDD.Spark本身是基于Hadoop的生态圈,它可以通过 Hadoop MapReduce所使用的InpoutFormat和OutputFormat接口访问数据.而且大部分的文件格式和存储系统 (HDFS,Hbase,S3等)都支持这种接口.Spark常见的数据源如下: (1) 文件格式和文件系统,也就是我们经常用的TXT,JSON,CSV等這些文件格式 (2)SparkSQL中的结构化数据源 (3)数据库与键值存储(Hbase和JDBC源) 当

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark"蘑菇云"行动之spark streaming消费flume采集的kafka数据Directf方式作业.     一.基本背景 Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式.具体的流程是这样的: 1.Direct方式是直接连接到kafka的节点上获取数据了. 2.基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offs

spark streaming 对接kafka记录

spark streaming 对接kafka 有两种方式: 参考: http://group.jobbole.com/15559/ http://blog.csdn.net/kwu_ganymede/article/details/50314901 Approach 1: Receiver-based Approach 基于receiver的方案: 这种方式使用Receiver来获取数据.Receiver是使用Kafka的高层次Consumer API来实现的.receiver从Kafka中获

Spark 入门(Python、Scala 版)

本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析.然后,将在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索).最后两节将开始通过命令行与Spark进行交互,然后演示如何用Python写Spark应用,并作为Spark作业提交到集群上.同时也会提供相应的 Scala 版本. 1.设置Spark环境 在本机设置和运行Spark非常简单.你只需要下载一个预构建的包,只要你安装了Java 6+和Python 2.6+,就可以在Windows.Mac O