Spark MLlib SVM Algorithm

1.1 The SVM (Support Vector Machine) Algorithm

For the theoretical background of support vector machines, see the following articles:

Support Vector Machines (SVM), Part 1

http://www.cnblogs.com/jerrylead/archive/2011/03/13/1982639.html

Support Vector Machines (SVM), Part 2

http://www.cnblogs.com/jerrylead/archive/2011/03/13/1982684.html

Support Vector Machines, Part 3: Kernel Functions

http://www.cnblogs.com/jerrylead/archive/2011/03/18/1988406.html

Support Vector Machines, Part 4

http://www.cnblogs.com/jerrylead/archive/2011/03/18/1988415.html

Support Vector Machines, Part 5: The SMO Algorithm

http://www.cnblogs.com/jerrylead/archive/2011/03/18/1988419.html

The objective function of the SVM and the gradient descent update rule are as follows:
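The original figure is not reproduced here; the formulas below are reconstructed from the HingeGradient and SquaredL2Updater implementations analyzed in section 1.2 (labels y_i in {0, 1}, regularization parameter λ, initial step size α).

Objective (average hinge loss plus L2 regularization):

\min_{w}\; \frac{\lambda}{2}\lVert w\rVert^{2} \;+\; \frac{1}{n}\sum_{i=1}^{n}\max\bigl(0,\; 1-(2y_i-1)\,w^{\top}x_i\bigr)

Sub-gradient of the hinge term for a single example:

g_i \;=\; \begin{cases}-(2y_i-1)\,x_i & \text{if } (2y_i-1)\,w^{\top}x_i < 1\\ 0 & \text{otherwise}\end{cases}

Mini-batch SGD update at iteration t over a sampled mini-batch B:

\eta_t = \frac{\alpha}{\sqrt{t}}, \qquad w \;\leftarrow\; (1-\eta_t\lambda)\,w \;-\; \frac{\eta_t}{\lvert B\rvert}\sum_{i\in B} g_i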

The SVM code in MLlib is organized around the following call chain: SVMWithSGD.train creates an SVMWithSGD instance; its run method, inherited from GeneralizedLinearAlgorithm, delegates optimization to GradientDescent.optimize, which calls GradientDescent.runMiniBatchSGD; each iteration uses HingeGradient to compute gradients and SquaredL2Updater to update the weights.

1.2 Spark MLlib SVM Source Code Analysis

1.2.1 SVMWithSGD

The entry point is the train method defined on the companion object of SVMWithSGD; each train overload constructs a new SVMWithSGD instance and runs it on the input data.

package org.apache.spark.mllib.classification

// 1. The SVMWithSGD class
class SVMWithSGD private (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends GeneralizedLinearAlgorithm[SVMModel] with Serializable {

  private val gradient = new HingeGradient()
  private val updater = new SquaredL2Updater()
  override val optimizer = new GradientDescent(gradient, updater)
    .setStepSize(stepSize)
    .setNumIterations(numIterations)
    .setRegParam(regParam)
    .setMiniBatchFraction(miniBatchFraction)
  override protected val validators = List(DataValidators.binaryLabelValidator)

  /**
   * Construct a SVM object with default parameters: {stepSize: 1.0, numIterations: 100,
   * regParam: 0.01, miniBatchFraction: 1.0}.
   */
  def this() = this(1.0, 100, 0.01, 1.0)

  override protected def createModel(weights: Vector, intercept: Double) = {
    new SVMModel(weights, intercept)
  }
}

Parameters of the SVMWithSGD class (a sketch showing how to configure the optimizer directly follows this list):

stepSize: step size for each iteration; default 1.0.

numIterations: number of iterations; default 100.

regParam: regularization parameter; default 0.01.

miniBatchFraction: fraction of the data used in each iteration; default 1.0.

gradient: HingeGradient(), the hinge-loss gradient used by gradient descent.

updater: SquaredL2Updater(), L2-norm regularization.

optimizer: GradientDescent(gradient, updater), the gradient descent optimizer.
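For finer control than the static train helpers provide, the algorithm object can also be configured through its optimizer and then run directly. A minimal sketch (the training RDD and the parameter values are illustrative; L1Updater is shown only to demonstrate that the updater can be swapped):

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.optimization.L1Updater

// training: RDD[LabeledPoint] with {0, 1} labels, prepared elsewhere (e.g. via MLUtils.loadLibSVMFile).
val svmAlg = new SVMWithSGD()      // default constructor: stepSize 1.0, 100 iterations, regParam 0.01
svmAlg.optimizer
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater)       // replace the default SquaredL2Updater with L1 regularization
val modelCustom = svmAlg.run(training)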

// 2. The train method

object SVMWithSGD {

  /**
   * Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
   * of iterations of gradient descent using the specified step size. Each iteration uses
   * `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
   * gradient descent are initialized using the initial weights provided.
   *
   * NOTE: Labels used in SVM should be {0, 1}.
   *
   * @param input RDD of (label, array of features) pairs.
   * @param numIterations Number of iterations of gradient descent to run.
   * @param stepSize Step size to be used for each iteration of gradient descent.
   * @param regParam Regularization parameter.
   * @param miniBatchFraction Fraction of data to be used per iteration.
   * @param initialWeights Initial set of weights to be used. Array should be equal in size to
   *        the number of features in the data.
   */
  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,
      regParam: Double,
      miniBatchFraction: Double,
      initialWeights: Vector): SVMModel = {
    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction)
      .run(input, initialWeights)
  }

  // ... other train overloads omitted
}

Parameters of train (a minimal call using this overload is sketched after the list):

input: the training data; labels must be 0.0 or 1.0, and the features are of type Double.

numIterations: number of iterations; default 100.

stepSize: step size for each iteration; default 1.0.

regParam: regularization parameter; default 0.01.

miniBatchFraction: fraction of the data used in each iteration; default 1.0.

initialWeights: initial weight vector; default is the zero vector.
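As a point of reference, a minimal call to this overload might look like the following sketch (sc, the data path, and the all-zero initial weights are illustrative assumptions):

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils

// Any RDD[LabeledPoint] with {0, 1} labels works; here it is loaded from a LIBSVM file.
val trainingData = MLUtils.loadLibSVMFile(sc, "/user/tmp/sample_libsvm_data.txt").cache()
val numFeatures = trainingData.first().features.size

// numIterations = 100, stepSize = 1.0, regParam = 0.01, miniBatchFraction = 1.0,
// starting from the all-zero weight vector.
val model0 = SVMWithSGD.train(trainingData, 100, 1.0, 0.01, 1.0, Vectors.zeros(numFeatures))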

The run method is inherited from the parent class GeneralizedLinearAlgorithm and is implemented as follows.

1.2.2 GeneralizedLinearAlgorithm

The implementation of the run method in GeneralizedLinearAlgorithm:

package org.apache.spark.mllib.regression

/**
 * Run the algorithm with the configured parameters on an input RDD
 * of LabeledPoint entries starting from the initial weights provided.
 */
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {

  // Determine the number of features from the first example if it has not been set yet.
  if (numFeatures < 0) {
    numFeatures = input.map(_.features.size).first()
  }

  // Warn if the input data is not cached.
  if (input.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data is not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }

  // Validate the input data.
  // Check the data properties before running the optimizer
  if (validateData && !validators.forall(func => func(input))) {
    throw new SparkException("Input validation failed.")
  }

  val scaler = if (useFeatureScaling) {
    new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
  } else {
    null
  }

  // Transform the input into (label, features) pairs.
  // addIntercept: whether to add the constant term θ0; if so, a bias entry x0 = 1 is appended.
  // Prepend an extra variable consisting of all 1.0's for the intercept.
  // TODO: Apply feature scaling to the weight vector instead of input data.
  val data =
    if (addIntercept) {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
      } else {
        input.map(lp => (lp.label, appendBias(lp.features))).cache()
      }
    } else {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
      } else {
        input.map(lp => (lp.label, lp.features))
      }
    }

  // Initialize the weights.
  // addIntercept: if the constant term θ0 is used, the weight vector gains one extra entry for it.
  /**
   * TODO: For better convergence, in logistic regression, the intercepts should be computed
   * from the prior probability distribution of the outcomes; for linear regression,
   * the intercept should be set as the average of response.
   */
  val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
    appendBias(initialWeights)
  } else {
    /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
    initialWeights
  }

  // Optimize the weights with gradient descent and return the best weights found.
  val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)

  val intercept = if (addIntercept && numOfLinearPredictor == 1) {
    weightsWithIntercept(weightsWithIntercept.size - 1)
  } else {
    0.0
  }

  var weights = if (addIntercept && numOfLinearPredictor == 1) {
    Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
  } else {
    weightsWithIntercept
  }

  createModel(weights, intercept)
}

The core of the implementation is the call optimizer.optimize(data, initialWeightsWithIntercept).

The optimizer is of type GradientDescent; its optimize method calls runMiniBatchSGD on the GradientDescent companion object and returns the optimized feature weight vector for the current run.

1.2.3 GradientDescent

The optimize method is implemented as follows.

package org.apache.spark.mllib.optimization

/**
 * :: DeveloperApi ::
 * Runs gradient descent on the given training data.
 * @param data training data
 * @param initialWeights initial weights
 * @return solution vector
 */
@DeveloperApi
def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
  val (weights, _) = GradientDescent.runMiniBatchSGD(
    data,
    gradient,
    updater,
    stepSize,
    numIterations,
    regParam,
    miniBatchFraction,
    initialWeights)
  weights
}

}

The optimize method calls GradientDescent.runMiniBatchSGD, which is implemented as follows:

/**
 * Run stochastic gradient descent (SGD) in parallel using mini batches.
 * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
 * in order to compute a gradient estimate.
 * Sampling, and averaging the subgradients over this subset is performed using one standard
 * spark map-reduce in each iteration.
 *
 * @param data - Input data for SGD. RDD of the set of data examples, each of
 *               the form (label, [feature values]).
 * @param gradient - Gradient object (used to compute the gradient of the loss function of
 *                   one single data example)
 * @param updater - Updater function to actually perform a gradient step in a given direction.
 * @param stepSize - initial step size for the first step
 * @param numIterations - number of iterations that SGD should be run.
 * @param regParam - regularization parameter
 * @param miniBatchFraction - fraction of the input data set that should be used for
 *                            one iteration of SGD. Default value 1.0.
 *
 * @return A tuple containing two elements. The first element is a column matrix containing
 *         weights for every feature, and the second element is an array containing the
 *         stochastic loss computed for every iteration.
 */
def runMiniBatchSGD(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    stepSize: Double,
    numIterations: Int,
    regParam: Double,
    miniBatchFraction: Double,
    initialWeights: Vector): (Vector, Array[Double]) = {

  // History of the loss at each iteration.
  val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

  // If the input is empty, return the initial weights unchanged.
  val numExamples = data.count()

  // if no data, return initial weights to avoid NaNs
  if (numExamples == 0) {
    logWarning("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
    return (initialWeights, stochasticLossHistory.toArray)
  }

  // Sanity-check the miniBatchFraction value.
  if (numExamples * miniBatchFraction < 1) {
    logWarning("The miniBatchFraction is too small")
  }

  // Initialize weights as a column vector
  var weights = Vectors.dense(initialWeights.toArray)
  val n = weights.size

  /**
   * For the first iteration, the regVal will be initialized as sum of weight squares
   * if it's L2 updater; for L1 updater, the same logic is followed.
   */
  var regVal = updater.compute(
    weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2

  // Iteratively update the weights.
  for (i <- 1 to numIterations) {
    val bcWeights = data.context.broadcast(weights)

    // Sample a subset (fraction miniBatchFraction) of the total data
    // compute and sum up the subgradients on this subset (this is one map-reduce)
    // treeAggregate computes the gradient vector and loss for every sampled example,
    // then sums the gradient vectors and losses over all sampled examples.
    // sample draws a random subset of the data according to miniBatchFraction.
    val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
      .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
        seqOp = (c, v) => {
          // c: (grad, loss, count), v: (label, features)
          val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
          (c._1, c._2 + l, c._3 + 1)
        },
        combOp = (c1, c2) => {
          // c: (grad, loss, count)
          (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
        })

    // Record the loss of this iteration and update the weights.
    if (miniBatchSize > 0) {
      /**
       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
       * and regVal is the regularization value computed in the previous iteration as well.
       */
      // updater.compute updates the weights and regVal (the regularization term) from the
      // averaged gradient of this iteration, the step size and the regularization parameter.
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
      val update = updater.compute(
        weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam)
      weights = update._1
      regVal = update._2
    } else {
      logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
    }
  }

  logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
    stochasticLossHistory.takeRight(10).mkString(", ")))

  (weights, stochasticLossHistory.toArray)
}

Inputs and outputs of runMiniBatchSGD:

data: training data in the form (label, [feature values]).

gradient: gradient object, used to compute the gradient and loss for each example.

updater: updater object, used to update the weight vector after each iteration.

stepSize: initial step size.

numIterations: number of iterations.

regParam: regularization parameter.

miniBatchFraction: fraction of the data used in each iteration.

The return value is a pair (Vector, Array[Double]): the first element is the weight vector and the second is the loss recorded at every iteration.

runMiniBatchSGD repeatedly samples the input data and, using HingeGradient to compute gradients and SquaredL2Updater to update the weights, iterates toward the optimal feature weight vector. The gradient and updater are defined in SVMWithSGD as follows:

private val gradient = new HingeGradient()
private val updater = new SquaredL2Updater()
override val optimizer = new GradientDescent(gradient, updater)
  .setStepSize(stepSize)
  .setNumIterations(numIterations)
  .setRegParam(regParam)
  .setMiniBatchFraction(miniBatchFraction)

runMiniBatchSGD calls two methods, gradient.compute and updater.compute, which are implemented as follows.

1.2.4 gradient & updater

1) gradient

// Rescale the {0, 1} class label to ±1: labelScaled = 2 * label - 1.0
// Gradient for the current example: -(2y - 1) * x when the margin is violated, zero otherwise
// Loss for the current example: max(0, 1 - (2y - 1) * f_w(x))
// Returns the gradient vector and the loss for the current training example

/**
 * :: DeveloperApi ::
 * Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
 * See also the documentation for the precise formulation.
 * NOTE: This assumes that the labels are {0,1}
 */
@DeveloperApi
class HingeGradient extends Gradient {

  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
    val dotProduct = dot(data, weights)
    // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
    // Therefore the gradient is -(2y - 1)*x
    val labelScaled = 2 * label - 1.0
    if (1.0 > labelScaled * dotProduct) {
      val gradient = data.copy
      scal(-labelScaled, gradient)
      (gradient, 1.0 - labelScaled * dotProduct)
    } else {
      (Vectors.sparse(weights.size, Array.empty, Array.empty), 0.0)
    }
  }

  // ... (a second compute overload that accumulates into an existing gradient vector is omitted)
}
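As a quick illustration of the two branches, the sketch below calls HingeGradient.compute directly on two hand-made examples (all numbers are made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.HingeGradient

val hinge = new HingeGradient()
val w = Vectors.dense(0.5, -0.25)

// label = 1.0 => 2*label - 1 = 1; w^T x = 0.5*1.0 + (-0.25)*2.0 = 0.0 < 1 (margin violated),
// so gradient = -(1)*x = (-1.0, -2.0) and loss = 1 - 0.0 = 1.0.
val (g1, loss1) = hinge.compute(Vectors.dense(1.0, 2.0), 1.0, w)

// label = 0.0 => 2*label - 1 = -1; w^T x = 0.5*(-4.0) = -2.0 and (-1)*(-2.0) = 2.0 >= 1,
// so the example lies outside the margin: zero (sparse) gradient and loss = 0.0.
val (g2, loss2) = hinge.compute(Vectors.dense(-4.0, 0.0), 0.0, w)

println(s"g1 = $g1, loss1 = $loss1")   // expected: [-1.0,-2.0], 1.0
println(s"g2 = $g2, loss2 = $loss2")   // expected: zero vector, 0.0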

2) updater

// weightsOld: the feature weight vector from the previous iteration
// gradient: the (averaged) gradient vector computed in this iteration
// stepSize: step size
// iter: current iteration number
// regParam: regularization parameter
// The step size for this iteration is scaled by the inverse square root of the iteration number
// Returns the updated feature weight vector after this gradient descent step

With L2 regularization (R(w) = 1/2 ||w||^2), the weight update rule is:

thisIterStepSize = stepSize / sqrt(iter)
w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient

/**
 * :: DeveloperApi ::
 * Updater for L2 regularized problems.
 *          R(w) = 1/2 ||w||^2
 * Uses a step-size decreasing with the square root of the number of iterations.
 */
@DeveloperApi
class SquaredL2Updater extends Updater {

  override def compute(
      weightsOld: Vector,
      gradient: Vector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (Vector, Double) = {
    // add up both updates from the gradient of the loss (= step) as well as
    // the gradient of the regularizer (= regParam * weightsOld)
    // w' = w - thisIterStepSize * (gradient + regParam * w)
    // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
    val thisIterStepSize = stepSize / math.sqrt(iter)
    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
    brzWeights :*= (1.0 - thisIterStepSize * regParam)
    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
    val norm = brzNorm(brzWeights, 2.0)

    (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
  }
}
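A single update step can be checked by hand with the sketch below (all numbers are made up); it follows the rule w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient from the comments above:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.SquaredL2Updater

val l2Updater = new SquaredL2Updater()
val oldWeights = Vectors.dense(1.0, -2.0)
val grad = Vectors.dense(0.5, 0.5)

// stepSize = 1.0, iter = 4  =>  thisIterStepSize = 1.0 / sqrt(4) = 0.5; regParam = 0.1
// w' = (1 - 0.5*0.1) * (1.0, -2.0) - 0.5 * (0.5, 0.5) = (0.95, -1.9) - (0.25, 0.25) = (0.70, -2.15)
val (newWeights, regVal) = l2Updater.compute(oldWeights, grad, 1.0, 4, 0.1)

// regVal is the regularization term 0.5 * regParam * ||w'||^2 evaluated at the new weights.
println(s"newWeights = $newWeights, regVal = $regVal")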

1.3 MLlib SVM Example

1. Data

Each line is a label followed by sparse features, i.e. label index1:value1 index2:value2 ... (LIBSVM format):

0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 188:252 189:57 190:6 208:10 209:60 210:224 211:252 212:253 213:252 214:202
215:84 216:252 217:253 218:122 236:163 237:252 238:252 239:252 240:253 241:252 242:252 243:96 244:189 245:253 246:167 263:51 264:238 265:253 266:253 267:190 268:114 269:253 270:228 271:47 272:79 273:255 274:168 290:48 291:238 292:252 293:252 294:179 295:12
296:75 297:121 298:21 301:253 302:243 303:50 317:38 318:165 319:253 320:233 321:208 322:84 329:253 330:252 331:165 344:7 345:178 346:252 347:240 348:71 349:19 350:28 357:253 358:252 359:195 372:57 373:252 374:252 375:63 385:253 386:252 387:195 400:198 401:253
402:190 413:255 414:253 415:196 427:76 428:246 429:252 430:112 441:253 442:252 443:148 455:85 456:252 457:230 458:25 467:7 468:135 469:253 470:186 471:12 483:85 484:252 485:223 494:7 495:131 496:252 497:225 498:71 511:85 512:252 513:145 521:48 522:165 523:252
524:173 539:86 540:253 541:225 548:114 549:238 550:253 551:162 567:85 568:252 569:249 570:146 571:48 572:29 573:85 574:178 575:225 576:253 577:223 578:167 579:56 595:85 596:252 597:252 598:252 599:229 600:215 601:252 602:252 603:252 604:196 605:130 623:28
624:199 625:252 626:252 627:253 628:252 629:252 630:233 631:145 652:25 653:128 654:252 655:253 656:252 657:141 658:37

1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 189:253 190:62 214:127 215:251 216:251 217:253 218:62 241:68 242:236 243:251 244:211 245:31 246:8 268:60 269:228 270:251 271:251 272:94 296:155 297:253
298:253 299:189 323:20 324:253 325:251 326:235 327:66 350:32 351:205 352:253 353:251 354:126 378:104 379:251 380:253 381:184 382:15 405:80 406:240 407:251 408:193 409:23 432:32 433:253 434:253 435:253 436:159 460:151 461:251 462:251 463:251 464:39 487:48 488:221
489:251 490:251 491:172 515:234 516:251 517:251 518:196 519:12 543:253 544:251 545:251 546:89 570:159 571:255 572:253 573:253 574:31 597:48 598:228 599:253 600:247 601:140 602:8 625:64 626:251 627:253 628:220 653:64 654:251 655:253 656:220 681:24 682:193 683:253
684:220

...

2. Code

// 1. Load the sample data.
val data_path = "/user/tmp/sample_libsvm_data.txt"
val examples = MLUtils.loadLibSVMFile(sc, data_path).cache()

// 2. Split the data into a training set and a test set.
val splits = examples.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

val numTraining = training.count()
val numTest = test.count()
println(s"Training: $numTraining, test: $numTest.")

// 3. Build the SVM model and set the training parameters.
val numIterations = 1000
val stepSize = 1.0
val regParam = 0.01
val miniBatchFraction = 1.0
val model = SVMWithSGD.train(training, numIterations, stepSize, regParam, miniBatchFraction)

// 4. Predict on the test set.
val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))

// 5. Compute the test accuracy.
val metrics = new MulticlassMetrics(predictionAndLabel)
val precision = metrics.precision
println("Precision = " + precision)
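MulticlassMetrics treats this as a generic multi-class problem; for a binary SVM it is also common to clear the model's threshold so that predict returns raw margins and then evaluate the area under the ROC curve. A sketch reusing the model and test set from above:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Have predict return raw margin scores instead of thresholded 0/1 labels.
model.clearThreshold()

val scoreAndLabels = test.map { point =>
  (model.predict(point.features), point.label)
}

val binaryMetrics = new BinaryClassificationMetrics(scoreAndLabels)
println("Area under ROC = " + binaryMetrics.areaUnderROC())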
