使用 Node.js 进行 Shell 脚本编程
您可以购买本书的离线版本(HTML、PDF、EPUB、MOBI),并支持免费在线版本。
(广告,请不要屏蔽。)

9 原生 Node.js 流



本章介绍 Node 的原生流。它们支持 异步迭代,这使得它们更易于使用,这也是我们在本章中主要使用的方式。

请注意,跨平台的*Web 流*在 §10 “在 Node.js 上使用 Web 流” 中介绍。我们将在本书中主要使用这些。因此,如果您愿意,可以跳过本章。

9.1 回顾:异步迭代和异步生成器

异步迭代 是一种用于异步检索数据容器内容的协议(意味着当前“任务”在检索项目之前可能会暂停)。

异步生成器 有助于异步迭代。例如,这是一个异步生成器函数

/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}

在本章的其余部分,请密切注意函数是异步函数还是异步生成器函数

/** @returns a Promise */
async function asyncFunction() { /*···*/ }

/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }

9.2 流

流是一种模式,其核心思想是“分而治之”大量数据:如果我们将数据分成较小的部分并一次处理一部分,我们就可以处理它。

Node.js 支持多种流——例如

9.2.1 管道

要在多个步骤中处理流数据,我们可以对流进行*管道*(连接)

  1. 通过可读流接收输入。
  2. 每个处理步骤都通过转换流执行。
  3. 对于最后一个处理步骤,我们有两个选择
    • 我们可以将最近的可读流中的数据写入可写流。也就是说,可写流是我们管道的最后一个元素。
    • 我们可以以其他方式处理最近的可读流中的数据。

第 (2) 部分是可选的。

9.2.2 文本编码

创建文本流时,最好始终指定编码

编码的默认值为 null,相当于 'utf8'

9.2.3 辅助函数:readableToString()

我们偶尔会使用以下辅助函数。您不需要了解它是如何工作的,只需要(大致)了解它的作用。

import * as stream from 'stream';

/**
 * Reads all the text in a readable stream and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}

此函数是通过基于事件的 API 实现的。稍后我们将看到一种更简单的方法——通过异步迭代。

9.2.4 一些初步说明

9.3 可读流

9.3.1 创建可读流

9.3.1.1 从文件创建可读流

我们可以使用 fs.createReadStream() 来创建可读流

import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});

assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');
9.3.1.2 Readable.from():从迭代器创建可读流

静态方法 Readable.from(iterable, options?) 创建一个可读流,其中包含 iterable 中的数据。 iterable 可以是同步迭代器或异步迭代器。参数 options 是可选的,除其他外,可用于指定文本编码。

import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');
9.3.1.2.1 从字符串创建可读流

Readable.from() 接受任何迭代器,因此也可用于将字符串转换为流

import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');

目前Readable.from() 将字符串视为任何其他迭代器,因此会迭代其代码点。从性能角度来看,这并不理想,但对于大多数用例来说应该没问题。我预计 Readable.from() 会经常与字符串一起使用,因此将来可能会进行优化。

9.3.2 通过 for-await-of 从可读流中读取块

每个可读流都是异步可迭代的,这意味着我们可以使用 for-await-of 循环来读取其内容

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'
9.3.2.1 将可读流的内容收集到字符串中

以下函数是本章开头我们看到的函数的更简单的重新实现。

import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

请注意,在这种情况下,我们必须使用异步函数,因为我们想返回一个 Promise。

9.3.3 通过模块 'node:readlines' 从可读流中读取行

内置模块 'node:readline' 允许我们从可读流中读取行

import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';

const filePath = process.argv[2]; // first command line argument

const rl = readline.createInterface({
  input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
});
for await (const line of rl) {
  console.log('>', line);
}
rl.close();

9.4 通过异步生成器转换可读流

异步迭代为转换流提供了一种优雅的替代方案,用于在多个步骤中处理流数据

总而言之,这些是此类处理管道的组成部分

可读
→ 第一个异步生成器 [→ … → 最后一个异步生成器]
→ 可读或异步函数

9.4.1 在异步迭代器中从块到编号行

在下一个示例中,我们将看到一个处理管道的示例,正如刚刚解释的那样。

import {Readable} from 'stream';

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
await logLines(numberLines(chunksToLines(chunks))); // (A)

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'

处理管道在 A 行中设置。步骤如下

观察

9.5 可写流

9.5.1 为文件创建可写流

我们可以使用 fs.createWriteStream() 来创建可写流

const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});

9.5.2 写入可写流

在本节中,我们将研究写入可写流的方法

  1. 通过其方法 .write() 直接写入可写流。
  2. 使用模块 stream 中的函数 pipeline() 将可读流管道传输到可写流。

为了演示这些方法,我们使用它们来实现相同的函数 writeIterableToFile()

可读流的方法 .pipe() 也支持管道传输,但它有一个缺点,最好避免使用它。

9.5.2.1 writable.write(chunk)

在将数据写入流时,有两种基于回调的机制可以帮助我们

在以下示例中,我们对这些机制进行了 Promise 化,以便我们可以通过异步函数使用它们

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

stream.finished() 的默认版本是基于回调的,但可以通过 util.promisify() 转换为基于 Promise 的版本(A 行)。

我们使用了以下两种模式

9.5.2.2 通过 stream.pipeline() 将可读流管道传输到可写流

在 A 行中,我们使用 stream.pipeline() 的 Promise 化版本将可读流 readable 管道传输到可写流 writable

import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···

方法 readable.pipe() 也支持管道传输,但有 一个警告:如果可读流发出错误,则可写流不会自动关闭。 pipeline() 没有这个警告。

模块 os

模块 buffer

模块 stream

模块 fs

本节中的静态类型信息基于 Definitely Typed

9.7 本章的进一步阅读和参考资料