readableToString()
for-await-of
从可读流中读取块'node:readlines'
从可读流中读取行本章介绍 Node 的原生流。它们支持 异步迭代,这使得它们更易于使用,这也是我们在本章中主要使用的方式。
请注意,跨平台的*Web 流*在 §10 “在 Node.js 上使用 Web 流” 中介绍。我们将在本书中主要使用这些。因此,如果您愿意,可以跳过本章。
异步迭代 是一种用于异步检索数据容器内容的协议(意味着当前“任务”在检索项目之前可能会暂停)。
异步生成器 有助于异步迭代。例如,这是一个异步生成器函数
/**
* @returns an asynchronous iterable
*/
async function* asyncGenerator(asyncIterable) {
for await (const item of asyncIterable) { // input
if (···) {
yield '> ' + item; // output
}
} }
for-await-of
循环迭代输入 asyncIterable
。此循环也可在正常的异步函数中使用。yield
将值馈送到此生成器返回的异步迭代器中。在本章的其余部分,请密切注意函数是异步函数还是异步生成器函数
/** @returns a Promise */
async function asyncFunction() { /*···*/ }
/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }
流是一种模式,其核心思想是“分而治之”大量数据:如果我们将数据分成较小的部分并一次处理一部分,我们就可以处理它。
Node.js 支持多种流——例如
*可读流*是可以从中读取数据的流。换句话说,它们是数据源。例如*可读文件流*,它允许我们读取文件的内容。
*可写流*是可以写入数据的流。换句话说,它们是数据的接收器。例如*可写文件流*,它允许我们将数据写入文件。
*转换流*既可读又可写。作为可写流,它接收数据片段,对其进行*转换*(更改或丢弃),然后将它们作为可读流输出。
要在多个步骤中处理流数据,我们可以对流进行*管道*(连接)
第 (2) 部分是可选的。
创建文本流时,最好始终指定编码
Node.js 文档有 支持的编码及其默认拼写列表——例如
'utf8'
'utf16le'
'base64'
也允许使用几种不同的拼写。您可以使用 Buffer.isEncoding()
来检查哪些是
> buffer.Buffer.isEncoding('utf8')true
> buffer.Buffer.isEncoding('utf-8')true
> buffer.Buffer.isEncoding('UTF-8')true
> buffer.Buffer.isEncoding('UTF:8')false
编码的默认值为 null
,相当于 'utf8'
。
readableToString()
我们偶尔会使用以下辅助函数。您不需要了解它是如何工作的,只需要(大致)了解它的作用。
import * as stream from 'stream';
/**
* Reads all the text in a readable stream and returns it as a string,
* via a Promise.
* @param {stream.Readable} readable
*/
function readableToString(readable) {
return new Promise((resolve, reject) => {
let data = '';
.on('data', function (chunk) {
readable+= chunk;
data ;
}).on('end', function () {
readableresolve(data);
;
}).on('error', function (err) {
readablereject(err);
;
});
}) }
此函数是通过基于事件的 API 实现的。稍后我们将看到一种更简单的方法——通过异步迭代。
await
的情况。在这种情况下,我们假设我们 在模块内部 或异步函数的主体内部。'\n'
(LF)'\r\n'
(CR LF)os
中的 常量 EOL
访问当前平台的换行符。我们可以使用 fs.createReadStream()
来创建可读流
import * as fs from 'fs';
const readableStream = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
.equal(
assertawait readableToString(readableStream),
'This is a test!\n');
Readable.from()
:从迭代器创建可读流静态方法 Readable.from(iterable, options?)
创建一个可读流,其中包含 iterable
中的数据。 iterable
可以是同步迭代器或异步迭代器。参数 options
是可选的,除其他外,可用于指定文本编码。
import * as stream from 'stream';
function* gen() {
yield 'One line\n';
yield 'Another line\n';
}const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
.equal(
assertawait readableToString(readableStream),
'One line\nAnother line\n');
Readable.from()
接受任何迭代器,因此也可用于将字符串转换为流
import {Readable} from 'stream';
const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
.equal(
assertawait readableToString(readable),
'Some text!');
目前,Readable.from()
将字符串视为任何其他迭代器,因此会迭代其代码点。从性能角度来看,这并不理想,但对于大多数用例来说应该没问题。我预计 Readable.from()
会经常与字符串一起使用,因此将来可能会进行优化。
for-await-of
从可读流中读取块每个可读流都是异步可迭代的,这意味着我们可以使用 for-await-of
循环来读取其内容
import * as fs from 'fs';
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk);
}
}
const readable = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);
// Output:
// 'This is a test!\n'
以下函数是本章开头我们看到的函数的更简单的重新实现。
import {Readable} from 'stream';
async function readableToString2(readable) {
let result = '';
for await (const chunk of readable) {
+= chunk;
result
}return result;
}
const readable = Readable.from('Good morning!', {encoding: 'utf8'});
.equal(await readableToString2(readable), 'Good morning!'); assert
请注意,在这种情况下,我们必须使用异步函数,因为我们想返回一个 Promise。
'node:readlines'
从可读流中读取行内置模块 'node:readline'
允许我们从可读流中读取行
import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';
const filePath = process.argv[2]; // first command line argument
const rl = readline.createInterface({
input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
;
})for await (const line of rl) {
console.log('>', line);
}.close(); rl
异步迭代为转换流提供了一种优雅的替代方案,用于在多个步骤中处理流数据
Readable.from()
将其转换为可读流(稍后可以将其管道传输到可写流)。总而言之,这些是此类处理管道的组成部分
在下一个示例中,我们将看到一个处理管道的示例,正如刚刚解释的那样。
import {Readable} from 'stream';
/**
* @param chunkIterable An asynchronous or synchronous iterable
* over “chunks” (arbitrary strings)
* @returns An asynchronous iterable over “lines”
* (strings with at most one newline that always appears at the end)
*/
async function* chunksToLines(chunkIterable) {
let previous = '';
for await (const chunk of chunkIterable) {
let startSearch = previous.length;
+= chunk;
previous while (true) {
// Works for EOL === '\n' and EOL === '\r\n'
const eolIndex = previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// Line includes the EOL
const line = previous.slice(0, eolIndex+1);
yield line;
= previous.slice(eolIndex+1);
previous = 0;
startSearch
}
}if (previous.length > 0) {
yield previous;
}
}
async function* numberLines(lineIterable) {
let lineNumber = 1;
for await (const line of lineIterable) {
yield lineNumber + ' ' + line;
++;
lineNumber
}
}
async function logLines(lineIterable) {
for await (const line of lineIterable) {
console.log(line);
}
}
const chunks = Readable.from(
'Text with\nmultiple\nlines.\n',
encoding: 'utf8'});
{await logLines(numberLines(chunksToLines(chunks))); // (A)
// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'
处理管道在 A 行中设置。步骤如下
chunksToLines()
:从具有块的异步迭代器到具有行的异步迭代器。numberLines()
:从具有行的异步迭代器到具有编号行的异步迭代器。logLines()
:记录异步迭代器中的项目。观察
chunksToLines()
和 numberLines()
的输入和输出都是异步迭代器。这就是为什么它们是异步生成器(如 async
和 *
所示)。logLines()
的输入是异步迭代器。这就是为什么它是一个异步函数(如 async
所示)。我们可以使用 fs.createWriteStream()
来创建可写流
const writableStream = fs.createWriteStream(
'tmp/log.txt', {encoding: 'utf8'});
在本节中,我们将研究写入可写流的方法
.write()
直接写入可写流。stream
中的函数 pipeline()
将可读流管道传输到可写流。为了演示这些方法,我们使用它们来实现相同的函数 writeIterableToFile()
。
可读流的方法 .pipe()
也支持管道传输,但它有一个缺点,最好避免使用它。
writable.write(chunk)
在将数据写入流时,有两种基于回调的机制可以帮助我们
'drain'
表示背压结束。finished()
在流在以下示例中,我们对这些机制进行了 Promise 化,以便我们可以通过异步函数使用它们
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // (A)
async function writeIterableToFile(iterable, filePath) {
const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
for await (const chunk of iterable) {
if (!writable.write(chunk)) { // (B)
// Handle backpressure
await once(writable, 'drain');
}
}.end(); // (C)
writable// Wait until done. Throws if there are errors.
await finished(writable);
}
await writeIterableToFile(
'One', ' line of text.\n'], 'tmp/log.txt');
[.equal(
assert.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
fs'One line of text.\n');
stream.finished()
的默认版本是基于回调的,但可以通过 util.promisify()
转换为基于 Promise 的版本(A 行)。
我们使用了以下两种模式
在处理背压的同时写入可写流(B 行)
if (!writable.write(chunk)) {
await once(writable, 'drain');
}
关闭可写流并等待写入完成(C 行)
.end();
writableawait finished(writable);
stream.pipeline()
将可读流管道传输到可写流在 A 行中,我们使用 stream.pipeline()
的 Promise 化版本将可读流 readable
管道传输到可写流 writable
import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);
async function writeIterableToFile(iterable, filePath) {
const readable = stream.Readable.from(
, {encoding: 'utf8'});
iterableconst writable = fs.createWriteStream(filePath);
await pipeline(readable, writable); // (A)
}await writeIterableToFile(
'One', ' line of text.\n'], 'tmp/log.txt');
[// ···
readable.pipe(destination)
方法 readable.pipe()
也支持管道传输,但有 一个警告:如果可读流发出错误,则可写流不会自动关闭。 pipeline()
没有这个警告。
模块 os
const EOL: string
(自 0.7.8 起)
包含当前平台使用的行尾字符序列。
模块 buffer
Buffer.isEncoding(encoding: string): boolean
(自 0.9.1 起)
如果 encoding
正确命名了文本支持的 Node.js 编码之一,则返回 true
。 支持的编码 包括
'utf8'
'utf16le'
'ascii'
'latin1
'base64'
'hex'
(每个字节作为两个十六进制字符)模块 stream
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any>
(自 10.0.0 起)
可读流是异步可迭代的。例如,您可以在异步函数或异步生成器中使用 for-await-of
循环来迭代它们。
finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void>
(自 10.0.0 起)
当读取/写入完成或出现错误时,返回的 Promise 将被解决。
此 promise 化版本创建如下:
const finished = util.promisify(stream.finished);
pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void>
(自 10.0.0 起)
在流之间进行管道传输。返回的 Promise 在管道完成或出现错误时结算。
此 promise 化版本创建如下:
const pipeline = util.promisify(stream.pipeline);
Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable
(自 12.3.0 起)
将可迭代对象转换为可读流。
interface ReadableOptions {
?: number;
highWaterMark?: string;
encoding?: boolean;
objectMode?(this: Readable, size: number): void;
read?(this: Readable, error: Error | null,
destroy: (error: Error | null) => void): void;
callback?: boolean;
autoDestroy }
这些选项与 Readable
构造函数的选项相同,并在此处进行了说明。
模块 fs
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream
(自 2.3.0 起)
创建一个可读流。还有更多选项可用。
createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream
(自 2.3.0 起)
使用选项 .flags
,您可以指定是要写入还是追加,以及如果文件存在或不存在时会发生什么。还有更多选项可用。
本节中的静态类型信息基于 Definitely Typed。