http2: fix condition where data is lost
PR-URL: https://github.com/nodejs/node/pull/18895 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
This commit is contained in:
parent
fcebb16478
commit
dbe645f114
@ -307,8 +307,23 @@ function onStreamClose(code) {
|
|||||||
|
|
||||||
if (state.fd !== undefined)
|
if (state.fd !== undefined)
|
||||||
tryClose(state.fd);
|
tryClose(state.fd);
|
||||||
stream.push(null);
|
|
||||||
stream[kMaybeDestroy](null, code);
|
// Defer destroy we actually emit end.
|
||||||
|
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
|
||||||
|
// If errored or ended, we can destroy immediately.
|
||||||
|
stream[kMaybeDestroy](null, code);
|
||||||
|
} else {
|
||||||
|
// Wait for end to destroy.
|
||||||
|
stream.on('end', stream[kMaybeDestroy]);
|
||||||
|
// Push a null so the stream can end whenever the client consumes
|
||||||
|
// it completely.
|
||||||
|
stream.push(null);
|
||||||
|
|
||||||
|
// Same as net.
|
||||||
|
if (stream.readableLength === 0) {
|
||||||
|
stream.read(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Receives a chunk of data for a given stream and forwards it on
|
// Receives a chunk of data for a given stream and forwards it on
|
||||||
@ -326,11 +341,19 @@ function onStreamRead(nread, buf) {
|
|||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Last chunk was received. End the readable side.
|
// Last chunk was received. End the readable side.
|
||||||
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
|
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
|
||||||
`${sessionName(stream[kSession][kType])}]: ending readable.`);
|
`${sessionName(stream[kSession][kType])}]: ending readable.`);
|
||||||
stream.push(null);
|
|
||||||
stream[kMaybeDestroy]();
|
// defer this until we actually emit end
|
||||||
|
if (stream._readableState.endEmitted) {
|
||||||
|
stream[kMaybeDestroy]();
|
||||||
|
} else {
|
||||||
|
stream.on('end', stream[kMaybeDestroy]);
|
||||||
|
stream.push(null);
|
||||||
|
stream.read(0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Called when the remote peer settings have been updated.
|
// Called when the remote peer settings have been updated.
|
||||||
@ -1833,21 +1856,25 @@ class Http2Stream extends Duplex {
|
|||||||
session[kMaybeDestroy]();
|
session[kMaybeDestroy]();
|
||||||
process.nextTick(emit, this, 'close', code);
|
process.nextTick(emit, this, 'close', code);
|
||||||
callback(err);
|
callback(err);
|
||||||
}
|
|
||||||
|
|
||||||
|
}
|
||||||
// The Http2Stream can be destroyed if it has closed and if the readable
|
// The Http2Stream can be destroyed if it has closed and if the readable
|
||||||
// side has received the final chunk.
|
// side has received the final chunk.
|
||||||
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
|
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
|
||||||
if (error == null) {
|
if (error || code !== NGHTTP2_NO_ERROR) {
|
||||||
if (code === NGHTTP2_NO_ERROR &&
|
this.destroy(error);
|
||||||
(!this._readableState.ended ||
|
return;
|
||||||
!this._writableState.ended ||
|
}
|
||||||
this._writableState.pendingcb > 0 ||
|
|
||||||
!this.closed)) {
|
// TODO(mcollina): remove usage of _*State properties
|
||||||
return;
|
if (this._readableState.ended &&
|
||||||
}
|
this._writableState.ended &&
|
||||||
|
this._writableState.pendingcb === 0 &&
|
||||||
|
this.closed) {
|
||||||
|
this.destroy();
|
||||||
|
// This should return, but eslint complains.
|
||||||
|
// return
|
||||||
}
|
}
|
||||||
this.destroy(error);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,50 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const assert = require('assert');
|
||||||
|
const http2 = require('http2');
|
||||||
|
const { Readable } = require('stream');
|
||||||
|
|
||||||
|
const server = http2.createServer(common.mustCall((req, res) => {
|
||||||
|
res.setHeader('content-type', 'text/html');
|
||||||
|
const input = new Readable({
|
||||||
|
read() {
|
||||||
|
this.push('test');
|
||||||
|
this.push(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
input.pipe(res);
|
||||||
|
}));
|
||||||
|
|
||||||
|
server.listen(0, common.mustCall(() => {
|
||||||
|
const port = server.address().port;
|
||||||
|
const client = http2.connect(`http://localhost:${port}`);
|
||||||
|
|
||||||
|
const req = client.request();
|
||||||
|
|
||||||
|
req.on('response', common.mustCall((headers) => {
|
||||||
|
assert.strictEqual(headers[':status'], 200);
|
||||||
|
assert.strictEqual(headers['content-type'], 'text/html');
|
||||||
|
}));
|
||||||
|
|
||||||
|
let data = '';
|
||||||
|
|
||||||
|
const notCallClose = common.mustNotCall();
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
req.setEncoding('utf8');
|
||||||
|
req.removeListener('close', notCallClose);
|
||||||
|
req.on('close', common.mustCall(() => {
|
||||||
|
server.close();
|
||||||
|
client.close();
|
||||||
|
}));
|
||||||
|
req.on('data', common.mustCallAtLeast((d) => data += d));
|
||||||
|
req.on('end', common.mustCall(() => {
|
||||||
|
assert.strictEqual(data, 'test');
|
||||||
|
}));
|
||||||
|
}, common.platformTimeout(100));
|
||||||
|
|
||||||
|
req.on('close', notCallClose);
|
||||||
|
}));
|
55
test/parallel/test-http2-short-stream-client-server.js
Normal file
55
test/parallel/test-http2-short-stream-client-server.js
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const assert = require('assert');
|
||||||
|
const http2 = require('http2');
|
||||||
|
const { Readable } = require('stream');
|
||||||
|
|
||||||
|
const server = http2.createServer();
|
||||||
|
server.on('stream', common.mustCall((stream) => {
|
||||||
|
stream.respond({
|
||||||
|
':status': 200,
|
||||||
|
'content-type': 'text/html'
|
||||||
|
});
|
||||||
|
const input = new Readable({
|
||||||
|
read() {
|
||||||
|
this.push('test');
|
||||||
|
this.push(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
input.pipe(stream);
|
||||||
|
}));
|
||||||
|
|
||||||
|
|
||||||
|
server.listen(0, common.mustCall(() => {
|
||||||
|
const port = server.address().port;
|
||||||
|
const client = http2.connect(`http://localhost:${port}`);
|
||||||
|
|
||||||
|
const req = client.request();
|
||||||
|
|
||||||
|
req.on('response', common.mustCall((headers) => {
|
||||||
|
assert.strictEqual(headers[':status'], 200);
|
||||||
|
assert.strictEqual(headers['content-type'], 'text/html');
|
||||||
|
}));
|
||||||
|
|
||||||
|
let data = '';
|
||||||
|
|
||||||
|
const notCallClose = common.mustNotCall();
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
req.setEncoding('utf8');
|
||||||
|
req.removeListener('close', notCallClose);
|
||||||
|
req.on('close', common.mustCall(() => {
|
||||||
|
server.close();
|
||||||
|
client.close();
|
||||||
|
}));
|
||||||
|
req.on('data', common.mustCallAtLeast((d) => data += d));
|
||||||
|
req.on('end', common.mustCall(() => {
|
||||||
|
assert.strictEqual(data, 'test');
|
||||||
|
}));
|
||||||
|
}, common.platformTimeout(100));
|
||||||
|
|
||||||
|
req.on('close', notCallClose);
|
||||||
|
}));
|
Loading…
x
Reference in New Issue
Block a user