大数据学习——spark-steaming学习

官网http://spark.apache.org/docs/latest/streaming-programming-guide.html

1.1.  用Spark Streaming实现实时WordCount

1.安装并启动生成者

首先在一台Linux(ip:192.168.10.101)上用YUM安装nc工具

yum install -y nc

启动一个服务端并监听9999端口

nc -lk 9999

2.编写Spark Streaming程序

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]) {
    //设置日志级别
    LoggerLevel.setStreamingLogLevels()
    //创建SparkConf并设置为本地模式运行
    //注意local[2]代表开两个线程
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    //设置DStream批次时间间隔为2秒
    val ssc = new StreamingContext(conf, Seconds(2))
    //通过网络读取数据
    val lines = ssc.socketTextStream("192.168.10.101", 9999)
    //将读到的数据用空格切成单词
    val words = lines.flatMap(_.split(" "))
    //将单词和1组成一个pair
    val pairs = words.map(word => (word, 1))
    //按单词进行分组求相同单词出现的次数
    val wordCounts = pairs.reduceByKey(_ + _)
    //打印结果到控制台
    wordCounts.print()
    //开始计算
    ssc.start()
    //等待停止
    ssc.awaitTermination()
  }
}

3.启动Spark Streaming程序:由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序

注意:要指定并行度,如在本地运行设置setMaster("local[2]"),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1

4.在Linux端命令行中输入单词

5.在IDEA控制台中查看结果

问题:结果每次在Linux段输入的单词次数都被正确的统计出来,但是结果不能累加!如果需要累加需要使用updateStateByKey(func)来更新状态,下面给出一个例子:

package cn.itcast.spark.streaming

import cn.itcast.spark.util.LoggerLevel
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Seconds}

object NetworkUpdateStateWordCount {
  /**
    * String : 单词 hello
    * Seq[Int] :单词在当前批次出现的次数
    * Option[Int] : 历史结果
    */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
  }

  def main(args: Array[String]) {
    LoggerLevel.setStreamingLogLevels()
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    //做checkpoint 写入共享存储中
    ssc.checkpoint("c://aaa")
    val lines = ssc.socketTextStream("192.168.10.100", 9999)
    //reduceByKey 结果不累加
    //val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
    //updateStateByKey结果可以累加但是需要传入一个自定义的累加函数:updateFunc
    val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

1.1.  使用reduceByKeyAndWindow计算每分钟数据

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by wangsenfeng on 2016/10/27.
  */
object SparkSqlTest {
  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("sparksql").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("./")
    val textStream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.56.151",9999)
    val result: DStream[(String, Int)] = textStream.flatMap(_.split(" ")).map((_,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(5),Seconds(5))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

1.1.  Spark Streaming整合Kafka完成网站点击流实时统计

1.安装并配置zk

2.安装并配置Kafka

3.启动zk

4.启动Kafka

5.创建topic

bin/kafka-topics.sh --create --zookeeper node1.itcast.cn:2181,node2.itcast.cn:2181 \

--replication-factor 3 --partitions 3 --topic urlcount

6.编写Spark Streaming应用程序

package cn.itcast.spark.streaming

package cn.itcast.spark

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UrlCount {
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}
  }

  def main(args: Array[String]) {
    //接收命令行中的参数
   // val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
val Array(zkQuorum, groupId, topics, numThreads) = Array[String]("master1ha:2181,master2:2181,master2ha:2181","g1","wangsf-test","2")
    //创建SparkConf并设置AppName
    val conf = new SparkConf().setAppName("UrlCount")
    //创建StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    //设置检查点
    ssc.checkpoint(hdfs)
    //设置topic信息
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //重Kafka中拉取数据创建DStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    //切分数据,截取用户点击的url
    val urls = lines.map(x=>(x.split(" ")(6), 1))
    //统计URL点击量
    val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    //将结果打印到控制台
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

生产数据测试:

kafka-console-producer.sh --broker-list h2slave1:9092 --topic wangsf-test

原文地址:https://www.cnblogs.com/feifeicui/p/11017411.html

时间: 2024-10-10 17:54:48

大数据学习——spark-steaming学习的相关文章

我对大数据相关技术的学习心得及理解

本篇为这一段时间以来我对大数据相关技术的学习心得及理解,主要涉及以下几个方面: noSql, 集群, 数据挖掘, 机器学习,云计算,大数据,以及Hadoop和Spark.主要都是一些初级的概念澄清之类的东西,并且比较凌乱,慎入.* 1. NoSQL我的理解是,NoSQL主要用于存储一些非结构化数据, 是关系数据库与文件存储方式(比如视频文件就适合使用文件的方式存储)的过度. ** 1.1 NoSQL的分类: 列存储: Hbase(BigTable的开源实现),可存储结构化数据. Cassandr

如何快速掌握大数据知识,按照学习路线来

第一阶段:linux 系统 这章是基础课程,帮大家进入大数据领域打好 Linux 基础,以便更好地学习 Hadoop, NOSQL, Oracle, MYSQL, Spark, Storm 等众多课程.因为企业中 无一例外的是使用 Linux 来搭建或部署项目. 第二阶段:大型网站高并发处理 通过本章的学习大家将会了解大数据的源头,数据从何而来,继而更好的了解大数据.并且通过学习如果处理大型网站高并发问题反向更深入的学习了 Linux 同时站在了更高的角度去触探了架构. 第三阶段:Hadoop

大数据下的“商业学习实验” 能否改变超市业?

浓缩观点 消费动力不足.利润下滑.电商侵蚀,大型超市的突围之路在哪里?掌握以大数据分析为基础的“商业实验学习”的5个方向,将给零售企业带来数以亿计的经济效益. 零售企业可能每天都会产生令人兴奋但有些风险的创意,这些点子要不要做?效果又会如何? 大数据下的“商业学习实验” 台湾的全家便利店,在台湾拥有2000多家门店.它们在调查中发现,消费者对现煮咖啡有显著需求.对全家而言,是贸然在所有门店都提供现煮咖啡,还是将这些咖啡机将优先添置在某些店铺?现煮咖啡大卖的同时,是否会挤占店里其他咖啡及饮料产品的

【互动问答分享】第8期决胜云计算大数据时代Spark亚太研究院公益大讲堂

“决胜云计算大数据时代” Spark亚太研究院100期公益大讲堂 [第8期互动问答分享] Q1:spark线上用什么版本好? 建议从最低使用的Spark 1.0.0版本,Spark在1.0.0开始核心API已经稳定: 从功能的角度考虑使用最新版本的Spark 1.0.2也是非常好的,Spark 1.0.2在Spark 1.0.1的基础上做了非常多的改进: Spark 1.0.2改进参考 http://spark.apache.org/releases/spark-release-1-0-2.ht

【互动问答分享】第2期决胜云计算大数据时代Spark亚太研究院公益大讲堂

"决胜云计算大数据时代" Spark亚太研究院100期公益大讲堂 [第2期互动问答分享] Q1:新手学习spark如何入手才好? 先学习Scala的内容,强烈推荐<快学Scala>: 然后按照我们免费发布的"云计算分布式大数据Spark实战高手之路(共3本书)"循序渐进的学习即可,其中"云计算分布式大数据Spark实战高手之路---从零开始"涵盖了Spark1.0的所有主题:包括Spark集群的构建,Spark架构设计.Spark内核

【互动问答分享】第3期决胜云计算大数据时代Spark亚太研究院公益大讲堂

决胜云计算大数据时代" Spark亚太研究院100期公益大讲堂 [第3期互动问答分享] Q1: groupbykey是排好序的吗?分组排序 怎么实现? groupByKey在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集,所以是没有排序的: 要想分组排序,首先要使用groupByKey完成分组功能,然后使用sortWith这个函数对指完成排序实现: 完整代码如下所示: spark.textFile(...).groupByKey().map{p => val 

【互动问答分享】第5期决胜云计算大数据时代Spark亚太研究院公益大讲堂

Spark亚太研究院100期公益大讲堂 [第5期互动问答分享] Q1:spark怎样支持即席,应该不是spark sql吧,是hive on spark么? Spark1.0 以前支持即席查询的技术是Shark; Spark 1.0和 Spark 1.0.1支持的即席查询技术是Spark SQL; 尚未发布的Spark 1.1开始 Spark SQL是即席查询的核心,我们期待Hive on Spark也能够支持即席查询: Q2:现在spark 1.0.0版本是支持hive on spark么,它

【互动问答分享】第15期决胜云计算大数据时代Spark亚太研究院公益大讲堂

"决胜云计算大数据时代" Spark亚太研究院100期公益大讲堂 [第15期互动问答分享] Q1:AppClient和worker.master之间的关系是什么? :AppClient是在StandAlone模式下SparkContext.runJob的时候在Client机器上应       用程序的代表,要完成程序的registerApplication等功能: 当程序完成注册后Master会通过Akka发送消息给客户端来启动Driver: 在Driver中管理Task和控制Work

【互动问答分享】第10期决胜云计算大数据时代Spark亚太研究院公益大讲堂

"决胜云计算大数据时代" Spark亚太研究院100期公益大讲堂 [第10期互动问答分享] Q1:Spark on Yarn的运行方式是什么? Spark on Yarn的运行方式有两种:Client和Cluster模式 Client模式如下所示: Cluster模式如下所示: Q2:Yarn的框架内部是如何实现的? Yarn是一个框架,内部实现好了RM和NM: 公开课: 上海:9月26-28日,<决胜大数据时代:Hadoop.Yarn.Spark企业级最佳实践> 北京:

【互动问答分享】第17期决胜云计算大数据时代Spark亚太研究院公益大讲堂

"决胜云计算大数据时代" Spark亚太研究院100期公益大讲堂 [第17期互动问答分享] Q1:为了加快spark shuffle 的执行速度是否可以把spark_local_dirs 指向一块固态硬盘上面,这样做是否有效果. 可以把spark_local_dirs指向一块固态硬盘上面,这样会非常有效的提升Spark执行速度: 同时想更快的提升Spark运行速度的话可以指定多个Shuffle输出的目录,让Shuffle并行读写磁盘: Q2:solidation=true只是在同一机器