Spark Streaming带状态更新

带状态的更新是使用的updateStateByKey方法,里面传入一个函数,函数要自己写,注意需要设置checkpoint

import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 需要设置checkpoint
  * 有状态的计算
  */
class UpdataByKey {

}
object UpdataByKey{
    //自定义函数进行带状态更新
  def addFunc (currValue:Seq[Int],point:Option[Int])={
    Some(currValue.sum+point.getOrElse(0));
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UpdataByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(10))
    val topics = "xiaopeng";
    val topicMap = topics.split(",").map((_,2)).toMap
    val lines = KafkaUtils.createStream(ssc,"192.168.10.219:2181","han",topicMap)
    val words = lines.flatMap(line =>line._2.split(" ")).map(word =>(word,1))
    words.updateStateByKey[Int](addFunc _)
    words.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
时间: 2024-08-04 12:13:23

Spark Streaming带状态更新的相关文章

spark streaming测试之三有状态的接收数据

测试思路: 首先,使用上篇文章的程序一发送网络数据: 其次,运行spark程序,观察效果. 说明: 1. 这里使用到了更新函数: 2. 使用检查点来保证状态. sparkStreaming import org.apache.log4j.{LoggerLevel} import org.apache.spark.streaming.{SecondsStreamingContext} import org.apache.spark.{SparkContextSparkConf} import or

spark 笔记 13: 再看DAGScheduler,stage状态更新流程

当某个task完成后,某个shuffle Stage X可能已完成,那么就可能会一些仅依赖Stage X的Stage现在可以执行了,所以要有响应task完成的状态更新流程. =======================DAG task完成后的更新流程=================== ->CoarseGrainedSchedulerBackend::receiveWithLogging  --调度器的事件接收器 ->case StatusUpdate(executorId, taskId

(转)用Flink取代Spark Streaming!知乎实时数仓架构演进

转:https://mp.weixin.qq.com/s/e8lsGyl8oVtfg6HhXyIe4A AI 前线导读:“数据智能” (Data Intelligence) 有一个必须且基础的环节,就是数据仓库的建设,同时,数据仓库也是公司数据发展到一定规模后必然会提供的一种基础服务.从智能商业的角度来讲,数据的结果代表了用户的反馈,获取结果的及时性就显得尤为重要,快速的获取数据反馈能够帮助公司更快的做出决策,更好的进行产品迭代,实时数仓在这一过程中起到了不可替代的作用. 更多优质内容请关注微信

【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streaming 消费 Kafka 中的消息,同时消费记录由 Zookeeper 集群统一管理,这样即使 Kafka 宕机重启后也能找到上次的消费记录继而进行消费.在这里 Spark Streaming 首先从 MySQL 读取规则然后进行 ETL 清洗并计算多个聚合指标,最后将结果的一部分存储到 Hbase

Dataflow编程模型和spark streaming结合

Dataflow编程模型和spark streaming结合 主要介绍一下Dataflow编程模型的基本思想,后面再简单比较一下Spark  streaming的编程模型 == 是什么 == 为用户提供以流式或批量模式处理海量数据的能力,该服务的编程接口模型(或者说计算框架)也就是下面要讨论的dataflow model 流式计算框架处理框架很多,也有大量的模型/框架号称能较好的处理流式和批量计算场景,比如Lambda模型,比如Spark等等,那么dataflow模型有什么特别的呢? 这就要要从

spark streaming (二)

一.基础核心概念 1.StreamingContext详解 (一) 有两种创建StreamingContext的方式:             val conf = new SparkConf().setAppName(appName).setMaster(master);             val ssc = new StreamingContext(conf, Seconds(1)); StreamingContext, 还可以使用已有的SparkContext来创建         

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

本期内容: 1.updateStateByKey解密 2.mapWithState解密 背景:整个Spark Streaming是按照Batch Duractions划分Job的.但是很多时候我们需要算过去的一天甚至一周的数据,这个时候不可避免的要进行状态管理,而Spark Streaming每个Batch Duractions都会产生一个Job,Job里面都是RDD, 所以此时面临的问题就是怎么对状态进行维护?这个时候就需要借助updateStateByKey和mapWithState方法完成

第4课:Spark Streaming的Exactly Once的事务处理

本期内容: Exactly once 输出不重复 Exactly once 1,事务一定会被处理,且只被处理一次: 2,输出能够输出且只会被输出. Receiver:数据通过BlockManager写入内存+磁盘或者通过WAL来保证数据的安全性. WAL机制:写数据时先通过WAL写入文件系统然后存储的Executor(存储在内存和磁盘中,由StorageLevel设定),假设前面没有写成功后面一定不会存储在Executor,如不存在Executor中的话,汇报Driver数据一定不被处理.WAL

Spark Streaming实践和优化

发表于:<程序员>杂志2016年2月刊.链接:http://geek.csdn.net/news/detail/54500 作者:徐鑫,董西成 在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎.其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎.如图1所示,Spark Streaming支持的数据源有很多,如Kafka.Flume.TCP等.Spark Streaming的内部数据表示形式为DStrea