stream: fix end-of-stream for HTTP/2
HTTP/2 streams call `.end()` on themselves from their `.destroy()` method. That `.end()` call might be queued (e.g. due to network congestion) and not be processed before the stream itself is destroyed.

In that case, the `_writableState.ended` property can be set before the stream emits its `'close'` event, while the stream never actually emits the `'finish'` event. This confuses the end-of-stream implementation, which then never calls its callback.

This can be fixed by watching for the end events themselves, using the existing `'finish'` and `'end'` listeners, rather than relying on the `.ended` properties of the `_...State` objects. These properties still need to be checked to know whether stream closure was premature. My understanding is that, ideally, streams should not emit `'close'` before `'end'` and/or `'finish'`, so this might be another bug, but changing that would require modifying tests and would almost certainly be a breaking change.

Fixes: https://github.com/nodejs/node/issues/24456
PR-URL: https://github.com/nodejs/node/pull/24926
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
commit 83ec33b933
parent 2f75eed1aa
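The failure mode can be reproduced without HTTP/2. The sketch below is illustrative only and not part of the commit: a plain `Duplex` stands in for an HTTP/2 stream, its `destroy()` calls `this.end()` while a pending write keeps `'finish'` from ever being emitted, and `stream.finished()` (the public entry point to this end-of-stream helper) is asked to watch only the writable side, much as `pipeline()` does for its last stream. Before this patch the callback could be skipped entirely; with it, the callback runs once `'close'` arrives (here without an error, since `.end()` was at least requested).

'use strict';
// Illustrative repro sketch (not part of this commit); assumes a Node.js
// release that includes this fix and emits 'close' on destroyed streams.
const { Duplex, finished } = require('stream');

const duplex = new Duplex({
  // Never call cb, so the .end() queued below is never processed and
  // 'finish' is never emitted.
  write(chunk, enc, cb) {},
  read() {},
  destroy(err, cb) {
    // Like HTTP/2 streams at the time of this commit: end() from destroy().
    this.end();
    cb(err);
  }
});

// Watch only the writable side, as pipeline() does for the final stream.
finished(duplex, { readable: false }, (err) => {
  // Before this fix, this callback could never run: _writableState.ended was
  // already true when 'close' was emitted, yet 'finish' never fired.
  console.log('end-of-stream callback ran:', err ? err.code : 'no error');
});

duplex.write('hello');
setImmediate(() => duplex.destroy());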
lib/internal/streams/end-of-stream.js:

@@ -36,8 +36,6 @@ function eos(stream, opts, callback) {
 
   callback = once(callback);
 
-  const ws = stream._writableState;
-  const rs = stream._readableState;
   let readable = opts.readable || (opts.readable !== false && stream.readable);
   let writable = opts.writable || (opts.writable !== false && stream.writable);
 
@@ -45,13 +43,17 @@ function eos(stream, opts, callback) {
     if (!stream.writable) onfinish();
   };
 
+  var writableEnded = stream._writableState && stream._writableState.finished;
   const onfinish = () => {
     writable = false;
+    writableEnded = true;
     if (!readable) callback.call(stream);
   };
 
+  var readableEnded = stream._readableState && stream._readableState.endEmitted;
   const onend = () => {
     readable = false;
+    readableEnded = true;
     if (!writable) callback.call(stream);
   };
 
@@ -60,11 +62,16 @@ function eos(stream, opts, callback) {
   };
 
   const onclose = () => {
-    if (readable && !(rs && rs.ended)) {
-      return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+    let err;
+    if (readable && !readableEnded) {
+      if (!stream._readableState || !stream._readableState.ended)
+        err = new ERR_STREAM_PREMATURE_CLOSE();
+      return callback.call(stream, err);
     }
-    if (writable && !(ws && ws.ended)) {
-      return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+    if (writable && !writableEnded) {
+      if (!stream._writableState || !stream._writableState.ended)
+        err = new ERR_STREAM_PREMATURE_CLOSE();
+      return callback.call(stream, err);
     }
   };
 
@@ -77,7 +84,7 @@ function eos(stream, opts, callback) {
     stream.on('abort', onclose);
     if (stream.req) onrequest();
     else stream.on('request', onrequest);
-  } else if (writable && !ws) { // legacy streams
+  } else if (writable && !stream._writableState) { // legacy streams
     stream.on('end', onlegacyfinish);
     stream.on('close', onlegacyfinish);
   }
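In essence, `onfinish` and `onend` now record through the `writableEnded` and `readableEnded` flags whether the `'finish'` and `'end'` events were actually observed, and `onclose` consults those flags instead of the removed `ws.ended` / `rs.ended` checks, which `.end()` can satisfy even when `'finish'` will never fire. A standalone sketch of that pattern follows; it is illustrative only, and the helper name `watchWritableEnd` is made up for this example, not Node.js API.

'use strict';
// Illustrative sketch of the event-tracking pattern (not the committed code).
// watchWritableEnd() is a made-up helper name for this example.
function watchWritableEnd(stream, callback) {
  let sawFinish = false;  // set only when the 'finish' event really fires
  let called = false;
  const done = (err) => {
    if (called) return;
    called = true;
    callback(err);
  };

  stream.on('finish', () => {
    sawFinish = true;
    done();
  });

  stream.on('close', () => {
    // Decide based on whether 'finish' was observed, not on
    // stream._writableState.ended, which .end() sets up front even if
    // 'finish' can never be emitted afterwards.
    if (!sawFinish) done(new Error('premature close'));
  });
}

The committed code goes one step further: at `'close'` time it still inspects `stream._writableState.ended` / `stream._readableState.ended` to decide whether to pass an `ERR_STREAM_PREMATURE_CLOSE` error or to call the callback without one.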
test/parallel/parallel.status:

@@ -12,8 +12,6 @@ test-net-connect-options-port: PASS,FLAKY
 test-http2-pipe: PASS,FLAKY
 test-worker-syntax-error: PASS,FLAKY
 test-worker-syntax-error-file: PASS,FLAKY
-# https://github.com/nodejs/node/issues/24456
-test-stream-pipeline-http2: PASS,FLAKY
 
 [$system==linux]
test/parallel/test-stream-pipeline-queued-end-in-destroy.js (new file, 39 lines):

@@ -0,0 +1,39 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { Readable, Duplex, pipeline } = require('stream');
+
+// Test that the callback for pipeline() is called even when the ._destroy()
+// method of the stream places an .end() request to itself that does not
+// get processed before the destruction of the stream (i.e. the 'close' event).
+// Refs: https://github.com/nodejs/node/issues/24456
+
+const readable = new Readable({
+  read: common.mustCall(() => {})
+});
+
+const duplex = new Duplex({
+  write(chunk, enc, cb) {
+    // Simulate messages queueing up.
+  },
+  read() {},
+  destroy(err, cb) {
+    // Call end() from inside the destroy() method, like HTTP/2 streams
+    // do at the time of writing.
+    this.end();
+    cb(err);
+  }
+});
+
+duplex.on('finished', common.mustNotCall());
+
+pipeline(readable, duplex, common.mustCall((err) => {
+  assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
+}));
+
+// Write one chunk of data, and destroy the stream later.
+// That should trigger the pipeline destruction.
+readable.push('foo');
+setImmediate(() => {
+  readable.destroy();
+});