From 7faf7d5c8d5ddb35548a0754f0c0fc3ad713c195 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 8 Apr 2010 10:37:10 -0700 Subject: [PATCH] Put file stream methods into prototype, small style fixes --- lib/fs.js | 351 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 181 insertions(+), 170 deletions(-) diff --git a/lib/fs.js b/lib/fs.js index b75b69ababe..25ce376d5a8 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -492,43 +492,7 @@ var FileReadStream = fs.FileReadStream = function(path, options) { options = options || {}; for (var i in options) this[i] = options[i]; - var - self = this, - buffer = null; - - function read() { - if (!self.readable || self.paused) { - return; - } - - fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) { - if (err) { - self.emit('error', err); - self.readable = false; - return; - } - - if (bytesRead === 0) { - self.emit('end'); - self.forceClose(); - return; - } - - // do not emit events if the stream is paused - if (self.paused) { - buffer = data; - return; - } - - // do not emit events anymore after we declared the stream unreadable - if (!self.readable) { - return; - } - - self.emit('data', data); - read(); - }); - } + var self = this; fs.open(this.path, this.flags, this.mode, function(err, fd) { if (err) { @@ -539,53 +503,97 @@ var FileReadStream = fs.FileReadStream = function(path, options) { self.fd = fd; self.emit('open', fd); - read(); + self._read(); }); - - this.forceClose = function(cb) { - this.readable = false; - - function close() { - fs.close(self.fd, function(err) { - if (err) { - if (cb) { - cb(err); - } - self.emit('error', err); - return; - } - - if (cb) { - cb(null); - } - self.emit('close'); - }); - } - - if (this.fd) { - close(); - } else { - this.addListener('open', close); - } - }; - - this.pause = function() { - this.paused = true; - }; - - this.resume = function() { - this.paused = false; - - if (buffer !== null) { - self.emit('data', buffer); - buffer = null; - } - - read(); - }; }; sys.inherits(FileReadStream, events.EventEmitter); + +FileReadStream.prototype._read = function () { + var self = this; + if (!self.readable || self.paused) return; + + fs.read(self.fd, + self.bufferSize, + undefined, + self.encoding, + function(err, data, bytesRead) { + if (err) { + self.emit('error', err); + self.readable = false; + return; + } + + if (bytesRead === 0) { + self.emit('end'); + self.forceClose(); + return; + } + + // do not emit events if the stream is paused + if (self.paused) { + self.buffer = data; + return; + } + + // do not emit events anymore after we declared the stream unreadable + if (!self.readable) { + return; + } + + self.emit('data', data); + self._read(); + }); +}; + + +FileReadStream.prototype.forceClose = function (cb) { + var self = this; + this.readable = false; + + function close() { + fs.close(self.fd, function(err) { + if (err) { + if (cb) { + cb(err); + } + self.emit('error', err); + return; + } + + if (cb) { + cb(null); + } + self.emit('close'); + }); + } + + if (this.fd) { + close(); + } else { + this.addListener('open', close); + } +}; + + +FileReadStream.prototype.pause = function() { + this.paused = true; +}; + + +FileReadStream.prototype.resume = function() { + this.paused = false; + + if (this.buffer) { + this.emit('data', this.buffer); + this.buffer = null; + } + + this._read(); +}; + + + fs.createWriteStream = function(path, options) { return new FileWriteStream(path, options); }; @@ -604,105 +612,108 @@ var FileWriteStream = fs.FileWriteStream = function(path, options) { options = options || {}; for (var i in options) this[i] = options[i]; + this.busy = false; + this._queue = []; + + this._queue.push([fs.open, this.path, this.flags, this.mode, undefined]); + this.flush(); +}; +sys.inherits(FileWriteStream, events.EventEmitter); + + + +FileWriteStream.prototype.flush = function () { + if (this.busy) return; + var self = this; + + var args = this._queue.shift(); + if (!args) return self.emit('drain'); + + this.busy = true; + var - self = this, - queue = [], - busy = false; + method = args.shift(), + cb = args.pop(); - queue.push([fs.open, this.path, this.flags, this.mode, undefined]); + var self = this; - function flush() { - if (busy) { + args.push(function(err) { + self.busy = false; + + if (err) { + self.writeable = false; + if (cb) { + cb(err); + } + self.emit('error', err); return; } - var args = queue.shift(); - if (!args) { - return self.emit('drain'); - } - - busy = true; - - var - method = args.shift(), - cb = args.pop(); - - args.push(function(err) { - busy = false; - - if (err) { - self.writeable = false; - if (cb) { - cb(err); - } - self.emit('error', err); - return; - } - - // stop flushing after close - if (method === fs.close) { - if (cb) { - cb(null); - } - self.emit('close'); - return; - } - - // save reference for file pointer - if (method === fs.open) { - self.fd = arguments[1]; - self.emit('open', self.fd); - } else if (cb) { - // write callback - cb(null, arguments[1]); - } - - flush(); - }); - - // Inject the file pointer - if (method !== fs.open) { - args.unshift(self.fd); - } - - method.apply(this, args); - }; - - this.write = function(data, cb) { - if (!this.writeable) { - throw new Error('stream not writeable'); - } - - queue.push([fs.write, data, undefined, this.encoding, cb]); - flush(); - return false; - }; - - this.close = function(cb) { - this.writeable = false; - queue.push([fs.close, cb]); - flush(); - }; - - this.forceClose = function(cb) { - this.writeable = false; - fs.close(self.fd, function(err) { - if (err) { - if (cb) { - cb(err); - } - - self.emit('error', err); - return; - } - + // stop flushing after close + if (method === fs.close) { if (cb) { cb(null); } self.emit('close'); - }); - }; + return; + } - flush(); + // save reference for file pointer + if (method === fs.open) { + self.fd = arguments[1]; + self.emit('open', self.fd); + } else if (cb) { + // write callback + cb(null, arguments[1]); + } + + self.flush(); + }); + + // Inject the file pointer + if (method !== fs.open) { + args.unshift(self.fd); + } + + method.apply(this, args); }; -sys.inherits(FileWriteStream, events.EventEmitter); + + +FileWriteStream.prototype.write = function(data, cb) { + if (!this.writeable) { + throw new Error('stream not writeable'); + } + + this._queue.push([fs.write, data, undefined, this.encoding, cb]); + this.flush(); + + return false; +}; + + +FileWriteStream.prototype.close = function (cb) { + this.writeable = false; + this._queue.push([fs.close, cb]); + this.flush(); +}; + + +FileWriteStream.prototype.forceClose = function (cb) { + this.writeable = false; + fs.close(self.fd, function(err) { + if (err) { + if (cb) { + cb(err); + } + + self.emit('error', err); + return; + } + + if (cb) { + cb(null); + } + self.emit('close'); + }); +}; +