lib: refactor wrap_js_stream for ES6/readability
PR-URL: https://github.com/nodejs/node/pull/16158 Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com> Reviewed-By: Tobias Nießen <tniessen@tnie.de>
This commit is contained in:
parent
542e94cdce
commit
33b4320cf9
@ -8,146 +8,207 @@ const uv = process.binding('uv');
|
|||||||
const debug = util.debuglog('stream_wrap');
|
const debug = util.debuglog('stream_wrap');
|
||||||
const errors = require('internal/errors');
|
const errors = require('internal/errors');
|
||||||
|
|
||||||
function StreamWrap(stream) {
|
/* This class serves as a wrapper for when the C++ side of Node wants access
|
||||||
const handle = new JSStream();
|
* to a standard JS stream. For example, TLS or HTTP do not operate on network
|
||||||
|
* resources conceptually, although that is the common case and what we are
|
||||||
|
* optimizing for; in theory, they are completely composable and can work with
|
||||||
|
* any stream resource they see.
|
||||||
|
*
|
||||||
|
* For the common case, i.e. a TLS socket wrapping around a net.Socket, we
|
||||||
|
* can skip going through the JS layer and let TLS access the raw C++ handle
|
||||||
|
* of a net.Socket. The flipside of this is that, to maintain composability,
|
||||||
|
* we need a way to create "fake" net.Socket instances that call back into a
|
||||||
|
* "real" JavaScript stream. JSStreamWrap is exactly this.
|
||||||
|
*/
|
||||||
|
class JSStreamWrap extends Socket {
|
||||||
|
constructor(stream) {
|
||||||
|
const handle = new JSStream();
|
||||||
|
handle.close = (cb) => {
|
||||||
|
debug('close');
|
||||||
|
this.doClose(cb);
|
||||||
|
};
|
||||||
|
handle.isAlive = () => this.isAlive();
|
||||||
|
handle.isClosing = () => this.isClosing();
|
||||||
|
handle.onreadstart = () => this.readStart();
|
||||||
|
handle.onreadstop = () => this.readStop();
|
||||||
|
handle.onshutdown = (req) => this.doShutdown(req);
|
||||||
|
handle.onwrite = (req, bufs) => this.doWrite(req, bufs);
|
||||||
|
|
||||||
this.stream = stream;
|
stream.pause();
|
||||||
|
stream.on('error', (err) => this.emit('error', err));
|
||||||
|
const ondata = (chunk) => {
|
||||||
|
if (typeof chunk === 'string' ||
|
||||||
|
stream._readableState.objectMode === true) {
|
||||||
|
// Make sure that no further `data` events will happen.
|
||||||
|
stream.pause();
|
||||||
|
stream.removeListener('data', ondata);
|
||||||
|
|
||||||
this._list = null;
|
this.emit('error', new errors.Error('ERR_STREAM_WRAP'));
|
||||||
|
|
||||||
const self = this;
|
|
||||||
handle.close = function(cb) {
|
|
||||||
debug('close');
|
|
||||||
self.doClose(cb);
|
|
||||||
};
|
|
||||||
handle.isAlive = function() {
|
|
||||||
return self.isAlive();
|
|
||||||
};
|
|
||||||
handle.isClosing = function() {
|
|
||||||
return self.isClosing();
|
|
||||||
};
|
|
||||||
handle.onreadstart = function() {
|
|
||||||
return self.readStart();
|
|
||||||
};
|
|
||||||
handle.onreadstop = function() {
|
|
||||||
return self.readStop();
|
|
||||||
};
|
|
||||||
handle.onshutdown = function(req) {
|
|
||||||
return self.doShutdown(req);
|
|
||||||
};
|
|
||||||
handle.onwrite = function(req, bufs) {
|
|
||||||
return self.doWrite(req, bufs);
|
|
||||||
};
|
|
||||||
|
|
||||||
this.stream.pause();
|
|
||||||
this.stream.on('error', function onerror(err) {
|
|
||||||
self.emit('error', err);
|
|
||||||
});
|
|
||||||
this.stream.on('data', function ondata(chunk) {
|
|
||||||
if (typeof chunk === 'string' || this._readableState.objectMode === true) {
|
|
||||||
// Make sure that no further `data` events will happen
|
|
||||||
this.pause();
|
|
||||||
this.removeListener('data', ondata);
|
|
||||||
|
|
||||||
self.emit('error', new errors.Error('ERR_STREAM_WRAP'));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
debug('data', chunk.length);
|
|
||||||
if (self._handle)
|
|
||||||
self._handle.readBuffer(chunk);
|
|
||||||
});
|
|
||||||
this.stream.once('end', function onend() {
|
|
||||||
debug('end');
|
|
||||||
if (self._handle)
|
|
||||||
self._handle.emitEOF();
|
|
||||||
});
|
|
||||||
|
|
||||||
Socket.call(this, {
|
|
||||||
handle: handle
|
|
||||||
});
|
|
||||||
}
|
|
||||||
util.inherits(StreamWrap, Socket);
|
|
||||||
module.exports = StreamWrap;
|
|
||||||
|
|
||||||
// require('_stream_wrap').StreamWrap
|
|
||||||
StreamWrap.StreamWrap = StreamWrap;
|
|
||||||
|
|
||||||
StreamWrap.prototype.isAlive = function isAlive() {
|
|
||||||
return true;
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamWrap.prototype.isClosing = function isClosing() {
|
|
||||||
return !this.readable || !this.writable;
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamWrap.prototype.readStart = function readStart() {
|
|
||||||
this.stream.resume();
|
|
||||||
return 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamWrap.prototype.readStop = function readStop() {
|
|
||||||
this.stream.pause();
|
|
||||||
return 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamWrap.prototype.doShutdown = function doShutdown(req) {
|
|
||||||
const self = this;
|
|
||||||
const handle = this._handle;
|
|
||||||
const item = this._enqueue('shutdown', req);
|
|
||||||
|
|
||||||
this.stream.end(function() {
|
|
||||||
// Ensure that write was dispatched
|
|
||||||
setImmediate(function() {
|
|
||||||
if (!self._dequeue(item))
|
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
handle.finishShutdown(req, 0);
|
debug('data', chunk.length);
|
||||||
|
if (this._handle)
|
||||||
|
this._handle.readBuffer(chunk);
|
||||||
|
};
|
||||||
|
stream.on('data', ondata);
|
||||||
|
stream.once('end', () => {
|
||||||
|
debug('end');
|
||||||
|
if (this._handle)
|
||||||
|
this._handle.emitEOF();
|
||||||
});
|
});
|
||||||
});
|
|
||||||
return 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
|
super({ handle, manualStart: true });
|
||||||
const self = this;
|
this.stream = stream;
|
||||||
const handle = self._handle;
|
this._list = null;
|
||||||
|
this.read(0);
|
||||||
var pending = bufs.length;
|
|
||||||
|
|
||||||
// Queue the request to be able to cancel it
|
|
||||||
const item = self._enqueue('write', req);
|
|
||||||
|
|
||||||
self.stream.cork();
|
|
||||||
for (var n = 0; n < bufs.length; n++)
|
|
||||||
self.stream.write(bufs[n], done);
|
|
||||||
self.stream.uncork();
|
|
||||||
|
|
||||||
function done(err) {
|
|
||||||
if (!err && --pending !== 0)
|
|
||||||
return;
|
|
||||||
|
|
||||||
// Ensure that this is called once in case of error
|
|
||||||
pending = 0;
|
|
||||||
|
|
||||||
let errCode = 0;
|
|
||||||
if (err) {
|
|
||||||
const code = uv[`UV_${err.code}`];
|
|
||||||
errCode = (err.code && code) ? code : uv.UV_EPIPE;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure that write was dispatched
|
|
||||||
setImmediate(function() {
|
|
||||||
// Do not invoke callback twice
|
|
||||||
if (!self._dequeue(item))
|
|
||||||
return;
|
|
||||||
|
|
||||||
handle.doAfterWrite(req);
|
|
||||||
handle.finishWrite(req, errCode);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
// Legacy
|
||||||
};
|
static get StreamWrap() {
|
||||||
|
return JSStreamWrap;
|
||||||
|
}
|
||||||
|
|
||||||
|
isAlive() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
isClosing() {
|
||||||
|
return !this.readable || !this.writable;
|
||||||
|
}
|
||||||
|
|
||||||
|
readStart() {
|
||||||
|
this.stream.resume();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
readStop() {
|
||||||
|
this.stream.pause();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
doShutdown(req) {
|
||||||
|
const handle = this._handle;
|
||||||
|
const item = this._enqueue('shutdown', req);
|
||||||
|
|
||||||
|
this.stream.end(() => {
|
||||||
|
// Ensure that write was dispatched
|
||||||
|
setImmediate(() => {
|
||||||
|
if (!this._dequeue(item))
|
||||||
|
return;
|
||||||
|
|
||||||
|
handle.finishShutdown(req, 0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
doWrite(req, bufs) {
|
||||||
|
const self = this;
|
||||||
|
const handle = this._handle;
|
||||||
|
|
||||||
|
var pending = bufs.length;
|
||||||
|
|
||||||
|
// Queue the request to be able to cancel it
|
||||||
|
const item = this._enqueue('write', req);
|
||||||
|
|
||||||
|
this.stream.cork();
|
||||||
|
for (var n = 0; n < bufs.length; n++)
|
||||||
|
this.stream.write(bufs[n], done);
|
||||||
|
this.stream.uncork();
|
||||||
|
|
||||||
|
function done(err) {
|
||||||
|
if (!err && --pending !== 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// Ensure that this is called once in case of error
|
||||||
|
pending = 0;
|
||||||
|
|
||||||
|
let errCode = 0;
|
||||||
|
if (err) {
|
||||||
|
const code = uv[`UV_${err.code}`];
|
||||||
|
errCode = (err.code && code) ? code : uv.UV_EPIPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure that write was dispatched
|
||||||
|
setImmediate(function() {
|
||||||
|
// Do not invoke callback twice
|
||||||
|
if (!self._dequeue(item))
|
||||||
|
return;
|
||||||
|
|
||||||
|
handle.doAfterWrite(req);
|
||||||
|
handle.finishWrite(req, errCode);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
_enqueue(type, req) {
|
||||||
|
const item = new QueueItem(type, req);
|
||||||
|
if (this._list === null) {
|
||||||
|
this._list = item;
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
|
item.next = this._list.next;
|
||||||
|
item.prev = this._list;
|
||||||
|
item.next.prev = item;
|
||||||
|
item.prev.next = item;
|
||||||
|
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
|
_dequeue(item) {
|
||||||
|
assert(item instanceof QueueItem);
|
||||||
|
|
||||||
|
var next = item.next;
|
||||||
|
var prev = item.prev;
|
||||||
|
|
||||||
|
if (next === null && prev === null)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
item.next = null;
|
||||||
|
item.prev = null;
|
||||||
|
|
||||||
|
if (next === item) {
|
||||||
|
prev = null;
|
||||||
|
next = null;
|
||||||
|
} else {
|
||||||
|
prev.next = next;
|
||||||
|
next.prev = prev;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this._list === item)
|
||||||
|
this._list = next;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
doClose(cb) {
|
||||||
|
const handle = this._handle;
|
||||||
|
|
||||||
|
setImmediate(() => {
|
||||||
|
while (this._list !== null) {
|
||||||
|
const item = this._list;
|
||||||
|
const req = item.req;
|
||||||
|
this._dequeue(item);
|
||||||
|
|
||||||
|
const errCode = uv.UV_ECANCELED;
|
||||||
|
if (item.type === 'write') {
|
||||||
|
handle.doAfterWrite(req);
|
||||||
|
handle.finishWrite(req, errCode);
|
||||||
|
} else if (item.type === 'shutdown') {
|
||||||
|
handle.finishShutdown(req, errCode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be already set by net.js
|
||||||
|
assert.strictEqual(this._handle, null);
|
||||||
|
cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function QueueItem(type, req) {
|
function QueueItem(type, req) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
@ -156,68 +217,4 @@ function QueueItem(type, req) {
|
|||||||
this.next = this;
|
this.next = this;
|
||||||
}
|
}
|
||||||
|
|
||||||
StreamWrap.prototype._enqueue = function _enqueue(type, req) {
|
module.exports = JSStreamWrap;
|
||||||
const item = new QueueItem(type, req);
|
|
||||||
if (this._list === null) {
|
|
||||||
this._list = item;
|
|
||||||
return item;
|
|
||||||
}
|
|
||||||
|
|
||||||
item.next = this._list.next;
|
|
||||||
item.prev = this._list;
|
|
||||||
item.next.prev = item;
|
|
||||||
item.prev.next = item;
|
|
||||||
|
|
||||||
return item;
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamWrap.prototype._dequeue = function _dequeue(item) {
|
|
||||||
assert(item instanceof QueueItem);
|
|
||||||
|
|
||||||
var next = item.next;
|
|
||||||
var prev = item.prev;
|
|
||||||
|
|
||||||
if (next === null && prev === null)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
item.next = null;
|
|
||||||
item.prev = null;
|
|
||||||
|
|
||||||
if (next === item) {
|
|
||||||
prev = null;
|
|
||||||
next = null;
|
|
||||||
} else {
|
|
||||||
prev.next = next;
|
|
||||||
next.prev = prev;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this._list === item)
|
|
||||||
this._list = next;
|
|
||||||
|
|
||||||
return true;
|
|
||||||
};
|
|
||||||
|
|
||||||
StreamWrap.prototype.doClose = function doClose(cb) {
|
|
||||||
const self = this;
|
|
||||||
const handle = self._handle;
|
|
||||||
|
|
||||||
setImmediate(function() {
|
|
||||||
while (self._list !== null) {
|
|
||||||
const item = self._list;
|
|
||||||
const req = item.req;
|
|
||||||
self._dequeue(item);
|
|
||||||
|
|
||||||
const errCode = uv.UV_ECANCELED;
|
|
||||||
if (item.type === 'write') {
|
|
||||||
handle.doAfterWrite(req);
|
|
||||||
handle.finishWrite(req, errCode);
|
|
||||||
} else if (item.type === 'shutdown') {
|
|
||||||
handle.finishShutdown(req, errCode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Should be already set by net.js
|
|
||||||
assert(self._handle === null);
|
|
||||||
cb();
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
@ -245,7 +245,7 @@ function Socket(options) {
|
|||||||
this._handle.reading = false;
|
this._handle.reading = false;
|
||||||
this._handle.readStop();
|
this._handle.readStop();
|
||||||
this._readableState.flowing = false;
|
this._readableState.flowing = false;
|
||||||
} else {
|
} else if (!options.manualStart) {
|
||||||
this.read(0);
|
this.read(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,5 +15,5 @@ const TlsSocket = require('tls').TLSSocket;
|
|||||||
const EventEmitter = require('events').EventEmitter;
|
const EventEmitter = require('events').EventEmitter;
|
||||||
assert.throws(
|
assert.throws(
|
||||||
() => { new TlsSocket(new EventEmitter()); },
|
() => { new TlsSocket(new EventEmitter()); },
|
||||||
/^TypeError: this\.stream\.pause is not a function/
|
/^TypeError: (.+) is not a function$/
|
||||||
);
|
);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user