Scala - Spark Lambda“goesto“ => 分析

 1 /// 定义一个函数AddNoise,参数分别为rdd,Fraction。其中rdd为(BreezeDenseMatrix,  BreezeDenseMatrix)元组构成的RDD。Fraction为一个Double。返回一个(BreezeDenseMatrix,  BreezeDenseMatrix)元组构成的RDD。
 2 def AddNoise(rdd: RDD[(BDM[Double], BDM[Double])], Fraction: Double): RDD[(BDM[Double], BDM[Double])] = {
 3 /// 定义返回值临时储存,它由rdd中每个元素实现f=>STH构成
 4     val addNoise = rdd.map { f =>
 5 /// f中第二部分数据,为一个BreezeDenseMatrix
 6       val features = f._2
 7 /// 生成一个BreezeDenseMatrix,由随机数填充
 8       val a = BDM.rand[Double](features.rows, features.cols)
 9 /// 定义一个BreezeDenseMatrix a1,其中元素为bool型。如果a中对应元素大于Fraction,则为true。否则为false。
10       val a1 = a :>= Fraction
11 /// 定义一个RDD d1,由a1中元素经过如下运算后填充:如果当前元素为true,则为1.0,否则为0。
12       val d1 = a1.data.map { f => if (f == true) 1.0 else 0.0 }
13 /// 新建一个BreezeDenseMatrix,分别由于features相应位置对应的d1元素填充。
14       val a2 = new BDM(features.rows, features.cols, d1)
15 /// :*表示各元素依次相乘。得到BreezeDenseMatrix。
16       val features2 = features :* a2
17 /// 返回(BreezeDenseMatrix,BreezeDenseMatrix)构成的RDD。作为函数返回值,进而更新addNoise。
18       (f._1, features2)
19     }
20 /// 返回运算后的结果,作为函数返回值。
21     addNoise
22   }

这段代码是用Scala写的运行与Spark上的,NN算法中AddNoise实现。用来完成DenoiseAutoencoder的随机噪声添加。

思想很简单,但却把Spark的map操作和Scala的lambda运算符的用法展现的淋漓尽致,值得学习。

代码来自sunbow0

个人分析,有误请指正。

时间: 2024-09-28 18:10:18

Scala - Spark Lambda“goesto“ => 分析的相关文章

打造基于hadoop的网站日志分析系统(5)之spark在日志分析系统里的简单应用

1.下载spark和运行 wget http://apache.fayea.com/apache-mirror/spark/spark-1.0.0/spark-1.0.0-bin-hadoop2.tgz 我这里下载的是1.0.0版,由于我们只是测试spark的用法所以不需要配置spark集群,只需把下好的文件解压,进入bin/文件夹. spark支持scala,java和python. scala和java输入命令:./spark-shell python 输入命令 ./pyspark 进入控制

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Spark SQL 源代码分析之 In-Memory Columnar Storage 之 in-memory query

/** Spark SQL源代码分析系列文章*/ 前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的. 那么基于以上存储结构,我们查询cache在jvm内的数据又是怎样查询的,本文将揭示查询In-Memory Data的方式. 一.引子 本例使用hive console里查询cache后的src表. select value from src 当我们将src表cache到了内存后,再次查询src,能够通过analyzed运行计划来观察内部调

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

/** Spark SQL源代码分析系列文章*/ 接上一篇文章Spark SQL Catalyst源代码分析之Physical Plan.本文将介绍Physical Plan的toRDD的详细实现细节: 我们都知道一段sql,真正的运行是当你调用它的collect()方法才会运行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包括4种操作类型,即BasicOperator基本类型

spark JavaDirectKafkaWordCount 例子分析

spark  JavaDirectKafkaWordCount 例子分析: 1. KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet );后面参数意思: 源码是这样 @param ssc StreamingContext object * @param kafkaParams Kafka <

Spark MaprLab-Auction Data分析

一.环境安装 1.安装hadoop http://my.oschina.net/u/204498/blog/519789 2.安装spark 3.启动hadoop 4.启动spark 二. 1.数据准备 从MAPR官网上下载数据DEV360DATA.zip并上传到server上. [[email protected] spark-1.5.1-bin-hadoop2.6]$ pwd /home/hadoop/spark-1.5.1-bin-hadoop2.6 [[email protected] 

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反