铭文一级:
第八章:Spark Streaming进阶与案例实战
updateStateByKey算子
需求:统计到目前为止累积出现的单词的个数(需要保持住以前的状态)
java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
需求:将统计结果写入到MySQL
create table wordcount(
word varchar(50) default null,
wordcount int(10) default null
);
通过该sql将统计结果写入到MySQL
insert into wordcount(word, wordcount) values(‘" + record._1 + "‘," + record._2 + ")"
存在的问题:
1) 对于已有的数据做更新,而是所有的数据均为insert
改进思路:
a) 在插入数据前先判断单词是否存在,如果存在就update,不存在则insert
b) 工作中:HBase/Redis
2) 每个rdd的partition创建connection,建议大家改成连接池
window:定时的进行一个时间段内的数据处理
window length : 窗口的长度
sliding interval: 窗口的间隔
这2个参数和我们的batch size有关系:倍数
每隔多久计算某个范围内的数据:每隔10秒计算前10分钟的wc
==> 每隔sliding interval统计前window length的值
铭文二级:
第七章:Spark Streaming核心概念与编程
实战:Spark Streaming处理文件系统数据=>
与处理socket数据类似
1.建FileWordCount类
2.建监控的路径,本次为:/Users/rocky/data/imooc/ss
3.只需修改SocketTextStream成textFileStream
参数设置为file:///Users/rocky/data/imooc/ss/ /* 前面的“///”、最后的“/” */
4.vi test.log //里面有内容,然后cp到监控的路径
nc监控6789端口即可
注意事项:
官网Basic Sources
1、必须每次相同的文件格式
2、必须使用移动的方式将内容move到路径
3、一旦移动,无法再修改里面的内容
原文地址:https://www.cnblogs.com/kkxwz/p/8378789.html