代码:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Stateful word count over a socket text stream.
  *
  * Each 5-second batch is split into words. `updateStateByKey` then folds that
  * batch's counts into a running total per word, so the printed counts are
  * cumulative since the job started rather than per batch.
  *
  * Usage: `UpdateStateByKeyWordCount [host] [port]`. Both arguments are optional
  * and default to `localhost` and `9999` (e.g. feed text with `nc -lk 9999`).
  */
object UpdateStateByKeyWordCount {

  private val DefaultHost = "localhost"
  private val DefaultPort = 9999

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val host = args.headOption.getOrElse(DefaultHost)
    val port = args.lift(1).fold(DefaultPort)(parsePort)

    // local[2]: the socket receiver occupies one thread for the lifetime of the
    // job, so at least one more thread is needed to actually process batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey keeps state across batches and requires a checkpoint directory.
    ssc.checkpoint("checkpointdirectory")

    val lines = ssc.socketTextStream(host, port)
    // Split on runs of whitespace and drop empty tokens. Splitting on a single " "
    // would count "" as a word for empty lines or leading/doubled spaces.
    val words = lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
    val result = words.updateStateByKey[Int](updateCount _)

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }

  /** Merges one batch's occurrences of a word into its running total.
    *
    * @param values the `1`s emitted for this word in the current batch
    *               (empty when the word did not appear in the batch)
    * @param state  the total accumulated in earlier batches, or `None` the first
    *               time the word is seen
    * @return the new total. It is always `Some`, so a word's total is kept even
    *         in batches where the word does not appear.
    */
  private def updateCount(values: Seq[Int], state: Option[Int]): Option[Int] =
    Some(state.getOrElse(0) + values.sum)

  /** Parses the port argument.
    *
    * A malformed value fails with a descriptive message rather than a bare
    * `NumberFormatException`.
    *
    * @param raw the command-line text for the port
    * @return the port as an `Int`
    * @throws IllegalArgumentException if `raw` is not an integer
    */
  private def parsePort(raw: String): Int =
    scala.util.Try(raw.toInt).getOrElse(
      throw new IllegalArgumentException(s"Invalid port '$raw': expected an integer"))
}
结果:
原文地址:https://www.cnblogs.com/wddqy/p/12024343.html
时间: 2024-10-27 11:48:40