利用Spark mllab进行机器学习的基本操作(聚类,分类,回归分析)

Spark作为一种开源集群计算环境,具有分布式的快速数据处理能力。而Spark中的Mllib定义了各种各样用于机器学习的数据结构以及算法。Python具有Spark的API。需要注意的是,Spark中,所有数据的处理都是基于RDD的。

首先举一个聚类方面的详细应用例子Kmeans:

   下面代码是一些基本步骤,包括外部数据,RDD预处理,训练模型,预测。

#coding:utf-8
from numpy import array
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel

if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample",master=‘local‘)  # SparkContext

    # 读取并处理数据
    data = sc.textFile("./kmeans_data.txt")
    print data.collect()
    parsedData = data.map(lambda line: array([float(x) for x in line.split(‘ ‘)]))

    # 训练数据
    print parsedData.collect()
    clusters = KMeans.train(parsedData, k=2, maxIterations=10,
                            runs=10, initializationMode="random")

    #求方差之和
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))
    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)

    print("Within Set Sum of Squared Error = " + str(WSSSE))

    #聚类结果
    def sort(point):
        return clusters.predict(point)
    clusters_result = parsedData.map(sort)
    # Save and load model
    # $example off$
    print ‘聚类结果:‘
    print clusters_result.collect()
    sc.stop()

可以看到在利用Spark进行机器学习时,我调用了一个外部的开源包numpy,并利用了数组作为数据结构。而在Mllib中其实已经定义了各种用于机器学习的数据结构,下面简单介绍两种在分类和回归分析中可以用到的DS。

稀疏向量(SparseVector):稀疏向量是指向量元素中有许多值是0的向量。

其初始化与简单操作如下:  

# coding:utf-8
from pyspark.mllib.linalg import *
v0 = SparseVector(4, [1, 2], [2, 3.0])  # 稀疏向量,第一个参数为维度,第二个参数是非0维度的下标的集合,第三个参数是非0维度的值的集合
v1 = SparseVector(4,{1: 3, 2: 4}) # 第一个参数是维度,第二个参数是下标和维度组成的字典
print v0.dot(v1)  # 计算点积
print v0.size  # 向量维度
print v0.norm(0)  # 返回维度0的值
print v0.toArray()  # 转化为array
print v0.squared_distance(v1)  # 欧式距离

  spark中的稀疏向量可以利用list或者dict进行初始化。

向量标签(Labeled  point):向量标签就是在向量和标签的组合,分类和回归中,标签可以作为分类中的类别,也可以作为回归中的实际值。

from pyspark.mllib.regression import LabeledPoint
data = [
  LabeledPoint(1.0, [1.0, 1.0]),
  LabeledPoint(4.0, [1.0, 3.0]),
  LabeledPoint(8.0, [2.0, 3.0]),
  LabeledPoint(10.0, [3.0, 4.0])]
print data[0].features
print data[0].label

下面是mllib中用于回归分析的一些基本实现(线性回归,岭回归):

# coding:UTF-8
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.context import SparkContext

# ----------------线性回归--------------

import numpy as np
sc = SparkContext(master=‘local‘,appName=‘Regression‘)
data = [
  LabeledPoint(1.0, [1.0, 1.0]),
  LabeledPoint(2.0, [1.0, 1.4]),
  LabeledPoint(4.0, [2.0, 1.9]),
  LabeledPoint(6.0, [3.0, 4.0])]  # 训练集
lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, initialWeights=np.array([1.0,1.0]))
print lrm.predict(np.array([2.0,1.0]))  # 利用训练出的回归模型进行预测

import os, tempfile
from pyspark.mllib.regression import LinearRegressionModel
from pyspark.mllib.linalg import SparseVector

path = tempfile.mkdtemp()
lrm.save(sc, path)  # 将模型保存至外存
sameModel = LinearRegressionModel.load(sc, path)  # 读取模型
print sameModel.predict(SparseVector(2, {0: 100.0, 1: 150}))  # 利用稀疏向量作为数据结构,返回单个预测值
test_set = []
for i in range(100):
  for j in range(100):
    test_set.append(SparseVector(2, {0: i,1: j}))
print sameModel.predict(sc.parallelize(test_set)).collect()  # 预测多值,返回一个RDD数据集
print sameModel.weights  # 返回参数

# -----------------岭回归------------------

from pyspark.mllib.regression import RidgeRegressionWithSGD
data = [
  LabeledPoint(1.0, [1.0, 1.0]),
  LabeledPoint(4.0, [1.0, 3.0]),
  LabeledPoint(8.0, [2.0, 3.0]),
  LabeledPoint(10.0, [3.0, 4.0])]
train_set = sc.parallelize(data)
rrm = RidgeRegressionWithSGD.train(train_set, iterations=100, initialWeights=np.array([1.0,1.0]))
test_set = []
for i in range(100):
  for j in range(100):
    test_set.append(np.array([i, j]))
print rrm.predict(sc.parallelize(test_set)).collect()
print rrm.weights

上述代码只是让大家弄懂一下简单的操作,对于数据的预处理没有在RDD的基础上做。

下面是一些分类算法的基本实现:

# coding:utf-8
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint

print ‘-------逻辑回归-------‘
from pyspark.mllib.classification import LogisticRegressionWithSGD
sc = SparkContext(appName="LRWSGD", master=‘local‘)
dataset = []
for i in range(100):
    for j in range(100):
        dataset.append([i,j])
dataset = sc.parallelize(dataset)  # 并行化数据,转化为RDD

data =[LabeledPoint(0.0, [0.0, 100.0]),LabeledPoint(1.0, [100.0, 0.0]),]

lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)  # 第二个参数是迭代次数
print lrm.predict(dataset).collect()

lrm.clearThreshold()
print lrm.predict([0.0, 1.0])
# ----------------------------------------------------------
from pyspark.mllib.linalg import SparseVector
from numpy import array
sparse_data = [
    LabeledPoint(0.0, SparseVector(2, {0: 0.0, 1: 0.0})),
    LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
    LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
]
train = sc.parallelize(sparse_data)
lrm = LogisticRegressionWithSGD.train(train, iterations=10)
print lrm.predict(array([0.0, 1.0]))  # 对单个数组进行预测
print lrm.predict(SparseVector(2, {1: 1.0}))  # 对单个稀疏向量进行预测

print ‘------svm-------‘

from pyspark.mllib.classification import SVMWithSGD
svm = SVMWithSGD.train(train,iterations=10)
print svm.predict(SparseVector(2, {1: 1.0}))

print ‘------bayes------‘
from pyspark.mllib.classification import NaiveBayes
nb = NaiveBayes.train(train)
print nb.predict(SparseVector(2, {1: 1.0}))

版权都是我所有的,(*^__^*) 哈哈哈~

时间: 2024-08-28 00:52:05

利用Spark mllab进行机器学习的基本操作(聚类,分类,回归分析)的相关文章

Spark MLBase分布式机器学习系统入门:以MLlib实现Kmeans聚类算法

1.什么是MLBaseMLBase是Spark生态圈的一部分,专注于机器学习,包含三个组件:MLlib.MLI.ML Optimizer. ML Optimizer: This layer aims to automating the task of ML pipeline construction. The optimizer solves a search problem over feature extractors and ML algorithms included inMLI and

基于Spark的机器学习实践 (九) - 聚类算法

0 相关源码 1 k-平均算法(k-means clustering)概述 1.1 回顾无监督学习 ◆ 分类.回归都属于监督学习 ◆ 无监督学习是不需要用户去指定标签的 ◆ 而我们看到的分类.回归算法都需要用户输入的训练数据集中给定一个个明确的y值 1.2 k-平均算法与无监督学习 ◆ k-平均算法是无监督学习的一种 ◆ 它不需要人为指定一个因变量,即标签y ,而是由程序自己发现,给出类别y ◆ 除此之外,无监督算法还有PCA,GMM等 源于信号处理中的一种向量量化方法,现在则更多地作为一种聚类

离线轻量级大数据平台Spark之MLib机器学习库概念学习

Mlib机器学习库 1.1机器学习概念 机器学习有很多定义,倾向于下面这个定义.机器学习是对能通过经验自动改进的计算机算法的研究.机器学习依赖数据经验并评估和优化算法所运行出的模型.机器学习算法尝试根据训练数据使得表示算法行为的数学目标最大化,并以此来进行预测或作出决定.机器学习问题分类为几种,包括分类.回归.聚类.所有的机器学习算法都经过一条流水线:提取训练数据的特征->基于特征向量训练模型->评估模型选择最佳.特征提取主要是提取训练数据中的数值特征,用于数学建模.机器学习一般有如下分类:

Mahout机器学习平台之聚类算法详细剖析(含实例分析)

第一部分: 学习Mahout必须要知道的资料查找技能: 学会查官方帮助文档: 解压用于安装文件(mahout-distribution-0.6.tar.gz),找到如下位置,我将该文件解压到win7的G盘mahout文件夹下,路径如下所示: G:\mahout\mahout-distribution-0.6\docs 学会查源代码的注释文档: 方案一:用maven创建一个mahout的开发环境(我用的是win7,eclipse作为集成开发环境,之后在Maven Dependencies中找到相应

【机器学习】K-Means 聚类是特殊的矩阵分解问题

[机器学习]K-Means 聚类是特殊的矩阵分解(Matrix Factorization)问题 原文是:<k-Means Clustering Is Matrix Factorization> 本博客是该论文的阅读笔记,不免有很多细节不对之处. 还望各位看官能够见谅,欢迎批评指正. 更多相关博客请猛戳:http://blog.csdn.net/cyh_24 如需转载,请附上本文链接:http://blog.csdn.net/cyh_24/article/details/50408884 论文

Spark调研笔记第7篇 - 应用实战: 如何利用Spark集群计算物品相似度

本文是Spark调研笔记的最后一篇,以代码实例说明如何借助Spark平台高效地实现推荐系统CF算法中的物品相似度计算. 在推荐系统中,最经典的推荐算法无疑是协同过滤(Collaborative Filtering, CF),而item-cf又是CF算法中一个实现简单且效果不错的算法. 在item-cf算法中,最关键的步骤是计算物品之间的相似度.本文以代码实例来说明如何利用Spark平台快速计算物品间的余弦相似度. Cosine Similarity是相似度的一种常用度量,根据<推荐系统实践>一

转:机器学习sklearn19.0聚类算法——Kmeans算法

https://blog.csdn.net/loveliuzz/article/details/78783773 机器学习sklearn19.0聚类算法--Kmeans算法 原文地址:https://www.cnblogs.com/ruogu2019/p/10291656.html

搜索引擎——用户搜索意图的理解及其难点解析,本质是利用机器学习用户的意图分类

用户搜索意图的理解及其难点解析 搜索引擎涉及的技术非常的繁复,既有工程架构方面的,又有算法策略方面的.综合来讲,一个搜索引擎的技术构建主要包含三大部分: 对 query 的理解 对内容(文档)的理解 对 query 和内容(文档)的匹配和排序 (点击放大图像) 我们今天主要探讨其中的 Query Understanding,即对 query 的理解.对 query 的理解, 换句话说就是对用户搜索意图的理解.先看垂直搜索中的一些例子: "附近的特价酒店" "上海到扬州高速怎么

机器学习 (一)------分类

机器学习 (一)------分类 机器学习分类 机器学习分为监督学习和无监督学习两类. 监督学习是指在有标记的样本上建立机器学习的模型(这类算法知道预测什么,即目标变量的分类信息). 无监督学习恰恰相反,是指没有标记的数据上建立学习模型. 主要任务: 分类:主要任务是将实例数据划分到合适的分类中. 回归:例如数据拟合曲线(根据给定数据点的最优拟合曲线),主要用于预测数值型数据. 如何选择合适的算法: 从上表中选择实际可用的算法,要考虑以下两个方面的问题: 1.使用机器学习算法的目的,想要算法完成