scala已经配备了自身的Future类。我们先举个例子来了解scala Future的具体操作:
1 import scala.concurrent._ 2 import ExecutionContext.Implicits.global 3 object scalafuture { 4 def dbl(i: Int): Future[Int] = Future { Thread.sleep(1000) ; i + i } 5 //> dbl: (i: Int)scala.concurrent.Future[Int] 6 val fdbl = dbl(3) //> fdbl : scala.concurrent.Future[Int] = List() 7 fdbl.onSuccess { 8 case a => println(s"${a/2} + ${a/2} = $a") 9 } 10 println("calculating ...") //> calculating ... 11 Thread.sleep(2000) //> 3 + 3 = 6 12 }
这是一个标准的异步运算;在成功完成运算事件上绑定callback来获取在其它线程中的运算结果。我们也可以进行异常处理:
1 val fdz = Future { 3 / 0 } //> fdz : scala.concurrent.Future[Int] = List() 2 fdz.onFailure { 3 case e => println(s"error message {${e.getMessage}}") 4 } 5 Thread.sleep(100) //> error message {/ by zero}
又或者同时绑定运算成功和失败事件的callback函数:
1 import scala.util.{Success, Failure} 2 fdz onComplete { 3 case Success(a) => println(s"${a/2} + ${a/2} = $a") 4 case Failure(e) => println(s"error message {${e.getMessage}}") 5 } 6 Thread.sleep(100) //> error message {/ by zero}
scala Future 实现了flatMap,我们可以把几个Future组合起来用:
1 def dbl(i: Int): Future[Int] = Future { Thread.sleep(1000); i + i } 2 //> dbl: (i: Int)scala.concurrent.Future[Int] 3 def sqr(i: Int): Future[Int] = Future { i * i } //> sqr: (i: Int)scala.concurrent.Future[Int] 4 def sum(a: Int, b: Int): Future[Int] = Future { a + b } 5 //> sum: (a: Int, b: Int)scala.concurrent.Future[Int] 6 val fsum = for { 7 a <- dbl(3) 8 b <- sqr(a) 9 c <- sum(a,b) 10 } yield c //> fsum : scala.concurrent.Future[Int] = List() 11 12 fsum onSuccess { case c => println(s"the combined result is: $c") } 13 Thread.sleep(2000) //> the combined result is: 42
scala Future利用flatMap实现了流程运算:先运算dbl再sqr再sum,这个顺序是固定的即使它们可能在不同的线程里运算,因为sqr依赖dbl的结果,而sum又依赖dbl和sqr的结果。
好了,既然scala Future的功能已经比较完善了,那么scalaz的Future又有什么不同的特点呢?首先,细心一点可以发现scala Future是即时运算的,从下面的例子里可以看出:
1 import scala.concurrent.duration._ 2 val fs = Future {println("run now..."); System.currentTimeMillis() } 3 //> run now... 4 //| fs : scala.concurrent.Future[Long] = List() 5 Await.result(fs, 1.second) //> res0: Long = 1465907784714 6 Thread.sleep(1000) 7 Await.result(fs, 1.second) //> res1: Long = 1465907784714
可以看到fs是在Future构建时即时运算的,而且只会运算一次。如果scala Future中包括了能产生副作用的代码,在构建时就会立即产生副作用。所以我们是无法使用scala Future来编写纯函数的,那么在scalaz里就必须为并发编程提供一个与scala Future具同等功能但又不会立即产生副作用的类型了,这就是scalaz版本的Future。我们看看scalaz是如何定义Future的:scalaz.concurrent/Future.scala
sealed abstract class Future[+A] { ... object Future { case class Now[+A](a: A) extends Future[A] case class Async[+A](onFinish: (A => Trampoline[Unit]) => Unit) extends Future[A] case class Suspend[+A](thunk: () => Future[A]) extends Future[A] case class BindSuspend[A,B](thunk: () => Future[A], f: A => Future[B]) extends Future[B] case class BindAsync[A,B](onFinish: (A => Trampoline[Unit]) => Unit, f: A => Future[B]) extends Future[B] ...
Future[A]就是个Free Monad。它的结构化表达方式分别有Now,Async,Suspend,BindSuspend,BindAsync。我们可以用这些结构实现flatMap函数,所以Future就是Free Monad:
def flatMap[B](f: A => Future[B]): Future[B] = this match { case Now(a) => Suspend(() => f(a)) case Suspend(thunk) => BindSuspend(thunk, f) case Async(listen) => BindAsync(listen, f) case BindSuspend(thunk, g) => Suspend(() => BindSuspend(thunk, g andThen (_ flatMap f))) case BindAsync(listen, g) => Suspend(() => BindAsync(listen, g andThen (_ flatMap f))) }
free structure类型可以支持算式/算法关注分离,也就是说我们可以用scalaz Future来描述程序功能而不涉及正真运算。scalaz Future的构建方式如下:
1 import scalaz._ 2 import Scalaz._ 3 import scalaz.concurrent._ 4 import scala.concurrent.duration._ 5 object scalazFuture { 6 val fnow = Future.now {println("run..."); System.currentTimeMillis()} 7 //> run... 8 //| fnow : scalaz.concurrent.Future[Long] = Now(1465909860301) 9 val fdelay = Future.delay {println("run..."); System.currentTimeMillis()} 10 //> fdelay : scalaz.concurrent.Future[Long] = Suspend(<function0>) 11 val fapply = Future {println("run..."); System.currentTimeMillis()} 12 //> fapply : scalaz.concurrent.Future[Long] = Async(<function1>)
可以看到fnow是个即时运算的构建器,而这个now就是一个lift函数, 它负责把一个普通无副作用运算升格成Future。fdelay,fapply分别把运算存入trampoline进行结构化了。我们必须另外运算trampoline来运行结构内的运算:
1 fdelay.run //> run... 2 //| res0: Long = 1465910524847 3 Thread.sleep(1000) 4 fdelay.run //> run... 5 //| res1: Long = 1465910525881 6 fapply.run //> run... 7 //| res2: Long = 1465910525883 8 Thread.sleep(1000) 9 fapply.run //> run... 10 //| res3: Long = 1465910526884
scalaz Future只有在运算时才会产生副作用,而且可以多次运算。
我们可以用即时(blocking)、异步、定时方式来运算Future:
1 fapply.unsafePerformSync //> run... 2 //| res4: Long = 1465958049118 3 fapply.unsafePerformAsync { 4 case a => println(a) 5 } 6 Thread.sleep(1000) 7 fapply.unsafePerformSyncFor(1 second) //> run... 8 //| 1465958051126 9 //| run... 10 //| res5: Long = 1465958052172
结构化状态Async代表了scalaz Future的多线程处理特性:
/** * Create a `Future` from an asynchronous computation, which takes the form * of a function with which we can register a callback. This can be used * to translate from a callback-based API to a straightforward monadic * version. See `Task.async` for a version that allows for asynchronous * exceptions. */ def async[A](listen: (A => Unit) => Unit): Future[A] = Async((cb: A => Trampoline[Unit]) => listen { a => cb(a).run }) /** Create a `Future` that will evaluate `a` using the given `ExecutorService`. */ def apply[A](a: => A)(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Future[A] = Async { cb => pool.submit { new Callable[Unit] { def call = cb(a).run }} } /** Create a `Future` that will evaluate `a` after at least the given delay. */ def schedule[A](a: => A, delay: Duration)(implicit pool: ScheduledExecutorService = Strategy.DefaultTimeoutScheduler): Future[A] = Async { cb => pool.schedule(new Callable[Unit] { def call = cb(a).run }, delay.toMillis, TimeUnit.MILLISECONDS) }
我们看到apply和schedule在构建Future时对运算线程进行了配置。
如果我们需要模仿scala Future的功效可以用unsafeStart:
1 val fs = fapply.unsafeStart //> run... 2 //| fs : scalaz.concurrent.Future[Long] = Suspend(<function0>) 3 fs.run //> res6: Long = 1465958922401 4 Thread.sleep(1000) 5 fs.run //> res7: Long = 1465958922401
我们也可以用scala Future的callback方式用async函数把自定义的callback挂在构建的Future上:
1 def fu(t: Long): Future[String] = 2 Future.async[String]{k => k(s"the curreent time is: ${t.toString}!!!")} 3 //> fu: (t: Long)scalaz.concurrent.Future[String] 4 fu(System.currentTimeMillis()).run //> res8: String = the curreent time is: 1465958923415!!!
scala Future和scalaz Future之间可以相互转换:
1 import scala.concurrent.{Future => sFuture} 2 import scala.concurrent.ExecutionContext 3 import scala.util.{Success,Failure} 4 def futureTozFuture[A](sf: sFuture[A])(implicit ec: ExecutionContext): Future[A] = 5 Future.async {cb => sf.onComplete { 6 case Success(a) => cb(a) 7 // case Failure(e) => cb(e) 8 }} //> futureTozFuture: [A](sf: scala.concurrent.Future[A])(implicit ec: scala.con 9 //| current.ExecutionContext)scalaz.concurrent.Future[A] 10 def zFutureTosFuture[A](zf: Future[A]): sFuture[A] = { 11 val prom = scala.concurrent.Promise[A] 12 zf.unsafePerformAsync { 13 case a => prom.success(a)是 14 } 15 prom.future 16 }
突然发现scalaz Future是没有异常处理(exception)功能的。scalaz提供了concurrent.Task类型填补了Future的这部分缺陷。我们会在下篇讨论Task。
我们用上面scala Future的例子来示范scalaz Future的函数组合能力:
1 def dbl(i: Int): Future[Int] = Future { i + i } //> dbl: (i: Int)scalaz.concurrent.Future[Int] 2 def sqr(i: Int): Future[Int] = Future { i * i } //> sqr: (i: Int)scalaz.concurrent.Future[Int] 3 def sum(a: Int, b: Int): Future[Int] = Future { a + b } 4 //> sum: (a: Int, b: Int)scalaz.concurrent.Future[Int] 5 val fsum = for { 6 a <- dbl(3) 7 b <- sqr(a) 8 c <- sum(a,b) 9 } yield c //> fsum : scalaz.concurrent.Future[Int] = BindAsync(<function1>,<function1>) 10 11 fsum.unsafePerformAsync { 12 case a => println(s"result c is:$a") 13 } 14 Thread.sleep(1000) //> result c is:42