Spark MLlib之水塘抽样算法(Reservoir Sampling)

1.理解
  问题定义可以简化如下:在不知道文件总行数的情况下,如何从文件中随机的抽取一行?

  首先想到的是我们做过类似的题目吗?当然,在知道文件行数的情况下,我们可以很容易的用C运行库的rand函数随机的获得一个行数,从而随机的取出一行,但是,当前的情况是不知道行数,这样如何求呢?我们需要一个概念来帮助我们做出猜想,来使得对每一行取出的概率相等,也即随机。这个概念即蓄水池抽样(Reservoir Sampling)。

水塘抽样算法(Reservoir Sampling)思想:
  在序列流中取一个数,如何确保随机性,即取出某个数据的概率为:1/(已读取数据个数)

  假设已经读取n个数,现在保留的数是Ax,取到Ax的概率为(1/n)。

  对于第n+1个数An+1,以1/(n+1)的概率取An+1,否则仍然取Ax。依次类推,可以保证取到数据的随机性。

  数学归纳法证明如下:

    当n=1时,显然,取A1。取A1的概率为1/1。

  假设当n=k时,取到的数据Ax。取Ax的概率为1/k。

   当n=k+1时,以1/(k+1)的概率取An+1,否则仍然取Ax。

 (1)如果取Ak+1,则概率为1/(k+1);

 (2)如果仍然取Ax,则概率为(1/k)*(k/(k+1))=1/(k+1)

  所以,对于之后的第n+1个数An+1,以1/(n+1)的概率取An+1,否则仍然取Ax。依次类推,可以保证取到数据的随机性。

在序列流中取k个数,如何确保随机性,即取出某个数据的概率为:k/(已读取数据个数)

  建立一个数组,将序列流里的前k个数,保存在数组中。(也就是所谓的”蓄水池”)

  对于第n个数An,以k/n的概率取An并以1/k的概率随机替换“蓄水池”中的某个元素;否则“蓄水池”数组不变。依次类推,可以保证取到数据的随机性。

  数学归纳法证明如下:

    当n=k是,显然“蓄水池”中任何一个数都满足,保留这个数的概率为k/k。

    假设当n=m(m>k)时,“蓄水池”中任何一个数都满足,保留这个数的概率为k/m。
    当n=m+1时,以k/(m+1)的概率取An,并以1/k的概率,随机替换“蓄水池”中的某个元素,否则“蓄水池”数组不变。则数组中保留下来的数的概率为:

  所以,对于第n个数An,以k/n的概率取An并以1/k的概率随机替换“蓄水池”中的某个元素;否则“蓄水池”数组不变。依次类推,可以保证取到数据的随机性。

Spark中的水塘抽样算法(Reservoir Sampling)
  spark的Partitioner子类RangePartitioner中有用到Reservoir Sampling抽样算法(org.apache.spark.RangePartitioner#sketch).

spark的util中有reservoirSampleAndCount方法(org.apache.spark.util.random.SamplingUtils#reservoirSampleAndCount)

源码为:

/**
   * Reservoir sampling implementation that also returns the input size.
   *
   * @param input input size
   * @param k reservoir size
   * @param seed random seed
   * @return (samples, input size)
   */
  def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Int) = {
    val reservoir = new Array[T](k)
    // Put the first k elements in the reservoir.
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    // If we have consumed all the elements, return them. Otherwise do the replacement.
    if (i < k) {
      // If input size < k, trim the array to return only an array of input size.
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      // If input size > k, continue the sampling process.
      val rand = new XORShiftRandom(seed)
      while (input.hasNext) {
        val item = input.next()
        val replacementIndex = rand.nextInt(i)
        if (replacementIndex < k) {
          reservoir(replacementIndex) = item
        }
        i += 1
      }
      (reservoir, i)
    }
  }

代码实现思路比较简单,新建一个k大小的数组reservoir,如果元数据中数据少于k,直接返回原数据数组和原数据个数。如果大于,则对接下来的元素进行比较,随机生成一个数i,如果这个数小于k,则替换数组reservoir中第i个数,直至没有元素,则返回reservoir的copy数组。

2.代码:
测试org.apache.spark.util.random.SamplingUtils$#reservoirSampleAndCount方法:

package org.apache.spark.sourceCode.partitionerLearning

import org.apache.spark.util.SparkLearningFunSuite
import org.apache.spark.util.random.SamplingUtils

import scala.util.Random

class reservoirSampleAndCountSuite extends SparkLearningFunSuite {
  test("reservoirSampleAndCount") {
    val input = Seq.fill(100)(Random.nextInt())
    val (sample1, count1) = SamplingUtils.reservoirSampleAndCount(input.iterator, 150)
    assert(count1 === 100)
    assert(input === sample1.toSeq)

    // input size == k
    val (sample2, count2) = SamplingUtils.reservoirSampleAndCount(input.iterator, 100)
    assert(count2 === 100)
    assert(input === sample2.toSeq)

    // input size > k
    val (sample3, count3) = SamplingUtils.reservoirSampleAndCount(input.iterator, 10)
    assert(count3 === 100)
    assert(sample3.length === 10)
    println(input)
    sample3.foreach{each=>print(each+" ")}
  }

}

3.结果:

List(1287104639, 547232730, -595310393, -1264894486, 427750044, -776002403, 32230947, -1390386390, 484259687, 774711013, -1989325813, -957970416, 945685455, -1322730587, -1919655222, 1642426087, -489524599, -1070401860, -1454008456, -1882431453, -1843815884, -1987533758, -854529853, 879991257, -864077378, 478381860, 111307761, 1504756336, -1892792571, -1413976846, -848218587, -101494119, 1592476609, 247606007, 1269634528, 568928892, 488930464, -2145986422, 1643110602, 280675891, -878405966, 1799740067, 981424562, -1552824965, -1760162041, -288189264, -373755181, -2112636248, -2108911467, -1815555415, 302051417, 254178521, -1137490849, 426066017, -819810525, 1408383341, 1183678420, 234717727, 1470632905, 271163573, -22448780, 486064749, 378168799, -1444541974, 419089337, 1700972847, 1291787131, 644012641, -1618133452, 313585654, 658987252, 869334013, -811750155, -1561229418, 814819564, -197177628, 1051344432, 1746109173, 358985873, -265551244, 1362130460, -1635943643, 168813599, -669120136, -1084593890, -150445899, 387678120, 1994726806, 71986215, 1323527748, 700729367, 219285004, -1513691303, -97767338, 2099894386, -652208741, 704524016, 123647594, -1281589410, -1713105982)
-197177628 -22448780 478381860 -1137490849 219285004 168813599 1269634528 -1454008456 658987252 378168799 

原文地址:https://www.cnblogs.com/itboys/p/9825169.html

时间: 2024-10-08 22:48:00

Spark MLlib之水塘抽样算法(Reservoir Sampling)的相关文章

Spark MLlib Linear Regression线性回归算法

1.Spark MLlib Linear Regression线性回归算法 1.1 线性回归算法 1.1.1 基础理论 在统计学中,线性回归(Linear Regression)是利用称为线性回归方程的最小平方函数对一个或多个自变量和因变量之间关系进行建模的一种回归分析.这种函数是一个或多个称为回归系数的模型参数的线性组合. 回归分析中,只包括一个自变量和一个因变量,且二者的关系可用一条直线近似表示,这种回归分析称为一元线性回归分析.如果回归分析中包括两个或两个以上的自变量,且因变量和自变量之间

孙其功陪你学之——Spark MLlib之K-Means聚类算法

看到 程序员的自我修养 – SelfUp.cn 里面有Spark MLlib之K-Means聚类算法. 但是是java 语言的,于是我按照例程用Scala写了一个,分享在此. 由于在学习 spark mllib 但是如此详细的资料真的很难找,在此分享. 测试数据 0.0 0.0 0.0 0.1 0.1 0.1 0.2 0.2 0.2 9.0 9.0 9.0 9.1 9.1 9.1 9.2 9.2 9.2 15.1 15.1 15.1 18.0 17.0 19.0 20.0 21.0 22.0 p

spark.mllib源代码阅读-优化算法1-Gradient

Spark中定义的损失函数及梯度,在看源代码之前,先回想一下机器学习中定义了哪些损失函数,毕竟梯度求解是为优化求解损失函数服务的. 监督学习问题是在如果空间F中选取模型f作为决策函数.对于给定的输入X,由f(X)给出对应的输出Y,这个输出的预測值f(X)与真实值Y可能一致也可能不一致,用一个损失函数(lossfunction)或代价函数(cost function)来度量预測错误的程度.损失函数是f(X)和Y的非负实值函数,记作L(Y, f(X)). 统计学习中经常使用的损失函数有下面几种: (

Spark MLlib算法调用展示平台及其实现过程

1. 软件版本: IDE:Intellij IDEA 14,Java:1.7,Scala:2.10.6:Tomcat:7,CDH:5.8.0: Spark:1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0 : Hadoop:hadoop2.6.0-cdh5.8.0:(使用的是CDH提供的虚拟机) 2. 工程下载及部署: Scala封装Spark算法工程:https://github.com/fansy1990/Spark_MLlib_Algorithm_1.6.0.git

Reservoir Sampling - 蓄水池抽样

问题起源于编程珠玑Column 12中的题目10,其描述如下: How could you select one of n objects at random, where you see the objects sequentially but you do not know the value of n beforehand? For concreteness, how would you read a text file, and select and print one random l

用Spark学习矩阵分解推荐算法

在矩阵分解在协同过滤推荐算法中的应用中,我们对矩阵分解在推荐算法中的应用原理做了总结,这里我们就从实践的角度来用Spark学习矩阵分解推荐算法. 1. Spark推荐算法概述 在Spark MLlib中,推荐算法这块只实现了基于矩阵分解的协同过滤推荐算法.而基于的算法是FunkSVD算法,即将m个用户和n个物品对应的评分矩阵M分解为两个低维的矩阵:$$M_{m \times n}=P_{m \times k}^TQ_{k \times n}$$ 其中k为分解成低维的维数,一般远比m和n小.如果大

储水池抽样算法原理与实现

***********************************************声明****************************************************** 原创作品,出自 "晓风残月xj" 博客,欢迎转载,转载时请务必注明出处(http://blog.csdn.net/xiaofengcanyuexj). 由于各种原因,可能存在诸多不足,欢迎斧正! *******************************************

Shuffle an Array (水塘抽样)

随机性问题 水塘抽样算法可保证每个样本被抽到的概率相等 使用场景:从包含n个项目的集合S中选取k个样本,其中n为一很大或未知的数量,尤其适用于不能把所有n个项目都存放到主内存的情况 Knuth洗牌算法 拿起第i张牌时,只从它前面的牌随机选出j,或从它后面的牌随机选出j交换即可 1 class Solution { 2 public: 3 Solution(vector<int>& nums) { 4 v = nums; 5 } 6 7 /** Resets the array to i

Spark MLlib Logistic Regression逻辑回归算法

1.1 逻辑回归算法 1.1.1 基础理论 logistic回归本质上是线性回归,只是在特征到结果的映射中加入了一层函数映射,即先把特征线性求和,然后使用函数g(z)将最为假设函数来预测.g(z)可以将连续值映射到0和1上. 它与线性回归的不同点在于:为了将线性回归输出的很大范围的数,例如从负无穷到正无穷,压缩到0和1之间,这样的输出值表达为"可能性"才能说服广大民众.当然了,把大值压缩到这个范围还有个很好的好处,就是可以消除特别冒尖的变量的影响. Logistic函数(或称为Sigm