http2: improve http2 code a bit

Multiple general improvements to http2 internals for
readability and efficiency

PR-URL: https://github.com/nodejs/node/pull/23984
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
Reviewed-By: Ujjwal Sharma <usharma1998@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
James M Snell 2018-10-30 16:17:33 -07:00 committed by Rich Trott
parent ab2778d763
commit 7825045ee6
5 changed files with 209 additions and 191 deletions

View File

@ -39,8 +39,7 @@ function main({ n, nheaders }) {
function doRequest(remaining) { function doRequest(remaining) {
const req = client.request(headersObject); const req = client.request(headersObject);
req.end(); req.resume();
req.on('data', () => {});
req.on('end', () => { req.on('end', () => {
if (remaining > 0) { if (remaining > 0) {
doRequest(remaining - 1); doRequest(remaining - 1);

View File

@ -7,9 +7,9 @@ const fs = require('fs');
const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html'); const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html');
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
requests: [100, 1000, 10000, 100000], requests: [100, 1000, 5000],
streams: [100, 200, 1000], streams: [1, 10, 20, 40, 100, 200],
clients: [1, 2], clients: [2],
benchmarker: ['h2load'] benchmarker: ['h2load']
}, { flags: ['--no-warnings'] }); }, { flags: ['--no-warnings'] });

View File

@ -6,9 +6,9 @@ const fs = require('fs');
const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html'); const file = path.join(path.resolve(__dirname, '../fixtures'), 'alice.html');
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
requests: [100, 1000, 10000, 100000], requests: [100, 1000, 5000],
streams: [100, 200, 1000], streams: [1, 10, 20, 40, 100, 200],
clients: [1, 2], clients: [2],
benchmarker: ['h2load'] benchmarker: ['h2load']
}, { flags: ['--no-warnings'] }); }, { flags: ['--no-warnings'] });

View File

@ -430,14 +430,20 @@ function mapToHeaders(map,
let count = 0; let count = 0;
const keys = Object.keys(map); const keys = Object.keys(map);
const singles = new Set(); const singles = new Set();
for (var i = 0; i < keys.length; i++) { let i;
let key = keys[i]; let isArray;
let value = map[key]; let key;
let value;
let isSingleValueHeader;
let err;
for (i = 0; i < keys.length; i++) {
key = keys[i];
value = map[key];
if (value === undefined || key === '') if (value === undefined || key === '')
continue; continue;
key = key.toLowerCase(); key = key.toLowerCase();
const isSingleValueHeader = kSingleValueHeaders.has(key); isSingleValueHeader = kSingleValueHeaders.has(key);
let isArray = Array.isArray(value); isArray = Array.isArray(value);
if (isArray) { if (isArray) {
switch (value.length) { switch (value.length) {
case 0: case 0:
@ -459,12 +465,13 @@ function mapToHeaders(map,
singles.add(key); singles.add(key);
} }
if (key[0] === ':') { if (key[0] === ':') {
const err = assertValuePseudoHeader(key); err = assertValuePseudoHeader(key);
if (err !== undefined) if (err !== undefined)
return err; return err;
ret = `${key}\0${value}\0${ret}`; ret = `${key}\0${value}\0${ret}`;
count++; count++;
} else { continue;
}
if (isIllegalConnectionSpecificHeader(key, value)) { if (isIllegalConnectionSpecificHeader(key, value)) {
return new ERR_HTTP2_INVALID_CONNECTION_HEADERS(key); return new ERR_HTTP2_INVALID_CONNECTION_HEADERS(key);
} }
@ -474,12 +481,11 @@ function mapToHeaders(map,
ret += `${key}\0${val}\0`; ret += `${key}\0${val}\0`;
} }
count += value.length; count += value.length;
} else { continue;
}
ret += `${key}\0${value}\0`; ret += `${key}\0${value}\0`;
count++; count++;
} }
}
}
return [ret, count]; return [ret, count];
} }

View File

@ -911,8 +911,10 @@ int Http2Session::OnBeginHeadersCallback(nghttp2_session* handle,
Debug(session, "beginning headers for stream %d", id); Debug(session, "beginning headers for stream %d", id);
Http2Stream* stream = session->FindStream(id); Http2Stream* stream = session->FindStream(id);
if (stream == nullptr) { // The common case is that we're creating a new stream. The less likely
if (!session->CanAddStream()) { // case is that we're receiving a set of trailers
if (LIKELY(stream == nullptr)) {
if (UNLIKELY(!session->CanAddStream())) {
// Too many concurrent streams being opened // Too many concurrent streams being opened
nghttp2_submit_rst_stream(**session, NGHTTP2_FLAG_NONE, id, nghttp2_submit_rst_stream(**session, NGHTTP2_FLAG_NONE, id,
NGHTTP2_ENHANCE_YOUR_CALM); NGHTTP2_ENHANCE_YOUR_CALM);
@ -940,7 +942,7 @@ int Http2Session::OnHeaderCallback(nghttp2_session* handle,
// If stream is null at this point, either something odd has happened // If stream is null at this point, either something odd has happened
// or the stream was closed locally while header processing was occurring. // or the stream was closed locally while header processing was occurring.
// either way, do not proceed and close the stream. // either way, do not proceed and close the stream.
if (stream == nullptr) if (UNLIKELY(stream == nullptr))
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
// If the stream has already been destroyed, ignore. // If the stream has already been destroyed, ignore.
@ -955,7 +957,7 @@ int Http2Session::OnHeaderCallback(nghttp2_session* handle,
// Called by nghttp2 when a complete HTTP2 frame has been received. There are // Called by nghttp2 when a complete HTTP2 frame has been received. There are
// only a handful of frame types tha we care about handling here. // only a handful of frame types that we care about handling here.
int Http2Session::OnFrameReceive(nghttp2_session* handle, int Http2Session::OnFrameReceive(nghttp2_session* handle,
const nghttp2_frame* frame, const nghttp2_frame* frame,
void* user_data) { void* user_data) {
@ -1032,10 +1034,14 @@ int Http2Session::OnFrameNotSent(nghttp2_session* handle,
Environment* env = session->env(); Environment* env = session->env();
Debug(session, "frame type %d was not sent, code: %d", Debug(session, "frame type %d was not sent, code: %d",
frame->hd.type, error_code); frame->hd.type, error_code);
// Do not report if the frame was not sent due to the session closing // Do not report if the frame was not sent due to the session closing
if (error_code != NGHTTP2_ERR_SESSION_CLOSING && if (error_code == NGHTTP2_ERR_SESSION_CLOSING ||
error_code != NGHTTP2_ERR_STREAM_CLOSED && error_code == NGHTTP2_ERR_STREAM_CLOSED ||
error_code != NGHTTP2_ERR_STREAM_CLOSING) { error_code == NGHTTP2_ERR_STREAM_CLOSING) {
return 0;
}
Isolate* isolate = env->isolate(); Isolate* isolate = env->isolate();
HandleScope scope(isolate); HandleScope scope(isolate);
Local<Context> context = env->context(); Local<Context> context = env->context();
@ -1047,7 +1053,6 @@ int Http2Session::OnFrameNotSent(nghttp2_session* handle,
Integer::New(isolate, error_code) Integer::New(isolate, error_code)
}; };
session->MakeCallback(env->onframeerror_string(), arraysize(argv), argv); session->MakeCallback(env->onframeerror_string(), arraysize(argv), argv);
}
return 0; return 0;
} }
@ -1074,7 +1079,9 @@ int Http2Session::OnStreamClose(nghttp2_session* handle,
Http2Stream* stream = session->FindStream(id); Http2Stream* stream = session->FindStream(id);
// Intentionally ignore the callback if the stream does not exist or has // Intentionally ignore the callback if the stream does not exist or has
// already been destroyed // already been destroyed
if (stream != nullptr && !stream->IsDestroyed()) { if (stream == nullptr || stream->IsDestroyed())
return 0;
stream->Close(code); stream->Close(code);
// It is possible for the stream close to occur before the stream is // It is possible for the stream close to occur before the stream is
// ever passed on to the javascript side. If that happens, skip straight // ever passed on to the javascript side. If that happens, skip straight
@ -1084,15 +1091,14 @@ int Http2Session::OnStreamClose(nghttp2_session* handle,
Local<Value> fn = Local<Value> fn =
stream->object()->Get(context, env->onstreamclose_string()) stream->object()->Get(context, env->onstreamclose_string())
.ToLocalChecked(); .ToLocalChecked();
if (fn->IsFunction()) {
Local<Value> argv[] = { if (!fn->IsFunction()) {
Integer::NewFromUnsigned(isolate, code)
};
stream->MakeCallback(fn.As<Function>(), arraysize(argv), argv);
} else {
stream->Destroy(); stream->Destroy();
return 0;
} }
}
Local<Value> arg = Integer::NewFromUnsigned(isolate, code);
stream->MakeCallback(fn.As<Function>(), 1, &arg);
return 0; return 0;
} }
@ -1125,9 +1131,12 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
"%d, flags: %d", id, len, flags); "%d, flags: %d", id, len, flags);
Environment* env = session->env(); Environment* env = session->env();
HandleScope scope(env->isolate()); HandleScope scope(env->isolate());
// We should never actually get a 0-length chunk so this check is // We should never actually get a 0-length chunk so this check is
// only a precaution at this point. // only a precaution at this point.
if (len > 0) { if (len == 0)
return 0;
// Notify nghttp2 that we've consumed a chunk of data on the connection // Notify nghttp2 that we've consumed a chunk of data on the connection
// so that it can send a WINDOW_UPDATE frame. This is a critical part of // so that it can send a WINDOW_UPDATE frame. This is a critical part of
// the flow control process in http2 // the flow control process in http2
@ -1171,7 +1180,7 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
else else
stream->inbound_consumed_data_while_paused_ += avail; stream->inbound_consumed_data_while_paused_ += avail;
} while (len != 0); } while (len != 0);
}
return 0; return 0;
} }
@ -1430,7 +1439,7 @@ void Http2Session::HandleOriginFrame(const nghttp2_frame* frame) {
nghttp2_extension ext = frame->ext; nghttp2_extension ext = frame->ext;
nghttp2_ext_origin* origin = static_cast<nghttp2_ext_origin*>(ext.payload); nghttp2_ext_origin* origin = static_cast<nghttp2_ext_origin*>(ext.payload);
Local<Array> holder = Array::New(isolate); Local<Value> holder = Array::New(isolate);
Local<Function> fn = env()->push_values_to_array_function(); Local<Function> fn = env()->push_values_to_array_function();
Local<Value> argv[NODE_PUSH_VAL_TO_ARRAY_MAX]; Local<Value> argv[NODE_PUSH_VAL_TO_ARRAY_MAX];
@ -1449,9 +1458,7 @@ void Http2Session::HandleOriginFrame(const nghttp2_frame* frame) {
fn->Call(context, holder, j, argv).ToLocalChecked(); fn->Call(context, holder, j, argv).ToLocalChecked();
} }
Local<Value> args[1] = { holder }; MakeCallback(env()->onorigin_string(), 1, &holder);
MakeCallback(env()->onorigin_string(), arraysize(args), args);
} }
// Called by OnFrameReceived when a complete PING frame has been received. // Called by OnFrameReceived when a complete PING frame has been received.
@ -1464,9 +1471,8 @@ void Http2Session::HandlePingFrame(const nghttp2_frame* frame) {
bool ack = frame->hd.flags & NGHTTP2_FLAG_ACK; bool ack = frame->hd.flags & NGHTTP2_FLAG_ACK;
if (ack) { if (ack) {
Http2Ping* ping = PopPing(); Http2Ping* ping = PopPing();
if (ping != nullptr) {
ping->Done(true, frame->ping.opaque_data); if (ping == nullptr) {
} else {
// PING Ack is unsolicited. Treat as a connection error. The HTTP/2 // PING Ack is unsolicited. Treat as a connection error. The HTTP/2
// spec does not require this, but there is no legitimate reason to // spec does not require this, but there is no legitimate reason to
// receive an unsolicited PING ack on a connection. Either the peer // receive an unsolicited PING ack on a connection. Either the peer
@ -1474,26 +1480,36 @@ void Http2Session::HandlePingFrame(const nghttp2_frame* frame) {
// nonsense. // nonsense.
arg = Integer::New(isolate, NGHTTP2_ERR_PROTO); arg = Integer::New(isolate, NGHTTP2_ERR_PROTO);
MakeCallback(env()->error_string(), 1, &arg); MakeCallback(env()->error_string(), 1, &arg);
return;
} }
} else {
ping->Done(true, frame->ping.opaque_data);
return;
}
// Notify the session that a ping occurred // Notify the session that a ping occurred
arg = Buffer::Copy(env(), arg = Buffer::Copy(env(),
reinterpret_cast<const char*>(frame->ping.opaque_data), reinterpret_cast<const char*>(frame->ping.opaque_data),
8).ToLocalChecked(); 8).ToLocalChecked();
MakeCallback(env()->onping_string(), 1, &arg); MakeCallback(env()->onping_string(), 1, &arg);
} }
}
// Called by OnFrameReceived when a complete SETTINGS frame has been received. // Called by OnFrameReceived when a complete SETTINGS frame has been received.
void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) {
bool ack = frame->hd.flags & NGHTTP2_FLAG_ACK; bool ack = frame->hd.flags & NGHTTP2_FLAG_ACK;
if (ack) { if (!ack) {
// This is not a SETTINGS acknowledgement, notify and return
MakeCallback(env()->onsettings_string(), 0, nullptr);
return;
}
// If this is an acknowledgement, we should have an Http2Settings // If this is an acknowledgement, we should have an Http2Settings
// object for it. // object for it.
Http2Settings* settings = PopSettings(); Http2Settings* settings = PopSettings();
if (settings != nullptr) { if (settings != nullptr) {
settings->Done(true); settings->Done(true);
} else { return;
}
// SETTINGS Ack is unsolicited. Treat as a connection error. The HTTP/2 // SETTINGS Ack is unsolicited. Treat as a connection error. The HTTP/2
// spec does not require this, but there is no legitimate reason to // spec does not require this, but there is no legitimate reason to
// receive an unsolicited SETTINGS ack on a connection. Either the peer // receive an unsolicited SETTINGS ack on a connection. Either the peer
@ -1510,11 +1526,6 @@ void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) {
Local<Value> arg = Integer::New(isolate, NGHTTP2_ERR_PROTO); Local<Value> arg = Integer::New(isolate, NGHTTP2_ERR_PROTO);
MakeCallback(env()->error_string(), 1, &arg); MakeCallback(env()->error_string(), 1, &arg);
} }
} else {
// Otherwise, notify the session about a new settings
MakeCallback(env()->onsettings_string(), 0, nullptr);
}
}
// Callback used when data has been written to the stream. // Callback used when data has been written to the stream.
void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) { void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
@ -1535,7 +1546,10 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
// queue), but only if a write has not already been scheduled. // queue), but only if a write has not already been scheduled.
void Http2Session::MaybeScheduleWrite() { void Http2Session::MaybeScheduleWrite() {
CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0); CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0);
if (session_ != nullptr && nghttp2_session_want_write(session_)) { if (UNLIKELY(session_ == nullptr))
return;
if (nghttp2_session_want_write(session_)) {
HandleScope handle_scope(env()->isolate()); HandleScope handle_scope(env()->isolate());
Debug(this, "scheduling write"); Debug(this, "scheduling write");
flags_ |= SESSION_STATE_WRITE_SCHEDULED; flags_ |= SESSION_STATE_WRITE_SCHEDULED;
@ -1594,7 +1608,7 @@ void Http2Session::ClearOutgoing(int status) {
for (int32_t stream_id : current_pending_rst_streams) { for (int32_t stream_id : current_pending_rst_streams) {
Http2Stream* stream = FindStream(stream_id); Http2Stream* stream = FindStream(stream_id);
if (stream != nullptr) if (LIKELY(stream != nullptr))
stream->FlushRstStream(); stream->FlushRstStream();
} }
} }
@ -1769,7 +1783,7 @@ Http2Stream* Http2Session::SubmitRequest(
Http2Stream::Provider::Stream prov(options); Http2Stream::Provider::Stream prov(options);
*ret = nghttp2_submit_request(session_, prispec, nva, len, *prov, nullptr); *ret = nghttp2_submit_request(session_, prispec, nva, len, *prov, nullptr);
CHECK_NE(*ret, NGHTTP2_ERR_NOMEM); CHECK_NE(*ret, NGHTTP2_ERR_NOMEM);
if (*ret > 0) if (LIKELY(*ret > 0))
stream = new Http2Stream(this, *ret, NGHTTP2_HCAT_HEADERS, options); stream = new Http2Stream(this, *ret, NGHTTP2_HCAT_HEADERS, options);
return stream; return stream;
} }
@ -1784,15 +1798,24 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
IncrementCurrentSessionMemory(buf.len); IncrementCurrentSessionMemory(buf.len);
CHECK(stream_buf_ab_.IsEmpty()); CHECK(stream_buf_ab_.IsEmpty());
OnScopeLeave on_scope_leave([&]() {
// Once finished handling this write, reset the stream buffer.
// The memory has either been free()d or was handed over to V8.
DecrementCurrentSessionMemory(buf.len);
stream_buf_ab_ = Local<ArrayBuffer>();
stream_buf_ = uv_buf_init(nullptr, 0);
});
// Only pass data on if nread > 0
if (nread <= 0) { if (nread <= 0) {
free(buf.base); free(buf.base);
if (nread < 0) { if (nread < 0) {
PassReadErrorToPreviousListener(nread); PassReadErrorToPreviousListener(nread);
} }
} else { return;
// Only pass data on if nread > 0 }
// Makre sure that there was no read previously active. // Make sure that there was no read previously active.
CHECK_NULL(stream_buf_.base); CHECK_NULL(stream_buf_.base);
CHECK_EQ(stream_buf_.len, 0); CHECK_EQ(stream_buf_.len, 0);
@ -1819,25 +1842,15 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
statistics_.data_received += nread; statistics_.data_received += nread;
ssize_t ret = Write(&stream_buf_, 1); ssize_t ret = Write(&stream_buf_, 1);
if (ret < 0) { if (UNLIKELY(ret < 0)) {
Debug(this, "fatal error receiving data: %d", ret); Debug(this, "fatal error receiving data: %d", ret);
Local<Value> arg = Integer::New(isolate, ret);
MakeCallback(env()->error_string(), 1, &arg);
return;
}
Local<Value> argv[] = {
Integer::New(isolate, ret),
};
MakeCallback(env()->error_string(), arraysize(argv), argv);
} else {
MaybeStopReading(); MaybeStopReading();
} }
}
// Since we are finished handling this write, reset the stream buffer.
// The memory has either been free()d or was handed over to V8.
DecrementCurrentSessionMemory(buf.len);
stream_buf_ab_ = Local<ArrayBuffer>();
stream_buf_ = uv_buf_init(nullptr, 0);
}
bool Http2Session::HasWritesOnSocketForStream(Http2Stream* stream) { bool Http2Session::HasWritesOnSocketForStream(Http2Stream* stream) {
for (const nghttp2_stream_write& wr : outgoing_buffers_) { for (const nghttp2_stream_write& wr : outgoing_buffers_) {