爱你不容易 —— Stream详解

作为前端,我们常常会和 Stream 有着频繁的接触。比如使用 gulp 对项目进行构建的时候,我们会使用 gulp.src 接口将匹配到的文件转为 stream(流)的形式,再通过 .pipe() 接口对其进行链式加工处理;

或者比如我们通过 http 模块创建一个 HTTP 服务:

const http = require(‘http‘);
http.createServer( (req, res) => {
  //...
}).listen(3000);

此处的 req 和 res 也属于 Stream 的消费接口(前者为 Readable Stream,后者为 Writable Stream)

事实上像上述的 req/res,或者 process.stdout 等接口都属于 Stream 的实例,因此较少存在情况,是需要我们手动引入 Stream 模块的,例如:

//demo1.js
‘use strict‘;
const Readable = require(‘stream‘).Readable;
const rs = Readable();
const s = ‘VaJoy‘;
const l = s.length;
let i = 0;
rs._read = ()=>{
    if(i == l){
        rs.push(‘ is my name‘);
        return rs.push(null)
    }
    rs.push(s[i++])
};
rs.pipe(process.stdout);

如果不太能读懂上述代码,或者对 Stream 的概念感到模糊,那么可以放轻松,因为本文会进一步地对 Stream 进行剖析,并且谈谈直接使用它可能会存在的一些问题(这也是为何 gulp 要使用 through2 的原因)

另外本文的实例均可在我的 github 仓库https://github.com/VaJoy/stream/获取到,读者可以自行下载和调试。

一. Stream的作用

在介绍 Stream(流)之前,我们先来看一个例子 —— 模拟服务器把本地某个文件内容吐给客户端:

//demo2
var http = require(‘http‘);
var fs = require(‘fs‘);

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + ‘/data.txt‘, function (err, data) {
        res.end(data);
    });
});
server.listen(3000);

这段代码虽然可以正常执行,但存在一个显著的问题 —— 对于每一个客户端的请求,fs.readFile 接口都会把整个文件都缓存到内存中去,然后才开始把数据吐给用户。那么当文件体积很大、请求也较多(且特别当请求来自慢速用户)的时候,服务器需要消耗很大的内存,导致性能低下。

然而这个问题,则正是 stream 发挥所长的地方。如前文提及的,res 是流对象,那我们正好可以将其利用起来:

var server2 = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + ‘/data.txt‘);
    stream.pipe(res);
});
server2.listen(4000);

在上方代码段里,fs.createReadStream 创建了 data.txt 的可读流(Readable Stream)。这里需要事先了解的是,流可以简单地分为“可读的(readable)”、“可写的(writable)”,或者“读写均可”三种类型,且所有的流都属于 EventEmitter 的实例

回到代码,对于创建的可读流,我们通过 .pipe() 接口来监听其 dataend 事件,并把 data.txt (的可读流)拆分成一小块一小块的数据(chunks),像流水一样源源不断地吐给客户端,而不再需要等待整个文件都加载到内存后才发送数据。

其中 .pipe 可以视为流的“管道/通道”方法,任何类型的流都会有这个 .pipe 方法去成对处理流的输入与输出。

为了方便理解,我们把上述两种方式(不使用流/使用流)处理为如下的情景(卧槽我好好一个前端为啥要P这么萌的图)

⑴ 不使用流:

⑵ 使用流:

由此可以得知,使用流(stream)的形式,可以大大提升响应时间,又能有效减轻服务器内存的压力。

二. Stream的分类

在上文我们曾提及到,stream 可以按读写权限来简单地分做三类,不过这里我们再细化下,可以把 stream 归为如下五个类别:

⑴ Readable Streams
⑵ Writable Streams
⑶ Transform Streams
⑷ Duplex Streams
⑸ Classic Streams

其中 Transform Streams 和 Duplex Streams 都属于即可读又可写的流,而最后一个 Classic Streams 是对 Node 古早版本上的 Stream 的一个统称。我们将照例对其进行逐一介绍。

2.1 Readable Streams

即可读流,通过 .pipe 接口可以将其数据传递给一个 writable、transform 或者 duplex流:

readableStream.pipe(dst)

常见的 Readable Streams 包括:

  • 客户端上的 HTTP responses
  • 服务端上的 HTTP requests
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • 子进程的 stdout 和 stderr
  • process.stdin

例如在前面 demo2 的代码段中,我们就使用了 fs.createReadStream 接口来创建了一个 fs read stream:

var server2 = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + ‘/data.txt‘);
    stream.pipe(res);
});
server2.listen(4000);

这里有个有趣的地方 —— 虽然 Readable Streams 称为可读流,但在将其传入一个消耗对象之前,它都是可写的:

var Readable = require(‘stream‘).Readable;

var rs = new Readable;
rs.push(‘servers ‘);
rs.push(‘are listening on\n‘);
rs.push(‘3000 and 4000\n‘);
rs.push(null);

rs.pipe(process.stdout);

执行结果:

在这段代码中,我们通过 readStream.push(data) 的形式往可读流里注入数据,并以 readStream.push(null) 来结束可读流。

不过这种写法有个弊端 —— 从使用 .push() 将数据注入 readable 流中开始,直到另一个东西(process.stdout)来消耗数据之前,这些数据都会存在缓存中。

这里有个内置接口 ._read()  可以用来处理这个问题,它是从系统底层开始读取数据流时才会不断调用自身,从而减少缓存冗余。

我们可以回过头来看 demo1 的例子:

‘use strict‘;
const Readable = require(‘stream‘).Readable;
const rs = Readable();
const s = ‘VaJoy‘;
const l = s.length;
let i = 0;
rs._read = ()=>{
    if(i == l){
        rs.push(‘ is my name‘);
        return rs.push(null)
    }
    rs.push(s[i++])
};
rs.pipe(process.stdout);

我们是在 ._read 方法中才使用 readStream.push(data) 往可读流里注入数据供下游消耗,从而提升流处理的性能。

这里也有个小问题 —— 上一句话所提到的“供下游消耗”,这个下游通常又会以怎样的形式来消耗可读流的呢?

首先,可以使用我们熟悉的 .pipe() 方法将可读流推送给一个消耗对象(writable、transform 或者 duplex流)

//ext1
const fs = require(‘fs‘);
const zlib = require(‘zlib‘);

const r = fs.createReadStream(‘data.txt‘);
const z = zlib.createGzip();
const w = fs.createWriteStream(‘data.txt.gz‘);
r.pipe(z).pipe(w);

其次,也可以通过监听可读流的“data”事件(别忘了文章前面提到的“所有的流都属于 EventEmitter 的实例”)来实现消耗处理 —— 在首次监听其 data 事件后,readStream 便会持续不断地调用 _read(),通过触发 data 事件将数据输出。当数据全部被消耗时,则触发 end 事件。

示例:

//demo3
const Readable = require(‘stream‘).Readable;

class ToReadable extends Readable {
    constructor(iterator) {
        super();
        this.iterator = iterator
    }
    _read() {
        const res = this.iterator.next();
        if (res.done) {
            // 迭代结束,顺便结束可读流
            this.push(null)
        }
        setTimeout(() => {
            // 将数据添加到流中
            this.push(res.value + ‘\n‘)
        }, 0)
    }
}

const gen = function *(a){
    let count = 5,
        res = a;
    while(count--){
        res = res*res;
        yield res
    }
};

const readable = new ToReadable(gen(2));

// 监听`data`事件,一次获取一个数据
readable.on(‘data‘, data => process.stdout.write(data));

// 可读流消耗完毕
readable.on(‘end‘, () => process.stdout.write(‘readable stream ends~‘));

执行结果为:

这里需要留意的是,在使用 .push() 往可读流里注入数据的代码段,我们使用了 setTimeout 将其包裹起来,这是为了让系统能有足够时间优先处理接收流结束信号的事务。当然你也可以改写为:

        if (res.done) {
            // 直接 return
            return this.push(null)
        }
        this.push(res.value + ‘\n‘)

2.2 Writable Streams

Writable(可写)流接口是对写入数据的目标的抽象:

src.pipe(writableStream)

常见的 Writable Streams 包括:

  • 客户端的 HTTP requests
  • 服务端的 HTTP responses
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • 子进程的 stdin
  • process.stdout 和 process.stderr

可写流有两个重要的方法:

  • writableStream.write(data) —— 往可写流里写入数据;
  • writableStream.end([data]) —— 停止写入数据,结束可写流。在调用 .end() 后,再调用 .write() 方法会产生错误。

另外,如同通过 readable._read() 方法可以处理可读流,我们可以通过 writable._write(chunk, enc, next) 方法在系统底层处理流写入的逻辑中,对数据进行处理。

其中参数 chunk 代表写进来的数据;参数 enc 代表编码的字符串;参数 next(err) 是一个回调函数,调用它可以告知消费者进行下一轮的数据流写入。

示例:

//demo4
const Writable = require(‘stream‘).Writable;
const writable = Writable();

writable._write = (chunck, enc, next) => {
    // 输出打印
    process.stdout.write(chunck.toString().toUpperCase());
    // 写入完成时,调用`next()`方法通知流传入下一个数据
    process.nextTick(next)
};

// 所有数据均已写入底层
writable.on(‘finish‘, () => process.stdout.write(‘DONE‘));

// 将一个数据写入流中
writable.write(‘a‘ + ‘\n‘);
writable.write(‘b‘ + ‘\n‘);
writable.write(‘c‘ + ‘\n‘);

// 再无数据写入流时,需要调用`end`方法
writable.end();

执行如下:

2.3 Duplex Streams

Duplex 是双工的意思,因此很容易猜到 Duplex 流就是既能读又能写的一类流,它继承了 Readable 和 Writable 的接口。

常见的 Duplex Streams 有:

  • TCP sockets
  • zlib streams
  • crypto streams

示例:

//demo5
const Duplex = require(‘stream‘).Duplex;
const duplex = Duplex();

duplex._read = function () {
    var date = new Date();
    this.push( date.getFullYear().toString() );
    this.push(null)
};

duplex._write = function (buf, enc, next) {
    console.log( buf.toString() + ‘\n‘ );
    next()
};

duplex.on(‘data‘, data => console.log( data.toString() ));

duplex.write(‘the year is‘);

duplex.end();

执行结果:

2.4 Transform Streams

Transform Stream 是在继承了 Duplex Streams 的基础上再进行了扩展,它可以把写入的数据和输出的数据,通过 ._transform 接口关联起来。

常见的 Transform Streams 有:

  • zlib streams
  • crypto streams

示例:

//demo6
const Transform = require(‘stream‘).Transform;
class SetName extends Transform {
    constructor(name) {
        super();
        this.name = name || ‘‘
    }
    // .write接口写入的数据,处理后直接从 data 事件的回调中可取得
    _transform(buf, enc, next) {
        var res = buf.toString().toUpperCase();
        this.push(res + this.name + ‘\n‘);
        next()
    }

}

var transform = new SetName(‘VaJoy‘);
transform.on(‘data‘, data => process.stdout.write(data));

transform.write(‘my name is ‘);
transform.write(‘here is ‘);
transform.end();

执行结果:

2.5 Classic Streams

在较早版本的 NodeJS 里,Stream 的实现相较简陋,例如上文提及的“Stream.Readable”接口均是从 Node 0.9.4 开始才有,因此我们往往需要对其进行多次封装扩展才能更好地用来开发。

而 Classic Streams 便是对这种古旧模式的 Stream 接口的统称。

需要留意的是,只要往任意一个 stream 注册一个“data”事件监听器,它就会自动切换到“classic”模式,并按照旧的 API 去执行。

classic 流可以当作一个带有 .pipe 接口的事件发射器(event emitter),当它要为消耗者提供数据时会发射“data”事件,当要结束生产数据时,则发射“end”事件。

另外只有当设置 Stream.readable 为 true 时,.pipe 接口才会将当前流视作可读流:

//demo7
var Stream = require(‘stream‘);
var stream = new Stream();
stream.readable = true; //告诉 .pipe 这是个可读流

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit(‘end‘);
    }
    else stream.emit(‘data‘, String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

另外,Classic readable streams 还有 .pause() 和 .resume() 两个接口可用于暂停/恢复流的读取:

createServer(function(q,s) {
  // ADVISORY only!
  q.pause()
  session(q, function(ses) {
    q.on(‘data‘, handler)
    q.resume()
  })
})

3. Object Mode

对于可读流来说,push(data) 时,data 的类型只能是 String 或Buffer,且消耗时 data 事件输出的数据类型都为 Buffer;

对于可写流来说,write(data) 时,data 的类型也只能是 String 或 Buffer,_write(data) 调用时所传进来的 data 类型都为 Buffer。

示例:

//demo8
writable._write = (chunck, enc, next) => {
    // 输出打印
    console.log(chunck);   //Buffer
    //console.log(chunck.toString());  //转为String

    process.nextTick(next)
};

writable.write(‘Happy Chinese Year‘);
writable.end();

执行结果:

不过,为了增强数据类型的灵活性,无论是可读流或是可写流,只需要往其构造函数里传入配置参数“{ objectMode: true }”,便可往流里传入/获取任意类型(null除外)的数据:

const objectModeWritable = Writable({ objectMode: true });

objectModeWritable._write = (chunck, enc, next) => {
    // 输出打印
    console.log(typeof chunck);
    console.log(chunck);
    process.nextTick(next)
};

objectModeWritable.write(‘Happy Chinese Year‘);
objectModeWritable.write( { year : 2017 } );
objectModeWritable.end( 2017 );

执行结果:

4. Stream的兼容问题

在前文我们介绍了 classic streams,它属于陈旧版本的 Node 上的 Stream 接口,可以把它称为 Streams1。而从 Node 0.10 开始,Stream 新增了系列实用的新接口,可以做更多除了 .pipe() 之外的事情,我们把其归类为 Streams2(事实上,在 Node 0.11+开始,Stream有些许新的变动,从该版本开始的 Stream 也可称为 Streams3)

那么这里存在一个问题 —— 那些使用了 Stream1 的项目(特别是 npm 包),想升级使用环境的 Node 版本到 0.10+,会否导致兼容问题呢?

还好 Streams2 虽然改头换面,但本质上是设计为向后兼容的。

打个比方,如果你同时推送了一条 Streams2 流和一条旧格式的、基于事件发射器的流,Stream2 将降级为旧模式(shim mode)来向后兼容。

但是,如果我们的开发环境使用的是 Node 0.8(且因为某些原因不能升级),但又想使用 Streams2 的API怎么办呢?或者比如 npm 上的某些开源的工具包,想要拥抱 Streams2 的便利,又想保持对使用 Node 0.8 的用户进行兼容处理,这样又得怎么处理?

针对上述问题,早在 Node 0.10 释放之前,Issacs 就把 Node-core 中操作 Stream 的核心接口独立拷贝了一份出来,开源到了 npm 上并持续更新,它就是 readable-stream

通过使用 readable-stream,我们就可以在那些核心里没有 Streams2/3 的低版本 Node 中,直接使用 Streams2/3:

var Readable = require(‘stream‘).Readable || require(‘readable-stream‘).Readable

readable-stream 现在有 v1.0.x 和 v1.1.x 两个主要版本,前者跟进 Streams2 的迭代,后者跟进 Streams3 的迭代,用户可以根据需求使用对应版本的包。

5. through2

readable-stream 虽然提供了一个 Streams 的兼容方案,但我们也希望能对 Stream 复杂的API进行精简。

through2 便基于 readable-stream 对 Stream 接口进行了封装,并提供了更简单和灵活的方法。

through2 会为你生成 Duplex Streams 来处理任意你想使用的流 —— 如前文介绍,Duplex 流继承了 writable 和 readable 流的接口,使用起来更方便。

来看下 through2 的示例:

//demo9
const fs = require(‘fs‘);
const through2 = require(‘through2‘);
fs.createReadStream(‘data.txt‘)
    .pipe(through2(function (chunk, enc, callback) {
        for (var i = 0; i < chunk.length; i++)
            if (chunk[i] == 97)
                chunk[i] = 122; // 把 ‘a‘ 替换为 ‘z‘

        this.push(chunk);

        callback()
    }))
    .pipe(fs.createWriteStream(‘out.txt‘))
    .on(‘finish‘, ()=> {
        console.log(‘DONE‘)
    });

使用 through2.obj 接口操作 Object Mode 下的流:

//demo10
const fs = require(‘fs‘);
const through2 = require(‘through2‘);
const csv2 = require(‘csv2‘);

let all = [];

fs.createReadStream(‘list.csv‘)
    .pipe(csv2())
    // through2.obj(fn) 是 through2({ objectMode: true }, fn) 的简易封装
    .pipe(through2.obj(function (chunk, enc, callback) {
        var data = {
            name: chunk[0],
            sex: chunk[1],
            addr: chunk[2]
        };
        this.push(data);

        callback()
    }))
    .on(‘data‘, function (data) {
        all.push(data)
    })
    .on(‘end‘, function () {
        console.log(all)
    });

对比原生的 Stream API,through2 简洁了不少,加上有 readable-stream 依赖加持,也很好理解为何像 gulp 及其插件都会使用 through2 来操作和处理 stream 了。

以上是本文对 Stream 的一个介绍,但事实上 Stream 还有许多未露面的 API,感兴趣的同学可以直接阅读官方 API文档做进一步了解。共勉~

Reference

⑴ Stream API Doc - https://nodejs.org/api/stream.html

⑵ stream-handbook - https://github.com/substack/stream-handbook

⑶ Node.js Stream - 基础篇 - http://www.cnblogs.com/zapple/p/5759670.html

⑷ Why I don‘t use Node‘s core ‘stream‘ module - https://r.va.gg/2014/06/why-i-dont-use-nodes-core-stream-module.html

时间: 2024-10-13 04:52:18

爱你不容易 —— Stream详解的相关文章

TS Stream 详解

<什么是TS> TS(transport stream) , TS流文件,是一种DVD的文件格式,TS格式的特点就是要求从视频流的任一片段开始都是可以独立解码的,这种特性就决定了TS流文件主要用来实时传送的节目,比如实时广播的电视节目.与之相互对应的是PS(Program Stream),PS主要应用于存储的具有固定时长的节目,如DVD电影,将DVD上的VOB文件的前面一截剪掉(或者干脆就是数据损坏),那么就会导致整个文件无法解码. <TS码流数据封装格式> 网络上常见的TS码流结

Java8之Stream详解

Java8中提供了Stream对集合操作作出了极大的简化,学习了Stream之后,我们以后不用使用for循环就能对集合作出很好的操作. 一.流的初始化与转换 Java中的Stream的所有操作都是针对流的,所以,使用Stream必须要得到Stream对象: 1.初始化一个流 Stream stream = Stream.of("a", "b", "c"); 2.数组转换为一个流 String [] strArray = new String[]

Java8初体验(二)Stream语法详解

原文链接:http://ifeve.com/stream/ 1. Stream初体验 我们先来看看Java里面是怎么定义Stream的: A sequence of elements supporting sequential and parallel aggregate operations. 我们来解读一下上面的那句话: Stream是元素的集合,这点让Stream看起来用些类似Iterator: 可以支持顺序和并行的对原Stream进行汇聚的操作: 大家可以把Stream当成一个高级版本的

转载爱哥自定义View系列--Paint详解

上图是paint中的各种set方法 这些属性大多我们都可以见名知意,很好理解,即便如此,哥还是带大家过一遍逐个剖析其用法,其中会不定穿插各种绘图类比如Canvas.Xfermode.ColorFilter等等的用法. set(Paint src) 顾名思义为当前画笔设置一个画笔,说白了就是把另一个画笔的属性设置Copy给我们的画笔,不累赘了 setARGB(int a, int r, int g, int b) 不扯了,别跟我说不懂 setAlpha(int a) 同上 setAntiAlias

转载爱哥自定义View系列--文字详解

FontMetrics FontMetrics意为字体测量,这么一说大家是不是瞬间感受到了这玩意的重要性?那这东西有什么用呢?我们通过源码追踪进去可以看到FontMetrics其实是Paint的一个内部类,而它里面呢就定义了top,ascent,descent,bottom,leading五个成员变量其他什么也没有: 这五个成员变量除了top和bottom我们较熟悉外其余三个都很陌生是做什么用的呢?首先我给大家看张图: 这张图很简单但是也很扼要的说明了top,ascent,descent,bot

Node.js开发入门—Stream用法详解

Stream是Node.js中非常重要的一个模块,应用广泛.一个流是一个具备了可读.可写或既可读又可写能力的接口,通过这些接口,我们可以和磁盘文件.套接字.HTTP请求来交互,实现数据从一个地方流动到另一个地方的功能. 所有的流都实现了EventEmitter的接口,具备事件能力,通过发射事件来反馈流的状态.比如有错误发生时会发射"error"事件,有数据可被读取时发射"data"事件.这样我们就可以注册监听器来处理某个事件,达到我们的目的. Node.js定义了R

JAVA8之lambda表达式详解,及stream中的lambda使用

原文:http://blog.csdn.net/jinzhencs/article/details/50748202 lambda表达式详解 一.问题 1.什么是lambda表达式? 2.lambda表达式用来干什么的? 3.lambda表达式的优缺点? 4.lambda表达式的使用场景? 5.lambda只是一个语法糖吗? 二.概念 lambda表达式是JAVA8中提供的一种新的特性,它支持Java也能进行简单的"函数式编程". 它是一个匿名函数,Lambda表达式基于数学中的λ演算

【转】Java8初体验(二)Stream语法详解

原文链接 http://ifeve.com/stream/ Java8初体验(二)Stream语法详解 感谢同事[天锦]的投稿.投稿请联系 [email protected]上篇文章Java8初体验(一)lambda表达式语法比较详细的介绍了lambda表达式的方方面面,细心的读者会发现那篇文章的例子中有很多Stream的例子.这些Stream的例子可能让你产生疑惑,本文将会详细讲解Stream的使用方法(不会涉及Stream的原理,因为这个系列的文章还是一个快速学习如何使用的). 1. Str

Java 8 Stream API详解

Java 8 Stream API详解 一.Stream API介绍 Java 8引入了全新的Stream API,此Stream与Java I/O包里的InputStream和OutputStream是完全不同的概念,它不同于StAX对XML解析的Stream,也不同于Amazon Kinesis对大数据实时处理的Stream.Stream API更像具有Iterable的集合类,但行为和集合类又有所不同,它是对集合对象功能的增强,专注于对集合对象进行各种非常便捷.高效的聚合操作或大批量数据操