Spark Structured Streaming框架(2)之数据输入源详解

  Spark Structured Streaming目前的2.1.0版本只支持输入源:File、kafka和socket。

1. Socket

  Socket方式是最简单的数据输入源,如Quick example所示的程序,就是使用的这种方式。用户只需要指定"socket"形式并配置监听的IP和Port即可。


val scoketDF = spark.readStream

.format("socket")

.option("host","localhost")

.option("port", 9999)

.load()

注意:

Socket方式Streaming是接收UTF8的text数据,并且这种方式最后只用于测试,不要用户端到端的项目中。

2. Kafka

  Structured streaming提供接收kafka数据源的接口,用户使用起来也非常方便,只是需要注意开发环境所依赖的特别库,同时streaming运行环境的kafka版本。

2.1 开发环境

  若以kafka作为输入源,那么开发环境需要再引入所依赖的架包。如使用了Spark版本是2.1.0,那么maven的pom.xml文件中需要添加如下的依赖库。


<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql-kafka-0.10_2.11</artifactId>

<version>2.1.0</version>

</dependency>

2.2 API

  与使用socket作为输入源类似,只需要指定"kafka"作为输入源,同时传递kafka的server集和topic集。如下所示:


// Subscribe to 1 topic

val df = spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers", "host1:port1,host2:port2")

.option("subscribe", "topic1")

.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

.as[(String, String)]

// Subscribe to multiple topics

val df = spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers", "host1:port1,host2:port2")

.option("subscribe", "topic1,topic2")

.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

.as[(String, String)]

// Subscribe to a pattern

val df = spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers", "host1:port1,host2:port2")

.option("subscribePattern", "topic.*")

.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

.as[(String, String)]

2.3 运行环境

  由于spark 2.1.0使用了kafka的版本是0.10,所以kafka server也要使用同样版本,即发送数据的kafka也需要使用0.10版本。

否则会出现如下的错误:

图 21

3. File

  Structured Streaming可以指定一个目录的文件作为数据输入源,其中支持的文件格式有:text、csv、json、parquet。

如下所示:


object StructuredFile{

def main(args:Array[String]){

val spark = SparkSession

.builder

.appName("StructuredNetWordCount")

.getOrCreate()

val userSchema = new StructType().add("name","string").add("age","integer")

val jsonDF = spark

.readStream

.schema(userSchema)

.json("/root/jar/directory")//Equivalent to format("json").load("/root/jar/directore")

Val query = jsonDF.writeStream

.format(console)

.start()

Query.awaitTermination()

}

}

 1) DataStreamReader接口

  读取文件的接口有5个:

  • format(source).load(path):source参数是指文件的形式,有text、csv、json、parquet四种形式;
  • text(path):其封装了format("text").load(path);
  • json(path):其封装了format("json").load(path);
  • csv(path):其封装了format("csv").load(path);
  • parquet(path):其封装了format("parquet").load(path);

  其中path参数为文件的路径,若该路径发现新增文件,则会被以数据流的形式被获取。但该路径只能是指定的格式文件,不能存放其它文件格式。

注意:

若是以Spark集群方式运行,则路径是hdfs种的文件路径;若是以local方式执行,则路径为本地路径。

 2) schema()方法

  获取的文件形式有四种,但并不是每种格式都需要调用schema()方法来配置文件信息:

  • csv、json、parquet:用户需要通过schema()方法手动配置文件信息;
  • text:不需要用户指定schema,其返回的列是只有一个"value"。

4) 自定义

  若上述Spark Structured Streaming API提供的数据输入源不能满足要求,那么还有一种方法可以使用:修改源码。

如下通过获取"socket"数据源相应类的内容为例,介绍具体使用方式:

4.1 实现Provider

  首先实现一个Provider,该类会返回一个数据的数据源对象。其中Provider实现类需要实现三个方法:


序号


方法


描述


1


souceSchema


该方法返回一个配置信息的词典,key是字符串,value是StructType对象


2


createSource


该方法返回一个接受数据源的对象,其为Source接口的子类


3


shortName


该方法返回一个数据源的标识符,如上述format()方法传递的参数:"socket"、"json"或"kafka";此时返回的字符串,就是format()方法传递的参数

  如下所示实现一个TextRabbitMQSourceProvider类:


class TextRabbitMQSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {

private def parseIncludeTimestamp(params: Map[String, String]): Boolean = {

Try(params.getOrElse("includeTimestamp", "false").toBoolean) match {

case Success(bool) => bool

case Failure(_) =>

throw new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")

}

}

/** Returns the name and schema of the source that can be used to continually read data. */

override def sourceSchema(

sqlContext: SQLContext,

schema: Option[StructType],

providerName: String,

parameters: Map[String, String]): (String, StructType) = {

logWarning("The socket source should not be used for production applications! " +

"It does not support recovery.")

if (!parameters.contains("host")) {

throw new AnalysisException("Set a host to read from with option(\"host\", ...).")

}

if (!parameters.contains("port")) {

throw new AnalysisException("Set a port to read from with option(\"port\", ...).")

}

val schema =

if (parseIncludeTimestamp(parameters)) {

TextSocketSource.SCHEMA_TIMESTAMP

} else {

TextSocketSource.SCHEMA_REGULAR

}

("textSocket", schema)

}

override def createSource(

sqlContext: SQLContext,

metadataPath: String,

schema: Option[StructType],

providerName: String,

parameters: Map[String, String]): Source = {

val host = parameters("host")

val port = parameters("port").toInt

new TextRabbitMQSource(host, port, parseIncludeTimestamp(parameters), sqlContext)

}

/** String that represents the format that this data source provider uses. */

override def shortName(): String = "RabbitMQ"

}

4.2 实现Source

  用户需要实现一个真正接受数据的类,该类实例是由Provider实现类来实例化,如上述的createSource()方法。其中需要实现Source抽象类的几个方法,从而让Structured Streaming引擎能够调用:


序号


方法


描述


1


getOffset


获取可用的数据偏移量,表明是否有可用的数据


2


getBatch


获取可用的数据,以DataFrame对象形式返回


3


commit


传递已经接收的数据偏移量


4


stop


听着Source数据源


class TextRabbitMQSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)

extends Source with Logging {

@GuardedBy("this")

private var socket: Socket = null

@GuardedBy("this")

private var readThread: Thread = null

/**

* All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.

* Stored in a ListBuffer to facilitate removing committed batches.

*/

@GuardedBy("this")

protected val batches = new ListBuffer[(String, Timestamp)]

@GuardedBy("this")

protected var currentOffset: LongOffset = new LongOffset(-1)

@GuardedBy("this")

protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)

initialize()

private def initialize(): Unit = synchronized {

socket = new Socket(host, port)

val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))

readThread = new Thread(s"TextSocketSource($host, $port)") {

setDaemon(true)

override def run(): Unit = {

try {

while (true) {

val line = reader.readLine()

if (line == null) {

// End of file reached

logWarning(s"Stream closed by $host:$port")

return

}

TextSocketSource.this.synchronized {

val newData = (line,

Timestamp.valueOf(

TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))

)

currentOffset = currentOffset + 1

batches.append(newData)

}

}

} catch {

case e: IOException =>

}

}

}

readThread.start()

}

/** Returns the schema of the data from this source */

override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP

else TextSocketSource.SCHEMA_REGULAR

override def getOffset: Option[Offset] = synchronized {

if (currentOffset.offset == -1) {

None

} else {

Some(currentOffset)

}

}

/** Returns the data that is between the offsets (`start`, `end`]. */

override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {

val startOrdinal =

start.flatMap(LongOffset.convert).getOrElse(LongOffset(-1)).offset.toInt + 1

val endOrdinal = LongOffset.convert(end).getOrElse(LongOffset(-1)).offset.toInt + 1

// Internal buffer only holds the batches after lastOffsetCommitted

val rawList = synchronized {

val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1

val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1

batches.slice(sliceStart, sliceEnd)

}

import sqlContext.implicits._

val rawBatch = sqlContext.createDataset(rawList)

// Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp

// if requested.

if (includeTimestamp) {

rawBatch.toDF("value", "timestamp")

} else {

// Strip out timestamp

rawBatch.select("_1").toDF("value")

}

}

override def commit(end: Offset): Unit = synchronized {

val newOffset = LongOffset.convert(end).getOrElse(

sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +

s"originate with an instance of this class")

)

val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

if (offsetDiff < 0) {

sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")

}

batches.trimStart(offsetDiff)

lastOffsetCommitted = newOffset

}

/** Stop this source. */

override def stop(): Unit = synchronized {

if (socket != null) {

try {

// Unfortunately, BufferedReader.readLine() cannot be interrupted, so the only way to

// stop the readThread is to close the socket.

socket.close()

} catch {

case e: IOException =>

}

socket = null

}

}

override def toString: String = s"TextSocketSource[host: $host, port: $port]"

}

4.3 注册Provider

  由于Structured Streaming引擎会根据用户在format()方法传递的数据源类型来寻找具体数据源的provider,即在DataSource.lookupDataSource()方法中寻找。所以用户需要将上述实现的Provider类注册到Structured Streaming引擎中。所以用户需要将provider实现类的完整名称添加到引擎中的某个,这个地方就是在Spark SQL工程中的\spark-2.2.0\sql\core\src\main\resources\META-INF\services\org.apache.spark.sql.sources.DataSourceRegister文件中。用户通过将Provider实现类名称添加到该文件中,从而完成Provider类的注册工作。

如下所示在文件最后一行添加,我们自己自定义的实现类完整路径和名称:


org.apache.spark.sql.execution.datasources.csv.CSVFileFormat

org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider

org.apache.spark.sql.execution.datasources.json.JsonFileFormat

org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

org.apache.spark.sql.execution.datasources.text.TextFileFormat

org.apache.spark.sql.execution.streaming.ConsoleSinkProvider

org.apache.spark.sql.execution.streaming.TextSocketSourceProvider

org.apache.spark.sql.execution.streaming.RateSourceProvider

org.apache.spark.sql.execution.streaming.TextRabbitMQSourceProvider

4.4 使用API

  再Spark SQL源码重新编译后,并肩其jar包丢进Spark的jars路径下。从而用户就能够像使用Structured Streaming自带的数据输入源一样,使用用户自定义的"RabbitMQ"数据输入源了。即用户只需将RabbitMQ字符串传递给format()方法,其使用方式和"socket"方式一样,因为上述的数据源内容其实是Socket方式的实现内容。

5. 参考文献

[1]. Structured Streaming Programming Guide.

[2]. Kafka Integration Guide.

时间: 2024-12-29 15:30:01

Spark Structured Streaming框架(2)之数据输入源详解的相关文章

Spark Structured Streaming框架(3)之数据输出源详解

Spark Structured streaming API支持的输出源有:Console.Memory.File和Foreach.其中Console在前两篇博文中已有详述,而Memory使用非常简单.本文着重介绍File和Foreach两种方式,并介绍如何在源码基本扩展新的输出方式. 1. File Structured Streaming支持将数据以File形式保存起来,其中支持的文件格式有四种:json.text.csv和parquet.其使用方式也非常简单只需设置checkpointLo

Spark Structured Streaming框架(4)之窗口管理详解

1. 结构 1.1 概述 Structured Streaming组件滑动窗口功能由三个参数决定其功能:窗口时间.滑动步长和触发时间. 窗口时间:是指确定数据操作的长度: 滑动步长:是指窗口每次向前移动的时间长度: 触发时间:是指Structured Streaming将数据写入外部DataStreamWriter的时间间隔. 图 11 1.2 API 用户管理Structured Streaming的窗口功能,可以分为两步完成: 1) 定义窗口和滑动步长 API是通过一个全局的window方法

Spark Structured Streaming框架(5)之进程管理

Structured Streaming提供一些API来管理Streaming对象.用户可以通过这些API来手动管理已经启动的Streaming,保证在系统中的Streaming有序执行. 1. StreamingQuery 在调用DataStreamWriter方法的start启动Streaming后,会返回一个StreamingQuery对象.所以用户就可以通过这个对象来管理Streaming. 如下所示: val query = df.writeStream.format("console

Java输入输出流详解

通过数据流.序列化和文件系统提供系统输入和输出. Java把这些不同来源和目标的数据都统一抽象为数据流.Java语言的输入输出功能是十分强大而灵活的,美中不足的是看上去输入输出的代码并不是很简洁,因为你往往需要包装许多不同的对象. 在Java类库中,IO部分的内容是很庞大的,因为它涉及的领域很广泛:标准输入输出,文件的操作,网络上的数据流,字符串流,对象流,zip文件流. 1.1.Java流的分类 按流向分: 输入流: 程序可以从中读取数据的流.输出流: 程序能向其中写入数据的流. 按数据传输单

Linux USB 鼠标输入驱动详解

平台:mini2440 内核:linux 2.6.32.2 USB设备插入时,内核会读取设备信息,接着就把id_table里的信息与读取到的信息做比较,看是否匹配,如果匹配,就调用probe函数.USB设备拔出时会调用disconnect函数.URB在USB设备驱动程序中用来描述与USB设备通信时用到的基本载体和核心数据结构. URB(usb request block)处理流程: ①USB设备驱动程序创建并初始化一个访问特定USB设备特定端点的urb并提交给USB core. ②USB cor

转 Java输入输出流详解(非常详尽)

转  http://blog.csdn.net/zsw12013/article/details/6534619 通过数据流.序列化和文件系统提供系统输入和输出. Java把这些不同来源和目标的数据都统一抽象为数据流.Java语言的输入输出功能是十分强大而灵活的,美中不足的是看上去输入输出的代码并不是很简洁,因为你往往需要包装许多不同的对象. 在Java类库中,IO部分的内容是很庞大的,因为它涉及的领域很广泛:标准输入输出,文件的操作,网络上的数据流,字符串流,对象流,zip文件流. 1.1.J

Android4.0 input事件输入流程详解(中间层到应用层)

在Android系统中,类似于键盘按键.触摸屏等事件是由WindowManagerService服务来管理的,然后再以消息的形式来分发给应用程序进行处理.系统启动时,窗口管理服务也会启动,该服务启动过程中,会通过系统输入管理器InputManager来负责监控键盘消息.当某一个Activity激活时,会在该Service下注册一个接收消息的通道,表明可以处理具体的消息,然后当有消息时,InputManager就会分发给当前处于激活状态下的Activity进行处理. InputManager的启动

CentOS 6.5 MySQL/MariaDB数据备份与恢复备份详解

MySQL/MariaDB数据备份与恢复备份 数据对我们来说再重要不过了,那我们如何做到对数据尽可能的安全呢,当我们的数据丢失了那又该怎么做呢,所以说数据备份对我们的数据安全性来说太重要了. 数据对我们来说再熟悉不过了,也最平常不过了,我们每天都在接触各色各样的数据,数据记录了我们平常相关的业务信息,所以数据对于我们来说是很重要的,这么重要的数据如果我们的数据丢失了那我们是不是相关的业务都没法进行了呢,这应该是个很麻烦的问题,那我们怎么保护我们的数据的安全呢,这就要用到我们的数据备份了. 如何执

数据传送指令详解

数据传送指令详解 前言 上一章我们说了汇编语言的基础,包括数据格式,寄存器以及操作数的标识方式,接下来我们就应该去认识一下hiU币按语言当红真难过的格各个指令了.这些指令大部署很简单,但是组合在一起却能模拟出我们程序当中香烟的任何效果,确实很神奇. 数据传送指令 数据传送指令的目的是我了将一个数据从一个位置复制到另一个位置.既然如此,那么数据传送至零就会包含一个源操作数和一个目的操作数,指令会将源操作数的值复制到目的操作数并覆盖. 数据传送指令一共可以分为5种,分别是mov,movs,movz,