Introduction to Monoids and Semigroups with Spark

在地球上什么是Monoid??

定义:

monoid(幺半群 译注:参考附注1翻译,下文中继续使用英文名)是一个带有二元运算(+)和一个单位元(译注:原文为identity element)i使得对于任意x,x+i=i+x=x。注意它不像群(译注:group,数学上翻译为群),它不带有逆元素。也可以说是带有单位元的半群(semigroup)

Wow,没什么用。那先看一些例子然后重新看个简单定义。。

https://blog.safaribooksonline.com/2013/05/15/monoids-for-programmers-a-scala-example/

1.整数和加法 
结合律 => (a+b)+c == a+(b+c) 和 单位元=>0+n==n+0==n 
1. 整数和乘法 
结合律=>(a*b)c==a(b*c) 和单位元=>1*n==n*1==n 
1.列表和关联 
结合律=>List(1,2)+(List(3,4)+List(5,6))==(List(1,2)+List(3,4))+List(5,6)==List(1,2,3,4,5,6)和单位元=>List(1)+List()==List()+List(1)==List(1)

这看起来任意二元操作都是Monoid.我们能否举一些反例?

例如平均

avg(1,2) 
avg(10,avg(20,30))!=avg(avg(10,20),30)

减法!

原来一个semigroup就是一个Monoid除了其不需要一个单位元,所以更具有包容性。 
关键在于你想象二元操作结合性的要求,这意味着可以不在乎计算的顺序!也意味着很容易并发的进行计算。 
一个Monoid本质上是特定类型所遵循的协议。这是否给予我们任何线索在Scala中实现Monoids/Semigroups?

Typeclass实现Monoids

//It is up to the developer to enforce the associativity rule!!!
trait SemiGroup[T]{  def op(a: T, b: T): T
}

trait Monoid[T] extends SemiGroup[T]{  def zero: T
}123456789

现在我们熟悉了怎么在Scala中实现Monoid那么我们能够实现IntAdditionMonoid么

object Monoids{
  implicit object IntAdditionMonoid extends Monoid[Int]{    def op(a: Int, b: Int): Int = a + b    def zeru: Int = 0
  }
}123456

好样的。现在我们在哪可以使用呢?看如何在方法中使用例如reduce…

trait SemiGroup[T]{  def op(a: T, b: T): T
}

trait Monoid[T] extends SemiGroup[T]{  def zero: T
}
implicit object IntAdditionMonoid extends Monoid[Int]{  def op(a: Int, b: Int): Int = a + b  def zero: Int = 0}

val listA = List(1,3,5,6)def reduceWithMonoid[A](seq:Seq[A])(implicit ev:Monoid[A]): A = {
  seq.reduce(ev.op(_,_))
}
println(reduceWithMonoid(listA))12345678910111213141516171819

再定义更多然后看他们如何表现

trait SemiGroup[T]{
  def op(a: T, b: T): T
}trait Monoid[T] extends SemiGroup[T]{
  def zero: T
}
implicit object IntAdditionMonoid extends Monoid[Int]{
  def op(a: Int, b: Int): Int = a + b  def zero: Int = 0}//We now have must use a class as type parameters are required due to the fact that tuples themselves have classes.//Here our goal is to define functionality for tuples that contain monoid abiding typesclass Tuple2SemiGroup[A,B]()(implicit sg1: SemiGroup[A], sg2: SemiGroup[B]) extends SemiGroup[(A,B)]{
  def op(a: (A,B), b: (A,B)): (A,B) = (sg1.op(a._1, b._1), sg2.op(a._2, b._2))
}//As we cannot make above an implicit class because that actually does something different (more on this with an aside about pimp my library pattern soon)//Well we can use another feature of implicits which are implicit conversions. This function provides logic on how to change a Tuple that contains Semigroups and return a SemiGroup of the tuple itselfimplicit def tuple2SemiGroup[A,B](implicit sg1: SemiGroup[A],sg2: SemiGroup[B]): SemiGroup[(A,B)] = {  new Tuple2SemiGroup[A,B]()(sg1,sg2)
}val listA = List((1,2),(3,4),(5,2),(6,9))def reduceWithMonoid[A](seq: Seq[A])(implicit ev: SemiGroup[A]): A = {
  seq.reduce(ev.op(_,_))
}
println(reduceWithMonoid(listA))123456789101112131415161718192021222324252627282930

···

看如何在monoid定义中包含聚集逻辑。事实上我们可以重定义集合对象的行为这意味着高重用和高扩展的代码。再看一个例子,然后我们转向spark.

在多看一个例子,Semigroups 可以很容易应用在合并2个maps关联键并且对值求和。

trait SemiGroup[T]{
  def op(a: T, b: T): T
}trait Monoid[T] extends SemiGroup[T]{
  def zero: T
}
implicit object IntAdditionMonoid extends Monoid[Int]{
  def op(a: Int, b: Int): Int = a + b  def zero: Int = 0}//Here we only need to assume that the values can form a SemiGroup as the keys are just being combined.class MapSemiGroup[K,V]()(implicit sg1: SemiGroup[V]) extends SemiGroup[Map[K,V]]{
  //We are aggregating where the initial map is one of the maps and we loop through key values of other one and combine.
  //This way any keys that don‘t appear in the looping map are there already,all keys that appear in both are overwritten
  def op(iteratingMap: Map[K,V], startingMap: Map[K,V]): Map[K,V] = iteratingMap.aggregate(startingMap)({
    (currentMap: Map[K,V], kv: (K,V)) => {      val newValue: V = startingMap.get(kv._1).map(v => sg1.op(v, kv._2)).getOrElse(kv._2)
      currentMap + (kv._1 -> newValue)
      }
    },    //This is the combine part (if done in parallel, could have two different maps that need to be combined) this assumes that all keys are already combined....
    {
      (mapOne: Map[K,V], mapTwo: Map[K,V]) => mapOne ++ mapTwo
    }
  )
}//As we cannot make above an implicit class because that actually does something diferent (more on this with an aside about pimp my library pattern soon)//Well we can use another feature of implicits which are implicit conversions. This function provides logic on how to change a Tuple that contains Semigroups and return a SemiGroup of the tuple itselfimplicit def mapSemiGroup[K,V](implicit sg1: SemiGroup[V]): SemiGroup[Map[K,V]] = {  new MapSemiGroup[K,V]()(sg1)
}val mapA = Map("A" -> 1, "B" -> 2, "D" -> 5)val mapB = Map("A" -> 3, "C" -> 3, "D" -> 1)val mapC = Map("B" -> 10, "D" -> 3)def reduceWithMonoid[A](seq: Seq[A])(implicit ev: SemiGroup[A]): A = {
  seq.reduce(ev.op(_,_))
}
println(reduceWithMonoid(List(mapA, mapB, mapC)))12345678910111213141516171819202122232425262728293031323334353637383940414243

Pimp My Library(译注:参考3)示例和我们为什么不用implicit classes

//An implicit class takes a constructor which is the class to be pimpd. You can then define methods etc. which will be "available" on that type as though it was native functionality!!!!implicit class PimpedString(s: String){
  def pimpMyString(): String = s + " is pimped"}println("My String".pimpMyString())123456

Spark使用Pimp My Library模式来添加只在特定类型的RDDs上可用的特定方法。如:Key Value Pair RDDs 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
*/implicit class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)extends Loggingwith SparkHadoopMapReduceUtilwith Serializable
{
}12345678910

这看起来你期望去设计你自己的monoid/semigroup库。不要担心,twitter已经做了并且使其可以用于Spark!(这意味着一切都是可序列化的)。他们还写的让其有效率(译注:这句没懂,原文:They have also written it in a way such that it performs very) 
https://github.com/twitter/algebird

合在一起,Monoids与Spark

我们需要在spark中聚集RDDs时编写很多函数,不幸的是,这些函数大致看上去是一样的但是用一般方式很难编写。使用Monoids是一种方式来达到目的,这是一个实际的例子:

 //This is a call from an aggregation section that updates state with the HyperLogLog object
 val stateUniques = makeModelUniquesTime.updateStateByKey(updateTotalCountState[HLL])
 //This is a call from an aggregation section that updates state with the Long
 val statePV = makeModelCountReduceWithTime.updateStateByKey(updateTotalCountState[Long])

//This was originally implemented as tow methods, one for HLL and one for Long. With Monoids we can write a singel method that takes care of both cases.
def updateTotalCountState[U](values: Seq[(BananaTimestamp, U)], state: Option[(BananaTimeStamp, U)])(implicit monoid: Monoid[U], ct: ClassTag[U]): Option[(BananaTimestamp, U)] = {
  val defaultState = (null, monoid.zero)  values match {    case Nil => Some(state.getOrElse(defaultState))    case _ =>
      val hdT = values(0)._1
      // The reduction logic is now contained in the monoid definitions as opposed to thest functions. We can instead distil this to what is takes to update state
      val v = values.map{case (_, a) => a}.reduce(monoid.plus)
      val stateReceived = state.getOrElse(defaultState)      if(checkResetState(stateReceived._1, hdT)) Some((hdT, v)) else Some((hdT, monoid.plus(v, stateReceived._2)))
    }
  }123456789101112131415161718

原文链接:https://thewanderingmonad.wordpress.com/2015/05/17/introduction-to-monoids-and-semigroups-with-spark/

参考 
1、monoid http://hongjiang.info/semigroup-and-monoid/ 
2、https://zh.wikipedia.org/wiki/%E5%B9%BA%E5%8D%8A%E7%BE%A4 
3、http://www.ituring.com.cn/article/195776

时间: 2024-10-07 00:13:27

Introduction to Monoids and Semigroups with Spark的相关文章

Introduction to Big Data with Apache Spark 课程总结

课程主要实用内容: 1.spark实验环境的搭建 2.4个lab的内容 3.常用函数 4.变量共享 1.spark实验环境的搭建(windows) a. 下载,安装visualbox 管理员身份运行;课程要求最新版4.3.28,如果c中遇到虚拟机打不开的,可以用4.2.12,不影响 b. 下载,安装vagrant,重启 管理员身份运行 c. 下载虚拟机 c1.将vagrant加入path,D:\HashiCorp\Vagrant\bin c2.创建虚拟机存放的目录,比如myvagrant c3.

如何拿到阿里算法校招offer

好多同学有问过怎么能拿到阿里算法类校招的offer,刚好看到这篇文章分享给大家,详情可以看原文链接,原文链接中有视频讲解. 师兄师姐的建议: 之前初学算法的时候上过的公开课和看过的书 1. Coursera:<Machine Learning>.<Pattern Discovery in Data Mining>.<R Programming>,平台特点:通俗易懂,适合入门,看完公开课基本能写出作业 2. edx:<Introduction to Big Data

Spark从入门到上手实战

Spark从入门到上手实战 课程学习地址:http://www.xuetuwuyou.com/course/186 课程出自学途无忧网:http://www.xuetuwuyou.com 讲师:轩宇老师 课程简介: Spark属于新起的基于内存处理海量数据的框架,由于其快速被众公司所青睐.Spark 生态栈框架,非常的强大,可以对数据进行批处理.流式处理.SQL 交互式处理及机器学习和Graphx 图像计算.目前绝大数公司都使用,主要在于 Spark SQL 结构化数据的处理,非常的快速,高性能

[JS Compose] 5. Create types with Semigroups

An introduction to concatting items via the formal Semi-group interface. Semi-groups are simply a type with a concat method that are associative. We define three semigroup instances and see them in action.   A semigroup is a type with a concat method

从Hadoop到Spark的架构实践

摘要:本文则主要介绍TalkingData在大数据平台建设过程中,逐渐引入Spark,并且以Hadoop YARN和Spark为基础来构建移动大数据平台的过程. 当下,Spark已经在国内得到了广泛的认可和支持:2014年,Spark Summit China在北京召开,场面火爆:同年,Spark Meetup在北京.上海.深圳和杭州四个城市举办,其中仅北京就成功举办了5次,内容更涵盖Spark Core.Spark Streaming.Spark MLlib.Spark SQL等众多领域.而作

读learning spark lighting

chapter 1 introduction to the analysis with spark the conponents of Sparks spark core(contains the basic  functionality of sparks. spark Core  is also the  home to the APIs that defines the RDDs), spark sql(structured data ) is the package  for worki

sparklyr包--实现R与Spark接口

1.sparklyr包简介 Rstudio公司发布的sparklyr包具有以下几个功能: 实现R与Spark的连接: sparklyr包提供了一个完整的dplyr后端,可筛选并聚合Spark数据集,接着在R中实现分析与可视化: 利用Spark的MLlib机器学习库在R中实现分布式机器学习算法: 可以创建一个扩展,用于调用Spark API,并为Spark的所有包集提供了一个接口. 2.RStudio Server安装sparklyr包 Linux版本:Ubuntu 16.04 LTS 64bit

Tuning Java Garbage Collection for Spark Applicati

This is a guest post from our friends in the SSG STO Big Data Technology group at Intel. Join us at the Spark Summit to hear from Intel and other companies deploying Spark in production.  Use the code Databricks20 to receive a 20% discount! Spark is

【转载】Getting Started with Spark (in Python)

Getting Started with Spark (in Python) Benjamin Bengfort Hadoop is the standard tool for distributed computing across really large data sets and is the reason why you see "Big Data" on advertisements as you walk through the airport. It has becom