Scalaz(49)- scalaz-stream: 安全的无穷运算-running infinite stream freely

scalaz-stream支持无穷数据流(infinite stream),这本身是它强大的功能之一,试想有多少系统需要通过无穷运算才能得以实现。这是因为外界的输入是不可预料的,对于系统本身就是无穷的,比如键盘鼠标输入什么时候终止、网站上有多少网页、数据库中还有多少条记录等等。但对无穷数据流的运算又引发了新的挑战。我们知道,fp程序的主要运算方式是递归算法,这是个问题产生的源泉:极容易掉入StackOverflowError陷阱。相信许多人对scalaz-stream如何实现无穷数据的运算安全都充满了好奇和疑问,那我们就在本篇讨论中分析一下scalaz-stream的具体运算方式。

scalaz-stream是由Process类型组件链接而成。Process是个状态机器(state machine)由Emit、Await、Append、Halt几个状态组成。值得注意的是这几个状态都是结构化的:

case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]

case class Await[+F[_], A, +O](
    req: F[A]
    , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance
    , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
    ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] {
...
}
case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]

case class Append[+F[_], +O](
    head: HaltEmitOrAwait[F, O]
    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
    ) extends Process[F, O] {
...
}

首先这些结构代表了Process类型其中的某种状态,而且要注意Await和Append的连接函数运算结果是Trampoline类型的,说明运算这两个连接函数可以避免StackOverflowError,实现安全运行。同时仔细观察可以发现用这些状态结构是可以实现point和flatMap函数的:

  def point(o: O): Process[Nothing,O] = Emit(o)

/**
   * Generate a `Process` dynamically for each output of this `Process`, and
   * sequence these processes using `append`.
   */
  final def flatMap[F2[x] >: F[x], O2](f: O => Process[F2, O2]): Process[F2, O2] = {
    // Util.debug(s"FMAP $this")
    this match {
      case Halt(_) => this.asInstanceOf[Process[F2, O2]]
      case Emit(os) if os.isEmpty => this.asInstanceOf[Process[F2, O2]]
      case Emit(os) => os.tail.foldLeft(Try(f(os.head)))((p, n) => p ++ Try(f(n)))
      case [email protected](_, _, _) => aw.extend(_ flatMap f)
      case [email protected](p, n) => ap.extend(_ flatMap f)
    }
  }
 

以上证实了Process就是Free Monad。Free Monad可以实现函数结构化,通过heap置换stack,可以在固定的堆栈空间内运行任何规模的程序,有效解决运行递归算法造成的StackOverflowError问题。值得注意的是不但Await和Append这两个状态转换方式是结构化的,它们的连接函数(continuation)运算结果也是包嵌在Trampoline里的。也就是说这样的设计保证了无论在翻译多层的Process状态组合或者运算超长Process链接的stream都可以避免StackOverflowError。

我们来详细了解一下具体的scalaz-stream程序实现方式:在之前的讨论里介绍了通过Free Monad编程的特点是算式/算法关注分离。我们可以说用Process组合成stream就是所谓的算式:对程序功能的描述。而算法具体来说应该由两部分组成:程序翻译和运算,把程序功能描述翻译成Free Monad结构然后运算这些结构里的函数。连续的算法会被翻译成多层的结构。那么翻译和运算就可能会同时进行:翻译一层即运算一层。所以我称算法(interpreter)为译算器:代表翻译和运算。对于无穷运算程序,compiler只能用Process类型的构建器(constructor)把程序翻译成Process的初始状态,然后译算器(interpreter)会一边继续进一步翻译一边运算结果。我们先从分析Process的运算器(runner)Process.runLog作业模式开始:

/**
   * Collect the outputs of this `Process[F,O]`, given a `Monad[F]` in
   * which we can catch exceptions. This function is not tail recursive and
   * relies on the `Monad[F]` to ensure stack safety.
   */
  final def runLog[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Vector[O2]] = {
    runFoldMap[F2, Vector[O2]](Vector(_))(
      F, C,
      // workaround for performance bug in Vector ++
      Monoid.instance[Vector[O2]]((a, b) => a fast_++ b, Vector())
    )
  }

runLog是runFoldMap函数的一个特殊施用:

/**
   * Collect the outputs of this `Process[F,O]` into a Monoid `B`, given a `Monad[F]` in
   * which we can catch exceptions. This function is not tail recursive and
   * relies on the `Monad[F]` to ensure stack safety.
   */
  final def runFoldMap[F2[x] >: F[x], B](f: O => B)(implicit F: Monad[F2], C: Catchable[F2], B: Monoid[B]): F2[B] = {
    def go(cur: Process[F2, O], acc: B): F2[B] = {
      cur.step match {
        case s: Step[F2,O]@unchecked =>
          (s.head, s.next) match {
            case (Emit(os), cont) =>
              F.bind(F.point(os.foldLeft(acc)((b, o) => B.append(b, f(o))))) { nacc =>
                go(cont.continue.asInstanceOf[Process[F2,O]], nacc)
              }
            case (awt:Await[F2,Any,O]@unchecked, cont) =>
              awt.evaluate.flatMap(p => go(p +: cont, acc))
          }
        case Halt(End) => F.point(acc)
        case Halt(Kill) => F.point(acc)
        case Halt(Error(rsn)) => C.fail(rsn)
      }
    }

    go(this, B.zero)
  }

这里面又引用了step函数和Step类型:

 /**
   * Run one step of an incremental traversal of this `Process`.
   * This function is mostly intended for internal use. As it allows
   * a `Process` to be observed and captured during its execution,
   * users are responsible for ensuring resource safety.
   */
  final def step: HaltOrStep[F, O] = {
    val empty: Emit[Nothing] = Emit(Nil)
    @tailrec
    def go(cur: Process[F,O], stack: Vector[Cause => Trampoline[Process[F,O]]], cnt: Int) : HaltOrStep[F,O] = {
      if (stack.nonEmpty) cur match {
        case Halt(End) if cnt <= 0  => Step(empty,Cont(stack))
        case Halt(cause) => go(Try(stack.head(cause).run), stack.tail, cnt - 1)
        case Emit(os) if os.isEmpty => Step(empty,Cont(stack))
        case [email protected](Emit(os)) => Step(emt,Cont(stack))
        case [email protected](_,_,_) => Step(awt,Cont(stack))
        case Append(h,st) => go(h, st fast_++ stack, cnt - 1)
      } else cur match {
        case [email protected](cause) => hlt
        case [email protected](os) if os.isEmpty => halt0
        case [email protected](os) => Step(emt,Cont(Vector.empty))
        case [email protected](_,_,_) => Step(awt,Cont(Vector.empty))
        case Append(h,st) => go(h,st, cnt - 1)
      }
    }
    go(this,Vector.empty, 10)   // *any* value >= 1 works here. higher values improve throughput but reduce concurrency and fairness. 10 is a totally wild guess

  }
/**
   * Intermediate step of process.
   * Used to step within the process to define complex combinators.
   */
  case class Step[+F[_], +O](head: EmitOrAwait[F, O], next: Cont[F, O]) extends HaltOrStep[F, O] {
    def toProcess : Process[F,O] = Append(head.asInstanceOf[HaltEmitOrAwait[F,O]],next.stack)
  }

  /**
   * Continuation of the process. Represents process _stack_. Used in conjunction with `Step`.
   */
  case class Cont[+F[_], +O](stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance) {

    /**
     * Prepends supplied process to this stack
     */
    def +:[F2[x] >: F[x], O2 >: O](p: Process[F2, O2]): Process[F2, O2] = prepend(p)

    /** alias for +: */
    def prepend[F2[x] >: F[x], O2 >: O](p: Process[F2, O2]): Process[F2, O2] = {
      if (stack.isEmpty) p
      else p match {
        case app: Append[[email protected], [email protected]] => Append[F2, O2](app.head, app.stack fast_++ stack)
        case emt: Emit[[email protected]] => Append(emt, stack)
        case awt: Await[[email protected], _, [email protected]] => Append(awt, stack)
        case [email protected](_) => Append(hlt, stack)
      }
    }

Step也是一个结构(case class),代表了一个完整连接的运算步骤:head为当前Emit或Await状态;next是另一种结构Cont,能引导下一个状态。stack代表一串状态连接函数:由当前状态终结原因推导到下一个状态。step函数的作用是判断当前Process状态是否符合构建Step结构条件,返回HaltOrStep类型结果,即:如当前Process状态不符合构建Step条件即进入Halt状态。从step函数中go函数的流程可以得出:当前状态为Emit或者Await时直接转成单步Step(没有下一个状态,next为空)。当前Process状态为Append时才会产生next不为空的Step(意思是完成当前状态运算后产生的结果会引导下一个状态)。很明显,这个step包含了翻译的作用:当前状态为Append时把它翻译成一个连续的Step:next这个Cont结构不为空,而Cont可以被转换成Process[F,O]:

/**
     * Converts this stack to process, that is used
     * when following process with normal termination.
     */
    def continue: Process[F, O] = prepend(halt)

我们再看看runFoldMap里这段代码:

   def go(cur: Process[F2, O], acc: B): F2[B] = {
      cur.step match {
        case s: Step[F2,O]@unchecked =>
          (s.head, s.next) match {
            case (Emit(os), cont) =>
              F.bind(F.point(os.foldLeft(acc)((b, o) => B.append(b, f(o))))) { nacc =>
                go(cont.continue.asInstanceOf[Process[F2,O]], nacc)
              }
            case (awt:Await[F2,Any,O]@unchecked, cont) =>
              awt.evaluate.flatMap(p => go(p +: cont, acc))
          }
        case Halt(End) => F.point(acc)
        case Halt(Kill) => F.point(acc)
        case Halt(Error(rsn)) => C.fail(rsn)
      }
  

如果当前状态是个多步的Step(next不为空):运算当前步骤后递归式重复对下面的步骤进行翻译,即重复 ->go->step,同时对翻译的步骤进行运算。

下面我们再看看compiler是如何产生Process初始状态的:

1   emit(3)                            //> res3: scalaz.stream.Process0[Int] = Emit(Vector(3))
2   emitAll(Seq(1,2,3))                //> res4: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
3   Process(1,2,3)                     //> res5: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3))
4   emitAll(Seq(1,2,3)).toSource       //> res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))

compiler对这几个简单的Process描述都产生了所谓的单步结构。runLog可以直接运算Emit结构内的元素然后终止。再看看需要从外部获取数据的Source:

1   await(Task.delay(3))(emit)       //> res8: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected],<function1>,<function1>)
2   eval(Task.delay {3})             //> res9: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected],<function1>,<function1>)

compiler产生的是Await结构。Await结构内确切的内容是:

// Await{task, {o => Emit(o)}, {o => Halt(End)}}     

对这个结构runLog会先运算task得出3,然后Emit(3),之后正常终止Halt(End)。

我们跟着再观察一个连续运算的例子:

1   emit(1) ++ emit(2)     //> res7: scalaz.stream.Process[Nothing,Int] = Append(Emit(Vector(1)),Vector(<function1>))
2 //Append{Emit(Vector(1)), Vector({case End => Emit(Vector(2)) case c => Halt(c)})}
3
4   emit(1) ++ emit(2) ++ emit(3)    //> res8: scalaz.stream.Process[Nothing,Int] = Append(Emit(Vector(1)),Vector(<function1>, <function1>))
5 //Append{Emit(Vector(1)), Vector({case End => Emit(Vector(2)) case c => Halt(c)},
6 //                               {case End => Emit(Vector(3)) case c => Halt(c)}}

对于 ++ 操作,compile产生了Append结构。结构内容如上所述。

用递归运算产生了下面的Await结构:

1   await(Task.delay(3))(emit)                      //> res9: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected],<function1>,<function1>)
2   eval(Task.delay {3})                            //> res10: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected],<function1>,<function1>)
3 // Await{task, {o => Emit(o)}, {o => Halt(End)}}
4
5   await(Task.delay(1))(i => await(Task.delay(2))(j => emit(i+j)))
6                                                   //> res11: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected],<function1>,<function1>)
7 // Await{task, {o => Await {task1, {o1 => emit(o+o1)}, {o1 => Halt(End)}}
8 //                   , {o => Halt(End)}

以上这几个例子里的stream都是明确声明(declarative stream)的,属于有限规模数据。下面我们正式来介绍一下无穷数据流(infinite stream)的具体实现方式。之前我们认识到repeat是个无穷函数:p.repeat表示无限复制Process p。我们看看compiler是如何处理它的:

1   emit(2).repeat  //> res12: scalaz.stream.Process[Nothing,Int] = Append(Emit(Vector(2)),Vector(<function1>))

repeat可以用Append结构表示:

 case class Append[+F[_], +O](
    head: HaltEmitOrAwait[F, O]
    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
    ) extends Process[F, O] {

再看看repeat函数的实现方法:

/**
   * Run this process until it halts, then run it again and again, as
   * long as no errors or `Kill` occur.
   */
  final def repeat: Process[F, O] = this.append(this.repeat)

/**
   * If this process halts due to `Cause.End`, runs `p2` after `this`.
   * Otherwise halts with whatever caused `this` to `Halt`.
   */
  final def append[F2[x] >: F[x], O2 >: O](p2: => Process[F2, O2]): Process[F2, O2] = {
    onHalt {
      case End => p2
      case cause => Halt(cause)
    }
  }

repeat通过append函数产生了个Append结构。append函数的作用是在上一个Process正常结束时继续运算p2,否则终止Halt。在repeat函数里这个p2就是repeat运算自己。用普通话解释:完成上一个运算后继续不断重复地再运算它。这就是一个典型的无穷数据源了。同时我们可以预测到Append结构里的内容:

1   emit(2).repeat  //> res12: scalaz.stream.Process[Nothing,Int] = Append(Emit(Vector(2)),Vector(<function1>))
2 // [email protected]{Emit(Vector(2)),Vector({case End => app case c => Halt(c)})}

app代表当前Append。按这样的原理我们可以编写一下无穷数据产生函数:

1   def dup(i: Int): Process[Task,Int] = await(Task.delay(i))(j => emit(j) ++ dup(j))
2                 //> dup: (i: Int)scalaz.stream.Process[scalaz.concurrent.Task,Int]
3   dup(5).take(5).runLog.run         //> res13: Vector[Int] = Vector(5, 5, 5, 5, 5)
4
5   def inc(start: Int): Process[Task,Int] = await(Task.delay(start))(i => emit(i) ++ inc(i+1))
6                //> inc: (start: Int)scalaz.stream.Process[scalaz.concurrent.Task,Int]
7   inc(5).take(5).runLog.run         //> res14: Vector[Int] = Vector(5, 6, 7, 8, 9)

我们知道最终这两个函数会产生Append结构,所以确定能够在固定的堆栈空间内运算这些Append结构内的连接函数(continuation),实现安全无穷运算。

时间: 2024-08-09 22:59:13

Scalaz(49)- scalaz-stream: 安全的无穷运算-running infinite stream freely的相关文章

泛函编程(13)-无穷数据流-Infinite Stream

上节我们提到Stream和List的主要分别是在于Stream的"延后计算"(lazy evaluation)特性.我们还讨论过在处理大规模排列数据集时,Stream可以一个一个把数据元素搬进内存并且可以逐个元素地进行处理操作.这让我不禁联想到我们常用的数据搜索读取方式了:大量的数据存放在数据库里,就好像无穷的数据源头.我们把数据读取方式(那些数据库读写API函数)嵌入Stream的操作函数内,把数据搜索条件传入Stream构造器(constructor)中形成一个对数据搜索操作的描述

Scalaz(58)- scalaz-stream: fs2-并行运算示范,fs2 parallel processing

从表面上来看,Stream代表一连串无穷数据元素.一连串的意思是元素有固定的排列顺序,所以对元素的运算也必须按照顺序来:完成了前面的运算再跟着进行下一个元素的运算.这样来看,Stream应该不是很好的并行运算工具.但是,fs2所支持的并行运算方式不是以数据元素而是以?Stream为运算单位的:fs2支持多个Stream同时进行运算,如merge函数.所以fs2使Stream的并行运算成为了可能. 一般来说,我们可能在Stream的几个状态节点要求并行运算: 1.同时运算多个数据源头来产生不排序的

Scalaz(46)- scalaz-stream 基础介绍

scalaz-stream是一个泛函数据流配件库(functional stream combinator library),特别适用于函数式编程.scalar-stream是由一个以上各种状态的Process串联组成.stream代表一连串的元素,可能是自动产生或者由外部的源头输入,如:一连串鼠标位置:文件中的文字行:数据库记录:又或者一连串的HTTP请求等.Process就是stream转换器(transducer),它可以把一种stream转换成另一种stream.Process的类型款式

Scalaz(16)- Monad:依赖注入-Dependency Injection By Reader Monad

在上一篇讨论里我们简单的介绍了一下Cake Pattern和Reader Monad是如何实现依赖注入的.主要还是从方法上示范了如何用Cake Pattern和Reader在编程过程中解析依赖和注入依赖.考虑到依赖注入模式在编程中的重要性和普遍性,觉着还需要再讨论的深入一些,使依赖注入模式在FP领域里能从理论走向实际.既然我们正在scalaz的介绍系列里,所以这篇我们就着重示范Reader Monad的依赖注入方法. 再说明一下依赖注入:我们说过在团队协作开发里能够实现软件模块的各自独立开发,原

Scalaz(10)- Monad:就是一种函数式编程模式-a design patter

Monad typeclass不是一种类型,而是一种程序设计模式(design pattern),是泛函编程中最重要的编程概念,因而很多行内人把FP又称为Monadic Programming.这其中透露的Monad重要性则不言而喻.Scalaz是通过Monad typeclass为数据运算的程序提供了一套规范的编程方式,如常见的for-comprehension.而不同类型的Monad实例则会支持不同的程序运算行为,如:Option Monad在运算中如果遇到None值则会中途退出:State

Scalaz(20)-Monad: Validation-Applicative版本的Either

scalaz还提供了个type class叫Validation.乍看起来跟\/没什么分别.实际上这个Validation是在\/的基础上增加了Applicative功能,就是实现了ap函数.通过Applicative实例就可以同时运算多个Validation并返回多条异常信息.所以,\/与Validation核心分别就在于Validation可以返回多条异常信息.Validation也是由两种状态组成:Success和Failure,分别与\/的left和right相对应.Failure可以返

Scalaz(12)- Monad:Writer - some kind of logger

通过前面的几篇讨论我们了解到F[T]就是FP中运算的表达形式(representation of computation).在这里F[]不仅仅是一种高阶类型,它还代表了一种运算协议(computation protocol)或者称为运算模型好点,如IO[T],Option[T].运算模型规范了运算值T的运算方式.而Monad是一种特殊的FP运算模型M[A],它是一种持续运算模式.通过flatMap作为链条把前后两个运算连接起来.多个flatMap同时作用可以形成一个程序运行链.我们可以在flat

Scalaz(57)- scalaz-stream: fs2-多线程编程,fs2 concurrency

fs2的多线程编程模式不但提供了无阻碍I/O(java nio)能力,更为并行运算提供了良好的编程工具.在进入并行运算讨论前我们先示范一下fs2 pipe2对象里的一些Stream合并功能.我们先设计两个帮助函数(helper)来跟踪运算及模拟运算环境: 1 def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap {a => 2 Task.delay { println(prompt + a); a}} //> log: [A](promp

Scalaz(59)- scalaz-stream: fs2-程序并行运算,fs2 running effects in parallel

scalaz-stream-fs2是一种函数式的数据流编程工具.fs2的类型款式是:Stream[F[_],O],F[_]代表一种运算模式,O代表Stream数据元素的类型.实际上F就是一种延迟运算机制:F中间包含的类型如F[A]的A是一个可能会产生副作用不纯代码(impure code)的运算结果类型,我们必须用F对A运算的延迟机制才能实现编程过程中的函数组合(compositionality),这是函数式编程的标准做法.如果为一个Stream装备了F[A],就代表这个Stream会在处理数据