src: revert domain using AsyncListeners
This is a slightly modified revert of bc39bdd. Getting domains to use AsyncListeners became too much of a challenge with many edge cases. While this is still a goal, it will have to be deferred for now until more test coverage can be provided.
This commit is contained in:
parent
0afdfae0eb
commit
828f14556e
@ -457,12 +457,8 @@ ClientRequest.prototype.onSocket = function(socket) {
|
||||
var req = this;
|
||||
|
||||
process.nextTick(function() {
|
||||
// If a domain was added to the request, attach it to the socket.
|
||||
if (req.domain)
|
||||
socket._handle.addAsyncListener(req.domain._listener);
|
||||
tickOnSocket(req, socket);
|
||||
});
|
||||
|
||||
};
|
||||
|
||||
ClientRequest.prototype._deferToConnect = function(method, arguments_, cb) {
|
||||
|
158
lib/domain.js
158
lib/domain.js
@ -28,6 +28,26 @@ var inherits = util.inherits;
|
||||
// a few side effects.
|
||||
EventEmitter.usingDomains = true;
|
||||
|
||||
// overwrite process.domain with a getter/setter that will allow for more
|
||||
// effective optimizations
|
||||
var _domain = [null];
|
||||
Object.defineProperty(process, 'domain', {
|
||||
enumerable: true,
|
||||
get: function() {
|
||||
return _domain[0];
|
||||
},
|
||||
set: function(arg) {
|
||||
return _domain[0] = arg;
|
||||
}
|
||||
});
|
||||
|
||||
// objects with external array data are excellent ways to communicate state
|
||||
// between js and c++ w/o much overhead
|
||||
var _domain_flag = {};
|
||||
|
||||
// let the process know we're using domains
|
||||
process._setupDomainUse(_domain, _domain_flag);
|
||||
|
||||
exports.Domain = Domain;
|
||||
|
||||
exports.create = exports.createDomain = function() {
|
||||
@ -42,71 +62,66 @@ exports._stack = stack;
|
||||
exports.active = null;
|
||||
|
||||
|
||||
var listenerObj = {
|
||||
error: function errorHandler(domain, er) {
|
||||
var caught = false;
|
||||
// ignore errors on disposed domains.
|
||||
//
|
||||
// XXX This is a bit stupid. We should probably get rid of
|
||||
// domain.dispose() altogether. It's almost always a terrible
|
||||
// idea. --isaacs
|
||||
if (domain._disposed)
|
||||
return true;
|
||||
|
||||
er.domain = domain;
|
||||
er.domainThrown = true;
|
||||
// wrap this in a try/catch so we don't get infinite throwing
|
||||
try {
|
||||
// One of three things will happen here.
|
||||
//
|
||||
// 1. There is a handler, caught = true
|
||||
// 2. There is no handler, caught = false
|
||||
// 3. It throws, caught = false
|
||||
//
|
||||
// If caught is false after this, then there's no need to exit()
|
||||
// the domain, because we're going to crash the process anyway.
|
||||
caught = domain.emit('error', er);
|
||||
|
||||
if (stack.length === 0)
|
||||
process.removeAsyncListener(domain._listener);
|
||||
|
||||
// Exit all domains on the stack. Uncaught exceptions end the
|
||||
// current tick and no domains should be left on the stack
|
||||
// between ticks.
|
||||
stack.length = 0;
|
||||
exports.active = process.domain = null;
|
||||
} catch (er2) {
|
||||
// The domain error handler threw! oh no!
|
||||
// See if another domain can catch THIS error,
|
||||
// or else crash on the original one.
|
||||
// If the user already exited it, then don't double-exit.
|
||||
if (domain === exports.active) {
|
||||
stack.pop();
|
||||
}
|
||||
if (stack.length) {
|
||||
exports.active = process.domain = stack[stack.length - 1];
|
||||
caught = process._fatalException(er2);
|
||||
} else {
|
||||
caught = false;
|
||||
}
|
||||
return caught;
|
||||
}
|
||||
return caught;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
inherits(Domain, EventEmitter);
|
||||
|
||||
function Domain() {
|
||||
EventEmitter.call(this);
|
||||
|
||||
this.members = [];
|
||||
this._listener = process.createAsyncListener(listenerObj, this);
|
||||
}
|
||||
|
||||
Domain.prototype.members = undefined;
|
||||
Domain.prototype._disposed = undefined;
|
||||
Domain.prototype._listener = undefined;
|
||||
|
||||
|
||||
// Called by process._fatalException in case an error was thrown.
|
||||
Domain.prototype._errorHandler = function errorHandler(er) {
|
||||
var caught = false;
|
||||
// ignore errors on disposed domains.
|
||||
//
|
||||
// XXX This is a bit stupid. We should probably get rid of
|
||||
// domain.dispose() altogether. It's almost always a terrible
|
||||
// idea. --isaacs
|
||||
if (this._disposed)
|
||||
return true;
|
||||
|
||||
er.domain = this;
|
||||
er.domainThrown = true;
|
||||
// wrap this in a try/catch so we don't get infinite throwing
|
||||
try {
|
||||
// One of three things will happen here.
|
||||
//
|
||||
// 1. There is a handler, caught = true
|
||||
// 2. There is no handler, caught = false
|
||||
// 3. It throws, caught = false
|
||||
//
|
||||
// If caught is false after this, then there's no need to exit()
|
||||
// the domain, because we're going to crash the process anyway.
|
||||
caught = this.emit('error', er);
|
||||
|
||||
// Exit all domains on the stack. Uncaught exceptions end the
|
||||
// current tick and no domains should be left on the stack
|
||||
// between ticks.
|
||||
stack.length = 0;
|
||||
exports.active = process.domain = null;
|
||||
} catch (er2) {
|
||||
// The domain error handler threw! oh no!
|
||||
// See if another domain can catch THIS error,
|
||||
// or else crash on the original one.
|
||||
// If the user already exited it, then don't double-exit.
|
||||
if (this === exports.active) {
|
||||
stack.pop();
|
||||
}
|
||||
if (stack.length) {
|
||||
exports.active = process.domain = stack[stack.length - 1];
|
||||
caught = process._fatalException(er2);
|
||||
} else {
|
||||
caught = false;
|
||||
}
|
||||
return caught;
|
||||
}
|
||||
return caught;
|
||||
};
|
||||
|
||||
|
||||
Domain.prototype.enter = function() {
|
||||
@ -116,22 +131,20 @@ Domain.prototype.enter = function() {
|
||||
// to push it onto the stack so that we can pop it later.
|
||||
exports.active = process.domain = this;
|
||||
stack.push(this);
|
||||
|
||||
process.addAsyncListener(this._listener);
|
||||
_domain_flag[0] = stack.length;
|
||||
};
|
||||
|
||||
|
||||
Domain.prototype.exit = function() {
|
||||
if (this._disposed) return;
|
||||
|
||||
process.removeAsyncListener(this._listener);
|
||||
|
||||
// exit all domains until this one.
|
||||
var index = stack.lastIndexOf(this);
|
||||
if (index !== -1)
|
||||
stack.splice(index + 1);
|
||||
else
|
||||
stack.length = 0;
|
||||
_domain_flag[0] = stack.length;
|
||||
|
||||
exports.active = stack[stack.length - 1];
|
||||
process.domain = exports.active;
|
||||
@ -165,13 +178,6 @@ Domain.prototype.add = function(ee) {
|
||||
|
||||
ee.domain = this;
|
||||
this.members.push(ee);
|
||||
|
||||
// Adding the domain._listener to the Wrap associated with the event
|
||||
// emitter instance will be done automatically either on class
|
||||
// instantiation or manually, like in cases of net listen().
|
||||
// The reason it cannot be done here is because in specific cases the
|
||||
// _handle is not created on EE instantiation, so there's no place to
|
||||
// add the listener.
|
||||
};
|
||||
|
||||
|
||||
@ -180,24 +186,6 @@ Domain.prototype.remove = function(ee) {
|
||||
var index = this.members.indexOf(ee);
|
||||
if (index !== -1)
|
||||
this.members.splice(index, 1);
|
||||
|
||||
// First check if the ee is a handle itself.
|
||||
if (ee.removeAsyncListener)
|
||||
ee.removeAsyncListener(this._listener);
|
||||
|
||||
// Manually remove the asyncListener from the handle, if possible.
|
||||
if (ee._handle && ee._handle.removeAsyncListener)
|
||||
ee._handle.removeAsyncListener(this._listener);
|
||||
|
||||
// TODO(trevnorris): Are there cases where the handle doesn't live on
|
||||
// the ee or the _handle.
|
||||
|
||||
// TODO(trevnorris): For debugging that we've missed adding AsyncWrap's
|
||||
// methods to a handle somewhere on the native side.
|
||||
if (ee._handle && !ee._handle.removeAsyncListener) {
|
||||
process._rawDebug('Wrap handle is missing AsyncWrap methods');
|
||||
process.abort();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
@ -91,6 +91,9 @@ EventEmitter.prototype.emit = function(type) {
|
||||
if (util.isUndefined(handler))
|
||||
return false;
|
||||
|
||||
if (this.domain && this !== process)
|
||||
this.domain.enter();
|
||||
|
||||
if (util.isFunction(handler)) {
|
||||
switch (arguments.length) {
|
||||
// fast cases
|
||||
@ -123,6 +126,9 @@ EventEmitter.prototype.emit = function(type) {
|
||||
listeners[i].apply(this, args);
|
||||
}
|
||||
|
||||
if (this.domain && this !== process)
|
||||
this.domain.exit();
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
|
11
lib/net.js
11
lib/net.js
@ -1089,20 +1089,9 @@ Server.prototype._listen2 = function(address, port, addressType, backlog, fd) {
|
||||
// generate connection key, this should be unique to the connection
|
||||
this._connectionKey = addressType + ':' + address + ':' + port;
|
||||
|
||||
// If a domain is attached to the event emitter then we need to add
|
||||
// the listener to the handle.
|
||||
if (this.domain) {
|
||||
this._handle.addAsyncListener(this.domain._listener);
|
||||
process.addAsyncListener(this.domain._listener);
|
||||
}
|
||||
|
||||
process.nextTick(function() {
|
||||
self.emit('listening');
|
||||
});
|
||||
|
||||
if (this.domain) {
|
||||
process.removeAsyncListener(this.domain._listener);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
@ -112,7 +112,8 @@ function listOnTimeout() {
|
||||
// other timers that expire on this tick should still run.
|
||||
//
|
||||
// https://github.com/joyent/node/issues/2631
|
||||
if (first.domain && first.domain._disposed)
|
||||
var domain = first.domain;
|
||||
if (domain && domain._disposed)
|
||||
continue;
|
||||
|
||||
hasQueue = !!first._asyncQueue;
|
||||
@ -120,8 +121,12 @@ function listOnTimeout() {
|
||||
try {
|
||||
if (hasQueue)
|
||||
loadAsyncQueue(first);
|
||||
if (domain)
|
||||
domain.enter();
|
||||
threw = true;
|
||||
first._onTimeout();
|
||||
if (domain)
|
||||
domain.exit();
|
||||
if (hasQueue)
|
||||
unloadAsyncQueue(first);
|
||||
threw = false;
|
||||
@ -368,7 +373,7 @@ L.init(immediateQueue);
|
||||
|
||||
function processImmediate() {
|
||||
var queue = immediateQueue;
|
||||
var hasQueue, immediate;
|
||||
var domain, hasQueue, immediate;
|
||||
|
||||
immediateQueue = {};
|
||||
L.init(immediateQueue);
|
||||
@ -376,9 +381,12 @@ function processImmediate() {
|
||||
while (L.isEmpty(queue) === false) {
|
||||
immediate = L.shift(queue);
|
||||
hasQueue = !!immediate._asyncQueue;
|
||||
domain = immediate.domain;
|
||||
|
||||
if (hasQueue)
|
||||
loadAsyncQueue(immediate);
|
||||
if (domain)
|
||||
domain.enter();
|
||||
|
||||
var threw = true;
|
||||
try {
|
||||
@ -398,6 +406,8 @@ function processImmediate() {
|
||||
}
|
||||
}
|
||||
|
||||
if (domain)
|
||||
domain.exit();
|
||||
if (hasQueue)
|
||||
unloadAsyncQueue(immediate);
|
||||
}
|
||||
@ -481,7 +491,7 @@ function unrefTimeout() {
|
||||
|
||||
debug('unrefTimer fired');
|
||||
|
||||
var diff, first, hasQueue, threw;
|
||||
var diff, domain, first, hasQueue, threw;
|
||||
while (first = L.peek(unrefList)) {
|
||||
diff = now - first._idleStart;
|
||||
hasQueue = !!first._asyncQueue;
|
||||
@ -496,16 +506,21 @@ function unrefTimeout() {
|
||||
|
||||
L.remove(first);
|
||||
|
||||
domain = first.domain;
|
||||
|
||||
if (!first._onTimeout) continue;
|
||||
if (first.domain && first.domain._disposed) continue;
|
||||
if (domain && domain._disposed) continue;
|
||||
|
||||
try {
|
||||
if (hasQueue)
|
||||
loadAsyncQueue(first);
|
||||
if (domain) domain.enter();
|
||||
threw = true;
|
||||
debug('unreftimer firing timeout');
|
||||
first._onTimeout();
|
||||
threw = false;
|
||||
if (domain)
|
||||
domain.exit();
|
||||
if (hasQueue)
|
||||
unloadAsyncQueue(first);
|
||||
} finally {
|
||||
|
@ -88,10 +88,102 @@ inline bool AsyncWrap::has_async_queue() {
|
||||
}
|
||||
|
||||
|
||||
// I hate you domains.
|
||||
inline v8::Handle<v8::Value> AsyncWrap::MakeDomainCallback(
|
||||
const v8::Handle<v8::Function> cb,
|
||||
int argc,
|
||||
v8::Handle<v8::Value>* argv) {
|
||||
assert(env()->context() == env()->isolate()->GetCurrentContext());
|
||||
|
||||
v8::Local<v8::Object> context = object();
|
||||
v8::Local<v8::Object> process = env()->process_object();
|
||||
v8::Local<v8::Value> domain_v = context->Get(env()->domain_string());
|
||||
v8::Local<v8::Object> domain;
|
||||
|
||||
v8::TryCatch try_catch;
|
||||
try_catch.SetVerbose(true);
|
||||
|
||||
if (has_async_queue()) {
|
||||
v8::Local<v8::Value> val = context.As<v8::Value>();
|
||||
env()->async_listener_load_function()->Call(process, 1, &val);
|
||||
|
||||
if (try_catch.HasCaught())
|
||||
return v8::Undefined(env()->isolate());
|
||||
}
|
||||
|
||||
bool has_domain = domain_v->IsObject();
|
||||
if (has_domain) {
|
||||
domain = domain_v.As<v8::Object>();
|
||||
|
||||
if (domain->Get(env()->disposed_string())->IsTrue())
|
||||
return Undefined(env()->isolate());
|
||||
|
||||
v8::Local<v8::Function> enter =
|
||||
domain->Get(env()->enter_string()).As<v8::Function>();
|
||||
assert(enter->IsFunction());
|
||||
enter->Call(domain, 0, NULL);
|
||||
|
||||
if (try_catch.HasCaught())
|
||||
return Undefined(env()->isolate());
|
||||
}
|
||||
|
||||
v8::Local<v8::Value> ret = cb->Call(context, argc, argv);
|
||||
|
||||
if (try_catch.HasCaught()) {
|
||||
return Undefined(env()->isolate());
|
||||
}
|
||||
|
||||
if (has_domain) {
|
||||
v8::Local<v8::Function> exit =
|
||||
domain->Get(env()->exit_string()).As<v8::Function>();
|
||||
assert(exit->IsFunction());
|
||||
exit->Call(domain, 0, NULL);
|
||||
|
||||
if (try_catch.HasCaught())
|
||||
return Undefined(env()->isolate());
|
||||
}
|
||||
|
||||
if (has_async_queue()) {
|
||||
v8::Local<v8::Value> val = context.As<v8::Value>();
|
||||
env()->async_listener_unload_function()->Call(process, 1, &val);
|
||||
|
||||
if (try_catch.HasCaught())
|
||||
return Undefined(env()->isolate());
|
||||
}
|
||||
|
||||
Environment::TickInfo* tick_info = env()->tick_info();
|
||||
|
||||
if (tick_info->in_tick()) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (tick_info->length() == 0) {
|
||||
tick_info->set_index(0);
|
||||
return ret;
|
||||
}
|
||||
|
||||
tick_info->set_in_tick(true);
|
||||
|
||||
env()->tick_callback_function()->Call(process, 0, NULL);
|
||||
|
||||
tick_info->set_in_tick(false);
|
||||
|
||||
if (try_catch.HasCaught()) {
|
||||
tick_info->set_last_threw(true);
|
||||
return Undefined(env()->isolate());
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
inline v8::Handle<v8::Value> AsyncWrap::MakeCallback(
|
||||
const v8::Handle<v8::Function> cb,
|
||||
int argc,
|
||||
v8::Handle<v8::Value>* argv) {
|
||||
if (env()->using_domains())
|
||||
return MakeDomainCallback(cb, argc, argv);
|
||||
|
||||
assert(env()->context() == env()->isolate()->GetCurrentContext());
|
||||
|
||||
v8::Local<v8::Object> context = object();
|
||||
|
@ -62,6 +62,13 @@ class AsyncWrap : public BaseObject {
|
||||
v8::Handle<v8::Value>* argv);
|
||||
|
||||
private:
|
||||
// TODO(trevnorris): BURN IN FIRE! Remove this as soon as a suitable
|
||||
// replacement is committed.
|
||||
inline v8::Handle<v8::Value> MakeDomainCallback(
|
||||
const v8::Handle<v8::Function> cb,
|
||||
int argc,
|
||||
v8::Handle<v8::Value>* argv);
|
||||
|
||||
// Add an async listener to an existing handle.
|
||||
template <typename Type>
|
||||
static inline void AddAsyncListener(
|
||||
|
@ -86,6 +86,22 @@ inline uint32_t Environment::AsyncListener::count() const {
|
||||
return fields_[kCount];
|
||||
}
|
||||
|
||||
inline Environment::DomainFlag::DomainFlag() {
|
||||
for (int i = 0; i < kFieldsCount; ++i) fields_[i] = 0;
|
||||
}
|
||||
|
||||
inline uint32_t* Environment::DomainFlag::fields() {
|
||||
return fields_;
|
||||
}
|
||||
|
||||
inline int Environment::DomainFlag::fields_count() const {
|
||||
return kFieldsCount;
|
||||
}
|
||||
|
||||
inline uint32_t Environment::DomainFlag::count() const {
|
||||
return fields_[kCount];
|
||||
}
|
||||
|
||||
inline Environment::TickInfo::TickInfo() : in_tick_(false), last_threw_(false) {
|
||||
for (int i = 0; i < kFieldsCount; ++i)
|
||||
fields_[i] = 0;
|
||||
@ -163,6 +179,7 @@ inline Environment::Environment(v8::Local<v8::Context> context)
|
||||
: isolate_(context->GetIsolate()),
|
||||
isolate_data_(IsolateData::GetOrCreate(context->GetIsolate())),
|
||||
using_smalloc_alloc_cb_(false),
|
||||
using_domains_(false),
|
||||
context_(context->GetIsolate(), context) {
|
||||
// We'll be creating new objects so make sure we've entered the context.
|
||||
v8::HandleScope handle_scope(isolate());
|
||||
@ -193,6 +210,12 @@ inline bool Environment::has_async_listeners() const {
|
||||
return const_cast<Environment*>(this)->async_listener()->count() > 0;
|
||||
}
|
||||
|
||||
inline bool Environment::in_domain() const {
|
||||
// The const_cast is okay, it doesn't violate conceptual const-ness.
|
||||
return using_domains() &&
|
||||
const_cast<Environment*>(this)->domain_flag()->count() > 0;
|
||||
}
|
||||
|
||||
inline Environment* Environment::from_immediate_check_handle(
|
||||
uv_check_t* handle) {
|
||||
return CONTAINER_OF(handle, Environment, immediate_check_handle_);
|
||||
@ -231,6 +254,10 @@ inline Environment::AsyncListener* Environment::async_listener() {
|
||||
return &async_listener_count_;
|
||||
}
|
||||
|
||||
inline Environment::DomainFlag* Environment::domain_flag() {
|
||||
return &domain_flag_;
|
||||
}
|
||||
|
||||
inline Environment::TickInfo* Environment::tick_info() {
|
||||
return &tick_info_;
|
||||
}
|
||||
@ -243,6 +270,14 @@ inline void Environment::set_using_smalloc_alloc_cb(bool value) {
|
||||
using_smalloc_alloc_cb_ = value;
|
||||
}
|
||||
|
||||
inline bool Environment::using_domains() const {
|
||||
return using_domains_;
|
||||
}
|
||||
|
||||
inline void Environment::set_using_domains(bool value) {
|
||||
using_domains_ = value;
|
||||
}
|
||||
|
||||
inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
|
||||
return CONTAINER_OF(handle, Environment, cares_timer_handle_);
|
||||
}
|
||||
|
29
src/env.h
29
src/env.h
@ -66,6 +66,7 @@ namespace node {
|
||||
V(ctime_string, "ctime") \
|
||||
V(dev_string, "dev") \
|
||||
V(disposed_string, "_disposed") \
|
||||
V(domain_string, "domain") \
|
||||
V(enter_string, "enter") \
|
||||
V(errno_string, "errno") \
|
||||
V(exit_string, "exit") \
|
||||
@ -143,6 +144,7 @@ namespace node {
|
||||
V(binding_cache_object, v8::Object) \
|
||||
V(buffer_constructor_function, v8::Function) \
|
||||
V(context, v8::Context) \
|
||||
V(domain_array, v8::Array) \
|
||||
V(module_load_list_array, v8::Array) \
|
||||
V(pipe_constructor_template, v8::FunctionTemplate) \
|
||||
V(process_object, v8::Object) \
|
||||
@ -191,6 +193,26 @@ class Environment {
|
||||
DISALLOW_COPY_AND_ASSIGN(AsyncListener);
|
||||
};
|
||||
|
||||
class DomainFlag {
|
||||
public:
|
||||
inline uint32_t* fields();
|
||||
inline int fields_count() const;
|
||||
inline uint32_t count() const;
|
||||
|
||||
private:
|
||||
friend class Environment; // So we can call the constructor.
|
||||
inline DomainFlag();
|
||||
|
||||
enum Fields {
|
||||
kCount,
|
||||
kFieldsCount
|
||||
};
|
||||
|
||||
uint32_t fields_[kFieldsCount];
|
||||
|
||||
DISALLOW_COPY_AND_ASSIGN(DomainFlag);
|
||||
};
|
||||
|
||||
class TickInfo {
|
||||
public:
|
||||
inline uint32_t* fields();
|
||||
@ -232,6 +254,7 @@ class Environment {
|
||||
inline v8::Isolate* isolate() const;
|
||||
inline uv_loop_t* event_loop() const;
|
||||
inline bool has_async_listeners() const;
|
||||
inline bool in_domain() const;
|
||||
|
||||
static inline Environment* from_immediate_check_handle(uv_check_t* handle);
|
||||
inline uv_check_t* immediate_check_handle();
|
||||
@ -244,6 +267,7 @@ class Environment {
|
||||
inline uv_check_t* idle_check_handle();
|
||||
|
||||
inline AsyncListener* async_listener();
|
||||
inline DomainFlag* domain_flag();
|
||||
inline TickInfo* tick_info();
|
||||
|
||||
static inline Environment* from_cares_timer_handle(uv_timer_t* handle);
|
||||
@ -255,6 +279,9 @@ class Environment {
|
||||
inline bool using_smalloc_alloc_cb() const;
|
||||
inline void set_using_smalloc_alloc_cb(bool value);
|
||||
|
||||
inline bool using_domains() const;
|
||||
inline void set_using_domains(bool value);
|
||||
|
||||
// Strings are shared across shared contexts. The getters simply proxy to
|
||||
// the per-isolate primitive.
|
||||
#define V(PropertyName, StringValue) \
|
||||
@ -285,11 +312,13 @@ class Environment {
|
||||
uv_prepare_t idle_prepare_handle_;
|
||||
uv_check_t idle_check_handle_;
|
||||
AsyncListener async_listener_count_;
|
||||
DomainFlag domain_flag_;
|
||||
TickInfo tick_info_;
|
||||
uv_timer_t cares_timer_handle_;
|
||||
ares_channel cares_channel_;
|
||||
ares_task_list cares_task_list_;
|
||||
bool using_smalloc_alloc_cb_;
|
||||
bool using_domains_;
|
||||
|
||||
#define V(PropertyName, TypeName) \
|
||||
v8::Persistent<TypeName> PropertyName ## _;
|
||||
|
162
src/node.cc
162
src/node.cc
@ -876,11 +876,54 @@ void SetupAsyncListener(const FunctionCallbackInfo<Value>& args) {
|
||||
}
|
||||
|
||||
|
||||
void SetupDomainUse(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args.GetIsolate());
|
||||
|
||||
if (env->using_domains())
|
||||
return;
|
||||
env->set_using_domains(true);
|
||||
|
||||
HandleScope scope(node_isolate);
|
||||
Local<Object> process_object = env->process_object();
|
||||
|
||||
Local<String> tick_callback_function_key =
|
||||
FIXED_ONE_BYTE_STRING(node_isolate, "_tickDomainCallback");
|
||||
Local<Function> tick_callback_function =
|
||||
process_object->Get(tick_callback_function_key).As<Function>();
|
||||
|
||||
if (!tick_callback_function->IsFunction()) {
|
||||
fprintf(stderr, "process._tickDomainCallback assigned to non-function\n");
|
||||
abort();
|
||||
}
|
||||
|
||||
process_object->Set(FIXED_ONE_BYTE_STRING(node_isolate, "_tickCallback"),
|
||||
tick_callback_function);
|
||||
env->set_tick_callback_function(tick_callback_function);
|
||||
|
||||
assert(args[0]->IsArray());
|
||||
assert(args[1]->IsObject());
|
||||
|
||||
env->set_domain_array(args[0].As<Array>());
|
||||
|
||||
Local<Object> domain_flag_obj = args[1].As<Object>();
|
||||
Environment::DomainFlag* domain_flag = env->domain_flag();
|
||||
domain_flag_obj->SetIndexedPropertiesToExternalArrayData(
|
||||
domain_flag->fields(),
|
||||
kExternalUnsignedIntArray,
|
||||
domain_flag->fields_count());
|
||||
|
||||
// Do a little housekeeping.
|
||||
env->process_object()->Delete(
|
||||
FIXED_ONE_BYTE_STRING(args.GetIsolate(), "_setupDomainUse"));
|
||||
}
|
||||
|
||||
|
||||
void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
|
||||
HandleScope handle_scope(args.GetIsolate());
|
||||
Environment* env = Environment::GetCurrent(args.GetIsolate());
|
||||
|
||||
assert(args[0]->IsObject() && args[1]->IsFunction());
|
||||
assert(args[0]->IsObject());
|
||||
assert(args[1]->IsFunction());
|
||||
|
||||
// Values use to cross communicate with processNextTick.
|
||||
Local<Object> tick_info_obj = args[0].As<Object>();
|
||||
@ -897,11 +940,114 @@ void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
|
||||
}
|
||||
|
||||
|
||||
Handle<Value> MakeDomainCallback(Environment* env,
|
||||
Handle<Object> object,
|
||||
const Handle<Function> callback,
|
||||
int argc,
|
||||
Handle<Value> argv[]) {
|
||||
// If you hit this assertion, you forgot to enter the v8::Context first.
|
||||
assert(env->context() == env->isolate()->GetCurrentContext());
|
||||
|
||||
Local<Object> process = env->process_object();
|
||||
Local<Value> domain_v = object->Get(env->domain_string());
|
||||
Local<Object> domain;
|
||||
|
||||
TryCatch try_catch;
|
||||
try_catch.SetVerbose(true);
|
||||
|
||||
// TODO(trevnorris): This is sucky for performance. Fix it.
|
||||
bool has_async_queue = object->Has(env->async_queue_string());
|
||||
if (has_async_queue) {
|
||||
Local<Value> argv[] = { object };
|
||||
env->async_listener_load_function()->Call(process, ARRAY_SIZE(argv), argv);
|
||||
|
||||
if (try_catch.HasCaught())
|
||||
return Undefined(node_isolate);
|
||||
}
|
||||
|
||||
bool has_domain = domain_v->IsObject();
|
||||
if (has_domain) {
|
||||
domain = domain_v.As<Object>();
|
||||
|
||||
if (domain->Get(env->disposed_string())->IsTrue()) {
|
||||
// domain has been disposed of.
|
||||
return Undefined(node_isolate);
|
||||
}
|
||||
|
||||
Local<Function> enter =
|
||||
domain->Get(env->enter_string()).As<Function>();
|
||||
assert(enter->IsFunction());
|
||||
enter->Call(domain, 0, NULL);
|
||||
|
||||
if (try_catch.HasCaught()) {
|
||||
return Undefined(node_isolate);
|
||||
}
|
||||
}
|
||||
|
||||
Local<Value> ret = callback->Call(object, argc, argv);
|
||||
|
||||
if (try_catch.HasCaught()) {
|
||||
return Undefined(node_isolate);
|
||||
}
|
||||
|
||||
if (has_domain) {
|
||||
Local<Function> exit =
|
||||
domain->Get(env->exit_string()).As<Function>();
|
||||
assert(exit->IsFunction());
|
||||
exit->Call(domain, 0, NULL);
|
||||
|
||||
if (try_catch.HasCaught()) {
|
||||
return Undefined(node_isolate);
|
||||
}
|
||||
}
|
||||
|
||||
if (has_async_queue) {
|
||||
Local<Value> val = object.As<Value>();
|
||||
env->async_listener_unload_function()->Call(process, 1, &val);
|
||||
|
||||
if (try_catch.HasCaught())
|
||||
return Undefined(node_isolate);
|
||||
}
|
||||
|
||||
Environment::TickInfo* tick_info = env->tick_info();
|
||||
|
||||
if (tick_info->last_threw() == 1) {
|
||||
tick_info->set_last_threw(0);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (tick_info->in_tick()) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (tick_info->length() == 0) {
|
||||
tick_info->set_index(0);
|
||||
return ret;
|
||||
}
|
||||
|
||||
tick_info->set_in_tick(true);
|
||||
|
||||
env->tick_callback_function()->Call(process, 0, NULL);
|
||||
|
||||
tick_info->set_in_tick(false);
|
||||
|
||||
if (try_catch.HasCaught()) {
|
||||
tick_info->set_last_threw(true);
|
||||
return Undefined(node_isolate);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
Handle<Value> MakeCallback(Environment* env,
|
||||
Handle<Object> object,
|
||||
const Handle<Function> callback,
|
||||
int argc,
|
||||
Handle<Value> argv[]) {
|
||||
if (env->using_domains())
|
||||
return MakeDomainCallback(env, object, callback, argc, argv);
|
||||
|
||||
// If you hit this assertion, you forgot to enter the v8::Context first.
|
||||
assert(env->context() == env->isolate()->GetCurrentContext());
|
||||
|
||||
@ -1031,6 +1177,19 @@ Handle<Value> MakeCallback(const Handle<Object> object,
|
||||
}
|
||||
|
||||
|
||||
Handle<Value> MakeDomainCallback(const Handle<Object> object,
|
||||
const Handle<Function> callback,
|
||||
int argc,
|
||||
Handle<Value> argv[]) {
|
||||
Local<Context> context = object->CreationContext();
|
||||
Environment* env = Environment::GetCurrent(context);
|
||||
Context::Scope context_scope(context);
|
||||
HandleScope handle_scope(env->isolate());
|
||||
return handle_scope.Close(
|
||||
MakeDomainCallback(env, object, callback, argc, argv));
|
||||
}
|
||||
|
||||
|
||||
enum encoding ParseEncoding(Handle<Value> encoding_v, enum encoding _default) {
|
||||
HandleScope scope(node_isolate);
|
||||
|
||||
@ -2482,6 +2641,7 @@ void SetupProcessObject(Environment* env,
|
||||
|
||||
NODE_SET_METHOD(process, "_setupAsyncListener", SetupAsyncListener);
|
||||
NODE_SET_METHOD(process, "_setupNextTick", SetupNextTick);
|
||||
NODE_SET_METHOD(process, "_setupDomainUse", SetupDomainUse);
|
||||
|
||||
// values use to cross communicate with processNextTick
|
||||
Local<Object> tick_info_obj = Object::New();
|
||||
|
39
src/node.js
39
src/node.js
@ -224,11 +224,14 @@
|
||||
// First run through error handlers from asyncListener.
|
||||
var caught = _errorHandler(er);
|
||||
|
||||
if (process.domain && process.domain._errorHandler)
|
||||
caught = process.domain._errorHandler(er) || caught;
|
||||
|
||||
if (!caught)
|
||||
caught = process.emit('uncaughtException', er);
|
||||
|
||||
// If someone handled it, then great. Otherwise die in C++ since
|
||||
// that means we'll exit the process, emit the 'exit' event.
|
||||
// If someone handled it, then great. otherwise, die in C++ land
|
||||
// since that means that we'll exit the process, emit the 'exit' event
|
||||
if (!caught) {
|
||||
try {
|
||||
if (!process._exiting) {
|
||||
@ -568,6 +571,7 @@
|
||||
process.nextTick = nextTick;
|
||||
// Needs to be accessible from beyond this scope.
|
||||
process._tickCallback = _tickCallback;
|
||||
process._tickDomainCallback = _tickDomainCallback;
|
||||
|
||||
process._setupNextTick(tickInfo, _tickCallback);
|
||||
|
||||
@ -585,6 +589,7 @@
|
||||
}
|
||||
|
||||
// Run callbacks that have no domain.
|
||||
// Using domains will cause this to be overridden.
|
||||
function _tickCallback() {
|
||||
var callback, hasQueue, threw, tock;
|
||||
|
||||
@ -611,6 +616,35 @@
|
||||
tickDone();
|
||||
}
|
||||
|
||||
function _tickDomainCallback() {
|
||||
var callback, domain, hasQueue, threw, tock;
|
||||
|
||||
while (tickInfo[kIndex] < tickInfo[kLength]) {
|
||||
tock = nextTickQueue[tickInfo[kIndex]++];
|
||||
callback = tock.callback;
|
||||
domain = tock.domain;
|
||||
hasQueue = !!tock._asyncQueue;
|
||||
if (hasQueue)
|
||||
_loadAsyncQueue(tock);
|
||||
if (domain)
|
||||
domain.enter();
|
||||
threw = true;
|
||||
try {
|
||||
callback();
|
||||
threw = false;
|
||||
} finally {
|
||||
if (threw)
|
||||
tickDone();
|
||||
}
|
||||
if (hasQueue)
|
||||
_unloadAsyncQueue(tock);
|
||||
if (domain)
|
||||
domain.exit();
|
||||
}
|
||||
|
||||
tickDone();
|
||||
}
|
||||
|
||||
function nextTick(callback) {
|
||||
// on the way out, don't bother. it won't get fired anyway.
|
||||
if (process._exiting)
|
||||
@ -618,6 +652,7 @@
|
||||
|
||||
var obj = {
|
||||
callback: callback,
|
||||
domain: process.domain || null,
|
||||
_asyncQueue: undefined
|
||||
};
|
||||
|
||||
|
@ -3628,6 +3628,9 @@ void PBKDF2(const FunctionCallbackInfo<Value>& args) {
|
||||
|
||||
if (args[4]->IsFunction()) {
|
||||
obj->Set(env->ondone_string(), args[4]);
|
||||
// XXX(trevnorris): This will need to go with the rest of domains.
|
||||
if (env->in_domain())
|
||||
obj->Set(env->domain_string(), env->domain_array()->Get(0));
|
||||
uv_queue_work(env->event_loop(),
|
||||
req->work_req(),
|
||||
EIO_PBKDF2,
|
||||
@ -3789,6 +3792,9 @@ void RandomBytes(const FunctionCallbackInfo<Value>& args) {
|
||||
|
||||
if (args[1]->IsFunction()) {
|
||||
obj->Set(FIXED_ONE_BYTE_STRING(node_isolate, "ondone"), args[1]);
|
||||
// XXX(trevnorris): This will need to go with the rest of domains.
|
||||
if (env->in_domain())
|
||||
obj->Set(env->domain_string(), env->domain_array()->Get(0));
|
||||
uv_queue_work(env->event_loop(),
|
||||
req->work_req(),
|
||||
RandomBytesWork<pseudoRandom>,
|
||||
|
@ -39,6 +39,9 @@ class ReqWrap : public AsyncWrap {
|
||||
public:
|
||||
ReqWrap(Environment* env, v8::Handle<v8::Object> object)
|
||||
: AsyncWrap(env, object) {
|
||||
if (env->in_domain())
|
||||
object->Set(env->domain_string(), env->domain_array()->Get(0));
|
||||
|
||||
QUEUE_INSERT_TAIL(&req_wrap_queue, &req_wrap_queue_);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user