Spark简介

详细内容参照Spark官网:http://spark.apache.org/

Spark相关项目:

Spark SQL 、Spark Streaming 、Machine Learning 、GraphX

1、Spark SQL :用Spark编写的混合SQL查询,能在分布式数据集中查询结构化数据,使得复杂分析算法的查询更容易。

2、Spark Streaming :Spark Streaming 容易去建立一个可扩展的容错式流媒体应用,使得流处理应用与批处理工作一样。

3、Machine Learning:Machine Learning Lib是Apache的可扩展机器学习库,实现了一些机器学习算法,算法运行效率高,其实现的算法有:

基础:数据类型、概要统计

分类与回归:线性支持向量机(SVM)、逻辑回归、线性最小二乘法、套索和岭回归、决策树、朴素贝叶斯

协同过滤:交替最小二乘法(ALS)

聚类:K-means

最优化:随即梯度下降法、L-BFGS

4、GraphX:GraphX是对图标并行计算。

Spark官网例子:

1、Text Search:从日志文件中搜索错误

2、Word Count:统计单词

3、Estimateing PI:估计PI值

4、Logical Regression:寻找一个超平面,在多维空间中区分点集

Spark两种工作模式:

1、Spark Shell:运用scala命令交互式分析处理

2、在scala中独立工作:编写完scala程序后用工具sbt将其打成jar包,再运行。

Spark的弹性分布式数据集RDD:

1、并行集合:接收一个已经存在的scala集合,然后进行各种并行计算;

2、Hadoop数据集:从Hadoop的文件存储系统HDFS中获取数据

Spark对于数据集RDD的两种操作:转换和动作:

1、transformations:从现有数据集上创建一个新的数据集,如Map操作就是一种转换,另外还有filter、flatmap、diatinct、join等操作;

2、Action:在数据集上运行,并返回一个值给驱动程序,如reduce操作就是一种动作,另外还有collect()、count()等

Spark的两种共享变量(该变量在不同节点任务上被共享):

1、广播变量:可在内存的所有节点上缓存变量

2、累加器:用于做加法的变量

Spark: Cluster Computing with Working Sets

Spark是一个集群计算工作集,能对大规模数据进行快速处理,和Hadoop一样,对大量数据进行分布并行处理。实际上,spark是对Hadoop的补充,spark在某些工作负载方面更加优越,应用性更广。它可以像操作本地集合对象一样轻松地操作分布式数据集,也可以在Hadoop的文件系统HDFS中并行运行。Spark能更好的解决Hadoop中不能实现的迭代计算和交互式计算问题。Spark和Hadoop最大的不同是Spark可以重用存储在cache中的数据,极大提高运行效率,而Hadoop每次迭代都需要重新从硬盘读取数据。Spark在迭代机器学习的算法时速度是Hadoop的10倍以上。

一、Spark简介

Mapreduce编程模型实现他们的可扩展性和容错性是通过提供一个用户创建的编程模型,该编程模型操作的是无环数据流图。在大型应用程序中,无环数据流工作模式效率不高。Spark可以通过多路并行操作重用工作集中的数据。

在下列两个用例中可以看出spark比mapreduce更有效:

1、迭代工作

很多机器学习算法需要在同一数据集上进行多次运算来寻找最优参数,mapreduce每次运行都要从硬盘重新加载这些数据,而spark可以重用这些数据。

2、迭代分析

在数据库查询时需要加载有用的数据,spark是把数据缓存到内存,而mapreduce运行程序是从硬盘上加载数据,导致很大的时间延迟。

Spark的一个主要抽象概念是RDD(弹性数据集),RDD是一个元素集合,即spark操作的特殊格式的数据集。RDD划分到集群的各个节点上,用户可以明确的缓存RDD到内存中,并可以重用这些数据集进行并行操作。RDD通过一个称为lineage的概念来实现数据集容错:如果RDD中部分数据丢失,可以在其他RDD中重建这部分数据。

目前有两种类型的RDD:

1、并行集合(Parallelized Collections):通过调用SparkContext的parallelize方法,在一个已经存在的scala集合上创建的集合,scala集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集,例:val distData=sc.parallelize(data);

2、Hadoop数据集(Hadoop Datasets): Spark从HDFS上创建的分布式数据集,通过调用SparkContext的textFile方法。例:val distFile=sc.textFile(“hdfs://10.0.2.15:8080/user/input”);

另外还有两种方式来创建RDD:

3、从已存在的RDD中转换而来,通过flatMap操作,把类型A数据转换成类型B数据;

4、RDD的持久化:Spark最重要的一个功能,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(Actions)变得更加迅速(通常快10倍)。缓存是用Spark构建迭代算法的关键。

二、Spark的编程模型

应用spark时,开发人员编写一个驱动程序来完成高级应用程序的控制流和发布各种并行操作,实际上就是对分布式数据集进行操作。对于RDD的操作,spark提供了两个抽象概念:转换(transformation)和动作(action),另外spark有两种共享变量:广播变量和累加器。

2.1 转换transformation:

对输入的数据集进行的格式上的转换,如表为一系列transformation操作:


转换


含义


map(func)


返回一个新分布式数据集,由每一个输入元素经过func函数转换后组成


filter(func)


返回一个新数据集,由经过func函数计算后返回值为true的输入元素组成


flatMap(func)


类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素)


mapPartitions(func)


类似于map,但独立地在RDD的每一个分块上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]


mapPartitionsWithSplit(func)


类似于mapPartitions, 但func带有一个整数参数表示分块的索引值。因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]


sample(withReplacement,fraction, seed)


根据fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子


union(otherDataset)


返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成


distinct([numTasks]))


返回一个包含源数据集中所有不重复元素的新数据集


groupByKey([numTasks])


在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集

注意:默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的numTasks参数来改变它


reduceByKey(func, [numTasks])


在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一起。类似groupByKey,reduce任务个数是可以通过第二个可选参数来配置的


sortByKey([ascending], [numTasks])


在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定


join(otherDataset, [numTasks])


在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集


cogroup(otherDataset, [numTasks])


在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, Seq[V], Seq[W])元组的数据集。这个操作也可以称之为groupwith


cartesian(otherDataset)


笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U)对数据集(两两的元素对)

2.2动作action:

在转换完的数据集上进行的各种操作,返回结果给驱动程序,其操作有:reduce()、collect()、foreach()等。

如表为一系列action操作:


动作


含义


reduce(func)


通过函数func(接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的被并行执行。


collect()


在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作并返回一个足够小的数据子集后再使用会比较有用。


count()


返回数据集的元素的个数。


first()


返回数据集的第一个元素(类似于take(1))


take(n)


返回一个由数据集的前n个元素组成的数组。注意,这个操作目前并非并行执行,而是由驱动程序计算所有的元素


takeSample(withReplacement,num, seed)


返回一个数组,在数据集中随机采样num个元素组成,可以选择是否用随机数替换不足的部分,Seed用于指定的随机数生成器种子


saveAsTextFile(path)


将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本行


saveAsSequenceFile(path)


将数据集的元素,以Hadoop sequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者隐式的可以转换为Writable的RDD。(Spark包括了基本类型的转换,例如Int,Double,String,等等)


countByKey()


对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每一个key对应的元素个数


foreach(func)


在数据集的每一个元素上,运行函数func进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase

2.3 Spark创建两种共享变量

该变量在不同节点任务上被共享使用:

1、广播变量:允许程序员保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份拷贝。Spark会尝试使用一种高效的广播算法来传播广播变量,从而减少通信的代价。

2、累加器:是一种只能通过关联操作进行“加”操作的变量,因此可以高效被并行支持。它们可以用来实现计数器和求和器。

三、spark分布式计算模型

Spark通过Mesos进行分布式计算,spark将RDDs和并行操作函数进行一次转换,变成标准的job和一系列task,提交给SparkScheduler,SparkScheduler将task提交给MesoMaster,由master分配给不同的workers.最终由worker中的SparkExecutor将分配到的任务一一执行,并返回结果或直接写入到分布式文件系统。

四、例子

1、text search:从存储在hdfs中的日志文件中找出里面的错误信息

val file = spark.textFile("hdfs://...")//创建一个分布式数据集
val errs = file.filter(_.contains("ERROR"))//RDDs转换的过程,找出日志中包含错误的行
Val cacheErrs=errs.cache();//RDD持久化
val ones = errs.map(_ => 1)//转换的过程,将这些行映射成1
val count = ones.reduce(_+_)//动作的过程,将这些行累加起来,worker节点扫描这些1,并将结果发给驱动程序。

2、Logistic Regression(逻辑回归):通过迭代一个分类算法来寻找一个超平面w将两类点分开

val points = spark.textFile(...).map(parsePoint).cache()// 从文件中读取数据,并缓存在内存中,有利于提高运行效率
var w = Vector.random(D)//给w一个随机值
for (i <- 1 to ITERATIONS) {//迭代,寻找最优w
val grad = spark.accumulator(new Vector(D))//使用累加器,对各节点上的梯度累加
for (p <- points) { //牛顿迭代法进行迭代计算
val s = (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
grad += s * p.x}
w -= grad.value}

完整代码:见后面 六、代码

3、Spark实现完整的WordCount程序

import spark.SparkContextimport SparkContext._//导入spark开发包
object SparkTest {  def main( args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: SparkTest <host> [<slices>]")
System.exit(1)
}
 val spark = new SparkContext(args(0), "SparkTest")//创建SparkContext对象,告诉spark如何访问spark集群
val slices = if (args.length > 1) args(1).toInt else 2//定义数据集分为几块
val myFile = spark.textFile("test.txt")//创建一个分布式数据集
 val counts = myFile.flatMap(line => line.split(" ")//文件中每一行单词以空格分隔,转换的过程
	.map(word => (word, 1))//数据集转换成(word,1)键值对,也是数据集转换的过程
	.reduceByKey(_ + _)//统计单词个数,动作的过程
	counts.saveAsTextFile("out.txt")//输出文件
}
}
 SparkTest.main(args)//设置main函数参数

五、spark未来工作

1、标准化RDDs的属性和特征,及Spark的其他抽象概念,使其在其他应用和工作中有更好的适应性;

2、减少程序员在存储和重构RDDs方面的花销;

3、定义新的关于RDDs转换方面的操作,如shuffle操作,可以根据给定的键值重新分配RDD;

4、对于spark解释器提供更高水平的交互接口,如跟SQL的交互接口。

六、代码

1、用Scala语言实现Logistic Regression Classifier的完整代码:

import java.util.Random
import java.lang.Math
import scala.collection.mutable.HashMap  import scala.io.Source
import org.apache.spark.SparkContext //创建SparkContext对象,告诉spark如何访问spark集群
import org.apache.spark.rdd.RDD;  //RDDs
import org.apache.spark.util.Vector  //spark容器
import org.apache.spark.broadcast.Broadcast  //广播变量
import spark.ml.utils.SparserVector
object SparseLR {  //定义一个类:LR分类器
val labelNum = 2; // 类别数
val dimNum = 124; // 维度
val iteration = 10; // 迭代次数
val alpha = 0.1 // 迭代步长
val lambda = 0.1
val rand = new Random(42)  //取一随机数
var w = Vector(dimNum, _ => rand.nextDouble) //用随机数初始化参数 ,w为一124维的容器变量
//定义一个数据点
case class DataPoint(x: SparserVector, y: Int) //整形y,SparserVector类型的x(124维)
// 解析一个读入的训练样本,构造DataPoint结构
def parsePoint(line: String): DataPoint = {  //line为传入的训练样本数据
var features = new SparserVector(dimNum)
val fields = line.split(" ") //transformation过程,将原RDDs数据集转换为以空格分割后的数据集
val y = fields(0).toInt
fields.filter(_.contains(":")).foreach(f => {
val feature = f.split(":")
features.insert(feature(0).toInt, feature(1).toDouble)
})
DataPoint(features, y)
}   

//读样本文件,构造训练数据
def genearteDataPoints(filename: String): Array[DataPoint] = {
val dataPoints = Source.fromFile(filename).getLines.map(parsePoint).toArray
dataPoints
}   

//定义sigmod函数
def sigmod(x: SparserVector): Double = {
val features = x.elements
val s = features.keySet.map(k => w(k) * features.get(k).get).reduce(_ + _)//权值和训练数据乘积之和
1 / (1 + Math.exp(-s))  //逻辑函数
}     

// train函数,根据样本训练参数
def train(sc: SparkContext, dataPoints: RDD[DataPoint]) {
val sampleNum = dataPoints.count        //开始迭代
for (i <- 0 until iteration) {
val g = dataPoints.map(p => p.x * (sigmod(p.x) - p.y)).reduce(_ + _) + lambda * w
w -= alpha * g  //牛顿-拉斐森(Newton-Raphson)方法进行迭代求解。
println("iteration " + i + ": g = " + g)
println("iteration " + i + ": w = " + w)
}
}    

//根据训练出的参数进行预测
def predict(dataPoints: RDD[DataPoint]): Double = {
val correct = dataPoints.map(p => {
val label = if (sigmod(p.x) > 0.5) 1 else 0
if (label == p.y) 1 else 0
}).reduce(_ + _)
(correct * 1.0) / dataPoints.count
  }    

  //main函数
  def main(args: Array[String]): Unit = {
  val trainfile = "data/a8a.train";
  //val sc = new SparkContext(args(0), "LR")
  val sc = new SparkContext("local", "LR")
  val trainset = sc.textFile(trainfile, 2).map(parsePoint).cache
  train(sc, trainset)  //训练样本,得到最优参数
  val testfile = "data/a8a.test";
  val testset = sc.textFile(testfile, 2).map(parsePoint).cache
  val accuracy = predict(testset)  //测试数据,得到分类结果
  println(accuracy)
  }
  }  

2、Spark的LR分类器

package spark.examples
Import scala.io.Source
import java.util.Random
import scala.math.exp
import spark.util.Vector
import spark._
*******************************************************************
object SparkLR {
  val N = 10000  // Number of data points
  val D = 10   // Numer of dimensions
  val R = 0.7  // Scaling factor
  val ITERATIONS = 5
  val rand = new Random(42)
  case class DataPoint(x: Vector, y: Double)
  case class DataPoint1(x: Vector)
********************************************************************
  def generateData = {     //自己构建了训练数据
    def generatePoint(i: Int) = {
      val y = if(i % 2 == 0) -1 else 1
      val x = Vector(D, _ => rand.nextGaussian + y * R)
      DataPoint(x, y)
    }
    Array.tabulate(N)(generatePoint)
  }
*************************************************************************
//fang该部分是对上面的数据结构改进,方便从文件中读入数据
def generateData = {     //自己构建了训练数据
Val src=Source.fromFile(“/home/jay/file01”)
Val iter=src.getLines()
    def generatePoint(i: Int) = {
      val y = if(i % 2 == 0) -1 else 1
      val x = Vector(D, iter.next().split(“ ”).map(w=>w.toDouble))
      DataPoint(x, y)
    }
Array.tabulate(N)(generatePoint)
  }

def generateData = {     //自己构建测试数据
Val src1=Source.fromFile(“/home/jay/file02”)
Val iter1=src1.getLines()
    def generatePoint1(i: Int) = {
      val x = Vector(D, iter1.next().split(“ ”).map(w=>w.toDouble))
      DataPoint1(x)
    }
Array.tabulate(N)(generatePoint1)
  }
//fang
*******************************************************************************
  def main(args: Array[String]) {
    Val conf = new SparkConf().setAppName(“SparkLR”)
    val sc = new SparkContext(conf)
    val numSlices = if (args.length > 1) args(1).toInt else 2
val points = sc.parallelize(generateData, numSlices).cache()
val points1 = sc.parallelize(generateData1, numSlices).cache()
    // Initialize w to a random value
    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
    println("Initial w: " + w)

    for (i <- 1 to ITERATIONS) {//运用牛顿迭代法实现LR分类器
      println("On iteration " + i)
      val gradient = points.map { p =>
        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
      }.reduce(_ + _)
      w -= gradient
    }
println("Final w: " + w)
Val y=points1.map{p=>(1/(1+exp(w dot p.x)))}
var Y = Vector(N,  y.take(N)
Println(“result: ”, + Y)
    System.exit(0)
  }
}

训练数据模型:

xxx   xxx  xxx  xxx  xxx  xxx  xxx  xxx  xxx  xxx  yyy

详细逻辑回归分类器参考博客:http://blog.csdn.net/qustqustjay/article/details/46874527

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-04 21:32:51

Spark简介的相关文章

Spark-01 spark简介

前言:大牛说由spark入手比较合适 1.spark简介 spark是个计算框架,不存东西.MapReduce是Hadoop里面做计算的,也不存东西,出现比spark早,自从spark活跃起来后mc的很多程序就被重写为spark程序了.spark的api使用起来也比较简单. spark起源于2009年加州大学伯克利分校的实验室,20年成为开源项目,2014年就变成了apache的顶级项目.这里用spark2.3. 2,spark与MapReduce(mc)的比较 2-1 优缺点比较 yarn.M

三、spark简介

一.简介 spark的官网:http://spark.apache.org/ spark解决了什么问题? 我们都知道hadoop,hadoop以一个非常容易使用的编程模型解决了大数据的两大难题: 1)分布式存储hdfs: 2)分布式计算mapReduce: 但是hadoop也存在着一些问题,最主要的缺陷在于它的延迟比较严重,因为hadoop的mapReduce总是需要进行大量的I/O,即使是中间输出结果也需要通过I/O来保存到HDFS中并再次读取.如果是在大规模迭代的情况下hadoop的效率就更

Spark 简介

一.什么是 Spark Spark 是开源的类 Hadoop MapReduce 的通用的并行计算框架, Spark 基于 map reduce 算法实现的分布式计算,拥有 Hadoop MapReduce 所具有的优点:但不同于 MapReduce 的是 Job 中间输出和结果可以保存在内存中,从而不再需要读写 HDFS ,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 map reduce 的算法.其架构如下图所示: 二.Spark 与 Hadoop 的对比 Spark 的中

Spark调研笔记第1篇 - Spark简介

在公司线上项目中引入Spark已经将近1年时间了,从效果来看,Spark确实是能提高生产力的优秀分布式计算平台. 从本篇笔记开始,会把之前调研Spark时的调研报告分享出来(限于篇幅,会分成几篇文章),以便帮助刚接触Spark的朋友们尽快入门. 下面开始正文. 1. 项目背景 Spark项目于2009年诞生于UC Berkeley AMP Lab并于2010年正式提交Apache Software Foundation成为开源项目.目前已经成为Apache下的明星项目,其代码提交活跃度在整个社区

1.spark简介

spark是一个用于大规模数据处理的统一计算引擎.适用于各种各样原先需要多种不同的分布式平台处理的场景,包括批处理.迭代计算.交互式查询.流处理.通过统一的框架将各种处理流程整合到一起. spark特性 快速性 spark通过使用先进的DAG调度器.查询优化器和物理执行引擎,可以高性能地进行批量及流式处理.使用逻辑回归算法进行迭代计算,spark比hadoop速度快100多倍. 简单易用 spark支持多种编程语言,比如Java.Scala.Python.R及SQL. spark提供了超过80多

Spark简介 --大数据

一.Spark是什么? 快速且通用的集群计算平台 二.Spark的特点: 快速:Spark扩充流行的Mapreduce计算模型,是基于内存的计算 通用:Spark的设计容纳了其它分布式系统拥有的功能,批处理.迭代式计算.交互查询和流处理等,降低了维护成本 高度开放:Spark提供Python.Java.Scala.SQL的API和丰富的内置库,Spark和其它的大数据工作整合得很好,包括hadoop.Kafka 三.Spark的组件 1.Spark Core 包含基本功能,包括任务调度.内存管理

Spark入门实战系列--1.Spark及其生态圈简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL.Spark St

RDD机制实现模型Spark初识

Spark简介 Spark是基于内存计算的大数据分布式计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性.       在Spark中,通过RDD(Resilient Distributed Dataset,弹性分布式数据集)来进行计算,这些分布式集合,并行的分布在整个集群中.RDDs是Spark分发数据和计算的基础抽象类. RDD属性: - A list of partitions - A function for computing eac

新手入门:Spark部署实战入门

Spark简介 整体认识 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架.最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一. Spark在整个大数据系统中处于中间偏上层的地位,如下图,对hadoop起到了补充作用: 基本概念 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 第一步分割任务.首先我们需要有一个fo