src: give StreamBases the capability to ask for data
Add a `OnStreamWantsWrite()` event that allows streams to ask for more input data if they want some. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
c412150582
commit
f734b3eb04
@ -2219,6 +2219,11 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
|
|||||||
if (amount == 0 && stream->IsWritable()) {
|
if (amount == 0 && stream->IsWritable()) {
|
||||||
CHECK(stream->queue_.empty());
|
CHECK(stream->queue_.empty());
|
||||||
DEBUG_HTTP2SESSION2(session, "deferring stream %d", id);
|
DEBUG_HTTP2SESSION2(session, "deferring stream %d", id);
|
||||||
|
stream->EmitWantsWrite(length);
|
||||||
|
if (stream->available_outbound_length_ > 0 || !stream->IsWritable()) {
|
||||||
|
// EmitWantsWrite() did something interesting synchronously, restart:
|
||||||
|
return OnRead(handle, id, buf, length, flags, source, user_data);
|
||||||
|
}
|
||||||
return NGHTTP2_ERR_DEFERRED;
|
return NGHTTP2_ERR_DEFERRED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -573,6 +573,8 @@ class Http2Stream : public AsyncWrap,
|
|||||||
// Required for StreamBase
|
// Required for StreamBase
|
||||||
int DoShutdown(ShutdownWrap* req_wrap) override;
|
int DoShutdown(ShutdownWrap* req_wrap) override;
|
||||||
|
|
||||||
|
bool HasWantsWrite() const override { return true; }
|
||||||
|
|
||||||
// Initiate a response on this stream.
|
// Initiate a response on this stream.
|
||||||
inline int SubmitResponse(nghttp2_nv* nva,
|
inline int SubmitResponse(nghttp2_nv* nva,
|
||||||
size_t len,
|
size_t len,
|
||||||
|
@ -136,6 +136,13 @@ inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
|
|||||||
listener_->OnStreamAfterShutdown(w, status);
|
listener_->OnStreamAfterShutdown(w, status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline void StreamResource::EmitWantsWrite(size_t suggested_size) {
|
||||||
|
#ifdef DEBUG
|
||||||
|
v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent());
|
||||||
|
#endif
|
||||||
|
listener_->OnStreamWantsWrite(suggested_size);
|
||||||
|
}
|
||||||
|
|
||||||
inline StreamBase::StreamBase(Environment* env) : env_(env) {
|
inline StreamBase::StreamBase(Environment* env) : env_(env) {
|
||||||
PushStreamListener(&default_listener_);
|
PushStreamListener(&default_listener_);
|
||||||
}
|
}
|
||||||
|
@ -131,6 +131,13 @@ class StreamListener {
|
|||||||
// (and raises an assertion if there is none).
|
// (and raises an assertion if there is none).
|
||||||
virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
|
virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
|
||||||
|
|
||||||
|
// This is called by the stream if it determines that it wants more data
|
||||||
|
// to be written to it. Not all streams support this.
|
||||||
|
// This callback will not be called as long as there are active writes.
|
||||||
|
// It is not supported by all streams; `stream->HasWantsWrite()` returns
|
||||||
|
// true if it is supported by a stream.
|
||||||
|
virtual void OnStreamWantsWrite(size_t suggested_size) {}
|
||||||
|
|
||||||
// This is called immediately before the stream is destroyed.
|
// This is called immediately before the stream is destroyed.
|
||||||
virtual void OnStreamDestroy() {}
|
virtual void OnStreamDestroy() {}
|
||||||
|
|
||||||
@ -199,6 +206,9 @@ class StreamResource {
|
|||||||
size_t count,
|
size_t count,
|
||||||
uv_stream_t* send_handle) = 0;
|
uv_stream_t* send_handle) = 0;
|
||||||
|
|
||||||
|
// Returns true if the stream supports the `OnStreamWantsWrite()` interface.
|
||||||
|
virtual bool HasWantsWrite() const { return false; }
|
||||||
|
|
||||||
// Optionally, this may provide an error message to be used for
|
// Optionally, this may provide an error message to be used for
|
||||||
// failing writes.
|
// failing writes.
|
||||||
virtual const char* Error() const;
|
virtual const char* Error() const;
|
||||||
@ -222,6 +232,8 @@ class StreamResource {
|
|||||||
void EmitAfterWrite(WriteWrap* w, int status);
|
void EmitAfterWrite(WriteWrap* w, int status);
|
||||||
// Call the current listener's OnStreamAfterShutdown() method.
|
// Call the current listener's OnStreamAfterShutdown() method.
|
||||||
void EmitAfterShutdown(ShutdownWrap* w, int status);
|
void EmitAfterShutdown(ShutdownWrap* w, int status);
|
||||||
|
// Call the current listener's OnStreamWantsWrite() method.
|
||||||
|
void EmitWantsWrite(size_t suggested_size);
|
||||||
|
|
||||||
StreamListener* listener_ = nullptr;
|
StreamListener* listener_ = nullptr;
|
||||||
uint64_t bytes_read_ = 0;
|
uint64_t bytes_read_ = 0;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user