NodeJs Web Streams API

 后端   小卒子   2024-09-11 09:28   154
  nodejs

Node.js 的 Web Streams API 模块提供了与 Web Streams 标准一致的流处理功能。它用于处理流数据,支持多种数据流操作,包括读取、写入和转换。Web Streams API 是基于流的标准化接口,旨在简化和统一处理流数据的操作。

模块引入

在 Node.js 中,你可以通过 stream/web 模块来访问 Web Streams API。使用示例如下:

import { ReadableStream, WritableStream, TransformStream } from 'stream/web';

或在 CommonJS 模块中:

const { ReadableStream, WritableStream, TransformStream } = require('stream/web');

主要类和接口

Node.js Web Streams API 模块主要包括以下几个类和接口:

  1. ReadableStream
  2. WritableStream
  3. TransformStream
  4. ReadableStreamDefaultReader
  5. WritableStreamDefaultWriter
  6. ReadableStreamDefaultController
  7. WritableStreamDefaultController
  8. TransformStreamDefaultController

1. ReadableStream

ReadableStream 表示一个从中读取数据的流。它通常用于流式读取数据源。

创建 ReadableStream 示例:

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello');
    controller.enqueue('World');
    controller.close();
  }
});

方法和属性:

  • getReader(): 返回一个 ReadableStreamDefaultReader 对象,该对象可以用来从流中读取数据。
  • tee(): 返回一个包含两个 ReadableStream 实例的数组,用于将流分成两个相同的流。

2. WritableStream

WritableStream 表示一个可以将数据写入的流。它通常用于流式写入数据目标。

创建 WritableStream 示例:

const writableStream = new WritableStream({
  write(chunk) {
    console.log(`Received chunk: ${chunk}`);
  },
  close() {
    console.log('Stream closed');
  },
  abort(err) {
    console.error('Stream aborted:', err);
  }
});

方法和属性:

  • getWriter(): 返回一个 WritableStreamDefaultWriter 对象,该对象可以用来向流中写入数据。
  • abort(): 取消写入流的操作,并将指定的错误传递给流的 abort 方法。

3. TransformStream

TransformStream 是一个变换流,它可以读取数据、处理数据,然后写入处理后的数据。它结合了 ReadableStreamWritableStream

创建 TransformStream 示例:

const transformStream = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});

方法和属性:

  • readable: 返回一个 ReadableStream 对象,可以用于从变换流中读取数据。
  • writable: 返回一个 WritableStream 对象,可以用于向变换流中写入数据。

4. ReadableStreamDefaultReader

ReadableStreamDefaultReader 对象用于从 ReadableStream 中读取数据。

方法:

  • read(): 返回一个 Promise,解析为一个对象,其中包含从流中读取的数据或一个 done 标志。
  • releaseLock(): 释放对流的读取锁定,允许其他读取器访问该流。

5. WritableStreamDefaultWriter

WritableStreamDefaultWriter 对象用于向 WritableStream 中写入数据。

方法:

  • write(chunk): 向流中写入数据。
  • close(): 关闭流,确保所有数据都已写入。
  • abort(reason): 取消写入操作,并将指定的原因传递给流的 abort 方法。
  • releaseLock(): 释放对流的写入锁定,允许其他写入器访问该流。

6. ReadableStreamDefaultController

ReadableStreamDefaultController 对象控制 ReadableStream 的行为,如数据的排队和流的关闭。

方法:

  • enqueue(chunk): 将数据块排队到流中。
  • close(): 关闭流,表明没有更多的数据可供读取。
  • error(e): 向流报告错误。

7. WritableStreamDefaultController

WritableStreamDefaultController 对象控制 WritableStream 的行为,如流的处理和错误管理。

方法:

  • error(e): 向流报告错误。

8. TransformStreamDefaultController

TransformStreamDefaultController 对象控制 TransformStream 的行为,如数据的处理和流的状态。

方法:

  • enqueue(chunk): 将处理后的数据块排队到输出流中。
  • error(e): 向流报告错误。

示例

1. 创建可读流并读取数据

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello');
    controller.enqueue('World');
    controller.close();
  }
});

const reader = readableStream.getReader();
reader.read().then(({ done, value }) => {
  console.log(done, value); // false Hello
});

2. 创建可写流并写入数据

const writableStream = new WritableStream({
  write(chunk) {
    console.log(`Received chunk: ${chunk}`);
  }
});

const writer = writableStream.getWriter();
writer.write('Hello').then(() => writer.write('World'));

3. 创建转换流并进行数据转换

const transformStream = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});

const reader = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello');
    controller.enqueue('World');
    controller.close();
  }
}).pipeThrough(transformStream);

const transformedReader = reader.getReader();
transformedReader.read().then(({ done, value }) => {
  console.log(done, value); // false HELLO
});

总结

Node.js 的 Web Streams API 模块提供了强大的流处理能力,允许开发者以一致的方式处理数据流,包括读取、写入和转换操作。通过 ReadableStreamWritableStreamTransformStream,开发者可以高效地处理流数据,而通过 ReadableStreamDefaultReaderWritableStreamDefaultWriter,可以对流进行精细控制。这个 API 提供了现代化的流操作方式,与浏览器中的 Web Streams API 保持一致性,使得在不同环境中处理流数据更加方便。