diff --git a/src/node_http2.cc b/src/node_http2.cc index 939e0011bdf..8dd222a692a 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -2219,6 +2219,11 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle, if (amount == 0 && stream->IsWritable()) { CHECK(stream->queue_.empty()); DEBUG_HTTP2SESSION2(session, "deferring stream %d", id); + stream->EmitWantsWrite(length); + if (stream->available_outbound_length_ > 0 || !stream->IsWritable()) { + // EmitWantsWrite() did something interesting synchronously, restart: + return OnRead(handle, id, buf, length, flags, source, user_data); + } return NGHTTP2_ERR_DEFERRED; } diff --git a/src/node_http2.h b/src/node_http2.h index 08109dcf046..2d55989fd7d 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -573,6 +573,8 @@ class Http2Stream : public AsyncWrap, // Required for StreamBase int DoShutdown(ShutdownWrap* req_wrap) override; + bool HasWantsWrite() const override { return true; } + // Initiate a response on this stream. inline int SubmitResponse(nghttp2_nv* nva, size_t len, diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index b7495a80ac6..f0d522a7b06 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -136,6 +136,13 @@ inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) { listener_->OnStreamAfterShutdown(w, status); } +inline void StreamResource::EmitWantsWrite(size_t suggested_size) { +#ifdef DEBUG + v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent()); +#endif + listener_->OnStreamWantsWrite(suggested_size); +} + inline StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_); } diff --git a/src/stream_base.h b/src/stream_base.h index f3e010d5bc9..96a7787e5bb 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -131,6 +131,13 @@ class StreamListener { // (and raises an assertion if there is none). virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status); + // This is called by the stream if it determines that it wants more data + // to be written to it. Not all streams support this. + // This callback will not be called as long as there are active writes. + // It is not supported by all streams; `stream->HasWantsWrite()` returns + // true if it is supported by a stream. + virtual void OnStreamWantsWrite(size_t suggested_size) {} + // This is called immediately before the stream is destroyed. virtual void OnStreamDestroy() {} @@ -199,6 +206,9 @@ class StreamResource { size_t count, uv_stream_t* send_handle) = 0; + // Returns true if the stream supports the `OnStreamWantsWrite()` interface. + virtual bool HasWantsWrite() const { return false; } + // Optionally, this may provide an error message to be used for // failing writes. virtual const char* Error() const; @@ -222,6 +232,8 @@ class StreamResource { void EmitAfterWrite(WriteWrap* w, int status); // Call the current listener's OnStreamAfterShutdown() method. void EmitAfterShutdown(ShutdownWrap* w, int status); + // Call the current listener's OnStreamWantsWrite() method. + void EmitWantsWrite(size_t suggested_size); StreamListener* listener_ = nullptr; uint64_t bytes_read_ = 0;