stream: make async iterator .next() always resolve

See: https://github.com/nodejs/readable-stream/issues/387

PR-URL: https://github.com/nodejs/node/pull/24668
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
This commit is contained in:
Matteo Collina 2018-11-27 09:24:48 +01:00
parent 9e33e86180
commit fa1535aed7
2 changed files with 75 additions and 5 deletions

View File

@ -39,6 +39,11 @@ function onReadable(iter) {
function wrapForNext(lastPromise, iter) { function wrapForNext(lastPromise, iter) {
return (resolve, reject) => { return (resolve, reject) => {
lastPromise.then(() => { lastPromise.then(() => {
if (iter[kEnded]) {
resolve(createIterResult(undefined, true));
return;
}
iter[kHandlePromise](resolve, reject); iter[kHandlePromise](resolve, reject);
}, reject); }, reject);
}; };
@ -61,7 +66,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
} }
if (this[kEnded]) { if (this[kEnded]) {
return Promise.resolve(createIterResult(null, true)); return Promise.resolve(createIterResult(undefined, true));
} }
if (this[kStream].destroyed) { if (this[kStream].destroyed) {
@ -74,7 +79,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
if (this[kError]) { if (this[kError]) {
reject(this[kError]); reject(this[kError]);
} else { } else {
resolve(createIterResult(null, true)); resolve(createIterResult(undefined, true));
} }
}); });
}); });
@ -115,7 +120,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
reject(err); reject(err);
return; return;
} }
resolve(createIterResult(null, true)); resolve(createIterResult(undefined, true));
}); });
}); });
}, },
@ -131,7 +136,6 @@ const createReadableStreamAsyncIterator = (stream) => {
value: stream._readableState.endEmitted, value: stream._readableState.endEmitted,
writable: true writable: true
}, },
[kLastPromise]: { value: null, writable: true },
// the function passed to new Promise // the function passed to new Promise
// is cached so we avoid allocating a new // is cached so we avoid allocating a new
// closure at every run // closure at every run
@ -151,6 +155,7 @@ const createReadableStreamAsyncIterator = (stream) => {
writable: true, writable: true,
}, },
}); });
iterator[kLastPromise] = null;
finished(stream, (err) => { finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
@ -172,7 +177,7 @@ const createReadableStreamAsyncIterator = (stream) => {
iterator[kLastPromise] = null; iterator[kLastPromise] = null;
iterator[kLastResolve] = null; iterator[kLastResolve] = null;
iterator[kLastReject] = null; iterator[kLastReject] = null;
resolve(createIterResult(null, true)); resolve(createIterResult(undefined, true));
} }
iterator[kEnded] = true; iterator[kEnded] = true;
}); });

View File

@ -393,6 +393,71 @@ async function tests() {
r.destroy(null); r.destroy(null);
} }
})(); })();
await (async () => {
console.log('all next promises must be resolved on end');
const r = new Readable({
objectMode: true,
read() {
}
});
const b = r[Symbol.asyncIterator]();
const c = b.next();
const d = b.next();
r.push(null);
assert.deepStrictEqual(await c, { done: true, value: undefined });
assert.deepStrictEqual(await d, { done: true, value: undefined });
})();
await (async () => {
console.log('all next promises must be resolved on destroy');
const r = new Readable({
objectMode: true,
read() {
}
});
const b = r[Symbol.asyncIterator]();
const c = b.next();
const d = b.next();
r.destroy();
assert.deepStrictEqual(await c, { done: true, value: undefined });
assert.deepStrictEqual(await d, { done: true, value: undefined });
})();
await (async () => {
console.log('all next promises must be resolved on destroy with error');
const r = new Readable({
objectMode: true,
read() {
}
});
const b = r[Symbol.asyncIterator]();
const c = b.next();
const d = b.next();
const err = new Error('kaboom');
r.destroy(err);
await Promise.all([(async () => {
let e;
try {
await c;
} catch (_e) {
e = _e;
}
assert.strictEqual(e, err);
})(), (async () => {
let e;
try {
await d;
} catch (_e) {
e = _e;
}
assert.strictEqual(e, err);
})()]);
})();
} }
// to avoid missing some tests if a promise does not resolve // to avoid missing some tests if a promise does not resolve