async-wrap: add methods to udp/tcp/pipe/timers
Now it's possible to add/remove an async listener to an individual handle created by UDP, TCP, Pipe or Timer.
This commit is contained in:
parent
8b8e3b6798
commit
ccec14b568
@ -200,6 +200,33 @@ exports.active = function(item) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
function timerAddAsyncListener(obj) {
|
||||||
|
if (!this._asyncQueue)
|
||||||
|
this._asyncQueue = [];
|
||||||
|
var queue = this._asyncQueue;
|
||||||
|
// This queue will be small. Probably always <= 3 items.
|
||||||
|
for (var i = 0; i < queue.length; i++) {
|
||||||
|
if (queue[i].uid === obj.uid)
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this._asyncQueue.push(obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
function timerRemoveAsyncListener(obj) {
|
||||||
|
if (!this._asyncQueue)
|
||||||
|
return;
|
||||||
|
var queue = this._asyncQueue;
|
||||||
|
// This queue will be small. Probably always <= 3 items.
|
||||||
|
for (var i = 0; i < queue.length; i++) {
|
||||||
|
if (queue[i].uid === obj.uid) {
|
||||||
|
queue.splice(i, 1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DOM-style timers
|
* DOM-style timers
|
||||||
*/
|
*/
|
||||||
@ -335,6 +362,10 @@ Timeout.prototype.close = function() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// For domain compatibility need to attach this API.
|
||||||
|
Timeout.prototype.addAsyncListener = timerAddAsyncListener;
|
||||||
|
Timeout.prototype.removeAsyncListener = timerRemoveAsyncListener;
|
||||||
|
|
||||||
|
|
||||||
var immediateQueue = {};
|
var immediateQueue = {};
|
||||||
L.init(immediateQueue);
|
L.init(immediateQueue);
|
||||||
@ -390,8 +421,20 @@ function processImmediate() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
function Immediate() { }
|
||||||
|
|
||||||
|
Immediate.prototype.addAsyncListener = timerAddAsyncListener;
|
||||||
|
Immediate.prototype.removeAsyncListener = timerRemoveAsyncListener;
|
||||||
|
Immediate.prototype.domain = undefined;
|
||||||
|
Immediate.prototype._onImmediate = undefined;
|
||||||
|
Immediate.prototype._asyncQueue = undefined;
|
||||||
|
Immediate.prototype._idleNext = undefined;
|
||||||
|
Immediate.prototype._idlePrev = undefined;
|
||||||
|
|
||||||
|
|
||||||
exports.setImmediate = function(callback) {
|
exports.setImmediate = function(callback) {
|
||||||
var immediate = {}, args;
|
var immediate = new Immediate();
|
||||||
|
var args;
|
||||||
|
|
||||||
L.init(immediate);
|
L.init(immediate);
|
||||||
|
|
||||||
|
@ -111,6 +111,8 @@ void PipeWrap::Initialize(Handle<Object> target,
|
|||||||
NODE_SET_PROTOTYPE_METHOD(t, "setPendingInstances", SetPendingInstances);
|
NODE_SET_PROTOTYPE_METHOD(t, "setPendingInstances", SetPendingInstances);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
AsyncWrap::AddMethods<PipeWrap>(t);
|
||||||
|
|
||||||
target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "Pipe"), t->GetFunction());
|
target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "Pipe"), t->GetFunction());
|
||||||
env->set_pipe_constructor_template(t);
|
env->set_pipe_constructor_template(t);
|
||||||
}
|
}
|
||||||
|
@ -116,6 +116,8 @@ void TCPWrap::Initialize(Handle<Object> target,
|
|||||||
SetSimultaneousAccepts);
|
SetSimultaneousAccepts);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
AsyncWrap::AddMethods<TCPWrap>(t);
|
||||||
|
|
||||||
target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "TCP"), t->GetFunction());
|
target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "TCP"), t->GetFunction());
|
||||||
env->set_tcp_constructor_template(t);
|
env->set_tcp_constructor_template(t);
|
||||||
}
|
}
|
||||||
|
@ -119,6 +119,9 @@ void UDPWrap::Initialize(Handle<Object> target,
|
|||||||
NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
|
NODE_SET_PROTOTYPE_METHOD(t, "ref", HandleWrap::Ref);
|
||||||
NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);
|
NODE_SET_PROTOTYPE_METHOD(t, "unref", HandleWrap::Unref);
|
||||||
|
|
||||||
|
|
||||||
|
AsyncWrap::AddMethods<UDPWrap>(t);
|
||||||
|
|
||||||
target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "UDP"), t->GetFunction());
|
target->Set(FIXED_ONE_BYTE_STRING(node_isolate, "UDP"), t->GetFunction());
|
||||||
env->set_udp_constructor_function(t->GetFunction());
|
env->set_udp_constructor_function(t->GetFunction());
|
||||||
}
|
}
|
||||||
|
161
test/simple/test-asynclistener-error-add-after.js
Normal file
161
test/simple/test-asynclistener-error-add-after.js
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the
|
||||||
|
// "Software"), to deal in the Software without restriction, including
|
||||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||||
|
// following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included
|
||||||
|
// in all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
var common = require('../common');
|
||||||
|
var assert = require('assert');
|
||||||
|
var dns = require('dns');
|
||||||
|
var fs = require('fs');
|
||||||
|
var net = require('net');
|
||||||
|
|
||||||
|
var errorMsgs = [];
|
||||||
|
var caught = 0;
|
||||||
|
var expectCaught = 0;
|
||||||
|
var exitCbRan = false;
|
||||||
|
|
||||||
|
function asyncL() { }
|
||||||
|
|
||||||
|
var callbacksObj = {
|
||||||
|
error: function(value, er) {
|
||||||
|
var idx = errorMsgs.indexOf(er.message);
|
||||||
|
caught++;
|
||||||
|
|
||||||
|
process._rawDebug('Handling error: ' + er.message);
|
||||||
|
|
||||||
|
if (-1 < idx)
|
||||||
|
errorMsgs.splice(idx, 1);
|
||||||
|
else
|
||||||
|
throw new Error('Message not found: ' + er.message);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
var listener = process.createAsyncListener(asyncL, callbacksObj);
|
||||||
|
|
||||||
|
process.on('exit', function(code) {
|
||||||
|
// Just in case.
|
||||||
|
process.removeAsyncListener(listener);
|
||||||
|
|
||||||
|
if (code > 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
assert.ok(!exitCbRan);
|
||||||
|
exitCbRan = true;
|
||||||
|
|
||||||
|
if (errorMsgs.length > 0)
|
||||||
|
throw new Error('Errors not fired: ' + errorMsgs);
|
||||||
|
|
||||||
|
assert.equal(caught, expectCaught);
|
||||||
|
process._rawDebug('ok');
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
// Simple cases
|
||||||
|
errorMsgs.push('setTimeout - simple');
|
||||||
|
errorMsgs.push('setImmediate - simple');
|
||||||
|
errorMsgs.push('setInterval - simple');
|
||||||
|
setTimeout(function() {
|
||||||
|
throw new Error('setTimeout - simple');
|
||||||
|
}).addAsyncListener(listener);
|
||||||
|
expectCaught++;
|
||||||
|
|
||||||
|
setImmediate(function() {
|
||||||
|
throw new Error('setImmediate - simple');
|
||||||
|
}).addAsyncListener(listener);
|
||||||
|
expectCaught++;
|
||||||
|
|
||||||
|
var b = setInterval(function() {
|
||||||
|
clearInterval(b);
|
||||||
|
throw new Error('setInterval - simple');
|
||||||
|
}).addAsyncListener(listener);
|
||||||
|
expectCaught++;
|
||||||
|
|
||||||
|
|
||||||
|
// Deeply nested
|
||||||
|
errorMsgs.push('setInterval - nested');
|
||||||
|
errorMsgs.push('setImmediate - nested');
|
||||||
|
errorMsgs.push('setTimeout - nested');
|
||||||
|
setTimeout(function() {
|
||||||
|
setImmediate(function() {
|
||||||
|
var b = setInterval(function() {
|
||||||
|
clearInterval(b);
|
||||||
|
throw new Error('setInterval - nested');
|
||||||
|
}).addAsyncListener(listener);
|
||||||
|
expectCaught++;
|
||||||
|
throw new Error('setImmediate - nested');
|
||||||
|
}).addAsyncListener(listener);
|
||||||
|
expectCaught++;
|
||||||
|
throw new Error('setTimeout - nested');
|
||||||
|
}).addAsyncListener(listener);
|
||||||
|
expectCaught++;
|
||||||
|
|
||||||
|
|
||||||
|
// Net
|
||||||
|
var iter = 3;
|
||||||
|
for (var i = 0; i < iter; i++) {
|
||||||
|
errorMsgs.push('net - error: server connection');
|
||||||
|
errorMsgs.push('net - error: client data');
|
||||||
|
errorMsgs.push('net - error: server data');
|
||||||
|
}
|
||||||
|
errorMsgs.push('net - error: server closed');
|
||||||
|
|
||||||
|
var server = net.createServer(function(c) {
|
||||||
|
c._handle.addAsyncListener(listener);
|
||||||
|
|
||||||
|
c.on('data', function() {
|
||||||
|
if (0 === --iter) {
|
||||||
|
server.close(function() {
|
||||||
|
process._rawDebug('net - server closing');
|
||||||
|
throw new Error('net - error: server closed');
|
||||||
|
});
|
||||||
|
expectCaught++;
|
||||||
|
}
|
||||||
|
process._rawDebug('net - connection received data');
|
||||||
|
throw new Error('net - error: server data');
|
||||||
|
});
|
||||||
|
expectCaught++;
|
||||||
|
|
||||||
|
c.end('bye');
|
||||||
|
process._rawDebug('net - connection received');
|
||||||
|
throw new Error('net - error: server connection');
|
||||||
|
});
|
||||||
|
expectCaught += iter;
|
||||||
|
|
||||||
|
server.listen(common.PORT, function() {
|
||||||
|
// Test adding the async listener after server creation. Though it
|
||||||
|
// won't catch errors that originate synchronously from this point.
|
||||||
|
server._handle.addAsyncListener(listener);
|
||||||
|
for (var i = 0; i < iter; i++)
|
||||||
|
clientConnect();
|
||||||
|
});
|
||||||
|
|
||||||
|
function clientConnect() {
|
||||||
|
var client = net.connect(common.PORT, function() {
|
||||||
|
client._handle.addAsyncListener(listener);
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('data', function() {
|
||||||
|
client.end('see ya');
|
||||||
|
process._rawDebug('net - client received data');
|
||||||
|
throw new Error('net - error: client data');
|
||||||
|
});
|
||||||
|
expectCaught++;
|
||||||
|
}
|
109
test/simple/test-asynclistener-error-net.js
Normal file
109
test/simple/test-asynclistener-error-net.js
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the
|
||||||
|
// "Software"), to deal in the Software without restriction, including
|
||||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||||
|
// following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included
|
||||||
|
// in all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
var common = require('../common');
|
||||||
|
var assert = require('assert');
|
||||||
|
var dns = require('dns');
|
||||||
|
var fs = require('fs');
|
||||||
|
var net = require('net');
|
||||||
|
|
||||||
|
var errorMsgs = [];
|
||||||
|
var caught = 0;
|
||||||
|
var expectCaught = 0;
|
||||||
|
|
||||||
|
function asyncL() { }
|
||||||
|
|
||||||
|
var callbacksObj = {
|
||||||
|
error: function(value, er) {
|
||||||
|
var idx = errorMsgs.indexOf(er.message);
|
||||||
|
caught++;
|
||||||
|
|
||||||
|
process._rawDebug('Handling error: ' + er.message);
|
||||||
|
|
||||||
|
if (-1 < idx)
|
||||||
|
errorMsgs.splice(idx, 1);
|
||||||
|
else
|
||||||
|
throw new Error('Message not found: ' + er.message);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
var listener = process.addAsyncListener(asyncL, callbacksObj);
|
||||||
|
|
||||||
|
process.on('exit', function(code) {
|
||||||
|
process.removeAsyncListener(listener);
|
||||||
|
|
||||||
|
if (code > 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (errorMsgs.length > 0)
|
||||||
|
throw new Error('Errors not fired: ' + errorMsgs);
|
||||||
|
|
||||||
|
assert.equal(caught, expectCaught);
|
||||||
|
process._rawDebug('ok');
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
// Net
|
||||||
|
var iter = 3;
|
||||||
|
for (var i = 0; i < iter; i++) {
|
||||||
|
errorMsgs.push('net - error: server connection');
|
||||||
|
errorMsgs.push('net - error: client data');
|
||||||
|
errorMsgs.push('net - error: server data');
|
||||||
|
}
|
||||||
|
errorMsgs.push('net - error: server closed');
|
||||||
|
|
||||||
|
var server = net.createServer(function(c) {
|
||||||
|
c.on('data', function() {
|
||||||
|
if (0 === --iter) {
|
||||||
|
server.close(function() {
|
||||||
|
process._rawDebug('net - server closing');
|
||||||
|
throw new Error('net - error: server closed');
|
||||||
|
});
|
||||||
|
expectCaught++;
|
||||||
|
}
|
||||||
|
process._rawDebug('net - connection received data');
|
||||||
|
throw new Error('net - error: server data');
|
||||||
|
});
|
||||||
|
expectCaught++;
|
||||||
|
|
||||||
|
c.end('bye');
|
||||||
|
process._rawDebug('net - connection received');
|
||||||
|
throw new Error('net - error: server connection');
|
||||||
|
});
|
||||||
|
expectCaught += iter;
|
||||||
|
|
||||||
|
server.listen(common.PORT, function() {
|
||||||
|
for (var i = 0; i < iter; i++)
|
||||||
|
clientConnect();
|
||||||
|
});
|
||||||
|
|
||||||
|
function clientConnect() {
|
||||||
|
var client = net.connect(common.PORT, function() { });
|
||||||
|
|
||||||
|
client.on('data', function() {
|
||||||
|
client.end('see ya');
|
||||||
|
process._rawDebug('net - client received data');
|
||||||
|
throw new Error('net - error: client data');
|
||||||
|
});
|
||||||
|
expectCaught++;
|
||||||
|
}
|
66
test/simple/test-asynclistener-remove-after.js
Normal file
66
test/simple/test-asynclistener-remove-after.js
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the
|
||||||
|
// "Software"), to deal in the Software without restriction, including
|
||||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||||
|
// following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included
|
||||||
|
// in all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
|
||||||
|
var common = require('../common');
|
||||||
|
var assert = require('assert');
|
||||||
|
var net = require('net');
|
||||||
|
|
||||||
|
|
||||||
|
// TODO(trevnorris): Test has the flaw that it's not checking if the async
|
||||||
|
// flag has been removed on the class instance.
|
||||||
|
var listener = process.addAsyncListener(function() { });
|
||||||
|
|
||||||
|
|
||||||
|
// Test timers
|
||||||
|
|
||||||
|
setImmediate(function() {
|
||||||
|
assert.equal(this._asyncQueue.length, 0);
|
||||||
|
}).removeAsyncListener(listener);
|
||||||
|
|
||||||
|
setTimeout(function() {
|
||||||
|
assert.equal(this._asyncQueue.length, 0);
|
||||||
|
}).removeAsyncListener(listener);
|
||||||
|
|
||||||
|
setInterval(function() {
|
||||||
|
clearInterval(this);
|
||||||
|
assert.equal(this._asyncQueue.length, 0);
|
||||||
|
}).removeAsyncListener(listener);
|
||||||
|
|
||||||
|
|
||||||
|
// Test net
|
||||||
|
|
||||||
|
var server = net.createServer(function(c) {
|
||||||
|
c._handle.removeAsyncListener(listener);
|
||||||
|
assert.equal(c._handle._asyncQueue.length, 0);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(common.PORT, function() {
|
||||||
|
server._handle.removeAsyncListener(listener);
|
||||||
|
assert.equal(server._handle._asyncQueue.length, 0);
|
||||||
|
|
||||||
|
var client = net.connect(common.PORT, function() {
|
||||||
|
client._handle.removeAsyncListener(listener);
|
||||||
|
assert.equal(client._handle._asyncQueue.length, 0);
|
||||||
|
client.end();
|
||||||
|
server.close();
|
||||||
|
});
|
||||||
|
});
|
Loading…
x
Reference in New Issue
Block a user