http: Use streams3 directly, not .ondata/end
This commit is contained in:
parent
cec81593d7
commit
967b5dbb45
@ -253,6 +253,8 @@ function socketOnData(d) {
|
|||||||
var req = this._httpMessage;
|
var req = this._httpMessage;
|
||||||
var parser = this.parser;
|
var parser = this.parser;
|
||||||
|
|
||||||
|
assert(parser);
|
||||||
|
|
||||||
var ret = parser.execute(d);
|
var ret = parser.execute(d);
|
||||||
if (ret instanceof Error) {
|
if (ret instanceof Error) {
|
||||||
debug('parse error');
|
debug('parse error');
|
||||||
@ -266,8 +268,8 @@ function socketOnData(d) {
|
|||||||
var res = parser.incoming;
|
var res = parser.incoming;
|
||||||
req.res = res;
|
req.res = res;
|
||||||
|
|
||||||
socket.ondata = null;
|
socket.removeListener('data', socketOnData);
|
||||||
socket.onend = null;
|
socket.removeListener('end', socketOnEnd);
|
||||||
parser.finish();
|
parser.finish();
|
||||||
|
|
||||||
var bodyHead = d.slice(bytesParsed, d.length);
|
var bodyHead = d.slice(bytesParsed, d.length);
|
||||||
@ -281,6 +283,10 @@ function socketOnData(d) {
|
|||||||
socket.removeListener('close', socketCloseListener);
|
socket.removeListener('close', socketCloseListener);
|
||||||
socket.removeListener('error', socketErrorListener);
|
socket.removeListener('error', socketErrorListener);
|
||||||
|
|
||||||
|
// TODO(isaacs): Need a way to reset a stream to fresh state
|
||||||
|
// IE, not flowing, and not explicitly paused.
|
||||||
|
socket._readableState.flowing = null;
|
||||||
|
|
||||||
req.emit(eventName, res, socket, bodyHead);
|
req.emit(eventName, res, socket, bodyHead);
|
||||||
req.emit('close');
|
req.emit('close');
|
||||||
} else {
|
} else {
|
||||||
@ -293,6 +299,8 @@ function socketOnData(d) {
|
|||||||
// send a final response after this client sends a request
|
// send a final response after this client sends a request
|
||||||
// body. So, we must not free the parser.
|
// body. So, we must not free the parser.
|
||||||
parser.incoming.statusCode !== 100) {
|
parser.incoming.statusCode !== 100) {
|
||||||
|
socket.removeListener('data', socketOnData);
|
||||||
|
socket.removeListener('end', socketOnEnd);
|
||||||
freeParser(parser, req);
|
freeParser(parser, req);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -422,11 +430,11 @@ ClientRequest.prototype.onSocket = function(socket) {
|
|||||||
parser.maxHeaderPairs = 2000;
|
parser.maxHeaderPairs = 2000;
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.on('error', socketErrorListener);
|
|
||||||
socket.ondata = socketOnData;
|
|
||||||
socket.onend = socketOnEnd;
|
|
||||||
socket.on('close', socketCloseListener);
|
|
||||||
parser.onIncoming = parserOnIncomingClient;
|
parser.onIncoming = parserOnIncomingClient;
|
||||||
|
socket.on('error', socketErrorListener);
|
||||||
|
socket.on('data', socketOnData);
|
||||||
|
socket.on('end', socketOnEnd);
|
||||||
|
socket.on('close', socketCloseListener);
|
||||||
req.emit('socket', socket);
|
req.emit('socket', socket);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -55,6 +55,7 @@ function parserOnHeaders(headers, url) {
|
|||||||
// info.url is not set for response parsers but that's not
|
// info.url is not set for response parsers but that's not
|
||||||
// applicable here since all our parsers are request parsers.
|
// applicable here since all our parsers are request parsers.
|
||||||
function parserOnHeadersComplete(info) {
|
function parserOnHeadersComplete(info) {
|
||||||
|
debug('parserOnHeadersComplete', info);
|
||||||
var parser = this;
|
var parser = this;
|
||||||
var headers = info.headers;
|
var headers = info.headers;
|
||||||
var url = info.url;
|
var url = info.url;
|
||||||
@ -200,11 +201,8 @@ function freeParser(parser, req) {
|
|||||||
if (parser) {
|
if (parser) {
|
||||||
parser._headers = [];
|
parser._headers = [];
|
||||||
parser.onIncoming = null;
|
parser.onIncoming = null;
|
||||||
if (parser.socket) {
|
if (parser.socket)
|
||||||
parser.socket.onend = null;
|
|
||||||
parser.socket.ondata = null;
|
|
||||||
parser.socket.parser = null;
|
parser.socket.parser = null;
|
||||||
}
|
|
||||||
parser.socket = null;
|
parser.socket = null;
|
||||||
parser.incoming = null;
|
parser.incoming = null;
|
||||||
parsers.free(parser);
|
parsers.free(parser);
|
||||||
|
@ -339,11 +339,19 @@ function connectionListener(socket) {
|
|||||||
parser.maxHeaderPairs = 2000;
|
parser.maxHeaderPairs = 2000;
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.addListener('error', function(e) {
|
socket.addListener('error', socketOnError);
|
||||||
self.emit('clientError', e, this);
|
socket.addListener('close', serverSocketCloseListener);
|
||||||
});
|
parser.onIncoming = parserOnIncoming;
|
||||||
|
socket.on('end', socketOnEnd);
|
||||||
|
socket.on('data', socketOnData);
|
||||||
|
|
||||||
socket.ondata = function(d) {
|
// TODO(isaacs): Move all these functions out of here
|
||||||
|
function socketOnError(e) {
|
||||||
|
self.emit('clientError', e, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
function socketOnData(d) {
|
||||||
|
debug('SERVER socketOnData %d', d.length);
|
||||||
var ret = parser.execute(d);
|
var ret = parser.execute(d);
|
||||||
if (ret instanceof Error) {
|
if (ret instanceof Error) {
|
||||||
debug('parse error');
|
debug('parse error');
|
||||||
@ -352,26 +360,32 @@ function connectionListener(socket) {
|
|||||||
// Upgrade or CONNECT
|
// Upgrade or CONNECT
|
||||||
var bytesParsed = ret;
|
var bytesParsed = ret;
|
||||||
var req = parser.incoming;
|
var req = parser.incoming;
|
||||||
|
debug('SERVER upgrade or connect', req.method);
|
||||||
|
|
||||||
socket.ondata = null;
|
socket.removeListener('data', socketOnData);
|
||||||
socket.onend = null;
|
socket.removeListener('end', socketOnEnd);
|
||||||
socket.removeListener('close', serverSocketCloseListener);
|
socket.removeListener('close', serverSocketCloseListener);
|
||||||
parser.finish();
|
parser.finish();
|
||||||
freeParser(parser, req);
|
freeParser(parser, req);
|
||||||
|
|
||||||
var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
|
var eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
|
||||||
if (EventEmitter.listenerCount(self, eventName) > 0) {
|
if (EventEmitter.listenerCount(self, eventName) > 0) {
|
||||||
|
debug('SERVER have listener for %s', eventName);
|
||||||
var bodyHead = d.slice(bytesParsed, d.length);
|
var bodyHead = d.slice(bytesParsed, d.length);
|
||||||
|
|
||||||
self.emit(eventName, req, req.socket, bodyHead);
|
// TODO(isaacs): Need a way to reset a stream to fresh state
|
||||||
|
// IE, not flowing, and not explicitly paused.
|
||||||
|
socket._readableState.flowing = null;
|
||||||
|
self.emit(eventName, req, socket, bodyHead);
|
||||||
} else {
|
} else {
|
||||||
// Got upgrade header or CONNECT method, but have no handler.
|
// Got upgrade header or CONNECT method, but have no handler.
|
||||||
socket.destroy();
|
socket.destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
socket.onend = function() {
|
function socketOnEnd() {
|
||||||
|
var socket = this;
|
||||||
var ret = parser.finish();
|
var ret = parser.finish();
|
||||||
|
|
||||||
if (ret instanceof Error) {
|
if (ret instanceof Error) {
|
||||||
@ -390,14 +404,14 @@ function connectionListener(socket) {
|
|||||||
} else {
|
} else {
|
||||||
if (socket.writable) socket.end();
|
if (socket.writable) socket.end();
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
socket.addListener('close', serverSocketCloseListener);
|
|
||||||
|
|
||||||
// The following callback is issued after the headers have been read on a
|
// The following callback is issued after the headers have been read on a
|
||||||
// new message. In this callback we setup the response object and pass it
|
// new message. In this callback we setup the response object and pass it
|
||||||
// to the user.
|
// to the user.
|
||||||
parser.onIncoming = function(req, shouldKeepAlive) {
|
|
||||||
|
function parserOnIncoming(req, shouldKeepAlive) {
|
||||||
incoming.push(req);
|
incoming.push(req);
|
||||||
|
|
||||||
var res = new ServerResponse(req);
|
var res = new ServerResponse(req);
|
||||||
@ -415,7 +429,8 @@ function connectionListener(socket) {
|
|||||||
|
|
||||||
// When we're finished writing the response, check if this is the last
|
// When we're finished writing the response, check if this is the last
|
||||||
// respose, if so destroy the socket.
|
// respose, if so destroy the socket.
|
||||||
res.on('finish', function() {
|
res.on('finish', resOnFinish);
|
||||||
|
function resOnFinish() {
|
||||||
// Usually the first incoming element should be our request. it may
|
// Usually the first incoming element should be our request. it may
|
||||||
// be that in the case abortIncoming() was called that the incoming
|
// be that in the case abortIncoming() was called that the incoming
|
||||||
// array will be empty.
|
// array will be empty.
|
||||||
@ -440,7 +455,7 @@ function connectionListener(socket) {
|
|||||||
m.assignSocket(socket);
|
m.assignSocket(socket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
|
||||||
if (!util.isUndefined(req.headers.expect) &&
|
if (!util.isUndefined(req.headers.expect) &&
|
||||||
(req.httpVersionMajor == 1 && req.httpVersionMinor == 1) &&
|
(req.httpVersionMajor == 1 && req.httpVersionMinor == 1) &&
|
||||||
@ -456,6 +471,6 @@ function connectionListener(socket) {
|
|||||||
self.emit('request', req, res);
|
self.emit('request', req, res);
|
||||||
}
|
}
|
||||||
return false; // Not a HEAD response. (Not even a response!)
|
return false; // Not a HEAD response. (Not even a response!)
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
exports._connectionListener = connectionListener;
|
exports._connectionListener = connectionListener;
|
||||||
|
@ -125,8 +125,6 @@ function onCryptoStreamEnd() {
|
|||||||
} else {
|
} else {
|
||||||
debug('encrypted.onend');
|
debug('encrypted.onend');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.onend) this.onend();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -306,16 +304,6 @@ CryptoStream.prototype._read = function read(size) {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Give them requested data
|
// Give them requested data
|
||||||
if (this.ondata) {
|
|
||||||
var self = this;
|
|
||||||
this.ondata(pool, start, start + bytesRead);
|
|
||||||
|
|
||||||
// Consume data automatically
|
|
||||||
// simple/test-https-drain fails without it
|
|
||||||
process.nextTick(function() {
|
|
||||||
self.read(bytesRead);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
this.push(pool.slice(start, start + bytesRead));
|
this.push(pool.slice(start, start + bytesRead));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,8 +154,6 @@ function Socket(options) {
|
|||||||
this.readable = this.writable = false;
|
this.readable = this.writable = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.onend = null;
|
|
||||||
|
|
||||||
// shut down the socket when we're finished with it.
|
// shut down the socket when we're finished with it.
|
||||||
this.on('finish', onSocketFinish);
|
this.on('finish', onSocketFinish);
|
||||||
this.on('_socketEnd', onSocketEnd);
|
this.on('_socketEnd', onSocketEnd);
|
||||||
@ -507,9 +505,7 @@ function onread(nread, buffer) {
|
|||||||
self.bytesRead += nread;
|
self.bytesRead += nread;
|
||||||
|
|
||||||
// Optimization: emit the original buffer with end points
|
// Optimization: emit the original buffer with end points
|
||||||
var ret = true;
|
var ret = self.push(buffer);
|
||||||
if (self.ondata) self.ondata(buffer);
|
|
||||||
else ret = self.push(buffer);
|
|
||||||
|
|
||||||
if (handle.reading && !ret) {
|
if (handle.reading && !ret) {
|
||||||
handle.reading = false;
|
handle.reading = false;
|
||||||
@ -540,8 +536,6 @@ function onread(nread, buffer) {
|
|||||||
maybeDestroy(self);
|
maybeDestroy(self);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (self.onend) self.once('end', self.onend);
|
|
||||||
|
|
||||||
// push a null to signal the end of data.
|
// push a null to signal the end of data.
|
||||||
self.push(null);
|
self.push(null);
|
||||||
|
|
||||||
|
@ -57,14 +57,14 @@ function testServer() {
|
|||||||
|
|
||||||
request_upgradeHead = upgradeHead;
|
request_upgradeHead = upgradeHead;
|
||||||
|
|
||||||
socket.ondata = function(d, start, end) {
|
socket.on('data', function(d) {
|
||||||
var data = d.toString('utf8', start, end);
|
var data = d.toString('utf8');
|
||||||
if (data == 'kill') {
|
if (data == 'kill') {
|
||||||
socket.end();
|
socket.end();
|
||||||
} else {
|
} else {
|
||||||
socket.write(data, 'utf8');
|
socket.write(data, 'utf8');
|
||||||
}
|
}
|
||||||
};
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user