diff --git a/lib/internal/http2/compat.js b/lib/internal/http2/compat.js index 2523fb0ba37..462b0c51b88 100644 --- a/lib/internal/http2/compat.js +++ b/lib/internal/http2/compat.js @@ -97,46 +97,50 @@ function onStreamError(error) { } function onRequestPause() { - this[kStream].pause(); + const stream = this[kStream]; + if (stream) + stream.pause(); } function onRequestResume() { - this[kStream].resume(); + const stream = this[kStream]; + if (stream) + stream.resume(); } -function onRequestDrain() { - if (this.isPaused()) - this.resume(); -} - -function onStreamResponseDrain() { +function onStreamDrain() { this[kResponse].emit('drain'); } +// TODO Http2Stream does not emit 'close' function onStreamClosedRequest() { this[kRequest].push(null); } +// TODO Http2Stream does not emit 'close' function onStreamClosedResponse() { - const res = this[kResponse]; - res.writable = false; - res.emit('finish'); + this[kResponse].emit('finish'); } function onStreamAbortedRequest(hadError, code) { - if ((this.writable) || - (this._readableState && !this._readableState.ended)) { - this.emit('aborted', hadError, code); - this.emit('close'); + const request = this[kRequest]; + if (request[kState].closed === false) { + request.emit('aborted', hadError, code); + request.emit('close'); } } function onStreamAbortedResponse() { - if (this.writable) { - this.emit('close'); + const response = this[kResponse]; + if (response[kState].closed === false) { + response.emit('close'); } } +function resumeStream(stream) { + stream.resume(); +} + class Http2ServerRequest extends Readable { constructor(stream, headers, options, rawHeaders) { super(options); @@ -158,13 +162,12 @@ class Http2ServerRequest extends Readable { stream.on('end', onStreamEnd); stream.on('error', onStreamError); stream.on('close', onStreamClosedRequest); - stream.on('aborted', onStreamAbortedRequest.bind(this)); + stream.on('aborted', onStreamAbortedRequest); const onfinish = this[kFinish].bind(this); stream.on('streamClosed', onfinish); stream.on('finish', onfinish); this.on('pause', onRequestPause); this.on('resume', onRequestResume); - this.on('drain', onRequestDrain); } get closed() { @@ -221,7 +224,7 @@ class Http2ServerRequest extends Readable { _read(nread) { const stream = this[kStream]; if (stream !== undefined) { - stream.resume(); + process.nextTick(resumeStream, stream); } else { this.emit('error', new errors.Error('ERR_HTTP2_STREAM_CLOSED')); } @@ -279,9 +282,9 @@ class Http2ServerResponse extends Stream { this[kStream] = stream; stream[kResponse] = this; this.writable = true; - stream.on('drain', onStreamResponseDrain); + stream.on('drain', onStreamDrain); stream.on('close', onStreamClosedResponse); - stream.on('aborted', onStreamAbortedResponse.bind(this)); + stream.on('aborted', onStreamAbortedResponse); const onfinish = this[kFinish].bind(this); stream.on('streamClosed', onfinish); stream.on('finish', onfinish); diff --git a/test/parallel/test-http2-compat-serverrequest-pause.js b/test/parallel/test-http2-compat-serverrequest-pause.js new file mode 100644 index 00000000000..60241815727 --- /dev/null +++ b/test/parallel/test-http2-compat-serverrequest-pause.js @@ -0,0 +1,53 @@ +// Flags: --expose-http2 +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const h2 = require('http2'); + +// Check that pause & resume work as expected with Http2ServerRequest + +const testStr = 'Request Body from Client'; + +const server = h2.createServer(); + +server.on('request', common.mustCall((req, res) => { + let data = ''; + req.pause(); + req.setEncoding('utf8'); + req.on('data', common.mustCall((chunk) => (data += chunk))); + setTimeout(common.mustCall(() => { + assert.strictEqual(data, ''); + req.resume(); + }), common.platformTimeout(100)); + req.on('end', common.mustCall(() => { + assert.strictEqual(data, testStr); + res.end(); + })); + + // shouldn't throw if underlying Http2Stream no longer exists + res.on('finish', common.mustCall(() => process.nextTick(() => { + assert.doesNotThrow(() => req.pause()); + assert.doesNotThrow(() => req.resume()); + }))); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + + const client = h2.connect(`http://localhost:${port}`); + const request = client.request({ + ':path': '/foobar', + ':method': 'POST', + ':scheme': 'http', + ':authority': `localhost:${port}` + }); + request.resume(); + request.end(testStr); + request.on('end', common.mustCall(function() { + client.destroy(); + server.close(); + })); +})); diff --git a/test/parallel/test-http2-compat-serverrequest-pipe.js b/test/parallel/test-http2-compat-serverrequest-pipe.js new file mode 100644 index 00000000000..b952372841e --- /dev/null +++ b/test/parallel/test-http2-compat-serverrequest-pipe.js @@ -0,0 +1,48 @@ +// Flags: --expose-http2 +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const fs = require('fs'); +const path = require('path'); + +// piping should work as expected with createWriteStream + +const loc = path.join(common.fixturesDir, 'person.jpg'); +const fn = path.join(common.tmpDir, 'http2pipe.jpg'); +common.refreshTmpDir(); + +const server = http2.createServer(); + +server.on('request', common.mustCall((req, res) => { + const dest = req.pipe(fs.createWriteStream(fn)); + dest.on('finish', common.mustCall(() => { + assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn)); + fs.unlinkSync(fn); + res.end(); + })); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + let remaining = 2; + function maybeClose() { + if (--remaining === 0) { + server.close(); + client.destroy(); + } + } + + const req = client.request({ ':method': 'POST' }); + req.on('response', common.mustCall()); + req.resume(); + req.on('end', common.mustCall(maybeClose)); + const str = fs.createReadStream(loc); + str.on('end', common.mustCall(maybeClose)); + str.pipe(req); +})); diff --git a/test/parallel/test-http2-compat-serverresponse-drain.js b/test/parallel/test-http2-compat-serverresponse-drain.js new file mode 100644 index 00000000000..312b7c75ab6 --- /dev/null +++ b/test/parallel/test-http2-compat-serverresponse-drain.js @@ -0,0 +1,44 @@ +// Flags: --expose-http2 +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const h2 = require('http2'); + +// Check that drain event is passed from Http2Stream + +const testString = 'tests'; + +const server = h2.createServer(); + +server.on('request', common.mustCall((req, res) => { + res.stream._writableState.highWaterMark = testString.length; + assert.strictEqual(res.write(testString), false); + res.on('drain', common.mustCall(() => res.end(testString))); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + + const client = h2.connect(`http://localhost:${port}`); + const request = client.request({ + ':path': '/foobar', + ':method': 'POST', + ':scheme': 'http', + ':authority': `localhost:${port}` + }); + request.resume(); + request.end(); + + let data = ''; + request.setEncoding('utf8'); + request.on('data', (chunk) => (data += chunk)); + + request.on('end', common.mustCall(function() { + assert.strictEqual(data, testString.repeat(2)); + client.destroy(); + server.close(); + })); +}));