net: implement ._writev for .cork/uncork() support
Add Writev method to StreamWrap class for writing mixed array of strings and buffers. Expose this method for TCP class.
This commit is contained in:
parent
21ed8df696
commit
60ed2c5434
69
lib/net.js
69
lib/net.js
@ -130,6 +130,10 @@ function initSocketHandle(self) {
|
||||
if (self._handle) {
|
||||
self._handle.owner = self;
|
||||
self._handle.onread = onread;
|
||||
|
||||
// If handle doesn't support writev - neither do we
|
||||
if (!self._handle.writev)
|
||||
self._writev = null;
|
||||
}
|
||||
}
|
||||
|
||||
@ -597,7 +601,7 @@ Socket.prototype.write = function(chunk, encoding, cb) {
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype._write = function(data, encoding, cb) {
|
||||
Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
|
||||
// If we are still connecting, then buffer this for later.
|
||||
// The Writable logic will buffer up any more writes while
|
||||
// waiting for this one to be done.
|
||||
@ -605,7 +609,7 @@ Socket.prototype._write = function(data, encoding, cb) {
|
||||
this._pendingData = data;
|
||||
this._pendingEncoding = encoding;
|
||||
this.once('connect', function() {
|
||||
this._write(data, encoding, cb);
|
||||
this._writeGeneric(writev, data, encoding, cb);
|
||||
});
|
||||
return;
|
||||
}
|
||||
@ -619,8 +623,31 @@ Socket.prototype._write = function(data, encoding, cb) {
|
||||
return false;
|
||||
}
|
||||
|
||||
var enc = Buffer.isBuffer(data) ? 'buffer' : encoding;
|
||||
var writeReq = createWriteReq(this._handle, data, enc);
|
||||
var writeReq;
|
||||
if (writev) {
|
||||
var chunks = new Array(data.length << 1);
|
||||
for (var i = 0; i < data.length; i++) {
|
||||
var entry = data[i];
|
||||
var enc = entry.encoding;
|
||||
var chunk = entry.chunk;
|
||||
var code = getEncodingId(enc);
|
||||
|
||||
// Buffer encoding, translate argument to buffer
|
||||
if (code === 0 && !Buffer.isBuffer(chunk))
|
||||
chunk = new Buffer(chunk, enc);
|
||||
|
||||
chunks[i * 2] = chunk;
|
||||
chunks[i * 2 + 1] = code;
|
||||
}
|
||||
var writeReq = this._handle.writev(chunks);
|
||||
|
||||
// Retain chunks
|
||||
if (writeReq)
|
||||
writeReq._chunks = chunks;
|
||||
} else {
|
||||
var enc = Buffer.isBuffer(data) ? 'buffer' : encoding;
|
||||
var writeReq = createWriteReq(this._handle, data, enc);
|
||||
}
|
||||
|
||||
if (!writeReq || typeof writeReq !== 'object')
|
||||
return this._destroy(errnoException(process._errno, 'write'), cb);
|
||||
@ -636,6 +663,40 @@ Socket.prototype._write = function(data, encoding, cb) {
|
||||
writeReq.cb = cb;
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype._writev = function(chunks, cb) {
|
||||
this._writeGeneric(true, chunks, '', cb);
|
||||
};
|
||||
|
||||
|
||||
Socket.prototype._write = function(data, encoding, cb) {
|
||||
this._writeGeneric(false, data, encoding, cb);
|
||||
};
|
||||
|
||||
// Important: this should have the same values as in src/stream_wrap.h
|
||||
function getEncodingId(encoding) {
|
||||
switch (encoding) {
|
||||
case 'buffer':
|
||||
return 0;
|
||||
|
||||
case 'utf8':
|
||||
case 'utf-8':
|
||||
return 1;
|
||||
|
||||
case 'ascii':
|
||||
return 2;
|
||||
|
||||
case 'ucs2':
|
||||
case 'ucs-2':
|
||||
case 'utf16le':
|
||||
case 'utf-16le':
|
||||
return 3;
|
||||
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
function createWriteReq(handle, data, encoding) {
|
||||
switch (encoding) {
|
||||
case 'buffer':
|
||||
|
@ -40,6 +40,7 @@ namespace node {
|
||||
|
||||
using v8::AccessorInfo;
|
||||
using v8::Arguments;
|
||||
using v8::Array;
|
||||
using v8::Context;
|
||||
using v8::Exception;
|
||||
using v8::Function;
|
||||
@ -283,6 +284,109 @@ void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
|
||||
}
|
||||
|
||||
|
||||
size_t StreamWrap::WriteBuffer(WriteWrap* req,
|
||||
Handle<Value> val,
|
||||
uv_buf_t* buf) {
|
||||
assert(Buffer::HasInstance(val));
|
||||
|
||||
// Simple non-writev case
|
||||
buf->base = Buffer::Data(val);
|
||||
buf->len = Buffer::Length(val);
|
||||
|
||||
return buf->len;
|
||||
}
|
||||
|
||||
|
||||
template <WriteEncoding encoding>
|
||||
size_t StreamWrap::WriteStringImpl(char* storage,
|
||||
size_t storage_size,
|
||||
Handle<Value> val,
|
||||
uv_buf_t* buf) {
|
||||
assert(val->IsString());
|
||||
Handle<String> string = val.As<String>();
|
||||
|
||||
size_t data_size;
|
||||
switch (encoding) {
|
||||
case kAscii:
|
||||
data_size = string->WriteOneByte(
|
||||
reinterpret_cast<uint8_t*>(storage),
|
||||
0,
|
||||
-1,
|
||||
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
|
||||
break;
|
||||
|
||||
case kUtf8:
|
||||
data_size = string->WriteUtf8(
|
||||
storage,
|
||||
-1,
|
||||
NULL,
|
||||
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
|
||||
break;
|
||||
|
||||
case kUcs2: {
|
||||
int chars_copied = string->Write(
|
||||
reinterpret_cast<uint16_t*>(storage),
|
||||
0,
|
||||
-1,
|
||||
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
|
||||
data_size = chars_copied * sizeof(uint16_t);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
// Unreachable
|
||||
assert(0);
|
||||
}
|
||||
assert(data_size <= storage_size);
|
||||
|
||||
buf->base = storage;
|
||||
buf->len = data_size;
|
||||
|
||||
return data_size;
|
||||
}
|
||||
|
||||
|
||||
template <WriteEncoding encoding>
|
||||
size_t StreamWrap::GetStringSizeImpl(Handle<Value> val) {
|
||||
assert(val->IsString());
|
||||
Handle<String> string = val.As<String>();
|
||||
|
||||
switch (encoding) {
|
||||
case kAscii:
|
||||
return string->Length();
|
||||
break;
|
||||
|
||||
case kUtf8:
|
||||
if (!(string->MayContainNonAscii())) {
|
||||
// If the string has only ascii characters, we know exactly how big
|
||||
// the storage should be.
|
||||
return string->Length();
|
||||
} else if (string->Length() < 65536) {
|
||||
// A single UCS2 codepoint never takes up more than 3 utf8 bytes.
|
||||
// Unless the string is really long we just allocate so much space that
|
||||
// we're certain the string fits in there entirely.
|
||||
// TODO: maybe check handle->write_queue_size instead of string length?
|
||||
return 3 * string->Length();
|
||||
} else {
|
||||
// The string is really long. Compute the allocation size that we
|
||||
// actually need.
|
||||
return string->Utf8Length();
|
||||
}
|
||||
break;
|
||||
|
||||
case kUcs2:
|
||||
return string->Length() * sizeof(uint16_t);
|
||||
break;
|
||||
|
||||
default:
|
||||
// Unreachable.
|
||||
assert(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {
|
||||
HandleScope scope(node_isolate);
|
||||
|
||||
@ -290,17 +394,14 @@ Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {
|
||||
|
||||
// The first argument is a buffer.
|
||||
assert(args.Length() >= 1 && Buffer::HasInstance(args[0]));
|
||||
Local<Object> buffer_obj = args[0]->ToObject();
|
||||
size_t offset = 0;
|
||||
size_t length = Buffer::Length(buffer_obj);
|
||||
size_t length = Buffer::Length(args[0]);
|
||||
char* storage = new char[sizeof(WriteWrap)];
|
||||
WriteWrap* req_wrap = new (storage) WriteWrap();
|
||||
|
||||
req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
|
||||
req_wrap->object_->SetHiddenValue(buffer_sym, args[0]);
|
||||
|
||||
uv_buf_t buf;
|
||||
buf.base = Buffer::Data(buffer_obj) + offset;
|
||||
buf.len = length;
|
||||
WriteBuffer(req_wrap, args[0], &buf);
|
||||
|
||||
int r = uv_write(&req_wrap->req_,
|
||||
wrap->stream_,
|
||||
@ -344,38 +445,7 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
|
||||
Local<String> string = args[0]->ToString();
|
||||
|
||||
// Compute the size of the storage that the string will be flattened into.
|
||||
size_t storage_size;
|
||||
switch (encoding) {
|
||||
case kAscii:
|
||||
storage_size = string->Length();
|
||||
break;
|
||||
|
||||
case kUtf8:
|
||||
if (!(string->MayContainNonAscii())) {
|
||||
// If the string has only ascii characters, we know exactly how big
|
||||
// the storage should be.
|
||||
storage_size = string->Length();
|
||||
} else if (string->Length() < 65536) {
|
||||
// A single UCS2 codepoint never takes up more than 3 utf8 bytes.
|
||||
// Unless the string is really long we just allocate so much space that
|
||||
// we're certain the string fits in there entirely.
|
||||
// TODO: maybe check handle->write_queue_size instead of string length?
|
||||
storage_size = 3 * string->Length();
|
||||
} else {
|
||||
// The string is really long. Compute the allocation size that we
|
||||
// actually need.
|
||||
storage_size = string->Utf8Length();
|
||||
}
|
||||
break;
|
||||
|
||||
case kUcs2:
|
||||
storage_size = string->Length() * sizeof(uint16_t);
|
||||
break;
|
||||
|
||||
default:
|
||||
// Unreachable.
|
||||
assert(0);
|
||||
}
|
||||
size_t storage_size = GetStringSizeImpl<encoding>(string);
|
||||
|
||||
if (storage_size > INT_MAX) {
|
||||
uv_err_t err;
|
||||
@ -389,35 +459,10 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
|
||||
|
||||
char* data = reinterpret_cast<char*>(ROUND_UP(
|
||||
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
|
||||
size_t data_size;
|
||||
switch (encoding) {
|
||||
case kAscii:
|
||||
data_size = string->WriteOneByte(reinterpret_cast<uint8_t*>(data), 0, -1,
|
||||
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
|
||||
break;
|
||||
|
||||
case kUtf8:
|
||||
data_size = string->WriteUtf8(data, -1, NULL,
|
||||
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
|
||||
break;
|
||||
|
||||
case kUcs2: {
|
||||
int chars_copied = string->Write((uint16_t*) data, 0, -1,
|
||||
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
|
||||
data_size = chars_copied * sizeof(uint16_t);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
// Unreachable
|
||||
assert(0);
|
||||
}
|
||||
|
||||
assert(data_size <= storage_size);
|
||||
|
||||
uv_buf_t buf;
|
||||
buf.base = data;
|
||||
buf.len = data_size;
|
||||
size_t data_size =
|
||||
WriteStringImpl<encoding>(data, storage_size, string, &buf);
|
||||
|
||||
bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
|
||||
((uv_pipe_t*)wrap->stream_)->ipc;
|
||||
@ -478,6 +523,143 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
|
||||
}
|
||||
|
||||
|
||||
Handle<Value> StreamWrap::Writev(const Arguments& args) {
|
||||
HandleScope scope;
|
||||
|
||||
UNWRAP(StreamWrap)
|
||||
|
||||
if (args.Length() < 1)
|
||||
return ThrowTypeError("Not enough arguments");
|
||||
|
||||
if (!args[0]->IsArray())
|
||||
return ThrowTypeError("Argument should be array");
|
||||
|
||||
Handle<Array> chunks = args[0].As<Array>();
|
||||
size_t count = chunks->Length() >> 1;
|
||||
|
||||
uv_buf_t bufs_[16];
|
||||
uv_buf_t* bufs = bufs_;
|
||||
|
||||
if (ARRAY_SIZE(bufs_) < count)
|
||||
bufs = new uv_buf_t[count];
|
||||
|
||||
// Determine storage size first
|
||||
size_t storage_size = 0;
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
Handle<Value> chunk = chunks->Get(i * 2);
|
||||
|
||||
if (Buffer::HasInstance(chunk))
|
||||
continue;
|
||||
// Buffer chunk, no additional storage required
|
||||
|
||||
// String chunk
|
||||
Handle<Value> string = chunk->ToString();
|
||||
switch (static_cast<WriteEncoding>(chunks->Get(i * 2 + 1)->Int32Value())) {
|
||||
case kAscii:
|
||||
storage_size += GetStringSizeImpl<kAscii>(string);
|
||||
break;
|
||||
|
||||
case kUtf8:
|
||||
storage_size += GetStringSizeImpl<kUtf8>(string);
|
||||
break;
|
||||
|
||||
case kUcs2:
|
||||
storage_size += GetStringSizeImpl<kUcs2>(string);
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0); // Unreachable
|
||||
}
|
||||
storage_size += 15;
|
||||
}
|
||||
|
||||
if (storage_size > INT_MAX) {
|
||||
uv_err_t err;
|
||||
err.code = UV_ENOBUFS;
|
||||
SetErrno(err);
|
||||
return scope.Close(v8::Null(node_isolate));
|
||||
}
|
||||
|
||||
storage_size += sizeof(WriteWrap);
|
||||
char* storage = new char[storage_size];
|
||||
WriteWrap* req_wrap = new (storage) WriteWrap();
|
||||
|
||||
uint32_t bytes = 0;
|
||||
size_t offset = sizeof(WriteWrap);
|
||||
for (size_t i = 0; i < count; i++) {
|
||||
Handle<Value> chunk = chunks->Get(i * 2);
|
||||
|
||||
// Write buffer
|
||||
if (Buffer::HasInstance(chunk)) {
|
||||
bytes += WriteBuffer(req_wrap, chunk, &bufs[i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Write string
|
||||
offset = ROUND_UP(offset, 16);
|
||||
assert(offset < storage_size);
|
||||
char* str_storage = storage + offset;
|
||||
size_t str_size = storage_size - offset;
|
||||
|
||||
Handle<String> string = chunk->ToString();
|
||||
switch (static_cast<WriteEncoding>(chunks->Get(i * 2 + 1)->Int32Value())) {
|
||||
case kAscii:
|
||||
str_size = WriteStringImpl<kAscii>(str_storage,
|
||||
str_size,
|
||||
string,
|
||||
&bufs[i]);
|
||||
break;
|
||||
case kUtf8:
|
||||
str_size = WriteStringImpl<kUtf8>(str_storage,
|
||||
str_size,
|
||||
string,
|
||||
&bufs[i]);
|
||||
break;
|
||||
case kUcs2:
|
||||
str_size = WriteStringImpl<kUcs2>(str_storage,
|
||||
str_size,
|
||||
string,
|
||||
&bufs[i]);
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
offset += str_size;
|
||||
bytes += str_size;
|
||||
}
|
||||
|
||||
int r = uv_write(&req_wrap->req_,
|
||||
wrap->stream_,
|
||||
bufs,
|
||||
count,
|
||||
StreamWrap::AfterWrite);
|
||||
|
||||
// Deallocate space
|
||||
if (bufs != bufs_)
|
||||
delete[] bufs;
|
||||
|
||||
req_wrap->Dispatched();
|
||||
req_wrap->object_->Set(bytes_sym, Number::New(bytes));
|
||||
|
||||
wrap->UpdateWriteQueueSize();
|
||||
|
||||
if (r) {
|
||||
SetErrno(uv_last_error(uv_default_loop()));
|
||||
req_wrap->~WriteWrap();
|
||||
delete[] storage;
|
||||
return scope.Close(v8::Null(node_isolate));
|
||||
} else {
|
||||
if (wrap->stream_->type == UV_TCP) {
|
||||
NODE_COUNT_NET_BYTES_SENT(bytes);
|
||||
} else if (wrap->stream_->type == UV_NAMED_PIPE) {
|
||||
NODE_COUNT_PIPE_BYTES_SENT(bytes);
|
||||
}
|
||||
|
||||
return scope.Close(req_wrap->object_);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Handle<Value> StreamWrap::WriteAsciiString(const Arguments& args) {
|
||||
return WriteStringImpl<kAscii>(args);
|
||||
}
|
||||
|
@ -29,10 +29,15 @@
|
||||
namespace node {
|
||||
|
||||
|
||||
// Forward declaration
|
||||
class WriteWrap;
|
||||
|
||||
|
||||
// Important: this should have the same values as in lib/net.js
|
||||
enum WriteEncoding {
|
||||
kAscii,
|
||||
kUtf8,
|
||||
kUcs2
|
||||
kUtf8 = 0x1,
|
||||
kAscii = 0x2,
|
||||
kUcs2 = 0x3
|
||||
};
|
||||
|
||||
|
||||
@ -50,12 +55,24 @@ class StreamWrap : public HandleWrap {
|
||||
static v8::Handle<v8::Value> ReadStop(const v8::Arguments& args);
|
||||
static v8::Handle<v8::Value> Shutdown(const v8::Arguments& args);
|
||||
|
||||
static v8::Handle<v8::Value> Writev(const v8::Arguments& args);
|
||||
static v8::Handle<v8::Value> WriteBuffer(const v8::Arguments& args);
|
||||
static v8::Handle<v8::Value> WriteAsciiString(const v8::Arguments& args);
|
||||
static v8::Handle<v8::Value> WriteUtf8String(const v8::Arguments& args);
|
||||
static v8::Handle<v8::Value> WriteUcs2String(const v8::Arguments& args);
|
||||
|
||||
protected:
|
||||
static size_t WriteBuffer(WriteWrap* req,
|
||||
v8::Handle<v8::Value> val,
|
||||
uv_buf_t* buf);
|
||||
template <enum WriteEncoding encoding>
|
||||
static size_t WriteStringImpl(char* storage,
|
||||
size_t storage_size,
|
||||
v8::Handle<v8::Value> val,
|
||||
uv_buf_t* buf);
|
||||
template <enum WriteEncoding encoding>
|
||||
static size_t GetStringSizeImpl(v8::Handle<v8::Value> val);
|
||||
|
||||
StreamWrap(v8::Handle<v8::Object> object, uv_stream_t* stream);
|
||||
virtual void SetHandle(uv_handle_t* h);
|
||||
void StateChange() { }
|
||||
|
@ -103,6 +103,7 @@ void TCPWrap::Initialize(Handle<Object> target) {
|
||||
NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString);
|
||||
NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String);
|
||||
NODE_SET_PROTOTYPE_METHOD(t, "writeUcs2String", StreamWrap::WriteUcs2String);
|
||||
NODE_SET_PROTOTYPE_METHOD(t, "writev", StreamWrap::Writev);
|
||||
|
||||
NODE_SET_PROTOTYPE_METHOD(t, "open", Open);
|
||||
NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
|
||||
|
Loading…
x
Reference in New Issue
Block a user