Spark实战3:异常检测算法Scala语言

  异常检测原理是根据训练数据的高斯分布,计算均值和方差,若测试数据样本点带入高斯公式计算的概率低于某个阈值(0.1),判定为异常点。

1 创建数据集转化工具类,把csv数据集转化为RDD数据结构

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

object FeaturesParser{
  def parseFeatures(rawdata: RDD[String]): RDD[Vector] = {
    val rdd: RDD[Array[Double]] = rawdata.map(_.split(",").map(_.toDouble))
    val vectors: RDD[Vector] = rdd.map(arrDouble => Vectors.dense(arrDouble))
    vectors
  }

  def parseFeaturesWithLabel(cvData: RDD[String]): RDD[LabeledPoint] = {
    val rdd: RDD[Array[Double]] = cvData.map(_.split(",").map(_.toDouble))
    val labeledPoints = rdd.map(arrDouble => new LabeledPoint(arrDouble(0), Vectors.dense(arrDouble.slice(1, arrDouble.length))))
    labeledPoints
  }
}

2 创建异常检测工具类,主要是预测是否为异常点

object AnomalyDetection {

  /**
    * True if the given point is an anomaly, false otherwise
    * @param point
    * @param means
    * @param variances
    * @param epsilon
    * @return
    */
  def predict (point: Vector, means: Vector, variances: Vector, epsilon: Double): Boolean = {
    println("-->")
    println("-->v1"+probFunction(point, means, variances))
    println("-->v2"+epsilon)
    probFunction(point, means, variances) < epsilon
  }

  def probFunction(point: Vector, means: Vector, variances: Vector): Double = {
    val tripletByFeature: List[(Double, Double, Double)] = (point.toArray, means.toArray, variances.toArray).zipped.toList
    tripletByFeature.map { triplet =>
      val x = triplet._1
      val mean = triplet._2
      val variance = triplet._3
      val expValue = Math.pow(Math.E, -0.5 * Math.pow(x - mean,2) / variance)
      (1.0 / (Math.sqrt(variance) * Math.sqrt(2.0 * Math.PI))) * expValue
    }.product
  }
}

3异常检测模型类

import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD

class AnomalyDetectionModel(means2: Vector, variances2: Vector, epsilon2: Double) extends java.io.Serializable{
   var means: Vector = means2
   var variances: Vector = variances2
   var epsilon: Double = epsilon2

   def predict(point: Vector) : Boolean ={
      println("-->1")
      AnomalyDetection.predict(point, means, variances, epsilon)
   }

   def predict(points: RDD[Vector]): RDD[(Vector, Boolean)] = {
    println("-->2")
    points.map(p => (p,AnomalyDetection.predict(p, means, variances, epsilon)))
   }
}

4 包括启动异常检测模型,优化参数,输出评价指标等函数功能(注意序列化Serializable )

import org.apache.spark.Logging
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD

/**
  * Anomaly Detection algorithm
  */
class AnomalyDetection extends java.io.Serializable with Logging {

  val default_epsilon: Double = 0.01

  def run(data: RDD[Vector]): AnomalyDetectionModel = {
    val sc = data.sparkContext

    val stats: MultivariateStatisticalSummary = Statistics.colStats(data)
    val mean: Vector = stats.mean
    val variances: Vector = stats.variance
    logInfo("MEAN %s VARIANCE %s".format(mean, variances))
    // println(s"--> MEAN VARIANCE$mean,$variances")
    println("--> MEAN VARIANCE"+mean+variances)
    new AnomalyDetectionModel(mean, variances, default_epsilon)
  }

  /**
    * Uses the labeled input points to optimize the epsilon parameter by finding the best F1 Score
    * @param crossValData
    * @param anomalyDetectionModel
    * @return
    */
  def optimize(crossValData: RDD[LabeledPoint], anomalyDetectionModel: AnomalyDetectionModel) = {
    val sc = crossValData.sparkContext
    val bcMean = sc.broadcast(anomalyDetectionModel.means)
    val bcVar = sc.broadcast(anomalyDetectionModel.variances)

    //compute probability density function for each example in the cross validation set
    val probsCV: RDD[Double] = crossValData.map(labeledpoint =>
      AnomalyDetection.probFunction(labeledpoint.features, bcMean.value, bcVar.value)
    )

    //select epsilon
    crossValData.persist()
    val epsilonWithF1Score: (Double, Double) = evaluate(crossValData, probsCV)
    crossValData.unpersist()

    logInfo("Best epsilon %s F1 score %s".format(epsilonWithF1Score._1, epsilonWithF1Score._2))
    new AnomalyDetectionModel(anomalyDetectionModel.means, anomalyDetectionModel.variances, epsilonWithF1Score._1)
  }

  /**
    *  Finds the best threshold to use for selecting outliers based on the results from a validation set and the ground truth.
    *
    * @param crossValData labeled data
    * @param probsCV probability density function as calculated for the labeled data
    * @return Epsilon and the F1 score
    */
  private def evaluate(crossValData: RDD[LabeledPoint], probsCV: RDD[Double]) = {

    val minPval: Double = probsCV.min()
    val maxPval: Double = probsCV.max()
    logInfo("minPVal: %s, maxPVal %s".format(minPval, maxPval))
    val sc = probsCV.sparkContext

    var bestEpsilon = 0D
    var bestF1 = 0D

    val stepsize = (maxPval - minPval) / 1000.0

    //find best F1 for different epsilons
    for (epsilon <- minPval to maxPval by stepsize){

      val bcepsilon = sc.broadcast(epsilon)

      val ourPredictions: RDD[Double] = probsCV.map{ prob =>
        if (prob < bcepsilon.value)
          1.0 //anomaly
        else
          0.0
      }
      val labelAndPredictions: RDD[(Double, Double)] = crossValData.map(_.label).zip(ourPredictions)
      val labelWithPredictionCached: RDD[(Double, Double)] = labelAndPredictions

      val falsePositives = countStatisticalMeasure(labelWithPredictionCached, 0.0, 1.0)
      val truePositives = countStatisticalMeasure(labelWithPredictionCached, 1.0, 1.0)
      val falseNegatives = countStatisticalMeasure(labelWithPredictionCached, 1.0, 0.0)

      val precision = truePositives / Math.max(1.0, truePositives + falsePositives)
      val recall = truePositives / Math.max(1.0, truePositives + falseNegatives)

      val f1Score = 2.0 * precision * recall / (precision + recall)

      if (f1Score > bestF1){
        bestF1 = f1Score
        bestEpsilon = epsilon
      }
    }

    (bestEpsilon, bestF1)
  }

  /**
    * Function to calculate true / false positives, negatives
    *
    * @param labelWithPredictionCached
    * @param labelVal
    * @param predictionVal
    * @return
    */
  private def countStatisticalMeasure(labelWithPredictionCached: RDD[(Double, Double)], labelVal: Double, predictionVal: Double): Double = {
    labelWithPredictionCached.filter { labelWithPrediction =>
      val label = labelWithPrediction._1
      val prediction = labelWithPrediction._2
      label == labelVal && prediction == predictionVal
    }.count().toDouble
  }

}

5 读取数据集,在hdfs的路径/user/mapr/,转化为RDD,训练模型,预测异常点:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// val conf = new SparkConf().setAppName("Anomaly Detection Spark2")
// val sc = new SparkContext(conf)

val rawFilePath = "/user/mapr/training.csv"
val cvFilePath = "/user/mapr/cross_val.csv"
val rawdata = sc.textFile(rawFilePath, 2).cache()
val cvData = sc.textFile(cvFilePath, 2).cache()

val trainingVec: RDD[Vector] = FeaturesParser.parseFeatures(rawdata)
val cvLabeledVec: RDD[LabeledPoint] = FeaturesParser.parseFeaturesWithLabel(cvData)

// trainingVec.collect().foreach(println)
// cvLabeledVec.collect().foreach(println)

val data = trainingVec.cache()
val anDet: AnomalyDetection = new AnomalyDetection()
//derive model
val model = anDet.run(data)

val dataCvVec = cvLabeledVec.cache()
// val optimalModel = anDet.optimize(dataCvVec, model)

//find outliers in CV
val cvVec = cvLabeledVec.map(_.features)
// cvVec.collect().foreach(println)
// print("-->"+typeOf[cvVec])
val results = model.predict(cvVec)

// results.collect().foreach(println)
val outliers = results.filter(_._2).collect()
// outliers.foreach(v => println(v._1))
println("\nFound %s outliers\n".format(outliers.length))

备注:输出的部分结果为,异常点输出

时间: 2025-01-07 13:47:02

Spark实战3:异常检测算法Scala语言的相关文章

《时序异常检测算法概览》

时序异常检测算法概览 2018-09-03 17:08:49 分类:人工智能与大数据 来自:论智(微信号:jqr_AI),作者:Pavel Tiunov,编译:weakish来源:statsbot,原文链接 编者按:Statsbot CTO Pavel Tiunov简要介绍了最流行的时序异常检测算法,并讨论了它们的优点和缺点 在Statsbot,我们持续检查异常检测方法这一领域的研究,并据此更新我们的模型. 本文概览了最流行的时序异常检测算法,并讨论了它们的优点和缺点. 本文是为想要了解异常检测

如何开发一个异常检测系统:如何评价一个异常检测算法

利用数值来评价一个异常检测算法的重要性 使用实数评价法很重要,当你用某个算法来开发一个具体的机器学习应用时,你常常需要做出很多决定,如选择什么样的特征等等,如果你能找到如何来评价算法,直接返回一个实数来告诉你算法的好坏,那样你做决定就会更容易一些.如现在有一个特征,要不要将这个特征考虑进来?如果你带上这个特征运行你的算法,再去掉这个特征运行你的算法,得到返回的实数,这个实数直接告诉你加上这个特征算法是变好了还是变坏了,这样你就有一种更简单的算法来确定是否要加上这个特征. 为了更快地开发出一个异常

异常检测算法的Octave仿真

在基于高斯分布的异常检测算法一文中,详细给出了异常检测算法的原理及其公式,本文为该算法的Octave仿真.实例为,根据训练样例(一组网络服务器)的吞吐量(Throughput)和延迟时间(Latency)数据,标记出异常的服务器. 可视化的数据集如下: 我们根据数据集X,计算其二维高斯分布的数学期望mu与方差sigma2: function [mu sigma2] = estimateGaussian(X) %ESTIMATEGAUSSIAN This function estimates th

异常检测算法--Isolation Forest

南大周志华老师在2010年提出一个异常检测算法Isolation Forest,在工业界很实用,算法效果好,时间效率高,能有效处理高维数据和海量数据,这里对这个算法进行简要总结. iTree 提到森林,自然少不了树,毕竟森林都是由树构成的,看Isolation Forest(简称iForest)前,我们先来看看Isolation Tree(简称iTree)是怎么构成的,iTree是一种随机二叉树,每个节点要么有两个女儿,要么就是叶子节点,一个孩子都没有.给定一堆数据集D,这里D的所有属性都是连续

机器学习总结2 - 关于激活函数、损失函数、正则化、异常检测算法总结

LSTM特性, CNN特性, 损失函数, paper, 项目 ...软件 激活函数: -> sigmod: 硬饱和性, y(0,1), 斜率趋于0;-> tanh: 软饱和性, y(-1,1), 虽然输出均值为0, 可以更快收敛, 但斜率依然会趋于0;-> relu: 当x<0时, 存在硬饱和, y(0, +), 使用leak-relu, 当x<0时, 使斜率不会为0; 损失函数/ 性能指标:-> 均方差mse, 均方根误差rmse, 常用于回归问题, rmse=500

[转] Socket心跳包异常检测的C语言实现,服务器与客户端代码案例

转载自:zxh2075的专栏 在Socket心跳机制中,心跳包可以由服务器发送给客户端,也可以由客户端发送给服务器,不过比较起来,前者开销可能较大.本文实现的是由客户端给服务器发送心跳包,服务器不必返回应答包,而是通过判断客户在线会话记录中的计数标志值来实现心跳异常的检测,以此决定客户端是否已经断开连接以及删除其在线会话记录. 基本思路: ①客户端定时给服务器发送心跳包(案例中定时时间为3秒): ②服务器创建一个心跳检测的线程,线程中每隔3秒对用户在线会话记录中的计数器进行加1操作(初始值为0)

异常检测(Anomaly detection): 异常检测算法(应用高斯分布)

估计P(x)的分布--密度估计 我们有m个样本,每个样本有n个特征值,每个特征都分别服从不同的高斯分布,上图中的公式是在假设每个特征都独立的情况下,实际无论每个特征是否独立,这个公式的效果都不错.连乘的公式表达如上图所示. 估计p(x)的分布问题被称为密度估计问题(density estimation)

异常检测概览——孤立森林和局部异常因子算法效果是最好的

转自博客:http://www.infosec-wiki.com/?p=140760 一.关于异常检测 异常检测(outlier detection)在以下场景: 数据预处理 病毒木马检测 工业制造产品检测 网络流量检测 等,有着重要的作用.由于在以上场景中,异常的数据量都是很少的一部分,因此诸如:SVM.逻辑回归等分类算法,都不适用,因为: 监督学习算法适用于有大量的正向样本,也有大量的负向样本,有足够的样本让算法去学习其特征,且未来新出现的样本与训练样本分布一致. 以下是异常检测和监督学习相

异常检测及欺诈

一.无监督异常检测模型   1.在线流数据异常检测(iforest隔离森林算法) 该方法的主要思想是,通过随机选定样本属性及其值将样本空间进行随机划分,分割的过程可以看成类似于随机森林中树建立的过程,对于新的样本,基于建立的隔离树求其分割深度,深度值越小,表明越容易被隔离,也就意味着异常的概率越大:反之则为正常样本.该方法是基于异常数据"少且不同"的特征,来采用随机隔离的思想设计异常检查. 该方法的主要优点是,在构建初始模型时不需要任何实际的数据,从而能快速构建初始探测模型,它符合数据