From 0e57aafbb1c4ad9c8157bfe7f5718e867861492e Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Mon, 7 May 2012 23:30:55 +0200 Subject: [PATCH] Optimize writing strings with Socket.write --- lib/child_process.js | 4 +- lib/net.js | 48 ++++++- src/pipe_wrap.cc | 6 +- src/stream_wrap.cc | 198 ++++++++++++++++++++++++++-- src/stream_wrap.h | 9 +- src/tcp_wrap.cc | 6 +- src/tty_wrap.cc | 6 +- test/simple/test-tcp-wrap-listen.js | 5 +- 8 files changed, 260 insertions(+), 22 deletions(-) diff --git a/lib/child_process.js b/lib/child_process.js index d40566f3d63..fdea1440825 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -112,12 +112,12 @@ function setupChannel(target, channel) { return false; } - var buffer = Buffer(JSON.stringify(message) + '\n'); + var string = JSON.stringify(message) + '\n'; // Update simultaneous accepts on Windows net._setSimultaneousAccepts(sendHandle); - var writeReq = channel.write(buffer, 0, buffer.length, sendHandle); + var writeReq = channel.writeUtf8String(string, sendHandle); if (!writeReq) { var er = errnoException(errno, 'write', 'cannot write to IPC channel.'); diff --git a/lib/net.js b/lib/net.js index 4ebc940c3c3..ebf3962e8f7 100644 --- a/lib/net.js +++ b/lib/net.js @@ -477,9 +477,22 @@ Socket.prototype.write = function(data, arg1, arg2) { } } - // Change strings to buffers. SLOW if (typeof data === 'string') { - data = new Buffer(data, encoding); + encoding = (encoding || 'utf8').toLowerCase(); + switch (encoding) { + case 'utf8': + case 'utf-8': + case 'ascii': + case 'ucs2': + case 'ucs-2': + case 'utf16le': + case 'utf-16le': + // This encoding can be handled in the binding layer. + break; + + default: + data = new Buffer(data, encoding); + } } else if (!Buffer.isBuffer(data)) { throw new TypeError('First argument must be a buffer or a string.'); } @@ -509,8 +522,33 @@ Socket.prototype._write = function(data, encoding, cb) { return false; } - // `encoding` is unused right now, `data` is always a buffer. - var writeReq = this._handle.write(data); + var writeReq; + + if (Buffer.isBuffer(data)) { + writeReq = this._handle.writeBuffer(data); + + } else { + switch (encoding) { + case 'utf8': + case 'utf-8': + writeReq = this._handle.writeUtf8String(data); + break; + + case 'ascii': + writeReq = this._handle.writeAsciiString(data); + break; + + case 'ucs2': + case 'ucs-2': + case 'utf16le': + case 'utf-16le': + writeReq = this._handle.writeUcs2String(data); + break; + + default: + assert(0); + } + } if (!writeReq || typeof writeReq !== 'object') { this._destroy(errnoException(errno, 'write'), cb); @@ -525,7 +563,7 @@ Socket.prototype._write = function(data, encoding, cb) { }; -function afterWrite(status, handle, req, buffer) { +function afterWrite(status, handle, req) { var self = handle.socket; // callback may come after call to destroy. diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index 8065461c69e..d98f901e155 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -100,9 +100,13 @@ void PipeWrap::Initialize(Handle target) { NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart); NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop); - NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write); NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown); + NODE_SET_PROTOTYPE_METHOD(t, "writeBuffer", StreamWrap::WriteBuffer); + NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString); + NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String); + NODE_SET_PROTOTYPE_METHOD(t, "writeUtf16String", StreamWrap::WriteUcs2String); + NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind); NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen); NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 9fcb5b1b954..e67396c12d5 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -48,6 +48,8 @@ using v8::TryCatch; using v8::Context; using v8::Arguments; using v8::Integer; +using v8::Number; +using v8::Exception; #define UNWRAP \ @@ -64,10 +66,25 @@ using v8::Integer; typedef class ReqWrap ShutdownWrap; -typedef class ReqWrap WriteWrap; + +class WriteWrap: public ReqWrap { + public: + void* operator new(size_t size, char* storage) { return storage; } + + // This is just to keep the compiler happy. It should never be called, since + // we don't use exceptions in node. + void operator delete(void* ptr, char* storage) { assert(0); } + + protected: + // People should not be using the non-placement new and delete operator on a + // WriteWrap. Ensure this never happens. + void* operator new (size_t size) { assert(0); }; + void operator delete(void* ptr) { assert(0); }; +}; static Persistent buffer_sym; +static Persistent bytes_sym; static Persistent write_queue_size_sym; static Persistent onread_sym; static Persistent oncomplete_sym; @@ -84,6 +101,7 @@ void StreamWrap::Initialize(Handle target) { HandleWrap::Initialize(target); buffer_sym = NODE_PSYMBOL("buffer"); + bytes_sym = NODE_PSYMBOL("bytes"); write_queue_size_sym = NODE_PSYMBOL("writeQueueSize"); onread_sym = NODE_PSYMBOL("onread"); oncomplete_sym = NODE_PSYMBOL("oncomplete"); @@ -226,7 +244,7 @@ void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf, } -Handle StreamWrap::Write(const Arguments& args) { +Handle StreamWrap::WriteBuffer(const Arguments& args) { HandleScope scope; UNWRAP @@ -248,7 +266,15 @@ Handle StreamWrap::Write(const Arguments& args) { length = args[2]->IntegerValue(); } - WriteWrap* req_wrap = new WriteWrap(); + if (length > INT_MAX) { + uv_err_t err; + err.code = UV_ENOBUFS; + SetErrno(err); + return scope.Close(v8::Null()); + } + + char* storage = new char[sizeof(WriteWrap)]; + WriteWrap* req_wrap = new (storage) WriteWrap(); req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj); @@ -280,12 +306,14 @@ Handle StreamWrap::Write(const Arguments& args) { } req_wrap->Dispatched(); + req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) length)); wrap->UpdateWriteQueueSize(); if (r) { SetErrno(uv_last_error(uv_default_loop())); - delete req_wrap; + req_wrap->~WriteWrap(); + delete[] storage; return scope.Close(v8::Null()); } else { return scope.Close(req_wrap->object_); @@ -293,6 +321,160 @@ Handle StreamWrap::Write(const Arguments& args) { } +enum WriteEncoding { + kAscii, + kUtf8, + kUcs2 +}; + +template +Handle StreamWrap::WriteStringImpl(const Arguments& args) { + HandleScope scope; + int r; + + UNWRAP + + if (args.Length() < 1) + return ThrowTypeError("Not enough arguments"); + + Local string = args[0]->ToString(); + + // Compute the size of the storage that the string will be flattened into. + size_t storage_size; + switch (encoding) { + case kAscii: + storage_size = string->Length(); + break; + + case kUtf8: + if (!(string->MayContainNonAscii())) { + // If the string has only ascii characters, we know exactly how big + // the storage should be. + storage_size = string->Length(); + } else if (string->Length() < 65536) { + // A single UCS2 codepoint never takes up more than 3 utf8 bytes. + // Unless the string is really long we just allocate so much space that + // we're certain the string fits in there entirely. + // TODO: maybe check handle->write_queue_size instead of string length? + storage_size = 3 * string->Length(); + } else { + // The string is really long. Compute the allocation size that we + // actually need. + storage_size = string->Utf8Length(); + } + break; + + case kUcs2: + storage_size += string->Length() * sizeof(uint16_t); + break; + + default: + // Unreachable. + assert(0); + } + + if (storage_size > INT_MAX) { + uv_err_t err; + err.code = UV_ENOBUFS; + SetErrno(err); + return scope.Close(v8::Null()); + } + + char* storage = new char[sizeof(WriteWrap) + storage_size + 15]; + WriteWrap* req_wrap = new (storage) WriteWrap(); + + char* data = reinterpret_cast(ROUND_UP( + reinterpret_cast(storage) + sizeof(WriteWrap), 16)); + size_t data_size; + switch (encoding) { + case kAscii: + data_size = string->WriteAscii(data, 0, -1, + String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); + break; + + case kUtf8: + data_size = string->WriteUtf8(data, -1, NULL, + String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); + break; + + case kUcs2: { + int chars_copied = string->Write((uint16_t*) data, 0, -1, + String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); + data_size = chars_copied * sizeof(uint16_t); + break; + } + + default: + // Unreachable + assert(0); + } + + assert(data_size <= storage_size); + + uv_buf_t buf; + buf.base = data; + buf.len = data_size; + + bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE && + ((uv_pipe_t*)wrap->stream_)->ipc; + + if (!ipc_pipe) { + r = uv_write(&req_wrap->req_, + wrap->stream_, + &buf, + 1, + StreamWrap::AfterWrite); + + } else { + uv_stream_t* send_stream = NULL; + + if (args[1]->IsObject()) { + Local send_stream_obj = args[1]->ToObject(); + assert(send_stream_obj->InternalFieldCount() > 0); + StreamWrap* send_stream_wrap = static_cast( + send_stream_obj->GetPointerFromInternalField(0)); + send_stream = send_stream_wrap->GetStream(); + } + + r = uv_write2(&req_wrap->req_, + wrap->stream_, + &buf, + 1, + send_stream, + StreamWrap::AfterWrite); + } + + req_wrap->Dispatched(); + req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) data_size)); + + wrap->UpdateWriteQueueSize(); + + if (r) { + SetErrno(uv_last_error(uv_default_loop())); + req_wrap->~WriteWrap(); + delete[] storage; + return scope.Close(v8::Null()); + } else { + return scope.Close(req_wrap->object_); + } +} + + +Handle StreamWrap::WriteAsciiString(const Arguments& args) { + return WriteStringImpl(args); +} + + +Handle StreamWrap::WriteUtf8String(const Arguments& args) { + return WriteStringImpl(args); +} + + +Handle StreamWrap::WriteUcs2String(const Arguments& args) { + return WriteStringImpl(args); +} + + void StreamWrap::AfterWrite(uv_write_t* req, int status) { WriteWrap* req_wrap = (WriteWrap*) req->data; StreamWrap* wrap = (StreamWrap*) req->handle->data; @@ -309,16 +491,16 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) { wrap->UpdateWriteQueueSize(); - Local argv[4] = { + Local argv[] = { Integer::New(status), Local::New(wrap->object_), - Local::New(req_wrap->object_), - req_wrap->object_->GetHiddenValue(buffer_sym), + Local::New(req_wrap->object_) }; MakeCallback(req_wrap->object_, oncomplete_sym, ARRAY_SIZE(argv), argv); - delete req_wrap; + req_wrap->~WriteWrap(); + delete[] reinterpret_cast(req_wrap); } diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 278fda78270..40947d5085b 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -35,11 +35,15 @@ class StreamWrap : public HandleWrap { static void Initialize(v8::Handle target); // JavaScript functions - static v8::Handle Write(const v8::Arguments& args); static v8::Handle ReadStart(const v8::Arguments& args); static v8::Handle ReadStop(const v8::Arguments& args); static v8::Handle Shutdown(const v8::Arguments& args); + static v8::Handle WriteBuffer(const v8::Arguments& args); + static v8::Handle WriteAsciiString(const v8::Arguments& args); + static v8::Handle WriteUtf8String(const v8::Arguments& args); + static v8::Handle WriteUcs2String(const v8::Arguments& args); + protected: StreamWrap(v8::Handle object, uv_stream_t* stream); virtual ~StreamWrap() { } @@ -61,6 +65,9 @@ class StreamWrap : public HandleWrap { static void OnReadCommon(uv_stream_t* handle, ssize_t nread, uv_buf_t buf, uv_handle_type pending); + template + static v8::Handle WriteStringImpl(const v8::Arguments& args); + size_t slab_offset_; uv_stream_t* stream_; }; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index a158c4587b7..4a38602145c 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -110,9 +110,13 @@ void TCPWrap::Initialize(Handle target) { NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart); NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop); - NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write); NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown); + NODE_SET_PROTOTYPE_METHOD(t, "writeBuffer", StreamWrap::WriteBuffer); + NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString); + NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String); + NODE_SET_PROTOTYPE_METHOD(t, "writeUtf16String", StreamWrap::WriteUcs2String); + NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind); NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen); NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc index 8647ea0b9cb..f3a956b29fc 100644 --- a/src/tty_wrap.cc +++ b/src/tty_wrap.cc @@ -73,7 +73,11 @@ class TTYWrap : StreamWrap { NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart); NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop); - NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write); + + NODE_SET_PROTOTYPE_METHOD(t, "writeBuffer", StreamWrap::WriteBuffer); + NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString); + NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String); + NODE_SET_PROTOTYPE_METHOD(t, "writeUtf16String", StreamWrap::WriteUcs2String); NODE_SET_PROTOTYPE_METHOD(t, "getWindowSize", TTYWrap::GetWindowSize); NODE_SET_PROTOTYPE_METHOD(t, "setRawMode", SetRawMode); diff --git a/test/simple/test-tcp-wrap-listen.js b/test/simple/test-tcp-wrap-listen.js index 18c2c642d0d..a272a193d26 100644 --- a/test/simple/test-tcp-wrap-listen.js +++ b/test/simple/test-tcp-wrap-listen.js @@ -55,21 +55,20 @@ server.onconnection = function(client) { assert.equal(0, client.writeQueueSize); - var req = client.write(buffer, offset, length); + var req = client.writeBuffer(buffer.slice(offset, offset + length)); client.pendingWrites.push(req); console.log('client.writeQueueSize: ' + client.writeQueueSize); // 11 bytes should flush assert.equal(0, client.writeQueueSize); - req.oncomplete = function(status, client_, req_, buffer_) { + req.oncomplete = function(status, client_, req_) { assert.equal(req, client.pendingWrites.shift()); // Check parameters. assert.equal(0, status); assert.equal(client, client_); assert.equal(req, req_); - assert.equal(buffer, buffer_); console.log('client.writeQueueSize: ' + client.writeQueueSize); assert.equal(0, client.writeQueueSize);