spark2.1.1创建Pipeline

Pipeline 为流程,是Spark创建机器学习的一个流程控制的类

下面直接贴出创建的代码,以及整个流程

第一种:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.ml.linalg.Vector

/**
  * Created by xiaopengpeng on 2016/12/20.
  */
class Popeline_ {

}
object Popeline_{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("popeline")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "warehouse/dir")
      .getOrCreate()
    //创建原始数据
    val training = spark.createDataFrame(Seq((0L,"a b c d e spark",1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id","text","label")
    //创建分词
    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    //创建hashingTF
    val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features")
    //创建模型
    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
    //创建流程
    val pipeline = new Pipeline().setStages(Array(tokenizer,hashingTF,lr))
    //进行模型训练
    val model = pipeline.fit(training)
    //把模型存储到磁盘上
    model.write.overwrite().save("result/model/popeline")
    //把没有训练的模型存储到磁盘上
    pipeline.write.overwrite().save("result/unmodel/poeline")
    //从磁盘上读取
    val sameModel = PipelineModel.load("result/model/popeline")
    //创建测试数据
    val test = spark.createDataFrame(Seq((4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "mapreduce spark"),
      (7L, "apache hadoop")
    )).toDF("id","text")
    //测试的输出
    model.transform(test).select("id","text","probability","prediction").collect()
      .foreach{case Row(id:Long,text:String,prob:Vector,prediction:Double) => println(s"($id,$text) --> prob=$prob, prediction = $prediction")}
    spark.stop()
  }
}

第二种:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Created by xiaopengpeng on 2016/12/20.
  */
class Popeline_2 {

}
object Popeline_2{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("流程")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "warehouse/dir")
      .getOrCreate()

    val training = spark.createDataFrame(Seq((1.0,Vectors.dense(0.0,1.1,0.1),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
      ))).toDF("label","features")

    val lr = new LogisticRegression()
    println("LogisticRegression parameters:\n"+lr.explainParams()+"\n")

    lr.setMaxIter(10).setRegParam(0.01)
    val model1 = lr.fit(training)
    println("Model 1 was fit using parameters: "+model1.parent.extractParamMap())
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter -> 30)//这个会覆盖上一个
      .put(lr.regParam -> 0.1 ,lr.threshold -> 0.55)

    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  //改变输出列名
    val paramMapCombined = paramMap++paramMap2
    val model2 = lr.fit(training,paramMapCombined)
    println("Model 2 was fit using parameters: "+model2.parent.extractParamMap)

    val test = spark.createDataFrame(Seq((1.0,Vectors.dense(-1.0,1.5,1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label","features")

    model2.transform(test)
      .select("features","label","myProbability","prediction")
      .collect()
      .foreach{case Row(features:Vector,lable:Double,prob:Vector,prediction:Double) => println(s"($features,$lable) ->prob=$prob,prediction=$prediction")}

  }
}
时间: 2024-07-30 07:17:56

spark2.1.1创建Pipeline的相关文章

创建pipeline view

小白的傻瓜教程,有错请指出~~转载请瞩目出处,谢谢~~~ 一,安装pipeline. 进入jenkins的[系统管理]--[插件管理]页面,选择[可选插件]然后搜索pipeline. 然后选择直接安装,它会将依赖的一些插件也一并安装.安装完成后重启jenkins就可以使用了. 二,使用pipeline. 1.在jenkins主页点击新建视图   2.然后选择[Build Pipeline View] 3.创建完成后进行配置: Select Initial Job:一个构建的job 其他的job

Spark2.x 机器学习视频教程

Spark2.x 机器学习视频教程讲师:轩宇老师链接:https://pan.baidu.com/s/1TcFl6KDjxJS597TxYFSCOA 密码:3t2z 本课程讲解Spark 在机器学习中的应用,并介绍如何从各种公开渠道获取用于机器学习系统的数据.内容涵盖推荐系统.回归.聚类.分类等经典机器学习算法及其实际应用,涵盖使用Spark ML Pipeline API创建和调试机器学习流程,内容更加系统.全面.与时俱进,适合所有欲借助Spark来实现常见机器学习应用的开发者. 本课程主要讲

试用Jenkins 2 的 Pipeline 项目

目前Jenkins最新的版本是2.7,现在试用一下pipeline类型的项目,本来想构建一个1.651版本的Jenkins为例,无奈大陆的网络 访问github不稳定,只好改为简单的工程. 目前有一个代码仓库地址: https://github.com/wangzy23/jenkins-pipeline.git 里面有一个代码文件 jenkins.c , 编译命令是:“gcc jenkins.c -o jenkins” , 运行编译的可执行文件./jenkins ,输出为:“Hello Jenk

Docker的Jenkins Pipeline工作流

原文地址:http://www.youruncloud.com/blog/127.html 分享主题 一个软件产品的开发周期中,尤其是敏捷开发,持续集成和持续部署是必不可少的环节,而随着产品的丰富,模块的增多.随即带来了更加多的问题,各模块间编译环境的准备,编译复杂,耗时增加,还需要专人去负责这个流程.而Jenkins则可以很好的解决这个单一而容易出错的CI(持续集成)工作. Jenkins也存在着编译环境不隔离的问题,虽然可以通过集群的方式解决,可是需要为每种环境甚至是一种语言的不同版本准备多

python使用pipeline读写redis

用了很久的redis了.随着业务的要求越来越高.对redis的读写速度要求也越来越高.正好最近有个需求(需要在秒级取值1000+的数据),如果对于传统的单词取值,循环取值,消耗实在是大,有小伙伴可能考虑到多线程,但这并不是最好的解决方案,这里考虑到了redis特有的功能pipeline管道功能.下面就更大家演示一下pipeline在python环境下的使用情况. 1.插入数据 >>> import redis >>> conn = redis.Redis(host='1

jenkins2.0 hello pipeline

文章来自:http://www.ciandcd.com 文中的代码来自可以从github下载: https://github.com/ciandcd 根据前面的2篇文章,我们已经安装和配置好了jenkins2.0, 包括所有pipeline相关的插件. 本文来个最简单的hello pipeline来看看pipeline到底能干啥,需要哪些新的知识. 1. 创建pipeline job,选择类型为pipeline: 2. 写grovvy脚本来实现pipeline job pipeline的核心就是

Spark2.0机器学习系列之6:GBDT(梯度提升决策树)、GBDT与随机森林差异、参数调试及Scikit代码分析

概念梳理 GBDT的别称 GBDT(Gradient Boost Decision Tree),梯度提升决策树.     GBDT这个算法还有一些其他的名字,比如说MART(Multiple Additive Regression Tree),GBRT(Gradient Boost Regression Tree),Tree Net等,其实它们都是一个东西(参考自wikipedia – Gradient Boosting),发明者是Friedman. 研究GBDT一定要看看Friedman的pa

memsql filesystem pipeline 试用

一些功能类似drill ,比如s3,file ... 创建file pipeline 准备file mkdir -p /opt/db/ touch books.txt 内容如下: The Catcher in the Rye, J.D. Salinger, 1945 Pride and Prejudice, Jane Austen, 1813 Of Mice and Men, John Steinbeck, 1937 Frankenstein, Mary Shelley, 1818 创建表 me

python - scrapy 爬虫框架(创建, 持久化, 去重, 深度, cookie)

## scrapy 依赖 twisted  - twisted 是一个基于事件循环的 异步非阻塞 框架/模块 ##  项目的创建  1. 创建 project scrapy startproject 项目名称 项目名称(项目结构) - spiders # 爬虫文件 - q.py - w.py - items.py # 持久化 - pipelines # 持久化 - middlewares.py # 中间件 - settings.py # 配置文件(爬虫) scrapy.cfg # 配置文件(部署