Homework 2: UDF Caching in Spark

Write a UDF cache for Spark.

Assignment description: https://github.com/cs186-spring15/course/tree/master/hw2

I spent some time working through this assignment and found it to be good material for learning Spark SQL and Scala. My solutions are recorded below.

Task #1: Implementing DiskPartition and GeneralDiskHashedRelation

Task #2: Implementing object DiskHashedRelation

DiskPartition.scala

package org.apache.spark.sql.execution

import java.io._
import java.nio.file.{Path, StandardOpenOption, Files}
import java.util.{ArrayList => JavaArrayList}

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{Projection, Row}
import org.apache.spark.sql.execution.CS186Utils._

import scala.collection.JavaConverters._

/**
 * This trait represents a regular relation that is hash partitioned and spilled to
 * disk.
 */
private[sql] sealed trait DiskHashedRelation {

  /**
   * @return an iterator of the [[DiskPartition]]s that make up this relation.
   */
  def getIterator(): Iterator[DiskPartition]

  /**
   * Close all the partitions for this relation. This should involve deleting the files hashed into.
   */
  def closeAllPartitions()
}

/**
 * A general implementation of [[DiskHashedRelation]].
 *
 * @param partitions the disk partitions that we are going to spill to
 */
protected [sql] final class GeneralDiskHashedRelation(partitions: Array[DiskPartition])
    extends DiskHashedRelation with Serializable {

  override def getIterator() = {
    // IMPLEMENT ME
    partitions.iterator
  }

  override def closeAllPartitions() = {
    // IMPLEMENT ME
    partitions.foreach(_.closePartition())
  }
}

private[sql] class DiskPartition (
    filename: String,
    blockSize: Int) {
  private val path: Path = Files.createTempFile("", filename)
  private val data: JavaArrayList[Row] = new JavaArrayList[Row]
  private val outStream: OutputStream = Files.newOutputStream(path)
  private val inStream: InputStream = Files.newInputStream(path)
  private val chunkSizes: JavaArrayList[Int] = new JavaArrayList[Int]()
  private var writtenToDisk: Boolean = false
  private var inputClosed: Boolean = false

  /**
   * This method inserts a new row into this particular partition. If the size of the partition
   * exceeds the blockSize, the partition is spilled to disk.
   *
   * @param row the [[Row]] we are adding
   */
  def insert(row: Row) = {
    // IMPLEMENT ME
    if (inputClosed) {
      throw new SparkException("The partition is closed!")
    }
    data.add(row)
    val partitionDataSize = measurePartitionSize()
    if (partitionDataSize > blockSize) {
      spillPartitionToDisk()
      data.clear()
    }
  }

  /**
   * This method converts the data to a byte array and returns the size of the byte array
   * as an estimation of the size of the partition.
   *
   * @return the estimated size of the data
   */
  private[this] def measurePartitionSize(): Int = {
    CS186Utils.getBytesFromList(data).size
  }

  /**
   * Uses the [[Files]] API to write a byte array representing data to a file.
   */
  private[this] def spillPartitionToDisk() = {
    val bytes: Array[Byte] = getBytesFromList(data)

    // This array list stores the sizes of chunks written in order to read them back correctly.
    chunkSizes.add(bytes.size)

    Files.write(path, bytes, StandardOpenOption.APPEND)
    writtenToDisk = true
  }

  /**
   * If this partition has been closed, this method returns an Iterator of all the
   * data that was written to disk by this partition.
   *
   * @return the [[Iterator]] of the data
   */
  def getData(): Iterator[Row] = {
    if (!inputClosed) {
      throw new SparkException("Should not be reading from file before closing input. Bad things will happen!")
    }

    new Iterator[Row] {
      var currentIterator: Iterator[Row] = data.iterator.asScala
      val chunkSizeIterator: Iterator[Int] = chunkSizes.iterator().asScala
      var byteArray: Array[Byte] = null

      override def next() = {
        // IMPLEMENT ME
        currentIterator.next()
      }

      override def hasNext = {
        // IMPLEMENT ME
        var hasNext = currentIterator.hasNext
        if (!hasNext) {
          hasNext = chunkSizeIterator.hasNext
          if (hasNext) {
            fetchNextChunk()
          }
        }
        hasNext
      }

      /**
       * Fetches the next chunk of the file and updates the iterator. Should return true
       * unless the iterator is empty.
       *
       * @return true unless the iterator is empty.
       */
      private[this] def fetchNextChunk(): Boolean = {
        // IMPLEMENT ME
        if (!chunkSizeIterator.hasNext) {
          return false
        }
        val size = chunkSizeIterator.next()
        if (size <= 0) {
          return false
        }
        byteArray = CS186Utils.getNextChunkBytes(inStream, size, byteArray)
        currentIterator = CS186Utils.getListFromBytes(byteArray).iterator.asScala
        true
      }
    }
  }

  /**
   * Closes this partition, implying that no more data will be written to this partition. If getData()
   * is called without closing the partition, an error will be thrown.
   *
   * If any data has not been written to disk yet, it should be written. The output stream should
   * also be closed.
   */
  def closeInput() = {
    // IMPLEMENT ME
    if (!data.isEmpty) {
      spillPartitionToDisk()
      data.clear()
    }
    // Close the output stream as well, per the contract above.
    outStream.close()
    inputClosed = true
  }

  /**
   * Closes this partition. This closes the input stream and deletes the file backing the partition.
   */
  private[sql] def closePartition() = {
    inStream.close()
    Files.deleteIfExists(path)
  }
}

private[sql] object DiskHashedRelation {

  /**
   * Given an input iterator, partitions each row into one of a number of [[DiskPartition]]s
   * and constructs a [[DiskHashedRelation]].
   *
   * This executes the first phase of external hashing -- using a coarse-grained hash function
   * to partition the tuples to disk.
   *
   * The block size is approximately set to 64k because that is a good estimate of the average
   * buffer page.
   *
   * @param input the input [[Iterator]] of [[Row]]s
   * @param keyGenerator a [[Projection]] that generates the keys for the input
   * @param size the number of [[DiskPartition]]s
   * @param blockSize the threshold at which each partition will spill
   * @return the constructed [[DiskHashedRelation]]
   */
  def apply (
      input: Iterator[Row],
      keyGenerator: Projection,
      size: Int = 64,
      blockSize: Int = 64000) = {
    // IMPLEMENT ME
    val partitionList: JavaArrayList[DiskPartition] = genDiskPartition(size, blockSize)

    input.foreach { (row: Row) =>
      val rowWithKey = keyGenerator(row)
      // Take the key's hash modulo the partition count; the extra "+ size" keeps the
      // index non-negative even when hashCode() is negative.
      val index = ((rowWithKey.hashCode() % size) + size) % size
      partitionList.get(index).insert(row)
    }

    val partitions: Array[DiskPartition] = partitionList.toArray(new Array[DiskPartition](size))
    partitions.foreach(_.closeInput())
    new GeneralDiskHashedRelation(partitions)
  }

  def genDiskPartition(size: Int, blockSize: Int): JavaArrayList[DiskPartition] = {
    val partitionList: JavaArrayList[DiskPartition] = new JavaArrayList[DiskPartition]
    (0 until size).foreach { (i: Int) =>
      partitionList.add(new DiskPartition("partition" + i, blockSize))
    }
    partitionList
  }
}
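
To see the partitioning step in isolation, here is a minimal, purely in-memory Scala sketch of the same coarse-grained hashing idea. It is not the homework API (HashPartitionSketch and its names are my own), and the real DiskPartition additionally serializes each bucket and spills it to a temp file, but the bucket-selection logic is the same.

object HashPartitionSketch {
  // Assign every (key, value) row to one of numPartitions buckets by hashing the key.
  def partition[K, V](rows: Seq[(K, V)], numPartitions: Int): Array[Seq[(K, V)]] = {
    val buckets = Array.fill(numPartitions)(Seq.newBuilder[(K, V)])
    rows.foreach { case kv @ (key, _) =>
      // Same index computation as DiskHashedRelation.apply, kept non-negative.
      val index = ((key.hashCode() % numPartitions) + numPartitions) % numPartitions
      buckets(index) += kv
    }
    buckets.map(_.result())
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(1 -> "a", 2 -> "b", 1 -> "c", 3 -> "d", 2 -> "e")
    partition(rows, numPartitions = 2).zipWithIndex.foreach { case (bucket, i) =>
      println(s"partition $i: $bucket")
    }
  }
}

Rows with equal keys always land in the same bucket, which is what lets the later caching phase work one partition at a time.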

Task #3: Implementing CS186Utils methods

CS186Utils.scala


package org.apache.spark.sql.execution

import java.io._
import java.util.{ArrayList => JavaArrayList, HashMap => JavaHashMap}

import org.apache.spark.sql.catalyst.expressions._

object CS186Utils {

  /**
   * Returns a Scala array that contains the bytes representing a Java ArrayList.
   *
   * @param data the Java ArrayList we are converting
   * @return an array of bytes
   */
  def getBytesFromList(data: JavaArrayList[Row]): Array[Byte] = {
    // Create an ObjectOutputStream backed by a ByteArrayOutputStream.
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)

    // Write the object to the output.
    out.writeObject(data)
    out.flush()
    out.close()
    bytes.close()

    // Return the byte array.
    bytes.toByteArray
  }

  /**
   * Converts an array of bytes into a JavaArrayList of type [[Row]].
   *
   * @param bytes the input byte array
   * @return a [[JavaArrayList]] of Rows
   */
  def getListFromBytes(bytes: Array[Byte]): JavaArrayList[Row] = {
    val result: JavaArrayList[Row] = new JavaArrayList[Row]()
    var temp: JavaArrayList[Row] = null

    // Create input streams based on the input bytes.
    val bytesIn = new ByteArrayInputStream(bytes)
    var in = new ObjectInputStream(bytesIn)

    try {
      // Read each object in and attempt to interpret it as a JavaArrayList[Row].
      while ((temp = in.readObject() match {
        case value: JavaArrayList[Row] => value
        case _: Throwable => throw new RuntimeException(s"Unexpected casting exception while reading from file.")
      }) != null) {
        // If it succeeds, add it to the result.
        result.addAll(temp)

        // We need to create a new ObjectInputStream for each new object we read because of Java stream quirks.
        in = new ObjectInputStream(bytesIn)
      }
    } catch {
      // ObjectInputStream control flow dictates that an EOFException will be thrown when the file is over -- this is expected.
      case e: EOFException => // do nothing
      case other: Throwable => throw other
    }
    result
  }

  /**
   * Reads the next nextChunkSize bytes from the input stream provided. If the previous array read into is available,
   * please provide it so as to avoid allocating a new object unless absolutely necessary.
   *
   * @param inStream the input stream we are reading from
   * @param nextChunkSize the number of bytes to read
   * @param previousArray the previous array we read into
   * @return
   */
  def getNextChunkBytes(inStream: InputStream, nextChunkSize: Int, previousArray: Array[Byte] = null): Array[Byte] = {
    var byteArray = previousArray
    if (byteArray == null || byteArray.size != nextChunkSize) {
      byteArray = new Array[Byte](nextChunkSize)
    }

    // Read the bytes in.
    inStream.read(byteArray)
    byteArray
  }

  /**
   * Returns a new projection operator.
   *
   * @param expressions
   * @param inputSchema
   * @return
   */
  def getNewProjection(
      expressions: Seq[Expression],
      inputSchema: Seq[Attribute]) = new InterpretedProjection(expressions, inputSchema)

  /**
   * This function returns the [[ScalaUdf]] from a sequence of expressions. If there is no UDF in the
   * sequence of expressions then it returns null. If there is more than one, it returns the one that is
   * sequentially last.
   *
   * @param expressions
   * @return
   */
  def getUdfFromExpressions(expressions: Seq[Expression]): ScalaUdf = {
    // IMPLEMENT ME
    var udf: ScalaUdf = null
    expressions.foreach { (expression: Expression) =>
      if (expression.isInstanceOf[ScalaUdf]) udf = expression.asInstanceOf[ScalaUdf]
    }
    udf
  }

  /**
   * This function takes a sequence of expressions. If there is no UDF in the sequence of expressions, it does
   * a regular projection operation.
   *
   * If there is a UDF, then it creates a caching iterator that caches the result of the UDF.
   *
   * NOTE: This only works for a single UDF. If there are multiple UDFs, then it will only cache for the last UDF
   * and execute all other UDFs regularly.
   *
   * @param expressions
   * @param inputSchema
   * @return
   */
  def generateCachingIterator(
      expressions: Seq[Expression],
      inputSchema: Seq[Attribute]): (Iterator[Row] => Iterator[Row]) = {
    // Get the UDF from the expressions.
    val udf: ScalaUdf = CS186Utils.getUdfFromExpressions(expressions)

    udf match {
      /* If there is no UDF, then do a regular projection operation. Note that this is very similar to Project in
         basicOperators.scala. */
      case null => {
        { input =>
          val projection = CS186Utils.getNewProjection(expressions, inputSchema)
          input.map(projection)
        }
      }

      // Otherwise, separate the expressions appropriately and create a caching iterator.
      case u: ScalaUdf => {
        val udfIndex: Int = expressions.indexOf(u)
        val preUdfExpressions = expressions.slice(0, udfIndex)
        val postUdfExpressions = expressions.slice(udfIndex + 1, expressions.size)
        CachingIteratorGenerator(udf.children, udf, preUdfExpressions, postUdfExpressions, inputSchema)
      }
    }
  }
}

object CachingIteratorGenerator {

  /**
   * This function takes an input iterator and returns an iterator that does in-memory memoization
   * as it evaluates the projection operator over each input row. The result is the concatenation of
   * the projection of the preUdfExpressions, the evaluation of the udf, and the projection of the
   * postUdfExpressions, in that order.
   *
   * The UDF should only be evaluated if the inputs to the UDF have never been seen before.
   *
   * This method only needs to worry about caching for the UDF that is specifically passed in. If
   * there are any other UDFs in the expression lists, then they can and should be evaluated
   * without any caching.
   *
   * @param cacheKeys the keys on which we will cache -- the inputs to the UDF
   * @param udf the udf we are caching for
   * @param preUdfExpressions the expressions that come before the UDF in the projection
   * @param postUdfExpressions the expressions that come after the UDF in the projection
   * @param inputSchema the schema of the rows -- useful for creating projections
   * @return
   */
  // Example call (for a table Student(sid: Int, gpa: Float)):
  //   CachingIteratorGenerator(studentAttributes, udf, Seq(studentAttributes(1)), Seq(), studentAttributes)
  def apply(
      cacheKeys: Seq[Expression],
      udf: ScalaUdf,
      preUdfExpressions: Seq[Expression],
      postUdfExpressions: Seq[Expression],
      inputSchema: Seq[Attribute]): (Iterator[Row] => Iterator[Row]) = {
    { input =>
      new Iterator[Row] {
        val udfProject = CS186Utils.getNewProjection(Seq(udf), inputSchema)
        val cacheKeyProjection = CS186Utils.getNewProjection(udf.children, inputSchema)
        val preUdfProjection = CS186Utils.getNewProjection(preUdfExpressions, inputSchema)
        val postUdfProjection = CS186Utils.getNewProjection(postUdfExpressions, inputSchema)
        val cache: JavaHashMap[Row, Row] = new JavaHashMap[Row, Row]()

        def hasNext = {
          // IMPLEMENT ME
          val hasNext = input.hasNext
          if (!hasNext) {
            // The input is exhausted, so the cache can be released.
            cache.clear()
          }
          hasNext
        }

        def next() = {
          // IMPLEMENT ME
          val row = input.next()
          val computedKey: Row = cacheKeyProjection(row)
          var computedValues: Row = cache.get(computedKey)
          if (computedValues == null) {
            // Cache miss: evaluate the pre-UDF expressions, the UDF itself, and the
            // post-UDF expressions, then remember the result keyed by the UDF's inputs.
            val values: JavaArrayList[Any] = new JavaArrayList()
            preUdfProjection(row).iterator.foreach { (i: Any) => values.add(i) }
            udfProject(row).iterator.foreach { (i: Any) => values.add(i) }
            postUdfProjection(row).iterator.foreach { (i: Any) => values.add(i) }
            computedValues = Row.fromSeq(values.toArray)
            cache.put(computedKey, computedValues)
          }
          computedValues
        }
      }
    }
  }
}
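
The Catalyst types make the control flow above a little hard to follow, so here is a stripped-down Scala analogue of the same memoization pattern. All names here, such as CachingIteratorSketch and expensiveUdf, are illustrative and not part of the assignment; the "UDF" input plays the role of the cache key projection.

import scala.collection.mutable

object CachingIteratorSketch {
  var udfCalls = 0

  // Stand-in for the ScalaUdf: expensive, deterministic, keyed entirely on its input.
  def expensiveUdf(gpa: Double): Double = { udfCalls += 1; math.floor(gpa) }

  // Wraps an iterator of (sid, gpa) rows so the UDF runs at most once per distinct gpa.
  def cachingIterator(input: Iterator[(Int, Double)]): Iterator[(Int, Double)] = {
    val cache = mutable.HashMap.empty[Double, Double]
    input.map { case (sid, gpa) =>
      val result = cache.getOrElseUpdate(gpa, expensiveUdf(gpa))
      (sid, result)
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator((1, 3.5), (2, 3.5), (3, 2.0), (4, 3.5))
    cachingIterator(rows).foreach(println)
    println(s"UDF evaluated $udfCalls times for 4 rows")  // prints 2, not 4
  }
}

For the four input rows only two UDF evaluations happen, which is exactly the saving CacheProject is after.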

Task #4: Implementing PartitionProject

basicOperators.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
import org.apache.spark.util.MutablePair
import org.apache.spark.util.collection.ExternalSorter

/**
 * :: DeveloperApi ::
 */
@DeveloperApi
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)

  @transient lazy val buildProjection = newMutableProjection(projectList, child.output)

  def execute() = child.execute().mapPartitions { iter =>
    val resuableProjection = buildProjection()
    iter.map(resuableProjection)
  }
}

/**
 * A projection operator that is tailored to improve performance of UDF execution using
 * in-memory memoization.
 *
 * NOTE: This assumes that we are only caching for a single UDF. If there are multiple
 * UDFs, it will only cache for the last UDF. All other UDFs will be executed regularly.
 *
 * Once you have completed implementing the functions in [[CS186Utils]], this operator
 * should work.
 */
@DeveloperApi
case class CacheProject(projectList: Seq[Expression], child: SparkPlan) extends UnaryNode {
  override def output = child.output

  def execute() = {
    /* Generate the caching iterator. You should trace this code to understand it!
       You have to implement parts of the stack to make this work. */
    val generator: (Iterator[Row] => Iterator[Row]) = CS186Utils.generateCachingIterator(projectList, child.output)

    /* This is Spark magic. In short, it applies the generator function to each of the slices of an RDD.
       For the purposes of CS 186, we will only ever have one slice. */
    child.execute().mapPartitions(generator)
  }
}

/**
 * A projection operator that is tailored to improve performance of UDF execution by using
 * external hashing.
 *
 * @param projectList
 * @param child
 */
@DeveloperApi
case class PartitionProject(projectList: Seq[Expression], child: SparkPlan) extends UnaryNode {
  override def output = child.output

  def execute() = {
    child.execute().mapPartitions(generateIterator)
  }

  /**
   * This method takes an iterator as an input. It should first partition the whole input to disk.
   * It should then read each partition from disk and do in-memory memoization over each
   * partition to avoid recomputation of UDFs.
   *
   * @param input the input iterator
   * @return the result of applying the projection
   */
  def generateIterator(input: Iterator[Row]): Iterator[Row] = {
    // This is the key generator for the coarse-grained external hashing.
    val keyGenerator = CS186Utils.getNewProjection(projectList, child.output)

    // IMPLEMENT ME
    // Phase 1: partition the entire input to disk.
    val hashedRelation: DiskHashedRelation = DiskHashedRelation(input, keyGenerator, 4, 64000)
    val partitions: Iterator[DiskPartition] = hashedRelation.getIterator()

    var diskPartition: DiskPartition = null
    var cachingIterator: Iterator[Row] = null

    // Phase 2: stream each partition back and run a caching (memoizing) projection over it.
    new Iterator[Row] {
      def hasNext = {
        // IMPLEMENT ME
        if (cachingIterator != null && cachingIterator.hasNext) {
          true
        } else {
          fetchNextPartition()
        }
      }

      def next() = {
        // IMPLEMENT ME
        cachingIterator.next()
      }

      /**
       * This fetches the next partition over which we will iterate or returns false if there are no more partitions
       * over which we can iterate.
       *
       * @return
       */
      private def fetchNextPartition(): Boolean = {
        // IMPLEMENT ME
        // Keep advancing until we find a non-empty partition; an empty partition
        // should not end the iteration early.
        while (partitions.hasNext) {
          diskPartition = partitions.next()
          val data: Iterator[Row] = diskPartition.getData()
          if (data.hasNext) {
            cachingIterator = CS186Utils.generateCachingIterator(projectList, child.output)(data)
            return true
          }
        }
        false
      }
    }
  }
}

/**
 * :: DeveloperApi ::
 */
@DeveloperApi
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  @transient lazy val conditionEvaluator = newPredicate(condition, child.output)

  def execute() = child.execute().mapPartitions { iter =>
    iter.filter(conditionEvaluator)
  }
}

/**
 * :: DeveloperApi ::
 */
@DeveloperApi
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode
{
  override def output = child.output

  // TODO: How to pick seed?
  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}

/**
 * :: DeveloperApi ::
 */
@DeveloperApi
case class Union(children: Seq[SparkPlan]) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sparkContext.union(children.map(_.execute()))
}

/**
 * :: DeveloperApi ::
 * Take the first limit elements. Note that the implementation is different depending on whether
 * this is a terminal operator or not. If it is terminal and is invoked using executeCollect,
 * this operator uses something similar to Spark's take method on the Spark driver. If it is not
 * terminal or is invoked using execute, we first take the limit on each partition, and then
 * repartition all the data to a single partition to compute the global limit.
 */
@DeveloperApi
case class Limit(limit: Int, child: SparkPlan)
  extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again

  /** We must copy rows when sort based shuffle is on */
  private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]

  override def output = child.output
  override def outputPartitioning = SinglePartition

  /**
   * A custom implementation modeled after the take function on RDDs but which never runs any job
   * locally.  This is to avoid shipping an entire partition of data in order to retrieve only a few
   * rows.
   */
  override def executeCollect(): Array[Row] = {
    if (limit == 0) {
      return new Array[Row](0)
    }

    val childRDD = child.execute().map(_.copy())

    val buf = new ArrayBuffer[Row]
    val totalParts = childRDD.partitions.length
    var partsScanned = 0
    while (buf.size < limit && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn't find any rows after the first iteration, just try all partitions next.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
        // by 50%.
        if (buf.size == 0) {
          numPartsToTry = totalParts - 1
        } else {
          numPartsToTry = (1.5 * limit * partsScanned / buf.size).toInt
        }
      }
      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions

      val left = limit - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val sc = sqlContext.sparkContext
      val res =
        sc.runJob(childRDD, (it: Iterator[Row]) => it.take(left).toArray, p, allowLocal = false)

      res.foreach(buf ++= _.take(limit - buf.size))
      partsScanned += numPartsToTry
    }

    buf.toArray.map(ScalaReflection.convertRowToScala(_, this.schema))
  }

  override def execute() = {
    val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
      child.execute().mapPartitions { iter =>
        iter.take(limit).map(row => (false, row.copy()))
      }
    } else {
      child.execute().mapPartitions { iter =>
        val mutablePair = new MutablePair[Boolean, Row]()
        iter.take(limit).map(row => mutablePair.update(false, row))
      }
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part)
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2))
  }
}

/**
 * :: DeveloperApi ::
 * Take the first limit elements as defined by the sortOrder. This is logically equivalent to
 * having a [[Limit]] operator after a [[Sort]] operator. This could have been named TopK, but
 * Spark's top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion.
 */
@DeveloperApi
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {
  override def output = child.output
  override def outputPartitioning = SinglePartition

  val ord = new RowOrdering(sortOrder, child.output)

  // TODO: Is this copying for no reason?
  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ord)
    .map(ScalaReflection.convertRowToScala(_, this.schema))

  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sparkContext.makeRDD(executeCollect(), 1)
}

/**
 * :: DeveloperApi ::
 * Performs a sort on-heap.
 * @param global when true performs a global sort of all partitions by shuffling the data first
 *               if necessary.
 */
@DeveloperApi
case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  override def execute() = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>
      val ordering = newOrdering(sortOrder, child.output)
      iterator.map(_.copy()).toArray.sorted(ordering).iterator
    }, preservesPartitioning = true)
  }

  override def output = child.output
}

/**
 * :: DeveloperApi ::
 * Performs a sort, spilling to disk as needed.
 * @param global when true performs a global sort of all partitions by shuffling the data first
 *               if necessary.
 */
@DeveloperApi
case class ExternalSort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

  override def execute() = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>
      val ordering = newOrdering(sortOrder, child.output)
      val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
      sorter.insertAll(iterator.map(r => (r, null)))
      sorter.iterator.map(_._1)
    }, preservesPartitioning = true)
  }

  override def output = child.output
}

/**
 * :: DeveloperApi ::
 * Computes the set of distinct input rows using a HashSet.
 * @param partial when true the distinct operation is performed partially, per partition, without
 *                shuffling the data.
 * @param child the input query plan.
 */
@DeveloperApi
case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  override def output = child.output

  override def requiredChildDistribution =
    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

  override def execute() = {
    child.execute().mapPartitions { iter =>
      val hashSet = new scala.collection.mutable.HashSet[Row]()

      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        if (!hashSet.contains(currentRow)) {
          hashSet.add(currentRow.copy())
        }
      }

      hashSet.iterator
    }
  }
}

/**
 * :: DeveloperApi ::
 * Returns a table with the elements from left that are not in right using
 * the built-in spark subtract function.
 */
@DeveloperApi
case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
  override def output = left.output

  override def execute() = {
    left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
  }
}

/**
 * :: DeveloperApi ::
 * Returns the rows in left that also appear in right using the built in spark
 * intersection function.
 */
@DeveloperApi
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
  override def output = children.head.output

  override def execute() = {
    left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
  }
}

/**
 * :: DeveloperApi ::
 * A plan node that does nothing but lie about the output of its child.  Used to splice a
 * (hopefully structurally equivalent) tree from a different optimization sequence into an already
 * resolved tree.
 */
@DeveloperApi
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
  def children = child :: Nil
  def execute() = child.execute()
}
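
To tie Task #4 together, here is a plain-Scala sketch of what generateIterator does end to end: hash-partition the input, then run a memoized projection one partition at a time, so the cache never grows beyond one partition's distinct keys. The names (PartitionProjectSketch, expensiveUdf) and the use of in-memory Seqs instead of on-disk partitions are my own simplifications, not the operator's actual API.

import scala.collection.mutable

object PartitionProjectSketch {
  def expensiveUdf(key: Int): Int = key * key  // stand-in for the cached UDF

  def partitionProject(rows: Seq[Int], numPartitions: Int): Seq[Int] = {
    // Phase 1: coarse-grained hashing (DiskHashedRelation writes these buckets to disk).
    val partitions: Map[Int, Seq[Int]] =
      rows.groupBy(k => ((k.hashCode % numPartitions) + numPartitions) % numPartitions)

    // Phase 2: a fresh memoization cache per partition (the caching iterator).
    partitions.values.toSeq.flatMap { part =>
      val cache = mutable.HashMap.empty[Int, Int]
      part.map(k => cache.getOrElseUpdate(k, expensiveUdf(k)))
    }
  }

  def main(args: Array[String]): Unit = {
    println(partitionProject(Seq(1, 2, 1, 3, 2, 1), numPartitions = 2))
  }
}

As with the real operator, the output order follows the partitions rather than the original input order.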

