Scalaz(47)- scalaz-stream: 深入了解-Source

scalaz-stream库的主要设计目标是实现函数式的I/O编程(functional I/O)。这样用户就能使用功能单一的基础I/O函数组合成为功能完整的I/O程序。还有一个目标就是保证资源的安全使用(resource safety):使用scalaz-stream编写的I/O程序能确保资源的安全使用,特别是在完成一项I/O任务后自动释放所有占用的资源包括file handle、memory等等。我们在上一篇的讨论里笼统地解释了一下scalaz-stream核心类型Process的基本情况,不过大部分时间都用在了介绍Process1这个通道类型。在这篇讨论里我们会从实际应用的角度来介绍整个scalaz-stream链条的设计原理及应用目的。我们提到过Process具有Emit/Await/Halt三个状态,而Append是一个链接stream节点的重要类型。先看看这几个类型在scalaz-stream里的定义:

case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]

case class Await[+F[_], A, +O](
    req: F[A]
    , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance
    , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
    ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] 

case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]

case class Append[+F[_], +O](
    head: HaltEmitOrAwait[F, O]
    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
    ) extends Process[F, O]

我们看到Process[F,O]被包嵌在Trampoline类型里,所以Process是通过Trampoline来实现函数结构化的,可以有效解决大量stream运算堆栈溢出问题(StackOverflowError)。撇开Trampoline等复杂的语法,以上类型可以简化成以下理论结构:

trait Process[+F[_],+O]
case object Cause

case class Emit[O](out: O) extends Process[Nothing, O] 

case class Halt(cause: Cause) extends Process[Nothing,Nothing]

case class Await[+F[_],E,+O](
  req: F[E],
  rcv: E => Process[F,O],
  preempt: E => Process[F,Nothing] = Halt) extends Process[F,O]

case class Append[+F[_],+O](
  head: Process[F,O],
  stack: Vector[Cause => Process[F,O]]) extends Process[F,O]

我们来说明一下:

Process[F[_],O]:从它的类型可以推断出scalaz-stream可以在输出O类型元素的过程中进行可能含副作用的F类型运算。

Emit[O](out: O):发送一个O类型元素;不可能进行任何附加运算

Halt(cause: Cause):停止发送;cause是停止的原因:End-完成发送,Err-出错终止,Kill-强行终止

Await[+F[_],E,+O]:这个是运算流的核心Process状态。先进行F运算req,得出结果E后输入函数rcv转换到下一个Process状态,完成后执行preempt这个事后清理函数。这不就是个flatMap函数结构版嘛。值得注意的是E类型是个内部类型,由F运算产生后输入rcv后就不再引用了。我们可以在preepmt函数里进行资源释放。如果我们需要构建一个运算流,看来就只有使用这个Await类型了

Append[+F[_],+O]:Append是一个Process[F,O]链接类型。首先它不但担负了元素O的传送,更重要的是它还可以把上一节点的F运算传到下一个节点。这样才能在下面节点时运行对上一个节点的事后处置函数(finalizer)。Append可以把多个节点结成一个大节点:head是第一个节点,stack是一串函数,每个函数接受上一个节点完成状态后运算出下一个节点状态

一个完整的scalaz-stream由三个类型的节点组成Source(源点)/Transducer(传换点)/Sink(终点)。节点间通过Await或者Append来链接。我们再来看看Source/Transducer/Sink的类型款式:

上游:Source       >>> Process0[O]   >>> Process[F[_],O]

中游:Transduce    >>> Process1[I,O]

下游:Sink/Channel >>> Process[F[_],O => F[Unit]], Channel >>> Process[F[_],I => F[O]]

我们可以用一个文件处理流程来描述完整scalaz-stream链条的作用:

Process[F[_],O],用F[O]方式读取文件中的O值,这时F是有副作用的

>>> Process[I,O],I代表从文件中读取的原始数据,O代表经过筛选、处理产生的输出数据

>>> O => F[Unit]是一个不返回结果的函数,代表对输入的O类型数据进行F运算,如把O类型数据存写入一个文件

/>> I => F[O]是个返回结果的函数,对输入I进行F运算后返回O,如把一条记录写入数据库后返回写入状态

以上流程简单描述:从文件中读出数据->加工处理读出数据->写入另一个文件。虽然从描述上看起来很简单,但我们的目的是资源安全使用:无论在任何终止情况下:正常读写、中途强行停止、出错终止,scalaz-stream都会主动关闭开启的文件、停止使用的线程、释放占用的内存等其它资源。这样看来到不是那么简单了。我们先试着分析Source/Transducer/Sink这几种类型的作用:

import Process._
emit(0)                        //> res0: scalaz.stream.Process0[Int] = Emit(Vector(0))
emitAll(Seq(1,2,3))            //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
Process(1,2,3)                 //> res2: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3))
Process()                      //> res3: scalaz.stream.Process0[Nothing] = Emit(List())

以上都是Process0的构建方式,也算是数据源。但它们只是代表了内存中的一串值,对我们来说没什么意义,因为我们希望从外设获取这些值,比如从文件或者数据库里读取数据,也就是说需要F运算效果。Process0[O] >>> Process[Nothing,O],而我们需要的是Process[F,O]。那么我们这样写如何呢?

val p: Process[Task,Int] = emitAll(Seq(1,2,3))
   //> p  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))

emitAll(Seq(1,2,3)).toSource
   //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))

类型倒是匹配了,但表达式Emit(...)里没有任何Task的影子,这个无法满足我们对Source的需要。看来只有以下这种方式了:

await(Task.delay{3})(emit)
//> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected],<function1>,<function1>)
eval(Task.delay{3})
//> res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected],<function1>,<function1>)

现在不但类型匹配,而且表达式里还包含了Task运算。我们通过Task.delay可以进行文件读取等带有副作用的运算,这是因为Await将会运行req:F[E] >>> Task[Int]。这正是我们需要的Source。那我们能不能用这个Source来发出一串数据呢?

def emitSeq[A](xa: Seq[A]):Process[Task,A] =
  xa match {
    case h :: t => await(Task.delay {h})(emit) ++ emitSeq(t)
    case Nil => halt
  }                                     //> emitSeq: [A](xa: Seq[A])scalaz.stream.Process[scalaz.concurrent.Task,A]
val es1 = emitSeq(Seq(1,2,3))           //> es1  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Await([email protected],<function1>,<function1>),Vector(<function1>))
val es2 = emitSeq(Seq("a","b","c"))     //> es2  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Await(
[email protected],<function1>,<function1>),Vector(<function1>))
es1.runLog.run                          //> res7: Vector[Int] = Vector(1, 2, 3)
es2.runLog.run                          //> res8: Vector[String] = Vector(a, b, c)

以上示范中我们用await运算了Task,然后返回了Process[Task,?],一个可能带副作用运算的Source。实际上我们在很多情况下都需要从外部的源头用Task来获取一些数据,通常这些数据源都对数据获取进行了异步(asynchronous)运算处理,然后通过callback方式来提供这些数据。我们可以用Task.async函数来把这些callback函数转变成Task,下一步我们只需要用Process.eval或者await就可以把这个Task升格成Process[Task,?]。我们先看个简单的例子:假如我们用scala.concurrent.Future来进行异步数据读取,可以这样把Future转换成Process:

def getData(dbName: String): Task[String] = Task.async { cb =>
   import scala.concurrent._
   import scala.concurrent.ExecutionContext.Implicits.global
   import scala.util.{Success,Failure}
   Future { s"got data from $dbName" }.onComplete {
     case Success(a) => cb(a.right)
     case Failure(e) => cb(e.left)
   }
}                                        //> getData: (dbName: String)scalaz.concurrent.Task[String]
val procGetData = eval(getData("MySQL")) //> procGetData  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await([email protected],<function1>,<function1>)
procGetData.runLog.run                   //> res9: Vector[String] = Vector(got data from MySQL)

我们也可以把java的callback转变成Task:

  import com.ning.http.client._
  val asyncHttpClient = new AsyncHttpClient()     //> asyncHttpClient  : com.ning.http.client.AsyncHttpClient = [email protected]
  def get(s: String): Task[Response] = Task.async[Response] { callback =>
    asyncHttpClient.prepareGet(s).execute(
      new AsyncCompletionHandler[Unit] {
        def onCompleted(r: Response): Unit = callback(r.right)

        def onError(e: Throwable): Unit = callback(e.left)
      }
    )
  }                 //> get: (s: String)scalaz.concurrent.Task[com.ning.http.client.Response]
  val prcGet = Process.eval(get("http://sina.com"))
                    //> prcGet  : scalaz.stream.Process[scalaz.concurrent.Task,com.ning.http.client.Response] = Await([email protected],<function1>,<function1>)
  prcGet.run.run    //> 12:25:27.852 [New I/O worker #1] DEBUG c.n.h.c.p.n.r.NettyConnectListener -Request using non cached Channel '[id: 0x23fa1307, /192.168.200.3:50569 =>sina.com/66.102.251.33:80]':

如果直接按照scalaz Task callback的类型款式 def async(callback:(Throwable \/ Unit) => Unit):

  def read(callback: (Throwable \/ Array[Byte]) => Unit): Unit = ???
                                 //> read: (callback: scalaz.\/[Throwable,Array[Byte]] => Unit)Unit
  val t: Task[Array[Byte]] = Task.async(read)     //> t  : scalaz.concurrent.Task[Array[Byte]] = [email protected]
  val t2: Task[Array[Byte]] = for {
    bytes <- t
    moarBytes <- t
  } yield (bytes ++ moarBytes)          //> t2  : scalaz.concurrent.Task[Array[Byte]] = [email protected]
  val prct2 = Process.eval(t2)          //> prct2  : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await([email protected],<function1>,<function1>)

  def asyncRead(succ: Array[Byte] => Unit, fail: Throwable => Unit): Unit = ???
                          //> asyncRead: (succ: Array[Byte] => Unit, fail: Throwable => Unit)Unit
  val t3: Task[Array[Byte]] = Task.async { callback =>
     asyncRead(b => callback(b.right), err => callback(err.left))
  }                      //> t3  : scalaz.concurrent.Task[Array[Byte]] = [email protected]
  val t4: Task[Array[Byte]] = t3.flatMap(b => Task(b))
                         //> t4  : scalaz.concurrent.Task[Array[Byte]] = [email protected]
  val prct4 = Process.eval(t4)      //> prct4  : scalaz.stream.Process[scalaz.concurrent.Task,Array[Byte]] = Await([email protected],<function1>,<function1>)

我们也可以用timer来产生Process[Task,A]:

  import scala.concurrent.duration._
  implicit val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(3)
                  //> scheduler  : java.util.concurrent.ScheduledExecutorService = [email protected][Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
  val fizz = time.awakeEvery(3.seconds).map(_ => "fizz")
                  //> fizz  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await([email protected],<function1>,<function1>)
  val fizz3 = fizz.take(3)   //> fizz3  : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
  fizz3.runLog.run           //> res9: Vector[String] = Vector(fizz, fizz, fizz)

Queue、Top和Signal都可以作为带副作用数据源的构建器。我们先看看Queue是如何产生数据源的:

  type BigStringResult = String
  val qJobResult = async.unboundedQueue[BigStringResult]
                         //> qJobResult  : scalaz.stream.async.mutable.Queue[demo.ws.blogStream.BigStringResult] = [email protected]
  def longGet(jobnum: Int): BigStringResult = {
    Thread.sleep(2000)
    s"Some large data sets from job#${jobnum}"
  }                      //> longGet: (jobnum: Int)demo.ws.blogStream.BigStringResult

//  multi-tasking
    val start = System.currentTimeMillis()        //> start  : Long = 1468556250797
    Task.fork(qJobResult.enqueueOne(longGet(1))).unsafePerformAsync{case _ => ()}
    Task.fork(qJobResult.enqueueOne(longGet(2))).unsafePerformAsync{case _ => ()}
    Task.fork(qJobResult.enqueueOne(longGet(3))).unsafePerformAsync{case _ => ()}
    val timemill = System.currentTimeMillis() - start
                                                  //> timemill  : Long = 17
    Thread.sleep(3000)
    qJobResult.close.run
 val bigData = {
 //multi-tasking
    val j1 = qJobResult.dequeue
    val j2 = qJobResult.dequeue
    val j3 = qJobResult.dequeue
    for {
     r1 <- j1
     r2 <- j2
     r3 <- j3
    } yield r1 + ","+ r2 + "," + r3
  }                       //> bigData  : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = Await([email protected],<function1>,<function1>)

  bigData.runLog.run      //> res9: Vector[String] = Vector(Some large data sets from job#2,Some large data sets from job#3,Some large data sets from job#1)

再看看Topic示范:

import scala.concurrent._
  import scala.concurrent.duration._
  import scalaz.stream.async.mutable._
  import scala.concurrent.ExecutionContext.Implicits.global
  val sharedData: Topic[BigStringResult] = async.topic()
       //> sharedData  : scalaz.stream.async.mutable.Topic[demo.ws.blogStream.BigStringResult] = [email protected]
  val subscriber = sharedData.subscribe.runLog    //> subscriber  : scalaz.concurrent.Task[Vector[demo.ws.blogStream.BigStringResult]] = [email protected]
  val otherThread = future {
    subscriber.run // Added this here - now subscriber is really attached to the topic
  }                //> otherThread  : scala.concurrent.Future[Vector[demo.ws.blogStream.BigStringResult]] = List()
  // Need to give subscriber some time to start up.
  // I doubt you'd do this in actual code.

  // topics seem more useful for hooking up things like
  // sensors that produce a continual stream of data,

  // and where individual values can be dropped on
  // floor.
  Thread.sleep(100)

  sharedData.publishOne(longGet(1)).run // don't just call publishOne; need to run the resulting task
  sharedData.close.run // Don't just call close; need to run the resulting task

  // Need to wait for the output
  val result = Await.result(otherThread, Duration.Inf)
       //> result  : Vector[demo.ws.blogStream.BigStringResult] = Vector(Some large data sets from job#1)

以上对可能带有副作用的Source的各种产生方法提供了解释和示范。scalaz-stream的其他类型节点将在下面的讨论中深入介绍。

时间: 2024-10-25 16:29:54

Scalaz(47)- scalaz-stream: 深入了解-Source的相关文章

Scalaz(16)- Monad:依赖注入-Dependency Injection By Reader Monad

在上一篇讨论里我们简单的介绍了一下Cake Pattern和Reader Monad是如何实现依赖注入的.主要还是从方法上示范了如何用Cake Pattern和Reader在编程过程中解析依赖和注入依赖.考虑到依赖注入模式在编程中的重要性和普遍性,觉着还需要再讨论的深入一些,使依赖注入模式在FP领域里能从理论走向实际.既然我们正在scalaz的介绍系列里,所以这篇我们就着重示范Reader Monad的依赖注入方法. 再说明一下依赖注入:我们说过在团队协作开发里能够实现软件模块的各自独立开发,原

Scalaz(20)-Monad: Validation-Applicative版本的Either

scalaz还提供了个type class叫Validation.乍看起来跟\/没什么分别.实际上这个Validation是在\/的基础上增加了Applicative功能,就是实现了ap函数.通过Applicative实例就可以同时运算多个Validation并返回多条异常信息.所以,\/与Validation核心分别就在于Validation可以返回多条异常信息.Validation也是由两种状态组成:Success和Failure,分别与\/的left和right相对应.Failure可以返

Scalaz(12)- Monad:Writer - some kind of logger

通过前面的几篇讨论我们了解到F[T]就是FP中运算的表达形式(representation of computation).在这里F[]不仅仅是一种高阶类型,它还代表了一种运算协议(computation protocol)或者称为运算模型好点,如IO[T],Option[T].运算模型规范了运算值T的运算方式.而Monad是一种特殊的FP运算模型M[A],它是一种持续运算模式.通过flatMap作为链条把前后两个运算连接起来.多个flatMap同时作用可以形成一个程序运行链.我们可以在flat

Spring Cloud - 8 (Spring Cloud Stream) &#539798;

原文: http://blog.gqylpy.com/gqy/497 置顶:来自一名75后老程序员的武林秘籍--必读(博主推荐) 来,先呈上武林秘籍链接:http://blog.gqylpy.com/gqy/401/ 你好,我是一名极客!一个 75 后的老工程师! 我将花两分钟,表述清楚我让你读这段文字的目的! 如果你看过武侠小说,你可以把这个经历理解为,你失足落入一个山洞遇到了一位垂暮的老者!而这位老者打算传你一套武功秘籍! 没错,我就是这个老者! 干研发 20 多年了!我也年轻过,奋斗过!我

流式计算(二)-Kafka Stream

前面说了Java8的流,这里还说流处理,既然是流,比如水流车流,肯定得有流的源头,源可以有多种,可以自建,也可以从应用端获取,今天就拿非常经典的Kafka做源头来说事,比如要来一套应用日志实时分析框架,或者是高并发实时流处理框架,正是Kafka的拿手好戏. 环境:Idea2019.03/Gradle6.0.1/JDK11.0.4/Lambda/RHEL8.0/VMWare15.5/Springboot2.2.1.RELEASE/Zookeeper3.5.5/Kafka2.3.1 难度:新手--战

基于虎书实现LALR(1)分析并生成GLSL编译器前端代码(C#)

基于虎书实现LALR(1)分析并生成GLSL编译器前端代码(C#) 为了完美解析GLSL源码,获取其中的信息(都有哪些in/out/uniform等),我决定做个GLSL编译器的前端(以后简称编译器或FrontEndParser). 以前我做过一个CGCompiler,可以自动生成LL(1)文法的编译器代码(C#语言的).于是我从<The OpenGL ® Shading Language>(以下简称"PDF")找到一个GLSL的文法,就开始试图将他改写为LL(1)文法.等

图像处理

1 public class ImageDraw 2 { 3 public Image NewBitmap(int w, int h, float dpix, float dpiy) 4 { 5 var rt = new Bitmap(w, h); 6 rt.SetResolution(dpix, dpiy); 7 return rt; 8 } 9 10 /// <summary> 得到图像的指定区域. 11 /// </summary> 12 /// <param name

Linux 程序包管理及sed基础

1. 简述rpm与yum命令的常见选项,并举例 rpm简称包管理器,即RedHat系列发行版的程序包管理工具,由于它遵循GPL规则且功能强大方便,故而广受欢迎: yum是rhel系列系统上rpm包管理器的前端工具,它可以自动执行系统更新,包括依赖性分析和基于"知识库"元数据的过时处理,还可以执行新的包的安装,删除旧的包,为其用户提供更方便快捷的安装程序. rpm命令:rpm [OPTIONS] [PACKAGE_FILE]安装:-i, --install升级:-U, --update,

handsontable的单元格操作方法

1.为handsontable添加钩子方法 addHook(key,callback):key为钩子方法名 [javascript] view plaincopyprint? <span style="font-size:18px;">例如:hot.addHook(‘beforeInit‘, myCallback);</span> addHookOnce(key,callback):添加只使用一次的方法,用完后自动删除 [javascript] view pla

For,Function,Lazy

1 package com.dtgroup.study 2 import scala.io.Source 3 4 object ForFunctionLazy { 5 def main(args: Array[String]): Unit = { 6 // for 7 println("for:line 0") 8 for (i <- 1 to 2; j <- 1 to 2) println("i=" + i + ",j=" + j)