Spark Streaming编程示例

近期也有开始研究使用spark streaming来实现流式处理。本文以流式计算word count为例,简单描述如何进行spark streaming编程。

1. 依赖的jar包

参考《分别用Eclipse和IDEA搭建Scala+Spark开发环境》一文,pom.xml中指定依赖库spark-streaming_2.10.jar。

<dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- Spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>
    <!-- HDFS -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    </dependency>

2. WordCount代码示例

监听socket端口,每5秒统计一次收到的文本的单词数量,并输出到屏幕。

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

/**
 * Spark Streaming示例,统计输入中所有单词出现的次数
 *
 */
object StreamingWordCount {
    def main(args: Array[String]) {
        if (args.length < 2) {
            System.err.println("Usage: NetworkWordCount <hostname> <port>")
            System.exit(1)
        }

        // Create the context with a 5 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // Create a socket stream on target ip:port and count the
        // words in input stream of \n delimited text
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

3. 提交任务和监听集群

socketTextStream是从监听service的socket端口。

(1)Job如何提交:
$SPARK_HOME/bin/spark-submit --name StreamingDemo --class StreamingWordCount ./sparktest-1.0-SNAPSHOT.jar localhost 1234

(2)监听socket端口:

nc -lk 1234

时间: 2024-12-20 17:03:55

Spark Streaming编程示例的相关文章

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Spark Streaming编程指南

本文基于Spark Streaming Programming Guide原文翻译, 加上一些自己的理解和小实验的结果. 一.概述 Spark Streaming是基于Core Spark API的可扩展,高吞吐量,并具有容错能力的用于处理实时数据流的一个组件.Spark Streaming可以接收各种数据源传递来的数据,比如Kafka, Flume, Kinesis或者TCP等,对接收到的数据还可以使用一些用高阶函数(比如map, reduce, join及window)进行封装的复杂算法做进

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

SIX Spark Streaming 编程初级实践

Flume 官网下载 Flume1.7.0 安装文件,下载地址如下: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz 下载后,把 Flume1.7.0 安装到 Linux 系统的“/usr/local/flume”目录下, ⑴解压安装包 1.cd ~/下载 2.sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local # 将 a

Spark(十) -- Spark Streaming API编程

本文测试的Spark版本是1.3.1 Spark Streaming编程模型: 第一步: 需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数: 1.SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息 2.Seconds对象:该对象设置了StreamingContext多久读取一次数据流 第二步: 构建好入口对象之后,直接调用该入口的方法读取各

Storm介绍及与Spark Streaming对比

1 Storm介绍 Storm是由Twitter开源的分布式.高容错的实时处理系统,它的出现令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求.Storm常用于在实时分析.在线机器学习.持续计算.分布式远程调用和ETL等领域. 在Storm的集群里面有两种节点:控制节点(Master Node)和工作节点(Worker Node).控制节点上面运行一个名为Nimbus的进程,它用于资源分配和状态监控:每个工作节点上面运行一个Supervisor的进程,它会监听分配给它所在机

Spark Streaming:大规模流式数据处理的新贵(转)

原文链接:Spark Streaming:大规模流式数据处理的新贵 摘要:Spark Streaming是大规模流式数据处理的新贵,将流式计算分解成一系列短小的批处理作业.本文阐释了Spark Streaming的架构及编程模型,并结合实践对其核心技术进行了深入的剖析,给出了具体的应用场景及优化方案. 提到Spark Streaming,我们不得不说一下BDAS(Berkeley Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈.从它的视角来看,目前的大数据处

[spark]Spark Streaming教程

? (一)官方入门示例 废话不说,先来个示例,有个感性认识再介绍. 这个示例来自spark自带的example,基本步骤如下: (1)使用以下命令输入流消息: $ nc -lk 9999 (2)在一个新的终端中运行NetworkWordCount,统计上面的词语数量并输出: $ bin/run-example streaming.NetworkWordCount localhost 9999 (3)在第一步创建的输入流程中敲入一些内容,在第二步创建的终端中会看到统计结果,如: 第一个终端输入的内

Storm与Spark Streaming比较

前言spark与hadoop的比较我就不多说了,除了对硬件的要求稍高,spark应该是完胜hadoop(Map/Reduce)的.storm与spark都可以用于流计算,但storm对应的场景是毫秒级的统计与计算,而spark(stream)对应的是秒级的.这是主要的差别.一般很少有对实时要求那么高的场景(哪怕是在电信领域),如果统计与计算的周期是秒级的话,spark的性能是要优于storm的. Storm风暴和Spark Streaming火花流都是分布式流处理的开源框架.这里将它们进行比较并