sparkR 跑通的函数

spark1.4.0的sparkR的思路:用spark从大数据集中抽取小数据(sparkR的DataFrame),然后到R里分析(DataFrame)。

这两个DataFrame是不同的,前者是分布式的,集群上的DF,R里的那些包都不能用;后者是单机版的DF,包里的函数都能用。

sparkR的开发计划,个人觉得是将目前包里的函数,迁移到sparkR的DataFrame里,这样就打开一片天地。

> a<- sql(hiveContext, "SELECT count(*) FROM anjuke_scores where restaurant>=10");
> a<- sql(hiveContext, "SELECT * FROM anjuke_scores limit 5")
> a
DataFrame[city:string, housingname:string, ori_traffic_score:int, ori_traffic_score_normal:double, metro_station:double, metro_station_normal:double,...
> first(a)  #显示Formal Data Frame的第一行

> head(a) ; #列出a的前6行

> columns(a) # 列出全部的列

[1] "city" "housingname" "ori_traffic_score" "ori_traffic_score_normal"

[5] "metro_station" "metro_station_normal" "bus_station" "bus_station_normal" ...

> showDF(a)
> b<-filter(a, a$ori_comfort>8); # 行筛选, ori_comfort_normal:double
> print(a);    #打印列名及类型
DataFrame[city:string, housingname:string, ori_traffic_score:int, ......

> printSchema(a); # 打印列名的树形框架概要 root |-- city: string (nullable = true) |-- housingname: string (nullable = true) |-- ori_traffic_score: integer (nullable = true) |-- ori_traffic_score_normal: double (nullable = true) |-- metro_station: double (nullable = true)
> take(a,10) ; # 提取Formal class DataFrame的前面num行,成为R中普通的 data frame , take(x, num)

city housingname ori_traffic_score ori_traffic_score_normal metro_station metro_station_normal

1 \t\x9a \xddrw\xb8 NA 0 NA 0

2 \t\x9a \xe4\xf04\u03a2\021~ NA 0 NA 0

3 \t\x9a \xf6\xe3w\xb8 NA 0 NA 0

4 \t\x9a \x8e=\xb0w\xb8 NA 0 NA 0

5 \t\x9a \t\x9a\xe4\xf04\xce\xe4\xf0~ NA 0 NA 0

6 \t\x9a q4\xfdE NA 0 NA 0

7 \t\x9a \xe4\xf04\xce NA 0 NA 0

8 \t\x9a )\xfdVT NA 0 NA 0

9 \t\x9a q\177V NA 0 NA 0

10 \t\x9a \xe4\xf04\xceW\xb8 NA 0 NA 0

> b<-take(a,10)
> dim(b)
[1] 10 41
> aa <- withColumn(a, "ori_comfort_aa", a$ori_comfort * 5)   #用现有的列生成新的列, 新增一列,ori_comfort_aa,结果还是Formal data frame结构
> printSchema(aa)
root
 |-- city: string (nullable = true)
.........
 |-- comfort_normal: double (nullable = true)
 |-- ori_comfort_aa: double (nullable = true)

> aa <- mutate(a, newCol1 = a$commerce_normal * 5, newCol2 = a$bank_normal * 2) ; #与withColumn类似

> printSchema(aa)

root

|-- city: string (nullable = true)

。。。。。。。。。。。。。。。。。。

|-- comfort_normal: double (nullable = true)

|-- newCol1: double (nullable = true)

|-- newCol2: double (nullable = true)

a1<-arrange(a,asc(a$level_tow)); # 按列排序, asc升序,desc降序

a1<-orderBy(a,asc(a$level_tow)); # 按列排序

count(a) ; # 统计 Formal Data Frame有多少行数据

> dtypes(a);  #以list的形式列出Formal Data Frame的全部列名及类型
[[1]]
[1] "city"   "string"

[[2]]
[1] "housingname" "string"

> a<-withColumnRenamed(a,"comfort_normal","AA");  # 更改列名
> printSchema(a)
root
 |-- city: string (nullable = true)
 |-- housingname: string (nullable = true)
..........
 |-- AA: double (nullable = true)

创建sparkR的数据框的函数

createDataFrame
> df<-createDataFrame(sqlContext,a.df);  # a.df是R中的数据框, df是sparkR的数据框,注意:使用sparkR的数据库,需要sqlContext

> str(a.df)

‘data.frame‘: 5 obs. of 41 variables:

> str(df)

Formal class ‘DataFrame‘ [package "SparkR"] with 2 slots

[email protected] env:<environment: 0x4fce350>

[email protected] sdf:Class ‘jobj‘ <environment: 0x4fc70b0>

> destDF <- select(SFO_DF, "dest", "cancelled"); #选择列

> showDF(destDF); #显示sparkR的DF

+----+---------+

|dest|cancelled|

+----+---------+

| SFO| 0|

................

> registerTempTable(SFO_DF, "flightsTable"); #要对sparkDF使用SQL语句,首先需要将DF注册成一个table

> wa <- sql(sqlContext, "SELECT dest, cancelled FROM flightsTable"); #在sqlContext下使用SQL语句

> showDF(wa); #查询的结果还是sparkDF

+----+---------+

|dest|cancelled|

+----+---------+

| SFO| 0|

................

> local_df <- collect(wa); #将sparkDF转换成R中的DF

> str(local_df)

‘data.frame‘: 2818 obs. of 2 variables:

$ dest : chr "SFO" "SFO" "SFO" "SFO" ...

$ cancelled: int 0 0 0 0 0 0 0 0 0 0 ...

> wa<-flights_df[1:1000,]; #wa是R中的DF

> flightsDF<-createDataFrame(sqlContext,wa) ; #flightsDF是sparkR的DF

> library(magrittr); #管道函数的包对sparkRDF适用

> groupBy(flightsDF, flightsDF$date) %>%

+ summarize(avg(flightsDF$dep_delay), avg(flightsDF$arr_delay)) -> dailyDelayDF; #注意,语法和dplyr中的有所不同,结果还是sparkRDF

> str(dailyDelayDF)

Formal class ‘DataFrame‘ [package "SparkR"] with 2 slots

[email protected] env:<environment: 0x4cd3118>

[email protected] sdf:Class ‘jobj‘ <environment: 0x4cd6968>

> showDF(dailyDelayDF)

+----------+--------------------+--------------------+

| date| AVG(dep_delay)| AVG(arr_delay)|

+----------+--------------------+--------------------+

|2011-01-01| 5.2| 5.8|

|2011-01-02| 1.8333333333333333| -2.0|

................

在39机器上跑的

collect将sparkDF转化成DF

Collects all the elements of a Spark DataFrame and coerces them into an R data.frame.

collect(x, stringsAsFactors = FALSE),x:A SparkSQL DataFrame

> dist_df<- sql(hiveContext, "SELECT * FROM anjuke_scores where restaurant<=1");

> local_df <- dist_df %>%

groupBy(dist_df$city) %>%

summarize(count = n(dist_df$housingname)) %>%

collect

> local_df

city count

1 \t\x9a 5

2 8\xde 7

3 \xf0\xde 2

..........

..........

take也可将sparkDF转化成DF

Take the first NUM rows of a DataFrame and return a the results as a data.frame

take(x, num)

> local_df <- dist_df %>%

groupBy(dist_df$city) %>%

summarize(count = n(dist_df$housingname))

> a<-take(local_df,100)

[Stage 16:=========================================> (154 + 1) / 199] > View(a)

> a

city count

1 \t\x9a 5

2 8\xde 7

3 \xf0\xde 2

..........

..........

不通的函数:

> describe(a)
Error in x[present, drop = FALSE] :
  object of type ‘S4‘ is not subsettable
> jfkDF <- filter(flightsDF, flightsDF$dest == "DFW")
Error in filter(flightsDF, flightsDF$dest == "DFW") :
  no method for coercing this S4 class to a vector
时间: 2024-10-29 19:06:58

sparkR 跑通的函数的相关文章

如何在Windows下用cpu模式跑通py-faster-rcnn 的demo.py

关键字:Windows.cpu模式.Python.faster-rcnn.demo.py 声明:本篇blog暂时未经二次实践验证,主要以本人第一次配置过程的经验写成.计划在7月底回家去电脑城借台机子试试验证步骤的正确性,本blog将根据实际遇到的问题持续更新.另外blog中除提到的下载链接外我还会给出网盘链接方便下载,包括我的整个工程的网盘链接.如果有些报错解决不了可直接拿本人的相关文件替换,本篇blog具有较高的参考性. 本人微软版caffe工程     下载链接:http://pan.bai

ubuntu12.04+fuerte 下跑通lsd-slam——使用usb摄像头

上一篇介绍了如何使用数据集跑lsd-slam,这篇介绍如何用一个普通的usb摄像头跑lsd-slam,默认ubuntu12.04,fuerte已经安装好,workspace也已设置,如果没有,请参考上一篇数据集下跑lsd-slam的博文. 我使用的摄像头是罗技c310 usb摄像头,其他的usb摄像头应该也没什么问题. 1.测试摄像头好坏.安装cheese,执行 $ sudo apt-get install cheese 运行cheese,执行 $ cheese 确认摄像头是否能在ubuntu下

ubuntu12.04+fuerte 下跑通lsd-slam——数据集

第一次在博客园写文章,写的不好的地方,还请大家指出来:) lsd-slam(下载链接:https://github.com/tum-vision/lsd_slam)提供了两种方法,一种是用数据集(下载地址http://vision.in.tum.de/lsdslam),一种是用usb摄像头,github也有相应的使用说明,不是很详细,下面介绍我的步骤.ps:也是一个slam新手,很多东西不懂,有错误的地方请大家指出 环境:ubuntu12.04+fuerte 目标:使用数据集,跑通lsd-sla

飘逸的python - 极简的二叉树前中后序通杀函数

对于任一结点,可以按某种次序执行三个操作: 访问结点本身(N) 遍历该结点的左子树(L) 遍历该结点的右子树(R) 用来表示顺序,即,前序NLR/中序LNR/后序LRN. 下面我们用namedtuple来表达树,而通杀的遍历函数带一个order参数,只要我们把指定顺序传进去即可实现对应的遍历. #coding=utf-8 ''' 1 / / / 2 3 / \ / 4 5 6 / / 7 8 9 ''' from collections import namedtuple from sys im

C++的容器(一):c++中所有容器的共通操作函数

C++中的所有容器都支持三个核心能力: 所有容器提供的都是‘value语义’而非‘reference语义’.可以通过指针元素来实现某些功能. 容器的所有元素有一个固定的次序(order),你可以以相同的次序多次遍历每个元素.并且每个容器也提供“迭代器生成器”函数,运用这些生成的迭代器可以遍历整个容器. 你无法保证程序的每个操作都安全.操作者必须确保操作函数的参数都符合要求,因为容器不会抛出异常. 容器(Container)的共通操作函数集(使用ContType表示所有容器类型): 函数 功能 C

[erlang 002]gen_server中何时会跑到terminate函数

1. 从start方法产出的独立gen_server进程 实验代码: %%%-------------------------------------- %%% @Module  : %%% @Author  : %%% @Email   : %%% @Created : %%% @Description: %%%-------------------------------------- -module(ter_a). -behaviour(gen_server). %% gen_server

ORB-SLAM2编译并跑通自己的数据集

我的环境是Ubuntu14.04虚拟机. 首先一定要认真看官网的安装教程,把需要安装的依赖都装好.https://github.com/raulmur/ORB_SLAM2 然后按以下步骤下载编译,我最开始是文件夹名称不为ORB_SLAM2,一直编译不成功,很是郁闷,重新按官网的git方法下载下来,很顺畅的就编译成功了,遇到同样问题的小伙伴可以试一下次解决方法 git clone https://github.com/raulmur/ORB_SLAM2.git ORB_SLAM2 cd ORB_S

微信公众号开发技术基础(一):Eclipse+Tomcat搭建本地服务器并跑通HelloWorld程序

本文结构:(一)环境准备(二)在Eclipse里创建Dynamic Web工程(三)写一个简单的Servlet类并配置web.xml(四)运行程序 (一)环境准备 1.Eclipse:要使用for JavaEE版本的Eclipse,因为要创建Dynamic Web(动态Web)程序,这里使用eclipse-jee-mars-1-win32-x86_64版本.Eclipse是绿色软件,下载后解压缩即可打开使用. 百度网盘下载链接:https://pan.baidu.com/s/1dFvaKrJ 2

百度地图sdk跑通遇到的问题2--地图不显示数据

地图不显示数据. 原因使用了自己的签名,但跑demo的时候,使用的是eclipse默认的签名. 修正:百度应用申请的数字签名(SHA1)先改成eclipse默认的签名.