n-api: clean up thread-safe function
* Move class `TsFn` to name space `v8impl` and rename it to `ThreadSafeFunction` * Remove `NAPI_EXTERN` from API declarations, because it's only needed in the header file. PR-URL: https://github.com/nodejs/node/pull/22259 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Kyle Farnung <kfarnung@microsoft.com> Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
This commit is contained in:
parent
030ef35bf0
commit
403df7c8a1
687
src/node_api.cc
687
src/node_api.cc
@ -827,6 +827,336 @@ napi_status ConcludeDeferred(napi_env env,
|
|||||||
return GET_RETURN_STATUS(env);
|
return GET_RETURN_STATUS(env);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class ThreadSafeFunction : public node::AsyncResource {
|
||||||
|
public:
|
||||||
|
ThreadSafeFunction(v8::Local<v8::Function> func,
|
||||||
|
v8::Local<v8::Object> resource,
|
||||||
|
v8::Local<v8::String> name,
|
||||||
|
size_t thread_count_,
|
||||||
|
void* context_,
|
||||||
|
size_t max_queue_size_,
|
||||||
|
napi_env env_,
|
||||||
|
void* finalize_data_,
|
||||||
|
napi_finalize finalize_cb_,
|
||||||
|
napi_threadsafe_function_call_js call_js_cb_):
|
||||||
|
AsyncResource(env_->isolate,
|
||||||
|
resource,
|
||||||
|
*v8::String::Utf8Value(env_->isolate, name)),
|
||||||
|
thread_count(thread_count_),
|
||||||
|
is_closing(false),
|
||||||
|
context(context_),
|
||||||
|
max_queue_size(max_queue_size_),
|
||||||
|
env(env_),
|
||||||
|
finalize_data(finalize_data_),
|
||||||
|
finalize_cb(finalize_cb_),
|
||||||
|
call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
|
||||||
|
handles_closing(false) {
|
||||||
|
ref.Reset(env->isolate, func);
|
||||||
|
node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
~ThreadSafeFunction() {
|
||||||
|
node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
// These methods can be called from any thread.
|
||||||
|
|
||||||
|
napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
|
||||||
|
node::Mutex::ScopedLock lock(this->mutex);
|
||||||
|
|
||||||
|
while (queue.size() >= max_queue_size &&
|
||||||
|
max_queue_size > 0 &&
|
||||||
|
!is_closing) {
|
||||||
|
if (mode == napi_tsfn_nonblocking) {
|
||||||
|
return napi_queue_full;
|
||||||
|
}
|
||||||
|
cond->Wait(lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_closing) {
|
||||||
|
if (thread_count == 0) {
|
||||||
|
return napi_invalid_arg;
|
||||||
|
} else {
|
||||||
|
thread_count--;
|
||||||
|
return napi_closing;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (uv_async_send(&async) != 0) {
|
||||||
|
return napi_generic_failure;
|
||||||
|
}
|
||||||
|
queue.push(data);
|
||||||
|
return napi_ok;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
napi_status Acquire() {
|
||||||
|
node::Mutex::ScopedLock lock(this->mutex);
|
||||||
|
|
||||||
|
if (is_closing) {
|
||||||
|
return napi_closing;
|
||||||
|
}
|
||||||
|
|
||||||
|
thread_count++;
|
||||||
|
|
||||||
|
return napi_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
napi_status Release(napi_threadsafe_function_release_mode mode) {
|
||||||
|
node::Mutex::ScopedLock lock(this->mutex);
|
||||||
|
|
||||||
|
if (thread_count == 0) {
|
||||||
|
return napi_invalid_arg;
|
||||||
|
}
|
||||||
|
|
||||||
|
thread_count--;
|
||||||
|
|
||||||
|
if (thread_count == 0 || mode == napi_tsfn_abort) {
|
||||||
|
if (!is_closing) {
|
||||||
|
is_closing = (mode == napi_tsfn_abort);
|
||||||
|
if (is_closing && max_queue_size > 0) {
|
||||||
|
cond->Signal(lock);
|
||||||
|
}
|
||||||
|
if (uv_async_send(&async) != 0) {
|
||||||
|
return napi_generic_failure;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return napi_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
void EmptyQueueAndDelete() {
|
||||||
|
for (; !queue.empty() ; queue.pop()) {
|
||||||
|
call_js_cb(nullptr, nullptr, context, queue.front());
|
||||||
|
}
|
||||||
|
delete this;
|
||||||
|
}
|
||||||
|
|
||||||
|
// These methods must only be called from the loop thread.
|
||||||
|
|
||||||
|
napi_status Init() {
|
||||||
|
ThreadSafeFunction* ts_fn = this;
|
||||||
|
|
||||||
|
if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
|
||||||
|
if (max_queue_size > 0) {
|
||||||
|
cond.reset(new node::ConditionVariable);
|
||||||
|
}
|
||||||
|
if ((max_queue_size == 0 || cond.get() != nullptr) &&
|
||||||
|
uv_idle_init(env->loop, &idle) == 0) {
|
||||||
|
return napi_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
node::Environment::GetCurrent(env->isolate)->CloseHandle(
|
||||||
|
reinterpret_cast<uv_handle_t*>(&async),
|
||||||
|
[](uv_handle_t* handle) -> void {
|
||||||
|
ThreadSafeFunction* ts_fn =
|
||||||
|
node::ContainerOf(&ThreadSafeFunction::async,
|
||||||
|
reinterpret_cast<uv_async_t*>(handle));
|
||||||
|
delete ts_fn;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prevent the thread-safe function from being deleted here, because
|
||||||
|
// the callback above will delete it.
|
||||||
|
ts_fn = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete ts_fn;
|
||||||
|
|
||||||
|
return napi_generic_failure;
|
||||||
|
}
|
||||||
|
|
||||||
|
napi_status Unref() {
|
||||||
|
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
|
||||||
|
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
|
||||||
|
|
||||||
|
return napi_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
napi_status Ref() {
|
||||||
|
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
|
||||||
|
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
|
||||||
|
|
||||||
|
return napi_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DispatchOne() {
|
||||||
|
void* data = nullptr;
|
||||||
|
bool popped_value = false;
|
||||||
|
bool idle_stop_failed = false;
|
||||||
|
|
||||||
|
{
|
||||||
|
node::Mutex::ScopedLock lock(this->mutex);
|
||||||
|
if (is_closing) {
|
||||||
|
CloseHandlesAndMaybeDelete();
|
||||||
|
} else {
|
||||||
|
size_t size = queue.size();
|
||||||
|
if (size > 0) {
|
||||||
|
data = queue.front();
|
||||||
|
queue.pop();
|
||||||
|
popped_value = true;
|
||||||
|
if (size == max_queue_size && max_queue_size > 0) {
|
||||||
|
cond->Signal(lock);
|
||||||
|
}
|
||||||
|
size--;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (size == 0) {
|
||||||
|
if (thread_count == 0) {
|
||||||
|
is_closing = true;
|
||||||
|
if (max_queue_size > 0) {
|
||||||
|
cond->Signal(lock);
|
||||||
|
}
|
||||||
|
CloseHandlesAndMaybeDelete();
|
||||||
|
} else {
|
||||||
|
if (uv_idle_stop(&idle) != 0) {
|
||||||
|
idle_stop_failed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (popped_value || idle_stop_failed) {
|
||||||
|
v8::HandleScope scope(env->isolate);
|
||||||
|
CallbackScope cb_scope(this);
|
||||||
|
|
||||||
|
if (idle_stop_failed) {
|
||||||
|
CHECK(napi_throw_error(env,
|
||||||
|
"ERR_NAPI_TSFN_STOP_IDLE_LOOP",
|
||||||
|
"Failed to stop the idle loop") == napi_ok);
|
||||||
|
} else {
|
||||||
|
v8::Local<v8::Function> js_cb =
|
||||||
|
v8::Local<v8::Function>::New(env->isolate, ref);
|
||||||
|
call_js_cb(env,
|
||||||
|
v8impl::JsValueFromV8LocalValue(js_cb),
|
||||||
|
context,
|
||||||
|
data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
node::Environment* NodeEnv() {
|
||||||
|
// For some reason grabbing the Node.js environment requires a handle scope.
|
||||||
|
v8::HandleScope scope(env->isolate);
|
||||||
|
return node::Environment::GetCurrent(env->isolate);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MaybeStartIdle() {
|
||||||
|
if (uv_idle_start(&idle, IdleCb) != 0) {
|
||||||
|
v8::HandleScope scope(env->isolate);
|
||||||
|
CallbackScope cb_scope(this);
|
||||||
|
CHECK(napi_throw_error(env,
|
||||||
|
"ERR_NAPI_TSFN_START_IDLE_LOOP",
|
||||||
|
"Failed to start the idle loop") == napi_ok);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void Finalize() {
|
||||||
|
v8::HandleScope scope(env->isolate);
|
||||||
|
if (finalize_cb) {
|
||||||
|
CallbackScope cb_scope(this);
|
||||||
|
finalize_cb(env, finalize_data, context);
|
||||||
|
}
|
||||||
|
EmptyQueueAndDelete();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void* Context() {
|
||||||
|
return context;
|
||||||
|
}
|
||||||
|
|
||||||
|
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
|
||||||
|
if (set_closing) {
|
||||||
|
node::Mutex::ScopedLock lock(this->mutex);
|
||||||
|
is_closing = true;
|
||||||
|
if (max_queue_size > 0) {
|
||||||
|
cond->Signal(lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (handles_closing) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
handles_closing = true;
|
||||||
|
NodeEnv()->CloseHandle(
|
||||||
|
reinterpret_cast<uv_handle_t*>(&async),
|
||||||
|
[](uv_handle_t* handle) -> void {
|
||||||
|
ThreadSafeFunction* ts_fn =
|
||||||
|
node::ContainerOf(&ThreadSafeFunction::async,
|
||||||
|
reinterpret_cast<uv_async_t*>(handle));
|
||||||
|
ts_fn->NodeEnv()->CloseHandle(
|
||||||
|
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
|
||||||
|
[](uv_handle_t* handle) -> void {
|
||||||
|
ThreadSafeFunction* ts_fn =
|
||||||
|
node::ContainerOf(&ThreadSafeFunction::idle,
|
||||||
|
reinterpret_cast<uv_idle_t*>(handle));
|
||||||
|
ts_fn->Finalize();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
|
||||||
|
// without a call_js_cb_.
|
||||||
|
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
|
||||||
|
if (!(env == nullptr || cb == nullptr)) {
|
||||||
|
napi_value recv;
|
||||||
|
napi_status status;
|
||||||
|
|
||||||
|
status = napi_get_undefined(env, &recv);
|
||||||
|
if (status != napi_ok) {
|
||||||
|
napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
|
||||||
|
"Failed to retrieve undefined value");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
|
||||||
|
if (status != napi_ok && status != napi_pending_exception) {
|
||||||
|
napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
|
||||||
|
"Failed to call JS callback");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void IdleCb(uv_idle_t* idle) {
|
||||||
|
ThreadSafeFunction* ts_fn =
|
||||||
|
node::ContainerOf(&ThreadSafeFunction::idle, idle);
|
||||||
|
ts_fn->DispatchOne();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void AsyncCb(uv_async_t* async) {
|
||||||
|
ThreadSafeFunction* ts_fn =
|
||||||
|
node::ContainerOf(&ThreadSafeFunction::async, async);
|
||||||
|
ts_fn->MaybeStartIdle();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void Cleanup(void* data) {
|
||||||
|
reinterpret_cast<ThreadSafeFunction*>(data)
|
||||||
|
->CloseHandlesAndMaybeDelete(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// These are variables protected by the mutex.
|
||||||
|
node::Mutex mutex;
|
||||||
|
std::unique_ptr<node::ConditionVariable> cond;
|
||||||
|
std::queue<void*> queue;
|
||||||
|
uv_async_t async;
|
||||||
|
uv_idle_t idle;
|
||||||
|
size_t thread_count;
|
||||||
|
bool is_closing;
|
||||||
|
|
||||||
|
// These are variables set once, upon creation, and then never again, which
|
||||||
|
// means we don't need the mutex to read them.
|
||||||
|
void* context;
|
||||||
|
size_t max_queue_size;
|
||||||
|
|
||||||
|
// These are variables accessed only from the loop thread.
|
||||||
|
node::Persistent<v8::Function> ref;
|
||||||
|
napi_env env;
|
||||||
|
void* finalize_data;
|
||||||
|
napi_finalize finalize_cb;
|
||||||
|
napi_threadsafe_function_call_js call_js_cb;
|
||||||
|
bool handles_closing;
|
||||||
|
};
|
||||||
|
|
||||||
} // end of namespace v8impl
|
} // end of namespace v8impl
|
||||||
|
|
||||||
// Intercepts the Node-V8 module registration callback. Converts parameters
|
// Intercepts the Node-V8 module registration callback. Converts parameters
|
||||||
@ -3705,334 +4035,7 @@ napi_status napi_run_script(napi_env env,
|
|||||||
return GET_RETURN_STATUS(env);
|
return GET_RETURN_STATUS(env);
|
||||||
}
|
}
|
||||||
|
|
||||||
class TsFn: public node::AsyncResource {
|
napi_status
|
||||||
public:
|
|
||||||
TsFn(v8::Local<v8::Function> func,
|
|
||||||
v8::Local<v8::Object> resource,
|
|
||||||
v8::Local<v8::String> name,
|
|
||||||
size_t thread_count_,
|
|
||||||
void* context_,
|
|
||||||
size_t max_queue_size_,
|
|
||||||
napi_env env_,
|
|
||||||
void* finalize_data_,
|
|
||||||
napi_finalize finalize_cb_,
|
|
||||||
napi_threadsafe_function_call_js call_js_cb_):
|
|
||||||
AsyncResource(env_->isolate,
|
|
||||||
resource,
|
|
||||||
*v8::String::Utf8Value(env_->isolate, name)),
|
|
||||||
thread_count(thread_count_),
|
|
||||||
is_closing(false),
|
|
||||||
context(context_),
|
|
||||||
max_queue_size(max_queue_size_),
|
|
||||||
env(env_),
|
|
||||||
finalize_data(finalize_data_),
|
|
||||||
finalize_cb(finalize_cb_),
|
|
||||||
call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
|
|
||||||
handles_closing(false) {
|
|
||||||
ref.Reset(env->isolate, func);
|
|
||||||
node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
~TsFn() {
|
|
||||||
node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
// These methods can be called from any thread.
|
|
||||||
|
|
||||||
napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
|
|
||||||
node::Mutex::ScopedLock lock(this->mutex);
|
|
||||||
|
|
||||||
while (queue.size() >= max_queue_size &&
|
|
||||||
max_queue_size > 0 &&
|
|
||||||
!is_closing) {
|
|
||||||
if (mode == napi_tsfn_nonblocking) {
|
|
||||||
return napi_queue_full;
|
|
||||||
}
|
|
||||||
cond->Wait(lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (is_closing) {
|
|
||||||
if (thread_count == 0) {
|
|
||||||
return napi_invalid_arg;
|
|
||||||
} else {
|
|
||||||
thread_count--;
|
|
||||||
return napi_closing;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (uv_async_send(&async) != 0) {
|
|
||||||
return napi_generic_failure;
|
|
||||||
}
|
|
||||||
queue.push(data);
|
|
||||||
return napi_ok;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
napi_status Acquire() {
|
|
||||||
node::Mutex::ScopedLock lock(this->mutex);
|
|
||||||
|
|
||||||
if (is_closing) {
|
|
||||||
return napi_closing;
|
|
||||||
}
|
|
||||||
|
|
||||||
thread_count++;
|
|
||||||
|
|
||||||
return napi_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
napi_status Release(napi_threadsafe_function_release_mode mode) {
|
|
||||||
node::Mutex::ScopedLock lock(this->mutex);
|
|
||||||
|
|
||||||
if (thread_count == 0) {
|
|
||||||
return napi_invalid_arg;
|
|
||||||
}
|
|
||||||
|
|
||||||
thread_count--;
|
|
||||||
|
|
||||||
if (thread_count == 0 || mode == napi_tsfn_abort) {
|
|
||||||
if (!is_closing) {
|
|
||||||
is_closing = (mode == napi_tsfn_abort);
|
|
||||||
if (is_closing && max_queue_size > 0) {
|
|
||||||
cond->Signal(lock);
|
|
||||||
}
|
|
||||||
if (uv_async_send(&async) != 0) {
|
|
||||||
return napi_generic_failure;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return napi_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
void EmptyQueueAndDelete() {
|
|
||||||
for (; !queue.empty() ; queue.pop()) {
|
|
||||||
call_js_cb(nullptr, nullptr, context, queue.front());
|
|
||||||
}
|
|
||||||
delete this;
|
|
||||||
}
|
|
||||||
|
|
||||||
// These methods must only be called from the loop thread.
|
|
||||||
|
|
||||||
napi_status Init() {
|
|
||||||
TsFn* ts_fn = this;
|
|
||||||
|
|
||||||
if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
|
|
||||||
if (max_queue_size > 0) {
|
|
||||||
cond.reset(new node::ConditionVariable);
|
|
||||||
}
|
|
||||||
if ((max_queue_size == 0 || cond.get() != nullptr) &&
|
|
||||||
uv_idle_init(env->loop, &idle) == 0) {
|
|
||||||
return napi_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
node::Environment::GetCurrent(env->isolate)->CloseHandle(
|
|
||||||
reinterpret_cast<uv_handle_t*>(&async),
|
|
||||||
[] (uv_handle_t* handle) -> void {
|
|
||||||
TsFn* ts_fn =
|
|
||||||
node::ContainerOf(&TsFn::async,
|
|
||||||
reinterpret_cast<uv_async_t*>(handle));
|
|
||||||
delete ts_fn;
|
|
||||||
});
|
|
||||||
|
|
||||||
// Prevent the thread-safe function from being deleted here, because
|
|
||||||
// the callback above will delete it.
|
|
||||||
ts_fn = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
delete ts_fn;
|
|
||||||
|
|
||||||
return napi_generic_failure;
|
|
||||||
}
|
|
||||||
|
|
||||||
napi_status Unref() {
|
|
||||||
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
|
|
||||||
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
|
|
||||||
|
|
||||||
return napi_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
napi_status Ref() {
|
|
||||||
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
|
|
||||||
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
|
|
||||||
|
|
||||||
return napi_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
void DispatchOne() {
|
|
||||||
void* data = nullptr;
|
|
||||||
bool popped_value = false;
|
|
||||||
bool idle_stop_failed = false;
|
|
||||||
|
|
||||||
{
|
|
||||||
node::Mutex::ScopedLock lock(this->mutex);
|
|
||||||
if (is_closing) {
|
|
||||||
CloseHandlesAndMaybeDelete();
|
|
||||||
} else {
|
|
||||||
size_t size = queue.size();
|
|
||||||
if (size > 0) {
|
|
||||||
data = queue.front();
|
|
||||||
queue.pop();
|
|
||||||
popped_value = true;
|
|
||||||
if (size == max_queue_size && max_queue_size > 0) {
|
|
||||||
cond->Signal(lock);
|
|
||||||
}
|
|
||||||
size--;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (size == 0) {
|
|
||||||
if (thread_count == 0) {
|
|
||||||
is_closing = true;
|
|
||||||
if (max_queue_size > 0) {
|
|
||||||
cond->Signal(lock);
|
|
||||||
}
|
|
||||||
CloseHandlesAndMaybeDelete();
|
|
||||||
} else {
|
|
||||||
if (uv_idle_stop(&idle) != 0) {
|
|
||||||
idle_stop_failed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (popped_value || idle_stop_failed) {
|
|
||||||
v8::HandleScope scope(env->isolate);
|
|
||||||
CallbackScope cb_scope(this);
|
|
||||||
|
|
||||||
if (idle_stop_failed) {
|
|
||||||
CHECK(napi_throw_error(env,
|
|
||||||
"ERR_NAPI_TSFN_STOP_IDLE_LOOP",
|
|
||||||
"Failed to stop the idle loop") == napi_ok);
|
|
||||||
} else {
|
|
||||||
v8::Local<v8::Function> js_cb =
|
|
||||||
v8::Local<v8::Function>::New(env->isolate, ref);
|
|
||||||
call_js_cb(env,
|
|
||||||
v8impl::JsValueFromV8LocalValue(js_cb),
|
|
||||||
context,
|
|
||||||
data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
node::Environment* NodeEnv() {
|
|
||||||
// For some reason grabbing the Node.js environment requires a handle scope.
|
|
||||||
v8::HandleScope scope(env->isolate);
|
|
||||||
return node::Environment::GetCurrent(env->isolate);
|
|
||||||
}
|
|
||||||
|
|
||||||
void MaybeStartIdle() {
|
|
||||||
if (uv_idle_start(&idle, IdleCb) != 0) {
|
|
||||||
v8::HandleScope scope(env->isolate);
|
|
||||||
CallbackScope cb_scope(this);
|
|
||||||
CHECK(napi_throw_error(env,
|
|
||||||
"ERR_NAPI_TSFN_START_IDLE_LOOP",
|
|
||||||
"Failed to start the idle loop") == napi_ok);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void Finalize() {
|
|
||||||
v8::HandleScope scope(env->isolate);
|
|
||||||
if (finalize_cb) {
|
|
||||||
CallbackScope cb_scope(this);
|
|
||||||
finalize_cb(env, finalize_data, context);
|
|
||||||
}
|
|
||||||
EmptyQueueAndDelete();
|
|
||||||
}
|
|
||||||
|
|
||||||
inline void* Context() {
|
|
||||||
return context;
|
|
||||||
}
|
|
||||||
|
|
||||||
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
|
|
||||||
if (set_closing) {
|
|
||||||
node::Mutex::ScopedLock lock(this->mutex);
|
|
||||||
is_closing = true;
|
|
||||||
if (max_queue_size > 0) {
|
|
||||||
cond->Signal(lock);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (handles_closing) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
handles_closing = true;
|
|
||||||
NodeEnv()->CloseHandle(
|
|
||||||
reinterpret_cast<uv_handle_t*>(&async),
|
|
||||||
[] (uv_handle_t* handle) -> void {
|
|
||||||
TsFn* ts_fn = node::ContainerOf(&TsFn::async,
|
|
||||||
reinterpret_cast<uv_async_t*>(handle));
|
|
||||||
ts_fn->NodeEnv()->CloseHandle(
|
|
||||||
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
|
|
||||||
[] (uv_handle_t* handle) -> void {
|
|
||||||
TsFn* ts_fn = node::ContainerOf(&TsFn::idle,
|
|
||||||
reinterpret_cast<uv_idle_t*>(handle));
|
|
||||||
ts_fn->Finalize();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Default way of calling into JavaScript. Used when TsFn is constructed
|
|
||||||
// without a call_js_cb_.
|
|
||||||
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
|
|
||||||
if (!(env == nullptr || cb == nullptr)) {
|
|
||||||
napi_value recv;
|
|
||||||
napi_status status;
|
|
||||||
|
|
||||||
status = napi_get_undefined(env, &recv);
|
|
||||||
if (status != napi_ok) {
|
|
||||||
napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
|
|
||||||
"Failed to retrieve undefined value");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
|
|
||||||
if (status != napi_ok && status != napi_pending_exception) {
|
|
||||||
napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
|
|
||||||
"Failed to call JS callback");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void IdleCb(uv_idle_t* idle) {
|
|
||||||
TsFn* ts_fn =
|
|
||||||
node::ContainerOf(&TsFn::idle, idle);
|
|
||||||
ts_fn->DispatchOne();
|
|
||||||
}
|
|
||||||
|
|
||||||
static void AsyncCb(uv_async_t* async) {
|
|
||||||
TsFn* ts_fn =
|
|
||||||
node::ContainerOf(&TsFn::async, async);
|
|
||||||
ts_fn->MaybeStartIdle();
|
|
||||||
}
|
|
||||||
|
|
||||||
static void Cleanup(void* data) {
|
|
||||||
reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
// These are variables protected by the mutex.
|
|
||||||
node::Mutex mutex;
|
|
||||||
std::unique_ptr<node::ConditionVariable> cond;
|
|
||||||
std::queue<void*> queue;
|
|
||||||
uv_async_t async;
|
|
||||||
uv_idle_t idle;
|
|
||||||
size_t thread_count;
|
|
||||||
bool is_closing;
|
|
||||||
|
|
||||||
// These are variables set once, upon creation, and then never again, which
|
|
||||||
// means we don't need the mutex to read them.
|
|
||||||
void* context;
|
|
||||||
size_t max_queue_size;
|
|
||||||
|
|
||||||
// These are variables accessed only from the loop thread.
|
|
||||||
node::Persistent<v8::Function> ref;
|
|
||||||
napi_env env;
|
|
||||||
void* finalize_data;
|
|
||||||
napi_finalize finalize_cb;
|
|
||||||
napi_threadsafe_function_call_js call_js_cb;
|
|
||||||
bool handles_closing;
|
|
||||||
};
|
|
||||||
|
|
||||||
NAPI_EXTERN napi_status
|
|
||||||
napi_create_threadsafe_function(napi_env env,
|
napi_create_threadsafe_function(napi_env env,
|
||||||
napi_value func,
|
napi_value func,
|
||||||
napi_value async_resource,
|
napi_value async_resource,
|
||||||
@ -4067,7 +4070,8 @@ napi_create_threadsafe_function(napi_env env,
|
|||||||
v8::Local<v8::String> v8_name;
|
v8::Local<v8::String> v8_name;
|
||||||
CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name);
|
CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name);
|
||||||
|
|
||||||
TsFn* ts_fn = new TsFn(v8_func,
|
v8impl::ThreadSafeFunction* ts_fn =
|
||||||
|
new v8impl::ThreadSafeFunction(v8_func,
|
||||||
v8_resource,
|
v8_resource,
|
||||||
v8_name,
|
v8_name,
|
||||||
initial_thread_count,
|
initial_thread_count,
|
||||||
@ -4091,45 +4095,46 @@ napi_create_threadsafe_function(napi_env env,
|
|||||||
return napi_set_last_error(env, status);
|
return napi_set_last_error(env, status);
|
||||||
}
|
}
|
||||||
|
|
||||||
NAPI_EXTERN napi_status
|
napi_status
|
||||||
napi_get_threadsafe_function_context(napi_threadsafe_function func,
|
napi_get_threadsafe_function_context(napi_threadsafe_function func,
|
||||||
void** result) {
|
void** result) {
|
||||||
CHECK(func != nullptr);
|
CHECK(func != nullptr);
|
||||||
CHECK(result != nullptr);
|
CHECK(result != nullptr);
|
||||||
|
|
||||||
*result = reinterpret_cast<TsFn*>(func)->Context();
|
*result = reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Context();
|
||||||
return napi_ok;
|
return napi_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
NAPI_EXTERN napi_status
|
napi_status
|
||||||
napi_call_threadsafe_function(napi_threadsafe_function func,
|
napi_call_threadsafe_function(napi_threadsafe_function func,
|
||||||
void* data,
|
void* data,
|
||||||
napi_threadsafe_function_call_mode is_blocking) {
|
napi_threadsafe_function_call_mode is_blocking) {
|
||||||
CHECK(func != nullptr);
|
CHECK(func != nullptr);
|
||||||
return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking);
|
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Push(data,
|
||||||
|
is_blocking);
|
||||||
}
|
}
|
||||||
|
|
||||||
NAPI_EXTERN napi_status
|
napi_status
|
||||||
napi_acquire_threadsafe_function(napi_threadsafe_function func) {
|
napi_acquire_threadsafe_function(napi_threadsafe_function func) {
|
||||||
CHECK(func != nullptr);
|
CHECK(func != nullptr);
|
||||||
return reinterpret_cast<TsFn*>(func)->Acquire();
|
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Acquire();
|
||||||
}
|
}
|
||||||
|
|
||||||
NAPI_EXTERN napi_status
|
napi_status
|
||||||
napi_release_threadsafe_function(napi_threadsafe_function func,
|
napi_release_threadsafe_function(napi_threadsafe_function func,
|
||||||
napi_threadsafe_function_release_mode mode) {
|
napi_threadsafe_function_release_mode mode) {
|
||||||
CHECK(func != nullptr);
|
CHECK(func != nullptr);
|
||||||
return reinterpret_cast<TsFn*>(func)->Release(mode);
|
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Release(mode);
|
||||||
}
|
}
|
||||||
|
|
||||||
NAPI_EXTERN napi_status
|
napi_status
|
||||||
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
|
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
|
||||||
CHECK(func != nullptr);
|
CHECK(func != nullptr);
|
||||||
return reinterpret_cast<TsFn*>(func)->Unref();
|
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Unref();
|
||||||
}
|
}
|
||||||
|
|
||||||
NAPI_EXTERN napi_status
|
napi_status
|
||||||
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
|
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
|
||||||
CHECK(func != nullptr);
|
CHECK(func != nullptr);
|
||||||
return reinterpret_cast<TsFn*>(func)->Ref();
|
return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Ref();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user