菜单

相关源文件

本文档提供了 Node.js 中流 API 的全面概述,它是处理流数据的基本抽象。通过一次处理一小块数据而不是将所有数据加载到内存中,流可以实现高效的数据处理。它们特别适用于处理大文件、网络通信和其他 I/O 操作。

有关经常使用流的文件系统 API 的信息,请参阅 文件系统

流简介

流是 Node.js 中用于处理流数据的抽象接口。它们提供了一种以内存高效的方式读取或写入连续数据流的方法。所有流都是 EventEmitter 类的实例,允许它们发出事件并响应事件。

Node.js 中的四种基本流类型是

  1. Readable:可从中读取数据的流(例如,从文件中读取)
  2. Writable:可向其中写入数据的流(例如,写入文件)
  3. Duplex:既可读又可写的流(例如,TCP 套接字)
  4. Transform:可作为读写数据的副本并可修改或转换数据的双工流(例如,压缩流)

来源:doc/api/stream.md36-46

流架构

流类层次结构

来源:lib/stream.js54-108 doc/api/stream.md35-46

数据流和缓冲

Readable 和 Writable 流都会在内部缓冲数据。潜在的缓冲数据量取决于创建流时传递的 highWaterMark 选项。对于普通流,highWaterMark 指定总字节数。对于对象模式的流,它指定对象总数。

当 Readable 流的内部缓冲区达到 highWaterMark 指定的阈值时,它将暂时停止从底层资源读取,直到缓冲数据被消耗。同样,当 Writable 流的内部缓冲区超过 highWaterMark 时,对 write() 的调用将返回 false,表明生产者应停止发送数据,直到发出 'drain' 事件。

来源:doc/api/stream.md342-396

Readable 流

Readable 流是可从中消耗数据的源的抽象。例如,从文件读取、从 HTTP 响应读取或从 TCP 套接字读取。

创建 Readable 流

有多种方法可以创建 Readable 流

  1. 从现有数据源实例化

  2. 通过扩展 Readable 类来实现自定义 Readable 流

消耗 Readable 流

Readable 流可以通过几种方式进行消耗

  1. 使用事件监听器

  2. 使用 pipe() 方法将数据发送到 Writable 流

  3. 使用异步迭代(配合 for await...of

  4. 使用流运算符(Node.js 16.x 及更高版本)

来源:doc/api/stream.md441-467 doc/api/stream.md195-246

Writable 流

Writable 流是可向其中写入数据的目标(destination)的抽象。例如,写入文件、写入 HTTP 请求或写入 TCP 套接字。

创建 Writable 流

要实现自定义 Writable 流,请扩展 Writable 类并实现 _write() 方法

写入 Writable 流

可以使用 write()end() 方法将数据写入 Writable 流

write() 方法返回一个布尔值,指示是否可以继续写入,或者内部缓冲区是否已满。如果返回 false,则应等待 'drain' 事件发出后再写入更多数据,以避免使流过载

来源:doc/api/stream.md468-499 doc/api/stream.md537-562

Duplex 和 Transform 流

Duplex 流是实现 Readable 和 Writable 接口的流。Transform 流是 Duplex 流的一种特殊类型,其输出由输入计算得出。

Duplex 流

Duplex 流的例子包括 TCP 套接字、zlib 流和 crypto 流。Duplex 流为读写维护单独的内部缓冲区,允许每侧独立操作。

Transform 流

Transform 流是 Duplex 流,其输出是根据输入计算得出的。它们同时实现 Readable 和 Writable 接口,并在将数据提供读取之前转换写入的数据。

Transform 流的例子包括 zlib 压缩流和 crypto 密码流。

要实现自定义 Transform 流,请扩展 Transform 类并实现 _transform() 方法

来源:doc/api/stream.md382-396

流运算符

Node.js 提供了一组流运算符,用于以更函数式的方式处理流。从 Node.js v16 开始,这些运算符可在 Readable 流上使用。

常用流运算符

  • map(fn, options):使用提供的函数转换流中的每个块
  • filter(fn, options):根据提供的谓词函数过滤流中的块
  • reduce(reducer, initialValue, options):将流归约为单个值
  • forEach(fn, options):对流中的每个块执行操作
  • some(fn, options):如果流中的任何块满足谓词,则返回 true
  • every(fn, options):如果流中的所有块都满足谓词,则返回 true
  • find(fn, options):返回满足谓词的第一个块
  • toArray():将流中的所有块收集到一个数组中
  • flatMap(fn, options):将每个块映射到一个流并展平结果
  • take(n):从流中获取前 n 个块
  • drop(n):从流中丢弃前 n 个块

使用示例

来源:lib/internal/streams/operators.js68-216 lib/internal/streams/operators.js256-268 test/parallel/test-stream-map.js30-57 test/parallel/test-stream-filter.js12-48 test/parallel/test-stream-toArray.js9-38

实用函数

Node.js 提供了几个用于处理流的实用函数

pipeline(source, ...transforms, destination, callback)

pipeline 函数将流连接起来,并正确处理错误和清理

还有一个基于 Promise 的版本可从 'node:stream/promises' 获得

finished(stream[, options])

finished 函数等待流完成或出错

还有一个基于 Promise 的版本

来源:doc/api/stream.md65-246 doc/api/stream.md250-321

对象模式 vs. 二进制模式

默认情况下,流在 Buffer、字符串、TypedArrays 或 DataViews 上操作。当使用 objectMode: true 选项创建流时,它将在 JavaScript 对象上操作。

二进制模式和对象模式之间的主要区别

  1. 在二进制模式下,highWaterMark 选项指定总字节数,而在对象模式下,它指定对象总数。
  2. 二进制模式流自动将字符串转换为 Buffer,而对象模式流可以处理除 null(表示流结束)以外的任何 JavaScript 值。

来源:doc/api/stream.md323-340

背压处理

背压是流数据处理中的一个重要概念。它指的是一种机制,允许较慢的消费者向较快的生产者发出信号,表明其需要减速以避免使消费者过载。

在 Node.js 流中,背压是自动处理的

  1. 对于 Readable 流,当内部缓冲区填充到 highWaterMark 时,流会暂时停止从底层资源读取。
  2. 对于 Writable 流,当内部缓冲区超过 highWaterMark 时,write() 方法将返回 false。然后生产者应停止发送数据,直到发出 'drain' 事件。

处理背压的示例

来源: doc/api/stream.md367-381 doc/api/stream.md532-562

与其他 Node.js API 集成

许多 Node.js API 使用或返回流

  1. 文件系统: fs.createReadStream(), fs.createWriteStream()
  2. HTTP/HTTPS: 请求和响应对象
  3. Zlib: 压缩/解压缩流
  4. Crypto: 加密/解密流
  5. 子进程: stdout, stdin, 和 stderr
  6. TCP 套接字: 用于网络通信的双工流

组合不同流类型的示例

来源: doc/api/stream.md473-486 doc/api/crypto.md326-395 doc/api/tls.md572-582

错误处理

处理错误是使用流时一个重要的方面。有几种处理流中错误的方法

  1. 基于事件的错误处理:监听流上的 'error' 事件。

  2. 使用 pipeline()pipeline() 函数会自动处理错误和清理。

  3. 使用 finished()finished() 函数检测流何时完成或出错。

  4. 将 async/await 与 Promise 结合使用:使用 pipeline()finished() 的基于 Promise 的版本。

来源: doc/api/stream.md65-113 doc/api/stream.md250-321

最佳实践

在使用 Node.js 流时,请考虑以下最佳实践

  1. 使用 pipeline() 来连接多个流:它能正确处理错误和清理。
  2. 处理反压:检查 write() 的返回值,并在需要时等待 'drain' 事件。
  3. 设置适当的 highWaterMark 值:通过设置适当的缓冲区大小来控制内存使用。
  4. 正确关闭流:在 Writable 流上调用 end(),并处理 Readable 流上的 'end' 事件。
  5. 明智地使用 object mode:仅在必要时使用 object mode,因为它可能比 binary mode 效率低。
  6. 实现适当的错误处理:始终处理流中的错误,以防止内存泄漏和未捕获的异常。
  7. 考虑使用流运算符:对于函数式流处理,请使用 map()filter() 等运算符。
  8. 使用 async 迭代来消费流:使用 for await...of,您可以更清晰地消费 Readable 流。

来源: doc/api/stream.md323-396 lib/internal/streams/operators.js68-216

结论

Node.js 的 Streams API 为处理流动数据提供了一个强大的抽象。它通过以小块处理数据、管理反压以及跨不同类型 I/O 操作提供一致的接口,实现了高效的数据处理。理解流对于构建高性能的 Node.js 应用程序至关重要,尤其是那些处理大量数据或网络通信的应用程序。

通过利用 Node.js 提供的流运算符和实用函数,您可以构建复杂的、具有干净、可维护代码的数据处理管道。