diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 850307ba284..8d8f200262d 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -279,19 +279,13 @@ function onSessionRead(nread, buf, handle) { assert(stream !== undefined, 'Internal HTTP/2 Failure. Stream does not exist. Please ' + 'report this as a bug in Node.js'); - const state = stream[kState]; _unrefActive(owner); // Reset the session timeout timer _unrefActive(stream); // Reset the stream timeout timer + if (nread >= 0 && !stream.destroyed) + return stream.push(buf); - if (nread >= 0 && !stream.destroyed) { - if (!stream.push(buf)) { - this.streamReadStop(id); - state.reading = false; - } - } else { - // Last chunk was received. End the readable side. - stream.push(null); - } + // Last chunk was received. End the readable side. + stream.push(null); } // Called when the remote peer settings have been updated. @@ -1205,9 +1199,7 @@ function streamOnResume() { return; } const session = this[kSession]; - const state = this[kState]; - if (session && !state.reading) { - state.reading = true; + if (session) { assert(session[kHandle].streamReadStart(this[kID]) === undefined, `HTTP/2 Stream ${this[kID]} does not exist. Please report this as ` + 'a bug in Node.js'); @@ -1216,9 +1208,7 @@ function streamOnResume() { function streamOnPause() { const session = this[kSession]; - const state = this[kState]; - if (session && state.reading) { - state.reading = false; + if (session) { assert(session[kHandle].streamReadStop(this[kID]) === undefined, `HTTP/2 Stream ${this[kID]} does not exist. Please report this as ' + 'a bug in Node.js`); @@ -1412,12 +1402,8 @@ class Http2Stream extends Duplex { return; } _unrefActive(this); - const state = this[kState]; - if (state.reading) - return; - state.reading = true; - assert(this[kSession][kHandle].streamReadStart(this[kID]) === undefined, - `HTTP/2 Stream ${this[kID]} does not exist. Please report this as ` + + assert(this[kSession][kHandle].flushData(this[kID]) === undefined, + 'HTTP/2 Stream #{this[kID]} does not exist. Please report this as ' + 'a bug in Node.js'); } diff --git a/src/node_http2.cc b/src/node_http2.cc index 6175ad5a7f3..08942333436 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -4,6 +4,8 @@ #include "node_http2.h" #include "node_http2_state.h" +#include + namespace node { using v8::Boolean; @@ -18,13 +20,8 @@ using v8::Undefined; namespace http2 { -Freelist - data_chunk_free_list; - Freelist stream_free_list; -Freelist header_free_list; - Nghttp2Session::Callbacks Nghttp2Session::callback_struct_saved[2] = { Callbacks(false), Callbacks(true)}; @@ -32,6 +29,8 @@ Nghttp2Session::Callbacks Nghttp2Session::callback_struct_saved[2] = { Http2Options::Http2Options(Environment* env) { nghttp2_option_new(&options_); + nghttp2_option_set_no_auto_window_update(options_, 1); + AliasedBuffer& buffer = env->http2_state()->options_buffer; uint32_t flags = buffer[IDX_OPTIONS_FLAGS]; @@ -89,20 +88,24 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen, HandleScope handle_scope(isolate); Context::Scope context_scope(context); - if (object()->Has(context, env()->ongetpadding_string()).FromJust()) { - AliasedBuffer& buffer = - env()->http2_state()->padding_buffer; - buffer[PADDING_BUF_FRAME_LENGTH] = frameLen; - buffer[PADDING_BUF_MAX_PAYLOAD_LENGTH] = maxPayloadLen; - MakeCallback(env()->ongetpadding_string(), 0, nullptr); - uint32_t retval = buffer[PADDING_BUF_RETURN_VALUE]; - retval = retval <= maxPayloadLen ? retval : maxPayloadLen; - retval = retval >= frameLen ? retval : frameLen; - CHECK_GE(retval, frameLen); - CHECK_LE(retval, maxPayloadLen); - return retval; - } - return frameLen; +#if defined(DEBUG) && DEBUG + CHECK(object->Has(context, env()->ongetpadding_string()).FromJust()); +#endif + + AliasedBuffer& buffer = + env()->http2_state()->padding_buffer; + buffer[PADDING_BUF_FRAME_LENGTH] = frameLen; + buffer[PADDING_BUF_MAX_PAYLOAD_LENGTH] = maxPayloadLen; + buffer[PADDING_BUF_RETURN_VALUE] = frameLen; + MakeCallback(env()->ongetpadding_string(), 0, nullptr); + uint32_t retval = buffer[PADDING_BUF_RETURN_VALUE]; + retval = retval <= maxPayloadLen ? retval : maxPayloadLen; + retval = retval >= frameLen ? retval : frameLen; +#if defined(DEBUG) && DEBUG + CHECK_GE(retval, frameLen); + CHECK_LE(retval, maxPayloadLen); +#endif + return retval; } void Http2Session::SetNextStreamID(const FunctionCallbackInfo& args) { @@ -215,8 +218,10 @@ template void RefreshSettings(const FunctionCallbackInfo& args) { DEBUG_HTTP2("Http2Session: refreshing settings for session\n"); Environment* env = Environment::GetCurrent(args); +#if defined(DEBUG) && DEBUG CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsObject()); +#endif Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args[0].As()); nghttp2_session* s = session->session(); @@ -241,8 +246,10 @@ void RefreshSettings(const FunctionCallbackInfo& args) { void RefreshSessionState(const FunctionCallbackInfo& args) { DEBUG_HTTP2("Http2Session: refreshing session state\n"); Environment* env = Environment::GetCurrent(args); +#if defined(DEBUG) && DEBUG CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsObject()); +#endif AliasedBuffer& buffer = env->http2_state()->session_state_buffer; Http2Session* session; @@ -271,9 +278,11 @@ void RefreshSessionState(const FunctionCallbackInfo& args) { void RefreshStreamState(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); +#if defined(DEBUG) && DEBUG CHECK_EQ(args.Length(), 2); CHECK(args[0]->IsObject()); CHECK(args[1]->IsNumber()); +#endif int32_t id = args[1]->Int32Value(env->context()).ToChecked(); DEBUG_HTTP2("Http2Session: refreshing stream %d state\n", id); Http2Session* session; @@ -321,8 +330,9 @@ void RefreshStreamState(const FunctionCallbackInfo& args) { void Http2Session::New(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); +#if defined(DEBUG) && DEBUG CHECK(args.IsConstructCall()); - +#endif int val = args[0]->IntegerValue(env->context()).ToChecked(); nghttp2_session_type type = static_cast(val); DEBUG_HTTP2("Http2Session: creating a session of type: %d\n", type); @@ -334,7 +344,9 @@ void Http2Session::New(const FunctionCallbackInfo& args) { void Http2Session::Consume(const FunctionCallbackInfo& args) { Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsExternal()); +#endif session->Consume(args[0].As()); } @@ -375,9 +387,12 @@ void Http2Session::SubmitPriority(const FunctionCallbackInfo& args) { DEBUG_HTTP2("Http2Session: submitting priority for stream %d: " "parent: %d, weight: %d, exclusive: %d, silent: %d\n", id, parent, weight, exclusive, silent); + +#if defined(DEBUG) && DEBUG CHECK_GT(id, 0); CHECK_GE(parent, 0); CHECK_GE(weight, 0); +#endif Nghttp2Stream* stream; if (!(stream = session->FindStream(id))) { @@ -455,8 +470,11 @@ void Http2Session::SubmitSettings(const FunctionCallbackInfo& args) { void Http2Session::SubmitRstStream(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); Local context = env->context(); + +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsNumber()); CHECK(args[1]->IsNumber()); +#endif Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); @@ -480,7 +498,9 @@ void Http2Session::SubmitRequest(const FunctionCallbackInfo& args) { // args[2] parentStream ID (for priority spec) // args[3] weight (for priority spec) // args[4] exclusive boolean (for priority spec) +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsArray()); +#endif Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); @@ -511,8 +531,10 @@ void Http2Session::SubmitRequest(const FunctionCallbackInfo& args) { } void Http2Session::SubmitResponse(const FunctionCallbackInfo& args) { +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsNumber()); CHECK(args[1]->IsArray()); +#endif Http2Session* session; Nghttp2Stream* stream; @@ -540,11 +562,13 @@ void Http2Session::SubmitResponse(const FunctionCallbackInfo& args) { } void Http2Session::SubmitFile(const FunctionCallbackInfo& args) { +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsNumber()); // Stream ID CHECK(args[1]->IsNumber()); // File Descriptor CHECK(args[2]->IsArray()); // Headers CHECK(args[3]->IsNumber()); // Offset CHECK(args[4]->IsNumber()); // Length +#endif Http2Session* session; Nghttp2Stream* stream; @@ -562,7 +586,9 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo& args) { int64_t length = args[4]->IntegerValue(context).ToChecked(); int options = args[5]->IntegerValue(context).ToChecked(); +#if defined(DEBUG) && DEBUG CHECK_GE(offset, 0); +#endif DEBUG_HTTP2("Http2Session: submitting file %d for stream %d: headers: %d, " "end-stream: %d\n", fd, id, headers->Length()); @@ -578,8 +604,10 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo& args) { } void Http2Session::SendHeaders(const FunctionCallbackInfo& args) { +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsNumber()); CHECK(args[1]->IsArray()); +#endif Http2Session* session; Nghttp2Stream* stream; @@ -606,7 +634,9 @@ void Http2Session::SendHeaders(const FunctionCallbackInfo& args) { void Http2Session::ShutdownStream(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsNumber()); +#endif Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); Nghttp2Stream* stream; @@ -618,10 +648,11 @@ void Http2Session::ShutdownStream(const FunctionCallbackInfo& args) { stream->Shutdown(); } - void Http2Session::StreamReadStart(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsNumber()); +#endif Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); Nghttp2Stream* stream; @@ -635,7 +666,9 @@ void Http2Session::StreamReadStart(const FunctionCallbackInfo& args) { void Http2Session::StreamReadStop(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsNumber()); +#endif Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); Nghttp2Stream* stream; @@ -688,9 +721,10 @@ void Http2Session::DestroyStream(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); Http2Session* session; ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); - +#if defined(DEBUG) && DEBUG CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsNumber()); +#endif int32_t id = args[0]->Int32Value(env->context()).ToChecked(); DEBUG_HTTP2("Http2Session: destroy stream %d\n", id); Nghttp2Stream* stream; @@ -700,6 +734,23 @@ void Http2Session::DestroyStream(const FunctionCallbackInfo& args) { stream->Destroy(); } +void Http2Session::FlushData(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + Http2Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); +#if defined(DEBUG) && DEBUG + CHECK_EQ(args.Length(), 1); + CHECK(args[0]->IsNumber()); +#endif + int32_t id = args[0]->Int32Value(env->context()).ToChecked(); + DEBUG_HTTP2("Http2Session: flushing data to js for stream %d\n", id); + Nghttp2Stream* stream; + if (!(stream = session->FindStream(id))) { + return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID); + } + stream->FlushDataChunks(); +} + void Http2Session::SubmitPushPromise(const FunctionCallbackInfo& args) { Http2Session* session; Environment* env = Environment::GetCurrent(args); @@ -707,8 +758,10 @@ void Http2Session::SubmitPushPromise(const FunctionCallbackInfo& args) { Isolate* isolate = env->isolate(); ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); +#if defined(DEBUG) && DEBUG CHECK(args[0]->IsNumber()); // parent stream ID CHECK(args[1]->IsArray()); // headers array +#endif Nghttp2Stream* parent; int32_t id = args[0]->Int32Value(context).ToChecked(); @@ -802,29 +855,28 @@ void Http2Session::OnTrailers(Nghttp2Stream* stream, HandleScope scope(isolate); Context::Scope context_scope(context); - if (object()->Has(context, env()->ontrailers_string()).FromJust()) { - Local argv[1] = { - Integer::New(isolate, stream->id()) - }; + Local argv[1] = { + Integer::New(isolate, stream->id()) + }; - Local ret = MakeCallback(env()->ontrailers_string(), - arraysize(argv), argv).ToLocalChecked(); - if (!ret.IsEmpty()) { - if (ret->IsArray()) { - Local headers = ret.As(); - if (headers->Length() > 0) { - Headers trailers(isolate, context, headers); - submit_trailers.Submit(*trailers, trailers.length()); - } + Local ret = MakeCallback(env()->ontrailers_string(), + arraysize(argv), argv).ToLocalChecked(); + if (!ret.IsEmpty()) { + if (ret->IsArray()) { + Local headers = ret.As(); + if (headers->Length() > 0) { + Headers trailers(isolate, context, headers); + submit_trailers.Submit(*trailers, trailers.length()); } } } } -void Http2Session::OnHeaders(Nghttp2Stream* stream, - nghttp2_header_list* headers, - nghttp2_headers_category cat, - uint8_t flags) { +void Http2Session::OnHeaders( + Nghttp2Stream* stream, + std::queue* headers, + nghttp2_headers_category cat, + uint8_t flags) { Local context = env()->context(); Isolate* isolate = env()->isolate(); Context::Scope context_scope(context); @@ -836,10 +888,12 @@ void Http2Session::OnHeaders(Nghttp2Stream* stream, Local fn = env()->push_values_to_array_function(); Local argv[NODE_PUSH_VAL_TO_ARRAY_MAX * 2]; +#if defined(DEBUG) && DEBUG CHECK_LE(cat, NGHTTP2_HCAT_HEADERS); +#endif - // The headers are passed in above as a linked list of nghttp2_header_list - // structs. The following converts that into a JS array with the structure: + // The headers are passed in above as a queue of nghttp2_header structs. + // The following converts that into a JS array with the structure: // [name1, value1, name2, value2, name3, value3, name3, value4] and so on. // That array is passed up to the JS layer and converted into an Object form // like {name1: value1, name2: value2, name3: [value3, value4]}. We do it @@ -847,16 +901,16 @@ void Http2Session::OnHeaders(Nghttp2Stream* stream, // array than it is to generate and pass the object). do { size_t j = 0; - while (headers != nullptr && j < arraysize(argv) / 2) { - nghttp2_header_list* item = headers; + while (!headers->empty() && j < arraysize(argv) / 2) { + nghttp2_header item = headers->front(); // The header name and value are passed as external one-byte strings name_str = - ExternalHeader::New(env(), item->name).ToLocalChecked(); + ExternalHeader::New(env(), item.name).ToLocalChecked(); value_str = - ExternalHeader::New(env(), item->value).ToLocalChecked(); + ExternalHeader::New(env(), item.value).ToLocalChecked(); argv[j * 2] = name_str; argv[j * 2 + 1] = value_str; - headers = item->next; + headers->pop(); j++; } // For performance, we pass name and value pairs to array.protototype.push @@ -865,17 +919,15 @@ void Http2Session::OnHeaders(Nghttp2Stream* stream, if (j > 0) { fn->Call(env()->context(), holder, j * 2, argv).ToLocalChecked(); } - } while (headers != nullptr); + } while (!headers->empty()); - if (object()->Has(context, env()->onheaders_string()).FromJust()) { - Local argv[4] = { - Integer::New(isolate, stream->id()), - Integer::New(isolate, cat), - Integer::New(isolate, flags), - holder - }; - MakeCallback(env()->onheaders_string(), arraysize(argv), argv); - } + Local args[4] = { + Integer::New(isolate, stream->id()), + Integer::New(isolate, cat), + Integer::New(isolate, flags), + holder + }; + MakeCallback(env()->onheaders_string(), arraysize(args), args); } @@ -884,24 +936,17 @@ void Http2Session::OnStreamClose(int32_t id, uint32_t code) { Local context = env()->context(); HandleScope scope(isolate); Context::Scope context_scope(context); - if (object()->Has(context, env()->onstreamclose_string()).FromJust()) { - Local argv[2] = { - Integer::New(isolate, id), - Integer::NewFromUnsigned(isolate, code) - }; - MakeCallback(env()->onstreamclose_string(), arraysize(argv), argv); - } -} -void FreeDataChunk(char* data, void* hint) { - nghttp2_data_chunk_t* item = reinterpret_cast(hint); - delete[] data; - data_chunk_free_list.push(item); + Local argv[2] = { + Integer::New(isolate, id), + Integer::NewFromUnsigned(isolate, code) + }; + MakeCallback(env()->onstreamclose_string(), arraysize(argv), argv); } void Http2Session::OnDataChunk( Nghttp2Stream* stream, - nghttp2_data_chunk_t* chunk) { + uv_buf_t* chunk) { Isolate* isolate = env()->isolate(); Local context = env()->context(); HandleScope scope(isolate); @@ -912,11 +957,8 @@ void Http2Session::OnDataChunk( ssize_t len = -1; Local buf; if (chunk != nullptr) { - len = chunk->buf.len; - buf = Buffer::New(isolate, - chunk->buf.base, len, - FreeDataChunk, - chunk).ToLocalChecked(); + len = chunk->len; + buf = Buffer::New(isolate, chunk->base, len).ToLocalChecked(); } EmitData(len, buf, obj); } @@ -926,10 +968,9 @@ void Http2Session::OnSettings(bool ack) { Isolate* isolate = env()->isolate(); HandleScope scope(isolate); Context::Scope context_scope(context); - if (object()->Has(context, env()->onsettings_string()).FromJust()) { - Local argv[1] = { Boolean::New(isolate, ack) }; - MakeCallback(env()->onsettings_string(), arraysize(argv), argv); - } + + Local argv[1] = { Boolean::New(isolate, ack) }; + MakeCallback(env()->onsettings_string(), arraysize(argv), argv); } void Http2Session::OnFrameError(int32_t id, uint8_t type, int error_code) { @@ -937,14 +978,13 @@ void Http2Session::OnFrameError(int32_t id, uint8_t type, int error_code) { Isolate* isolate = env()->isolate(); HandleScope scope(isolate); Context::Scope context_scope(context); - if (object()->Has(context, env()->onframeerror_string()).FromJust()) { - Local argv[3] = { - Integer::New(isolate, id), - Integer::New(isolate, type), - Integer::New(isolate, error_code) - }; - MakeCallback(env()->onframeerror_string(), arraysize(argv), argv); - } + + Local argv[3] = { + Integer::New(isolate, id), + Integer::New(isolate, type), + Integer::New(isolate, error_code) + }; + MakeCallback(env()->onframeerror_string(), arraysize(argv), argv); } void Http2Session::OnPriority(int32_t stream, @@ -955,15 +995,14 @@ void Http2Session::OnPriority(int32_t stream, Isolate* isolate = env()->isolate(); HandleScope scope(isolate); Context::Scope context_scope(context); - if (object()->Has(context, env()->onpriority_string()).FromJust()) { - Local argv[4] = { - Integer::New(isolate, stream), - Integer::New(isolate, parent), - Integer::New(isolate, weight), - Boolean::New(isolate, exclusive) - }; - MakeCallback(env()->onpriority_string(), arraysize(argv), argv); - } + + Local argv[4] = { + Integer::New(isolate, stream), + Integer::New(isolate, parent), + Integer::New(isolate, weight), + Boolean::New(isolate, exclusive) + }; + MakeCallback(env()->onpriority_string(), arraysize(argv), argv); } void Http2Session::OnGoAway(int32_t lastStreamID, @@ -974,21 +1013,20 @@ void Http2Session::OnGoAway(int32_t lastStreamID, Isolate* isolate = env()->isolate(); HandleScope scope(isolate); Context::Scope context_scope(context); - if (object()->Has(context, env()->ongoawaydata_string()).FromJust()) { - Local argv[3] = { - Integer::NewFromUnsigned(isolate, errorCode), - Integer::New(isolate, lastStreamID), - Undefined(isolate) - }; - if (length > 0) { - argv[2] = Buffer::Copy(isolate, - reinterpret_cast(data), - length).ToLocalChecked(); - } + Local argv[3] = { + Integer::NewFromUnsigned(isolate, errorCode), + Integer::New(isolate, lastStreamID), + Undefined(isolate) + }; - MakeCallback(env()->ongoawaydata_string(), arraysize(argv), argv); + if (length > 0) { + argv[2] = Buffer::Copy(isolate, + reinterpret_cast(data), + length).ToLocalChecked(); } + + MakeCallback(env()->ongoawaydata_string(), arraysize(argv), argv); } void Http2Session::OnStreamAllocImpl(size_t suggested_size, @@ -1030,9 +1068,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, void Http2Session::Consume(Local external) { DEBUG_HTTP2("Http2Session: consuming socket\n"); +#if defined(DEBUG) && DEBUG CHECK(prev_alloc_cb_.is_empty()); +#endif StreamBase* stream = static_cast(external->Value()); +#if defined(DEBUG) && DEBUG CHECK_NE(stream, nullptr); +#endif stream->Consume(); stream_ = stream; prev_alloc_cb_ = stream->alloc_cb(); @@ -1057,11 +1099,15 @@ void Http2Session::Unconsume() { Headers::Headers(Isolate* isolate, Local context, Local headers) { +#if defined(DEBUG) && DEBUG CHECK_EQ(headers->Length(), 2); +#endif Local header_string = headers->Get(context, 0).ToLocalChecked(); Local header_count = headers->Get(context, 1).ToLocalChecked(); +#if defined(DEBUG) && DEBUG CHECK(header_string->IsString()); CHECK(header_count->IsUint32()); +#endif count_ = header_count.As()->Value(); int header_string_len = header_string.As()->Length(); @@ -1112,8 +1158,10 @@ Headers::Headers(Isolate* isolate, p += nva[n].valuelen + 1; } +#if defined(DEBUG) && DEBUG CHECK_EQ(p, header_contents + header_string_len); CHECK_EQ(n, count_); +#endif } @@ -1199,6 +1247,8 @@ void Initialize(Local target, Http2Session::SetNextStreamID); env->SetProtoMethod(session, "destroyStream", Http2Session::DestroyStream); + env->SetProtoMethod(session, "flushData", + Http2Session::FlushData); StreamBase::AddMethods(env, session, StreamBase::kFlagHasWritev | StreamBase::kFlagNoShutdown); diff --git a/src/node_http2.h b/src/node_http2.h index 68801eeb54e..b416243823a 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -8,6 +8,8 @@ #include "stream_base-inl.h" #include "string_bytes.h" +#include + namespace node { namespace http2 { @@ -17,6 +19,10 @@ using v8::EscapableHandleScope; using v8::Isolate; using v8::MaybeLocal; +// Unlike the HTTP/1 implementation, the HTTP/2 implementation is not limited +// to a fixed number of known supported HTTP methods. These constants, therefore +// are provided strictly as a convenience to users and are exposed via the +// require('http2').constants object. #define HTTP_KNOWN_METHODS(V) \ V(ACL, "ACL") \ V(BASELINE_CONTROL, "BASELINE-CONTROL") \ @@ -58,6 +64,8 @@ using v8::MaybeLocal; V(UPDATEREDIRECTREF, "UPDATEREDIRECTREF") \ V(VERSION_CONTROL, "VERSION-CONTROL") +// These are provided strictly as a convenience to users and are exposed via the +// require('http2').constants objects #define HTTP_KNOWN_HEADERS(V) \ V(STATUS, ":status") \ V(METHOD, ":method") \ @@ -143,6 +151,9 @@ HTTP_KNOWN_HEADERS(V) HTTP_KNOWN_HEADER_MAX }; +// While some of these codes are used within the HTTP/2 implementation in +// core, they are provided strictly as a convenience to users and are exposed +// via the require('http2').constants object. #define HTTP_STATUS_CODES(V) \ V(CONTINUE, 100) \ V(SWITCHING_PROTOCOLS, 101) \ @@ -213,15 +224,22 @@ HTTP_STATUS_CODES(V) #undef V }; +// The Padding Strategy determines the method by which extra padding is +// selected for HEADERS and DATA frames. These are configurable via the +// options passed in to a Http2Session object. enum padding_strategy_type { - // No padding strategy + // No padding strategy. This is the default. PADDING_STRATEGY_NONE, // Padding will ensure all data frames are maxFrameSize PADDING_STRATEGY_MAX, - // Padding will be determined via JS callback + // Padding will be determined via a JS callback. Note that this can be + // expensive because the callback is called once for every DATA and + // HEADERS frame. For performance reasons, this strategy should be + // avoided. PADDING_STRATEGY_CALLBACK }; +// These are the error codes provided by the underlying nghttp2 implementation. #define NGHTTP2_ERROR_CODES(V) \ V(NGHTTP2_ERR_INVALID_ARGUMENT) \ V(NGHTTP2_ERR_BUFFER_ERROR) \ @@ -281,6 +299,10 @@ const char* nghttp2_errname(int rv) { #define MIN_MAX_FRAME_SIZE DEFAULT_SETTINGS_MAX_FRAME_SIZE #define MAX_INITIAL_WINDOW_SIZE 2147483647 +// The Http2Options class is used to parse the options object passed in to +// a Http2Session object and convert those into an appropriate nghttp2_option +// struct. This is the primary mechanism by which the Http2Session object is +// configured. class Http2Options { public: explicit Http2Options(Environment* env); @@ -294,7 +316,9 @@ class Http2Options { } void SetPaddingStrategy(padding_strategy_type val) { +#if DEBUG CHECK_LE(val, PADDING_STRATEGY_CALLBACK); +#endif padding_strategy_ = static_cast(val); } @@ -364,18 +388,21 @@ class Http2Session : public AsyncWrap, return OnMaxFrameSizePadding(frameLength, maxPayloadLen); } +#if defined(DEBUG) && DEBUG CHECK_EQ(padding_strategy_, PADDING_STRATEGY_CALLBACK); +#endif return OnCallbackPadding(frameLength, maxPayloadLen); } - void OnHeaders(Nghttp2Stream* stream, - nghttp2_header_list* headers, - nghttp2_headers_category cat, - uint8_t flags) override; + void OnHeaders( + Nghttp2Stream* stream, + std::queue* headers, + nghttp2_headers_category cat, + uint8_t flags) override; void OnStreamClose(int32_t id, uint32_t code) override; void Send(uv_buf_t* bufs, size_t total) override; - void OnDataChunk(Nghttp2Stream* stream, nghttp2_data_chunk_t* chunk) override; + void OnDataChunk(Nghttp2Stream* stream, uv_buf_t* chunk) override; void OnSettings(bool ack) override; void OnPriority(int32_t stream, int32_t parent, @@ -447,6 +474,7 @@ class Http2Session : public AsyncWrap, static void SendShutdownNotice(const FunctionCallbackInfo& args); static void SubmitGoaway(const FunctionCallbackInfo& args); static void DestroyStream(const FunctionCallbackInfo& args); + static void FlushData(const FunctionCallbackInfo& args); template static void GetSettings(const FunctionCallbackInfo& args); diff --git a/src/node_http2_core-inl.h b/src/node_http2_core-inl.h index 085631dc388..5bfe1dc19a9 100644 --- a/src/node_http2_core-inl.h +++ b/src/node_http2_core-inl.h @@ -10,26 +10,12 @@ namespace node { namespace http2 { -#define FREELIST_MAX 1024 - -#define LINKED_LIST_ADD(list, item) \ - do { \ - if (list ## _tail_ == nullptr) { \ - list ## _head_ = item; \ - list ## _tail_ = item; \ - } else { \ - list ## _tail_->next = item; \ - list ## _tail_ = item; \ - } \ - } while (0); - -extern Freelist - data_chunk_free_list; +#define FREELIST_MAX 10240 +// Instances of Nghttp2Stream are created and pooled in order to speed +// allocation under load. extern Freelist stream_free_list; -extern Freelist header_free_list; - #ifdef NODE_DEBUG_HTTP2 inline int Nghttp2Session::OnNghttpError(nghttp2_session* session, const char* message, @@ -71,8 +57,8 @@ inline int Nghttp2Session::OnBeginHeadersCallback(nghttp2_session* session, // and transparently so we do not need to worry about those at all. inline int Nghttp2Session::OnHeaderCallback(nghttp2_session* session, const nghttp2_frame* frame, - nghttp2_rcbuf *name, - nghttp2_rcbuf *value, + nghttp2_rcbuf* name, + nghttp2_rcbuf* value, uint8_t flags, void* user_data) { Nghttp2Session* handle = static_cast(user_data); @@ -82,12 +68,15 @@ inline int Nghttp2Session::OnHeaderCallback(nghttp2_session* session, frame->push_promise.promised_stream_id : frame->hd.stream_id; Nghttp2Stream* stream = handle->FindStream(id); - nghttp2_header_list* header = header_free_list.pop(); - header->name = name; - header->value = value; + // The header name and value are stored in a reference counted buffer + // provided to us by nghttp2. We need to increment the reference counter + // here, then decrement it when we're done using it later. nghttp2_rcbuf_incref(name); nghttp2_rcbuf_incref(value); - LINKED_LIST_ADD(stream->current_headers, header); + nghttp2_header header; + header.name = name; + header.value = value; + stream->headers()->emplace(header); return 0; } @@ -107,6 +96,7 @@ inline int Nghttp2Session::OnFrameReceive(nghttp2_session* session, handle->HandleDataFrame(frame); break; case NGHTTP2_PUSH_PROMISE: + // Intentional fall-through, handled just like headers frames case NGHTTP2_HEADERS: handle->HandleHeadersFrame(frame); break; @@ -126,18 +116,24 @@ inline int Nghttp2Session::OnFrameReceive(nghttp2_session* session, return 0; } -inline int Nghttp2Session::OnFrameNotSent(nghttp2_session *session, - const nghttp2_frame *frame, +// nghttp2 will call this if an error occurs attempting to send a frame. +// Unless the stream or session is closed, this really should not happen +// unless there is a serious flaw in our implementation. +inline int Nghttp2Session::OnFrameNotSent(nghttp2_session* session, + const nghttp2_frame* frame, int error_code, - void *user_data) { - Nghttp2Session *handle = static_cast(user_data); + void* user_data) { + Nghttp2Session* handle = static_cast(user_data); DEBUG_HTTP2("Nghttp2Session %s: frame type %d was not sent, code: %d\n", handle->TypeName(), frame->hd.type, error_code); // Do not report if the frame was not sent due to the session closing if (error_code != NGHTTP2_ERR_SESSION_CLOSING && error_code != NGHTTP2_ERR_STREAM_CLOSED && - error_code != NGHTTP2_ERR_STREAM_CLOSING) - handle->OnFrameError(frame->hd.stream_id, frame->hd.type, error_code); + error_code != NGHTTP2_ERR_STREAM_CLOSING) { + handle->OnFrameError(frame->hd.stream_id, + frame->hd.type, + error_code); + } return 0; } @@ -153,14 +149,14 @@ inline int Nghttp2Session::OnInvalidHeader(nghttp2_session* session, // Called when nghttp2 closes a stream, either in response to an RST_STREAM // frame or the stream closing naturally on it's own -inline int Nghttp2Session::OnStreamClose(nghttp2_session *session, +inline int Nghttp2Session::OnStreamClose(nghttp2_session* session, int32_t id, uint32_t code, - void *user_data) { - Nghttp2Session *handle = static_cast(user_data); + void* user_data) { + Nghttp2Session*handle = static_cast(user_data); DEBUG_HTTP2("Nghttp2Session %s: stream %d closed, code: %d\n", handle->TypeName(), id, code); - Nghttp2Stream *stream = handle->FindStream(id); + Nghttp2Stream* stream = handle->FindStream(id); // Intentionally ignore the callback if the stream does not exist if (stream != nullptr) stream->Close(code); @@ -170,17 +166,17 @@ inline int Nghttp2Session::OnStreamClose(nghttp2_session *session, // Called by nghttp2 to collect the data while a file response is sent. // The buf is the DATA frame buffer that needs to be filled with at most // length bytes. flags is used to control what nghttp2 does next. -inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session *session, +inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session* session, int32_t id, - uint8_t *buf, + uint8_t* buf, size_t length, - uint32_t *flags, - nghttp2_data_source *source, - void *user_data) { - Nghttp2Session *handle = static_cast(user_data); + uint32_t* flags, + nghttp2_data_source* source, + void* user_data) { + Nghttp2Session* handle = static_cast(user_data); DEBUG_HTTP2("Nghttp2Session %s: reading outbound file data for stream %d\n", handle->TypeName(), id); - Nghttp2Stream *stream = handle->FindStream(id); + Nghttp2Stream* stream = handle->FindStream(id); int fd = source->fd; int64_t offset = stream->fd_offset_; @@ -191,7 +187,7 @@ inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session *session, length = stream->fd_length_; uv_buf_t data; - data.base = reinterpret_cast(buf); + data.base = reinterpret_cast(buf); data.len = length; uv_fs_t read_req; @@ -226,54 +222,53 @@ inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session *session, // Called by nghttp2 to collect the data to pack within a DATA frame. // The buf is the DATA frame buffer that needs to be filled with at most // length bytes. flags is used to control what nghttp2 does next. -inline ssize_t Nghttp2Session::OnStreamRead(nghttp2_session *session, +inline ssize_t Nghttp2Session::OnStreamRead(nghttp2_session* session, int32_t id, - uint8_t *buf, + uint8_t* buf, size_t length, - uint32_t *flags, - nghttp2_data_source *source, - void *user_data) { - Nghttp2Session *handle = static_cast(user_data); + uint32_t* flags, + nghttp2_data_source* source, + void* user_data) { + Nghttp2Session* handle = static_cast(user_data); DEBUG_HTTP2("Nghttp2Session %s: reading outbound data for stream %d\n", handle->TypeName(), id); - Nghttp2Stream *stream = handle->FindStream(id); + Nghttp2Stream* stream = handle->FindStream(id); size_t remaining = length; size_t offset = 0; // While there is data in the queue, copy data into buf until it is full. // There may be data left over, which will be sent the next time nghttp // calls this callback. - while (stream->queue_head_ != nullptr) { + while (!stream->queue_.empty()) { DEBUG_HTTP2("Nghttp2Session %s: processing outbound data chunk\n", handle->TypeName()); - nghttp2_stream_write_queue *head = stream->queue_head_; - while (stream->queue_head_index_ < head->nbufs) { + nghttp2_stream_write* head = stream->queue_.front(); + while (stream->queue_index_ < head->nbufs) { if (remaining == 0) goto end; - unsigned int n = stream->queue_head_index_; + unsigned int n = stream->queue_index_; // len is the number of bytes in head->bufs[n] that are yet to be written - size_t len = head->bufs[n].len - stream->queue_head_offset_; + size_t len = head->bufs[n].len - stream->queue_offset_; size_t bytes_to_write = len < remaining ? len : remaining; memcpy(buf + offset, - head->bufs[n].base + stream->queue_head_offset_, + head->bufs[n].base + stream->queue_offset_, bytes_to_write); offset += bytes_to_write; remaining -= bytes_to_write; if (bytes_to_write < len) { - stream->queue_head_offset_ += bytes_to_write; + stream->queue_offset_ += bytes_to_write; } else { - stream->queue_head_index_++; - stream->queue_head_offset_ = 0; + stream->queue_index_++; + stream->queue_offset_ = 0; } } - stream->queue_head_offset_ = 0; - stream->queue_head_index_ = 0; - stream->queue_head_ = head->next; + stream->queue_offset_ = 0; + stream->queue_index_ = 0; head->cb(head->req, 0); delete head; + stream->queue_.pop(); } - stream->queue_tail_ = nullptr; end: // If we are no longer writable and there is no more data in the queue, @@ -283,8 +278,8 @@ end: // that will wait for data to become available. // If neither of these flags are set, then nghttp2 will call this callback // again to get the data for the next DATA frame. - int writable = stream->queue_head_ != nullptr || stream->IsWritable(); - if (offset == 0 && writable && stream->queue_head_ == nullptr) { + int writable = !stream->queue_.empty() || stream->IsWritable(); + if (offset == 0 && writable && stream->queue_.empty()) { DEBUG_HTTP2("Nghttp2Session %s: deferring stream %d\n", handle->TypeName(), id); return NGHTTP2_ERR_DEFERRED; @@ -296,64 +291,70 @@ end: GetTrailers(session, handle, stream, flags); } +#if defined(DEBUG) && DEBUG CHECK(offset <= length); +#endif return offset; } // Called by nghttp2 when it needs to determine how much padding to apply // to a DATA or HEADERS frame -inline ssize_t Nghttp2Session::OnSelectPadding(nghttp2_session *session, - const nghttp2_frame *frame, +inline ssize_t Nghttp2Session::OnSelectPadding(nghttp2_session* session, + const nghttp2_frame* frame, size_t maxPayloadLen, - void *user_data) { - Nghttp2Session *handle = static_cast(user_data); + void* user_data) { + Nghttp2Session* handle = static_cast(user_data); +#if defined(DEBUG) && DEBUG CHECK(handle->HasGetPaddingCallback()); +#endif ssize_t padding = handle->GetPadding(frame->hd.length, maxPayloadLen); DEBUG_HTTP2("Nghttp2Session %s: using padding, size: %d\n", handle->TypeName(), padding); return padding; } -// Called by nghttp2 multiple times while processing a DATA frame -inline int Nghttp2Session::OnDataChunkReceived(nghttp2_session *session, +// While nghttp2 is processing a DATA frame, it will call the +// OnDataChunkReceived callback multiple times, passing along individual +// chunks of data from the DATA frame payload. These *must* be memcpy'd +// out because the pointer to the data will quickly become invalid. +inline int Nghttp2Session::OnDataChunkReceived(nghttp2_session* session, uint8_t flags, int32_t id, - const uint8_t *data, + const uint8_t* data, size_t len, - void *user_data) { - Nghttp2Session *handle = static_cast(user_data); + void* user_data) { + Nghttp2Session* handle = static_cast(user_data); DEBUG_HTTP2("Nghttp2Session %s: buffering data chunk for stream %d, size: " "%d, flags: %d\n", handle->TypeName(), id, len, flags); - Nghttp2Stream *stream = handle->FindStream(id); - nghttp2_data_chunk_t *chunk = data_chunk_free_list.pop(); - chunk->buf = uv_buf_init(new char[len], len); - memcpy(chunk->buf.base, data, len); - if (stream->data_chunks_tail_ == nullptr) { - stream->data_chunks_head_ = - stream->data_chunks_tail_ = chunk; - } else { - stream->data_chunks_tail_->next = chunk; - stream->data_chunks_tail_ = chunk; + // We should never actually get a 0-length chunk so this check is + // only a precaution at this point. + if (len > 0) { + nghttp2_session_consume_connection(session, len); + Nghttp2Stream* stream = handle->FindStream(id); + char* buf = Malloc(len); + memcpy(buf, data, len); + stream->data_chunks_.emplace(uv_buf_init(buf, len)); } return 0; } -inline void Nghttp2Session::GetTrailers(nghttp2_session *session, - Nghttp2Session *handle, - Nghttp2Stream *stream, - uint32_t *flags) { +// Only when we are done sending the last chunk of data do we check for +// any trailing headers that are to be sent. This is the only opportunity +// we have to make this check. If there are trailers, then the +// NGHTTP2_DATA_FLAG_NO_END_STREAM flag must be set. +inline void Nghttp2Session::GetTrailers(nghttp2_session* session, + Nghttp2Session* handle, + Nghttp2Stream* stream, + uint32_t* flags) { if (stream->GetTrailers()) { - // Only when we are done sending the last chunk of data do we check for - // any trailing headers that are to be sent. This is the only opportunity - // we have to make this check. If there are trailers, then the - // NGHTTP2_DATA_FLAG_NO_END_STREAM flag must be set. SubmitTrailers submit_trailers{handle, stream, flags}; handle->OnTrailers(stream, submit_trailers); } } -inline void Nghttp2Session::SubmitTrailers::Submit(nghttp2_nv *trailers, +// Submits any trailing header fields that have been collected +inline void Nghttp2Session::SubmitTrailers::Submit(nghttp2_nv* trailers, size_t length) const { if (length == 0) return; @@ -367,6 +368,7 @@ inline void Nghttp2Session::SubmitTrailers::Submit(nghttp2_nv *trailers, length); } +// Submits a graceful shutdown notice to nghttp // See: https://nghttp2.org/documentation/nghttp2_submit_shutdown_notice.html inline void Nghttp2Session::SubmitShutdownNotice() { DEBUG_HTTP2("Nghttp2Session %s: submitting shutdown notice\n", @@ -397,33 +399,36 @@ inline Nghttp2Stream* Nghttp2Session::FindStream(int32_t id) { } } -// Flushes any received queued chunks of data out to the JS layer -inline void Nghttp2Stream::FlushDataChunks(bool done) { - while (data_chunks_head_ != nullptr) { - DEBUG_HTTP2("Nghttp2Stream %d: emitting data chunk\n", id_); - nghttp2_data_chunk_t* item = data_chunks_head_; - data_chunks_head_ = item->next; - // item will be passed to the Buffer instance and freed on gc - session_->OnDataChunk(this, item); +// Flushes one buffered data chunk at a time. +inline void Nghttp2Stream::FlushDataChunks() { + if (!data_chunks_.empty()) { + uv_buf_t buf = data_chunks_.front(); + data_chunks_.pop(); + if (buf.len > 0) { + nghttp2_session_consume_stream(session_->session(), id_, buf.len); + session_->OnDataChunk(this, &buf); + } else { + session_->OnDataChunk(this, nullptr); + } } - data_chunks_tail_ = nullptr; - if (done) - session_->OnDataChunk(this, nullptr); } -// Passes all of the the chunks for a data frame out to the JS layer -// The chunks are collected as the frame is being processed and sent out -// to the JS side only when the frame is fully processed. +// Called when a DATA frame has been completely processed. Will check to +// see if the END_STREAM flag is set, and will flush the queued data chunks +// to JS if the stream is flowing inline void Nghttp2Session::HandleDataFrame(const nghttp2_frame* frame) { int32_t id = frame->hd.stream_id; DEBUG_HTTP2("Nghttp2Session %s: handling data frame for stream %d\n", TypeName(), id); Nghttp2Stream* stream = this->FindStream(id); // If the stream does not exist, something really bad happened +#if defined(DEBUG) && DEBUG CHECK_NE(stream, nullptr); - bool done = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) == - NGHTTP2_FLAG_END_STREAM; - stream->FlushDataChunks(done); +#endif + if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) + stream->data_chunks_.emplace(uv_buf_init(0, 0)); + if (stream->IsReading()) + stream->FlushDataChunks(); } // Passes all of the collected headers for a HEADERS frame out to the JS layer. @@ -436,12 +441,13 @@ inline void Nghttp2Session::HandleHeadersFrame(const nghttp2_frame* frame) { TypeName(), id); Nghttp2Stream* stream = FindStream(id); // If the stream does not exist, something really bad happened +#if defined(DEBUG) && DEBUG CHECK_NE(stream, nullptr); +#endif OnHeaders(stream, stream->headers(), stream->headers_category(), frame->hd.flags); - stream->FreeHeaders(); } // Notifies the JS layer that a PRIORITY frame has been received @@ -450,13 +456,17 @@ inline void Nghttp2Session::HandlePriorityFrame(const nghttp2_frame* frame) { int32_t id = frame->hd.stream_id; DEBUG_HTTP2("Nghttp2Session %s: handling priority frame for stream %d\n", TypeName(), id); - // Ignore the priority frame if stream ID is <= 0 - // This actually should never happen because nghttp2 should treat this as - // an error condition that terminates the session. - if (id > 0) { - nghttp2_priority_spec spec = priority_frame.pri_spec; - OnPriority(id, spec.stream_id, spec.weight, spec.exclusive); - } + + // Priority frame stream ID should never be <= 0. nghttp2 handles this + // as an error condition that terminates the session, so we should be + // good here + +#if defined(DEBUG) && DEBUG + CHECK_GT(id, 0) +#endif + + nghttp2_priority_spec spec = priority_frame.pri_spec; + OnPriority(id, spec.stream_id, spec.weight, spec.exclusive); } // Notifies the JS layer that a GOAWAY frame has been received @@ -579,7 +589,6 @@ inline int Nghttp2Session::Init(uv_loop_t* loop, Nghttp2Session* session = ContainerOf(&Nghttp2Session::prep_, t); session->SendPendingData(); }); -// uv_unref(reinterpret_cast(&prep_)); return ret; } @@ -588,7 +597,9 @@ inline void Nghttp2Session::MarkDestroying() { } inline int Nghttp2Session::Free() { +#if defined(DEBUG) && DEBUG CHECK(session_ != nullptr); +#endif DEBUG_HTTP2("Nghttp2Session %s: freeing session\n", TypeName()); // Stop the loop CHECK_EQ(uv_prepare_stop(&prep_), 0); @@ -657,19 +668,22 @@ inline void Nghttp2Stream::ResetState( int options) { DEBUG_HTTP2("Nghttp2Stream %d: resetting stream state\n", id); session_ = session; - queue_head_ = nullptr; - queue_tail_ = nullptr; - data_chunks_head_ = nullptr; - data_chunks_tail_ = nullptr; - current_headers_head_ = nullptr; - current_headers_tail_ = nullptr; + while (!queue_.empty()) { + nghttp2_stream_write* head = queue_.front(); + delete head; + queue_.pop(); + } + while (!data_chunks_.empty()) + data_chunks_.pop(); + while (!current_headers_.empty()) + current_headers_.pop(); current_headers_category_ = category; flags_ = NGHTTP2_STREAM_FLAG_NONE; id_ = id; code_ = NGHTTP2_NO_ERROR; prev_local_window_size_ = 65535; - queue_head_index_ = 0; - queue_head_offset_ = 0; + queue_index_ = 0; + queue_offset_ = 0; getTrailers_ = options & STREAM_OPTION_GET_TRAILERS; } @@ -689,41 +703,25 @@ inline void Nghttp2Stream::Destroy() { } // Free any remaining incoming data chunks. - while (data_chunks_head_ != nullptr) { - nghttp2_data_chunk_t* chunk = data_chunks_head_; - data_chunks_head_ = chunk->next; - delete[] chunk->buf.base; - data_chunk_free_list.push(chunk); - } - data_chunks_tail_ = nullptr; + while (!data_chunks_.empty()) + data_chunks_.pop(); // Free any remaining outgoing data chunks. - while (queue_head_ != nullptr) { - nghttp2_stream_write_queue* head = queue_head_; - queue_head_ = head->next; + while (!queue_.empty()) { + nghttp2_stream_write* head = queue_.front(); head->cb(head->req, UV_ECANCELED); delete head; + queue_.pop(); } - queue_tail_ = nullptr; // Free any remaining headers - FreeHeaders(); + while (!current_headers_.empty()) + current_headers_.pop(); // Return this stream instance to the freelist stream_free_list.push(this); } -inline void Nghttp2Stream::FreeHeaders() { - DEBUG_HTTP2("Nghttp2Stream %d: freeing headers\n", id_); - while (current_headers_head_ != nullptr) { - DEBUG_HTTP2("Nghttp2Stream %d: freeing header item\n", id_); - nghttp2_header_list* item = current_headers_head_; - current_headers_head_ = item->next; - header_free_list.push(item); - } - current_headers_tail_ = nullptr; -} - // Submit informational headers for a stream. inline int Nghttp2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) { DEBUG_HTTP2("Nghttp2Stream %d: sending informational headers, count: %d\n", @@ -762,7 +760,9 @@ inline int32_t Nghttp2Stream::SubmitPushPromise( size_t len, Nghttp2Stream** assigned, int options) { +#if defined(DEBUG) && DEBUG CHECK_GT(len, 0); +#endif DEBUG_HTTP2("Nghttp2Stream %d: sending push promise\n", id_); int32_t ret = nghttp2_submit_push_promise(session_->session(), NGHTTP2_FLAG_NONE, @@ -784,7 +784,9 @@ inline int32_t Nghttp2Stream::SubmitPushPromise( inline int Nghttp2Stream::SubmitResponse(nghttp2_nv* nva, size_t len, int options) { +#if defined(DEBUG) && DEBUG CHECK_GT(len, 0); +#endif DEBUG_HTTP2("Nghttp2Stream %d: submitting response\n", id_); getTrailers_ = options & STREAM_OPTION_GET_TRAILERS; nghttp2_data_provider* provider = nullptr; @@ -804,8 +806,10 @@ inline int Nghttp2Stream::SubmitFile(int fd, int64_t offset, int64_t length, int options) { +#if defined(DEBUG) && DEBUG CHECK_GT(len, 0); CHECK_GT(fd, 0); +#endif DEBUG_HTTP2("Nghttp2Stream %d: submitting file\n", id_); getTrailers_ = options & STREAM_OPTION_GET_TRAILERS; nghttp2_data_provider prov; @@ -829,7 +833,9 @@ inline int32_t Nghttp2Session::SubmitRequest( size_t len, Nghttp2Stream** assigned, int options) { +#if defined(DEBUG) && DEBUG CHECK_GT(len, 0); +#endif DEBUG_HTTP2("Nghttp2Session: submitting request\n"); nghttp2_data_provider* provider = nullptr; nghttp2_data_provider prov; @@ -868,42 +874,21 @@ inline int Nghttp2Stream::Write(nghttp2_stream_write_t* req, } DEBUG_HTTP2("Nghttp2Stream %d: queuing buffers to send, count: %d\n", id_, nbufs); - nghttp2_stream_write_queue* item = new nghttp2_stream_write_queue; + nghttp2_stream_write* item = new nghttp2_stream_write; item->cb = cb; item->req = req; item->nbufs = nbufs; item->bufs.AllocateSufficientStorage(nbufs); - req->handle = this; - req->item = item; memcpy(*(item->bufs), bufs, nbufs * sizeof(*bufs)); - - if (queue_head_ == nullptr) { - queue_head_ = item; - queue_tail_ = item; - } else { - queue_tail_->next = item; - queue_tail_ = item; - } + queue_.push(item); nghttp2_session_resume_data(session_->session(), id_); return 0; } inline void Nghttp2Stream::ReadStart() { - // Has no effect if IsReading() is true. if (IsReading()) return; DEBUG_HTTP2("Nghttp2Stream %d: start reading\n", id_); - if (IsPaused()) { - // If handle->reading is less than zero, read_start had never previously - // been called. If handle->reading is zero, reading had started and read - // stop had been previously called, meaning that the flow control window - // has been explicitly set to zero. Reset the flow control window now to - // restart the flow of data. - nghttp2_session_set_local_window_size(session_->session(), - NGHTTP2_FLAG_NONE, - id_, - prev_local_window_size_); - } flags_ |= NGHTTP2_STREAM_FLAG_READ_START; flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED; @@ -913,22 +898,9 @@ inline void Nghttp2Stream::ReadStart() { inline void Nghttp2Stream::ReadStop() { DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_); - // Has no effect if IsReading() is false, which will happen if we either - // have not started reading yet at all (NGHTTP2_STREAM_FLAG_READ_START is not - // set) or if we're already paused (NGHTTP2_STREAM_FLAG_READ_PAUSED is set. if (!IsReading()) return; flags_ |= NGHTTP2_STREAM_FLAG_READ_PAUSED; - - // When not reading, explicitly set the local window size to 0 so that - // the peer does not keep sending data that has to be buffered - int32_t ret = - nghttp2_session_get_stream_local_window_size(session_->session(), id_); - if (ret >= 0) - prev_local_window_size_ = ret; - nghttp2_session_set_local_window_size(session_->session(), - NGHTTP2_FLAG_NONE, - id_, 0); } Nghttp2Session::Callbacks::Callbacks(bool kHasGetPaddingCallback) { diff --git a/src/node_http2_core.h b/src/node_http2_core.h index 1a8e9a9f57b..bb536f1159d 100644 --- a/src/node_http2_core.h +++ b/src/node_http2_core.h @@ -8,6 +8,7 @@ #include "uv.h" #include "nghttp2/nghttp2.h" +#include #include #include @@ -17,11 +18,11 @@ namespace http2 { #ifdef NODE_DEBUG_HTTP2 // Adapted from nghttp2 own debug printer -static inline void _debug_vfprintf(const char *fmt, va_list args) { +static inline void _debug_vfprintf(const char* fmt, va_list args) { vfprintf(stderr, fmt, args); } -void inline debug_vfprintf(const char *format, ...) { +void inline debug_vfprintf(const char* format, ...) { va_list args; va_start(args, format); _debug_vfprintf(format, args); @@ -39,9 +40,8 @@ class Nghttp2Session; class Nghttp2Stream; struct nghttp2_stream_write_t; -struct nghttp2_data_chunk_t; -#define MAX_BUFFER_COUNT 10 +#define MAX_BUFFER_COUNT 16 #define SEND_BUFFER_RECOMMENDED_SIZE 4096 enum nghttp2_session_type { @@ -77,18 +77,16 @@ typedef void (*nghttp2_stream_write_cb)( nghttp2_stream_write_t* req, int status); -struct nghttp2_stream_write_queue { +struct nghttp2_stream_write { unsigned int nbufs = 0; nghttp2_stream_write_t* req = nullptr; nghttp2_stream_write_cb cb = nullptr; - nghttp2_stream_write_queue* next = nullptr; MaybeStackBuffer bufs; }; -struct nghttp2_header_list { +struct nghttp2_header { nghttp2_rcbuf* name = nullptr; nghttp2_rcbuf* value = nullptr; - nghttp2_header_list* next = nullptr; }; // Handle Types @@ -157,13 +155,14 @@ class Nghttp2Session { inline void RemoveStream(int32_t id); virtual void Send(uv_buf_t* buf, size_t length) {} - virtual void OnHeaders(Nghttp2Stream* stream, - nghttp2_header_list* headers, - nghttp2_headers_category cat, - uint8_t flags) {} + virtual void OnHeaders( + Nghttp2Stream* stream, + std::queue* headers, + nghttp2_headers_category cat, + uint8_t flags) {} virtual void OnStreamClose(int32_t id, uint32_t code) {} virtual void OnDataChunk(Nghttp2Stream* stream, - nghttp2_data_chunk_t* chunk) {} + uv_buf_t* chunk) {} virtual void OnSettings(bool ack) {} virtual void OnPriority(int32_t id, int32_t parent, @@ -251,7 +250,7 @@ class Nghttp2Session { static inline int OnDataChunkReceived(nghttp2_session* session, uint8_t flags, int32_t id, - const uint8_t *data, + const uint8_t* data, size_t len, void* user_data); static inline ssize_t OnStreamReadFD(nghttp2_session* session, @@ -304,17 +303,13 @@ class Nghttp2Stream { int options = 0); inline ~Nghttp2Stream() { +#if defined(DEBUG) && DEBUG CHECK_EQ(session_, nullptr); - CHECK_EQ(queue_head_, nullptr); - CHECK_EQ(queue_tail_, nullptr); - CHECK_EQ(data_chunks_head_, nullptr); - CHECK_EQ(data_chunks_tail_, nullptr); - CHECK_EQ(current_headers_head_, nullptr); - CHECK_EQ(current_headers_tail_, nullptr); +#endif DEBUG_HTTP2("Nghttp2Stream %d: freed\n", id_); } - inline void FlushDataChunks(bool done = false); + inline void FlushDataChunks(); // Resets the state of the stream instance to defaults inline void ResetState( @@ -434,23 +429,22 @@ class Nghttp2Stream { return id_; } - inline nghttp2_header_list* headers() const { - return current_headers_head_; + inline std::queue* headers() { + return ¤t_headers_; } inline nghttp2_headers_category headers_category() const { return current_headers_category_; } - inline void FreeHeaders(); - void StartHeaders(nghttp2_headers_category category) { DEBUG_HTTP2("Nghttp2Stream %d: starting headers, category: %d\n", id_, category); // We shouldn't be in the middle of a headers block already. // Something bad happened if this fails - CHECK_EQ(current_headers_head_, nullptr); - CHECK_EQ(current_headers_tail_, nullptr); +#if defined(DEBUG) && DEBUG + CHECK(current_headers_.empty()); +#endif current_headers_category_ = category; } @@ -466,23 +460,20 @@ class Nghttp2Stream { // Outbound Data... This is the data written by the JS layer that is // waiting to be written out to the socket. - nghttp2_stream_write_queue* queue_head_ = nullptr; - nghttp2_stream_write_queue* queue_tail_ = nullptr; - unsigned int queue_head_index_ = 0; - size_t queue_head_offset_ = 0; + std::queue queue_; + unsigned int queue_index_ = 0; + size_t queue_offset_ = 0; int64_t fd_offset_ = 0; int64_t fd_length_ = -1; // The Current Headers block... As headers are received for this stream, // they are temporarily stored here until the OnFrameReceived is called // signalling the end of the HEADERS frame - nghttp2_header_list* current_headers_head_ = nullptr; - nghttp2_header_list* current_headers_tail_ = nullptr; nghttp2_headers_category current_headers_category_ = NGHTTP2_HCAT_HEADERS; + std::queue current_headers_; // Inbound Data... This is the data received via DATA frames for this stream. - nghttp2_data_chunk_t* data_chunks_head_ = nullptr; - nghttp2_data_chunk_t* data_chunks_tail_ = nullptr; + std::queue data_chunks_; // The RST_STREAM code used to close this stream int32_t code_ = NGHTTP2_NO_ERROR; @@ -498,13 +489,6 @@ class Nghttp2Stream { struct nghttp2_stream_write_t { void* data; int status; - Nghttp2Stream* handle; - nghttp2_stream_write_queue* item; -}; - -struct nghttp2_data_chunk_t { - uv_buf_t buf; - nghttp2_data_chunk_t* next = nullptr; }; } // namespace http2 diff --git a/test/parallel/test-http2-misbehaving-flow-control-paused.js b/test/parallel/test-http2-misbehaving-flow-control-paused.js new file mode 100644 index 00000000000..ee799b1d5a2 --- /dev/null +++ b/test/parallel/test-http2-misbehaving-flow-control-paused.js @@ -0,0 +1,89 @@ +'use strict'; + +const common = require('../common'); + +if (!common.hasCrypto) + common.skip('missing crypto'); + +const h2 = require('http2'); +const net = require('net'); + +const preamble = Buffer.from([ + 0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, + 0x32, 0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, + 0x0d, 0x0a, 0x00, 0x00, 0x0c, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x00, 0x64, 0x00, 0x04, 0x00, 0x00, 0xff, + 0xff, 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, + 0x00, 0xc8, 0x00, 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x05, + 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x05, 0x02, 0x00, 0x00, + 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x07, 0x00, + 0x00, 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x00, 0x2c, 0x01, 0x24, 0x00, 0x00, 0x00, + 0x0d, 0x00, 0x00, 0x00, 0x0b, 0x0f, 0x83, 0x84, 0x86, 0x41, 0x8a, + 0xa0, 0xe4, 0x1d, 0x13, 0x9d, 0x09, 0xb8, 0xf0, 0x1e, 0x07, 0x53, + 0x03, 0x2a, 0x2f, 0x2a, 0x90, 0x7a, 0x8a, 0xaa, 0x69, 0xd2, 0x9a, + 0xc4, 0xc0, 0x57, 0x0b, 0xcb, 0x87, 0x0f, 0x0d, 0x83, 0x08, 0x00, + 0x0f +]); + +const data = Buffer.from([ + 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0d, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x0a, 0x68, 0x65, 0x6c, + 0x6c, 0x6f, 0x0a, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x0a +]); + +// This is testing the case of a misbehaving client that is not paying +// attention to flow control. The initial window size is set to data +// payload * 2, which in this case is 36, the stream is paused so +// WINDOW_UPDATE frames are not being sent, which means the window +// size is not being updated. A well behaved client is supposed to +// stop sending until the window size is expanded again. +// +// However, our malicious client keeps sending data beyond the flow +// control window! +// +// Bad client! Bad! +// +// Fortunately, nghttp2 handles this situation for us by keeping track +// of the flow control window and responding with a FLOW_CONTROL_ERROR +// causing the stream to get shut down... +// +// At least, that's what is supposed to happen. + +let client; + +const server = h2.createServer({ settings: { initialWindowSize: 36 } }); +server.on('stream', (stream) => { + + // Not reading causes the flow control window to get backed up. + stream.pause(); + + stream.on('error', common.mustCall((err) => { + common.expectsError({ + code: 'ERR_HTTP2_STREAM_ERROR', + type: Error, + message: 'Stream closed with error code 3' + })(err); + server.close(); + client.destroy(); + })); + + stream.on('end', common.mustNotCall()); + + stream.respond(); + stream.end('ok'); +}); + +server.listen(0, () => { + client = net.connect(server.address().port, () => { + client.on('error', console.log); + + client.write(preamble); + + client.write(data); + client.write(data); + client.write(data); + }); +}); diff --git a/test/parallel/test-http2-misbehaving-flow-control.js b/test/parallel/test-http2-misbehaving-flow-control.js new file mode 100644 index 00000000000..010e0774131 --- /dev/null +++ b/test/parallel/test-http2-misbehaving-flow-control.js @@ -0,0 +1,81 @@ +'use strict'; + +const common = require('../common'); + +if (!common.hasCrypto) + common.skip('missing crypto'); + +const h2 = require('http2'); +const net = require('net'); + +const preamble = Buffer.from([ + 0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, + 0x32, 0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, + 0x0d, 0x0a, 0x00, 0x00, 0x0c, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x00, 0x64, 0x00, 0x04, 0x00, 0x00, 0xff, + 0xff, 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, + 0x00, 0xc8, 0x00, 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x05, + 0x00, 0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x05, 0x02, 0x00, 0x00, + 0x00, 0x00, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x07, 0x00, + 0x00, 0x00, 0x05, 0x02, 0x00, 0x00, 0x00, 0x00, 0x0b, 0x00, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x00, 0x2c, 0x01, 0x24, 0x00, 0x00, 0x00, + 0x0d, 0x00, 0x00, 0x00, 0x0b, 0x0f, 0x83, 0x84, 0x86, 0x41, 0x8a, + 0xa0, 0xe4, 0x1d, 0x13, 0x9d, 0x09, 0xb8, 0xf0, 0x1e, 0x07, 0x53, + 0x03, 0x2a, 0x2f, 0x2a, 0x90, 0x7a, 0x8a, 0xaa, 0x69, 0xd2, 0x9a, + 0xc4, 0xc0, 0x57, 0x0b, 0xcb, 0x87, 0x0f, 0x0d, 0x83, 0x08, 0x00, + 0x0f +]); + +const data = Buffer.from([ + 0x00, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0d, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x0a, 0x68, 0x65, 0x6c, + 0x6c, 0x6f, 0x0a, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x0a +]); + +// This is testing the case of a misbehaving client that is not paying +// attention to flow control. The initial window size is set to data +// payload, which in this case is 18, the stream is set to flowing so +// WINDOW_UPDATE frames are being sent, but the client is writing +// faster than those can be read. +// +// Bad client! Bad! +// +// Fortunately, nghttp2 handles this situation for us by keeping track +// of the flow control window and responding with a FLOW_CONTROL_ERROR +// causing the stream to get shut down... +// +// At least, that's what is supposed to happen. + +let client; +const server = h2.createServer({ settings: { initialWindowSize: 18 } }); +server.on('stream', (stream) => { + + stream.resume(); + + stream.on('error', common.mustCall((err) => { + common.expectsError({ + code: 'ERR_HTTP2_STREAM_ERROR', + type: Error, + message: 'Stream closed with error code 3' + })(err); + server.close(); + client.destroy(); + })); + + stream.respond(); + stream.end('ok'); +}); + +server.listen(0, () => { + client = net.connect(server.address().port, () => { + client.on('error', console.log); + + client.write(preamble); + + client.write(data); + client.write(data); + client.write(data); + }); +});