Spark Streaming的样本demo统计

废话不多说,直接上代码

package com.demo;

import java.util.List;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;

import scala.Tuple2;

public class NetWorkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        //屏蔽日志
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);

        // Create the context with a 1 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

        // Create a JavaReceiverInputDStream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by ‘nc‘)
        // Note that no duplication in storage level only for running locally.
        // Replication necessary in distributed scenario for fault tolerance.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("192.168.49.151",9999, StorageLevels.MEMORY_AND_DISK_SER);
        //增加checkpoint
        ssc.checkpoint("/home/dinpay/stream/checkpoint");
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
          }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
          new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
              return new Tuple2<String, Integer>(s, 1);
            }
          });
        //无状态统计计算
        JavaPairDStream<String, Integer> nostat =  wordCounts.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
              return i1 + i2;
            }
          });

        //有状态统计计算
        JavaPairDStream<String, Integer> stat =  wordCounts.updateStateByKey(new Function2<List<Integer>, Optional<Integer>,
                Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state){
                Integer updateValue = 0;
                if(state.isPresent()){
                    updateValue = state.get();
                }
                for (Integer value : values) {
                    updateValue += value;
                }
                return Optional.of(updateValue);
            }
        });

        //窗口计算 滑动10秒 统计窗口长度是15秒
        JavaPairDStream<String, Integer> windowstat = wordCounts
                .reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
                      @Override
                      public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                      }
                }, Durations.seconds(15), Durations.seconds(30));

        //nostat.print();
        //stat.print();
        windowstat.print();

        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }

}
时间: 2024-10-14 00:31:05

Spark Streaming的样本demo统计的相关文章

第82课:Spark Streaming第一课:案例动手实战并在电光石火间理解其工作原理

本期内容: 1.Spark Streaming 动手实战演示 2.闪电般理解Spark Streaming原理 案例动手实战并在电光石火间理解其工作原理 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.交通监控等:通信时代的手机.

IMF传奇行动第82课:Spark Streaming第一课:案例动手实战并在电光石火间理解其工作原理

 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.交通监控等:通信时代的手机.平板.智能设备.物联网等会产生很多实时数据,数据流无处不在. 在大数据时代SparkStreaming能做什么? 平时用户都有网上购物的经历,用户在

Spark Streaming:大规模流式数据处理的新贵(转)

原文链接:Spark Streaming:大规模流式数据处理的新贵 摘要:Spark Streaming是大规模流式数据处理的新贵,将流式计算分解成一系列短小的批处理作业.本文阐释了Spark Streaming的架构及编程模型,并结合实践对其核心技术进行了深入的剖析,给出了具体的应用场景及优化方案. 提到Spark Streaming,我们不得不说一下BDAS(Berkeley Data Analytics Stack),这个伯克利大学提出的关于数据分析的软件栈.从它的视角来看,目前的大数据处

第82课 Spark Streaming第一课 案例动手实战并在电光石火间理解其工作原理

本课内容提要: (1)什么是流处理以及Spark Streaming主要介绍 (2)Spark Streaming初体验 一.什么是流处理以及Spark Streaming主要介绍 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.

第82讲:Spark Streaming第一讲:案例动手实战并在电光石火间理解其工作原理

本期内容: 1.Spark Streaming 动手实战演示 2.闪电般理解Spark Streaming原理 3.案例动手实战并在电光石火间理解其工作原理 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.交通监控等:通信时代的手

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集群:http://blog.csdn.net/tototuzuoquan/article/details/73430874 B:创建topic等:http://blog.csdn.net/tototuzuoquan/article/details/73430874 3.编写Pom文件 <?xml v

&lt;Spark Streaming&gt;&lt;Flume&gt;&lt;Integration&gt;

Overview Flume:一个分布式的,可靠的,可用的服务,用于有效地收集.聚合.移动大规模日志数据 我们搭建一个flume + Spark Streaming的平台来从Flume获取数据,并处理它. 有两种方法实现:使用flume-style的push-based方法,或者使用自定义的sink来实现pull-based方法. Approach 1: Flume-style Push-based Approach flume被设计用来在Flume agents之间推信息,在这种方式下,Spa

zeppelin中运行spark streaming kakfa &amp; 实时可视化

notebook方式运行spark程序是一种比较agile的方式,一方面可以体验像spark-shell那样repl的便捷,同时可以借助notebook的作图能力实现快速数据可视化,非常方便快速验证和demo.notebook有两种选择,一种是ipython notebook,主要针对pyspark:另一种是zeppelin,可以执行scala spark,pyspark以及其它执行引擎,包括hive等.比较而言,ipython notebook的可视化能力更强,zeppelin的功能更强.这里

Spark Streaming初步使用以及工作原理详解

在大数据的各种框架中,hadoop无疑是大数据的主流,但是随着电商企业的发展,hadoop只适用于一些离线数据的处理,无法应对一些实时数据的处理分析,我们需要一些实时计算框架来分析数据.因此出现了很多流式实时计算框架,比如Storm,Spark Streaming,Samaz等框架,本文主要讲解Spark Streaming的工作原理以及如何使用. 一.流式计算 1.什么是流? Streaming:是一种数据传送技术,它把客户机收到的数据变成一个稳定连续的流,源源不断地送出,使用户听到的声音或看