Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model

长久以来,函数式编程模式都被认为是一种学术研究用或教学实验用的编程模式。直到近几年由于大数据和多核CPU的兴起造成了函数式编程模式在一些实际大型应用中的出现,这才逐渐改变了人们对函数式编程无用论的观点。通过一段时间对函数式编程方法的学习,我们了解到Free Monad的算式/算法关注分离(separation of concern)可以是一种很实用的函数式编程模式。用Free Monad编写的程序容易理解并具备良好的可维护性。scalaz-stream的流程控制和多线程运算模式可以实现程序的安全并行运算。把Free Monad和scalaz-stream有机结合起来可以形成一种新的编程模式来支持函数式多线程编程来编制具备安全性、易扩展、易维护的并行运算程序。我们先从一个简单的Free Monad程序开始:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 import scala.language.higherKinds
 6 import scala.language.implicitConversions
 7 object freeStream {
 8 //1. 定义语句
 9  object DSLs {
10    sealed trait Interact[A]
11    case class Ask(q: String) extends Interact[String]
12    case class Tell(m: String) extends Interact[Unit]
13 //2. Free升格
14    implicit def interactToFree[A](ia: Interact[A]) = Free.liftF(ia)
15  }
16  //3. 程序逻辑/算式
17  object PRGs {
18  import DSLs._
19    val prgGetName: Free[Interact,Unit] = for {
20      first <- Ask("What‘s your first name?")
21      last <- Ask("What‘s your last name?")
22      _ <- Tell(s"Hello $first $last")
23    } yield ()
24  }
25  //4. 实现方式/算式
26  object IMPs {
27  import DSLs._
28    object InteractConsole extends (Interact ~> Id) {
29       def apply[A](ia: Interact[A]): Id[A] = ia match {
30         case Ask(q) => {println(q); Console.readLine}
31         case Tell(m) => println(m)
32       }
33    }
34  }

在这个程序里我们按照一个固定的框架步骤来实现“定义语句”、“升格Free”、“功能描述”及“实现方式”。这里特别需要注意的是所谓的算式/算法关注分离,即“功能描述”和“实现方式”是互不关联的。这样我们可以提供不同版本的实现方式来进行测试、环境转换等工作。Free Monad的具体运算方式如下:

1 //5. 运算/Run
2 import DSLs._,PRGs._,IMPs._
3 prgGetName.foldMapRec(InteractConsole)

运算结果返回A:对于prgGetName来说就是Unit。不过如果直接运行foldMapRec有可能会产生副作用(siede effect)。这样不符合纯代码要求,无法实现这个程序与其它程序的函数组合。我们需要把这段可能产生副作用的代码放到Task里:

1 val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
2 ?//> taskGetName  : scalaz.concurrent.Task[scalaz.Scalaz.Id[Unit]] = [email protected]

这样我们就获得了一个异线程的延迟运算。我们可以放心地用这个taskGetName进行函数组合。把这个Free Monad程序转换成scalaz-stream的Process也很容易:

1 val prcGetName = Process.eval(taskGetName)  //> prcGetName  : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.Scalaz.Id[Unit]] = Await([email protected],<function1,<function1>)

我们用Process.eval直接把它转换成Process[Task,Unit]类型。下面我们用scalaz-stream的运算方式来运算这个Free Monad程序:

1 object FreeInteract extends App {
2   import DSLs._,PRGs._,IMPs._
3   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
4   val prcGetName = Process.eval(taskGetName)
5   prcGetName.run.run
6 }

运算结果如下:

1 What‘s your first name?
2 tiger
3 What‘s your last name?
4 chan
5 Hello, tiger chan!

虽然这个例子看起来很简单,但其中代表的意义却不小:我们潜移默化地实现了函数式多线程编程了。

如果我们需要Free Monad程序返回运算结果的话就调整一下功能描述(算式):

1   val prgGetUserID = for {
2     uid <- ask("Enter User ID:")
3   } yield uid

再运算一下:

 1 object FreeInteract extends App {
 2   import DSLs._,PRGs._,IMPs._
 3   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
 4   val prcGetName = Process.eval(taskGetName)
 5   //prcGetName.run.run
 6   Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)}).runLog.run.map(println)
 7 ...
 8 Enter User ID:
 9 tiger123
10 tiger123

用纯代码方式echo输入:

1   pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run
2 ...
3 Enter User ID:
4 user234
5 user234

也可以把结果发送到一个Sink来显示:

val outSink: Sink[Task,String] = Process.constant{x =>Task.delay{prgEchoInput(x).foldMapRec(InteractConsole)}}
(pUserID to outSink).run.run
...
Enter User ID:
jonathon
jonathon

我们试着再加一个Free程序功能:验证用户编号

 1   sealed trait Login[A]
 2   case class CheckID(id: String) extends Login[Boolean]
 3 ...
 4   def prgCheckID(id: String) = for {
 5     b <- Free.liftF(CheckID(id))
 6   } yield b
 7 ...
 8   object UserLogin extends (Login ~> Id) {
 9     def apply[A](la: Login[A]): Id[A] = la match {
10       case CheckID(id) => if (id === "tiger123") true else false
11     }
12   }

stream流程是:先读取用户编号然后验证,跟着在Sink输出结果:

 1  def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString)
 2   val chCheckID = channel.lift(fCheckID)
 3   ((pUserID through chCheckID) to outSink).run.run
 4 ...
 5
 6 Enter User ID:
 7 tiger123
 8 true
 9 ...
10 Enter User ID:
11 johnny234
12 false

不错!Free Monad和scalar-stream可以很好的集成在一起。
我把这节讨论的示范源代码提供给大家:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scalaz.stream._
 5 object DSLs {
 6   sealed trait Interact[A]
 7   case class Ask(q: String) extends Interact[String]
 8   case class Tell(m: String) extends Interact[Unit]
 9   object Interact {
10     def ask(q: String): Free[Interact, String] = Free.liftF(Ask(q))
11     def tell(m: String): Free[Interact, Unit] = Free.liftF(Tell(m))
12   }
13   sealed trait Login[A]
14   case class CheckID(id: String) extends Login[Boolean]
15 }
16 object PRGs {
17   import DSLs._
18   import Interact._
19
20   val prgGetName = for {
21     first <- ask("What‘s your first name?")
22     last <- ask("What‘s your last name?")
23     _ <- tell(s"Hello, $first $last!")
24   } yield()
25
26   val prgGetUserID = for {
27     uid <- ask("Enter User ID:")
28   } yield uid
29
30   def prgEchoInput(m: String) = tell(m)
31
32   def prgCheckID(id: String) = for {
33     b <- Free.liftF(CheckID(id))
34   } yield b
35
36 }
37 object IMPs {
38   import DSLs._
39   object InteractConsole extends (Interact ~> Id) {
40     def apply[A](ia: Interact[A]): Id[A] = ia match {
41       case Ask(q) => { println(q); readLine }
42       case Tell(m) => println(m)
43     }
44   }
45   object UserLogin extends (Login ~> Id) {
46     def apply[A](la: Login[A]): Id[A] = la match {
47       case CheckID(id) => if (id === "tiger123") true else false
48     }
49   }
50 }
51
52
53 object FreeInteract extends App {
54   import DSLs._,PRGs._,IMPs._
55   val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)}
56   val prcGetName = Process.eval(taskGetName)
57   //prcGetName.run.run
58   val pUserID= Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)})
59   //pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run
60   val outSink: Sink[Task,String] = Process.constant { x => Task.delay {prgEchoInput(x).foldMapRec(InteractConsole) } }
61   //(pUserID to outSink).run.run
62   def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString)
63   val chCheckID = channel.lift(fCheckID)
64   ((pUserID through chCheckID) to outSink).run.run
65   
时间: 2024-10-11 08:02:53

Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model的相关文章

Java多线程编程模式实战指南(三):Two-phase Termination模式--转载

本文由本人首次发布在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-two-phase-termination.转载请注明作者: 黄文海 出处:http://viscent.iteye.com. 停止线程是一个目标简单而实现却不那么简单的任务.首先,Java没有提供直接的API用于停止线程.此外,停止线程时还有一些额外的细节需要考虑,如待停止的线程处于阻塞(等待锁)或者等待状态(等待其它

Java多线程编程模式实战指南(三):Two-phase Termination模式

停止线程是一个目标简单而实现却不那么简单的任务.首先,Java没有提供直接的API用于停止线程.此外,停止线程时还有一些额外的细节需要考虑,如待停止的线程处于阻塞(等待锁)或者等待状态(等待其它线程).尚有未处理完的任务等.本文介绍的Two-phase Termination模式提供了一种通用的用于优雅地停止线程的方法. Two-phase Termination模式简介 Java并没有提供直接的API用于停止线程.Two-phase Termination模式通过将停止线程这个动作分解为准备阶

Javascript编程模式(JavaScript Programming Patterns)Part 1.

JavaScript 为网站添加状态,这些状态可能是校验或者更复杂的行为像拖拽终止功能或者是异步的请求webserver (aka Ajax). 在过去的那些年里, JavaScript libraries变得越来越流行. 如果你面对着很多的工作计划,一个很明确的道理就是在网站变得越来越复杂的情况下每次修改‘轮子“肯定让你不爽.当然我们把类库放到一边,聚焦于 JavaScript的语法,对你最有价值的东西是在你编写 JavaScript你要明确你使用的是那种”编程模式“. 下面主要介绍几个jav

Java多线程编程模式实战指南(二):Immutable Object模式--转载

本文由本人首次发布在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-immutable-object.转载请注明作者: 黄文海 出处:http://viscent.iteye.com. 多线程共享变量的情况下,为了保证数据一致性,往往需要对这些变量的访问进行加锁.而锁本身又会带来一些问题和开销.Immutable Object模式使得我们可以在不使用锁的情况下,既保证共享变量访问的线程安

Java多线程编程模式实战指南(一):Active Object模式--转载

本文由黄文海首次发布在infoq中文站上:http://www.infoq.com/cn/articles/Java-multithreaded-programming-mode-active-object-part1 .转载请注明作者: 黄文海 出处:http://viscent.iteye.com. Active Object模式简介 Active Object模式是一种异步编程模式.它通过对方法的调用与方法的执行进行解耦来提高并发性.若以任务的概念来说,Active Object模式的核心

Java多线程编程模式实战指南之Promise模式

Promise模式简介(转) Promise模式是一种异步编程模式 .它使得我们可以先开始一个任务的执行,并得到一个用于获取该任务执行结果的凭据对象,而不必等待该任务执行完毕就可以继续执行其他操作.等到我们需要该任务的执行结果时,再调用凭据对象的相关方法来获取.这样就避免了不必要的等待,增加了系统的并发性.这好比我们去小吃店,同时点了鸭血粉丝汤和生煎包.当我们点餐付完款后,我们拿到手的其实只是一张可借以换取相应食品的收银小票(凭据对象)而已,而不是对应的实物.由于鸭血粉丝汤可以较快制作好,故我们

Java多线程编程模式实战指南一:Active Object模式(上)

Active Object模式简介 Active Object模式是一种异步编程模式.它通过对方法的调用与方法的执行进行解耦来提高并发性.若以任务的概念来说,Active Object模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离.这有点类似于System.gc()这个方法:客户端代码调用完gc()后,一个进行垃圾回收的任务被提交,但此时JVM并不一定进行了垃圾回收,而可能是在gc()方法调用返回后的某段时间才开始执行任务--回收垃圾.我们知

Java多线程编程模式实战指南(二):Immutable Object模式

多线程共享变量的情况下,为了保证数据一致性,往往需要对这些变量的访问进行加锁.而锁本身又会带来一些问题和开销.Immutable Object模式使得我们可以在不使用锁的情况下,既保证共享变量访问的线程安全,又能避免引入锁可能带来的问题和开销. Immutable Object模式简介 多线程环境中,一个对象常常会被多个线程共享.这种情况下,如果存在多个线程并发地修改该对象的状态或者一个线程读取该对象的状态而另外一个线程试图修改该对象的状态,我们不得不做一些同步访问控制以保证数据一致性.而这些同

Java多线程编程模式实战指南一:Active Object模式(下)

Active Object模式的评价与实现考量 Active Object模式通过将方法的调用与执行分离,实现了异步编程.有利于提高并发性,从而提高系统的吞吐率. Active Object模式还有个好处是它可以将任务(MethodRequest)的提交(调用异步方法)和任务的执行策略(Execution Policy)分离.任务的执行策略被封装在Scheduler的实现类之内,因此它对外是不"可见"的,一旦需要变动也不会影响其它代码,降低了系统的耦合性.任务的执行策略可以反映以下一些