http2: fix responses to long payload reqs

When a request with a long payload is received, http2 does
not allow a response that does not process all the incoming
payload. Add a conditional Http2Stream.close call that runs
only if the user hasn't attempted to read the stream.

PR-URL: https://github.com/nodejs/node/pull/20084
Fixes: https://github.com/nodejs/node/issues/20060
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Anatoli Papirovski 2018-04-16 22:03:10 +02:00
parent c51b7b296e
commit b55a11d1b1
No known key found for this signature in database
GPG Key ID: 614E2E1ABEB4B2C0
10 changed files with 279 additions and 105 deletions

View File

@ -206,6 +206,7 @@ const STREAM_FLAGS_CLOSED = 0x2;
const STREAM_FLAGS_HEADERS_SENT = 0x4;
const STREAM_FLAGS_HEAD_REQUEST = 0x8;
const STREAM_FLAGS_ABORTED = 0x10;
const STREAM_FLAGS_HAS_TRAILERS = 0x20;
const SESSION_FLAGS_PENDING = 0x0;
const SESSION_FLAGS_READY = 0x1;
@ -330,26 +331,13 @@ function onStreamClose(code) {
if (stream.destroyed)
return;
const state = stream[kState];
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: closed with code ${code}`);
if (!stream.closed) {
// Clear timeout and remove timeout listeners
stream.setTimeout(0);
stream.removeAllListeners('timeout');
if (!stream.closed)
closeStream(stream, code, false);
// Set the state flags
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;
// Close the writable side of the stream
abort(stream);
stream.end();
}
state.fd = -1;
stream[kState].fd = -1;
// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
@ -504,7 +492,7 @@ function requestOnConnect(headers, options) {
// At this point, the stream should have already been destroyed during
// the session.destroy() method. Do nothing else.
if (session.destroyed)
if (session === undefined || session.destroyed)
return;
// If the session was closed while waiting for the connect, destroy
@ -1412,6 +1400,9 @@ class ClientHttp2Session extends Http2Session {
if (options.endStream)
stream.end();
if (options.waitForTrailers)
stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
const onConnect = requestOnConnect.bind(stream, headersList, options);
if (this.connecting) {
this.on('connect', onConnect);
@ -1445,8 +1436,11 @@ function afterDoStreamWrite(status, handle) {
}
function streamOnResume() {
if (!this.destroyed && !this.pending)
if (!this.destroyed && !this.pending) {
if (!this[kState].didRead)
this[kState].didRead = true;
this[kHandle].readStart();
}
}
function streamOnPause() {
@ -1454,16 +1448,6 @@ function streamOnPause() {
this[kHandle].readStop();
}
// If the writable side of the Http2Stream is still open, emit the
// 'aborted' event and set the aborted flag.
function abort(stream) {
if (!stream.aborted &&
!(stream._writableState.ended || stream._writableState.ending)) {
stream[kState].flags |= STREAM_FLAGS_ABORTED;
stream.emit('aborted');
}
}
function afterShutdown() {
this.callback();
const stream = this.handle[kOwner];
@ -1471,6 +1455,51 @@ function afterShutdown() {
stream[kMaybeDestroy]();
}
function closeStream(stream, code, shouldSubmitRstStream = true) {
const state = stream[kState];
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;
// Clear timeout and remove timeout listeners
stream.setTimeout(0);
stream.removeAllListeners('timeout');
const { ending, finished } = stream._writableState;
if (!ending) {
// If the writable side of the Http2Stream is still open, emit the
// 'aborted' event and set the aborted flag.
if (!stream.aborted) {
state.flags |= STREAM_FLAGS_ABORTED;
stream.emit('aborted');
}
// Close the writable side.
stream.end();
}
if (shouldSubmitRstStream) {
const finishFn = finishCloseStream.bind(stream, code);
if (!ending || finished || code !== NGHTTP2_NO_ERROR)
finishFn();
else
stream.once('finish', finishFn);
}
}
function finishCloseStream(code) {
const rstStreamFn = submitRstStream.bind(this, code);
// If the handle has not yet been assigned, queue up the request to
// ensure that the RST_STREAM frame is sent after the stream ID has
// been determined.
if (this.pending) {
this.push(null);
this.once('ready', rstStreamFn);
return;
}
rstStreamFn();
}
// An Http2Stream is a Duplex stream that is backed by a
// node::http2::Http2Stream handle implementing StreamBase.
class Http2Stream extends Duplex {
@ -1490,6 +1519,7 @@ class Http2Stream extends Duplex {
this[kTimeout] = null;
this[kState] = {
didRead: false,
flags: STREAM_FLAGS_PENDING,
rstCode: NGHTTP2_NO_ERROR,
writeQueueSize: 0,
@ -1756,6 +1786,8 @@ class Http2Stream extends Duplex {
throw headersList;
this[kSentTrailers] = headers;
this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;
const ret = this[kHandle].trailers(headersList);
if (ret < 0)
this.destroy(new NghttpError(ret));
@ -1786,38 +1818,13 @@ class Http2Stream extends Duplex {
if (callback !== undefined && typeof callback !== 'function')
throw new ERR_INVALID_CALLBACK();
// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');
// Close the writable
abort(this);
this.end();
if (this.closed)
return;
const state = this[kState];
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;
if (callback !== undefined) {
if (callback !== undefined)
this.once('close', callback);
}
if (this[kHandle] === undefined)
return;
const rstStreamFn = submitRstStream.bind(this, code);
// If the handle has not yet been assigned, queue up the request to
// ensure that the RST_STREAM frame is sent after the stream ID has
// been determined.
if (this.pending) {
this.push(null);
this.once('ready', rstStreamFn);
return;
}
rstStreamFn();
closeStream(this, code);
}
// Called by this.destroy().
@ -1832,26 +1839,19 @@ class Http2Stream extends Duplex {
debug(`Http2Stream ${this[kID] || '<pending>'} [Http2Session ` +
`${sessionName(session[kType])}]: destroying stream`);
const state = this[kState];
const code = state.rstCode =
err != null ?
NGHTTP2_INTERNAL_ERROR :
state.rstCode || NGHTTP2_NO_ERROR;
if (handle !== undefined) {
// If the handle exists, we need to close, then destroy the handle
this.close(code);
if (!this._readableState.ended && !this._readableState.ending)
this.push(null);
const code = err != null ?
NGHTTP2_INTERNAL_ERROR : (state.rstCode || NGHTTP2_NO_ERROR);
const hasHandle = handle !== undefined;
if (!this.closed)
closeStream(this, code, hasHandle);
this.push(null);
if (hasHandle) {
handle.destroy();
session[kState].streams.delete(id);
} else {
// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');
state.flags |= STREAM_FLAGS_CLOSED;
abort(this);
this.end();
this.push(null);
session[kState].pendingStreams.delete(this);
}
@ -1884,13 +1884,23 @@ class Http2Stream extends Duplex {
}
// TODO(mcollina): remove usage of _*State properties
if (this._readableState.ended &&
this._writableState.ended &&
this._writableState.pendingcb === 0 &&
this.closed) {
this.destroy();
// This should return, but eslint complains.
// return
if (this._writableState.ended && this._writableState.pendingcb === 0) {
if (this._readableState.ended && this.closed) {
this.destroy();
return;
}
// We've submitted a response from our server session, have not attempted
// to process any incoming data, and have no trailers. This means we can
// attempt to gracefully close the session.
const state = this[kState];
if (this.headersSent &&
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
!state.didRead &&
!this._readableState.resumeScheduled) {
this.close();
}
}
}
}
@ -2095,7 +2105,6 @@ function afterOpen(session, options, headers, streamOptions, err, fd) {
}
if (this.destroyed || this.closed) {
tryClose(fd);
abort(this);
return;
}
state.fd = fd;
@ -2224,8 +2233,10 @@ class ServerHttp2Stream extends Http2Stream {
if (options.endStream)
streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD;
if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
state.flags |= STREAM_FLAGS_HAS_TRAILERS;
}
headers = processHeaders(headers);
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
@ -2285,8 +2296,10 @@ class ServerHttp2Stream extends Http2Stream {
}
let streamOptions = 0;
if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
}
if (typeof fd !== 'number')
throw new ERR_INVALID_ARG_TYPE('fd', 'number', fd);
@ -2346,8 +2359,10 @@ class ServerHttp2Stream extends Http2Stream {
}
let streamOptions = 0;
if (options.waitForTrailers)
if (options.waitForTrailers) {
streamOptions |= STREAM_OPTION_GET_TRAILERS;
this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
}
const session = this[kSession];
debug(`Http2Stream ${this[kID]} [Http2Session ` +

View File

@ -1364,16 +1364,35 @@ void Http2Session::MaybeScheduleWrite() {
// storage for data and metadata that was associated with these writes.
void Http2Session::ClearOutgoing(int status) {
CHECK_NE(flags_ & SESSION_STATE_SENDING, 0);
flags_ &= ~SESSION_STATE_SENDING;
for (const nghttp2_stream_write& wr : outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr)
wrap->Done(status);
if (outgoing_buffers_.size() > 0) {
outgoing_storage_.clear();
for (const nghttp2_stream_write& wr : outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr)
wrap->Done(status);
}
outgoing_buffers_.clear();
}
outgoing_buffers_.clear();
outgoing_storage_.clear();
flags_ &= ~SESSION_STATE_SENDING;
// Now that we've finished sending queued data, if there are any pending
// RstStreams we should try sending again and then flush them one by one.
if (pending_rst_streams_.size() > 0) {
std::vector<int32_t> current_pending_rst_streams;
pending_rst_streams_.swap(current_pending_rst_streams);
SendPendingData();
for (int32_t stream_id : current_pending_rst_streams) {
Http2Stream* stream = FindStream(stream_id);
if (stream != nullptr)
stream->FlushRstStream();
}
}
}
// Queue a given block of data for sending. This always creates a copy,
@ -1397,18 +1416,19 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) {
// chunk out to the i/o socket to be sent. This is a particularly hot method
// that will generally be called at least twice be event loop iteration.
// This is a potential performance optimization target later.
void Http2Session::SendPendingData() {
// Returns non-zero value if a write is already in progress.
uint8_t Http2Session::SendPendingData() {
DEBUG_HTTP2SESSION(this, "sending pending data");
// Do not attempt to send data on the socket if the destroying flag has
// been set. That means everything is shutting down and the socket
// will not be usable.
if (IsDestroyed())
return;
return 0;
flags_ &= ~SESSION_STATE_WRITE_SCHEDULED;
// SendPendingData should not be called recursively.
if (flags_ & SESSION_STATE_SENDING)
return;
return 1;
// This is cleared by ClearOutgoing().
flags_ |= SESSION_STATE_SENDING;
@ -1432,15 +1452,15 @@ void Http2Session::SendPendingData() {
// does take care of things like closing the individual streams after
// a socket has been torn down, so we still need to call it.
ClearOutgoing(UV_ECANCELED);
return;
return 0;
}
// Part Two: Pass Data to the underlying stream
size_t count = outgoing_buffers_.size();
if (count == 0) {
flags_ &= ~SESSION_STATE_SENDING;
return;
ClearOutgoing(0);
return 0;
}
MaybeStackBuffer<uv_buf_t, 32> bufs;
bufs.AllocateSufficientStorage(count);
@ -1471,6 +1491,8 @@ void Http2Session::SendPendingData() {
DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
nghttp2_session_want_read(session_));
return 0;
}
@ -1830,12 +1852,25 @@ int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
// peer.
void Http2Stream::SubmitRstStream(const uint32_t code) {
CHECK(!this->IsDestroyed());
code_ = code;
// If possible, force a purge of any currently pending data here to make sure
// it is sent before closing the stream. If it returns non-zero then we need
// to wait until the current write finishes and try again to avoid nghttp2
// behaviour where it prioritizes RstStream over everything else.
if (session_->SendPendingData() != 0) {
session_->AddPendingRstStream(id_);
return;
}
FlushRstStream();
}
void Http2Stream::FlushRstStream() {
if (IsDestroyed())
return;
Http2Scope h2scope(this);
// Force a purge of any currently pending data here to make sure
// it is sent before closing the stream.
session_->SendPendingData();
CHECK_EQ(nghttp2_submit_rst_stream(**session_, NGHTTP2_FLAG_NONE,
id_, code), 0);
id_, code_), 0);
}

View File

@ -591,6 +591,8 @@ class Http2Stream : public AsyncWrap,
// Submits an RST_STREAM frame using the given code
void SubmitRstStream(const uint32_t code);
void FlushRstStream();
// Submits a PUSH_PROMISE frame with this stream as the parent.
Http2Stream* SubmitPushPromise(
nghttp2_nv* nva,
@ -797,7 +799,7 @@ class Http2Session : public AsyncWrap, public StreamListener {
bool Ping(v8::Local<v8::Function> function);
void SendPendingData();
uint8_t SendPendingData();
// Submits a new request. If the request is a success, assigned
// will be a pointer to the Http2Stream instance assigned.
@ -845,6 +847,11 @@ class Http2Session : public AsyncWrap, public StreamListener {
size_t self_size() const override { return sizeof(*this); }
// Schedule an RstStream for after the current write finishes.
inline void AddPendingRstStream(int32_t stream_id) {
pending_rst_streams_.emplace_back(stream_id);
}
// Handle reads/writes from the underlying network transport.
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
void OnStreamAfterWrite(WriteWrap* w, int status) override;
@ -1049,6 +1056,7 @@ class Http2Session : public AsyncWrap, public StreamListener {
std::vector<nghttp2_stream_write> outgoing_buffers_;
std::vector<uint8_t> outgoing_storage_;
std::vector<int32_t> pending_rst_streams_;
void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length);
void ClearOutgoing(int status);

BIN
test/fixtures/person-large.jpg vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 137 KiB

View File

@ -124,3 +124,21 @@ const Countdown = require('../common/countdown');
req.on('error', () => {});
}));
}
// test destroy before connect
{
const server = h2.createServer();
server.on('stream', common.mustNotCall());
server.listen(0, common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
server.on('connection', common.mustCall(() => {
server.close();
client.close();
}));
const req = client.request();
req.destroy();
}));
}

View File

@ -63,8 +63,14 @@ server.listen(0, common.mustCall(() => {
message: 'Stream closed with error code NGHTTP2_PROTOCOL_ERROR'
}));
req.on('response', common.mustCall());
req.resume();
// The `response` event should not fire as the server should receive the
// RST_STREAM frame before it ever has a chance to reply.
req.on('response', common.mustNotCall());
// The `end` event should still fire as we close the readable stream by
// pushing a `null` chunk.
req.on('end', common.mustCall());
req.resume();
req.end();
}));

View File

@ -0,0 +1,48 @@
'use strict';
// Verifies that uploading data from a client works
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');
const fs = require('fs');
const fixtures = require('../common/fixtures');
const loc = fixtures.path('person-large.jpg');
assert(fs.existsSync(loc));
fs.readFile(loc, common.mustCall((err, data) => {
assert.ifError(err);
const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
stream.on('close', common.mustCall(() => {
assert.strictEqual(stream.rstCode, 0);
}));
stream.respond({ ':status': 400 });
stream.end();
}));
server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`);
const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall((headers) => {
assert.strictEqual(headers[':status'], 400);
}));
req.resume();
req.on('end', common.mustCall(() => {
server.close();
client.close();
}));
const str = fs.createReadStream(loc);
str.pipe(req);
}));
}));

View File

@ -11,7 +11,7 @@ const fs = require('fs');
const fixtures = require('../common/fixtures');
const Countdown = require('../common/countdown');
const loc = fixtures.path('person.jpg');
const loc = fixtures.path('person-large.jpg');
let fileData;
assert(fs.existsSync(loc));

View File

@ -0,0 +1,44 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const fixtures = require('../common/fixtures');
const http2 = require('http2');
const content = Buffer.alloc(1e5, 0x44);
const server = http2.createSecureServer({
key: fixtures.readKey('agent1-key.pem'),
cert: fixtures.readKey('agent1-cert.pem')
});
server.on('stream', common.mustCall((stream) => {
stream.respond({
'Content-Type': 'application/octet-stream',
'Content-Length': (content.length.toString() * 2),
'Vary': 'Accept-Encoding'
});
stream.write(content);
stream.write(content);
stream.end();
stream.close();
}));
server.listen(0, common.mustCall(() => {
const client = http2.connect(`https://localhost:${server.address().port}`,
{ rejectUnauthorized: false });
const req = client.request({ ':path': '/' });
req.end();
let receivedBufferLength = 0;
req.on('data', common.mustCallAtLeast((buf) => {
receivedBufferLength += buf.length;
}, 1));
req.on('close', common.mustCall(() => {
assert.strictEqual(receivedBufferLength, content.length * 2);
client.close();
server.close();
}));
}));

View File

@ -26,7 +26,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => {
switch (entry.type) {
case 'server':
assert.strictEqual(entry.streamCount, 1);
assert.strictEqual(entry.framesReceived, 5);
assert(entry.framesReceived >= 3);
break;
case 'client':
assert.strictEqual(entry.streamCount, 1);