fetch()
读取网页Web Streams 是一个用于 streams 的标准,现在所有主流 Web 平台都支持它:Web 浏览器、Node.js 和 Deno。(Streams 是一种抽象概念,用于从各种来源(文件、托管在服务器上的数据等)顺序读取和写入数据。)
例如,全局函数 fetch()
(用于下载在线资源)异步返回一个 Response,它有一个属性 .body
,其中包含一个 Web Stream。
本章介绍 Node.js 上的 Web Streams,但我们学到的大部分内容都适用于支持它们的 Web 平台。
让我们从概述 Web Streams 的一些基础知识开始。之后,我们将快速进入示例。
Streams 是一种用于访问数据的结构,例如
它们的两大优势是
我们可以处理大量数据,因为 Streams 允许我们将它们分成更小的部分(称为 块),我们可以一次处理一个块。
我们可以在处理不同数据时使用相同的数据结构 Streams。这使得代码更容易重用。
Web Streams(通常省略“Web”)是一个相对较新的标准,它起源于 Web 浏览器,但现在也得到了 Node.js 和 Deno 的支持(如 MDN 兼容性表 所示)。
在 Web Streams 中,块通常是以下两种之一
Web Streams 主要有三种
ReadableStream 用于从 源 读取数据。执行此操作的代码称为 消费者。
WritableStream 用于将数据写入 接收器。执行此操作的代码称为 生产者。
TransformStream 由两个 Streams 组成
其思想是通过“将数据通过管道传输”到 TransformStream 来转换数据。也就是说,我们将数据写入可写端,并从可读端读取转换后的数据。以下 TransformStreams 内置于大多数 JavaScript 平台中(稍后将详细介绍)
TextDecoderStream
将此类数据转换为字符串。TextEncoderStream
将 JavaScript 字符串转换为 UTF-8 数据。CompressionStream
将二进制数据压缩为 GZIP 和其他压缩格式。DecompressionStream
解压缩 GZIP 和其他压缩格式的二进制数据。ReadableStreams、WritableStreams 和 TransformStreams 可用于传输文本或二进制数据。在本章中,我们主要使用前者。字节流(用于二进制数据)将在最后简要介绍。
管道传输 是一种操作,它允许我们将 ReadableStream 管道传输 到 WritableStream:只要 ReadableStream 生成数据,此操作就会读取该数据并将其写入 WritableStream。如果我们只连接两个 Streams,我们就可以得到一种方便的方法来将数据从一个位置传输到另一个位置(例如,复制文件)。但是,我们也可以连接两个以上的 Streams,并获得可以以各种方式处理数据的 管道链。这是一个管道链的示例
ReadableStream 通过将前者管道传输到后者的可写端来连接到 TransformStream。类似地,TransformStream 通过将前者的可读端管道传输到后者的可写端来连接到另一个 TransformStream。TransformStream 通过将前者的可读端管道传输到后者来连接到 WritableStream。
管道链中的一个问题是,成员接收到的数据可能多于其当前可以处理的数据。背压 是一种解决此问题的技术:它使数据接收器能够告诉其发送器,它应该暂时停止发送数据,以便接收器不会过载。
另一种看待背压的方式是将其视为一种信号,该信号通过管道链从过载的成员向后传播到链的开头。例如,请考虑以下管道链
ReadableStream -pipeTo-> TransformStream -pipeTo-> WriteableStream
这就是背压如何通过此链传播的
我们已经到达管道链的开头。因此,ReadableStream 内部不会累积数据(也已缓冲),WritableStream 有时间恢复。一旦恢复,它就会发出信号,表明它已准备好再次接收数据。该信号也会通过链向后传播,直到到达 ReadableStream,数据处理才会恢复。
在第一次了解背压时,为了便于理解,省略了一些细节。这些将在后面介绍。
在 Node.js 中,Web Streams 可从以下两个来源获得
'node:stream/web'
目前,Node.js 中只有一个 API 直接支持 Web Streams – Fetch API
const response = await fetch('https://example.com');
const readableStream = response.body;
对于其他情况,我们需要使用模块 'node:stream'
中的以下静态方法之一,将 Node.js Stream 转换为 Web Stream,反之亦然
Readable.toWeb(nodeReadable)
Readable.fromWeb(webReadableStream, options?)
Writable.toWeb(nodeWritable)
Writable.fromWeb(webWritableStream, options?)
Duplex.toWeb(nodeDuplex)
Duplex.fromWeb(webTransformStream, options?)
另一个 API 部分支持 Web Streams:FileHandles 具有方法 .readableWebStream()
。
ReadableStreams 允许我们从各种来源读取数据块。它们具有以下类型(您可以随意浏览此类型及其属性的说明;当我们在示例中遇到它们时,将再次对其进行说明)
interface ReadableStream<TChunk> {
getReader(): ReadableStreamDefaultReader<TChunk>;
readonly locked: boolean;
Symbol.asyncIterator](): AsyncIterator<TChunk>;
[
cancel(reason?: any): Promise<void>;
pipeTo(
: WritableStream<TChunk>,
destination?: StreamPipeOptions
options: Promise<void>;
)pipeThrough<TChunk2>(
: ReadableWritablePair<TChunk2, TChunk>,
transform?: StreamPipeOptions
options: ReadableStream<TChunk2>;
)
// Not used in this chapter:
tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}
interface StreamPipeOptions {
?: AbortSignal;
signal?: boolean;
preventClose?: boolean;
preventAbort?: boolean;
preventCancel }
这些属性的说明
.getReader()
返回一个 Reader – 一个我们可以通过它从 ReadableStream 读取数据的对象。ReadableStreams 返回 Readers 类似于 可迭代对象 返回迭代器。.locked
:每个 ReadableStream 一次只能有一个活动的 Reader。当一个 Reader 正在使用时,ReadableStream 被锁定,并且无法调用 .getReader()
。[Symbol.asyncIterator](https://exploring.javascript.ac.cn/impatient-js/ch_async-iteration.html)
:此方法使 ReadableStreams 异步可迭代。它目前仅在某些平台上实现。.cancel(reason)
取消 Stream,因为使用者不再对其感兴趣。reason
被传递给 ReadableStream 的 底层源 的 .cancel()
方法(稍后将详细介绍)。此操作完成后,返回的 Promise 将被兑现。.pipeTo()
将其 ReadableStream 的内容馈送到 WritableStream。此操作完成后,返回的 Promise 将被兑现。.pipeTo()
确保背压、关闭、错误等都通过管道链正确传播。我们可以通过其第二个参数指定选项.signal
允许我们将 AbortSignal 传递给此方法,这使我们能够通过 AbortController 中止管道传输。.preventClose
:如果为 true
,则在 ReadableStream 关闭时,它会阻止 WritableStream 关闭。当我们想将多个 ReadableStreams 管道传输到同一个 WritableStream 时,这很有用。.pipeThrough()
将其 ReadableStream 连接到 ReadableWritablePair(大致相当于:TransformStream,稍后将详细介绍)。它返回生成的 ReadableStream(即 ReadableWritablePair 的可读端)。以下小节介绍了使用 ReadableStreams 的三种方法
我们可以使用 Readers 从 ReadableStreams 中读取数据。它们具有以下类型(您可以随意浏览此类型及其属性的说明;当我们在示例中遇到它们时,将再次对其进行说明)
interface ReadableStreamGenericReader {
readonly closed: Promise<undefined>;
cancel(reason?: any): Promise<void>;
}interface ReadableStreamDefaultReader<TChunk>
extends ReadableStreamGenericReader
{releaseLock(): void;
read(): Promise<ReadableStreamReadResult<TChunk>>;
}
interface ReadableStreamReadResult<TChunk> {
: boolean;
done: TChunk | undefined;
value }
这些属性的说明
.closed
:此 Promise 在 Stream 关闭后兑现。如果 Stream 出错或在 Stream 关闭之前释放了 Reader 的锁,则会拒绝该 Promise。.cancel()
:在活动的 Reader 中,此方法取消关联的 ReadableStream。.releaseLock()
取消激活 Reader 并解锁其 Stream。.read()
返回一个 ReadableStreamReadResult(一个包装的块)的 Promise,该块有两个属性.done
是一个布尔值,只要可以读取块,它就为 false
,在最后一个块之后为 true
。.value
是块(或最后一个块之后的 undefined
)。如果您了解迭代的工作原理,ReadableStreamReadResult 可能看起来很熟悉:ReadableStream 类似于可迭代对象,Reader 类似于迭代器,ReadableStreamReadResult 类似于迭代器方法 .next()
返回的对象。
以下代码演示了使用 Reader 的协议
const reader = readableStream.getReader(); // (A)
.equal(readableStream.locked, true); // (B)
asserttry {
while (true) {
const {done, value: chunk} = await reader.read(); // (C)
if (done) break;
// Use `chunk`
}finally {
} .releaseLock(); // (D)
reader }
**获取 Reader。** 我们不能直接从 readableStream
读取,我们首先需要获取一个 *Reader*(A 行)。每个 ReadableStream 最多只能有一个 Reader。获取 Reader 后,readableStream
将被锁定(B 行)。在我们再次调用 .getReader()
之前,我们必须调用 .releaseLock()
(D 行)。
**读取块。** .read()
返回一个 Promise,该 Promise 用于解析为具有属性 .done
和 .value
的对象(C 行)。读取最后一个块后,.done
为 true
。这种方法类似于 JavaScript 中异步迭代的工作方式。
在以下示例中,我们从文本文件 data.txt
中读取块(字符串)
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
const nodeReadable = fs.createReadStream(
'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)
const reader = webReadableStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(value);
}finally {
} .releaseLock();
reader
}// Output:
// 'Content of text file\n'
我们将 Node.js Readable 转换为 Web ReadableStream(A 行)。然后,我们使用前面解释的协议来读取块。
在下一个示例中,我们将 ReadableStream 的所有块连接成一个字符串并返回它
/**
* Returns a string with the contents of `readableStream`.
*/
async function readableStreamToString(readableStream) {
const reader = readableStream.getReader();
try {
let result = '';
while (true) {
const {done, value} = await reader.read();
if (done) {
return result; // (A)
}+= value;
result
}finally {
} .releaseLock(); // (B)
reader
} }
方便的是,finally
子句始终会被执行,无论我们如何离开 try
子句。也就是说,如果我们返回结果(A 行),则锁将被正确释放(B 行)。
ReadableStream 也可以通过异步迭代来消费
const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
while (true) {
let chunk;
done: exhaustive, value: chunk} = await iterator.next());
({if (exhaustive) break;
console.log(chunk);
}finally {
} // If the loop was terminated before we could iterate exhaustively
// (via an exception or `return`), we must call `iterator.return()`.
// Check if that was the case.
if (!exhaustive) {
.return();
iterator
} }
值得庆幸的是,for-await-of
循环为我们处理了异步迭代的所有细节
for await (const chunk of readableStream) {
console.log(chunk);
}
让我们重做之前从文件中读取文本的尝试。这一次,我们使用异步迭代而不是 Reader
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
const nodeReadable = fs.createReadStream(
'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
console.log(chunk);
}// Output:
// 'Content of text file'
我们之前使用 Reader 来使用 ReadableStream 的内容组装字符串。使用异步迭代,代码变得更简单
/**
* Returns a string with the contents of `readableStream`.
*/
async function readableStreamToString2(readableStream) {
let result = '';
for await (const chunk of readableStream) {
+= chunk;
result
}return result;
}
目前,Node.js 和 Deno 支持对 ReadableStream 的异步迭代,但 Web 浏览器不支持:有一个GitHub issue链接到错误报告。
鉴于尚不清楚如何在浏览器上支持异步迭代,因此包装比 polyfill 更安全。以下代码基于Chromium 错误报告中的建议
async function* getAsyncIterableFor(readableStream) {
const reader = readableStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) return;
yield value;
}finally {
} .releaseLock();
reader
} }
ReadableStream 有两种用于管道传输的方法
readableStream.pipeTo(writeableStream)
同步返回 Promise p
。它异步读取 readableStream
的所有块并将它们写入 writableStream
。完成后,它将 fulfill p
。
当我们探索 WritableStream 时,我们将看到 .pipeTo()
的示例,因为它提供了一种将数据传输到其中的便捷方式。
readableStream.pipeThrough(transformStream)
将 readableStream
管道传输到 transformStream.writable
并返回 transformStream.readable
(每个 TransformStream 都具有引用其可写端和可读端的属性)。查看此操作的另一种方法是,我们通过将 transformStream
连接到 readableStream
来创建一个新的 ReadableStream。
当我们探索 TransformStream 时,我们将看到 .pipeThrough()
的示例,因为此方法是使用它们的主要方式。
如果我们想通过 ReadableStream 读取外部源,我们可以将其包装在适配器对象中,并将该对象传递给 ReadableStream
构造函数。适配器对象称为 ReadableStream 的 *底层源*(排队策略将在后面我们详细介绍背压时进行解释)
new ReadableStream(underlyingSource?, queuingStrategy?)
这是底层源的类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次对其进行解释)
interface UnderlyingSource<TChunk> {
?(
start: ReadableStreamController<TChunk>
controller: void | Promise<void>;
)?(
pull: ReadableStreamController<TChunk>
controller: void | Promise<void>;
)?(reason?: any): void | Promise<void>;
cancel
// Only used in byte streams and ignored in this section:
: 'bytes' | undefined;
type: bigint;
autoAllocateChunkSize }
这是 ReadableStream 调用这些方法的时间
在我们调用 ReadableStream
的构造函数后,将立即调用 .start(controller)
。
每当 ReadableStream 的内部队列中有空间时,就会调用 .pull(controller)
。它会重复调用,直到队列再次满为止。仅在 .start()
完成后才会调用此方法。如果 .pull()
没有排队任何内容,则不会再次调用它。
如果 ReadableStream 的使用者通过 readableStream.cancel()
或 reader.cancel()
取消了它,则会调用 .cancel(reason)
。reason
是传递给这些方法的值。
这些方法中的每一个都可以返回一个 Promise,并且在 Promise 被解决之前不会采取进一步的步骤。如果我们想做一些异步的事情,这很有用。
.start()
和 .pull()
的参数 controller
允许它们访问流。它具有以下类型
type ReadableStreamController<TChunk> =
| ReadableStreamDefaultController<TChunk>
| ReadableByteStreamController<TChunk> // ignored here
;
interface ReadableStreamDefaultController<TChunk> {
enqueue(chunk?: TChunk): void;
readonly desiredSize: number | null;
close(): void;
error(err?: any): void;
}
目前,块是字符串。稍后我们将介绍字节流,其中 Uint8Array 很常见。以下是这些方法的作用
.enqueue(chunk)
将 chunk
添加到 ReadableStream 的内部队列中。.desiredSize
指示 .enqueue()
写入的队列中有多少空间。如果队列已满,则为零;如果队列已超过其最大大小,则为负数。因此,如果所需大小为零或负数,我们必须停止排队。null
。.close()
关闭 ReadableStream。使用者仍然可以清空队列,但之后流将结束。底层源调用此方法非常重要,否则读取其流将永远不会完成。.error(err)
使流处于错误模式:将来与它的所有交互都将失败,并显示错误值 err
。在我们实现底层源的第一个示例中,我们仅提供方法 .start()
。我们将在下一小节中看到 .pull()
的用例。
const readableStream = new ReadableStream({
start(controller) {
.enqueue('First line\n'); // (A)
controller.enqueue('Second line\n'); // (B)
controller.close(); // (C)
controller,
};
})for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'First line\n'
// 'Second line\n'
我们使用控制器创建一个包含两个块的流(A 行和 B 行)。关闭流非常重要(C 行)。否则,for-await-of
循环将永远不会结束!
请注意,这种排队方式并不完全安全:存在超过内部队列容量的风险。我们很快就会看到如何避免这种风险。
一种常见的情况是将推送源或拉取源转换为 ReadableStream。源是推送还是拉取决定了我们如何使用 UnderlyingSource 连接到 ReadableStream
推送源:当有新数据时,此类源会通知我们。我们使用 .start()
来设置侦听器和支持数据结构。如果我们接收到的数据过多,并且所需大小不再为正,则必须告诉源暂停。如果稍后调用 .pull()
,我们可以取消暂停。响应所需大小变为非正数而暂停外部源称为 *应用背压*。
拉取源:我们会向此类源请求新数据,通常是异步的。因此,我们通常不会在 .start()
中做太多事情,而是在每次调用 .pull()
时检索数据。
接下来我们将看到两种源的示例。
在以下示例中,我们围绕套接字包装了一个 ReadableStream,该套接字将其数据推送到我们这里(它调用我们)。此示例取自 Web 流规范
function makeReadableBackpressureSocketStream(host, port) {
const socket = createBackpressureSocket(host, port);
return new ReadableStream({
start(controller) {
.ondata = event => {
socket.enqueue(event.data);
controller
if (controller.desiredSize <= 0) {
// The internal queue is full, so propagate
// the backpressure signal to the underlying source.
.readStop();
socket
};
}
.onend = () => controller.close();
socket.onerror = () => controller.error(
socketnew Error('The socket errored!'));
,
}
pull() {
// This is called if the internal queue has been emptied, but the
// stream’s consumer still wants more data. In that case, restart
// the flow of data if we have previously paused it.
.readStart();
socket,
}
cancel() {
.close();
socket,
};
}) }
工具函数 iterableToReadableStream()
接受一个块的可迭代对象,并将其转换为 ReadableStream
/**
* @param iterable an iterable (asynchronous or synchronous)
*/
function iterableToReadableStream(iterable) {
return new ReadableStream({
start() {
if (typeof iterable[Symbol.asyncIterator] === 'function') {
this.iterator = iterable[Symbol.asyncIterator]();
else if (typeof iterable[Symbol.iterator] === 'function') {
} this.iterator = iterable[Symbol.iterator]();
else {
} throw new Error('Not an iterable: ' + iterable);
},
}
async pull(controller) {
if (this.iterator === null) return;
// Sync iterators return non-Promise values,
// but `await` doesn’t mind and simply passes them on
const {value, done} = await this.iterator.next();
if (done) {
this.iterator = null;
.close();
controllerreturn;
}.enqueue(value);
controller,
}
cancel() {
this.iterator = null;
.close();
controller,
};
}) }
让我们使用异步生成器函数创建一个异步可迭代对象,并将该可迭代对象转换为 ReadableStream
async function* genAsyncIterable() {
yield 'how';
yield 'are';
yield 'you';
}const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'how'
// 'are'
// 'you'
iterableToReadableStream()
也适用于同步可迭代对象
const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
console.log(chunk);
}
// Output:
// 'hello'
// 'everyone'
最终可能会有一个静态辅助方法 ReadableStream.from()
来提供此功能(有关更多信息,请参阅其拉取请求)。
WritableStream 允许我们将数据块写入各种接收器。它们具有以下类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次对其进行解释)
interface WritableStream<TChunk> {
getWriter(): WritableStreamDefaultWriter<TChunk>;
readonly locked: boolean;
close(): Promise<void>;
abort(reason?: any): Promise<void>;
}
这些属性的说明
.getWriter()
返回一个 Writer,这是一个我们可以通过它写入 WritableStream 的对象。.locked
:每个 WritableStream 一次只能有一个活动的 Writer。当一个 Writer 正在使用时,WritableStream 将被锁定,并且无法调用 .getWriter()
。.close()
关闭流.abort()
中止流以下小节介绍了将数据发送到 WritableStream 的两种方法
我们可以使用 *Writer* 写入 WritableStream。它们具有以下类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次对其进行解释)
interface WritableStreamDefaultWriter<TChunk> {
readonly desiredSize: number | null;
readonly ready: Promise<undefined>;
write(chunk?: TChunk): Promise<void>;
releaseLock(): void;
close(): Promise<void>;
readonly closed: Promise<undefined>;
abort(reason?: any): Promise<void>;
}
这些属性的说明
.desiredSize
指示此 WriteStream 的队列中有多少空间。如果队列已满,则为零;如果队列已超过其最大大小,则为负数。因此,如果所需大小为零或负数,我们必须停止写入。
null
。.ready
返回一个 Promise,当所需大小从非正数变为正数时,该 Promise 将被 fulfill。这意味着没有活动的背压,可以写入数据。如果所需大小稍后变回非正数,则会创建一个新的待处理 Promise 并返回。
.write()
将块写入流。如果写入成功,它将返回一个被 fulfill 的 Promise;如果出现错误,则返回一个被拒绝的 Promise。
.releaseLock()
释放 Writer 对其流的锁定。
.close()
与关闭 Writer 的流具有相同的效果。
.closed
返回一个 Promise,当流关闭时,该 Promise 将被 fulfill。
.abort()
与中止 Writer 的流具有相同的效果。
以下代码显示了使用 Writer 的协议
const writer = writableStream.getWriter(); // (A)
.equal(writableStream.locked, true); // (B)
asserttry {
// Writing the chunks (explained later)
finally {
} .releaseLock(); // (C)
writer }
我们不能直接写入 writableStream
,我们首先需要获取一个 *Writer*(A 行)。每个 WritableStream 最多只能有一个 Writer。获取 Writer 后,writableStream
将被锁定(B 行)。在我们再次调用 .getWriter()
之前,我们必须调用 .releaseLock()
(C 行)。
有三种写入块的方法。
.write()
(低效地处理背压)第一种写入方法是等待 .write()
的每个结果
await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();
当我们传递给 .write()
的块成功写入后,它返回的 Promise 将被 fulfill。“成功写入”的确切含义取决于 WritableStream 的实现方式,例如,对于文件流,该块可能已发送到操作系统,但仍驻留在缓存中,因此实际上尚未写入磁盘。
当流关闭时,.close()
返回的 Promise 将被 fulfill。
这种写入方法的一个缺点是,等到写入成功意味着不使用队列。因此,数据吞吐量可能会降低。
.write()
拒绝(忽略背压)在第二种写入方法中,我们忽略 .write()
返回的 Promise,只等待 .close()
返回的 Promise。
.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
writerawait writer.close(); // reports errors
.write()
的同步调用将块添加到 WritableStream 的内部队列中。通过不等待返回的 Promise,我们不会等到每个块都被写入。但是,等待 .close()
确保在继续之前队列为空并且所有写入都已成功。
在 A 行和 B 行调用 .catch()
是必要的,以避免在写入过程中出现问题时出现有关未处理的 Promise 拒绝的警告。此类警告通常会记录到控制台。我们可以忽略 .write()
报告的错误,因为 .close()
也会向我们报告这些错误。
可以使用一个忽略 Promise 拒绝的辅助函数来改进前面的代码。
ignoreRejections(
.write('Chunk 1'),
writer.write('Chunk 2'),
writer;
)await writer.close(); // reports errors
function ignoreRejections(...promises) {
for (const promise of promises) {
.catch(() => {});
promise
} }
这种方法的一个缺点是忽略了背压:我们只是假设队列足够大,可以容纳我们写入的所有内容。
.ready
(有效处理背压)在这种写入方法中,我们通过等待 Writer getter .ready
来有效地处理背压。
await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
.write('Chunk 1').catch(() => {});
writer
await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
.write('Chunk 2').catch(() => {});
writer
await writer.close(); // reports errors
每当流从有背压转换到没有背压时,.ready
中的 Promise 就会兑现。
在本例中,我们通过 WritableStream 创建一个文本文件 data.txt
。
import * as fs from 'node:fs';
import {Writable} from 'node:stream';
const nodeWritable = fs.createWriteStream(
'new-file.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)
const writer = webWritableStream.getWriter();
try {
await writer.write('First line\n');
await writer.write('Second line\n');
await writer.close();
finally {
} .releaseLock()
writer }
在 A 行,我们为文件 data.txt
创建一个 Node.js 流。在 B 行,我们将此流转换为 Web 流。然后我们使用 Writer 向其中写入字符串。
除了使用 Writer 之外,我们还可以通过将 ReadableStream 管道传输到 WritableStream 来写入 WritableStream。
await readableStream.pipeTo(writableStream);
当管道传输成功完成时,.pipeTo()
返回的 Promise 就会兑现。
管道传输在当前任务完成或暂停后执行。以下代码演示了这一点。
const readableStream = new ReadableStream({ // (A)
start(controller) {
.enqueue('First line\n');
controller.enqueue('Second line\n');
controller.close();
controller,
};
})const writableStream = new WritableStream({ // (B)
write(chunk) {
console.log('WRITE: ' + JSON.stringify(chunk));
,
}close() {
console.log('CLOSE WritableStream');
,
};
})
console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
.then(() => console.log('Promise fulfilled'));
promiseconsole.log('After .pipeTo()');
// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'
在 A 行,我们创建一个 ReadableStream。在 B 行,我们创建一个 WritableStream。
我们可以看到 .pipeTo()
(C 行)立即返回。在新任务中,读取和写入块。然后关闭 writableStream
,最后兑现 promise
。
在以下示例中,我们为文件创建一个 WritableStream,并将 ReadableStream 管道传输到该文件。
const webReadableStream = new ReadableStream({ // (A)
async start(controller) {
.enqueue('First line\n');
controller.enqueue('Second line\n');
controller.close();
controller,
};
})
const nodeWritable = fs.createWriteStream( // (B)
'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)
await webReadableStream.pipeTo(webWritableStream); // (D)
在 A 行,我们创建一个 ReadableStream。在 B 行,我们为文件 data.txt
创建一个 Node.js 流。在 C 行,我们将此流转换为 Web 流。在 D 行,我们将 webReadableStream
管道传输到文件的 WritableStream。
在以下示例中,我们将两个 ReadableStream 写入一个 WritableStream。
function createReadableStream(prefix) {
return new ReadableStream({
async start(controller) {
.enqueue(prefix + 'chunk 1');
controller.enqueue(prefix + 'chunk 2');
controller.close();
controller,
};
})
}
const writableStream = new WritableStream({
write(chunk) {
console.log('WRITE ' + JSON.stringify(chunk));
,
}close() {
console.log('CLOSE');
,
}abort(err) {
console.log('ABORT ' + err);
,
};
})
await createReadableStream('Stream 1: ')
.pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
.pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();
// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'
我们告诉 .pipeTo()
在 ReadableStream 关闭后不要关闭 WritableStream(A 行和 B 行)。因此,WritableStream 在 A 行之后保持打开状态,我们可以将另一个 ReadableStream 管道传输到它。
如果我们想通过 WritableStream 写入外部接收器,我们可以将其包装在一个适配器对象中,并将该对象传递给 WritableStream
构造函数。适配器对象称为 WritableStream 的*底层接收器*(队列策略将在后面我们详细介绍背压时解释)。
new WritableStream(underlyingSink?, queuingStrategy?)
这是底层接收器的类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次解释它们)。
interface UnderlyingSink<TChunk> {
?(
start: WritableStreamDefaultController
controller: void | Promise<void>;
)?(
write: TChunk,
chunk: WritableStreamDefaultController
controller: void | Promise<void>;
)?(): void | Promise<void>;;
close?(reason?: any): void | Promise<void>;
abort }
这些属性的说明
在我们调用 WritableStream
的构造函数后,将立即调用 .start(controller)
。如果我们执行异步操作,则可以返回 Promise。在此方法中,我们可以为写入做准备。
当一个新块准备好写入外部接收器时,将调用 .write(chunk, controller)
。我们可以通过返回一个 Promise 来施加背压,该 Promise 在背压消失后兑现。
在调用 writer.close()
并且所有排队的写入都成功后,将调用 .close()
。在此方法中,我们可以在写入后进行清理。
如果调用了 writeStream.abort()
或 writer.abort()
,则会调用 .abort(reason)
。reason
是传递给这些方法的值。
.start()
和 .write()
的参数 controller
允许它们使 WritableStream 出错。它具有以下类型。
interface WritableStreamDefaultController {
readonly signal: AbortSignal;
error(err?: any): void;
}
.signal
是一个 AbortSignal,如果我们想在流中止时中止写入或关闭操作,我们可以监听它。.error(err)
使 WritableStream 出错:它被关闭,并且与它的所有未来交互都将失败,并出现错误值 err
。在下一个示例中,我们将 ReadableStream 管道传输到 WritableStream,以便检查 ReadableStream 如何生成块。
const readableStream = new ReadableStream({
start(controller) {
.enqueue('First chunk');
controller.enqueue('Second chunk');
controller.close();
controller,
};
})await readableStream.pipeTo(
new WritableStream({
write(chunk) {
console.log('WRITE ' + JSON.stringify(chunk));
,
}close() {
console.log('CLOSE');
,
}abort(err) {
console.log('ABORT ' + err);
,
}
});
)// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'
在下一个示例中,我们创建 WriteStream
的一个子类,该子类在字符串中收集所有写入的块。我们可以通过方法 .getString()
访问该字符串。
class StringWritableStream extends WritableStream {
= '';
#string constructor() {
super({
// We need to access the `this` of `StringWritableStream`.
// Hence the arrow function (and not a method).
write: (chunk) => {
this.#string += chunk;
,
};
})
}getString() {
return this.#string;
}
}const stringStream = new StringWritableStream();
const writer = stringStream.getWriter();
try {
await writer.write('How are');
await writer.write(' you?');
await writer.close();
finally {
} .releaseLock()
writer
}.equal(
assert.getString(),
stringStream'How are you?'
; )
这种方法的一个缺点是我们混合了两个 API:WritableStream
的 API 和我们新的字符串流 API。另一种方法是委托给 WritableStream,而不是扩展它。
function StringcreateWritableStream() {
let string = '';
return {
stream: new WritableStream({
write(chunk) {
+= chunk;
string ,
},
})getString() {
return string;
,
};
}
}
const stringStream = StringcreateWritableStream();
const writer = stringStream.stream.getWriter();
try {
await writer.write('How are');
await writer.write(' you?');
await writer.close();
finally {
} .releaseLock()
writer
}.equal(
assert.getString(),
stringStream'How are you?'
; )
此功能也可以通过类(而不是作为对象的工厂函数)来实现。
TransformStream
使用 TransformStream 最常见的方式是“管道传输”它们。
const transformedStream = readableStream.pipeThrough(transformStream);
.pipeThrough()
将 readableStream
管道传输到 transformStream
的可写端,并返回其可读端。换句话说:我们创建了一个新的 ReadableStream,它是 readableStream
的转换版本。
.pipeThrough()
不仅接受 TransformStream,还接受任何具有以下形状的对象。
interface ReadableWritablePair<RChunk, WChunk> {
: ReadableStream<RChunk>;
readable: WritableStream<WChunk>;
writable }
Node.js 支持以下标准 TransformStream。
TextEncoderStream
和 TextDecoderStream
TextDecoderStream
可以正确处理这些情况。TextEncoderStream
、TextDecoderStream
)。CompressionStream
、DecompressionStream
deflate
(ZLIB 压缩数据格式)、deflate-raw
(DEFLATE 算法)、gzip
(GZIP 文件格式)。CompressionStream
、DecompressionStream
)。在以下示例中,我们解码 UTF-8 编码的字节流。
const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
.pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}
response.body
是一个 ReadableByteStream,其块是 Uint8Array
(TypedArray)的实例。我们将该流通过 TextDecoderStream
进行管道传输,以获得具有字符串块的流。
请注意,单独转换每个字节块(例如,通过 TextDecoder
)是行不通的,因为 单个 Unicode 代码点在 UTF-8 中最多编码为四个字节,并且这些字节可能不在同一个块中。
以下 Node.js 模块记录通过标准输入发送给它的所有内容。
// echo-stdin.mjs
import {Readable} from 'node:stream';
const webStream = Readable.toWeb(process.stdin)
.pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
console.log('>>>', chunk);
}
我们可以通过存储在 process.stdin
中的流访问标准输入(process
是一个全局 Node.js 变量)。如果我们没有为此流设置编码并通过 Readable.toWeb()
转换它,我们将获得一个字节流。我们将其通过 TextDecoderStream 进行管道传输,以获得文本流。
请注意,我们增量处理标准输入:一旦另一个块可用,我们就记录它。换句话说,我们不会等到标准输入完成。当数据很大或仅间歇发送时,这很有用。
我们可以通过将 Transformer 对象传递给 TransformStream
的构造函数来实现自定义 TransformStream。此类对象具有以下类型(您可以随意浏览此类型及其属性的解释;当我们在示例中遇到它们时,将再次解释它们)。
interface Transformer<TInChunk, TOutChunk> {
?(
start: TransformStreamDefaultController<TOutChunk>
controller: void | Promise<void>;
)?(
transform: TInChunk,
chunk: TransformStreamDefaultController<TOutChunk>
controller: void | Promise<void>;
)?(
flush: TransformStreamDefaultController<TOutChunk>
controller: void | Promise<void>;
) }
这些属性的说明
TransformStream
的构造函数后,将立即调用 .start(controller)
。在这里,我们可以在转换开始之前准备一些事情。.transform(chunk, controller)
执行实际的转换。它接收一个输入块,并可以使用其参数 controller
将一个或多个转换后的输出块排队。它也可以选择根本不排队任何内容。.flush(controller)
。在这里,我们可以在转换完成后执行清理工作。这些方法中的每一个都可以返回一个 Promise,并且在 Promise 被解决之前不会采取进一步的步骤。如果我们想做一些异步的事情,这很有用。
参数 controller
具有以下类型。
interface TransformStreamDefaultController<TOutChunk> {
enqueue(chunk?: TOutChunk): void;
readonly desiredSize: number | null;
terminate(): void;
error(err?: any): void;
}
.enqueue(chunk)
将 chunk
添加到 TransformStream 的可读端(输出)。.desiredSize
返回 TransformStream 的可读端(输出)的内部队列的所需大小。.terminate()
关闭可读端(输出)并使 TransformStream 的可写端(输入)出错。如果转换器对可写端(输入)的剩余块不感兴趣并希望跳过它们,则可以使用它。.error(err)
使 TransformStream 出错:与它的所有未来交互都将失败,并出现错误值 err
。TransformStream 中的背压怎么办?该类将其可读端(输出)的背压传播到其可写端(输入)。假设是转换不会改变太多数据量。因此,转换可以忽略背压。但是,可以通过 transformStreamDefaultController.desiredSize
检测到它,并通过从 transformer.transform()
返回 Promise 来传播它。
以下 TransformStream
的子类将具有任意块的流转换为每个块恰好包含一行文本的流。也就是说,除了最后一个块之外,每个块都以行尾 (EOL) 字符串结尾:在 Unix(包括 macOS)上为 '\n'
,在 Windows 上为 '\r\n'
。
class ChunksToLinesTransformer {
= '';
#previous
transform(chunk, controller) {
let startSearch = this.#previous.length;
this.#previous += chunk;
while (true) {
// Works for EOL === '\n' and EOL === '\r\n'
const eolIndex = this.#previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// Line includes the EOL
const line = this.#previous.slice(0, eolIndex+1);
.enqueue(line);
controllerthis.#previous = this.#previous.slice(eolIndex+1);
= 0;
startSearch
}
}
flush(controller) {
// Clean up and enqueue any text we’re still holding on to
if (this.#previous.length > 0) {
.enqueue(this.#previous);
controller
}
}
}class ChunksToLinesStream extends TransformStream {
constructor() {
super(new ChunksToLinesTransformer());
}
}
const stream = new ReadableStream({
async start(controller) {
.enqueue('multiple\nlines of\ntext');
controller.close();
controller,
};
})const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);
for await (const line of transformed) {
console.log('>>>', JSON.stringify(line));
}
// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'
请注意,Deno 的内置 TextLineStream
提供了类似的功能。
提示:我们也可以通过异步生成器进行此转换。它将异步迭代 ReadableStream 并返回一个具有行的异步可迭代对象。其实现在 §9.4 “通过异步生成器转换可读流” 中给出。
由于 ReadableStream 是异步可迭代的,因此我们可以使用 异步生成器 来转换它们。这会导致非常优雅的代码。
const stream = new ReadableStream({
async start(controller) {
.enqueue('one');
controller.enqueue('two');
controller.enqueue('three');
controller.close();
controller,
};
})
async function* prefixChunks(prefix, asyncIterable) {
for await (const chunk of asyncIterable) {
yield '> ' + chunk;
}
}
const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
console.log(transformedChunk);
}
// Output:
// '> one'
// '> two'
// '> three'
让我们仔细看看背压。考虑以下管道链。
.pipeThrough(ts).pipeTo(ws); rs
rs
是一个 ReadableStream,ts
是一个 TransformStream,ws
是一个 WritableStream。这些是由前面的表达式创建的连接(.pipeThrough
使用 .pipeTo
将 rs
连接到 ts
的可写端)。
rs -pipeTo-> ts{writable,readable} -pipeTo-> ws
观察结果
rs
的底层源可以看作是位于 rs
之前的管道链成员。ws
的底层接收器可以看作是位于 ws
之后的管道链成员。假设 ws
的底层接收器速度很慢,并且 ws
的缓冲区最终已满。然后会发生以下步骤
ws
发出信号表示它已满。pipeTo
停止从 ts.readable
读取。ts.readable
发出信号表示它已满。ts
停止将块从 ts.writable
移动到 ts.readable
。ts.writable
发出信号表示它已满。pipeTo
停止从 rs
读取。rs
向其底层源发出信号表示它已满。此示例说明我们需要两种功能
让我们探讨一下这些功能是如何在 Web Streams API 中实现的。
背压信号由接收数据的实体发出。Web Streams 有两个这样的实体
.write()
接收数据。.enqueue()
时,ReadableStream 接收数据。在这两种情况下,输入都通过队列进行缓冲。应用背压的信号是队列已满。让我们看看如何检测到它。
以下是队列的位置
队列的*期望大小*是一个数字,表示队列中剩余的空间量
因此,如果期望大小为零或更小,我们必须应用背压。它可以通过包含队列的对象的 getter .desiredSize
获得。
期望大小是如何计算的?通过一个指定了所谓*排队策略*的对象。ReadableStream
和 WritableStream
具有默认的排队策略,可以通过其构造函数的可选参数覆盖。接口 QueuingStrategy
有两个属性
.size(chunk)
返回 chunk
的大小。.highWaterMark
指定队列的最大大小。队列的期望大小是高水位线减去队列的当前大小。
发送数据的实体需要通过施加背压来响应发出的背压信号。
我们可以等待 writer.ready
中的 Promise。当我们这样做时,我们会被阻塞,并实现所需的背压。一旦队列中有空间,Promise 就会被兑现。当 writer.desiredSize
的值大于零时,就会触发兑现。
或者,我们可以等待 writer.write()
返回的 Promise。如果我们这样做,队列甚至不会被填满。
如果我们愿意,我们还可以根据 writer.desiredSize
来确定块的大小。
可以传递给 ReadableStream 的底层源对象包装了一个外部源。在某种程度上,它也是管道链的成员;一个位于其 ReadableStream 之前的成员。
只有当队列中有空间时,才会请求底层拉取源提供新数据。如果没有空间,则会自动施加背压,因为没有数据被拉取。
底层推送源应在排队某些内容后检查 controller.desiredSize
:如果它为零或更小,则应通过暂停其外部源来施加背压。
可以传递给 WritableStream 的底层接收器对象包装了一个外部接收器。在某种程度上,它也是管道链的成员;一个位于其 WritableStream 之后的成员。
每个外部接收器发出背压信号的方式都不同(在某些情况下根本不会发出)。底层接收器可以通过从方法 .write()
返回一个 Promise 来施加背压,该 Promise 在写入完成后兑现。Web Streams 标准中有一个示例,演示了它是如何工作的。
.writable
→
.readable
)TransformStream 通过为前者实现底层接收器,为后者实现底层源,将其可写端与其可读端连接起来。它有一个内部槽 .[[backpressure]]
,指示当前是否激活了内部背压。
可写端的底层接收器的方法 .write()
会异步等待,直到没有内部背压,然后才会将另一个块馈送到 TransformStream 的转换器(Web Streams 标准:TransformStreamDefaultSinkWriteAlgorithm
)。然后,转换器可以通过其 TransformStreamDefaultController 排队某些内容。请注意,.write()
返回一个 Promise,该 Promise 在方法完成后兑现。在此之前,WriteStream 会通过其队列缓冲传入的写入请求。因此,可写端的背压是通过该队列及其期望大小来发出的。
如果通过 TransformStreamDefaultController 对块进行排队,并且可读端的队列已满,则会激活 TransformStream 的背压(Web Streams 标准:TransformStreamDefaultControllerEnqueue
)。
如果从 Reader 中读取了一些内容,则 TransformStream 的背压可能会被停用(Web Streams 标准:ReadableStreamDefaultReaderRead
)
.pull()
了(Web Streams 标准:.[[PullSteps]]
)。.pull()
会停用背压(Web Streams 标准:TransformStreamDefaultSourcePullAlgorithm
)。.pipeTo()
(ReadableStream →
WritableStream).pipeTo()
通过 Reader 从 ReadableStream 中读取块,并通过 Writer 将它们写入 WritableStream。每当 writer.desiredSize
为零或更小时,它都会暂停(Web Streams 标准:ReadableStreamPipeTo
的步骤 15)。
到目前为止,我们只处理了*文本流*,即块为字符串的流。但是 Web Streams API 也支持用于二进制数据的*字节流*,其中块是 Uint8Arrays(TypedArrays)
ReadableStream
有一种特殊的 'bytes'
模式。WritableStream
本身并不关心块是字符串还是 Uint8Arrays。因此,实例是文本流还是字节流取决于底层接收器可以处理哪种块。TransformStream
可以处理哪种块也取决于其 Transformer。接下来,我们将学习如何创建可读字节流。
ReadableStream
构造函数创建哪种流取决于其可选的第一个参数 underlyingSource
的可选属性 .type
如果省略 .type
或未提供底层源,则新实例为文本流。
如果 .type
是字符串 'bytes'
,则新实例为字节流
const readableByteStream = new ReadableStream({
type: 'bytes',
async start() { /*...*/ }
// ...
; })
如果 ReadableStream 处于 'bytes'
模式,会发生什么变化?
在默认模式下,底层源可以返回任何类型的块。在字节模式下,块必须是 ArrayBufferViews,即 TypedArrays(例如 Uint8Arrays)或 DataViews。
此外,可读字节流可以创建两种 Reader
.getReader()
返回 ReadableStreamDefaultReader
的实例。.getReader({mode: 'byob'})
返回 ReadableStreamBYOBReader
的实例。“BYOB”代表“自带缓冲区”,这意味着我们可以将缓冲区(ArrayBufferView)传递给 reader.read()
。之后,该 ArrayBufferView 将被分离,不再可用。但是 .read()
会在一个新的 ArrayBufferView 中返回其数据,该 ArrayBufferView 具有相同的类型并访问同一个 ArrayBuffer 的同一个区域。
此外,可读字节流具有不同的控制器:它们是 ReadableByteStreamController
的实例(相对于 ReadableStreamDefaultController
)。除了强制底层源对 ArrayBufferViews(TypedArrays 或 DataViews)进行排队之外,它还通过其属性 .byobRequest
支持 ReadableStreamBYOBReaders。底层源将其数据写入存储在此属性中的 BYOBRequest。Web Streams 标准在其“创建流的示例”部分中有两个使用 .byobRequest
的示例。
在下一个示例中,我们将创建一个无限的可读字节流,用随机数据填充其块(灵感来自:“在 Node.js 中实现 Web Streams API”中的 example4.mjs
)。
import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);
const readableByteStream = new ReadableStream({
type: 'bytes',
async pull(controller) {
const byobRequest = controller.byobRequest;
await asyncRandomFill(byobRequest.view);
.respond(byobRequest.view.byteLength);
byobRequest,
};
})
const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);
由于 readableByteStream
是无限的,我们无法对其进行循环。这就是我们只读取其第一个块的原因(B 行)。
我们在 A 行创建的缓冲区被传输,因此在 B 行之后不可读。
在以下示例中,我们将创建一个可读字节流,并将其通过一个流进行管道传输,该流将其压缩为 GZIP 格式
const readableByteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// 256 zeros
.enqueue(new Uint8Array(256));
controller.close();
controller,
};
})const transformedStream = readableByteStream.pipeThrough(
new CompressionStream('gzip'));
await logChunks(transformedStream);
async function logChunks(readableByteStream) {
const reader = readableByteStream.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) break;
console.log(value);
}finally {
} .releaseLock();
reader
} }
fetch()
读取网页fetch()
的结果解析为一个响应对象,其属性 .body
是一个可读字节流。我们通过 TextDecoderStream
将该字节流转换为文本流
const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
console.log(stringChunk);
}
Node.js 是唯一支持以下帮助程序函数的 Web 平台,它将其称为实用程序消费者
import {
,
arrayBuffer,
blob,
buffer,
json,
textfrom 'node:stream/consumers'; }
这些函数将 Web ReadableStreams、Node.js Readables 和 AsyncIterators 转换为 Promise,这些 Promise 将使用以下内容兑现
arrayBuffer()
)blob()
)buffer()
)json()
)text()
)二进制数据假定为 UTF-8 编码
import * as streamConsumers from 'node:stream/consumers';
const readableByteStream = new ReadableStream({
type: 'bytes',
start(controller) {
// TextEncoder converts strings to UTF-8 encoded Uint8Arrays
const encoder = new TextEncoder();
const view = encoder.encode('"😀"');
.deepEqual(
assert,
viewUint8Array.of(34, 240, 159, 152, 128, 34)
;
).enqueue(view);
controller.close();
controller,
};
})const jsonData = await streamConsumers.json(readableByteStream);
.equal(jsonData, '😀'); assert
字符串流按预期工作
import * as streamConsumers from 'node:stream/consumers';
const readableByteStream = new ReadableStream({
start(controller) {
.enqueue('"😀"');
controller.close();
controller,
};
})const jsonData = await streamConsumers.json(readableByteStream);
.equal(jsonData, '😀'); assert
本节中提到的所有资料都是本章的来源。
本章没有涵盖 Web Streams API 的所有方面。您可以在此处找到更多信息
更多资料