Spark 实践——用Scala和Spark进行数据分析

本文基于《Spark 高级数据分析》第2章 用Scala和Spark进行数据分析。

完整代码见 https://github.com/libaoquan95/aasPractice/tree/master/c2/Into

1.获取数据集

数据集来自加州大学欧文分校机器学习资料库(UC Irvine Machine Learning Repository),这个资料库为研究和教学提供了大量非常好的数据源, 这些数据源非常有意义,并且是免费的。 我们要分析的数据集来源于一项记录关联研究,这项研究是德国一家医院在 2010 年完成的。这个数据集包含数百万对病人记录,每对记录都根据不同标准来匹配,比如病人姓名(名字和姓氏)、地址、生日。每个匹配字段都被赋予一个数值评分,范围为 0.0 到 1.0, 分值根据字符串相似度得出。然后这些数据交由人工处理,标记出哪些代表同一个人哪些代表不同的人。 为了保护病人隐私,创建数据集的每个字段原始值被删除了。病人的 ID、 字段匹配分数、匹配对标示(包括匹配的和不匹配的)等信息是公开的,可用于记录关联研究

下载地址:

  1. http://bit.ly/1Aoywaq (需FQ)
  2. https://github.com/libaoquan95/aasPractice/tree/master/c2/linkage(已解压,block_1.csv 到 block_10.csv)

2.设置Spark运行环境,读取数据

val sc = SparkSession.builder().appName("Into").master("local").getOrCreate()
import sc.implicits._

读取数据集

// 数据地址
val dataDir = "inkage/block_*.csv"
// 读取有头部标题的CSV文件,并设置空值
val parsed = sc.read .option("header", "true") .option("nullValue", "?") .option("inferSchema", "true") .csv(dataDir)
// 查看表
parsed.show()
// 查看表结构
parsed.printSchema()
parsed.cache()

3.处理数据

首先按 is_match 字段聚合数据,有两种方式可以进行数据聚合,一是使用 groupby 函数,二是使用 Spark Sql

// 聚合
parsed.groupBy("is_match").count().orderBy($"count".desc).show()

// 先注册为临时表
parsed.createOrReplaceTempView("linkage")
// 使用sql查询,效果同上
sc.sql("""
  SELECT is_match, COUNT(*) cnt
  FROM linkage
  GROUP BY is_match
  ORDER BY cnt DESC
""").show()

之后使用 describe 函数获取每个字段的最值,均值等信息

// 获取每一列的最值,平均值信息
val summary = parsed.describe()
summary.show()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

按此方式获取匹配记录和不匹配记录的 describe

// 获取匹配和不匹配的信息
val matches = parsed.where("is_match = true")
val misses = parsed.filter($"is_match" === false)
val matchSummary = matches.describe()
val missSummary = misses.describe()
matchSummary .show()
missSummary .show()

可以看到这个数据不方便进行操作,可以考虑将其转置,方便使用sql对数据进行分析

def longForm(desc: DataFrame): DataFrame = {
  import desc.sparkSession.implicits._ // For toDF RDD -> DataFrame conversion
  val schema = desc.schema
  desc.flatMap(row => {
    val metric = row.getString(0)
    (1 until row.size).map(i => (metric, schema(i).name, row.getString(i).toDouble))
  })
    .toDF("metric", "field", "value")
}
def pivotSummary(desc: DataFrame): DataFrame = {
  val lf = longForm(desc)
  lf.groupBy("field").
    pivot("metric", Seq("count", "mean", "stddev", "min", "max")).
    agg(first("value"))
}

// 转置,重塑数据
val matchSummaryT = pivotSummary(matchSummary)
val missSummaryT = pivotSummary(missSummary)
matchSummaryT.createOrReplaceTempView("match_desc")
missSummaryT.createOrReplaceTempView("miss_desc")
sc.sql("""
  SELECT a.field, a.count + b.count total, a.mean - b.mean delta
  FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
  ORDER BY delta DESC, total DESC
""").show()

原文地址:https://www.cnblogs.com/libaoquan/p/9082972.html

时间: 2024-11-10 23:32:50

Spark 实践——用Scala和Spark进行数据分析的相关文章

大数据项目实践:基于hadoop+spark+mongodb+mysql开发医院临床知识库系统

一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS).影像存档和通信系统(PACS).电子病历系统(EMR)和区域医疗卫生服务(GMIS)等成功实施与普及推广,而且随着日新月异的计算机技术和网络技术的革新,进一步为数字化医院带来新的交互渠道譬如:远程医疗服务,网上挂号预约. 随着IT技术的飞速发展,80%以上的三级医院都相继建立了自己的医院信息系统

初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,

Scala深入浅出实战中级--进阶经典(第66讲:Scala并发编程实战初体验及其在Spark源码中应用解析)内容介绍和视频链接 2015-07-24 DT大数据梦工厂 从明天起,做一个勤奋的人 看视频.下视频,分享视频 DT大数据梦工厂-Scala深入浅出实战中级--进阶经典:第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析 本期视频通过代码实战详解了Java语言基于加锁的并发编程模型的弊端以及Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Sc

搭建scala 开发spark程序环境及实例演示

上一篇博文已经介绍了搭建scala的开发环境,现在进入正题.如何开发我们的第一个spark程序. 下载spark安装包,下载地址http://spark.apache.org/downloads.html(因为开发环境需要引用spark的jar包) 我下载的是spark-2.1.0-bin-hadoop2.6.tgz,因为我的scalaIDE版本是scala-SDK-4.5.0-vfinal-2.11-win32.win32.x86_64.zip 最好,IDE版本和spark版本要匹配,否则,开

Spark&Hive:如何使用scala开发spark作业,并访问hive。

背景: 接到任务,需要在一个一天数据量在460亿条记录的hive表中,筛选出某些host为特定的值时才解析该条记录的http_content中的经纬度: 解析规则譬如: 需要解析host: api.map.baidu.com 需要解析的规则:"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877}, "confidence&quo

大数据Spark学习:Scala基础第一课

计划: 阶段1: 精通Spark内核 阶段2: 精通千万级的项目 阶段3: 机器学习 JAVA本身不是伟大的语言,伟大的是JVM,构件分布式平台什么的,依赖的是JVM,不一定要JAVA语言 可认为Scala是JAVA的升级语言,JAVA是支持面向对象的语言,而非纯面向对象的语言.Scala是一切皆对象, 是纯面向对象语言.面向对象结合函数式编程. 不可变变量声明val result = 10+2  不可再次赋值,分布式数据的时候,传输数据.校验数据等不想改变这个数据 可变变量var name =

使用scala开发spark入门总结

使用scala开发spark入门总结 一.spark简单介绍 关于spark的介绍网上有很多,可以自行百度和google,这里只做简单介绍.推荐简单介绍连接:http://blog.jobbole.com/89446/ 1.    spark是什么? Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架.一般配合hadoop使用,可以增强hadoop的计算性能. 2.    Spark的优点有哪些? Sp

个推 Spark实践教你绕过开发那些“坑”

Spark作为一个开源数据处理框架,它在数据计算过程中把中间数据直接缓存到内存里,能大大提高处理速度,特别是复杂的迭代计算.Spark主要包括SparkSQL,SparkStreaming,Spark MLLib以及图计算. Spark核心概念简介 1.RDD即弹性分布式数据集,通过RDD可以执行各种算子实现数据处理和计算.比如用Spark做统计词频,即拿到一串文字进行WordCount,可以把这个文字数据load到RDD之后,调用map.reducebyKey 算子,最后执行count动作触发

地铁译:Spark for python developers --- 搭建Spark虚拟环境1

一个多月的地铁阅读时光,阅读<Spark for python developers>电子书,不动笔墨不看书,随手在evernote中做了一下翻译,多年不习英语,自娱自乐.周末整理了一下,发现再多做一点就可基本成文了,于是开始这个地铁译系列. 本章中,我们将为开发搭建一个独立的虚拟环境,通过Spark和Anaconda提供的PyData 库为该环境补充能力. 这些库包括Pandas,Scikit-Learn, Blaze, Matplotlib, Seaborn, 和 Bokeh. 我们的操作

Spark入门实战系列--1.Spark及其生态圈简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL.Spark St