spark streaming 实例

spark streaming 开发实例

本文将分以下几部分

  • spark 开发环境配置
  • 创建spark项目
  • 编写streaming代码示例
  • 调试

环境配置:

spark 原生语言是scala, 我用的是spark-1.4.1-bin-hadoop2.6,可以查阅官方说明,用的是scala-2.10.1。

网上下载 scala-2.10.1 安装包。解压即可。

配置环境变量:SCALA_HOME

path 增加 %SCALA_HOME%\bin

创建项目:

我使用的Ide 是Intellj idea  ,为了提供scala 支持,还要先安装Scala 插件。

插件安装后,新建一个 project ,选择scala

选择安装的 scala SDK 路径

add maven 支持

再新建一个maven module

编辑Pom文件

父级Pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rihai.spark</groupId>
    <artifactId>spark-streaming</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>hdfs-streaming</module>
    </modules>

    <properties>
        <scala.version>2.10.1</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

子级Pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spark-streaming</artifactId>
        <groupId>com.rihai.spark</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hdfs-streaming</artifactId>

</project>

pom 说明:

maven编译scala,需要一个专门的maven-scala-plugin。该插件可以编译java 和 scala。

项目结构图:

然后在main文件夹下,建一个scala文件夹,File->project structure  设置scala文件夹为 sources 文件类型。

一个spark 项目就创建好了。

streaming示例:

spark streaming 简单理解就是分批次处理数据源的数据,然后输出到外部数据源。最大优点是可以做到秒级实时处理。
streaming 数据源可以为:文件系统(hdfs),kafka,flume等。输出方式有:文件系统(hdfs),databases等。
上两个官方的图来直接说明问题:

分批次处理:

在本例中以 文件系统 作为数据源,实时处理用户访问的日志数据。
假设日志格式如下:
用户ID,页面ID,数值(暂无意义),访问时间
U86942038,P68658056,1,2016-09-27 15:17:01:137
U25452627,P27395813,5,2016-09-27 15:19:43:901

数据源目录的日志会不断更新,streaming 程序会定时处理该目录更新的日志数据。
处理需求如下:

  • 每隔20s统计出该批次 用户-页面访问数据
  • 利用窗口计算:每40s统计出前40s 用户-页面访问数据
  • 利用updateStateByKey:统计出所有批次 用户-页面访问数据

创建一个scala class ,如图:

streaming代码:

  1 package com.rihai.spark.hdfs
  2
  3
  4 import org.apache.spark._
  5 import org.apache.spark.streaming._
  6 import scala.collection.mutable
  7 import scala.collection.mutable.ListBuffer
  8
  9 /**
 10   * Created by rihaizhang on 2016/9/26.
 11   */
 12 object PageLoggingStreaming {
 13
 14   def createContext(appName: String, timeUnit: Duration, checkpointPath: String, dataPath: String): StreamingContext = {
 15
 16     println("Creating new context !")
 17     val conf = new SparkConf().setAppName(appName)
 18     val ssc = new StreamingContext(conf, timeUnit)
 19     ssc.checkpoint(checkpointPath)
 20
 21     //e.g.
 22     //U86942038,P68658056,1,2016-09-27 15:17:01:137
 23
 24     val lines = ssc.textFileStream(dataPath)
 25
 26     val page_user = lines.map(line => {
 27
 28       val strs = line.split(",")
 29
 30       (strs(1), strs(0))
 31
 32     })
 33
 34     //单次page-user
 35     val p_u = page_user.groupByKey().flatMap(s => {
 36
 37       val set = new mutable.HashSet[String]()
 38
 39       for (user <- s._2) {
 40         set.+=(user)
 41       }
 42
 43       val listBuffer = new ListBuffer[(String, String)]
 44
 45       for (elem <- set) {
 46         listBuffer.+=((s._1, elem))
 47       }
 48       listBuffer.toTraversable
 49     })
 50     //p_u.persist(StorageLevel.MEMORY_ONLY)
 51
 52     // window page-user
 53     val wp_u = p_u.window(timeUnit * 2, timeUnit * 2).groupByKey().flatMap(s => {
 54
 55       val set = new scala.collection.mutable.HashSet[String]()
 56
 57       for (user <- s._2) {
 58         set.+=(user)
 59       }
 60
 61       val listBuffer = new ListBuffer[(String, String)]
 62
 63       for (elem <- set) {
 64         listBuffer.+=((s._1, elem))
 65       }
 66       listBuffer.toTraversable
 67     })
 68
 69     val updateFun = (newValues: Seq[Iterable[String]], prevValues: Option[Iterable[String]]) => {
 70
 71       val set = new scala.collection.mutable.HashSet[String]()
 72
 73       for (user <- prevValues.getOrElse(Iterable[String]())) {
 74         set.+=(user)
 75       }
 76       for (value <- newValues) {
 77         for (user <- value) {
 78           set.+=(user)
 79         }
 80       }
 81       Some(set.toIterable)
 82     }
 83
 84     // updateState page-user
 85     val sp_u = p_u.groupByKey().updateStateByKey[Iterable[String]](updateFun).flatMap(s => {
 86
 87       val listBuffer = new ListBuffer[(String, String)]
 88       for (elem <- s._2) {
 89         listBuffer.+=((s._1, elem))
 90       }
 91       listBuffer.toTraversable
 92     })
 93
 94     sp_u.checkpoint(timeUnit * 8)
 95
 96     //print
 97     p_u.print()
 98     wp_u.print()
 99     sp_u.print()
100
101     ssc
102   }
103
104   def main(args: Array[String]): Unit = {
105
106     if (args.length < 2) {
107       System.err.println("Your arguments error !")
108       System.exit(1)
109     }
110
111     val time_unit = Seconds(20)
112     val checkpointPath = args(0)
113     val dataPath = args(1)
114
115     val ssc = StreamingContext.getOrCreate(checkpointPath, () => createContext("page.logging.streaming", time_unit, checkpointPath, dataPath))
116    // val ssc = createContext("page.logging.streaming", time_unit, checkpointPath, dataPath)
117     ssc.start()
118
119     for (i <- 1 to 10) {
120       println("loop-" + i)
121       Thread.sleep(1000 * 20)
122     }
123
124     ssc.stop(true, true)
125     //ssc.awaitTermination()
126
127     System.exit(0)
128   }
129
130
131 }

调试:

先用本地目录来调试

配置run configuration ,设置checkpoint目录和数据源目录

运行,然后手动往数据目录增加日志文件,如下图。

spark streaming会自动识别新增文件,并读取。

运行结果

Creating new context !
loop-1
16/10/11 18:35:40 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
-------------------------------------------
Time: 1476182140000 ms
-------------------------------------------

-------------------------------------------
Time: 1476182140000 ms
-------------------------------------------

loop-2
-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

loop-3
-------------------------------------------
Time: 1476182180000 ms
-------------------------------------------
(P68658056,U86142038)
(P27395813,U21453697)
(P27395813,U26941232)
(P27395814,U12142025)

-------------------------------------------
Time: 1476182180000 ms
-------------------------------------------
(P68658056,U86142038)
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
(P27395813,U26941232)
(P27395814,U12142025)

loop-4
-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------

-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
(P68658056,U86142038)
(P27395813,U21453697)
(P27395813,U26941232)
(P27395814,U12142025)

-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
(P68658056,U86142038)
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
(P27395813,U26941232)
(P27395814,U12142025)

本地目录调试OK,接下来用hdfs目录来调试:

先写一个日志生成程序,定时往一个临时hdfs目录写入日志文件,并移动至最终目录。先放临时目录的原因是为了保证其原子性。hadoop 的开发可以参考 hadoop 开发&调试

临时目录:"/user/rihai/logdata/tmp",最终目录:"/user/rihai/logdata/"

代码:

  1 package com.rihai.hadoop.hdfs;
  2
  3
  4 import java.io.IOException;
  5 import java.text.SimpleDateFormat;
  6 import java.util.*;
  7
  8 /**
  9  * Created by rihaizhang on 9/7/2016.
 10  */
 11 public class CreateLogging {
 12
 13     private static List<String> usrIdList;
 14     private static List<String> pageList;
 15     private static int usrCount = 100;
 16     private static int pageCount = 1000;
 17     private static int scoreMax = 5;
 18     private static String tempPath = "/user/rihai/logdata/tmp";
 19     private static String mainPath = "/user/rihai/logdata/";
 20     //private static String tempPath = "hdfs://master:9000/user/rihai/logdata/tmp";
 21     //private static String mainPath = "hdfs://master:9000/user/rihai/logdata/";
 22
 23     public static void main(String[] args) throws InterruptedException, IOException {
 24
 25         buildUsrIds();
 26         bulidPageList();
 27         bulidLog();
 28         System.out.println("正在运行");
 29         Scanner sc = new Scanner(System.in);
 30         String input = sc.nextLine();
 31         System.out.println("运行结束");
 32     }
 33
 34     /**
 35      * 生成日志
 36      *
 37      * @throws InterruptedException
 38      * @throws IOException
 39      */
 40     private static void bulidLog() throws InterruptedException, IOException {
 41
 42         SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
 43         SimpleDateFormat format2 = new SimpleDateFormat("yyyyMMddHHmmss");
 44         for (int i = 0; i < 100; i++) {
 45             StringBuilder sb = new StringBuilder();
 46             //build log
 47             for (int j = 0; j < 100; j++) {
 48                 if (j > 0) {
 49                     sb.append("\n");
 50                 }
 51                 int pIndex = GetRandom(pageCount);
 52                 int uIndex = GetRandom(usrCount);
 53                 int score = GetRandom(scoreMax) + 1;
 54                 String datestr = format.format(Calendar.getInstance().getTime());
 55                 sb.append(String.format("%s,%s,%s,%s", usrIdList.get(uIndex), pageList.get(pIndex), score, datestr));
 56             }
 57             String fileName = String.format("log_%s.txt", format2.format(Calendar.getInstance().getTime()));
 58             //send to hdfs
 59             System.out.println("准备写入");
 60             SendToHdfs(fileName, sb.toString());
 61             //sleep
 62             Thread.sleep(1000 * 60 * 2);
 63         }
 64
 65     }
 66
 67     /**
 68      * 发送Hdfs
 69      *
 70      * @param fileName
 71      * @param content
 72      * @throws IOException
 73      */
 74     private static void SendToHdfs(String fileName, String content) throws IOException {
 75
 76         HdfsUtil hdfsUtil = new HdfsUtil();
 77
 78         if (!hdfsUtil.exists(tempPath)) {
 79             boolean result = hdfsUtil.createDirectory(mainPath);
 80             if (result) {
 81                 System.out.println(tempPath + " 创建成功!");
 82             } else {
 83                 System.out.println(tempPath + " 创建失败!");
 84                 return;
 85             }
 86         }
 87
 88         String tempFileName = tempPath + "/" + fileName;
 89         String newFileName = mainPath + "/" + fileName;
 90
 91         hdfsUtil.createFile(tempFileName, content);
 92         System.out.println(String.format("写入%s成功", tempFileName));
 93
 94         boolean result = hdfsUtil.renameFile(tempFileName, newFileName);
 95
 96         System.out.println(String.format("移动至%s%s", newFileName, result ? "成功" : "失败"));
 97
 98     }
 99
100     /**
101      * 随机生成页面
102      */
103     private static void bulidPageList() {
104         Random random = new Random();
105         /**
106          * e.g.
107          * P93002432
108          */
109         pageList = new ArrayList<String>();
110         for (int i = 0; i < pageCount; i++) {
111             int temp = random.nextInt(100000000);
112             pageList.add(String.format("P%08d", temp));
113         }
114     }
115
116     /**
117      * 随机生成用户
118      */
119     private static void buildUsrIds() {
120         Random random = new Random();
121         /**
122          *  e.g.
123          *  U00234999
124          */
125         usrIdList = new ArrayList<String>();
126         for (int i = 0; i < usrCount; i++) {
127             int temp = random.nextInt(100000000);
128             usrIdList.add(String.format("U%08d", temp));
129         }
130     }
131
132     /**
133      * 取随机数
134      *
135      * @param max
136      * @return
137      */
138     private static int GetRandom(int max) {
139         Random random = new Random();
140         int temp = random.nextInt(max);
141         return temp;
142     }
143
144 }

  1 package com.rihai.hadoop.hdfs;
  2
  3 import org.apache.hadoop.conf.Configuration;
  4 import org.apache.hadoop.fs.*;
  5 import org.apache.hadoop.io.IOUtils;
  6
  7 import java.io.ByteArrayOutputStream;
  8 import java.io.IOException;
  9
 10 /**
 11  * hdfs 工具类
 12  * Created by rihaizhang on 9/6/2016.
 13  */
 14 public class HdfsUtil {
 15     private Configuration conf = new Configuration();
 16
 17     public HdfsUtil() {
 18     }
 19
 20     public HdfsUtil(Configuration conf) {
 21         this.conf = conf;
 22     }
 23
 24     /**
 25      * 检查文件或目录是否存在
 26      *
 27      * @param path
 28      * @return
 29      * @throws IOException
 30      */
 31     public boolean exists(String path) throws IOException {
 32         try (FileSystem fs = FileSystem.get(conf)) {
 33             return fs.exists(new Path(path));
 34         }
 35     }
 36
 37     /**
 38      * 创建目录
 39      *
 40      * @param dirPath
 41      * @return
 42      * @throws IOException
 43      */
 44     public boolean createDirectory(String dirPath) throws IOException {
 45         try (FileSystem fs = FileSystem.get(conf)) {
 46             boolean result = fs.mkdirs(new Path(dirPath));
 47             return result;
 48         }
 49     }
 50
 51     /**
 52      * 创建文件
 53      *
 54      * @param filePath
 55      * @param bytes
 56      * @throws IOException
 57      */
 58     public void createFile(String filePath, byte[] bytes) throws IOException {
 59         try (FileSystem fs = FileSystem.get(conf)) {
 60             try (FSDataOutputStream output = fs.create(new Path(filePath))) {
 61                 output.write(bytes);
 62             }
 63         }
 64     }
 65
 66     /**
 67      * 创建文件
 68      *
 69      * @param filePath
 70      * @param contents
 71      * @throws IOException
 72      */
 73     public void createFile(String filePath, String contents) throws IOException {
 74         createFile(filePath, contents.getBytes("UTF-8"));
 75     }
 76
 77     /**
 78      * 追加文件
 79      *
 80      * @param filePath
 81      * @param bytes
 82      * @throws IOException
 83      */
 84     public void appendFile(String filePath, byte[] bytes) throws IOException {
 85         try (FileSystem fs = FileSystem.get(conf)) {
 86             try (FSDataOutputStream output = fs.append(new Path(filePath))) {
 87                 output.write(bytes);
 88             }
 89         }
 90     }
 91
 92     /**
 93      * 追加文件
 94      *
 95      * @param filePath
 96      * @param contents
 97      * @throws IOException
 98      */
 99     public void appendFile(String filePath, String contents) throws IOException {
100         appendFile(filePath, contents.getBytes("UTF-8"));
101     }
102
103     /**
104      * 删除文件或目录
105      *
106      * @param filePath
107      * @param recursive
108      * @return
109      * @throws IOException
110      */
111     public boolean deleteFile(String filePath, boolean recursive) throws IOException {
112         try (FileSystem fs = FileSystem.get(conf)) {
113             boolean result = fs.delete(new Path(filePath), recursive);
114             return result;
115         }
116     }
117
118     public boolean renameFile(String sourcePath, String targetPath) throws IOException {
119
120         try (FileSystem fs = FileSystem.get(conf)) {
121             boolean result = fs.rename(new Path(sourcePath), new Path(targetPath));
122             return result;
123         }
124     }
125
126     /**
127      * 读取文件
128      *
129      * @param filePath
130      * @return
131      * @throws IOException
132      */
133     public String readFile(String filePath) throws IOException {
134         try (FileSystem fs = FileSystem.get(conf)) {
135             FSDataInputStream input = fs.open(new Path(filePath));
136             //byte[] buffer = new byte[input.available()];
137             //input.readFully(0, buffer);
138             ByteArrayOutputStream output = new ByteArrayOutputStream(input.available());
139
140             IOUtils.copyBytes(input, output, conf);
141             String fileContent = output.toString("UTF-8");
142             return fileContent;
143         }
144     }
145
146 }

运行CreateLogging 程序:

重新配置PageLoggingStreaming 程序的run configuration ,设置checkpoint目录和数据源目录

运行结果:

时间: 2024-08-06 15:36:46

spark streaming 实例的相关文章

Spark源码系列(八)Spark Streaming实例分析

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照<Spark Streaming编程指南>. Example代码分析 val ssc = new StreamingContext(sparkConf, Seconds(1)); // 获得一个DStream负责连接 监听端口:地址 val lines = ssc.socketTextStream(serverIP, serverPort); // 对每一行数据执行Split操作 val words = line

Spark Streaming 结合FlumeNG使用实例

SparkStreaming是一个对实时数据流进行高通量.容错处理的流式处理系统,可以对多种数据源(如Kdfka.Flume.Twitter.Zero和TCP 套接字)进行类似map.reduce.join.window等复杂操作,并将结果保存到外部文件系统.数据库或应用到实时仪表盘. Spark Streaming流式处理系统特点有: 将流式计算分解成一系列短小的批处理作业 将失败或者执行较慢的任务在其它节点上并行执行 较强的容错能力(基于RDD继承关系Lineage) 使用和RDD一样的语义

Spark Streaming中的基本操作函数实例

官网文档中,大概可分为这几个 TransformationsWindow OperationsJoin OperationsOutput Operations 请了解一些基本信息: DStream是Spark Streaming提供的基本抽象.它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流.在内部,DStream由一系列连续的RDD表示,这是Spark对不可变分布式数据集的抽象.DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示 Tra

Spark Streaming源码解读之Job详解

一:Spark Streaming Job生成深度思考 1. 做大数据例如Hadoop,Spark等,如果不是流处理的话,一般会有定时任务.例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理. 2. JobGenerator构造的时候有一个核心的参数是jobScheduler, jobScheduler是整个作业的生成和提交给集群的核心,JobGenerator会基于DStream生

Spark(十) -- Spark Streaming API编程

本文测试的Spark版本是1.3.1 Spark Streaming编程模型: 第一步: 需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数: 1.SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息 2.Seconds对象:该对象设置了StreamingContext多久读取一次数据流 第二步: 构建好入口对象之后,直接调用该入口的方法读取各

Spark Streaming源代码学习总结(一)

1.Spark Streaming 代码分析: 1.1 演示样例代码DEMO: 实时计算的WorldCount: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def mai

Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

一:Receiver启动的方式设想 1. Spark Streaming通过Receiver持续不断的从外部数据源接收数据,并把数据汇报给Driver端,由此每个Batch Durations就可以根据汇报的数据生成不同的Job. 2. Receiver属于Spark Streaming应用程序启动阶段,那么我们找Receiver在哪里启动就应该去找Spark Streaming的启动. 3. Receivers和InputDStreams是一一对应的,默认情况下一般只有一个Receiver.

Spark版本定制八:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

本期内容: 1.DStream与RDD关系彻底研究 2.Streaming中RDD的生成彻底研究 一.DStream与RDD关系彻底研究 课前思考: RDD是怎么生成的? RDD依靠什么生成?根据DStream来的 RDD生成的依据是什么? Spark Streaming中RDD的执行是否和Spark Core中的RDD执行有所不同? 运行之后我们对RDD怎么处理? ForEachDStream不一定会触发Job的执行,但是它一定会触发job的产生,和Job是否执行没有关系: 对于DStream

2.Spark Streaming运行机制和架构

1 解密Spark Streaming运行机制 上节课我们谈到了技术界的寻龙点穴.这就像过去的风水一样,每个领域都有自己的龙脉,Spark就是龙脉之所在,它的龙穴或者关键点就是SparkStreaming.这是上一节课我们非常清晰知道的结论之一.而且上一节课,我们采用了降维的方式.所谓降维的方式,是指把时间放大,就是把时间变长的情况下,我们做SparkStreaming的案例演示的实战,实战的结果是,我们发现在特定的时间段里面,确实是具体的RDD在工作,那么这一节课有必要在上一节课的基础上去谈一