知识点-Spark小节

Spark处理字符串日期的max和min的方式
Spark处理数据存储到Hive的方式
Spark处理新增列的方式map和udf、functions
Spark处理行转列pivot的使用
Python 3.5.3
Spark1.6.2

欢迎访问个人主页博客

Spark处理字符串日期的max和min的方式

一般是字符串类型的日期在使用Spark的agg求max时,是不正确的,API显示只支持数值型的max、min
hive的SQL查询引擎是支持字符串日期的max和min的

字符串日期转为时间戳再聚合

unix_timestamp

public static Column unix_timestamp(Column s)
Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds), using the default timezone and the default locale, return null if fail.
Parameters:
s - (undocumented)
Returns:
(undocumented)
Since:
1.5.0
from pyspark.sql import functions as F

df.withColumn(‘startuptime_stamp‘, F.unix_timestamp(‘startuptime‘))
使用HiveSQL
select device_id, max(startuptime) as max_startuptime, min(startuptime) as min_startuptime from app_table group by device_id

Spark处理数据存储到Hive的方式

通常Spark任务处理后的结果数据会存储到Hive表中,可以先保存至HDFS目录再load、最方便还是直接使用临时表和HiveContext插入数据

saveAsTextFile & load data

repartition根据实际文件大小进行调整,数据比较小时,保存成一个文件

df.map(lambda r: func).repartition(1).saveAsTextFile(data_dir)

先删除分区,如果已经存在的话
再覆盖原来的数据【方便重新重复跑或修复数据】
此处使用shell,也可使用HiveContext的sql

alter table app_table drop if exists partition(datestr=‘$day_01‘);
load data inpath ‘hdfs://xx/out/$day_01‘ overwrite into table app_table partition(datestr=‘$day_01‘);
hivectx.sql & insert
app_table1_df.registerTempTable("app_table1_tmp")
app_table2_df.registerTempTable("app_table2_tmp")
hivectx.sql("set spark.sql.shuffle.partitions=1")
hivectx.sql("alter table app_table drop if exists partition(datestr=‘%s‘)" % daystr)
hivectx.sql("insert overwrite table app_table partition(datestr=‘%s‘) select * from app_table1_tmp" % daystr)
hivectx.sql("insert into app_table partition(datestr=‘%s‘) select * from app_table2_tmp" % daystr)

Spark处理新增列的方式map和udf、functions

Spark在处理数据转换时,通常需要使用map、flatmap等操作,其中使用map会产生新的列或修改某列字段的值
Spark同样支持自定义函数UDF以及提供了类似Hive内置函数的各种各样的处理函数

map

需要定义函数和StructType
忽略数值判断细节和精度等

from pyspark.sql.types import *

def a_func(_):
    return _[‘id‘], _[‘cnt1‘], _[‘cnt2‘], _[‘cnt1‘] / (_[‘cnt1‘] + _[‘cnt1‘])

a_schema = StructType([
    StructField(‘id‘, StringType(), True),
    StructField(‘cnt1‘, IntegerType(), True),
    StructField(‘cnt2‘, IntegerType(), True),
    StructField(‘cnt1_rate‘, IntegerType(), True)
])

a_new_df = sqlctx.createDataFrame(df.select(‘id‘, ‘cnt1‘, ‘cnt2‘).map(a_func), a_schema)
udf

需要定义函数和UDF
忽略数值判断细节和精度等

def a_func(cnt1, cnt2):
    return cnt1 / (cnt1 + cnt2)

a_udf = F.udf(a_func, IntegerType())

a_new_df = df.withColumn(‘cnt1_rate‘, a_udf(df[‘cnt1‘], df[‘cnt2‘])
functions

处理类似日期字符串的格式转换、等等等
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html

Spark处理行转列pivot的使用

在使用SQL查询数据时,很多情况下需要将行转为列,以有利于数据的展示和不同维度需求的利用
一般可采用子查询case when、连续join、字段补全union的形式
Spark的DataFrame中可以通过GroupedData的pivot函数来实现

df.groupBy([‘course_name‘]).pivot(‘daystr‘).sum(‘score‘)

df.groupBy([‘course_name‘]).pivot(‘daystr‘).count()

转换前

daystr course_name score
2017-11-15 yuwen 1
2017-11-15 yuwen 1
2017-11-15 shuxue 1
2017-11-15 yingyu 2
2017-11-16 yuwen 1
2017-11-16 shuxue 1
2017-11-16 yingyu 2

转换后

course_name 2017-11-15 2017-11-16
yuwen 2 1
shuxue 1 1
yingyu 2 2
course_name 2017-11-15 2017-11-16
yuwen 2 1
shuxue 1 1
yingyu 1 1

原文地址:https://blog.icocoro.me/2017/11/16/1711-zhishidian-spark%E5%B0%8F%E8%8A%8201/index.html
时间: 2024-08-30 04:48:27

知识点-Spark小节的相关文章

出租车数据分析

出租车数据分析 一.实验简介 图片来自pixabay.com 出租车是我们生活中经常乘坐的一种交通工具,但打车难的问题也限制了我们更好地利用这种交通方式.在哪些地方出租车更容易打到?在什么时候更容易打到出租车?本课程将基于某市的出租车行驶轨迹数据,带你学习如何应用Spark SQL和机器学习相关技巧,并且通过数据可视化手段展现分析结果. 1.1 知识点 Spark DataFrame操作 Spark SQL 的 API 查询 Spark MLlib 的 KMeans 算法应用 1.2 准备工作

第七章:选择器引擎

jQuery凭借选择器风靡全球,各大框架类库都争先开发自己的选择,一时间内选择器变为框架的标配 早期的JQuery选择器和我们现在看到的远不一样.最初它使用混杂的xpath语法的selector.第二代转换为纯css的自定义伪类,(比如从xpath借鉴过来的位置伪类)的sizzle,但sizzle也一直在变,因为他的选择器一直存在问题,一直到JQuery1.9才搞定,并最终全面支持css3的结构伪类. 2005 年,Ben Nolan的Behaviours.js 内置了闻名于世的getEleme

IT十八掌课程体系SPARK知识点总结

Spark知识点 IT十八掌课程体系SPARK知识点如下: 有需要IT十八掌体系课程的可以加微信:15210639973 1.定义 MapReduce-like集群计算框架设计的低延迟迭代和交互使用的工作. 2.体系结构 3.一些重要概念的解析 (1) RDD(resilient distributed dataset) 弹性分布式数据集一个只读的,可分区的分布式数据集,能够部分或全部的缓存在内存中(数据溢出时会根据LRU策略来决定哪些数据可以放在内存里,哪些存到磁盘上),用来减少Disk-io

Spark MLlib知识点整理

MLlib的设计原理:把数据以RDD的形式表示,然后在分布式数据集上调用各种算法.MLlib就是RDD上一系列可供调用的函数的集合. 操作步骤: 1.用字符串RDD来表示信息. 2.运行MLlib中的一个特征提取算法来吧文本数据转换为数值的特征.给操作会返回一个向量RDD. 3.对向量RDD调用分类算法,返回一个模型对象,可以使用该对象对新的数据点进行分类. 4.使用MLlib的评估函数在测试数据集上评估模型. 机器学习基础: 机器学习算法尝试根据 训练数据 使得表示算法行为的数学目标最大化,并

Spark学习知识点

1.Spark集群部署及开发环境搭建.内容包括Hadoop集群.Spark集群搭建,Intellij IDEA Spark开发环境搭建, Spark Shell的使用等.2.Spark 运行原理.内容包括spark脚本文件解析.Spark 几种不同运行方式.RDD原理.宽依赖与窄依赖.Spark 任务调度等.3.Spark 编程模型,介绍Spark编程模型.对常用的transformation及action操作进行介绍.4.Spark SQL与DataFrame,介绍Spark SQL 及Dat

Spark 知识点总结--调优(一)

搭建集群: SPARK_WORKER-CORES : 当计算机是32核双线程的时候,需要指定SPARK_WORKER_CORES的个数为64个 SPARK_WORKER_MEMORY : 任务提交: ./spark-submit --master node:port --executor-cores --class  ..jar xxx --executor-cores: 指定每个executor使用的core 的数量 --executor-memory: 指定每个executor最多使用的内存

【Spark亚太研究院系列丛书】Spark实战高手之路-第2章动手实战Scala第2小节(3)

5,动手实战Scala中的apply方法和单例对象 新建一个类: 额外提一点,放在object对象中的方法都是静态方法,如下所示: 接下来看一下apply方法的使用: 上面代码总当我们使用"val a = ApplyTest()"的使用会导致apply方法的调用并返回该方法调用的值,也就是ApplyTest的实例化对象. Class中也可以由apply方法,其使用方法如下所示: 由于object中的方法和属性都是静态的,所以就是单例对象的理想载体,实例代码如下所示: 换言之,objec

黑马程序员——C语言知识小节------初学者容易忽略的知识点

这两天复习了之前学习的C语言基础知识,发现了许多对于初学者来说重要但是却容易被忽略的问题,在此总结一下,与君共勉. 1.程序中如果用到头文件stdio.h的内容,即使不写#include <stdio.h>程序也可以正常运行,只会警告,不会报错.这是因为链接程序会自动链接库函数.所以如果使用了非库函数的头文件,就必须要写头文件,不然程序就会报错. 2.在使用scanf函数输入参数时,若以空格作为分隔,如: int a, b; scanf("%d %d", &a, &

javascript知识点小节

1.类型转换:parseInt\parseFloat\Number\== 2.本地对象的方法: String:concat\split\substring\substr\match\replace Number:toFixed Array:concat\join\slice\splice RegExp:exec\test 3.DOM操作: getElementById\getElementsByName createElement appendChild\removeChild\replaceC