泛函编程(19)-泛函库设计-Parallelism In Action

上节我们讨论了并行运算组件库的基础设计,实现了并行运算最基本的功能:创建新的线程并提交一个任务异步执行。并行运算类型的基本表达形式如下:

 1 import java.util.concurrent._
 2 object Par {
 3   type Par[A] = ExecutorService => Future[A]
 4   def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)
 5                                                   //> run: [A](es: java.util.concurrent.ExecutorService)(pa: ch71.Par.Par[A])java.
 6                                                   //| util.concurrent.Future[A]
 7   def unit[A](a: A): Par[A] = {
 8       es => new Future[A] {
 9           def get = a
10           def get(t: Long, u: TimeUnit) = get
11           def isDone = true
12           def isCancelled = false
13           def cancel(evenIsRunning: Boolean) = false
14       }
15   }                                               //> unit: [A](a: A)ch71.Par.Par[A]
16   def fork[A](pa: Par[A]): Par[A] = {            //注意这里有个错误?
17       es => es.submit(new Callable[A] {
18           def call: A = run(es)(pa).get
19       })
20   }
21   def async[A](a: => A): Par[A] = fork(unit(a))
22
23 }

实际上我们已经实现了两项最基本的函数:

1、unit[A](a: A): Par[A] : 我们硬生生的按照Par的类型款式造了一个Future实例,这样我们才可以用Future.get的形式读取运算结果值。看看这个例子:unit(42+1),在调用函数unit时由于传入参数是即时计算的,所以在进入unit前已经完成了计算结果43。然后人为的把这个结果赋予Future.get,这样我们就可以和真正的由ExecutorService返回的Future一样用同样的方式读取结果。所以说unit纯粹是一个改变格式的升格函数,没有任何其它作用。

2、async[A](a: => A): Par[A]:这个async函数把表达式a提交到主线程之外的另一个线程。新的线程由ExecutorService提供,我们无须理会,这样可以实现线程管理和并行运算组件库的松散耦合。由于async的传人函数是延后计算类型,所以我们可以把表达式a提交给另一个线程去运算。

那么我们用例子来示范一下:

 1   val es = Executors.newCachedThreadPool()  //线程由jvm提供,我们无须理会
 2                                                   //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool
 3                                                   //| [email protected][Running, pool size = 0, active threads = 0, queued tasks =
 4                                                   //|  0, completed tasks = 0]
 5   val a = unit({println(Thread.currentThread.getName); 42+1})
 6                                                   //> main
 7                                                   //| a  : ch71.Par.Par[Int] = <function1>
 8   val b = async({println(Thread.currentThread.getName); 42+1})
 9                                                   //> main
10                                                   //| b  : ch71.Par.Par[Int] = <function1>
11   run(es)(a).get                                  //> res0: Int = 43
12   run(es)(b).get                                  //> res1: Int = 43
13   es.shutdown()

看到问题了吗?用run运算a,b时没有显示println,而这个println在申明val a, val b 时已经执行了。对unit这可以理解:参数是即时计算的,所以println和结果43都在进入函数之前运算了(然后放到Future.get)。但是async的参数不是延迟计算的吗?我们再看清楚:async(a: => A) >>> fork(unit(a)),到fork函数参数unit(a)就立即计算了。所以 fork(pa: => Par[A])才可以保证在提交任务前都不会计算表达式a。我们必须把fork的函数款式改一下:

1 def fork[A](pa: => Par[A]): Par[A] = {
2         es => es.submit(new Callable[A] {
3          def call: A = run(es)(pa).get
4       })
5   }                                               //> fork: [A](pa: ch71.Par.Par[A])ch71.Par.Par[A]

再运行一下例子:

 1  val es = Executors.newCachedThreadPool()  //线程由jvm提供,我们无须理会
 2                                                   //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool
 3                                                   //| [email protected][Running, pool size = 0, active threads = 0, queued tasks =
 4                                                   //|  0, completed tasks = 0]
 5   val a = unit({println(Thread.currentThread.getName); 42+1})
 6                                                   //> main
 7                                                   //| a  : ch71.Par.Par[Int] = <function1>
 8   val b = async({println(Thread.currentThread.getName); 42+1})
 9                                                   //> b  : ch71.Par.Par[Int] = <function1>
10   run(es)(a).get                                  //> res0: Int = 43
11   run(es)(b).get                                  //> pool-1-thread-1
12                                                   //| res1: Int = 43
13   es.shutdown()

看看结果:unit在主线程main运行,而async则在pool-1-thread-1这个非主线程内运行。

实现异步运算才是并行运算的第一步。并行运算顾名思义就是把一个大任务分解成几个较小任务然后同时异步运算后再把结果结合起来。我们用伪代码描述一下并行运算思路:

1  //伪代码
2   val big10sencondJob = ???      //一个10秒运算
3   val small5sJob1 = split big10sencondJob in half  //分解成两个 5秒运算
4   val small5sJob2 = split big10sencondJob in half  //分解成两个 5秒运算
5   val fa = run small5sJob1      //立即返回future 但开始运算 5 秒
6   val fb = run small5sJob2      //立即返回future 但开始运算 5 秒
7   val sum = fa.get + fb.get     //等待5秒后可以得出结果

看来用以上方式是可以得到并行运算的效果(10秒到5秒区别)。但我们采用了串指令(imperative)方式实现。当然我们必须考虑用泛函方式来实现并行运算的启动及结果抽取。

先用泛函方式启动并行运算。如果我们并行启动两个运算:

1  def map2[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C]

map2并行启动pa,pb然后把它们的结果用函数f结合。看起来很美。那么我们先试着把它实现了:

 1  def map2[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C] = {
 2     import TimeUnit.NANOSECONDS
 3       es => new Future[C] {
 4           val fa = run(es)(pa)        //在这里按pa的定义来确定在那个线程运行。如果pa是fork Par则在非主线程中运行
 5           val fb = run(es)(pb)
 6           def get = f(fa.get, fb.get)
 7           def get(timeOut: Long, timeUnit: TimeUnit) = {
 8               val start = System.nanoTime
 9               val a = fa.get
10               val end = System.nanoTime
11               //fa.get用去了一些时间。剩下给fb.get的timeout值要减去
12               val b = fb.get(timeOut - timeUnit.convert((end - start), NANOSECONDS) , timeUnit)
13             f(a,b)
14           }
15           def isDone = fa.isDone && fb.isDone
16           def isCancelled = fa.isCancelled && fb.isCancelled
17           def cancel(evenIsRunning: Boolean) = fa.cancel(evenIsRunning) || fb.cancel(evenIsRunning)
18       }
19   }                                               //> map2: [A, B, C](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])(f: (A, B) => C)ch
20                                                   //| 71.Par.Par[C]

在map2的实现里我们人为地建了个Future[C]。但在建的过程中我们运行了pa,pb的计算。如果我们对pa或pb有运算超时要求的话,就必须计算每次运算所使用的时间。所以Future[C]是符合pa,pb的运算要求的。

我们先试着同时运算41+2,33+4两个计算:

 1 val es = Executors.newCachedThreadPool()  //线程由jvm提供,我们无须理会
 2                                                   //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoo
 3                                                   //| [email protected][Running, pool size = 0, active threads = 0, queued tasks
 4                                                   //|  = 0, completed tasks = 0]
 5   map2(async({println(Thread.currentThread.getName); 41+2}),
 6        async({println(Thread.currentThread.getName); 33+4}))
 7        {(a,b) => {println(Thread.currentThread.getName); a+b}}(es).get
 8                                                   //> pool-1-thread-1
 9                                                   //| pool-1-thread-2
10                                                   //| main
11                                                   //| res0: Int = 80

啊!pa,pb分别在不同的非主线程中运行了。但函数f的运行是在主线程main中运行的。我们试着把这个也放到非主线程中:

1 fork { map2(async({println(Thread.currentThread.getName); 41+2}),
2        async({println(Thread.currentThread.getName); 33+4}))
3        {(a,b) => {println(Thread.currentThread.getName); a+b}}}(es).get
4                                                   //> pool-1-thread-2
5                                                   //| pool-1-thread-3
6                                                   //| pool-1-thread-1
7                                                   //| res0: Int = 80

现在所有的计算都是在不同的非主线程中运算的了,清楚了吧。

两个以上并行运算可以通过map2来实现:

 1   def map3[A,B,C,D](pa: Par[A], pb: Par[B], pc: Par[C])(f: (A,B,C) => D): Par[D] = {
 2       map2(pa,map2(pb,pc){(b,c) => (b,c)}){(a,bc) => {
 3           val (b,c) = bc
 4           f(a,b,c)
 5       }}
 6   }
 7   def map4[A,B,C,D,E](pa: Par[A], pb: Par[B], pc: Par[C], pd: Par[D])(f: (A,B,C,D) => E): Par[E] = { //| 71.Par.Par[C]
 8       map2(pa,map2(pb,map2(pc,pd){(c,d) => (c,d)}){(b,cd) => (b,cd)}){(a,bcd) => {
 9           val (b,(c,d)) = bcd
10           f(a,b,c,d)
11       }}
12   }
13   def map5[A,B,C,D,E,F](pa: Par[A], pb: Par[B], pc: Par[C], pd: Par[D], pe: Par[E])(f: (A,B,C,D,E) => F): Par[F] = { //| 71.Par.Par[C]
14       map2(pa,map2(pb,map2(pc,map2(pd,pe){(d,e) => (d,e)}){(c,de) => (c,de)}){(b,cde) => (b,cde)}){(a,bcde) => {
15           val (b,(c,(d,e))) = bcde
16           f(a,b,c,d,e)
17       }}
18   }

再看个例子:如果一个并行运算的表达式是个List[Int],即 Par[List[Int]]。 如何对内部的List[Int]进行排序?

 1 //我们可以run pa, get list 后进行排序,然后再封装进Future[List[Int]]
 2   def sortPar(pa: Par[List[Int]]): Par[List[Int]] = {
 3     es => {
 4           val l = run(es)(pa).get
 5           new Future[List[Int]] {
 6               def get = l.sorted
 7               def isDone = true
 8               def isCancelled = false
 9               def get(t: Long, u: TimeUnit) = get
10               def cancel(e: Boolean) = false
11           }
12       }
13   }
14  //也可以用map2来实现。因为map2可以启动并行运算,也可以对par内元素进行操作。但操作只针对一个par,
15  //我们用unit(())替代第二个par。现在我们可以对一个par的元素进行操作了
16   def sortedPar(pa: Par[List[Int]]): Par[List[Int]] = {
17       map2(pa,unit(())){(a,_) => a.sorted}
18   }
19   //map是对一个par的元素进行变形操作,我们同样可以用map2实现了
20   def map[A,B](pa: Par[A])(f: A => B): Par[B] = {
21       map2(pa,unit(())){(a,_) => f(a) }
22   }
23   //然后用map去对Par[List[Int]]排序
24   def sortParByMap(pa: Par[List[Int]]): Par[List[Int]] = {
25       map(pa){_.sorted}
26   }

看看运行结果:

1 sortPar(async({println(Thread.currentThread.getName); List(4,1,2,3)}))(es).get
2                                                   //> pool-1-thread-1
3                                                   //| res3: List[Int] = List(1, 2, 3, 4)
4  sortParByMap(async({println(Thread.currentThread.getName); List(4,1,2,3)}))(es).get
5                                                   //> pool-1-thread-1
6                                                   //| res4: List[Int] = List(1, 2, 3, 4)

实际上map2做了两件事:启动了两个并行运算、对运算结果进行了处理。这样说map2是可以被分解成更基本的组件函数:

 1 //启动两项并行运算
 2   def product[A,B](pa: Par[A], pb: Par[B]): Par[(A,B)] = {
 3       es => unit((run(es)(pa).get, run(es)(pb).get))(es)
 4   }                                               //> product: [A, B](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])ch71.Par.Par[(A, B
 5                                                   //| )]
 6   //处理运算结果
 7   def map[A,B](pa: Par[A])(f: A => B): Par[B] = {
 8       es => unit(f(run(es)(pa).get))(es)
 9   }                                               //> map: [A, B](pa: ch71.Par.Par[A])(f: A => B)ch71.Par.Par[B]
10   //再组合map2
11   def map2_pm[A,B,C](pa: Par[A], pb: Par[B])(f: (A,B) => C): Par[C] = {
12       map(product(pa, pb)){a => f(a._1, a._2)}
13   }                                               //> map2_pm: [A, B, C](pa: ch71.Par.Par[A], pb: ch71.Par.Par[B])(f: (A, B) => C
14                                                   //| )ch71.Par.Par[C]

我们还可以把函数A => B转换成A => Par[B],意思是把对A的运算变成并行运算Par[B]:

1   def asyncF[A,B](f: A => B): A => Par[B] = a => async(f(a))
2                                                   //> asyncF: [A, B](f: A => B)A => ch71.Par.Par[B]

用asyncF应该可以把对一个List的处理函数变成并行运算:

1 def parMap[A,B](as: List[A])(f: A => B): Par[List[B]]

用 map(as){asyncF(f)}可以得到List[Par[B]]。再想办法List[Par[B]] >>> Par[List[B]],这不就是我们经常遇到的那个sequence函数的类型款式吗。那我们就先实现了par的sequence函数吧:

 1  //用递归法实现
 2   def sequence_r[A](lp: List[Par[A]]): Par[List[A]] = {
 3       lp match {
 4           case Nil => unit(List())
 5           case h::t => map2(h,fork(sequence_r(t))){_ :: _}
 6       }
 7   }                                               //> sequence_r: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]
 8   //用foldLeft
 9   def sequenceByFoldLeft[A](lp: List[Par[A]]): Par[List[A]] = {
10       lp.foldLeft(unit[List[A]](Nil)){(t,h) => map2(h,t){_ :: _}}
11   }                                               //> sequenceByFoldLeft: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]
12   //用foldRight
13   def sequenceByFoldRight[A](lp: List[Par[A]]): Par[List[A]] = {
14       lp.foldRight(unit[List[A]](Nil)){(h,t) => map2(h,t){_ :: _}}
15   }                                               //> sequenceByFoldRight: [A](lp: List[ch71.Par.Par[A]])ch71.Par.Par[List[A]]
16   //用IndexedSeq切成两半来实现
17   def sequenceBalanced[A](as: IndexedSeq[Par[A]]): Par[IndexedSeq[A]] = {
18     if (as.isEmpty) unit(Vector())
19     else if (as.length == 1) map(as.head){a => Vector(a)}
20     else {
21         val (l,r) = as.splitAt(as.length / 2)
22         map2(sequenceBalanced(l),sequenceBalanced(r)){_ ++ _}
23     }
24   }                                               //> sequenceBalanced: [A](as: IndexedSeq[ch71.Par.Par[A]])ch71.Par.Par[IndexedS
25   def sequence[A](lp: List[Par[A]]): Par[List[A]] = { //| eq[A]]
26     map(sequenceBalanced(lp.toIndexedSeq)){_.toList}
27   }

有了sequence就可以从List[Par[A]]到Par[List[A]],实现parMap应该没问题了:

 1  def parMap[A,B](as: List[A])(f: A => B): Par[List[B]] = fork {
 2       val lps = as.map{asyncF(f)}
 3       sequence(lps)
 4   }                                               //> parMap: [A, B](as: List[A])(f: A => B)ch71.Par.Par[List[B]]
 5  fork(parMap(List(1,2,3,4,5)){ _ + 10 })(es).get  //> pool-1-thread-1
 6                                                   //| pool-1-thread-2
 7                                                   //| pool-1-thread-3
 8                                                   //| pool-1-thread-4
 9                                                   //| pool-1-thread-5
10                                                   //| pool-1-thread-6
11                                                   //| pool-1-thread-8
12                                                   //| pool-1-thread-7
13                                                   //| pool-1-thread-9
14                                                   //| pool-1-thread-10
15                                                   //| pool-1-thread-14
16                                                   //| pool-1-thread-12
17                                                   //| pool-1-thread-15
18                                                   //| pool-1-thread-11
19                                                   //| pool-1-thread-13
20                                                   //| res3: List[Int] = List(11, 12, 13, 14, 15)

现在我们的并行计算组件库已经能够提供一些基本的并行运算功能了。

时间: 2024-10-26 10:45:31

泛函编程(19)-泛函库设计-Parallelism In Action的相关文章

泛函编程(30)-泛函IO:Free Monad-Monad生产线

在上节我们介绍了Trampoline.它主要是为了解决堆栈溢出(StackOverflow)错误而设计的.Trampoline类型是一种数据结构,它的设计思路是以heap换stack:对应传统递归算法运行时在堆栈上寄存程序状态,用Trampoline进行递归算法时程序状态是保存在Trampoline的数据结构里的.数据结构是在heap上的,所以可以实现以heap换stack的效果.这种以数据结构代替函数调用来解决问题的方式又为泛函编程提供了更广阔的发展空间. 我们知道,任何涉及IO的运算都会面临

泛函编程(5)-数据结构(Functional Data Structures)

编程即是编制对数据进行运算的过程.特殊的运算必须用特定的数据结构来支持有效运算.如果没有数据结构的支持,我们就只能为每条数据申明一个内存地址了,然后使用这些地址来操作这些数据,也就是我们熟悉的申明变量再对变量进行读写这个过程了.试想想如果没有数据结构,那我们要申明多少个变量呢.所以说,数据结构是任何编程不可缺少的元素. 泛函编程使用泛函数据结构(Functional Data Structure)来支持泛函程序.泛函数据结构的特点是”不可变特性“(Immutability), 是泛函编程中函数组

泛函编程(20)-泛函库设计-Further Into Parallelism

上两节我们建了一个并行运算组件库,实现了一些基本的并行运算功能.到现在这个阶段,编写并行运算函数已经可以和数学代数解题相近了:我们了解了问题需求,然后从类型匹配入手逐步产生题解.下面我们再多做几个练习吧. 在上节我们介绍了asyncF,它的类型款式是这样的:asyncF(f: A => B): A => Par[B],从类型款式(type signature)分析,asyncF函数的功能是把一个普通的函数 A => B转成A => Par[B],Par[B]是一个并行运算.也就是说

泛函编程(18)-泛函库设计-并行运算组件库

作为专业的编程人员,我们经常会因为工作需要建立一些工具库.所谓工具库就是针对工作上经常会遇到的一些共性问题预先编制的由一整套函数所组成的函数库.通常这些工具库的功能都是在特别定制的一些数据类型支持下由一系列函数围绕着这些数据类型进行运算而实现的.在泛函编程范畴内也不例外.但在泛函工具库里的函数则更重视函数的组合能力(functional composition):因而泛函的工具库一般称为组件库(combinator library),库内函数则被称之为组件(combinator).组件库的设计者

泛函编程(6)-数据结构-List基础

List是一种最普通的泛函数据结构,比较直观,有良好的示范基础.List就像一个管子,里面可以装载一长条任何类型的东西.如需要对管子里的东西进行处理,则必须在管子内按直线顺序一个一个的来,这符合泛函编程的风格.与其它的泛函数据结构设计思路一样,设计List时先考虑List的两种状态:空或不为空两种类型.这两种类型可以用case class 来表现: 1 trait List[+A] {} 2 case class Cons[+A](head: A, tail: List[A]) extends

泛函编程(9)-异常处理-Option

Option是一种新的数据类型.形象的来描述:Option就是一种特殊的List,都是把数据放在一个管子里:然后在管子内部对数据进行各种操作.所以Option的数据操作与List很相似.不同的是Option的管子内最多只能存放一个元素,在这个方面Option的数据操作就比List简单的多,因为使用者不必理会数据元素的位置.顺序.Option只有两种状态:包含一个任何类型的元素或者为空.或者这样讲:一个Option实例包含 0 或 1 个元素:None代表为空,Some(x)代表包含一个任意类型的

泛函编程(27)-泛函编程模式-Monad Transformer

经过了一段时间的学习,我们了解了一系列泛函数据类型.我们知道,在所有编程语言中,数据类型是支持软件编程的基础.同样,泛函数据类型Foldable,Monoid,Functor,Applicative,Traversable,Monad也是我们将来进入实际泛函编程的必需.在前面对这些数据类型的探讨中我们发现: 1.Monoid的主要用途是在进行折叠(Foldable)算法时对可折叠结构内元素进行函数施用(function application). 2.Functor可以对任何高阶数据类型F[_]

泛函编程(23)-泛函数据类型-Monad

简单来说:Monad就是泛函编程中最概括通用的数据模型(高阶数据类型).它不但涵盖了所有基础类型(primitive types)的泛函行为及操作,而且任何高阶类或者自定义类一旦具备Monad特性就可以与任何类型的Monad实例一样在泛函编程中共同提供一套通用的泛函编程方式.所以有人把泛函编程视作Monadic Programming也不为过之.那么,具体什么是Monad呢? 在前面我们讨论过Monoid,我们说过它是一个特殊的范畴(Category),所有数据类型的Monoid实例都共同拥有一

泛函编程(24)-泛函数据类型-Monad, monadic programming

在上一节我们介绍了Monad.我们知道Monad是一个高度概括的抽象模型.好像创造Monad的目的是为了抽取各种数据类型的共性组件函数汇集成一套组件库从而避免重复编码.这些能对什么是Monad提供一个明确的答案吗?我们先从上节设计的Monad组件库中的一些基本函数来加深一点对Monad的了解: 1 trait Monad[M[_]] extends Functor[M] { 2 def unit[A](a: A): M[A] 3 def flatMap[A,B](ma: M[A])(f: A =

泛函编程(25)-泛函数据类型-Monad-Applicative

上两期我们讨论了Monad.我们说Monad是个最有概括性(抽象性)的泛函数据类型,它可以覆盖绝大多数数据类型.任何数据类型只要能实现flatMap+unit这组Monad最基本组件函数就可以变成Monad实例,就可以使用Monad组件库像for-comprehension这样特殊的.Monad具备的泛函式数据结构内部的按序计算运行流程.针对不同的数据类型,flatMap+unit组件实现方式会有所不同,这是因为flatMap+unit代表着承载数据类型特别的计算行为.之前我们尝试了List,O