[spark] Spark Streaming Tutorial


(I) Official getting-started example

No more preamble; let's start with an example to get an intuitive feel before going into the details.

The example comes from the examples bundled with Spark. The basic steps are as follows:

(1) Feed in streaming messages with the following command:

$ nc -lk 9999

(2) In a new terminal, run NetworkWordCount to count the words entered above and print the result:

$ bin/run-example streaming.NetworkWordCount localhost 9999

(3) Type some text into the input stream created in step 1; the word-count results appear in the terminal opened in step 2, for example:

Text typed in the first terminal:

hello world again

Output in the second terminal:

-------------------------------------------
Time: 1436758706000 ms
-------------------------------------------
(again,1)
(hello,1)
(world,1)

In short, this example takes text typed by hand, passes it to Spark Streaming to count the words, and prints the result.

The code:

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
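One practical note: a socket receiver occupies a thread of its own, so Spark Streaming needs at least one thread for the receiver plus one for processing. The run-example script takes care of the master setting, but if you run the class yourself in local mode, use a master such as local[2] rather than local. A minimal sketch of the relevant change, assuming a local test run:

// Assumption: running locally for testing instead of via bin/run-example
val sparkConf = new SparkConf()
  .setAppName("NetworkWordCount")
  .setMaster("local[2]")  // at least 2 threads: 1 for the socket receiver, 1 for processing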


(II) Spark Streaming Kafka example

This example uses Java and Maven to build a word count application.

1. Create the project and add the following dependencies to pom.xml:

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>


2. Write the code. This part reuses the official example code:

package com.netease.gdc.kafkaStreaming;

import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;

import scala.Tuple2;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 *
 * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * To run this example:
 *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
 */

public final class JavaKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaKafkaWordCount() {
  }

  public static void main(String[] args) {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    // Create the context with a 2 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    // Build the topic -> number-of-consumer-threads map from the command-line arguments
    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic: topics) {
      topicMap.put(topic, numThreads);
    }

    // args[0] = zookeeper quorum, args[1] = consumer group
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
  }
}


3. Upload the project to the server and build it:

mvn clean package

4. Submit the job to Spark:


/home/hadoop/spark/bin/spark-submit \
  --jars ../mylib/metrics-core-2.2.0.jar,../mylib/zkclient-0.3.jar,../mylib/spark-streaming-kafka_2.10-1.4.0.jar,../mylib/kafka-clients-0.8.2.1.jar,../mylib/kafka_2.10-0.8.2.1.jar \
  --class com.netease.gdc.kafkaStreaming.JavaKafkaWordCount \
  --master spark://192.168.16.102:7077 \
  target/kafkaStreaming-0.0.1-SNAPSHOT.jar \
  192.168.172.111:2181/kafka my-consumer-group test 3


Of course, this assumes the Kafka cluster is already up and running and that the topic test exists. The four trailing arguments are the ZooKeeper quorum (including the /kafka chroot), the consumer group, the topic list, and the number of consumer threads, matching the Usage of JavaKafkaWordCount above.
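If the topic does not exist yet, it can be created with Kafka's command-line tool. A sketch, assuming Kafka 0.8.2 is installed under /home/hadoop/kafka (an illustrative path) and ZooKeeper is reachable at the address used above:

# Paths, partition/replication counts and addresses are illustrative; adjust to your cluster
/home/hadoop/kafka/bin/kafka-topics.sh --create \
  --zookeeper 192.168.172.111:2181/kafka \
  --replication-factor 1 --partitions 3 --topic test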


5. Verify

Open a console producer, type some text, and watch the word-count output.
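For example, a console producer can be started with the Kafka CLI (the broker address and install path below are assumptions; point it at one of your own brokers):

/home/hadoop/kafka/bin/kafka-console-producer.sh \
  --broker-list 192.168.172.111:9092 --topic test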

The result looks like this:

(hi,1)

  

(III) Basic steps

This section describes the basic steps for creating a Spark Streaming application.

1. Set up the dependencies. Taking Maven as an example, add the following to pom.xml:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>

If another data source is needed, the corresponding dependency must also be added to pom.xml.

For example, when using Kafka as the data source:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.0</version>
</dependency>


And of course the Spark core package must be included as well:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>
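2. With the dependencies in place, the structure of every Spark Streaming application is the same: create a StreamingContext with a batch interval, define the input DStream and its transformations, then start the context and wait for termination. A minimal Scala sketch (the application name, batch interval and socket address are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyStreamingApp {
  def main(args: Array[String]): Unit = {
    // 1. Create the StreamingContext with a 5 second batch interval
    val conf = new SparkConf().setAppName("MyStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 2. Define the input DStream and the transformations on it
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()

    // 3. Start the computation and block until it terminates
    ssc.start()
    ssc.awaitTermination()
  }
}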
