Add callbacks to stream methods

Allows for more fine graining, especially finding out about an individual
chunk of data being flushed in a write stream rather than the whole queue.

This commit also fixes a bug causing forceClose to fail on a readStream that
did not finish opening yet.
This commit is contained in:
Felix Geisendörfer 2010-03-07 16:33:21 +01:00 committed by Ryan Dahl
parent c47391526c
commit 9a9f08b1bc
4 changed files with 84 additions and 31 deletions

View File

@ -823,7 +823,7 @@ until the stream is resumed.
+readStream.resume()+ :: +readStream.resume()+ ::
Resumes the stream. Together with +pause()+ this useful to throttle reading. Resumes the stream. Together with +pause()+ this useful to throttle reading.
+readStream.forceClose()+ :: +readStream.forceClose([callback])+ ::
Allows to close the stream before the +"end"+ is reached. No more events other Allows to close the stream before the +"end"+ is reached. No more events other
than +"close"+ will be fired after this method has been called. than +"close"+ will be fired after this method has been called.
@ -855,15 +855,18 @@ Returns a new FileWriteStream object.
A boolean that is +true+ by default, but turns +false+ after an +"error"+ A boolean that is +true+ by default, but turns +false+ after an +"error"+
occured or +close()+ / +forceClose()+ was called. occured or +close()+ / +forceClose()+ was called.
+writeStream.write(data)+ :: +writeStream.write(data, [callback])+ ::
Returns +true+ if the data was flushed to the kernel, and +false+ if it was Returns +true+ if the data was flushed to the kernel, and +false+ if it was
queued up for being written later. A +"drain"+ will fire after all queued data queued up for being written later. A +"drain"+ will fire after all queued data
has been written. has been written.
+
You can also specify +callback+ to be notified when the data from this write
has been flushed. The first param is +err+, the second is +bytesWritten+.
+writeStream.close()+ :: +writeStream.close([callback])+ ::
Closes the stream right after all queued +write()+ calls have finished. Closes the stream right after all queued +write()+ calls have finished.
+writeStream.forceClose()+ :: +writeStream.forceClose([callback])+ ::
Allows to close the stream regardless of its current state. Allows to close the stream regardless of its current state.
== HTTP == HTTP

View File

@ -495,16 +495,31 @@ var FileReadStream = exports.FileReadStream = function(path, options) {
read(); read();
}); });
this.forceClose = function() { this.forceClose = function(cb) {
this.readable = false; this.readable = false;
fs.close(this.fd, function(err) {
if (err) {
self.emit('error', err);
return;
}
self.emit('close'); function close() {
}); fs.close(self.fd, function(err) {
if (err) {
if (cb) {
cb(err);
}
self.emit('error', err);
return;
}
if (cb) {
cb(null);
}
self.emit('close');
});
}
if (this.fd) {
close();
} else {
this.addListener('open', close);
}
}; };
this.pause = function() { this.pause = function() {
@ -546,7 +561,7 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) {
queue = [], queue = [],
busy = false; busy = false;
queue.push([fs.open, this.path, this.flags, this.mode]); queue.push([fs.open, this.path, this.flags, this.mode, undefined]);
function flush() { function flush() {
if (busy) { if (busy) {
@ -560,27 +575,38 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) {
busy = true; busy = true;
var method = args.shift(); var
method = args.shift(),
cb = args.pop();
args.push(function(err) { args.push(function(err) {
busy = false; busy = false;
if (err) { if (err) {
self.writeable = false; self.writeable = false;
if (cb) {
cb(err);
}
self.emit('error', err); self.emit('error', err);
return; return;
} }
// stop flushing after close
if (method === fs.close) {
if (cb) {
cb(null);
}
self.emit('close');
return;
}
// save reference for file pointer // save reference for file pointer
if (method === fs.open) { if (method === fs.open) {
self.fd = arguments[1]; self.fd = arguments[1];
self.emit('open', self.fd); self.emit('open', self.fd);
} } else if (cb) {
// write callback
// stop flushing after close cb(null, arguments[1]);
if (method === fs.close) {
self.emit('close');
return;
} }
flush(); flush();
@ -594,30 +620,37 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) {
method.apply(null, args); method.apply(null, args);
}; };
this.write = function(data) { this.write = function(data, cb) {
if (!this.writeable) { if (!this.writeable) {
throw new Error('stream not writeable'); throw new Error('stream not writeable');
} }
queue.push([fs.write, data, undefined, this.encoding]); queue.push([fs.write, data, undefined, this.encoding, cb]);
flush(); flush();
return false; return false;
}; };
this.close = function() { this.close = function(cb) {
this.writeable = false; this.writeable = false;
queue.push([fs.close,]); queue.push([fs.close, cb]);
flush(); flush();
}; };
this.forceClose = function() { this.forceClose = function(cb) {
this.writeable = false; this.writeable = false;
fs.close(self.fd, function(err) { fs.close(self.fd, function(err) {
if (err) { if (err) {
if (cb) {
cb(err);
}
self.emit('error', err); self.emit('error', err);
return; return;
} }
if (cb) {
cb(null);
}
self.emit('close'); self.emit('close');
}); });
}; };

View File

@ -7,7 +7,8 @@ var
callbacks = { callbacks = {
open: -1, open: -1,
end: -1, end: -1,
close: -1 close: -1,
forceClose: -1
}, },
paused = false, paused = false,
@ -47,6 +48,12 @@ file
assert.equal(fs.readFileSync(fn), fileContent); assert.equal(fs.readFileSync(fn), fileContent);
}); });
var file2 = fs.createReadStream(fn);
file2.forceClose(function(err) {
assert.ok(!err);
callbacks.forceClose++;
});
process.addListener('exit', function() { process.addListener('exit', function() {
for (var k in callbacks) { for (var k in callbacks) {
assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]); assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]);

View File

@ -4,12 +4,14 @@ var
fn = path.join(fixturesDir, "write.txt"), fn = path.join(fixturesDir, "write.txt"),
file = fs.createWriteStream(fn), file = fs.createWriteStream(fn),
EXPECTED = '0123456789', EXPECTED = '012345678910',
callbacks = { callbacks = {
open: -1, open: -1,
drain: -2, drain: -2,
close: -1 close: -1,
closeCb: -1,
write: -11,
}; };
file file
@ -27,7 +29,10 @@ file
file.write(EXPECTED); file.write(EXPECTED);
} else if (callbacks.drain == 0) { } else if (callbacks.drain == 0) {
assert.equal(EXPECTED+EXPECTED, fs.readFileSync(fn)); assert.equal(EXPECTED+EXPECTED, fs.readFileSync(fn));
file.close(); file.close(function(err) {
assert.ok(!err);
callbacks.closeCb++;
});
} }
}) })
.addListener('close', function() { .addListener('close', function() {
@ -39,8 +44,13 @@ file
fs.unlinkSync(fn); fs.unlinkSync(fn);
}); });
for (var i = 0; i < 10; i++) { for (var i = 0; i < 11; i++) {
assert.strictEqual(false, file.write(i)); (function(i) {
assert.strictEqual(false, file.write(i, function(err, bytesWritten) {
callbacks.write++;
assert.equal(new String(i).length, bytesWritten);
}));
})(i);
} }
process.addListener('exit', function() { process.addListener('exit', function() {