Spark
Key concept: resilient distributed dataset (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel
two types of shared variables: broadcast variables & accumulators
A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
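As a quick illustration of accumulators in the Spark shell (broadcast variables are shown with code later in these notes), here is a minimal sketch using the Spark 1.x API: tasks can only add to the accumulator, and the driver reads its value.
val accum = sc.accumulator(0)                                // created on the driver, initial value 0
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)   // tasks can only "add" to it
accum.value                                                  // read back on the driver: Int = 10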
Spark Streaming
When data is streamed into Spark, two common use cases are covered:
Windowed Calculations means that you only care about data received in the last N amount of time. When monitoring your web servers, perhaps you only care about what has happened in the last hour.
Spark Streaming conveniently splits the input data into the desired time windows for easy processing, using the window function of the streaming library.
The foreachRDD function allows you to access the RDDs created in each time interval.
Cumulative Calculations means that you want to keep cumulative statistics, while streaming in new data to refresh those statistics. In that case, you need to maintain the state for those statistics.
The Spark Streaming library has convenient functions for maintaining state to support this use case, such as updateStateByKey.
Reusing code from Batching covers how to organize business logic code from the batch examples so that code can be reused in Spark Streaming.
The Spark Streaming library has transform functions which allow you to apply arbitrary RDD-to-RDD functions, and thus to reuse code from the batch mode of Spark.
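A rough sketch tying these three ideas together, assuming the Spark shell (so sc already exists) and a TCP text source on localhost:9999 (the netcat setup described later in these notes); countWords is a made-up stand-in for business logic shared with a batch job:
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// Batch-style business logic, reusable from streaming code through transform
def countWords(rdd: RDD[String]): RDD[(String, Int)] =
  rdd.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("checkpoint")   // required by the stateful updateStateByKey operation
val lines = ssc.socketTextStream("localhost", 9999)

// Windowed calculation: only the last 60 seconds of data, recomputed every 10 seconds
val windowedCounts = lines.window(Seconds(60), Seconds(10)).transform(countWords _)
windowedCounts.foreachRDD(rdd => rdd.take(10).foreach(println))   // access the RDD of each interval

// Cumulative calculation: a running total per word, maintained with updateStateByKey
val runningCounts = lines.transform(countWords _).updateStateByKey[Int](
  (newCounts: Seq[Int], state: Option[Int]) => Some(newCounts.sum + state.getOrElse(0)))
runningCounts.print()

ssc.start()
ssc.awaitTermination()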
rsync: the Linux file-synchronization tool
snippets
Spark application example
Prerequisites
- Install the JDK
- Install Spark
- Install the JDK
Note: do not install it under the default path; the space in the "c:\Program Files" folder name can cause problems.
- Install Spark
Download a pre-built release from the Spark website; the file name looks like spark-1.2.0-bin-hadoop2.4.tgz
Unpack it
Verify that the installation works:
c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\spark-shell
Type the following commands to check that the Spark shell is working correctly.
sc.version
or
sc.appName
When finished, exit with
:quit
- Spark word count example
First, let's run the popular word count example using the Spark API. If the Spark Scala shell is not already running, open a Scala shell window. The commands for this example are shown below:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val txtFile = "README.md"
val txtData = sc.textFile(txtFile)
txtData.cache()
We can call the cache function to keep the RDD produced in the previous step in memory, so that Spark does not have to recompute it for every query. Note that cache() is a lazy operation: Spark does not put the data into memory the moment we call cache; it only does so when an action is invoked on that RDD.
Now we can call the count function to see how many lines of data the text file contains.
txtData.count()
Then we can run the following commands to do the word count. Each word in the text file is printed followed by its count.
val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wcData.collect().foreach(println)
Streaming data analysis
Streaming data is essentially a continuous series of data records, typically produced by sources such as sensors, server traffic, and online searches. Common examples of streaming data are user activity on websites, monitoring data, server logs, and other event data.
Stream-processing applications support live dashboards, real-time online recommendations, and instant fraud detection.
If we are building an application that collects, processes, and analyzes streaming data in real time, we need to think about the design differently than we would for a batch-processing application.
Three different stream-processing frameworks are listed below:
Apache Samza
Storm
Spark Streaming
Resilient Distributed Datasets (RDDs)
DStream (short for Discretized Stream)
Spark Streaming works by dividing the data stream into batches (called micro-batches) at a predefined interval (N seconds) and then treating each batch as a Resilient Distributed Dataset (RDD). These RDDs can then be processed with operations such as map, reduce, reduceByKey, join, and window. The results of these RDD operations are returned in batches. The results are usually stored in a data store for later analysis and for generating reports and dashboards, or for sending event-based alerts.
Choosing the batch interval for Spark Streaming is important and should be based on your use case and data-processing requirements. If N is too small, the micro-batches will not contain enough data to produce meaningful results during analysis.
Compared with Spark Streaming, other streaming frameworks process the data stream per event rather than in micro-batches. With the micro-batch approach, we can use the other Spark libraries (such as Core and MLlib) together with the Spark Streaming API inside the same application.
Streaming data can come from many different sources. Some of them are listed below:
Kafka
Flume
ZeroMQ
Amazon’s Kinesis
TCP sockets
To write a Spark Streaming program, we need to know about two components: DStream and StreamingContext.
DStream
A DStream (short for Discretized Stream) is the most basic abstraction in Spark Streaming and represents a continuous stream of data. A DStream can be created either from data sources such as Kafka, Flume, and Kinesis, or by applying operations to other DStreams. Internally, a DStream is represented as a sequence of RDD objects.
Similar to the transformation and action operations on RDDs, DStreams support the following operations:
map
flatMap
filter
count
reduce
countByValue
reduceByKey
join
updateStateByKey
StreamingContext
Similar to SparkContext in Spark, StreamingContext is the main entry point for all streaming functionality.
StreamingContext has built-in methods for receiving streaming data into a Spark Streaming program.
Using the context, we can create a DStream that represents streaming data from a TCP source, specified by a hostname and a port number. For example, if we use a tool like netcat to test the Spark Streaming program, we would receive the data stream on port 9999 of the machine running netcat (for example, localhost).
When the code is executed, Spark Streaming only sets up the computation it will perform at startup; no real processing happens yet. After all the transformations have been set up, we finally call the start() method to begin the computation and the awaitTermination() method to wait for it to terminate.
Steps of a Spark Streaming program
Before we discuss the sample application, let's look at the steps that are specific to programming with Spark Streaming:
The Spark StreamingContext is used to process real-time data streams, so the first step is to initialize the StreamingContext object with two parameters: the SparkContext and the batch interval. The batch interval defines the update window at which we process the incoming stream data. Once the context has been initialized, no new computations can be defined or added to it, and only one StreamingContext object can be active at a time.
Once the Spark StreamingContext is defined, we specify the input data sources by creating input DStreams. In our sample application, the input data source is a log-message generator that uses Apache Kafka, a distributed messaging system. The log generator program creates random log messages to simulate the runtime environment of a web server, where log messages are produced continuously as various web applications serve user traffic.
Define the computations on the DStreams using Spark Streaming transformation APIs such as map and reduce.
After the streaming computation logic is defined, we can start receiving and processing the data by calling the start method on the StreamingContext object created earlier.
Finally, we wait for the stream processing to complete and stop it using the awaitTermination method of the StreamingContext object.
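A minimal sketch of these steps, assuming the Spark shell (so sc already exists) and a TCP text source on localhost:9999 instead of the Kafka source used by the sample application:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// 1. Initialize the StreamingContext with the SparkContext and a batch interval
val ssc = new StreamingContext(sc, Seconds(10))
// 2. Create an input DStream for the input data source
val lines = ssc.socketTextStream("localhost", 9999)
// 3. Define the computation using streaming transformations
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.print()
// 4. Start receiving and processing the data
ssc.start()
// 5. Wait for the processing to be stopped
ssc.awaitTermination()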
Anaconda Python distribution
References
Big Data Processing with Apache Spark - Part 1: Introduction
Big Data Processing with Apache Spark - Part 2: Spark SQL
Big Data Processing with Apache Spark - Part 3: Spark Streaming
Apache ZooKeeper
Difference between map and flatMap
- In Spark, the map function applies the specified operation to each input element and returns exactly one object per input;
- flatMap is a combination of two operations, i.e., "map first, then flatten":
- Operation 1: the same as map: apply the specified operation to each input element and return an object (typically a collection) for it
- Operation 2: finally flatten all of the returned collections into a single collection
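A small illustration in the Spark shell (the input data is made up):
val lines = sc.parallelize(Seq("a b", "c"))
lines.map(_.split(" ")).collect()      // Array(Array(a, b), Array(c)) -- one object per input element
lines.flatMap(_.split(" ")).collect()  // Array(a, b, c)               -- mapped, then flattened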
Master URLs
Spark Configuration
Precedence
Any values specified as flags or in the properties file will be passed on to the application and merged with those specified through SparkConf. Properties set directly on the SparkConf take highest precedence, then flags passed to spark-submit or spark-shell, then options in the spark-defaults.conf file. A few configuration keys have been renamed since earlier versions of Spark; in such cases, the older key names are still accepted, but take lower precedence than any instance of the newer key.
Properties set directly on the SparkConf > flags passed to spark-submit or spark-shell > options in the spark-defaults.conf file
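For example, in a standalone application, a property set directly on SparkConf wins over the same setting passed to spark-submit or placed in spark-defaults.conf (a sketch; the values are only illustrative):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("PrecedenceDemo")
  .set("spark.executor.memory", "4g")   // overrides --executor-memory and spark-defaults.conf
val sc = new SparkContext(conf)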
Launching Applications with spark-submit
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
--class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi)
--master: The master URL for the cluster (e.g. spark://23.195.26.187:7077)
--deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
--conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap "key=value" in quotes (as shown).
application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
application-arguments: Arguments passed to the main method of your main class, if any
# Run application locally on 8 cores
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master local[8] /path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 --executor-memory 20G --total-executor-cores 100 /path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 --deploy-mode cluster \
--supervise \
--executor-memory 20G --total-executor-cores 100 /path/to/examples.jar \
1000
# Run on a YARN cluster (--deploy-mode can be client for client mode)
export HADOOP_CONF_DIR=XXX
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn --deploy-mode cluster \
--executor-memory 20G --num-executors 50 /path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit --master spark://207.184.161.138:7077 examples/src/main/python/pi.py 1000
# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 --deploy-mode cluster \
--supervise \
--executor-memory 20G --total-executor-cores 100 http://path/to/examples.jar 1000
RDD
TutorialsPoint: Apache Spark - RDD
Building Spark
API: persist vs cache
1) RDD's cache() method actually calls persist() under the hood, with the MEMORY_ONLY storage level;
2) persist() lets you set the StorageLevel by hand to get whatever storage level the job requires;
3) Neither cache nor persist is an action;
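A short sketch of the difference in the Spark shell (the second file name is only illustrative):
import org.apache.spark.storage.StorageLevel

val a = sc.textFile("README.md")
a.cache()                                // equivalent to a.persist(StorageLevel.MEMORY_ONLY)

val b = sc.textFile("CHANGES.txt")
b.persist(StorageLevel.MEMORY_AND_DISK)  // persist lets you choose the storage level explicitly

a.count()                                // neither cache nor persist is an action; the data is only
b.count()                                // materialized when an action such as count runs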
transformation vs action
- A transformation produces a new RDD, and there are many ways to do so, for example creating a new RDD from a data source or deriving a new RDD from an existing one
- An action produces a value or a result (this is what actually materializes an RDD, for example into the cache)
All transformations follow a lazy strategy: submitting a transformation on its own does not trigger any computation; the computation is only triggered when an action is submitted.
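For example, in the Spark shell:
val nums = sc.parallelize(1 to 1000000)
val squares = nums.map(x => x * x)   // transformation: returns a new RDD, no job runs yet
squares.count()                      // action: this is what actually triggers the computation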
The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.
Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks])) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance (see the sketch after this list of transformations).
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
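A sketch of the reduceByKey-over-groupByKey advice from the groupByKey note above, using a made-up pair RDD:
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// groupByKey shuffles every (key, value) pair and sums on the reducer side
pairs.groupByKey().mapValues(_.sum).collect()   // Array((a,2), (b,1))

// reduceByKey combines values within each partition before the shuffle -- usually much cheaper
pairs.reduceByKey(_ + _).collect()              // Array((a,2), (b,1))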
The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R)
and pair RDD functions doc (Scala, Java) for details.
Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)
(Java and Scala) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
Spark Regression
Collaborative Filtering
GraphX
At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge.
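A tiny sketch of the property-graph abstraction in the Spark shell (the vertices and edges are made up):
import org.apache.spark.graphx.{Edge, Graph}

val vertices = sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Carol")))
val edges    = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(3L, 1L, "follows")))
val graph    = Graph(vertices, edges)   // directed multigraph with vertex and edge properties

graph.numEdges                          // Long = 2
graph.inDegrees.collect()               // in-degree per vertex id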
Scala Pattern Matching
Spark tip: mapPartitions
mapPartitions is similar to map: map operates on each individual element of the RDD, while mapPartitions (like foreachPartition) operates on the iterator of each partition of the RDD. If the per-element work has to create expensive extra objects over and over (for example, writing the RDD's data to a database over JDBC: map would open one connection per element, while mapPartitions opens one connection per partition), then mapPartitions is far more efficient than map.
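A runnable sketch of the pattern; the per-partition "setup" value stands in for an expensive resource such as a JDBC connection:
val data = sc.parallelize(1 to 10, 2)

// map: any setup inside the function would be done once per element
data.map(x => s"value=$x").collect()

// mapPartitions: the whole partition is handed over as an iterator, so setup happens once per partition
val perPartition = data.mapPartitions { iter =>
  val setup = "partition-setup"          // imagine opening a JDBC connection here
  iter.map(x => s"$setup:$x")
}
perPartition.collect()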
Broadcast Variables
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.
Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
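For example, continuing the shell session above, tasks read broadcastVar.value inside the closure instead of capturing the local array:
scala> sc.parallelize(Seq(0, 1, 2)).map(i => broadcastVar.value(i)).collect()
res1: Array[Int] = Array(1, 2, 3)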
API
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
This function repartitions the RDD using a HashPartitioner. repartition(numPartitions) takes only the target number of partitions and always shuffles; it is equivalent to calling coalesce(numPartitions, shuffle = true), where coalesce's second argument is the shuffle flag and defaults to false.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
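A sketch of aggregateByKey computing a per-key average, where the aggregated type (a (sum, count) pair) differs from the input value type:
val scores = sc.parallelize(Seq(("a", 3.0), ("b", 4.0), ("a", 5.0)))

val sumCount = scores.aggregateByKey((0.0, 0))(           // the neutral "zero" value
  (acc, v) => (acc._1 + v, acc._2 + 1),                   // seqOp: fold one value into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2))                   // combOp: merge partial (sum, count) pairs

sumCount.mapValues { case (sum, n) => sum / n }.collect() // Array((a,4.0), (b,4.0))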