泛函编程(18)-泛函库设计-并行运算组件库

作为专业的编程人员,我们经常会因为工作需要建立一些工具库。所谓工具库就是针对工作上经常会遇到的一些共性问题预先编制的由一整套函数所组成的函数库。通常这些工具库的功能都是在特别定制的一些数据类型支持下由一系列函数围绕着这些数据类型进行运算而实现的。在泛函编程范畴内也不例外。但在泛函工具库里的函数则更重视函数的组合能力(functional composition);因而泛函的工具库一般称为组件库(combinator library),库内函数则被称之为组件(combinator)。组件库的设计者对函数设计有着共通的最基本目标:通过对组件进行各种函数组合可以实现更大的功能。泛函组件库设计一般针对特别的功能需求或课题:首先尝试用一些数据类型来表述课题需求,然后围绕这些特制的数据类型设计一系列函数针对课题各个最基本需求范畴提供解决方法。我们在这节讨论中从一个并行运算组件库的设计过程来介绍泛函组件库设计模式。

我们设计这个并行运算组件库的目的:可以把一个普通运算放到另外一个独立的线程(thread)中去运行。这样我们可以同时把多个运算分别放到多个线程中同时运行从而达到并行运算的目的。问题简单明确,但如何对这些在各自独立运行空间的运算进行组合(composition)、变形(transformation)则值得仔细思量。

先从数据类型着手:一个并行运算应该像是一个容器,把一个普通运算封装在里面。我们来随便造个结构出来:Par[A],A是普通运算返回的结果类型。这个Par类型很像我们前面接触的高阶类型,那个承载A类型元素的管子类型。如果这样去想的话,我们可以用前面所有针对高阶类型的函数对管子内的元素A进行操作处理。那么如果一个运算是封装在Par里在另一个线程中运算完成后总是需要一个方法把结果取出来。这样我们可以先得出两个最基本的函数:

1 def unit[A](a: A): Par[A]    //把一个普通运算注入Par。把A升格到一个并行运算
2 def get[A](pa: Par[A]): A    //把并行运行结果抽取出来

下一个问题是运行线程控制:是由程序员来决定一个运算该放到一个新的线程里还是固定每一个运算都用新的独立线程?假设我们选择用由程序员调用一个函数来确定产生新线程。这样有两个优越:1、可以有更灵活的并行运算策略(有些已经确定很快完成的运算可能没有必要用新的线程,独立线程运算可能消耗更多的资源);2、独立线程机制和并行运算是松散耦合的:Par的实现中不需要了解线程管理机制。这个函数的款式如下:

def fork[A](pa: Par[A]): Par[A]  //为pa设定一个新的运行空间。并不改变pa,还是返回Par[A]

那么把一个运算放到一个新的线程里运行可以用这个函数表达:

def async[A](a: => A): Par[A] = fork(unit(a))  //不需要了解任何关于Par的信息。知道fork会为这个运算设定新的运行空间。注意还是返回Par[A]

因为我们追求的是线程机制和并行运算的松散耦合,那么我们就不会在Par里实际进行并行运算的运行,那么Par就只是对一个并行运算的描述。fork的返回还是Par,只是增加了对运算环境的描述,也不会真正运行算法。这样来说Par如果是一个运算描述,那么我们就需要一个真正的运行机制来获取运算结果了:

1 def run[A](pa: Par[A]): A    //由于Par的意义从容器变成运算描述,我们把get重新命名为run

我们就需要在run的函数实现方法里进行线程管理、计算运行等真正Par的运行了。

现在Par的表达形式包括如下:

1 def unit[A](a: A): Par[A]                      //把一个普通运算注入Par。把A升格到一个并行运算描述
2 def fork[A](pa: Par[A]): Par[A]                //为pa设定一个新的运行空间。返回的结果Par必须经run来运行并获取结果
3 def async[A](a: => A): Par[A] = fork(unit(a))  //不需要了解任何关于Par的信息。注意还是返回Par[A]
4 def run[A](pa: Par[A]): A                      //运行pa并抽取运算结果

应该是在v1.6以后吧,java API包含了java.util.concurrent包,其中包括了ExecutorService类提供线程管理方面的支持。ExecutorService和Future类翻译成scala如下:

class ExecutorService {
  def submit[A](a: Callable[A]): Future[A]
}
trait Future[A] {
  def get: A
  def get(timeout: Long, unit: TimeUnit): A
  def cancel(evenIfRunning: Boolean): Boolean
  def isDone: Boolean
  def isCancelled: Boolean
}

我们不需要进入多线程编程底层细节,用java Concurrent ExecutorService足够了。ExecutorService提供了以Callable形式向系统提交需运算任务方式;系统立即返回Future,我们可以用Future.get以锁定线程方式读取运算。由于运算结果读取是以锁定线程(blocking)形式进行的,那么使用get的时间节点就很重要了:如果提交一个运算后下一步直接get就会立即锁定线程直至运算完成,那我们就无法得到任何并行运算效果了。Future还提供了运行状态和中断运行等功能为编程人员提供更强大灵活的运算控制。为了获取更灵活的控制,Par的返回值应该从直接锁定线程读取A改成不会产生锁定线程效果的Future:

1 type Par[A] = ExecutorService => Future[A]
2 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)

现在Par的含义又从一个数据类型变成了一个函数描述了:传入一个ExecutorService,返回Future。我们可以用run来运行这个函数,系统会立即返回Future,无需任何等待。

下面让我们把这些最基本的函数都实现了:

 1 object par {
 2 import java.util.concurrent._
 3
 4 type Par[A] = ExecutorService => Future[A]
 5 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)
 6                                                   //> run: [A](es: java.util.concurrent.ExecutorService)(pa: ch7.par.Par[A])java.u
 7                                                   //| til.concurrent.Future[A]
 8
 9 def unit[A](a: A): Par[A] = es => {
10     new Future[A] {
11         def get: A = a
12         def isDone = true
13         def isCancelled = false
14         def get(timeOut: Long, timeUnit: TimeUnit): A = get
15         def cancel(evenIfRunning: Boolean): Boolean = false
16     }
17 }                                                 //> unit: [A](a: A)ch7.par.Par[A]
18 def fork[A](pa: Par[A]): Par[A] = es => {
19     es.submit[A](new Callable[A] {
20       def call: A = run(es)(pa).get
21     })
22 }                                                 //> fork: [A](pa: ch7.par.Par[A])ch7.par.Par[A]
23 def async[A](a: => A): Par[A] = fork(unit(a))     //> async: [A](a: => A)ch7.par.Par[A]
24
25 val a = unit(4+7)                                 //> a  : ch7.par.Par[Int] = <function1>
26 val b = async(2+1)                                //> b  : ch7.par.Par[Int] = <function1>
27 val es = Executors.newCachedThreadPool()          //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool
28                                                   //| [email protected][Running, pool size = 0, active threads = 0, queued tasks =
29                                                   //|  0, completed tasks = 0]
30 run(es)(b).get                                    //> res0: Int = 3
31 run(es)(a).get                                    //> res1: Int = 11
32 es.shutdown()
33
34 }

从应用例子里我们可以了解线程的管理是由现有的java工具提供的(Executors.newCachedThreadPool),我们无须了解线程管理细节。我们同时确定了线程的管理机制与我们设计的并行运算Par是松散耦合的。

注意:unit并没有使用ExecutorService es, 而是直接返回一个注明完成运算(isDone=true)的Future,这个Future的get就是unit的传入参数a。如果我们再用这个Future的get来得取表达式的运算结果的话,这个运算是在当前主线程中运行的。async通过fork选择新的线程;并向新的运行环境提交了运算任务。我们来分析一下运算流程:

1、val a = unit(4+7),unit构建了一个完成的 new Future; isDone=true,设置了 Future.get = 4 + 7,run(es)(a)在主线程中对表达式 4+7 进行了运算并得取结果 11。

2、val b = async(2+1) >>> fork(unit(2+1)), run(es)(b) >>> submit(new Callable), 注意 def call = run(es)(b).get : 这里提交的运算run(es)(b).get实际上又提交了一次运算并直接锁定线程(blocking)等待读取运算结果。第一次提交Callable又需要锁定线程等待提交运算完成计算。如果线程池只能提供一个线程的话,第一次提交了Callable会占用这个唯一的线程并等待第二次提交运算得出的结果,由于没有线程可以提供给二次提交运算,这个运算永远无法得到结果,那么run(es)(b).get就会产生死锁了(dead lock)。

我们在这节介绍了一个简单的泛函并行组件库设计,可以把一个运算放到主线程之外的另一个新的线程中计算。但是抽取运算结果却还是会锁定线程(blocking)。我们下一节将会讨论如何通过一些算法函数来实现并行运算。

时间: 2025-01-07 08:35:50

泛函编程(18)-泛函库设计-并行运算组件库的相关文章

泛函编程(20)-泛函库设计-Further Into Parallelism

上两节我们建了一个并行运算组件库,实现了一些基本的并行运算功能.到现在这个阶段,编写并行运算函数已经可以和数学代数解题相近了:我们了解了问题需求,然后从类型匹配入手逐步产生题解.下面我们再多做几个练习吧. 在上节我们介绍了asyncF,它的类型款式是这样的:asyncF(f: A => B): A => Par[B],从类型款式(type signature)分析,asyncF函数的功能是把一个普通的函数 A => B转成A => Par[B],Par[B]是一个并行运算.也就是说

泛函编程(19)-泛函库设计-Parallelism In Action

上节我们讨论了并行运算组件库的基础设计,实现了并行运算最基本的功能:创建新的线程并提交一个任务异步执行.并行运算类型的基本表达形式如下: 1 import java.util.concurrent._ 2 object Par { 3 type Par[A] = ExecutorService => Future[A] 4 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es) 5 //> run: [A](es: jav

泛函编程(30)-泛函IO:Free Monad-Monad生产线

在上节我们介绍了Trampoline.它主要是为了解决堆栈溢出(StackOverflow)错误而设计的.Trampoline类型是一种数据结构,它的设计思路是以heap换stack:对应传统递归算法运行时在堆栈上寄存程序状态,用Trampoline进行递归算法时程序状态是保存在Trampoline的数据结构里的.数据结构是在heap上的,所以可以实现以heap换stack的效果.这种以数据结构代替函数调用来解决问题的方式又为泛函编程提供了更广阔的发展空间. 我们知道,任何涉及IO的运算都会面临

泛函编程(5)-数据结构(Functional Data Structures)

编程即是编制对数据进行运算的过程.特殊的运算必须用特定的数据结构来支持有效运算.如果没有数据结构的支持,我们就只能为每条数据申明一个内存地址了,然后使用这些地址来操作这些数据,也就是我们熟悉的申明变量再对变量进行读写这个过程了.试想想如果没有数据结构,那我们要申明多少个变量呢.所以说,数据结构是任何编程不可缺少的元素. 泛函编程使用泛函数据结构(Functional Data Structure)来支持泛函程序.泛函数据结构的特点是”不可变特性“(Immutability), 是泛函编程中函数组

墨刀联合有赞Vant组件库,让你轻松设计出电商原型

继上周新上线了简历模板之后,本周墨刀的原型模板库又欢喜地增添一名新成员! 有赞Vant组件库 (做电商的宝宝要捂嘴笑了) Vant 组件库是有赞前端团队开源的一套基于Vue的UI组件库,目前版本收录了50+个组件,都是来源于有赞的微商城业务,经过有赞业务的检验,轻量可靠 (目前在Github上已经收获星星7857颗) 相比于其他Vue UI组件库,Vant 组件库不只是提供基础的UI组件,还增加了许多移动商城内常用的业务组件,可以让开发者快速构建移动商城. 墨刀的 Vant 组件库模板 正如有赞

搭建前端组件库(一)

本文梳理如何搭建和构建前端组件库. 了解几个问题 为何需要组件化? 大部分项目起源都是源于业务方的各种各样的奇葩需求.随着公司的业务发展,公司内部开始衍生出很多的B2C系统.后台系统,前端部门也疲于应对越来越多同质化的项目,这些项目在很多基础模块层.源代码存在不小的相似,甚至存在相似的业务模块. 笔者曾经所在的一个电商团队,前端成员基本每个人多做过登录注册.购物车.支付.微信登录...... 大量重复的业务代码.由于组内技术没有强制规范 本质上相同的东西,重复的去code就显得浪费. 分析这些问

第6课-函数库设计

1.Linux下的应用程序所需要的外部函数可以由函数可和系统调用提供.2.函数库是处于用户态的,由工作人员编写的函数的集合,而系统调用是由Linux内核实现的.3.函数库分为静态和动态,按照链接方式划分的.动态函数库比静态节约空间.使用静态函数库以后应用程序最终会包含自身和函数库,在内存中运行的时候就会有多个函数可的拷贝,导致在空间上的浪费.而动态函数库则在内存中只有一个拷贝,供多个都会使用到的程序使用.4.Linux使用的函数库一般都是在/lib或者在/usr/lib,以*.so*命名的,是动

泛函编程(24)-泛函数据类型-Monad, monadic programming

在上一节我们介绍了Monad.我们知道Monad是一个高度概括的抽象模型.好像创造Monad的目的是为了抽取各种数据类型的共性组件函数汇集成一套组件库从而避免重复编码.这些能对什么是Monad提供一个明确的答案吗?我们先从上节设计的Monad组件库中的一些基本函数来加深一点对Monad的了解: 1 trait Monad[M[_]] extends Functor[M] { 2 def unit[A](a: A): M[A] 3 def flatMap[A,B](ma: M[A])(f: A =

泛函编程(29)-泛函实用结构:Trampoline-不再怕StackOverflow

泛函编程方式其中一个特点就是普遍地使用递归算法,而且有些地方还无法避免使用递归算法.比如说flatMap就是一种推进式的递归算法,没了它就无法使用for-comprehension,那么泛函编程也就无法被称为Monadic Programming了.虽然递归算法能使代码更简洁易明,但同时又以占用堆栈(stack)方式运作.堆栈是软件程序有限资源,所以在使用递归算法对大型数据源进行运算时系统往往会出现StackOverflow错误.如果不想办法解决递归算法带来的StackOverflow问题,泛函