FunDA(9)- Stream Source:reactive data streams

上篇我们讨论了静态数据源(Static Source, snapshot)。这种方式只能在预知数据规模有限的情况下使用,对于超大型的数据库表也可以说是不安全的资源使用方式。Slick3.x已经增加了支持Reactive-Streams功能,可以通过Reactive-Streams API来实现有限内存空间内的无限规模数据读取,这正符合了FunDA的设计理念:高效、便捷、安全的后台数据处理工具库。我们在前面几篇讨论里介绍了Iteratee模式,play-iteratees支持Reactive-Streams并且提供与Slick3.x的接口API,我们就在这篇讨论里介绍如何把Slick-Reactive-Streams转换成fs2-Streams。根据Slick官方文档:Slick可以通过db.stream函数用Reactive-Stream方式来读取后台数据,具体的配置如下:

  val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
  val action = queryAction.withStatementParameters(fetchSize = 512)
  val publisher = db.stream(disableAutocommit andThen action)

首先,我们需要取消自动提交(disableAutocommit)。fetchSize是缓存数据页长度(每批次读取数据字数),然后用db.stream来构成一个Reactive-Streams标准的数据源publisher。Slick官方网页只提供了下面这个使用publisher的例子:

  val fut = publisher.foreach(s => println(s))
  Await.ready(fut,Duration.Inf)

除了数据枚举外就没什么用处,也无法提供更细节点的示范。FunDA的具体解决方案是把publisher转换成play-iteratee的Enumerator。play-iteratee支持Reactive-Streams,所以这个Enumerator应该具备协调后台数据和内存缓冲之间关系(back-pressure)的功能。play-iteratee是如下构建Enumerator的;

import play.api.libs.iteratee._
val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)

enumerator从后台数据库表中产生的数据源通过Iteratee把数据元素enqueue推送给一个fs2的queue:

    private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont {
      case Input.EOF => {
        q.enqueue1(None).unsafeRun
        Done((), Input.Empty)
      }
      case Input.Empty => pushData(q)
      case Input.El(e) => {
        q.enqueue1(Some(e)).unsafeRun
        pushData(q)
      }
    }

然后fs2进行dequeue后生成fs2的Stream:

      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
        pipe.unNoneTerminate(q.dequeue)
      }

整个构建Stream的过程在FunDA的fdasources包是这样定义的:

package com.bayakala.funda.fdasources
import fs2._
import play.api.libs.iteratee._
import com.bayakala.funda.fdapipes._
import slick.driver.JdbcProfile

object FDADataStream {

  class FDAStreamLoader[SOURCE, TARGET](slickProfile: JdbcProfile, convert: SOURCE => TARGET) {

    import slickProfile.api._

    def fda_typedStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[TARGET] = {
      val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
      val action_ = action.withStatementParameters(fetchSize = fetchSize)
      val publisher = slickDB.stream(disableAutocommit andThen action)
      val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)

      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
        pipe.unNoneTerminate(q.dequeue).map {row => convert(row)}
      }

    }
    def fda_plainStream(action: DBIOAction[Iterable[SOURCE],Streaming[SOURCE],Effect.Read])(slickDB: Database)(fetchSize: Int, queSize: Int): FDAPipeLine[SOURCE] = {
      val disableAutocommit = SimpleDBIO(_.connection.setAutoCommit(false))
      val action_ = action.withStatementParameters(fetchSize = fetchSize)
      val publisher = slickDB.stream(disableAutocommit andThen action)
      val enumerator = streams.IterateeStreams.publisherToEnumerator(publisher)

      Stream.eval(async.boundedQueue[Task,Option[SOURCE]](queSize)).flatMap { q =>
        Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()
        pipe.unNoneTerminate(q.dequeue)
      }

    }
    private def pushData[R](q: async.mutable.Queue[Task,Option[R]]): Iteratee[R,Unit] = Cont {
      case Input.EOF => {
        q.enqueue1(None).unsafeRun
        Done((), Input.Empty)
      }
      case Input.Empty => pushData(q)
      case Input.El(e) => {
        q.enqueue1(Some(e)).unsafeRun
        pushData(q)
      }
    }

  }
  object FDAStreamLoader {
    def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDAStreamLoader[SOURCE, TARGET] =
      new FDAStreamLoader[SOURCE, TARGET](slickProfile, converter)
  }
}

FDADataStream对象内主要实现了fda_typedStream和fda_plainStream。fda_typedStream提供了SOURCE=>TARGET的转换。从Enumerator转换到Stream整个过程和原理我们在FunDA(7)里已经详细介绍过了。下面我们看看FunDA-Example中fda_typedStream的具体应用例子:

package com.bayakala.funda.fdasources.examples
import slick.driver.H2Driver.api._
import com.bayakala.funda.fdasources.FDADataStream._
import com.bayakala.funda.samples._
import com.bayakala.funda.fdarows._
import com.bayakala.funda.fdapipes._
import FDANodes._
import FDAValves._
object Example2 extends App {
   val albums = SlickModels.albums
   val companies = SlickModels.companies

//数据源query
   val albumsInfo = for {
     (a,c) <- albums join companies on (_.company === _.id)
   } yield (a.title,a.artist,a.year,c.name)

//query结果强类型(用户提供)
  case class Album(title: String, artist: String, year: Int, publisher: String) extends FDAROW
//转换函数(用户提供)
  def toTypedRow(row: (String, String, Option[Int], String)): Album =
    Album(row._1, row._2, row._3.getOrElse(2000), row._4)

  val db = Database.forConfig("h2db")

  val streamLoader = FDAStreamLoader(slick.driver.H2Driver, toTypedRow _)
  val albumStream = streamLoader.fda_typedStream(albumsInfo.result)(db)(512,128)

//定义一个用户作业函数:列印数据内容
  def printAlbums: FDATask[FDAROW] = row => {
    row match {
      case album: Album =>
        println("____________________")
        println(s"品名:${album.title}")
        println(s"演唱:${album.artist}")
        println(s"年份:${album.year}")
        println(s"发行:${album.publisher}")
        fda_next(album)
      case _ => fda_skip
    }
  }

  albumStream.through(fda_execUserTask(printAlbums)).run.unsafeRun

}

运算结果:

品名:Keyboard Cat‘s Greatest Hits
演唱:Keyboard Cat
年份:1999
发行:Sony Music Inc
____________________
品名:Spice
演唱:Spice Girls
年份:1999
发行:Columbia Records
____________________
品名:Whenever You Need Somebody
演唱:Rick Astley
年份:1999
发行:Sony Music Inc
____________________
品名:The Triumph of Steel
演唱:Manowar
年份:1999
发行:The K-Pops Singers
____________________
品名:Believe
演唱:Justin Bieber
年份:1999
发行:Columbia Records

Process finished with exit code 0
时间: 2024-08-03 07:57:04

FunDA(9)- Stream Source:reactive data streams的相关文章

FunDA(4)- 数据流内容控制:Stream data element control

上节我们探讨了通过scalaz-stream-fs2来驱动一套数据处理流程,用fs2的Pipe类型来实现对数据流的逐行操作.本篇讨论准备在上节讨论的基础上对数据流的流动和元素操作进行优化完善.如数据流动中增加诸如next.skip.eof功能.内容控制中增加对行元素的append.insert.update.remove等操作方法.但是经过一番对fs2的再次解读,发现这些操作模式并不像我所想象那样的方式,实际上用fs2来实现数据行控制可能会更加简单和直接.这是因为与传统数据库行浏览方式不同的是f

FunDA(3)- 流动数据行操作:FDAPipeLine operations using scalaz-stream-fs2

在上节讨论里我们介绍了数据行流式操作的设想,主要目的是把后台数据库的数据载入前端内存再拆分为强类型的数据行,这样我们可以对每行数据进行使用和处理.形象点描述就是对内存里的一个数据流(data-stream)进行逐行操作.我们在上节用foreach模拟了一个流控来示范数据行的操作处理.在这节我们讨论一下用scalaz-stream-fs2作为数据流管理工具来实现FunDA的数据行流动管理功能.fs2的Stream是一种自然的拖动型(pull-model)数据流.而fs2的Pipe类型则像是管道的阀

FunDA(6)- Reactive Streams:Play with Iteratees、Enumerator and Enumeratees

在上一节我们介绍了Iteratee.它的功能是消耗从一些数据源推送过来的数据元素,不同的数据消耗方式代表了不同功能的Iteratee.所谓的数据源就是我们这节要讨论的Enumerator.Enumerator是一种数据源:它会根据下游数据消耗方(Iteratee)的具体状态主动向下推送数据元素.我们已经讨论过Iteratee的状态Step类型: trait Step[E,+A] case class Done[+A,E](a: A, remain: Input[E]) extends Step[

FunDA(17)- 示范:异常处理与事后处理 - Exceptions handling and Finalizers

作为一个能安全运行的工具库,为了保证占用资源的安全性,对异常处理(exception handling)和事后处理(final clean-up)的支持是不可或缺的.FunDA的数据流FDAPipeLine一般是通过读取数据库数据形成数据源开始的.为了保证每个数据源都能被安全的使用,FunDA提供了事后处理finalizing程序接口来实现数据流使用完毕后的清理及异常处理(error-handling)程序接口来捕获和处理使用过程中出现的异常情况.首先,事后处理程序(finalizer)保证了在

FunDA(15)- 示范:任务并行运算 - user task parallel execution

FunDA的并行运算施用就是对用户自定义函数的并行运算.原理上就是把一个输入流截分成多个输入流并行地输入到一个自定义函数的多个运行实例.这些函数运行实例同时在各自不同的线程里同步运算直至耗尽所有输入.并行运算的具体函数实例数是用fs2-nondeterminism的算法根据CPU内核数.线程池配置和用户指定的最大运算实例数来决定的.我们在这次示范里可以对比一下同样工作内容的并行运算和串形运算效率.在前面示范里我们获取了一个AQMRPT表.但这个表不够合理化(normalized):state和c

泛函编程(32)-泛函IO:IO Monad

由于泛函编程非常重视函数组合(function composition),任何带有副作用(side effect)的函数都无法实现函数组合,所以必须把包含外界影响(effectful)副作用不纯代码(impure code)函数中的纯代码部分(pure code)抽离出来形成独立的另一个纯函数.我们通过代码抽离把不纯代码逐步抽离向外推并在程序里形成一个纯代码核心(pure core).这样我们就可以顺利地在这个纯代码核心中实现函数组合.IO Monad就是泛函编程处理副作用代码的一种手段.我们先

Scalaz(21)-类型例证:Liskov and Leibniz - type evidence

Leskov,Leibniz,别扭的名字,是什么地干活?碰巧从scalaz源代码里发现了这么个东西:scalaz/BindSyntax.scala /** Wraps a value `self` and provides methods related to `Bind` */ final class BindOps[F[_],A] private[syntax](val self: F[A])(implicit val F: Bind[F]) extends Ops[F[A]] { ////

Scalaz(2)- 基础篇:随意多态-typeclass, ad-hoc polymorphism

scalaz功能基本上由以下三部分组成: 1.新的数据类型,如:Validation, NonEmptyList ... 2.标准scala类型的延伸类型,如:OptionOps, ListOps ... 3.通过typeclass的随意多态(ad-hoc polymorphism)编程模式实现的大量概括性函数组件库 我们在这篇重点讨论多态(polymorphism),特别是随意多态(ad-hoc polymorphism). 多态,简单来讲就是一项操作(operation)可以对任意类型施用.

Scalaz(1)- 基础篇:隐式转换解析策略-Implicit resolution

在正式进入scalaz讨论前我们需要理顺一些基础的scalaz结构组成概念和技巧.scalaz是由即兴多态(ad-hoc polymorphism)类型(typeclass)组成.scalaz typeclass在scala中的应用有赖于scala compiler的一项特别功能:隐式转换(implicit conversion),使程序表述更精简.由于隐式转换是一项compiler功能,在程序编译(compile)的时候是由compiler来进行类型转换代码的产生和替代的. 让我们先了解一下作