Scala 中的异步事件处理

在任何并发性应用程序中,异步事件处理都至关重要。无论事件的来源是什么(不同的计算任务、I/O 操作或与外部系统的交互),您的代码都必须跟踪事件,协调为响应它们而执行的操作。应用程序可以采用两种基本方法之一来实现异步事件处理:

  • 阻塞:一个等待事件的协调线程。
  • 非阻塞:事件向应用程序生成某种形式的通知,而没有线程显式等待它。

合成事件

scala.concurrent.Promise 和 scala.concurrent.Future 类为 Scala 开发人员提供了一些与 Java 8 开发人员的 CompletableFuture 使用方式类似的选项。具体地讲,Future 同时提供了阻塞和非阻塞的事件完成方式。但是,尽管在此级别上很相似,但用于处理两种 future 的技术是不同的。

我们先来看一个并发任务设置:

任务和排序

在一个特定操作中,应用程序通常必须执行多个处理步骤。例如,在向用户返回结果之前,Web 应用程序可能需要:

  1. 在一个数据库中查找用户的信息
  2. 使用查找到的信息来执行 Web 服务调用,并执行另一次数据库查询。
  3. 根据从前两个操作中获得的结果来执行数据库更新。
    图 1 演示了这种结构类型。

图 1. 应用程序任务流

图 1 将处理过程分解为 4 个不同的任务,它们通过表示顺序依赖关系的箭头相连接。任务 1 可以直接执行,任务 2 和任务 3 都在任务 1 完成后执行,任务 4 在任务 2 和任务 3 都完成后执行。

建模异步事件

在真实的系统中,异步事件的来源一般是并行计算或一种形式的 I/O 操作。但是,使用简单的时间延迟来建模这种系统会更容易一些,这也是这里所采用的方法。清单 1 显示了我用于生成事件的基本的赋时事件 (timed-event) 代码,这些事件采用了已完成的 Future 格式。

清单 1. 赋时事件代码

import java.util.Timer
import java.util.TimerTask

import scala.concurrent._

object TimedEvent {
  val timer = new Timer

  /** Return a Future which completes successfully with the supplied value after secs seconds. */
  def delayedSuccess[T](secs: Int, value: T): Future[T] = {
    val result = Promise[T]
    timer.schedule(new TimerTask() {
      def run() = {
        result.success(value)
      }
    }, secs * 1000)
    result.future
  }

  /** Return a Future which completes failing with an IllegalArgumentException after secs
    * seconds. */
  def delayedFailure(secs: Int, msg: String): Future[Int] = {
    val result = Promise[Int]
    timer.schedule(new TimerTask() {
      def run() = {
        result.failure(new IllegalArgumentException(msg))
      }
    }, secs * 1000)
    result.future
  }

清单 1 中的 Scala 代码使用一个 java.util.Timer 来安排 java.util.TimerTask 在一个延迟之后执行。每个 TimerTask 在运行时完成一个有关联的 future。delayedSuccess 函数定制了一个任务,在运行时成功完成一个 Scala Future[T],然后将该 future 返回给调用方。delayedSuccess 函数返回相同类型的 future,但使用了一个在完成 future 时发生 IllegalArgumentException 异常的失败任务。

清单 2 展示了如何使用 清单 1 中的代码创建 Future[Int] 格式的事件,使之与 图 1 中的 4 个任务相匹配。(此代码来自示例代码中的 AsyncHappy 类。)

清单 2. 示例任务的事件

// task definitions
def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1)
def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2)
def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3)
def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)

清单 2 中 4 个任务方法中的每一个都为该任务的完成时刻使用了特定的延迟值:task1 为 1 秒,task2 为 2 秒,task3 为 3 秒,task4 重新变为 1 秒。每个任务还接受一个输入值,是该输入加上任务编号作为 future 的(最终)结果值。这些方法都使用了 future 的成功形式;稍后您会看到一些使用失败形式的例子。

这些任务要求您按 图 1 中所示的顺序运行它们,向每个任务传递上一个任务返回的结果值(或者对于 task4,传递前两个任务结果的和)。如果中间两个任务同时执行,总的执行时间大约为 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。如果 task1 的输入为 1,那么结果为 2。如果该结果被传递给 task2 和 task3,那么结果将为 4 和 5。如果这两个结果的和 (9) 被作为输入传递给 task4,那么最终结果将为 13。

阻塞等待

在设定好操作环境之后,是时候来查看 Scala 如何处理事件的完成情况了。与上一期的 Java 代码中一样,协调 4 个任务的执行的最简单的方法是使用阻塞等待:主要线程等待每个任务依次完成。清单 3(同样来自示例代码中的 AsyncHappy 类)给出了此方法。

清单 3. 阻塞等待任务执行

def runBlocking() = {
  val v1 = Await.result(task1(1), Duration.Inf)
  val future2 = task2(v1)
  val future3 = task3(v1)
  val v2 = Await.result(future2, Duration.Inf)
  val v3 = Await.result(future3, Duration.Inf)
  val v4 = Await.result(task4(v2 + v3), Duration.Inf)
  val result = Promise[Int]
  result.success(v4)
  result.future
}

清单 3 使用 Scala scala.concurrent.Await 对象的 result() 方法来完成阻塞等待。该代码首先等待 task1 的结果,然后同时创建 task2 和 task3 future,并等待两个任务依次返回 future,最后等待 task4 的结果。最后 3 行(创建和设置 result)使得该方法能够返回一个 Future[Int]。返回该 future,让此方法与我接下来展示的非阻塞形式一致,但该 future 将在该方法返回之前完成。

组合 future

清单 4(同样来自示例代码中的 AsyncHappy 类)展示了一种将 future 联系在一起的方式,以便按正确顺序并使用正确的依赖关系执行任务,而不使用阻塞。

清单 4. 使用 onSuccess() 处理事件的完成

def runOnSuccess() = {
  val result = Promise[Int]
  task1(1).onSuccess(v => v match {
    case v1 => {
      val a = task2(v1)
      val b = task3(v1)
      a.onSuccess(v => v match {
        case v2 =>
          b.onSuccess(v => v match {
            case v3 => task4(v2 + v3).onSuccess(v4 => v4 match {
              case x => result.success(x)
            })
          })
      })
    }
  })
  result.future
}

清单 4 代码使用 onSuccess() 方法将一个函数(技术上讲是一个部分函数,因为它仅处理成功完成的情况)设置为在每个 future 完成时返回。因为 onSuccess() 调用是嵌套式的,所以它们将按顺序执行(即使 future 未完全按顺序完成)。

清单 4 的代码比较容易理解,但很冗长。清单 5 展示了一种使用 flatMap() 方法处理这种情况的更简单的方法。

清单 5. 使用 flatMap() 处理事件的完成

def runFlatMap() = {
  task1(1) flatMap {v1 =>
    val a = task2(v1)
    val b = task3(v1)
    a flatMap { v2 =>
      b flatMap { v3 => task4(v2 + v3) }}
  }
}

清单 5 中的代码实际上执行了与 清单 4 相同的事情,但 清单 5 使用了 flatMap() 方法从每个 future 中提取单一结果值。使用 flatMap() 消除了 清单 4 中所需的 match / case 结构,提供了一种更简洁的格式,但采用了同样的逐步执行路线。

试用示例

示例代码使用了一个 Scala App 来依次运行事件代码的每个版本,并确保完成事件(约 5 秒)和结果 (13) 是正确的。您可以使用 Maven 从命令行运行此代码,如清单 6 所示(删除了无关的 Maven 输出):

清单 6. 运行事件代码

[email protected]:~/devworks/scala4/code> mvn scala:run -Dlauncher=happypath
...
[INFO] launcher ‘happypath‘ selected => com.sosnoski.concur.article4.AsyncHappy
Starting runBlocking
runBlocking returned 13 in 5029 ms.
Starting runOnSuccess
runOnSuccess returned 13 in 5011 ms.
Starting runFlatMap
runFlatMap returned 13 in 5002 ms.

不顺利的道路

目前为止,您看到了以 future 形式协调事件的代码,这些代码总是能够成功完成。在真实应用程序中,不能寄希望于事情总是这么顺利。处理任务过程中可能会出现问题,而且在 JVM 语言术语中,这些问题通常表示为 Throwable。

更改 清单 2 中的任务定义很容易,只需使用 delayedFailure() 代替 delayedSuccess() 方法,如这里的 task4 所示:

def task4(input: Int) = TimedEvent.delayedFailure(1, "This won‘t work!")

如果运行仅将 task4 修改为完成时抛出异常的 清单 3,那么您会得到 task4 上的 Await.result() 调用所抛出的预期的 IllegalArgumentException。如果在 runBlocking() 方法中没有捕获该问题,该异常会在调用链中一直传递,直到最终捕获问题(如果未捕获问题,则会终止线程)。幸运的是,修改该代码很容易,因此,如果任何任务完成时抛出异常,该异常会通过返回的 future 传递给调用方来处理。清单 7 展示了这一更改。

清单 7. 具有异常的阻塞等待

def runBlocking() = {
  val result = Promise[Int]
  try {
    val v1 = Await.result(task1(1), Duration.Inf)
    val future2 = task2(v1)
    val future3 = task3(v1)
    val v2 = Await.result(future2, Duration.Inf)
    val v3 = Await.result(future3, Duration.Inf)
    val v4 = Await.result(task4(v2 + v3), Duration.Inf)
    result.success(v4)
  } catch {
    case t: Throwable => result.failure(t)
  }
  result.future
}

清单 7 非常浅显易懂,最初的代码包装在一个 try/catch 中,catch 在返回的 future 完成时传回异常。此方法稍微复杂一些,但任何 Scala 开发人员应该仍然很容易理解它。

那么,清单 4 和清单 5 中的事件处理代码的非阻塞变形是怎样的?从名称可以看出,清单 4 中使用的 onSuccess() 方法仅 适用于 future 的成功完成类型。如果想要同时处理成功和失败完成类型,则必须使用 onComplete() 方法,检查哪种完成例行适用。清单 8 展示了此技术如何用在事件处理代码中。

清单 8. 成功和失败的 onComplete() 处理

def runOnComplete() = {
  val result = Promise[Int]
  task1(1).onComplete(v => v match {
    case Success(v1) => {
      val a = task2(v1)
      val b = task3(v1)
      a.onComplete(v => v match {
        case Success(v2) =>
          b.onComplete(v => v match {
            case Success(v3) => task4(v2 + v3).onComplete(v4 => v4 match {
              case Success(x) => result.success(x)
              case Failure(t) => result.failure(t)
            })
            case Failure(t) => result.failure(t)
          })
        case Failure(t) => result.failure(t)
      })
    }
    case Failure(t) => result.failure(t)
  })
  result.future
}

清单 8 看起来很凌乱,幸运的是还有一种简单得多的替代方法:使用 清单 5 中的 flatMap() 代码代替。flatMap() 方法同时处理成功和失败完成类型,无需执行任何更改。

使用 async

最新的 Scala 版本包含在编译期间使用宏 转换代码的能力。目前实现的一个最有用的宏是 async,它在编译期间将使用 future 的看似顺序的代码转换为异步代码。清单 9 展示了 async 如何简化本教程中使用的任务代码。

清单 9. 结合使用 future 与 async

def runAsync(): Future[Int] = {
  async {
    val v1 = await(task1(1))
    val a = task2(v1)
    val b = task3(v1)
    await(task4(await(a) + await(b)))
  }
}

清单 9 中封装的 async 调用了 async 宏。此调用将该代码块声明为异步执行的代码,并在默认情况下异步执行它,然后返回一个 future 表示该代码块的执行结果。在该代码块中,await() 方法(实际上是该宏的一个关键字,而不是一个真正的方法)显示了何处需要一个 future 的结果。async 宏在编译期间修改了 Scala 程序的抽象语法树 (AST),以便将该代码块转换为使用回调的代码,这大体相当于 清单 4 的代码。

除了 async 包装器之外,清单 9 中的代码还与 清单 3 中最初的阻塞代码很相似。这主要是这个宏的成就,它抽象化了异步事件的所有复杂性,使它看起来像您在编写简单的线性代码。在幕后,这涉及到大量复杂性。

async 内部原理

如果查看 Scala 编译器从源代码生成的类,就会看到一些具有类似 AsyncHappy$$anonfun$1.class 的名称的内部类。从名称可以猜到,这些类由编译器为异步函数而生成(比如传递给 onSuccess() 或 flatMap() 方法的语句。)

使用 Scala 2.11.1 编译器和 Async 0.9.2 实现,您还会看到一个名为 AsyncUnhappy$stateMachine$macro$1$1.class 的类。这是 async 宏生成的实际实现代码,采用状态机的形式来处理异步任务。清单 10 给出了此类的一个部分地方进行了反编译(decompiled)的视图。

清单 10. 反编译后的 AsyncUnhappy$stateMachine$macro$1$1.class

public class AsyncUnhappy$stateMachine$macro$1$1
  implements Function1<Try<Object>, BoxedUnit>, Function0.mcV.sp
{
  private int state;
  private final Promise<Object> result;
  private int await$macro$3$macro$13;
  private int await$macro$7$macro$14;
  private int await$macro$5$macro$15;
  private int await$macro$11$macro$16;
  ...
  public void resume() {
    ...
  }

  public void apply(Try<Object> tr) {
    int i = this.state;
    switch (i) {
      default:
        throw new MatchError(BoxesRunTime.boxToInteger(i));
      case 3:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$11$macro$16 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 4;
          resume();
        }
        break;
      case 2:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$7$macro$14 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 3;
          resume();
        }
        break;
      case 1:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$5$macro$15 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 2;
          resume();
        }
        break;
      case 0:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$3$macro$13 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 1;
          resume();
        }
        break;
    }
  }
  ...
}

清单 10 中的 apply() 方法处理实际的状态更改,估算一个 future 的结果并将输出状态更改为匹配。输入状态会告诉该代码正在估算哪个 future;每个状态值对应于 async 代码块中一个特定的 future。从 清单 10 的部分代码很难了解这一点,但查看其他一些字节码,就可以看到状态代码是与任务匹配的,所以状态 0 表示 task1 的结果符合预期,状态 1 表示 task2 的结果符合预期,依此类推。

resume() 方法并未显示在 清单 10 中,因为反编译器无法确定如何将它转换为 Java 代码。我也不打算探讨这个过程,但通过查看字节码,可以确定 resume() 方法执行了与状态代码上的 Java switch 相似的工作。对于每个非最终状态,resume() 执行适当的代码段来设置下一个预期的 future,最终将 AsyncUnhappy$stateMachine$macro$1$1 实例设置为 future 的 onComplete() 方法的目标。对于最终状态,resume() 将会设置结果值并履行对最终结果的承诺。

您实际上并不需要深入分析生成的代码来理解 async(但它可能很有趣)。关于 async 工作原理的完整描述,请查阅 SIP-22 - Async 提案。

async 限制
由于 async 宏将代码转换为状态机类的方式,该宏的使用有一些限制。最明显的限制是,不能将 await() 嵌套在 async 代码块中的另一个对象或闭包内(包括一个函数定义)。也不能将 await() 嵌套在一个 try 或 catch 内。

除了这些使用限制之外,async 的最大问题是:在调试时,您同样会体验到一些通常与异步代码有关的问题回调,在这种情况下,需要尝试理解没有反映明显的代码结构的调用堆栈。不幸的是,目前的调试器设计无法解决这些问题。这是 Scala 中一个新的工作区域(请参阅 反思调试器。)与此同时,您可以禁用 async 代码块的异步执行,让调试变得更轻松(假设您尝试修复的问题在按顺序执行操作时仍然存在)。

最后,Scala 宏仍是一项我们正在开展的工作。async 有望在未来的版本中成为 Scala 语言的一个正式部分,但只有在 Scala 语言团队对宏的工作方式感到满意时,这种情况才会出现。到那时,无法确保 async 的格式不会发生改变。

结束语

一些处理异步事件的 Scala 方法与 Java 代码存在很大的区别。借助 flatMap() 和 async 宏,Scala 提供了整洁而且容易理解的技术。async 特别有趣,您可以编写看似正常的顺序的代码,但编译的代码会并发地执行。Scala 不是提供这种方法的惟一语言,但基于宏的实现为其他方法提供了极高的灵活性。

本文转自:https://www.ibm.com/developerworks/cn/java/j-jvmc4/index.html

原文地址:https://www.cnblogs.com/listenfwind/p/9911934.html

时间: 2024-10-03 14:45:12

Scala 中的异步事件处理的相关文章

HttpApplication中的异步线程

一.Asp.net中的线程池设置 在Asp.net的服务处理中,每当服务器收到一个请求,HttpRuntime将从HttpApplication池中获取一个HttpApplication对象处理此请求,请求的处理过程将被排入线程池中,对于Asp.net来说,在Machine.config文件的processModel部分中可以设置线程池中的参数. Asp.net线程相关的参数配置: 参数 配置 autoConfig 基于服务器的配置自动设置. maxWorkerThreads 设置每个CPU的最

scala中java并发编程

Runnable/Callable 线程(Thread) Executors/ExecutorService Future 线程安全问题 示例:搜索引擎 解决方案 Runnable/Callable Runnable只有一个没有返回值的方法 1 2 3 trait Runnable {   def run(): Unit } Callable的方法和run类似,只不过它有一个返回值 1 2 3 trait Callable[V] {   def call(): V } 线程 Scala的并发是建

第4节 Scala中的actor介绍:1、actor概念介绍;2、actor执行顺序和发送消息的方式

要看这一节... 10.    Scala Actor并发编程 10.1.   课程目标 10.1.1.    目标一:熟悉Scala Actor并发编程 10.1.2.    目标二:为学习Akka做准备 注:Scala Actor是scala 2.10.x版本及以前版本的Actor. Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,老版本的Actor已经废弃. 10.2.   什么是Scala  Actor 10.2.1.    概念 Scala中的Actor能够实

scala中trait学习笔记

scala中提供的trait(特质)和Java中的Interface有很多相似之处.都可以持有方法的声明和属性,但是trait还有比interface强大的多的其他用法. 1. trait可以带有方法实现: 2. trait与interface一样,可以互相继承.但是trait可以继承自某个类,但是这种特质只能够混入父类的子类中,不能随意混入: 3. trait中可以在运行时动态调用方法. 下面举一个trait使用的例子. 首先定义一个虚类IntQueue和特质Logger abstract c

转载: scala中span和partition区别

scala中的partition span splitAt groupBy 可把Collection分成:满足条件的一组,其他的另一组. partitionspan List(1,9,2,4,5).span(_<3)       // (List(1),List(9, 2, 4, 5)),碰到不符合就结束 List(1,9,2,4,5).partition(_<3) // (List(1, 2),List(9, 4, 5)),扫描所有 splitAt // (List(1, 3),List(5

scala学习手记16 &ndash; scala中的static

前面两节学了scala的对象和伴生对象,这两个在使用的时候很有些java的静态成员的意思. scala中没有静态字段和静态方法.静态成员会破坏scala所支持的完整的面向对象模型.不过可以通过伴生对象实现对scala的类一级的操作. 回过头来再看一遍那个Marker的例子,略做了一些调整: class Marker private(val color: String) { println("Creating " + this) override def toString(): Stri

scala学习手记2 - scala中的循环

先来看一段Java中的循环: for (int i = 1; i < 4; i++) { System.out.print(i + ","); } 毫无疑问,scala可以让这个循环更加简洁.根据上一节中的内容,没有必要显示指定变量i的类型,我们甚至不需要声明这个变量.其次输出的语句也可以更加简洁一些,在scala中可以直接使用println()这个方法输出字符串.最后scala的循环结构也是非常的轻量级.好了,可以看一下代码了: for (i <- 1 to 3) { p

ASP.NET MVC中使用异步控制器

线程池 一直想把项目改写成异步,但是ASP.NETMVC3下写的过于繁琐,.NET 4.5与ASP.NET MVC下代码写起来就比较简单了, MS好像也一直喜欢这样搞,每一个成熟的东西,都要演变好几个版本,才能趋于规范. ASP.NET MVC 中为什么需要使用异步呢,IIS有一个线程池来处理用户的请求,当一个新的请求过来时,将调度池中的线程以处理该请求,然而,但并发量很高的情况下,池中的线程已经不能够满足这么多的请求时候,池中的每一个线程都处于忙的状态则在处理请求时将阻塞处理请求的线程,并且该

第85讲:Scala中For表达式的强大表现力实战

今日[DT大数据梦工厂视频]<第85讲:Scala中For表达式的强大表现力实战>51CTO视频:http://edu.51cto.com/lesson/id-71503.html(DT大数据梦工厂scala的所有视频.PPT和代码在百度云盘的链接:http://url.cn/fSFPjS)85讲 scala for 表达式的强大表现力高阶函数的行为 指定了对数据 处理 的细节 .case class Person(name:String,isMale:Boolean,children:Per