6.SparkStreaming之WordCount(UpdateStateByKey)

代码:

import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateStateByKeyWordCount {  def main(args: Array[String]): Unit = {    Logger.getLogger("org").setLevel(Level.ERROR)    val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount")    val ssc = new StreamingContext(conf, Seconds(5))    ssc.checkpoint("checkpointdirectory")    val lines = ssc.socketTextStream("localhost", 9999)    val words = lines.flatMap( _.split(" ")).map(word => (word, 1))    val result = words.updateStateByKey((values: Seq[Int], state: Option[Int]) => {      var newValue = state.getOrElse(0)      for(value <- values) {        newValue += value      }      Option(newValue)    })    result.print()

    ssc.start()    ssc.awaitTermination()  }}

结果:

原文地址:https://www.cnblogs.com/wddqy/p/12024343.html

时间: 2024-10-27 11:48:40

6.SparkStreaming之WordCount(UpdateStateByKey)的相关文章

sparkStreaming实现wordcount

import org.apache.spark.SparkConf import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext object  WordCount extends App { val conf=new SparkConf().setMaster("local[2]").setAppName("wordcount") val s

SparkStreaming

Spark Streaming用于流式数据的处理.Spark Streaming支持的数据输入源很多,例如:Kafka.Flume.Twitter.ZeroMQ和简单的TCP套接字等等.数据输入后可以用Spark的高度抽象原语如:map.reduce.join.window等进行运算.而结果也能保存在很多地方,如HDFS,数据库等 和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream.DStream

第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密

Spark Streaming的DStream为我们提供了一个updateStateByKey方法,它的主要功能是可以随着时间的流逝在Spark Streaming中为每一个key维护一份state状态,通过更新函数对该key的状态不断更新.对每一个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新(对每个新出现的key,会同样执行state的更新函数操作),但是如果通过更新函数对state更新后返回none

第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

什么是state(状态)管理?我们以wordcount为例.每个batchInterval会计算当前batch的单词计数,那如果需要单词计数一直的累加下去,该如何实现呢?SparkStreaming提供了两种方法:updateStateByKey和mapWithState .mapWithState 是1.6版本新增功能,目前属于实验阶段.mapWithState具官方说性能较updateStateByKey提升10倍.那么我们来看看他们到底是如何实现的. 代码示例如下: object Upda

SparkStreaming基础

* SparkStreaming基础 打开之前构建好的Maven工程,如何构建?请参看SparkCore基础(二)的最后部分. 在SparkCore中,我们操作的数据都在RDD中,是Spark的一个抽象概念,也是一个抽象类,是由SparkContext对象sc转换得到的. 那么在SparkStreaming中,我们使用的Spark的StreamingContext对象,简称ssc. 我们本节内容以动手为基础,直接开始一些测试案例:具体的框架结构请参看官方文档,写的非常之详细. SparkStre

SparkStreaming的实战案例

废话不多说,直接上干货!!!相关依赖: <properties> <project.build.sourceEncoding>UTF8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding

利用mapWithState实现按照首字母统计的有状态的wordCount

最近在做sparkstreaming整合kafka的时候遇到了一个问题: 可以抽象成这样一个问题:有状态的wordCount,且按照word的第一个字母为key,但是要求输出的格式为(word,1)这样的形式 举例来说: 例如第一批数据为: hello how when hello 则要求输出为:(hello,1) (how,2) (when,1) (hello,3) 第二批数据为: hello how when what hi 则要求输出为: (hello,4) (how,5) (when,2

Update:SparkStreaming原理_运行过程_高级特性

Spark Streaming 导读 介绍 入门 原理 操作 Table of Contents 1. Spark Streaming 介绍 2. Spark Streaming 入门 2. 原理 3. 操作 1. Spark Streaming 介绍 导读 流式计算的场景 流式计算框架 Spark Streaming 的特点 新的场景 通过对现阶段一些常见的需求进行整理, 我们要问自己一个问题, 这些需求如何解决? 场景 解释 商品推荐 京东和淘宝这样的商城在购物车, 商品详情等地方都有商品推

SparkStreaming DStream转换

1.无状态转换操作 (1)无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转换DStream中的每一个RDD. 部分无状态转化操作: (2)尽管这些函数韩起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上. 例如:reduceByKey()会化简每个事件区间中的数据,但不会化简不同区间之间的数据. (3)在wordcount中,我们只会统计几秒内接收到的数据的单词个数,而不会累加 (4)无状态转化操作也能