timers: refactor setImmediate error handling
If an error is encountered during the processing of Immediates, schedule the remaining queue to finish after all error handling code runs (if the process is still alive to do so). The new changes make the Immediates error handling behaviour entirely deterministic and predictable, as the full queue will be flushed on each Immediates cycle, regardless of whether an error is encountered or not. Currently this processing is scheduled for nextTick which can yield unpredictable results as the nextTick might happen as early as close callbacks phase or as late as after the next event loop turns Immediates all fully processed. The latter can result in two full cycles of Immediates processing during one even loop turn. The current implementation also doesn't differentiate between Immediates scheduled for the current queue run or the next one, so Immediates that were scheduled for the next turn of the event loop, will process alongside the ones that were scheduled for the current turn. PR-URL: https://github.com/nodejs/node/pull/17879 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
parent
11a1bc1136
commit
54062d30cf
@ -51,7 +51,11 @@ const { kInit, kDestroy, kAsyncIdCounter } = async_wrap.constants;
|
|||||||
const async_id_symbol = timerInternals.async_id_symbol;
|
const async_id_symbol = timerInternals.async_id_symbol;
|
||||||
const trigger_async_id_symbol = timerInternals.trigger_async_id_symbol;
|
const trigger_async_id_symbol = timerInternals.trigger_async_id_symbol;
|
||||||
|
|
||||||
const [activateImmediateCheck, scheduledImmediateCountArray] =
|
// *Must* match Environment::ImmediateInfo::Fields in src/env.h.
|
||||||
|
const kCount = 0;
|
||||||
|
const kHasOutstanding = 1;
|
||||||
|
|
||||||
|
const [activateImmediateCheck, immediateInfo] =
|
||||||
setImmediateCallback(processImmediate);
|
setImmediateCallback(processImmediate);
|
||||||
|
|
||||||
// The Timeout class
|
// The Timeout class
|
||||||
@ -627,16 +631,23 @@ ImmediateList.prototype.remove = function(item) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Create a single linked list instance only once at startup
|
// Create a single linked list instance only once at startup
|
||||||
var immediateQueue = new ImmediateList();
|
const immediateQueue = new ImmediateList();
|
||||||
|
|
||||||
|
// If an uncaught exception was thrown during execution of immediateQueue,
|
||||||
|
// this queue will store all remaining Immediates that need to run upon
|
||||||
|
// resolution of all error handling (if process is still alive).
|
||||||
|
const outstandingQueue = new ImmediateList();
|
||||||
|
|
||||||
|
|
||||||
function processImmediate() {
|
function processImmediate() {
|
||||||
var immediate = immediateQueue.head;
|
const queue = outstandingQueue.head !== null ?
|
||||||
var tail = immediateQueue.tail;
|
outstandingQueue : immediateQueue;
|
||||||
|
var immediate = queue.head;
|
||||||
|
var tail = queue.tail;
|
||||||
|
|
||||||
// Clear the linked list early in case new `setImmediate()` calls occur while
|
// Clear the linked list early in case new `setImmediate()` calls occur while
|
||||||
// immediate callbacks are executed
|
// immediate callbacks are executed
|
||||||
immediateQueue.head = immediateQueue.tail = null;
|
queue.head = queue.tail = null;
|
||||||
|
|
||||||
while (immediate !== null) {
|
while (immediate !== null) {
|
||||||
if (!immediate._onImmediate) {
|
if (!immediate._onImmediate) {
|
||||||
@ -645,9 +656,14 @@ function processImmediate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Save next in case `clearImmediate(immediate)` is called from callback
|
// Save next in case `clearImmediate(immediate)` is called from callback
|
||||||
var next = immediate._idleNext;
|
const next = immediate._idleNext;
|
||||||
|
|
||||||
tryOnImmediate(immediate, tail);
|
const asyncId = immediate[async_id_symbol];
|
||||||
|
emitBefore(asyncId, immediate[trigger_async_id_symbol]);
|
||||||
|
|
||||||
|
tryOnImmediate(immediate, next, tail);
|
||||||
|
|
||||||
|
emitAfter(asyncId);
|
||||||
|
|
||||||
// If `clearImmediate(immediate)` wasn't called from the callback, use the
|
// If `clearImmediate(immediate)` wasn't called from the callback, use the
|
||||||
// `immediate`'s next item
|
// `immediate`'s next item
|
||||||
@ -656,45 +672,36 @@ function processImmediate() {
|
|||||||
else
|
else
|
||||||
immediate = next;
|
immediate = next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
immediateInfo[kHasOutstanding] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// An optimization so that the try/finally only de-optimizes (since at least v8
|
// An optimization so that the try/finally only de-optimizes (since at least v8
|
||||||
// 4.7) what is in this smaller function.
|
// 4.7) what is in this smaller function.
|
||||||
function tryOnImmediate(immediate, oldTail) {
|
function tryOnImmediate(immediate, next, oldTail) {
|
||||||
var threw = true;
|
var threw = true;
|
||||||
emitBefore(immediate[async_id_symbol], immediate[trigger_async_id_symbol]);
|
|
||||||
try {
|
try {
|
||||||
// make the actual call outside the try/finally to allow it to be optimized
|
// make the actual call outside the try/finally to allow it to be optimized
|
||||||
runCallback(immediate);
|
runCallback(immediate);
|
||||||
threw = false;
|
threw = false;
|
||||||
} finally {
|
} finally {
|
||||||
immediate._onImmediate = null;
|
immediate._onImmediate = null;
|
||||||
if (!threw)
|
|
||||||
emitAfter(immediate[async_id_symbol]);
|
|
||||||
|
|
||||||
if (!immediate._destroyed) {
|
if (!immediate._destroyed) {
|
||||||
immediate._destroyed = true;
|
immediate._destroyed = true;
|
||||||
scheduledImmediateCountArray[0]--;
|
immediateInfo[kCount]--;
|
||||||
|
|
||||||
if (async_hook_fields[kDestroy] > 0) {
|
if (async_hook_fields[kDestroy] > 0) {
|
||||||
emitDestroy(immediate[async_id_symbol]);
|
emitDestroy(immediate[async_id_symbol]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (threw && immediate._idleNext !== null) {
|
if (threw && (immediate._idleNext !== null || next !== null)) {
|
||||||
// Handle any remaining on next tick, assuming we're still alive to do so.
|
// Handle any remaining Immediates after error handling has resolved,
|
||||||
const curHead = immediateQueue.head;
|
// assuming we're still alive to do so.
|
||||||
const next = immediate._idleNext;
|
outstandingQueue.head = immediate._idleNext || next;
|
||||||
if (curHead !== null) {
|
outstandingQueue.tail = oldTail;
|
||||||
curHead._idlePrev = oldTail;
|
immediateInfo[kHasOutstanding] = 1;
|
||||||
oldTail._idleNext = curHead;
|
|
||||||
next._idlePrev = null;
|
|
||||||
immediateQueue.head = next;
|
|
||||||
} else {
|
|
||||||
immediateQueue.head = next;
|
|
||||||
immediateQueue.tail = oldTail;
|
|
||||||
}
|
|
||||||
process.nextTick(processImmediate);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -728,9 +735,9 @@ function Immediate(callback, args) {
|
|||||||
this);
|
this);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (scheduledImmediateCountArray[0] === 0)
|
if (immediateInfo[kCount] === 0)
|
||||||
activateImmediateCheck();
|
activateImmediateCheck();
|
||||||
scheduledImmediateCountArray[0]++;
|
immediateInfo[kCount]++;
|
||||||
|
|
||||||
immediateQueue.append(this);
|
immediateQueue.append(this);
|
||||||
}
|
}
|
||||||
@ -776,7 +783,7 @@ exports.clearImmediate = function(immediate) {
|
|||||||
if (!immediate) return;
|
if (!immediate) return;
|
||||||
|
|
||||||
if (!immediate._destroyed) {
|
if (!immediate._destroyed) {
|
||||||
scheduledImmediateCountArray[0]--;
|
immediateInfo[kCount]--;
|
||||||
immediate._destroyed = true;
|
immediate._destroyed = true;
|
||||||
|
|
||||||
if (async_hook_fields[kDestroy] > 0) {
|
if (async_hook_fields[kDestroy] > 0) {
|
||||||
|
@ -217,6 +217,30 @@ inline bool Environment::AsyncCallbackScope::in_makecallback() const {
|
|||||||
return env_->makecallback_cntr_ > 1;
|
return env_->makecallback_cntr_ > 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline Environment::ImmediateInfo::ImmediateInfo(v8::Isolate* isolate)
|
||||||
|
: fields_(isolate, kFieldsCount) {}
|
||||||
|
|
||||||
|
inline AliasedBuffer<uint32_t, v8::Uint32Array>&
|
||||||
|
Environment::ImmediateInfo::fields() {
|
||||||
|
return fields_;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline uint32_t Environment::ImmediateInfo::count() const {
|
||||||
|
return fields_[kCount];
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool Environment::ImmediateInfo::has_outstanding() const {
|
||||||
|
return fields_[kHasOutstanding] == 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void Environment::ImmediateInfo::count_inc(uint32_t increment) {
|
||||||
|
fields_[kCount] = fields_[kCount] + increment;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline void Environment::ImmediateInfo::count_dec(uint32_t decrement) {
|
||||||
|
fields_[kCount] = fields_[kCount] - decrement;
|
||||||
|
}
|
||||||
|
|
||||||
inline Environment::TickInfo::TickInfo(v8::Isolate* isolate)
|
inline Environment::TickInfo::TickInfo(v8::Isolate* isolate)
|
||||||
: fields_(isolate, kFieldsCount) {}
|
: fields_(isolate, kFieldsCount) {}
|
||||||
|
|
||||||
@ -263,6 +287,7 @@ inline Environment::Environment(IsolateData* isolate_data,
|
|||||||
v8::Local<v8::Context> context)
|
v8::Local<v8::Context> context)
|
||||||
: isolate_(context->GetIsolate()),
|
: isolate_(context->GetIsolate()),
|
||||||
isolate_data_(isolate_data),
|
isolate_data_(isolate_data),
|
||||||
|
immediate_info_(context->GetIsolate()),
|
||||||
tick_info_(context->GetIsolate()),
|
tick_info_(context->GetIsolate()),
|
||||||
timer_base_(uv_now(isolate_data->event_loop())),
|
timer_base_(uv_now(isolate_data->event_loop())),
|
||||||
using_domains_(false),
|
using_domains_(false),
|
||||||
@ -271,7 +296,6 @@ inline Environment::Environment(IsolateData* isolate_data,
|
|||||||
abort_on_uncaught_exception_(false),
|
abort_on_uncaught_exception_(false),
|
||||||
emit_napi_warning_(true),
|
emit_napi_warning_(true),
|
||||||
makecallback_cntr_(0),
|
makecallback_cntr_(0),
|
||||||
scheduled_immediate_count_(isolate_, 1),
|
|
||||||
should_abort_on_uncaught_toggle_(isolate_, 1),
|
should_abort_on_uncaught_toggle_(isolate_, 1),
|
||||||
#if HAVE_INSPECTOR
|
#if HAVE_INSPECTOR
|
||||||
inspector_agent_(new inspector::Agent(this)),
|
inspector_agent_(new inspector::Agent(this)),
|
||||||
@ -371,6 +395,10 @@ inline Environment::AsyncHooks* Environment::async_hooks() {
|
|||||||
return &async_hooks_;
|
return &async_hooks_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline Environment::ImmediateInfo* Environment::immediate_info() {
|
||||||
|
return &immediate_info_;
|
||||||
|
}
|
||||||
|
|
||||||
inline Environment::TickInfo* Environment::tick_info() {
|
inline Environment::TickInfo* Environment::tick_info() {
|
||||||
return &tick_info_;
|
return &tick_info_;
|
||||||
}
|
}
|
||||||
@ -508,11 +536,6 @@ inline void Environment::set_fs_stats_field_array(double* fields) {
|
|||||||
fs_stats_field_array_ = fields;
|
fs_stats_field_array_ = fields;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline AliasedBuffer<uint32_t, v8::Uint32Array>&
|
|
||||||
Environment::scheduled_immediate_count() {
|
|
||||||
return scheduled_immediate_count_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void Environment::SetImmediate(native_immediate_callback cb,
|
void Environment::SetImmediate(native_immediate_callback cb,
|
||||||
void* data,
|
void* data,
|
||||||
v8::Local<v8::Object> obj) {
|
v8::Local<v8::Object> obj) {
|
||||||
@ -522,9 +545,9 @@ void Environment::SetImmediate(native_immediate_callback cb,
|
|||||||
std::unique_ptr<v8::Persistent<v8::Object>>(
|
std::unique_ptr<v8::Persistent<v8::Object>>(
|
||||||
obj.IsEmpty() ? nullptr : new v8::Persistent<v8::Object>(isolate_, obj))
|
obj.IsEmpty() ? nullptr : new v8::Persistent<v8::Object>(isolate_, obj))
|
||||||
});
|
});
|
||||||
if (scheduled_immediate_count_[0] == 0)
|
if (immediate_info()->count() == 0)
|
||||||
ActivateImmediateCheck();
|
ActivateImmediateCheck();
|
||||||
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] + 1;
|
immediate_info()->count_inc(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline performance::performance_state* Environment::performance_state() {
|
inline performance::performance_state* Environment::performance_state() {
|
||||||
|
20
src/env.cc
20
src/env.cc
@ -283,14 +283,14 @@ void Environment::RunAndClearNativeImmediates() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
CHECK_GE(scheduled_immediate_count_[0], count);
|
CHECK_GE(immediate_info()->count(), count);
|
||||||
#endif
|
#endif
|
||||||
scheduled_immediate_count_[0] = scheduled_immediate_count_[0] - count;
|
immediate_info()->count_dec(count);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool MaybeStopImmediate(Environment* env) {
|
static bool MaybeStopImmediate(Environment* env) {
|
||||||
if (env->scheduled_immediate_count()[0] == 0) {
|
if (env->immediate_info()->count() == 0) {
|
||||||
uv_check_stop(env->immediate_check_handle());
|
uv_check_stop(env->immediate_check_handle());
|
||||||
uv_idle_stop(env->immediate_idle_handle());
|
uv_idle_stop(env->immediate_idle_handle());
|
||||||
return true;
|
return true;
|
||||||
@ -309,12 +309,14 @@ void Environment::CheckImmediate(uv_check_t* handle) {
|
|||||||
|
|
||||||
env->RunAndClearNativeImmediates();
|
env->RunAndClearNativeImmediates();
|
||||||
|
|
||||||
MakeCallback(env->isolate(),
|
do {
|
||||||
env->process_object(),
|
MakeCallback(env->isolate(),
|
||||||
env->immediate_callback_function(),
|
env->process_object(),
|
||||||
0,
|
env->immediate_callback_function(),
|
||||||
nullptr,
|
0,
|
||||||
{0, 0}).ToLocalChecked();
|
nullptr,
|
||||||
|
{0, 0}).ToLocalChecked();
|
||||||
|
} while (env->immediate_info()->has_outstanding());
|
||||||
|
|
||||||
MaybeStopImmediate(env);
|
MaybeStopImmediate(env);
|
||||||
}
|
}
|
||||||
|
29
src/env.h
29
src/env.h
@ -453,6 +453,30 @@ class Environment {
|
|||||||
DISALLOW_COPY_AND_ASSIGN(AsyncCallbackScope);
|
DISALLOW_COPY_AND_ASSIGN(AsyncCallbackScope);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class ImmediateInfo {
|
||||||
|
public:
|
||||||
|
inline AliasedBuffer<uint32_t, v8::Uint32Array>& fields();
|
||||||
|
inline uint32_t count() const;
|
||||||
|
inline bool has_outstanding() const;
|
||||||
|
|
||||||
|
inline void count_inc(uint32_t increment);
|
||||||
|
inline void count_dec(uint32_t decrement);
|
||||||
|
|
||||||
|
private:
|
||||||
|
friend class Environment; // So we can call the constructor.
|
||||||
|
inline explicit ImmediateInfo(v8::Isolate* isolate);
|
||||||
|
|
||||||
|
enum Fields {
|
||||||
|
kCount,
|
||||||
|
kHasOutstanding,
|
||||||
|
kFieldsCount
|
||||||
|
};
|
||||||
|
|
||||||
|
AliasedBuffer<uint32_t, v8::Uint32Array> fields_;
|
||||||
|
|
||||||
|
DISALLOW_COPY_AND_ASSIGN(ImmediateInfo);
|
||||||
|
};
|
||||||
|
|
||||||
class TickInfo {
|
class TickInfo {
|
||||||
public:
|
public:
|
||||||
inline AliasedBuffer<uint8_t, v8::Uint8Array>& fields();
|
inline AliasedBuffer<uint8_t, v8::Uint8Array>& fields();
|
||||||
@ -532,6 +556,7 @@ class Environment {
|
|||||||
inline void FinishHandleCleanup(uv_handle_t* handle);
|
inline void FinishHandleCleanup(uv_handle_t* handle);
|
||||||
|
|
||||||
inline AsyncHooks* async_hooks();
|
inline AsyncHooks* async_hooks();
|
||||||
|
inline ImmediateInfo* immediate_info();
|
||||||
inline TickInfo* tick_info();
|
inline TickInfo* tick_info();
|
||||||
inline uint64_t timer_base() const;
|
inline uint64_t timer_base() const;
|
||||||
|
|
||||||
@ -582,8 +607,6 @@ class Environment {
|
|||||||
inline double* fs_stats_field_array() const;
|
inline double* fs_stats_field_array() const;
|
||||||
inline void set_fs_stats_field_array(double* fields);
|
inline void set_fs_stats_field_array(double* fields);
|
||||||
|
|
||||||
inline AliasedBuffer<uint32_t, v8::Uint32Array>& scheduled_immediate_count();
|
|
||||||
|
|
||||||
inline performance::performance_state* performance_state();
|
inline performance::performance_state* performance_state();
|
||||||
inline std::map<std::string, uint64_t>* performance_marks();
|
inline std::map<std::string, uint64_t>* performance_marks();
|
||||||
|
|
||||||
@ -704,6 +727,7 @@ class Environment {
|
|||||||
uv_check_t idle_check_handle_;
|
uv_check_t idle_check_handle_;
|
||||||
|
|
||||||
AsyncHooks async_hooks_;
|
AsyncHooks async_hooks_;
|
||||||
|
ImmediateInfo immediate_info_;
|
||||||
TickInfo tick_info_;
|
TickInfo tick_info_;
|
||||||
const uint64_t timer_base_;
|
const uint64_t timer_base_;
|
||||||
bool using_domains_;
|
bool using_domains_;
|
||||||
@ -714,7 +738,6 @@ class Environment {
|
|||||||
size_t makecallback_cntr_;
|
size_t makecallback_cntr_;
|
||||||
std::vector<double> destroy_async_id_list_;
|
std::vector<double> destroy_async_id_list_;
|
||||||
|
|
||||||
AliasedBuffer<uint32_t, v8::Uint32Array> scheduled_immediate_count_;
|
|
||||||
AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_;
|
AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_;
|
||||||
|
|
||||||
int should_not_abort_scope_counter_ = 0;
|
int should_not_abort_scope_counter_ = 0;
|
||||||
|
@ -90,8 +90,9 @@ class TimerWrap : public HandleWrap {
|
|||||||
env->NewFunctionTemplate(activate_cb)->GetFunction(env->context())
|
env->NewFunctionTemplate(activate_cb)->GetFunction(env->context())
|
||||||
.ToLocalChecked();
|
.ToLocalChecked();
|
||||||
auto result = Array::New(env->isolate(), 2);
|
auto result = Array::New(env->isolate(), 2);
|
||||||
result->Set(0, activate_function);
|
result->Set(env->context(), 0, activate_function).FromJust();
|
||||||
result->Set(1, env->scheduled_immediate_count().GetJSArray());
|
result->Set(env->context(), 1,
|
||||||
|
env->immediate_info()->fields().GetJSArray()).FromJust();
|
||||||
args.GetReturnValue().Set(result);
|
args.GetReturnValue().Set(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
53
test/parallel/test-timers-immediate-queue-throw.js
Normal file
53
test/parallel/test-timers-immediate-queue-throw.js
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const common = require('../common');
|
||||||
|
const assert = require('assert');
|
||||||
|
const domain = require('domain');
|
||||||
|
|
||||||
|
// setImmediate should run clear its queued cbs once per event loop turn
|
||||||
|
// but immediates queued while processing the current queue should happen
|
||||||
|
// on the next turn of the event loop.
|
||||||
|
|
||||||
|
// In addition, if any setImmediate throws, the rest of the queue should
|
||||||
|
// be processed after all error handling is resolved, but that queue
|
||||||
|
// should not include any setImmediate calls scheduled after the
|
||||||
|
// processing of the queue started.
|
||||||
|
|
||||||
|
let threw = false;
|
||||||
|
let stage = -1;
|
||||||
|
|
||||||
|
const QUEUE = 10;
|
||||||
|
|
||||||
|
const errObj = {
|
||||||
|
type: Error,
|
||||||
|
message: 'setImmediate Err'
|
||||||
|
};
|
||||||
|
|
||||||
|
process.once('uncaughtException', common.expectsError(errObj));
|
||||||
|
process.once('uncaughtException', () => assert.strictEqual(stage, 0));
|
||||||
|
|
||||||
|
const d1 = domain.create();
|
||||||
|
d1.once('error', common.expectsError(errObj));
|
||||||
|
d1.once('error', () => assert.strictEqual(stage, 0));
|
||||||
|
|
||||||
|
const run = common.mustCall((callStage) => {
|
||||||
|
assert(callStage >= stage);
|
||||||
|
stage = callStage;
|
||||||
|
if (threw)
|
||||||
|
return;
|
||||||
|
|
||||||
|
setImmediate(run, 2);
|
||||||
|
}, QUEUE * 3);
|
||||||
|
|
||||||
|
for (let i = 0; i < QUEUE; i++)
|
||||||
|
setImmediate(run, 0);
|
||||||
|
setImmediate(() => {
|
||||||
|
threw = true;
|
||||||
|
process.nextTick(() => assert.strictEqual(stage, 1));
|
||||||
|
throw new Error('setImmediate Err');
|
||||||
|
});
|
||||||
|
d1.run(() => setImmediate(() => {
|
||||||
|
throw new Error('setImmediate Err');
|
||||||
|
}));
|
||||||
|
for (let i = 0; i < QUEUE; i++)
|
||||||
|
setImmediate(run, 1);
|
Loading…
x
Reference in New Issue
Block a user