Spark之SparkStreaming案例

一、Spark Streaming的介绍

??Spark Streaming是Spark 核心API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从诸如Kafka,Flume,Kinesis或TCP套接字的许多来源中获取,并且可以使用由高级功能(如map,reduce,join和window)表达的复杂算法进行处理。 最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。 事实上,您可以在数据流上应用Spark的机器学习和图形处理算法。

??在内部,它的工作原理如下。 Spark Streaming接收实时输入数据流,并将数据分成批,然后由Spark引擎对其进行处理,以批量生成最终的结果流。

??Spark Streaming提供称为离散流或DStream的高级抽象,它表示连续的数据流。 可以从诸如Kafka,Flume和Kinesis等来源的输入数据流中创建DStream,或者通过对其他DStream应用高级操作来创建。 在内部,DStream表示为一系列RDD。

??本指南介绍如何开始使用DStreams编写Spark Streaming程序。 您可以在Scala,Java或Python(在Spark 1.2中引入)中编写Spark Streaming程序。

二、DStream

??DStream是一个抽象的概念, 表示一系列的RDD

三、简单案例

3.1、SparkStream从一个监听端口读取数据流

??在我们详细介绍如何编写自己的Spark Streaming程序之前,我们来看一下简单的Spark Streaming程序。 假设我们要计数从在TCP套接字上侦听的数据服务器接收的文本数据中的单词数。 所有你需要做的是如下。

3.1.1、首先,我们创建一个JavaStreamingContext对象,它是所有流功能的主要入口点。 我们创建一个带有两个执行线程的本地StreamingContext,并且间隔为1秒。

        //使用两个工作线程和1秒的批量间隔创建本地StreamingContext
        SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
        // 创建该对象就类似于Spark Core中的JavaSparkContext,类似于Spark SQL中的SQLContext
        // 该对象除了接受SparkConf对象,还要接受一个Batch Interval参数,就是说,每收集多长时间数据划分一个batch去进行处理
        // 这里我们看Durations里面可以设置分钟、毫秒、秒,这里设置10秒
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));

3.1.2、使用此JavaStreamingContext,创建一个DStream,它表示来自TCP源的流数据,指定为主机名(例如localhost)和端口(例如9999)。

        //首先创建输入的DStream, 代表一个数据源如:从Scoket或Kafka来持续不断的获取实时的数据流
        //此处创建一个监听端口的Scoket的数据流, 这里面就会每10秒生成一个RDD,RDD的元素类型为String,就是一行行的文本
        JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);

3.1.3、这个lines的DStream表示将从数据服务器接收的数据流。 此流中的每条记录都是一行文本。 然后,我们要将空格划分为单词。

        //使用Spark Core提供的算子直接作用于DStreams上, 算子底层会应用在里面的每个RDD上面,RDD转换后的新RDD会作为新DStream中RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

??flatMap是一个DStream操作,通过从源DStream中的每个记录生成多个新记录来创建新的DStream(line>words的操作)。 在这种情况下,每一行将被分割成多个单词,并将单词流表示为单词DStream。

请注意: 我们使用FlatMapFunction对象定义了转换。 我们可以发现,Java API中有许多这样的便利类可以帮助定义DStream转换。

3.1.4、统计单词数

        //Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //将此DStream中生成的每个RDD的前十个元素打印到控制台
        wcs.print();

The words DStream is further mapped (one-to-one transformation) to a DStream of
(word, 1) pairs, using a PairFunction object. Then, it is reduced to get the
frequency of words in each batch of data, using a Function2 object. Finally,
wcs.print() will print a few of the counts generated every second.

使用PairFunction对象将词wordsDStream进一步映射(one-to-one transformation)到(word,1)pairs的DStream。 然后,使用Function2对象减少每批数据中的单词的频率。 最后,wcs.print()将打印每秒产生的几个计数。

3.1.5、Note that when these lines are executed, SparkStreaming only sets up the computation it will perform after it is started,
and no real processing has started yet.
To start the processing after all the transformations have been setup,
we finally call start method.

jsc.start();              // Start the computation
jsc.awaitTermination();   // Wait for the computation to terminate

3.1.6、您可以运行此示例如下。 您将首先需要运行Netcat(大多数类Unix系统中的一个小型实用程序)作为数据服务器

yum install nc
nc -lk 9999

在控制台写入数据

运行结果

java完整代码

package com.chb.spark.streaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
        // 创建该对象就类似于Spark Core中的JavaSparkContext,类似于Spark SQL中的SQLContext
        // 该对象除了接受SparkConf对象,还要接受一个Batch Interval参数,就是说,每收集多长时间数据划分一个batch去进行处理
        // 这里我们看Durations里面可以设置分钟、毫秒、秒,这里设置10秒
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));

        //首先创建输入的DStream, 代表一个数据源如:从Scoket或Kafka来持续不断的获取实时的数据流
        //此处创建一个监听端口的Scoket的数据流, 这里面就会每10秒生成一个RDD,RDD的元素类型为String,就是一行行的文本
        JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);

        //使用Spark Core提供的算子直接作用于DStreams上, 算子底层会应用在里面的每个RDD上面,RDD转换后的新RDD会作为新DStream中RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // 最后每次计算完,都打印一下这10秒钟的单词计数情况,并休眠5秒钟,以便于我们测试和观察
        wcs.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();

    }
}

scala完整代码

package com.chb.scala

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Counts words in UTF8 encoded, ‘\n‘ delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
    def main(args: Array[String]) {
        if (args.length < 2) {
            System.err.println("Usage: NetworkWordCount <hostname> <port>")
            System.exit(1)
        }

        // Create the context with a 1 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))

        // Create a socket stream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by ‘nc‘)
        // Note that no duplication in storage level only for running locally.
        // Replication necessary in distributed scenario for fault tolerance.
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}

四、从hdfs中读取数据

   JavaDStream<String> lines = jsc.textFileStream("hdfs://192.168.1.224:9000/user/root/");
时间: 2024-12-11 08:29:52

Spark之SparkStreaming案例的相关文章

第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密

一:Spark集群开发环境准备 启动HDFS,如下图所示: 通过web端查看节点正常启动,如下图所示: 2.启动Spark集群,如下图所示: 通过web端查看集群启动正常,如下图所示: 3.启动start-history-server.sh,如下图所示: 二:HDFS的SparkStreaming案例实战(代码部分) package com.dt.spark.SparkApps.sparkstreaming; import org.apache.spark.SparkConf; import o

第93课:Spark Streaming updateStateByKey案例实战和内幕源码解密

本节课程主要分二个部分: 一.Spark Streaming updateStateByKey案例实战二.Spark Streaming updateStateByKey源码解密 第一部分: updateStateByKey的主要功能是随着时间的流逝,在Spark Streaming中可以为每一个可以通过CheckPoint来维护一份state状态,通过更新函数对该key的状态不断更新:对每一个新批次的数据(batch)而言,Spark Streaming通过使用updateStateByKey

第85讲:基于HDFS的SparkStreaming案例实战和内幕源码解密

一:Spark集群开发环境准备 启动HDFS,如下图所示: 通过web端查看节点正常启动,如下图所示: 2.启动Spark集群,如下图所示: 通过web端查看集群启动正常,如下图所示: 3.启动start-history-server.sh,如下图所示: 二:HDFS的SparkStreaming案例实战(代码部分) package com.dt.spark.SparkApps.sparkstreaming; import org.apache.spark.SparkConf; import o

第93讲:Spark Streaming updateStateByKey案例实战和内幕源码

本节课程主要分二个部分: 一.Spark Streaming updateStateByKey案例实战 二.Spark Streaming updateStateByKey源码解密 第一部分: updateStateByKey它的主要功能是随着时间的流逝,在Spark Streaming中可以为每一个key可以通过CheckPoint来维护一份state状态,通过更新函数对该key的状态不断更新:在更新的时候,对每一个新批次的数据(batch)而言,Spark Streaming通过使用upda

第87课:Flume推送数据到SparkStreaming案例实战和内幕源码解密

本期内容: 1. Flume on HDFS案例回顾 2. Flume推送数据到Spark Streaming实战 3. 原理绘图剖析 1. Flume on HDFS案例回顾 上节课要求大家自己安装配置Flume,并且测试数据的传输.我昨天是要求传送的HDFS上. 文件配置: ~/.bashrc: export FLUME_HOME=/usr/local/flume/apache-flume-1.6.0-bin export FLUME_CONF_DIR=$FLUME_HOME/conf PA

基于spark和sparkstreaming的word2vec

概述 Word2vec是一款由谷歌发布开源的自然语言处理算法,其目的是把words转换成vectors,从而可以用数学的方法来分析words之间的关系.Spark其该算法进行了封装,并在mllib中实现. 整体流程是spark离线训练模型,可以是1小时1训练也可以1天1训练,根据具体业务来判断,sparkstreaming在线分析. 由于历史问题,spark还在用1.5.0,接口上和2.1还是有点区别,大概看了下文档,流程上差不多 spark离线训练 如下代码,通过word2vec训练出一个模型

Spark SQL入门案例之人力资源系统数据处理

通过该案例,给出一个比较完整的.复杂的数据处理案例,同时给出案例的详细解析. 人力资源系统的管理内容组织结构图 1) 人力资源系统的数据库与表的构建. 2) 人力资源系统的数据的加载. 3) 人力资源系统的数据的查询. 职工基本信息 职工姓名,职工id,职工性别,职工年龄,入职年份,职位,所在部门id Michael,1,male,37,2001,developer,2Andy,2,female,33,2003,manager,1Justin,3,female,23,2013,recruitin

第87讲:Flume推送数据到SparkStreaming案例实战和内幕源码解密

本期内容: 1. Flume on HDFS案例回顾 2. Flume推送数据到Spark Streaming实战 3. 原理绘图剖析 1. Flume on HDFS案例回顾 上节课要求大家自己安装配置Flume,并且测试数据的传输.我昨天是要求传送的HDFS上. 文件配置: ~/.bashrc: export FLUME_HOME=/usr/local/flume/apache-flume-1.6.0-bin export FLUME_CONF_DIR=$FLUME_HOME/conf PA

spark练习——影评案例

第一次写博客,新人上路,欢迎大家多多指教!!! ---------------------------------------------------------------------分割线--------------------------------------------------------------------- 现有如此三份数据:1.users.dat 数据格式为: 2::M::56::16::70072对应字段为:UserID BigInt, Gender String, A