Put file stream methods into prototype, small style fixes

This commit is contained in:
Ryan Dahl 2010-04-08 10:37:10 -07:00
parent 3819920d77
commit 7faf7d5c8d

351
lib/fs.js
View File

@ -492,43 +492,7 @@ var FileReadStream = fs.FileReadStream = function(path, options) {
options = options || {};
for (var i in options) this[i] = options[i];
var
self = this,
buffer = null;
function read() {
if (!self.readable || self.paused) {
return;
}
fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) {
if (err) {
self.emit('error', err);
self.readable = false;
return;
}
if (bytesRead === 0) {
self.emit('end');
self.forceClose();
return;
}
// do not emit events if the stream is paused
if (self.paused) {
buffer = data;
return;
}
// do not emit events anymore after we declared the stream unreadable
if (!self.readable) {
return;
}
self.emit('data', data);
read();
});
}
var self = this;
fs.open(this.path, this.flags, this.mode, function(err, fd) {
if (err) {
@ -539,53 +503,97 @@ var FileReadStream = fs.FileReadStream = function(path, options) {
self.fd = fd;
self.emit('open', fd);
read();
self._read();
});
this.forceClose = function(cb) {
this.readable = false;
function close() {
fs.close(self.fd, function(err) {
if (err) {
if (cb) {
cb(err);
}
self.emit('error', err);
return;
}
if (cb) {
cb(null);
}
self.emit('close');
});
}
if (this.fd) {
close();
} else {
this.addListener('open', close);
}
};
this.pause = function() {
this.paused = true;
};
this.resume = function() {
this.paused = false;
if (buffer !== null) {
self.emit('data', buffer);
buffer = null;
}
read();
};
};
sys.inherits(FileReadStream, events.EventEmitter);
FileReadStream.prototype._read = function () {
var self = this;
if (!self.readable || self.paused) return;
fs.read(self.fd,
self.bufferSize,
undefined,
self.encoding,
function(err, data, bytesRead) {
if (err) {
self.emit('error', err);
self.readable = false;
return;
}
if (bytesRead === 0) {
self.emit('end');
self.forceClose();
return;
}
// do not emit events if the stream is paused
if (self.paused) {
self.buffer = data;
return;
}
// do not emit events anymore after we declared the stream unreadable
if (!self.readable) {
return;
}
self.emit('data', data);
self._read();
});
};
FileReadStream.prototype.forceClose = function (cb) {
var self = this;
this.readable = false;
function close() {
fs.close(self.fd, function(err) {
if (err) {
if (cb) {
cb(err);
}
self.emit('error', err);
return;
}
if (cb) {
cb(null);
}
self.emit('close');
});
}
if (this.fd) {
close();
} else {
this.addListener('open', close);
}
};
FileReadStream.prototype.pause = function() {
this.paused = true;
};
FileReadStream.prototype.resume = function() {
this.paused = false;
if (this.buffer) {
this.emit('data', this.buffer);
this.buffer = null;
}
this._read();
};
fs.createWriteStream = function(path, options) {
return new FileWriteStream(path, options);
};
@ -604,105 +612,108 @@ var FileWriteStream = fs.FileWriteStream = function(path, options) {
options = options || {};
for (var i in options) this[i] = options[i];
this.busy = false;
this._queue = [];
this._queue.push([fs.open, this.path, this.flags, this.mode, undefined]);
this.flush();
};
sys.inherits(FileWriteStream, events.EventEmitter);
FileWriteStream.prototype.flush = function () {
if (this.busy) return;
var self = this;
var args = this._queue.shift();
if (!args) return self.emit('drain');
this.busy = true;
var
self = this,
queue = [],
busy = false;
method = args.shift(),
cb = args.pop();
queue.push([fs.open, this.path, this.flags, this.mode, undefined]);
var self = this;
function flush() {
if (busy) {
args.push(function(err) {
self.busy = false;
if (err) {
self.writeable = false;
if (cb) {
cb(err);
}
self.emit('error', err);
return;
}
var args = queue.shift();
if (!args) {
return self.emit('drain');
}
busy = true;
var
method = args.shift(),
cb = args.pop();
args.push(function(err) {
busy = false;
if (err) {
self.writeable = false;
if (cb) {
cb(err);
}
self.emit('error', err);
return;
}
// stop flushing after close
if (method === fs.close) {
if (cb) {
cb(null);
}
self.emit('close');
return;
}
// save reference for file pointer
if (method === fs.open) {
self.fd = arguments[1];
self.emit('open', self.fd);
} else if (cb) {
// write callback
cb(null, arguments[1]);
}
flush();
});
// Inject the file pointer
if (method !== fs.open) {
args.unshift(self.fd);
}
method.apply(this, args);
};
this.write = function(data, cb) {
if (!this.writeable) {
throw new Error('stream not writeable');
}
queue.push([fs.write, data, undefined, this.encoding, cb]);
flush();
return false;
};
this.close = function(cb) {
this.writeable = false;
queue.push([fs.close, cb]);
flush();
};
this.forceClose = function(cb) {
this.writeable = false;
fs.close(self.fd, function(err) {
if (err) {
if (cb) {
cb(err);
}
self.emit('error', err);
return;
}
// stop flushing after close
if (method === fs.close) {
if (cb) {
cb(null);
}
self.emit('close');
});
};
return;
}
flush();
// save reference for file pointer
if (method === fs.open) {
self.fd = arguments[1];
self.emit('open', self.fd);
} else if (cb) {
// write callback
cb(null, arguments[1]);
}
self.flush();
});
// Inject the file pointer
if (method !== fs.open) {
args.unshift(self.fd);
}
method.apply(this, args);
};
sys.inherits(FileWriteStream, events.EventEmitter);
FileWriteStream.prototype.write = function(data, cb) {
if (!this.writeable) {
throw new Error('stream not writeable');
}
this._queue.push([fs.write, data, undefined, this.encoding, cb]);
this.flush();
return false;
};
FileWriteStream.prototype.close = function (cb) {
this.writeable = false;
this._queue.push([fs.close, cb]);
this.flush();
};
FileWriteStream.prototype.forceClose = function (cb) {
this.writeable = false;
fs.close(self.fd, function(err) {
if (err) {
if (cb) {
cb(err);
}
self.emit('error', err);
return;
}
if (cb) {
cb(null);
}
self.emit('close');
});
};