stream: async iteration should work with destroyed stream

Fixes https://github.com/nodejs/node/issues/23730.

PR-URL: https://github.com/nodejs/node/pull/23785
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matheus Marchini <mat@mmarchini.me>
This commit is contained in:
Matteo Collina 2018-10-20 15:04:57 +02:00
parent 12c530ccd5
commit 3ec8cec648
2 changed files with 82 additions and 27 deletions

View File

@ -1,5 +1,7 @@
'use strict';
const finished = require('internal/streams/end-of-stream');
const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
@ -34,30 +36,6 @@ function onReadable(iter) {
process.nextTick(readAndResolve, iter);
}
function onEnd(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(createIterResult(null, true));
}
iter[kEnded] = true;
}
function onError(iter, err) {
const reject = iter[kLastReject];
// reject if we are waiting for data in the Promise
// returned by next() and store the error
if (reject !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
reject(err);
}
iter[kError] = err;
}
function wrapForNext(lastPromise, iter) {
return function(resolve, reject) {
lastPromise.then(function() {
@ -86,6 +64,22 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
return Promise.resolve(createIterResult(null, true));
}
if (this[kStream].destroyed) {
// We need to defer via nextTick because if .destroy(err) is
// called, the error will be emitted via nextTick, and
// we cannot guarantee that there is no error lingering around
// waiting to be emitted.
return new Promise((resolve, reject) => {
process.nextTick(() => {
if (this[kError]) {
reject(this[kError]);
} else {
resolve(createIterResult(null, true));
}
});
});
}
// if we have multiple next() calls
// we will wait for the previous Promise to finish
// this logic is optimized to support for await loops,
@ -155,9 +149,32 @@ const createReadableStreamAsyncIterator = (stream) => {
},
});
finished(stream, (err) => {
if (err) {
const reject = iterator[kLastReject];
// reject if we are waiting for data in the Promise
// returned by next() and store the error
if (reject !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
reject(err);
}
iterator[kError] = err;
return;
}
const resolve = iterator[kLastResolve];
if (resolve !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(null, true));
}
iterator[kEnded] = true;
});
stream.on('readable', onReadable.bind(null, iterator));
stream.on('end', onEnd.bind(null, iterator));
stream.on('error', onError.bind(null, iterator));
return iterator;
};

View File

@ -1,7 +1,7 @@
'use strict';
const common = require('../common');
const { Readable } = require('stream');
const { Readable, PassThrough, pipeline } = require('stream');
const assert = require('assert');
async function tests() {
@ -324,6 +324,44 @@ async function tests() {
assert.strictEqual(data, expected);
})();
await (async function() {
console.log('.next() on destroyed stream');
const readable = new Readable({
read() {
// no-op
}
});
readable.destroy();
try {
await readable[Symbol.asyncIterator]().next();
} catch (e) {
assert.strictEqual(e.code, 'ERR_STREAM_PREMATURE_CLOSE');
}
})();
await (async function() {
console.log('.next() on pipelined stream');
const readable = new Readable({
read() {
// no-op
}
});
const passthrough = new PassThrough();
const err = new Error('kaboom');
pipeline(readable, passthrough, common.mustCall((e) => {
assert.strictEqual(e, err);
}));
readable.destroy(err);
try {
await readable[Symbol.asyncIterator]().next();
} catch (e) {
assert.strictEqual(e, err);
}
})();
}
// to avoid missing some tests if a promise does not resolve