Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming

主要内容

  1. Spark SQL、DataFrame与Spark Streaming

1. Spark SQL、DataFrame与Spark Streaming

源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

object SqlNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by ‘nc‘)
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    //Socke作为数据源
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    //words DStream
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    //调用foreachRDD方法,遍历DStream中的RDD
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

运行程序后,再运行下列命令

[email protected]:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data

处理结果:


========= 1448783840000 ms =========
+---------+-----+
|     word|total|
+---------+-----+
|    Spark|   12|
|   system|   12|
|  general|   12|
|     fast|   12|
|      and|   12|
|computing|   12|
|        a|   12|
|       is|   12|
|      for|   12|
|      Big|   12|
|  cluster|   12|
|     Data|   12|
+---------+-----+

========= 1448783842000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+

========= 1448783844000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+
时间: 2024-10-11 07:16:42

Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming的相关文章

Scala入门到精通——第十三节 高阶函数

本节主要内容 高阶函数简介 Scala中的常用高阶函数 SAM转换 函数柯里化 偏函数 1. 高阶函数简介 高阶函数主要有两种:一种是将一个函数当做另外一个函数的参数(即函数参数):另外一种是返回值是函数的函数.这两种在本教程的第五节 函数与闭包中已经有所涉及,这里简单地回顾一下: (1)函数参数 //函数参数,即传入另一个函数的参数是函数 //((Int)=>String)=>String scala> def convertIntToString(f:(Int)=>String

Spark修炼之道系列教程预告

课程内容 Spark修炼之道(基础篇)--Linux基础(12讲) Spark修炼之道(进阶篇)--Spark入门到精通(30讲) Spark修炼之道(实战篇)--Spark应用开发实战篇(20讲) Spark修炼之道(高级篇)--Spark源码解析(50讲) 版权声明:本文为博主原创文章,未经博主允许不得转载.

Spark修炼之道——Spark学习路线、课程大纲

课程内容 Spark修炼之道(基础篇)--Linux基础(15讲).Akka分布式编程(8讲) Spark修炼之道(进阶篇)--Spark入门到精通(30讲) Spark修炼之道(实战篇)--Spark应用开发实战篇(20讲) Spark修炼之道(高级篇)--Spark源代码解析(50讲) 部分内容会在实际编写时动态调整.或补充.或删除. Spark修炼之道(基础篇)--Linux大数据开发基础(15讲). Linux大数据开发基础--第一节:Ubuntu Linux安装与介绍 Linux大数据

Spark修炼之道(进阶篇)——Spark入门到精通:第九节 Spark SQL运行流程解析

1.整体运行流程 使用下列代码对SparkSQL流程进行分析,让大家明白LogicalPlan的几种状态,理解SparkSQL整体执行流程 // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits

Spark修炼之道(进阶篇)——Spark入门到精通:第二节 Hadoop、Spark生成圈简介

本节主要内容 Hadoop生态圈 Spark生态圈 1. Hadoop生态圈 原文地址:http://os.51cto.com/art/201508/487936_all.htm#rd?sukey=a805c0b270074a064cd1c1c9a73c1dcc953928bfe4a56cc94d6f67793fa02b3b983df6df92dc418df5a1083411b53325 下图给出了Hadoop生态圈中的重要产品: 图片来源:http://www.36dsj.com/archiv

Spark修炼之道(高级篇)——Spark源码阅读:第一节 Spark应用程序提交流程

作者:摇摆少年梦 微信号: zhouzhihubeyond spark-submit 脚本应用程序提交流程 在运行Spar应用程序时,会将spark应用程序打包后使用spark-submit脚本提交到Spark中运行,执行提交命令如下: root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordC

Scala入门到精通——第十四节 Case Class与模式匹配(一)

本节主要内容 模式匹配入门 Case Class简介 Case Class进阶 1. 模式匹配入门 在java语言中存在switch语句,例如: //下面的代码演示了java中switch语句的使用 public class SwitchDemo { public static void main(String[] args) { for(int i = 0; i < 100; i++) { switch (i) { case 10:System.out.println("10"

Spark修炼之道(进阶篇)——Spark入门到精通:第一节 Spark 1.5.0集群搭建

作者:周志湖 网名:摇摆少年梦 微信号:zhouzhihubeyond 本节主要内容 操作系统环境准备 Hadoop 2.4.1集群搭建 Spark 1.5.0 集群部署 注:在利用CentOS 6.5操作系统安装spark 1.5集群过程中,本人发现Hadoop 2.4.1集群可以顺利搭建,但在Spark 1.5.0集群启动时出现了问题(可能原因是64位操作系统原因,源码需要重新编译,但本人没经过测试),经本人测试在ubuntu 10.04 操作系统上可以顺利成功搭建.大家可以利用CentOS

Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark SQL案例实战(一)

作者:周志湖 放假了,终于能抽出时间更新博客了--. 1. 获取数据 本文通过将github上的Spark项目git日志作为数据,对SparkSQL的内容进行详细介绍 数据获取命令如下: [[email protected] spark]# git log --pretty=format:'{"commit":"%H","author":"%an","author_email":"%ae"