From 6db43f4c2992834274a33f818276731d4788cd7f Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 18 Mar 2010 13:21:33 -0700 Subject: [PATCH] net2 HTTPClient work --- lib/http2.js | 839 +++++++++++++------------ test/simple/test-http-client-upload.js | 4 +- 2 files changed, 452 insertions(+), 391 deletions(-) diff --git a/lib/http2.js b/lib/http2.js index ed31454c053..995a585e00e 100644 --- a/lib/http2.js +++ b/lib/http2.js @@ -4,311 +4,6 @@ var events = require('events'); var HTTPParser = process.binding('http_parser').HTTPParser; -var CRLF = "\r\n"; -var STATUS_CODES = exports.STATUS_CODES = { - 100 : 'Continue', - 101 : 'Switching Protocols', - 200 : 'OK', - 201 : 'Created', - 202 : 'Accepted', - 203 : 'Non-Authoritative Information', - 204 : 'No Content', - 205 : 'Reset Content', - 206 : 'Partial Content', - 300 : 'Multiple Choices', - 301 : 'Moved Permanently', - 302 : 'Moved Temporarily', - 303 : 'See Other', - 304 : 'Not Modified', - 305 : 'Use Proxy', - 400 : 'Bad Request', - 401 : 'Unauthorized', - 402 : 'Payment Required', - 403 : 'Forbidden', - 404 : 'Not Found', - 405 : 'Method Not Allowed', - 406 : 'Not Acceptable', - 407 : 'Proxy Authentication Required', - 408 : 'Request Time-out', - 409 : 'Conflict', - 410 : 'Gone', - 411 : 'Length Required', - 412 : 'Precondition Failed', - 413 : 'Request Entity Too Large', - 414 : 'Request-URI Too Large', - 415 : 'Unsupported Media Type', - 500 : 'Internal Server Error', - 501 : 'Not Implemented', - 502 : 'Bad Gateway', - 503 : 'Service Unavailable', - 504 : 'Gateway Time-out', - 505 : 'HTTP Version not supported' -}; - -var connectionExpression = /Connection/i; -var transferEncodingExpression = /Transfer-Encoding/i; -var closeExpression = /close/i; -var chunkExpression = /chunk/i; -var contentLengthExpression = /Content-Length/i; - - -/* Abstract base class for ServerRequest and ClientResponse. */ -function IncomingMessage (socket) { - events.EventEmitter.call(this); - - this.socket = socket; - this.httpVersion = null; - this.headers = {}; - - this.method = null; - - // response (client) only - this.statusCode = null; -} -sys.inherits(IncomingMessage, events.EventEmitter); -exports.IncomingMessage = IncomingMessage; - -IncomingMessage.prototype._parseQueryString = function () { - throw new Error("_parseQueryString is deprecated. Use require(\"querystring\") to parse query strings.\n"); -}; - -IncomingMessage.prototype.setBodyEncoding = function (enc) { - // TODO: Find a cleaner way of doing this. - this.socket.setEncoding(enc); -}; - -IncomingMessage.prototype.pause = function () { - this.socket.readPause(); -}; - -IncomingMessage.prototype.resume = function () { - this.socket.readResume(); -}; - -IncomingMessage.prototype._addHeaderLine = function (field, value) { - if (field in this.headers) { - // TODO Certain headers like 'Content-Type' should not be concatinated. - // See https://www.google.com/reader/view/?tab=my#overview-page - this.headers[field] += ", " + value; - } else { - this.headers[field] = value; - } -}; - -function OutgoingMessage () { - events.EventEmitter.call(this); - - this.output = []; - this.outputEncodings = []; - - this.closeOnFinish = false; - this.chunkEncoding = false; - this.shouldKeepAlive = true; - this.useChunkedEncodingByDefault = true; - - this.flushing = false; - - this.finished = false; -} -sys.inherits(OutgoingMessage, events.EventEmitter); -exports.OutgoingMessage = OutgoingMessage; - -OutgoingMessage.prototype._send = function (data, encoding) { - var length = this.output.length; - - if (length === 0) { - this.output.push(data); - encoding = encoding || "ascii"; - this.outputEncodings.push(encoding); - return; - } - - var lastEncoding = this.outputEncodings[length-1]; - var lastData = this.output[length-1]; - - if ((lastEncoding === encoding) || - (!encoding && data.constructor === lastData.constructor)) { - if (lastData.constructor === String) { - this.output[length-1] = lastData + data; - } else { - this.output[length-1] = lastData.concat(data); - } - return; - } - - this.output.push(data); - encoding = encoding || "ascii"; - this.outputEncodings.push(encoding); -}; - -OutgoingMessage.prototype._sendHeaderLines = function (first_line, headers) { - var sentConnectionHeader = false; - var sendContentLengthHeader = false; - var sendTransferEncodingHeader = false; - - // first_line in the case of request is: "GET /index.html HTTP/1.1\r\n" - // in the case of response it is: "HTTP/1.1 200 OK\r\n" - var messageHeader = first_line; - var field, value; - for (var i in headers) { - if (headers[i] instanceof Array) { - field = headers[i][0]; - value = headers[i][1]; - } else { - if (!headers.hasOwnProperty(i)) continue; - field = i; - value = headers[i]; - } - - messageHeader += field + ": " + value + CRLF; - - if (connectionExpression.test(field)) { - sentConnectionHeader = true; - if (closeExpression.test(value)) this.closeOnFinish = true; - - } else if (transferEncodingExpression.test(field)) { - sendTransferEncodingHeader = true; - if (chunkExpression.test(value)) this.chunkEncoding = true; - - } else if (contentLengthExpression.test(field)) { - sendContentLengthHeader = true; - - } - } - - // keep-alive logic - if (sentConnectionHeader == false) { - if (this.shouldKeepAlive && - (sendContentLengthHeader || this.useChunkedEncodingByDefault)) { - messageHeader += "Connection: keep-alive\r\n"; - } else { - this.closeOnFinish = true; - messageHeader += "Connection: close\r\n"; - } - } - - if (sendContentLengthHeader == false && sendTransferEncodingHeader == false) { - if (this.useChunkedEncodingByDefault) { - messageHeader += "Transfer-Encoding: chunked\r\n"; - this.chunkEncoding = true; - } - else { - this.closeOnFinish = true; - } - } - - messageHeader += CRLF; - - this._send(messageHeader); - // wait until the first body chunk, or finish(), is sent to flush. -}; - -OutgoingMessage.prototype.write = function (chunk, encoding) { - encoding = encoding || "ascii"; - if (this.chunkEncoding) { - this._send(process._byteLength(chunk, encoding).toString(16)); - this._send(CRLF); - this._send(chunk, encoding); - this._send(CRLF); - } else { - this._send(chunk, encoding); - } - - if (this.flushing) { - this.flush(); - } else { - this.flushing = true; - } -}; - -OutgoingMessage.prototype.sendBody = function () { - throw new Error('sendBody() renamed to write()'); -}; - - -OutgoingMessage.prototype.flush = function () { - this.emit("flush"); -}; - -OutgoingMessage.prototype.close = function () { - if (this.chunkEncoding) this._send("0\r\n\r\n"); // last chunk - this.finished = true; - this.flush(); -}; - - -function ServerResponse (req) { - OutgoingMessage.call(this); - - if (req.httpVersionMajor < 1 || req.httpVersionMinor < 1) { - this.useChunkedEncodingByDefault = false; - this.shouldKeepAlive = false; - } -} -sys.inherits(ServerResponse, OutgoingMessage); -exports.ServerResponse = ServerResponse; - -ServerResponse.prototype.writeHead = function (statusCode, headers) { - var reason = STATUS_CODES[statusCode] || "unknown"; - var status_line = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF; - this._sendHeaderLines(status_line, headers); -}; - -ServerResponse.prototype.writeHeader = ServerResponse.prototype.writeHead; - -ServerResponse.prototype.sendHeader = function () { - throw new Error('sendHeader renamed to writeHead()'); -}; - - -function ClientRequest (method, url, headers) { - OutgoingMessage.call(this); - - this.shouldKeepAlive = false; - if (method === "GET" || method === "HEAD") { - this.useChunkedEncodingByDefault = false; - } else { - this.useChunkedEncodingByDefault = true; - } - this.closeOnFinish = true; - - this._sendHeaderLines(method + " " + url + " HTTP/1.1\r\n", headers); -} -sys.inherits(ClientRequest, OutgoingMessage); -exports.ClientRequest = ClientRequest; - -ClientRequest.prototype.finish = function (responseListener) { - this.addListener("response", responseListener); - OutgoingMessage.prototype.finish.call(this); -}; - - -/* Returns true if the message queue is finished and the socket - * should be closed. */ -function flushMessageQueue (socket, queue) { - while (queue[0]) { - var message = queue[0]; - - while (message.output.length > 0) { - if (!socket.writable) return true; - - var data = message.output.shift(); - var encoding = message.outputEncodings.shift(); - - socket.write(data, encoding); - } - - if (!message.finished) break; - - message.emit("sent"); - queue.shift(); - - if (message.closeOnFinish) return true; - } - return false; -} - - var parserFreeList = []; function newParser (type) { @@ -379,7 +74,24 @@ function newParser (type) { }; parser.onBody = function (b, start, len) { - parser.incoming.emit("data", b.slice(start, start+len)); + // TODO body encoding? + var enc = parser.incoming._encoding; + if (!enc) { + parser.incoming.emit('data', b.slice(start, start+len)); + } else { + var string; + switch (enc) { + case 'utf8': + string = b.utf8Slice(start, start+len); + break; + case 'ascii': + string = b.asciiSlice(start, start+len); + break; + default: + throw new Error('Unsupported encoding ' + self._encoding + '. Use Buffer'); + } + parser.incoming.emit('data', string); + } }; parser.onMessageComplete = function () { @@ -393,13 +105,382 @@ function freeParser (parser) { if (parserFreeList.length < 1000) parserFreeList.push(parser); } + +var CRLF = "\r\n"; +var STATUS_CODES = exports.STATUS_CODES = { + 100 : 'Continue', + 101 : 'Switching Protocols', + 200 : 'OK', + 201 : 'Created', + 202 : 'Accepted', + 203 : 'Non-Authoritative Information', + 204 : 'No Content', + 205 : 'Reset Content', + 206 : 'Partial Content', + 300 : 'Multiple Choices', + 301 : 'Moved Permanently', + 302 : 'Moved Temporarily', + 303 : 'See Other', + 304 : 'Not Modified', + 305 : 'Use Proxy', + 400 : 'Bad Request', + 401 : 'Unauthorized', + 402 : 'Payment Required', + 403 : 'Forbidden', + 404 : 'Not Found', + 405 : 'Method Not Allowed', + 406 : 'Not Acceptable', + 407 : 'Proxy Authentication Required', + 408 : 'Request Time-out', + 409 : 'Conflict', + 410 : 'Gone', + 411 : 'Length Required', + 412 : 'Precondition Failed', + 413 : 'Request Entity Too Large', + 414 : 'Request-URI Too Large', + 415 : 'Unsupported Media Type', + 500 : 'Internal Server Error', + 501 : 'Not Implemented', + 502 : 'Bad Gateway', + 503 : 'Service Unavailable', + 504 : 'Gateway Time-out', + 505 : 'HTTP Version not supported' +}; + +var connection_expression = /Connection/i; +var transfer_encoding_expression = /Transfer-Encoding/i; +var close_expression = /close/i; +var chunk_expression = /chunk/i; +var content_length_expression = /Content-Length/i; + + +/* Abstract base class for ServerRequest and ClientResponse. */ +function IncomingMessage (socket) { + events.EventEmitter.call(this); + + this.socket = socket; + this.httpVersion = null; + this.headers = {}; + + // request (server) only + this.url = ""; + + this.method = null; + + // response (client) only + this.statusCode = null; + this.client = this.socket; +} +sys.inherits(IncomingMessage, events.EventEmitter); +exports.IncomingMessage = IncomingMessage; + +IncomingMessage.prototype._parseQueryString = function () { + throw new Error("_parseQueryString is deprecated. Use require(\"querystring\") to parse query strings.\n"); +}; + +IncomingMessage.prototype.setBodyEncoding = function (enc) { + // TODO deprecation message? + this.setEncoding(enc); +}; + +IncomingMessage.prototype.setEncoding = function (enc) { + // TODO check values, error out on bad, and deprecation message? + this._encoding = enc.toLowerCase(); +}; + +IncomingMessage.prototype.pause = function () { + this.socket.pause(); +}; + +IncomingMessage.prototype.resume = function () { + this.socket.resume(); +}; + +IncomingMessage.prototype._addHeaderLine = function (field, value) { + if (field in this.headers) { + // TODO Certain headers like 'Content-Type' should not be concatinated. + // See https://www.google.com/reader/view/?tab=my#overview-page + this.headers[field] += ", " + value; + } else { + this.headers[field] = value; + } +}; + +function OutgoingMessage (socket) { + events.EventEmitter.call(this, socket); + + this.socket = socket; + + this.output = []; + this.outputEncodings = []; + + this.closeOnFinish = false; + this.chunked_encoding = false; + this.should_keep_alive = true; + this.use_chunked_encoding_by_default = true; + + this.flushing = false; + this.headWritten = false; + + this.finished = false; +} +sys.inherits(OutgoingMessage, events.EventEmitter); +exports.OutgoingMessage = OutgoingMessage; + +OutgoingMessage.prototype._send = function (data, encoding) { + var length = this.output.length; + + if (length === 0) { + this.output.push(data); + encoding = encoding || "ascii"; + this.outputEncodings.push(encoding); + return; + } + + var lastEncoding = this.outputEncodings[length-1]; + var lastData = this.output[length-1]; + + if ((lastEncoding === encoding) || + (!encoding && data.constructor === lastData.constructor)) { + if (lastData.constructor === String) { + this.output[length-1] = lastData + data; + } else { + this.output[length-1] = lastData.concat(data); + } + return; + } + + this.output.push(data); + encoding = encoding || "ascii"; + this.outputEncodings.push(encoding); +}; + +OutgoingMessage.prototype.sendHeaderLines = function (first_line, headers) { + var sent_connection_header = false; + var sent_content_length_header = false; + var sent_transfer_encoding_header = false; + + // first_line in the case of request is: "GET /index.html HTTP/1.1\r\n" + // in the case of response it is: "HTTP/1.1 200 OK\r\n" + var message_header = first_line; + var field, value; + for (var i in headers) { + if (headers[i] instanceof Array) { + field = headers[i][0]; + value = headers[i][1]; + } else { + if (!headers.hasOwnProperty(i)) continue; + field = i; + value = headers[i]; + } + + message_header += field + ": " + value + CRLF; + + if (connection_expression.test(field)) { + sent_connection_header = true; + if (close_expression.test(value)) this.closeOnFinish = true; + + } else if (transfer_encoding_expression.test(field)) { + sent_transfer_encoding_header = true; + if (chunk_expression.test(value)) this.chunked_encoding = true; + + } else if (content_length_expression.test(field)) { + sent_content_length_header = true; + + } + } + + // keep-alive logic + if (sent_connection_header == false) { + if (this.should_keep_alive && + (sent_content_length_header || this.use_chunked_encoding_by_default)) { + message_header += "Connection: keep-alive\r\n"; + } else { + this.closeOnFinish = true; + message_header += "Connection: close\r\n"; + } + } + + if (sent_content_length_header == false && sent_transfer_encoding_header == false) { + if (this.use_chunked_encoding_by_default) { + message_header += "Transfer-Encoding: chunked\r\n"; + this.chunked_encoding = true; + } + else { + this.closeOnFinish = true; + } + } + + message_header += CRLF; + + this._send(message_header); + // wait until the first body chunk, or close(), is sent to flush. +}; + + +OutgoingMessage.prototype.sendBody = function () { + throw new Error("sendBody() has been renamed to write(). " + + "The 'body' event has been renamed to 'data' and " + + "the 'complete' event has been renamed to 'end'."); +}; + + +OutgoingMessage.prototype.write = function (chunk, encoding) { + if ( (this instanceof ServerResponse) && !this.headWritten) { + throw new Error("writeHead() must be called before write()") + } + + encoding = encoding || "ascii"; + if (this.chunked_encoding) { + this._send(process._byteLength(chunk, encoding).toString(16)); + this._send(CRLF); + this._send(chunk, encoding); + this._send(CRLF); + } else { + this._send(chunk, encoding); + } + + if (this.flushing) { + this.flush(); + } else { + this.flushing = true; + } +}; + +OutgoingMessage.prototype.flush = function () { + this.emit("flush"); +}; + +OutgoingMessage.prototype.finish = function () { + throw new Error("finish() has been renamed to close()."); +}; + +OutgoingMessage.prototype.close = function () { + if (this.chunked_encoding) this._send("0\r\n\r\n"); // last chunk + this.finished = true; + this.flush(); +}; + + +function ServerResponse (req) { + OutgoingMessage.call(this, req.socket); + + if (req.httpVersionMajor < 1 || req.httpVersionMinor < 1) { + this.use_chunked_encoding_by_default = false; + this.should_keep_alive = false; + } +} +sys.inherits(ServerResponse, OutgoingMessage); +exports.ServerResponse = ServerResponse; + + +ServerResponse.prototype.writeHead = function (statusCode) { + var reasonPhrase, headers, headerIndex; + + if (typeof arguments[1] == 'string') { + reasonPhrase = arguments[1]; + headerIndex = 2; + } else { + reasonPhrase = STATUS_CODES[statusCode] || "unknown"; + headerIndex = 1; + } + + if (typeof arguments[headerIndex] == 'object') { + headers = arguments[headerIndex]; + } else { + headers = {}; + } + + var status_line = "HTTP/1.1 " + statusCode.toString() + " " + + reasonPhrase + CRLF; + this.sendHeaderLines(status_line, headers); + this.headWritten = true; +}; + +// TODO eventually remove sendHeader(), writeHeader() +ServerResponse.prototype.sendHeader = ServerResponse.prototype.writeHead; +ServerResponse.prototype.writeHeader = ServerResponse.prototype.writeHead; + +function ClientRequest (socket, method, url, headers) { + OutgoingMessage.call(this, socket); + + this.should_keep_alive = false; + if (method === "GET" || method === "HEAD") { + this.use_chunked_encoding_by_default = false; + } else { + this.use_chunked_encoding_by_default = true; + } + this.closeOnFinish = true; + + this.sendHeaderLines(method + " " + url + " HTTP/1.1\r\n", headers); +} +sys.inherits(ClientRequest, OutgoingMessage); +exports.ClientRequest = ClientRequest; + +ClientRequest.prototype.finish = function () { + throw new Error( "finish() has been renamed to close() and no longer takes " + + "a response handler as an argument. Manually add a 'response' listener " + + "to the request object." + ); +}; + +ClientRequest.prototype.close = function () { + if (arguments.length > 0) { + throw new Error( "ClientRequest.prototype.close does not take any arguments. " + + "Add a response listener manually to the request object." + ); + } + OutgoingMessage.prototype.close.call(this); +}; + + +/* Returns true if the message queue is finished and the socket + * should be closed. */ +function flushMessageQueue (socket, queue) { + while (queue[0]) { + var message = queue[0]; + + while (message.output.length > 0) { + if (!socket.writable) return true; + + var data = message.output.shift(); + var encoding = message.outputEncodings.shift(); + + socket.write(data, encoding); + } + + if (!message.finished) break; + + message.emit("sent"); + queue.shift(); + + if (message.closeOnFinish) return true; + } + return false; +} + + +function Server (requestListener) { + net.Server.call(this); + this.addListener("request", requestListener); + this.addListener("connection", connectionListener); +} +sys.inherits(Server, net.Server); + +exports.Server = Server; + +exports.createServer = function (requestListener) { + return new Server(requestListener); +}; + function connectionListener (socket) { var self = this; - var parser = newParser('request'); // An array of responses for each socket. In pipelined connections // we need to keep track of the order they were sent. var responses = []; + var parser = newParser('request'); + socket.ondata = function (d, start, end) { parser.execute(d, start, end - start); }; @@ -437,64 +518,56 @@ function connectionListener (socket) { } -function Server (requestListener, options) { - net.Server.call(this, connectionListener); - //server.setOptions(options); - this.addListener('request', requestListener); -} -sys.inherits(Server, net.Server); -exports.Server = Server; -exports.createServer = function (requestListener, options) { - return new Server(requestListener, options); -}; - - - -function Client () { +function Client ( ) { net.Stream.call(this); var self = this; + var requests = []; var currentRequest; var parser = newParser('response'); - parser.socket = self; + parser.socket = this; + + self._reconnect = function () { + if (self.readyState != "opening") { + //sys.debug("HTTP CLIENT: reconnecting readyState = " + self.readyState); + self.connect(self.port, self.host); + } + }; + + self._pushRequest = function (req) { + req.addListener("flush", function () { + /* + if (self.readyState == "closed") { + //sys.debug("HTTP CLIENT request flush. reconnect. readyState = " + self.readyState); + self._reconnect(); + return; + } + */ + //sys.debug("self flush readyState = " + self.readyState); + if (req == currentRequest) flushMessageQueue(self, [req]); + }); + requests.push(req); + }; + + this.ondata = function (d, start, end) { + parser.execute(d, start, end - start); + }; self.addListener("connect", function () { - self.resetParser(); + parser.reinitialize('response'); currentRequest = requests.shift(); currentRequest.flush(); }); - self.ondata = function (d, start, end) { - parser.execute(d, start, end - start); - }; - - parser.onIncoming = function (res) { - //sys.debug("incoming response!"); - - res.addListener('end', function ( ) { - //sys.debug("request complete disconnecting. readyState = " + self.readyState); - self.close(); - }); - - currentRequest.emit("response", res); - }; - - self._pushRequest = function (req) { - }; - self.addListener("end", function () { - self.close(); - }); - - self.onend = function () { parser.finish(); - // unref the parser for easy gc freeParser(parser); + //sys.debug("self got end closing. readyState = " + self.readyState); self.close(); - }; + }); self.addListener("close", function (had_error) { if (had_error) { @@ -509,54 +582,31 @@ function Client () { self._reconnect(); } }); -} -sys.inherits(Client, net.Stream); + parser.onIncoming = function (res) { + sys.debug("incoming response!"); + + res.addListener('end', function ( ) { + //sys.debug("request complete disconnecting. readyState = " + self.readyState); + self.close(); + }); + + currentRequest.emit("response", res); + }; +}; +sys.inherits(Client, net.Stream); exports.Client = Client; - exports.createClient = function (port, host) { - var client = new Client(); - client.port = port; - client.host = host; - client.connect(port, host); - return client; -}; + var c = new Client; + c.port = port; + c.host = host; + c.connect(port, host); + return c; +} -Client.prototype._reconnect = function () { - if (this.readyState != "opening") { - //sys.debug("HTTP CLIENT: reconnecting readyState = " + self.readyState); - this.connect(this.port, this.host); - } -}; - - -Client.prototype.request = function (method, url, headers) { - var self = this; - - if (typeof(url) != "string") { // assume method was omitted, shift arguments - headers = url; - url = method; - method = null; - } - var req = new ClientRequest(this, method || "GET", url, headers); - - req.addListener("flush", function () { - if (self.readyState == "closed") { - //sys.debug("HTTP CLIENT request flush. reconnect. readyState = " + self.readyState); - self._reconnect(); - return; - } - //sys.debug("self flush readyState = " + self.readyState); - if (req == currentRequest) flushMessageQueue(self, [req]); - }); - requests.push(req); - - return req; -}; - Client.prototype.get = function () { throw new Error("client.get(...) is now client.request('GET', ...)"); }; @@ -577,9 +627,20 @@ Client.prototype.put = function () { throw new Error("client.put(...) is now client.request('PUT', ...)"); }; +Client.prototype.request = function (method, url, headers) { + if (typeof(url) != "string") { // assume method was omitted, shift arguments + headers = url; + url = method; + method = null; + } + var req = new ClientRequest(this, method || "GET", url, headers); + this._pushRequest(req); + return req; +}; + exports.cat = function (url, encoding_, headers_) { - var encoding = 'utf8', + var encoding = 'utf8', headers = {}, callback = null; diff --git a/test/simple/test-http-client-upload.js b/test/simple/test-http-client-upload.js index 4c11bd4078c..e1d746c35d2 100644 --- a/test/simple/test-http-client-upload.js +++ b/test/simple/test-http-client-upload.js @@ -1,5 +1,5 @@ require("../common"); -http = require("http"); +http = require("http2"); var sent_body = ""; var server_req_complete = false; @@ -33,7 +33,7 @@ req.write('3\n'); puts("client finished sending request"); req.addListener('response', function(res) { - res.setBodyEncoding("utf8"); + res.setEncoding("utf8"); res.addListener('data', function(chunk) { puts(chunk); });