Future and Promise
Future is a sort of placeholder object that you create for a result that does not exist. Generally, the result of the Future is computed concurrently and can be later collected. Composing concurrent tasks in this way tends to faster, asynchronous, non-blocking parallel code.
By default, futures and promises are non-blocking, make use of callbacks instead of typical blocking operations. To simplify the use of callbacks both syntactically(语法上) and conceptually, scala provides combinators(组合子) such as flatMap, foreach and filter used to compose future in a non-blocking way.
Future
A future is an object holding a value which will be available at some point. This value is usually the result of some other computation. The state of future could be complete(with a value or exception) and uncomplete. Complete can take one of two forms: success(return a value) or failed(return a exception).
Future cannot be rewrite, it is immutable.
import scala.concurrent._ // make future available
import ExcecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = future {
session.getFriends()
}
if session is null, then future will throw a nullpointexception. About executionContext:
Execution context execute tasks submitted to them, and you can think executions contexts as thread pools. They are essential for the future method because they handle how and when the asynchronous computation is executed. Another example
val firstOccurrence: Future[Int] = future {
val source = scala.io.Source.fromFile("myText.txt")
source.toSeq.indexOfSlice("myKeyword")
}
Callbacks
A better way to do it is a complete non-blocking way, by registering a callback on the future. The callback is called asynchronously once the future is available. The callback function may be executed asynchronously or sequentially on the same thread.
The most general form of registering a callback is by using the onComplete method which takes a callback function of type Try[T] => U. The callback is applied to the value of type Success[T] if the future completes successfully, or to a value of type Failure[T] otherwise.
Try[T] is similar to Option[T] or either[T, S], in that it is a monad that potentially holding some type. Option can be a Some[T] or None. Try[T] is Success[T] when it holds a value and otherwise Failure[T]. Failure[T] holds more information that just a plain NONE by saying why the value is not there.
import scala.util.{Success, Failure}
val f: Future[List[String]] = future {
session.getRecentPosts
}
f onComplete {
case Success(posts) => for (post <- posts) println(post)
case Failure(t) => println("Error")
}
val f: Future[List[String]] = future {
session.getRecentPosts
}
f onSuccess {
case posts => for(post <- posts) println(post)
}
f onFailure {
case t => println("Error: " + t.getMessage)
}
?Since partial functions have the isDefinedAt method, the onFailure trigger callback if it is defined for a particular Throwable.
val f = future {
2 / 0
}
f onFailure { // this failure never be triggered
case npe: NullPointerException =>
println("I‘d be amazed if this printed out.")
}
*return unit The onComplete, onSuccess and onFailure methods have the result type Unit, which means the invocation of these methods cannot be chained. Note that this design is intentional because callbacks registered on the same future are unordered.
Functional Composition and For-Comprehension
内嵌式组合
val rateQuate = future {
connection.getCurrentValue(USD)
}
rateQuote onSuccess { case quote =>
val purchase = future {
if(isProfitable(quote)) connection.buy(amout, quote)
else throw new Exception("not profitable")
}
purchase onSuccess {
case _ => println("Purchase " + amount + " USD")
}
}
This works, but it is inconvenient for two reasons. First, we have to use onSuccess, and we have to nest the second purchase future within it. Second, purchase is within rateQuote onSuccess so the rest part of this code cannot see this variable. For this two reasons, futures combinators which allow a more straightforward composition. Using map:
val rateQute = future {...}
val purchase = rateQuote map { quote =>
if(isProfitable(quote)) connection.buy(amout, quote)
else throw new Exception("not profitable")
}
purchase onSuccess {
case _ => println("Purchased " + amount + " USD")
}
By using map on rateQuote we have eliminated one onSuccess callback and, more importantly, the nesting. But map still cannot deal with exception.
So, we need flatMap, filter and foreach combinators.
val usdQuote = future {connection.getCurrentValue(USD)}
val chfQuote = future {connection.getCurrentValue{CHF}}
val purchase = for {
usd <- usdQuote
chf <- chfQuote
if isProfitable(usd, chf)
} yield connection.buy(amout, chf)
The purchase future is completed only once both usdQuote and chfQuote are completed.
另一个例子,使用了多个组合子,虽然不常用,但是对理解组合子很有帮助
val purchase = usdQuote flatMap {
usd =>
chfQuote
.withFilter(chf => isProfitable(usd, chf))
.map(chf => connection.buy(amount, chf))
}
flatMap operation maps its own value into some other future. Once this different future is completed, the resulting future is completed with its value. 使用到flatMap的地方就是看不懂
the filter combinator creates a new future which contains the value of the original future only if it satisfies some predicate.
val purchase : Future[Int] = rateQuote map {
quote => connection.buy(amount, quote)
} recover {
case QuoteChangeException() => 0
}
QuoteChangeException是自己编出来的,代表一类异常。
the recover combinator create new future which hold the same result as the original future if it complete successfully. If it did not the partial function will be applied to Throwable which failed the future. It maps the throwable to some value and the new future is completed. If the partial function is not defined on that throwable, then original exception will be returned.
recoveryWith to recover is similar to that of flatMap to map.
combinator fallbackto create a new future holds the original result if it complete successfully, or otherwise the successful result of the argument future. If both failed, the new future is complete with this exception(?).
val usedQuote = future {
connection.getCurrentValue(USD)
} map {
usd => "Value: " + usd + "$"
}
val chfQuote = future {
connection.getCurrentValue(CHF)
} map {
chf => "Value: " + chf + "CHF"
}
val anyQuote = usdQuote fallbackto chfQuote
anyQuote onSuccess {println(_)}
andThen, it return new future with the exactly same result as the current future, regardless of whether the current future is success or not.
val allPosts = mutable.Set[String]()
future {
session.getRecentPosts
} andThen {
posts => allPosts ++= posts
} andThen {
posts =>
clearAll()
for(post <- posts) render(post)
}
In summary, the combinator on futures are purely functional. It returns a future which is related to the future it derived form.
Projections
val f = future {
2 / 0
}
for(exc <- f.failed) print(exc) // 会打印出exception信息
Promise
future can also be created from promise.
A promise can be thought of as a writable, single-assignment container, which completes a future.
That is, a promise can be used to successfully complete a future with a value using the success method. (当value就绪时,future就绪)
Conversely, a promise can also to be used to complete a future with a exception, by failing the promise, using the failure method. (这时候r返回exception吧)
A promise p completes the future returned by p.future. This future is specific to the promise p. Depending on the implementation, it may be the case that p.future eq p. (promise是一个容器,可以用来传递消息)
import scala.concurrent.{future, promise}
import scala.concurrent.ExecutionContext.Implicits.global
val p = promise[T]
val f = p.future
val producer = future {
val r = produceSomething()
p success r //当r就绪时p.future就绪
continueDoingSomethingUnrelated()
}
val consumer = future {
startDoingSomething()
f onSuccess {
case r => doSomethingWithResult()
}
}
//producer will produce failure
val producer = future {
if(isValid(r))
p failure (new IllegalStateException)
else {
val q = doSomethingComputation(r)
p success q
}
}
Calling success on a promise that has already been completed(or failed) will throw an IllegalStateException
One nice property of programs written using promises with operations described so far and futures which are composed through monadic operations without side-effects is that these programs are deterministic. Deterministic here means that, given that no exception is thrown in the program, the result of the program(values observed in the futures) will always be the same, regardless of the execution schedule of the parallel program.
the method completeWith completes the promise with another future. After the future is completed, the promise gets completed with the result of that future as well.
val f = future {1}
val p = promise[Int]
p completeWith f
p.future onSuccess {
case x => println(x)
}
Using promise, the onComplete method of the futures and the future construct you can implement any of the functional composition combinators described earlier.
def first[T](f: Future[T], g: Future[T]): Future[T] = {
val p = promise[T]
//不知道下面的代码能不能换成completeWith
f onSuccess {
case x => p.trySuccess(x)
}
g onSuccess {
case x => p.trySuccess(x)
}
p.future
}
Utilities
Duration在Akka库中被广泛使用,这是一个特殊的数据类型,这个类型的数据类型可以表示为无线(Duration.inf)Duration.MinusInf或有限的时间段
使用scala,时间段可以通过一个小型DSL来创建,并支持所有的期望类型
import akka.util.duration._
val fivesec = 5.seconds
val threemillis = 3.millis
val diff = fivesec - threemillis
assert(diff < fivesec)
val fourmills = threemillis * 4 / 3
val n = threemillis / (1 millisecond)
Abstract Duration contains methods that allow:
conversion to different time units(toNano, toMicros, toMills, toSecond, toMinutes)
Comparison of durations(<, <=, >)
Arithmetic operations(+, -, *)
Minimum and maximum (max, min)
Check if the duration is finite
Duration
Implicitly from types Int and Long. val d = 100 millis
By passing a Long length and a TimeUnit type. val d = Duration(100, MILLISECONDS)
by parsing a string that represent a time period. val d = Duration(“1.2 us”)
val d1 = Duration(100, MIILISECONDS)
val d2 = Duration(100, "millis")
val d3 = 100 millis
val d4 = Duration("1.2us")
// pattern matching
val Duration(length, unit) = 5 millis
Deadline
Duration有一个姐妹类,叫做Deadline,表示一个绝对的时间点,并支持计算当前时间到deadline之间的差距来生成Duration
val deadline = 10 seconds fromNow
awaitCond(..., deadline.timeLeft)
这些类在java中都有对应的实现。