- import org.apache.spark.sql.types._
- // HDFS locations of the two TPC-DS inputs used below.
- val pathA = "hdfs:/tpc-ds/data/store_sales"
- val pathB = "hdfs:/tpc-ds/data/store/"
- // Read the store_sales file as headerless, '|'-delimited text with no schema inference.
- // On Spark 2.x the built-in reader can be used instead, e.g.
- //   val df = spark.read.option("header", true).csv(path)
- val A_df = sqlContext.read
-   .format("com.databricks.spark.csv")
-   .options(Map(
-     "header" -> "false",
-     "inferSchema" -> "false",
-     "delimiter" -> "|"
-   ))
-   .load(pathA)
- // Positional column -> Store Sales column name; every selected column is cast to an integer.
- val storeSalesColumns = Seq(
-   "_c0" -> "SS_SOLD_DATE_SK",
-   "_c1" -> "SS_SOLD_TIME_SK",
-   "_c2" -> "SS_ITEM_SK",
-   "_c7" -> "SS_STORE_SK"
- ).map { case (positional, named) => A_df(positional).cast(IntegerType).as(named) }
- // Keep only the renamed, typed columns of the Store Sales dataframe.
- val storeSalesDF = A_df.select(storeSalesColumns: _*)
- // Read the store file with the same settings as the sales file:
- // no header row, no schema inference, '|' as the field delimiter.
- val B_df = sqlContext.read.format("com.databricks.spark.csv")
-   .option("header", "false")
-   .option("inferSchema", "false")
-   .option("delimiter", "|")
-   .load(pathB)
- // Assign column names and types to the Store dataframe.
- // Each select argument must be comma-separated; the original omitted the comma
- // after the S_STORE_ID column, which does not compile.
- val storeDF = B_df.select(
-   B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
-   B_df("_c1").cast(StringType).as("S_STORE_ID"),
-   B_df("_c5").cast(StringType).as("S_STORE_NAME")
- )
- // Match each sale to its store on the store surrogate key.
- val sameStoreKey = storeSalesDF("SS_STORE_SK") === storeDF("S_STORE_SK")
- // No join type is passed, so the join uses Spark's default for this overload.
- val joinedDF = storeSalesDF.join(storeDF, sameStoreKey)
- // Fetch five rows of the joined result.
- joinedDF.take(5)
What is the full routine of Structured Streaming?
Let’s look at the code (the example is from the Spark source code and I made some edits):
// Session for a local run (master "local[2]").
val spark = SparkSession.builder
  .master("local[2]")
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

// Explicit schema for the JSON input: "name" is declared non-nullable, "city" nullable.
val schemaExp = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("city", StringType, nullable = true)
))

// Same DataSource API as a batch read; the only change is readStream instead of read.
val words = spark.readStream
  .format("json")
  .schema(schemaExp)
  .load("file:///tmp/dir")

// Ordinary DataFrame operations apply to the stream: count records per name.
val wordCounts = words.groupBy("name").count()

// The timer: the trigger passed to the writer below uses a 5-second processing-time interval.
val everyFiveSeconds = ProcessingTime(5.seconds)

// Same DataSource API as a batch write; the only change is writeStream instead of write.
// Output modes are complete, append and update; per the original notes, only the
// first two were supported at the time of writing.
// Sink formats mentioned by the original notes: console, parquet, memory and foreach.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(everyFiveSeconds)
  .start()

// Block the calling thread until the streaming query terminates.
query.awaitTermination()
This is the complete routine of Structured Streaming.
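Structured Streaming currently supports only File and Socket sources. On the output side it offers the four sink types mentioned above: console, parquet, memory, and foreach. The foreach sink can be extended to do almost anything by supplying your own ForeachWriter. For example: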
// Custom sink: write every result row as one comma-separated line to a file under
// /tmp/example/<partitionId>/temp, using a ForeachWriter.
val query = wordCounts.writeStream
  .trigger(ProcessingTime(5.seconds))
  .outputMode("complete")
  .foreach(new ForeachWriter[Row] {
    // Assigned in open(); remains null until open() has created the file.
    var fileWriter: FileWriter = _

    /** Creates the partition's directory and opens its output file.
      *
      * @param partitionId id of the partition being written; used in the output path
      * @param version     version passed in by Spark; not used here
      * @return always true, so every partition is processed
      */
    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"/tmp/example/${partitionId}"))
      fileWriter = new FileWriter(new File(s"/tmp/example/${partitionId}/temp"))
      true
    }

    /** Writes one row as a comma-separated record.
      *
      * Each record is terminated with a newline; without it consecutive rows
      * would run together on a single line.
      */
    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString("", ",", "\n"))
    }

    /** Closes the output file if it was opened.
      *
      * @param errorOrNull the failure that ended processing, or null; not inspected here
      */
    override def close(errorOrNull: Throwable): Unit = {
      // Guard against close() being reached before open() assigned the writer.
      Option(fileWriter).foreach(_.close())
    }
  })
  .start()