src: allow generic C++ callables in SetImmediate()

Modify the native `SetImmediate()` functions to take generic C++
callables as arguments. This makes passing arguments to the callback
easier, and in particular, it allows passing `std::unique_ptr`s
directly, which in turn makes sure that the data they point to is
deleted if the `Environment` is torn down before the callback can run.

PR-URL: https://github.com/nodejs/node/pull/28704
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2019-07-15 22:17:45 +02:00
parent 61f3a5c60a
commit 5207dec017
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
12 changed files with 202 additions and 159 deletions

View File

@ -87,7 +87,7 @@ struct AsyncWrapObject : public AsyncWrap {
SET_SELF_SIZE(AsyncWrapObject) SET_SELF_SIZE(AsyncWrapObject)
}; };
void AsyncWrap::DestroyAsyncIdsCallback(Environment* env, void* data) { void AsyncWrap::DestroyAsyncIdsCallback(Environment* env) {
Local<Function> fn = env->async_hooks_destroy_function(); Local<Function> fn = env->async_hooks_destroy_function();
TryCatchScope try_catch(env, TryCatchScope::CatchMode::kFatal); TryCatchScope try_catch(env, TryCatchScope::CatchMode::kFatal);
@ -642,7 +642,7 @@ void AsyncWrap::EmitDestroy(Environment* env, double async_id) {
} }
if (env->destroy_async_id_list()->empty()) { if (env->destroy_async_id_list()->empty()) {
env->SetUnrefImmediate(DestroyAsyncIdsCallback, nullptr); env->SetUnrefImmediate(&DestroyAsyncIdsCallback);
} }
env->destroy_async_id_list()->push_back(async_id); env->destroy_async_id_list()->push_back(async_id);

View File

@ -154,7 +154,7 @@ class AsyncWrap : public BaseObject {
static void EmitTraceEventAfter(ProviderType type, double async_id); static void EmitTraceEventAfter(ProviderType type, double async_id);
void EmitTraceEventDestroy(); void EmitTraceEventDestroy();
static void DestroyAsyncIdsCallback(Environment* env, void* data); static void DestroyAsyncIdsCallback(Environment* env);
inline ProviderType provider_type() const; inline ProviderType provider_type() const;
inline ProviderType set_provider_type(ProviderType provider); inline ProviderType set_provider_type(ProviderType provider);

View File

@ -690,9 +690,9 @@ class QueryWrap : public AsyncWrap {
} }
void QueueResponseCallback(int status) { void QueueResponseCallback(int status) {
env()->SetImmediate([](Environment*, void* data) { env()->SetImmediate([this](Environment*) {
static_cast<QueryWrap*>(data)->AfterResponse(); AfterResponse();
}, this, object()); }, object());
channel_->set_query_last_ok(status != ARES_ECONNREFUSED); channel_->set_query_last_ok(status != ARES_ECONNREFUSED);
channel_->ModifyActivityQueryCount(-1); channel_->ModifyActivityQueryCount(-1);

View File

@ -753,33 +753,66 @@ inline void IsolateData::set_options(
options_ = std::move(options); options_ = std::move(options);
} }
void Environment::CreateImmediate(native_immediate_callback cb, template <typename Fn>
void* data, void Environment::CreateImmediate(Fn&& cb,
v8::Local<v8::Object> obj, v8::Local<v8::Object> keep_alive,
bool ref) { bool ref) {
native_immediate_callbacks_.push_back({ auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
cb, std::move(cb),
data, v8::Global<v8::Object>(isolate(), keep_alive),
v8::Global<v8::Object>(isolate_, obj), ref);
ref NativeImmediateCallback* prev_tail = native_immediate_callbacks_tail_;
});
native_immediate_callbacks_tail_ = callback.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(callback));
else
native_immediate_callbacks_head_ = std::move(callback);
immediate_info()->count_inc(1); immediate_info()->count_inc(1);
} }
void Environment::SetImmediate(native_immediate_callback cb, template <typename Fn>
void* data, void Environment::SetImmediate(Fn&& cb, v8::Local<v8::Object> keep_alive) {
v8::Local<v8::Object> obj) { CreateImmediate(std::move(cb), keep_alive, true);
CreateImmediate(cb, data, obj, true);
if (immediate_info()->ref_count() == 0) if (immediate_info()->ref_count() == 0)
ToggleImmediateRef(true); ToggleImmediateRef(true);
immediate_info()->ref_count_inc(1); immediate_info()->ref_count_inc(1);
} }
void Environment::SetUnrefImmediate(native_immediate_callback cb, template <typename Fn>
void* data, void Environment::SetUnrefImmediate(Fn&& cb, v8::Local<v8::Object> keep_alive) {
v8::Local<v8::Object> obj) { CreateImmediate(std::move(cb), keep_alive, false);
CreateImmediate(cb, data, obj, false); }
Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
: refed_(refed) {}
bool Environment::NativeImmediateCallback::is_refed() const {
return refed_;
}
std::unique_ptr<Environment::NativeImmediateCallback>
Environment::NativeImmediateCallback::get_next() {
return std::move(next_);
}
void Environment::NativeImmediateCallback::set_next(
std::unique_ptr<NativeImmediateCallback> next) {
next_ = std::move(next);
}
template <typename Fn>
Environment::NativeImmediateCallbackImpl<Fn>::NativeImmediateCallbackImpl(
Fn&& callback, v8::Global<v8::Object>&& keep_alive, bool refed)
: NativeImmediateCallback(refed),
callback_(std::move(callback)),
keep_alive_(std::move(keep_alive)) {}
template <typename Fn>
void Environment::NativeImmediateCallbackImpl<Fn>::Call(Environment* env) {
callback_(env);
} }
inline bool Environment::can_call_into_js() const { inline bool Environment::can_call_into_js() const {

View File

@ -339,7 +339,7 @@ Environment::Environment(IsolateData* isolate_data,
[](void* arg) { [](void* arg) {
Environment* env = static_cast<Environment*>(arg); Environment* env = static_cast<Environment*>(arg);
if (!env->destroy_async_id_list()->empty()) if (!env->destroy_async_id_list()->empty())
AsyncWrap::DestroyAsyncIdsCallback(env, nullptr); AsyncWrap::DestroyAsyncIdsCallback(env);
}, },
this); this);
@ -642,42 +642,38 @@ void Environment::AtExit(void (*cb)(void* arg), void* arg) {
void Environment::RunAndClearNativeImmediates() { void Environment::RunAndClearNativeImmediates() {
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunAndClearNativeImmediates", this); "RunAndClearNativeImmediates", this);
size_t count = native_immediate_callbacks_.size();
if (count > 0) {
size_t ref_count = 0; size_t ref_count = 0;
std::vector<NativeImmediateCallback> list; size_t count = 0;
native_immediate_callbacks_.swap(list); std::unique_ptr<NativeImmediateCallback> head;
head.swap(native_immediate_callbacks_head_);
native_immediate_callbacks_tail_ = nullptr;
auto drain_list = [&]() { auto drain_list = [&]() {
TryCatchScope try_catch(this); TryCatchScope try_catch(this);
for (auto it = list.begin(); it != list.end(); ++it) { for (; head; head = head->get_next()) {
DebugSealHandleScope seal_handle_scope(isolate()); DebugSealHandleScope seal_handle_scope(isolate());
it->cb_(this, it->data_); count++;
if (it->refed_) if (head->is_refed())
ref_count++; ref_count++;
head->Call(this);
if (UNLIKELY(try_catch.HasCaught())) { if (UNLIKELY(try_catch.HasCaught())) {
if (!try_catch.HasTerminated()) if (!try_catch.HasTerminated())
errors::TriggerUncaughtException(isolate(), try_catch); errors::TriggerUncaughtException(isolate(), try_catch);
// We are done with the current callback. Increase the counter so that // We are done with the current callback. Move one iteration along,
// the steps below make everything *after* the current item part of // as if we had completed successfully.
// the new list. head = head->get_next();
it++;
// Bail out, remove the already executed callbacks from list
// and set up a new TryCatch for the other pending callbacks.
std::move_backward(it, list.end(), list.begin() + (list.end() - it));
list.resize(list.end() - it);
return true; return true;
} }
} }
return false; return false;
}; };
while (drain_list()) {} while (head && drain_list()) {}
DCHECK_GE(immediate_info()->count(), count); DCHECK_GE(immediate_info()->count(), count);
immediate_info()->count_dec(count); immediate_info()->count_dec(count);
immediate_info()->ref_count_dec(ref_count); immediate_info()->ref_count_dec(ref_count);
}
} }

View File

@ -1175,15 +1175,15 @@ class Environment : public MemoryRetainer {
return current_value; return current_value;
} }
typedef void (*native_immediate_callback)(Environment* env, void* data); // cb will be called as cb(env) on the next event loop iteration.
// cb will be called as cb(env, data) on the next event loop iteration. // keep_alive will be kept alive between now and after the callback has run.
// obj will be kept alive between now and after the callback has run. template <typename Fn>
inline void SetImmediate(native_immediate_callback cb, inline void SetImmediate(Fn&& cb,
void* data, v8::Local<v8::Object> keep_alive =
v8::Local<v8::Object> obj = v8::Local<v8::Object>()); v8::Local<v8::Object>());
inline void SetUnrefImmediate(native_immediate_callback cb, template <typename Fn>
void* data, inline void SetUnrefImmediate(Fn&& cb,
v8::Local<v8::Object> obj = v8::Local<v8::Object> keep_alive =
v8::Local<v8::Object>()); v8::Local<v8::Object>());
// This needs to be available for the JS-land setImmediate(). // This needs to be available for the JS-land setImmediate().
void ToggleImmediateRef(bool ref); void ToggleImmediateRef(bool ref);
@ -1248,9 +1248,9 @@ class Environment : public MemoryRetainer {
#endif // HAVE_INSPECTOR #endif // HAVE_INSPECTOR
private: private:
inline void CreateImmediate(native_immediate_callback cb, template <typename Fn>
void* data, inline void CreateImmediate(Fn&& cb,
v8::Local<v8::Object> obj, v8::Local<v8::Object> keep_alive,
bool ref); bool ref);
inline void ThrowError(v8::Local<v8::Value> (*fun)(v8::Local<v8::String>), inline void ThrowError(v8::Local<v8::Value> (*fun)(v8::Local<v8::String>),
@ -1374,13 +1374,38 @@ class Environment : public MemoryRetainer {
std::list<ExitCallback> at_exit_functions_; std::list<ExitCallback> at_exit_functions_;
struct NativeImmediateCallback { class NativeImmediateCallback {
native_immediate_callback cb_; public:
void* data_; explicit inline NativeImmediateCallback(bool refed);
v8::Global<v8::Object> keep_alive_;
virtual ~NativeImmediateCallback() = default;
virtual void Call(Environment* env) = 0;
inline bool is_refed() const;
inline std::unique_ptr<NativeImmediateCallback> get_next();
inline void set_next(std::unique_ptr<NativeImmediateCallback> next);
private:
bool refed_; bool refed_;
std::unique_ptr<NativeImmediateCallback> next_;
}; };
std::vector<NativeImmediateCallback> native_immediate_callbacks_;
template <typename Fn>
class NativeImmediateCallbackImpl final : public NativeImmediateCallback {
public:
NativeImmediateCallbackImpl(Fn&& callback,
v8::Global<v8::Object>&& keep_alive,
bool refed);
void Call(Environment* env) override;
private:
Fn callback_;
v8::Global<v8::Object> keep_alive_;
};
std::unique_ptr<NativeImmediateCallback> native_immediate_callbacks_head_;
NativeImmediateCallback* native_immediate_callbacks_tail_ = nullptr;
void RunAndClearNativeImmediates(); void RunAndClearNativeImmediates();
static void CheckImmediate(uv_check_t* handle); static void CheckImmediate(uv_check_t* handle);

View File

@ -36,13 +36,16 @@ class BufferFinalizer : private Finalizer {
public: public:
// node::Buffer::FreeCallback // node::Buffer::FreeCallback
static void FinalizeBufferCallback(char* data, void* hint) { static void FinalizeBufferCallback(char* data, void* hint) {
BufferFinalizer* finalizer = static_cast<BufferFinalizer*>(hint); std::unique_ptr<BufferFinalizer, Deleter> finalizer{
static_cast<BufferFinalizer*>(hint)};
finalizer->_finalize_data = data; finalizer->_finalize_data = data;
static_cast<node_napi_env>(finalizer->_env)->node_env()
->SetImmediate([](node::Environment* env, void* hint) {
BufferFinalizer* finalizer = static_cast<BufferFinalizer*>(hint);
if (finalizer->_finalize_callback != nullptr) { node::Environment* node_env =
static_cast<node_napi_env>(finalizer->_env)->node_env();
node_env->SetImmediate(
[finalizer = std::move(finalizer)](node::Environment* env) {
if (finalizer->_finalize_callback == nullptr) return;
v8::HandleScope handle_scope(finalizer->_env->isolate); v8::HandleScope handle_scope(finalizer->_env->isolate);
v8::Context::Scope context_scope(finalizer->_env->context()); v8::Context::Scope context_scope(finalizer->_env->context());
@ -52,11 +55,14 @@ class BufferFinalizer : private Finalizer {
finalizer->_finalize_data, finalizer->_finalize_data,
finalizer->_finalize_hint); finalizer->_finalize_hint);
}); });
});
} }
Delete(finalizer); struct Deleter {
}, hint); void operator()(BufferFinalizer* finalizer) {
Finalizer::Delete(finalizer);
} }
};
}; };
static inline napi_env NewEnv(v8::Local<v8::Context> context) { static inline napi_env NewEnv(v8::Local<v8::Context> context) {

View File

@ -170,35 +170,33 @@ inline void FileHandle::Close() {
struct err_detail { int ret; int fd; }; struct err_detail { int ret; int fd; };
err_detail* detail = new err_detail { ret, fd_ }; err_detail detail { ret, fd_ };
if (ret < 0) { if (ret < 0) {
// Do not unref this // Do not unref this
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([detail](Environment* env) {
char msg[70]; char msg[70];
std::unique_ptr<err_detail> detail(static_cast<err_detail*>(data));
snprintf(msg, arraysize(msg), snprintf(msg, arraysize(msg),
"Closing file descriptor %d on garbage collection failed", "Closing file descriptor %d on garbage collection failed",
detail->fd); detail.fd);
// This exception will end up being fatal for the process because // This exception will end up being fatal for the process because
// it is being thrown from within the SetImmediate handler and // it is being thrown from within the SetImmediate handler and
// there is no JS stack to bubble it to. In other words, tearing // there is no JS stack to bubble it to. In other words, tearing
// down the process is the only reasonable thing we can do here. // down the process is the only reasonable thing we can do here.
HandleScope handle_scope(env->isolate()); HandleScope handle_scope(env->isolate());
env->ThrowUVException(detail->ret, "close", msg); env->ThrowUVException(detail.ret, "close", msg);
}, detail); });
return; return;
} }
// If the close was successful, we still want to emit a process warning // If the close was successful, we still want to emit a process warning
// to notify that the file descriptor was gc'd. We want to be noisy about // to notify that the file descriptor was gc'd. We want to be noisy about
// this because not explicitly closing the FileHandle is a bug. // this because not explicitly closing the FileHandle is a bug.
env()->SetUnrefImmediate([](Environment* env, void* data) { env()->SetUnrefImmediate([detail](Environment* env) {
std::unique_ptr<err_detail> detail(static_cast<err_detail*>(data));
ProcessEmitWarning(env, ProcessEmitWarning(env,
"Closing file descriptor %d on garbage collection", "Closing file descriptor %d on garbage collection",
detail->fd); detail.fd);
}, detail); });
} }
void FileHandle::CloseReq::Resolve() { void FileHandle::CloseReq::Resolve() {

View File

@ -663,12 +663,9 @@ inline bool HasHttp2Observer(Environment* env) {
void Http2Stream::EmitStatistics() { void Http2Stream::EmitStatistics() {
if (!HasHttp2Observer(env())) if (!HasHttp2Observer(env()))
return; return;
Http2StreamPerformanceEntry* entry = auto entry =
new Http2StreamPerformanceEntry(env(), id_, statistics_); std::make_unique<Http2StreamPerformanceEntry>(env(), id_, statistics_);
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([entry = move(entry)](Environment* env) {
// This takes ownership, the entry is destroyed at the end of this scope.
std::unique_ptr<Http2StreamPerformanceEntry> entry {
static_cast<Http2StreamPerformanceEntry*>(data) };
if (!HasHttp2Observer(env)) if (!HasHttp2Observer(env))
return; return;
HandleScope handle_scope(env->isolate()); HandleScope handle_scope(env->isolate());
@ -696,18 +693,15 @@ void Http2Stream::EmitStatistics() {
buffer[IDX_STREAM_STATS_RECEIVEDBYTES] = entry->received_bytes(); buffer[IDX_STREAM_STATS_RECEIVEDBYTES] = entry->received_bytes();
Local<Object> obj; Local<Object> obj;
if (entry->ToObject().ToLocal(&obj)) entry->Notify(obj); if (entry->ToObject().ToLocal(&obj)) entry->Notify(obj);
}, static_cast<void*>(entry)); });
} }
void Http2Session::EmitStatistics() { void Http2Session::EmitStatistics() {
if (!HasHttp2Observer(env())) if (!HasHttp2Observer(env()))
return; return;
Http2SessionPerformanceEntry* entry = auto entry = std::make_unique<Http2SessionPerformanceEntry>(
new Http2SessionPerformanceEntry(env(), statistics_, session_type_); env(), statistics_, session_type_);
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([entry = std::move(entry)](Environment* env) {
// This takes ownership, the entr is destroyed at the end of this scope.
std::unique_ptr<Http2SessionPerformanceEntry> entry {
static_cast<Http2SessionPerformanceEntry*>(data) };
if (!HasHttp2Observer(env)) if (!HasHttp2Observer(env))
return; return;
HandleScope handle_scope(env->isolate()); HandleScope handle_scope(env->isolate());
@ -725,7 +719,7 @@ void Http2Session::EmitStatistics() {
entry->max_concurrent_streams(); entry->max_concurrent_streams();
Local<Object> obj; Local<Object> obj;
if (entry->ToObject().ToLocal(&obj)) entry->Notify(obj); if (entry->ToObject().ToLocal(&obj)) entry->Notify(obj);
}, static_cast<void*>(entry)); });
} }
// Closes the session and frees the associated resources // Closes the session and frees the associated resources
@ -760,11 +754,9 @@ void Http2Session::Close(uint32_t code, bool socket_closed) {
while (std::unique_ptr<Http2Ping> ping = PopPing()) { while (std::unique_ptr<Http2Ping> ping = PopPing()) {
ping->DetachFromSession(); ping->DetachFromSession();
env()->SetImmediate( env()->SetImmediate(
[](Environment* env, void* data) { [ping = std::move(ping)](Environment* env) {
std::unique_ptr<Http2Ping> ping{static_cast<Http2Ping*>(data)};
ping->Done(false); ping->Done(false);
}, });
static_cast<void*>(ping.release()));
} }
statistics_.end_time = uv_hrtime(); statistics_.end_time = uv_hrtime();
@ -1532,10 +1524,8 @@ void Http2Session::MaybeScheduleWrite() {
HandleScope handle_scope(env()->isolate()); HandleScope handle_scope(env()->isolate());
Debug(this, "scheduling write"); Debug(this, "scheduling write");
flags_ |= SESSION_STATE_WRITE_SCHEDULED; flags_ |= SESSION_STATE_WRITE_SCHEDULED;
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([this](Environment* env) {
Http2Session* session = static_cast<Http2Session*>(data); if (session_ == nullptr || !(flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
if (session->session_ == nullptr ||
!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
// This can happen e.g. when a stream was reset before this turn // This can happen e.g. when a stream was reset before this turn
// of the event loop, in which case SendPendingData() is called early, // of the event loop, in which case SendPendingData() is called early,
// or the session was destroyed in the meantime. // or the session was destroyed in the meantime.
@ -1545,9 +1535,9 @@ void Http2Session::MaybeScheduleWrite() {
// Sending data may call arbitrary JS code, so keep track of // Sending data may call arbitrary JS code, so keep track of
// async context. // async context.
HandleScope handle_scope(env->isolate()); HandleScope handle_scope(env->isolate());
InternalCallbackScope callback_scope(session); InternalCallbackScope callback_scope(this);
session->SendPendingData(); SendPendingData();
}, static_cast<void*>(this), object()); }, object());
} }
} }
@ -1975,25 +1965,23 @@ void Http2Stream::Destroy() {
// Wait until the start of the next loop to delete because there // Wait until the start of the next loop to delete because there
// may still be some pending operations queued for this stream. // may still be some pending operations queued for this stream.
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([this](Environment* env) {
Http2Stream* stream = static_cast<Http2Stream*>(data);
// Free any remaining outgoing data chunks here. This should be done // Free any remaining outgoing data chunks here. This should be done
// here because it's possible for destroy to have been called while // here because it's possible for destroy to have been called while
// we still have queued outbound writes. // we still have queued outbound writes.
while (!stream->queue_.empty()) { while (!queue_.empty()) {
nghttp2_stream_write& head = stream->queue_.front(); nghttp2_stream_write& head = queue_.front();
if (head.req_wrap != nullptr) if (head.req_wrap != nullptr)
head.req_wrap->Done(UV_ECANCELED); head.req_wrap->Done(UV_ECANCELED);
stream->queue_.pop(); queue_.pop();
} }
// We can destroy the stream now if there are no writes for it // We can destroy the stream now if there are no writes for it
// already on the socket. Otherwise, we'll wait for the garbage collector // already on the socket. Otherwise, we'll wait for the garbage collector
// to take care of cleaning up. // to take care of cleaning up.
if (stream->session() == nullptr || if (session() == nullptr || !session()->HasWritesOnSocketForStream(this))
!stream->session()->HasWritesOnSocketForStream(stream)) delete this;
delete stream; }, object());
}, this, this->object());
statistics_.end_time = uv_hrtime(); statistics_.end_time = uv_hrtime();
session_->statistics_.stream_average_duration = session_->statistics_.stream_average_duration =

View File

@ -229,9 +229,8 @@ void SetupPerformanceObservers(const FunctionCallbackInfo<Value>& args) {
} }
// Creates a GC Performance Entry and passes it to observers // Creates a GC Performance Entry and passes it to observers
void PerformanceGCCallback(Environment* env, void* ptr) { void PerformanceGCCallback(Environment* env,
std::unique_ptr<GCPerformanceEntry> entry{ std::unique_ptr<GCPerformanceEntry> entry) {
static_cast<GCPerformanceEntry*>(ptr)};
HandleScope scope(env->isolate()); HandleScope scope(env->isolate());
Local<Context> context = env->context(); Local<Context> context = env->context();
@ -268,13 +267,14 @@ void MarkGarbageCollectionEnd(Isolate* isolate,
// If no one is listening to gc performance entries, do not create them. // If no one is listening to gc performance entries, do not create them.
if (!state->observers[NODE_PERFORMANCE_ENTRY_TYPE_GC]) if (!state->observers[NODE_PERFORMANCE_ENTRY_TYPE_GC])
return; return;
GCPerformanceEntry* entry = auto entry = std::make_unique<GCPerformanceEntry>(
new GCPerformanceEntry(env, env,
static_cast<PerformanceGCKind>(type), static_cast<PerformanceGCKind>(type),
state->performance_last_gc_start_mark, state->performance_last_gc_start_mark,
PERFORMANCE_NOW()); PERFORMANCE_NOW());
env->SetUnrefImmediate(PerformanceGCCallback, env->SetUnrefImmediate([entry = std::move(entry)](Environment* env) mutable {
entry); PerformanceGCCallback(env, std::move(entry));
});
} }
static void SetupGarbageCollectionTracking( static void SetupGarbageCollectionTracking(

View File

@ -71,18 +71,16 @@ void StreamPipe::Unpipe() {
// Delay the JS-facing part with SetImmediate, because this might be from // Delay the JS-facing part with SetImmediate, because this might be from
// inside the garbage collector, so we cant run JS here. // inside the garbage collector, so we cant run JS here.
HandleScope handle_scope(env()->isolate()); HandleScope handle_scope(env()->isolate());
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([this](Environment* env) {
StreamPipe* pipe = static_cast<StreamPipe*>(data);
HandleScope handle_scope(env->isolate()); HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context()); Context::Scope context_scope(env->context());
Local<Object> object = pipe->object(); Local<Object> object = this->object();
Local<Value> onunpipe; Local<Value> onunpipe;
if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe)) if (!object->Get(env->context(), env->onunpipe_string()).ToLocal(&onunpipe))
return; return;
if (onunpipe->IsFunction() && if (onunpipe->IsFunction() &&
pipe->MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) { MakeCallback(onunpipe.As<Function>(), 0, nullptr).IsEmpty()) {
return; return;
} }
@ -107,7 +105,7 @@ void StreamPipe::Unpipe() {
.IsNothing()) { .IsNothing()) {
return; return;
} }
}, static_cast<void*>(this), object()); }, object());
} }
uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) { uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {

View File

@ -316,9 +316,9 @@ void TLSWrap::EncOut() {
// its not clear if it is always correct. Not calling Done() could block // its not clear if it is always correct. Not calling Done() could block
// data flow, so for now continue to call Done(), just do it in the next // data flow, so for now continue to call Done(), just do it in the next
// tick. // tick.
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([this](Environment* env) {
static_cast<TLSWrap*>(data)->InvokeQueued(0); InvokeQueued(0);
}, this, object()); }, object());
} }
} }
return; return;
@ -349,9 +349,9 @@ void TLSWrap::EncOut() {
HandleScope handle_scope(env()->isolate()); HandleScope handle_scope(env()->isolate());
// Simulate asynchronous finishing, TLS cannot handle this at the moment. // Simulate asynchronous finishing, TLS cannot handle this at the moment.
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([this](Environment* env) {
static_cast<TLSWrap*>(data)->OnStreamAfterWrite(nullptr, 0); OnStreamAfterWrite(nullptr, 0);
}, this, object()); }, object());
} }
} }
@ -718,10 +718,9 @@ int TLSWrap::DoWrite(WriteWrap* w,
StreamWriteResult res = StreamWriteResult res =
underlying_stream()->Write(bufs, count, send_handle); underlying_stream()->Write(bufs, count, send_handle);
if (!res.async) { if (!res.async) {
env()->SetImmediate([](Environment* env, void* data) { env()->SetImmediate([this](Environment* env) {
TLSWrap* self = static_cast<TLSWrap*>(data); OnStreamAfterWrite(current_empty_write_, 0);
self->OnStreamAfterWrite(self->current_empty_write_, 0); }, object());
}, this, object());
} }
return 0; return 0;
} }