timers: allow Immediates to be unrefed

Refactor Immediates handling to allow for them to be unrefed, similar
to setTimeout, but without extra handles.

Document the new `immediate.ref()` and `immediate.unref()` methods.

Add SetImmediateUnref on the C++ side.

PR-URL: https://github.com/nodejs/node/pull/18139
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anatoli Papirovski 2018-01-13 16:51:28 -05:00
parent 7809f386b0
commit c1234673bb
No known key found for this signature in database
GPG Key ID: 614E2E1ABEB4B2C0
10 changed files with 249 additions and 102 deletions

View File

@ -18,6 +18,38 @@ This object is created internally and is returned from [`setImmediate()`][]. It
can be passed to [`clearImmediate()`][] in order to cancel the scheduled
actions.
By default, when an immediate is scheduled, the Node.js event loop will continue
running as long as the immediate is active. The `Immediate` object returned by
[`setImmediate()`][] exports both `immediate.ref()` and `immediate.unref()`
functions that can be used to control this default behavior.
### immediate.ref()
<!-- YAML
added: REPLACEME
-->
When called, requests that the Node.js event loop *not* exit so long as the
`Immediate` is active. Calling `immediate.ref()` multiple times will have no
effect.
*Note*: By default, all `Immediate` objects are "ref'd", making it normally
unnecessary to call `immediate.ref()` unless `immediate.unref()` had been called
previously.
Returns a reference to the `Immediate`.
### immediate.unref()
<!-- YAML
added: REPLACEME
-->
When called, the active `Immediate` object will not require the Node.js event
loop to remain active. If there is no other activity keeping the event loop
running, the process may exit before the `Immediate` object's callback is
invoked. Calling `immediate.unref()` multiple times will have no effect.
Returns a reference to the `Immediate`.
## Class: Timeout
This object is created internally and is returned from [`setTimeout()`][] and

View File

@ -53,11 +53,14 @@ const trigger_async_id_symbol = timerInternals.trigger_async_id_symbol;
// *Must* match Environment::ImmediateInfo::Fields in src/env.h.
const kCount = 0;
const kHasOutstanding = 1;
const kRefCount = 1;
const kHasOutstanding = 2;
const [activateImmediateCheck, immediateInfo] =
const [immediateInfo, toggleImmediateRef] =
setImmediateCallback(processImmediate);
const kRefed = Symbol('refed');
// The Timeout class
const Timeout = timerInternals.Timeout;
@ -656,42 +659,41 @@ function processImmediate() {
const queue = outstandingQueue.head !== null ?
outstandingQueue : immediateQueue;
var immediate = queue.head;
var tail = queue.tail;
const tail = queue.tail;
// Clear the linked list early in case new `setImmediate()` calls occur while
// immediate callbacks are executed
queue.head = queue.tail = null;
while (immediate !== null) {
if (!immediate._onImmediate) {
immediate = immediate._idleNext;
continue;
}
let count = 0;
let refCount = 0;
// Save next in case `clearImmediate(immediate)` is called from callback
const next = immediate._idleNext;
while (immediate !== null) {
immediate._destroyed = true;
const asyncId = immediate[async_id_symbol];
emitBefore(asyncId, immediate[trigger_async_id_symbol]);
tryOnImmediate(immediate, next, tail);
count++;
if (immediate[kRefed])
refCount++;
immediate[kRefed] = undefined;
tryOnImmediate(immediate, tail, count, refCount);
emitAfter(asyncId);
// If `clearImmediate(immediate)` wasn't called from the callback, use the
// `immediate`'s next item
if (immediate._idleNext !== null)
immediate = immediate._idleNext;
else
immediate = next;
immediate = immediate._idleNext;
}
immediateInfo[kCount] -= count;
immediateInfo[kRefCount] -= refCount;
immediateInfo[kHasOutstanding] = 0;
}
// An optimization so that the try/finally only de-optimizes (since at least v8
// 4.7) what is in this smaller function.
function tryOnImmediate(immediate, next, oldTail) {
function tryOnImmediate(immediate, oldTail, count, refCount) {
var threw = true;
try {
// make the actual call outside the try/finally to allow it to be optimized
@ -700,21 +702,21 @@ function tryOnImmediate(immediate, next, oldTail) {
} finally {
immediate._onImmediate = null;
if (!immediate._destroyed) {
immediate._destroyed = true;
immediateInfo[kCount]--;
if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}
if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}
if (threw && (immediate._idleNext !== null || next !== null)) {
// Handle any remaining Immediates after error handling has resolved,
// assuming we're still alive to do so.
outstandingQueue.head = immediate._idleNext || next;
outstandingQueue.tail = oldTail;
immediateInfo[kHasOutstanding] = 1;
if (threw) {
immediateInfo[kCount] -= count;
immediateInfo[kRefCount] -= refCount;
if (immediate._idleNext !== null) {
// Handle any remaining Immediates after error handling has resolved,
// assuming we're still alive to do so.
outstandingQueue.head = immediate._idleNext;
outstandingQueue.tail = oldTail;
immediateInfo[kHasOutstanding] = 1;
}
}
}
}
@ -729,31 +731,51 @@ function runCallback(timer) {
}
function Immediate(callback, args) {
this._idleNext = null;
this._idlePrev = null;
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this._onImmediate = null;
this._onImmediate = callback;
this._argv = args;
this._destroyed = false;
const Immediate = class Immediate {
constructor(callback, args) {
this._idleNext = null;
this._idlePrev = null;
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this._onImmediate = null;
this._onImmediate = callback;
this._argv = args;
this._destroyed = false;
this[kRefed] = false;
this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter];
this[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
if (async_hook_fields[kInit] > 0) {
emitInit(this[async_id_symbol],
'Immediate',
this[trigger_async_id_symbol],
this);
this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter];
this[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
if (async_hook_fields[kInit] > 0) {
emitInit(this[async_id_symbol],
'Immediate',
this[trigger_async_id_symbol],
this);
}
this.ref();
immediateInfo[kCount]++;
immediateQueue.append(this);
}
if (immediateInfo[kCount] === 0)
activateImmediateCheck();
immediateInfo[kCount]++;
ref() {
if (this[kRefed] === false) {
this[kRefed] = true;
if (immediateInfo[kRefCount]++ === 0)
toggleImmediateRef(true);
}
return this;
}
immediateQueue.append(this);
}
unref() {
if (this[kRefed] === true) {
this[kRefed] = false;
if (--immediateInfo[kRefCount] === 0)
toggleImmediateRef(false);
}
return this;
}
};
function setImmediate(callback, arg1, arg2, arg3) {
if (typeof callback !== 'function') {
@ -793,15 +815,18 @@ exports.setImmediate = setImmediate;
exports.clearImmediate = function(immediate) {
if (!immediate) return;
if (!immediate || immediate._destroyed)
return;
if (!immediate._destroyed) {
immediateInfo[kCount]--;
immediate._destroyed = true;
immediateInfo[kCount]--;
immediate._destroyed = true;
if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}
if (immediate[kRefed] && --immediateInfo[kRefCount] === 0)
toggleImmediateRef(false);
immediate[kRefed] = undefined;
if (async_hook_fields[kDestroy] > 0) {
emitDestroy(immediate[async_id_symbol]);
}
immediate._onImmediate = null;

View File

@ -229,6 +229,10 @@ inline uint32_t Environment::ImmediateInfo::count() const {
return fields_[kCount];
}
inline uint32_t Environment::ImmediateInfo::ref_count() const {
return fields_[kRefCount];
}
inline bool Environment::ImmediateInfo::has_outstanding() const {
return fields_[kHasOutstanding] == 1;
}
@ -241,6 +245,14 @@ inline void Environment::ImmediateInfo::count_dec(uint32_t decrement) {
fields_[kCount] = fields_[kCount] - decrement;
}
inline void Environment::ImmediateInfo::ref_count_inc(uint32_t increment) {
fields_[kRefCount] = fields_[kRefCount] + increment;
}
inline void Environment::ImmediateInfo::ref_count_dec(uint32_t decrement) {
fields_[kRefCount] = fields_[kRefCount] - decrement;
}
inline Environment::TickInfo::TickInfo(v8::Isolate* isolate)
: fields_(isolate, kFieldsCount) {}
@ -536,20 +548,36 @@ inline void Environment::set_fs_stats_field_array(double* fields) {
fs_stats_field_array_ = fields;
}
void Environment::SetImmediate(native_immediate_callback cb,
void Environment::CreateImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj) {
v8::Local<v8::Object> obj,
bool ref) {
native_immediate_callbacks_.push_back({
cb,
data,
std::unique_ptr<v8::Persistent<v8::Object>>(
obj.IsEmpty() ? nullptr : new v8::Persistent<v8::Object>(isolate_, obj))
std::unique_ptr<v8::Persistent<v8::Object>>(obj.IsEmpty() ?
nullptr : new v8::Persistent<v8::Object>(isolate_, obj)),
ref
});
if (immediate_info()->count() == 0)
ActivateImmediateCheck();
immediate_info()->count_inc(1);
}
void Environment::SetImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj) {
CreateImmediate(cb, data, obj, true);
if (immediate_info()->ref_count() == 0)
ToggleImmediateRef(true);
immediate_info()->ref_count_inc(1);
}
void Environment::SetUnrefImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj) {
CreateImmediate(cb, data, obj, false);
}
inline performance::performance_state* Environment::performance_state() {
return performance_state_;
}

View File

@ -82,6 +82,8 @@ void Environment::Start(int argc,
uv_idle_init(event_loop(), immediate_idle_handle());
uv_check_start(immediate_check_handle(), CheckImmediate);
// Inform V8's CPU profiler when we're idle. The profiler is sampling-based
// but not all samples are created equal; mark the wall clock time spent in
// epoll_wait() and friends so profiling tools can filter it out. The samples
@ -274,39 +276,35 @@ void Environment::EnvPromiseHook(v8::PromiseHookType type,
void Environment::RunAndClearNativeImmediates() {
size_t count = native_immediate_callbacks_.size();
if (count > 0) {
size_t ref_count = 0;
std::vector<NativeImmediateCallback> list;
native_immediate_callbacks_.swap(list);
for (const auto& cb : list) {
cb.cb_(this, cb.data_);
if (cb.keep_alive_)
cb.keep_alive_->Reset();
if (cb.refed_)
ref_count++;
}
#ifdef DEBUG
CHECK_GE(immediate_info()->count(), count);
#endif
immediate_info()->count_dec(count);
immediate_info()->ref_count_dec(ref_count);
}
}
static bool MaybeStopImmediate(Environment* env) {
if (env->immediate_info()->count() == 0) {
uv_check_stop(env->immediate_check_handle());
uv_idle_stop(env->immediate_idle_handle());
return true;
}
return false;
}
void Environment::CheckImmediate(uv_check_t* handle) {
Environment* env = Environment::from_immediate_check_handle(handle);
if (env->immediate_info()->count() == 0)
return;
HandleScope scope(env->isolate());
Context::Scope context_scope(env->context());
if (MaybeStopImmediate(env))
return;
env->RunAndClearNativeImmediates();
do {
@ -318,13 +316,17 @@ void Environment::CheckImmediate(uv_check_t* handle) {
{0, 0}).ToLocalChecked();
} while (env->immediate_info()->has_outstanding());
MaybeStopImmediate(env);
if (env->immediate_info()->ref_count() == 0)
env->ToggleImmediateRef(false);
}
void Environment::ActivateImmediateCheck() {
uv_check_start(&immediate_check_handle_, CheckImmediate);
// Idle handle is needed only to stop the event loop from blocking in poll.
uv_idle_start(&immediate_idle_handle_, [](uv_idle_t*){ });
void Environment::ToggleImmediateRef(bool ref) {
if (ref) {
// Idle handle is needed only to stop the event loop from blocking in poll.
uv_idle_start(immediate_idle_handle(), [](uv_idle_t*){ });
} else {
uv_idle_stop(immediate_idle_handle());
}
}

View File

@ -453,17 +453,22 @@ class Environment {
public:
inline AliasedBuffer<uint32_t, v8::Uint32Array>& fields();
inline uint32_t count() const;
inline uint32_t ref_count() const;
inline bool has_outstanding() const;
inline void count_inc(uint32_t increment);
inline void count_dec(uint32_t decrement);
inline void ref_count_inc(uint32_t increment);
inline void ref_count_dec(uint32_t decrement);
private:
friend class Environment; // So we can call the constructor.
inline explicit ImmediateInfo(v8::Isolate* isolate);
enum Fields {
kCount,
kRefCount,
kHasOutstanding,
kFieldsCount
};
@ -694,8 +699,12 @@ class Environment {
inline void SetImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj = v8::Local<v8::Object>());
inline void SetUnrefImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj =
v8::Local<v8::Object>());
// This needs to be available for the JS-land setImmediate().
void ActivateImmediateCheck();
void ToggleImmediateRef(bool ref);
class ShouldNotAbortOnUncaughtScope {
public:
@ -712,6 +721,11 @@ class Environment {
static inline Environment* ForAsyncHooks(AsyncHooks* hooks);
private:
inline void CreateImmediate(native_immediate_callback cb,
void* data,
v8::Local<v8::Object> obj,
bool ref);
inline void ThrowError(v8::Local<v8::Value> (*fun)(v8::Local<v8::String>),
const char* errmsg);
@ -776,6 +790,7 @@ class Environment {
native_immediate_callback cb_;
void* data_;
std::unique_ptr<v8::Persistent<v8::Object>> keep_alive_;
bool refed_;
};
std::vector<NativeImmediateCallback> native_immediate_callbacks_;
void RunAndClearNativeImmediates();

View File

@ -182,9 +182,8 @@ void SetupPerformanceObservers(const FunctionCallbackInfo<Value>& args) {
}
// Creates a GC Performance Entry and passes it to observers
void PerformanceGCCallback(uv_async_t* handle) {
GCPerformanceEntry* entry = static_cast<GCPerformanceEntry*>(handle->data);
Environment* env = entry->env();
void PerformanceGCCallback(Environment* env, void* ptr) {
GCPerformanceEntry* entry = static_cast<GCPerformanceEntry*>(ptr);
HandleScope scope(env->isolate());
Local<Context> context = env->context();
@ -201,10 +200,6 @@ void PerformanceGCCallback(uv_async_t* handle) {
}
delete entry;
auto closeCB = [](uv_handle_t* handle) {
delete reinterpret_cast<uv_async_t*>(handle);
};
uv_close(reinterpret_cast<uv_handle_t*>(handle), closeCB);
}
// Marks the start of a GC cycle
@ -221,16 +216,13 @@ void MarkGarbageCollectionEnd(Isolate* isolate,
v8::GCCallbackFlags flags,
void* data) {
Environment* env = static_cast<Environment*>(data);
uv_async_t* async = new uv_async_t();
if (uv_async_init(env->event_loop(), async, PerformanceGCCallback))
return delete async;
uv_unref(reinterpret_cast<uv_handle_t*>(async));
async->data =
GCPerformanceEntry* entry =
new GCPerformanceEntry(env,
static_cast<PerformanceGCKind>(type),
performance_last_gc_start_mark_,
PERFORMANCE_NOW());
CHECK_EQ(0, uv_async_send(async));
env->SetUnrefImmediate(PerformanceGCCallback,
entry);
}

View File

@ -83,16 +83,16 @@ class TimerWrap : public HandleWrap {
CHECK(args[0]->IsFunction());
auto env = Environment::GetCurrent(args);
env->set_immediate_callback_function(args[0].As<Function>());
auto activate_cb = [] (const FunctionCallbackInfo<Value>& args) {
Environment::GetCurrent(args)->ActivateImmediateCheck();
auto toggle_ref_cb = [] (const FunctionCallbackInfo<Value>& args) {
Environment::GetCurrent(args)->ToggleImmediateRef(args[0]->IsTrue());
};
auto activate_function =
env->NewFunctionTemplate(activate_cb)->GetFunction(env->context())
auto toggle_ref_function =
env->NewFunctionTemplate(toggle_ref_cb)->GetFunction(env->context())
.ToLocalChecked();
auto result = Array::New(env->isolate(), 2);
result->Set(env->context(), 0, activate_function).FromJust();
result->Set(env->context(), 1,
result->Set(env->context(), 0,
env->immediate_info()->fields().GetJSArray()).FromJust();
result->Set(env->context(), 1, toggle_ref_function).FromJust();
args.GetReturnValue().Set(result);
}

View File

@ -24,6 +24,15 @@ void* SetImmediate(napi_env env, T&& cb) {
assert(cb() != nullptr);
});
// Idle handle is needed only to stop the event loop from blocking in poll.
uv_idle_t* idle = new uv_idle_t;
uv_idle_init(loop, idle);
uv_idle_start(idle, [](uv_idle_t* idle) {
uv_close(reinterpret_cast<uv_handle_t*>(idle), [](uv_handle_t* handle) {
delete reinterpret_cast<uv_check_t*>(handle);
});
});
return nullptr;
}

View File

@ -0,0 +1,7 @@
'use strict';
const common = require('../common');
// This immediate should not execute as it was unrefed
// and nothing else is keeping the event loop alive
setImmediate(common.mustNotCall()).unref();

View File

@ -0,0 +1,37 @@
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
// This immediate should execute as it was unrefed and refed again.
// It also confirms that unref/ref are chainable.
setImmediate(common.mustCall(firstStep)).ref().unref().unref().ref();
function firstStep() {
const countdown =
new Countdown(2, common.mustCall(() => setImmediate(secondStep)));
// Unrefed setImmediate executes if it was unrefed but something else keeps
// the loop open
setImmediate(() => countdown.dec()).unref();
setTimeout(() => countdown.dec(), 50);
}
function secondStep() {
// clearImmediate works just fine with unref'd immediates
const immA = setImmediate(() => {
clearImmediate(immA);
clearImmediate(immB);
// this should not keep the event loop open indefinitely
// or do anything else weird
immA.ref();
immB.ref();
}).unref();
const immB = setImmediate(common.mustNotCall()).unref();
setImmediate(common.mustCall(finalStep));
}
function finalStep() {
// This immediate should not execute as it was unrefed
// and nothing else is keeping the event loop alive
setImmediate(common.mustNotCall()).unref();
}