sparklyr包--实现R与Spark接口

1.sparklyr包简介

  Rstudio公司发布的sparklyr包具有以下几个功能:

  • 实现R与Spark的连接;
  • sparklyr包提供了一个完整的dplyr后端,可筛选并聚合Spark数据集,接着在R中实现分析与可视化;
  • 利用Spark的MLlib机器学习库在R中实现分布式机器学习算法;
  • 可以创建一个扩展,用于调用Spark API,并为Spark的所有包集提供了一个接口。

2.RStudio Server安装sparklyr包

  Linux版本:Ubuntu 16.04 LTS 64bit

  R版本:R3.3.1 64bit

  RStudio Server版本:rstudio-server-0.99.902 64bit

  通过devtools包实现sparklyr包的安装:

install.packages("devtools")
devtools::install_github("rstudio/sparklyr")

注意:此处安装devtools时Ubuntu中可能会出现安装不上的错误:

看错误信息可以知道找不到openssl,需要安装libssl-dev(Ubuntu):

$ sudo apt-get install libssl-dev

然后安装sparklyr因为网速等的原因可能需要进行多次安装,多尝试几次就可以了。如果安装中断,很可能安装包被lock,可以使用下面的方案解决(以reader包为例):

install.packages(“readr”,dependencies=TRUE,INSTALL_opts = c(‘—no-lock’))

3.在本地安装Spark 1.6.1和Hadoop 2.6

library(sparklyr)
spark_install(version = "1.6.1")

  

此处默认的是使用Spark 1.6.1和Hadoop 2.6

如果用的是RStudio IDE,还需下载最新的预览版IDE。它包含有实现与Spark交互的若干增强功能(详情参考RStudio IDE)。

https://www.rstudio.com/products/rstudio/download/preview/

http://spark.rstudio.com/index.html#rstudio-ide

4.部署Spark

4.1本地部署

安装好sparklyr包后,我们连接本地的Spark,也可以连接远程的Spark集群。这里,使用spark_connect函数来连接本地的Spark:

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")

返回的Spark连接(sc)为Spark集群提供了一个远程的dplyr数据源。

4.2集群部署

使用sparklyr连接远程Spark集群Cluster Deployment,需要直接将R session部署在集群节点中的一个machine或者靠近集群处(根据网络性能)。在此种情况下,R不是直接在集群上运行,所以,必须要保证集群中的每个machine都有一个spark version并且具有相同的配置。

在集群节点中的一个machine或者靠近集群处运行R最直截了当的方式可以通过远程SSH会话或Rstudio server。在集群节点中使用的spark version必须已经在节点中部署,并且spark的路径需要添加到环境变量SPARK_HOME中,因此,在尝试一个连接之前必须保证SPARK_HOME环境变量在server中正确定义。通常是在Renviron.site配置文件中完成的。示例:

SPARK_HOME=/opt/spark/spark-1.6.1-bin-hadoop2.6

然后,通过spark_connect函数和主节点的地址连接,例如:

library(sparklyr)
sc <- spark_connect(master = "spark://local:7077")

如果在EC2上使用Spark EC2部署脚本,可以从/root/spark-ec2/cluster-url读取master,例如:

library(sparklyr)
cluster_url <- system(‘cat /root/spark-ec2/cluster-url‘, intern=TRUE)
sc <- spark_connect(master = cluster_url)

连接工具

可以通过spark-web函数观看Spark web UI,通过spark_log函数观看Spark log(日志)

spark_web(sc)
spark_log(sc)

使用spark_disconnect函数断开spark的连接:

spark_disconnect(sc)

5.配置configuration

本部分描述sparklyr包和潜在的spark集群的行为的配置的各个选项。同时介绍创建多个配置文件(比如开发、测试、生产)。

5.1配置文件Config Files

通过spark_connect函数的config参数可以指定Spark连接的配置。通过使用config=spark_config()创建默认的配置。下面的代码代表了默认的行为:

spark_connect(master = "local", config = spark_config())

通过config=spark_config()函数可以从本地当前工作目录(如果不是位于工作目在父目录录)的路径中读取配置文件config.yml中的数据。这个文件不是必需的,只是用来提供重写默认行为overriding default behavior。还可以指定另一个配置文件名称 和/或 位置。config.yml文件依次处理使用配置包(using the config package),配置包可以支持多命名配置文件。

5.2Package选项

有许多可用的选项配置sparklyr包的行为:


选项


描述


sparklyr.defaultPackages


自动包括在会话中的Spark packages (defaults to com.databricks:spark-csv_2.11:1.3.0” and “com.amazonaws:aws-java-sdk-pom:1.10.34”)


sparklyr.cores.local


当在本地运行时使用的内核数量 (defaults to parallel::detectCores)


sparklyr.shell.*


传递给spark-shell的命令行参数 (see the Spark documentation for details on supported options)

举个例子:下面的配置文件设置了本地内核数为4并分配给Spark驱动2G内存:

default:
sparklyr.cores.local: 4
sparklyr.shell.driver-memory: 4GB

注:多文件的default使用将在下面描述。

5.3Spark选项

可以使用config.yml指定任意的spark配置属性:


选项


描述


spark.*


任意配置属性 (通过创建一个SparkConf包含指定的属性应用)。spark的配置文档可以查看可用的属性。http://spark.apache.org/docs/latest/configuration.html


spark.sql.*


Spark SQL的任意配置属性 (applied using SET)。Spark SQL Programming Guide的配置文档可以查看可用的属性。http://spark.apache.org/docs/latest/sql-programming-guide.html

举个例子:下面的配置文件为spark设置了一个当前的工作目录,并指定当揉数据(joins or aggregations)时使用的分区数量为100。

default:
spark.local.dir: /tmp/spark-scratch
spark.sql.shuffle.partitions: 100

5.4多文件配置

config包允许为不同环境定义多命名配置文件(例如:default, test, production)。所有额环境默认继承default环境,并且可以相互继承。

举个例子:您可能想使用一个不同的数据集来开发和测试或可能希望使用只适用于生产集群上运行的自定义Spark配置属性。config.yml表示如下:

default:
dataset: "observations-dev.parquet"
sample-size: 10000

production:
spark.memory.fraction: 0.9
spark.rdd.compress: true
dataset: "observations.parquet"
sample-size: null

还可以使用这个特点来为不同的环境指定不同的Spark master:

default:
spark.master: "local"

production:
spark.master: "spark://local:7077"

使用上面的配置,可以在使用spark_connect()的时候彻底省略master参数:

sc <- spark_connect()

注意:当前活动配置通过R_CONFIG_ACTIVE环境变量的值决定,可以通过config package documentation详细的了解。https://github.com/rstudio/config

6.预览版RStudio Server

Rstudio server提供了一个基于web的IDE远程的R会话接口,使其spark集群可以供前端使用。本部分介绍一些对于RStudio Server非常有用的额外的配置选项。RStudio的最新预览版集成支持Spark和sparklyr包。包含以下工具https://www.rstudio.com/products/rstudio/download/preview/:

  • 创建和管理Spark连接
  • 浏览表格数据和Spark DataFrames的所有列
  • 可以预览Spark DataFrames的前1000行

6.1连接选项

一旦成功安装完sparklyr包,我们可以在IDE中可以看到一个新的Spark窗口。该窗口包含一个New Connection对话框,用于连接本地或者远程的Spark。如下所示:

可以使用rstudio.spark.connections选项配置哪一个连接,默认的可能是local和cluster连接,可以选择其中之一作为提供的连接,或者使用一个特殊的Spark master URL。一些常用的连接组合的选择包括:


Value


描述


c("local", "cluster")


Default 提供了本地和cluster spark instance的连接


"local"


提供了本地spark instance连接


"cluster"


提供了cluster spark instance连接


"spark://local:7077"


提供了特殊cluster的连接


c("spark://local:7077", "cluster")


提供了特殊cluster和其他cluster的连接

这些选项应该在Rprofile.site中设置,例如:

options(rstudio.spark.connections = "spark://local:7077")

6.2Spark安装

如果是在本地模式(相对于集群模式),需要预装spark version(s)并共享给使用该服务器的所有使用者。你可以安装spark version(s)在一个共享的目录中(e.g. /opt/spark),然后标明它作为spark安装目录。

options(spark.install.dir = "/opt/spark")

7.Sparklyr包的使用

7.1连接spark

安装好sparklyr包之后,我们连接本地的Spark,也可以连接远程的Spark集群。这里,我们使用spark_connect函数来连接本地的Spark:

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")

返回的Spark连接(sc)为Spark集群提供了一个远程的dplyr数据源。
出现下面的问题:

要求在Ubuntu中安装Java:

方法一:

Windows中下载

从Windows中复制到Ubuntu中:

打开新的控制台,创建目标文件夹:

root@love:/home/wangchao# cd /usr/lib
root@love:/usr/lib# sudo mkdir java

在原来的控制台中输入如下命令,安装JDK:

$ sudo tar zvxf jdk-7u67-linux-x64.gz -C /usr/lib/java
# 该命令的意思是解压jdk-7u67-linux-x64.gz文件,并把它安装到/usr/lib/java目录下,也就是前面创建的文件夹。注意命令中的-C是大写的字母C。

打开系统配置文件.bashrc

$ sudo gedit .bashrc

在其末端添加下面的代码,注意不要修改其他任何代码,添加环境变量:

export JAVA_HOME=/usr/lib/java/jdk1.7.0_67
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=${JAVA_HOME}/lib:${JRE_HOME}/lib:
export PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin:$PATH

测试是否安装成功:

解压后的文件:

方法二:

直接使用系统安装:

$ sudo apt-get install openjdk-8-jdk
$ sudo R CMD javareconf
$ sudo R

以上安装Java后还是出现了相同的问题:

安装好后发现还是不行,找不到Java,这下可以确定不是jdk的问题了,应该是R不能找到jdk,于是想到了一个打通R和Java的通道包rJava包,下载和安装rJava包:

Install.packages(“rJava”)

这下解决了以上的问题。

7.2读取数据

使用copy_to函数可以实现将R中的数据框导入到Spark。下面我将R自带的iris数据集,nycflights13包的flights数据集,以及Lahman包的Batting数据集复制到Spark(请确保安装了这两个包)。

install.packages(“nycflights13”)
install.packages(“Lahman”)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")

此处由于在安装的时候出现了中断,导致依赖包readr包被锁(lock)如下所示:

当然,解决这个问题,就是删除已经安装好的了readr包然后重新在rstudio server中安装readr包,或者修改readr的访问权限:

install.packages(“readr”)
or
使用root权限
cd <readr存在的文件夹>
$ sudo chmod -R 777 readr

使用dplyr的src_tbls函数可以列出所有可用的表(包括预先加载在集群内的表)。

src_tbls(sc)

[1] "batting" "flights" "iris" 

7.3使用dplyr语法

利用dplyr语法来对集群内的所有表进行操作,下面是一个简单的数据筛选案例:

# 筛选出飞机晚点两分钟的航班信息
flights_tbl %>% filter(dep_delay == 2)
Source:   query [?? x 16]
Database: spark connection master=local app=sparklyr local=TRUE

    year month   day dep_time dep_delay arr_time arr_delay carrier tailnum flight origin  dest
   <int> <int> <int>    <int>     <dbl>    <int>     <dbl>   <chr>   <chr>  <int>  <chr> <chr>
1   2013     1     1      517         2      830        11      UA  N14228   1545    EWR   IAH
2   2013     1     1      542         2      923        33      AA  N619AA   1141    JFK   MIA
3   2013     1     1      702         2     1058        44      B6  N779JB    671    JFK   LAX
4   2013     1     1      715         2      911        21      UA  N841UA    544    EWR   ORD
5   2013     1     1      752         2     1025        -4      UA  N511UA    477    LGA   DEN
6   2013     1     1      917         2     1206        -5      B6  N568JB     41    JFK   MCO
7   2013     1     1      932         2     1219        -6      VX  N641VA    251    JFK   LAS
8   2013     1     1     1028         2     1350        11      UA  N76508   1004    LGA   IAH
9   2013     1     1     1042         2     1325        -1      B6  N529JB     31    JFK   MCO
10  2013     1     1     1231         2     1523        -6      UA  N402UA    428    EWR   FLL
..   ...   ...   ...      ...       ...      ...       ...     ...     ...    ...    ...   ...
Variables not shown: air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>.

dplyr导论提供了许多dplyr包中函数的使用案例。以下案例演示的是航班延误信息的数据可视化:

dplyr导论https://cran.rstudio.com/web/packages/dplyr/vignettes/introduction.html

delay <- flights_tbl %>%
group_by(tailnum) %>%
summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
filter(count > 20, dist < 2000, !is.na(delay)) %>%
collect
# 绘图
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
geom_point(aes(size = count), alpha = 1/2) +
geom_smooth() +
scale_size_area(max_size = 2)

7.4窗口函数

支持dplyr的窗口函数。如下所示:

batting_tbl %>%
  select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)
Source:   query [?? x 7]
Database: spark connection master=local app=sparklyr local=TRUE
Groups: playerID

    playerID yearID teamID     G    AB     R     H
       <chr>  <int>  <chr> <int> <int> <int> <int>
1  anderal01   1941    PIT    70   223    32    48
2  anderal01   1942    PIT    54   166    24    45
3  balesco01   2008    WAS    15    15     1     3
4  balesco01   2009    WAS     7     8     0     1
5  bandoch01   1986    CLE    92   254    28    68
6  bandoch01   1984    CLE    75   220    38    64
7  bedelho01   1962    ML1    58   138    15    27
8  bedelho01   1968    PHI     9     7     0     1
9  biittla01   1977    CHN   138   493    74   147
10 biittla01   1975    MON   121   346    34   109
..       ...    ...    ...   ...   ...   ...   ...

更多dplyr在Spark中的用法参考http://spark.rstudio.com/dplyr.html

7.5调用MLlib

利用sparklyr包中的MLlib函数可以实现在Spark集群中调用机器学习算法。 这里,我们使用ml_linear_regression函数来拟合一个线性回归模型。数据为内置的mtcars数据集,我们想看看能否通过汽车的重量(wt)和发动机的气缸数(cyl)来预测汽车的油耗(mpg)。我们假设mpg跟这两个变量之间的关系是线性的。

# 将mtcar数据集复制到spark
mtcars_tbl <- copy_to(sc, mtcars)

# 先对数据做变换,然后将数据集分割为训练集和测试集
partitions <- mtcars_tbl %>%
filter(hp >= 100) %>%
mutate(cyl8 = cyl == 8) %>%
sdf_partition(training = 0.5, test = 0.5, seed = 1099)

# 对训练数据集做模型拟合
fit <- partitions$training %>%
ml_linear_regression(response = "mpg", features = c("wt", "cyl"))

Call:
mpg ~ wt + cyl

Coefficients:
(Intercept) wt cyl
33.499452 -2.818463 -0.923187 

对spark得到的线性回归模型,使用summary()函数可以查看模型的拟合效果以及每个预测指标的统计意义。

summary(fit)
Call:
mpg ~ wt + cyl

Residuals:
Min 1Q Median 3Q Max
-1.752 -1.134 -0.499 1.296 2.282
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) 33.49945 3.62256 9.2475 0.0002485 ***
wt -2.81846 0.96619 -2.9171 0.0331257 *
cyl -0.92319 0.54639 -1.6896 0.1518998
---
Signif. codes: 0 ‘***‘ 0.001 ‘**‘ 0.01 ‘*‘ 0.05 ‘.‘ 0.1 ‘ ‘ 1

R-Squared: 0.8274
Root Mean Squared Error: 1.422

参考:

Spark机器学习提供常用机器学习算法的实现和特征变换。更多信息请参考:

http://spark.rstudio.com/mllib.html

sparklyr包中的函数参考文档:

http://spark.rstudio.com/reference/sparklyr/latest/index.html

创建sparklyr扩展:

http://spark.rstudio.com/extensions.html

官方网站:

http://spark.rstudio.com/index.html

转载请注明地址:

http://www.cnblogs.com/homewch/p/5658970.html

时间: 2024-10-11 00:37:01

sparklyr包--实现R与Spark接口的相关文章

Apache Spark 2.2.0 中文文档 - SparkR (R on Spark) | ApacheCN

SparkR (R on Spark) 概述 SparkDataFrame 启动: SparkSession 从 RStudio 来启动 创建 SparkDataFrames 从本地的 data frames 来创建 SparkDataFrames 从 Data Sources(数据源)创建 SparkDataFrame 从 Hive tables 来创建 SparkDataFrame SparkDataFrame 操作 Selecting rows(行), columns(列) Groupin

情感分析-R与spark机器学习库测试分类比较

1     环境 R 3.0以上版本 安装机器学习软件包: 说明:此两个包是R机器学习包.RTextTools包含文本处理,e1071包含分类器. > install.packages("RTextTools") > install.packages("e1071") 2     实验步骤 研究对象:http://www.xueqing.tv/cms/article/107#rd?sukey=3903d1d3b699c20870d8c0b36a06c86

rugarch包与R语言中的garch族模型

来源:http://www.dataguru.cn/article-794-1.html rugarch包是R中用来拟合和检验garch模型的一个包.该包最早在http://rgarch.r-forge.r-project.org上发布,现已发布到CRAN上.简单而言,该包主要包括四个功能: 拟合garch族模型 garch族模型诊断 garch族模型预测 模拟garch序列 拟合序列分布 下面分别说一下. 一.拟合garch族模型 拟合garch族模型分三个步骤:(1)通过ugarchspec

协同过滤算法 R/mapreduce/spark mllib多语言实现

用户电影评分数据集下载 http://grouplens.org/datasets/movielens/ 1) Item-Based,非个性化的,每个人看到的都一样 2) User-Based,个性化的,每个人看到的不一样 对用户的行为分析得到用户的喜好后,可以根据用户的喜好计算相似用户和物品,然后可以基于相似用户或物品进行推荐.这就是协同过滤中的两个分支了,基于用户的和基于物品的协同过滤. 在计算用户之间的相似度时,是将一个用户对所有物品的偏好作为一个向量,而在计算物品之间的相似度时,是将所有

使用util包里自带的接口和类实现观察者模式

之前的关于观察者模式的文章,是用自己写的Observable接口和Observer接口,然后进行实现.其实官方的util包下自带有实现观察者模式对应的接口和类,可以简化我们的代码结构. 比如我们可以直接创建一个子类通过继承 util包下的Observable类来成为被观察者,这样就不用自己去写一个接口,然后实现它的方法,或是自己创建一个List来存储所有观察者了. 我们只需要在需要通知观察者时先调用setChanged()来表示自己的数据或状态已经发生了改变,然后在调用notifyObserve

R︱Rstudio 1.0版本尝鲜(R notebook、下载链接、sparkR、代码时间测试profile)

2016年11月1日,RStudio 1.0版本正式发布!这是自2011年2月RStudio发布以来的第10个主要版本,也是更新最大的一次.主要亮点如下: 1.R Notebooks 的辅助工具(实时反馈结果,这个RMD做不来) 2.对sparklyr包的集成支持(R与Spark连接) 3.profvis包进行性能分析(代码运行步骤.代码运行时间) 4.基于readr/readxl/haven,优化数据读取(任意修改载入行名.列名以及一些字符串处理) 5.R Markdown的辅助工具(webs

R︱并行计算以及提高运算效率的方式(parallel包、clusterExport函数、foreach包、SupR包等)

终于开始攻克并行这一块了,有点小兴奋,来看看网络上R语言并行办法有哪些: 赵鹏老师(R与并行计算)做的总结已经很到位.现在并行可以分为:  隐式并行:隐式计算对用户隐藏了大部分细节,用户不需要知道具体数据分配方式 ,算法的实现或者底层的硬件资源分配.系统会根据当前的硬件资源来自动启动计算核心.显然,这种模式对于大多数用户来说是最喜闻乐见的. 显性并行:显式计算则要求用户能够自己处理算例中数据划分,任务分配,计算以及最后的结果收集.因此,显式计算模式对用户的要求更高,用户不仅需要理解自己的算法,还

R语言︱H2o深度学习的一些R语言实践——H2o包

R语言H2o包的几个应用案例 笔者寄语:受启发想了解H2o平台的一些R语言实现,网上已有一篇H2o的demo文件.笔者在这多贴一些案例,并且把自己实践的一些小例子贴出来. 关于H2o平台长啥样,可以看H2o的官网,关于深度学习长啥样,可以看一些教程,比如ParallelR博客之中的解析. 下面主要是贴几个案例,让大家看看. ------------------------------------------------------------ Matt︱R语言调用深度学习架构系列引文 R语言︱H

通过 Spark R 操作 Hive

作为数据工程师,我日常用的主力语言是R,HiveQL,Java与Scala.R是非常适合做数据清洗的脚本语言,并且有非常好用的服务端IDE--RStudio Server:而用户日志主要储存在hive中,因此用HiveQL写job也是日常工作之一:当然R的执行效率确实不靠谱,因此还需要Java(Elasticsearch,Hadoop的原生语言)与Scala(Spark的原生语言)的帮助. R和python一样也是一个很好的胶水语言,在搜索引擎的实战中,我就是用R来完成了ES集群索引的全量/增量