http2: only schedule write when necessary
Introduce an `Http2Scope` class that, when it goes out of scope, checks whether a write to the network is desired by nghttp2. If that is the case, schedule a write using `SetImmediate()` rather than a custom per-session libuv handle. PR-URL: https://github.com/nodejs/node/pull/17183 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
This commit is contained in:
parent
10d86d2284
commit
06e1b03861
@ -27,6 +27,26 @@ const Http2Session::Callbacks Http2Session::callback_struct_saved[2] = {
|
||||
Callbacks(false),
|
||||
Callbacks(true)};
|
||||
|
||||
Http2Scope::Http2Scope(Http2Stream* stream) : Http2Scope(stream->session()) {}
|
||||
|
||||
Http2Scope::Http2Scope(Http2Session* session) {
|
||||
if (session->flags_ & (SESSION_STATE_HAS_SCOPE |
|
||||
SESSION_STATE_WRITE_SCHEDULED)) {
|
||||
// There is another scope further below on the stack, or it is already
|
||||
// known that a write is scheduled. In either case, there is nothing to do.
|
||||
return;
|
||||
}
|
||||
session->flags_ |= SESSION_STATE_HAS_SCOPE;
|
||||
session_ = session;
|
||||
}
|
||||
|
||||
Http2Scope::~Http2Scope() {
|
||||
if (session_ == nullptr)
|
||||
return;
|
||||
|
||||
session_->flags_ &= ~SESSION_STATE_HAS_SCOPE;
|
||||
session_->MaybeScheduleWrite();
|
||||
}
|
||||
|
||||
Http2Options::Http2Options(Environment* env) {
|
||||
nghttp2_option_new(&options_);
|
||||
@ -346,8 +366,6 @@ Http2Session::Http2Session(Environment* env,
|
||||
// be catching before it gets this far. Either way, crash if this
|
||||
// fails.
|
||||
CHECK_EQ(fn(&session_, callbacks, this, *opts), 0);
|
||||
|
||||
Start();
|
||||
}
|
||||
|
||||
|
||||
@ -356,40 +374,6 @@ Http2Session::~Http2Session() {
|
||||
Close();
|
||||
}
|
||||
|
||||
// For every node::Http2Session instance, there is a uv_prepare_t handle
|
||||
// whose callback is triggered on every tick of the event loop. When
|
||||
// run, nghttp2 is prompted to send any queued data it may have stored.
|
||||
// TODO(jasnell): Currently, this creates one uv_prepare_t per Http2Session,
|
||||
// we should investigate to see if it's faster to create a
|
||||
// single uv_prepare_t for all Http2Sessions, then iterate
|
||||
// over each.
|
||||
void Http2Session::Start() {
|
||||
prep_ = new uv_prepare_t();
|
||||
uv_prepare_init(env()->event_loop(), prep_);
|
||||
prep_->data = static_cast<void*>(this);
|
||||
uv_prepare_start(prep_, [](uv_prepare_t* t) {
|
||||
Http2Session* session = static_cast<Http2Session*>(t->data);
|
||||
HandleScope scope(session->env()->isolate());
|
||||
Context::Scope context_scope(session->env()->context());
|
||||
|
||||
// Sending data may call arbitrary JS code, so keep track of
|
||||
// async context.
|
||||
InternalCallbackScope callback_scope(session);
|
||||
session->SendPendingData();
|
||||
});
|
||||
}
|
||||
|
||||
// Stop the uv_prep_t from further activity, destroy the handle
|
||||
void Http2Session::Stop() {
|
||||
DEBUG_HTTP2SESSION(this, "stopping uv_prep_t handle");
|
||||
CHECK_EQ(uv_prepare_stop(prep_), 0);
|
||||
auto prep_close = [](uv_handle_t* handle) {
|
||||
delete reinterpret_cast<uv_prepare_t*>(handle);
|
||||
};
|
||||
uv_close(reinterpret_cast<uv_handle_t*>(prep_), prep_close);
|
||||
prep_ = nullptr;
|
||||
}
|
||||
|
||||
|
||||
void Http2Session::Close() {
|
||||
DEBUG_HTTP2SESSION(this, "closing session");
|
||||
@ -412,8 +396,6 @@ void Http2Session::Close() {
|
||||
static_cast<Http2Session::Http2Ping*>(data)->Done(false);
|
||||
}, static_cast<void*>(ping));
|
||||
}
|
||||
|
||||
Stop();
|
||||
}
|
||||
|
||||
|
||||
@ -484,6 +466,7 @@ inline void Http2Session::SubmitShutdownNotice() {
|
||||
inline void Http2Session::Settings(const nghttp2_settings_entry iv[],
|
||||
size_t niv) {
|
||||
DEBUG_HTTP2SESSION2(this, "submitting %d settings", niv);
|
||||
Http2Scope h2scope(this);
|
||||
// This will fail either if the system is out of memory, or if the settings
|
||||
// values are not within the appropriate range. We should be catching the
|
||||
// latter before it gets this far so crash in either case.
|
||||
@ -736,7 +719,8 @@ Http2Stream::SubmitTrailers::SubmitTrailers(
|
||||
|
||||
|
||||
inline void Http2Stream::SubmitTrailers::Submit(nghttp2_nv* trailers,
|
||||
size_t length) const {
|
||||
size_t length) const {
|
||||
Http2Scope h2scope(session_);
|
||||
if (length == 0)
|
||||
return;
|
||||
DEBUG_HTTP2SESSION2(session_, "sending trailers for stream %d, count: %d",
|
||||
@ -891,14 +875,37 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) {
|
||||
MakeCallback(env()->onsettings_string(), arraysize(argv), argv);
|
||||
}
|
||||
|
||||
void Http2Session::MaybeScheduleWrite() {
|
||||
CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0);
|
||||
if (session_ != nullptr && nghttp2_session_want_write(session_)) {
|
||||
flags_ |= SESSION_STATE_WRITE_SCHEDULED;
|
||||
env()->SetImmediate([](Environment* env, void* data) {
|
||||
Http2Session* session = static_cast<Http2Session*>(data);
|
||||
if (session->session_ == nullptr ||
|
||||
!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
|
||||
// This can happen e.g. when a stream was reset before this turn
|
||||
// of the event loop, in which case SendPendingData() is called early,
|
||||
// or the session was destroyed in the meantime.
|
||||
return;
|
||||
}
|
||||
|
||||
inline void Http2Session::SendPendingData() {
|
||||
// Sending data may call arbitrary JS code, so keep track of
|
||||
// async context.
|
||||
InternalCallbackScope callback_scope(session);
|
||||
session->SendPendingData();
|
||||
}, static_cast<void*>(this), object());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Http2Session::SendPendingData() {
|
||||
DEBUG_HTTP2SESSION(this, "sending pending data");
|
||||
// Do not attempt to send data on the socket if the destroying flag has
|
||||
// been set. That means everything is shutting down and the socket
|
||||
// will not be usable.
|
||||
if (IsDestroying())
|
||||
return;
|
||||
flags_ &= ~SESSION_STATE_WRITE_SCHEDULED;
|
||||
|
||||
WriteWrap* req = nullptr;
|
||||
char* dest = nullptr;
|
||||
@ -963,6 +970,7 @@ inline Http2Stream* Http2Session::SubmitRequest(
|
||||
int32_t* ret,
|
||||
int options) {
|
||||
DEBUG_HTTP2SESSION(this, "submitting request");
|
||||
Http2Scope h2scope(this);
|
||||
Http2Stream* stream = nullptr;
|
||||
Http2Stream::Provider::Stream prov(options);
|
||||
*ret = nghttp2_submit_request(session_, prispec, nva, len, *prov, nullptr);
|
||||
@ -1022,6 +1030,7 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
|
||||
uv_handle_type pending,
|
||||
void* ctx) {
|
||||
Http2Session* session = static_cast<Http2Session*>(ctx);
|
||||
Http2Scope h2scope(session);
|
||||
if (nread < 0) {
|
||||
uv_buf_t tmp_buf;
|
||||
tmp_buf.base = nullptr;
|
||||
@ -1187,6 +1196,7 @@ inline void Http2Stream::Close(int32_t code) {
|
||||
|
||||
|
||||
inline void Http2Stream::Shutdown() {
|
||||
Http2Scope h2scope(this);
|
||||
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
|
||||
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
|
||||
NGHTTP2_ERR_NOMEM);
|
||||
@ -1201,6 +1211,7 @@ int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
|
||||
}
|
||||
|
||||
inline void Http2Stream::Destroy() {
|
||||
Http2Scope h2scope(this);
|
||||
DEBUG_HTTP2STREAM(this, "destroying stream");
|
||||
// Do nothing if this stream instance is already destroyed
|
||||
if (IsDestroyed())
|
||||
@ -1252,6 +1263,7 @@ void Http2Stream::OnDataChunk(
|
||||
|
||||
|
||||
inline void Http2Stream::FlushDataChunks() {
|
||||
Http2Scope h2scope(this);
|
||||
if (!data_chunks_.empty()) {
|
||||
uv_buf_t buf = data_chunks_.front();
|
||||
data_chunks_.pop();
|
||||
@ -1269,6 +1281,7 @@ inline void Http2Stream::FlushDataChunks() {
|
||||
inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
|
||||
size_t len,
|
||||
int options) {
|
||||
Http2Scope h2scope(this);
|
||||
DEBUG_HTTP2STREAM(this, "submitting response");
|
||||
if (options & STREAM_OPTION_GET_TRAILERS)
|
||||
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
|
||||
@ -1289,6 +1302,7 @@ inline int Http2Stream::SubmitFile(int fd,
|
||||
int64_t offset,
|
||||
int64_t length,
|
||||
int options) {
|
||||
Http2Scope h2scope(this);
|
||||
DEBUG_HTTP2STREAM(this, "submitting file");
|
||||
if (options & STREAM_OPTION_GET_TRAILERS)
|
||||
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
|
||||
@ -1305,6 +1319,7 @@ inline int Http2Stream::SubmitFile(int fd,
|
||||
|
||||
// Submit informational headers for a stream.
|
||||
inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
|
||||
Http2Scope h2scope(this);
|
||||
DEBUG_HTTP2STREAM2(this, "sending %d informational headers", len);
|
||||
int ret = nghttp2_submit_headers(session_->session(),
|
||||
NGHTTP2_FLAG_NONE,
|
||||
@ -1317,6 +1332,7 @@ inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
|
||||
|
||||
inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
|
||||
bool silent) {
|
||||
Http2Scope h2scope(this);
|
||||
DEBUG_HTTP2STREAM(this, "sending priority spec");
|
||||
int ret = silent ?
|
||||
nghttp2_session_change_stream_priority(session_->session(),
|
||||
@ -1330,6 +1346,7 @@ inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
|
||||
|
||||
|
||||
inline int Http2Stream::SubmitRstStream(const uint32_t code) {
|
||||
Http2Scope h2scope(this);
|
||||
DEBUG_HTTP2STREAM2(this, "sending rst-stream with code %d", code);
|
||||
session_->SendPendingData();
|
||||
CHECK_EQ(nghttp2_submit_rst_stream(session_->session(),
|
||||
@ -1345,6 +1362,7 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
|
||||
size_t len,
|
||||
int32_t* ret,
|
||||
int options) {
|
||||
Http2Scope h2scope(this);
|
||||
DEBUG_HTTP2STREAM(this, "sending push promise");
|
||||
*ret = nghttp2_submit_push_promise(session_->session(), NGHTTP2_FLAG_NONE,
|
||||
id_, nva, len, nullptr);
|
||||
@ -1384,6 +1402,7 @@ inline int Http2Stream::Write(nghttp2_stream_write_t* req,
|
||||
const uv_buf_t bufs[],
|
||||
unsigned int nbufs,
|
||||
nghttp2_stream_write_cb cb) {
|
||||
Http2Scope h2scope(this);
|
||||
if (!IsWritable()) {
|
||||
if (cb != nullptr)
|
||||
cb(req, UV_EOF);
|
||||
@ -1767,6 +1786,7 @@ void Http2Session::Goaway(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args);
|
||||
Local<Context> context = env->context();
|
||||
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
|
||||
Http2Scope h2scope(session);
|
||||
|
||||
uint32_t errorCode = args[0]->Uint32Value(context).ToChecked();
|
||||
int32_t lastStreamID = args[1]->Int32Value(context).ToChecked();
|
||||
@ -2042,6 +2062,7 @@ void Http2Session::Http2Ping::Send(uint8_t* payload) {
|
||||
memcpy(&data, &startTime_, arraysize(data));
|
||||
payload = data;
|
||||
}
|
||||
Http2Scope h2scope(session_);
|
||||
CHECK_EQ(nghttp2_submit_ping(**session_, NGHTTP2_FLAG_NONE, payload), 0);
|
||||
}
|
||||
|
||||
|
@ -417,7 +417,9 @@ const char* nghttp2_errname(int rv) {
|
||||
|
||||
enum session_state_flags {
|
||||
SESSION_STATE_NONE = 0x0,
|
||||
SESSION_STATE_DESTROYING = 0x1
|
||||
SESSION_STATE_DESTROYING = 0x1,
|
||||
SESSION_STATE_HAS_SCOPE = 0x2,
|
||||
SESSION_STATE_WRITE_SCHEDULED = 0x4
|
||||
};
|
||||
|
||||
// This allows for 4 default-sized frames with their frame headers
|
||||
@ -429,6 +431,19 @@ typedef uint32_t(*get_setting)(nghttp2_session* session,
|
||||
class Http2Session;
|
||||
class Http2Stream;
|
||||
|
||||
// This scope should be present when any call into nghttp2 that may schedule
|
||||
// data to be written to the underlying transport is made, and schedules
|
||||
// such a write automatically once the scope is exited.
|
||||
class Http2Scope {
|
||||
public:
|
||||
explicit Http2Scope(Http2Stream* stream);
|
||||
explicit Http2Scope(Http2Session* session);
|
||||
~Http2Scope();
|
||||
|
||||
private:
|
||||
Http2Session* session_ = nullptr;
|
||||
};
|
||||
|
||||
// The Http2Options class is used to parse the options object passed in to
|
||||
// a Http2Session object and convert those into an appropriate nghttp2_option
|
||||
// struct. This is the primary mechanism by which the Http2Session object is
|
||||
@ -816,6 +831,9 @@ class Http2Session : public AsyncWrap {
|
||||
inline void MarkDestroying() { flags_ |= SESSION_STATE_DESTROYING; }
|
||||
inline bool IsDestroying() { return flags_ & SESSION_STATE_DESTROYING; }
|
||||
|
||||
// Schedule a write if nghttp2 indicates it wants to write to the socket.
|
||||
void MaybeScheduleWrite();
|
||||
|
||||
// Returns pointer to the stream, or nullptr if stream does not exist
|
||||
inline Http2Stream* FindStream(int32_t id);
|
||||
|
||||
@ -1005,6 +1023,8 @@ class Http2Session : public AsyncWrap {
|
||||
|
||||
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
|
||||
std::queue<Http2Ping*> outstanding_pings_;
|
||||
|
||||
friend class Http2Scope;
|
||||
};
|
||||
|
||||
class Http2Session::Http2Ping : public AsyncWrap {
|
||||
|
32
test/parallel/test-http2-session-gc-while-write-scheduled.js
Normal file
32
test/parallel/test-http2-session-gc-while-write-scheduled.js
Normal file
@ -0,0 +1,32 @@
|
||||
// Flags: --expose-gc
|
||||
|
||||
'use strict';
|
||||
const common = require('../common');
|
||||
if (!common.hasCrypto)
|
||||
common.skip('missing crypto');
|
||||
const http2 = require('http2');
|
||||
const makeDuplexPair = require('../common/duplexpair');
|
||||
|
||||
// This tests that running garbage collection while an Http2Session has
|
||||
// a write *scheduled*, it will survive that garbage collection.
|
||||
|
||||
{
|
||||
// This creates a session and schedules a write (for the settings frame).
|
||||
let client = http2.connect('http://localhost:80', {
|
||||
createConnection: common.mustCall(() => makeDuplexPair().clientSide)
|
||||
});
|
||||
|
||||
// First, wait for any nextTicks() and their responses
|
||||
// from the `connect()` call to run.
|
||||
tick(10, () => {
|
||||
// This schedules a write.
|
||||
client.settings(http2.getDefaultSettings());
|
||||
client = null;
|
||||
global.gc();
|
||||
});
|
||||
}
|
||||
|
||||
function tick(n, cb) {
|
||||
if (n--) setImmediate(tick, n, cb);
|
||||
else cb();
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user