Future and Promise

A Future is a placeholder object for a result that does not yet exist. Generally, the result of the Future is computed concurrently and can be collected later. Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.

By default, futures and promises are non-blocking, making use of callbacks instead of typical blocking operations. To simplify the use of callbacks both syntactically and conceptually, Scala provides combinators such as flatMap, foreach, and filter, which are used to compose futures in a non-blocking way.

Future

A future is an object holding a value which will become available at some point. This value is usually the result of some other computation. A future is either completed (with a value or an exception) or not completed. Completion takes one of two forms: success (the future holds a value) or failure (the future holds an exception).
A future can be assigned only once: after it is completed with a value or an exception it is effectively immutable and cannot be overwritten.

import scala.concurrent._ // makes future available
import ExecutionContext.Implicits.global
// Since Scala 2.11 the lowercase future { ... } is deprecated in favor of Future { ... }

val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = future {
    session.getFriends()
}

If session is null, the future is failed with a NullPointerException. About ExecutionContext:
An execution context executes the tasks submitted to it; you can think of execution contexts as thread pools. They are essential for the future method because they handle how and when the asynchronous computation is executed. Another example:

val firstOccurrence: Future[Int] = future {
    val source = scala.io.Source.fromFile("myText.txt")
    source.toSeq.indexOfSlice("myKeyword")
}
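
The remark that an execution context behaves like a thread pool can be made concrete with a small sketch. The object name ExecutionContextSketch and the helper runOnOwnPool are made up for illustration; the sketch uses Future(...) (the non-deprecated form of future) and blocks with Await only so that the demo can return a plain value.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ExecutionContextSketch {
  // Runs `work` on a dedicated two-thread pool and returns its result.
  def runOnOwnPool(work: () => Int): Int = {
    val pool = Executors.newFixedThreadPool(2)
    // An ExecutionContext backed by our own pool instead of the global one.
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val f: Future[Int] = Future(work())
      // Blocking here is for demonstration only.
      Await.result(f, 5.seconds)
    } finally pool.shutdown()
  }
}
```

Passing the execution context implicitly is what lets the same Future code run on the global pool or on a custom one without changes.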

Callbacks

Rather than blocking to wait for the result, a better approach is fully non-blocking: register a callback on the future. The callback is called asynchronously once the future is completed. If the future is already completed when the callback is registered, the callback may be executed either asynchronously or sequentially on the same thread.

The most general form of registering a callback is the onComplete method, which takes a callback function of type Try[T] => U. The callback is applied to a value of type Success[T] if the future completes successfully, or to a value of type Failure[T] otherwise.

Try[T] is similar to Option[T] or Either[T, S], in that it is a monad that potentially holds a value of some type. An Option can be a Some[T] or None. A Try[T] is a Success[T] when it holds a value and a Failure[T] otherwise. Failure[T] carries more information than a plain None because it says why the value is not there.
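
As a minimal sketch of how Try behaves on its own, independent of futures (the names TrySketch, parse and describe are invented for this illustration):

```scala
import scala.util.{Failure, Success, Try}

object TrySketch {
  // Try(...) catches non-fatal exceptions and wraps them in Failure.
  def parse(s: String): Try[Int] = Try(s.trim.toInt)

  // Unlike None, a Failure tells us which exception occurred.
  def describe(t: Try[Int]): String = t match {
    case Success(n) => s"got $n"
    case Failure(e) => s"failed: ${e.getClass.getSimpleName}"
  }
}
```

A callback passed to onComplete receives exactly this kind of value and can match on it in the same way.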

import scala.util.{Success, Failure}

val f: Future[List[String]] = future {
    session.getRecentPosts
}
f onComplete {
    case Success(posts) => for (post <- posts) println(post)
    case Failure(t) => println("Error")
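}

To handle only successful results, use the onSuccess callback; to handle only failures, use onFailure:

val f: Future[List[String]] = future {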
    session.getRecentPosts
}

f onSuccess {
    case posts => for(post <- posts) println(post)
}

f onFailure {
    case t => println("Error: " + t.getMessage)
}

Since partial functions have the isDefinedAt method, onFailure triggers the callback only if it is defined for the particular Throwable. In the following example the registered callback is never triggered:

val f = future {
    2 / 0
}

f onFailure { // this callback is never triggered
    case npe: NullPointerException =>
        println("I'd be amazed if this printed out.")
}

The onComplete, onSuccess, and onFailure methods have the result type Unit, which means invocations of these methods cannot be chained. This design is intentional: callbacks registered on the same future are unordered, and chaining would wrongly suggest an order.

Functional Composition and For-Comprehension

Nested composition:

val rateQuote = future {
    connection.getCurrentValue(USD)
}

rateQuote onSuccess { case quote =>
    // the second future is nested inside the first callback
    val purchase = future {
        if (isProfitable(quote)) connection.buy(amount, quote)
        else throw new Exception("not profitable")
    }

    purchase onSuccess {
        case _ => println("Purchased " + amount + " USD")
    }
}

This works, but it is inconvenient for two reasons. First, we have to use onSuccess and nest the second future, purchase, within it. Second, purchase is scoped inside the rateQuote onSuccess callback, so the rest of the code cannot see this variable. For these two reasons, futures provide combinators which allow a more straightforward composition. Using map:

val rateQuote = future {...}

val purchase = rateQuote map { quote =>
    if (isProfitable(quote)) connection.buy(amount, quote)
    else throw new Exception("not profitable")
}

purchase onSuccess {
    case _ => println("Purchased " + amount + " USD")
}

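By using map on rateQuote we have eliminated one onSuccess callback and, more importantly, the nesting. But map offers no way to handle an exception: if the mapping function throws, or if rateQuote itself fails, purchase is failed with that exception.

To enable for-comprehensions, futures also provide the flatMap, filter, and foreach combinators.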

val usdQuote = future { connection.getCurrentValue(USD) }
val chfQuote = future { connection.getCurrentValue(CHF) }

val purchase = for {
    usd <- usdQuote
    chf <- chfQuote
    if isProfitable(usd, chf)
} yield connection.buy(amount, chf)

The purchase future is completed only once both usdQuote and chfQuote are completed.
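
A self-contained sketch of the same pattern follows. It is not the trading code above: the rates are plain Ints, isProfitable is a made-up rule, and Await is used only so the outcome can be inspected. It also shows what happens when the if guard rejects the values: the resulting future is failed with a NoSuchElementException.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object ForComprehensionSketch {
  // Hypothetical profitability rule, standing in for isProfitable in the text.
  def isProfitable(usd: Int, chf: Int): Boolean = chf < usd

  def purchase(usdRate: Int, chfRate: Int): Future[Int] = {
    // Both futures are started before the for-comprehension, so they run concurrently.
    val usdQuote = Future(usdRate)
    val chfQuote = Future(chfRate)
    for {
      usd <- usdQuote
      chf <- chfQuote
      if isProfitable(usd, chf)
    } yield chf
  }

  // Blocks only so that the demo can inspect the outcome.
  def outcome(usdRate: Int, chfRate: Int): Try[Int] =
    Try(Await.result(purchase(usdRate, chfRate), 5.seconds))
}
```

Creating the two futures outside the for block matters: had they been created inside the generators, the second would start only after the first completed.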

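Another example, which uses the combinators directly. This form is not common, but it helps in understanding the combinators; it is what the for-comprehension above translates into: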

val purchase = usdQuote flatMap {
    usd =>
    chfQuote
        .withFilter(chf => isProfitable(usd, chf))
        .map(chf => connection.buy(amount, chf))
}

The flatMap operation maps the future's own value into some other future. Once this other future is completed, the resulting future is completed with its value. (The places where flatMap is used are the hardest to follow.)
The filter combinator creates a new future which contains the value of the original future only if it satisfies some predicate. Otherwise the new future is failed with a NoSuchElementException.

val purchase : Future[Int] = rateQuote map {
    quote => connection.buy(amount, quote)
} recover {
    case QuoteChangeException() => 0
}

QuoteChangeException is made up for this example; it stands for some class of exception.
The recover combinator creates a new future which holds the same result as the original future if it completed successfully. If it did not, the partial function is applied to the Throwable which failed the original future. If the partial function maps the Throwable to some value, the new future is completed with that value. If the partial function is not defined on that Throwable, the new future is failed with the original exception.
recoverWith is to recover as flatMap is to map.
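
The difference between the two can be sketched as follows. QuoteChanged, buy and the returned numbers are invented for the illustration; the point is only that recover supplies a plain value while recoverWith supplies another future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverSketch {
  // Hypothetical exception, playing the role of QuoteChangeException.
  final case class QuoteChanged(msg: String) extends Exception(msg)

  def buy(fail: Boolean): Future[Int] =
    Future { if (fail) throw QuoteChanged("quote moved") else 100 }

  // recover maps the matching exception to a plain value.
  def withRecover(fail: Boolean): Future[Int] =
    buy(fail).recover { case QuoteChanged(_) => 0 }

  // recoverWith maps the matching exception to another future.
  def withRecoverWith(fail: Boolean): Future[Int] =
    buy(fail).recoverWith { case QuoteChanged(_) => Future(50) }

  // Blocking helper, for demonstration only.
  def await[A](f: Future[A]): A = Await.result(f, 5.seconds)
}
```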

The fallbackTo combinator creates a new future which holds the original result if it completes successfully, or otherwise the successful result of the argument future. If both fail, the new future is completed with the exception from the original future.

val usdQuote = future {
    connection.getCurrentValue(USD)
} map {
    usd => "Value: " + usd + "$"
}

val chfQuote = future {
    connection.getCurrentValue(CHF)
} map {
    chf => "Value: " + chf + "CHF"
}

// use the USD quote if it succeeds, otherwise fall back to the CHF quote
val anyQuote = usdQuote fallbackTo chfQuote

anyQuote onSuccess { println(_) }
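
The three possible outcomes of fallbackTo can be checked with a small sketch. The helper quote is hypothetical; it builds already-completed futures from Try values with Future.fromTry so that no real quote service is needed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object FallbackSketch {
  // Combines two already-decided outcomes with fallbackTo and reports the result.
  def quote(primary: Try[String], backup: Try[String]): Try[String] = {
    val f = Future.fromTry(primary).fallbackTo(Future.fromTry(backup))
    // Blocking is for demonstration only.
    Try(Await.result(f, 5.seconds))
  }
}
```

When both inputs are failures, the exception that comes back is the one from the primary future, not the backup.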

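The andThen combinator is used purely for side effects. It returns a new future with exactly the same result as the current future, regardless of whether the current future succeeded or failed. Because each andThen callback runs before the returned future is completed, chained andThen calls are ordered.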

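import scala.collection.mutable
import scala.util.Success

val allPosts = mutable.Set[String]()

future {
    session.getRecentPosts
} andThen {
    // andThen receives a Try, so match on Success to get the posts
    case Success(posts) => allPosts ++= posts
} andThen {
    case _ =>
        clearAll()
        for (post <- allPosts) render(post)
}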

In summary, the combinators on futures are purely functional. Each returns a new future which is related to the future it was derived from.

Projections

val f = future {
    2 / 0
}
for (exc <- f.failed) print(exc) // prints the exception
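
The failed projection turns a failure into a value: if the original future fails, f.failed succeeds with the Throwable; if the original future succeeds, f.failed itself fails with a NoSuchElementException. A minimal sketch, with invented names divide and failureOf and with blocking used only for inspection:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FailedProjectionSketch {
  def divide(a: Int, b: Int): Future[Int] = Future(a / b)

  // Returns what the failed projection of `f` completes with.
  def failureOf(f: Future[Int]): Try[Throwable] =
    Try(Await.result(f.failed, 5.seconds))
}
```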

Promise

A future can also be created from a promise.
A promise can be thought of as a writable, single-assignment container which completes a future.

That is, a promise can be used to successfully complete a future with a value, using the success method. (The future becomes ready as soon as the value is ready.)

Conversely, a promise can also be used to complete a future with an exception, by failing the promise using the failure method. (In that case the future holds the exception.)
A promise p completes the future returned by p.future. This future is specific to the promise p. Depending on the implementation, it may be the case that p.future eq p. (A promise is a container that can be used to pass messages.)

import scala.concurrent.{future, promise}
import scala.concurrent.ExecutionContext.Implicits.global

val p = promise[T]()
val f = p.future

val producer = future {
    val r = produceSomething()
    p success r // p.future is completed once r is ready
    continueDoingSomethingUnrelated()
}

val consumer = future {
    startDoingSomething()
    f onSuccess {
        case r => doSomethingWithResult()
    }
}

// a producer that may fail the promise
val producer = future {
    val r = produceSomething() // same helper as in the previous example
    if (!isValid(r))
        p failure (new IllegalStateException)
    else {
        val q = doSomethingComputation(r)
        p success q
    }
}

Calling success on a promise that has already been completed (or failed) throws an IllegalStateException. The same is true for calling failure.
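
The single-assignment rule can be observed directly. The sketch below uses Promise[Int]() (the non-deprecated form of promise[Int]) and also shows trySuccess, which reports a second attempt as false instead of throwing. The object and method names are made up.

```scala
import scala.concurrent.Promise
import scala.util.{Failure, Try}

object PromiseOnceSketch {
  // Returns (second success threw IllegalStateException, trySuccess accepted, final value).
  def demo(): (Boolean, Boolean, Int) = {
    val p = Promise[Int]()
    p.success(1)
    val threw = Try(p.success(2)) match {
      case Failure(_: IllegalStateException) => true
      case _                                 => false
    }
    // trySuccess does not throw; it returns false if the promise is already completed.
    val accepted = p.trySuccess(3)
    // The promise is completed, so value is defined and holds the first result.
    (threw, accepted, p.future.value.get.get)
  }
}
```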

One nice property of programs written using promises with the operations described so far, and futures composed through monadic operations without side effects, is that these programs are deterministic. Deterministic here means that, given that no exception is thrown in the program, the result of the program (the values observed in the futures) will always be the same, regardless of the execution schedule of the parallel program.

The method completeWith completes the promise with another future. After that future is completed, the promise is completed with its result as well.

val f = future {1}
val p = promise[Int]

p completeWith f

p.future onSuccess {
    case x => println(x)
}

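Using promises, the onComplete method of futures, and the future construct, you can implement any of the functional composition combinators described earlier. For example, a combinator first takes two futures f and g and produces a third future which is completed by whichever of f or g succeeds first: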

def first[T](f: Future[T], g: Future[T]): Future[T] = {
    val p = promise[T]

    // not sure whether the code below could be replaced with completeWith
    f onSuccess {
        case x => p.trySuccess(x)
    }
    g onSuccess {
        case x => p.trySuccess(x)
    }
    p.future
}
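
A runnable variant of first, as a sketch: it uses foreach and Promise[T]() in place of the deprecated onSuccess and promise[T], and the demo pairs a future that never completes with one that is already successful. The names FirstSketch and demo are invented, and Await is used only to read the result.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FirstSketch {
  def first[T](f: Future[T], g: Future[T]): Future[T] = {
    val p = Promise[T]()
    // foreach runs only on success; trySuccess quietly ignores the slower future.
    f.foreach(x => p.trySuccess(x))
    g.foreach(x => p.trySuccess(x))
    p.future
  }

  def demo(): Int = {
    val never = Promise[Int]().future // a future that is never completed
    Await.result(first(never, Future.successful(7)), 5.seconds)
  }
}
```

Note that if both inputs fail, the future returned by this first is never completed, which is a consequence of reacting to successes only.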

Utilities

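Duration is widely used in the Akka library. It is a special data type whose values represent either an infinite duration (Duration.Inf, Duration.MinusInf) or a finite span of time.

In Scala, durations can be created with a small DSL, and they support all the expected operations: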

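// Akka 2.0 shipped this DSL as akka.util.duration._; since Scala 2.10 it lives in the standard library
import scala.concurrent.duration._

val fivesec = 5.seconds
val threemillis = 3.millis

val diff = fivesec - threemillis
assert(diff < fivesec)
val fourmillis = threemillis * 4 / 3
val n = threemillis / 1.millisecond // dividing two durations yields a Double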

The abstract Duration class contains methods that allow:

Conversion to different time units (toNanos, toMicros, toMillis, toSeconds, toMinutes)
Comparison of durations (<, <=, >, >=)
Arithmetic operations (+, -, *, /)
Minimum and maximum (max, min)
Checking whether the duration is finite (isFinite)
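
The operations listed above can be tried out in a few lines. This is a sketch against scala.concurrent.duration; the object name and the chosen values are arbitrary.

```scala
import scala.concurrent.duration._

object DurationSketch {
  val ninetySec: FiniteDuration = 90.seconds

  // Conversions truncate to whole units of the target.
  val asMinutes: Long = ninetySec.toMinutes
  val asMillis: Long = ninetySec.toMillis

  // Comparison and min/max work across units.
  val shorter: Boolean = ninetySec < 2.minutes
  val longer: Duration = ninetySec.max(2.minutes)

  // Dividing one duration by another yields a plain ratio.
  val ratio: Double = 3.millis / 1.millisecond

  // Infinite durations report that they are not finite.
  val infIsFinite: Boolean = Duration.Inf.isFinite
}
```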

A Duration can be instantiated in the following ways:

Implicitly from the types Int and Long: val d = 100.millis
By passing a Long length and a TimeUnit: val d = Duration(100, MILLISECONDS)
By parsing a string that represents a time period: val d = Duration("1.2 µs")

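import scala.concurrent.duration._
import java.util.concurrent.TimeUnit._

val d1 = Duration(100, MILLISECONDS) // from a Long and a TimeUnit
val d2 = Duration(100, "millis")     // from a Long and a String
val d3 = 100.millis                  // implicitly from an Int or Long
val d4 = Duration("1.2 µs")          // from a String

// pattern matching
val Duration(length, unit) = 5.millis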

Deadline
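Duration has a sister class called Deadline, which represents an absolute point in time and supports computing the difference between now and the deadline to produce a Duration.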

val deadline = 10 seconds fromNow
awaitCond(..., deadline.timeLeft)
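
A small sketch of Deadline on its own, without the awaitCond test helper, which is not defined in this article. The names are invented and the durations are arbitrary.

```scala
import scala.concurrent.duration._

object DeadlineSketch {
  // Returns (a future deadline still has time left, a past deadline is overdue).
  def demo(): (Boolean, Boolean) = {
    val deadline: Deadline = 10.seconds.fromNow
    // timeLeft is the Duration between now and the deadline.
    val left: FiniteDuration = deadline.timeLeft
    val past: Deadline = Deadline.now - 1.second
    val stillOpen = left > Duration.Zero && left <= 10.seconds && !deadline.isOverdue()
    (stillOpen, past.isOverdue())
  }
}
```

Passing deadline.timeLeft to successive calls, as in the awaitCond line above, lets several waits share one overall time budget.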

These classes all have corresponding implementations in Java.

Date: 2024-10-06 09:10:11
