FunDA(6)- Reactive Streams:Play with Iteratees、Enumerator and Enumeratees

在上一节我们介绍了Iteratee。它的功能是消耗从一些数据源推送过来的数据元素,不同的数据消耗方式代表了不同功能的Iteratee。所谓的数据源就是我们这节要讨论的Enumerator。Enumerator是一种数据源:它会根据下游数据消耗方(Iteratee)的具体状态主动向下推送数据元素。我们已经讨论过Iteratee的状态Step类型:

trait Step[E,+A]
case class Done[+A,E](a: A, remain: Input[E]) extends Step[E,A]
case class Cont[E,+A](k: Input[E] => InputStreamHandler[E,A]) extends Step[E,A]
case class Error[E](msg: String, loc:Input[E]) extends Step[E,Nothing]

这其中Iteratee通过Cont状态通知Enumerator可以发送数据元素,并提供了k函数作为Enumerator的数据推送函数。Enumerator推送的数据元素,也就是Iteratee的输入Input[E],除单纯数据元素之外还代表着数据源状态:

trait Input[+E]
case class EL[E](e: E) extends Input[E]
case object EOF extends Input[Nothing]
case object Empty extends Input[Nothing]

Enumerator通过Input[E]来通知Iteratee当前数据源状态,如:是否已经完成所有数据推送(EOF),或者当前推送了什么数据元素(El[E](e:E))。Enumerator主动向Iteratee输出数据然后返回新状态的Iteratee。我们可以从Enumerator的类型款式看得出:

trait Enumerator[E] {

  /**
   * Apply this Enumerator to an Iteratee
   */
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]

}

这个Future的目的主要是为了避免占用线程。实际上我们可以最终通过调用Iteratee的fold函数来实现Enumerator功能,如:

 /**
   * Creates an enumerator which produces the one supplied
   * input and nothing else. This enumerator will NOT
   * automatically produce Input.EOF after the given input.
   */
  def enumInput[E](e: Input[E]) = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
      i.fold {
        case Step.Cont(k) => eagerFuture(k(e))
        case _ => Future.successful(i)
      }(dec)
  }

又或者通过构建器(constructor, apply)来构建Eumerator:

/**
   * Create an Enumerator from a set of values
   *
   * Example:
   * {{{
   *   val enumerator: Enumerator[String] = Enumerator("kiki", "foo", "bar")
   * }}}
   */
  def apply[E](in: E*): Enumerator[E] = in.length match {
    case 0 => Enumerator.empty
    case 1 => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC {
        case Step.Cont(k) => k(Input.El(in.head))
        case _ => i
      }
    }
    case _ => new Enumerator[E] {
      def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i)
    }
  }

  /**
   * Create an Enumerator from any TraversableOnce like collection of elements.
   *
   * Example of an iterator of lines of a file :
   * {{{
   *  val enumerator: Enumerator[String] = Enumerator( scala.io.Source.fromFile("myfile.txt").getLines )
   * }}}
   */
  def enumerate[E](traversable: TraversableOnce[E])(implicit ctx: scala.concurrent.ExecutionContext): Enumerator[E] = {
    val it = traversable.toIterator
    Enumerator.unfoldM[scala.collection.Iterator[E], E](it: scala.collection.Iterator[E])({ currentIt =>
      if (currentIt.hasNext)
        Future[Option[(scala.collection.Iterator[E], E)]]({
          val next = currentIt.next
          Some((currentIt -> next))
        })(ctx)
      else
        Future.successful[Option[(scala.collection.Iterator[E], E)]]({
          None
        })
    })(dec)
  }

  /**
   * An empty enumerator
   */
  def empty[E]: Enumerator[E] = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]) = Future.successful(i)
  }

  private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) => Future[Iteratee[E, A]] = { (l, i) =>
    l.foldLeft(Future.successful(i))((i, e) =>
      i.flatMap(it => it.pureFold {
        case Step.Cont(k) => k(Input.El(e))
        case _ => it
      }(dec))(dec))
  }

下面是个直接构建Enumerator的例子:

 val enumUsers: Enumerator[String] = {
   Enumerator("Tiger","Hover","Grand","John")
       //> enumUsers  : play.api.libs.iteratee.Enumerator[String] = [email protected]

在这个例子里的Enumerator就是用上面那个apply构建的。我们把enumUsers连接到costume Iteratee:

 val consume = Iteratee.consume[String]()        //> consume  : play.api.libs.iteratee.Iteratee[String,String] = Cont(<function1>)
 val consumeUsers = enumUsers.apply(consume)      //> consumeUsers  : scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,String]] = Success([email protected])

我们是用apply(consume)来连接Enumerator和Iteratees的。apply函数的定义如下:

/**
   * Attaches this Enumerator to an [[play.api.libs.iteratee.Iteratee]], driving the
   * Iteratee to (asynchronously) consume the input. The Iteratee may enter its
   * [[play.api.libs.iteratee.Done]] or [[play.api.libs.iteratee.Error]]
   * state, or it may be left in a [[play.api.libs.iteratee.Cont]] state (allowing it
   * to consume more input after that sent by the enumerator).
   *
   * If the Iteratee reaches a [[play.api.libs.iteratee.Done]] state, it will
   * contain a computed result and the remaining (unconsumed) input.
   */
  def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]

这是个抽象函数。举个例实现这个apply函数的例子:

/**
   * Creates an enumerator which produces the one supplied
   * input and nothing else. This enumerator will NOT
   * automatically produce Input.EOF after the given input.
   */
  def enumInput[E](e: Input[E]) = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] =
      i.fold {
        case Step.Cont(k) => eagerFuture(k(e))
        case _ => Future.successful(i)
      }(dec)
  }

consumeUsers: Future[Iteratee[String,String]],我们用Future的函数来显示发送数据内容:

 val futPrint = consumeUsers.flatMap { i => i.run }.map(println)
    //> futPrint  : scala.concurrent.Future[Unit] = List()
 Await.ready(futPrint,Duration.Inf)     //> TigerHoverGrandJohn res0: demo.worksheet.enumerator.futPrint.type = Success(()) 

另一种更直接的方式:

val futUsers = Iteratee.flatten(consumeUsers).run.map(println)
      //> futUsers  : scala.concurrent.Future[Unit] = List()
 Await.ready(futPrint,Duration.Inf)
      //> TigerHoverGrandJohnres1: demo.worksheet.enumerator.futPrint.type = Success(())

我们也可以使用函数符号 |>> :

 val futPrintUsers = {
  Iteratee.flatten(enumUsers |>> consume).run.map(println)
     //> futPrintUsers  : scala.concurrent.Future[Unit] = List()
 }
 Await.ready(futPrintUsers,Duration.Inf)
     //> TigerHoverGrandJohn res2: demo.worksheet.enumerator.futPrintUsers.type = Success(())

我们还可以把两个Enumerator串联起来向一个Iteratee发送数据:

 val futEnums = {
   Iteratee.flatten {
     (enumUsers >>> enumColors) |>> consume
   }.run.map(println)                       //> futEnums  : scala.concurrent.Future[Unit] = List()
 }
  Await.ready(futEnums,Duration.Inf)
      //> TigerHoverGrandJohnRedWhiteBlueYellow res3: demo.worksheet.enumerator.futEnums.type = Success(())

当然,最实用的应该是把InputStream的数据推送给一个Iteratee,如把一个文件内容发送给Iteratee:

/**
   * Create an enumerator from the given input stream.
   *
   * Note that this enumerator will block when it reads from the file.
   *
   * @param file The file to create the enumerator from.
   * @param chunkSize The size of chunks to read from the file.
   */
  def fromFile(file: java.io.File, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    fromStream(new java.io.FileInputStream(file), chunkSize)(ec)
  }

/**
   * Create an enumerator from the given input stream.
   *
   * This enumerator will block on reading the input stream, in the supplied ExecutionContext.  Care must therefore
   * be taken to ensure that this isn‘t a slow stream.  If using this with slow input streams, make sure the
   * ExecutionContext is appropriately configured to handle the blocking.
   *
   * @param input The input stream
   * @param chunkSize The size of chunks to read from the stream.
   * @param ec The ExecutionContext to execute blocking code.
   */
  def fromStream(input: java.io.InputStream, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    implicit val pec = ec.prepare()
    generateM({
      val buffer = new Array[Byte](chunkSize)
      val bytesRead = blocking { input.read(buffer) }
      val chunk = bytesRead match {
        case -1 => None
        case `chunkSize` => Some(buffer)
        case read =>
          val input = new Array[Byte](read)
          System.arraycopy(buffer, 0, input, 0, read)
          Some(input)
      }
      Future.successful(chunk)
    })(pec).onDoneEnumerating(input.close)(pec)
  }

这项功能的核心函数是这个generateM,它的函数款式如下:

/**
   * Like [[play.api.libs.iteratee.Enumerator.repeatM]], but the callback returns an Option, which allows the stream
   * to be eventually terminated by returning None.
   *
   * @param e The input function.  Returns a future eventually redeemed with Some value if there is input to pass, or a
   *          future eventually redeemed with None if the end of the stream has been reached.
   */
  def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] = checkContinue0(new TreatCont0[E] {
    private val pec = ec.prepare()

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]) = executeFuture(e)(pec).flatMap {
      case Some(e) => loop(k(Input.El(e)))
      case None => Future.successful(Cont(k))
    }(dec)
  })

checkContinue0函数是这样定义的:

trait TreatCont0[E] {

    def apply[A](loop: Iteratee[E, A] => Future[Iteratee[E, A]], k: Input[E] => Iteratee[E, A]): Future[Iteratee[E, A]]

  }

  def checkContinue0[E](inner: TreatCont0[E]) = new Enumerator[E] {

    def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {

      def step(it: Iteratee[E, A]): Future[Iteratee[E, A]] = it.fold {
        case Step.Done(a, e) => Future.successful(Done(a, e))
        case Step.Cont(k) => inner[A](step, k)
        case Step.Error(msg, e) => Future.successful(Error(msg, e))
      }(dec)

      step(it)
    }
  }

从这段代码 case Step.Cont(k)=>inner[A](step, k)可以推断操作模式应该是当下游Iteratee在Cont状态下不断递归式调用Cont函数k向下推送数据e。我们再仔细看看generateM的函数款式;

 def generateM[E](e: => Future[Option[E]])(implicit ec: ExecutionContext): Enumerator[E] 

实际上刚才的操作就是重复调用这个e:=>Future[Option[E]]函数。再分析fromStream代码:

  def fromStream(input: java.io.InputStream, chunkSize: Int = 1024 * 8)(implicit ec: ExecutionContext): Enumerator[Array[Byte]] = {
    implicit val pec = ec.prepare()
    generateM({
      val buffer = new Array[Byte](chunkSize)
      val bytesRead = blocking { input.read(buffer) }
      val chunk = bytesRead match {
        case -1 => None
        case `chunkSize` => Some(buffer)
        case read =>
          val input = new Array[Byte](read)
          System.arraycopy(buffer, 0, input, 0, read)
          Some(input)
      }
      Future.successful(chunk)
    })(pec).onDoneEnumerating(input.close)(pec)
  }

我们看到传入generateM的参数是一段代码,在Iteratee状态为Cont时会不断重复运行,也就是说这段代码会逐次从输入源中读取chunkSize个Byte。这种做法是典型的Streaming方式,避免了一次性上载所有数据。下面是一个文件读取Enumerator例子:

 import java.io._
 val fileEnum: Enumerator[Array[Byte]] = {
  Enumerator.fromFile(new File("/users/tiger/lines.txt"))
 }
 val futFile = Iteratee.flatten { fileEnum |>> consume }.run.map(println)

注意:fileEnum |>> consume并不能通过编译,这是因为fileEnum是个Enumerator[Array[Byte]],而consume是个Iteratee[String,String],Array[Byte]与String类型不符。我们可以用个Enumeratee来进行相关的转换。下面就介绍一下Enumeratee的功能。

Enumeratee其实是一种转换器。它把Enumerator产生的数据转换成能适配Iteratee的数据类型,或者Iteratee所需要的数据。比如我们想把一串字符类的数字汇总相加时,首先必须把字符转换成数字类型才能进行Iteratee的汇总操作:

val strNums = Enumerator("1","2","3")            //> strNums  : play.api.libs.iteratee.Enumerator[String] = [email protected]
 val sumIteratee: Iteratee[Int,Int] = Iteratee.fold(0)((s,i) => s+i)
                                                 //> sumIteratee  : play.api.libs.iteratee.Iteratee[Int,Int] = Cont(<function1>)

 val strToInt: Enumeratee[String,Int] = Enumeratee.map {s => s.toInt}
                                                 //> strToInt  : play.api.libs.iteratee.Enumeratee[String,Int] = [email protected]
 strNums |>> strToInt.transform(sumIteratee)     //> res4: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,Int]] = List()
 strNums |>> strToInt &>> sumIteratee            //> res5: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[String,Int]] = List()
 strNums.through(strToInt) |>> sumIteratee       //> res6: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] = List()
 val futsum = Iteratee.flatten(strNums &> strToInt |>> sumIteratee).run.map(println)
                                                //> futsum  : scala.concurrent.Future[Unit] = List()
 Await.ready(futsum,Duration.Inf)               //> 6
                                                //| res7: demo.worksheet.enumerator.futsum.type = Success(())

在上面这个例子里Enumerator数据元素是String, Iteratee操作数据类型是Int, strToInt是个把String转换成Int的Enumeratee,我们用了几种转换方式的表达形式,结果都是一样的,等于6。我们可以用Enumerator.through或者Enumeratee.transform来连接Enumerator与Iteratee。当然,我们也可以筛选输入Iteratee的数据:

val sum2 = strNums &> Enumeratee.take(2) &> strToInt |>> sumIteratee
                 //> sum2  : scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Int]] =List()
 val futsum2 = Iteratee.flatten(sum2).run.map(println)
                                                  //> futsum2  : scala.concurrent.Future[Unit] = List()
 Await.ready(futsum2,Duration.Inf)                //> 3
                                                  //| res8: demo.worksheet.enumerator.futsum2.type = Success(())

上面例子里的Enumeratee.take(2)就是一个数据处理的Enumeratee。

现在Enumerator+Enumeratee+Iteratee从功能上越来越像fs2了,当然了,Iteratee就是一个流工具库。我们已经选择了fs2,因为它可以支持灵活的并行运算,所以再深入讨论Iteratee就没什么意义了。

时间: 2024-10-29 13:15:56

FunDA(6)- Reactive Streams:Play with Iteratees、Enumerator and Enumeratees的相关文章

FunDA(9)- Stream Source:reactive data streams

上篇我们讨论了静态数据源(Static Source, snapshot).这种方式只能在预知数据规模有限的情况下使用,对于超大型的数据库表也可以说是不安全的资源使用方式.Slick3.x已经增加了支持Reactive-Streams功能,可以通过Reactive-Streams API来实现有限内存空间内的无限规模数据读取,这正符合了FunDA的设计理念:高效.便捷.安全的后台数据处理工具库.我们在前面几篇讨论里介绍了Iteratee模式,play-iteratees支持Reactive-St

FunDA(4)- 数据流内容控制:Stream data element control

上节我们探讨了通过scalaz-stream-fs2来驱动一套数据处理流程,用fs2的Pipe类型来实现对数据流的逐行操作.本篇讨论准备在上节讨论的基础上对数据流的流动和元素操作进行优化完善.如数据流动中增加诸如next.skip.eof功能.内容控制中增加对行元素的append.insert.update.remove等操作方法.但是经过一番对fs2的再次解读,发现这些操作模式并不像我所想象那样的方式,实际上用fs2来实现数据行控制可能会更加简单和直接.这是因为与传统数据库行浏览方式不同的是f

FunDA(3)- 流动数据行操作:FDAPipeLine operations using scalaz-stream-fs2

在上节讨论里我们介绍了数据行流式操作的设想,主要目的是把后台数据库的数据载入前端内存再拆分为强类型的数据行,这样我们可以对每行数据进行使用和处理.形象点描述就是对内存里的一个数据流(data-stream)进行逐行操作.我们在上节用foreach模拟了一个流控来示范数据行的操作处理.在这节我们讨论一下用scalaz-stream-fs2作为数据流管理工具来实现FunDA的数据行流动管理功能.fs2的Stream是一种自然的拖动型(pull-model)数据流.而fs2的Pipe类型则像是管道的阀

FunDA(17)- 示范:异常处理与事后处理 - Exceptions handling and Finalizers

作为一个能安全运行的工具库,为了保证占用资源的安全性,对异常处理(exception handling)和事后处理(final clean-up)的支持是不可或缺的.FunDA的数据流FDAPipeLine一般是通过读取数据库数据形成数据源开始的.为了保证每个数据源都能被安全的使用,FunDA提供了事后处理finalizing程序接口来实现数据流使用完毕后的清理及异常处理(error-handling)程序接口来捕获和处理使用过程中出现的异常情况.首先,事后处理程序(finalizer)保证了在

FunDA(15)- 示范:任务并行运算 - user task parallel execution

FunDA的并行运算施用就是对用户自定义函数的并行运算.原理上就是把一个输入流截分成多个输入流并行地输入到一个自定义函数的多个运行实例.这些函数运行实例同时在各自不同的线程里同步运算直至耗尽所有输入.并行运算的具体函数实例数是用fs2-nondeterminism的算法根据CPU内核数.线程池配置和用户指定的最大运算实例数来决定的.我们在这次示范里可以对比一下同样工作内容的并行运算和串形运算效率.在前面示范里我们获取了一个AQMRPT表.但这个表不够合理化(normalized):state和c

泛函编程(32)-泛函IO:IO Monad

由于泛函编程非常重视函数组合(function composition),任何带有副作用(side effect)的函数都无法实现函数组合,所以必须把包含外界影响(effectful)副作用不纯代码(impure code)函数中的纯代码部分(pure code)抽离出来形成独立的另一个纯函数.我们通过代码抽离把不纯代码逐步抽离向外推并在程序里形成一个纯代码核心(pure core).这样我们就可以顺利地在这个纯代码核心中实现函数组合.IO Monad就是泛函编程处理副作用代码的一种手段.我们先

Scalaz(21)-类型例证:Liskov and Leibniz - type evidence

Leskov,Leibniz,别扭的名字,是什么地干活?碰巧从scalaz源代码里发现了这么个东西:scalaz/BindSyntax.scala /** Wraps a value `self` and provides methods related to `Bind` */ final class BindOps[F[_],A] private[syntax](val self: F[A])(implicit val F: Bind[F]) extends Ops[F[A]] { ////

Scalaz(2)- 基础篇:随意多态-typeclass, ad-hoc polymorphism

scalaz功能基本上由以下三部分组成: 1.新的数据类型,如:Validation, NonEmptyList ... 2.标准scala类型的延伸类型,如:OptionOps, ListOps ... 3.通过typeclass的随意多态(ad-hoc polymorphism)编程模式实现的大量概括性函数组件库 我们在这篇重点讨论多态(polymorphism),特别是随意多态(ad-hoc polymorphism). 多态,简单来讲就是一项操作(operation)可以对任意类型施用.

Scalaz(1)- 基础篇:隐式转换解析策略-Implicit resolution

在正式进入scalaz讨论前我们需要理顺一些基础的scalaz结构组成概念和技巧.scalaz是由即兴多态(ad-hoc polymorphism)类型(typeclass)组成.scalaz typeclass在scala中的应用有赖于scala compiler的一项特别功能:隐式转换(implicit conversion),使程序表述更精简.由于隐式转换是一项compiler功能,在程序编译(compile)的时候是由compiler来进行类型转换代码的产生和替代的. 让我们先了解一下作