Spark MLlib之线性回归源码分析

1.理论基础

线性回归(Linear Regression)问题属于监督学习(Supervised Learning)范畴,又称分类(Classification)或归纳学习(Inductive Learning);这类分析中训练数据集中给出的数据类标是确定的;机器学习的目标是,对于给定的一个训练数据集,通过不断的分析和学习产生一个联系属性集合和类标集合的分类函数(Classification
Function)或预测函数(Prediction Function),这个函数称为分类模型(Classification Model)或预测模型(Prediction Model);通过学习得到的模型可以是一个决策树,规格集,贝叶斯模型或一个超平面。通过这个模型可以对输入对象的特征向量预测或对对象的类标进行分类。

回归问题中通常使用最小二乘(Least Squares)法来迭代最优的特征中每个属性的比重,通过损失函数(Loss Function)或错误函数(Error Function)定义来设置收敛状态,即作为剃度下降算法的逼近参数因子。

2.矩阵向量运算库jblas介绍

由于spark MLlib中使用jlbas的线性代数运算库,因此学习和掌握jlbas库中基本的运算,对分析和学习spark中MLlib很多算法很有帮助;下面使用jlbas中DoubleMatrix矩阵对jlbas中基本运算进行简单介绍:

    val matrix1 = DoubleMatrix.ones(10, 1) //创建所有值为1的10*1矩阵
    val matrix2 = DoubleMatrix.zeros(10, 1) //创建所有值为0的10*1矩阵
    matrix1.put(1, -10)
    val absSum = matrix1.norm1() //绝对值之和
    val euclideanNorm = matrix1.norm2() //欧几里德距离
    val matrix3 = (matrix1.addi(matrix2))
    val matrix4 = new DoubleMatrix(1, 10, (1 to 10).map(_.toDouble): _*) //创建Double向量对象
    println("print init value:matrix3=" + matrix3)
    println("print init value:matrix4=" + matrix4)
    println("matrix sub matrix:" + matrix3.sub(matrix4) + "," + matrix4.sub(10)) //减法运算
    println("matrix add matrix:" + matrix3.add(matrix4) + "," + matrix4.add(10)) //加法运算
    println("matrix mul matrix:" + matrix3.mul(matrix4) + "," + matrix4.mul(10)) //乘法运算
    println("matrix div matrix:" + matrix3.div(matrix4) + "," + matrix4.div(10)) //除法运算
    println("matrix dot matrix:" + matrix3.dot(matrix4)) //向量积

    val matrix5 = DoubleMatrix.ones(10, 10)
    println("N*M Vector Matrix sub OP:\n" + matrix5.subRowVector(matrix4) + "\n" + matrix5.subColumnVector(matrix4)) //多对象减法运算
    println("N*M Vector Matrix add OP:\n" + matrix5.addRowVector(matrix4) + "\n" + matrix5.addColumnVector(matrix4)) //多对象加法运算
    println("N*M Vector Matrix mul OP:\n" + matrix5.mulRowVector(matrix4) + "\n" + matrix5.mulColumnVector(matrix4)) //多对象乘法运算
    println("N*M Vector Matrix div OP:\n" + matrix5.divRowVector(matrix4) + "\n" + matrix5.divColumnVector(matrix4)) //多对象除法运算

3.梯度下降(Gradient Descent)算法介绍

梯度下降算法用于在迭代过程中逐渐降阶,不断更新特征权重向量,从而得到无限接近或拟合的最优特征权重向量 ;梯度下降算法主要有两种,第一种是批量梯度下降(Batch Gradient Descent)算法,此种方式实现过程是对权重向量进行累加,然后批量更新的一种方式,一般不实用于大规模数据集处理;另外一种是随机梯度下降(Stochastic
Gradient Descent)算法,这种方式对给定训练数据集中每个对象都进行权重计算和更新,在某些情况下容易收敛到局部最优解上。Spark MLlib库中主要使用随机梯度下降算法。为了更好的理解MLlib库中随机梯度算法(MLlib库中类后缀名以SGD结尾的所有算法)实现,下面是使用随机梯度算法进行直线拟合的Demo:

  def sgdDemo {
    val featuresMatrix: List[List[Double]] = List(List(1, 4), List(2, 5), List(5, 1), List(4, 2))//特征矩阵
    val labelMatrix: List[Double] = List(19, 26, 19, 20)//真实值向量
    var theta: List[Double] = List(0, 0)
    var loss: Double = 10.0
    for {
      i <- 0 until 1000 //迭代次数
      if (loss > 0.01) //收敛条件loss<=0.01
    } {
      var error_sum = 0.0 //总误差
      var j = i % 4
      var h = 0.0
      for (k <- 0 until 2) {
        h += featuresMatrix(j)(k) * theta(k)
      } //计算给出的测试数据集中第j个对象的计算类标签
      error_sum = labelMatrix(j) - h //计算给出的测试数据集中类标签与计算的类标签的误差值
      var cacheTheta: List[Double] = List()

      for (k <- 0 until 2) {
        val updaterTheta = theta(k) + 0.001 * (error_sum) * featuresMatrix(j)(k)
        cacheTheta = updaterTheta +: cacheTheta
      } //更新权重向量
      cacheTheta.foreach(t => print(t + ","))
      print(error_sum + "\n")
      theta = cacheTheta
      //更新误差率
      var currentLoss: Double = 0
      for (j <- 0 until 4) {
        var sum = 0.0
        for (k <- 0 until 2) {
          sum += featuresMatrix(j)(k) * theta(k)
        }
        currentLoss += (sum - labelMatrix(j)) * (sum - labelMatrix(j))
      }
      loss = currentLoss
      println("loss->>>>" + loss / 4 + ",i>>>>>" + i)
    }
  }

4.MLlib线性回归源码分析

MLlib中可用的线性回归算法有:LinearRegressionWithSGD,RidgeRegressionWithSGD,LassoWithSGD;MLlib回归分析中涉及到的主要类有,GeneralizedLinearAlgorithm,GradientDescent。下面以对LinearRegressionWithSGD实现进行主要分析。

第一步:在使用LinearRegressionWithSGD算法之前首先将输入数据解析成包含类标和特征向量的LabeledPoint对象的RDD弹性分布式数据集合。

第二步:调用LinearRegressionWithSGD伴生对象中的train方法传输第一步创建的RDD集合和最大迭代次数,在train中主要实现创建一个新的LinearRegressionWithSGD对象,初始化梯度下降算使用使用最小二乘梯度下降算法SquaredGradient以及更新权重向量使用SimpleUpdater,执行父类GeneralizedLinearAlgorithm中的run方法进行权重向量和拦截参数计算,并返回训练得到的模型属性权重向量

LinearRegressionWithSGD伴生对象中train方法实现

  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,//默认步长为1
      miniBatchFraction: Double)//每次跌打使用的batch因子,默认为1
    : LinearRegressionModel =
  {
    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
  }

LinearRegressionWithSGD中run方法实现

  def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {

    // Check the data properties before running the optimizer
    if (validateData && !validators.forall(func => func(input))) {//预验证输入数据的合法性,validators中存储验证的所有方法
      throw new SparkException("Input validation failed.")
    }
    // Prepend an extra variable consisting of all 1.0's for the intercept.
    val data = if (addIntercept) {//判读是否需要添加intercept
      input.map(labeledPoint => (labeledPoint.label, 1.0 +: labeledPoint.features))
    } else {
      input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
    }//对象转化为元组(类标签,特征)

    val initialWeightsWithIntercept = if (addIntercept) {
      0.0 +: initialWeights
    } else {
      initialWeights
    }//初始化权重特征

    val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)//返回优化后的权重

    val (intercept, weights) = if (addIntercept) {
      (weightsWithIntercept(0), weightsWithIntercept.tail)
    } else {
      (0.0, weightsWithIntercept)
    }

    logInfo("Final weights " + weights.mkString(","))
    logInfo("Final intercept " + intercept)

    createModel(weights, intercept)//使用计算后的权重向量和拦截参数创建模型
  }

其中optimizer.optimize(data, initialWeightsWithIntercept)是LinearRegressionWithSGD实现的核心,oprimizer的类型为GradientDescent,optimize方法中主要调用GradientDescent伴生对象的runMiniBatchSGD方法,返回当前迭代产生的最优特征权重向量。

GradientDescentd对象中optimize方法实现

  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    : Array[Double] = {
	//返回优化后的权重向量,和迭代过程中误差
    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
        data,
        gradient,
        updater,
        stepSize,
        numIterations,
        regParam,
        miniBatchFraction,
        initialWeights)
    weights
  }

GradientDescent伴生对象中runMiniBatchSGD方法实现

  def runMiniBatchSGD(
    data: RDD[(Double, Array[Double])],
    gradient: Gradient,//SquaredGradient—平方剃度下降算法
    updater: Updater,//SimpleUpdater
    stepSize: Double,//1.0
    numIterations: Int,//100
    regParam: Double,//0.0
    miniBatchFraction: Double,//1.0
    initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {

    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

    val nexamples: Long = data.count()
    val miniBatchSize = nexamples * miniBatchFraction

    // Initialize weights as a column vector//创建一维向量,第一个参数为行数,第二个参数为列数,第三个参数开始为值
    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
    var regVal = 0.0

    for (i <- 1 to numIterations) {
      /**
       * 使用平方梯度下降算法
       * gradientSum:本次选择迭代样本的梯度总和,
       * lossSum:本次选择迭代样本的误差总和
       */
      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
        case (y, features) =>//(标签,特征)
          val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
          val (grad, loss) = gradient.compute(featuresCol, y, weights)//(特征,标签,特征属性权重向量)
          /**
           * class SquaredGradient extends Gradient {
			  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
			      (DoubleMatrix, Double) = {
			    val diff: Double = data.dot(weights) - label//计算当前计算对象的类标签与实际类标签值之差
			    val loss = 0.5 * diff * diff//当前平方梯度下降值
			    val gradient = data.mul(diff)
			    (gradient, loss)
			  }
			}
           */
          (grad, loss)//当前训练对象的特征权重向量和误差
      }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2)) //计算本次迭代所选训练数据新的特征权重向量之和和误差总和
      /**
       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
       * and regVal is the regularization value computed in the previous iteration as well.
       */
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)//miniBatchSize=样本中对象数量*batch因子,regVal为回归因子
      val update = updater.compute(
        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)//weights:属性向量中设置的权重因子,regParam:为回归参数,stepSize:计算步长,i:当前迭代次数
      /**
       * class SimpleUpdater extends Updater {
       * /**
          * weihtsOld:上一次迭代计算后的特征权重向量
          * gradient:本次迭代计算的特征权重向量
          * stepSize:迭代步长
          * iter:当前迭代次数
          * regParam:回归参数
          */
		  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
		      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
		    val thisIterStepSize = stepSize / math.sqrt(iter)//以当前迭代次数的平方根的倒数作为本次迭代趋近(下降)的因子
		    val normGradient = gradient.mul(thisIterStepSize)
		    (weightsOld.sub(normGradient), 0)//返回本次剃度下降后更新的特征权重向量
		  }
		}
	   *
       */
      weights = update._1
      regVal = update._2//使用SimpleUpdater值为0
    }

    logInfo("GradientDescent finished. Last 10 stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights.toArray, stochasticLossHistory.toArray)
  }

在MiniBatchSGD中主要实现对输入数据集进行迭代抽样,通过使用SquaredGradient作为梯度下降算法,使用SimpleUpdater作为更新算法,不断对抽样数据集进行迭代计算从而找出最优的特征权重向量解。

使用官方测试代码如下:

  def linearRegressionAPITest(sc: SparkContext) {
    val url = "/Users/yangguo/hadoop/spark/mllib/data/ridge-data/lpsa.data"
    val data = sc.textFile(url)
    val parseData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
    }
    val numIterations = 20
    val model = LinearRegressionWithSGD.train(parseData, numIterations)
    val valuesAndPreds = parseData.map { point =>
      val prediction = model.predict(point.features)
      (point, prediction)
    }
    valuesAndPreds.foreach { case (v, p) => print("[" + v.label + "," + p + "]"); v.features.foreach(base => print(base + "--")); println("\n") }
    val isSuccessed = valuesAndPreds.map { case (v, p) => math.pow((p - v.label), 2) }.reduce(_ + _) / valuesAndPreds.count
    println(isSuccessed)
  }

参考:

[1].http://rdc.taobao.org/?p=2163

[2].http://cs229.stanford.edu/notes/cs229-notes1.pdf

[3].http://blog.sina.com.cn/s/blog_62339a2401015jyq.html

[4].http://blog.csdn.net/pennyliang/article/details/6998517

[5].http://en.wikipedia.org/wiki/Lasso_(statistics)#Lasso_method

Spark MLlib之线性回归源码分析

时间: 2024-10-07 18:37:56

Spark MLlib之线性回归源码分析的相关文章

spark内核揭秘-10-RDD源码分析

RDD的核心方法: 首先看一下getPartitions方法的源码: getPartitions返回的是一系列partitions的集合,即一个Partition类型的数组 我们就想进入HadoopRDD实现: 1.getJobConf():用来获取job Configuration,获取配置方式有clone和非clone方式,但是clone方式 是not thread-safe,默认是禁止的,非clone方式可以从cache中获取,如cache中没有那就创建一个新的,然后再放到cache中 2

Accuracy(准确率), Precision(精确率), 和F1-Measure, 结合Spark源码分析

例子 某大学一个系,总共100人,其中男90人,女10人,现在根据每个人的特征,预测性别 Accuracy(准确率) Accuracy=预测正确的数量需要预测的总数 计算 由于我知道男生远多于女生,所以我完全无视特征,直接预测所有人都是男生 我预测所的人都是男生,而实际有90个男生,所以 预测正确的数量 = 90 需要预测的总数 = 100 Accuracy = 90 / 100 = 90% 问题 在男女比例严重不均匀的情况下,我只要预测全是男生,就能获得极高的Accuracy. 所以在正负样本

Spark中决策树源码分析

1.Example 使用Spark MLlib中决策树分类器API,训练出一个决策树模型,使用Python开发. """ Decision Tree Classification Example. """from __future__ import print_functionfrom pyspark import SparkContextfrom pyspark.mllib.tree import DecisionTree, DecisionT

Spark的Master和Worker集群启动的源码分析

基于spark1.3.1的源码进行分析 spark master启动源码分析 1.在start-master.sh调用master的main方法,main方法调用 def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (actorSystem, _, _, _) =

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种

Spark SQL Catalyst源码分析之TreeNode Library

前几篇文章介绍了Spark SQL的Catalyst的SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

Spark SQL源码分析之核心流程

自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql. 2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里. 前一段时间测试过Shark,并且对Spark