Add callback to socket.write(), fix test-sendfds

This commit is contained in:
Ryan Dahl 2010-11-12 16:24:53 -08:00
parent a6d8425382
commit fa556a1425
4 changed files with 64 additions and 30 deletions

View File

@ -56,17 +56,18 @@ var ioWatchers = new FreeList("iowatcher", 100, function () {
IOWatcher.prototype.ondrain = function () { IOWatcher.prototype.ondrain = function () {
assert(this.socket); if (this.socket) {
var socket = this.socket; var socket = this.socket;
if (socket.writable || socket.readable) { if (socket.writable || socket.readable) {
require('timers').active(socket); require('timers').active(socket);
}
socket.emit('drain');
if (socket.ondrain) socket.ondrain();
if (socket._eof) socket._shutdown();
} }
socket.emit('drain');
if (socket.ondrain) socket.ondrain();
if (socket._eof) socket._shutdown();
}; };
@ -252,12 +253,13 @@ Object.defineProperty(Stream.prototype, 'readyState', {
}); });
Stream.prototype._appendBucket = function (data, encoding, fd) { Stream.prototype._appendBucket = function (data, encoding, fd, callback) {
if (data.length != 0) { if (data.length != 0) {
// TODO reject empty data. // TODO reject empty data.
var newBucket = { data: data }; var newBucket = { data: data };
if (encoding) newBucket.encoding = encoding; if (encoding) newBucket.encoding = encoding;
if (fd) newBucket.fd = fd; if (fd) newBucket.fd = fd;
if (callback) newBucket.callback = callback;
// TODO properly calculate queueSize // TODO properly calculate queueSize
@ -280,7 +282,7 @@ Stream.prototype._appendBucket = function (data, encoding, fd) {
}; };
Stream.prototype.write = function (data, encoding, fd) { Stream.prototype.write = function (data /* encoding, fd, callback */) {
if (this._eof) { if (this._eof) {
throw new Error('Stream.end() called already; cannot write.'); throw new Error('Stream.end() called already; cannot write.');
} }
@ -289,7 +291,29 @@ Stream.prototype.write = function (data, encoding, fd) {
throw new Error('Stream is not writable'); throw new Error('Stream is not writable');
} }
var queueSize = this._appendBucket(data, encoding, fd); // parse the arguments. ugly.
var encoding, fd, callback;
if (arguments[1] === undefined || typeof arguments[1] == 'string') {
encoding = arguments[1];
if (typeof arguments[2] == 'number') {
fd = arguments[2];
callback = arguments[3];
} else {
callback = arguments[2];
}
} else if (typeof arguments[1] == 'number') {
fd = arguments[1];
callback = arguments[2];
} else if (typeof arguments[1] == 'function') {
callback = arguments[1];
} else {
throw new Error("Bad type for second argument");
}
var queueSize = this._appendBucket(data, encoding, fd, callback);
if (this._connecting) return false; if (this._connecting) return false;

View File

@ -38,6 +38,7 @@ static Persistent<String> is_unix_socket_sym;
static Persistent<String> first_bucket_sym; static Persistent<String> first_bucket_sym;
static Persistent<String> last_bucket_sym; static Persistent<String> last_bucket_sym;
static Persistent<String> queue_size_sym; static Persistent<String> queue_size_sym;
static Persistent<String> callback_sym;
void IOWatcher::Initialize(Handle<Object> target) { void IOWatcher::Initialize(Handle<Object> target) {
@ -73,6 +74,7 @@ void IOWatcher::Initialize(Handle<Object> target) {
is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket");
data_sym = NODE_PSYMBOL("data"); data_sym = NODE_PSYMBOL("data");
encoding_sym = NODE_PSYMBOL("encoding"); encoding_sym = NODE_PSYMBOL("encoding");
callback_sym = NODE_PSYMBOL("callback");
ev_prepare_init(&dumper, IOWatcher::Dump); ev_prepare_init(&dumper, IOWatcher::Dump);
@ -497,6 +499,17 @@ void IOWatcher::Dump() {
written -= bucket_len - offset; written -= bucket_len - offset;
Local<Value> bucket_callback_v = bucket->Get(callback_sym);
if (bucket_callback_v->IsFunction()) {
Local<Function> bucket_callback =
Local<Function>::Cast(bucket_callback_v);
TryCatch try_catch;
bucket_callback->Call(io->handle_, 0, NULL);
if (try_catch.HasCaught()) {
FatalException(try_catch);
}
}
// Offset is now zero // Offset is now zero
watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
} }

View File

@ -22,35 +22,33 @@ function processData(s) {
// version of our modified object back. Clean up when we're done. // version of our modified object back. Clean up when we're done.
var pipeStream = new net.Stream(fd); var pipeStream = new net.Stream(fd);
var drainFunc = function() { pipeStream.resume();
pipeStream.write(JSON.stringify(d) + '\n', function () {
pipeStream.destroy(); pipeStream.destroy();
if (++numSentMessages == 2) { if (++numSentMessages == 2) {
s.destroy(); s.destroy();
} }
}; });
pipeStream.addListener('drain', drainFunc);
pipeStream.resume();
if (pipeStream.write(JSON.stringify(d) + '\n')) {
drainFunc();
}
}; };
// Create a UNIX socket to the path defined by argv[2] and read a file // Create a UNIX socket to the path defined by argv[2] and read a file
// descriptor and misc data from it. // descriptor and misc data from it.
var s = new net.Stream(); var s = new net.Stream();
s.addListener('fd', function(fd) { s.addListener('fd', function(fd) {
receivedFDs.unshift(fd); receivedFDs.unshift(fd);
processData(s); processData(s);
}); });
s.addListener('data', function(data) { s.addListener('data', function(data) {
data.toString('utf8').trim().split('\n').forEach(function(d) { data.toString('utf8').trim().split('\n').forEach(function(d) {
receivedData.unshift(JSON.parse(d)); receivedData.unshift(JSON.parse(d));
}); });
processData(s); processData(s);
}); });
s.connect(process.argv[2]); s.connect(process.argv[2]);
// vim:ts=2 sw=2 et // vim:ts=2 sw=2 et

View File

@ -53,7 +53,7 @@ var logChild = function(d) {
d.split('\n').forEach(function(l) { d.split('\n').forEach(function(l) {
if (l.length > 0) { if (l.length > 0) {
common.debug('CHILD: ' + l); console.error('CHILD: ' + l);
} }
}); });
}; };
@ -96,19 +96,18 @@ var srv = net.createServer(function(s) {
buf.write(JSON.stringify(DATA) + '\n', 'utf8'); buf.write(JSON.stringify(DATA) + '\n', 'utf8');
s.write(str, 'utf8', pipeFDs[1]); s.write(str, 'utf8', pipeFDs[1]);
if (s.write(buf, undefined, pipeFDs[1])) {
s.write(buf, pipeFDs[1], function () {
console.error("close pipeFDs[1]");
netBinding.close(pipeFDs[1]); netBinding.close(pipeFDs[1]);
} else { });
s.addListener('drain', function() {
netBinding.close(pipeFDs[1]);
});
}
}); });
srv.listen(SOCK_PATH); srv.listen(SOCK_PATH);
// Spawn a child running test/fixtures/recvfd.js // Spawn a child running test/fixtures/recvfd.js
var cp = child_process.spawn(process.argv[0], var cp = child_process.spawn(process.execPath,
[path.join(common.fixturesDir, 'recvfd.js'), SOCK_PATH]); [path.join(common.fixturesDir, 'recvfd.js'),
SOCK_PATH]);
cp.stdout.addListener('data', logChild); cp.stdout.addListener('data', logChild);
cp.stderr.addListener('data', logChild); cp.stderr.addListener('data', logChild);