http2: fix compat stream read handling, add tests
Handle edge case where stream pause is called between resume being called and actually evaluated. Other minor adjustments to avoid various edge cases around stream events. Add new tests that cover all changes. Fixes: https://github.com/nodejs/node/issues/15491 PR-URL: https://github.com/nodejs/node/pull/15503 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
ebc58d7a22
commit
c705f1067c
@ -97,46 +97,50 @@ function onStreamError(error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
function onRequestPause() {
|
function onRequestPause() {
|
||||||
this[kStream].pause();
|
const stream = this[kStream];
|
||||||
|
if (stream)
|
||||||
|
stream.pause();
|
||||||
}
|
}
|
||||||
|
|
||||||
function onRequestResume() {
|
function onRequestResume() {
|
||||||
this[kStream].resume();
|
const stream = this[kStream];
|
||||||
|
if (stream)
|
||||||
|
stream.resume();
|
||||||
}
|
}
|
||||||
|
|
||||||
function onRequestDrain() {
|
function onStreamDrain() {
|
||||||
if (this.isPaused())
|
|
||||||
this.resume();
|
|
||||||
}
|
|
||||||
|
|
||||||
function onStreamResponseDrain() {
|
|
||||||
this[kResponse].emit('drain');
|
this[kResponse].emit('drain');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO Http2Stream does not emit 'close'
|
||||||
function onStreamClosedRequest() {
|
function onStreamClosedRequest() {
|
||||||
this[kRequest].push(null);
|
this[kRequest].push(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO Http2Stream does not emit 'close'
|
||||||
function onStreamClosedResponse() {
|
function onStreamClosedResponse() {
|
||||||
const res = this[kResponse];
|
this[kResponse].emit('finish');
|
||||||
res.writable = false;
|
|
||||||
res.emit('finish');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function onStreamAbortedRequest(hadError, code) {
|
function onStreamAbortedRequest(hadError, code) {
|
||||||
if ((this.writable) ||
|
const request = this[kRequest];
|
||||||
(this._readableState && !this._readableState.ended)) {
|
if (request[kState].closed === false) {
|
||||||
this.emit('aborted', hadError, code);
|
request.emit('aborted', hadError, code);
|
||||||
this.emit('close');
|
request.emit('close');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function onStreamAbortedResponse() {
|
function onStreamAbortedResponse() {
|
||||||
if (this.writable) {
|
const response = this[kResponse];
|
||||||
this.emit('close');
|
if (response[kState].closed === false) {
|
||||||
|
response.emit('close');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function resumeStream(stream) {
|
||||||
|
stream.resume();
|
||||||
|
}
|
||||||
|
|
||||||
class Http2ServerRequest extends Readable {
|
class Http2ServerRequest extends Readable {
|
||||||
constructor(stream, headers, options, rawHeaders) {
|
constructor(stream, headers, options, rawHeaders) {
|
||||||
super(options);
|
super(options);
|
||||||
@ -158,13 +162,12 @@ class Http2ServerRequest extends Readable {
|
|||||||
stream.on('end', onStreamEnd);
|
stream.on('end', onStreamEnd);
|
||||||
stream.on('error', onStreamError);
|
stream.on('error', onStreamError);
|
||||||
stream.on('close', onStreamClosedRequest);
|
stream.on('close', onStreamClosedRequest);
|
||||||
stream.on('aborted', onStreamAbortedRequest.bind(this));
|
stream.on('aborted', onStreamAbortedRequest);
|
||||||
const onfinish = this[kFinish].bind(this);
|
const onfinish = this[kFinish].bind(this);
|
||||||
stream.on('streamClosed', onfinish);
|
stream.on('streamClosed', onfinish);
|
||||||
stream.on('finish', onfinish);
|
stream.on('finish', onfinish);
|
||||||
this.on('pause', onRequestPause);
|
this.on('pause', onRequestPause);
|
||||||
this.on('resume', onRequestResume);
|
this.on('resume', onRequestResume);
|
||||||
this.on('drain', onRequestDrain);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get closed() {
|
get closed() {
|
||||||
@ -221,7 +224,7 @@ class Http2ServerRequest extends Readable {
|
|||||||
_read(nread) {
|
_read(nread) {
|
||||||
const stream = this[kStream];
|
const stream = this[kStream];
|
||||||
if (stream !== undefined) {
|
if (stream !== undefined) {
|
||||||
stream.resume();
|
process.nextTick(resumeStream, stream);
|
||||||
} else {
|
} else {
|
||||||
this.emit('error', new errors.Error('ERR_HTTP2_STREAM_CLOSED'));
|
this.emit('error', new errors.Error('ERR_HTTP2_STREAM_CLOSED'));
|
||||||
}
|
}
|
||||||
@ -279,9 +282,9 @@ class Http2ServerResponse extends Stream {
|
|||||||
this[kStream] = stream;
|
this[kStream] = stream;
|
||||||
stream[kResponse] = this;
|
stream[kResponse] = this;
|
||||||
this.writable = true;
|
this.writable = true;
|
||||||
stream.on('drain', onStreamResponseDrain);
|
stream.on('drain', onStreamDrain);
|
||||||
stream.on('close', onStreamClosedResponse);
|
stream.on('close', onStreamClosedResponse);
|
||||||
stream.on('aborted', onStreamAbortedResponse.bind(this));
|
stream.on('aborted', onStreamAbortedResponse);
|
||||||
const onfinish = this[kFinish].bind(this);
|
const onfinish = this[kFinish].bind(this);
|
||||||
stream.on('streamClosed', onfinish);
|
stream.on('streamClosed', onfinish);
|
||||||
stream.on('finish', onfinish);
|
stream.on('finish', onfinish);
|
||||||
|
53
test/parallel/test-http2-compat-serverrequest-pause.js
Normal file
53
test/parallel/test-http2-compat-serverrequest-pause.js
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
// Flags: --expose-http2
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const assert = require('assert');
|
||||||
|
const h2 = require('http2');
|
||||||
|
|
||||||
|
// Check that pause & resume work as expected with Http2ServerRequest
|
||||||
|
|
||||||
|
const testStr = 'Request Body from Client';
|
||||||
|
|
||||||
|
const server = h2.createServer();
|
||||||
|
|
||||||
|
server.on('request', common.mustCall((req, res) => {
|
||||||
|
let data = '';
|
||||||
|
req.pause();
|
||||||
|
req.setEncoding('utf8');
|
||||||
|
req.on('data', common.mustCall((chunk) => (data += chunk)));
|
||||||
|
setTimeout(common.mustCall(() => {
|
||||||
|
assert.strictEqual(data, '');
|
||||||
|
req.resume();
|
||||||
|
}), common.platformTimeout(100));
|
||||||
|
req.on('end', common.mustCall(() => {
|
||||||
|
assert.strictEqual(data, testStr);
|
||||||
|
res.end();
|
||||||
|
}));
|
||||||
|
|
||||||
|
// shouldn't throw if underlying Http2Stream no longer exists
|
||||||
|
res.on('finish', common.mustCall(() => process.nextTick(() => {
|
||||||
|
assert.doesNotThrow(() => req.pause());
|
||||||
|
assert.doesNotThrow(() => req.resume());
|
||||||
|
})));
|
||||||
|
}));
|
||||||
|
|
||||||
|
server.listen(0, common.mustCall(() => {
|
||||||
|
const port = server.address().port;
|
||||||
|
|
||||||
|
const client = h2.connect(`http://localhost:${port}`);
|
||||||
|
const request = client.request({
|
||||||
|
':path': '/foobar',
|
||||||
|
':method': 'POST',
|
||||||
|
':scheme': 'http',
|
||||||
|
':authority': `localhost:${port}`
|
||||||
|
});
|
||||||
|
request.resume();
|
||||||
|
request.end(testStr);
|
||||||
|
request.on('end', common.mustCall(function() {
|
||||||
|
client.destroy();
|
||||||
|
server.close();
|
||||||
|
}));
|
||||||
|
}));
|
48
test/parallel/test-http2-compat-serverrequest-pipe.js
Normal file
48
test/parallel/test-http2-compat-serverrequest-pipe.js
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
// Flags: --expose-http2
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const assert = require('assert');
|
||||||
|
const http2 = require('http2');
|
||||||
|
const fs = require('fs');
|
||||||
|
const path = require('path');
|
||||||
|
|
||||||
|
// piping should work as expected with createWriteStream
|
||||||
|
|
||||||
|
const loc = path.join(common.fixturesDir, 'person.jpg');
|
||||||
|
const fn = path.join(common.tmpDir, 'http2pipe.jpg');
|
||||||
|
common.refreshTmpDir();
|
||||||
|
|
||||||
|
const server = http2.createServer();
|
||||||
|
|
||||||
|
server.on('request', common.mustCall((req, res) => {
|
||||||
|
const dest = req.pipe(fs.createWriteStream(fn));
|
||||||
|
dest.on('finish', common.mustCall(() => {
|
||||||
|
assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
|
||||||
|
fs.unlinkSync(fn);
|
||||||
|
res.end();
|
||||||
|
}));
|
||||||
|
}));
|
||||||
|
|
||||||
|
server.listen(0, common.mustCall(() => {
|
||||||
|
const port = server.address().port;
|
||||||
|
const client = http2.connect(`http://localhost:${port}`);
|
||||||
|
|
||||||
|
let remaining = 2;
|
||||||
|
function maybeClose() {
|
||||||
|
if (--remaining === 0) {
|
||||||
|
server.close();
|
||||||
|
client.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const req = client.request({ ':method': 'POST' });
|
||||||
|
req.on('response', common.mustCall());
|
||||||
|
req.resume();
|
||||||
|
req.on('end', common.mustCall(maybeClose));
|
||||||
|
const str = fs.createReadStream(loc);
|
||||||
|
str.on('end', common.mustCall(maybeClose));
|
||||||
|
str.pipe(req);
|
||||||
|
}));
|
44
test/parallel/test-http2-compat-serverresponse-drain.js
Normal file
44
test/parallel/test-http2-compat-serverresponse-drain.js
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
// Flags: --expose-http2
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const assert = require('assert');
|
||||||
|
const h2 = require('http2');
|
||||||
|
|
||||||
|
// Check that drain event is passed from Http2Stream
|
||||||
|
|
||||||
|
const testString = 'tests';
|
||||||
|
|
||||||
|
const server = h2.createServer();
|
||||||
|
|
||||||
|
server.on('request', common.mustCall((req, res) => {
|
||||||
|
res.stream._writableState.highWaterMark = testString.length;
|
||||||
|
assert.strictEqual(res.write(testString), false);
|
||||||
|
res.on('drain', common.mustCall(() => res.end(testString)));
|
||||||
|
}));
|
||||||
|
|
||||||
|
server.listen(0, common.mustCall(() => {
|
||||||
|
const port = server.address().port;
|
||||||
|
|
||||||
|
const client = h2.connect(`http://localhost:${port}`);
|
||||||
|
const request = client.request({
|
||||||
|
':path': '/foobar',
|
||||||
|
':method': 'POST',
|
||||||
|
':scheme': 'http',
|
||||||
|
':authority': `localhost:${port}`
|
||||||
|
});
|
||||||
|
request.resume();
|
||||||
|
request.end();
|
||||||
|
|
||||||
|
let data = '';
|
||||||
|
request.setEncoding('utf8');
|
||||||
|
request.on('data', (chunk) => (data += chunk));
|
||||||
|
|
||||||
|
request.on('end', common.mustCall(function() {
|
||||||
|
assert.strictEqual(data, testString.repeat(2));
|
||||||
|
client.destroy();
|
||||||
|
server.close();
|
||||||
|
}));
|
||||||
|
}));
|
Loading…
x
Reference in New Issue
Block a user