Structured Streaming

  1. import org.apache.spark.sql.types._
  2. val pathA = "hdfs:/tpc-ds/data/store_sales"
  3. val pathB = "hdfs:/tpc-ds/data/store/"
  4. // For Spark 2.x use -> val df = spark.read.option("header", true).csv(path)
  5. val A_df = sqlContext.read.format("com.databricks.spark.csv")
  6. .option("header","false")
  7. .option("inferSchema","false")
  8. .option("delimiter","|")
  9. .load(pathA)
  10. // Assign column names to the Store Sales dataframe
  11. val storeSalesDF = A_df.select(
  12. A_df("_c0").cast(IntegerType).as("SS_SOLD_DATE_SK"),
  13. A_df("_c1").cast(IntegerType).as("SS_SOLD_TIME_SK"),
  14. A_df("_c2").cast(IntegerType).as("SS_ITEM_SK"),
  15. A_df("_c7").cast(IntegerType).as("SS_STORE_SK")
  16. )
  17. val B_df = sqlContext.read.format("com.databricks.spark.csv")
  18. .option("header","false")
  19. .option("inferSchema","false")
  20. .option("delimiter","|")
  21. .load(pathB)
  22. // Assign column names to the Region dataframe
  23. val storeDF = B_df.select(
  24. B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
  25. B_df("_c1").cast(StringType).as("S_STORE_ID")
  26. B_df("_c5").cast(StringType).as("S_STORE_NAME")
  27. )
  28. val joinedDF = storeSalesDF.join(storeDF,
  29. storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
  30. )
  31. joinedDF.take(5)

What is the full routine of Structured Streaming? 
Let’s look at the code (the example is from the Spark source code and I made some edits): 
val spark = SparkSession  .builder  . 
master("local[2]")  . 
appName("StructuredNetworkWordCount"). 
getOrCreate()

val schemaExp = StructType( 
      StructField("name", StringType, false) :: 
        StructField("city", StringType, true) 
        :: Nil 
    )

//Standard DataSource API, only the read is changed to readStream. 
   val words = spark.readStream.format("json").schema(schemaExp) 
      .load("file:///tmp/dir")

//Some APIs of DataFrame. 
    val wordCounts = words.groupBy("name").count()

//Standard DataSource writing API, only the write is changed to writeStream. 
    val query = wordCounts.writeStream 
//complete,append,update。Currently, 
//only the first two types are supported. 
      .outputMode("complete") 
//The console, parquet, memory, and foreach types 
      .format("console") 
      .trigger(ProcessingTime(5.seconds))//Here is where the timer is set. 
      .start()

query.awaitTermination()

This is the complete routine of Structured Streaming. 
Structured Streaming currently only supports File and Socket sources. It can output four types, as mentioned above. The foreach can be infinitely expanded. For example: 
val query = wordCounts.writeStream.trigger(ProcessingTime(5.seconds)) 
      .outputMode("complete") 
      .foreach(new ForeachWriter[Row] {

var fileWriter: FileWriter = _

override def process(value: Row): Unit = { 
        fileWriter.append(value.toSeq.mkString(",")) 
      }

override def close(errorOrNull: Throwable): Unit = { 
        fileWriter.close() 
      }

override def open(partitionId: Long, version: Long): Boolean = { 
        FileUtils.forceMkdir(new File(s"/tmp/example/${partitionId}")) 
        fileWriter = new FileWriter(new File(s"/tmp/example/${partitionId}/temp")) 
        true 
      } 
    }).start()

时间: 2024-10-20 08:29:41

Structured Streaming的相关文章

Spark Structured Streaming框架(3)之数据输出源详解

Spark Structured streaming API支持的输出源有:Console.Memory.File和Foreach.其中Console在前两篇博文中已有详述,而Memory使用非常简单.本文着重介绍File和Foreach两种方式,并介绍如何在源码基本扩展新的输出方式. 1. File Structured Streaming支持将数据以File形式保存起来,其中支持的文件格式有四种:json.text.csv和parquet.其使用方式也非常简单只需设置checkpointLo

Spark Structured Streaming框架(4)之窗口管理详解

1. 结构 1.1 概述 Structured Streaming组件滑动窗口功能由三个参数决定其功能:窗口时间.滑动步长和触发时间. 窗口时间:是指确定数据操作的长度: 滑动步长:是指窗口每次向前移动的时间长度: 触发时间:是指Structured Streaming将数据写入外部DataStreamWriter的时间间隔. 图 11 1.2 API 用户管理Structured Streaming的窗口功能,可以分为两步完成: 1) 定义窗口和滑动步长 API是通过一个全局的window方法

学习Spark2.0中的Structured Streaming(一)

转载自:http://lxw1234.com/archives/2016/10/772.htm Spark2.0新增了Structured Streaming,它是基于SparkSQL构建的可扩展和容错的流式数据处理引擎,使得实时流式数据计算可以和离线计算采用相同的处理方式(DataFrame&SQL).Structured Streaming顾名思义,它将数据源和计算结果都映射成一张"结构化"的表,在计算的时候以结构化的方式去操作数据流,大大方便和提高了数据开发的效率. Sp

Spark Structured Streaming框架(5)之进程管理

Structured Streaming提供一些API来管理Streaming对象.用户可以通过这些API来手动管理已经启动的Streaming,保证在系统中的Streaming有序执行. 1. StreamingQuery 在调用DataStreamWriter方法的start启动Streaming后,会返回一个StreamingQuery对象.所以用户就可以通过这个对象来管理Streaming. 如下所示: val query = df.writeStream.format("console

Spark Structured Streaming框架(2)之数据输入源详解

Spark Structured Streaming目前的2.1.0版本只支持输入源:File.kafka和socket. 1. Socket Socket方式是最简单的数据输入源,如Quick example所示的程序,就是使用的这种方式.用户只需要指定"socket"形式并配置监听的IP和Port即可. val scoketDF = spark.readStream .format("socket") .option("host","

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Event-time 和延迟数据 容错语义 API 使用 Datasets 和 DataFrames 创建 streaming DataFrames 和 streaming Datasets Input Sources (输入源) streaming DataFrames/Datasets 的模式接口和分区 streaming DataFrames/Dataset

Structured Streaming教程(1) —— 基本概念与使用

近年来,大数据的计算引擎越来越受到关注,spark作为最受欢迎的大数据计算框架,也在不断的学习和完善中.在Spark2.x中,新开放了一个基于DataFrame的无下限的流式处理组件--Structured Streaming,它也是本系列的主角,废话不多说,进入正题吧! 简单介绍 在有过1.6的streaming和2.x的streaming开发体验之后,再来使用Structured Streaming会有一种完全不同的体验,尤其是在代码设计上. 在过去使用streaming时,我们很容易的理解

Structured Streaming教程(2) —— 常用输入与输出

上篇了解了一些基本的Structured Streaming的概念,知道了Structured Streaming其实是一个无下界的无限递增的DataFrame.基于这个DataFrame,我们可以做一些基本的select.map.filter操作,也可以做一些复杂的join和统计.本篇就着重介绍下,Structured Streaming支持的输入输出,看看都提供了哪些方便的操作. 数据源 Structured Streaming 提供了几种数据源的类型,可以方便的构造Steaming的Dat

Structured Streaming教程(3) —— 与Kafka的集成

Structured Streaming最主要的生产环境应用场景就是配合kafka做实时处理,不过在Strucured Streaming中kafka的版本要求相对搞一些,只支持0.10及以上的版本.就在前一个月,我们才从0.9升级到0.10,终于可以尝试structured streaming的很多用法,很开心~ 引入 如果是maven工程,直接添加对应的kafka的jar包即可: <dependency> <groupId>org.apache.spark</groupI