Managing Spark data handles in R

When working with big data in R (say, using Spark and sparklyr) we have found it very convenient to keep data handles in a neat list or data.frame.

Please read on for our handy hints on keeping your data handles neat.

When using R to work over a big data system (such as Spark) much of your work is over "data handles" and not actual data (data handles are objects that control access to remote data).

Data handles are a lot like sockets or file handles in that they cannot be safely serialized and restored (i.e., you cannot save them to a .RDS file and then restore them into another session). This means that when you are starting or re-starting a project you must "ready" all of your data references. Your projects will be much easier to manage and document if you load your references using the methods we show below.
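
To make this concrete, here is a small sketch (not from the original article) of what goes wrong if you try to persist a handle; "my_handle" stands in for a hypothetical tbl_spark reference.

# saveRDS(my_handle, "my_handle.RDS")    # writes only the R-side reference,
#                                        # not the remote data
# my_handle <- readRDS("my_handle.RDS")  # in a new R session this restores
#                                        # a stale reference
# head(my_handle)                        # fails: the Spark connection behind
#                                        # the handle is gone, so the data
#                                        # reference must be re-established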

Let’s set up our example Spark cluster:

library("sparklyr")
# packageVersion("sparklyr")
suppressPackageStartupMessages(library("dplyr"))
# packageVersion("dplyr")
suppressPackageStartupMessages(library("tidyr"))

# Please see the following video for installation help
#  https://youtu.be/qnINvPqcRvE
# spark_install(version = "2.0.2")

# set up a local "practice" Spark instance
sc <- spark_connect(master = "local",
                    version = "2.0.2")
#print(sc)

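The examples below work over three small parquet tables named data_01, data_02, and data_03. The step that builds them is not shown in this write-up; a minimal sketch of one way to create such practice tables in the local working directory (an assumption for this example) is:

# sketch: build an i-column table of 10 random rows for i = 1, 2, 3 and
# write each one out as a local parquet directory data_01, data_02, data_03
for(i in 1:3) {
  tabName <- sprintf("data_%02d", i)
  d <- as.data.frame(matrix(runif(10 * i), nrow = 10, ncol = i))
  colnames(d) <- paste(letters[seq_len(i)], sprintf("%02d", i), sep = "_")
  handle <- dplyr::copy_to(sc, d, name = tabName, overwrite = TRUE)
  sparklyr::spark_write_parquet(handle, path = tabName)
}
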
Data is much easier to manage than code, and much easier to compute over. So the more information you can keep as pure data the better off you will be. In this case we are loading the chosen names and paths of the parquet data we wish to work with from an external file that is easy for the user to edit.
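
For reference, tableCollection.csv is just a two-column table mapping table names to paths. One (illustrative) way to produce the file used here, with paths pointing at the local parquet directories sketched above:

# sketch: write the two-column configuration file read below
write.csv(
  data.frame(tableName = c("data_01", "data_02", "data_03"),
             tablePath = c("data_01", "data_02", "data_03"),
             stringsAsFactors = FALSE),
  file = "tableCollection.csv",
  row.names = FALSE)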

# Read user's specification of files and paths.
userSpecification <- read.csv("tableCollection.csv",
                              header = TRUE,
                              strip.white = TRUE,
                              stringsAsFactors = FALSE)
print(userSpecification)
##   tableName tablePath
## 1   data_01   data_01
## 2   data_02   data_02
## 3   data_03   data_03

We can now read these parquet files (usually stored in Hadoop) into our Spark environment as follows.

readParquets <- function(userSpecification) {
  userSpecification <- as_data_frame(userSpecification)
  userSpecification$handle <- lapply(
    seq_len(nrow(userSpecification)),
    function(i) {
      spark_read_parquet(sc,
                         name = userSpecification$tableName[[i]],
                         path = userSpecification$tablePath[[i]])
    }
  )
  userSpecification
}

tableCollection <- readParquets(userSpecification)
print(tableCollection)
## # A tibble: 3 x 3
##   tableName tablePath          handle
##       <chr>     <chr>          <list>
## 1   data_01   data_01 <S3: tbl_spark>
## 2   data_02   data_02 <S3: tbl_spark>
## 3   data_03   data_03 <S3: tbl_spark>
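
Each handle stored this way is an ordinary dplyr table reference, so standard dplyr verbs run against the remote data. A small usage sketch (not from the original article):

# any stored handle can be used like a dplyr table,
# e.g. count the rows of the first table remotely
tableCollection$handle[[1]] %>%
  summarise(n = n()) %>%
  collect()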

A data.frame is a great place to keep what you know about your Spark handles in one place. Let’s add some details to our Spark handles.

addDetails <- function(tableCollection) {
  tableCollection <- as_data_frame(tableCollection)
  # get the references
  tableCollection$handle <-
    lapply(tableCollection$tableName,
           function(tableNamei) {
             dplyr::tbl(sc, tableNamei)
           })

  # add tableNames to handles for convenience
  # and printing
  names(tableCollection$handle) <-
    tableCollection$tableName

  # add in some details (note: nrow can be expensive)
  tableCollection$nrow <- vapply(tableCollection$handle,
                                 nrow,
                                 numeric(1))
  tableCollection$ncol <- vapply(tableCollection$handle,
                                 ncol,
                                 numeric(1))
  tableCollection
}

tableCollection <- addDetails(userSpecification)

# convenient printing
print(tableCollection)
## # A tibble: 3 x 5
##   tableName tablePath          handle  nrow  ncol
##       <chr>     <chr>          <list> <dbl> <dbl>
## 1   data_01   data_01 <S3: tbl_spark>    10     1
## 2   data_02   data_02 <S3: tbl_spark>    10     2
## 3   data_03   data_03 <S3: tbl_spark>    10     3
# look at the top of each table (also forces
# evaluation!).
lapply(tableCollection$handle,
       head)
## $data_01
## Source:   query [6 x 1]
## Database: spark connection master=local[4] app=sparklyr local=TRUE
##
## # A tibble: 6 x 1
##        a_01
##       <dbl>
## 1 0.8274947
## 2 0.2876151
## 3 0.6638404
## 4 0.1918336
## 5 0.9111187
## 6 0.8802026
##
## $data_02
## Source:   query [6 x 2]
## Database: spark connection master=local[4] app=sparklyr local=TRUE
##
## # A tibble: 6 x 2
##        a_02       b_02
##       <dbl>      <dbl>
## 1 0.3937457 0.34936496
## 2 0.0195079 0.74376380
## 3 0.9760512 0.00261368
## 4 0.4388773 0.70325800
## 5 0.9747534 0.40327283
## 6 0.6054003 0.53224218
##
## $data_03
## Source:   query [6 x 3]
## Database: spark connection master=local[4] app=sparklyr local=TRUE
##
## # A tibble: 6 x 3
##         a_03      b_03        c_03
##        <dbl>     <dbl>       <dbl>
## 1 0.59512263 0.2615939 0.592753768
## 2 0.72292799 0.7287428 0.003926143
## 3 0.51846687 0.3641869 0.874463146
## 4 0.01174093 0.9648346 0.177722575
## 5 0.86250126 0.3891915 0.857614579
## 6 0.33082723 0.2633013 0.233822140

A particularly slick trick is to expand the per-table column lists into a taller table that allows us to quickly identify which columns are in which tables.

columnDictionary <- function(tableCollection) {
  tableCollection$columns <-
    lapply(tableCollection$handle,
           colnames)
  columnMap <- tableCollection %>%
    select(tableName, columns) %>%
    unnest(columns)
  columnMap
}

columnMap <- columnDictionary(tableCollection)
print(columnMap)
## # A tibble: 6 x 2
##   tableName columns
##       <chr>   <chr>
## 1   data_01    a_01
## 2   data_02    a_02
## 3   data_02    b_02
## 4   data_03    a_03
## 5   data_03    b_03
## 6   data_03    c_03
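
This column dictionary makes lookups easy. For example, a quick (illustrative) way to find which tables carry a column named "a_01":

# sketch: locate every table that has a column named "a_01"
columnMap %>%
  filter(columns == "a_01") %>%
  select(tableName)

(Here only data_01 qualifies.)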

The idea is: place all of the above functions into a shared script or package, and then use them to organize loading your Spark data references. With this practice you will have much less "spaghetti code", you will better document your intent, and you will have a more versatile workflow.

The principles we are using include:

  • Keep configuration out of code (i.e., maintain the file list in a spreadsheet). This makes working with others much easier.
  • Treat configuration as data (i.e., make sure the configuration is a nice regular table so that you can use R tools such as tidyr::unnest() to work with it).

Reposted from: http://www.win-vector.com/blog/2017/05/managing-spark-data-handles-in-r/
