spark streaming测试之一使用本地数据源

直接上代码吧

说下测试思路:

该代码监控的/tmp/sparkStream/目录;

首先,创建该目录mkdir -p /tmp/sparkStream;

然后,运行spark程序;

最后,向监控目录/tmp/sparkStream/添加数据文件;

观察spark程序运行效果。

sparkStreaming

import org.apache.log4j.{LoggerLevel}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{SecondsStreamingContext}
import org.apache.spark.streaming.StreamingContext._

object HdfsWordCount {
  def main(args: Array[]){
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

    sparkConf = SparkConf().setAppName().setMaster()
    ssc = StreamingContext(sparkConf())

    lines = ssc.textFileStream()
    words = lines.flatMap(_.split())
    wordCounts = words.map(x=>(x)).reduceByKey(_+_)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
时间: 2024-12-09 12:40:15

spark streaming测试之一使用本地数据源的相关文章

spark streaming测试之二使用网络数据源

测试思路: 首先,创建网络数据源数据发送器(程序一): 其次,创建spark接收数据程序(程序二): 接着,将程序一打包,放在服务器上执行.这里有三个参数分别是:所要发送的数据文件,通过哪个端口号发送,每隔多少毫秒发送一次数据: 最后,运行spark程序,这里每隔5秒处理一次数据.有两个参数:监听的端口号,每隔多少毫秒接收一次数据. 观察效果. 程序一: sparkStreaming import java.io.PrintWriter import java.net.ServerSocket

spark streaming测试之四设置窗口大小接收数据

测试思路: 首先,使用网络数据发送程序发送数据: 然后,运行spark程序: 观察效果. 说明: 1. 这里也需要设置检查点目录 2. 这里有四个参数: 前两个分别是监听的端口和每隔多少毫秒接收一次数据: 第三个参数是接收前多少毫秒的数据:(详细请参见window具体含义) 第四个参数是每隔多少毫秒接收一次数据. sparkStreaming import org.apache.log4j.{LoggerLevel} import org.apache.spark.storage.Storage

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP

7.Spark Streaming(上)--Spark Streaming原理介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处理

Spark Streaming原理简析

执行流程 数据的接收 StreamingContext实例化的时候,需要传入一个SparkContext,然后指定要连接的spark matser url,即连接一个spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如 val lines = ssc.socketTextStream("localhost", 9999) 1 这样从socket接收文本数据.这个步骤返回的是一个ReceiverInputDStream的实现,内含Recei

Spark Streaming实时流处理项目实战

第1章 课程介绍   1-1 -导学-   1-2 -授课习惯和学习建议   1-3 -OOTB环境使用演示   1-4 -Linux环境及软件版本介绍   1-5 -Spark版本升级第2章 初识实时流处理   2-1 -课程目录   2-2 -业务现状分析   2-3 -实时流处理产生背景   2-4 -实时流处理概述   2-5 -离线计算和实时计算对比   2-6 -实时流处理框架对比   2-7 -实时流处理架构及技术选型   2-8 -实时流处理在企业中的应用第3章 分布式日志收集框