stream: implement Readable.from async iterator utility
PR-URL: https://github.com/nodejs/node/pull/27660 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Michaël Zasso <targos@protonmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
parent
af83b7963f
commit
030fa2ea44
@ -46,8 +46,8 @@ There are four fundamental stream types within Node.js:
|
|||||||
* [`Transform`][] - `Duplex` streams that can modify or transform the data as it
|
* [`Transform`][] - `Duplex` streams that can modify or transform the data as it
|
||||||
is written and read (for example, [`zlib.createDeflate()`][]).
|
is written and read (for example, [`zlib.createDeflate()`][]).
|
||||||
|
|
||||||
Additionally, this module includes the utility functions [pipeline][] and
|
Additionally, this module includes the utility functions [pipeline][],
|
||||||
[finished][].
|
[finished][] and [Readable.from][].
|
||||||
|
|
||||||
### Object Mode
|
### Object Mode
|
||||||
|
|
||||||
@ -1480,6 +1480,31 @@ async function run() {
|
|||||||
run().catch(console.error);
|
run().catch(console.error);
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Readable.from(iterable, [options])
|
||||||
|
|
||||||
|
* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
|
||||||
|
`Symbol.iterator` iterable protocol.
|
||||||
|
* `options` {Object} Options provided to `new stream.Readable([options])`.
|
||||||
|
By default, `Readable.from()` will set `options.objectMode` to `true`, unless
|
||||||
|
this is explicitly opted out by setting `options.objectMode` to `false`.
|
||||||
|
|
||||||
|
A utility method for creating Readable Streams out of iterators.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { Readable } = require('stream');
|
||||||
|
|
||||||
|
async function * generate() {
|
||||||
|
yield 'hello';
|
||||||
|
yield 'streams';
|
||||||
|
}
|
||||||
|
|
||||||
|
const readable = Readable.from(generate());
|
||||||
|
|
||||||
|
readable.on('data', (chunk) => {
|
||||||
|
console.log(chunk);
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
## API for Stream Implementers
|
## API for Stream Implementers
|
||||||
|
|
||||||
<!--type=misc-->
|
<!--type=misc-->
|
||||||
@ -2395,6 +2420,89 @@ primarily for examples and testing, but there are some use cases where
|
|||||||
|
|
||||||
<!--type=misc-->
|
<!--type=misc-->
|
||||||
|
|
||||||
|
### Streams Compatibility with Async Generators and Async Iterators
|
||||||
|
|
||||||
|
With the support of async generators and iterators in JavaScript, async
|
||||||
|
generators are effectively a first-class language-level stream construct at
|
||||||
|
this point.
|
||||||
|
|
||||||
|
Some common interop cases of using Node.js streams with async generators
|
||||||
|
and async iterators are provided below.
|
||||||
|
|
||||||
|
#### Consuming Readable Streams with Async Iterators
|
||||||
|
|
||||||
|
```js
|
||||||
|
(async function() {
|
||||||
|
for await (const chunk of readable) {
|
||||||
|
console.log(chunk);
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Creating Readable Streams with Async Generators
|
||||||
|
|
||||||
|
We can construct a Node.js Readable Stream from an asynchronous generator
|
||||||
|
using the `Readable.from` utility method:
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { Readable } = require('stream');
|
||||||
|
|
||||||
|
async function * generate() {
|
||||||
|
yield 'a';
|
||||||
|
yield 'b';
|
||||||
|
yield 'c';
|
||||||
|
}
|
||||||
|
|
||||||
|
const readable = Readable.from(generate());
|
||||||
|
|
||||||
|
readable.on('data', (chunk) => {
|
||||||
|
console.log(chunk);
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Piping to Writable Streams from Async Iterators
|
||||||
|
|
||||||
|
In the scenario of writing to a writeable stream from an async iterator,
|
||||||
|
it is important to ensure the correct handling of backpressure and errors.
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { once } = require('events');
|
||||||
|
|
||||||
|
const writeable = fs.createWriteStream('./file');
|
||||||
|
|
||||||
|
(async function() {
|
||||||
|
for await (const chunk of iterator) {
|
||||||
|
// Handle backpressure on write
|
||||||
|
if (!writeable.write(value))
|
||||||
|
await once(writeable, 'drain');
|
||||||
|
}
|
||||||
|
writeable.end();
|
||||||
|
// Ensure completion without errors
|
||||||
|
await once(writeable, 'finish');
|
||||||
|
})();
|
||||||
|
```
|
||||||
|
|
||||||
|
In the above, errors on the write stream would be caught and thrown by the two
|
||||||
|
`once` listeners, since `once` will also handle `'error'` events.
|
||||||
|
|
||||||
|
Alternatively the readable stream could be wrapped with `Readable.from` and
|
||||||
|
then piped via `.pipe`:
|
||||||
|
|
||||||
|
```js
|
||||||
|
const { once } = require('events');
|
||||||
|
|
||||||
|
const writeable = fs.createWriteStream('./file');
|
||||||
|
|
||||||
|
(async function() {
|
||||||
|
const readable = Readable.from(iterator);
|
||||||
|
readable.pipe(writeable);
|
||||||
|
// Ensure completion without errors
|
||||||
|
await once(writeable, 'finish');
|
||||||
|
})();
|
||||||
|
```
|
||||||
|
|
||||||
|
<!--type=misc-->
|
||||||
|
|
||||||
### Compatibility with Older Node.js Versions
|
### Compatibility with Older Node.js Versions
|
||||||
|
|
||||||
<!--type=misc-->
|
<!--type=misc-->
|
||||||
@ -2531,6 +2639,7 @@ contain multi-byte characters.
|
|||||||
[Compatibility]: #stream_compatibility_with_older_node_js_versions
|
[Compatibility]: #stream_compatibility_with_older_node_js_versions
|
||||||
[HTTP requests, on the client]: http.html#http_class_http_clientrequest
|
[HTTP requests, on the client]: http.html#http_class_http_clientrequest
|
||||||
[HTTP responses, on the server]: http.html#http_class_http_serverresponse
|
[HTTP responses, on the server]: http.html#http_class_http_serverresponse
|
||||||
|
[Readable.from]: #readable.from
|
||||||
[TCP sockets]: net.html#net_class_net_socket
|
[TCP sockets]: net.html#net_class_net_socket
|
||||||
[child process stdin]: child_process.html#child_process_subprocess_stdin
|
[child process stdin]: child_process.html#child_process_subprocess_stdin
|
||||||
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout
|
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout
|
||||||
|
@ -1139,3 +1139,42 @@ function endReadableNT(state, stream) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Readable.from = function(iterable, opts) {
|
||||||
|
let iterator;
|
||||||
|
if (iterable && iterable[Symbol.asyncIterator])
|
||||||
|
iterator = iterable[Symbol.asyncIterator]();
|
||||||
|
else if (iterable && iterable[Symbol.iterator])
|
||||||
|
iterator = iterable[Symbol.iterator]();
|
||||||
|
else
|
||||||
|
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
|
||||||
|
|
||||||
|
const readable = new Readable({
|
||||||
|
objectMode: true,
|
||||||
|
...opts
|
||||||
|
});
|
||||||
|
// Reading boolean to protect against _read
|
||||||
|
// being called before last iteration completion.
|
||||||
|
let reading = false;
|
||||||
|
readable._read = function() {
|
||||||
|
if (!reading) {
|
||||||
|
reading = true;
|
||||||
|
next();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
async function next() {
|
||||||
|
try {
|
||||||
|
const { value, done } = await iterator.next();
|
||||||
|
if (done) {
|
||||||
|
readable.push(null);
|
||||||
|
} else if (readable.push(await value)) {
|
||||||
|
next();
|
||||||
|
} else {
|
||||||
|
reading = false;
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
readable.destroy(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return readable;
|
||||||
|
};
|
||||||
|
@ -90,4 +90,4 @@ Promise.all([
|
|||||||
catchesErrors(),
|
catchesErrors(),
|
||||||
stopListeningAfterCatchingError(),
|
stopListeningAfterCatchingError(),
|
||||||
onceError()
|
onceError()
|
||||||
]);
|
]).then(common.mustCall());
|
||||||
|
163
test/parallel/test-readable-from.js
Normal file
163
test/parallel/test-readable-from.js
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const { mustCall } = require('../common');
|
||||||
|
const { once } = require('events');
|
||||||
|
const { Readable } = require('stream');
|
||||||
|
const { strictEqual } = require('assert');
|
||||||
|
|
||||||
|
async function toReadableBasicSupport() {
|
||||||
|
async function * generate() {
|
||||||
|
yield 'a';
|
||||||
|
yield 'b';
|
||||||
|
yield 'c';
|
||||||
|
}
|
||||||
|
|
||||||
|
const stream = Readable.from(generate());
|
||||||
|
|
||||||
|
const expected = ['a', 'b', 'c'];
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
strictEqual(chunk, expected.shift());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function toReadableSyncIterator() {
|
||||||
|
function * generate() {
|
||||||
|
yield 'a';
|
||||||
|
yield 'b';
|
||||||
|
yield 'c';
|
||||||
|
}
|
||||||
|
|
||||||
|
const stream = Readable.from(generate());
|
||||||
|
|
||||||
|
const expected = ['a', 'b', 'c'];
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
strictEqual(chunk, expected.shift());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function toReadablePromises() {
|
||||||
|
const promises = [
|
||||||
|
Promise.resolve('a'),
|
||||||
|
Promise.resolve('b'),
|
||||||
|
Promise.resolve('c')
|
||||||
|
];
|
||||||
|
|
||||||
|
const stream = Readable.from(promises);
|
||||||
|
|
||||||
|
const expected = ['a', 'b', 'c'];
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
strictEqual(chunk, expected.shift());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function toReadableString() {
|
||||||
|
const stream = Readable.from('abc');
|
||||||
|
|
||||||
|
const expected = ['a', 'b', 'c'];
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
strictEqual(chunk, expected.shift());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function toReadableOnData() {
|
||||||
|
async function * generate() {
|
||||||
|
yield 'a';
|
||||||
|
yield 'b';
|
||||||
|
yield 'c';
|
||||||
|
}
|
||||||
|
|
||||||
|
const stream = Readable.from(generate());
|
||||||
|
|
||||||
|
let iterations = 0;
|
||||||
|
const expected = ['a', 'b', 'c'];
|
||||||
|
|
||||||
|
stream.on('data', (chunk) => {
|
||||||
|
iterations++;
|
||||||
|
strictEqual(chunk, expected.shift());
|
||||||
|
});
|
||||||
|
|
||||||
|
await once(stream, 'end');
|
||||||
|
|
||||||
|
strictEqual(iterations, 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function toReadableOnDataNonObject() {
|
||||||
|
async function * generate() {
|
||||||
|
yield 'a';
|
||||||
|
yield 'b';
|
||||||
|
yield 'c';
|
||||||
|
}
|
||||||
|
|
||||||
|
const stream = Readable.from(generate(), { objectMode: false });
|
||||||
|
|
||||||
|
let iterations = 0;
|
||||||
|
const expected = ['a', 'b', 'c'];
|
||||||
|
|
||||||
|
stream.on('data', (chunk) => {
|
||||||
|
iterations++;
|
||||||
|
strictEqual(chunk instanceof Buffer, true);
|
||||||
|
strictEqual(chunk.toString(), expected.shift());
|
||||||
|
});
|
||||||
|
|
||||||
|
await once(stream, 'end');
|
||||||
|
|
||||||
|
strictEqual(iterations, 3);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function destroysTheStreamWhenThrowing() {
|
||||||
|
async function * generate() {
|
||||||
|
throw new Error('kaboom');
|
||||||
|
}
|
||||||
|
|
||||||
|
const stream = Readable.from(generate());
|
||||||
|
|
||||||
|
stream.read();
|
||||||
|
|
||||||
|
try {
|
||||||
|
await once(stream, 'error');
|
||||||
|
} catch (err) {
|
||||||
|
strictEqual(err.message, 'kaboom');
|
||||||
|
strictEqual(stream.destroyed, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function asTransformStream() {
|
||||||
|
async function * generate(stream) {
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
yield chunk.toUpperCase();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const source = new Readable({
|
||||||
|
objectMode: true,
|
||||||
|
read() {
|
||||||
|
this.push('a');
|
||||||
|
this.push('b');
|
||||||
|
this.push('c');
|
||||||
|
this.push(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const stream = Readable.from(generate(source));
|
||||||
|
|
||||||
|
const expected = ['A', 'B', 'C'];
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
strictEqual(chunk, expected.shift());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Promise.all([
|
||||||
|
toReadableBasicSupport(),
|
||||||
|
toReadableSyncIterator(),
|
||||||
|
toReadablePromises(),
|
||||||
|
toReadableString(),
|
||||||
|
toReadableOnData(),
|
||||||
|
toReadableOnDataNonObject(),
|
||||||
|
destroysTheStreamWhenThrowing(),
|
||||||
|
asTransformStream()
|
||||||
|
]).then(mustCall());
|
Loading…
x
Reference in New Issue
Block a user