There Are Now 3 Apache Spark APIs. Here’s How to Choose the Right One

See Apache Spark 2.0 API Improvements: RDD, DataFrame, DataSet and SQL here.

Apache Spark is evolving at a rapid pace, including changes and additions to core APIs. One of the most disruptive areas of change is the representation of data sets. Spark 1.0 used the RDD API, but in the past twelve months two new, alternative and incompatible APIs have been introduced. Spark 1.3 introduced the radically different DataFrame API, and the recently released Spark 1.6 introduces a preview of the new Dataset API.

Many existing Spark developers will be wondering whether to jump from RDDs directly to the Dataset API, or whether to first move to the DataFrame API. Newcomers to Spark will have to choose which API to start learning with.

This article provides an overview of each of these APIs and outlines the strengths and weaknesses of each one. A companion GitHub repository provides working examples that are a good starting point for experimenting with the approaches outlined in this article.


RDD API

The RDD (Resilient Distributed Dataset) API has been in Spark since the 1.0 release. This interface and its Java equivalent, JavaRDD, will be familiar to any developers who have worked through the standard Spark tutorials. From a developer’s perspective, an RDD is simply a set of Java or Scala objects representing data.
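
As a quick illustration (not taken from the companion repository), an RDD of such objects can be created by parallelizing a local collection; the Person case class here is a hypothetical stand-in for whatever domain class your application uses:

Scala:

case class Person(first: String, last: String, age: Int)  // hypothetical domain class
val people = Seq(Person("Ada", "Lovelace", 36), Person("Grace", "Hopper", 20))
val rdd = sc.parallelize(people)                           // sc is an existing SparkContext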

The RDD API provides many transformation methods, such as map(), filter(), and reduce(), for performing computations on the data. Each of these methods results in a new RDD representing the transformed data. However, these methods only define the operations to be performed; the transformations are not executed until an action method is called. Examples of action methods are collect() and saveAsObjectFile().

Example: RDD transformations and actions

Scala:

rdd.filter(_.age < 21)               // transformation
   .map(_.last)                      // transformation
   .saveAsObjectFile("under21.bin")  // action

Java:

rdd.filter(p -> p.getAge() < 21)     // transformation
   .map(p -> p.getLast())            // transformation
   .saveAsObjectFile("under21.bin"); // action

The main advantage of RDDs is that they are simple and well understood because they deal with concrete classes, providing a familiar object-oriented programming style with compile-time type-safety. For example, given an RDD containing instances of Person we can filter by age by referencing the age attribute of each Person object:

Example: Filter by attribute with RDD

Scala:

rdd.filter(_.age > 21)

Java:

rdd.filter(person -> person.getAge() > 21)

The main disadvantage of RDDs is that they don’t perform particularly well. Whenever Spark needs to distribute the data within the cluster, or write the data to disk, it does so using Java serialization by default (although it is possible to use Kryo as a faster alternative in most cases). Serializing individual Java and Scala objects is expensive and requires sending both data and structure between nodes (each serialized object contains the class structure as well as the values). There is also the overhead of garbage collection that results from creating and destroying individual objects.
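
As a minimal sketch of the Kryo alternative mentioned above (the application name and the Person class are hypothetical), switching the serializer is a SparkConf setting:

Scala:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("rdd-example")                                             // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Person]))                           // registering classes keeps serialized records smaller
val sc = new SparkContext(conf)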

DataFrame API

Spark 1.3 introduced a new DataFrame API as part of the Project Tungsten initiative, which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and pass only the data between nodes, in a much more efficient way than using Java serialization. There are also advantages when performing computations in a single process, as Spark can serialize the data into off-heap storage in a binary format and then perform many transformations directly on this off-heap memory, avoiding the garbage-collection costs associated with constructing individual objects for each row in the data set. Because Spark understands the schema, there is no need to use Java serialization to encode the data.
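
To make the role of the schema concrete, here is a small sketch (not from the companion repository) that builds a DataFrame from an RDD of rows plus an explicit schema; Spark then knows the column names and types without serializing any user objects:

Scala:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("last", StringType, nullable = true),
  StructField("age", IntegerType, nullable = false)))

val rows = sc.parallelize(Seq(Row("Lovelace", 36), Row("Hopper", 20)))
val df = sqlContext.createDataFrame(rows, schema)   // sqlContext is an existing SQLContext
df.printSchema()                                    // Spark now tracks column names and types itself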

The DataFrame API is radically different from the RDD API because it is an API for building a relational query plan that Spark’s Catalyst optimizer can then execute. The API is natural for developers who are familiar with building query plans, but not natural for the majority of developers. The query plan can be built from SQL expressions in strings or from a more functional approach using a fluent-style API.

Example: Filter by attribute with DataFrame

Note that these examples have the same syntax in both Java and Scala.

SQL style:

df.filter("age > 21");

Expression builder style:

df.filter(df.col("age").gt(21));

Because the code refers to data attributes by name, it is not possible for the compiler to catch any errors. If an attribute name is incorrect, the error will only be detected at runtime, when the query plan is created.
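
For example (the misspelled column name below is hypothetical), the following compiles without complaint but fails only once Spark tries to resolve the column:

// Compiles, but fails at runtime with an org.apache.spark.sql.AnalysisException
// because the column 'agee' cannot be resolved against the DataFrame's schema.
df.filter("agee > 21")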

Another downside of the DataFrame API is that it is very Scala-centric and, while it does support Java, the support is limited. For example, when creating a DataFrame from an existing RDD of Java objects, Spark’s Catalyst optimizer cannot infer the schema and assumes that any objects in the DataFrame implement the scala.Product interface. Scala case classes work out of the box because they implement this interface.
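
As a short Scala sketch (reusing the hypothetical Person case class from the RDD example above), the implicit conversions can derive the schema directly from the case class fields:

Scala:

import sqlContext.implicits._                       // sqlContext is an existing SQLContext

val peopleRDD = sc.parallelize(Seq(Person("Ada", "Lovelace", 36), Person("Grace", "Hopper", 20)))
val peopleDF = peopleRDD.toDF()                     // schema derived from the case class (a scala.Product)
peopleDF.printSchema()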

Dataset API

The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both worlds: the familiar object-oriented programming style and compile-time type safety of the RDD API, with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API.

When it comes to serializing data, the Dataset API has the concept of encoders, which translate between JVM representations (objects) and Spark’s internal binary format. Spark has built-in encoders that are quite sophisticated: they generate bytecode to interact with off-heap data and provide on-demand access to individual attributes without having to deserialize an entire object. Spark does not yet provide an API for implementing custom encoders, but that is planned for a future release.
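
In Scala the built-in encoders are typically brought into scope implicitly, while Java code passes one explicitly (as the bean encoder does in the example further below); a minimal Scala sketch, assuming a Spark 1.6 SQLContext named sqlContext:

Scala:

import sqlContext.implicits._                               // brings the built-in encoders into scope

val ints = sqlContext.createDataset(Seq(1, 2, 3))           // uses the built-in Int encoder
val names = sqlContext.createDataset(Seq("Ada", "Grace"))   // uses the built-in String encoder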

Additionally, the Dataset API is designed to work equally well with both Java and Scala. When working with Java objects, it is important that they are fully bean-compliant. In writing the examples to accompany this article, we ran into errors when trying to create a Dataset in Java from a list of Java objects that were not fully bean-compliant.


Example: Creating a Dataset from a list of objects

Scala:

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val sampleData: Seq[ScalaPerson] = ScalaData.sampleData()
val dataset = sqlContext.createDataset(sampleData)

Java:

JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<JavaPerson> data = JavaData.sampleData();
Dataset<JavaPerson> dataset = sqlContext.createDataset(data, Encoders.bean(JavaPerson.class));

Transformations with the Dataset API look very much like those of the RDD API and deal with the Person class rather than an abstraction of a row.

Example: Filter by attribute with Dataset

Scala:

dataset.filter(_.age < 21)

Java:

dataset.filter(person -> person.getAge() < 21);

Despite the similarity with RDD code, this code is building a query plan rather than dealing with individual objects, and if age is the only attribute accessed, then the rest of the object’s data will not be read from off-heap storage.
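
One way to see this (a sketch, assuming the 1.6 preview Dataset exposes toDF() so the plan can be inspected through the DataFrame API) is to print the plan that Catalyst produces for the typed filter:

Scala:

// `dataset` is the Dataset created in the earlier example.
dataset.filter(_.age < 21).toDF().explain()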


Conclusions

If you are developing primarily in Java then it is worth considering a move to Scala before adopting the DataFrame or Dataset APIs. Although there is an effort to support Java, Spark is written in Scala and the code often makes assumptions that make it hard (but not impossible) to deal with Java objects.

If you are developing in Scala and need your code to go into production with Spark 1.6.0, then the DataFrame API is clearly the most stable option available and currently offers the best performance.

However, the Dataset API preview looks very promising and provides a more natural way to code. Given the rapid evolution of Spark, it is likely that this API will mature very quickly through 2016 and become the de facto API for developing new applications.

