src: refactor WriteWrap and ShutdownWraps

Encapsulate stream requests more:

- `WriteWrap` and `ShutdownWrap` classes are now tailored to the
  streams on which they are used. In particular, for most streams
  these are now plain `AsyncWrap`s and do not carry the overhead
  of unused libuv request data.
- Provide generic `Write()` and `Shutdown()` methods that wrap
  around the actual implementations, and make *usage* of streams
  easier, rather than implementing; for example, wrap objects
  don’t need to be provided by callers anymore.
- Use `EmitAfterWrite()` and `EmitAfterShutdown()` handlers to
  call the corresponding JS handlers, rather than always trying
  to call them. This makes usage of streams by other C++ code
  easier and leaner.

Also fix up some tests that were previously not actually testing
asynchronicity when the comments indicated that they would.

PR-URL: https://github.com/nodejs/node/pull/18676
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2018-02-08 04:59:10 +01:00
parent 0ed9ea861b
commit 0e7b61229a
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
20 changed files with 554 additions and 425 deletions

View File

@ -118,7 +118,7 @@ function client(type, len) {
fail(err, 'write'); fail(err, 'write');
} }
function afterWrite(err, handle, req) { function afterWrite(err, handle) {
if (err) if (err)
fail(err, 'write'); fail(err, 'write');

View File

@ -51,7 +51,7 @@ function main({ dur, len, type }) {
if (err) if (err)
fail(err, 'write'); fail(err, 'write');
writeReq.oncomplete = function(status, handle, req, err) { writeReq.oncomplete = function(status, handle, err) {
if (err) if (err)
fail(err, 'write'); fail(err, 'write');
}; };
@ -130,7 +130,7 @@ function main({ dur, len, type }) {
fail(err, 'write'); fail(err, 'write');
} }
function afterWrite(err, handle, req) { function afterWrite(err, handle) {
if (err) if (err)
fail(err, 'write'); fail(err, 'write');

View File

@ -74,14 +74,14 @@ function main({ dur, len, type }) {
fail(err, 'write'); fail(err, 'write');
} else if (!writeReq.async) { } else if (!writeReq.async) {
process.nextTick(function() { process.nextTick(function() {
afterWrite(null, clientHandle, writeReq); afterWrite(0, clientHandle);
}); });
} }
} }
function afterWrite(status, handle, req, err) { function afterWrite(status, handle) {
if (err) if (status)
fail(err, 'write'); fail(status, 'write');
while (clientHandle.writeQueueSize === 0) while (clientHandle.writeQueueSize === 0)
write(); write();

View File

@ -1399,20 +1399,19 @@ function trackWriteState(stream, bytes) {
session[kHandle].chunksSentSinceLastWrite = 0; session[kHandle].chunksSentSinceLastWrite = 0;
} }
function afterDoStreamWrite(status, handle, req) { function afterDoStreamWrite(status, handle) {
const stream = handle[kOwner]; const stream = handle[kOwner];
const session = stream[kSession]; const session = stream[kSession];
stream[kUpdateTimer](); stream[kUpdateTimer]();
const { bytes } = req; const { bytes } = this;
stream[kState].writeQueueSize -= bytes; stream[kState].writeQueueSize -= bytes;
if (session !== undefined) if (session !== undefined)
session[kState].writeQueueSize -= bytes; session[kState].writeQueueSize -= bytes;
if (typeof req.callback === 'function') if (typeof this.callback === 'function')
req.callback(null); this.callback(null);
req.handle = undefined;
} }
function streamOnResume() { function streamOnResume() {

View File

@ -115,9 +115,9 @@ class JSStreamWrap extends Socket {
const handle = this._handle; const handle = this._handle;
this.stream.end(() => { setImmediate(() => {
// Ensure that write was dispatched // Ensure that write is dispatched asynchronously.
setImmediate(() => { this.stream.end(() => {
this.finishShutdown(handle, 0); this.finishShutdown(handle, 0);
}); });
}); });

View File

@ -335,7 +335,7 @@ function onSocketFinish() {
} }
function afterShutdown(status, handle, req) { function afterShutdown(status, handle) {
var self = handle.owner; var self = handle.owner;
debug('afterShutdown destroyed=%j', self.destroyed, debug('afterShutdown destroyed=%j', self.destroyed,
@ -869,12 +869,12 @@ protoGetter('bytesWritten', function bytesWritten() {
}); });
function afterWrite(status, handle, req, err) { function afterWrite(status, handle, err) {
var self = handle.owner; var self = handle.owner;
if (self !== process.stderr && self !== process.stdout) if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status); debug('afterWrite', status);
if (req.async) if (this.async)
self[kLastWriteQueueSize] = 0; self[kLastWriteQueueSize] = 0;
// callback may come after call to destroy. // callback may come after call to destroy.
@ -884,9 +884,9 @@ function afterWrite(status, handle, req, err) {
} }
if (status < 0) { if (status < 0) {
var ex = errnoException(status, 'write', req.error); var ex = errnoException(status, 'write', this.error);
debug('write failure', ex); debug('write failure', ex);
self.destroy(ex, req.cb); self.destroy(ex, this.cb);
return; return;
} }
@ -895,8 +895,8 @@ function afterWrite(status, handle, req, err) {
if (self !== process.stderr && self !== process.stdout) if (self !== process.stderr && self !== process.stdout)
debug('afterWrite call cb'); debug('afterWrite call cb');
if (req.cb) if (this.cb)
req.cb.call(undefined); this.cb.call(undefined);
} }

View File

@ -306,6 +306,7 @@ class ModuleWrap;
V(script_context_constructor_template, v8::FunctionTemplate) \ V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \ V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \ V(secure_context_constructor_template, v8::FunctionTemplate) \
V(shutdown_wrap_constructor_function, v8::Function) \
V(tcp_constructor_template, v8::FunctionTemplate) \ V(tcp_constructor_template, v8::FunctionTemplate) \
V(tick_callback_function, v8::Function) \ V(tick_callback_function, v8::Function) \
V(timers_callback_function, v8::Function) \ V(timers_callback_function, v8::Function) \

View File

@ -91,8 +91,6 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
req_wrap->object() req_wrap->object()
}; };
req_wrap->Dispatched();
TryCatch try_catch(env()->isolate()); TryCatch try_catch(env()->isolate());
Local<Value> value; Local<Value> value;
int value_int = UV_EPROTO; int value_int = UV_EPROTO;
@ -127,8 +125,6 @@ int JSStream::DoWrite(WriteWrap* w,
bufs_arr bufs_arr
}; };
w->Dispatched();
TryCatch try_catch(env()->isolate()); TryCatch try_catch(env()->isolate());
Local<Value> value; Local<Value> value;
int value_int = UV_EPROTO; int value_int = UV_EPROTO;
@ -154,9 +150,8 @@ void JSStream::New(const FunctionCallbackInfo<Value>& args) {
template <class Wrap> template <class Wrap>
void JSStream::Finish(const FunctionCallbackInfo<Value>& args) { void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
Wrap* w;
CHECK(args[0]->IsObject()); CHECK(args[0]->IsObject());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>()); Wrap* w = static_cast<Wrap*>(StreamReq::FromObject(args[0].As<Object>()));
w->Done(args[1]->Int32Value()); w->Done(args[1]->Int32Value());
} }

View File

@ -1552,18 +1552,9 @@ void Http2Session::SendPendingData() {
chunks_sent_since_last_write_++; chunks_sent_since_last_write_++;
// DoTryWrite may modify both the buffer list start itself and the StreamWriteResult res = underlying_stream()->Write(*bufs, count);
// base pointers/length of the individual buffers. if (!res.async) {
uv_buf_t* writebufs = *bufs; ClearOutgoing(res.err);
if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) {
// All writes finished synchronously, nothing more to do here.
ClearOutgoing(0);
return;
}
WriteWrap* req = AllocateSend();
if (stream_->DoWrite(req, writebufs, count, nullptr) != 0) {
req->Dispose();
} }
DEBUG_HTTP2SESSION2(this, "wants data in return? %d", DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
@ -1649,15 +1640,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {
chunks_sent_since_last_write_ = n; chunks_sent_since_last_write_ = n;
} }
// Allocates the data buffer used to pass outbound data to the i/o stream.
WriteWrap* Http2Session::AllocateSend() {
HandleScope scope(env()->isolate());
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
return WriteWrap::New(env(), obj, static_cast<StreamBase*>(stream_));
}
// Callback used to receive inbound data from the i/o stream // Callback used to receive inbound data from the i/o stream
void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
Http2Scope h2scope(this); Http2Scope h2scope(this);
@ -1833,20 +1815,15 @@ inline void Http2Stream::Close(int32_t code) {
DEBUG_HTTP2STREAM2(this, "closed with code %d", code); DEBUG_HTTP2STREAM2(this, "closed with code %d", code);
} }
inline void Http2Stream::Shutdown() {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
NGHTTP2_ERR_NOMEM);
DEBUG_HTTP2STREAM(this, "writable side shutdown");
}
int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) { int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
CHECK(!this->IsDestroyed()); CHECK(!this->IsDestroyed());
req_wrap->Dispatched(); {
Shutdown(); Http2Scope h2scope(this);
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
NGHTTP2_ERR_NOMEM);
DEBUG_HTTP2STREAM(this, "writable side shutdown");
}
req_wrap->Done(0); req_wrap->Done(0);
return 0; return 0;
} }
@ -2038,7 +2015,6 @@ inline int Http2Stream::DoWrite(WriteWrap* req_wrap,
CHECK_EQ(send_handle, nullptr); CHECK_EQ(send_handle, nullptr);
Http2Scope h2scope(this); Http2Scope h2scope(this);
session_->SetChunksSinceLastWrite(); session_->SetChunksSinceLastWrite();
req_wrap->Dispatched();
if (!IsWritable()) { if (!IsWritable()) {
req_wrap->Done(UV_EOF); req_wrap->Done(UV_EOF);
return 0; return 0;

View File

@ -601,9 +601,6 @@ class Http2Stream : public AsyncWrap,
inline void Close(int32_t code); inline void Close(int32_t code);
// Shutdown the writable side of the stream
inline void Shutdown();
// Destroy this stream instance and free all held memory. // Destroy this stream instance and free all held memory.
inline void Destroy(); inline void Destroy();
@ -818,6 +815,10 @@ class Http2Session : public AsyncWrap, public StreamListener {
inline void EmitStatistics(); inline void EmitStatistics();
inline StreamBase* underlying_stream() {
return static_cast<StreamBase*>(stream_);
}
void Start(); void Start();
void Stop(); void Stop();
void Close(uint32_t code = NGHTTP2_NO_ERROR, void Close(uint32_t code = NGHTTP2_NO_ERROR,
@ -907,8 +908,6 @@ class Http2Session : public AsyncWrap, public StreamListener {
template <get_setting fn> template <get_setting fn>
static void GetSettings(const FunctionCallbackInfo<Value>& args); static void GetSettings(const FunctionCallbackInfo<Value>& args);
WriteWrap* AllocateSend();
uv_loop_t* event_loop() const { uv_loop_t* event_loop() const {
return env()->event_loop(); return env()->event_loop();
} }

View File

@ -33,6 +33,11 @@ void ReqWrap<T>::Dispatched() {
req_.data = this; req_.data = this;
} }
template <typename T>
ReqWrap<T>* ReqWrap<T>::from_req(T* req) {
return ContainerOf(&ReqWrap<T>::req_, req);
}
} // namespace node } // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

View File

@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap {
inline void Dispatched(); // Call this after the req has been dispatched. inline void Dispatched(); // Call this after the req has been dispatched.
T* req() { return &req_; } T* req() { return &req_; }
static ReqWrap* from_req(T* req);
private: private:
friend class Environment; friend class Environment;
friend int GenDebugSymbols(); friend int GenDebugSymbols();

View File

@ -25,6 +25,25 @@ using v8::Value;
using AsyncHooks = Environment::AsyncHooks; using AsyncHooks = Environment::AsyncHooks;
inline void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField),
nullptr);
req_wrap_obj->SetAlignedPointerInInternalField(kStreamReqField, this);
}
inline StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
return static_cast<StreamReq*>(
req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField));
}
inline void StreamReq::Dispose() {
object()->SetAlignedPointerInInternalField(kStreamReqField, nullptr);
delete this;
}
inline v8::Local<v8::Object> StreamReq::object() {
return GetAsyncWrap()->object();
}
inline StreamListener::~StreamListener() { inline StreamListener::~StreamListener() {
if (stream_ != nullptr) if (stream_ != nullptr)
@ -36,6 +55,15 @@ inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
} }
inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
CHECK_NE(previous_listener_, nullptr);
previous_listener_->OnStreamAfterShutdown(w, status);
}
inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
CHECK_NE(previous_listener_, nullptr);
previous_listener_->OnStreamAfterWrite(w, status);
}
inline StreamResource::~StreamResource() { inline StreamResource::~StreamResource() {
while (listener_ != nullptr) { while (listener_ != nullptr) {
@ -93,6 +121,9 @@ inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
listener_->OnStreamAfterWrite(w, status); listener_->OnStreamAfterWrite(w, status);
} }
inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
listener_->OnStreamAfterShutdown(w, status);
}
inline StreamBase::StreamBase(Environment* env) : env_(env) { inline StreamBase::StreamBase(Environment* env) : env_(env) {
PushStreamListener(&default_listener_); PushStreamListener(&default_listener_);
@ -102,6 +133,150 @@ inline Environment* StreamBase::stream_env() const {
return env_; return env_;
} }
inline void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
AfterRequest(req_wrap, [&]() {
EmitAfterWrite(req_wrap, status);
});
}
inline void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
AfterRequest(req_wrap, [&]() {
EmitAfterShutdown(req_wrap, status);
});
}
template<typename Wrap, typename EmitEvent>
inline void StreamBase::AfterRequest(Wrap* req_wrap, EmitEvent emit) {
Environment* env = stream_env();
v8::HandleScope handle_scope(env->isolate());
v8::Context::Scope context_scope(env->context());
emit();
req_wrap->Dispose();
}
inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();
if (req_wrap_obj.IsEmpty()) {
req_wrap_obj =
env->shutdown_wrap_constructor_function()
->NewInstance(env->context()).ToLocalChecked();
}
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(
env, GetAsyncWrap()->get_async_id());
ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
int err = DoShutdown(req_wrap);
if (err != 0) {
req_wrap->Dispose();
}
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}
return err;
}
inline StreamWriteResult StreamBase::Write(
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle,
v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();
int err;
if (send_handle == nullptr) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
return StreamWriteResult { false, err, nullptr };
}
}
if (req_wrap_obj.IsEmpty()) {
req_wrap_obj =
env->write_wrap_constructor_function()
->NewInstance(env->context()).ToLocalChecked();
}
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(
env, GetAsyncWrap()->get_async_id());
WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
err = DoWrite(req_wrap, bufs, count, send_handle);
bool async = err == 0;
if (!async) {
req_wrap->Dispose();
req_wrap = nullptr;
}
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}
req_wrap_obj->Set(env->async(), v8::Boolean::New(env->isolate(), async));
return StreamWriteResult { async, err, req_wrap };
}
template<typename OtherBase, bool kResetPersistent>
SimpleShutdownWrap<OtherBase, kResetPersistent>::SimpleShutdownWrap(
StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj)
: ShutdownWrap(stream, req_wrap_obj),
OtherBase(stream->stream_env(),
req_wrap_obj,
AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
Wrap(req_wrap_obj, static_cast<AsyncWrap*>(this));
}
template<typename OtherBase, bool kResetPersistent>
SimpleShutdownWrap<OtherBase, kResetPersistent>::~SimpleShutdownWrap() {
ClearWrap(static_cast<AsyncWrap*>(this)->object());
if (kResetPersistent) {
auto& persistent = static_cast<AsyncWrap*>(this)->persistent();
CHECK_EQ(persistent.IsEmpty(), false);
persistent.Reset();
}
}
inline ShutdownWrap* StreamBase::CreateShutdownWrap(
v8::Local<v8::Object> object) {
return new SimpleShutdownWrap<AsyncWrap>(this, object);
}
template<typename OtherBase, bool kResetPersistent>
SimpleWriteWrap<OtherBase, kResetPersistent>::SimpleWriteWrap(
StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj)
: WriteWrap(stream, req_wrap_obj),
OtherBase(stream->stream_env(),
req_wrap_obj,
AsyncWrap::PROVIDER_WRITEWRAP) {
Wrap(req_wrap_obj, static_cast<AsyncWrap*>(this));
}
template<typename OtherBase, bool kResetPersistent>
SimpleWriteWrap<OtherBase, kResetPersistent>::~SimpleWriteWrap() {
ClearWrap(static_cast<AsyncWrap*>(this)->object());
if (kResetPersistent) {
auto& persistent = static_cast<AsyncWrap*>(this)->persistent();
CHECK_EQ(persistent.IsEmpty(), false);
persistent.Reset();
}
}
inline WriteWrap* StreamBase::CreateWriteWrap(
v8::Local<v8::Object> object) {
return new SimpleWriteWrap<AsyncWrap>(this, object);
}
template <class Base> template <class Base>
void StreamBase::AddMethods(Environment* env, void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate> t, Local<FunctionTemplate> t,
@ -230,38 +405,35 @@ inline void ShutdownWrap::OnDone(int status) {
stream()->AfterShutdown(this, status); stream()->AfterShutdown(this, status);
} }
inline void WriteWrap::SetAllocatedStorage(char* data, size_t size) {
WriteWrap* WriteWrap::New(Environment* env, CHECK_EQ(storage_, nullptr);
Local<Object> obj, storage_ = data;
StreamBase* wrap, storage_size_ = size;
size_t extra) {
size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
char* storage = new char[storage_size];
return new(storage) WriteWrap(env, obj, wrap, storage_size);
} }
inline char* WriteWrap::Storage() {
void WriteWrap::Dispose() { return storage_;
this->~WriteWrap();
delete[] reinterpret_cast<char*>(this);
} }
inline size_t WriteWrap::StorageSize() const {
char* WriteWrap::Extra(size_t offset) { return storage_size_;
return reinterpret_cast<char*>(this) +
ROUND_UP(sizeof(*this), kAlignSize) +
offset;
}
size_t WriteWrap::ExtraSize() const {
return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
} }
inline void WriteWrap::OnDone(int status) { inline void WriteWrap::OnDone(int status) {
stream()->AfterWrite(this, status); stream()->AfterWrite(this, status);
} }
inline void StreamReq::Done(int status, const char* error_str) {
AsyncWrap* async_wrap = GetAsyncWrap();
Environment* env = async_wrap->env();
if (error_str != nullptr) {
async_wrap->object()->Set(env->error_string(),
OneByteString(env->isolate(), error_str));
}
OnDone(status);
}
} // namespace node } // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

View File

@ -34,6 +34,11 @@ template int StreamBase::WriteString<LATIN1>(
const FunctionCallbackInfo<Value>& args); const FunctionCallbackInfo<Value>& args);
struct Free {
void operator()(char* ptr) const { free(ptr); }
};
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) { int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
return ReadStart(); return ReadStart();
} }
@ -45,45 +50,10 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) { int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsObject()); CHECK(args[0]->IsObject());
Local<Object> req_wrap_obj = args[0].As<Object>(); Local<Object> req_wrap_obj = args[0].As<Object>();
AsyncWrap* wrap = GetAsyncWrap(); return Shutdown(req_wrap_obj);
CHECK_NE(wrap, nullptr);
AsyncHooks::DefaultTriggerAsyncIdScope(env, wrap->get_async_id());
ShutdownWrap* req_wrap = new ShutdownWrap(env,
req_wrap_obj,
this);
int err = DoShutdown(req_wrap);
if (err)
delete req_wrap;
return err;
}
void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
Environment* env = req_wrap->env();
// The wrap and request objects should still be there.
CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> argv[3] = {
Integer::New(env->isolate(), status),
GetObject(),
req_wrap_obj
};
if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
delete req_wrap;
} }
@ -104,19 +74,14 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
count = chunks->Length() >> 1; count = chunks->Length() >> 1;
MaybeStackBuffer<uv_buf_t, 16> bufs(count); MaybeStackBuffer<uv_buf_t, 16> bufs(count);
uv_buf_t* buf_list = *bufs;
size_t storage_size = 0; size_t storage_size = 0;
uint32_t bytes = 0; uint32_t bytes = 0;
size_t offset; size_t offset;
WriteWrap* req_wrap;
int err;
if (!all_buffers) { if (!all_buffers) {
// Determine storage size first // Determine storage size first
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
Local<Value> chunk = chunks->Get(i * 2); Local<Value> chunk = chunks->Get(i * 2);
if (Buffer::HasInstance(chunk)) if (Buffer::HasInstance(chunk))
@ -145,20 +110,11 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
bufs[i].len = Buffer::Length(chunk); bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len; bytes += bufs[i].len;
} }
// Try writing immediately without allocation
err = DoTryWrite(&buf_list, &count);
if (err != 0 || count == 0)
goto done;
} }
{ std::unique_ptr<char[], Free> storage;
AsyncWrap* wrap = GetAsyncWrap(); if (storage_size > 0)
CHECK_NE(wrap, nullptr); storage = std::unique_ptr<char[], Free>(Malloc(storage_size));
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
}
offset = 0; offset = 0;
if (!all_buffers) { if (!all_buffers) {
@ -174,9 +130,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
} }
// Write string // Write string
offset = ROUND_UP(offset, WriteWrap::kAlignSize);
CHECK_LE(offset, storage_size); CHECK_LE(offset, storage_size);
char* str_storage = req_wrap->Extra(offset); char* str_storage = storage.get() + offset;
size_t str_size = storage_size - offset; size_t str_size = storage_size - offset;
Local<String> string = chunk->ToString(env->context()).ToLocalChecked(); Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
@ -192,35 +147,17 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
offset += str_size; offset += str_size;
bytes += str_size; bytes += str_size;
} }
err = DoTryWrite(&buf_list, &count);
if (err != 0 || count == 0) {
req_wrap->Dispatched();
req_wrap->Dispose();
goto done;
}
} }
err = DoWrite(req_wrap, buf_list, count, nullptr); StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
req_wrap_obj->Set(env->async(), True(env->isolate()));
if (err)
req_wrap->Dispose();
done:
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes)); req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));
if (res.wrap != nullptr && storage) {
return err; res.wrap->SetAllocatedStorage(storage.release(), storage_size);
}
return res.err;
} }
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) { int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject()); CHECK(args[0]->IsObject());
@ -232,49 +169,20 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
} }
Local<Object> req_wrap_obj = args[0].As<Object>(); Local<Object> req_wrap_obj = args[0].As<Object>();
const char* data = Buffer::Data(args[1]);
size_t length = Buffer::Length(args[1]);
WriteWrap* req_wrap;
uv_buf_t buf; uv_buf_t buf;
buf.base = const_cast<char*>(data); buf.base = Buffer::Data(args[1]);
buf.len = length; buf.len = Buffer::Length(args[1]);
// Try writing immediately without allocation StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
uv_buf_t* bufs = &buf;
size_t count = 1;
int err = DoTryWrite(&bufs, &count);
if (err != 0)
goto done;
if (count == 0)
goto done;
CHECK_EQ(count, 1);
// Allocate, or write rest if (res.async)
{ req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
AsyncWrap* wrap = GetAsyncWrap(); req_wrap_obj->Set(env->context(), env->bytes_string(),
CHECK_NE(wrap, nullptr); Integer::NewFromUnsigned(env->isolate(), buf.len))
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, .FromJust();
wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this);
}
err = DoWrite(req_wrap, bufs, count, nullptr); return res.err;
req_wrap_obj->Set(env->async(), True(env->isolate()));
req_wrap_obj->Set(env->buffer_string(), args[1]);
if (err)
req_wrap->Dispose();
done:
const char* msg = Error();
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}
req_wrap_obj->Set(env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), length));
return err;
} }
@ -305,8 +213,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
return UV_ENOBUFS; return UV_ENOBUFS;
// Try writing immediately if write size isn't too big // Try writing immediately if write size isn't too big
WriteWrap* req_wrap;
char* data;
char stack_storage[16384]; // 16kb char stack_storage[16384]; // 16kb
size_t data_size; size_t data_size;
uv_buf_t buf; uv_buf_t buf;
@ -325,36 +231,33 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
size_t count = 1; size_t count = 1;
err = DoTryWrite(&bufs, &count); err = DoTryWrite(&bufs, &count);
// Failure // Immediate failure or success
if (err != 0) if (err != 0 || count == 0) {
goto done; req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
.FromJust();
// Success req_wrap_obj->Set(env->context(),
if (count == 0) env->bytes_string(),
goto done; Integer::NewFromUnsigned(env->isolate(), data_size))
.FromJust();
return err;
}
// Partial write // Partial write
CHECK_EQ(count, 1); CHECK_EQ(count, 1);
} }
{ std::unique_ptr<char[], Free> data;
AsyncWrap* wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
}
data = req_wrap->Extra();
if (try_write) { if (try_write) {
// Copy partial data // Copy partial data
memcpy(data, buf.base, buf.len); data = std::unique_ptr<char[], Free>(Malloc(buf.len));
memcpy(data.get(), buf.base, buf.len);
data_size = buf.len; data_size = buf.len;
} else { } else {
// Write it // Write it
data = std::unique_ptr<char[], Free>(Malloc(storage_size));
data_size = StringBytes::Write(env->isolate(), data_size = StringBytes::Write(env->isolate(),
data, data.get(),
storage_size, storage_size,
string, string,
enc); enc);
@ -362,78 +265,36 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
CHECK_LE(data_size, storage_size); CHECK_LE(data_size, storage_size);
buf = uv_buf_init(data, data_size); buf = uv_buf_init(data.get(), data_size);
if (!IsIPCPipe()) { uv_stream_t* send_handle = nullptr;
err = DoWrite(req_wrap, &buf, 1, nullptr);
} else {
uv_handle_t* send_handle = nullptr;
if (!send_handle_obj.IsEmpty()) { if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
HandleWrap* wrap; // TODO(addaleax): This relies on the fact that HandleWrap comes first
ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); // as a superclass of each individual subclass.
send_handle = wrap->GetHandle(); // There are similar assumptions in other places in the code base.
// Reference LibuvStreamWrap instance to prevent it from being garbage // A better idea would be having all BaseObject's internal pointers
// collected before `AfterWrite` is called. // refer to the BaseObject* itself; this would require refactoring
CHECK_EQ(false, req_wrap->persistent().IsEmpty()); // throughout the code base but makes Node rely much less on C++ quirks.
req_wrap_obj->Set(env->handle_string(), send_handle_obj); HandleWrap* wrap;
} ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
err = DoWrite( // Reference LibuvStreamWrap instance to prevent it from being garbage
req_wrap, // collected before `AfterWrite` is called.
&buf, req_wrap_obj->Set(env->handle_string(), send_handle_obj);
1,
reinterpret_cast<uv_stream_t*>(send_handle));
} }
req_wrap_obj->Set(env->async(), True(env->isolate())); StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
if (err) req_wrap_obj->Set(env->context(), env->bytes_string(),
req_wrap->Dispose(); Integer::NewFromUnsigned(env->isolate(), data_size))
.FromJust();
done: if (res.wrap != nullptr) {
const char* msg = Error(); res.wrap->SetAllocatedStorage(data.release(), data_size);
if (msg != nullptr) {
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
ClearError();
}
req_wrap_obj->Set(env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), data_size));
return err;
}
void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
// The wrap and request objects should still be there.
CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
// Unref handle property
Local<Object> req_wrap_obj = req_wrap->object();
req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
EmitAfterWrite(req_wrap, status);
Local<Value> argv[] = {
Integer::New(env->isolate(), status),
GetObject(),
req_wrap_obj,
Undefined(env->isolate())
};
const char* msg = Error();
if (msg != nullptr) {
argv[3] = OneByteString(env->isolate(), msg);
ClearError();
} }
if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) return res.err;
req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
req_wrap->Dispose();
} }
@ -510,4 +371,39 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
stream->CallJSOnreadMethod(nread, obj); stream->CallJSOnreadMethod(nread, obj);
} }
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
StreamReq* req_wrap, int status) {
StreamBase* stream = static_cast<StreamBase*>(stream_);
Environment* env = stream->stream_env();
AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
Local<Object> req_wrap_obj = async_wrap->object();
Local<Value> argv[] = {
Integer::New(env->isolate(), status),
stream->GetObject(),
Undefined(env->isolate())
};
const char* msg = stream->Error();
if (msg != nullptr) {
argv[2] = OneByteString(env->isolate(), msg);
stream->ClearError();
}
if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
}
void ReportWritesToJSStreamListener::OnStreamAfterWrite(
WriteWrap* req_wrap, int status) {
OnStreamAfterReqFinished(req_wrap, status);
}
void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
ShutdownWrap* req_wrap, int status) {
OnStreamAfterReqFinished(req_wrap, status);
}
} // namespace node } // namespace node

View File

@ -14,114 +14,75 @@
namespace node { namespace node {
// Forward declarations // Forward declarations
class ShutdownWrap;
class WriteWrap;
class StreamBase; class StreamBase;
class StreamResource; class StreamResource;
template<typename Base> struct StreamWriteResult {
bool async;
int err;
WriteWrap* wrap;
};
class StreamReq { class StreamReq {
public: public:
explicit StreamReq(StreamBase* stream) : stream_(stream) { static constexpr int kStreamReqField = 1;
explicit StreamReq(StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
AttachToObject(req_wrap_obj);
} }
inline void Done(int status, const char* error_str = nullptr) { virtual ~StreamReq() {}
Base* req = static_cast<Base*>(this); virtual AsyncWrap* GetAsyncWrap() = 0;
Environment* env = req->env(); v8::Local<v8::Object> object();
if (error_str != nullptr) {
req->object()->Set(env->error_string(),
OneByteString(env->isolate(), error_str));
}
req->OnDone(status); void Done(int status, const char* error_str = nullptr);
} void Dispose();
inline StreamBase* stream() const { return stream_; } inline StreamBase* stream() const { return stream_; }
static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
protected:
virtual void OnDone(int status) = 0;
void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
private: private:
StreamBase* const stream_; StreamBase* const stream_;
}; };
class ShutdownWrap : public ReqWrap<uv_shutdown_t>, class ShutdownWrap : public StreamReq {
public StreamReq<ShutdownWrap> {
public: public:
ShutdownWrap(Environment* env, ShutdownWrap(StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj, v8::Local<v8::Object> req_wrap_obj)
StreamBase* stream) : StreamReq(stream, req_wrap_obj) { }
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
StreamReq<ShutdownWrap>(stream) {
Wrap(req_wrap_obj, this);
}
~ShutdownWrap() { void OnDone(int status) override; // Just calls stream()->AfterShutdown()
ClearWrap(object());
}
static ShutdownWrap* from_req(uv_shutdown_t* req) {
return ContainerOf(&ShutdownWrap::req_, req);
}
size_t self_size() const override { return sizeof(*this); }
inline void OnDone(int status); // Just calls stream()->AfterShutdown()
}; };
class WriteWrap : public ReqWrap<uv_write_t>, class WriteWrap : public StreamReq {
public StreamReq<WriteWrap> {
public: public:
static inline WriteWrap* New(Environment* env, char* Storage();
v8::Local<v8::Object> obj, size_t StorageSize() const;
StreamBase* stream, void SetAllocatedStorage(char* data, size_t size);
size_t extra = 0);
inline void Dispose();
inline char* Extra(size_t offset = 0);
inline size_t ExtraSize() const;
size_t self_size() const override { return storage_size_; } WriteWrap(StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj)
static WriteWrap* from_req(uv_write_t* req) { : StreamReq(stream, req_wrap_obj) { }
return ContainerOf(&WriteWrap::req_, req);
}
static const size_t kAlignSize = 16;
WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* stream)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(stream),
storage_size_(0) {
Wrap(obj, this);
}
inline void OnDone(int status); // Just calls stream()->AfterWrite()
protected:
WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* stream,
size_t storage_size)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(stream),
storage_size_(storage_size) {
Wrap(obj, this);
}
~WriteWrap() { ~WriteWrap() {
ClearWrap(object()); free(storage_);
} }
void* operator new(size_t size) = delete; void OnDone(int status) override; // Just calls stream()->AfterWrite()
void* operator new(size_t size, char* storage) { return storage; }
// This is just to keep the compiler happy. It should never be called, since
// we don't use exceptions in node.
void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
private: private:
// People should not be using the non-placement new and delete operator on a char* storage_ = nullptr;
// WriteWrap. Ensure this never happens. size_t storage_size_ = 0;
void operator delete(void* ptr) { UNREACHABLE(); }
const size_t storage_size_;
}; };
@ -147,15 +108,23 @@ class StreamListener {
// `OnStreamRead()` is called when data is available on the socket and has // `OnStreamRead()` is called when data is available on the socket and has
// been read into the buffer provided by `OnStreamAlloc()`. // been read into the buffer provided by `OnStreamAlloc()`.
// The `buf` argument is the return value of `uv_buf_t`, or may be a buffer // The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
// with base nullpptr in case of an error. // with base nullptr in case of an error.
// `nread` is the number of read bytes (which is at most the buffer length), // `nread` is the number of read bytes (which is at most the buffer length),
// or, if negative, a libuv error code. // or, if negative, a libuv error code.
virtual void OnStreamRead(ssize_t nread, virtual void OnStreamRead(ssize_t nread,
const uv_buf_t& buf) = 0; const uv_buf_t& buf) = 0;
// This is called once a Write has finished. `status` may be 0 or, // This is called once a write has finished. `status` may be 0 or,
// if negative, a libuv error code. // if negative, a libuv error code.
virtual void OnStreamAfterWrite(WriteWrap* w, int status) {} // By default, this is simply passed on to the previous listener
// (and raises an assertion if there is none).
virtual void OnStreamAfterWrite(WriteWrap* w, int status);
// This is called once a shutdown has finished. `status` may be 0 or,
// if negative, a libuv error code.
// By default, this is simply passed on to the previous listener
// (and raises an assertion if there is none).
virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
// This is called immediately before the stream is destroyed. // This is called immediately before the stream is destroyed.
virtual void OnStreamDestroy() {} virtual void OnStreamDestroy() {}
@ -174,9 +143,21 @@ class StreamListener {
}; };
// An (incomplete) stream listener class that calls the `.oncomplete()`
// method of the JS objects associated with the wrap objects.
class ReportWritesToJSStreamListener : public StreamListener {
public:
void OnStreamAfterWrite(WriteWrap* w, int status) override;
void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
private:
void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
};
// A default emitter that just pushes data chunks as Buffer instances to // A default emitter that just pushes data chunks as Buffer instances to
// JS land via the handles .ondata method. // JS land via the handles .ondata method.
class EmitToJSStreamListener : public StreamListener { class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
public: public:
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
}; };
@ -188,20 +169,31 @@ class StreamResource {
public: public:
virtual ~StreamResource(); virtual ~StreamResource();
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0; // These need to be implemented on the readable side of this stream:
virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
// Start reading from the underlying resource. This is called by the consumer // Start reading from the underlying resource. This is called by the consumer
// when more data is desired. // when more data is desired. Use `EmitAlloc()` and `EmitData()` to
// pass data along to the consumer.
virtual int ReadStart() = 0; virtual int ReadStart() = 0;
// Stop reading from the underlying resource. This is called by the // Stop reading from the underlying resource. This is called by the
// consumer when its buffers are full and no more data can be handled. // consumer when its buffers are full and no more data can be handled.
virtual int ReadStop() = 0; virtual int ReadStop() = 0;
// These need to be implemented on the writable side of this stream:
// All of these methods may return an error code synchronously.
// In that case, the finish callback should *not* be called.
// Perform a shutdown operation, and call req_wrap->Done() when finished.
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
// Try to write as much data as possible synchronously, and modify
// `*bufs` and `*count` accordingly. This is a no-op by default.
virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
// Perform a write of data, and call req_wrap->Done() when finished.
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
// Optionally, this may provide an error message to be used for // Optionally, this may provide an error message to be used for
// failing writes. // failing writes.
virtual const char* Error() const; virtual const char* Error() const;
@ -223,6 +215,8 @@ class StreamResource {
void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0)); void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
// Call the current listener's OnStreamAfterWrite() method. // Call the current listener's OnStreamAfterWrite() method.
void EmitAfterWrite(WriteWrap* w, int status); void EmitAfterWrite(WriteWrap* w, int status);
// Call the current listener's OnStreamAfterShutdown() method.
void EmitAfterShutdown(ShutdownWrap* w, int status);
StreamListener* listener_ = nullptr; StreamListener* listener_ = nullptr;
uint64_t bytes_read_ = 0; uint64_t bytes_read_ = 0;
@ -251,21 +245,40 @@ class StreamBase : public StreamResource {
void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf); void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf);
// These are called by the respective {Write,Shutdown}Wrap class.
virtual void AfterShutdown(ShutdownWrap* req, int status);
virtual void AfterWrite(WriteWrap* req, int status);
// This is named `stream_env` to avoid name clashes, because a lot of // This is named `stream_env` to avoid name clashes, because a lot of
// subclasses are also `BaseObject`s. // subclasses are also `BaseObject`s.
Environment* stream_env() const; Environment* stream_env() const;
protected: // Shut down the current stream. This request can use an existing
explicit StreamBase(Environment* env); // ShutdownWrap object (that was created in JS), or a new one will be created.
int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
// Write data to the current stream. This request can use an existing
// WriteWrap object (that was created in JS), or a new one will be created.
// This will first try to write synchronously using `DoTryWrite()`, then
// asynchronously using `DoWrite()`.
// If the return value indicates a synchronous completion, no callback will
// be invoked.
StreamWriteResult Write(
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle = nullptr,
v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
// These can be overridden by subclasses to get more specific wrap instances.
// For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
// (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
// an associated libuv request.
virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
// One of these must be implemented // One of these must be implemented
virtual AsyncWrap* GetAsyncWrap() = 0; virtual AsyncWrap* GetAsyncWrap() = 0;
virtual v8::Local<v8::Object> GetObject(); virtual v8::Local<v8::Object> GetObject();
protected:
explicit StreamBase(Environment* env);
// JS Methods // JS Methods
int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args); int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args); int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
@ -292,6 +305,43 @@ class StreamBase : public StreamResource {
private: private:
Environment* env_; Environment* env_;
EmitToJSStreamListener default_listener_; EmitToJSStreamListener default_listener_;
// These are called by the respective {Write,Shutdown}Wrap class.
void AfterShutdown(ShutdownWrap* req, int status);
void AfterWrite(WriteWrap* req, int status);
template <typename Wrap, typename EmitEvent>
void AfterRequest(Wrap* req_wrap, EmitEvent emit);
friend class WriteWrap;
friend class ShutdownWrap;
};
// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
// `OtherBase` must have a constructor that matches the `AsyncWrap`
// constructorss (Environment*, Local<Object>, AsyncWrap::Provider) signature
// and be a subclass of `AsyncWrap`.
template <typename OtherBase, bool kResetPersistentOnDestroy = true>
class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
public:
SimpleShutdownWrap(StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj);
~SimpleShutdownWrap();
AsyncWrap* GetAsyncWrap() override { return this; }
size_t self_size() const override { return sizeof(*this); }
};
template <typename OtherBase, bool kResetPersistentOnDestroy = true>
class SimpleWriteWrap : public WriteWrap, public OtherBase {
public:
SimpleWriteWrap(StreamBase* stream,
v8::Local<v8::Object> req_wrap_obj);
~SimpleWriteWrap();
AsyncWrap* GetAsyncWrap() override { return this; }
size_t self_size() const override { return sizeof(*this) + StorageSize(); }
}; };
} // namespace node } // namespace node

View File

@ -61,19 +61,22 @@ void LibuvStreamWrap::Initialize(Local<Object> target,
[](const FunctionCallbackInfo<Value>& args) { [](const FunctionCallbackInfo<Value>& args) {
CHECK(args.IsConstructCall()); CHECK(args.IsConstructCall());
ClearWrap(args.This()); ClearWrap(args.This());
args.This()->SetAlignedPointerInInternalField(
StreamReq::kStreamReqField, nullptr);
}; };
Local<FunctionTemplate> sw = Local<FunctionTemplate> sw =
FunctionTemplate::New(env->isolate(), is_construct_call_callback); FunctionTemplate::New(env->isolate(), is_construct_call_callback);
sw->InstanceTemplate()->SetInternalFieldCount(1); sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1);
Local<String> wrapString = Local<String> wrapString =
FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap"); FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap");
sw->SetClassName(wrapString); sw->SetClassName(wrapString);
AsyncWrap::AddWrapMethods(env, sw); AsyncWrap::AddWrapMethods(env, sw);
target->Set(wrapString, sw->GetFunction()); target->Set(wrapString, sw->GetFunction());
env->set_shutdown_wrap_constructor_function(sw->GetFunction());
Local<FunctionTemplate> ww = Local<FunctionTemplate> ww =
FunctionTemplate::New(env->isolate(), is_construct_call_callback); FunctionTemplate::New(env->isolate(), is_construct_call_callback);
ww->InstanceTemplate()->SetInternalFieldCount(1); ww->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1);
Local<String> writeWrapString = Local<String> writeWrapString =
FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"); FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
ww->SetClassName(writeWrapString); ww->SetClassName(writeWrapString);
@ -261,8 +264,20 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable)); args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
} }
typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>, false> LibuvShutdownWrap;
typedef SimpleWriteWrap<ReqWrap<uv_write_t>, false> LibuvWriteWrap;
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) { ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
return new LibuvShutdownWrap(this, object);
}
WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
return new LibuvWriteWrap(this, object);
}
int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
int err; int err;
err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown); err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown);
req_wrap->Dispatched(); req_wrap->Dispatched();
@ -271,7 +286,8 @@ int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) { void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
ShutdownWrap* req_wrap = ShutdownWrap::from_req(req); LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
LibuvShutdownWrap::from_req(req));
CHECK_NE(req_wrap, nullptr); CHECK_NE(req_wrap, nullptr);
HandleScope scope(req_wrap->env()->isolate()); HandleScope scope(req_wrap->env()->isolate());
Context::Scope context_scope(req_wrap->env()->context()); Context::Scope context_scope(req_wrap->env()->context());
@ -319,10 +335,11 @@ int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
} }
int LibuvStreamWrap::DoWrite(WriteWrap* w, int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
uv_buf_t* bufs, uv_buf_t* bufs,
size_t count, size_t count,
uv_stream_t* send_handle) { uv_stream_t* send_handle) {
LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
int r; int r;
if (send_handle == nullptr) { if (send_handle == nullptr) {
r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite); r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite);
@ -349,7 +366,8 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = WriteWrap::from_req(req); LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
LibuvWriteWrap::from_req(req));
CHECK_NE(req_wrap, nullptr); CHECK_NE(req_wrap, nullptr);
HandleScope scope(req_wrap->env()->isolate()); HandleScope scope(req_wrap->env()->isolate());
Context::Scope context_scope(req_wrap->env()->context()); Context::Scope context_scope(req_wrap->env()->context());

View File

@ -73,6 +73,9 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
return stream()->type == UV_TCP; return stream()->type == UV_TCP;
} }
ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object) override;
WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object) override;
protected: protected:
LibuvStreamWrap(Environment* env, LibuvStreamWrap(Environment* env,
v8::Local<v8::Object> object, v8::Local<v8::Object> object,

View File

@ -285,37 +285,29 @@ void TLSWrap::EncOut() {
for (size_t i = 0; i < count; i++) for (size_t i = 0; i < count; i++)
buf[i] = uv_buf_init(data[i], size[i]); buf[i] = uv_buf_init(data[i], size[i]);
int err = stream_->DoTryWrite(&bufs, &count); StreamWriteResult res = underlying_stream()->Write(bufs, count);
if (err != 0) { if (res.err != 0) {
InvokeQueued(err); InvokeQueued(res.err);
} else if (count == 0) {
env()->SetImmediate([](Environment* env, void* data) {
NODE_COUNT_NET_BYTES_SENT(write_size_);
static_cast<TLSWrap*>(data)->OnStreamAfterWrite(nullptr, 0);
}, this, object());
return; return;
} }
Local<Object> req_wrap_obj = NODE_COUNT_NET_BYTES_SENT(write_size_);
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
WriteWrap* write_req = WriteWrap::New(env(),
req_wrap_obj,
static_cast<StreamBase*>(stream_));
err = stream_->DoWrite(write_req, buf, count, nullptr); if (!res.async) {
// Simulate asynchronous finishing, TLS cannot handle this at the moment.
// Ignore errors, this should be already handled in js env()->SetImmediate([](Environment* env, void* data) {
if (err) { static_cast<TLSWrap*>(data)->OnStreamAfterWrite(nullptr, 0);
write_req->Dispose(); }, this, object());
InvokeQueued(err);
} else {
NODE_COUNT_NET_BYTES_SENT(write_size_);
} }
} }
void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) { void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) {
// Report back to the previous listener as well. This is only needed for the
// "empty" writes that are passed through directly to the underlying stream.
if (req_wrap != nullptr)
previous_listener_->OnStreamAfterWrite(req_wrap, status);
if (ssl_ == nullptr) if (ssl_ == nullptr)
status = UV_ECANCELED; status = UV_ECANCELED;
@ -513,24 +505,24 @@ AsyncWrap* TLSWrap::GetAsyncWrap() {
bool TLSWrap::IsIPCPipe() { bool TLSWrap::IsIPCPipe() {
return static_cast<StreamBase*>(stream_)->IsIPCPipe(); return underlying_stream()->IsIPCPipe();
} }
int TLSWrap::GetFD() { int TLSWrap::GetFD() {
return static_cast<StreamBase*>(stream_)->GetFD(); return underlying_stream()->GetFD();
} }
bool TLSWrap::IsAlive() { bool TLSWrap::IsAlive() {
return ssl_ != nullptr && return ssl_ != nullptr &&
stream_ != nullptr && stream_ != nullptr &&
static_cast<StreamBase*>(stream_)->IsAlive(); underlying_stream()->IsAlive();
} }
bool TLSWrap::IsClosing() { bool TLSWrap::IsClosing() {
return static_cast<StreamBase*>(stream_)->IsClosing(); return underlying_stream()->IsClosing();
} }
@ -580,6 +572,17 @@ int TLSWrap::DoWrite(WriteWrap* w,
// However, if there is any data that should be written to the socket, // However, if there is any data that should be written to the socket,
// the callback should not be invoked immediately // the callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0) { if (BIO_pending(enc_out_) == 0) {
// We destroy the current WriteWrap* object and create a new one that
// matches the underlying stream, rather than the TLSWrap itself.
// Note: We cannot simply use w->object() because of the "optimized"
// way in which we read persistent handles; the JS object itself might be
// destroyed by w->Dispose(), and the Local<Object> we have is not a
// "real" handle in the sense the V8 is aware of its existence.
Local<Object> req_wrap_obj =
w->GetAsyncWrap()->persistent().Get(env()->isolate());
w->Dispose();
w = underlying_stream()->CreateWriteWrap(req_wrap_obj);
return stream_->DoWrite(w, bufs, count, send_handle); return stream_->DoWrite(w, bufs, count, send_handle);
} }
} }
@ -587,7 +590,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
// Store the current write wrap // Store the current write wrap
CHECK_EQ(current_write_, nullptr); CHECK_EQ(current_write_, nullptr);
current_write_ = w; current_write_ = w;
w->Dispatched();
// Write queued data // Write queued data
if (empty) { if (empty) {
@ -677,6 +679,11 @@ void TLSWrap::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
} }
ShutdownWrap* TLSWrap::CreateShutdownWrap(Local<Object> req_wrap_object) {
return underlying_stream()->CreateShutdownWrap(req_wrap_object);
}
int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) { int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) {
crypto::MarkPopErrorOnReturn mark_pop_error_on_return; crypto::MarkPopErrorOnReturn mark_pop_error_on_return;

View File

@ -65,6 +65,8 @@ class TLSWrap : public AsyncWrap,
int ReadStart() override; int ReadStart() override;
int ReadStop() override; int ReadStop() override;
ShutdownWrap* CreateShutdownWrap(
v8::Local<v8::Object> req_wrap_object) override;
int DoShutdown(ShutdownWrap* req_wrap) override; int DoShutdown(ShutdownWrap* req_wrap) override;
int DoWrite(WriteWrap* w, int DoWrite(WriteWrap* w,
uv_buf_t* bufs, uv_buf_t* bufs,
@ -78,6 +80,10 @@ class TLSWrap : public AsyncWrap,
size_t self_size() const override { return sizeof(*this); } size_t self_size() const override { return sizeof(*this); }
protected: protected:
inline StreamBase* underlying_stream() {
return static_cast<StreamBase*>(stream_);
}
static const int kClearOutChunkSize = 16384; static const int kClearOutChunkSize = 16384;
// Maximum number of bytes for hello parser // Maximum number of bytes for hello parser

View File

@ -23,10 +23,10 @@ function makeConnection() {
const err = client.shutdown(shutdownReq); const err = client.shutdown(shutdownReq);
assert.strictEqual(err, 0); assert.strictEqual(err, 0);
shutdownReq.oncomplete = function(status, client_, req_) { shutdownReq.oncomplete = function(status, client_, error) {
assert.strictEqual(0, status); assert.strictEqual(0, status);
assert.strictEqual(client, client_); assert.strictEqual(client, client_);
assert.strictEqual(shutdownReq, req_); assert.strictEqual(error, undefined);
shutdownCount++; shutdownCount++;
client.close(); client.close();
}; };