Pipeline(流程)是 Spark 中用于构建和控制机器学习工作流的类。
下面直接贴出创建的代码,以及整个流程
第一种:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.ml.linalg.Vector

/**
 * Created by xiaopengpeng on 2016/12/20.
 *
 * Example 1: build a text-classification Pipeline (Tokenizer -> HashingTF ->
 * LogisticRegression), fit it, persist both the fitted model and the unfitted
 * pipeline, reload the model, and score a small test set.
 */
class Popeline_ {
}

object Popeline_ {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("popeline")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "warehouse/dir")
      .getOrCreate()

    // Raw training data: (id, text, label).
    val trainingDF = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    // Stage 1: split the "text" column into words.
    val wordTokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    // Stage 2: hash the words into a 1000-dimensional term-frequency vector.
    val tfHasher = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(wordTokenizer.getOutputCol)
      .setOutputCol("features")
    // Stage 3: the classifier.
    val logReg = new LogisticRegression().setMaxIter(10).setRegParam(0.01)

    // Chain the three stages into a single Pipeline and fit it.
    val mlPipeline = new Pipeline().setStages(Array(wordTokenizer, tfHasher, logReg))
    val fittedModel = mlPipeline.fit(trainingDF)

    // Persist the fitted model to disk.
    fittedModel.write.overwrite().save("result/model/popeline")
    // Persist the unfitted pipeline as well.
    mlPipeline.write.overwrite().save("result/unmodel/poeline")
    // Reload the fitted model from disk.
    val reloadedModel = PipelineModel.load("result/model/popeline")

    // Unlabeled test data: (id, text).
    val testDF = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "mapreduce spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // Score the test rows and print each prediction.
    fittedModel.transform(testDF)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id,$text) --> prob=$prob, prediction = $prediction")
      }

    spark.stop()
  }
}
第二种:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Created by xiaopengpeng on 2016/12/20.
 *
 * Example 2: configure a LogisticRegression estimator both through setters and
 * through ParamMaps (including renaming the probability output column), then
 * score a small test set with the second model.
 */
class Popeline_2 {
}

object Popeline_2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("流程")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "warehouse/dir")
      .getOrCreate()

    // Training data: (label, features).
    // FIX: the original was missing the closing ")" after the first tuple,
    // which turned the whole Seq into one malformed nested tuple and did not
    // compile as intended.
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    val lr = new LogisticRegression()
    // Print every tunable parameter with its documentation and default value.
    println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

    // Way 1: set parameters via setters, then fit.
    lr.setMaxIter(10).setRegParam(0.01)
    val model1 = lr.fit(training)
    println("Model 1 was fit using parameters: " + model1.parent.extractParamMap())

    // Way 2: set parameters via a ParamMap.
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter -> 30) // overrides the previous maxIter (20 -> 30)
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55)
    // Rename the probability output column.
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")
    val paramMapCombined = paramMap ++ paramMap2

    val model2 = lr.fit(training, paramMapCombined)
    println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

    // Test data: (label, features).
    val test = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")

    // Score with model2; note the renamed "myProbability" column.
    model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features,$label) ->prob=$prob,prediction=$prediction")
      }

    // FIX: release Spark resources — the original never stopped the session
    // (its sibling example Popeline_ does).
    spark.stop()
  }
}
时间: 2024-07-30 07:17:56