Scalaz(44)- concurrency :scalaz Future,尚不完整的多线程类型

scala已经配备了自身的Future类。我们先举个例子来了解scala Future的具体操作:

 1 import scala.concurrent._
 2 import ExecutionContext.Implicits.global
 3 object scalafuture {
 4   def dbl(i: Int): Future[Int] = Future { Thread.sleep(1000) ; i + i }
 5                                       //> dbl: (i: Int)scala.concurrent.Future[Int]
 6   val fdbl = dbl(3)                   //> fdbl  : scala.concurrent.Future[Int] = List()
 7   fdbl.onSuccess {
 8     case a => println(s"${a/2} + ${a/2} = $a")
 9   }
10   println("calculating ...")          //> calculating ...
11   Thread.sleep(2000)                  //> 3 + 3 = 6
12 }

这是一个标准的异步运算;在成功完成运算事件上绑定callback来获取在其它线程中的运算结果。我们也可以进行异常处理:

1   val fdz = Future { 3 / 0 }      //> fdz  : scala.concurrent.Future[Int] = List()
2   fdz.onFailure {
3     case e => println(s"error message {${e.getMessage}}")
4   }
5   Thread.sleep(100)               //> error message {/ by zero}

又或者同时绑定运算成功和失败事件的callback函数:

1   import scala.util.{Success, Failure}
2   fdz onComplete {
3     case Success(a) => println(s"${a/2} + ${a/2} = $a")
4     case Failure(e) => println(s"error message {${e.getMessage}}")
5   }
6   Thread.sleep(100)               //> error message {/ by zero}

scala Future 实现了flatMap,我们可以把几个Future组合起来用:

 1   def dbl(i: Int): Future[Int] = Future { Thread.sleep(1000); i + i }
 2                                                   //> dbl: (i: Int)scala.concurrent.Future[Int]
 3   def sqr(i: Int): Future[Int] = Future { i * i } //> sqr: (i: Int)scala.concurrent.Future[Int]
 4   def sum(a: Int, b: Int): Future[Int] = Future { a + b }
 5                                           //> sum: (a: Int, b: Int)scala.concurrent.Future[Int]
 6   val fsum = for {
 7     a <- dbl(3)
 8     b <- sqr(a)
 9     c <- sum(a,b)
10   } yield c                               //> fsum  : scala.concurrent.Future[Int] = List()
11
12   fsum onSuccess { case c => println(s"the combined result is: $c") }
13   Thread.sleep(2000)                     //> the combined result is: 42

scala Future利用flatMap实现了流程运算:先运算dbl再sqr再sum,这个顺序是固定的即使它们可能在不同的线程里运算,因为sqr依赖dbl的结果,而sum又依赖dbl和sqr的结果。

好了,既然scala Future的功能已经比较完善了,那么scalaz的Future又有什么不同的特点呢?首先,细心一点可以发现scala Future是即时运算的,从下面的例子里可以看出:

1   import scala.concurrent.duration._
2   val fs = Future {println("run now..."); System.currentTimeMillis() }
3                                          //> run now...
4                                          //| fs  : scala.concurrent.Future[Long] = List()
5   Await.result(fs, 1.second)             //> res0: Long = 1465907784714
6   Thread.sleep(1000)
7   Await.result(fs, 1.second)             //> res1: Long = 1465907784714

可以看到fs是在Future构建时即时运算的,而且只会运算一次。如果scala Future中包括了能产生副作用的代码,在构建时就会立即产生副作用。所以我们是无法使用scala Future来编写纯函数的,那么在scalaz里就必须为并发编程提供一个与scala Future具同等功能但又不会立即产生副作用的类型了,这就是scalaz版本的Future。我们看看scalaz是如何定义Future的:scalaz.concurrent/Future.scala

sealed abstract class Future[+A] {
...
object Future {
  case class Now[+A](a: A) extends Future[A]
  case class Async[+A](onFinish: (A => Trampoline[Unit]) => Unit) extends Future[A]
  case class Suspend[+A](thunk: () => Future[A]) extends Future[A]
  case class BindSuspend[A,B](thunk: () => Future[A], f: A => Future[B]) extends Future[B]
  case class BindAsync[A,B](onFinish: (A => Trampoline[Unit]) => Unit,
                            f: A => Future[B]) extends Future[B]
...

Future[A]就是个Free Monad。它的结构化表达方式分别有Now,Async,Suspend,BindSuspend,BindAsync。我们可以用这些结构实现flatMap函数,所以Future就是Free Monad:

  def flatMap[B](f: A => Future[B]): Future[B] = this match {
    case Now(a) => Suspend(() => f(a))
    case Suspend(thunk) => BindSuspend(thunk, f)
    case Async(listen) => BindAsync(listen, f)
    case BindSuspend(thunk, g) =>
      Suspend(() => BindSuspend(thunk, g andThen (_ flatMap f)))
    case BindAsync(listen, g) =>
      Suspend(() => BindAsync(listen, g andThen (_ flatMap f)))
  }

free structure类型可以支持算式/算法关注分离,也就是说我们可以用scalaz Future来描述程序功能而不涉及正真运算。scalaz Future的构建方式如下:

 1 import scalaz._
 2 import Scalaz._
 3 import scalaz.concurrent._
 4 import scala.concurrent.duration._
 5 object scalazFuture {
 6 val fnow = Future.now {println("run..."); System.currentTimeMillis()}
 7                                           //> run...
 8                                           //| fnow  : scalaz.concurrent.Future[Long] = Now(1465909860301)
 9 val fdelay = Future.delay {println("run..."); System.currentTimeMillis()}
10                                           //> fdelay  : scalaz.concurrent.Future[Long] = Suspend(<function0>)
11 val fapply = Future {println("run..."); System.currentTimeMillis()}
12                                           //> fapply  : scalaz.concurrent.Future[Long] = Async(<function1>)

可以看到fnow是个即时运算的构建器,而这个now就是一个lift函数, 它负责把一个普通无副作用运算升格成Future。fdelay,fapply分别把运算存入trampoline进行结构化了。我们必须另外运算trampoline来运行结构内的运算:

 1 fdelay.run                                        //> run...
 2                                                   //| res0: Long = 1465910524847
 3 Thread.sleep(1000)
 4 fdelay.run                                        //> run...
 5                                                   //| res1: Long = 1465910525881
 6 fapply.run                                        //> run...
 7                                                   //| res2: Long = 1465910525883
 8 Thread.sleep(1000)
 9 fapply.run                                        //> run...
10                                                   //| res3: Long = 1465910526884

scalaz Future只有在运算时才会产生副作用,而且可以多次运算。

我们可以用即时(blocking)、异步、定时方式来运算Future:

 1 fapply.unsafePerformSync                          //> run...
 2                                                   //| res4: Long = 1465958049118
 3 fapply.unsafePerformAsync {
 4   case a => println(a)
 5 }
 6 Thread.sleep(1000)
 7 fapply.unsafePerformSyncFor(1 second)             //> run...
 8                                                   //| 1465958051126
 9                                                   //| run...
10                                                   //| res5: Long = 1465958052172

结构化状态Async代表了scalaz Future的多线程处理特性:

/**
   * Create a `Future` from an asynchronous computation, which takes the form
   * of a function with which we can register a callback. This can be used
   * to translate from a callback-based API to a straightforward monadic
   * version. See `Task.async` for a version that allows for asynchronous
   * exceptions.
   */
  def async[A](listen: (A => Unit) => Unit): Future[A] =
    Async((cb: A => Trampoline[Unit]) => listen { a => cb(a).run })

  /** Create a `Future` that will evaluate `a` using the given `ExecutorService`. */
  def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Future[A] = Async { cb =>
    pool.submit { new Callable[Unit] { def call = cb(a).run }}
  }

  /** Create a `Future` that will evaluate `a` after at least the given delay. */
  def schedule[A](a: => A, delay: Duration)(implicit pool: ScheduledExecutorService =
      Strategy.DefaultTimeoutScheduler): Future[A] =
    Async { cb =>
      pool.schedule(new Callable[Unit] {
        def call = cb(a).run
      }, delay.toMillis, TimeUnit.MILLISECONDS)
    }

我们看到apply和schedule在构建Future时对运算线程进行了配置。

如果我们需要模仿scala Future的功效可以用unsafeStart:

1 val fs = fapply.unsafeStart              //> run...
2                                          //| fs  : scalaz.concurrent.Future[Long] = Suspend(<function0>)
3 fs.run                                   //> res6: Long = 1465958922401
4 Thread.sleep(1000)
5 fs.run                                   //> res7: Long = 1465958922401

我们也可以用scala Future的callback方式用async函数把自定义的callback挂在构建的Future上:

1 def fu(t: Long): Future[String] =
2   Future.async[String]{k => k(s"the curreent time is: ${t.toString}!!!")}
3                                                   //> fu: (t: Long)scalaz.concurrent.Future[String]
4 fu(System.currentTimeMillis()).run                //> res8: String = the curreent time is: 1465958923415!!!

scala Future和scalaz Future之间可以相互转换:

 1 import scala.concurrent.{Future => sFuture}
 2 import scala.concurrent.ExecutionContext
 3 import scala.util.{Success,Failure}
 4 def futureTozFuture[A](sf: sFuture[A])(implicit ec: ExecutionContext): Future[A] =
 5   Future.async {cb => sf.onComplete {
 6     case Success(a) => cb(a)
 7 //    case Failure(e) => cb(e)
 8   }}                            //> futureTozFuture: [A](sf: scala.concurrent.Future[A])(implicit ec: scala.con
 9                                 //| current.ExecutionContext)scalaz.concurrent.Future[A]
10 def zFutureTosFuture[A](zf: Future[A]): sFuture[A] = {
11   val prom = scala.concurrent.Promise[A]
12   zf.unsafePerformAsync {
13      case a => prom.success(a)是
14   }
15   prom.future
16 }

突然发现scalaz Future是没有异常处理(exception)功能的。scalaz提供了concurrent.Task类型填补了Future的这部分缺陷。我们会在下篇讨论Task。
我们用上面scala Future的例子来示范scalaz Future的函数组合能力:

 1   def dbl(i: Int): Future[Int] = Future { i + i } //> dbl: (i: Int)scalaz.concurrent.Future[Int]
 2   def sqr(i: Int): Future[Int] = Future { i * i } //> sqr: (i: Int)scalaz.concurrent.Future[Int]
 3   def sum(a: Int, b: Int): Future[Int] = Future { a + b }
 4                                   //> sum: (a: Int, b: Int)scalaz.concurrent.Future[Int]
 5   val fsum = for {
 6     a <- dbl(3)
 7     b <- sqr(a)
 8     c <- sum(a,b)
 9   } yield c                       //> fsum  : scalaz.concurrent.Future[Int] = BindAsync(<function1>,<function1>)
10
11   fsum.unsafePerformAsync {
12     case a => println(s"result c is:$a")
13   }
14   Thread.sleep(1000)              //> result c is:42
时间: 2024-10-29 19:07:21

Scalaz(44)- concurrency :scalaz Future,尚不完整的多线程类型的相关文章

Scalaz(8)- typeclass:Monoid and Foldable

Monoid是种最简单的typeclass类型.我们先看看scalaz的Monoid typeclass定义:scalaz/Monoid.scala 1 trait Monoid[F] extends Semigroup[F] { self => 2 //// 3 /** The identity element for `append`. */ 4 def zero: F 5 ... Monoid trait又继承了Semigroup:scalaz/Semigroup.scala 1 trai

Scalaz(47)- scalaz-stream: 深入了解-Source

scalaz-stream库的主要设计目标是实现函数式的I/O编程(functional I/O).这样用户就能使用功能单一的基础I/O函数组合成为功能完整的I/O程序.还有一个目标就是保证资源的安全使用(resource safety):使用scalaz-stream编写的I/O程序能确保资源的安全使用,特别是在完成一项I/O任务后自动释放所有占用的资源包括file handle.memory等等.我们在上一篇的讨论里笼统地解释了一下scalaz-stream核心类型Process的基本情况,

Scalaz(49)- scalaz-stream: 安全的无穷运算-running infinite stream freely

scalaz-stream支持无穷数据流(infinite stream),这本身是它强大的功能之一,试想有多少系统需要通过无穷运算才能得以实现.这是因为外界的输入是不可预料的,对于系统本身就是无穷的,比如键盘鼠标输入什么时候终止.网站上有多少网页.数据库中还有多少条记录等等.但对无穷数据流的运算又引发了新的挑战.我们知道,fp程序的主要运算方式是递归算法,这是个问题产生的源泉:极容易掉入StackOverflowError陷阱.相信许多人对scalaz-stream如何实现无穷数据的运算安全都

Scalaz(39)- Free :a real monadic program

一直感觉FP比较虚,可能太多学术性的东西,不知道如何把这些由数学理论在背后支持的一套全新数据类型和数据结构在现实开发中加以使用.直到Free Monad,才真正感觉能用FP方式进行编程了.在前面我们已经花了不小篇幅来了解Free Monad,这次我想跟大家讨论一下用Free Monad来编写一个真正能运行的完整应用程序.当然,这个程序必须具备FP特性,比如函数组合(function composition),纯代码(pure code),延迟副作用(delayed side effect)等等.

Scalaz(53)- scalaz-stream: 程序运算器-application scenario

从上面多篇的讨论中我们了解到scalaz-stream代表一串连续无穷的数据或者程序.对这个数据流的处理过程就是一个状态机器(state machine)的状态转变过程.这种模式与我们通常遇到的程序流程很相似:通过程序状态的变化来推进程序进展.传统OOP式编程可能是通过一些全局变量来记录当前程序状态,而FP则是通过函数组合来实现状态转变的.这个FP模式讲起来有些模糊和抽象,但实际上通过我们前面长时间对FP编程的学习了解到FP编程讲究避免使用任何局部中间变量,更不用说全局变量了.FP程序的数据A是

Scalaz(33)- Free :算式-Monadic Programming

在任何模式的编程过程中都无法避免副作用的产生.我们可以用F[A]这种类型模拟FP的运算指令:A是可能产生副作用的运算,F[_]是个代数数据类型ADT(Algebraic Data Type),可以实现函数组合(functional composition),我们可以不用理会A,先用F[_]来组合形成描述功能的抽象程序AST(Abstract Syntax Tree),对A的运算可以分开另一个过程去实现,而且可以有多种的运算实现方式,这样就达到了算式AST(Monadic Programming)

Scalaz(38)- Free :Coproduce-Monadic语句组合

很多函数式编程爱好者都把FP称为Monadic Programming,意思是用Monad进行编程.我想FP作为一种比较成熟的编程模式,应该有一套比较规范的操作模式吧.因为Free能把任何F[A]升格成Monad,所以Free的算式(AST).算法(Interpreter)关注分离(separation of concern)模式应该可以成为一种规范的FP编程模式.我们在前面的几篇讨论中都涉及了一些AST的设计和运算,但都是一些功能单一,离散的例子.如果希望通过Free获取一个完整可用的程序,就

Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model

长久以来,函数式编程模式都被认为是一种学术研究用或教学实验用的编程模式.直到近几年由于大数据和多核CPU的兴起造成了函数式编程模式在一些实际大型应用中的出现,这才逐渐改变了人们对函数式编程无用论的观点.通过一段时间对函数式编程方法的学习,我们了解到Free Monad的算式/算法关注分离(separation of concern)可以是一种很实用的函数式编程模式.用Free Monad编写的程序容易理解并具备良好的可维护性.scalaz-stream的流程控制和多线程运算模式可以实现程序的安全

Scalaz(1)- 基础篇:隐式转换解析策略-Implicit resolution

在正式进入scalaz讨论前我们需要理顺一些基础的scalaz结构组成概念和技巧.scalaz是由即兴多态(ad-hoc polymorphism)类型(typeclass)组成.scalaz typeclass在scala中的应用有赖于scala compiler的一项特别功能:隐式转换(implicit conversion),使程序表述更精简.由于隐式转换是一项compiler功能,在程序编译(compile)的时候是由compiler来进行类型转换代码的产生和替代的. 让我们先了解一下作