关于spark的mllib学习总结(Java版)

本篇博客主要讲述如何利用spark的mliib构建机器学习模型并预测新的数据,具体的流程如下图所示:

加载数据

对于数据的加载或保存,mllib提供了MLUtils包,其作用是Helper methods to load,save and pre-process data used in MLLib.博客中的数据是采用spark中提供的数据sample_libsvm_data.txt,其有一百个数据样本,658个特征。具体的数据形式如图所示:

加载libsvm

JavaRDD<LabeledPoint> lpdata = MLUtils.loadLibSVMFile(sc, this.libsvmFile).toJavaRDD();

LabeledPoint数据类型是对应与libsvmfile格式文件, 具体格式为:

Lable(double类型),vector(Vector类型)

转化dataFrame数据类型

JavaRDD<Row> jrow = lpdata.map(new LabeledPointToRow());
StructType schema = new StructType(new StructField[]{
                    new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
                    new StructField("features", new VectorUDT(), false, Metadata.empty()),
        });
SQLContext jsql = new SQLContext(sc);
DataFrame df = jsql.createDataFrame(jrow, schema);

DataFrame:DataFrame是一个以命名列方式组织的分布式数据集。在概念上,它跟关系型数据库中的一张表或者1个Python(或者R)中的data frame一样,但是比他们更优化。DataFrame可以根据结构化的数据文件、hive表、外部数据库或者已经存在的RDD构造。

SQLContext:spark sql所有功能的入口是SQLContext类,或者SQLContext的子类。为了创建一个基本的SQLContext,需要一个SparkContext。

特征提取

特征归一化处理

StandardScaler scaler = new StandardScaler().setInputCol("features").setOutputCol("normFeatures").setWithStd(true);
DataFrame scalerDF = scaler.fit(df).transform(df);
scaler.save(this.scalerModelPath);

利用卡方统计做特征提取

ChiSqSelector selector = new ChiSqSelector().setNumTopFeatures(500).setFeaturesCol("normFeatures").setLabelCol("label").setOutputCol("selectedFeatures");
ChiSqSelectorModel chiModel = selector.fit(scalerDF);
DataFrame selectedDF = chiModel.transform(scalerDF).select("label", "selectedFeatures");
chiModel.save(this.featureSelectedModelPath);

训练机器学习模型(以SVM为例)

//转化为LabeledPoint数据类型, 训练模型
JavaRDD<Row> selectedrows = selectedDF.javaRDD();
JavaRDD<LabeledPoint> trainset = selectedrows.map(new RowToLabel());

//训练SVM模型, 并保存
int numIteration = 200;
SVMModel model = SVMWithSGD.train(trainset.rdd(), numIteration);
model.clearThreshold();
model.save(sc, this.mlModelPath);

// LabeledPoint数据类型转化为Row
static class LabeledPointToRow implements Function<LabeledPoint, Row> {

        public Row call(LabeledPoint p) throws Exception {
            double label = p.label();
            Vector vector = p.features();
            return RowFactory.create(label, vector);
        }
    }

//Rows数据类型转化为LabeledPoint
static class RowToLabel implements Function<Row, LabeledPoint> {

        public LabeledPoint call(Row r) throws Exception {
            Vector features = r.getAs(1);
            double label = r.getDouble(0);
            return new LabeledPoint(label, features);
        }
    }

测试新的样本

测试新的样本前,需要将样本做数据的转化和特征提取的工作,所有刚刚训练模型的过程中,除了保存机器学习模型,还需要保存特征提取的中间模型。具体代码如下:

//初始化spark
SparkConf conf = new SparkConf().setAppName("SVM").setMaster("local");
conf.set("spark.testing.memory", "2147480000");
SparkContext sc = new SparkContext(conf);

//加载测试数据
JavaRDD<LabeledPoint> testData = MLUtils.loadLibSVMFile(sc, this.predictDataPath).toJavaRDD();

//转化DataFrame数据类型
JavaRDD<Row> jrow =testData.map(new LabeledPointToRow());
        StructType schema = new StructType(new StructField[]{
                    new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
                    new StructField("features", new VectorUDT(), false, Metadata.empty()),
        });
SQLContext jsql = new SQLContext(sc);
DataFrame df = jsql.createDataFrame(jrow, schema);

        //数据规范化
StandardScaler scaler = StandardScaler.load(this.scalerModelPath);
DataFrame scalerDF = scaler.fit(df).transform(df);

        //特征选取
ChiSqSelectorModel chiModel = ChiSqSelectorModel.load( this.featureSelectedModelPath);
DataFrame selectedDF = chiModel.transform(scalerDF).select("label", "selectedFeatures");

测试数据集

SVMModel svmmodel = SVMModel.load(sc, this.mlModelPath);
JavaRDD<Tuple2<Double, Double>> predictResult = testset.map(new Prediction(svmmodel)) ;
predictResult.collect();

static class Prediction implements Function<LabeledPoint, Tuple2<Double , Double>> {
        SVMModel model;
        public Prediction(SVMModel model){
            this.model = model;
        }
        public Tuple2<Double, Double> call(LabeledPoint p) throws Exception {
            Double score = model.predict(p.features());
            return new Tuple2<Double , Double>(score, p.label());
        }
    }

计算准确率

double accuracy = predictResult.filter(new PredictAndScore()).count() * 1.0 / predictResult.count();
System.out.println(accuracy);

static class PredictAndScore implements Function<Tuple2<Double, Double>, Boolean> {
        public Boolean call(Tuple2<Double, Double> t) throws Exception {
            double score = t._1();
            double label = t._2();
            System.out.print("score:" + score + ", label:"+ label);
            if(score >= 0.0 && label >= 0.0) return true;
            else if(score < 0.0 && label < 0.0) return true;
            else return false;
        }
    }

具体的代码,放在我的github上:https://github.com/Quincy1994/MachineLearning/

时间: 2024-11-05 14:38:56

关于spark的mllib学习总结(Java版)的相关文章

Spark之MLLib学习

基于Spark On Yarn的淘宝数据挖掘平台:http://www.doc88.com/p-7804379529208.html Spark之MLLib机器学习库:http://blog.csdn.net/johnny_lee/article/details/25656343 Spark之ALS(推荐系统)学习文档:http://spark.apache.org/docs/0.9.0/api/mllib/index.html#org.apache.spark.mllib.recommenda

spark Using MLLib in Scala/Java/Python

Using MLLib in ScalaFollowing code snippets can be executed in spark-shell. Binary ClassificationThe following code snippet illustrates how to load a sample dataset, execute a training algorithm on this training data using a static method in the algo

Jquery的DataTable插件 AJAX 服务器分页的的学习心得(java版)

首先得先引入对应的js 1.jquery.min.js  首先导入 2. File:        jquery.dataTables.min.js Version:     1.9.4     这是我使用的版本 这是  jsp 页面 关键的table  代码 <table id="fuck" class="table table-bordered data-table"> <thead> <tr> <span style=

Spark中的各种action算子操作(java版)

在我看来,Spark编程中的action算子的作用就像一个触发器,用来触发之前的transformation算子.transformation操作具有懒加载的特性,你定义完操作之后并不会立即加载,只有当某个action的算子执行之后,前面所有的transformation算子才会全部执行.常用的action算子如下代码所列:(java版) package cn.spark.study.core; import java.util.Arrays; import java.util.List; im

Spark+hadoop+mllib及相关概念与操作笔记

Spark+hadoop+mllib及相关概念与操作笔记 作者: lw 版本: 0.1 时间: 2016-07-18 1.调研相关注意事项 a) 理解调研 调研的意义在于了解当前情况,挖掘潜在的问题,解决存在的疑问,并得到相应的方案. b) 调研流程 首先明确和梳理现有的疑问是什么,要通过调研解决什么问题,然后再去做调研,发现问题,再解决问题. c) 调研成果 最终需要得到结论与方案,以及详尽的论证理由,让别人信服. d) 书写格式 版本与作者以及时间可以以表格的形式,整齐明了. 结论简洁明了,

回溯算法解八皇后问题(java版)

八皇后问题是学习回溯算法时不得不提的一个问题,用回溯算法解决该问题逻辑比较简单. 下面用java版的回溯算法来解决八皇后问题. 八皇后问题,是一个古老而著名的问题,是回溯算法的典型案例.该问题是国际西洋棋棋手马克斯·贝瑟尔于1848年提出:在8×8格的国际象棋上摆放八个皇后,使其不能互相攻击,即任意两个皇后都不能处于同一行.同一列或同一斜线上,问有多少种摆法. 思路是按行来规定皇后,第一行放第一个皇后,第二行放第二个,然后通过遍历所有列,来判断下一个皇后能否放在该列.直到所有皇后都放完,或者放哪

AKKA文档(java版)

目前我正在翻译AKKA官网文档.翻译:吴京润 译者注:本人正在翻译AKKA官网文档,本篇是文档第一章,欢迎有兴趣的同学加入一起翻译.更多内容请读这里:https://tower.im/projects/ac49db18a6a24ae4b340a5fa22d930dc/lists/ded96c34f7ce4a6bb8b5473f596e1008/show/https://tower.im/projects/ac49db18a6a24ae4b340a5fa22d930dc/todos/640e53d

Java基础学习总结——Java对象的序列化和反序列化

一.序列化和反序列化的概念 把对象转换为字节序列的过程称为对象的序列化. 把字节序列恢复为对象的过程称为对象的反序列化. 对象的序列化主要有两种用途: 1) 把对象的字节序列永久地保存到硬盘上,通常存放在一个文件中: 2) 在网络上传送对象的字节序列. 在很多应用中,需要对某些对象进行序列化,让它们离开内存空间,入住物理硬盘,以便长期保存.比如最常见的是Web服务器中的Session对象,当有 10万用户并发访问,就有可能出现10万个Session对象,内存可能吃不消,于是Web容器就会把一些s

Spark:用Scala和Java实现WordCount

为了在IDEA中编写scala,今天安装配置学习了IDEA集成开发环境.IDEA确实很优秀,学会之后,用起来很顺手.关于如何搭建scala和IDEA开发环境,请看文末的参考资料. 用Scala和Java实现WordCount,其中Java实现的JavaWordCount是spark自带的例子($SPARK_HOME/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java) 1.环境 OS:Red Hat Enterp