updateStateByKey--word count

http://blog.selfup.cn/619.html

private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
    StreamingExamples.setStreamingLogLevels();
 
    JavaStreamingContext jssc = new JavaStreamingContext("local[2]",
          "JavaNetworkWordCount", new Duration(10000));
    jssc.checkpoint(".");//使用updateStateByKey()函数需要设置checkpoint
    //打开本地的端口9999
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
    //按行输入,以空格分隔
    JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(SPACE.split(line)));
    //每个单词形成pair,如(word,1)
    JavaPairDStream<String, Integer> pairs = words.mapToPair(word -> new Tuple2<>(word, 1));
    //统计并更新每个单词的历史出现次数
    JavaPairDStream<String, Integer> counts = pairs.updateStateByKey((values, state) -> {
        Integer newSum = state.or(0);
        for(Integer i :values) {
            newSum += i;
        }
        return Optional.of(newSum);
    });
    counts.print();
    jssc.start();
    jssc.awaitTermination();
}
时间: 2024-10-06 14:58:18

updateStateByKey--word count的相关文章

word count程序,以及困扰人的宽字符与字符

一个Word Count程序,由c++完成,有行数.词数.能完成路径下文件的遍历. 遍历文件部分的代码如下: void FindeFile(wchar_t *pFilePath) { CFileFind finder; CString Finddir; Finddir.Format(pFilePath); BOOL ret = finder.FindFile(Finddir); while (ret) { ret = finder.FindNextFile(); CString strPath

Hadoop AWS Word Count 样例

在AWS里用Elastic Map Reduce 开一个Cluster 然后登陆master node并编译下面程序: import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import o

Learn ZYNQ(10) &ndash; zybo cluster word count

1.配置环境说明 spark:5台zybo板,192.168.1.1master,其它4台为slave hadoop:192.168.1.1(外接SanDisk ) 2.单节点hadoop测试: 如果出现内存不足情况如下: 查看当前虚拟内存容量: free -m cd /mnt mkdir swap cd swap/ 创建一个swap文件 dd if=/dev/zero of=swapfile bs=1024 count=1000000 把生成的文件转换成swap文件 mkswap swapfi

软件工程第三个程序:“WC项目” —— 文件信息统计(Word Count ) 命令行程序

软件工程第三个程序:“WC项目” —— 文件信息统计(Word Count ) 命令行程序 格式:wc.exe [parameter][filename] 在[parameter]中,用户通过输入参数与程序交互,需实现的功能如下: 1.基本功能 支持 -c 统计文件字符数支持 -w 统计文件单词数支持 -l 统计文件总行数 2.拓展功能 支持 -a 返回高级选项(代码行 空行 注释行)支持 -s 递归处理符合条件的文件 3.高级功能 支持 -x 程序以图形界面与用户交互 [filename] 是

通过简单的Word Count讲解MapReduce原理以及Java实现

MapReduce原理: MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapReduce就是"任务的分解与结果的汇总". 在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker:另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的.一个Hadoop集群中

Hadoop AWS Word Count 例子

在AWS里用Elastic Map Reduce 开一个Cluster 然后登陆master node并编译以下程序: import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import o

c语言简单实现word count功能

c语言简单实现word count功能 一:源码参考  参考地址:https://home.cnblogs.com/u/sunbuqiao/ 二:阅读               代码主要思路是先选定文件,将文件中的字符读入数组,利用for循环分别统计字符数.单词数.空格数.行数.实现过程使用了fseek函数判断指针用于判断数据总长度,根据转移字符判断行数. 三:代码上传                    地址:https://github.com/meinumber1

mac上eclipse上运行word count

1.打开eclipse之后,建立wordcount项目 package wordcount; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.had

Hive Word count

--https://github.com/slimandslam/pig-hive-wordcount/blob/master/wordcount.hql DROP TABLE myinput; DROP TABLE wordcount; CREATE TABLE myinput (line STRING); -- Load the text from the local (Linux) filesystem. This should be changed to HDFS -- for any

Spark的word count

word count 1 package com.spark.app 2 3 import org.apache.spark.{SparkContext, SparkConf} 4 5 /** 6 * Created by Administrator on 2016/7/24 0024. 7 */ 8 object WordCount { 9 def main(args: Array[String]) { 10 /** 11 * 第1步:创建Spark的配置对象SparkConf,设置Spark