以下是 Spark Streaming 网络单词计数示例（包含无状态、有状态和窗口三种统计方式）的完整代码：
package com.demo;

import java.util.List;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import scala.Tuple2;

/**
 * Spark Streaming word count over newline-delimited text read from a TCP socket
 * (for example one fed by {@code nc}).
 *
 * <p>Three aggregations are built from the same word stream:
 * <ul>
 *   <li>stateless per-batch counts ({@code reduceByKey}),</li>
 *   <li>stateful running totals ({@code updateStateByKey}, which needs a checkpoint directory),</li>
 *   <li>sliding-window counts ({@code reduceByKeyAndWindow}).</li>
 * </ul>
 * Only the windowed counts are printed; the other two have their {@code print()} calls
 * commented out.
 *
 * <p>Usage: {@code NetWorkWordCount [host] [port] [checkpointDir]}. Any missing argument falls
 * back to the corresponding {@code DEFAULT_*} constant.
 */
public class NetWorkWordCount {

  private static final Pattern SPACE = Pattern.compile(" ");

  /** Socket host used when no host argument is supplied. */
  private static final String DEFAULT_HOST = "192.168.49.151";
  /** Socket port used when no port argument is supplied. */
  private static final int DEFAULT_PORT = 9999;
  /** Checkpoint directory used when no directory argument is supplied. */
  private static final String DEFAULT_CHECKPOINT_DIR = "/home/dinpay/stream/checkpoint";

  /** Length of the counting window, in seconds (must be a multiple of the batch interval). */
  private static final int WINDOW_SECONDS = 15;
  /** How often the window is re-evaluated, in seconds (must be a multiple of the batch interval). */
  private static final int SLIDE_SECONDS = 10;

  /** Integer addition, shared by the per-batch and the windowed reductions. */
  private static final Function2<Integer, Integer, Integer> SUM =
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      };

  /**
   * Starts the streaming job and blocks until it terminates.
   *
   * @param args optional {@code [host] [port] [checkpointDir]}; defaults are used for any
   *     argument that is absent
   * @throws NumberFormatException if a port argument is given but is not an integer
   */
  public static void main(String[] args) {
    // Silence Spark's own logging so the printed counts are readable.
    // NOTE: Level.OFF also hides Spark errors; raise it to WARN when debugging.
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF);

    String host = args.length > 0 ? args[0] : DEFAULT_HOST;
    int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;
    String checkpointDir = args.length > 2 ? args[2] : DEFAULT_CHECKPOINT_DIR;

    // local[2]: the socket receiver keeps one core busy, so a second core is needed
    // for the batches to actually be processed.
    SparkConf sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]");
    // 1-second batch interval; the window and slide durations below are multiples of it.
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    // Receive '\n'-delimited text from host:port. The storage level has no replication,
    // which is only acceptable when running locally; use a replicated level in a
    // distributed deployment for fault tolerance.
    JavaReceiverInputDStream<String> lines =
        ssc.socketTextStream(host, port, StorageLevels.MEMORY_AND_DISK_SER);

    // A checkpoint directory is required by updateStateByKey below.
    ssc.checkpoint(checkpointDir);

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String line) {
        // split(" ") yields "" for blank lines, leading spaces and runs of spaces;
        // drop those so the empty string is never counted as a word.
        List<String> tokens = Lists.newArrayList();
        for (String token : SPACE.split(line)) {
          if (!token.isEmpty()) {
            tokens.add(token);
          }
        }
        return tokens;
      }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        });

    // Stateless: counts within each individual batch.
    JavaPairDStream<String, Integer> nostat = wordCounts.reduceByKey(SUM);

    // Stateful: running totals carried across batches.
    JavaPairDStream<String, Integer> stat = wordCounts.updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
          @Override
          public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            int total = state.or(0);
            for (Integer value : values) {
              total += value;
            }
            return Optional.of(total);
          }
        });

    // Windowed: counts over the last WINDOW_SECONDS, re-evaluated every SLIDE_SECONDS.
    // Argument order is (func, windowDuration, slideDuration). The slide must not exceed
    // the window, otherwise batches falling between two windows are never counted.
    JavaPairDStream<String, Integer> windowstat = wordCounts.reduceByKeyAndWindow(
        SUM, Durations.seconds(WINDOW_SECONDS), Durations.seconds(SLIDE_SECONDS));

    // Only windowstat has an output operation; nostat/stat are defined but Spark does not
    // execute them unless one of these prints is re-enabled.
    //nostat.print();
    //stat.print();
    windowstat.print();

    ssc.start();
    ssc.awaitTermination();
    ssc.close();
  }
}
时间: 2024-10-14 00:31:05