http2: use session not socket timeout, tests
Change default timeout to be tracked on the session instead of the socket, as nghttp2 manages the socket and we would need to maintain two sets of timeouts for similar purpose. Also fixes session setTimeout to work as it wasn't getting _unrefActive correctly (was called on the handle). Fixes: https://github.com/nodejs/node/issues/15158 PR-URL: https://github.com/nodejs/node/pull/15188 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
641646463d
commit
46133b5beb
@ -150,8 +150,8 @@ function emit() {
|
|||||||
// event. If the stream is not new, emit the 'headers' event to pass
|
// event. If the stream is not new, emit the 'headers' event to pass
|
||||||
// the block of headers on.
|
// the block of headers on.
|
||||||
function onSessionHeaders(id, cat, flags, headers) {
|
function onSessionHeaders(id, cat, flags, headers) {
|
||||||
_unrefActive(this);
|
|
||||||
const owner = this[kOwner];
|
const owner = this[kOwner];
|
||||||
|
_unrefActive(owner);
|
||||||
debug(`[${sessionName(owner[kType])}] headers were received on ` +
|
debug(`[${sessionName(owner[kType])}] headers were received on ` +
|
||||||
`stream ${id}: ${cat}`);
|
`stream ${id}: ${cat}`);
|
||||||
const streams = owner[kState].streams;
|
const streams = owner[kState].streams;
|
||||||
@ -265,7 +265,7 @@ function onSessionStreamClose(id, code) {
|
|||||||
const stream = owner[kState].streams.get(id);
|
const stream = owner[kState].streams.get(id);
|
||||||
if (stream === undefined)
|
if (stream === undefined)
|
||||||
return;
|
return;
|
||||||
_unrefActive(this);
|
_unrefActive(owner);
|
||||||
// Set the rst state for the stream
|
// Set the rst state for the stream
|
||||||
abort(stream);
|
abort(stream);
|
||||||
const state = stream[kState];
|
const state = stream[kState];
|
||||||
@ -287,14 +287,16 @@ function afterFDClose(err) {
|
|||||||
|
|
||||||
// Called when an error event needs to be triggered
|
// Called when an error event needs to be triggered
|
||||||
function onSessionError(error) {
|
function onSessionError(error) {
|
||||||
_unrefActive(this);
|
const owner = this[kOwner];
|
||||||
process.nextTick(() => this[kOwner].emit('error', error));
|
_unrefActive(owner);
|
||||||
|
process.nextTick(() => owner.emit('error', error));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Receives a chunk of data for a given stream and forwards it on
|
// Receives a chunk of data for a given stream and forwards it on
|
||||||
// to the Http2Stream Duplex for processing.
|
// to the Http2Stream Duplex for processing.
|
||||||
function onSessionRead(nread, buf, handle) {
|
function onSessionRead(nread, buf, handle) {
|
||||||
const streams = this[kOwner][kState].streams;
|
const owner = this[kOwner];
|
||||||
|
const streams = owner[kState].streams;
|
||||||
const id = handle.id;
|
const id = handle.id;
|
||||||
const stream = streams.get(id);
|
const stream = streams.get(id);
|
||||||
// It should not be possible for the stream to not exist at this point.
|
// It should not be possible for the stream to not exist at this point.
|
||||||
@ -303,7 +305,7 @@ function onSessionRead(nread, buf, handle) {
|
|||||||
'Internal HTTP/2 Failure. Stream does not exist. Please ' +
|
'Internal HTTP/2 Failure. Stream does not exist. Please ' +
|
||||||
'report this as a bug in Node.js');
|
'report this as a bug in Node.js');
|
||||||
const state = stream[kState];
|
const state = stream[kState];
|
||||||
_unrefActive(this); // Reset the session timeout timer
|
_unrefActive(owner); // Reset the session timeout timer
|
||||||
_unrefActive(stream); // Reset the stream timeout timer
|
_unrefActive(stream); // Reset the stream timeout timer
|
||||||
|
|
||||||
if (nread >= 0 && !stream.destroyed) {
|
if (nread >= 0 && !stream.destroyed) {
|
||||||
@ -322,7 +324,7 @@ function onSessionRead(nread, buf, handle) {
|
|||||||
function onSettings(ack) {
|
function onSettings(ack) {
|
||||||
const owner = this[kOwner];
|
const owner = this[kOwner];
|
||||||
debug(`[${sessionName(owner[kType])}] new settings received`);
|
debug(`[${sessionName(owner[kType])}] new settings received`);
|
||||||
_unrefActive(this);
|
_unrefActive(owner);
|
||||||
let event = 'remoteSettings';
|
let event = 'remoteSettings';
|
||||||
if (ack) {
|
if (ack) {
|
||||||
if (owner[kState].pendingAck > 0)
|
if (owner[kState].pendingAck > 0)
|
||||||
@ -348,7 +350,7 @@ function onPriority(id, parent, weight, exclusive) {
|
|||||||
debug(`[${sessionName(owner[kType])}] priority advisement for stream ` +
|
debug(`[${sessionName(owner[kType])}] priority advisement for stream ` +
|
||||||
`${id}: \n parent: ${parent},\n weight: ${weight},\n` +
|
`${id}: \n parent: ${parent},\n weight: ${weight},\n` +
|
||||||
` exclusive: ${exclusive}`);
|
` exclusive: ${exclusive}`);
|
||||||
_unrefActive(this);
|
_unrefActive(owner);
|
||||||
const streams = owner[kState].streams;
|
const streams = owner[kState].streams;
|
||||||
const stream = streams.get(id);
|
const stream = streams.get(id);
|
||||||
const emitter = stream === undefined ? owner : stream;
|
const emitter = stream === undefined ? owner : stream;
|
||||||
@ -370,7 +372,7 @@ function onFrameError(id, type, code) {
|
|||||||
const owner = this[kOwner];
|
const owner = this[kOwner];
|
||||||
debug(`[${sessionName(owner[kType])}] error sending frame type ` +
|
debug(`[${sessionName(owner[kType])}] error sending frame type ` +
|
||||||
`${type} on stream ${id}, code: ${code}`);
|
`${type} on stream ${id}, code: ${code}`);
|
||||||
_unrefActive(this);
|
_unrefActive(owner);
|
||||||
const streams = owner[kState].streams;
|
const streams = owner[kState].streams;
|
||||||
const stream = streams.get(id);
|
const stream = streams.get(id);
|
||||||
const emitter = stream !== undefined ? stream : owner;
|
const emitter = stream !== undefined ? stream : owner;
|
||||||
@ -975,7 +977,7 @@ class Http2Session extends EventEmitter {
|
|||||||
state.destroying = true;
|
state.destroying = true;
|
||||||
|
|
||||||
// Unenroll the timer
|
// Unenroll the timer
|
||||||
this.setTimeout(0);
|
this.setTimeout(0, sessionOnTimeout);
|
||||||
|
|
||||||
// Shut down any still open streams
|
// Shut down any still open streams
|
||||||
const streams = state.streams;
|
const streams = state.streams;
|
||||||
@ -2185,7 +2187,6 @@ function socketDestroy(error) {
|
|||||||
const type = this[kSession][kType];
|
const type = this[kSession][kType];
|
||||||
debug(`[${sessionName(type)}] socket destroy called`);
|
debug(`[${sessionName(type)}] socket destroy called`);
|
||||||
delete this[kServer];
|
delete this[kServer];
|
||||||
this.setTimeout(0);
|
|
||||||
// destroy the session first so that it will stop trying to
|
// destroy the session first so that it will stop trying to
|
||||||
// send data while we close the socket.
|
// send data while we close the socket.
|
||||||
this[kSession].destroy();
|
this[kSession].destroy();
|
||||||
@ -2247,31 +2248,6 @@ function socketOnError(error) {
|
|||||||
this.destroy(error);
|
this.destroy(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
// When the socket times out on the server, attempt a graceful shutdown
|
|
||||||
// of the session.
|
|
||||||
function socketOnTimeout() {
|
|
||||||
debug('socket timeout');
|
|
||||||
process.nextTick(() => {
|
|
||||||
const server = this[kServer];
|
|
||||||
const session = this[kSession];
|
|
||||||
// If server or session are undefined, or session.destroyed is true
|
|
||||||
// then we're already in the process of shutting down, do nothing.
|
|
||||||
if (server === undefined || session === undefined)
|
|
||||||
return;
|
|
||||||
const state = session[kState];
|
|
||||||
if (state.destroyed || state.destroying)
|
|
||||||
return;
|
|
||||||
if (!server.emit('timeout', session, this)) {
|
|
||||||
session.shutdown(
|
|
||||||
{
|
|
||||||
graceful: true,
|
|
||||||
errorCode: NGHTTP2_NO_ERROR
|
|
||||||
},
|
|
||||||
this.destroy.bind(this));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handles the on('stream') event for a session and forwards
|
// Handles the on('stream') event for a session and forwards
|
||||||
// it on to the server object.
|
// it on to the server object.
|
||||||
function sessionOnStream(stream, headers, flags, rawHeaders) {
|
function sessionOnStream(stream, headers, flags, rawHeaders) {
|
||||||
@ -2289,15 +2265,34 @@ function sessionOnSocketError(error, socket) {
|
|||||||
this[kServer].emit('socketError', error, socket, this);
|
this[kServer].emit('socketError', error, socket, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// When the session times out on the server, attempt a graceful shutdown
|
||||||
|
function sessionOnTimeout() {
|
||||||
|
debug('session timeout');
|
||||||
|
process.nextTick(() => {
|
||||||
|
// if destroyed or destryoing, do nothing
|
||||||
|
if (this[kState].destroyed || this[kState].destroying)
|
||||||
|
return;
|
||||||
|
const server = this[kServer];
|
||||||
|
const socket = this[kSocket];
|
||||||
|
// If server or socket are undefined, then we're already in the process of
|
||||||
|
// shutting down, do nothing.
|
||||||
|
if (server === undefined || socket === undefined)
|
||||||
|
return;
|
||||||
|
if (!server.emit('timeout', this)) {
|
||||||
|
this.shutdown(
|
||||||
|
{
|
||||||
|
graceful: true,
|
||||||
|
errorCode: NGHTTP2_NO_ERROR
|
||||||
|
},
|
||||||
|
socket.destroy.bind(socket));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
function connectionListener(socket) {
|
function connectionListener(socket) {
|
||||||
debug('[server] received a connection');
|
debug('[server] received a connection');
|
||||||
const options = this[kOptions] || {};
|
const options = this[kOptions] || {};
|
||||||
|
|
||||||
if (this.timeout) {
|
|
||||||
socket.setTimeout(this.timeout);
|
|
||||||
socket.on('timeout', socketOnTimeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (socket.alpnProtocol === false || socket.alpnProtocol === 'http/1.1') {
|
if (socket.alpnProtocol === false || socket.alpnProtocol === 'http/1.1') {
|
||||||
if (options.allowHTTP1 === true) {
|
if (options.allowHTTP1 === true) {
|
||||||
// Fallback to HTTP/1.1
|
// Fallback to HTTP/1.1
|
||||||
@ -2325,6 +2320,11 @@ function connectionListener(socket) {
|
|||||||
session.on('priority', sessionOnPriority);
|
session.on('priority', sessionOnPriority);
|
||||||
session.on('socketError', sessionOnSocketError);
|
session.on('socketError', sessionOnSocketError);
|
||||||
|
|
||||||
|
if (this.timeout) {
|
||||||
|
session.setTimeout(this.timeout);
|
||||||
|
session.on('timeout', sessionOnTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
socket[kServer] = this;
|
socket[kServer] = this;
|
||||||
|
|
||||||
process.nextTick(emit.bind(this, 'session', session));
|
process.nextTick(emit.bind(this, 'session', session));
|
||||||
|
41
test/parallel/test-http2-session-timeout.js
Normal file
41
test/parallel/test-http2-session-timeout.js
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
// Flags: --expose-http2
|
||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
if (!common.hasCrypto)
|
||||||
|
common.skip('missing crypto');
|
||||||
|
const h2 = require('http2');
|
||||||
|
|
||||||
|
const serverTimeout = common.platformTimeout(200);
|
||||||
|
const callTimeout = common.platformTimeout(10);
|
||||||
|
|
||||||
|
const server = h2.createServer();
|
||||||
|
server.timeout = serverTimeout;
|
||||||
|
|
||||||
|
server.on('request', (req, res) => res.end());
|
||||||
|
server.on('timeout', common.mustNotCall());
|
||||||
|
|
||||||
|
server.listen(0, common.mustCall(() => {
|
||||||
|
const port = server.address().port;
|
||||||
|
|
||||||
|
const url = `http://localhost:${port}`;
|
||||||
|
const client = h2.connect(url);
|
||||||
|
makeReq(40);
|
||||||
|
|
||||||
|
function makeReq(attempts) {
|
||||||
|
const request = client.request({
|
||||||
|
':path': '/foobar',
|
||||||
|
':method': 'GET',
|
||||||
|
':scheme': 'http',
|
||||||
|
':authority': `localhost:${port}`,
|
||||||
|
});
|
||||||
|
request.end();
|
||||||
|
|
||||||
|
if (attempts) {
|
||||||
|
setTimeout(() => makeReq(attempts - 1), callTimeout);
|
||||||
|
} else {
|
||||||
|
server.close();
|
||||||
|
client.destroy();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}));
|
Loading…
x
Reference in New Issue
Block a user