n-api: guard against cond null dereference
A condition variable is only created by the thread-safe function if the queue size is set to something larger than zero. This adds null-checks around the condition variable and tests for the case where the queue size is zero. Fixes: https://github.com/nodejs/help/issues/1387 PR-URL: https://github.com/nodejs/node/pull/21871 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
This commit is contained in:
parent
b38b8d3b9d
commit
53296e8a3e
@ -3782,7 +3782,7 @@ class TsFn: public node::AsyncResource {
|
||||
if (thread_count == 0 || mode == napi_tsfn_abort) {
|
||||
if (!is_closing) {
|
||||
is_closing = (mode == napi_tsfn_abort);
|
||||
if (is_closing) {
|
||||
if (is_closing && max_queue_size > 0) {
|
||||
cond->Signal(lock);
|
||||
}
|
||||
if (uv_async_send(&async) != 0) {
|
||||
@ -3872,7 +3872,9 @@ class TsFn: public node::AsyncResource {
|
||||
if (size == 0) {
|
||||
if (thread_count == 0) {
|
||||
is_closing = true;
|
||||
cond->Signal(lock);
|
||||
if (max_queue_size > 0) {
|
||||
cond->Signal(lock);
|
||||
}
|
||||
CloseHandlesAndMaybeDelete();
|
||||
} else {
|
||||
if (uv_idle_stop(&idle) != 0) {
|
||||
@ -3939,7 +3941,9 @@ class TsFn: public node::AsyncResource {
|
||||
if (set_closing) {
|
||||
node::Mutex::ScopedLock lock(this->mutex);
|
||||
is_closing = true;
|
||||
cond->Signal(lock);
|
||||
if (max_queue_size > 0) {
|
||||
cond->Signal(lock);
|
||||
}
|
||||
}
|
||||
if (handles_closing) {
|
||||
return;
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include "../common.h"
|
||||
|
||||
#define ARRAY_LENGTH 10
|
||||
#define MAX_QUEUE_SIZE 2
|
||||
|
||||
static uv_thread_t uv_threads[2];
|
||||
static napi_threadsafe_function ts_fn;
|
||||
@ -18,6 +19,7 @@ typedef struct {
|
||||
napi_threadsafe_function_release_mode abort;
|
||||
bool start_secondary;
|
||||
napi_ref js_finalize_cb;
|
||||
uint32_t max_queue_size;
|
||||
} ts_fn_hint;
|
||||
|
||||
static ts_fn_hint ts_info;
|
||||
@ -71,6 +73,12 @@ static void data_source_thread(void* data) {
|
||||
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
|
||||
status = napi_call_threadsafe_function(ts_fn, &ints[index],
|
||||
ts_fn_info->block_on_full);
|
||||
if (ts_fn_info->max_queue_size == 0) {
|
||||
// Let's make this thread really busy for 200 ms to give the main thread a
|
||||
// chance to abort.
|
||||
uint64_t start = uv_hrtime();
|
||||
for (; uv_hrtime() - start < 200000000;);
|
||||
}
|
||||
switch (status) {
|
||||
case napi_queue_full:
|
||||
queue_was_full = true;
|
||||
@ -167,8 +175,8 @@ static napi_value StartThreadInternal(napi_env env,
|
||||
napi_callback_info info,
|
||||
napi_threadsafe_function_call_js cb,
|
||||
bool block_on_full) {
|
||||
size_t argc = 3;
|
||||
napi_value argv[3];
|
||||
size_t argc = 4;
|
||||
napi_value argv[4];
|
||||
|
||||
ts_info.block_on_full =
|
||||
(block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
|
||||
@ -178,8 +186,18 @@ static napi_value StartThreadInternal(napi_env env,
|
||||
napi_value async_name;
|
||||
NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test",
|
||||
NAPI_AUTO_LENGTH, &async_name));
|
||||
NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name,
|
||||
2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn));
|
||||
NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size));
|
||||
NAPI_CALL(env, napi_create_threadsafe_function(env,
|
||||
argv[0],
|
||||
NULL,
|
||||
async_name,
|
||||
ts_info.max_queue_size,
|
||||
2,
|
||||
uv_threads,
|
||||
join_the_threads,
|
||||
&ts_info,
|
||||
cb,
|
||||
&ts_fn));
|
||||
bool abort;
|
||||
NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
|
||||
ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
|
||||
@ -224,8 +242,9 @@ static napi_value Init(napi_env env, napi_value exports) {
|
||||
for (index = 0; index < ARRAY_LENGTH; index++) {
|
||||
ints[index] = index;
|
||||
}
|
||||
napi_value js_array_length;
|
||||
napi_value js_array_length, js_max_queue_size;
|
||||
napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
|
||||
napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size);
|
||||
|
||||
napi_property_descriptor properties[] = {
|
||||
{
|
||||
@ -238,6 +257,16 @@ static napi_value Init(napi_env env, napi_value exports) {
|
||||
napi_enumerable,
|
||||
NULL
|
||||
},
|
||||
{
|
||||
"MAX_QUEUE_SIZE",
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
js_max_queue_size,
|
||||
napi_enumerable,
|
||||
NULL
|
||||
},
|
||||
DECLARE_NAPI_PROPERTY("StartThread", StartThread),
|
||||
DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative),
|
||||
DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
|
||||
|
@ -23,7 +23,7 @@ if (process.argv[2] === 'child') {
|
||||
if (callCount === 2) {
|
||||
binding.Unref();
|
||||
}
|
||||
}, false /* abort */, true /* launchSecondary */);
|
||||
}, false /* abort */, true /* launchSecondary */, +process.argv[3]);
|
||||
|
||||
// Release the thread-safe function from the main thread so that it may be
|
||||
// torn down via the environment cleanup handler.
|
||||
@ -35,6 +35,7 @@ function testWithJSMarshaller({
|
||||
threadStarter,
|
||||
quitAfter,
|
||||
abort,
|
||||
maxQueueSize,
|
||||
launchSecondary }) {
|
||||
return new Promise((resolve) => {
|
||||
const array = [];
|
||||
@ -47,7 +48,7 @@ function testWithJSMarshaller({
|
||||
}), !!abort);
|
||||
});
|
||||
}
|
||||
}, !!abort, !!launchSecondary);
|
||||
}, !!abort, !!launchSecondary, maxQueueSize);
|
||||
if (threadStarter === 'StartThreadNonblocking') {
|
||||
// Let's make this thread really busy for a short while to ensure that
|
||||
// the queue fills and the thread receives a napi_queue_full.
|
||||
@ -57,6 +58,24 @@ function testWithJSMarshaller({
|
||||
});
|
||||
}
|
||||
|
||||
function testUnref(queueSize) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let output = '';
|
||||
const child = fork(__filename, ['child', queueSize], {
|
||||
stdio: [process.stdin, 'pipe', process.stderr, 'ipc']
|
||||
});
|
||||
child.on('close', (code) => {
|
||||
if (code === 0) {
|
||||
resolve(output.match(/\S+/g));
|
||||
} else {
|
||||
reject(new Error('Child process died with code ' + code));
|
||||
}
|
||||
});
|
||||
child.stdout.on('data', (data) => (output += data.toString()));
|
||||
})
|
||||
.then((result) => assert.strictEqual(result.indexOf(0), -1));
|
||||
}
|
||||
|
||||
new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
let callCount = 0;
|
||||
binding.StartThreadNoNative(function testCallback() {
|
||||
@ -71,13 +90,23 @@ new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
}), false);
|
||||
});
|
||||
}
|
||||
}, false /* abort */, false /* launchSecondary */);
|
||||
}, false /* abort */, false /* launchSecondary */, binding.MAX_QUEUE_SIZE);
|
||||
})
|
||||
|
||||
// Start the thread in blocking mode, and assert that all values are passed.
|
||||
// Quit after it's done.
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThread',
|
||||
maxQueueSize: binding.MAX_QUEUE_SIZE,
|
||||
quitAfter: binding.ARRAY_LENGTH
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
|
||||
// Start the thread in blocking mode with an infinite queue, and assert that all
|
||||
// values are passed. Quit after it's done.
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThread',
|
||||
maxQueueSize: 0,
|
||||
quitAfter: binding.ARRAY_LENGTH
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
@ -86,6 +115,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
// Quit after it's done.
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThreadNonblocking',
|
||||
maxQueueSize: binding.MAX_QUEUE_SIZE,
|
||||
quitAfter: binding.ARRAY_LENGTH
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
@ -94,6 +124,16 @@ new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
// Quit early, but let the thread finish.
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThread',
|
||||
maxQueueSize: binding.MAX_QUEUE_SIZE,
|
||||
quitAfter: 1
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
|
||||
// Start the thread in blocking mode with an infinite queue, and assert that all
|
||||
// values are passed. Quit early, but let the thread finish.
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThread',
|
||||
maxQueueSize: 0,
|
||||
quitAfter: 1
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
@ -102,6 +142,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
// Quit early, but let the thread finish.
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThreadNonblocking',
|
||||
maxQueueSize: binding.MAX_QUEUE_SIZE,
|
||||
quitAfter: 1
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
@ -112,6 +153,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThread',
|
||||
quitAfter: 1,
|
||||
maxQueueSize: binding.MAX_QUEUE_SIZE,
|
||||
launchSecondary: true
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
@ -122,15 +164,27 @@ new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThreadNonblocking',
|
||||
quitAfter: 1,
|
||||
maxQueueSize: binding.MAX_QUEUE_SIZE,
|
||||
launchSecondary: true
|
||||
}))
|
||||
.then((result) => assert.deepStrictEqual(result, expectedArray))
|
||||
|
||||
// Start the thread in blocking mode, and assert that it could not finish.
|
||||
// Quit early and aborting.
|
||||
// Quit early by aborting.
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThread',
|
||||
quitAfter: 1,
|
||||
maxQueueSize: binding.MAX_QUEUE_SIZE,
|
||||
abort: true
|
||||
}))
|
||||
.then((result) => assert.strictEqual(result.indexOf(0), -1))
|
||||
|
||||
// Start the thread in blocking mode with an infinite queue, and assert that it
|
||||
// could not finish. Quit early by aborting.
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThread',
|
||||
quitAfter: 1,
|
||||
maxQueueSize: 0,
|
||||
abort: true
|
||||
}))
|
||||
.then((result) => assert.strictEqual(result.indexOf(0), -1))
|
||||
@ -140,25 +194,13 @@ new Promise(function testWithoutJSMarshaller(resolve) {
|
||||
.then(() => testWithJSMarshaller({
|
||||
threadStarter: 'StartThreadNonblocking',
|
||||
quitAfter: 1,
|
||||
maxQueueSize: binding.MAX_QUEUE_SIZE,
|
||||
abort: true
|
||||
}))
|
||||
.then((result) => assert.strictEqual(result.indexOf(0), -1))
|
||||
|
||||
// Start a child process to test rapid teardown
|
||||
.then(() => {
|
||||
return new Promise((resolve, reject) => {
|
||||
let output = '';
|
||||
const child = fork(__filename, ['child'], {
|
||||
stdio: [process.stdin, 'pipe', process.stderr, 'ipc']
|
||||
});
|
||||
child.on('close', (code) => {
|
||||
if (code === 0) {
|
||||
resolve(output.match(/\S+/g));
|
||||
} else {
|
||||
reject(new Error('Child process died with code ' + code));
|
||||
}
|
||||
});
|
||||
child.stdout.on('data', (data) => (output += data.toString()));
|
||||
});
|
||||
})
|
||||
.then((result) => assert.strictEqual(result.indexOf(0), -1));
|
||||
.then(() => testUnref(binding.MAX_QUEUE_SIZE))
|
||||
|
||||
// Start a child process with an infinite queue to test rapid teardown
|
||||
.then(() => testUnref(0));
|
||||
|
Loading…
x
Reference in New Issue
Block a user