domain: re-implement domain over async_hook
Domain core module has been re-implemented over async_hook. PR-URL: https://github.com/nodejs/node/pull/16222 Reviewed-By: Andreas Madsen <amwebdk@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
parent
2d50b5e537
commit
51e0948862
@ -28,7 +28,7 @@
|
||||
|
||||
const util = require('util');
|
||||
const EventEmitter = require('events');
|
||||
const { inherits } = util;
|
||||
const { createHook } = require('async_hooks');
|
||||
|
||||
// communicate with events module, but don't require that
|
||||
// module to have to load this one, since this module has
|
||||
@ -48,13 +48,54 @@ Object.defineProperty(process, 'domain', {
|
||||
}
|
||||
});
|
||||
|
||||
const pairing = new Map();
|
||||
const asyncHook = createHook({
|
||||
init(asyncId, type, triggerAsyncId, resource) {
|
||||
if (process.domain !== null && process.domain !== undefined) {
|
||||
// if this operation is created while in a domain, let's mark it
|
||||
pairing.set(asyncId, process.domain);
|
||||
resource.domain = process.domain;
|
||||
if (resource.promise !== undefined &&
|
||||
resource.promise instanceof Promise) {
|
||||
// resource.promise instanceof Promise make sure that the
|
||||
// promise comes from the same context
|
||||
// see https://github.com/nodejs/node/issues/15673
|
||||
resource.promise.domain = process.domain;
|
||||
}
|
||||
}
|
||||
},
|
||||
before(asyncId) {
|
||||
const current = pairing.get(asyncId);
|
||||
if (current !== undefined) { // enter domain for this cb
|
||||
current.enter();
|
||||
}
|
||||
},
|
||||
after(asyncId) {
|
||||
const current = pairing.get(asyncId);
|
||||
if (current !== undefined) { // exit domain for this cb
|
||||
current.exit();
|
||||
}
|
||||
},
|
||||
destroy(asyncId) {
|
||||
pairing.delete(asyncId); // cleaning up
|
||||
}
|
||||
});
|
||||
|
||||
// It's possible to enter one domain while already inside
|
||||
// another one. The stack is each entered domain.
|
||||
const stack = [];
|
||||
exports._stack = stack;
|
||||
process._setupDomainUse(stack);
|
||||
|
||||
// let the process know we're using domains
|
||||
const _domain_flag = process._setupDomainUse(_domain, stack);
|
||||
class Domain extends EventEmitter {
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
|
||||
this.members = [];
|
||||
asyncHook.enable();
|
||||
}
|
||||
}
|
||||
|
||||
exports.Domain = Domain;
|
||||
|
||||
@ -64,19 +105,8 @@ exports.create = exports.createDomain = function() {
|
||||
|
||||
// the active domain is always the one that we're currently in.
|
||||
exports.active = null;
|
||||
|
||||
|
||||
inherits(Domain, EventEmitter);
|
||||
|
||||
function Domain() {
|
||||
EventEmitter.call(this);
|
||||
|
||||
this.members = [];
|
||||
}
|
||||
|
||||
Domain.prototype.members = undefined;
|
||||
|
||||
|
||||
// Called by process._fatalException in case an error was thrown.
|
||||
Domain.prototype._errorHandler = function _errorHandler(er) {
|
||||
var caught = false;
|
||||
@ -155,7 +185,6 @@ Domain.prototype.enter = function() {
|
||||
// to push it onto the stack so that we can pop it later.
|
||||
exports.active = process.domain = this;
|
||||
stack.push(this);
|
||||
_domain_flag[0] = stack.length;
|
||||
};
|
||||
|
||||
|
||||
@ -166,7 +195,6 @@ Domain.prototype.exit = function() {
|
||||
|
||||
// exit all domains until this one.
|
||||
stack.splice(index);
|
||||
_domain_flag[0] = stack.length;
|
||||
|
||||
exports.active = stack[stack.length - 1];
|
||||
process.domain = exports.active;
|
||||
|
@ -73,7 +73,6 @@ function setupNextTick() {
|
||||
process.nextTick = nextTick;
|
||||
// Needs to be accessible from beyond this scope.
|
||||
process._tickCallback = _tickCallback;
|
||||
process._tickDomainCallback = _tickDomainCallback;
|
||||
|
||||
// Set the nextTick() function for internal usage.
|
||||
exports.nextTick = internalNextTick;
|
||||
@ -190,46 +189,6 @@ function setupNextTick() {
|
||||
} while (tickInfo[kLength] !== 0);
|
||||
}
|
||||
|
||||
function _tickDomainCallback() {
|
||||
do {
|
||||
while (tickInfo[kIndex] < tickInfo[kLength]) {
|
||||
++tickInfo[kIndex];
|
||||
const tock = nextTickQueue.shift();
|
||||
const callback = tock.callback;
|
||||
const domain = tock.domain;
|
||||
const args = tock.args;
|
||||
if (domain)
|
||||
domain.enter();
|
||||
|
||||
// CHECK(Number.isSafeInteger(tock[async_id_symbol]))
|
||||
// CHECK(tock[async_id_symbol] > 0)
|
||||
// CHECK(Number.isSafeInteger(tock[trigger_async_id_symbol]))
|
||||
// CHECK(tock[trigger_async_id_symbol] > 0)
|
||||
|
||||
emitBefore(tock[async_id_symbol], tock[trigger_async_id_symbol]);
|
||||
// TODO(trevnorris): See comment in _tickCallback() as to why this
|
||||
// isn't a good solution.
|
||||
if (async_hook_fields[kDestroy] > 0)
|
||||
emitDestroy(tock[async_id_symbol]);
|
||||
|
||||
// Using separate callback execution functions allows direct
|
||||
// callback invocation with small numbers of arguments to avoid the
|
||||
// performance hit associated with using `fn.apply()`
|
||||
_combinedTickCallback(args, callback);
|
||||
|
||||
emitAfter(tock[async_id_symbol]);
|
||||
|
||||
if (kMaxCallbacksPerLoop < tickInfo[kIndex])
|
||||
tickDone();
|
||||
if (domain)
|
||||
domain.exit();
|
||||
}
|
||||
tickDone();
|
||||
_runMicrotasks();
|
||||
emitPendingUnhandledRejections();
|
||||
} while (tickInfo[kLength] !== 0);
|
||||
}
|
||||
|
||||
class TickObject {
|
||||
constructor(callback, args, asyncId, triggerAsyncId) {
|
||||
this.callback = callback;
|
||||
|
@ -596,12 +596,6 @@ class QueryWrap : public AsyncWrap {
|
||||
QueryWrap(ChannelWrap* channel, Local<Object> req_wrap_obj)
|
||||
: AsyncWrap(channel->env(), req_wrap_obj, AsyncWrap::PROVIDER_QUERYWRAP),
|
||||
channel_(channel) {
|
||||
if (env()->in_domain()) {
|
||||
req_wrap_obj->Set(env()->domain_string(),
|
||||
env()->domain_array()->Get(env()->context(), 0)
|
||||
.ToLocalChecked());
|
||||
}
|
||||
|
||||
Wrap(req_wrap_obj, this);
|
||||
|
||||
// Make sure the channel object stays alive during the query lifetime.
|
||||
|
@ -200,22 +200,6 @@ inline bool Environment::AsyncCallbackScope::in_makecallback() const {
|
||||
return env_->makecallback_cntr_ > 1;
|
||||
}
|
||||
|
||||
inline Environment::DomainFlag::DomainFlag() {
|
||||
for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
|
||||
}
|
||||
|
||||
inline uint32_t* Environment::DomainFlag::fields() {
|
||||
return fields_;
|
||||
}
|
||||
|
||||
inline int Environment::DomainFlag::fields_count() const {
|
||||
return kFieldsCount;
|
||||
}
|
||||
|
||||
inline uint32_t Environment::DomainFlag::count() const {
|
||||
return fields_[kCount];
|
||||
}
|
||||
|
||||
inline Environment::TickInfo::TickInfo() {
|
||||
for (int i = 0; i < kFieldsCount; ++i)
|
||||
fields_[i] = 0;
|
||||
@ -348,12 +332,6 @@ inline v8::Isolate* Environment::isolate() const {
|
||||
return isolate_;
|
||||
}
|
||||
|
||||
inline bool Environment::in_domain() const {
|
||||
// The const_cast is okay, it doesn't violate conceptual const-ness.
|
||||
return using_domains() &&
|
||||
const_cast<Environment*>(this)->domain_flag()->count() > 0;
|
||||
}
|
||||
|
||||
inline Environment* Environment::from_immediate_check_handle(
|
||||
uv_check_t* handle) {
|
||||
return ContainerOf(&Environment::immediate_check_handle_, handle);
|
||||
@ -394,10 +372,6 @@ inline Environment::AsyncHooks* Environment::async_hooks() {
|
||||
return &async_hooks_;
|
||||
}
|
||||
|
||||
inline Environment::DomainFlag* Environment::domain_flag() {
|
||||
return &domain_flag_;
|
||||
}
|
||||
|
||||
inline Environment::TickInfo* Environment::tick_info() {
|
||||
return &tick_info_;
|
||||
}
|
||||
|
24
src/env.h
24
src/env.h
@ -309,7 +309,6 @@ class ModuleWrap;
|
||||
V(internal_binding_cache_object, v8::Object) \
|
||||
V(buffer_prototype_object, v8::Object) \
|
||||
V(context, v8::Context) \
|
||||
V(domain_array, v8::Array) \
|
||||
V(domains_stack_array, v8::Array) \
|
||||
V(inspector_console_api_object, v8::Object) \
|
||||
V(module_load_list_array, v8::Array) \
|
||||
@ -472,26 +471,6 @@ class Environment {
|
||||
DISALLOW_COPY_AND_ASSIGN(AsyncCallbackScope);
|
||||
};
|
||||
|
||||
class DomainFlag {
|
||||
public:
|
||||
inline uint32_t* fields();
|
||||
inline int fields_count() const;
|
||||
inline uint32_t count() const;
|
||||
|
||||
private:
|
||||
friend class Environment; // So we can call the constructor.
|
||||
inline DomainFlag();
|
||||
|
||||
enum Fields {
|
||||
kCount,
|
||||
kFieldsCount
|
||||
};
|
||||
|
||||
uint32_t fields_[kFieldsCount];
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(DomainFlag);
|
||||
};
|
||||
|
||||
class TickInfo {
|
||||
public:
|
||||
inline uint32_t* fields();
|
||||
@ -560,7 +539,6 @@ class Environment {
|
||||
|
||||
inline v8::Isolate* isolate() const;
|
||||
inline uv_loop_t* event_loop() const;
|
||||
inline bool in_domain() const;
|
||||
inline uint32_t watched_providers() const;
|
||||
|
||||
static inline Environment* from_immediate_check_handle(uv_check_t* handle);
|
||||
@ -577,7 +555,6 @@ class Environment {
|
||||
inline void FinishHandleCleanup(uv_handle_t* handle);
|
||||
|
||||
inline AsyncHooks* async_hooks();
|
||||
inline DomainFlag* domain_flag();
|
||||
inline TickInfo* tick_info();
|
||||
inline uint64_t timer_base() const;
|
||||
|
||||
@ -722,7 +699,6 @@ class Environment {
|
||||
uv_check_t idle_check_handle_;
|
||||
|
||||
AsyncHooks async_hooks_;
|
||||
DomainFlag domain_flag_;
|
||||
TickInfo tick_info_;
|
||||
const uint64_t timer_base_;
|
||||
bool using_domains_;
|
||||
|
65
src/node.cc
65
src/node.cc
@ -160,7 +160,6 @@ using v8::Number;
|
||||
using v8::Object;
|
||||
using v8::ObjectTemplate;
|
||||
using v8::Promise;
|
||||
using v8::PromiseHookType;
|
||||
using v8::PromiseRejectMessage;
|
||||
using v8::PropertyCallbackInfo;
|
||||
using v8::ScriptOrigin;
|
||||
@ -842,7 +841,6 @@ bool ShouldAbortOnUncaughtException(Isolate* isolate) {
|
||||
return isEmittingTopLevelDomainError || !DomainsStackHasErrorHandler(env);
|
||||
}
|
||||
|
||||
|
||||
Local<Value> GetDomainProperty(Environment* env, Local<Object> object) {
|
||||
Local<Value> domain_v =
|
||||
object->GetPrivate(env->context(), env->domain_private_symbol())
|
||||
@ -883,36 +881,6 @@ void DomainExit(Environment* env, v8::Local<v8::Object> object) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DomainPromiseHook(PromiseHookType type,
|
||||
Local<Promise> promise,
|
||||
Local<Value> parent,
|
||||
void* arg) {
|
||||
Environment* env = static_cast<Environment*>(arg);
|
||||
Local<Context> context = env->context();
|
||||
|
||||
if (type == PromiseHookType::kInit && env->in_domain()) {
|
||||
Local<Value> domain_obj =
|
||||
env->domain_array()->Get(context, 0).ToLocalChecked();
|
||||
if (promise->CreationContext() == context) {
|
||||
promise->Set(context, env->domain_string(), domain_obj).FromJust();
|
||||
} else {
|
||||
// Do not expose object from another context publicly in promises created
|
||||
// in non-main contexts.
|
||||
promise->SetPrivate(context, env->domain_private_symbol(), domain_obj)
|
||||
.FromJust();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (type == PromiseHookType::kBefore) {
|
||||
DomainEnter(env, promise);
|
||||
} else if (type == PromiseHookType::kAfter) {
|
||||
DomainExit(env, promise);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args);
|
||||
|
||||
@ -923,38 +891,13 @@ void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
|
||||
HandleScope scope(env->isolate());
|
||||
Local<Object> process_object = env->process_object();
|
||||
|
||||
Local<String> tick_callback_function_key = env->tick_domain_cb_string();
|
||||
Local<Function> tick_callback_function =
|
||||
process_object->Get(tick_callback_function_key).As<Function>();
|
||||
|
||||
if (!tick_callback_function->IsFunction()) {
|
||||
fprintf(stderr, "process._tickDomainCallback assigned to non-function\n");
|
||||
ABORT();
|
||||
}
|
||||
|
||||
process_object->Set(env->tick_callback_string(), tick_callback_function);
|
||||
env->set_tick_callback_function(tick_callback_function);
|
||||
|
||||
CHECK(args[0]->IsArray());
|
||||
env->set_domain_array(args[0].As<Array>());
|
||||
|
||||
CHECK(args[1]->IsArray());
|
||||
env->set_domains_stack_array(args[1].As<Array>());
|
||||
env->set_domains_stack_array(args[0].As<Array>());
|
||||
|
||||
// Do a little housekeeping.
|
||||
env->process_object()->Delete(
|
||||
env->context(),
|
||||
FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupDomainUse")).FromJust();
|
||||
|
||||
uint32_t* const fields = env->domain_flag()->fields();
|
||||
uint32_t const fields_count = env->domain_flag()->fields_count();
|
||||
|
||||
Local<ArrayBuffer> array_buffer =
|
||||
ArrayBuffer::New(env->isolate(), fields, sizeof(*fields) * fields_count);
|
||||
|
||||
env->AddPromiseHook(DomainPromiseHook, static_cast<void*>(env));
|
||||
|
||||
args.GetReturnValue().Set(Uint32Array::New(array_buffer, 0, fields_count));
|
||||
}
|
||||
|
||||
|
||||
@ -1078,7 +1021,8 @@ InternalCallbackScope::InternalCallbackScope(Environment* env,
|
||||
// If you hit this assertion, you forgot to enter the v8::Context first.
|
||||
CHECK_EQ(Environment::GetCurrent(env->isolate()), env);
|
||||
|
||||
if (env->using_domains() && !object_.IsEmpty()) {
|
||||
if (asyncContext.async_id == 0 && env->using_domains() &&
|
||||
!object_.IsEmpty()) {
|
||||
DomainEnter(env, object_);
|
||||
}
|
||||
|
||||
@ -1111,7 +1055,8 @@ void InternalCallbackScope::Close() {
|
||||
AsyncWrap::EmitAfter(env_, async_context_.async_id);
|
||||
}
|
||||
|
||||
if (env_->using_domains() && !object_.IsEmpty()) {
|
||||
if (async_context_.async_id == 0 && env_->using_domains() &&
|
||||
!object_.IsEmpty()) {
|
||||
DomainExit(env_, object_);
|
||||
}
|
||||
|
||||
|
@ -5593,13 +5593,6 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
|
||||
if (args[5]->IsFunction()) {
|
||||
obj->Set(env->ondone_string(), args[5]);
|
||||
|
||||
if (env->in_domain()) {
|
||||
obj->Set(env->context(),
|
||||
env->domain_string(),
|
||||
env->domain_array()->Get(env->context(), 0).ToLocalChecked())
|
||||
.FromJust();
|
||||
}
|
||||
|
||||
uv_queue_work(env->event_loop(),
|
||||
req.release()->work_req(),
|
||||
PBKDF2Request::Work,
|
||||
@ -5788,13 +5781,6 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
|
||||
if (args[1]->IsFunction()) {
|
||||
obj->Set(env->ondone_string(), args[1]);
|
||||
|
||||
if (env->in_domain()) {
|
||||
obj->Set(env->context(),
|
||||
env->domain_string(),
|
||||
env->domain_array()->Get(env->context(), 0).ToLocalChecked())
|
||||
.FromJust();
|
||||
}
|
||||
|
||||
uv_queue_work(env->event_loop(),
|
||||
req.release()->work_req(),
|
||||
RandomBytesWork,
|
||||
@ -5834,13 +5820,6 @@ void RandomBytesBuffer(const FunctionCallbackInfo<Value>& args) {
|
||||
if (args[3]->IsFunction()) {
|
||||
obj->Set(env->context(), env->ondone_string(), args[3]).FromJust();
|
||||
|
||||
if (env->in_domain()) {
|
||||
obj->Set(env->context(),
|
||||
env->domain_string(),
|
||||
env->domain_array()->Get(env->context(), 0).ToLocalChecked())
|
||||
.FromJust();
|
||||
}
|
||||
|
||||
uv_queue_work(env->event_loop(),
|
||||
req.release()->work_req(),
|
||||
RandomBytesWork,
|
||||
|
@ -15,8 +15,6 @@ ReqWrap<T>::ReqWrap(Environment* env,
|
||||
v8::Local<v8::Object> object,
|
||||
AsyncWrap::ProviderType provider)
|
||||
: AsyncWrap(env, object, provider) {
|
||||
if (env->in_domain())
|
||||
object->Set(env->domain_string(), env->domain_array()->Get(0));
|
||||
|
||||
// FIXME(bnoordhuis) The fact that a reinterpret_cast is needed is
|
||||
// arguably a good indicator that there should be more than one queue.
|
||||
|
@ -1,32 +0,0 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
let allsGood = false;
|
||||
let cntr = 0;
|
||||
|
||||
process.on('exit', () => {
|
||||
assert.ok(cntr > 0, '_tickDomainCallback was never called');
|
||||
});
|
||||
|
||||
/**
|
||||
* This test relies upon the following internals to work as specified:
|
||||
* - require('domain') causes node::Environment::set_tick_callback_function()
|
||||
* to use process._tickDomainCallback() to process the nextTickQueue;
|
||||
* replacing process._tickCallback().
|
||||
* - setImmediate() uses node::MakeCallback() instead of
|
||||
* node::AsyncWrap::MakeCallback(). Otherwise the test will always pass.
|
||||
* Have not found a way to verify that node::MakeCallback() is used.
|
||||
*/
|
||||
process._tickDomainCallback = function _tickDomainCallback() {
|
||||
assert.ok(allsGood, '_tickDomainCallback should not have been called');
|
||||
cntr++;
|
||||
};
|
||||
|
||||
setImmediate(common.mustCall(() => {
|
||||
require('domain');
|
||||
setImmediate(common.mustCall(() => setImmediate(common.mustCall(() => {
|
||||
allsGood = true;
|
||||
process.nextTick(() => {});
|
||||
}))));
|
||||
}));
|
Loading…
x
Reference in New Issue
Block a user