Structured Streaming 实战案例 读取文本数据

1.1.1.读取文本数据

spark应用可以监听某一个目录,而web服务在这个目录上实时产生日志文件,这样对于spark应用来说,日志文件就是实时数据

Structured Streaming支持的文件类型有text,csv,json,parquet

●准备工作

在people.json文件输入如下数据:

{"name":"json","age":23,"hobby":"running"}

{"name":"charles","age":32,"hobby":"basketball"}

{"name":"tom","age":28,"hobby":"football"}

{"name":"lili","age":24,"hobby":"running"}

{"name":"bob","age":20,"hobby":"swimming"}

注意:文件必须是被移动到目录中的,且文件名不能有特殊字符

●需求

接下里使用Structured Streaming统计年龄小于25岁的人群的爱好排行榜

●代码演示:

import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
  * {"name":"json","age":23,"hobby":"running"}
  * {"name":"charles","age":32,"hobby":"basketball"}
  * {"name":"tom","age":28,"hobby":"football"}
  * {"name":"lili","age":24,"hobby":"running"}
  * {"name":"bob","age":20,"hobby":"swimming"}
  * 统计年龄小于25岁的人群的爱好排行榜
  */
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    val Schema: StructType = new StructType()
      .add("name","string")
      .add("age","integer")
      .add("hobby","string")
    //2.接收数据
    import spark.implicits._
    // Schema must be specified when creating a streaming source DataFrame.
    val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data")
    //3.处理数据
    val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)
    //4.输出结果
    result.writeStream
      .format("console")
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}

1

import org.apache.spark.SparkContext

2

import org.apache.spark.sql.streaming.Trigger

3

import org.apache.spark.sql.types.StructType

4

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

5

/**

6

  * {"name":"json","age":23,"hobby":"running"}

7

  * {"name":"charles","age":32,"hobby":"basketball"}

8

  * {"name":"tom","age":28,"hobby":"football"}

9

  * {"name":"lili","age":24,"hobby":"running"}

10

  * {"name":"bob","age":20,"hobby":"swimming"}

11

  * 统计年龄小于25岁的人群的爱好排行榜

12

  */

13

object WordCount2 {

14

  def main(args: Array[String]): Unit = {

15

    //1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSet

16

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()

17

    val sc: SparkContext = spark.sparkContext

18

    sc.setLogLevel("WARN")

19

    val Schema: StructType = new StructType()

20

      .add("name","string")

21

      .add("age","integer")

22

      .add("hobby","string")

23

    //2.接收数据

24

    import spark.implicits._

25

    // Schema must be specified when creating a streaming source DataFrame.

26

    val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data")

27

    //3.处理数据

28

    val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)

29

    //4.输出结果

30

    result.writeStream

31

      .format("console")

32

      .outputMode("complete")

33

      .trigger(Trigger.ProcessingTime(0))

34

      .start()

35

      .awaitTermination()

36

  }

37

}

代码截图:

原文地址:https://www.cnblogs.com/TiePiHeTao/p/e83029055db1b4a602e15785b2d079c1.html

时间: 2024-10-07 01:55:02

Structured Streaming 实战案例 读取文本数据的相关文章

Structured Streaming 实战案例 读取Scoker

1.1.1.读取Socket数据 ●准备工作 nc -lk 9999 hadoop spark sqoop hadoop spark hive hadoop ●代码演示: import org.apache.spark.SparkContext import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object WordCo

【Excel&amp;VBA】VBA解析本地文件目录 读取文本数据

偶尔用到Excel中VBA处理一些事情,毕竟鄙人比较懒,总是喜欢把那些重复性的工作想方设法交给计算机来处理,那样才能证明自己的大脑还在运转着,总结一些VBA中读取解析本地文件目录的方法. 1 ' 该函数是获取指定目录下所有的 文件夹 名 2 Sub readFile() 3 Path = "d:\" 4 sonpath = Dir(Path, vbDirectory) 5 Do While sonpath <> "" 6 ' 跳过当前的目录及上层目录 7

使用Scanner输入数据-读取文本数据

Scanner类的方法定义: · 构造方法:public Scanner(InputStream source): · 设置读取分割符:public Scanner useDelimiter(String pattern): · 判断是否有数据:public boolean hasNextXxx(): · 取出数据:public 数据类型 nextXxx(): 1 package cn.demo; 2 3 import java.io.File; 4 import java.io.FileInp

QT中读取文本数据(txt)

下面的代码实现读取txt文档中的数据,并且是一行一行的读取. void MainWindow::on_pushButton_clicked() { QFile file("abcd.txt"); if(! file.open(QIODevice::ReadOnly|QIODevice::Text)) qDebug()<<file.errorString(); else qDebug()<<"openok"; file.seek(0); QTe

python读取文本数据写入到数据库及查询优化

文本数据格式 ip2int函数用于IP地址转化为整数 int2ip函数用于整数转化为IP地址 insert_row函数用于插入数据库记录 from __future__ import print_function import torndb def get_mysql_conn():     return torndb.Connection(         host=mysql["host"] + ":" + mysql["port"],   

MySQL中游标使用以及读取文本数据

前言 之前一直没有接触数据库的学习,只是本科时候修了一本数据库基本知识的课.当时只对C++感兴趣,天真的认为其它的课都没有用,数据库也是半懂不懂,胡乱就考试过了.现在学习大数据分析,接触了数据挖掘,才感觉到数据库是不可跨越的坎.直到现在才感觉到<操作系统>.<编译原理>.<计算机组成原理>等等课程的重要性.在浩瀚的知识面前,个人是非常渺小的.掌握了一种思想之后,任何事情都不困难,困难的是你是否真的静下心看一看帮助文档.认真的Google.静心.静气.认真.执着. 游标-

C++文本数据读取

读取文本数据的时候,其相应的流程与写入颇为相似,但也有一些区别,毕竟是两个不同的功能. 1.添加必须的头文件:#include <fstream> .#include <cstdlib>. 2.定义相应的数组,用于存储文件的名称. 3.定义相应的变量,用于存储文件写入的数据. 4.创建一个ifstream对象. 5.将ifstream与文本文件进行关联. 6.测试文件打开是否正常. 7.使用ifstream对象和<<运算符进行数据写入. 8.使用完ifstream对象后

学习Spark2.0中的Structured Streaming(一)

转载自:http://lxw1234.com/archives/2016/10/772.htm Spark2.0新增了Structured Streaming,它是基于SparkSQL构建的可扩展和容错的流式数据处理引擎,使得实时流式数据计算可以和离线计算采用相同的处理方式(DataFrame&SQL).Structured Streaming顾名思义,它将数据源和计算结果都映射成一张"结构化"的表,在计算的时候以结构化的方式去操作数据流,大大方便和提高了数据开发的效率. Sp

文本数据增量导入到mysql

实现思路:       实现Java读取TXT文件中的内容并存到内存,将内存中的数据和mysql 数据库里面某张表数据的字段做一个比较,如果比较内存中的数据在mysql 里存在则不做处理,如果不存在则将该数据插入mysql数据库中 步骤1.读取文本数据   给一个string 返回值 步骤2.查询mysql 表中数据  给一个String 返回值 步骤3  .内存中文本数据和读取的mysql 数据做比较 /** * 实现读取文件信息 * * @param fileName * @return *