写给心急程序员的 JavaScript(ES2022 版)
请支持本书:购买捐赠
(广告,请勿屏蔽。)

42 异步迭代



  所需知识

对于本章,您应该熟悉以下内容

42.1 基本异步迭代

42.1.1 协议:异步迭代

要了解异步迭代的工作原理,让我们先回顾一下 同步迭代。它包含以下接口

interface Iterable<T> {
  [Symbol.iterator]() : Iterator<T>;
}
interface Iterator<T> {
  next() : IteratorResult<T>;
}
interface IteratorResult<T> {
  value: T;
  done: boolean;
}

对于异步迭代协议,我们只想改变一件事:.next() 生成的值应该异步传递。有两种可行的选择

换句话说,问题是是将值包装在 Promise 中,还是将整个迭代器结果包装在 Promise 中。

必须是后者,因为当 .next() 返回结果时,它会启动一个异步计算。只有在计算完成后才能确定该计算是生成值还是表示迭代结束。因此,.done.value 都需要包装在 Promise 中。

异步迭代的接口如下所示。

interface AsyncIterable<T> {
  [Symbol.asyncIterator]() : AsyncIterator<T>;
}
interface AsyncIterator<T> {
  next() : Promise<IteratorResult<T>>; // (A)
}
interface IteratorResult<T> {
  value: T;
  done: boolean;
}

与同步接口的唯一区别是 .next() 的返回类型(A 行)。

42.1.2 直接使用异步迭代

以下代码直接使用异步迭代协议

const asyncIterable = syncToAsyncIterable(['a', 'b']); // (A)
const asyncIterator = asyncIterable[Symbol.asyncIterator]();

// Call .next() until .done is true:
asyncIterator.next() // (B)
.then(iteratorResult => {
  assert.deepEqual(
    iteratorResult,
    { value: 'a', done: false });
  return asyncIterator.next(); // (C)
})
.then(iteratorResult => {
  assert.deepEqual(
    iteratorResult,
    { value: 'b', done: false });
  return asyncIterator.next(); // (D)
})
.then(iteratorResult => {
  assert.deepEqual(
    iteratorResult,
     { value: undefined, done: true });
})
;

在 A 行中,我们为值 'a''b' 创建了一个异步可迭代对象。稍后我们将看到 syncToAsyncIterable() 的实现。

我们在 B 行、C 行和 D 行调用 .next()。每次我们都使用 .then() 来解包 Promise,并使用 assert.deepEqual() 来检查解包后的值。

如果我们使用异步函数,我们可以简化这段代码。现在我们通过 await 解包 Promise,代码看起来就像我们在进行同步迭代一样

async function f() {
  const asyncIterable = syncToAsyncIterable(['a', 'b']);
  const asyncIterator = asyncIterable[Symbol.asyncIterator]();
  
  // Call .next() until .done is true:
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 'a', done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 'b', done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: undefined, done: true });
}

42.1.3 通过 for-await-of 使用异步迭代

异步迭代协议不打算直接使用。支持它的语言结构之一是 for-await-of 循环,它是 for-of 循环的异步版本。它可以在异步函数和*异步生成器*(本章稍后介绍)中使用。这是一个 for-await-of 的使用示例

for await (const x of syncToAsyncIterable(['a', 'b'])) {
  console.log(x);
}
// Output:
// 'a'
// 'b'

for-await-of 相对灵活。除了异步可迭代对象之外,它还支持同步可迭代对象

for await (const x of ['a', 'b']) {
  console.log(x);
}
// Output:
// 'a'
// 'b'

并且它支持对包装在 Promise 中的值进行同步迭代

const arr = [Promise.resolve('a'), Promise.resolve('b')];
for await (const x of arr) {
  console.log(x);
}
// Output:
// 'a'
// 'b'

  练习:将异步可迭代对象转换为数组

警告:我们很快就会在本章中看到此练习的解决方案。

42.2 异步生成器

异步生成器同时是两件事

  异步生成器与同步生成器非常相似

由于异步生成器和同步生成器非常相似,因此我不解释 yieldyield* 的确切工作原理。如果您有疑问,请查阅 §38 “同步生成器”

因此,异步生成器具有

这看起来如下所示

async function* asyncGen() {
  // Input: Promises, async iterables
  const x = await somePromise;
  for await (const y of someAsyncIterable) {
    // ···
  }

  // Output
  yield someValue;
  yield* otherAsyncGen();
}

42.2.1 示例:通过异步生成器创建异步可迭代对象

让我们看一个例子。以下代码创建了一个包含三个数字的异步可迭代对象

async function* yield123() {
  for (let i=1; i<=3; i++) {
    yield i;
  }
}

yield123() 的结果是否符合异步迭代协议?

async function check() {
  const asyncIterable = yield123();
  const asyncIterator = asyncIterable[Symbol.asyncIterator]();
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 1, done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 2, done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 3, done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: undefined, done: true });
}
check();

42.2.2 示例:将同步可迭代对象转换为异步可迭代对象

以下异步生成器将同步可迭代对象转换为异步可迭代对象。它实现了我们之前使用过的函数 syncToAsyncIterable()

async function* syncToAsyncIterable(syncIterable) {
  for (const elem of syncIterable) {
    yield elem;
  }
}

注意:在这种情况下,输入是同步的(不需要 await)。

42.2.3 示例:将异步可迭代对象转换为数组

以下函数是先前练习的解决方案。它将异步可迭代对象转换为数组(想想展开,但用于异步可迭代对象而不是同步可迭代对象)。

async function asyncIterableToArray(asyncIterable) {
  const result = [];
  for await (const value of asyncIterable) {
    result.push(value);
  }
  return result;
}

请注意,在这种情况下我们不能使用异步生成器:我们通过 for-await-of 获取输入并返回一个包装在 Promise 中的数组。后一个要求排除了异步生成器。

这是 asyncIterableToArray() 的测试

async function* createAsyncIterable() {
  yield 'a';
  yield 'b';
}
const asyncIterable = createAsyncIterable();
assert.deepEqual(
  await asyncIterableToArray(asyncIterable), // (A)
  ['a', 'b']
);

请注意 A 行中的 await,这是解包 asyncIterableToArray() 返回的 Promise 所必需的。为了使 await 工作,此代码片段必须在异步函数中运行。

42.2.4 示例:转换异步可迭代对象

让我们实现一个异步生成器,它通过转换现有的异步可迭代对象来生成新的异步可迭代对象。

async function* timesTwo(asyncNumbers) {
  for await (const x of asyncNumbers) {
    yield x * 2;
  }
}

为了测试此函数,我们使用上一节中的 asyncIterableToArray()

async function* createAsyncIterable() {
  for (let i=1; i<=3; i++) {
    yield i;
  }
}
assert.deepEqual(
  await asyncIterableToArray(timesTwo(createAsyncIterable())),
  [2, 4, 6]
);

  练习:异步生成器

警告:我们很快就会在本章中看到此练习的解决方案。

42.2.5 示例:映射异步可迭代对象

提醒一下,以下是映射同步可迭代对象的方法

function* mapSync(iterable, func) {
  let index = 0;
  for (const x of iterable) {
    yield func(x, index);
    index++;
  }
}
const syncIterable = mapSync(['a', 'b', 'c'], s => s.repeat(3));
assert.deepEqual(
  Array.from(syncIterable),
  ['aaa', 'bbb', 'ccc']);

异步版本如下所示

async function* mapAsync(asyncIterable, func) { // (A)
  let index = 0;
  for await (const x of asyncIterable) { // (B)
    yield func(x, index);
    index++;
  }
}

请注意同步实现和异步实现是多么相似。唯一的两个区别是 A 行中的 async 和 B 行中的 await。这与从同步函数到异步函数的过程类似——我们只需要添加关键字 async 和偶尔的 await

为了测试 mapAsync(),我们使用辅助函数 asyncIterableToArray() (本章前面已展示)

async function* createAsyncIterable() {
  yield 'a';
  yield 'b';
}
const mapped = mapAsync(
  createAsyncIterable(), s => s.repeat(3));
assert.deepEqual(
  await asyncIterableToArray(mapped), // (A)
  ['aaa', 'bbb']);

再一次,我们使用 await 来解包 Promise(A 行),并且此代码片段必须在异步函数中运行。

  练习:filterAsyncIter()

exercises/async-iteration/filter_async_iter_test.mjs

42.3 Node.js 流上的异步迭代

42.3.1 Node.js 流:通过回调函数实现异步(推送)

传统上,从 Node.js 流中异步读取是通过回调函数完成的

function main(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });
  readStream.on('data', (chunk) => {
    console.log('>>> '+chunk);
  });
  readStream.on('end', () => {
    console.log('### DONE ###');
  });
}

也就是说,流处于控制之中并将数据推送到读取器。

42.3.2 Node.js 流:通过异步迭代实现异步(拉取)

从 Node.js 10 开始,我们还可以使用异步迭代从流中读取数据

async function main(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });

  for await (const chunk of readStream) {
    console.log('>>> '+chunk);
  }
  console.log('### DONE ###');
}

这一次,读取器处于控制之中并从流中拉取数据。

42.3.3 示例:从数据块到行

Node.js 流迭代数据*块*(任意长度的部分)。以下异步生成器将数据块上的异步可迭代对象转换为行上的异步可迭代对象

/**
 * Parameter: async iterable of chunks (strings)
 * Result: async iterable of lines (incl. newlines)
 */
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) { // input
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      // line includes the EOL (Windows '\r\n' or Unix '\n')
      const line = previous.slice(0, eolIndex+1);
      yield line; // output
      previous = previous.slice(eolIndex+1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

让我们将 chunksToLines() 应用于数据块上的异步可迭代对象(由 chunkIterable() 生成)

async function* chunkIterable() {
  yield 'First\nSec';
  yield 'ond\nThird\nF';
  yield 'ourth';
}
const linesIterable = chunksToLines(chunkIterable());
assert.deepEqual(
  await asyncIterableToArray(linesIterable),
  [
    'First\n',
    'Second\n',
    'Third\n',
    'Fourth',
  ]);

现在我们有了行上的异步可迭代对象,我们可以使用先前练习的解决方案 numberLines() 来对这些行进行编号

async function* numberLines(linesAsync) {
  let lineNumber = 1;
  for await (const line of linesAsync) {
    yield lineNumber + ': ' + line;
    lineNumber++;
  }
}
const numberedLines = numberLines(chunksToLines(chunkIterable()));
assert.deepEqual(
  await asyncIterableToArray(numberedLines),
  [
    '1: First\n',
    '2: Second\n',
    '3: Third\n',
    '4: Fourth',
  ]);