Spark系列(二) Spark Shell各种操作及详细说明

  • 并行化scala集合(Parallelize)

//加载数据1~10

val num=sc.parallelize(1 to 10)

//每个数据项乘以2,注意 _*2记为一个函数(fun)

val doublenum = num.map(_*2)

//内存缓存数据

doublenum.cache()

//过滤数据,每个数据项 % 3 为0的数据为结果集;

val threenum = doublenum.filter(_ % 3 == 0)

//释放缓存

threenum.unpersist()

//出发action操作根据前面的步骤构建DAG并执行,以数据的形式返回结果集;

threenum.collect

//返回结果集中的第一个元素

threenum.first

//返回结果集中的前三个元素

threenum.take(3)

//对数据集中的元素个数统计

threenum.count

//查看以上步骤经过的RDD转换过程

threenum.toDebugString

结果:

  • K-V类型数据演示

// 加载数据

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

//根据数据集中的每个元素的K值对数据排序

kv1.sortByKey().collect

kv1.groupByKey().collect //根据数据集中的每个元素的K值对数据分组

kv1.reduceByKey(_+_).collect

注意:sortByKey 、groupByKey 、reduceByKey之间的结果集的区别;

val kv2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5)))

kv2.distinct.collect // distinct操作去重

kv1.union(kv2).collect //kv1与kv2联合

kv1.join(kv2).collect //kv1与kv2两个数据连接,相当于表的关联

val kv3=sc.parallelize(List(List(1,2),List(3,4)))

kv3.flatMap(x=>x.map(_+1)).collect //注意这里返回的数据集已经不是K-V类型了

  • HDFS文件操作演示

先将clk.tsv和reg.tsv文件上传到hdfs,文件格式如下;

// 定义一个对日期格式化的常量

val format = new java.text.SimpleDateFormat("yyyy-MM-dd")

// scala语法,定义Register类(根据reg.tsv数据格式)

case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float,lng: Float)

// scala语法,定义Click类(根据clk.tsv数据格式)

case class Click (d: java.util.Date, uuid: String, landing_page: Int)

// 加载hdfs上的文件reg.tsv并将每行数据转换为Register对象;

val reg = sc.textFile("hdfs://chenx:9000/week2/join/reg.tsv").map(_.split("\t")).map(r => (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat)))

// 加载hdfs上的文件clk.tsv并将每行数据转换为Click对象;

val clk = sc.textFile("hdfs://chenx:9000/week2/join/clk.tsv").map(_.split("\t")).map(c => (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt)))

reg.join(clk).collect

时间: 2024-10-17 08:47:43

Spark系列(二) Spark Shell各种操作及详细说明的相关文章

spark 教程二 spark中的一些术语和概念

1.Application:基于spark的用户程序,包含了一个driver program 和集群中多个 executor 2.Driver Program:运行application的main()函数并自动创建SparkContext.通常SparkContext 代表driver program 3.Executor:为某个Application运行在worker node 上的一个进程.该进程负责运行task并负责将数据存储在内存或者硬盘上,每个application 都有自己独立的 e

Docker教程系列二:Docker镜像操作

1什么是Docker镜像 Docker镜像是由文件系统叠加而成(是一种文件的存储形式).最底端是一个文件引导系统,即bootfs,这很像典型的Linux/Unix的引导文件系统.Docker用户几乎永远不会和引导系统有什么交互.实际上,当一个容器启动后,它将会被移动到内存中,而引导文件系统则会被卸载,以留出更多的内存供磁盘镜像使用.Docker容器启动是需要的一些文件,而这些文件就可以称为Docker镜像. 2列出镜像 列出docker下的所有镜像:docker images l  REPOSI

Spark系列之二——一个高效的分布式计算系统

1.什么是Spark? Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MadReduce所具有的优点:但不同于MapReduce的是Job中间输出的结果可以保存在内存中,从而不需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法. 2.Spark的架构? Bagel(Pregel on Spark)    

Spark+hadoop+mllib及相关概念与操作笔记

Spark+hadoop+mllib及相关概念与操作笔记 作者: lw 版本: 0.1 时间: 2016-07-18 1.调研相关注意事项 a) 理解调研 调研的意义在于了解当前情况,挖掘潜在的问题,解决存在的疑问,并得到相应的方案. b) 调研流程 首先明确和梳理现有的疑问是什么,要通过调研解决什么问题,然后再去做调研,发现问题,再解决问题. c) 调研成果 最终需要得到结论与方案,以及详尽的论证理由,让别人信服. d) 书写格式 版本与作者以及时间可以以表格的形式,整齐明了. 结论简洁明了,

Spark入门实战系列--1.Spark及其生态圈简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL.Spark St

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处理

spark streaming (二)

一.基础核心概念 1.StreamingContext详解 (一) 有两种创建StreamingContext的方式:             val conf = new SparkConf().setAppName(appName).setMaster(master);             val ssc = new StreamingContext(conf, Seconds(1)); StreamingContext, 还可以使用已有的SparkContext来创建         

Spark中的键值对操作-scala

1.PairRDD介绍     Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD. 2.创建Pair RDD 程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairR

【spark系列3】spark开发简单指南

分布式数据集创建之textFile 文本文件的RDDs能够通过SparkContext的textFile方法创建,该方法接受文件的URI地址(或者机器上的文件本地路径,或者一个hdfs://, sdn://,kfs://,其他URI).这里是一个调用样例:scala> val distFile = sc.textFile("data.txt")distFile: spark.RDD[String] = [email protected] 分布式数据集操作之转换和动作 分布式数据集