利用mapWithState实现按照首字母统计的有状态的wordCount

最近在做sparkstreaming整合kafka的时候遇到了一个问题:

可以抽象成这样一个问题:有状态的wordCount,且按照word的第一个字母为key,但是要求输出的格式为(word,1)这样的形式

举例来说:

例如第一批数据为: hello how when hello

则要求输出为:(hello,1) (how,2) (when,1) (hello,3)

第二批数据为: hello how when what hi

则要求输出为: (hello,4) (how,5) (when,2) (what,3) (hi,6)

首先了解一下mapWithState的常规用法:

ref: https://www.jianshu.com/p/a54b142067e5

http://sharkdtu.com/posts/spark-streaming-state.html

稍微总结一下mapWithState的几个tips:

  1. mapWithState是1.6版本之后推出的
  2. 必须设置checkpoint来储存历史数据
  3. mapWithState和updateStateByKey的区别 : 他们类似,都是有状态DStream操作, 区别在于,updateStateByKey是输出增量数据,随着时间的增加, 输出的数据越来越多,这样会影响计算的效率, 对CPU和内存压力较大.而mapWithState则输出本批次数据,但是也含有状态更新.
  4. checkpoint的数据会分散存储在不同的分区中, 在进行状态更新时, 首先会对当前 key 做 hash , 再到对应的分区中去更新状态 , 这种方式大大提高了效率.

解决问题的思路:

State中保存状态为(String,Int) 元组类型, 其中String为word的全量, 而Int为word的计数.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.MapWithStateDStream
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("MapWithStateApp")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("C:\\Users\\hylz\\Desktop\\checkpoint")
    val lines = ssc.socketTextStream("192.168.100.11",8888)
    val words = lines.flatMap(_.split(" "))

    def mappingFunc(key: String, value: Option[(String, Int)], state: State[(String, Int)]): (String, Int) = {
      val cnt: Int = value.getOrElse((null, 0))._2 + state.getOption.getOrElse((null, 0))._2
      val allField: String = value.getOrElse((null, 0))._1
      state.update((allField, cnt))
      (allField, cnt)
    }

    val cnt: MapWithStateDStream[String, (String, Int), (String, Int), (String, Int)] = words.map(x => (x.substring(0, 1), (x, 1))).mapWithState(StateSpec.function(mappingFunc _))

    cnt.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

测试结果如下

input: hello how when hello

input: hello how when what hi

原文地址:https://www.cnblogs.com/icecola/p/11145957.html

时间: 2024-11-08 22:35:28

利用mapWithState实现按照首字母统计的有状态的wordCount的相关文章

利用排序规则特点计算汉字笔划和取得拼音首字母

SQL SERVER的排序规则平时使用不是很多,也许不少初学者还比较陌生,但有一个错误大家应是经常碰到: SQL SERVER数据库,在跨库多表连接查询时,若两数据库默认字符集不同,系统就会返回这样的错误:     "无法解决 equal to 操作的排序规则冲突." 一.错误分析: 这个错误是因为排序规则不一致造成的,我们做个测试,比如:create table #t1(name varchar(20) collate Albanian_CI_AI_WS, value int) cr

利用MySQL数据库来处理中英文取首字母排序

很久都没有写博客了, 但是也积攒了了很多的问题, 下次找个时间一起整理一下, 然后记录下来, 以备不时之需. 我们的联系人列表经常会有按照首字母排序的需求.  这个很方便, 很清晰, 如果不说性能如何, 就像让数据库来做呢? 其实MySQL是支持的. 我们建立一个函数: DELIMITER $$ CREATE FUNCTION `fristPinyin`(P_NAME VARCHAR(255)) RETURNS varchar(255) CHARSET utf8 DETERMINISTIC BE

java首字母大写的修改方式(利用ascii编码前移的方式)

ascii编码中字母大小写之间相差32个单位,大写在前小写在后 public static String UpperCase(String name){ //将string类型转化成char类型 char []  cs = name.toCharArray(); //通过ASCII表格将首字母减32个位 cs[0]  -= 32; //输出 return String.valueOf(cs);        }

【C语言】输入一个字符串,统计其中的单词个数,将第一个单词的首字母改为大写,并输出改写后的字符串

#include<stdio.h> int main() { char a[100]; int i, j=1; printf("请输入一串字符:"); gets_s(a); for (i = 0; a[i] != '\0'; i++)/*找出单词个数*/ { if (a[i] == ' ') j += 1; } printf("单词个数:%d\n", j); if (a[0] >= 'a' && a[0] <= 'z')/*判

css实现将英文语句第一个单词首字母大写

css实现将英文语句第一个单词首字母大写:英文语句,通常第一个单词的首字母是大写的,下面就通过代码实例介绍一下如何利用css实现此功能.代码如下: <!DOCTYPE html> <html> <head> <meta charset="utf-8"> <meta name="author" content="http://www.51texiao.cn/" /> <title&g

20150912华为机考2之&quot;输入一段字符串(英文),将每个单词首字母大写后输出&quot;

还有其他一些(隐性)要求(要不然无法通过测试): 1.如果首字母已经大写,则不用变 2.不是英文字母的不变 e.g. Input: hello world! this is _Ljj speaking! Output: Hello World! This Is _ljj Speaking! 思路写在注释里面了 /* Input a string * Output: uppercase the first character of evrey word * if already uppercase

fastjson将bean转成字符串时首字母变小写问题

一个项目需求要求返回值为JSON格式,且大多数字段是首字母大写,还有些是类似N_TX这样的格式,在输出这样的结果时遇到了问题,由于时间紧,就直接拷贝需要的结果字段建立JavaBean类,本以为最后直接调用JSON.toString(obj)返回结果即可,没想到返回值中自动将首字母变小写.查看fastjson源码发现关键在下面一段 public static List<FieldInfo> computeGetters(Class<?> clazz, Map<String, S

excel提取汉字拼音首字母

本文转载:http://jingyan.baidu.com/article/63acb44adca44461fcc17e85.html 利用Excel表格中的宏,轻松提取首字母 方法/步骤 1.启动Excel 2003(其它版本请仿照操作),打开相应的工作表: 2.执行"工具→宏→Visual Basic编辑器"命令(或者直接按"Alt+F11"组合键),进入Visual Basic编辑状态: 3.执行"插入→模块"命令,插入一个新模块.再双击插

python首字母转换成大写函数

现在有个需求: 假设用户输入的英文名字不规范,没有按照首字母大写,后续字母小写的规则,请利用map()函数,把一个list(包含若干不规范的英文名字)变成一个包含规范英文名字的list: 输入:['adam', 'LISA', 'barT']            输出:['Adam', 'Lisa', 'Bart'] 原先想法是将字符分离,第一个转换为大写,其余转换为小写: def format_name(s):     return "%s" % (s[:1].upper() +