Spark ML Pipeline简介

Spark ML Pipeline基于DataFrame构建了一套High-level API,我们可以使用MLPipeline构建机器学习应用,它能够将一个机器学习应用的多个处理过程组织起来,通过在代码实现的级别管理好每一个处理步骤之间的先后运行关系,极大地简化了开发机器学习应用的难度。
        Spark ML Pipeline使用DataFrame作为机器学习输入输出数据集的抽象。DataFrame来自Spark SQL,表示对数据集的一种特殊抽象,它也是Dataset(它是Spark 1.6引入的表示分布式数据集的抽象接口),但是DataFrame通过为数据集中每行数据的每列指定列名的方式来组织Dataset,类似于关系数据库中的表,同时还在底层处理做了非常多的优化。DataFrame可以基于不同的数据源进行构建,比如结构化文件、Hive表、数据库、RDD等。或者更直白一点表达什么是DataFrame,可以认为它等价于Dataset[Row],表示DataFrame是一个Row类型数据对象的Dataset。
       机器学习可以被应用于各种数据类型,例如向量、文本、图片、结构化数据。Spark ML API采用DataFrame的理由是,来自Spark SQL中的DataFrame接口的抽象,可以支持非常广泛的类型,而且表达非常直观,便于在Spark中进行处理。所以说,DataFrame是Spark ML最基础的对数据集的抽象,所有各种ML Pipeline组件都会基于DataFrame构建更加丰富、复杂的数据处理逻辑。
      Spark ML Pipeline主要包含2个核心的数据处理组件:Transformer、Estimator,其中它们都是Pipeline中PipelineStage的子类,另外一些抽象,如Model、Predictor、Classifier、Regressor等都是基于这两个核心组件衍生出来,比如,Model是一个Transformer,Predictor是一个Estimator,它们的关系如下类图所示:

基于上图,我们对它们进行详细的说明,如下所示:

  • Transformer

Transformer对机器学习中要处理的数据集进行转换操作,类似于Spark中对RDD进行的Transformation操作(对一个输入RDD转换处理后生成一个新的RDD),Transformer是对DataFrame进行转换。我们可以从Transformer类的代码抽象定义,来看一下它定义的几个参数不同的transform方法,如下所示:

package org.apache.spark.ml

@DeveloperApi
abstract class Transformer extends PipelineStage {

  @Since("2.0.0")
  @varargs
  def transform(
      dataset: Dataset[_],
      firstParamPair: ParamPair[_],
      otherParamPairs: ParamPair[_]*): DataFrame = {
    val map = new ParamMap()
      .put(firstParamPair)
      .put(otherParamPairs: _*)
    transform(dataset, map)
  }

  @Since("2.0.0")
  def transform(dataset: Dataset[_], paramMap: ParamMap): DataFrame = {
    this.copy(paramMap).transform(dataset)
  }

  @Since("2.0.0")
  def transform(dataset: Dataset[_]): DataFrame

  override def copy(extra: ParamMap): Transformer
}

上面对应的多个transform方法,都会输入一个Dataset[_],经过转换处理后输出一个DataFrame,实际上你可以通过查看DataFrame的定义,其实它就是一个Dataset,如下所示:

type DataFrame = Dataset[Row]

Transformer主要抽象了两类操作:一类是对特征进行转换,它可能会从一个DataFrame中读取某列数据,然后通过map算法将该列数据转换为新的列数据,比如,输入一个DataFrame,将输入的原始一列文本数据,转换成一列特征向量,最后输出的数据还是一个DataFrame,对该列数据转换处理后还映射到输入时的列名(通过该列名可以操作该列数据)。
下面,我们看一下,Spark MLLib中实现的Transformer类继承关系,如下类图所示:

  • Estimator

Estimator用来训练模型,它的输入是一个DataFrame,输出是一个Model,Model是Spark ML中对机器学习模型的抽象和定义,Model其实是一个Transformer。一个机器学习算法是基于一个数据集进行训练的,Estimator对基于该训练集的机器学习算法进行了抽象。所以它的输入是一个数据集DataFrame,经过训练最终得到一个模型Model。
Estimator类定了fit方法来实现对模型的训练,类的代码如下所示:

package org.apache.spark.ml

@DeveloperApi
abstract class Estimator[M <: Model[M]] extends PipelineStage {

  @Since("2.0.0")
  @varargs
  def fit(dataset: Dataset[_], firstParamPair: ParamPair[_], otherParamPairs: ParamPair[_]*): M = {
    val map = new ParamMap()
      .put(firstParamPair)
      .put(otherParamPairs: _*)
    fit(dataset, map)
  }

  @Since("2.0.0")
  def fit(dataset: Dataset[_], paramMap: ParamMap): M = {
    copy(paramMap).fit(dataset)
  }

  @Since("2.0.0")
  def fit(dataset: Dataset[_]): M

  @Since("2.0.0")
  def fit(dataset: Dataset[_], paramMaps: Array[ParamMap]): Seq[M] = {
    paramMaps.map(fit(dataset, _))
  }

  override def copy(extra: ParamMap): Estimator[M]
}

通过上面代码可以看到,Estimator调用fit方法以后,得到一个Model,也就是Transformer,一个Transformer又可以对输入的DataFrame执行变换操作。
下面,我们看一下,Spark MLLib中实现的Estimator类,如下类图所示:

  • PipelineStage

PipelineStage是构建一个Pipeline的基本元素,它或者是一个Transformer,或者是一个Estimator。

  • Pipeline

Pipeline实际上是Estimator的实现类,一个Pipeline是基于多个PipelineStage构建而成的DAG图,简单一点可以使用线性的PipelineStage序列来完成机器学习应用的构建,当然也可以构建相对复杂一些的PipelineStage DAG图。
调用Pipeline的fit方法,会生成一个PipelineModel,它是Model的子类,所以也就是一个Transformer。在训练过程中,Pipeline中的多个PipelineStage是运行在训练数据集上的,最后生成了一个Model。我们也可以看到,训练模型过程中,处于最后面的PipelineStage应该是一个或多个连续的Estimator,因为只有Estimator运行后才会生成Model。
        接着,就是Pipeline中处于训练阶段和测试阶段之间,比较重要的一个PipelineStage了:PipelineModel,它起了承上启下的作用,调用PipelineModel的transform方法,按照和训练阶段类似的数据处理(转换)流程,经过相同的各个PipelineState对数据集进行变换,最后将训练阶段生成模型作用在测试数据集上,从而实现最终的预测目的。
基于Spark ML Pipeline,可以很容易地构建这种线性Pipeline,我们可以看到一个机器学习应用构建过程中(准备数据、训练模型、评估模型)的各个处理过程,可以通过一个同一个Pipeline API进行线性组合,非常直观、容易管理。

Spark ML Pipeline实践

这里,我们直接根据Spark ML Pipeline官方文档给出的示例——基于Logistic回归实现文本分类,来详细说明通过Spark ML Pipeline API构建机器学习应用,以及具体如何使用它。官网给出的这个例子非常直观,后续有关在实际业务场景中的实践,我们会单独在另一篇文章中进行分享。

  • 场景描述

这个示例:
在训练阶段,需要根据给定的训练文本行数据集,将每个单词分离出来;然后根据得到的单词,生成特征向量;最后基于特征向量,选择Logistic回归算法,进行训练学习生成Logistic模型。
在测试阶段,需要按照如上相同的方式去处理给定的测试数据集,基于训练阶段得到的模型,进行预测。

  • 训练阶段

训练阶段各个数据处理的步骤,如下图所示:

上图中,蓝色方框表示的都是Transformer,红色方框表示Estimator。
在训练阶段,通过Pipeline运行时,Tokenizer和HashingTF都会将输入的DataFrame进行转换,生成新的DataFrame;LogisticRegression是一个Estimator,当调用LogisticRegression的fit方法时,会生成一个LogisticRegressionModel,它是一个Transformer,可以在测试阶段使用。

  • 测试阶段

上面的过程都是在调用Pipeline的fit方法时进行处理的,最后会生成一个PipelineModel,它是一个Transformer,会被用于测试阶段。测试阶段运行始于该PipelineModel,具体处理流程如下图所示:

PipelineModel作为一个Transformer,首先也会对输入的测试数据集执行转换操作,对比训练阶段的处理流程,可以看到,在训练阶段的Estimator都变成了Transformer,因为我们在测试阶段的输出就是一个结果集DataFrame,而不需要训练阶段生成Model了。

  • 示例代码

首先,准备模拟的训练数据集,代码如下所示:

val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

模拟的训练数据集中,有3个字段,分别为ID、文本内容、标签。在实际应用中,我们应该是从指定的文件系统中去读取数据,如HDFS,只需要根据需要修改即可。
其次,创建一个Pipeline对象,同时设置对应的多个顺序执行的PipelineStage,代码如下所示:

val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr)) // 包含3个PipelineStage

接着,就可以基于训练数据集进行训练操作了,代码如下所示:

val model = pipeline.fit(training)

调用Pipeline的fit方法生成了一个Model,我们可以根据实际情况,选择是否将生成模型进行保存(以便后续重新加载使用模型),如下所示:

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

然后,创建一个模拟测试数据集,用来测试前面训练生成的模型,代码如下所示:

val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

测试数据集中,标签(Label)都是未知的,通过将前面生成的模型作用在该测试数据集上,就会预测生成对应的标签数据,代码如下所示:

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

这样就能够基于预测的结果,验证分类模型的准确性。
最后,可以将生成模型用于实际应用场景中,完成需要的功能。

有关更多使用Spark ML Pipeline的例子,可以参考Spark发行包中,examples里面src/main/scala/ml下面的很多示例代码,非常好的学习资源。

原文地址:https://www.cnblogs.com/itboys/p/8232779.html

时间: 2024-07-30 18:35:08

Spark ML Pipeline简介的相关文章

Spark ml pipeline - transforming feature - StringIndexer

在spark ml pipeline的特征提取和转换阶段,有一种transformer可以将机器学习训练数据中常见的字符串列(例如表示各种分类)转换为数值索引列,以便于计算机处理.它就是StringIndexer.它支持的索引范围为[0, numLabels)(不支持的会编码为numLabels),并且支持四种排序方式,frequencyDesc(频率最高的索引赋值为0),frequencyAsc,alphabetDesc,alphabetAsc. 假设我们有dataframe id | cat

spark ml 的例子

一.关于spark ml pipeline与机器学习 一个典型的机器学习构建包含若干个过程 1.源数据ETL 2.数据预处理 3.特征选取 4.模型训练与验证 以上四个步骤可以抽象为一个包括多个步骤的流水线式工作,从数据收集开始至输出我们需要的最终结果.因此,对以上多个步骤.进行抽象建模,简化为流水线式工作流程则存在着可行性,对利用spark进行机器学习的用户来说,流水线式机器学习比单个步骤独立建模更加高效.易用. 受 scikit-learn 项目的启发,并且总结了MLlib在处理复杂机器学习

Spark ML机器学习库评估指标示例

本文主要对 Spark ML库下模型评估指标的讲解,以下代码均以Jupyter Notebook进行讲解,Spark版本为2.4.5.模型评估指标位于包org.apache.spark.ml.evaluation下. 模型评估指标是指测试集的评估指标,而不是训练集的评估指标 1.回归评估指标 RegressionEvaluator Evaluator for regression, which expects two input columns: prediction and label. 评估

Spark ML下实现的多分类adaboost+naivebayes算法在文本分类上的应用

1. Naive Bayes算法 朴素贝叶斯算法算是生成模型中一个最经典的分类算法之一了,常用的有Bernoulli和Multinomial两种.在文本分类上经常会用到这两种方法.在词袋模型中,对于一篇文档$d$中出现的词$w_0,w_1,...,w_n$, 这篇文章被分类为$c$的概率为$$p(c|w_0,w_1,...,w_n) = \frac{p(c,w_0,w_1,...,w_n)}{p(w_0,w_1,...,w_n)} = \frac{p(w_0,w_1,...,w_n|c)*p(c

Extending sparklyr to Compute Cost for K-means on YARN Cluster with Spark ML Library

Machine and statistical learning wizards are becoming more eager to perform analysis with Spark MLlibrary if this is only possible. It’s trendy, posh, spicy and gives the feeling of doing state of the art machine learning and being up to date with th

Spark入门实战系列--1.Spark及其生态圈简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL.Spark St

Spark修炼之道(进阶篇)——Spark入门到精通:第二节 Hadoop、Spark生成圈简介

本节主要内容 Hadoop生态圈 Spark生态圈 1. Hadoop生态圈 原文地址:http://os.51cto.com/art/201508/487936_all.htm#rd?sukey=a805c0b270074a064cd1c1c9a73c1dcc953928bfe4a56cc94d6f67793fa02b3b983df6df92dc418df5a1083411b53325 下图给出了Hadoop生态圈中的重要产品: 图片来源:http://www.36dsj.com/archiv

Spark及其生态系统简介总结

Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算 Spark不仅支持Scala编写应用程序,而且支持Java和Python等语言进行编写,特别是Scala是一种高效.可拓展的语言,能够用简洁的代码处理较为复杂的处理工作. Spark生态圈即BDAS===> Spark具有很强的适应性,能够读取HDFS.Cassandra.HBase.S3和Techyon为持久层读写原生数据,能够以Mesos.YARN和自身携带的Standalone作为资源管理器调度job,来完成Spark应用程序的

Webx框架:Pipeline简介

Pipeline.它的含义就是管道,一个管道可以安装很多的阀门,可以有很多分支.它用于控制页面的处理流程.它需要定义在pipeline.xml文件中,该文件中的每个标签都是一个阀门.该文件中可以放一些简单的控制语句.在项目中,下面这样的管道配置就已经够用了. <services:pipeline xmlns="http://www.alibaba.com/schema/services/pipeline/valves"> <!-- 初始化 turbine rundat