Spark DecisionTreeClassifier

1、概述

决策树及树集(算法)是用于机器学习任务的分类和回归的流行方法。决策树被广泛使用,因为它们易于解释,处理分类特征,扩展到多类分类设置,不需要特征缩放,并且能够捕获非线性和特征交互。树集分类算法(例如随机森林和boosting)在分类和回归任务中表现最佳。
spark.ml实现使用连续和分类特征,支持用于二元分类和多类分类以及用于回归的决策树。该实现按行对数据进行分区,从而允许对数百万甚至数十亿个实例进行分布式训练。

2、输入和输出

所有输出列都是可选的;要排除输出列,请将其对应的Param设置为空字符串。

Input Columns

Param name Type(s) Default Description
labelCol Double "label" Label to predict
featuresCol Vector "features" Feature vector

Output Columns

Param name Type(s) Default Description Notes
predictionCol Double "prediction" Predicted label  
rawPredictionCol Vector "rawPrediction" Vector of length # classes, with the counts of training instance labels at the tree node which makes the prediction Classification only
probabilityCol Vector "probability" Vector of length # classes equal to rawPrediction normalized to a multinomial distribution Classification only
varianceCol Double   The biased sample variance of prediction Regression only

3、code

package com.home.spark.ml

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{DecisionTreeClassificationModel, DecisionTreeClassifier}
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object Ex_DecisionTree {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf(true).setMaster("local[2]").setAppName("spark ml")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    //rdd转换成df或者ds需要SparkSession实例的隐式转换
    //导入隐式转换,注意这里的spark不是包名,而是SparkSession的对象名
    import spark.implicits._

    val data = spark.sparkContext.textFile("input/iris.data.txt")
      .map(_.split(","))
      .map(a => Iris(
        Vectors.dense(a(0).toDouble, a(1).toDouble, a(2).toDouble, a(3).toDouble),
        a(4))
      ).toDF()

    data.createOrReplaceTempView("iris")
    val df = spark.sql("select * from iris")
    df.map(r => r(1) + " : " + r(0)).collect().take(10).foreach(println)

    ////对特征列和标签列进行索引转换
    val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
    val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures")
      .setMaxCategories(4).fit(df)

    //决策树分类器
    val dtClassifier = new DecisionTreeClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")

    //将预测的类别重新转成字符型
    val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictionLabel").setLabels(labelIndexer.labels)

    //将原数据集拆分成两个部分,一部分用于训练,一部分用于测试
    val Array(trainingData, testData): Array[Dataset[Row]] = df.randomSplit(Array(0.7,0.3))

    //建立工作流
    val pipeline = new Pipeline().setStages(Array(labelIndexer,featureIndexer,dtClassifier,labelConverter))

    //生成训练模型
    val modelDecisionTreeClassifier = pipeline.fit(trainingData)

    //预测
    val result = modelDecisionTreeClassifier.transform(testData)

    result.show(150,false)

    /**
      * 样本分为:正类样本和负类样本。
      * TP:被分类器正确分类的正类样本数。
      * TN: 被分类器正确分类的负类样本数。
      * FP: 被分类器错误分类的正类样本数。(本来是负,被预测为正) ---------->正
      * FN: 被分类器错误分类的负类样本数。 (本来是正, 被预测为负) ---------->负
      *
      * 准确率(Accuracy ACC)
      * 总样本数=TP+TN+FP+FN
      * ACC=(TP+TN)/(总样本数)
      * 该评价指标主要针对分类均匀的数据集。
      */
    val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy: Double = evaluator.evaluate(result)

    println("Accuracy = " + accuracy)

    /**
      * 精确率(Precision 查准率)
      * Precision = TP / (TP+ FP) 准确率,表示模型预测为正样本的样本中真正为正的比例
      */
    val evaluator2 = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
      .setMetricName("weightedPrecision")
    val weightedPrecision: Double = evaluator2.evaluate(result)

    println("weightedPrecision = " + weightedPrecision)

    /**
      * 召回率(查全率)
      * Recall = TP /(TP + FN) 召回率,表示模型准确预测为正样本的数量占所有正样本数量的比例
      */
    val evaluator3 = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
      .setMetricName("weightedRecall")
    val weightedRecall: Double = evaluator3.evaluate(result)

    println("weightedRecall = " + weightedRecall)

    val treeModel = modelDecisionTreeClassifier.stages(2).asInstanceOf[DecisionTreeClassificationModel]
    println("Learned classification tree model:\n" + treeModel.toDebugString)

    //决策树回归器
    val dtRegressor = new DecisionTreeRegressor().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures")

    val pipelineRegressor = new Pipeline()
      .setStages(Array(labelIndexer,featureIndexer,dtRegressor,labelConverter))

    val modelRegressor = pipelineRegressor.fit(trainingData)
    val result2 = modelRegressor.transform(testData)

    result2.show(150,false)

    //评估
    val regressionEvaluator = new RegressionEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction")
        .setMetricName("rmse")
    val rmse = regressionEvaluator.evaluate(result2)
    println("rmse = " + rmse)
    spark.stop()
  }
}

case class Iris(features: Vector, label: String)

原文地址:https://www.cnblogs.com/asker009/p/12403407.html

时间: 2024-10-24 09:40:16

Spark DecisionTreeClassifier的相关文章

决策树的几种类型差异及Spark 2.0-MLlib、Scikit代码分析

概述 分类决策树模型是一种描述对实例进行分类的树形结构. 决策树可以看为一个if-then规则集合,具有"互斥完备"性质 .决策树基本上都是 采用的是贪心(即非回溯)的算法,自顶向下递归分治构造. 生成决策树一般包含三个步骤: 特征选择 决策树生成 剪枝 决策树算法种类 决策树主要有 ID3, C4.5, C5.0 and CART几种, ID3, C4.5, 和CART实际都采用的是贪心(即非回溯)的算法,自顶向下递归分治构造.对于每一个决策要求分成的组之间的"差异&quo

基于Apache Spark机器学习的客户流失预测

流失预测是个重要的业务,通过预测哪些客户可能取消对服务的订阅来最大限度地减少客户流失.虽然最初在电信行业使用,但它已经成为银行,互联网服务提供商,保险公司和其他垂直行业的通用业务. 预测过程是大规模数据的驱动,并且经常结合使用先进的机器学习技术.在本篇文章中,我们将看到通常使用的哪些类型客户数据,对数据进行一些初步分析,并生成流失预测模型 - 所有这些都是通过Spark及其机器学习框架来完成的. 使用数据科学更好地理解和预测客户行为是一个迭代过程,其中涉及: 1.发现和模型创建: 分析历史数据.

Spark机器学习基础三

监督学习 0.线性回归(加L1.L2正则化) from __future__ import print_function from pyspark.ml.regression import LinearRegression from pyspark.sql import SparkSession spark = SparkSession .builder .appName("LinearRegressionWithElasticNet") .getOrCreate() # 加载数据 t

Spark ML机器学习库评估指标示例

本文主要对 Spark ML库下模型评估指标的讲解,以下代码均以Jupyter Notebook进行讲解,Spark版本为2.4.5.模型评估指标位于包org.apache.spark.ml.evaluation下. 模型评估指标是指测试集的评估指标,而不是训练集的评估指标 1.回归评估指标 RegressionEvaluator Evaluator for regression, which expects two input columns: prediction and label. 评估

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现 Spark SQL 之 Join 实现 涂小刚 2017-07-19 217标签: spark , 数据库 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的. SparkSQL总体流程介绍 在阐述Join实现之前,我们首先简单介绍SparkSQL

spark性能调优之资源调优

转https://tech.meituan.com/spark-tuning-basic.html spark作业原理 使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程.根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动.Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core.而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Stand

Spark 整合hive 实现数据的读取输出

实验环境: linux centOS 6.7 vmware虚拟机 spark-1.5.1-bin-hadoop-2.1.0 apache-hive-1.2.1 eclipse 或IntelJIDea 本次使用eclipse. 代码: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import o

spark 教程三 spark Map filter flatMap union distinct intersection操作

RDD的创建 spark 所有的操作都围绕着弹性分布式数据集(RDD)进行,这是一个有容错机制的并可以被并行操作的元素集合,具有只读.分区.容错.高效.无需物化.可以缓存.RDD依赖等特征 RDD的创建基础RDD 1.并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行运算 var sc=new SparkContext(conf) var rdd=sc.parallelize(Array(2,4,9,3,5,7,8,1,6)); rd