Extending sparklyr to Compute Cost for K-means on YARN Cluster with Spark ML Library

Machine and statistical learning wizards are becoming more and more eager to perform analysis with the Spark ML library whenever it is possible. It’s trendy, posh, spicy and gives the feeling of doing state of the art machine learning and being up to date with the newest computational trends. It is even more sexy and powerful when computations can be performed on an extraordinarily enormous computation cluster - let’s say 100 machines on a YARN Hadoop cluster makes you the real data cruncher! In this post I present the sparklyr package (by RStudio), the connector that will transform you from a regular R user into the supa! data scientist that can invoke Scala code to perform machine learning algorithms on a YARN cluster straight from RStudio! Moreover, I present how I have extended the interface to the K-means procedure, so that it is now also possible to compute the cost for that model, which might be beneficial in determining the number of clusters in segmentation problems. Thought about learning Scala? Leave it - use sparklyr!

If you don’t know much about Spark yet, you can read my April post Answers to FAQ about SparkR for R users - where I explained how we could use the SparkR package that is distributed with Spark. Many things (code) might have changed since that time, due to the rapid development caused by the great popularity of Spark. Now we can use version 2.0.0 of Spark. If you are migrating from previous versions I suggest you look at the Migration Guide - Upgrading From SparkR 1.6.x to 2.0.

sparklyr basics

This package is based on the sparkapi package that enables running Spark applications locally or on a YARN cluster straight from R. It translates R code into a bash invocation of spark-shell. Its biggest advantages are the dplyr interface for working with Spark DataFrames (that might be Hive tables) and the possibility to invoke algorithms from the Spark ML library.

Installation of sparklyr, then of Spark itself, and a simple application initiation are described by this code

library(devtools)
install_github("rstudio/sparklyr")
library(sparklyr)
spark_install(version = "2.0.0")
sc <-
  spark_connect(master = "yarn",
    config = list(
      default = list(
        spark.submit.deployMode = "client",
        spark.executor.instances = 20,
        spark.executor.memory = "2G",
        spark.executor.cores = 4,
        spark.driver.memory = "4G")))

You don’t have to specify the config yourself, but if you do, remember that you can also specify parameters for the Spark application with config.yml files, so that you can benefit from many profiles (development, production). In version 2.0.0 it is desired to name the master yarn instead of yarn-client and to pass the deploy mode in the spark.submit.deployMode parameter, which is different from version 1.6.x. All available parameters can be found on the Running Spark on YARN documentation page.
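As a hedged sketch of how such profiles could look (the file layout and profile names below are illustrative, and profile selection follows the config package convention that sparklyr relies on), you could keep a config.yml next to your project and switch profiles with an environment variable:

# Illustrative config.yml contents (kept in the working directory):
#
# default:
#   spark.executor.instances: 4
#   spark.executor.memory: "2G"
# production:
#   spark.executor.instances: 20
#   spark.executor.memory: "4G"
#   spark.executor.cores: 4

library(sparklyr)

# Pick the "production" profile; "default" is used when R_CONFIG_ACTIVE is unset.
Sys.setenv(R_CONFIG_ACTIVE = "production")

sc <- spark_connect(master = "yarn",
                    config = spark_config(file = "config.yml"))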

dplyr and DBI interface on Spark

When connecting to YARN, you will most likely want to use data tables that are stored in Hive. Remember that

Configuration of Hive is done by placing your hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) file in conf/.

where conf/ is set as HADOOP_CONF_DIR. Read more about using Hive tables from Spark
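As a small sketch (the path below is only an example for a typical cluster - adjust it to yours), you can make sure the variable is visible before connecting:

# Point Spark at the directory holding hive-site.xml, core-site.xml and
# hdfs-site.xml before spark_connect() is called (example path only).
Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf")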

If everything is set up and the application runs properly, you can use the dplyr interface, which provides lazy evaluation for data manipulations. Data are stored in Hive, the Spark application runs on the YARN cluster, and the code is invoked from R in the simple language of data transformations (dplyr) - everything thanks to the sparklyr team's great job! An easy example is below

library(dplyr)
# give the list of tables
src_tbls(sc)
# copies iris from R to Hive
iris_tbl <- copy_to(sc, iris, "iris")
# create a hook for data stored on Hive
data_tbl <- tbl(sc, "table_name")
data_tbl2 <- tbl(sc, sql("SELECT * from table_name"))

You can also perform any operation on datasets used by Spark

iris_tbl %>%
   select(Petal_Length, Petal_Width) %>%
   top_n(40, Petal_Width) %>%
   arrange(Petal_Length)

Note that the original dots in iris column names have been translated to _.
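Since the dplyr verbs are evaluated lazily on Spark, nothing is pulled into R until you ask for it. A small sketch of bringing a (small) result back as a local data frame:

# collect() executes the lazy pipeline on Spark and returns a local tibble.
local_top <- iris_tbl %>%
   select(Petal_Length, Petal_Width) %>%
   top_n(40, Petal_Width) %>%
   arrange(Petal_Length) %>%
   collect()

head(local_top)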

The package also provides an interface for functions defined in the DBI package

library(DBI)
dbListTables(sc)
dbGetQuery(sc, "use database_name")
data_tbl3 <- dbGetQuery(sc, "SELECT * from table_name")
dbListFields(sc, data_tbl3)

Running Spark ML Machine Learning K-means Algorithm from R

The basic example of how sparklyr invokes Scala code from Spark ML will be presented with the K-means algorithm. If you check the code of the sparklyr::ml_kmeans function, you will see that for an input tbl_spark object named x and a character vector containing the features’ names (features)

envir <- new.env(parent = emptyenv())
df <- spark_dataframe(x)
sc <- spark_connection(df)
df <- ml_prepare_features(df, features)
tdf <- ml_prepare_dataframe(df, features, ml.options = ml.options, envir = envir)

sparklyr ensures that you have a proper connection to the Spark data frame and prepares the features in a convenient form and naming convention. At the end it prepares a Spark DataFrame for the Spark ML routines.

This is done in a new environment, so that the arguments for the future ML algorithm and the model itself can be stored in their own environment. This is a safe and clean solution. You can construct a simple model calling a Spark ML class like this

envir$model <- "org.apache.spark.ml.clustering.KMeans"
kmeans <- invoke_new(sc, envir$model)

which creates a new object of the KMeans class, on which we can invoke parameter setters to change the default parameters, like this

model <- kmeans %>%
    invoke("setK", centers) %>%
    invoke("setMaxIter", iter.max) %>%
    invoke("setTol", tolerance) %>%
    invoke("setFeaturesCol", envir$features)
# features were set in ml_prepare_dataframe

For an existing object of the KMeans class we can invoke its fit method, which is responsible for starting the K-means clustering algorithm

fit <- model %>%
   invoke("fit", tdf)

which returns a new object on which we can compute, e.g., the centers of the resulting clustering

kmmCenters <- invoke(fit, "clusterCenters")

or the Within Set Sum of Squared Errors (called cost), which is my small contribution (#173)

kmmCost <- invoke(fit, "computeCost", tdf)
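In the same spirit (a sketch, not part of ml_kmeans itself), the fitted Spark ML model also exposes a transform method that appends a prediction column with the cluster assignment for every row of the input DataFrame:

# Cluster assignments for the training data; the result is a Spark DataFrame
# with an extra "prediction" column.
assigned <- fit %>%
   invoke("transform", tdf)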

The cost sometimes helps to decide how many clusters we should specify for the clustering problem, and it is presented in the print method for the ml_model_kmeans object

iris_tbl %>%
   select(Petal_Width, Petal_Length) %>%
   ml_kmeans(centers = 3, compute.cost = TRUE) %>%
   print()

K-means clustering with 3 clusters

Cluster centers:
  Petal_Width Petal_Length
1    1.359259     4.292593
2    2.047826     5.626087
3    0.246000     1.462000

Within Set Sum of Squared Errors =  31.41289
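To sketch how the cost can guide the choice of the number of clusters (an illustrative snippet; I assume here that the fitted ml_model_kmeans object stores the value in its cost element and that compute.cost is available in your sparklyr version), one can fit the model for several values of k and compare:

# Compare the Within Set Sum of Squared Errors for a few candidate k values
# and look for an "elbow" in the curve.
ks <- 2:6
costs <- sapply(ks, function(k) {
   fit_k <- iris_tbl %>%
      select(Petal_Width, Petal_Length) %>%
      ml_kmeans(centers = k, compute.cost = TRUE)
   fit_k$cost   # assumed field holding the computed cost
})

plot(ks, costs, type = "b",
     xlab = "number of clusters", ylab = "within set sum of squared errors")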

All of that can be better understood if we have a look at the Spark ML documentation for KMeans (be careful not to confuse it with Spark MLlib, where methods and parameters have different names than those in Spark ML). This enabled me to provide a simple update for ml_kmeans() (#179), so that we can now specify the tol (tolerance) parameter in ml_kmeans() to control the convergence tolerance.
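A hypothetical call using that update could look as follows (the argument name is assumed to mirror the setTol setter shown earlier; check ?ml_kmeans in your sparklyr version):

# K-means with a tighter convergence tolerance and cost reporting.
iris_tbl %>%
   select(Petal_Width, Petal_Length) %>%
   ml_kmeans(centers = 3, compute.cost = TRUE, tolerance = 1e-05) %>%
   print()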


