spark streaming的应用

  今天我们讲spark streaming的应用,这个是实时处理的,类似于Storm以及Flink相关的知识点,

  说来也巧,今天的自己也去听了关于Flink的相关的讲座,可惜自己没有听得特别清楚,好像是

  spark streaming与flink是竞争关系,好了,我们进入今天的主题吧

    1.一般会做用户画像的差不多集中在两个行业,电商以及广告行业

     一般根据现实给这个人打上一个标签,在根据标签来确定画像

    2.如果一个人不登录,怎样确定这个人的详情

    

    这个就是spark streaming的应用

    nc -lk 8888 这个端口可以一直发送数据

    请记住,spark中产生的rdd,可能会由于某种意外的原因,从而这个计算可能就要重新开始计算,

    但是假如我们设置了checkpoint(如果多个进程同时开始的话,我们可以搞一个共享存储)的时候,

    就可以保存这个值,当再一次出现意外的时候,就可以从恢复的这个值重新读取

      对于map来说,可以map(),同时也可以map{},这样的两种表达形式,不过当我们写成了case()的

      这种形式,则我们必须使用map的大括号的这种形式了,后文附带代码

    

package cn.wj.spark.day09

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
  * Created by WJ on 2017/1/18.
  */
object StateFulWordCount {

  //Seq这个批次某个单词的次数
   //Option[Int]:以前的结果
  //(hello,1),(hello,1),(tom,1)
  //(hello,Seq(1,1)),(tom,Seq(1))
  //此时x=>String(Key的值),y=>Seq[Int](当前的这个value的值),z=>Option[Int],这个代表的是以前的value的值
  val updateFunc = (iter:Iterator[(String,Seq[Int],Option[Int])]) =>{
     iter.flatMap{case(x,y,z) => Some(y.sum+z.getOrElse(0)).map(m =>(x,m))}
   }

  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    //StreamingContext
    val conf = new SparkConf().setAppName("StreamingContext").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/ck")
   // sc.setCheckpointDir("hdfs://192.168.109.136:9000/person/myfile")
    val ssc = new StreamingContext(sc,Seconds(5))
    val ds = ssc.socketTextStream("192.168.109.136",8888)
    //updateStateByKey:这个方法的意思是说将每一次的partition都进行一次累计
    val result = ds.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc,new HashPartitioner(sc.defaultParallelism),true)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

   其中,LoggerLevels.setStreamingLogLevels()这个是设置日志文件的显示情况的,是让打出来的日志更清晰,

   如果没必要,可以删除的。

   首先我们在linux里面向8888端口发送信息:

   

   然后启动项目,这个时候就可以看见这个效果了(可以叠加的spark streaming)

   

时间: 2024-08-05 11:50:43

spark streaming的应用的相关文章

Dataflow编程模型和spark streaming结合

Dataflow编程模型和spark streaming结合 主要介绍一下Dataflow编程模型的基本思想,后面再简单比较一下Spark  streaming的编程模型 == 是什么 == 为用户提供以流式或批量模式处理海量数据的能力,该服务的编程接口模型(或者说计算框架)也就是下面要讨论的dataflow model 流式计算框架处理框架很多,也有大量的模型/框架号称能较好的处理流式和批量计算场景,比如Lambda模型,比如Spark等等,那么dataflow模型有什么特别的呢? 这就要要从

Spark Streaming源码解读之Job详解

一:Spark Streaming Job生成深度思考 1. 做大数据例如Hadoop,Spark等,如果不是流处理的话,一般会有定时任务.例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理. 2. JobGenerator构造的时候有一个核心的参数是jobScheduler, jobScheduler是整个作业的生成和提交给集群的核心,JobGenerator会基于DStream生

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集群:http://blog.csdn.net/tototuzuoquan/article/details/73430874 B:创建topic等:http://blog.csdn.net/tototuzuoquan/article/details/73430874 3.编写Pom文件 <?xml v

Spark学习笔记——Spark Streaming

许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用.训练机器学习模型的应用, 还有自动检测异常的应用.Spark Streaming 是 Spark 为这些应用而设计的模型.它允许用户使用一套和批处理非常接近的 API 来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码. Spark Streaming 使用离散化流( discretized stream)作为抽象表示, 叫作 DStream. DStream 是随时间推移而收到的数据的序列.在内部,每个时间区间收到

spark streaming (二)

一.基础核心概念 1.StreamingContext详解 (一) 有两种创建StreamingContext的方式:             val conf = new SparkConf().setAppName(appName).setMaster(master);             val ssc = new StreamingContext(conf, Seconds(1)); StreamingContext, 还可以使用已有的SparkContext来创建         

关于IDEA开发环境下的Kafka+Spark Streaming的classpath配置方式

一.前言 在使用Spark Streaming中的Kafka Direct API进行Kafka消费的过程中,通过spark-submit的方式提交jar包,会出现如下错误信息,提示无法找到KafkaUtils. Exceptionin thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ at com.zhkmxx.scala.app.KafkaStream

spark streaming kafka1.4.1中的低阶api createDirectStream使用总结

转载:http://blog.csdn.net/ligt0610/article/details/47311771 由于目前每天需要从kafka中消费20亿条左右的消息,集群压力有点大,会导致job不同程度的异常退出.原来使用spark1.1.0版本中的createStream函数,但是在数据处理速度跟不上数据消费速度且job异常退出的情况下,可能造成大量的数据丢失.幸好,Spark后续版本对这一情况有了很大的改进,1.2版本加入WAL特性,但是性能应该会受到一些影响(本人未测试),1.3版本可

Spark Streaming写数据到Redis

参考2篇文章: 1.Kafka+Spark Streaming+Redis实时系统实践 https://www.iteblog.com/archives/1378 2.spark-stream 访问 Redis http://www.tuicool.com/articles/n6BRzi3

Spark Streaming中空RDD处理及流处理程序优雅的停止

本期内容 : Spark Streaming中的空RDD处理 Spark Streaming程序的停止 由于Spark Streaming的每个BatchDuration都会不断的产生RDD,空RDD有很大概率的,如何进行处理将影响其运行的效率.资源的有效使用. Spark Streaming会不断的接收数据,在不清楚接收的数据处理到什么状态,如果你强制停止掉的话,会涉及到数据不完整操作或者一致性相关问题. 一. Spark Streaming中的空RDD处理 : ForEachRDD是产生Ds