src: introduce native-layer stream piping
Provide a way to create pipes between native `StreamBase` instances that acts more directly than a `.pipe()` call would. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
f7f1437d44
commit
67f1d76956
2
node.gyp
2
node.gyp
@ -338,6 +338,7 @@
|
||||
'src/string_decoder.cc',
|
||||
'src/string_search.cc',
|
||||
'src/stream_base.cc',
|
||||
'src/stream_pipe.cc',
|
||||
'src/stream_wrap.cc',
|
||||
'src/tcp_wrap.cc',
|
||||
'src/timer_wrap.cc',
|
||||
@ -394,6 +395,7 @@
|
||||
'src/string_decoder-inl.h',
|
||||
'src/stream_base.h',
|
||||
'src/stream_base-inl.h',
|
||||
'src/stream_pipe.h',
|
||||
'src/stream_wrap.h',
|
||||
'src/tracing/agent.h',
|
||||
'src/tracing/node_trace_buffer.h',
|
||||
|
@ -58,6 +58,7 @@ namespace node {
|
||||
V(SHUTDOWNWRAP) \
|
||||
V(SIGNALWRAP) \
|
||||
V(STATWATCHER) \
|
||||
V(STREAMPIPE) \
|
||||
V(TCPCONNECTWRAP) \
|
||||
V(TCPSERVERWRAP) \
|
||||
V(TCPWRAP) \
|
||||
|
@ -222,6 +222,7 @@ struct PackageConfig {
|
||||
V(onstop_string, "onstop") \
|
||||
V(onstreamclose_string, "onstreamclose") \
|
||||
V(ontrailers_string, "ontrailers") \
|
||||
V(onunpipe_string, "onunpipe") \
|
||||
V(onwrite_string, "onwrite") \
|
||||
V(openssl_error_stack, "opensslErrorStack") \
|
||||
V(output_string, "output") \
|
||||
@ -233,6 +234,8 @@ struct PackageConfig {
|
||||
V(pbkdf2_error_string, "PBKDF2 Error") \
|
||||
V(pid_string, "pid") \
|
||||
V(pipe_string, "pipe") \
|
||||
V(pipe_target_string, "pipeTarget") \
|
||||
V(pipe_source_string, "pipeSource") \
|
||||
V(port_string, "port") \
|
||||
V(preference_string, "preference") \
|
||||
V(priority_string, "priority") \
|
||||
@ -255,9 +258,11 @@ struct PackageConfig {
|
||||
V(session_id_string, "sessionId") \
|
||||
V(shell_string, "shell") \
|
||||
V(signal_string, "signal") \
|
||||
V(sink_string, "sink") \
|
||||
V(size_string, "size") \
|
||||
V(sni_context_err_string, "Invalid SNI context") \
|
||||
V(sni_context_string, "sni_context") \
|
||||
V(source_string, "source") \
|
||||
V(stack_string, "stack") \
|
||||
V(status_string, "status") \
|
||||
V(stdio_string, "stdio") \
|
||||
|
@ -1813,7 +1813,9 @@ inline void Http2Stream::Close(int32_t code) {
|
||||
}
|
||||
|
||||
int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
|
||||
CHECK(!this->IsDestroyed());
|
||||
if (IsDestroyed())
|
||||
return UV_EPIPE;
|
||||
|
||||
{
|
||||
Http2Scope h2scope(this);
|
||||
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
|
||||
|
@ -120,6 +120,7 @@ struct sockaddr;
|
||||
V(serdes) \
|
||||
V(signal_wrap) \
|
||||
V(spawn_sync) \
|
||||
V(stream_pipe) \
|
||||
V(stream_wrap) \
|
||||
V(string_decoder) \
|
||||
V(tcp_wrap) \
|
||||
|
@ -67,8 +67,14 @@ inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
|
||||
|
||||
inline StreamResource::~StreamResource() {
|
||||
while (listener_ != nullptr) {
|
||||
listener_->OnStreamDestroy();
|
||||
RemoveStreamListener(listener_);
|
||||
StreamListener* listener = listener_;
|
||||
listener->OnStreamDestroy();
|
||||
// Remove the listener if it didn’t remove itself. This makes the logic
|
||||
// logic in `OnStreamDestroy()` implementations easier, because they
|
||||
// may call generic cleanup functions which can just remove the
|
||||
// listener unconditionally.
|
||||
if (listener == listener_)
|
||||
RemoveStreamListener(listener_);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,6 +141,9 @@ class StreamListener {
|
||||
// This is called immediately before the stream is destroyed.
|
||||
virtual void OnStreamDestroy() {}
|
||||
|
||||
// The stream this is currently associated with, or nullptr if there is none.
|
||||
inline StreamResource* stream() { return stream_; }
|
||||
|
||||
protected:
|
||||
// Pass along a read error to the `StreamListener` instance that was active
|
||||
// before this one. For example, a protocol parser does not care about read
|
||||
|
266
src/stream_pipe.cc
Normal file
266
src/stream_pipe.cc
Normal file
@ -0,0 +1,266 @@
|
||||
#include "stream_pipe.h"
|
||||
#include "stream_base-inl.h"
|
||||
#include "node_buffer.h"
|
||||
#include "node_internals.h"
|
||||
|
||||
using v8::Context;
|
||||
using v8::External;
|
||||
using v8::FunctionCallbackInfo;
|
||||
using v8::FunctionTemplate;
|
||||
using v8::Local;
|
||||
using v8::Object;
|
||||
using v8::Value;
|
||||
|
||||
namespace node {
|
||||
|
||||
StreamPipe::StreamPipe(StreamBase* source,
|
||||
StreamBase* sink,
|
||||
Local<Object> obj)
|
||||
: AsyncWrap(source->stream_env(), obj, AsyncWrap::PROVIDER_STREAMPIPE) {
|
||||
MakeWeak(this);
|
||||
|
||||
CHECK_NE(sink, nullptr);
|
||||
CHECK_NE(source, nullptr);
|
||||
|
||||
source->PushStreamListener(&readable_listener_);
|
||||
sink->PushStreamListener(&writable_listener_);
|
||||
|
||||
CHECK(sink->HasWantsWrite());
|
||||
|
||||
// Set up links between this object and the source/sink objects.
|
||||
// In particular, this makes sure that they are garbage collected as a group,
|
||||
// if that applies to the given streams (for example, Http2Streams use
|
||||
// weak references).
|
||||
obj->Set(env()->context(), env()->source_string(), source->GetObject())
|
||||
.FromJust();
|
||||
source->GetObject()->Set(env()->context(), env()->pipe_target_string(), obj)
|
||||
.FromJust();
|
||||
obj->Set(env()->context(), env()->sink_string(), sink->GetObject())
|
||||
.FromJust();
|
||||
sink->GetObject()->Set(env()->context(), env()->pipe_source_string(), obj)
|
||||
.FromJust();
|
||||
}
|
||||
|
||||
StreamPipe::~StreamPipe() {
|
||||
CHECK(is_closed_);
|
||||
}
|
||||
|
||||
StreamBase* StreamPipe::source() {
|
||||
return static_cast<StreamBase*>(readable_listener_.stream());
|
||||
}
|
||||
|
||||
StreamBase* StreamPipe::sink() {
|
||||
return static_cast<StreamBase*>(writable_listener_.stream());
|
||||
}
|
||||
|
||||
void StreamPipe::Unpipe() {
|
||||
if (is_closed_)
|
||||
return;
|
||||
|
||||
// Note that we cannot use virtual methods on `source` and `sink` here,
|
||||
// because this function can be called from their destructors via
|
||||
// `OnStreamDestroy()`.
|
||||
|
||||
is_closed_ = true;
|
||||
is_reading_ = false;
|
||||
source()->RemoveStreamListener(&readable_listener_);
|
||||
sink()->RemoveStreamListener(&writable_listener_);
|
||||
|
||||
// Delay the JS-facing part with SetImmediate, because this might be from
|
||||
// inside the garbage collector, so we can’t run JS here.
|
||||
HandleScope handle_scope(env()->isolate());
|
||||
env()->SetImmediate([](Environment* env, void* data) {
|
||||
StreamPipe* pipe = static_cast<StreamPipe*>(data);
|
||||
|
||||
HandleScope handle_scope(env->isolate());
|
||||
Context::Scope context_scope(env->context());
|
||||
Local<Object> object = pipe->object();
|
||||
|
||||
if (object->Has(env->context(), env->onunpipe_string()).FromJust()) {
|
||||
pipe->MakeCallback(env->onunpipe_string(), 0, nullptr).ToLocalChecked();
|
||||
}
|
||||
|
||||
// Set all the links established in the constructor to `null`.
|
||||
Local<Value> null = Null(env->isolate());
|
||||
|
||||
Local<Value> source_v;
|
||||
Local<Value> sink_v;
|
||||
source_v = object->Get(env->context(), env->source_string())
|
||||
.ToLocalChecked();
|
||||
sink_v = object->Get(env->context(), env->sink_string())
|
||||
.ToLocalChecked();
|
||||
CHECK(source_v->IsObject());
|
||||
CHECK(sink_v->IsObject());
|
||||
|
||||
object->Set(env->context(), env->source_string(), null).FromJust();
|
||||
object->Set(env->context(), env->sink_string(), null).FromJust();
|
||||
source_v.As<Object>()->Set(env->context(),
|
||||
env->pipe_target_string(),
|
||||
null).FromJust();
|
||||
sink_v.As<Object>()->Set(env->context(),
|
||||
env->pipe_source_string(),
|
||||
null).FromJust();
|
||||
}, static_cast<void*>(this), object());
|
||||
}
|
||||
|
||||
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
|
||||
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
|
||||
size_t size = std::min(suggested_size, pipe->wanted_data_);
|
||||
CHECK_GT(size, 0);
|
||||
return uv_buf_init(Malloc(size), size);
|
||||
}
|
||||
|
||||
void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
|
||||
const uv_buf_t& buf) {
|
||||
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
|
||||
AsyncScope async_scope(pipe);
|
||||
if (nread < 0) {
|
||||
// EOF or error; stop reading and pass the error to the previous listener
|
||||
// (which might end up in JS).
|
||||
free(buf.base);
|
||||
pipe->is_eof_ = true;
|
||||
stream()->ReadStop();
|
||||
CHECK_NE(previous_listener_, nullptr);
|
||||
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
|
||||
// If we’re not writing, close now. Otherwise, we’ll do that in
|
||||
// `OnStreamAfterWrite()`.
|
||||
if (!pipe->is_writing_) {
|
||||
pipe->ShutdownWritable();
|
||||
pipe->Unpipe();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
pipe->ProcessData(nread, buf);
|
||||
}
|
||||
|
||||
void StreamPipe::ProcessData(size_t nread, const uv_buf_t& buf) {
|
||||
uv_buf_t buffer = uv_buf_init(buf.base, nread);
|
||||
StreamWriteResult res = sink()->Write(&buffer, 1);
|
||||
if (!res.async) {
|
||||
free(buf.base);
|
||||
writable_listener_.OnStreamAfterWrite(nullptr, res.err);
|
||||
} else {
|
||||
is_writing_ = true;
|
||||
is_reading_ = false;
|
||||
res.wrap->SetAllocatedStorage(buf.base, buf.len);
|
||||
source()->ReadStop();
|
||||
}
|
||||
}
|
||||
|
||||
void StreamPipe::ShutdownWritable() {
|
||||
sink()->Shutdown();
|
||||
}
|
||||
|
||||
void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
|
||||
int status) {
|
||||
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
|
||||
pipe->is_writing_ = false;
|
||||
if (pipe->is_eof_) {
|
||||
AsyncScope async_scope(pipe);
|
||||
pipe->ShutdownWritable();
|
||||
pipe->Unpipe();
|
||||
return;
|
||||
}
|
||||
|
||||
if (status != 0) {
|
||||
CHECK_NE(previous_listener_, nullptr);
|
||||
StreamListener* prev = previous_listener_;
|
||||
pipe->Unpipe();
|
||||
prev->OnStreamAfterWrite(w, status);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w,
|
||||
int status) {
|
||||
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
|
||||
CHECK_NE(previous_listener_, nullptr);
|
||||
StreamListener* prev = previous_listener_;
|
||||
pipe->Unpipe();
|
||||
prev->OnStreamAfterShutdown(w, status);
|
||||
}
|
||||
|
||||
void StreamPipe::ReadableListener::OnStreamDestroy() {
|
||||
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
|
||||
if (!pipe->is_eof_) {
|
||||
OnStreamRead(UV_EPIPE, uv_buf_init(nullptr, 0));
|
||||
}
|
||||
}
|
||||
|
||||
void StreamPipe::WritableListener::OnStreamDestroy() {
|
||||
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
|
||||
pipe->is_eof_ = true;
|
||||
pipe->Unpipe();
|
||||
}
|
||||
|
||||
void StreamPipe::WritableListener::OnStreamWantsWrite(size_t suggested_size) {
|
||||
StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this);
|
||||
pipe->wanted_data_ = suggested_size;
|
||||
if (pipe->is_reading_ || pipe->is_closed_)
|
||||
return;
|
||||
AsyncScope async_scope(pipe);
|
||||
pipe->is_reading_ = true;
|
||||
pipe->source()->ReadStart();
|
||||
}
|
||||
|
||||
uv_buf_t StreamPipe::WritableListener::OnStreamAlloc(size_t suggested_size) {
|
||||
CHECK_NE(previous_listener_, nullptr);
|
||||
return previous_listener_->OnStreamAlloc(suggested_size);
|
||||
}
|
||||
|
||||
void StreamPipe::WritableListener::OnStreamRead(ssize_t nread,
|
||||
const uv_buf_t& buf) {
|
||||
CHECK_NE(previous_listener_, nullptr);
|
||||
return previous_listener_->OnStreamRead(nread, buf);
|
||||
}
|
||||
|
||||
void StreamPipe::New(const FunctionCallbackInfo<Value>& args) {
|
||||
CHECK(args.IsConstructCall());
|
||||
CHECK(args[0]->IsExternal());
|
||||
CHECK(args[1]->IsExternal());
|
||||
auto source = static_cast<StreamBase*>(args[0].As<External>()->Value());
|
||||
auto sink = static_cast<StreamBase*>(args[1].As<External>()->Value());
|
||||
|
||||
new StreamPipe(source, sink, args.This());
|
||||
}
|
||||
|
||||
void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
|
||||
StreamPipe* pipe;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
|
||||
pipe->is_closed_ = false;
|
||||
if (pipe->wanted_data_ > 0)
|
||||
pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_);
|
||||
}
|
||||
|
||||
void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
|
||||
StreamPipe* pipe;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder());
|
||||
pipe->Unpipe();
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
void InitializeStreamPipe(Local<Object> target,
|
||||
Local<Value> unused,
|
||||
Local<Context> context) {
|
||||
Environment* env = Environment::GetCurrent(context);
|
||||
|
||||
// Create FunctionTemplate for FileHandle::CloseReq
|
||||
Local<FunctionTemplate> pipe = env->NewFunctionTemplate(StreamPipe::New);
|
||||
Local<String> stream_pipe_string =
|
||||
FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe");
|
||||
env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe);
|
||||
env->SetProtoMethod(pipe, "start", StreamPipe::Start);
|
||||
AsyncWrap::AddWrapMethods(env, pipe);
|
||||
pipe->SetClassName(stream_pipe_string);
|
||||
pipe->InstanceTemplate()->SetInternalFieldCount(1);
|
||||
target->Set(context, stream_pipe_string, pipe->GetFunction()).FromJust();
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
} // namespace node
|
||||
|
||||
NODE_MODULE_CONTEXT_AWARE_INTERNAL(stream_pipe,
|
||||
node::InitializeStreamPipe)
|
68
src/stream_pipe.h
Normal file
68
src/stream_pipe.h
Normal file
@ -0,0 +1,68 @@
|
||||
#ifndef SRC_STREAM_PIPE_H_
|
||||
#define SRC_STREAM_PIPE_H_
|
||||
|
||||
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
|
||||
|
||||
#include "stream_base.h"
|
||||
|
||||
namespace node {
|
||||
|
||||
class StreamPipe : public AsyncWrap {
|
||||
public:
|
||||
StreamPipe(StreamBase* source, StreamBase* sink, v8::Local<v8::Object> obj);
|
||||
~StreamPipe();
|
||||
|
||||
void Unpipe();
|
||||
|
||||
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
static void Unpipe(const v8::FunctionCallbackInfo<v8::Value>& args);
|
||||
|
||||
size_t self_size() const override { return sizeof(*this); }
|
||||
|
||||
private:
|
||||
StreamBase* source();
|
||||
StreamBase* sink();
|
||||
|
||||
void ShutdownWritable();
|
||||
void FlushToWritable();
|
||||
|
||||
bool is_reading_ = false;
|
||||
bool is_writing_ = false;
|
||||
bool is_eof_ = false;
|
||||
bool is_closed_ = true;
|
||||
|
||||
// Set a default value so that when we’re coming from Start(), we know
|
||||
// that we don’t want to read just yet.
|
||||
// This will likely need to be changed when supporting streams without
|
||||
// `OnStreamWantsWrite()` support.
|
||||
size_t wanted_data_ = 0;
|
||||
|
||||
void ProcessData(size_t nread, const uv_buf_t& buf);
|
||||
|
||||
class ReadableListener : public StreamListener {
|
||||
public:
|
||||
uv_buf_t OnStreamAlloc(size_t suggested_size) override;
|
||||
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
|
||||
void OnStreamDestroy() override;
|
||||
};
|
||||
|
||||
class WritableListener : public StreamListener {
|
||||
public:
|
||||
uv_buf_t OnStreamAlloc(size_t suggested_size) override;
|
||||
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
|
||||
void OnStreamAfterWrite(WriteWrap* w, int status) override;
|
||||
void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
|
||||
void OnStreamWantsWrite(size_t suggested_size) override;
|
||||
void OnStreamDestroy() override;
|
||||
};
|
||||
|
||||
ReadableListener readable_listener_;
|
||||
WritableListener writable_listener_;
|
||||
};
|
||||
|
||||
} // namespace node
|
||||
|
||||
#endif
|
||||
|
||||
#endif // SRC_STREAM_PIPE_H_
|
@ -35,6 +35,7 @@ common.crashOnUnhandledRejection();
|
||||
delete providers.HTTP2STREAM;
|
||||
delete providers.HTTP2PING;
|
||||
delete providers.HTTP2SETTINGS;
|
||||
delete providers.STREAMPIPE;
|
||||
|
||||
const objKeys = Object.keys(providers);
|
||||
if (objKeys.length > 0)
|
||||
|
Loading…
x
Reference in New Issue
Block a user