SparkStreaming(一)--核心概念及算子

1.环境

CDH 5.16.1
Spark 2.3.0.cloudera4

2.核心概念

官网: https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html

GitHub: https://github.com/apache/spark

2.1 StreamingContext

第一点
class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {

  def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }

  def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }
第二点
def stop(
      stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
     ): Unit = synchronized {
    stop(stopSparkContext, false)
  }
  • streamingContext.stop 默认会停止 SparkContex 和 StreamingContext,可通过设置 spark.streaming.stopSparkContextByDefault 为false 让其只停止 StreamingContext
  • SparkContext 可以创建多个 StreamingContext

2.2 Input Dstreams 和 Recivers

第一点
Every input DStream (except file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.

每一个 input DStream 都依赖于 Reciver (除文件系统外),接收数据,存放在Spark的内存中,以供处理。


streamingContext.socketTextStream 的返回值是 ReceiverInputDStream[String]
streamingContext.textFileStream 的返回值是 DStream[String]
说明:从文件系统中获取数据流是不需要启动 receiver线程 接收数据存放到 Spark中

第二点
When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).

当从用本地模式时,不能使用 local 和 local[1],因为 Input Dstream 基于 Receiver 接收数据(除文件系统外),是需要启动 receiver 线程的;如果设置 local[1],唯一的一个线程会被用来运行 receiver,主程序将没有线程去运行

3. 案例

3.1 updateStateByKey

有状态转化

package com.monk.spark

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @className: StatefulWordCount
  * @description: TODO
  * @author wu ning
  * @date 2019/12/17 0:11
  */
object StatefulWordCount {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)

    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(10))

    //生产环境中 checkpoint 应当设置在HDFS上
    ssc.checkpoint(".")

    val lineStream: ReceiverInputDStream[String] = ssc.socketTextStream("cdh02", 6789, StorageLevel.MEMORY_AND_DISK)

    //(key,1)
    val wordToOne: DStream[(String, Int)] = lineStream.flatMap(_.split(" "))
      .map((_, 1))

    val wordCount: DStream[(String, Int)] = wordToOne.updateStateByKey(updateFunction)

    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * 创建updateStateByKey的函数
    * @param current 在当前阶段 一个新的key对应的value组成的序列
    * @param pre 上一个阶段 这个key对应的value
    * @return
    */

  def updateFunction(current: Seq[Int], pre: Option[Int]): Option[Int] = {
    val currentValue:Int = current.sum

    //如果pre中没有值,那么使用默认值 0
    val preValue: Int = pre.getOrElse(0)

    Some(currentValue + preValue)
  }
}

本地调试报错:org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/F
原因:在 C:\Windows\System32目录下有 hadoop.dll 文件或环境变量Path中配置了 %Hadoop_Home%/bin 目录而导致的;简而言之,是因为配置的系统环境变量Path的任意目录下存在 hadoop.dll 文件,从而被认为这是一个hadoop集群环境,但是hadoop集群又不支持 window 环境而产生的异常。
解决方法:删除环境变量中的 hadoop.dll,确保环境变量中没有 hadoop.dll 文件即可。

3.2 foreachRDD

wordcount写入到Mysql

package com.monk.spark

import java.sql.Connection

import com.monk.utils.{DataSourceUtil, SqlProxy}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @className: ForeachRddWordCount
  * @description: TODO
  * @author wu ning
  * @date 2019/12/17 21:06
  */
object ForeachRddWordCount {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)

    val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(10))

    ssc.checkpoint(".")

    val lineStream: ReceiverInputDStream[String] = ssc.socketTextStream("cdh02", 9876, StorageLevel.MEMORY_AND_DISK)

    val wordToOne: DStream[(String, Int)] = lineStream.flatMap(_.split(" "))
      .map((_, 1))

    val wordCount: DStream[(String, Int)] = wordToOne.updateStateByKey(updateFunction)

    wordCount.print()

    wordCount.foreachRDD {
      rdd => {
        rdd.foreachPartition {
          partition => {
            //获取 Connection
            val connection: Connection = DataSourceUtil.getConnection()

            val sqlProxy = new SqlProxy

            //使用 replace into 语法,表中必须有 primary key
            val sql = "replace into wordcount(word,counts) values(?,?)"
            partition.foreach {
              record => {
                sqlProxy.excuteUpdate(connection, sql, Array(record._1, record._2))
              }
            }
            sqlProxy.shutdown(connection)
          }
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * 创建updateStateByKey函数
    *
    * @param current 当前 key 对应的 value 的 Seq 序列
    * @param pre     之前 key 对应的 value 的 值
    * @return
    */
  def updateFunction(current: Seq[Int], pre: Option[Int]): Option[Int] = {
    val currentValue: Int = current.sum

    //如果 pre 存在值就取出来,不存在就设置默认值为 0
    val preValue: Int = pre.getOrElse(0)

    Some(preValue + currentValue)
  }
}

Druid 连接池

package com.monk.utils

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource
import org.apache.log4j.Logger

/**
  * @className: DataSourceUtil
  * @description: TODO
  * @author wu ning
  * @date 2019/12/17 21:23
  */
object DataSourceUtil {

  //@transient注解将字段标记为瞬态的,不会被序列化,只会作为临时的缓存数据

  @transient
  lazy val logger = Logger.getLogger(this.getClass)

  private var dataSource: DataSource = _
  private val url: String = PropertiesUtil.getProperties("jdbc.url")
  private val user_name = PropertiesUtil.getProperties("jdbc.user")
  private val pass_word = PropertiesUtil.getProperties("jdbc.passwd")

  try {
    val props = new Properties
    props.setProperty("url", url)
    props.setProperty("username", user_name)
    props.setProperty("password", pass_word)
    //初始化大小
    props.setProperty("initialSize", "5")
    //最大连接
    props.setProperty("maxActive", "10")
    //最小连接
    props.setProperty("minIdle", "5")
    //等待时长
    props.setProperty("maxWait", "60000")
    //配置多久进行一次检测,检测需要关闭的连接 单位毫秒
    props.setProperty("timeBetweenEvictionRunsMillis", "2000")
    //配置连接在连接池中最小生存时间 单位毫秒
    props.setProperty("minEvictableIdleTimeMillis", "600000")
    //配置连接在连接池中最大生存时间 单位毫秒
    props.setProperty("maxEvictableIdleTimeMillis", "900000")
    props.setProperty("validationQuery", "select 1")
    props.setProperty("testWhileIdle", "true")
    props.setProperty("testOnBorrow", "false")
    props.setProperty("testOnReturn", "false")
    props.setProperty("keepAlive", "true")
    props.setProperty("phyMaxUseCount", "100000")
    dataSource = DruidDataSourceFactory.createDataSource(props)
  } catch {
    case e: Exception =>
      logger.error(s"设置参数出现了问题{},$e")
  }

  //获取连接
  def getConnection(): Connection = {
    try {
      dataSource.getConnection()
    } catch {
      case e: Exception =>
        logger.error(s"获取 connection 出现了问题{},$e")
        null
    }
  }

  //关闭资源
  def closeResource(resultSet: ResultSet, preStatement: PreparedStatement, connection: Connection): Unit = {
    closeResultSet(resultSet)
    closePrePareStateMent(preStatement)
    closeConnection(connection)
  }

  //关闭 ResultSet
  def closeResultSet(resultSet: ResultSet): Unit = {
    if (resultSet != null) {
      try {
        resultSet.close()
      } catch {
        case e: Exception =>
          logger.error(s"关闭 ResultSet 出现了问题{},$e")
      }
    }
  }

  //关闭 PreparedStatement
  def closePrePareStateMent(preparedStatement: PreparedStatement): Unit = {
    if (preparedStatement != null) {
      try {
        preparedStatement.close()
      } catch {
        case e: Exception =>
          logger.error(s"关闭 PrepareStatement 出现了问题{},$e")
      }
    }
  }

  //关闭 Connection
  def closeConnection(connection:Connection): Unit ={
    if(connection != null){
      try {
        connection.close()
      } catch {
        case e:Exception =>
          logger.error(s"关闭 Connection 出现了问题{},$e")
      }
    }
  }
}

sql 代理类

package com.monk.utils

import java.sql.{Connection, PreparedStatement, ResultSet}

import org.apache.log4j.Logger

/**
  * @className: SqlProxy
  * @description: TODO
  * @author wu ning
  * @date 2019/12/17 22:06
  */
class SqlProxy {

  private var rs: ResultSet = _
  private var psmt: PreparedStatement = _

  @transient lazy val logger = Logger.getLogger(this.getClass)

  /**
    * 执行更新语句
    *
    * @param connection
    * @param sql
    * @param params
    */
  def excuteUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {

    var result: Int = 0

    try {
      psmt = connection.prepareStatement(sql)

      if (params != null && params.length > 0) {
        //左闭右开
        for (i <- 0 until params.length) {
          psmt.setObject(i + 1, params(i))
        }
      }

      result = psmt.executeUpdate()
    } catch {
      case e: Exception =>
        logger.error(s"执行更新语句报错:{},$e")
    }

    result
  }

  /**
    * 执行查询语句
    * @param connection
    * @param sql
    * @param params
    * @param queryCallback
    */
  def executeQuery(connection: Connection, sql: String, params: Array[Any], queryCallback: QueryCallback): Unit = {

    try {
      psmt = connection.prepareStatement(sql)

      if (params != null && params.length > 0) {
        for (i <- 0 until params.length) {
          psmt.setObject(i + 1, params(i))
        }
      }
      rs = psmt.executeQuery()

      queryCallback.process(rs)
    } catch {
      case e:Exception =>
        logger.error(s"执行查询语句报错:{},$e")
    }
  }

  def shutdown(connection: Connection): Unit = DataSourceUtil.closeResource(rs,psmt,connection)
}
package com.monk.utils

import java.sql.ResultSet

/**
  * @className: QueryCallback
  * @description: TODO
  * @author wu ning
  * @date 2019/12/17 22:04
  */
trait QueryCallback {
  def process(rs:ResultSet)
}

3.3 SparkStreaming 整合 SparkSQL

地址:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

原文地址:https://www.cnblogs.com/wuning/p/12046829.html

时间: 2024-10-08 01:21:18

SparkStreaming(一)--核心概念及算子的相关文章

Spark Streaming核心概念与编程

1. 核心概念 StreamingContext Create StreamingContext import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) //Second(1) #表示处理的批次, 当前1秒处理一次 val ssc = new StreamingContext(conf, Second

Hadoop hdfs&mapreduce核心概念

1.HDFS(分布式文件系统体系) 1.1.NameNode:(名称节点) Hdfs的守护程序 记录文件是如何分割成数据块的,以及这些数据块被存储到了哪些节点上 对内存和I/O进行集中管理 是个单点,发生故障将使集群崩溃 1.2.SecondaryNamenode(辅助名称节点):发生故障进行人工的设置才能实现集群崩溃的问题 监控HDFS状态的辅助后台程序 每个集群都有一个 与NameNode进行通讯,定期保存HDFS元数据快照 与NameNode故障可以作为备用NameNode使用 1.3.D

CSS的四个核心概念

CSS(Cascading Style Sheet)层叠样式表,又称级联样式表,是一组格式设置规则,用来进行网页风格设计.通过使用CSS样式设置页面的格式,可将页面的内容与表现形式分离.页面内容存放在HTML文档中,而用于定义表现形式的CSS规则则存放在另一个文件中或HTML文档的某一部分,通常为文件头部分.将内容与表现形式分离,不仅可使维护站点的外观更加容易,而且还可以使HTML文档代码更加简练,缩短浏览器的加载时间. CSS的核心概念有四个:标准流.盒模型.position.float,它们

Maven的几个核心概念

POM (Project Object Model) 一个项目所有的配置都放置在 POM 文件中:定义项目的类型.名字,管理依赖关系,定制插件的行为等等.比如说,你可以配置 compiler 插件让它使用 java 1.5 来编译. 示例的 POM: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  

C#面向对象的核心概念

一.面向对象的核心概念 (一)抽象--面向对象的根基 讲到抽象,不得不涉及到现代科学技术的基础--数学. 数学是一门抽象的科学,面对着纷繁复杂的世间万物,数学不理会各种事物的独特特性,而只抽取它们在数量上的特性,深刻揭示了"世间万物"在数量上表现出的共同规律,抽象正是数学的本质特征. 数学的一个分支--离散数学是计算机科学的根基之一,因此,计算机科学从诞生之日起,就与数学有着密不可分的联系,抽象思维也是计算机科学的主要思维方法之一. 在使用面向对象的方法设计一个软件系统时,首先就要区分

Angularjs的核心概念

1. 客户端模板 多页面的应用通过组装和拼接服务器上的数据来生成HTML,然后输出到浏览器.Angularjs不同于此的是,传递模板和数据到浏览器,然后在浏览器端进行组装.浏览器的角色编程了只提供模板的静态资源和模板所需要的数据. <html ng-app> <head> <script src="angular.js"></script> <script src="controllers.js"><

面向对象的核心概念

1.面向对象的核心概念 从理论上说,面向对象技术拥有四大基本特性. (1)封装 这是一种隐藏信息的特征.封装这一特性不仅大大提高了代码的易用性,而且还使得类的开发者可以方便的更换新的算法,这种变化不会影响使用类的外部代码.可以用以下公式展示类的封装特性: 封装的类=数据+对此数据所进行的操作(即算法) 通俗的说,封装就是:包起外界不必需要知道的东西,只向外界展露可供展示的东西. 在面向对象理论中,封装这个概念拥有更为宽广的含义.小到一个简单的数据结构,大到一个完整的软件子系统,静态的如某软件系统

理解maven的核心概念

原文链接:http://www.cnblogs.com/holbrook/archive/2012/12/24/2830519.html 好久没进行java方面的开发了,最近又完成了一个java相关的任务,顺便重新体会了 maven 这一利器. 在使用过程中发现以前对maven的理解不够深入,借此机会重新梳理了一下maven的核心概念.相信理解了这些核心概念, 即使长时间不使用,以后再重新上手也会非常容易. 本文以类图的方式,介绍maven核心的12个概念以及相互之间的关系. Table of

Docker核心概念

Docker的三大核心概念 镜像 容器 仓库 1.Docker镜像 Docker镜像类似于虚拟机镜像,可以将它理解为一个面向Docker引擎的只读模板,包含了文件系统.镜像是创建容器的基础.通过版本管理和增量的文件系统,Docker提供了一套十分简单的机制来创建和更新现有的镜像,用户甚至可以从网上下载一个已经做好的应用镜像,并通过简单的命令就可以直接使用. 2.Docker容器 Docker容器类似于一个轻量级的沙箱(沙箱是一个虚拟的环境,在这份环境运行的程序都是独立的,不会对你的现用操作系统造