Spark MLlib FPGrowth算法

1.1 FPGrowth算法

1.1.1 基本概念

关联规则挖掘的一个典型例子是购物篮分析。关联规则研究有助于发现交易数据库中不同商品(项)之间的联系,找出顾客购买行为模式,如购买了某一商品对购买其他商品的影响,分析结果可以应用于商品货架布局、货存安排以及根据购买模式对用户进行分类。

关联规则的相关术语如下:

(1)项与项集

这是一个集合的概念,在一篮子商品中的一件消费品即为一项(Item),则若干项的集合为项集,如{啤酒,尿布}构成一个二元项集。

(2)关联规则

一般记为的形式,X为先决条件,Y为相应的关联结果,用于表示数据内隐含的关联性。如:表示购买了尿布的消费者往往也会购买啤酒。

关联性强度如何,由三个概念——支持度、置信度、提升度来控制和评价。

例:有10000个消费者购买了商品,其中购买尿布1000个,购买啤酒2000个,购买面包500个,同时购买尿布和面包800个,同时购买尿布和面包100个。

(3)支持度(Support)

支持度是指在所有项集中{X, Y}出现的可能性,即项集中同时含有X和Y的概率:

该指标作为建立强关联规则的第一个门槛,衡量了所考察关联规则在“量”上的多少。通过设定最小阈值(minsup),剔除“出镜率”较低的无意义规则,保留出现较为频繁的项集所隐含的规则。

设定最小阈值为5%,由于{尿布,啤酒}的支持度为800/10000=8%,满足基本输了要求,成为频繁项集,保留规则;而{尿布,面包}的支持度为100/10000=1%,被剔除。

(4)置信度(Confidence)

置信度表示在先决条件X发生的条件下,关联结果Y发生的概率:

这是生成强关联规则的第二个门槛,衡量了所考察的关联规则在“质”上的可靠性。相似的,我们需要对置信度设定最小阈值(mincon)来实现进一步筛选。

具体的,当设定置信度的最小阈值为70%时,置信度为800/1000=80%,而的置信度为800/2000=40%,被剔除。

(5)提升度(lift)

提升度表示在含有X的条件下同时含有Y的可能性与没有X这个条件下项集中含有Y的可能性之比:公式为confidence(artichok => cracker)/support(cracker) = 80%/50% = 1.6。该指标与置信度同样衡量规则的可靠性,可以看作是置信度的一种互补指标。

1.1.2
FP-Growth算法

FP-Growth(频繁模式增长)算法是韩家炜老师在2000年提出的关联分析算法,它采取如下分治策略:将提供频繁项集的数据库压缩到一棵频繁模式树(FP-Tree),但仍保留项集关联信息;该算法和Apriori算法最大的不同有两点:第一,不产生候选集,第二,只需要两次遍历数据库,大大提高了效率。

(1)按以下步骤构造FP-树

(a)
扫描事务数据库D一次。收集频繁项的集合F和它们的支持度。对F按支持度降序排序,结果为频繁项表L。

(b)
创建FP-树的根结点,以“null”标记它。对于D 中每个事务Trans,执行:选择 Trans 中的频繁项,并按L中的次序排序。设排序后的频繁项表为[p | P],其中,p 是第一个元素,而P 是剩余元素的表。调用insert_tree([p | P], T)。该过程执行情况如下。如果T有子女N使得N.item-name = p.item-name,则N 的计数增加1;否则创建一个新结点N将其计数设置为1,链接到它的父结点T,并且通过结点链结构将其链接到具有相同item-name的结点。如果P非空,递归地调用insert_tree(P,
N)。

(2)FP-树的挖掘

通过调用FP_growth(FP_tree, null)实现。该过程实现如下:

FP_growth(Tree, α)

(1) if Tree
含单个路径P then

(2) for
路径 P 中结点的每个组合(记作β)

(3)
产生模式β ∪ α,其支持度support = β中结点的最小支持度;

(4) else for each ai在Tree的头部(按照支持度由低到高顺序进行扫描) {

(5)
产生一个模式β = ai ∪ α,其支持度support = ai .support;

(6)
构造β的条件模式基,然后构造β的条件FP-树Treeβ;

(7) if Treeβ ≠ ? then

(8)
调用 FP_growth (Treeβ, β);}

end

1.1.3 FP-Growth算法演示—构造FP-树

(1)事务数据库建立

原始事务数据库如下:


Tid


Items


1


I1,I2,I5


2


I2,I4


3


I2,I3


4


I1,I2,I4


5


I1,I3


6


I2,I3


7


I1,I3


8


I1,I2,I3,I5


9


I1,I2,I3

扫描事务数据库得到频繁1-项目集F。


I1


I2


I3


I4


I5


6


7


6


2


2

定义minsup=20%,即最小支持度为2,重新排列F。


I2


I1


I3


I4


I5


7


6


6


2


2

重新调整事务数据库。


Tid


Items


1


I2, I1,I5


2


I2,I4


3


I2,I3


4


I2, I1,I4


5


I1,I3


6


I2,I3


7


I1,I3


8


I2, I1,I3,I5


9


I2, I1,I3

(2)创建根结点和频繁项目表

(3)加入第一个事务(I2,I1,I5)

(4)加入第二个事务(I2,I4)

(5)加入第三个事务(I2,I3)

以此类推加入第5、6、7、8、9个事务。

(6)加入第九个事务(I2,I1,I3)

1.1.4 FP-Growth算法演示—FP-树挖掘

FP-树建好后,就可以进行频繁项集的挖掘,挖掘算法称为FpGrowth(Frequent Pattern Growth)算法,挖掘从表头header的最后一个项开始,以此类推。本文以I5、I3为例进行挖掘。

(1)挖掘I5:

对于I5,得到条件模式基:<(I2,I1:1)>、<I2,I1,I3:1>

构造条件FP-tree:

得到I5频繁项集:{{I2,I5:2},{I1,I5:2},{I2,I1,I5:2}}

I4、I1的挖掘与I5类似,条件FP-树都是单路径。

(1)挖掘I3:

I5的情况是比较简单的,因为I5对应的条件FP-树是单路径的,I3稍微复杂一点。I3的条件模式基是(I2 I1:2), (I2:2), (I1:2),生成的条件FP-树如下图:

I3的条件FP-树仍然是一个多路径树,首先把模式后缀I3和条件FP-树中的项头表中的每一项取并集,得到一组模式{I2 I3:4, I1 I3:4},但是这一组模式不是后缀为I3的所有模式。还需要递归调用FP-growth,模式后缀为{I1,I3},{I1,I3}的条件模式基为{I2:2},其生成的条件FP-树如下图所示。

在FP_growth中把I2和模式后缀{I1,I3}取并得到模式{I1 I2 I3:2}。

理论上还应该计算一下模式后缀为{I2,I3}的模式集,但是{I2,I3}的条件模式基为空,递归调用结束。最终模式后缀I3的支持度>2的所有模式为:{ I2 I3:4, I1 I3:4, I1 I2 I3:2}。

1.2 Spark Mllib FPGrowth源码分析

FPGrowth源码包括:FPGrowth、FPTree两部分。

其中FPGrowth中包括:run方法、genFreqItems方法、genFreqItemsets方法、genCondTransactions方法;

FPTree中包括:add方法、merge方法、project方法、getTransactions方法、extract方法。

// run 计算频繁项集

/**

* Computes an FP-Growth model that contains frequent itemsets.

* @param data input data set, each element contains a transaction

* @return an [[FPGrowthModel]]

*/

def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item] = {

if (data.getStorageLevel == StorageLevel.NONE) {

logWarning("Input data is not cached.")

}

val count = data.count()//计算事务总数

val minCount = math.ceil(minSupport * count).toLong//计算最小支持度

val numParts =
if (numPartitions >
0) numPartitions
else data.partitions.length

val partitioner =
new HashPartitioner(numParts)

//freqItems计算满足最小支持度的Items项

val freqItems = genFreqItems(data, minCount, partitioner)

//freqItemsets计算频繁项集

val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)

new FPGrowthModel(freqItemsets)

} 

// genFreqItems计算满足最小支持度的Items项

/**

* Generates frequent items by filtering the input data using minimal support level.

* @param minCount minimum count for frequent itemsets

* @param partitioner partitioner used to distribute items

* @return array of frequent pattern ordered by their frequencies

*/

privatedef genFreqItems[Item: ClassTag](

data: RDD[Array[Item]],

minCount: Long,

partitioner: Partitioner): Array[Item] = {

data.flatMap { t =>

val uniq = t.toSet

if (t.size != uniq.size) {

thrownew SparkException(s"Items in a transaction must
be unique but got ${t.toSeq}.")

}

t

}.map(v => (v,
1L))

.reduceByKey(partitioner, _ + _)

.filter(_._2 >= minCount)

.collect()

.sortBy(-_._2)

.map(_._1)

}//统计每个Items项的频次,对小于minCount的Items项过滤,返回Items项。

// genFreqItemsets计算频繁项集:生成FP-Trees,挖掘FP-Trees

/**

* Generate frequent itemsets by building FP-Trees, the extraction is done on each partition.

* @param data transactions

* @param minCount minimum count for frequent itemsets

* @param freqItems frequent items

* @param partitioner partitioner used to distribute transactions

* @return an RDD of (frequent itemset, count)

*/

privatedef genFreqItemsets[Item: ClassTag](

data: RDD[Array[Item]],

minCount: Long,

freqItems: Array[Item],

partitioner: Partitioner): RDD[FreqItemset[Item]] = {

val itemToRank = freqItems.zipWithIndex.toMap//表头

data.flatMap { transaction =>

genCondTransactions(transaction, itemToRank, partitioner)

}.aggregateByKey(new FPTree[Int], partitioner.numPartitions)( //生成FP树

(tree, transaction) => tree.add(transaction,
1L), //FP树增加一条事务

(tree1, tree2) => tree1.merge(tree2)) //FP树合并

.flatMap { case (part, tree) =>

tree.extract(minCount, x => partitioner.getPartition(x) == part)//FP树挖掘频繁项

}.map { case (ranks, count) =>

new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)

}

}

// add FP-Trees增加一条事务数据

/** Adds a transaction with count. */

def add(t: Iterable[T], count: Long =
1L): this.type = {

require(count >
0)

var curr = root

curr.count += count

t.foreach { item =>

val summary = summaries.getOrElseUpdate(item,
new Summary)

summary.count += count

val child = curr.children.getOrElseUpdate(item, {

val newNode =
new Node(curr)

newNode.item = item

summary.nodes += newNode

newNode

})

child.count += count

curr = child

}

this

}

// merge FP-Trees合并

/** Merges another FP-Tree. */

def merge(other: FPTree[T]):
this.type = {

other.transactions.foreach {
case (t, c) =>

add(t, c)

}

this

}

// extract FP-Trees挖掘,返回所有频繁项集

/** Extracts all patterns with valid suffix and minimum count. */

def extract(

minCount: Long,

validateSuffix: T => Boolean = _ =>
true): Iterator[(List[T], Long)] = {

summaries.iterator.flatMap {
case (item, summary) =>

if (validateSuffix(item) && summary.count >= minCount) {

Iterator.single((item :: Nil, summary.count)) ++

project(item).extract(minCount).map {
case (t, c) =>

(item :: t, c)

}

} else {

Iterator.empty

}

}

}

}

1.3 Mllib FPGrowth实例

1、数据

数据格式为:物品1物品2物品3…

r z h k p

z y x w v u t s

s x o n r

x z y m t s q e

z

x z y r q t p

2、代码

//读取样本数据

valdata_path =
"/home/tmp/sample_fpgrowth.txt"

valdata =
sc.textFile(data_path)

valexamples =
data.map(_.split(" ")).cache()

//建立模型

valminSupport =
2

valnumPartition =
10

valmodel =
new
FPGrowth()

.setMinSupport(minSupport)

.setNumPartitions(numPartition)

.run(examples)

//打印结果

println(s"Number of frequent itemsets: ${model.freqItemsets.count()}")

model.freqItemsets.collect().foreach { itemset =>

println(itemset.items.mkString("[",
",", "]") +
", " + itemset.freq)

}

时间: 2024-09-30 19:44:27

Spark MLlib FPGrowth算法的相关文章

Spark MLlib机器学习算法、源码及实战讲解pdf电子版下载

Spark MLlib机器学习算法.源码及实战讲解pdf电子版下载 链接:https://pan.baidu.com/s/1ruX9inG5ttOe_5lhpK_LQg 提取码:idcb <Spark MLlib机器学习:算法.源码及实战详解>书中讲解由浅入深慢慢深入,解析讲解了MLlib的底层原理:数据操作及矩阵向量计算操作,该部分是MLlib实现的基础:并对此延伸机器学习的算法,循序渐进的讲解其中的原理,是读者一点一点的理解和掌握书中的知识. 目录 · · · · · · 第一部分 Spa

Spark MLlib SVM算法

1.1 SVM支持向量机算法 支持向量机理论知识参照以下文档: 支持向量机SVM(一) http://www.cnblogs.com/jerrylead/archive/2011/03/13/1982639.html 支持向量机SVM(二) http://www.cnblogs.com/jerrylead/archive/2011/03/13/1982684.html 支持向量机(三)核函数 http://www.cnblogs.com/jerrylead/archive/2011/03/18/

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

Spark MLlib Linear Regression线性回归算法

1.Spark MLlib Linear Regression线性回归算法 1.1 线性回归算法 1.1.1 基础理论 在统计学中,线性回归(Linear Regression)是利用称为线性回归方程的最小平方函数对一个或多个自变量和因变量之间关系进行建模的一种回归分析.这种函数是一个或多个称为回归系数的模型参数的线性组合. 回归分析中,只包括一个自变量和一个因变量,且二者的关系可用一条直线近似表示,这种回归分析称为一元线性回归分析.如果回归分析中包括两个或两个以上的自变量,且因变量和自变量之间

Spark MLlib KMeans聚类算法

1.1 KMeans聚类算法 1.1.1 基础理论 KMeans算法的基本思想是初始随机给定K个簇中心,按照最邻近原则把待分类样本点分到各个簇.然后按平均法重新计算各个簇的质心,从而确定新的簇心.一直迭代,直到簇心的移动距离小于某个给定的值. K-Means聚类算法主要分为三个步骤: (1)第一步是为待聚类的点寻找聚类中心: (2)第二步是计算每个点到聚类中心的距离,将每个点聚类到离该点最近的聚类中去: (3)第三步是计算每个聚类中所有点的坐标平均值,并将这个平均值作为新的聚类中心: 反复执行(

Spark MLlib算法调用展示平台及其实现过程

1. 软件版本: IDE:Intellij IDEA 14,Java:1.7,Scala:2.10.6:Tomcat:7,CDH:5.8.0: Spark:1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0 : Hadoop:hadoop2.6.0-cdh5.8.0:(使用的是CDH提供的虚拟机) 2. 工程下载及部署: Scala封装Spark算法工程:https://github.com/fansy1990/Spark_MLlib_Algorithm_1.6.0.git

协同过滤算法 R/mapreduce/spark mllib多语言实现

用户电影评分数据集下载 http://grouplens.org/datasets/movielens/ 1) Item-Based,非个性化的,每个人看到的都一样 2) User-Based,个性化的,每个人看到的不一样 对用户的行为分析得到用户的喜好后,可以根据用户的喜好计算相似用户和物品,然后可以基于相似用户或物品进行推荐.这就是协同过滤中的两个分支了,基于用户的和基于物品的协同过滤. 在计算用户之间的相似度时,是将一个用户对所有物品的偏好作为一个向量,而在计算物品之间的相似度时,是将所有

spark.mllib源码阅读-分类算法4-DecisionTree

本篇博文主要围绕Spark上的决策树来讲解,我将分为2部分来阐述这一块的知识.第一部分会介绍一些决策树的基本概念.Spark下决策树的表示与存储.结点分类信息的存储.结点的特征选择与分类:第二部分通过一个Spark自带的示例来看看Spark的决策树的训练算法.另外,将本篇与上一篇博文"spark.mllib源码阅读bagging方法"的bagging子样本集抽样方法结合,也就理解了Spark下的决策森林树的实现过程. 第一部分: 决策树模型 分类决策树模型是一种描述对实例进行分类的树形

Spark MLlib Logistic Regression逻辑回归算法

1.1 逻辑回归算法 1.1.1 基础理论 logistic回归本质上是线性回归,只是在特征到结果的映射中加入了一层函数映射,即先把特征线性求和,然后使用函数g(z)将最为假设函数来预测.g(z)可以将连续值映射到0和1上. 它与线性回归的不同点在于:为了将线性回归输出的很大范围的数,例如从负无穷到正无穷,压缩到0和1之间,这样的输出值表达为"可能性"才能说服广大民众.当然了,把大值压缩到这个范围还有个很好的好处,就是可以消除特别冒尖的变量的影响. Logistic函数(或称为Sigm