FunDA(3)- 流动数据行操作:FDAPipeLine operations using scalaz-stream-fs2

在上节讨论里我们介绍了数据行流式操作的设想,主要目的是把后台数据库的数据载入前端内存再拆分为强类型的数据行,这样我们可以对每行数据进行使用和处理。形象点描述就是对内存里的一个数据流(data-stream)进行逐行操作。我们在上节用foreach模拟了一个流控来示范数据行的操作处理。在这节我们讨论一下用scalaz-stream-fs2作为数据流管理工具来实现FunDA的数据行流动管理功能。fs2的Stream是一种自然的拖动型(pull-model)数据流。而fs2的Pipe类型则像是管道的阀门(valve),我们可以在Pipe里截获流动中的数据行。我们看看下面的fs2 Stream例子:

1   def log[ROW](prompt: String): Pipe[Task,ROW,ROW] =
2     _.evalMap {row => Task.delay {println(s"$prompt> $row"); row}}
3                                                   //> log: [ROW](prompt: String)fs2.Pipe[fs2.Task,ROW,ROW]
4   Stream.range(1,5).through(log("")).run.unsafeRun//> > 1
5                                                   //| > 2
6                                                   //| > 3
7                                                   //| > 4

函数log是个Pipe类型。我们看到Pipe类型可以截获Stream中的流动元素,在函数log里我们通过evalMap来立即运算了println把当前的元素内容显示出来。所以我们并没有用runLog来收集Stream的元素(runLog也只能在完成所有元素的收集后才能显示结果)。

按照FunDA设计要求:从后台数据库中读取数据、载入内存然后逐行进行处理,那么我们可以用这个Pipe类型来实现数据的逐行处理,包括控制数据流动以及任意插入一些自定义数据元素。下面我们就试试通过定义Pipe类型的不同功能来实现行数据处理:

 1   def stopOn3[ROW]: Pipe[Task,ROW,ROW] = in => {
 2     def go: Handle[Task,ROW] => Pull[Task,ROW,Unit] = h => {
 3       h.receive1Option {
 4         case Some((r,h)) => if ( 3 == r) Pull.done
 5                             else Pull.output1(r) >> go(h)
 6         case None => Pull.done
 7       }
 8     }
 9     in.pull(go)
10   }                                               //> stopOn3: [ROW]=> fs2.Pipe[fs2.Task,ROW,ROW]
11   Stream(4,2,9,3,8,1)
12    .through(log("before"))
13    .through(stopOn3)
14    .through(log("after"))
15    .run
16    .unsafeRun                                     //> before> 4
17                                                   //| after> 4
18                                                   //| before> 2
19                                                   //| after> 2
20                                                   //| before> 9
21                                                   //| after> 9
22                                                   //| before> 3

stopOn3是个自定义Pipe。它的功能是截取当前元素、检查当前元素值、如果遇到3则终止数据流。从运算结果看:当before> 3时数据流停止流动(停止向下游发送元素)。虽然成功地实现了它的目的,函数stopOn3的设计者必须对fs2有较深的了解。而对于FunDA的终端用户来说不要说需要掌握fs2的运算机制,就连那些复杂的fs2类型就已经不可接受了。我想了一下:如果我们提供一个像stopOn3这样的Pipe函数、由用户提供有关的功能函数作为传入参数,这样的方式应该有比较大的接收空间。我们先从类型开始:重新模拟一套简明的与fs2类型相对应的FunDA类型:

1   //数据处理管道
2   type FDAPipeLine[ROW] = Stream[Task,ROW]
3   //数据作业节点
4   type FDAWorkNode[ROW] = Pipe[Task,ROW,ROW]
5   //数据管道开关阀门,从此处获得管道内数据
6   type FDAValve[ROW] = Handle[Task,ROW]
7   //管道连接器
8   type FDAPipeJoint[ROW] = Pull[Task,ROW,Unit]

下面是用这些类型向用户提供的帮助函数(helpers):

 1   //库提供:停止数据流动
 2   def fda_haltFlow = Pull.done                        //> fda_haltFlow: => fs2.Pull[Nothing,Nothing,Nothing]
 3   //库提供:向下游发送一个ROW
 4   def fda_sendRow[ROW](row: ROW) = Pull.output1(row)  //> fda_sendRow: [ROW](row: ROW)fs2.Pull[Nothing,ROW,Unit]
 5   //库提供:处理当前数据。运行用户提供的功能wf
 6   def fda_doWork[ROW](wf: ROW => FDAPipeJoint[ROW]): FDAWorkNode[ROW] = {
 7     def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
 8       h.receive1Option {
 9         case Some((r,h)) => wf(r) >> go(h)
10         case None => fda_haltFlow
11       }
12     }
13     in => in.pull(go)
14   }   //> fda_doWork: [ROW](wf: ROW => demo.ws.FDAPipe.FDAPipeJoint[ROW])demo.ws.FDAPipe.FDAWorkNode[ROW]

现在看来貌似一旦用户可以提供一个ROW => FDAPipeJoint[ROW]函数,就可以用fda_doWork函数来运算这个函数了。我们按上面例子的功能要求来设计一个这样的函数:

 1  //样板用户提供数据处理功能函数
 2   def breakOn3[ROW]: ROW => FDAPipeJoint[ROW] = row => {
 3      if (3 == row ) fda_haltFlow
 4      else fda_sendRow(row)
 5   }                                               //> breakOn3: [ROW]=> ROW => demo.ws.FDAPipe.FDAPipeJoint[ROW]
 6   //测试运算
 7   Stream(4,2,9,3,8,1)
 8    .through(log("before"))
 9    .through(fda_doWork(breakOn3))
10    .through(log("after"))
11    .run
12    .unsafeRun                                     //> before> 4
13                                                   //| after> 4
14                                                   //| before> 2
15                                                   //| after> 2
16                                                   //| before> 9
17                                                   //| after> 9
18                                                   //| before> 3

成功实现功能。下面是这篇讨论中的示范代码:

 1 import fs2._
 2 object FDAPipe {
 3   def log[ROW](prompt: String): Pipe[Task,ROW,ROW] =
 4     _.evalMap {row => Task.delay {println(s"$prompt> $row"); row}}
 5   Stream.range(1,5).through(log("")).run.unsafeRun
 6   def stopOn3[ROW]: Pipe[Task,ROW,ROW] = in => {
 7     def go: Handle[Task,ROW] => Pull[Task,ROW,Unit] = h => {
 8       h.receive1Option {
 9         case Some((r,h)) => if ( 3 == r) Pull.done
10                             else Pull.output1(r) >> go(h)
11         case None => Pull.done
12       }
13     }
14     in.pull(go)
15   }
16   Stream(4,2,9,3,8,1)
17    .through(log("before"))
18    .through(stopOn3)
19    .through(log("after"))
20    .run
21    .unsafeRun
22   //数据处理管道
23   type FDAPipeLine[ROW] = Stream[Task,ROW]
24   //数据作业节点
25   type FDAWorkNode[ROW] = Pipe[Task,ROW,ROW]
26   //数据管道开关阀门,从此处获得管道内数据
27   type FDAValve[ROW] = Handle[Task,ROW]
28   //管道连接器
29   type FDAPipeJoint[ROW] = Pull[Task,ROW,Unit]
30
31   //库提供:停止数据流动
32   def fda_haltFlow = Pull.done
33   //库提供:向下游发送一个ROW
34   def fda_sendRow[ROW](row: ROW) = Pull.output1(row)
35   //库提供:处理当前数据。运行用户提供的功能wf
36   def fda_doWork[ROW](wf: ROW => FDAPipeJoint[ROW]): FDAWorkNode[ROW] = {
37     def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
38       h.receive1Option {
39         case Some((r,h)) => wf(r) >> go(h)
40         case None => fda_haltFlow
41       }
42     }
43     in => in.pull(go)
44   }
45   //用户提供数据处理功能函数
46   def breakOn3[ROW]: ROW => FDAPipeJoint[ROW] = row => {
47      if (3 == row ) fda_haltFlow
48      else fda_sendRow(row)
49   }
50   //测试运算
51   Stream(4,2,9,3,8,1)
52    .through(log("before"))
53    .through(fda_doWork(breakOn3))
54    .through(log("after"))
55    .run
56    .unsafeRun
57 }
时间: 2024-10-12 13:52:24

FunDA(3)- 流动数据行操作:FDAPipeLine operations using scalaz-stream-fs2的相关文章

MySQL-6-语句数据行操作

语句数据行操作 今日内容 SQL语句数据行操作补充 create table tb12( id int auto_increment primary key, name varchar(32), age int )engine=innodb default charset=utf8; 增 insert into tb11(name,age) values('alex',12); insert into tb11(name,age) values('alex',12),('root',18); i

42 外键 数据行的操作

外键; create table department ( id int auto_increment primary key, depart_name varchar(32) not null default '' )engine=Innodb charset=utf8; nsert into department (depart_name) values ('公关'), ('关关'),('关公'); create table userinfo ( id int auto_increment

设置mysql数据表列自动递增以及数据行插入操作

创建mysql数据表,设置id列递增.主键create table running_log ( id int primary key auto_increment, routename varchar(255), log varchar(255), time datetime ); 往有递增数据列的数据表插入新的数据行 1.INSERT INTO `running_log` (`id`, `routename`, `log`, `time`) VALUES ('null', 'yunnan-10

JAVASE02-Unit08: 文本数据IO操作 、 异常处理

Unit08: 文本数据IO操作 . 异常处理 * java.io.ObjectOutputStream * 对象输出流,作用是进行对象序列化 package day08; import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.List; /** * java.io.Ob

oracle命令行操作

exp zdxk/[email protected] TABLES=(ms_data_dictory_def,ms_static_data_def) file=c:\staticdata.dmp 通过输入 EXP 命令和您的用户名/口令, 导出操作将提示您输入参数: 例如: EXP SCOTT/TIGER 或者, 您也可以通过输入跟有各种参数的 EXP 命令来控制导出的运行方式.要指定参数, 您可以使用关键字: 格式: EXP KEYWORD=value 或 KEYWORD=(value1,va

iOS开发UI篇—自定义瀑布流控件(蘑菇街数据刷新操作)

iOS开发UI篇—自定义瀑布流控件(蘑菇街数据刷新操作) 一.简单说明 使用数据刷新框架: 该框架提供了两种刷新的方法,一个是使用block回调(存在循环引用问题,_ _weak),一个是使用调用. 问题:在进行下拉刷新之前,应该要清空之前的所有数据(在刷新数据这个方法中). 移除正在显示的cell: (1)把字典中的所有的值,都从屏幕上移除 (2)清除字典中的所有元素 (3)清除cell的frame,每个位置的cell的frame都要重新计算 (4)清除可复用的缓存池. 该部分的代码如下: 1

9.数据的操作

数据操作能力是大数据分析至关重要的能力.数据操作主要包括:更改(exchange),移动(moving),排序(sorting),转换(transforming).Hive提供了诸多查询语句,关键字,操作和方法来进行数据操作. 一. 数据更改数据更改主要包括:LOAD, INSERT, IMPORT, and EXPORT 1. LOAD DATAload关键字的作用是将数据移动到hive中.如果是从HDFS加载数据,则加载成功后会删除源数据:如果是从本地加载,则加载成功后不会删除源数据. 数据

如何处理大量数据并发操作

文件缓存,数据库缓存,优化sql,数据分流,数据库表的横向和纵向划分,优化代码结构! 锁述的概 一. 为什么要引入锁 多个用户同时对数据库的并发操作时会带来以下数据不一致的问题: 丢失更新 A,B两个用户读同一数据并进行修改,其中一个用户的修改结果破坏了另一个修改的结果,比如订票系统 脏读 A用户修改了数据,随后B用户又读出该数据,但A用户因为某些原因取消了对数据的修改,数据恢复原值,此时B得到的数据就与数据库内的数据产生了不一致 不可重复读 A用户读取数据,随后B用户读出该数据并修改,此时A用

利用数据集进行数据访问操作

数据访问有两种不同的方式,一种是用Connection, Command , DataReader来进行数据对数据的操作,另一种是用DataAdapter(适配器)来进行数据操作,而数据则一般放在内存中的数据集合DataSet,这种方式可以在内存中对数据操作,然后在合适的时间再将修改传到数据库. 一. 建立数据集: 1. 在解决方案右键添加新建项,找到数据中的数据集: 2. 在服务器资源管理器里数据连接添加连接: 3. 之后找到数据库里面需要用到的表,选中之后拖拽至右侧数据集里: 这样一个数据集