n-api: add API for asynchronous functions

Bundle a `uv_async_t`, a `uv_idle_t`, a `uv_mutex_t`, a `uv_cond_t`,
and a `v8::Persistent<v8::Function>` to make it possible to call into JS
from another thread. The API accepts a void data pointer and a callback
which will be invoked on the loop thread and which will receive the
`napi_value` representing the JavaScript function to call so as to
perform the call into JS. The callback is run inside a
`node::CallbackScope`.

A `std::queue<void*>` is used to store calls from the secondary
threads, and an idle loop is started by the `uv_async_t` callback on the
loop thread to drain the queue, calling into JS with each item.

Items can be added to the queue blockingly or non-blockingly.

The thread-safe function can be referenced or unreferenced, with the
same semantics as libuv handles.

Re: https://github.com/nodejs/help/issues/1035
Re: https://github.com/nodejs/node/issues/20964
Fixes: https://github.com/nodejs/node/issues/13512
PR-URL: https://github.com/nodejs/node/pull/17887
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
This commit is contained in:
Gabriel Schulhof 2018-06-17 11:52:49 -04:00
parent 1c30343984
commit 81f06ba7e4
8 changed files with 1329 additions and 4 deletions

View File

@ -1376,6 +1376,31 @@ multiple of the element size.
While calling `napi_create_typedarray()`, `(length * size_of_element) +
byte_offset` was larger than the length of given `buffer`.
<a id="ERR_NAPI_TSFN_CALL_JS"></a>
### ERR_NAPI_TSFN_CALL_JS
An error occurred while invoking the JavaScript portion of the thread-safe
function.
<a id="ERR_NAPI_TSFN_GET_UNDEFINED"></a>
### ERR_NAPI_TSFN_GET_UNDEFINED
An error occurred while attempting to retrieve the JavaScript `undefined`
value.
<a id="ERR_NAPI_TSFN_START_IDLE_LOOP"></a>
### ERR_NAPI_TSFN_START_IDLE_LOOP
On the main thread, values are removed from the queue associated with the
thread-safe function in an idle loop. This error indicates that an error
has occurred when attemping to start the loop.
<a id="ERR_NAPI_TSFN_STOP_IDLE_LOOP"></a>
### ERR_NAPI_TSFN_STOP_IDLE_LOOP
Once no more items are left in the queue, the idle loop must be suspended. This
error indicates that the idle loop has failed to stop.
<a id="ERR_NO_CRYPTO"></a>
### ERR_NO_CRYPTO

View File

@ -75,7 +75,11 @@ typedef enum {
napi_cancelled,
napi_escape_called_twice,
napi_handle_scope_mismatch,
napi_callback_scope_mismatch
napi_callback_scope_mismatch,
#ifdef NAPI_EXPERIMENTAL
napi_queue_full,
napi_closing,
#endif // NAPI_EXPERIMENTAL
} napi_status;
```
If additional information is required upon an API returning a failed status,
@ -113,6 +117,43 @@ not allowed.
### napi_value
This is an opaque pointer that is used to represent a JavaScript value.
### napi_threadsafe_function
> Stability: 1 - Experimental
This is an opaque pointer that represents a JavaScript function which can be
called asynchronously from multiple threads via
`napi_call_threadsafe_function()`.
### napi_threadsafe_function_release_mode
> Stability: 1 - Experimental
A value to be given to `napi_release_threadsafe_function()` to indicate whether
the thread-safe function is to be closed immediately (`napi_tsfn_abort`) or
merely released (`napi_tsfn_release`) and thus available for subsequent use via
`napi_acquire_threadsafe_function()` and `napi_call_threadsafe_function()`.
```C
typedef enum {
napi_tsfn_release,
napi_tsfn_abort
} napi_threadsafe_function_release_mode;
```
### napi_threadsafe_function_call_mode
> Stability: 1 - Experimental
A value to be given to `napi_call_threadsafe_function()` to indicate whether
the call should block whenever the queue associated with the thread-safe
function is full.
```C
typedef enum {
napi_tsfn_nonblocking,
napi_tsfn_blocking
} napi_threadsafe_function_call_mode;
```
### N-API Memory Management types
#### napi_handle_scope
This is an abstraction used to control and modify the lifetime of objects
@ -194,6 +235,43 @@ typedef void (*napi_async_complete_callback)(napi_env env,
void* data);
```
#### napi_threadsafe_function_call_js
> Stability: 1 - Experimental
Function pointer used with asynchronous thread-safe function calls. The callback
will be called on the main thread. Its purpose is to use a data item arriving
via the queue from one of the secondary threads to construct the parameters
necessary for a call into JavaScript, usually via `napi_call_function`, and then
make the call into JavaScript.
The data arriving from the secondary thread via the queue is given in the `data`
parameter and the JavaScript function to call is given in the `js_callback`
parameter.
N-API sets up the environment prior to calling this callback, so it is
sufficient to call the JavaScript function via `napi_call_function` rather than
via `napi_make_callback`.
Callback functions must satisfy the following signature:
```C
typedef void (*napi_threadsafe_function_call_js)(napi_env env,
napi_value js_callback,
void* context,
void* data);
```
- `[in] env`: The environment to use for API calls, or `NULL` if the thread-safe
function is being torn down and `data` may need to be freed.
- `[in] js_callback`: The JavaScript function to call, or `NULL` if the
thread-safe function is being torn down and `data` may need to be freed.
- `[in] context`: The optional data with which the thread-safe function was
created.
- `[in] data`: Data created by the secondary thread. It is the responsibility of
the callback to convert this native data to JavaScript values (with N-API
functions) that can be passed as parameters when `js_callback` is invoked. This
pointer is managed entirely by the threads and this callback. Thus this callback
should free the data.
## Error Handling
N-API uses both return values and JavaScript exceptions for error handling.
The following sections explain the approach for each case.
@ -3851,6 +3929,298 @@ NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env,
- `[in] env`: The environment that the API is invoked under.
- `[out] loop`: The current libuv loop instance.
<!-- it's very convenient to have all the anchors indexed -->
<!--lint disable no-unused-definitions remark-lint-->
## Asynchronous Thread-safe Function Calls
> Stability: 1 - Experimental
JavaScript functions can normally only be called from a native addon's main
thread. If an addon creates additional threads, then N-API functions that
require a `napi_env`, `napi_value`, or `napi_ref` must not be called from those
threads.
When an addon has additional threads and JavaScript functions need to be invoked
based on the processing completed by those threads, those threads must
communicate with the addon's main thread so that the main thread can invoke the
JavaScript function on their behalf. The thread-safe function APIs provide an
easy way to do this.
These APIs provide the type `napi_threadsafe_function` as well as APIs to
create, destroy, and call objects of this type.
`napi_create_threadsafe_function()` creates a persistent reference to a
`napi_value` that holds a JavaScript function which can be called from multiple
threads. The calls happen asynchronously. This means that values with which the
JavaScript callback is to be called will be placed in a queue, and, for each
value in the queue, a call will eventually be made to the JavaScript function.
Upon creation of a `napi_threadsafe_function` a `napi_finalize` callback can be
provided. This callback will be invoked on the main thread when the thread-safe
function is about to be destroyed. It receives the context and the finalize data
given during construction, and provides an opportunity for cleaning up after the
threads e.g. by calling `uv_thread_join()`. **It is important that, aside from
the main loop thread, there be no threads left using the thread-safe function
after the finalize callback completes.**
The `context` given during the call to `napi_create_threadsafe_function()` can
be retrieved from any thread with a call to
`napi_get_threadsafe_function_context()`.
`napi_call_threadsafe_function()` can then be used for initiating a call into
JavaScript. `napi_call_threadsafe_function()` accepts a parameter which controls
whether the API behaves blockingly. If set to `napi_tsfn_nonblocking`, the API
behaves non-blockingly, returning `napi_queue_full` if the queue was full,
preventing data from being successfully added to the queue. If set to
`napi_tsfn_blocking`, the API blocks until space becomes available in the queue.
`napi_call_threadsafe_function()` never blocks if the thread-safe function was
created with a maximum queue size of 0.
The actual call into JavaScript is controlled by the callback given via the
`call_js_cb` parameter. `call_js_cb` is invoked on the main thread once for each
value that was placed into the queue by a successful call to
`napi_call_threadsafe_function()`. If such a callback is not given, a default
callback will be used, and the resulting JavaScript call will have no arguments.
The `call_js_cb` callback receives the JavaScript function to call as a
`napi_value` in its parameters, as well as the `void*` context pointer used when
creating the `napi_threadsafe_function`, and the next data pointer that was
created by one of the secondary threads. The callback can then use an API such
as `napi_call_function()` to call into JavaScript.
The callback may also be invoked with `env` and `call_js_cb` both set to `NULL`
to indicate that calls into JavaScript are no longer possible, while items
remain in the queue that may need to be freed. This normally occurs when the
Node.js process exits while there is a thread-safe function still active.
It is not necessary to call into JavaScript via `napi_make_callback()` because
N-API runs `call_js_cb` in a context appropriate for callbacks.
Threads can be added to and removed from a `napi_threadsafe_function` object
during its existence. Thus, in addition to specifying an initial number of
threads upon creation, `napi_acquire_threadsafe_function` can be called to
indicate that a new thread will start making use of the thread-safe function.
Similarly, `napi_release_threadsafe_function` can be called to indicate that an
existing thread will stop making use of the thread-safe function.
`napi_threadsafe_function` objects are destroyed when every thread which uses
the object has called `napi_release_threadsafe_function()` or has received a
return status of `napi_closing` in response to a call to
`napi_call_threadsafe_function`. The queue is emptied before the
`napi_threadsafe_function` is destroyed. It is important that
`napi_release_threadsafe_function()` be the last API call made in conjunction
with a given `napi_threadsafe_function`, because after the call completes, there
is no guarantee that the `napi_threadsafe_function` is still allocated. For the
same reason it is also important that no more use be made of a thread-safe
function after receiving a return value of `napi_closing` in response to a call
to `napi_call_threadsafe_function`. Data associated with the
`napi_threadsafe_function` can be freed in its `napi_finalize` callback which
was passed to `napi_create_threadsafe_function()`.
Once the number of threads making use of a `napi_threadsafe_function` reaches
zero, no further threads can start making use of it by calling
`napi_acquire_threadsafe_function()`. In fact, all subsequent API calls
associated with it, except `napi_release_threadsafe_function()`, will return an
error value of `napi_closing`.
The thread-safe function can be "aborted" by giving a value of `napi_tsfn_abort`
to `napi_release_threadsafe_function()`. This will cause all subsequent APIs
associated with the thread-safe function except
`napi_release_threadsafe_function()` to return `napi_closing` even before its
reference count reaches zero. In particular, `napi_call_threadsafe_function()`
will return `napi_closing`, thus informing the threads that it is no longer
possible to make asynchronous calls to the thread-safe function. This can be
used as a criterion for terminating the thread. **Upon receiving a return value
of `napi_closing` from `napi_call_threadsafe_function()` a thread must make no
further use of the thread-safe function because it is no longer guaranteed to
be allocated.**
Similarly to libuv handles, thread-safe functions can be "referenced" and
"unreferenced". A "referenced" thread-safe function will cause the event loop on
the thread on which it is created to remain alive until the thread-safe function
is destroyed. In contrast, an "unreferenced" thread-safe function will not
prevent the event loop from exiting. The APIs `napi_ref_threadsafe_function` and
`napi_unref_threadsafe_function` exist for this purpose.
### napi_create_threadsafe_function
> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->
```C
NAPI_EXTERN napi_status
napi_create_threadsafe_function(napi_env env,
napi_value func,
napi_value async_resource,
napi_value async_resource_name,
size_t max_queue_size,
size_t initial_thread_count,
void* thread_finalize_data,
napi_finalize thread_finalize_cb,
void* context,
napi_threadsafe_function_call_js call_js_cb,
napi_threadsafe_function* result);
```
- `[in] env`: The environment that the API is invoked under.
- `[in] func`: The JavaScript function to call from another thread.
- `[in] async_resource`: An optional object associated with the async work that
will be passed to possible `async_hooks` [`init` hooks][].
- `[in] async_resource_name`: A javaScript string to provide an identifier for
the kind of resource that is being provided for diagnostic information exposed
by the `async_hooks` API.
- `[in] max_queue_size`: Maximum size of the queue. 0 for no limit.
- `[in] initial_thread_count`: The initial number of threads, including the main
thread, which will be making use of this function.
- `[in] thread_finalize_data`: Data to be passed to `thread_finalize_cb`.
- `[in] thread_finalize_cb`: Function to call when the
`napi_threadsafe_function` is being destroyed.
- `[in] context`: Optional data to attach to the resulting
`napi_threadsafe_function`.
- `[in] call_js_cb`: Optional callback which calls the JavaScript function in
response to a call on a different thread. This callback will be called on the
main thread. If not given, the JavaScript function will be called with no
parameters and with `undefined` as its `this` value.
- `[out] result`: The asynchronous thread-safe JavaScript function.
### napi_get_threadsafe_function_context
> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->
```C
NAPI_EXTERN napi_status
napi_get_threadsafe_function_context(napi_threadsafe_function func,
void** result);
```
- `[in] func`: The thread-safe function for which to retrieve the context.
- `[out] context`: The location where to store the context.
This API may be called from any thread which makes use of `func`.
### napi_call_threadsafe_function
> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->
```C
NAPI_EXTERN napi_status
napi_call_threadsafe_function(napi_threadsafe_function func,
void* data,
napi_threadsafe_function_call_mode is_blocking);
```
- `[in] func`: The asynchronous thread-safe JavaScript function to invoke.
- `[in] data`: Data to send into JavaScript via the callback `call_js_cb`
provided during the creation of the thread-safe JavaScript function.
- `[in] is_blocking`: Flag whose value can be either `napi_tsfn_blocking` to
indicate that the call should block if the queue is full or
`napi_tsfn_nonblocking` to indicate that the call should return immediately with
a status of `napi_queue_full` whenever the queue is full.
This API will return `napi_closing` if `napi_release_threadsafe_function()` was
called with `abort` set to `napi_tsfn_abort` from any thread. The value is only
added to the queue if the API returns `napi_ok`.
This API may be called from any thread which makes use of `func`.
### napi_acquire_threadsafe_function
> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->
```C
NAPI_EXTERN napi_status
napi_acquire_threadsafe_function(napi_threadsafe_function func);
```
- `[in] func`: The asynchronous thread-safe JavaScript function to start making
use of.
A thread should call this API before passing `func` to any other thread-safe
function APIs to indicate that it will be making use of `func`. This prevents
`func` from being destroyed when all other threads have stopped making use of
it.
This API may be called from any thread which will start making use of `func`.
### napi_release_threadsafe_function
> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->
```C
NAPI_EXTERN napi_status
napi_release_threadsafe_function(napi_threadsafe_function func,
napi_threadsafe_function_release_mode mode);
```
- `[in] func`: The asynchronous thread-safe JavaScript function whose reference
count to decrement.
- `[in] mode`: Flag whose value can be either `napi_tsfn_release` to indicate
that the current thread will make no further calls to the thread-safe function,
or `napi_tsfn_abort` to indicate that in addition to the current thread, no
other thread should make any further calls to the thread-safe function. If set
to `napi_tsfn_abort`, further calls to `napi_call_threadsafe_function()` will
return `napi_closing`, and no further values will be placed in the queue.
A thread should call this API when it stops making use of `func`. Passing `func`
to any thread-safe APIs after having called this API has undefined results, as
`func` may have been destroyed.
This API may be called from any thread which will stop making use of `func`.
### napi_ref_threadsafe_function
> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->
```C
NAPI_EXTERN napi_status
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func);
```
- `[in] env`: The environment that the API is invoked under.
- `[in] func`: The thread-safe function to reference.
This API is used to indicate that the event loop running on the main thread
should not exit until `func` has been destroyed. Similar to [`uv_ref`][] it is
also idempotent.
This API may only be called from the main thread.
### napi_unref_threadsafe_function
> Stability: 1 - Experimental
<!-- YAML
added: REPLACEME
-->
```C
NAPI_EXTERN napi_status
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func);
```
- `[in] env`: The environment that the API is invoked under.
- `[in] func`: The thread-safe function to unreference.
This API is used to indicate that the event loop running on the main thread
may exit before `func` is destroyed. Similar to [`uv_unref`][] it is also
idempotent.
This API may only be called from the main thread.
[ECMAScript Language Specification]: https://tc39.github.io/ecma262/
[Error Handling]: #n_api_error_handling
[Native Abstractions for Node.js]: https://github.com/nodejs/nan
@ -3913,6 +4283,8 @@ NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env,
[`napi_throw_type_error`]: #n_api_napi_throw_type_error
[`napi_unwrap`]: #n_api_napi_unwrap
[`napi_wrap`]: #n_api_napi_wrap
[`uv_ref`]: http://docs.libuv.org/en/v1.x/handle.html#c.uv_ref
[`uv_unref`]: http://docs.libuv.org/en/v1.x/handle.html#c.uv_unref
[`process.release`]: process.html#process_process_release
[`init` hooks]: async_hooks.html#async_hooks_init_asyncid_type_triggerasyncid_resource

View File

@ -5,6 +5,7 @@
#include <algorithm>
#include <cmath>
#include <vector>
#define NAPI_EXPERIMENTAL
#include "node_api.h"
#include "node_internals.h"
#include "env.h"
@ -923,7 +924,10 @@ const char* error_messages[] = {nullptr,
"The async work item was cancelled",
"napi_escape_handle already called on scope",
"Invalid handle scope usage",
"Invalid callback scope usage"};
"Invalid callback scope usage",
"Thread-safe function queue is full",
"Thread-safe function handle is closing"
};
static inline napi_status napi_clear_last_error(napi_env env) {
env->last_error.error_code = napi_ok;
@ -954,7 +958,7 @@ napi_status napi_get_last_error_info(napi_env env,
// We don't have a napi_status_last as this would result in an ABI
// change each time a message was added.
static_assert(
node::arraysize(error_messages) == napi_callback_scope_mismatch + 1,
node::arraysize(error_messages) == napi_closing + 1,
"Count of error messages must match count of error values");
CHECK_LE(env->last_error.error_code, napi_callback_scope_mismatch);
@ -3553,3 +3557,435 @@ napi_status napi_run_script(napi_env env,
*result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked());
return GET_RETURN_STATUS(env);
}
class TsFn: public node::AsyncResource {
public:
TsFn(v8::Local<v8::Function> func,
v8::Local<v8::Object> resource,
v8::Local<v8::String> name,
size_t thread_count_,
void* context_,
size_t max_queue_size_,
napi_env env_,
void* finalize_data_,
napi_finalize finalize_cb_,
napi_threadsafe_function_call_js call_js_cb_):
AsyncResource(env_->isolate,
resource,
*v8::String::Utf8Value(env_->isolate, name)),
thread_count(thread_count_),
is_closing(false),
context(context_),
max_queue_size(max_queue_size_),
env(env_),
finalize_data(finalize_data_),
finalize_cb(finalize_cb_),
idle_running(false),
call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
handles_closing(false) {
ref.Reset(env->isolate, func);
node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
}
~TsFn() {
node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
}
// These methods can be called from any thread.
napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
node::Mutex::ScopedLock lock(this->mutex);
while (queue.size() >= max_queue_size &&
max_queue_size > 0 &&
!is_closing) {
if (mode == napi_tsfn_nonblocking) {
return napi_queue_full;
}
cond->Wait(lock);
}
if (is_closing) {
if (thread_count == 0) {
return napi_invalid_arg;
} else {
thread_count--;
return napi_closing;
}
} else {
if (uv_async_send(&async) != 0) {
return napi_generic_failure;
}
queue.push(data);
return napi_ok;
}
}
napi_status Acquire() {
node::Mutex::ScopedLock lock(this->mutex);
if (is_closing) {
return napi_closing;
}
thread_count++;
return napi_ok;
}
napi_status Release(napi_threadsafe_function_release_mode mode) {
node::Mutex::ScopedLock lock(this->mutex);
if (thread_count == 0) {
return napi_invalid_arg;
}
thread_count--;
if (thread_count == 0 || mode == napi_tsfn_abort) {
if (!is_closing) {
is_closing = (mode == napi_tsfn_abort);
if (is_closing) {
cond->Signal(lock);
}
if (uv_async_send(&async) != 0) {
return napi_generic_failure;
}
}
}
return napi_ok;
}
void EmptyQueueAndDelete() {
for (; !queue.empty() ; queue.pop()) {
call_js_cb(nullptr, nullptr, context, queue.front());
}
delete this;
}
// These methods must only be called from the loop thread.
napi_status Init() {
TsFn* ts_fn = this;
if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
if (max_queue_size > 0) {
cond.reset(new node::ConditionVariable);
}
if ((max_queue_size == 0 || cond.get() != nullptr) &&
uv_idle_init(env->loop, &idle) == 0) {
return napi_ok;
}
node::Environment::GetCurrent(env->isolate)->CloseHandle(
reinterpret_cast<uv_handle_t*>(&async),
[] (uv_handle_t* handle) -> void {
TsFn* ts_fn =
node::ContainerOf(&TsFn::async,
reinterpret_cast<uv_async_t*>(handle));
delete ts_fn;
});
// Prevent the thread-safe function from being deleted here, because
// the callback above will delete it.
ts_fn = nullptr;
}
delete ts_fn;
return napi_generic_failure;
}
napi_status Unref() {
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
return napi_ok;
}
napi_status Ref() {
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
return napi_ok;
}
void DispatchOne() {
void* data;
bool popped_value = false;
bool idle_stop_failed = false;
{
node::Mutex::ScopedLock lock(this->mutex);
if (is_closing) {
CloseHandlesAndMaybeDelete();
} else {
size_t size = queue.size();
if (size > 0) {
data = queue.front();
queue.pop();
popped_value = true;
if (size == max_queue_size && max_queue_size > 0) {
cond->Signal(lock);
}
size--;
}
if (size == 0) {
if (thread_count == 0) {
is_closing = true;
cond->Signal(lock);
CloseHandlesAndMaybeDelete();
} else {
if (uv_idle_stop(&idle) != 0) {
idle_stop_failed = true;
} else {
idle_running = false;
}
}
}
}
}
if (popped_value || idle_stop_failed) {
v8::HandleScope scope(env->isolate);
CallbackScope cb_scope(this);
if (idle_stop_failed) {
CHECK(napi_throw_error(env,
"ERR_NAPI_TSFN_STOP_IDLE_LOOP",
"Failed to stop the idle loop") == napi_ok);
} else {
v8::Local<v8::Function> js_cb =
v8::Local<v8::Function>::New(env->isolate, ref);
call_js_cb(env,
v8impl::JsValueFromV8LocalValue(js_cb),
context,
data);
}
}
}
node::Environment* NodeEnv() {
// For some reason grabbing the Node.js environment requires a handle scope.
v8::HandleScope scope(env->isolate);
return node::Environment::GetCurrent(env->isolate);
}
void MaybeStartIdle() {
if (!idle_running) {
if (uv_idle_start(&idle, IdleCb) != 0) {
v8::HandleScope scope(env->isolate);
CallbackScope cb_scope(this);
CHECK(napi_throw_error(env,
"ERR_NAPI_TSFN_START_IDLE_LOOP",
"Failed to start the idle loop") == napi_ok);
}
}
}
void Finalize() {
v8::HandleScope scope(env->isolate);
if (finalize_cb) {
CallbackScope cb_scope(this);
finalize_cb(env, finalize_data, context);
}
EmptyQueueAndDelete();
}
inline void* Context() {
return context;
}
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
if (set_closing) {
node::Mutex::ScopedLock lock(this->mutex);
is_closing = true;
cond->Signal(lock);
}
if (handles_closing) {
return;
}
handles_closing = true;
NodeEnv()->CloseHandle(
reinterpret_cast<uv_handle_t*>(&async),
[] (uv_handle_t* handle) -> void {
TsFn* ts_fn = node::ContainerOf(&TsFn::async,
reinterpret_cast<uv_async_t*>(handle));
ts_fn->NodeEnv()->CloseHandle(
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
[] (uv_handle_t* handle) -> void {
TsFn* ts_fn = node::ContainerOf(&TsFn::idle,
reinterpret_cast<uv_idle_t*>(handle));
ts_fn->Finalize();
});
});
}
// Default way of calling into JavaScript. Used when TsFn is constructed
// without a call_js_cb_.
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
if (!(env == nullptr || cb == nullptr)) {
napi_value recv;
napi_status status;
status = napi_get_undefined(env, &recv);
if (status != napi_ok) {
napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
"Failed to retrieve undefined value");
return;
}
status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
if (status != napi_ok && status != napi_pending_exception) {
napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
"Failed to call JS callback");
return;
}
}
}
static void IdleCb(uv_idle_t* idle) {
TsFn* ts_fn =
node::ContainerOf(&TsFn::idle, idle);
ts_fn->DispatchOne();
}
static void AsyncCb(uv_async_t* async) {
TsFn* ts_fn =
node::ContainerOf(&TsFn::async, async);
ts_fn->MaybeStartIdle();
}
static void Cleanup(void* data) {
reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true);
}
private:
// These are variables protected by the mutex.
node::Mutex mutex;
std::unique_ptr<node::ConditionVariable> cond;
std::queue<void*> queue;
uv_async_t async;
uv_idle_t idle;
size_t thread_count;
bool is_closing;
// These are variables set once, upon creation, and then never again, which
// means we don't need the mutex to read them.
void* context;
size_t max_queue_size;
// These are variables accessed only from the loop thread.
node::Persistent<v8::Function> ref;
napi_env env;
void* finalize_data;
napi_finalize finalize_cb;
bool idle_running;
napi_async_context async_context;
napi_threadsafe_function_call_js call_js_cb;
bool handles_closing;
};
NAPI_EXTERN napi_status
napi_create_threadsafe_function(napi_env env,
napi_value func,
napi_value async_resource,
napi_value async_resource_name,
size_t max_queue_size,
size_t initial_thread_count,
void* thread_finalize_data,
napi_finalize thread_finalize_cb,
void* context,
napi_threadsafe_function_call_js call_js_cb,
napi_threadsafe_function* result) {
CHECK_ENV(env);
CHECK_ARG(env, func);
CHECK_ARG(env, async_resource_name);
RETURN_STATUS_IF_FALSE(env, initial_thread_count > 0, napi_invalid_arg);
CHECK_ARG(env, result);
napi_status status = napi_ok;
v8::Local<v8::Function> v8_func;
CHECK_TO_FUNCTION(env, v8_func, func);
v8::Local<v8::Context> v8_context = env->isolate->GetCurrentContext();
v8::Local<v8::Object> v8_resource;
if (async_resource == nullptr) {
v8_resource = v8::Object::New(env->isolate);
} else {
CHECK_TO_OBJECT(env, v8_context, v8_resource, async_resource);
}
v8::Local<v8::String> v8_name;
CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name);
TsFn* ts_fn = new TsFn(v8_func,
v8_resource,
v8_name,
initial_thread_count,
context,
max_queue_size,
env,
thread_finalize_data,
thread_finalize_cb,
call_js_cb);
if (ts_fn == nullptr) {
status = napi_generic_failure;
} else {
// Init deletes ts_fn upon failure.
status = ts_fn->Init();
if (status == napi_ok) {
*result = reinterpret_cast<napi_threadsafe_function>(ts_fn);
}
}
return napi_set_last_error(env, status);
}
NAPI_EXTERN napi_status
napi_get_threadsafe_function_context(napi_threadsafe_function func,
void** result) {
CHECK(func != nullptr);
CHECK(result != nullptr);
*result = reinterpret_cast<TsFn*>(func)->Context();
return napi_ok;
}
NAPI_EXTERN napi_status
napi_call_threadsafe_function(napi_threadsafe_function func,
void* data,
napi_threadsafe_function_call_mode is_blocking) {
CHECK(func != nullptr);
return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking);
}
NAPI_EXTERN napi_status
napi_acquire_threadsafe_function(napi_threadsafe_function func) {
CHECK(func != nullptr);
return reinterpret_cast<TsFn*>(func)->Acquire();
}
NAPI_EXTERN napi_status
napi_release_threadsafe_function(napi_threadsafe_function func,
napi_threadsafe_function_release_mode mode) {
CHECK(func != nullptr);
return reinterpret_cast<TsFn*>(func)->Release(mode);
}
NAPI_EXTERN napi_status
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
CHECK(func != nullptr);
return reinterpret_cast<TsFn*>(func)->Unref();
}
NAPI_EXTERN napi_status
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
CHECK(func != nullptr);
return reinterpret_cast<TsFn*>(func)->Ref();
}

View File

@ -614,6 +614,44 @@ NAPI_EXTERN napi_status napi_run_script(napi_env env,
NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env,
struct uv_loop_s** loop);
#ifdef NAPI_EXPERIMENTAL
// Calling into JS from other threads
NAPI_EXTERN napi_status
napi_create_threadsafe_function(napi_env env,
napi_value func,
napi_value async_resource,
napi_value async_resource_name,
size_t max_queue_size,
size_t initial_thread_count,
void* thread_finalize_data,
napi_finalize thread_finalize_cb,
void* context,
napi_threadsafe_function_call_js call_js_cb,
napi_threadsafe_function* result);
NAPI_EXTERN napi_status
napi_get_threadsafe_function_context(napi_threadsafe_function func,
void** result);
NAPI_EXTERN napi_status
napi_call_threadsafe_function(napi_threadsafe_function func,
void* data,
napi_threadsafe_function_call_mode is_blocking);
NAPI_EXTERN napi_status
napi_acquire_threadsafe_function(napi_threadsafe_function func);
NAPI_EXTERN napi_status
napi_release_threadsafe_function(napi_threadsafe_function func,
napi_threadsafe_function_release_mode mode);
NAPI_EXTERN napi_status
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func);
NAPI_EXTERN napi_status
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func);
#endif // NAPI_EXPERIMENTAL
EXTERN_C_END
#endif // SRC_NODE_API_H_

View File

@ -20,6 +20,9 @@ typedef struct napi_callback_info__* napi_callback_info;
typedef struct napi_async_context__* napi_async_context;
typedef struct napi_async_work__* napi_async_work;
typedef struct napi_deferred__* napi_deferred;
#ifdef NAPI_EXPERIMENTAL
typedef struct napi_threadsafe_function__* napi_threadsafe_function;
#endif // NAPI_EXPERIMENTAL
typedef enum {
napi_default = 0,
@ -72,9 +75,25 @@ typedef enum {
napi_cancelled,
napi_escape_called_twice,
napi_handle_scope_mismatch,
napi_callback_scope_mismatch
napi_callback_scope_mismatch,
#ifdef NAPI_EXPERIMENTAL
napi_queue_full,
napi_closing,
#endif // NAPI_EXPERIMENTAL
} napi_status;
#ifdef NAPI_EXPERIMENTAL
typedef enum {
napi_tsfn_release,
napi_tsfn_abort
} napi_threadsafe_function_release_mode;
typedef enum {
napi_tsfn_nonblocking,
napi_tsfn_blocking
} napi_threadsafe_function_call_mode;
#endif // NAPI_EXPERIMENTAL
typedef napi_value (*napi_callback)(napi_env env,
napi_callback_info info);
typedef void (*napi_finalize)(napi_env env,
@ -86,6 +105,13 @@ typedef void (*napi_async_complete_callback)(napi_env env,
napi_status status,
void* data);
#ifdef NAPI_EXPERIMENTAL
typedef void (*napi_threadsafe_function_call_js)(napi_env env,
napi_value js_callback,
void* context,
void* data);
#endif // NAPI_EXPERIMENTAL
typedef struct {
// One of utf8name or name should be NULL.
const char* utf8name;

View File

@ -0,0 +1,254 @@
// For the purpose of this test we use libuv's threading library. When deciding
// on a threading library for a new project it bears remembering that in the
// future libuv may introduce API changes which may render it non-ABI-stable,
// which, in turn, may affect the ABI stability of the project despite its use
// of N-API.
#include <uv.h>
#define NAPI_EXPERIMENTAL
#include <node_api.h>
#include "../common.h"
#define ARRAY_LENGTH 10
static uv_thread_t uv_threads[2];
static napi_threadsafe_function ts_fn;
typedef struct {
napi_threadsafe_function_call_mode block_on_full;
napi_threadsafe_function_release_mode abort;
bool start_secondary;
napi_ref js_finalize_cb;
} ts_fn_hint;
static ts_fn_hint ts_info;
// Thread data to transmit to JS
static int ints[ARRAY_LENGTH];
static void secondary_thread(void* data) {
napi_threadsafe_function ts_fn = data;
if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH,
"napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
}
}
// Source thread producing the data
static void data_source_thread(void* data) {
napi_threadsafe_function ts_fn = data;
int index;
void* hint;
ts_fn_hint *ts_fn_info;
napi_status status;
bool queue_was_full = false;
bool queue_was_closing = false;
if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH);
}
ts_fn_info = (ts_fn_hint *)hint;
if (ts_fn_info != &ts_info) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"thread-safe function hint is not as expected", NAPI_AUTO_LENGTH);
}
if (ts_fn_info->start_secondary) {
if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH);
}
if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"failed to start secondary thread", NAPI_AUTO_LENGTH);
}
}
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
status = napi_call_threadsafe_function(ts_fn, &ints[index],
ts_fn_info->block_on_full);
switch (status) {
case napi_queue_full:
queue_was_full = true;
index++;
// fall through
case napi_ok:
continue;
case napi_closing:
queue_was_closing = true;
break;
default:
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH);
}
}
// Assert that the enqueuing of a value was refused at least once, if this is
// a non-blocking test run.
if (!ts_fn_info->block_on_full && !queue_was_full) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"queue was never full", NAPI_AUTO_LENGTH);
}
// Assert that the queue was marked as closing at least once, if this is an
// aborting test run.
if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"queue was never closing", NAPI_AUTO_LENGTH);
}
if (!queue_was_closing &&
napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
}
}
// Getting the data into JS
static void call_js(napi_env env, napi_value cb, void* hint, void* data) {
if (!(env == NULL || cb == NULL)) {
napi_value argv, undefined;
NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv,
NULL));
}
}
// Cleanup
static napi_value StopThread(napi_env env, napi_callback_info info) {
size_t argc = 2;
napi_value argv[2];
NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
napi_valuetype value_type;
NAPI_CALL(env, napi_typeof(env, argv[0], &value_type));
NAPI_ASSERT(env, value_type == napi_function,
"StopThread argument is a function");
NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function");
NAPI_CALL(env,
napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb)));
bool abort;
NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
NAPI_CALL(env,
napi_release_threadsafe_function(ts_fn,
abort ? napi_tsfn_abort : napi_tsfn_release));
ts_fn = NULL;
return NULL;
}
// Join the thread and inform JS that we're done.
static void join_the_threads(napi_env env, void *data, void *hint) {
uv_thread_t *the_threads = data;
ts_fn_hint *the_hint = hint;
napi_value js_cb, undefined;
uv_thread_join(&the_threads[0]);
if (the_hint->start_secondary) {
uv_thread_join(&the_threads[1]);
}
NAPI_CALL_RETURN_VOID(env,
napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb));
NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
NAPI_CALL_RETURN_VOID(env,
napi_call_function(env, undefined, js_cb, 0, NULL, NULL));
NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env,
the_hint->js_finalize_cb));
}
static napi_value StartThreadInternal(napi_env env,
napi_callback_info info,
napi_threadsafe_function_call_js cb,
bool block_on_full) {
size_t argc = 3;
napi_value argv[3];
ts_info.block_on_full =
(block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function");
NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
napi_value async_name;
NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test",
NAPI_AUTO_LENGTH, &async_name));
NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name,
2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn));
bool abort;
NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
NAPI_CALL(env, napi_get_value_bool(env, argv[2], &(ts_info.start_secondary)));
NAPI_ASSERT(env,
(uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0),
"Thread creation");
return NULL;
}
static napi_value Unref(napi_env env, napi_callback_info info) {
NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn));
return NULL;
}
static napi_value Release(napi_env env, napi_callback_info info) {
NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release));
return NULL;
}
// Startup
static napi_value StartThread(napi_env env, napi_callback_info info) {
return StartThreadInternal(env, info, call_js, true);
}
static napi_value StartThreadNonblocking(napi_env env,
napi_callback_info info) {
return StartThreadInternal(env, info, call_js, false);
}
static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) {
return StartThreadInternal(env, info, NULL, true);
}
// Module init
static napi_value Init(napi_env env, napi_value exports) {
size_t index;
for (index = 0; index < ARRAY_LENGTH; index++) {
ints[index] = index;
}
napi_value js_array_length;
napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
napi_property_descriptor properties[] = {
{
"ARRAY_LENGTH",
NULL,
NULL,
NULL,
NULL,
js_array_length,
napi_enumerable,
NULL
},
DECLARE_NAPI_PROPERTY("StartThread", StartThread),
DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative),
DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
DECLARE_NAPI_PROPERTY("StopThread", StopThread),
DECLARE_NAPI_PROPERTY("Unref", Unref),
DECLARE_NAPI_PROPERTY("Release", Release),
};
NAPI_CALL(env, napi_define_properties(env, exports,
sizeof(properties)/sizeof(properties[0]), properties));
return exports;
}
NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)

View File

@ -0,0 +1,8 @@
{
'targets': [
{
'target_name': 'binding',
'sources': ['binding.c']
}
]
}

View File

@ -0,0 +1,166 @@
'use strict';
const common = require('../../common');
const assert = require('assert');
const binding = require(`./build/${common.buildType}/binding`);
const { fork } = require('child_process');
const expectedArray = (function(arrayLength) {
const result = [];
for (let index = 0; index < arrayLength; index++) {
result.push(arrayLength - 1 - index);
}
return result;
})(binding.ARRAY_LENGTH);
common.crashOnUnhandledRejection();
// Handle the rapid teardown test case as the child process. We unref the
// thread-safe function after we have received two values. This causes the
// process to exit and the environment cleanup handler to be invoked.
if (process.argv[2] === 'child') {
let callCount = 0;
binding.StartThread((value) => {
callCount++;
console.log(value);
if (callCount === 2) {
binding.Unref();
}
}, false /* abort */, true /* launchSecondary */);
// Release the thread-safe function from the main thread so that it may be
// torn down via the environment cleanup handler.
binding.Release();
return;
}
function testWithJSMarshaller({
threadStarter,
quitAfter,
abort,
launchSecondary }) {
return new Promise((resolve) => {
const array = [];
binding[threadStarter](function testCallback(value) {
array.push(value);
if (array.length === quitAfter) {
setImmediate(() => {
binding.StopThread(common.mustCall(() => {
resolve(array);
}), !!abort);
});
}
}, !!abort, !!launchSecondary);
if (threadStarter === 'StartThreadNonblocking') {
// Let's make this thread really busy for a short while to ensure that
// the queue fills and the thread receives a napi_queue_full.
const start = Date.now();
while (Date.now() - start < 200);
}
});
}
new Promise(function testWithoutJSMarshaller(resolve) {
let callCount = 0;
binding.StartThreadNoNative(function testCallback() {
callCount++;
// The default call-into-JS implementation passes no arguments.
assert.strictEqual(arguments.length, 0);
if (callCount === binding.ARRAY_LENGTH) {
setImmediate(() => {
binding.StopThread(common.mustCall(() => {
resolve();
}), false);
});
}
}, false /* abort */, false /* launchSecondary */);
})
// Start the thread in blocking mode, and assert that all values are passed.
// Quit after it's done.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
// Start the thread in non-blocking mode, and assert that all values are passed.
// Quit after it's done.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
// Start the thread in blocking mode, and assert that all values are passed.
// Quit early, but let the thread finish.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
// Start the thread in non-blocking mode, and assert that all values are passed.
// Quit early, but let the thread finish.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
quitAfter: 1
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
// Start the thread in blocking mode, and assert that all values are passed.
// Quit early, but let the thread finish. Launch a secondary thread to test the
// reference counter incrementing functionality.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1,
launchSecondary: true
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
// Start the thread in non-blocking mode, and assert that all values are passed.
// Quit early, but let the thread finish. Launch a secondary thread to test the
// reference counter incrementing functionality.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
quitAfter: 1,
launchSecondary: true
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))
// Start the thread in blocking mode, and assert that it could not finish.
// Quit early and aborting.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThread',
quitAfter: 1,
abort: true
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))
// Start the thread in non-blocking mode, and assert that it could not finish.
// Quit early and aborting.
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
quitAfter: 1,
abort: true
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))
// Start a child process to test rapid teardown
.then(() => {
return new Promise((resolve, reject) => {
let output = '';
const child = fork(__filename, ['child'], {
stdio: [process.stdin, 'pipe', process.stderr, 'ipc']
});
child.on('close', (code) => {
if (code === 0) {
resolve(output.match(/\S+/g));
} else {
reject(new Error('Child process died with code ' + code));
}
});
child.stdout.on('data', (data) => (output += data.toString()));
});
})
.then((result) => assert.strictEqual(result.indexOf(0), -1));