使用 Node.js 进行 Shell 脚本编程
您可以购买本书的离线版本(HTML、PDF、EPUB、MOBI),并支持免费在线版本。
(广告,请不要屏蔽。)

10 在 Node.js 上使用 Web Streams



Web Streams 是一个用于 streams 的标准,现在所有主流 Web 平台都支持它:Web 浏览器、Node.js 和 Deno。(Streams 是一种抽象概念,用于从各种来源(文件、托管在服务器上的数据等)顺序读取和写入数据。)

例如,全局函数 fetch()(用于下载在线资源)异步返回一个 Response,它有一个属性 .body,其中包含一个 Web Stream。

本章介绍 Node.js 上的 Web Streams,但我们学到的大部分内容都适用于支持它们的 Web 平台。

10.1 什么是 Web Streams?

让我们从概述 Web Streams 的一些基础知识开始。之后,我们将快速进入示例。

Streams 是一种用于访问数据的结构,例如

它们的两大优势是

Web Streams(通常省略“Web”)是一个相对较新的标准,它起源于 Web 浏览器,但现在也得到了 Node.js 和 Deno 的支持(如 MDN 兼容性表 所示)。

在 Web Streams 中,块通常是以下两种之一

10.1.1 Streams 的种类

Web Streams 主要有三种

ReadableStreams、WritableStreams 和 TransformStreams 可用于传输文本或二进制数据。在本章中,我们主要使用前者。字节流(用于二进制数据)将在最后简要介绍。

10.1.2 管道链

管道传输 是一种操作,它允许我们将 ReadableStream 管道传输 到 WritableStream:只要 ReadableStream 生成数据,此操作就会读取该数据并将其写入 WritableStream。如果我们只连接两个 Streams,我们就可以得到一种方便的方法来将数据从一个位置传输到另一个位置(例如,复制文件)。但是,我们也可以连接两个以上的 Streams,并获得可以以各种方式处理数据的 管道链。这是一个管道链的示例

ReadableStream 通过将前者管道传输到后者的可写端来连接到 TransformStream。类似地,TransformStream 通过将前者的可读端管道传输到后者的可写端来连接到另一个 TransformStream。TransformStream 通过将前者的可读端管道传输到后者来连接到 WritableStream。

10.1.3 背压

管道链中的一个问题是,成员接收到的数据可能多于其当前可以处理的数据。背压 是一种解决此问题的技术:它使数据接收器能够告诉其发送器,它应该暂时停止发送数据,以便接收器不会过载。

另一种看待背压的方式是将其视为一种信号,该信号通过管道链从过载的成员向后传播到链的开头。例如,请考虑以下管道链

ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream

这就是背压如何通过此链传播的

我们已经到达管道链的开头。因此,ReadableStream 内部不会累积数据(也已缓冲),WritableStream 有时间恢复。一旦恢复,它就会发出信号,表明它已准备好再次接收数据。该信号也会通过链向后传播,直到到达 ReadableStream,数据处理才会恢复。

在第一次了解背压时,为了便于理解,省略了一些细节。这些将在后面介绍。

10.1.4 Node.js 中对 Web Streams 的支持

在 Node.js 中,Web Streams 可从以下两个来源获得

目前,Node.js 中只有一个 API 直接支持 Web Streams – Fetch API

const response = await fetch('https://example.com');
const readableStream = response.body;

对于其他情况,我们需要使用模块 'node:stream' 中的以下静态方法之一,将 Node.js Stream 转换为 Web Stream,反之亦然

另一个 API 部分支持 Web Streams:FileHandles 具有方法 .readableWebStream()

10.2 从 ReadableStreams 中读取数据

ReadableStreams 允许我们从各种来源读取数据块。它们具有以下类型(您可以随意浏览此类型及其属性的说明;当我们在示例中遇到它们时,将再次对其进行说明)

interface ReadableStream<TChunk> {
  getReader(): ReadableStreamDefaultReader<TChunk>;
  readonly locked: boolean;
  [Symbol.asyncIterator](): AsyncIterator<TChunk>;

  cancel(reason?: any): Promise<void>;

  pipeTo(
    destination: WritableStream<TChunk>,
    options?: StreamPipeOptions
  ): Promise<void>;
  pipeThrough<TChunk2>(
    transform: ReadableWritablePair<TChunk2, TChunk>,
    options?: StreamPipeOptions
  ): ReadableStream<TChunk2>;
  
  // Not used in this chapter:
  tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}

interface StreamPipeOptions {
  signal?: AbortSignal;
  preventClose?: boolean;
  preventAbort?: boolean;
  preventCancel?: boolean;
}

这些属性的说明

以下小节介绍了使用 ReadableStreams 的三种方法

10.2.1 通过 Readers 使用 ReadableStreams

我们可以使用 Readers 从 ReadableStreams 中读取数据。它们具有以下类型(您可以随意浏览此类型及其属性的说明;当我们在示例中遇到它们时,将再次对其进行说明)

interface ReadableStreamGenericReader {
  readonly closed: Promise<undefined>;
  cancel(reason?: any): Promise<void>;
}
interface ReadableStreamDefaultReader<TChunk>
  extends ReadableStreamGenericReader
{
  releaseLock(): void;
  read(): Promise<ReadableStreamReadResult<TChunk>>;
}

interface ReadableStreamReadResult<TChunk> {
  done: boolean;
  value: TChunk | undefined;
}

这些属性的说明

如果您了解迭代的工作原理,ReadableStreamReadResult 可能看起来很熟悉:ReadableStream 类似于可迭代对象,Reader 类似于迭代器,ReadableStreamReadResult 类似于迭代器方法 .next() 返回的对象。

以下代码演示了使用 Reader 的协议

const reader = readableStream.getReader(); // (A)
assert.equal(readableStream.locked, true); // (B)
try {
  while (true) {
    const {done, value: chunk} = await reader.read(); // (C)
    if (done) break;
    // Use `chunk`
  }
} finally {
  reader.releaseLock(); // (D)
}

**获取 Reader。** 我们不能直接从 readableStream 读取,我们首先需要获取一个 *Reader*(A 行)。每个 ReadableStream 最多只能有一个 Reader。获取 Reader 后,readableStream 将被锁定(B 行)。在我们再次调用 .getReader() 之前,我们必须调用 .releaseLock()(D 行)。

**读取块。** .read() 返回一个 Promise,该 Promise 用于解析为具有属性 .done.value 的对象(C 行)。读取最后一个块后,.donetrue。这种方法类似于 JavaScript 中异步迭代的工作方式。

10.2.1.1 示例:通过 ReadableStream 读取文件

在以下示例中,我们从文本文件 data.txt 中读取块(字符串)

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)

const reader = webReadableStream.getReader();
try {
  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    console.log(value);
  }
} finally {
  reader.releaseLock();
}
// Output:
// 'Content of text file\n'

我们将 Node.js Readable 转换为 Web ReadableStream(A 行)。然后,我们使用前面解释的协议来读取块。

10.2.1.2 示例:使用 ReadableStream 的内容组装字符串

在下一个示例中,我们将 ReadableStream 的所有块连接成一个字符串并返回它

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString(readableStream) {
  const reader = readableStream.getReader();
  try {
    let result = '';
    while (true) {
      const {done, value} = await reader.read();
      if (done) {
        return result; // (A)
      }
      result += value;
    }
  } finally {
    reader.releaseLock(); // (B)
  }
}

方便的是,finally 子句始终会被执行,无论我们如何离开 try 子句。也就是说,如果我们返回结果(A 行),则锁将被正确释放(B 行)。

10.2.2 通过异步迭代消费 ReadableStream

ReadableStream 也可以通过异步迭代来消费

const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
  while (true) {
    let chunk;
    ({done: exhaustive, value: chunk} = await iterator.next());
    if (exhaustive) break;
    console.log(chunk);
  }
} finally {
  // If the loop was terminated before we could iterate exhaustively
  // (via an exception or `return`), we must call `iterator.return()`.
  // Check if that was the case.
  if (!exhaustive) {
    iterator.return();
  }
}

值得庆幸的是,for-await-of 循环为我们处理了异步迭代的所有细节

for await (const chunk of readableStream) {
  console.log(chunk);
}
10.2.2.1 示例:使用异步迭代读取流

让我们重做之前从文件中读取文本的尝试。这一次,我们使用异步迭代而不是 Reader

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
  console.log(chunk);
}
// Output:
// 'Content of text file'
10.2.2.2 示例:使用 ReadableStream 的内容组装字符串

我们之前使用 Reader 来使用 ReadableStream 的内容组装字符串。使用异步迭代,代码变得更简单

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString2(readableStream) {
  let result = '';
  for await (const chunk of readableStream) {
    result += chunk;
  }
  return result;
}
10.2.2.3 注意:浏览器不支持对 ReadableStream 的异步迭代

目前,Node.js 和 Deno 支持对 ReadableStream 的异步迭代,但 Web 浏览器不支持:有一个GitHub issue链接到错误报告。

鉴于尚不清楚如何在浏览器上支持异步迭代,因此包装比 polyfill 更安全。以下代码基于Chromium 错误报告中的建议

async function* getAsyncIterableFor(readableStream) {
  const reader = readableStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

10.2.3 将 ReadableStream 管道传输到 WritableStream

ReadableStream 有两种用于管道传输的方法

10.3 通过包装将数据源转换为 ReadableStream

如果我们想通过 ReadableStream 读取外部源,我们可以将其包装在适配器对象中,并将该对象传递给 ReadableStream 构造函数。适配器对象称为 ReadableStream 的 *底层源*(排队策略将在后面我们详细介绍背压时进行解释)

new ReadableStream(underlyingSource?, queuingStrategy?)

这是底层源的类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次对其进行解释)

interface UnderlyingSource<TChunk> {
  start?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  pull?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  cancel?(reason?: any): void | Promise<void>;

  // Only used in byte streams and ignored in this section:
  type: 'bytes' | undefined;
  autoAllocateChunkSize: bigint;
}

这是 ReadableStream 调用这些方法的时间

这些方法中的每一个都可以返回一个 Promise,并且在 Promise 被解决之前不会采取进一步的步骤。如果我们想做一些异步的事情,这很有用。

.start().pull() 的参数 controller 允许它们访问流。它具有以下类型

type ReadableStreamController<TChunk> =
  | ReadableStreamDefaultController<TChunk>
  | ReadableByteStreamController<TChunk> // ignored here
;

interface ReadableStreamDefaultController<TChunk> {
  enqueue(chunk?: TChunk): void;
  readonly desiredSize: number | null;
  close(): void;
  error(err?: any): void;
}

目前,块是字符串。稍后我们将介绍字节流,其中 Uint8Array 很常见。以下是这些方法的作用

10.3.1 实现底层源的第一个示例

在我们实现底层源的第一个示例中,我们仅提供方法 .start()。我们将在下一小节中看到 .pull() 的用例。

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First line\n'); // (A)
    controller.enqueue('Second line\n'); // (B)
    controller.close(); // (C)
  },
});
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'First line\n'
// 'Second line\n'

我们使用控制器创建一个包含两个块的流(A 行和 B 行)。关闭流非常重要(C 行)。否则,for-await-of 循环将永远不会结束!

请注意,这种排队方式并不完全安全:存在超过内部队列容量的风险。我们很快就会看到如何避免这种风险。

10.3.2 使用 ReadableStream 包装推送源或拉取源

一种常见的情况是将推送源或拉取源转换为 ReadableStream。源是推送还是拉取决定了我们如何使用 UnderlyingSource 连接到 ReadableStream

接下来我们将看到两种源的示例。

10.3.2.1 示例:从支持背压的推送源创建 ReadableStream

在以下示例中,我们围绕套接字包装了一个 ReadableStream,该套接字将其数据推送到我们这里(它调用我们)。此示例取自 Web 流规范

function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(
        new Error('The socket errored!'));
    },

    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream’s consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },

    cancel() {
      socket.close();
    },
  });
}
10.3.2.2 示例:从拉取源创建 ReadableStream

工具函数 iterableToReadableStream() 接受一个块的可迭代对象,并将其转换为 ReadableStream

/**
 * @param iterable an iterable (asynchronous or synchronous)
 */
 function iterableToReadableStream(iterable) {
  return new ReadableStream({
    start() {
      if (typeof iterable[Symbol.asyncIterator] === 'function') {
        this.iterator = iterable[Symbol.asyncIterator]();
      } else if (typeof iterable[Symbol.iterator] === 'function') {
        this.iterator = iterable[Symbol.iterator]();
      } else {
        throw new Error('Not an iterable: ' + iterable);
      }
    },

    async pull(controller) {
      if (this.iterator === null) return;
      // Sync iterators return non-Promise values,
      // but `await` doesn’t mind and simply passes them on
      const {value, done} = await this.iterator.next();
      if (done) {
        this.iterator = null;
        controller.close();
        return;
      }
      controller.enqueue(value);
    },

    cancel() {
      this.iterator = null;
      controller.close();
    },
  });
}

让我们使用异步生成器函数创建一个异步可迭代对象,并将该可迭代对象转换为 ReadableStream

async function* genAsyncIterable() {
  yield 'how';
  yield 'are';
  yield 'you';
}
const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'how'
// 'are'
// 'you'

iterableToReadableStream() 也适用于同步可迭代对象

const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'hello'
// 'everyone'

最终可能会有一个静态辅助方法 ReadableStream.from() 来提供此功能(有关更多信息,请参阅其拉取请求)。

10.4 写入 WritableStream

WritableStream 允许我们将数据块写入各种接收器。它们具有以下类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次对其进行解释)

interface WritableStream<TChunk> {
  getWriter(): WritableStreamDefaultWriter<TChunk>;
  readonly locked: boolean;

  close(): Promise<void>;
  abort(reason?: any): Promise<void>;
}

这些属性的说明

以下小节介绍了将数据发送到 WritableStream 的两种方法

10.4.1 通过 Writer 写入 WritableStream

我们可以使用 *Writer* 写入 WritableStream。它们具有以下类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次对其进行解释)

interface WritableStreamDefaultWriter<TChunk> {
  readonly desiredSize: number | null;
  readonly ready: Promise<undefined>;
  write(chunk?: TChunk): Promise<void>;
  releaseLock(): void;

  close(): Promise<void>;
  readonly closed: Promise<undefined>;
  abort(reason?: any): Promise<void>;
}

这些属性的说明

以下代码显示了使用 Writer 的协议

const writer = writableStream.getWriter(); // (A)
assert.equal(writableStream.locked, true); // (B)
try {
  // Writing the chunks (explained later)
} finally {
  writer.releaseLock(); // (C)
}

我们不能直接写入 writableStream,我们首先需要获取一个 *Writer*(A 行)。每个 WritableStream 最多只能有一个 Writer。获取 Writer 后,writableStream 将被锁定(B 行)。在我们再次调用 .getWriter() 之前,我们必须调用 .releaseLock()(C 行)。

有三种写入块的方法。

10.4.1.1 写入方法 1:等待 .write()(低效地处理背压)

第一种写入方法是等待 .write() 的每个结果

await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();

当我们传递给 .write() 的块成功写入后,它返回的 Promise 将被 fulfill。“成功写入”的确切含义取决于 WritableStream 的实现方式,例如,对于文件流,该块可能已发送到操作系统,但仍驻留在缓存中,因此实际上尚未写入磁盘。

当流关闭时,.close() 返回的 Promise 将被 fulfill。

这种写入方法的一个缺点是,等到写入成功意味着不使用队列。因此,数据吞吐量可能会降低。

10.4.1.2 写入方法 2:忽略 .write() 拒绝(忽略背压)

在第二种写入方法中,我们忽略 .write() 返回的 Promise,只等待 .close() 返回的 Promise。

writer.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
await writer.close(); // reports errors

.write() 的同步调用将块添加到 WritableStream 的内部队列中。通过不等待返回的 Promise,我们不会等到每个块都被写入。但是,等待 .close() 确保在继续之前队列为空并且所有写入都已成功。

在 A 行和 B 行调用 .catch() 是必要的,以避免在写入过程中出现问题时出现有关未处理的 Promise 拒绝的警告。此类警告通常会记录到控制台。我们可以忽略 .write() 报告的错误,因为 .close() 也会向我们报告这些错误。

可以使用一个忽略 Promise 拒绝的辅助函数来改进前面的代码。

ignoreRejections(
  writer.write('Chunk 1'),
  writer.write('Chunk 2'),
);
await writer.close(); // reports errors

function ignoreRejections(...promises) {
  for (const promise of promises) {
    promise.catch(() => {});
  }
}

这种方法的一个缺点是忽略了背压:我们只是假设队列足够大,可以容纳我们写入的所有内容。

10.4.1.3 写入方法 3:等待 .ready(有效处理背压)

在这种写入方法中,我们通过等待 Writer getter .ready 来有效地处理背压。

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 1').catch(() => {});

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 2').catch(() => {});

await writer.close(); // reports errors

每当流从有背压转换到没有背压时,.ready 中的 Promise 就会兑现。

10.4.1.4 示例:通过 Writer 写入文件

在本例中,我们通过 WritableStream 创建一个文本文件 data.txt

import * as fs from 'node:fs';
import {Writable} from 'node:stream';

const nodeWritable = fs.createWriteStream(
  'new-file.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)

const writer = webWritableStream.getWriter();
try {
  await writer.write('First line\n');
  await writer.write('Second line\n');
  await writer.close();
} finally {
  writer.releaseLock()
}

在 A 行,我们为文件 data.txt 创建一个 Node.js 流。在 B 行,我们将此流转换为 Web 流。然后我们使用 Writer 向其中写入字符串。

10.4.2 管道传输到 WritableStream

除了使用 Writer 之外,我们还可以通过将 ReadableStream 管道传输到 WritableStream 来写入 WritableStream。

await readableStream.pipeTo(writableStream);

当管道传输成功完成时,.pipeTo() 返回的 Promise 就会兑现。

10.4.2.1 管道传输是异步发生的

管道传输在当前任务完成或暂停后执行。以下代码演示了这一点。

const readableStream = new ReadableStream({ // (A)
  start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});
const writableStream = new WritableStream({ // (B)
  write(chunk) {
    console.log('WRITE: ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE WritableStream');
  },
});


console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
promise.then(() => console.log('Promise fulfilled'));
console.log('After .pipeTo()');

// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'

在 A 行,我们创建一个 ReadableStream。在 B 行,我们创建一个 WritableStream。

我们可以看到 .pipeTo()(C 行)立即返回。在新任务中,读取和写入块。然后关闭 writableStream,最后兑现 promise

10.4.2.2 示例:管道传输到文件的 WritableStream

在以下示例中,我们为文件创建一个 WritableStream,并将 ReadableStream 管道传输到该文件。

const webReadableStream = new ReadableStream({ // (A)
  async start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});

const nodeWritable = fs.createWriteStream( // (B)
  'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)

await webReadableStream.pipeTo(webWritableStream); // (D)

在 A 行,我们创建一个 ReadableStream。在 B 行,我们为文件 data.txt 创建一个 Node.js 流。在 C 行,我们将此流转换为 Web 流。在 D 行,我们将 webReadableStream 管道传输到文件的 WritableStream。

10.4.2.3 示例:将两个 ReadableStream 写入一个 WritableStream

在以下示例中,我们将两个 ReadableStream 写入一个 WritableStream。

function createReadableStream(prefix) {
  return new ReadableStream({
    async start(controller) {
      controller.enqueue(prefix + 'chunk 1');
      controller.enqueue(prefix + 'chunk 2');
      controller.close();
    },
  });
}

const writableStream = new WritableStream({
  write(chunk) {
    console.log('WRITE ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE');
  },
  abort(err) {
    console.log('ABORT ' + err);
  },
});

await createReadableStream('Stream 1: ')
  .pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
  .pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();

// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'

我们告诉 .pipeTo() 在 ReadableStream 关闭后不要关闭 WritableStream(A 行和 B 行)。因此,WritableStream 在 A 行之后保持打开状态,我们可以将另一个 ReadableStream 管道传输到它。

10.5 通过包装将数据接收器转换为 WritableStream

如果我们想通过 WritableStream 写入外部接收器,我们可以将其包装在一个适配器对象中,并将该对象传递给 WritableStream 构造函数。适配器对象称为 WritableStream 的*底层接收器*(队列策略将在后面我们详细介绍背压时解释)。

new WritableStream(underlyingSink?, queuingStrategy?)

这是底层接收器的类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次解释它们)。

interface UnderlyingSink<TChunk> {
  start?(
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  write?(
    chunk: TChunk,
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  close?(): void | Promise<void>;;
  abort?(reason?: any): void | Promise<void>;
}

这些属性的说明

.start().write() 的参数 controller 允许它们使 WritableStream 出错。它具有以下类型。

interface WritableStreamDefaultController {
  readonly signal: AbortSignal;
  error(err?: any): void;
}

10.5.1 示例:跟踪 ReadableStream

在下一个示例中,我们将 ReadableStream 管道传输到 WritableStream,以便检查 ReadableStream 如何生成块。

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First chunk');
    controller.enqueue('Second chunk');
    controller.close();
  },
});
await readableStream.pipeTo(
  new WritableStream({
    write(chunk) {
      console.log('WRITE ' + JSON.stringify(chunk));
    },
    close() {
      console.log('CLOSE');
    },
    abort(err) {
      console.log('ABORT ' + err);
    },
  })
);
// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'

10.5.2 示例:在字符串中收集写入 WriteStream 的块

在下一个示例中,我们创建 WriteStream 的一个子类,该子类在字符串中收集所有写入的块。我们可以通过方法 .getString() 访问该字符串。

class StringWritableStream extends WritableStream {
  #string = '';
  constructor() {
    super({
      // We need to access the `this` of `StringWritableStream`.
      // Hence the arrow function (and not a method).
      write: (chunk) => {
        this.#string += chunk;
      },
    });
  }
  getString() {
    return this.#string;
  }
}
const stringStream = new StringWritableStream();
const writer = stringStream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

这种方法的一个缺点是我们混合了两个 API:WritableStream 的 API 和我们新的字符串流 API。另一种方法是委托给 WritableStream,而不是扩展它。

function StringcreateWritableStream() {
  let string = '';
  return {
    stream: new WritableStream({
      write(chunk) {
        string += chunk;
      },
    }),
    getString() {
      return string;
    },
  };
}

const stringStream = StringcreateWritableStream();
const writer = stringStream.stream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

此功能也可以通过类(而不是作为对象的工厂函数)来实现。

10.6 使用 TransformStream

TransformStream

使用 TransformStream 最常见的方式是“管道传输”它们。

const transformedStream = readableStream.pipeThrough(transformStream);

.pipeThrough()readableStream 管道传输到 transformStream 的可写端,并返回其可读端。换句话说:我们创建了一个新的 ReadableStream,它是 readableStream 的转换版本。

.pipeThrough() 不仅接受 TransformStream,还接受任何具有以下形状的对象。

interface ReadableWritablePair<RChunk, WChunk> {
  readable: ReadableStream<RChunk>;
  writable: WritableStream<WChunk>;
}

10.6.1 标准 TransformStream

Node.js 支持以下标准 TransformStream。

10.6.1.1 示例:解码 UTF-8 编码的字节流

在以下示例中,我们解码 UTF-8 编码的字节流。

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

response.body 是一个 ReadableByteStream,其块是 Uint8ArrayTypedArray)的实例。我们将该流通过 TextDecoderStream 进行管道传输,以获得具有字符串块的流。

请注意,单独转换每个字节块(例如,通过 TextDecoder)是行不通的,因为 单个 Unicode 代码点在 UTF-8 中最多编码为四个字节,并且这些字节可能不在同一个块中。

10.6.1.2 示例:为标准输入创建可读文本流

以下 Node.js 模块记录通过标准输入发送给它的所有内容。

// echo-stdin.mjs
import {Readable} from 'node:stream';

const webStream = Readable.toWeb(process.stdin)
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
  console.log('>>>', chunk);
}

我们可以通过存储在 process.stdin 中的流访问标准输入(process 是一个全局 Node.js 变量)。如果我们没有为此流设置编码并通过 Readable.toWeb() 转换它,我们将获得一个字节流。我们将其通过 TextDecoderStream 进行管道传输,以获得文本流。

请注意,我们增量处理标准输入:一旦另一个块可用,我们就记录它。换句话说,我们不会等到标准输入完成。当数据很大或仅间歇发送时,这很有用。

10.7 实现自定义 TransformStream

我们可以通过将 Transformer 对象传递给 TransformStream 的构造函数来实现自定义 TransformStream。此类对象具有以下类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次解释它们)。

interface Transformer<TInChunk, TOutChunk> {
  start?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  transform?(
    chunk: TInChunk,
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  flush?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
}

这些属性的说明

这些方法中的每一个都可以返回一个 Promise,并且在 Promise 被解决之前不会采取进一步的步骤。如果我们想做一些异步的事情,这很有用。

参数 controller 具有以下类型。

interface TransformStreamDefaultController<TOutChunk> {
  enqueue(chunk?: TOutChunk): void;
  readonly desiredSize: number | null;
  terminate(): void;
  error(err?: any): void;
}

TransformStream 中的背压怎么办?该类将其可读端(输出)的背压传播到其可写端(输入)。假设是转换不会改变太多数据量。因此,转换可以忽略背压。但是,可以通过 transformStreamDefaultController.desiredSize 检测到它,并通过从 transformer.transform() 返回 Promise 来传播它。

10.7.1 示例:将任意块的流转换为行的流

以下 TransformStream 的子类将具有任意块的流转换为每个块恰好包含一行文本的流。也就是说,除了最后一个块之外,每个块都以行尾 (EOL) 字符串结尾:在 Unix(包括 macOS)上为 '\n',在 Windows 上为 '\r\n'

class ChunksToLinesTransformer {
  #previous = '';

  transform(chunk, controller) {
    let startSearch = this.#previous.length;
    this.#previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = this.#previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = this.#previous.slice(0, eolIndex+1);
      controller.enqueue(line);
      this.#previous = this.#previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }

  flush(controller) {
    // Clean up and enqueue any text we’re still holding on to
    if (this.#previous.length > 0) {
      controller.enqueue(this.#previous);
    }
  }
}
class ChunksToLinesStream extends TransformStream {
  constructor() {
    super(new ChunksToLinesTransformer());
  }
}

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('multiple\nlines of\ntext');
    controller.close();
  },
});
const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);

for await (const line of transformed) {
  console.log('>>>', JSON.stringify(line));
}

// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'

请注意,Deno 的内置 TextLineStream 提供了类似的功能。

提示:我们也可以通过异步生成器进行此转换。它将异步迭代 ReadableStream 并返回一个具有行的异步可迭代对象。其实现在 §9.4 “通过异步生成器转换可读流” 中给出。

10.7.2 提示:异步生成器也非常适合转换流

由于 ReadableStream 是异步可迭代的,因此我们可以使用 异步生成器 来转换它们。这会导致非常优雅的代码。

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('one');
    controller.enqueue('two');
    controller.enqueue('three');
    controller.close();
  },
});

async function* prefixChunks(prefix, asyncIterable) {
  for await (const chunk of asyncIterable) {
    yield '> ' + chunk;
  }
}

const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
  console.log(transformedChunk);
}

// Output:
// '> one'
// '> two'
// '> three'

10.8 详细了解背压

让我们仔细看看背压。考虑以下管道链。

rs.pipeThrough(ts).pipeTo(ws);

rs 是一个 ReadableStream,ts 是一个 TransformStream,ws 是一个 WritableStream。这些是由前面的表达式创建的连接(.pipeThrough 使用 .pipeTors 连接到 ts 的可写端)。

rs -pipeTo-> ts{writable,readable} -pipeTo-> ws

观察结果

假设 ws 的底层接收器速度很慢,并且 ws 的缓冲区最终已满。然后会发生以下步骤

此示例说明我们需要两种功能

让我们探讨一下这些功能是如何在 Web Streams API 中实现的。

10.8.1 发出背压信号

背压信号由接收数据的实体发出。Web Streams 有两个这样的实体

在这两种情况下,输入都通过队列进行缓冲。应用背压的信号是队列已满。让我们看看如何检测到它。

以下是队列的位置

队列的*期望大小*是一个数字,表示队列中剩余的空间量

因此,如果期望大小为零或更小,我们必须应用背压。它可以通过包含队列的对象的 getter .desiredSize 获得。

期望大小是如何计算的?通过一个指定了所谓*排队策略*的对象。ReadableStreamWritableStream 具有默认的排队策略,可以通过其构造函数的可选参数覆盖。接口 QueuingStrategy 有两个属性

队列的期望大小是高水位线减去队列的当前大小。

10.8.2 对背压做出反应

发送数据的实体需要通过施加背压来响应发出的背压信号。

10.8.2.1 通过 Writer 向 WritableStream 写入数据的代码

如果我们愿意,我们还可以根据 writer.desiredSize 来确定块的大小。

10.8.2.2 ReadableStream 的底层源

可以传递给 ReadableStream 的底层源对象包装了一个外部源。在某种程度上,它也是管道链的成员;一个位于其 ReadableStream 之前的成员。

10.8.2.3 WritableStream 的底层接收器

可以传递给 WritableStream 的底层接收器对象包装了一个外部接收器。在某种程度上,它也是管道链的成员;一个位于其 WritableStream 之后的成员。

每个外部接收器发出背压信号的方式都不同(在某些情况下根本不会发出)。底层接收器可以通过从方法 .write() 返回一个 Promise 来施加背压,该 Promise 在写入完成后兑现。Web Streams 标准中有一个示例,演示了它是如何工作的。

10.8.2.4 TransformStream (.writable .readable)

TransformStream 通过为前者实现底层接收器,为后者实现底层源,将其可写端与其可读端连接起来。它有一个内部槽 .[[backpressure]],指示当前是否激活了内部背压。

10.8.2.5 .pipeTo() (ReadableStream WritableStream)

.pipeTo() 通过 Reader 从 ReadableStream 中读取块,并通过 Writer 将它们写入 WritableStream。每当 writer.desiredSize 为零或更小时,它都会暂停(Web Streams 标准:ReadableStreamPipeTo 的步骤 15)。

10.9 字节流

到目前为止,我们只处理了*文本流*,即块为字符串的流。但是 Web Streams API 也支持用于二进制数据的*字节流*,其中块是 Uint8Arrays(TypedArrays

接下来,我们将学习如何创建可读字节流。

10.9.1 可读字节流

ReadableStream 构造函数创建哪种流取决于其可选的第一个参数 underlyingSource 的可选属性 .type

如果 ReadableStream 处于 'bytes' 模式,会发生什么变化?

在默认模式下,底层源可以返回任何类型的块。在字节模式下,块必须是 ArrayBufferViews,即 TypedArrays(例如 Uint8Arrays)或 DataViews。

此外,可读字节流可以创建两种 Reader

“BYOB”代表“自带缓冲区”,这意味着我们可以将缓冲区(ArrayBufferView)传递给 reader.read()。之后,该 ArrayBufferView 将被分离,不再可用。但是 .read() 会在一个新的 ArrayBufferView 中返回其数据,该 ArrayBufferView 具有相同的类型并访问同一个 ArrayBuffer 的同一个区域。

此外,可读字节流具有不同的控制器:它们是 ReadableByteStreamController 的实例(相对于 ReadableStreamDefaultController)。除了强制底层源对 ArrayBufferViews(TypedArrays 或 DataViews)进行排队之外,它还通过其属性 .byobRequest 支持 ReadableStreamBYOBReaders。底层源将其数据写入存储在此属性中的 BYOBRequest。Web Streams 标准在其“创建流的示例”部分中有两个使用 .byobRequest 的示例。

10.9.2 示例:一个填充了随机数据的无限可读字节流

在下一个示例中,我们将创建一个无限的可读字节流,用随机数据填充其块(灵感来自:“在 Node.js 中实现 Web Streams API”中的 example4.mjs)。

import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);

const readableByteStream = new ReadableStream({
  type: 'bytes',
  async pull(controller) {
    const byobRequest = controller.byobRequest;
    await asyncRandomFill(byobRequest.view);
    byobRequest.respond(byobRequest.view.byteLength);
  },
});

const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);

由于 readableByteStream 是无限的,我们无法对其进行循环。这就是我们只读取其第一个块的原因(B 行)。

我们在 A 行创建的缓冲区被传输,因此在 B 行之后不可读。

10.9.3 示例:压缩可读字节流

在以下示例中,我们将创建一个可读字节流,并将其通过一个流进行管道传输,该流将其压缩为 GZIP 格式

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // 256 zeros
    controller.enqueue(new Uint8Array(256));
    controller.close();
  },
});
const transformedStream = readableByteStream.pipeThrough(
  new CompressionStream('gzip'));
await logChunks(transformedStream);

async function logChunks(readableByteStream) {
  const reader = readableByteStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) break;
      console.log(value);
    }
  } finally {
    reader.releaseLock();
  }
}

10.9.4 示例:通过 fetch() 读取网页

fetch() 的结果解析为一个响应对象,其属性 .body 是一个可读字节流。我们通过 TextDecoderStream 将该字节流转换为文本流

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
  new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

10.10 Node.js 特定的帮助程序

Node.js 是唯一支持以下帮助程序函数的 Web 平台,它将其称为实用程序消费者

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';

这些函数将 Web ReadableStreams、Node.js Readables 和 AsyncIterators 转换为 Promise,这些 Promise 将使用以下内容兑现

二进制数据假定为 UTF-8 编码

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // TextEncoder converts strings to UTF-8 encoded Uint8Arrays
    const encoder = new TextEncoder();
    const view = encoder.encode('"😀"');
    assert.deepEqual(
      view,
      Uint8Array.of(34, 240, 159, 152, 128, 34)
    );
    controller.enqueue(view);
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

字符串流按预期工作

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  start(controller) {
    controller.enqueue('"😀"');
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

10.11 扩展阅读

本节中提到的所有资料都是本章的来源。

本章没有涵盖 Web Streams API 的所有方面。您可以在此处找到更多信息

更多资料