node 中的 stream

什么是 stream

Stream 借鉴自 Unix 编程哲学中的 pipe。

Unix shell 命令中觉的管道流式操作 | 将上一个命令的输出作为下一个命令的输入。node stream 中则是通过 .pip() 方法来进行的。

一个 stream 的运用场景。从服务器读取文件并返回给页面。

  • 朴素的实现:
var http = require(‘http‘);
var fs = require(‘fs‘);

var server = http.createServer(function (req, res) {
    fs.readFile(__dirname + ‘/data.txt‘, function (err, data) {
        res.end(data);
    });
});
server.listen(8000);
  • stream 实现:
var http = require(‘http‘);
var fs = require(‘fs‘);

var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream(__dirname + ‘/data.txt‘);
    stream.pipe(res);
});
server.listen(8000);

好处:

  • 代码更加简洁
  • 可自由组合各种模块处理数据

stream 的种类

分五种:

  • readable
  • writable
  • duplex
  • transform
  • classic

readable

readable 类型的流产生数据,可通过 .pip() 输送到能够消费流数据的地方,比如 writable,transform,duplex

一个 readable stream 示例:

var Readable = require(‘stream‘).Readable;

var rs = new Readable;
rs.push(‘beep ‘);
rs.push(‘boop\n‘);
rs.push(null);

rs.pipe(process.stdout);

运行结果:

$ node read0.js
beep boop

_read 方法与按需输出

上面 rs.push(null) 表示没有更多数据了。

上面从代码直接将数据塞入到 readable 流中,然后被缓冲起来,直到被消费。因为消费者有可能并不能立即消费这些内容,直接 push 数据后消耗不必要的资源。

更好的做法是,让 readable 流只在消费者需要数据的时候再 push。这是通过定义能 raedable 对象定义 ._read 方法来完成的。

var Readable = require(‘stream‘).Readable;
var rs = Readable();

var c = 97;
rs._read = function () {
    rs.push(String.fromCharCode(c++));
    if (c > ‘z‘.charCodeAt(0)) rs.push(null);
};

rs.pipe(process.stdout);

运行结果:

$ node read1.js
abcdefghijklmnopqrstuvwxyz

这种方式下,定义了 readable 流产生数据的方法 ._read,但并没有马上执行并输出数据,而是在 process.stdout 读取时,才调用输出的。

_read 方法可动态接收一个可选的 size 参数,由消费方指定一次读取想要多少字节的数据,当然,_read 方法的实现中是可以忽略这个入参的。

下面的示例可证明 _read 方法是消费方调用的时候才执行的,而不是主动执行。

var Readable = require(‘stream‘).Readable;
var rs = Readable();

var c = 97 - 1;

rs._read = function () {
    if (c >= ‘z‘.charCodeAt(0)) return rs.push(null);

    setTimeout(function () {
        rs.push(String.fromCharCode(++c));
    }, 100);
};

rs.pipe(process.stdout);

process.on(‘exit‘, function () {
    console.error(‘\n_read() called ‘ + (c - 97) + ‘ times‘);
});
process.stdout.on(‘error‘, process.exit);

输出任意数据

上面展示的是输出简单字符串,如果需要输出其他复杂数据,初始化时设置上正确的 objectMode 参数,Readable({ objectMode: true })

消费 readable 流产生的数据

这一段没看太懂

writable 流

writable 流可作为 .pip() 的对象。

src.pipe(writableStream)

创建 writable 流

需要实现 ._write(chunk, enc, next) 方法,其中:

  • chunk 为接收到的数据
  • encopts.decodeStringfalse 且收到的数据这字符串时,它表示字符串的编码
  • next(err) 数据处理后的回调,可传递一个错误信息以表示数据处理失败

默认情况下,获取到的字符串数据会转为 Buffer,可设置 Writable({ decodeStrings: false }) 来获取字符串数据。

一个 writable 示例:

var Writable = require(‘stream‘).Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
    console.dir(chunk);
    next();
};

process.stdin.pipe(ws);

向 writable 流写入数据

通过调用 writable 流的 write 方法来写入。

process.stdout.write(‘beep boop\n‘);

通过调用 end() 来结束数据的写入。

var fs = require(‘fs‘);
var ws = fs.createWriteStream(‘message.txt‘);

ws.write(‘beep ‘);

setTimeout(function () {
    ws.end(‘boop\n‘);
}, 1000);

duplex

双工类型的流,同时具有 writable 和 readable 流的功能。node 内建的 zlib,TCP sockets 以及 crypto 都是双工类型的。

所以可对双工类型的流进行如下操作:

a.pip(b).pip(a)

transform

一种特殊类型的双工流,区别在于 transform 类型其输出是输入的转换。跟它的名字一样,这里面对数据进行一些转换后输出。比如,通过 zlib.createGzip 来对数据进行 gzip 的压缩。有时候也将这种类型的流称为 through steam

classic stream

这里指使用旧版 api 的流。当一个流身上绑定了 data 事件的监听时,便会回退为经典旧版的流。

classic readable stream

当有数据时它会派发 data 事件,数据输出结束时派发 end 事件给消费者。

.pipe() 通过检查 stream.readable 以判断该流是否是 readable 类型。

classic readable 流的创建

一个 classic readable 流的创建示例:

var Stream = require(‘stream‘);
var stream = new Stream;
stream.readable = true;

var c = 64;
var iv = setInterval(function () {
    if (++c >= 75) {
        clearInterval(iv);
        stream.emit(‘end‘);
    }
    else stream.emit(‘data‘, String.fromCharCode(c));
}, 100);

stream.pipe(process.stdout);

从 classic readable 流读取数据

数据读取是通过监听流上的 dataend 事件。

一个从 classic readable 流读取数据的示例:

process.stdin.on(‘data‘, function (buf) {
    console.log(buf);
});
process.stdin.on(‘end‘, function () {
    console.log(‘__END__‘);
});

一般不建议通过这种方式来操作,一旦给流绑定 data 事件处理器,即回退到旧的 api 来使用流。如果真的有兼容操作旧版流的需求,应该通过 throughconcat-stream 来进行。

classic writable stream

只需要实现 .write(buf).end(buf) 及 .destroy() 方法即可,比较简单。

内建的流对象

总结

本质上,所有流都是 EventEmitter,通过事件可写入和读取数据。但通过新的 stream api,可方便地通过 .pipe() 方法来使用流而不是事件的方式。

参考

原文地址:https://www.cnblogs.com/Wayou/p/node_stream.html

时间: 2024-10-10 11:37:17

node 中的 stream的相关文章

node中的Readable - flowing/non-flowing mode

大家都知道在node中Readable Stream有两种模式: flowing mode和non-flowing mode. 对于flowing mode的Readable Stream, 我们是没法控制它何时去读数据读多少的,它会去尽快的去消耗data,并emit出来. 1 // in lib/_stream_readable.js 2 if (state.flowing && state.length === 0 &&!state.sync) { 3 stream.e

Node.js 【Stream之笔记】

从Node.js API文档中可知, 'A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter.''流是很多

Node.js:Stream(流)

Stream 是一个抽象接口,Node 中有很多对象实现了这个接口.例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出). Node.js,Stream 有四种流类型: Readable - 可读操作. Writable - 可写操作. Duplex - 可读可写操作. Transform - 操作被写入数据,然后读出结果. 所有的 Stream 对象都是 EventEmitter 的实例.常用的事件有: data - 当有数据可读时触发.

node 进阶 | 通过node中如何捕获异常阐述express的特点

node如何捕获异常 node基于js的单线程,有了非阻塞异步回调的概念,但是在处理多个并发连接时,并发环境要求高,最重要的是单线程,单核CPU,一个进程crash则web服务都crash,但是为什么node还这么火?甚至有了Node工程师这个岗,肯定就是node有自己crash之前与之后的解决方法,比如捕获异常 问:nodejs如何捕获异常?答:回调函数中有err形参,console.log出来,这是我之前回答别人问题的答案,但是自从我这几天看了如何捕获异常,才知道捕获异常的精髓就是不要让服务

重回博客 谈一谈Node中的异步和单线程

重回博客,这个帐号之前注册后就只发了一篇博客.听朋友建议,决定一周两次更新. 第一篇谈论一下最近想的比较多的异步的问题. 传统多线程异步 传统的异步是多线程的,当要同时做两件事的时候,他们是执行在不同的线程里的.这就像是柜台卖东西,来了一个人就得找一个员工陪他,直到这个人走了这个员工才能接待下一个客人.店内的员工就像线程池里的空闲线程,空闲的时候可以去接待客人,可是同时只能接待一个人,要接待其他人就得找另外一个人. 电脑里的线程相当于一个员工团队,哪里需要去哪里.多线程的异步好处在于可以更多的占

Cocos2d-x 3.0final 终结者系列教程09-绘图节点Node中的Schedule

如何让HelloWorld项目中的HelloWorld文字实现自动运动呢? 有的童鞋会想到使用线程,不断修改Label的Position, 这样不行,因为在Cocos2d-x中只能在主线程中来修改Node中的信息,这是由于所有的node都是非线程安全的,如果我们的场景移除了node 在子线程种可能引用错误,所以,要让Node执行特定的变化,需要在当前的Node中使用Schedule 使用方法很简单 1.在当前的HelloWorldScne.h中添加一个方法在HelloWorldScene 如:

Cocos2d-x 3.0final 终结者系列教程10-绘图节点Node中的Action

Action是作用在Node上的逻辑处理,比如让Node移动.旋转.缩放.变色.跳跃.翻转.透明等等,都有相对应的Action Action如何在Node上使用 1.定义Action对象 如 auto act=MoveTo::create(Point(30,0),1); 2.在Node上执行runAction auto sp=Sprite::create("npc.png"); sp->runAction(act); 这样就实现了在sp这个Node上执行移动到30,0这个坐标的动

Node中的定时器详解

在大多数的业务中,我们都会有一些需求,例如几秒钟实现网页的跳转,几分钟对于后台数据进行清理,node与javascript都具有将代码延迟一段时间的能力.在node中可以使用三种方式实现定时功能:超时时间,时间间隔和即时定时器.虽然有这三种定时器功能但是在平常的业务中使用还是有差别的,下来我们就一起讨论一下这三种定时器. 1.用超时时间来延迟工作 超时定时器用于将工作延迟一个特定的时间数量,当时间到了,回调函数执行,而定时器会消失.(建议:对于只执行一次的工作,使用超时时间). 1秒之后执行my

Node中http模块详解(服务端篇)

Node中的Http Node中提供了http模块,其中封装了高效的http服务器和http客户端 http.server是一个基于事件的HTTP服务器,内部是由c++实现的,接口由JavaScript封装 http.request是一个HTTP客户端工具.用户向服务器发送数据. 下面就来分别得介绍一下http的服务端和客户端 一.HTTP服务器 http.Server实现的,它提供了一套封装级别很低的API,仅仅是流控制和简单的解析,所有的高层功能都需要通过它的接口,就像在前面的文章<Node