Spark MaprLab-Auction Data分析

一、环境安装

1.安装hadoop

http://my.oschina.net/u/204498/blog/519789

2.安装spark

3.启动hadoop

4.启动spark

二、

1.数据准备

从MAPR官网上下载数据DEV360DATA.zip并上传到server上。

[[email protected] spark-1.5.1-bin-hadoop2.6]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6

[[email protected] spark-1.5.1-bin-hadoop2.6]$ cd test-data/

[[email protected] test-data]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6/test-data/DEV360Data

[[email protected] DEV360Data]$ ll
total 337940
-rwxr-xr-x 1 hadoop root    575014 Jun 24 16:18 auctiondata.csv        =>c测试用到的数据
-rw-r--r-- 1 hadoop root  57772855 Aug 18 20:11 sfpd.csv
-rwxrwxrwx 1 hadoop root 287692676 Jul 26 20:39 sfpd.json

[[email protected] DEV360Data]$ more auctiondata.csv 
8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3
8213034705,115,2.943484,davidbresler2,1,95,117.5,xbox,3
8213034705,100,2.951285,gladimacowgirl,58,95,117.5,xbox,3
8213034705,117.5,2.998947,daysrus,10,95,117.5,xbox,3
8213060420,2,0.065266,donnie4814,5,1,120,xbox,3
8213060420,15.25,0.123218,myreeceyboy,52,1,120,xbox,3
...
...

#数据结构如下
auctionid,bid,bidtime,bidder,bidrate,openbid,price,itemtype,daystolve

#把数据上传到HDFS中
[[email protected] DEV360Data]$ hdfs dfs -mkdir -p /spark/exer/mapr
[[email protected] DEV360Data]$ hdfs dfs -put auctiondata.csv /spark/exer/mapr
[[email protected] DEV360Data]$ hdfs dfs -ls /spark/exer/mapr
Found 1 items
-rw-r--r--   2 hadoop supergroup     575014 2015-10-29 06:17 /spark/exer/mapr/auctiondata.csv

2.运行spark-shell 我用的scala.并针对以下task,进行分析

tasks:

a.How many items were sold?

b.How many bids per item type?

c.How many different kinds of item type?

d.What was the minimum number of bids?

e.What was the maximum number of bids?

f.What was the average number of bids?

[[email protected] spark-1.5.1-bin-hadoop2.6]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6

[[email protected] spark-1.5.1-bin-hadoop2.6]$ ./bin/spark-shell 
...
...
scala >

#首先从HDFS加载数据生成RDD
scala > val originalRDD = sc.textFile("/spark/exer/mapr/auctiondata.csv")
...
...
scala > originalRDD      ==>我们来分析下originalRDD的类型 RDD[String] 可以看做是一条条String的数组,Array[String]
res26: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

##根据“,”把每一行分隔使用map
scala > val auctionRDD = originalRDD.map(_.split(","))
scala> auctionRDD        ==>我们来分析下auctionRDD的类型 RDD[Array[String]] 可以看做是String的数组,但元素依然是数组即,可以认为Array[Array[string]]
res17: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:23

a.How many items were sold?

 ==> val count = auctionRDD.map(bid => bid(0)).distinct().count()

根据auctionid去重即可:每条记录根据“,”分隔,再去重,再计数

#获取第一列,即获取auctionid,依然用map
#可以这么理解下面一行,由于auctionRDD是Array[Array[String]]那么进行map的每个参数类型是Array[String],由于actionid是数组的第一位,即获取第一个元素Array(0),注意是()不是[]
scala> val auctionidRDD = auctionRDD.map(_(0))
...
...

scala> auctionidRDD        ==>我们来分析下auctionidRDD的类型 RDD[String] ,理解为Array[String],即所有的auctionid的数组
res27: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at map at <console>:26

#对auctionidRDD去重
scala > val auctionidDistinctRDD=auctionidRDD.distinct()

#计数
scala > auctionidDistinctRDD.count()
...
...

b.How many bids per item type?

===> auctionRDD.map(bid => (bid(7),1)).reduceByKey((x,y) => x + y).collect()

#map每一行,获取出第7列,即itemtype那一列,输出(itemtype,1)
#可以看做输出的类型是(String,Int)的数组
scala > auctionRDD.map(bid=>(bid(7),1))
res30: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at map at <console>:26
...

#reduceByKey即按照key进行reduce
#解析下reduceByKey对于相同的key, 
#(xbox,1)(xbox,1)(xbox,1)(xbox,1)...(xbox,1) ==> reduceByKey ==> (xbox,(..(((1 + 1) + 1) + ... + 1))
scala > auctionRDD.map(bid=>(bid(7),1)).reduceByKey((x,y) => x + y)
#类型依然是(String,Int)的数组 String=>itemtype Int已经是该itemtype的计数总和了
res31: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:26

#通过collect() 转换成 Array类型数组
scala > auctionRDD.map(bid=>(bid(7),1)).reduceByKey((x,y) => x + y).collect()

res32: Array[(String, Int)] = Array((palm,5917), (cartier,1953), (xbox,2784))
时间: 2024-11-05 19:05:45

Spark MaprLab-Auction Data分析的相关文章

Spark的Python和Scala shell介绍(翻译自Learning.Spark.Lightning-Fast.Big.Data.Analysis)

Spark提供了交互式shell,交互式shell让我们能够点对点(原文:ad hoc)数据分析.如果你已经使用过R,Python,或者Scala中的shell,或者操作系统shell(例如bash),又或者Windows的命令提示符界面,你将会对Spark的shell感到熟悉. 但实际上Spark shell与其它大部分shell都不一样,其它大部分shell让你通过单个机器上的磁盘或者内存操作数据,Spark shell让你可以操作分布在很多机器上的磁盘或者内存里的数据,而Spark负责在集

Spark SQL 源码分析之 In-Memory Columnar Storage 之 in-memory query

/** Spark SQL源码分析系列文章*/ 前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的. 那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式. 一.引子 本例使用hive console里查询cache后的src表. select value from src 当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用

使用Spark MLlib进行情感分析

使用Spark MLlib进行情感分析 使用Spark MLlib进行情感分析 一.实验说明 在当今这个互联网时代,人们对于各种事情的舆论观点都散布在各种社交网络平台或新闻提要中.我们可以在移动设备或是个人PC上轻松地发布自己的观点.对于这种网上海量分布地数据,我们可以利用文本分析来挖掘各种观点.如下图中,CognoviLabs利用Twitter上人们发布对于美国大选两个候选人的推特,进行情感分析的结果.从这张图我们也可以直观地感受到民意所向(此图发表日期为10月10日,早于今年美国大选的日子)

第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

第十篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 query

/** Spark SQL源码分析系列文章*/ 前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的. 那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式. 一.引子 本例使用hive console里查询cache后的src表. select value from src 当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用

第七篇:Spark SQL 源码分析之Physical Plan 到 RDD的具体实现

/** Spark SQL源码分析系列文章*/ 接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. [java] view plain copy lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

Spark SQL源码分析之核心流程

自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql. 2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里. 前一段时间测试过Shark,并且对Spark

Spark SQL 源码分析之 In-Memory Columnar Storage 之 cache table

/** Spark SQL源码分析系列文章*/ Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率. 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage.Column Based Storage. PAX Storage. Spark SQL 的内存数据是如何组织的? Spar

Spark SQL CLI 实现分析

背景 本文主要介绍了Spark SQL里目前的CLI实现,代码之后肯定会有不少变动,所以我关注的是比较核心的逻辑.主要是对比了Hive CLI的实现方式,比较Spark SQL在哪块地方做了修改,哪些地方与Hive CLI是保持一致的.可以先看下总结一节里的内容. Spark SQL的hive-thriftserver项目里是其CLI实现代码,下面先说明Hive CLI的主要实现类和关系,再说明Spark SQL CLI的做法. Hive CLI 核心启动类是org.apache.hive.se