src: improve StreamBase write throughput

Improve performance by transferring information about write status
to JS through an `AliasedBuffer`, rather than object properties
set from C++.

PR-URL: https://github.com/nodejs/node/pull/23843
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
This commit is contained in:
Anna Henningsen 2018-10-23 08:23:02 +02:00 committed by Michaël Zasso
parent 0a23538e49
commit f01518edfd
No known key found for this signature in database
GPG Key ID: 770F7A9A5AE15600
11 changed files with 41 additions and 37 deletions

View File

@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT; const PORT = common.PORT;
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16], len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}); });

View File

@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT; const PORT = common.PORT;
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16], len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}); });

View File

@ -5,7 +5,7 @@ const common = require('../common.js');
const PORT = common.PORT; const PORT = common.PORT;
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16], len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5] dur: [5]
}); });

View File

@ -5,7 +5,7 @@ const common = require('../common.js');
const { PassThrough } = require('stream'); const { PassThrough } = require('stream');
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16], len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}, { }, {

View File

@ -26,6 +26,7 @@ const {
WriteWrap, WriteWrap,
kReadBytesOrError, kReadBytesOrError,
kArrayBufferOffset, kArrayBufferOffset,
kLastWriteWasAsync,
streamBaseState streamBaseState
} = internalBinding('stream_wrap'); } = internalBinding('stream_wrap');
const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
@ -716,10 +717,10 @@ function setupChannel(target, channel) {
} }
var req = new WriteWrap(); var req = new WriteWrap();
req.async = false;
var string = JSON.stringify(message) + '\n'; var string = JSON.stringify(message) + '\n';
var err = channel.writeUtf8String(req, string, handle); var err = channel.writeUtf8String(req, string, handle);
var wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
if (err === 0) { if (err === 0) {
if (handle) { if (handle) {
@ -729,7 +730,7 @@ function setupChannel(target, channel) {
obj.postSend(message, handle, options, callback, target); obj.postSend(message, handle, options, callback, target);
} }
if (req.async) { if (wasAsyncWrite) {
req.oncomplete = function() { req.oncomplete = function() {
control.unref(); control.unref();
if (typeof callback === 'function') if (typeof callback === 'function')

View File

@ -6,6 +6,8 @@ const {
WriteWrap, WriteWrap,
kReadBytesOrError, kReadBytesOrError,
kArrayBufferOffset, kArrayBufferOffset,
kBytesWritten,
kLastWriteWasAsync,
streamBaseState streamBaseState
} = internalBinding('stream_wrap'); } = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv'); const { UV_EOF } = internalBinding('uv');
@ -20,7 +22,12 @@ function handleWriteReq(req, data, encoding) {
switch (encoding) { switch (encoding) {
case 'buffer': case 'buffer':
return handle.writeBuffer(req, data); {
const ret = handle.writeBuffer(req, data);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = data;
return ret;
}
case 'latin1': case 'latin1':
case 'binary': case 'binary':
return handle.writeLatin1String(req, data); return handle.writeLatin1String(req, data);
@ -35,7 +42,13 @@ function handleWriteReq(req, data, encoding) {
case 'utf-16le': case 'utf-16le':
return handle.writeUcs2String(req, data); return handle.writeUcs2String(req, data);
default: default:
return handle.writeBuffer(req, Buffer.from(data, encoding)); {
const buffer = Buffer.from(data, encoding);
const ret = handle.writeBuffer(req, buffer);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = buffer;
return ret;
}
} }
} }
@ -45,6 +58,8 @@ function createWriteWrap(handle, oncomplete) {
req.handle = handle; req.handle = handle;
req.oncomplete = oncomplete; req.oncomplete = oncomplete;
req.async = false; req.async = false;
req.bytes = 0;
req.buffer = null;
return req; return req;
} }
@ -80,6 +95,9 @@ function writeGeneric(self, req, data, encoding, cb) {
} }
function afterWriteDispatched(self, req, err, cb) { function afterWriteDispatched(self, req, err, cb) {
req.bytes = streamBaseState[kBytesWritten];
req.async = !!streamBaseState[kLastWriteWasAsync];
if (err !== 0) if (err !== 0)
return self.destroy(errnoException(err, 'write', req.error), cb); return self.destroy(errnoException(err, 'write', req.error), cb);

View File

@ -128,10 +128,8 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
V(address_string, "address") \ V(address_string, "address") \
V(aliases_string, "aliases") \ V(aliases_string, "aliases") \
V(args_string, "args") \ V(args_string, "args") \
V(async, "async") \
V(async_ids_stack_string, "async_ids_stack") \ V(async_ids_stack_string, "async_ids_stack") \
V(buffer_string, "buffer") \ V(buffer_string, "buffer") \
V(bytes_string, "bytes") \
V(bytes_parsed_string, "bytesParsed") \ V(bytes_parsed_string, "bytesParsed") \
V(bytes_read_string, "bytesRead") \ V(bytes_read_string, "bytesRead") \
V(bytes_written_string, "bytesWritten") \ V(bytes_written_string, "bytesWritten") \

View File

@ -18,13 +18,11 @@ namespace node {
using v8::Array; using v8::Array;
using v8::ArrayBuffer; using v8::ArrayBuffer;
using v8::Boolean;
using v8::Context; using v8::Context;
using v8::FunctionCallbackInfo; using v8::FunctionCallbackInfo;
using v8::HandleScope; using v8::HandleScope;
using v8::Integer; using v8::Integer;
using v8::Local; using v8::Local;
using v8::Number;
using v8::Object; using v8::Object;
using v8::String; using v8::String;
using v8::Value; using v8::Value;
@ -56,18 +54,9 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
return Shutdown(req_wrap_obj); return Shutdown(req_wrap_obj);
} }
inline void SetWriteResultPropertiesOnWrapObject( void StreamBase::SetWriteResult(const StreamWriteResult& res) {
Environment* env, env_->stream_base_state()[kBytesWritten] = res.bytes;
Local<Object> req_wrap_obj, env_->stream_base_state()[kLastWriteWasAsync] = res.async;
const StreamWriteResult& res) {
req_wrap_obj->Set(
env->context(),
env->bytes_string(),
Number::New(env->isolate(), res.bytes)).FromJust();
req_wrap_obj->Set(
env->context(),
env->async(),
Boolean::New(env->isolate(), res.async)).FromJust();
} }
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) { int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
@ -160,7 +149,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
} }
StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res); SetWriteResult(res);
if (res.wrap != nullptr && storage_size > 0) { if (res.wrap != nullptr && storage_size > 0) {
res.wrap->SetAllocatedStorage(storage.release(), storage_size); res.wrap->SetAllocatedStorage(storage.release(), storage_size);
} }
@ -185,10 +174,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
buf.len = Buffer::Length(args[1]); buf.len = Buffer::Length(args[1]);
StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj); StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
SetWriteResult(res);
if (res.async)
req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
return res.err; return res.err;
} }
@ -247,12 +233,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
// Immediate failure or success // Immediate failure or success
if (err != 0 || count == 0) { if (err != 0 || count == 0) {
req_wrap_obj->Set(env->context(), env->async(), False(env->isolate())) SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
.FromJust();
req_wrap_obj->Set(env->context(),
env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), data_size))
.FromJust();
return err; return err;
} }
@ -295,7 +276,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
res.bytes += synchronously_written; res.bytes += synchronously_written;
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res); SetWriteResult(res);
if (res.wrap != nullptr) { if (res.wrap != nullptr) {
res.wrap->SetAllocatedStorage(data.release(), data_size); res.wrap->SetAllocatedStorage(data.release(), data_size);
} }

View File

@ -332,6 +332,8 @@ class StreamBase : public StreamResource {
enum StreamBaseStateFields { enum StreamBaseStateFields {
kReadBytesOrError, kReadBytesOrError,
kArrayBufferOffset, kArrayBufferOffset,
kBytesWritten,
kLastWriteWasAsync,
kNumStreamBaseStateFields kNumStreamBaseStateFields
}; };
@ -339,6 +341,8 @@ class StreamBase : public StreamResource {
Environment* env_; Environment* env_;
EmitToJSStreamListener default_listener_; EmitToJSStreamListener default_listener_;
void SetWriteResult(const StreamWriteResult& res);
friend class WriteWrap; friend class WriteWrap;
friend class ShutdownWrap; friend class ShutdownWrap;
friend class Environment; // For kNumStreamBaseStateFields. friend class Environment; // For kNumStreamBaseStateFields.

View File

@ -83,6 +83,8 @@ void LibuvStreamWrap::Initialize(Local<Object> target,
NODE_DEFINE_CONSTANT(target, kReadBytesOrError); NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
NODE_DEFINE_CONSTANT(target, kArrayBufferOffset); NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
NODE_DEFINE_CONSTANT(target, kBytesWritten);
NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"), target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
env->stream_base_state().GetJSArray()).FromJust(); env->stream_base_state().GetJSArray()).FromJust();
} }

View File

@ -239,7 +239,7 @@ if (common.hasCrypto) { // eslint-disable-line node-core/crypto-check
const err = handle.writeLatin1String(wreq, 'hi'.repeat(100000)); const err = handle.writeLatin1String(wreq, 'hi'.repeat(100000));
if (err) if (err)
throw new Error(`write failed: ${getSystemErrorName(err)}`); throw new Error(`write failed: ${getSystemErrorName(err)}`);
if (!wreq.async) { if (!stream_wrap.streamBaseState[stream_wrap.kLastWriteWasAsync]) {
testUninitialized(wreq, 'WriteWrap'); testUninitialized(wreq, 'WriteWrap');
// Synchronous finish. Write more data until we hit an // Synchronous finish. Write more data until we hit an
// asynchronous write. // asynchronous write.