Node.js 的 Web Streams API
模块提供了与 Web Streams 标准一致的流处理功能。它用于处理流数据,支持多种数据流操作,包括读取、写入和转换。Web Streams API 是基于流的标准化接口,旨在简化和统一处理流数据的操作。
模块引入
在 Node.js 中,你可以通过 stream/web
模块来访问 Web Streams API。使用示例如下:
import { ReadableStream, WritableStream, TransformStream } from 'stream/web';
或在 CommonJS 模块中:
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
主要类和接口
Node.js Web Streams API 模块主要包括以下几个类和接口:
ReadableStream
WritableStream
TransformStream
ReadableStreamDefaultReader
WritableStreamDefaultWriter
ReadableStreamDefaultController
WritableStreamDefaultController
TransformStreamDefaultController
1. ReadableStream
ReadableStream
表示一个从中读取数据的流。它通常用于流式读取数据源。
创建 ReadableStream
示例:
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue('Hello');
controller.enqueue('World');
controller.close();
}
});
方法和属性:
getReader()
: 返回一个ReadableStreamDefaultReader
对象,该对象可以用来从流中读取数据。tee()
: 返回一个包含两个ReadableStream
实例的数组,用于将流分成两个相同的流。
2. WritableStream
WritableStream
表示一个可以将数据写入的流。它通常用于流式写入数据目标。
创建 WritableStream
示例:
const writableStream = new WritableStream({
write(chunk) {
console.log(`Received chunk: ${chunk}`);
},
close() {
console.log('Stream closed');
},
abort(err) {
console.error('Stream aborted:', err);
}
});
方法和属性:
getWriter()
: 返回一个WritableStreamDefaultWriter
对象,该对象可以用来向流中写入数据。abort()
: 取消写入流的操作,并将指定的错误传递给流的abort
方法。
3. TransformStream
TransformStream
是一个变换流,它可以读取数据、处理数据,然后写入处理后的数据。它结合了 ReadableStream
和 WritableStream
。
创建 TransformStream
示例:
const transformStream = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
方法和属性:
readable
: 返回一个ReadableStream
对象,可以用于从变换流中读取数据。writable
: 返回一个WritableStream
对象,可以用于向变换流中写入数据。
4. ReadableStreamDefaultReader
ReadableStreamDefaultReader
对象用于从 ReadableStream
中读取数据。
方法:
read()
: 返回一个 Promise,解析为一个对象,其中包含从流中读取的数据或一个done
标志。releaseLock()
: 释放对流的读取锁定,允许其他读取器访问该流。
5. WritableStreamDefaultWriter
WritableStreamDefaultWriter
对象用于向 WritableStream
中写入数据。
方法:
write(chunk)
: 向流中写入数据。close()
: 关闭流,确保所有数据都已写入。abort(reason)
: 取消写入操作,并将指定的原因传递给流的abort
方法。releaseLock()
: 释放对流的写入锁定,允许其他写入器访问该流。
6. ReadableStreamDefaultController
ReadableStreamDefaultController
对象控制 ReadableStream
的行为,如数据的排队和流的关闭。
方法:
enqueue(chunk)
: 将数据块排队到流中。close()
: 关闭流,表明没有更多的数据可供读取。error(e)
: 向流报告错误。
7. WritableStreamDefaultController
WritableStreamDefaultController
对象控制 WritableStream
的行为,如流的处理和错误管理。
方法:
error(e)
: 向流报告错误。
8. TransformStreamDefaultController
TransformStreamDefaultController
对象控制 TransformStream
的行为,如数据的处理和流的状态。
方法:
enqueue(chunk)
: 将处理后的数据块排队到输出流中。error(e)
: 向流报告错误。
示例
1. 创建可读流并读取数据
const readableStream = new ReadableStream({
start(controller) {
controller.enqueue('Hello');
controller.enqueue('World');
controller.close();
}
});
const reader = readableStream.getReader();
reader.read().then(({ done, value }) => {
console.log(done, value); // false Hello
});
2. 创建可写流并写入数据
const writableStream = new WritableStream({
write(chunk) {
console.log(`Received chunk: ${chunk}`);
}
});
const writer = writableStream.getWriter();
writer.write('Hello').then(() => writer.write('World'));
3. 创建转换流并进行数据转换
const transformStream = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const reader = new ReadableStream({
start(controller) {
controller.enqueue('Hello');
controller.enqueue('World');
controller.close();
}
}).pipeThrough(transformStream);
const transformedReader = reader.getReader();
transformedReader.read().then(({ done, value }) => {
console.log(done, value); // false HELLO
});
总结
Node.js 的 Web Streams API 模块提供了强大的流处理能力,允许开发者以一致的方式处理数据流,包括读取、写入和转换操作。通过 ReadableStream
、WritableStream
和 TransformStream
,开发者可以高效地处理流数据,而通过 ReadableStreamDefaultReader
和 WritableStreamDefaultWriter
,可以对流进行精细控制。这个 API 提供了现代化的流操作方式,与浏览器中的 Web Streams API 保持一致性,使得在不同环境中处理流数据更加方便。