http2: multiple style and performance updates

* move CHECK statements into DEBUG checks
* improve performance by removing branches
  * Several if checks were left in while the code was being developed.
    Now that the core API has stablized more, the checks are largely
    unnecessary and can be removed, yielding a significant boost in
    performance.
* refactor flow control for proper backpressure
* use std::queue for inbound headers
* use std::queue for outbound data
* remove now unnecessary FreeHeaders function
* expand comments and miscellaneous edits
* add a couple of misbehaving flow control tests

PR-URL: https://github.com/nodejs/node/pull/16239
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
James M Snell 2017-10-16 10:43:54 -07:00
parent 82b1660b1d
commit f16b9c189a
7 changed files with 552 additions and 362 deletions

View File

@ -279,19 +279,13 @@ function onSessionRead(nread, buf, handle) {
assert(stream !== undefined,
'Internal HTTP/2 Failure. Stream does not exist. Please ' +
'report this as a bug in Node.js');
const state = stream[kState];
_unrefActive(owner); // Reset the session timeout timer
_unrefActive(stream); // Reset the stream timeout timer
if (nread >= 0 && !stream.destroyed)
return stream.push(buf);
if (nread >= 0 && !stream.destroyed) {
if (!stream.push(buf)) {
this.streamReadStop(id);
state.reading = false;
}
} else {
// Last chunk was received. End the readable side.
stream.push(null);
}
// Last chunk was received. End the readable side.
stream.push(null);
}
// Called when the remote peer settings have been updated.
@ -1205,9 +1199,7 @@ function streamOnResume() {
return;
}
const session = this[kSession];
const state = this[kState];
if (session && !state.reading) {
state.reading = true;
if (session) {
assert(session[kHandle].streamReadStart(this[kID]) === undefined,
`HTTP/2 Stream ${this[kID]} does not exist. Please report this as ` +
'a bug in Node.js');
@ -1216,9 +1208,7 @@ function streamOnResume() {
function streamOnPause() {
const session = this[kSession];
const state = this[kState];
if (session && state.reading) {
state.reading = false;
if (session) {
assert(session[kHandle].streamReadStop(this[kID]) === undefined,
`HTTP/2 Stream ${this[kID]} does not exist. Please report this as ' +
'a bug in Node.js`);
@ -1412,12 +1402,8 @@ class Http2Stream extends Duplex {
return;
}
_unrefActive(this);
const state = this[kState];
if (state.reading)
return;
state.reading = true;
assert(this[kSession][kHandle].streamReadStart(this[kID]) === undefined,
`HTTP/2 Stream ${this[kID]} does not exist. Please report this as ` +
assert(this[kSession][kHandle].flushData(this[kID]) === undefined,
'HTTP/2 Stream #{this[kID]} does not exist. Please report this as ' +
'a bug in Node.js');
}

View File

@ -4,6 +4,8 @@
#include "node_http2.h"
#include "node_http2_state.h"
#include <queue>
namespace node {
using v8::Boolean;
@ -18,13 +20,8 @@ using v8::Undefined;
namespace http2 {
Freelist<nghttp2_data_chunk_t, FREELIST_MAX>
data_chunk_free_list;
Freelist<Nghttp2Stream, FREELIST_MAX> stream_free_list;
Freelist<nghttp2_header_list, FREELIST_MAX> header_free_list;
Nghttp2Session::Callbacks Nghttp2Session::callback_struct_saved[2] = {
Callbacks(false),
Callbacks(true)};
@ -32,6 +29,8 @@ Nghttp2Session::Callbacks Nghttp2Session::callback_struct_saved[2] = {
Http2Options::Http2Options(Environment* env) {
nghttp2_option_new(&options_);
nghttp2_option_set_no_auto_window_update(options_, 1);
AliasedBuffer<uint32_t, v8::Uint32Array>& buffer =
env->http2_state()->options_buffer;
uint32_t flags = buffer[IDX_OPTIONS_FLAGS];
@ -89,20 +88,24 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
HandleScope handle_scope(isolate);
Context::Scope context_scope(context);
if (object()->Has(context, env()->ongetpadding_string()).FromJust()) {
AliasedBuffer<uint32_t, v8::Uint32Array>& buffer =
env()->http2_state()->padding_buffer;
buffer[PADDING_BUF_FRAME_LENGTH] = frameLen;
buffer[PADDING_BUF_MAX_PAYLOAD_LENGTH] = maxPayloadLen;
MakeCallback(env()->ongetpadding_string(), 0, nullptr);
uint32_t retval = buffer[PADDING_BUF_RETURN_VALUE];
retval = retval <= maxPayloadLen ? retval : maxPayloadLen;
retval = retval >= frameLen ? retval : frameLen;
CHECK_GE(retval, frameLen);
CHECK_LE(retval, maxPayloadLen);
return retval;
}
return frameLen;
#if defined(DEBUG) && DEBUG
CHECK(object->Has(context, env()->ongetpadding_string()).FromJust());
#endif
AliasedBuffer<uint32_t, v8::Uint32Array>& buffer =
env()->http2_state()->padding_buffer;
buffer[PADDING_BUF_FRAME_LENGTH] = frameLen;
buffer[PADDING_BUF_MAX_PAYLOAD_LENGTH] = maxPayloadLen;
buffer[PADDING_BUF_RETURN_VALUE] = frameLen;
MakeCallback(env()->ongetpadding_string(), 0, nullptr);
uint32_t retval = buffer[PADDING_BUF_RETURN_VALUE];
retval = retval <= maxPayloadLen ? retval : maxPayloadLen;
retval = retval >= frameLen ? retval : frameLen;
#if defined(DEBUG) && DEBUG
CHECK_GE(retval, frameLen);
CHECK_LE(retval, maxPayloadLen);
#endif
return retval;
}
void Http2Session::SetNextStreamID(const FunctionCallbackInfo<Value>& args) {
@ -215,8 +218,10 @@ template <get_setting fn>
void RefreshSettings(const FunctionCallbackInfo<Value>& args) {
DEBUG_HTTP2("Http2Session: refreshing settings for session\n");
Environment* env = Environment::GetCurrent(args);
#if defined(DEBUG) && DEBUG
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsObject());
#endif
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args[0].As<Object>());
nghttp2_session* s = session->session();
@ -241,8 +246,10 @@ void RefreshSettings(const FunctionCallbackInfo<Value>& args) {
void RefreshSessionState(const FunctionCallbackInfo<Value>& args) {
DEBUG_HTTP2("Http2Session: refreshing session state\n");
Environment* env = Environment::GetCurrent(args);
#if defined(DEBUG) && DEBUG
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsObject());
#endif
AliasedBuffer<double, v8::Float64Array>& buffer =
env->http2_state()->session_state_buffer;
Http2Session* session;
@ -271,9 +278,11 @@ void RefreshSessionState(const FunctionCallbackInfo<Value>& args) {
void RefreshStreamState(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
#if defined(DEBUG) && DEBUG
CHECK_EQ(args.Length(), 2);
CHECK(args[0]->IsObject());
CHECK(args[1]->IsNumber());
#endif
int32_t id = args[1]->Int32Value(env->context()).ToChecked();
DEBUG_HTTP2("Http2Session: refreshing stream %d state\n", id);
Http2Session* session;
@ -321,8 +330,9 @@ void RefreshStreamState(const FunctionCallbackInfo<Value>& args) {
void Http2Session::New(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
#if defined(DEBUG) && DEBUG
CHECK(args.IsConstructCall());
#endif
int val = args[0]->IntegerValue(env->context()).ToChecked();
nghttp2_session_type type = static_cast<nghttp2_session_type>(val);
DEBUG_HTTP2("Http2Session: creating a session of type: %d\n", type);
@ -334,7 +344,9 @@ void Http2Session::New(const FunctionCallbackInfo<Value>& args) {
void Http2Session::Consume(const FunctionCallbackInfo<Value>& args) {
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsExternal());
#endif
session->Consume(args[0].As<External>());
}
@ -375,9 +387,12 @@ void Http2Session::SubmitPriority(const FunctionCallbackInfo<Value>& args) {
DEBUG_HTTP2("Http2Session: submitting priority for stream %d: "
"parent: %d, weight: %d, exclusive: %d, silent: %d\n",
id, parent, weight, exclusive, silent);
#if defined(DEBUG) && DEBUG
CHECK_GT(id, 0);
CHECK_GE(parent, 0);
CHECK_GE(weight, 0);
#endif
Nghttp2Stream* stream;
if (!(stream = session->FindStream(id))) {
@ -455,8 +470,11 @@ void Http2Session::SubmitSettings(const FunctionCallbackInfo<Value>& args) {
void Http2Session::SubmitRstStream(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsNumber());
CHECK(args[1]->IsNumber());
#endif
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
@ -480,7 +498,9 @@ void Http2Session::SubmitRequest(const FunctionCallbackInfo<Value>& args) {
// args[2] parentStream ID (for priority spec)
// args[3] weight (for priority spec)
// args[4] exclusive boolean (for priority spec)
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsArray());
#endif
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
@ -511,8 +531,10 @@ void Http2Session::SubmitRequest(const FunctionCallbackInfo<Value>& args) {
}
void Http2Session::SubmitResponse(const FunctionCallbackInfo<Value>& args) {
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsNumber());
CHECK(args[1]->IsArray());
#endif
Http2Session* session;
Nghttp2Stream* stream;
@ -540,11 +562,13 @@ void Http2Session::SubmitResponse(const FunctionCallbackInfo<Value>& args) {
}
void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsNumber()); // Stream ID
CHECK(args[1]->IsNumber()); // File Descriptor
CHECK(args[2]->IsArray()); // Headers
CHECK(args[3]->IsNumber()); // Offset
CHECK(args[4]->IsNumber()); // Length
#endif
Http2Session* session;
Nghttp2Stream* stream;
@ -562,7 +586,9 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
int64_t length = args[4]->IntegerValue(context).ToChecked();
int options = args[5]->IntegerValue(context).ToChecked();
#if defined(DEBUG) && DEBUG
CHECK_GE(offset, 0);
#endif
DEBUG_HTTP2("Http2Session: submitting file %d for stream %d: headers: %d, "
"end-stream: %d\n", fd, id, headers->Length());
@ -578,8 +604,10 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
}
void Http2Session::SendHeaders(const FunctionCallbackInfo<Value>& args) {
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsNumber());
CHECK(args[1]->IsArray());
#endif
Http2Session* session;
Nghttp2Stream* stream;
@ -606,7 +634,9 @@ void Http2Session::SendHeaders(const FunctionCallbackInfo<Value>& args) {
void Http2Session::ShutdownStream(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsNumber());
#endif
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
Nghttp2Stream* stream;
@ -618,10 +648,11 @@ void Http2Session::ShutdownStream(const FunctionCallbackInfo<Value>& args) {
stream->Shutdown();
}
void Http2Session::StreamReadStart(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsNumber());
#endif
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
Nghttp2Stream* stream;
@ -635,7 +666,9 @@ void Http2Session::StreamReadStart(const FunctionCallbackInfo<Value>& args) {
void Http2Session::StreamReadStop(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsNumber());
#endif
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
Nghttp2Stream* stream;
@ -688,9 +721,10 @@ void Http2Session::DestroyStream(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
#if defined(DEBUG) && DEBUG
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsNumber());
#endif
int32_t id = args[0]->Int32Value(env->context()).ToChecked();
DEBUG_HTTP2("Http2Session: destroy stream %d\n", id);
Nghttp2Stream* stream;
@ -700,6 +734,23 @@ void Http2Session::DestroyStream(const FunctionCallbackInfo<Value>& args) {
stream->Destroy();
}
void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Http2Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
#if defined(DEBUG) && DEBUG
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsNumber());
#endif
int32_t id = args[0]->Int32Value(env->context()).ToChecked();
DEBUG_HTTP2("Http2Session: flushing data to js for stream %d\n", id);
Nghttp2Stream* stream;
if (!(stream = session->FindStream(id))) {
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
}
stream->FlushDataChunks();
}
void Http2Session::SubmitPushPromise(const FunctionCallbackInfo<Value>& args) {
Http2Session* session;
Environment* env = Environment::GetCurrent(args);
@ -707,8 +758,10 @@ void Http2Session::SubmitPushPromise(const FunctionCallbackInfo<Value>& args) {
Isolate* isolate = env->isolate();
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
#if defined(DEBUG) && DEBUG
CHECK(args[0]->IsNumber()); // parent stream ID
CHECK(args[1]->IsArray()); // headers array
#endif
Nghttp2Stream* parent;
int32_t id = args[0]->Int32Value(context).ToChecked();
@ -802,29 +855,28 @@ void Http2Session::OnTrailers(Nghttp2Stream* stream,
HandleScope scope(isolate);
Context::Scope context_scope(context);
if (object()->Has(context, env()->ontrailers_string()).FromJust()) {
Local<Value> argv[1] = {
Integer::New(isolate, stream->id())
};
Local<Value> argv[1] = {
Integer::New(isolate, stream->id())
};
Local<Value> ret = MakeCallback(env()->ontrailers_string(),
arraysize(argv), argv).ToLocalChecked();
if (!ret.IsEmpty()) {
if (ret->IsArray()) {
Local<Array> headers = ret.As<Array>();
if (headers->Length() > 0) {
Headers trailers(isolate, context, headers);
submit_trailers.Submit(*trailers, trailers.length());
}
Local<Value> ret = MakeCallback(env()->ontrailers_string(),
arraysize(argv), argv).ToLocalChecked();
if (!ret.IsEmpty()) {
if (ret->IsArray()) {
Local<Array> headers = ret.As<Array>();
if (headers->Length() > 0) {
Headers trailers(isolate, context, headers);
submit_trailers.Submit(*trailers, trailers.length());
}
}
}
}
void Http2Session::OnHeaders(Nghttp2Stream* stream,
nghttp2_header_list* headers,
nghttp2_headers_category cat,
uint8_t flags) {
void Http2Session::OnHeaders(
Nghttp2Stream* stream,
std::queue<nghttp2_header>* headers,
nghttp2_headers_category cat,
uint8_t flags) {
Local<Context> context = env()->context();
Isolate* isolate = env()->isolate();
Context::Scope context_scope(context);
@ -836,10 +888,12 @@ void Http2Session::OnHeaders(Nghttp2Stream* stream,
Local<Function> fn = env()->push_values_to_array_function();
Local<Value> argv[NODE_PUSH_VAL_TO_ARRAY_MAX * 2];
#if defined(DEBUG) && DEBUG
CHECK_LE(cat, NGHTTP2_HCAT_HEADERS);
#endif
// The headers are passed in above as a linked list of nghttp2_header_list
// structs. The following converts that into a JS array with the structure:
// The headers are passed in above as a queue of nghttp2_header structs.
// The following converts that into a JS array with the structure:
// [name1, value1, name2, value2, name3, value3, name3, value4] and so on.
// That array is passed up to the JS layer and converted into an Object form
// like {name1: value1, name2: value2, name3: [value3, value4]}. We do it
@ -847,16 +901,16 @@ void Http2Session::OnHeaders(Nghttp2Stream* stream,
// array than it is to generate and pass the object).
do {
size_t j = 0;
while (headers != nullptr && j < arraysize(argv) / 2) {
nghttp2_header_list* item = headers;
while (!headers->empty() && j < arraysize(argv) / 2) {
nghttp2_header item = headers->front();
// The header name and value are passed as external one-byte strings
name_str =
ExternalHeader::New<true>(env(), item->name).ToLocalChecked();
ExternalHeader::New<true>(env(), item.name).ToLocalChecked();
value_str =
ExternalHeader::New<false>(env(), item->value).ToLocalChecked();
ExternalHeader::New<false>(env(), item.value).ToLocalChecked();
argv[j * 2] = name_str;
argv[j * 2 + 1] = value_str;
headers = item->next;
headers->pop();
j++;
}
// For performance, we pass name and value pairs to array.protototype.push
@ -865,17 +919,15 @@ void Http2Session::OnHeaders(Nghttp2Stream* stream,
if (j > 0) {
fn->Call(env()->context(), holder, j * 2, argv).ToLocalChecked();
}
} while (headers != nullptr);
} while (!headers->empty());
if (object()->Has(context, env()->onheaders_string()).FromJust()) {
Local<Value> argv[4] = {
Integer::New(isolate, stream->id()),
Integer::New(isolate, cat),
Integer::New(isolate, flags),
holder
};
MakeCallback(env()->onheaders_string(), arraysize(argv), argv);
}
Local<Value> args[4] = {
Integer::New(isolate, stream->id()),
Integer::New(isolate, cat),
Integer::New(isolate, flags),
holder
};
MakeCallback(env()->onheaders_string(), arraysize(args), args);
}
@ -884,24 +936,17 @@ void Http2Session::OnStreamClose(int32_t id, uint32_t code) {
Local<Context> context = env()->context();
HandleScope scope(isolate);
Context::Scope context_scope(context);
if (object()->Has(context, env()->onstreamclose_string()).FromJust()) {
Local<Value> argv[2] = {
Integer::New(isolate, id),
Integer::NewFromUnsigned(isolate, code)
};
MakeCallback(env()->onstreamclose_string(), arraysize(argv), argv);
}
}
void FreeDataChunk(char* data, void* hint) {
nghttp2_data_chunk_t* item = reinterpret_cast<nghttp2_data_chunk_t*>(hint);
delete[] data;
data_chunk_free_list.push(item);
Local<Value> argv[2] = {
Integer::New(isolate, id),
Integer::NewFromUnsigned(isolate, code)
};
MakeCallback(env()->onstreamclose_string(), arraysize(argv), argv);
}
void Http2Session::OnDataChunk(
Nghttp2Stream* stream,
nghttp2_data_chunk_t* chunk) {
uv_buf_t* chunk) {
Isolate* isolate = env()->isolate();
Local<Context> context = env()->context();
HandleScope scope(isolate);
@ -912,11 +957,8 @@ void Http2Session::OnDataChunk(
ssize_t len = -1;
Local<Object> buf;
if (chunk != nullptr) {
len = chunk->buf.len;
buf = Buffer::New(isolate,
chunk->buf.base, len,
FreeDataChunk,
chunk).ToLocalChecked();
len = chunk->len;
buf = Buffer::New(isolate, chunk->base, len).ToLocalChecked();
}
EmitData(len, buf, obj);
}
@ -926,10 +968,9 @@ void Http2Session::OnSettings(bool ack) {
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Context::Scope context_scope(context);
if (object()->Has(context, env()->onsettings_string()).FromJust()) {
Local<Value> argv[1] = { Boolean::New(isolate, ack) };
MakeCallback(env()->onsettings_string(), arraysize(argv), argv);
}
Local<Value> argv[1] = { Boolean::New(isolate, ack) };
MakeCallback(env()->onsettings_string(), arraysize(argv), argv);
}
void Http2Session::OnFrameError(int32_t id, uint8_t type, int error_code) {
@ -937,14 +978,13 @@ void Http2Session::OnFrameError(int32_t id, uint8_t type, int error_code) {
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Context::Scope context_scope(context);
if (object()->Has(context, env()->onframeerror_string()).FromJust()) {
Local<Value> argv[3] = {
Integer::New(isolate, id),
Integer::New(isolate, type),
Integer::New(isolate, error_code)
};
MakeCallback(env()->onframeerror_string(), arraysize(argv), argv);
}
Local<Value> argv[3] = {
Integer::New(isolate, id),
Integer::New(isolate, type),
Integer::New(isolate, error_code)
};
MakeCallback(env()->onframeerror_string(), arraysize(argv), argv);
}
void Http2Session::OnPriority(int32_t stream,
@ -955,15 +995,14 @@ void Http2Session::OnPriority(int32_t stream,
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Context::Scope context_scope(context);
if (object()->Has(context, env()->onpriority_string()).FromJust()) {
Local<Value> argv[4] = {
Integer::New(isolate, stream),
Integer::New(isolate, parent),
Integer::New(isolate, weight),
Boolean::New(isolate, exclusive)
};
MakeCallback(env()->onpriority_string(), arraysize(argv), argv);
}
Local<Value> argv[4] = {
Integer::New(isolate, stream),
Integer::New(isolate, parent),
Integer::New(isolate, weight),
Boolean::New(isolate, exclusive)
};
MakeCallback(env()->onpriority_string(), arraysize(argv), argv);
}
void Http2Session::OnGoAway(int32_t lastStreamID,
@ -974,21 +1013,20 @@ void Http2Session::OnGoAway(int32_t lastStreamID,
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
Context::Scope context_scope(context);
if (object()->Has(context, env()->ongoawaydata_string()).FromJust()) {
Local<Value> argv[3] = {
Integer::NewFromUnsigned(isolate, errorCode),
Integer::New(isolate, lastStreamID),
Undefined(isolate)
};
if (length > 0) {
argv[2] = Buffer::Copy(isolate,
reinterpret_cast<char*>(data),
length).ToLocalChecked();
}
Local<Value> argv[3] = {
Integer::NewFromUnsigned(isolate, errorCode),
Integer::New(isolate, lastStreamID),
Undefined(isolate)
};
MakeCallback(env()->ongoawaydata_string(), arraysize(argv), argv);
if (length > 0) {
argv[2] = Buffer::Copy(isolate,
reinterpret_cast<char*>(data),
length).ToLocalChecked();
}
MakeCallback(env()->ongoawaydata_string(), arraysize(argv), argv);
}
void Http2Session::OnStreamAllocImpl(size_t suggested_size,
@ -1030,9 +1068,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
void Http2Session::Consume(Local<External> external) {
DEBUG_HTTP2("Http2Session: consuming socket\n");
#if defined(DEBUG) && DEBUG
CHECK(prev_alloc_cb_.is_empty());
#endif
StreamBase* stream = static_cast<StreamBase*>(external->Value());
#if defined(DEBUG) && DEBUG
CHECK_NE(stream, nullptr);
#endif
stream->Consume();
stream_ = stream;
prev_alloc_cb_ = stream->alloc_cb();
@ -1057,11 +1099,15 @@ void Http2Session::Unconsume() {
Headers::Headers(Isolate* isolate,
Local<Context> context,
Local<Array> headers) {
#if defined(DEBUG) && DEBUG
CHECK_EQ(headers->Length(), 2);
#endif
Local<Value> header_string = headers->Get(context, 0).ToLocalChecked();
Local<Value> header_count = headers->Get(context, 1).ToLocalChecked();
#if defined(DEBUG) && DEBUG
CHECK(header_string->IsString());
CHECK(header_count->IsUint32());
#endif
count_ = header_count.As<Uint32>()->Value();
int header_string_len = header_string.As<String>()->Length();
@ -1112,8 +1158,10 @@ Headers::Headers(Isolate* isolate,
p += nva[n].valuelen + 1;
}
#if defined(DEBUG) && DEBUG
CHECK_EQ(p, header_contents + header_string_len);
CHECK_EQ(n, count_);
#endif
}
@ -1199,6 +1247,8 @@ void Initialize(Local<Object> target,
Http2Session::SetNextStreamID);
env->SetProtoMethod(session, "destroyStream",
Http2Session::DestroyStream);
env->SetProtoMethod(session, "flushData",
Http2Session::FlushData);
StreamBase::AddMethods<Http2Session>(env, session,
StreamBase::kFlagHasWritev |
StreamBase::kFlagNoShutdown);

View File

@ -8,6 +8,8 @@
#include "stream_base-inl.h"
#include "string_bytes.h"
#include <queue>
namespace node {
namespace http2 {
@ -17,6 +19,10 @@ using v8::EscapableHandleScope;
using v8::Isolate;
using v8::MaybeLocal;
// Unlike the HTTP/1 implementation, the HTTP/2 implementation is not limited
// to a fixed number of known supported HTTP methods. These constants, therefore
// are provided strictly as a convenience to users and are exposed via the
// require('http2').constants object.
#define HTTP_KNOWN_METHODS(V) \
V(ACL, "ACL") \
V(BASELINE_CONTROL, "BASELINE-CONTROL") \
@ -58,6 +64,8 @@ using v8::MaybeLocal;
V(UPDATEREDIRECTREF, "UPDATEREDIRECTREF") \
V(VERSION_CONTROL, "VERSION-CONTROL")
// These are provided strictly as a convenience to users and are exposed via the
// require('http2').constants objects
#define HTTP_KNOWN_HEADERS(V) \
V(STATUS, ":status") \
V(METHOD, ":method") \
@ -143,6 +151,9 @@ HTTP_KNOWN_HEADERS(V)
HTTP_KNOWN_HEADER_MAX
};
// While some of these codes are used within the HTTP/2 implementation in
// core, they are provided strictly as a convenience to users and are exposed
// via the require('http2').constants object.
#define HTTP_STATUS_CODES(V) \
V(CONTINUE, 100) \
V(SWITCHING_PROTOCOLS, 101) \
@ -213,15 +224,22 @@ HTTP_STATUS_CODES(V)
#undef V
};
// The Padding Strategy determines the method by which extra padding is
// selected for HEADERS and DATA frames. These are configurable via the
// options passed in to a Http2Session object.
enum padding_strategy_type {
// No padding strategy
// No padding strategy. This is the default.
PADDING_STRATEGY_NONE,
// Padding will ensure all data frames are maxFrameSize
PADDING_STRATEGY_MAX,
// Padding will be determined via JS callback
// Padding will be determined via a JS callback. Note that this can be
// expensive because the callback is called once for every DATA and
// HEADERS frame. For performance reasons, this strategy should be
// avoided.
PADDING_STRATEGY_CALLBACK
};
// These are the error codes provided by the underlying nghttp2 implementation.
#define NGHTTP2_ERROR_CODES(V) \
V(NGHTTP2_ERR_INVALID_ARGUMENT) \
V(NGHTTP2_ERR_BUFFER_ERROR) \
@ -281,6 +299,10 @@ const char* nghttp2_errname(int rv) {
#define MIN_MAX_FRAME_SIZE DEFAULT_SETTINGS_MAX_FRAME_SIZE
#define MAX_INITIAL_WINDOW_SIZE 2147483647
// The Http2Options class is used to parse the options object passed in to
// a Http2Session object and convert those into an appropriate nghttp2_option
// struct. This is the primary mechanism by which the Http2Session object is
// configured.
class Http2Options {
public:
explicit Http2Options(Environment* env);
@ -294,7 +316,9 @@ class Http2Options {
}
void SetPaddingStrategy(padding_strategy_type val) {
#if DEBUG
CHECK_LE(val, PADDING_STRATEGY_CALLBACK);
#endif
padding_strategy_ = static_cast<padding_strategy_type>(val);
}
@ -364,18 +388,21 @@ class Http2Session : public AsyncWrap,
return OnMaxFrameSizePadding(frameLength, maxPayloadLen);
}
#if defined(DEBUG) && DEBUG
CHECK_EQ(padding_strategy_, PADDING_STRATEGY_CALLBACK);
#endif
return OnCallbackPadding(frameLength, maxPayloadLen);
}
void OnHeaders(Nghttp2Stream* stream,
nghttp2_header_list* headers,
nghttp2_headers_category cat,
uint8_t flags) override;
void OnHeaders(
Nghttp2Stream* stream,
std::queue<nghttp2_header>* headers,
nghttp2_headers_category cat,
uint8_t flags) override;
void OnStreamClose(int32_t id, uint32_t code) override;
void Send(uv_buf_t* bufs, size_t total) override;
void OnDataChunk(Nghttp2Stream* stream, nghttp2_data_chunk_t* chunk) override;
void OnDataChunk(Nghttp2Stream* stream, uv_buf_t* chunk) override;
void OnSettings(bool ack) override;
void OnPriority(int32_t stream,
int32_t parent,
@ -447,6 +474,7 @@ class Http2Session : public AsyncWrap,
static void SendShutdownNotice(const FunctionCallbackInfo<Value>& args);
static void SubmitGoaway(const FunctionCallbackInfo<Value>& args);
static void DestroyStream(const FunctionCallbackInfo<Value>& args);
static void FlushData(const FunctionCallbackInfo<Value>& args);
template <get_setting fn>
static void GetSettings(const FunctionCallbackInfo<Value>& args);

View File

@ -10,26 +10,12 @@
namespace node {
namespace http2 {
#define FREELIST_MAX 1024
#define LINKED_LIST_ADD(list, item) \
do { \
if (list ## _tail_ == nullptr) { \
list ## _head_ = item; \
list ## _tail_ = item; \
} else { \
list ## _tail_->next = item; \
list ## _tail_ = item; \
} \
} while (0);
extern Freelist<nghttp2_data_chunk_t, FREELIST_MAX>
data_chunk_free_list;
#define FREELIST_MAX 10240
// Instances of Nghttp2Stream are created and pooled in order to speed
// allocation under load.
extern Freelist<Nghttp2Stream, FREELIST_MAX> stream_free_list;
extern Freelist<nghttp2_header_list, FREELIST_MAX> header_free_list;
#ifdef NODE_DEBUG_HTTP2
inline int Nghttp2Session::OnNghttpError(nghttp2_session* session,
const char* message,
@ -71,8 +57,8 @@ inline int Nghttp2Session::OnBeginHeadersCallback(nghttp2_session* session,
// and transparently so we do not need to worry about those at all.
inline int Nghttp2Session::OnHeaderCallback(nghttp2_session* session,
const nghttp2_frame* frame,
nghttp2_rcbuf *name,
nghttp2_rcbuf *value,
nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
@ -82,12 +68,15 @@ inline int Nghttp2Session::OnHeaderCallback(nghttp2_session* session,
frame->push_promise.promised_stream_id :
frame->hd.stream_id;
Nghttp2Stream* stream = handle->FindStream(id);
nghttp2_header_list* header = header_free_list.pop();
header->name = name;
header->value = value;
// The header name and value are stored in a reference counted buffer
// provided to us by nghttp2. We need to increment the reference counter
// here, then decrement it when we're done using it later.
nghttp2_rcbuf_incref(name);
nghttp2_rcbuf_incref(value);
LINKED_LIST_ADD(stream->current_headers, header);
nghttp2_header header;
header.name = name;
header.value = value;
stream->headers()->emplace(header);
return 0;
}
@ -107,6 +96,7 @@ inline int Nghttp2Session::OnFrameReceive(nghttp2_session* session,
handle->HandleDataFrame(frame);
break;
case NGHTTP2_PUSH_PROMISE:
// Intentional fall-through, handled just like headers frames
case NGHTTP2_HEADERS:
handle->HandleHeadersFrame(frame);
break;
@ -126,18 +116,24 @@ inline int Nghttp2Session::OnFrameReceive(nghttp2_session* session,
return 0;
}
inline int Nghttp2Session::OnFrameNotSent(nghttp2_session *session,
const nghttp2_frame *frame,
// nghttp2 will call this if an error occurs attempting to send a frame.
// Unless the stream or session is closed, this really should not happen
// unless there is a serious flaw in our implementation.
inline int Nghttp2Session::OnFrameNotSent(nghttp2_session* session,
const nghttp2_frame* frame,
int error_code,
void *user_data) {
Nghttp2Session *handle = static_cast<Nghttp2Session *>(user_data);
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: frame type %d was not sent, code: %d\n",
handle->TypeName(), frame->hd.type, error_code);
// Do not report if the frame was not sent due to the session closing
if (error_code != NGHTTP2_ERR_SESSION_CLOSING &&
error_code != NGHTTP2_ERR_STREAM_CLOSED &&
error_code != NGHTTP2_ERR_STREAM_CLOSING)
handle->OnFrameError(frame->hd.stream_id, frame->hd.type, error_code);
error_code != NGHTTP2_ERR_STREAM_CLOSING) {
handle->OnFrameError(frame->hd.stream_id,
frame->hd.type,
error_code);
}
return 0;
}
@ -153,14 +149,14 @@ inline int Nghttp2Session::OnInvalidHeader(nghttp2_session* session,
// Called when nghttp2 closes a stream, either in response to an RST_STREAM
// frame or the stream closing naturally on it's own
inline int Nghttp2Session::OnStreamClose(nghttp2_session *session,
inline int Nghttp2Session::OnStreamClose(nghttp2_session* session,
int32_t id,
uint32_t code,
void *user_data) {
Nghttp2Session *handle = static_cast<Nghttp2Session *>(user_data);
void* user_data) {
Nghttp2Session*handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: stream %d closed, code: %d\n",
handle->TypeName(), id, code);
Nghttp2Stream *stream = handle->FindStream(id);
Nghttp2Stream* stream = handle->FindStream(id);
// Intentionally ignore the callback if the stream does not exist
if (stream != nullptr)
stream->Close(code);
@ -170,17 +166,17 @@ inline int Nghttp2Session::OnStreamClose(nghttp2_session *session,
// Called by nghttp2 to collect the data while a file response is sent.
// The buf is the DATA frame buffer that needs to be filled with at most
// length bytes. flags is used to control what nghttp2 does next.
inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session *session,
inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session* session,
int32_t id,
uint8_t *buf,
uint8_t* buf,
size_t length,
uint32_t *flags,
nghttp2_data_source *source,
void *user_data) {
Nghttp2Session *handle = static_cast<Nghttp2Session *>(user_data);
uint32_t* flags,
nghttp2_data_source* source,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: reading outbound file data for stream %d\n",
handle->TypeName(), id);
Nghttp2Stream *stream = handle->FindStream(id);
Nghttp2Stream* stream = handle->FindStream(id);
int fd = source->fd;
int64_t offset = stream->fd_offset_;
@ -191,7 +187,7 @@ inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session *session,
length = stream->fd_length_;
uv_buf_t data;
data.base = reinterpret_cast<char *>(buf);
data.base = reinterpret_cast<char*>(buf);
data.len = length;
uv_fs_t read_req;
@ -226,54 +222,53 @@ inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session *session,
// Called by nghttp2 to collect the data to pack within a DATA frame.
// The buf is the DATA frame buffer that needs to be filled with at most
// length bytes. flags is used to control what nghttp2 does next.
inline ssize_t Nghttp2Session::OnStreamRead(nghttp2_session *session,
inline ssize_t Nghttp2Session::OnStreamRead(nghttp2_session* session,
int32_t id,
uint8_t *buf,
uint8_t* buf,
size_t length,
uint32_t *flags,
nghttp2_data_source *source,
void *user_data) {
Nghttp2Session *handle = static_cast<Nghttp2Session *>(user_data);
uint32_t* flags,
nghttp2_data_source* source,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: reading outbound data for stream %d\n",
handle->TypeName(), id);
Nghttp2Stream *stream = handle->FindStream(id);
Nghttp2Stream* stream = handle->FindStream(id);
size_t remaining = length;
size_t offset = 0;
// While there is data in the queue, copy data into buf until it is full.
// There may be data left over, which will be sent the next time nghttp
// calls this callback.
while (stream->queue_head_ != nullptr) {
while (!stream->queue_.empty()) {
DEBUG_HTTP2("Nghttp2Session %s: processing outbound data chunk\n",
handle->TypeName());
nghttp2_stream_write_queue *head = stream->queue_head_;
while (stream->queue_head_index_ < head->nbufs) {
nghttp2_stream_write* head = stream->queue_.front();
while (stream->queue_index_ < head->nbufs) {
if (remaining == 0)
goto end;
unsigned int n = stream->queue_head_index_;
unsigned int n = stream->queue_index_;
// len is the number of bytes in head->bufs[n] that are yet to be written
size_t len = head->bufs[n].len - stream->queue_head_offset_;
size_t len = head->bufs[n].len - stream->queue_offset_;
size_t bytes_to_write = len < remaining ? len : remaining;
memcpy(buf + offset,
head->bufs[n].base + stream->queue_head_offset_,
head->bufs[n].base + stream->queue_offset_,
bytes_to_write);
offset += bytes_to_write;
remaining -= bytes_to_write;
if (bytes_to_write < len) {
stream->queue_head_offset_ += bytes_to_write;
stream->queue_offset_ += bytes_to_write;
} else {
stream->queue_head_index_++;
stream->queue_head_offset_ = 0;
stream->queue_index_++;
stream->queue_offset_ = 0;
}
}
stream->queue_head_offset_ = 0;
stream->queue_head_index_ = 0;
stream->queue_head_ = head->next;
stream->queue_offset_ = 0;
stream->queue_index_ = 0;
head->cb(head->req, 0);
delete head;
stream->queue_.pop();
}
stream->queue_tail_ = nullptr;
end:
// If we are no longer writable and there is no more data in the queue,
@ -283,8 +278,8 @@ end:
// that will wait for data to become available.
// If neither of these flags are set, then nghttp2 will call this callback
// again to get the data for the next DATA frame.
int writable = stream->queue_head_ != nullptr || stream->IsWritable();
if (offset == 0 && writable && stream->queue_head_ == nullptr) {
int writable = !stream->queue_.empty() || stream->IsWritable();
if (offset == 0 && writable && stream->queue_.empty()) {
DEBUG_HTTP2("Nghttp2Session %s: deferring stream %d\n",
handle->TypeName(), id);
return NGHTTP2_ERR_DEFERRED;
@ -296,64 +291,70 @@ end:
GetTrailers(session, handle, stream, flags);
}
#if defined(DEBUG) && DEBUG
CHECK(offset <= length);
#endif
return offset;
}
// Called by nghttp2 when it needs to determine how much padding to apply
// to a DATA or HEADERS frame
inline ssize_t Nghttp2Session::OnSelectPadding(nghttp2_session *session,
const nghttp2_frame *frame,
inline ssize_t Nghttp2Session::OnSelectPadding(nghttp2_session* session,
const nghttp2_frame* frame,
size_t maxPayloadLen,
void *user_data) {
Nghttp2Session *handle = static_cast<Nghttp2Session *>(user_data);
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
#if defined(DEBUG) && DEBUG
CHECK(handle->HasGetPaddingCallback());
#endif
ssize_t padding = handle->GetPadding(frame->hd.length, maxPayloadLen);
DEBUG_HTTP2("Nghttp2Session %s: using padding, size: %d\n",
handle->TypeName(), padding);
return padding;
}
// Called by nghttp2 multiple times while processing a DATA frame
inline int Nghttp2Session::OnDataChunkReceived(nghttp2_session *session,
// While nghttp2 is processing a DATA frame, it will call the
// OnDataChunkReceived callback multiple times, passing along individual
// chunks of data from the DATA frame payload. These *must* be memcpy'd
// out because the pointer to the data will quickly become invalid.
inline int Nghttp2Session::OnDataChunkReceived(nghttp2_session* session,
uint8_t flags,
int32_t id,
const uint8_t *data,
const uint8_t* data,
size_t len,
void *user_data) {
Nghttp2Session *handle = static_cast<Nghttp2Session *>(user_data);
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: buffering data chunk for stream %d, size: "
"%d, flags: %d\n", handle->TypeName(),
id, len, flags);
Nghttp2Stream *stream = handle->FindStream(id);
nghttp2_data_chunk_t *chunk = data_chunk_free_list.pop();
chunk->buf = uv_buf_init(new char[len], len);
memcpy(chunk->buf.base, data, len);
if (stream->data_chunks_tail_ == nullptr) {
stream->data_chunks_head_ =
stream->data_chunks_tail_ = chunk;
} else {
stream->data_chunks_tail_->next = chunk;
stream->data_chunks_tail_ = chunk;
// We should never actually get a 0-length chunk so this check is
// only a precaution at this point.
if (len > 0) {
nghttp2_session_consume_connection(session, len);
Nghttp2Stream* stream = handle->FindStream(id);
char* buf = Malloc<char>(len);
memcpy(buf, data, len);
stream->data_chunks_.emplace(uv_buf_init(buf, len));
}
return 0;
}
inline void Nghttp2Session::GetTrailers(nghttp2_session *session,
Nghttp2Session *handle,
Nghttp2Stream *stream,
uint32_t *flags) {
// Only when we are done sending the last chunk of data do we check for
// any trailing headers that are to be sent. This is the only opportunity
// we have to make this check. If there are trailers, then the
// NGHTTP2_DATA_FLAG_NO_END_STREAM flag must be set.
inline void Nghttp2Session::GetTrailers(nghttp2_session* session,
Nghttp2Session* handle,
Nghttp2Stream* stream,
uint32_t* flags) {
if (stream->GetTrailers()) {
// Only when we are done sending the last chunk of data do we check for
// any trailing headers that are to be sent. This is the only opportunity
// we have to make this check. If there are trailers, then the
// NGHTTP2_DATA_FLAG_NO_END_STREAM flag must be set.
SubmitTrailers submit_trailers{handle, stream, flags};
handle->OnTrailers(stream, submit_trailers);
}
}
inline void Nghttp2Session::SubmitTrailers::Submit(nghttp2_nv *trailers,
// Submits any trailing header fields that have been collected
inline void Nghttp2Session::SubmitTrailers::Submit(nghttp2_nv* trailers,
size_t length) const {
if (length == 0)
return;
@ -367,6 +368,7 @@ inline void Nghttp2Session::SubmitTrailers::Submit(nghttp2_nv *trailers,
length);
}
// Submits a graceful shutdown notice to nghttp
// See: https://nghttp2.org/documentation/nghttp2_submit_shutdown_notice.html
inline void Nghttp2Session::SubmitShutdownNotice() {
DEBUG_HTTP2("Nghttp2Session %s: submitting shutdown notice\n",
@ -397,33 +399,36 @@ inline Nghttp2Stream* Nghttp2Session::FindStream(int32_t id) {
}
}
// Flushes any received queued chunks of data out to the JS layer
inline void Nghttp2Stream::FlushDataChunks(bool done) {
while (data_chunks_head_ != nullptr) {
DEBUG_HTTP2("Nghttp2Stream %d: emitting data chunk\n", id_);
nghttp2_data_chunk_t* item = data_chunks_head_;
data_chunks_head_ = item->next;
// item will be passed to the Buffer instance and freed on gc
session_->OnDataChunk(this, item);
// Flushes one buffered data chunk at a time.
inline void Nghttp2Stream::FlushDataChunks() {
if (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
data_chunks_.pop();
if (buf.len > 0) {
nghttp2_session_consume_stream(session_->session(), id_, buf.len);
session_->OnDataChunk(this, &buf);
} else {
session_->OnDataChunk(this, nullptr);
}
}
data_chunks_tail_ = nullptr;
if (done)
session_->OnDataChunk(this, nullptr);
}
// Passes all of the the chunks for a data frame out to the JS layer
// The chunks are collected as the frame is being processed and sent out
// to the JS side only when the frame is fully processed.
// Called when a DATA frame has been completely processed. Will check to
// see if the END_STREAM flag is set, and will flush the queued data chunks
// to JS if the stream is flowing
inline void Nghttp2Session::HandleDataFrame(const nghttp2_frame* frame) {
int32_t id = frame->hd.stream_id;
DEBUG_HTTP2("Nghttp2Session %s: handling data frame for stream %d\n",
TypeName(), id);
Nghttp2Stream* stream = this->FindStream(id);
// If the stream does not exist, something really bad happened
#if defined(DEBUG) && DEBUG
CHECK_NE(stream, nullptr);
bool done = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) ==
NGHTTP2_FLAG_END_STREAM;
stream->FlushDataChunks(done);
#endif
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
stream->data_chunks_.emplace(uv_buf_init(0, 0));
if (stream->IsReading())
stream->FlushDataChunks();
}
// Passes all of the collected headers for a HEADERS frame out to the JS layer.
@ -436,12 +441,13 @@ inline void Nghttp2Session::HandleHeadersFrame(const nghttp2_frame* frame) {
TypeName(), id);
Nghttp2Stream* stream = FindStream(id);
// If the stream does not exist, something really bad happened
#if defined(DEBUG) && DEBUG
CHECK_NE(stream, nullptr);
#endif
OnHeaders(stream,
stream->headers(),
stream->headers_category(),
frame->hd.flags);
stream->FreeHeaders();
}
// Notifies the JS layer that a PRIORITY frame has been received
@ -450,13 +456,17 @@ inline void Nghttp2Session::HandlePriorityFrame(const nghttp2_frame* frame) {
int32_t id = frame->hd.stream_id;
DEBUG_HTTP2("Nghttp2Session %s: handling priority frame for stream %d\n",
TypeName(), id);
// Ignore the priority frame if stream ID is <= 0
// This actually should never happen because nghttp2 should treat this as
// an error condition that terminates the session.
if (id > 0) {
nghttp2_priority_spec spec = priority_frame.pri_spec;
OnPriority(id, spec.stream_id, spec.weight, spec.exclusive);
}
// Priority frame stream ID should never be <= 0. nghttp2 handles this
// as an error condition that terminates the session, so we should be
// good here
#if defined(DEBUG) && DEBUG
CHECK_GT(id, 0)
#endif
nghttp2_priority_spec spec = priority_frame.pri_spec;
OnPriority(id, spec.stream_id, spec.weight, spec.exclusive);
}
// Notifies the JS layer that a GOAWAY frame has been received
@ -579,7 +589,6 @@ inline int Nghttp2Session::Init(uv_loop_t* loop,
Nghttp2Session* session = ContainerOf(&Nghttp2Session::prep_, t);
session->SendPendingData();
});
// uv_unref(reinterpret_cast<uv_handle_t*>(&prep_));
return ret;
}
@ -588,7 +597,9 @@ inline void Nghttp2Session::MarkDestroying() {
}
inline int Nghttp2Session::Free() {
#if defined(DEBUG) && DEBUG
CHECK(session_ != nullptr);
#endif
DEBUG_HTTP2("Nghttp2Session %s: freeing session\n", TypeName());
// Stop the loop
CHECK_EQ(uv_prepare_stop(&prep_), 0);
@ -657,19 +668,22 @@ inline void Nghttp2Stream::ResetState(
int options) {
DEBUG_HTTP2("Nghttp2Stream %d: resetting stream state\n", id);
session_ = session;
queue_head_ = nullptr;
queue_tail_ = nullptr;
data_chunks_head_ = nullptr;
data_chunks_tail_ = nullptr;
current_headers_head_ = nullptr;
current_headers_tail_ = nullptr;
while (!queue_.empty()) {
nghttp2_stream_write* head = queue_.front();
delete head;
queue_.pop();
}
while (!data_chunks_.empty())
data_chunks_.pop();
while (!current_headers_.empty())
current_headers_.pop();
current_headers_category_ = category;
flags_ = NGHTTP2_STREAM_FLAG_NONE;
id_ = id;
code_ = NGHTTP2_NO_ERROR;
prev_local_window_size_ = 65535;
queue_head_index_ = 0;
queue_head_offset_ = 0;
queue_index_ = 0;
queue_offset_ = 0;
getTrailers_ = options & STREAM_OPTION_GET_TRAILERS;
}
@ -689,41 +703,25 @@ inline void Nghttp2Stream::Destroy() {
}
// Free any remaining incoming data chunks.
while (data_chunks_head_ != nullptr) {
nghttp2_data_chunk_t* chunk = data_chunks_head_;
data_chunks_head_ = chunk->next;
delete[] chunk->buf.base;
data_chunk_free_list.push(chunk);
}
data_chunks_tail_ = nullptr;
while (!data_chunks_.empty())
data_chunks_.pop();
// Free any remaining outgoing data chunks.
while (queue_head_ != nullptr) {
nghttp2_stream_write_queue* head = queue_head_;
queue_head_ = head->next;
while (!queue_.empty()) {
nghttp2_stream_write* head = queue_.front();
head->cb(head->req, UV_ECANCELED);
delete head;
queue_.pop();
}
queue_tail_ = nullptr;
// Free any remaining headers
FreeHeaders();
while (!current_headers_.empty())
current_headers_.pop();
// Return this stream instance to the freelist
stream_free_list.push(this);
}
inline void Nghttp2Stream::FreeHeaders() {
DEBUG_HTTP2("Nghttp2Stream %d: freeing headers\n", id_);
while (current_headers_head_ != nullptr) {
DEBUG_HTTP2("Nghttp2Stream %d: freeing header item\n", id_);
nghttp2_header_list* item = current_headers_head_;
current_headers_head_ = item->next;
header_free_list.push(item);
}
current_headers_tail_ = nullptr;
}
// Submit informational headers for a stream.
inline int Nghttp2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
DEBUG_HTTP2("Nghttp2Stream %d: sending informational headers, count: %d\n",
@ -762,7 +760,9 @@ inline int32_t Nghttp2Stream::SubmitPushPromise(
size_t len,
Nghttp2Stream** assigned,
int options) {
#if defined(DEBUG) && DEBUG
CHECK_GT(len, 0);
#endif
DEBUG_HTTP2("Nghttp2Stream %d: sending push promise\n", id_);
int32_t ret = nghttp2_submit_push_promise(session_->session(),
NGHTTP2_FLAG_NONE,
@ -784,7 +784,9 @@ inline int32_t Nghttp2Stream::SubmitPushPromise(
inline int Nghttp2Stream::SubmitResponse(nghttp2_nv* nva,
size_t len,
int options) {
#if defined(DEBUG) && DEBUG
CHECK_GT(len, 0);
#endif
DEBUG_HTTP2("Nghttp2Stream %d: submitting response\n", id_);
getTrailers_ = options & STREAM_OPTION_GET_TRAILERS;
nghttp2_data_provider* provider = nullptr;
@ -804,8 +806,10 @@ inline int Nghttp2Stream::SubmitFile(int fd,
int64_t offset,
int64_t length,
int options) {
#if defined(DEBUG) && DEBUG
CHECK_GT(len, 0);
CHECK_GT(fd, 0);
#endif
DEBUG_HTTP2("Nghttp2Stream %d: submitting file\n", id_);
getTrailers_ = options & STREAM_OPTION_GET_TRAILERS;
nghttp2_data_provider prov;
@ -829,7 +833,9 @@ inline int32_t Nghttp2Session::SubmitRequest(
size_t len,
Nghttp2Stream** assigned,
int options) {
#if defined(DEBUG) && DEBUG
CHECK_GT(len, 0);
#endif
DEBUG_HTTP2("Nghttp2Session: submitting request\n");
nghttp2_data_provider* provider = nullptr;
nghttp2_data_provider prov;
@ -868,42 +874,21 @@ inline int Nghttp2Stream::Write(nghttp2_stream_write_t* req,
}
DEBUG_HTTP2("Nghttp2Stream %d: queuing buffers to send, count: %d\n",
id_, nbufs);
nghttp2_stream_write_queue* item = new nghttp2_stream_write_queue;
nghttp2_stream_write* item = new nghttp2_stream_write;
item->cb = cb;
item->req = req;
item->nbufs = nbufs;
item->bufs.AllocateSufficientStorage(nbufs);
req->handle = this;
req->item = item;
memcpy(*(item->bufs), bufs, nbufs * sizeof(*bufs));
if (queue_head_ == nullptr) {
queue_head_ = item;
queue_tail_ = item;
} else {
queue_tail_->next = item;
queue_tail_ = item;
}
queue_.push(item);
nghttp2_session_resume_data(session_->session(), id_);
return 0;
}
inline void Nghttp2Stream::ReadStart() {
// Has no effect if IsReading() is true.
if (IsReading())
return;
DEBUG_HTTP2("Nghttp2Stream %d: start reading\n", id_);
if (IsPaused()) {
// If handle->reading is less than zero, read_start had never previously
// been called. If handle->reading is zero, reading had started and read
// stop had been previously called, meaning that the flow control window
// has been explicitly set to zero. Reset the flow control window now to
// restart the flow of data.
nghttp2_session_set_local_window_size(session_->session(),
NGHTTP2_FLAG_NONE,
id_,
prev_local_window_size_);
}
flags_ |= NGHTTP2_STREAM_FLAG_READ_START;
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
@ -913,22 +898,9 @@ inline void Nghttp2Stream::ReadStart() {
inline void Nghttp2Stream::ReadStop() {
DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_);
// Has no effect if IsReading() is false, which will happen if we either
// have not started reading yet at all (NGHTTP2_STREAM_FLAG_READ_START is not
// set) or if we're already paused (NGHTTP2_STREAM_FLAG_READ_PAUSED is set.
if (!IsReading())
return;
flags_ |= NGHTTP2_STREAM_FLAG_READ_PAUSED;
// When not reading, explicitly set the local window size to 0 so that
// the peer does not keep sending data that has to be buffered
int32_t ret =
nghttp2_session_get_stream_local_window_size(session_->session(), id_);
if (ret >= 0)
prev_local_window_size_ = ret;
nghttp2_session_set_local_window_size(session_->session(),
NGHTTP2_FLAG_NONE,
id_, 0);
}
Nghttp2Session::Callbacks::Callbacks(bool kHasGetPaddingCallback) {

View File

@ -8,6 +8,7 @@
#include "uv.h"
#include "nghttp2/nghttp2.h"
#include <queue>
#include <stdio.h>
#include <unordered_map>
@ -17,11 +18,11 @@ namespace http2 {
#ifdef NODE_DEBUG_HTTP2
// Adapted from nghttp2 own debug printer
static inline void _debug_vfprintf(const char *fmt, va_list args) {
static inline void _debug_vfprintf(const char* fmt, va_list args) {
vfprintf(stderr, fmt, args);
}
void inline debug_vfprintf(const char *format, ...) {
void inline debug_vfprintf(const char* format, ...) {
va_list args;
va_start(args, format);
_debug_vfprintf(format, args);
@ -39,9 +40,8 @@ class Nghttp2Session;
class Nghttp2Stream;
struct nghttp2_stream_write_t;
struct nghttp2_data_chunk_t;
#define MAX_BUFFER_COUNT 10
#define MAX_BUFFER_COUNT 16
#define SEND_BUFFER_RECOMMENDED_SIZE 4096
enum nghttp2_session_type {
@ -77,18 +77,16 @@ typedef void (*nghttp2_stream_write_cb)(
nghttp2_stream_write_t* req,
int status);
struct nghttp2_stream_write_queue {
struct nghttp2_stream_write {
unsigned int nbufs = 0;
nghttp2_stream_write_t* req = nullptr;
nghttp2_stream_write_cb cb = nullptr;
nghttp2_stream_write_queue* next = nullptr;
MaybeStackBuffer<uv_buf_t, MAX_BUFFER_COUNT> bufs;
};
struct nghttp2_header_list {
struct nghttp2_header {
nghttp2_rcbuf* name = nullptr;
nghttp2_rcbuf* value = nullptr;
nghttp2_header_list* next = nullptr;
};
// Handle Types
@ -157,13 +155,14 @@ class Nghttp2Session {
inline void RemoveStream(int32_t id);
virtual void Send(uv_buf_t* buf, size_t length) {}
virtual void OnHeaders(Nghttp2Stream* stream,
nghttp2_header_list* headers,
nghttp2_headers_category cat,
uint8_t flags) {}
virtual void OnHeaders(
Nghttp2Stream* stream,
std::queue<nghttp2_header>* headers,
nghttp2_headers_category cat,
uint8_t flags) {}
virtual void OnStreamClose(int32_t id, uint32_t code) {}
virtual void OnDataChunk(Nghttp2Stream* stream,
nghttp2_data_chunk_t* chunk) {}
uv_buf_t* chunk) {}
virtual void OnSettings(bool ack) {}
virtual void OnPriority(int32_t id,
int32_t parent,
@ -251,7 +250,7 @@ class Nghttp2Session {
static inline int OnDataChunkReceived(nghttp2_session* session,
uint8_t flags,
int32_t id,
const uint8_t *data,
const uint8_t* data,
size_t len,
void* user_data);
static inline ssize_t OnStreamReadFD(nghttp2_session* session,
@ -304,17 +303,13 @@ class Nghttp2Stream {
int options = 0);
inline ~Nghttp2Stream() {
#if defined(DEBUG) && DEBUG
CHECK_EQ(session_, nullptr);
CHECK_EQ(queue_head_, nullptr);
CHECK_EQ(queue_tail_, nullptr);
CHECK_EQ(data_chunks_head_, nullptr);
CHECK_EQ(data_chunks_tail_, nullptr);
CHECK_EQ(current_headers_head_, nullptr);
CHECK_EQ(current_headers_tail_, nullptr);
#endif
DEBUG_HTTP2("Nghttp2Stream %d: freed\n", id_);
}
inline void FlushDataChunks(bool done = false);
inline void FlushDataChunks();
// Resets the state of the stream instance to defaults
inline void ResetState(
@ -434,23 +429,22 @@ class Nghttp2Stream {
return id_;
}
inline nghttp2_header_list* headers() const {
return current_headers_head_;
inline std::queue<nghttp2_header>* headers() {
return &current_headers_;
}
inline nghttp2_headers_category headers_category() const {
return current_headers_category_;
}
inline void FreeHeaders();
void StartHeaders(nghttp2_headers_category category) {
DEBUG_HTTP2("Nghttp2Stream %d: starting headers, category: %d\n",
id_, category);
// We shouldn't be in the middle of a headers block already.
// Something bad happened if this fails
CHECK_EQ(current_headers_head_, nullptr);
CHECK_EQ(current_headers_tail_, nullptr);
#if defined(DEBUG) && DEBUG
CHECK(current_headers_.empty());
#endif
current_headers_category_ = category;
}
@ -466,23 +460,20 @@ class Nghttp2Stream {
// Outbound Data... This is the data written by the JS layer that is
// waiting to be written out to the socket.
nghttp2_stream_write_queue* queue_head_ = nullptr;
nghttp2_stream_write_queue* queue_tail_ = nullptr;
unsigned int queue_head_index_ = 0;
size_t queue_head_offset_ = 0;
std::queue<nghttp2_stream_write*> queue_;
unsigned int queue_index_ = 0;
size_t queue_offset_ = 0;
int64_t fd_offset_ = 0;
int64_t fd_length_ = -1;
// The Current Headers block... As headers are received for this stream,
// they are temporarily stored here until the OnFrameReceived is called
// signalling the end of the HEADERS frame
nghttp2_header_list* current_headers_head_ = nullptr;
nghttp2_header_list* current_headers_tail_ = nullptr;
nghttp2_headers_category current_headers_category_ = NGHTTP2_HCAT_HEADERS;
std::queue<nghttp2_header> current_headers_;
// Inbound Data... This is the data received via DATA frames for this stream.
nghttp2_data_chunk_t* data_chunks_head_ = nullptr;
nghttp2_data_chunk_t* data_chunks_tail_ = nullptr;
std::queue<uv_buf_t> data_chunks_;
// The RST_STREAM code used to close this stream
int32_t code_ = NGHTTP2_NO_ERROR;
@ -498,13 +489,6 @@ class Nghttp2Stream {
struct nghttp2_stream_write_t {
void* data;
int status;
Nghttp2Stream* handle;
nghttp2_stream_write_queue* item;
};
struct nghttp2_data_chunk_t {
uv_buf_t buf;
nghttp2_data_chunk_t* next = nullptr;
};
} // namespace http2

View File

@ -0,0 +1,89 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const h2 = require('http2');
const net = require('net');
const preamble = Buffer.from([
0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f,
0x32, 0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a,
0x0d, 0x0a, 0x00, 0x00, 0x0c, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x03, 0x00, 0x00, 0x00, 0x64, 0x00, 0x04, 0x00, 0x00, 0xff,
0xff, 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00,
0x00, 0xc8, 0x00, 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x05, 0x02, 0x00, 0x00,
0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x02, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x07, 0x00,
0x00, 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00,
0x00, 0x03, 0x00, 0x00, 0x00, 0x2c, 0x01, 0x24, 0x00, 0x00, 0x00,
0x0d, 0x00, 0x00, 0x00, 0x0b, 0x0f, 0x83, 0x84, 0x86, 0x41, 0x8a,
0xa0, 0xe4, 0x1d, 0x13, 0x9d, 0x09, 0xb8, 0xf0, 0x1e, 0x07, 0x53,
0x03, 0x2a, 0x2f, 0x2a, 0x90, 0x7a, 0x8a, 0xaa, 0x69, 0xd2, 0x9a,
0xc4, 0xc0, 0x57, 0x0b, 0xcb, 0x87, 0x0f, 0x0d, 0x83, 0x08, 0x00,
0x0f
]);
const data = Buffer.from([
0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0d,
0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x0a, 0x68, 0x65, 0x6c,
0x6c, 0x6f, 0x0a, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x0a
]);
// This is testing the case of a misbehaving client that is not paying
// attention to flow control. The initial window size is set to data
// payload * 2, which in this case is 36, the stream is paused so
// WINDOW_UPDATE frames are not being sent, which means the window
// size is not being updated. A well behaved client is supposed to
// stop sending until the window size is expanded again.
//
// However, our malicious client keeps sending data beyond the flow
// control window!
//
// Bad client! Bad!
//
// Fortunately, nghttp2 handles this situation for us by keeping track
// of the flow control window and responding with a FLOW_CONTROL_ERROR
// causing the stream to get shut down...
//
// At least, that's what is supposed to happen.
let client;
const server = h2.createServer({ settings: { initialWindowSize: 36 } });
server.on('stream', (stream) => {
// Not reading causes the flow control window to get backed up.
stream.pause();
stream.on('error', common.mustCall((err) => {
common.expectsError({
code: 'ERR_HTTP2_STREAM_ERROR',
type: Error,
message: 'Stream closed with error code 3'
})(err);
server.close();
client.destroy();
}));
stream.on('end', common.mustNotCall());
stream.respond();
stream.end('ok');
});
server.listen(0, () => {
client = net.connect(server.address().port, () => {
client.on('error', console.log);
client.write(preamble);
client.write(data);
client.write(data);
client.write(data);
});
});

View File

@ -0,0 +1,81 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const h2 = require('http2');
const net = require('net');
const preamble = Buffer.from([
0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f,
0x32, 0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a,
0x0d, 0x0a, 0x00, 0x00, 0x0c, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x03, 0x00, 0x00, 0x00, 0x64, 0x00, 0x04, 0x00, 0x00, 0xff,
0xff, 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00,
0x00, 0xc8, 0x00, 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x05, 0x02, 0x00, 0x00,
0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x02, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x07, 0x00,
0x00, 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00,
0x00, 0x03, 0x00, 0x00, 0x00, 0x2c, 0x01, 0x24, 0x00, 0x00, 0x00,
0x0d, 0x00, 0x00, 0x00, 0x0b, 0x0f, 0x83, 0x84, 0x86, 0x41, 0x8a,
0xa0, 0xe4, 0x1d, 0x13, 0x9d, 0x09, 0xb8, 0xf0, 0x1e, 0x07, 0x53,
0x03, 0x2a, 0x2f, 0x2a, 0x90, 0x7a, 0x8a, 0xaa, 0x69, 0xd2, 0x9a,
0xc4, 0xc0, 0x57, 0x0b, 0xcb, 0x87, 0x0f, 0x0d, 0x83, 0x08, 0x00,
0x0f
]);
const data = Buffer.from([
0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0d,
0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x0a, 0x68, 0x65, 0x6c,
0x6c, 0x6f, 0x0a, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x0a
]);
// This is testing the case of a misbehaving client that is not paying
// attention to flow control. The initial window size is set to data
// payload, which in this case is 18, the stream is set to flowing so
// WINDOW_UPDATE frames are being sent, but the client is writing
// faster than those can be read.
//
// Bad client! Bad!
//
// Fortunately, nghttp2 handles this situation for us by keeping track
// of the flow control window and responding with a FLOW_CONTROL_ERROR
// causing the stream to get shut down...
//
// At least, that's what is supposed to happen.
let client;
const server = h2.createServer({ settings: { initialWindowSize: 18 } });
server.on('stream', (stream) => {
stream.resume();
stream.on('error', common.mustCall((err) => {
common.expectsError({
code: 'ERR_HTTP2_STREAM_ERROR',
type: Error,
message: 'Stream closed with error code 3'
})(err);
server.close();
client.destroy();
}));
stream.respond();
stream.end('ok');
});
server.listen(0, () => {
client = net.connect(server.address().port, () => {
client.on('error', console.log);
client.write(preamble);
client.write(data);
client.write(data);
client.write(data);
});
});