Node.js Stream-基础篇

Node.js Stream - 基础篇

邹斌 ·2016-07-08 11:51

背景

在构建较复杂的系统时,通常将其拆解为功能独立的若干部分。这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务。譬如,shell通过管道|连接各部分,其输入输出的规范是文本流。

Node.js中,内置的Stream模块也实现了类似功能,各部分通过.pipe()连接。

鉴于目前国内系统性介绍Stream的文章较少,而越来越多的开源工具都使用了Stream,本系列文章将从以下几方面来介绍相关内容:

  1. 流的基本类型,以及Stream模块的基本使用方法
  2. 流式处理与back pressure的工作原理
  3. 如何开发流式程序,包括对GulpBrowserify的剖析,以及一个实战示例。

本文为系列文章的第一篇。

流的四种类型

Stream提供了以下四种类型的流:

var Stream = require(‘stream‘)

var Readable = Stream.Readable
var Writable = Stream.Writable
var Duplex = Stream.Duplex
var Transform = Stream.Transform

使用Stream可实现数据的流式处理,如:

var fs = require(‘fs‘)
// `fs.createReadStream`创建一个`Readable`对象以读取`bigFile`的内容,并输出到标准输出
// 如果使用`fs.readFile`则可能由于文件过大而失败
fs.createReadStream(bigFile).pipe(process.stdout)

Readable

创建可读流。

实例:流式消耗迭代器中的数据。

‘use strict‘
const Readable = require(‘stream‘).Readable

class ToReadable extends Readable {
  constructor(iterator) {
    super()
    this.iterator = iterator
  }

  // 子类需要实现该方法
  // 这是生产数据的逻辑
  _read() {
    const res = this.iterator.next()
    if (res.done) {
      // 数据源已枯竭,调用`push(null)`通知流
      return this.push(null)
    }
    setTimeout(() => {
      // 通过`push`方法将数据添加到流中
      this.push(res.value + ‘\n‘)
    }, 0)
  }
}

module.exports = ToReadable

实际使用时,new ToReadable(iterator)会返回一个可读流,下游可以流式的消耗迭代器中的数据。

const iterator = function (limit) {
  return {
    next: function () {
      if (limit--) {
        return { done: false, value: limit + Math.random() }
      }
      return { done: true }
    }
  }
}(1e10)

const readable = new ToReadable(iterator)

// 监听`data`事件,一次获取一个数据
readable.on(‘data‘, data => process.stdout.write(data))

// 所有数据均已读完
readable.on(‘end‘, () => process.stdout.write(‘DONE‘))

执行上述代码,将会有100亿个随机数源源不断地写进标准输出流。

创建可读流时,需要继承Readable,并实现_read方法。

  • _read方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑。
  • _read方法中,通过调用push(data)将数据放入可读流中供下游消耗。
  • _read方法中,可以同步调用push(data),也可以异步调用。
  • 当全部数据都生产出来后,必须调用push(null)来结束可读流。
  • 流一旦结束,便不能再调用push(data)添加数据。

可以通过监听data事件的方式消耗可读流。

  • 在首次监听其data事件后,readable便会持续不断地调用_read(),通过触发data事件将数据输出。
  • 第一次data事件会在下一个tick中触发,所以,可以安全地将数据输出前的逻辑放在事件监听后(同一个tick中)。
  • 当数据全部被消耗时,会触发end事件。

上面的例子中,process.stdout代表标准输出流,实际是一个可写流。下小节中介绍可写流的用法。

Writable

创建可写流。

前面通过继承的方式去创建一类可读流,这种方法也适用于创建一类可写流,只是需要实现的是_write(data, enc, next)方法,而不是_read()方法。

有些简单的情况下不需要创建一类流,而只是一个流对象,可以用如下方式去做:

const Writable = require(‘stream‘).Writable

const writable = Writable()
// 实现`_write`方法
// 这是将数据写入底层的逻辑
writable._write = function (data, enc, next) {
  // 将流中的数据写入底层
  process.stdout.write(data.toString().toUpperCase())
  // 写入完成时,调用`next()`方法通知流传入下一个数据
  process.nextTick(next)
}

// 所有数据均已写入底层
writable.on(‘finish‘, () => process.stdout.write(‘DONE‘))

// 将一个数据写入流中
writable.write(‘a‘ + ‘\n‘)
writable.write(‘b‘ + ‘\n‘)
writable.write(‘c‘ + ‘\n‘)

// 再无数据写入流时,需要调用`end`方法
writable.end()
  • 上游通过调用writable.write(data)将数据写入可写流中。write()方法会调用_write()data写入底层。
  • _write中,当数据成功写入底层后,必须调用next(err)告诉流开始处理下一个数据。
  • next的调用既可以是同步的,也可以是异步的。
  • 上游必须调用writable.end(data)来结束可写流,data是可选的。此后,不能再调用write新增数据。
  • end方法调用后,当所有底层的写操作均完成时,会触发finish事件。

Duplex

创建可读可写流。

Duplex实际上就是继承了ReadableWritable的一类流。
所以,一个Duplex对象既可当成可读流来使用(需要实现_read方法),也可当成可写流来使用(需要实现_write方法)。

var Duplex = require(‘stream‘).Duplex

var duplex = Duplex()

// 可读端底层读取逻辑
duplex._read = function () {
  this._readNum = this._readNum || 0
  if (this._readNum > 1) {
    this.push(null)
  } else {
    this.push(‘‘ + (this._readNum++))
  }
}

// 可写端底层写逻辑
duplex._write = function (buf, enc, next) {
  // a, b
  process.stdout.write(‘_write ‘ + buf.toString() + ‘\n‘)
  next()
}

// 0, 1
duplex.on(‘data‘, data => console.log(‘ondata‘, data.toString()))

duplex.write(‘a‘)
duplex.write(‘b‘)

duplex.end()

上面的代码中实现了_read方法,所以可以监听data事件来消耗Duplex产生的数据。
同时,又实现了_write方法,可作为下游去消耗数据。

因为它既可读又可写,所以称它有两端:可写端和可读端。
可写端的接口与Writable一致,作为下游来使用;可读端的接口与Readable一致,作为上游来使用。

Transform

在上面的例子中,可读流中的数据(0, 1)与可写流中的数据(‘a‘, ‘b‘)是隔离开的,但在Transform中可写端写入的数据经变换后会自动添加到可读端。
Tranform继承自Duplex,并已经实现了_read_write方法,同时要求用户实现一个_transform方法。

‘use strict‘

const Transform = require(‘stream‘).Transform

class Rotate extends Transform {
  constructor(n) {
    super()
    // 将字母旋转`n`个位置
    this.offset = (n || 13) % 26
  }

  // 将可写端写入的数据变换后添加到可读端
  _transform(buf, enc, next) {
    var res = buf.toString().split(‘‘).map(c => {
      var code = c.charCodeAt(0)
      if (c >= ‘a‘ && c <= ‘z‘) {
        code += this.offset
        if (code > ‘z‘.charCodeAt(0)) {
          code -= 26
        }
      } else if (c >= ‘A‘ && c <= ‘Z‘) {
        code += this.offset
        if (code > ‘Z‘.charCodeAt(0)) {
          code -= 26
        }
      }
      return String.fromCharCode(code)
    }).join(‘‘)

    // 调用push方法将变换后的数据添加到可读端
    this.push(res)
    // 调用next方法准备处理下一个
    next()
  }

}

var transform = new Rotate(3)
transform.on(‘data‘, data => process.stdout.write(data))
transform.write(‘hello, ‘)
transform.write(‘world!‘)
transform.end()

// khoor, zruog!

objectMode

前面几节的例子中,经常看到调用data.toString()。这个toString()的调用是必需的吗?
本节介绍完如何控制流中的数据类型后,自然就有了答案。

在shell中,用管道(|)连接上下游。上游输出的是文本流(标准输出流),下游输入的也是文本流(标准输入流)。在本文介绍的流中,默认也是如此。

对于可读流来说,push(data)时,data只能是StringBuffer类型,而消耗时data事件输出的数据都是Buffer类型。对于可写流来说,write(data)时,data只能是StringBuffer类型,_write(data)调用时传进来的data都是Buffer类型。

也就是说,流中的数据默认情况下都是Buffer类型。产生的数据一放入流中,便转成Buffer被消耗;写入的数据在传给底层写逻辑时,也被转成Buffer类型。

但每个构造函数都接收一个配置对象,有一个objectMode的选项,一旦设置为true,就能出现“种瓜得瓜,种豆得豆”的效果。

Readable未设置objectMode时:

const Readable = require(‘stream‘).Readable

const readable = Readable()

readable.push(‘a‘)
readable.push(‘b‘)
readable.push(null)

readable.on(‘data‘, data => console.log(data))

输出:

<Buffer 61>
<Buffer 62>

Readable设置objectMode后:

const Readable = require(‘stream‘).Readable

const readable = Readable({ objectMode: true })

readable.push(‘a‘)
readable.push(‘b‘)
readable.push({})
readable.push(null)

readable.on(‘data‘, data => console.log(data))

输出:

a
b
{}

可见,设置objectMode后,push(data)的数据被原样地输出了。此时,可以生产任意类型的数据。

预告

Stream系列共三篇文章:

  • 第一部分:基础篇,介绍Stream接口的基本使用。
  • 第二部分:进阶篇,重点剖析Stream底层如何支持流式数据处理,及其back pressure机制。
  • 第三部分:实战篇。介绍如何使用Stream进行程序设计。从BrowserifyGulp总结出两种设计模式,并基于Stream构建一个为Git仓库自动生成changelog的应用作为示例。

参考文献

时间: 2024-10-21 17:06:14

Node.js Stream-基础篇的相关文章

Node.js Stream - 实战篇

邹斌 ·2016-07-22 11:04 背景 前面两篇(基础篇和进阶篇)主要介绍流的基本用法和原理,本篇从应用的角度,介绍如何使用管道进行程序设计,主要内容包括: 管道的概念 Browserify的管道设计 Gulp的管道设计 两种管道设计模式比较 实例 Pipeline 所谓“管道”,指的是通过a.pipe(b)的形式连接起来的多个Stream对象的组合. 假如现在有两个Transform:bold和red,分别可将文本流中某些关键字加粗和飘红.可以按下面的方式对文本同时加粗和飘红: //

Node.js系列基础学习----安装,实现Hello World, REPL

Node.js基础学习 1:简介 简单的说 Node.js 就是运行在服务端的 JavaScript.Node.js 是一个基于Chrome JavaScript 运行时建立的一个平台.Node.js是一个事件驱动I/O服务端JavaScript环境,基于Google的V8引擎,V8引擎执行Javascript的速度非常快,性能非常好 2:安装 à安装node.js 在官网安装自己win版本的node.js的版本,下载,安装完毕后在运行中输入node -v若是出现版本号就证明安装成功. à安装n

Node.js Stream-进阶篇

作者:美团点评技术团队链接:https://zhuanlan.zhihu.com/p/21681115来源:知乎著作权归作者所有.商业转载请联系作者获得授权,非商业转载请注明出处. 上篇(基础篇)主要介绍了Stream的基本概念和用法,本篇将深入剖析背后工作原理,重点是如何实现流式数据处理和back pressure机制. 目录 本篇介绍stream是如何实现流式数据处理的. 数据生产和消耗的媒介 为什么使用流取数据 如何通过流取到数据 read push方法 end事件 readable事件

Node.js Stream(流)

Stream 是一个抽象接口,Node 中有很多对象实现了这个接口.例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出). Node.js,Stream 有四种流类型: Readable - 可读操作. Writable - 可写操作. Duplex - 可读可写操作. Transform - 操作被写入数据,然后读出结果. 所有的 Stream 对象都是 EventEmitter 的实例.常用的事件有: data - 当有数据可读时触发.

学习node.js 第1篇 介绍nodejs

Node.js是什么? Node.js是建立在谷歌Chrome的JavaScript引擎(V8引擎)的Web应用程序框架. 它的最新版本是:v0.12.7(在编写本教程时的版本).Node.js在官方网站的定义文件内容如下: Node.js® is a platform built on Chrome's JavaScript runtime for easily building fast, scalable network applications. Node.js uses an even

学习node.js 第2篇 介绍node.js 安装

Node.js - 环境安装配置 如果愿意安装设置Node.js环境,需要计算机上提供以下两个软件: 一.文本编辑器 二.Node.js二进制安装包 文本编辑器 这将用来编写程序代码. 一些编辑器包括:Windows记事本,OS Edit命令,Brief, Epsilon, EMACS和VIM或vi. 文本编辑器的名称和版本的在不同的操作系统可能不太相同.例如,记事本可用在Windows,VIM或vi可以在Windows以及Linux或UNIX上使用. 编辑器创建的文件称为源文件并包含程序的源代

node.js学习第一篇

这篇从一个基本的读取本地文件案例,大概介绍了node.js的写法,代码如下 1 var fs = require('fs'); 2 var stats = []; 3 fs.readdir(process.cwd(), function(err, files){ 4 console.log(files); 5 console.log(' '); 6 if(!files.length){ 7 return console.log(' \033[31m No file to show!\033[39

三、Node.js中基础知识

1.控制台console对象的方法: 1) console.log(); //显示一行字符串 2) console.info(); //与1)完全相同 3) console.error(); //标准错误输出流的输出 4) console.warn(); //与3)完全相同 5) console.dir(); //查看一个对象中的内容并将其信息输出到控制台 6) console.time(); //标记开始时间 7) console.timeEnd(); //标记结束时间 8) console.

node.js零基础详细教程(5):express 、 路由

第五章 建议学习时间4小时  课程共10章 学习方式:详细阅读,并手动实现相关代码 学习目标:此教程将教会大家 安装Node.搭建服务器.express.mysql.mongodb.编写后台业务逻辑.编写接口,最后完成一个完整的项目后台,预计共10天课程. express Express 是一个基于 Node.js 平台的极简.灵活的 web 应用开发框架,它提供一系列强大的特性,帮助你创建各种 Web 和移动设备应用. 其实可以简单的将express理解为node.js的一个类库:我们在使用j