泛函编程(36)-泛函Stream IO:IO数据源-IO Source & Sink

上期我们讨论了IO处理过程:Process[I,O]。我们说Process就像电视信号盒子一样有输入端和输出端两头。Process之间可以用一个Process的输出端与另一个Process的输入端连接起来形成一串具备多项数据处理功能的完整IO过程。但合成的IO过程两头输入端则需要接到一个数据源,而另外一端则可能会接到一个数据接收设备如文件、显示屏等。我们在这篇简单地先介绍一下IO数据源Source和IO数据接收端Sink。

我们先用一个独立的数据类型来代表数据源Source进行简单的示范说明,这个类型与Process类型没有任何关系:

import ProcessLib._
object SourceSink {
trait Source[O] {  //以下helper function都是把Source当作O类的List处理
  def |>[O2](p: Process[O,O2]): Source[O2]   //粘接一个Process p. 向其输入O
  def filter(f: O => Boolean): Source[O] = this |> Process.filter(f) //向p输入O
  def map[O2](f: O => O2): Source[O2] = this |> Process.lift(f)
  def take(n: Int): Source[O] = this |> Process.take(n)  //截取前n个O
  def takeWhile(f: O => Boolean): Source[O] = this |> Process.takeWhile(f)
  def drop(n: Int): Source[O] = this |> Process.drop(n) //跳过前n个O
  def dropWhile(f: O => Boolean): Source[O] = this |> Process.dropWhile(f)
}

从以上trait可以看到:Source的工作原理就是把一个Process的输入黏贴到Source的输出端。我们可以用这个 |> 把一串Process粘到Source的输出,如:Src.proc1.proc2.proc3。不过我们得先把proc1,proc2,proc3定义成Source组件函数,因为Source是一个完全独立的类型。

我们再来看看一个Source特殊案例:

case class ResourceR[R,I,O](   //Source的一个只读资源案例
 acquire: IO[R],   //资源使用门户  resource handle
 release: R => IO[Unit], //完成使用资源后的清理函数
 step: R => IO[Option[I]], //资源内容读取函数
 trans: Process[I,O]  //输出方式
 ) extends Source[O] {
 	def |>[O2](p: Process[O,O2]): Source[O2] =  //实现抽象函数
 	  ResourceR(acquire,release,step,trans |> p) //每次输入都产生一个ResourceR.它的trans与p进行管道对接
 }

这是个只读的数据源。我们看到所有的动作都被包嵌在IO类型里,这样可以把副作用的产生延后到一些Source Interpreter来运算。这里我们只要用最简单的IO来说明就可以了:

trait IO[A] { self =>
    def run: A
    def map[B](f: A => B): IO[B] =
      new IO[B] { def run = f(self.run) }
    def flatMap[B](f: A => IO[B]): IO[B] =
      new IO[B] { def run = f(self.run).run }
}
object IO {
    def unit[A](a: => A): IO[A] = new IO[A] { def run = a }
    def flatMap[A,B](fa: IO[A])(f: A => IO[B]) = fa flatMap f
    def apply[A](a: => A): IO[A] = unit(a) // syntax for IO { .. }
}

这个IO类型我们在前面的讨论里曾经练习过。

现在我们来看看一个文件读取的ResourceR例子:

object Source {
import java.io._
	def lines(fileName: String): Source[String] =  //从文件fileName里读取String
	  ResourceR(   //创建一个Source的实例
	    IO {io.Source.fromFile(fileName) },  //资源
	    (src: io.Source) => IO { src.close },  //清理
	    (src: io.Source) => IO {    //读取
	    	lazy val iterator = src.getLines
	    	if (iterator.hasNext) Some(iterator.next) else None //读完返回None
	    },
	    Process.passUnchanged) //Process[I,I],读什么输入什么
}

现在我们可以这样写一段程序了:

 Source.lines("input.txt").count.exists{_ >= 40000 }
                                                  //> res0: ch15.SourceSink.Source[Boolean] = ResourceR(ch15.SourceSink$IO$$anon$
                                                  //| [email protected],<function1>,<function1>,Await(<function1>))

噢,记住把count和exists放到Source trait里:

	def exists(f: O => Boolean): Source[Boolean] = this |> Process.exists(f)
	def count: Source[Int] = this |> Process.count

上面的表达式可以说还只是IO过程的描述。实际副作用产生是在interpreter里:

	def collect: IO[IndexedSeq[O]] = {  //读取数据源返回IO[IndexedSeq[O]], 用IO.run来实际运算
 		def tryOr[A](a: => A)(cleanup: IO[Unit]): A =  //运算表达式a, 出现异常立即清理现场
 		  try a catch {case e: Exception => cleanup.run; throw e}
 		@annotation.tailrec  //这是个尾递归算法,根据trans状态
 		def go(acc: IndexedSeq[O], cleanup: IO[Unit], step: IO[Option[I]], trans: Process[I,O]): IndexedSeq[O] =
 		  trans match {
 		  	case Halt() => cleanup.run; acc  //停止状态,清理现场
 		  	case Emit(out,next) => go(tryOr(out +: acc)(cleanup), cleanup, step, next) //积累acc
 		  	case Await(iproc) => tryOr(step.run)(cleanup) match {
 		  		case None => cleanup.run; acc  //读完了清理现场
 		  		case si => go(acc,cleanup,step,iproc(si))  //读入元素作为Process输入来改变Process状态
 		  	}
 		  }
 		acquire map {res => go(IndexedSeq(),release(res),step(res),trans)} //开始读取
 	}

注意:无论读取完成或中途失败退出都会导致现场清理以防止资源漏出。可以推断这个interpreter还是很安全的。

与Source同样,我们还是用一个独立的类型Sink来代表数据接收端进行简单说明:

trait Sink[I] {
 	def <|[I2](p: Process[I2,I]): Sink[I2] //p的输出接到Sink的输入
 	def filter(f: I => Boolean): Sink[I] = this <| Process.filter(f)  //从p接收I
 	def map[I2](f: I2 => I): Sink[I2] = this <| Process.lift(f) //将接收的I2变成I
 	def take(n: Int): Sink[I] = this <| Process.take(n)  //从p接收前n个I
 	def takeWhile(f: I => Boolean): Sink[I] = this <| Process.takeWhile(f)
 	def drop(n: Int): Sink[I] = this <| Process.drop(n) //过滤掉首n个I
 	def dropWhile(f: I => Boolean): Sink[I] = this <| Process.dropWhile(f)
 }

这和Source trait及其相似。注意和Process连接是反向的:由p指向Sink。

同样,一个只写的资源实例如下:

case class ResourceW[R,I,I2](  //只写资源
   acquire: IO[R],   //资源使用门户, resource handle
   release: R => IO[Unit],  //清理函数
   rcvf: R => (I2 => IO[Unit]), //接收方式
   trans: Process[I,I2]  //处理过程
   ) extends Sink[I] {
   	def <|[I2](p: Process[I2,I]): Sink[I2] =
   	  ResourceW(acquire,release,rcvf,p |> trans)	//制造一个ResourceW实例,由p到trans
   }

这个也和ResourceR相似。还是与Process连接方式是反方向的:由p到trans。

以下是一个向文件写入的Sink组件:

object Sink {
 import java.io._
 	def file(fileName: String, append: Boolean = false): Sink[String] = //结果是Sink[String]。必须用interpreter来运算
 	  ResourceW(   //是一个ResourceW实例
 	  IO {new FileWriter(fileName,append) }, //创建FileWriter
 	  (w: FileWriter) => IO {w.close},  //释放FileWriter
 	  (w: FileWriter) => (s: String) => IO {w.write(s)},  //写入
 	  Process.passUnchanged    //不处理写入数据
 	  )
 }

在学习过程中发现,独立于Process类型的Source,Sink类型使IO算法的表达式类型的集成很困难。这也限制了组件的功能。我们无法实现泛函编程简洁高雅的表达形式。在下面的讨论中我们会集中精力分析具备数据源功能的Process,希望在表达方式上能有所进步。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-28 03:10:23

泛函编程(36)-泛函Stream IO:IO数据源-IO Source & Sink的相关文章

泛函编程(5)-数据结构(Functional Data Structures)

编程即是编制对数据进行运算的过程.特殊的运算必须用特定的数据结构来支持有效运算.如果没有数据结构的支持,我们就只能为每条数据申明一个内存地址了,然后使用这些地址来操作这些数据,也就是我们熟悉的申明变量再对变量进行读写这个过程了.试想想如果没有数据结构,那我们要申明多少个变量呢.所以说,数据结构是任何编程不可缺少的元素. 泛函编程使用泛函数据结构(Functional Data Structure)来支持泛函程序.泛函数据结构的特点是”不可变特性“(Immutability), 是泛函编程中函数组

泛函编程(35)-泛函Stream IO:IO处理过程-IO Process

IO处理可以说是计算机技术的核心.不是吗?使用计算机的目的就是希望它对输入数据进行运算后向我们输出计算结果.所谓Stream IO简单来说就是对一串按序相同类型的输入数据进行处理后输出计算结果.输入数据源可能是一串键盘字符.鼠标位置坐标.文件字符行.数据库纪录等.如何实现泛函模式的Stream IO处理则是泛函编程不可或缺的技术. 首先,我们先看一段较熟悉的IO程序: 1 import java.io._ 2 def linesGt4k(fileName: String): IO[Boolean

泛函编程(38)-泛函Stream IO:IO Process in action

在前面的几节讨论里我们终于得出了一个概括又通用的IO Process类型Process[F[_],O].这个类型同时可以代表数据源(Source)和数据终端(Sink).在这节讨论里我们将针对Process[F,O]的特性通过一些应用实例来示范它的组合性(composibility)和由数据源到接收终端IO全过程的功能完整性. 我们已经在前面的讨论中对IO Process的各种函数组合进行了调研和尝试,现在我们先探讨一下数据源设计方案:为了实现资源使用的安全性和IO程序的可组合性,我们必须保证无

泛函编程(37)-泛函Stream IO:通用的IO处理过程-Free Process

在上两篇讨论中我们介绍了IO Process:Process[I,O],它的工作原理.函数组合等.很容易想象,一个完整的IO程序是由 数据源+处理过程+数据终点: Source->Process->Sink所组成的.我们发现:Process[I,O]本身是无法兼顾Source和Sink的功能.而独立附加的Source和Sink又无法有效地与Process[I,O]进行函数组合(functional composition). 实际上Process[I,O]是一种固定单一输入类型(single

泛函编程(32)-泛函IO:IO Monad

由于泛函编程非常重视函数组合(function composition),任何带有副作用(side effect)的函数都无法实现函数组合,所以必须把包含外界影响(effectful)副作用不纯代码(impure code)函数中的纯代码部分(pure code)抽离出来形成独立的另一个纯函数.我们通过代码抽离把不纯代码逐步抽离向外推并在程序里形成一个纯代码核心(pure core).这样我们就可以顺利地在这个纯代码核心中实现函数组合.IO Monad就是泛函编程处理副作用代码的一种手段.我们先

泛函编程(30)-泛函IO:Free Monad-Monad生产线

在上节我们介绍了Trampoline.它主要是为了解决堆栈溢出(StackOverflow)错误而设计的.Trampoline类型是一种数据结构,它的设计思路是以heap换stack:对应传统递归算法运行时在堆栈上寄存程序状态,用Trampoline进行递归算法时程序状态是保存在Trampoline的数据结构里的.数据结构是在heap上的,所以可以实现以heap换stack的效果.这种以数据结构代替函数调用来解决问题的方式又为泛函编程提供了更广阔的发展空间. 我们知道,任何涉及IO的运算都会面临

泛函编程(14)-try to map them all

虽然明白泛函编程风格中最重要的就是对一个管子里的元素进行操作.这个管子就是这么一个东西:F[A],我们说F是一个针对元素A的高阶类型,其实F就是一个装载A类型元素的管子,A类型是相对低阶,或者说是基础的类型.泛函编程风格就是在F内部用对付A类的函数对里面的元素进行操作.但在之前现实编程中确总是没能真正体会这种编程模式畅顺的用法:到底应该在哪里用?怎么用?可能内心里还是没能摆脱OOP的思维方式吧.在前面Stream设计章节里,我们采用了封装形式的数据结构设计,把数据结构uncons放进了特质申明里

泛函编程(27)-泛函编程模式-Monad Transformer

经过了一段时间的学习,我们了解了一系列泛函数据类型.我们知道,在所有编程语言中,数据类型是支持软件编程的基础.同样,泛函数据类型Foldable,Monoid,Functor,Applicative,Traversable,Monad也是我们将来进入实际泛函编程的必需.在前面对这些数据类型的探讨中我们发现: 1.Monoid的主要用途是在进行折叠(Foldable)算法时对可折叠结构内元素进行函数施用(function application). 2.Functor可以对任何高阶数据类型F[_]

泛函编程(17)-泛函状态-State In Action

对OOP编程人员来说,泛函状态State是一种全新的数据类型.我们在上节做了些介绍,在这节我们讨论一下State类型的应用:用一个具体的例子来示范如何使用State类型.以下是这个例子的具体描述: 模拟一个自动糖果贩售机逻辑:贩售机有两种操作方法:投入硬币和扭动出糖旋钮.贩售机可以处于锁定和放开两种状态.模拟运作跟踪贩售机内当前的糖果和硬币数量.贩售机的操作逻辑要求如下: 1.如果机内有糖的话,投入硬币贩售机从锁定状态进入放开状态 2.在放开状态下扭动旋钮贩售机放出一块糖果后自动进入锁定状态 3