stream_wrap: use uv_try_write where possible

Use `uv_try_write` for string and buffer writes, thus avoiding to do
allocations and copying in some of the cases.
This commit is contained in:
Fedor Indutny 2014-01-29 02:48:10 +04:00
parent eaf76648a6
commit 9836a4eeda
9 changed files with 148 additions and 37 deletions

View File

@ -51,7 +51,7 @@ function server() {
if (nread < 0) if (nread < 0)
fail(nread, 'read'); fail(nread, 'read');
var writeReq = {}; var writeReq = { async: false };
err = clientHandle.writeBuffer(writeReq, buffer); err = clientHandle.writeBuffer(writeReq, buffer);
if (err) if (err)

View File

@ -68,7 +68,7 @@ function server() {
write(); write();
function write() { function write() {
var writeReq = { oncomplete: afterWrite }; var writeReq = { async: false, oncomplete: afterWrite };
var err; var err;
switch (type) { switch (type) {
case 'buf': case 'buf':
@ -82,8 +82,13 @@ function server() {
break; break;
} }
if (err) if (err) {
fail(err, 'write'); fail(err, 'write');
} else if (!writeReq.async) {
process.nextTick(function() {
afterWrite(null, clientHandle, writeReq);
});
}
} }
function afterWrite(err, handle, req) { function afterWrite(err, handle, req) {

View File

@ -626,7 +626,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
return false; return false;
} }
var req = { oncomplete: afterWrite }; var req = { oncomplete: afterWrite, async: false };
var err; var err;
if (writev) { if (writev) {
@ -660,10 +660,10 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// If it was entirely flushed, we can write some more right now. // If it was entirely flushed, we can write some more right now.
// However, if more is left in the queue, then wait until that clears. // However, if more is left in the queue, then wait until that clears.
if (this._handle.writeQueueSize === 0) if (req.async && this._handle.writeQueueSize != 0)
cb();
else
req.cb = cb; req.cb = cb;
else
cb();
}; };

View File

@ -53,6 +53,7 @@ namespace node {
#define PER_ISOLATE_STRING_PROPERTIES(V) \ #define PER_ISOLATE_STRING_PROPERTIES(V) \
V(address_string, "address") \ V(address_string, "address") \
V(atime_string, "atime") \ V(atime_string, "atime") \
V(async, "async") \
V(async_queue_string, "_asyncQueue") \ V(async_queue_string, "_asyncQueue") \
V(birthtime_string, "birthtime") \ V(birthtime_string, "birthtime") \
V(blksize_string, "blksize") \ V(blksize_string, "blksize") \

View File

@ -33,6 +33,7 @@
#include "util-inl.h" #include "util-inl.h"
#include <stdlib.h> // abort() #include <stdlib.h> // abort()
#include <string.h> // memcpy()
#include <limits.h> // INT_MAX #include <limits.h> // INT_MAX
@ -49,6 +50,7 @@ using v8::Number;
using v8::Object; using v8::Object;
using v8::PropertyCallbackInfo; using v8::PropertyCallbackInfo;
using v8::String; using v8::String;
using v8::True;
using v8::Undefined; using v8::Undefined;
using v8::Value; using v8::Value;
@ -200,30 +202,43 @@ void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
Local<Object> buf_obj = args[1].As<Object>(); Local<Object> buf_obj = args[1].As<Object>();
size_t length = Buffer::Length(buf_obj); size_t length = Buffer::Length(buf_obj);
char* storage = new char[sizeof(WriteWrap)];
WriteWrap* req_wrap =
new(storage) WriteWrap(env, req_wrap_obj, wrap);
char* storage;
WriteWrap* req_wrap;
uv_buf_t buf; uv_buf_t buf;
WriteBuffer(buf_obj, &buf); WriteBuffer(buf_obj, &buf);
int err = wrap->callbacks()->DoWrite(req_wrap, // Try writing immediately without allocation
&buf, uv_buf_t* bufs = &buf;
1, size_t count = 1;
int err = wrap->callbacks()->TryWrite(&bufs, &count);
if (err == 0)
goto done;
assert(count == 1);
// Allocate, or write rest
storage = new char[sizeof(WriteWrap)];
req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
err = wrap->callbacks()->DoWrite(req_wrap,
bufs,
count,
NULL, NULL,
StreamWrap::AfterWrite); StreamWrap::AfterWrite);
req_wrap->Dispatched(); req_wrap->Dispatched();
req_wrap_obj->Set(env->bytes_string(), req_wrap_obj->Set(env->async(), True(node_isolate));
Integer::NewFromUnsigned(length, node_isolate));
const char* msg = wrap->callbacks()->Error();
if (msg != NULL)
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
if (err) { if (err) {
req_wrap->~WriteWrap(); req_wrap->~WriteWrap();
delete[] storage; delete[] storage;
} }
done:
const char* msg = wrap->callbacks()->Error();
if (msg != NULL)
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
req_wrap_obj->Set(env->bytes_string(),
Integer::NewFromUnsigned(length, node_isolate));
args.GetReturnValue().Set(err); args.GetReturnValue().Set(err);
} }
@ -256,22 +271,53 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
return; return;
} }
char* storage = new char[sizeof(WriteWrap) + storage_size + 15]; // Try writing immediately if write size isn't too big
WriteWrap* req_wrap = char* storage;
new(storage) WriteWrap(env, req_wrap_obj, wrap); WriteWrap* req_wrap;
char* data;
char stack_storage[16384]; // 16kb
size_t data_size;
uv_buf_t buf;
char* data = reinterpret_cast<char*>(ROUND_UP( bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
(!wrap->is_named_pipe_ipc() || !args[2]->IsObject());
if (try_write) {
data_size = StringBytes::Write(stack_storage,
storage_size,
string,
encoding);
buf = uv_buf_init(stack_storage, data_size);
uv_buf_t* bufs = &buf;
size_t count = 1;
err = wrap->callbacks()->TryWrite(&bufs, &count);
// Success
if (err == 0)
goto done;
// Failure, or partial write
assert(count == 1);
}
storage = new char[sizeof(WriteWrap) + storage_size + 15];
req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
data = reinterpret_cast<char*>(ROUND_UP(
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16)); reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
size_t data_size; if (try_write) {
// Copy partial data
memcpy(data, buf.base, buf.len);
data_size = buf.len;
} else {
// Write it
data_size = StringBytes::Write(data, storage_size, string, encoding); data_size = StringBytes::Write(data, storage_size, string, encoding);
}
assert(data_size <= storage_size); assert(data_size <= storage_size);
uv_buf_t buf; buf = uv_buf_init(data, data_size);
buf.base = data;
buf.len = data_size;
if (!wrap->is_named_pipe_ipc()) { if (!wrap->is_named_pipe_ipc()) {
err = wrap->callbacks()->DoWrite(req_wrap, err = wrap->callbacks()->DoWrite(req_wrap,
@ -301,17 +347,19 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
} }
req_wrap->Dispatched(); req_wrap->Dispatched();
req_wrap->object()->Set(env->bytes_string(), req_wrap->object()->Set(env->async(), True(node_isolate));
Integer::NewFromUnsigned(data_size, node_isolate));
const char* msg = wrap->callbacks()->Error();
if (msg != NULL)
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
if (err) { if (err) {
req_wrap->~WriteWrap(); req_wrap->~WriteWrap();
delete[] storage; delete[] storage;
} }
done:
const char* msg = wrap->callbacks()->Error();
if (msg != NULL)
req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
req_wrap_obj->Set(env->bytes_string(),
Integer::NewFromUnsigned(data_size, node_isolate));
args.GetReturnValue().Set(err); args.GetReturnValue().Set(err);
} }
@ -405,6 +453,7 @@ void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
delete[] bufs; delete[] bufs;
req_wrap->Dispatched(); req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(node_isolate));
req_wrap->object()->Set(env->bytes_string(), req_wrap->object()->Set(env->bytes_string(),
Number::New(node_isolate, bytes)); Number::New(node_isolate, bytes));
const char* msg = wrap->callbacks()->Error(); const char* msg = wrap->callbacks()->Error();
@ -518,6 +567,47 @@ const char* StreamWrapCallbacks::Error() {
} }
// NOTE: Call to this function could change both `buf`'s and `count`'s
// values, shifting their base and decrementing their length. This is
// required in order to skip the data that was successfully written via
// uv_try_write().
int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
int err;
size_t written;
uv_buf_t* vbufs = *bufs;
size_t vcount = *count;
err = uv_try_write(wrap()->stream(), vbufs, vcount);
if (err < 0)
return err;
// Slice off the buffers: skip all written buffers and slice the one that
// was partially written.
written = err;
for (; written != 0 && vcount > 0; vbufs++, vcount--) {
// Slice
if (vbufs[0].len > written) {
vbufs[0].base += written;
vbufs[0].len -= written;
written = 0;
break;
// Discard
} else {
written -= vbufs[0].len;
}
}
*bufs = vbufs;
*count = vcount;
if (vcount == 0)
return 0;
else
return -1;
}
int StreamWrapCallbacks::DoWrite(WriteWrap* w, int StreamWrapCallbacks::DoWrite(WriteWrap* w,
uv_buf_t* bufs, uv_buf_t* bufs,
size_t count, size_t count,

View File

@ -74,6 +74,9 @@ class StreamWrapCallbacks {
} }
virtual const char* Error(); virtual const char* Error();
virtual int TryWrite(uv_buf_t** bufs, size_t* count);
virtual int DoWrite(WriteWrap* w, virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs, uv_buf_t* bufs,
size_t count, size_t count,

View File

@ -511,6 +511,12 @@ const char* TLSCallbacks::Error() {
} }
int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
// TODO(indutny): Support it
return -1;
}
int TLSCallbacks::DoWrite(WriteWrap* w, int TLSCallbacks::DoWrite(WriteWrap* w,
uv_buf_t* bufs, uv_buf_t* bufs,
size_t count, size_t count,

View File

@ -51,6 +51,7 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>,
v8::Handle<v8::Context> context); v8::Handle<v8::Context> context);
const char* Error(); const char* Error();
int TryWrite(uv_buf_t** bufs, size_t* count);
int DoWrite(WriteWrap* w, int DoWrite(WriteWrap* w,
uv_buf_t* bufs, uv_buf_t* bufs,
size_t count, size_t count,

View File

@ -55,7 +55,7 @@ server.onconnection = function(err, client) {
assert.equal(0, client.writeQueueSize); assert.equal(0, client.writeQueueSize);
var req = {}; var req = { async: false };
var err = client.writeBuffer(req, buffer); var err = client.writeBuffer(req, buffer);
assert.equal(err, 0); assert.equal(err, 0);
client.pendingWrites.push(req); client.pendingWrites.push(req);
@ -64,7 +64,12 @@ server.onconnection = function(err, client) {
// 11 bytes should flush // 11 bytes should flush
assert.equal(0, client.writeQueueSize); assert.equal(0, client.writeQueueSize);
req.oncomplete = function(status, client_, req_) { if (req.async && client.writeQueueSize != 0)
req.oncomplete = done;
else
process.nextTick(done.bind(null, 0, client, req));
function done(status, client_, req_) {
assert.equal(req, client.pendingWrites.shift()); assert.equal(req, client.pendingWrites.shift());
// Check parameters. // Check parameters.