src: improve StreamBase read throughput

Improve performance by providing JS with the raw ingridients
for the read data, i.e. an `ArrayBuffer` + offset + length
fields, instead of creating `Buffer` instances in C++ land.

PR-URL: https://github.com/nodejs/node/pull/23797
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2018-10-21 08:34:00 +02:00
parent bb79e768e5
commit 1365f657b5
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
16 changed files with 116 additions and 45 deletions

View File

@ -46,15 +46,15 @@ function main({ dur, len, type }) {
process.exit(0); process.exit(0);
}, dur * 1000); }, dur * 1000);
clientHandle.onread = function(nread, buffer) { clientHandle.onread = function(buffer) {
// we're not expecting to ever get an EOF from the client. // we're not expecting to ever get an EOF from the client.
// just lots of data forever. // just lots of data forever.
if (nread < 0) if (!buffer)
fail(nread, 'read'); fail('read');
// don't slice the buffer. the point of this is to isolate, not // don't slice the buffer. the point of this is to isolate, not
// simulate real traffic. // simulate real traffic.
bytes += buffer.length; bytes += buffer.byteLength;
}; };
clientHandle.readStart(); clientHandle.readStart();

View File

@ -43,15 +43,15 @@ function main({ dur, len, type }) {
if (err) if (err)
fail(err, 'connect'); fail(err, 'connect');
clientHandle.onread = function(nread, buffer) { clientHandle.onread = function(buffer) {
// we're not expecting to ever get an EOF from the client. // we're not expecting to ever get an EOF from the client.
// just lots of data forever. // just lots of data forever.
if (nread < 0) if (!buffer)
fail(nread, 'read'); fail('read');
const writeReq = new WriteWrap(); const writeReq = new WriteWrap();
writeReq.async = false; writeReq.async = false;
err = clientHandle.writeBuffer(writeReq, buffer); err = clientHandle.writeBuffer(writeReq, Buffer.from(buffer));
if (err) if (err)
fail(err, 'write'); fail(err, 'write');
@ -89,11 +89,11 @@ function main({ dur, len, type }) {
if (err) if (err)
fail(err, 'connect'); fail(err, 'connect');
clientHandle.onread = function(nread, buffer) { clientHandle.onread = function(buffer) {
if (nread < 0) if (!buffer)
fail(nread, 'read'); fail('read');
bytes += buffer.length; bytes += buffer.byteLength;
}; };
connectReq.oncomplete = function(err) { connectReq.oncomplete = function(err) {

View File

@ -109,15 +109,15 @@ function main({ dur, len, type }) {
connectReq.oncomplete = function() { connectReq.oncomplete = function() {
var bytes = 0; var bytes = 0;
clientHandle.onread = function(nread, buffer) { clientHandle.onread = function(buffer) {
// we're not expecting to ever get an EOF from the client. // we're not expecting to ever get an EOF from the client.
// just lots of data forever. // just lots of data forever.
if (nread < 0) if (!buffer)
fail(nread, 'read'); fail('read');
// don't slice the buffer. the point of this is to isolate, not // don't slice the buffer. the point of this is to isolate, not
// simulate real traffic. // simulate real traffic.
bytes += buffer.length; bytes += buffer.byteLength;
}; };
clientHandle.readStart(); clientHandle.readStart();

View File

@ -22,7 +22,12 @@ const util = require('util');
const assert = require('assert'); const assert = require('assert');
const { Process } = internalBinding('process_wrap'); const { Process } = internalBinding('process_wrap');
const { WriteWrap } = internalBinding('stream_wrap'); const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
streamBaseState
} = internalBinding('stream_wrap');
const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
const { TCP } = internalBinding('tcp_wrap'); const { TCP } = internalBinding('tcp_wrap');
const { TTY } = internalBinding('tty_wrap'); const { TTY } = internalBinding('tty_wrap');
@ -486,11 +491,13 @@ function setupChannel(target, channel) {
var pendingHandle = null; var pendingHandle = null;
channel.buffering = false; channel.buffering = false;
channel.pendingHandle = null; channel.pendingHandle = null;
channel.onread = function(nread, pool) { channel.onread = function(arrayBuffer) {
const recvHandle = channel.pendingHandle; const recvHandle = channel.pendingHandle;
channel.pendingHandle = null; channel.pendingHandle = null;
// TODO(bnoordhuis) Check that nread > 0. if (arrayBuffer) {
if (pool) { const nread = streamBaseState[kReadBytesOrError];
const offset = streamBaseState[kArrayBufferOffset];
const pool = new Uint8Array(arrayBuffer, offset, nread);
if (recvHandle) if (recvHandle)
pendingHandle = recvHandle; pendingHandle = recvHandle;

View File

@ -120,7 +120,11 @@ const { isArrayBufferView } = require('internal/util/types');
const { FileHandle } = process.binding('fs'); const { FileHandle } = process.binding('fs');
const binding = internalBinding('http2'); const binding = internalBinding('http2');
const { ShutdownWrap } = internalBinding('stream_wrap'); const {
ShutdownWrap,
kReadBytesOrError,
streamBaseState
} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv'); const { UV_EOF } = internalBinding('uv');
const { StreamPipe } = internalBinding('stream_pipe'); const { StreamPipe } = internalBinding('stream_pipe');
@ -2043,7 +2047,8 @@ function onFileUnpipe() {
// This is only called once the pipe has returned back control, so // This is only called once the pipe has returned back control, so
// it only has to handle errors and End-of-File. // it only has to handle errors and End-of-File.
function onPipedFileHandleRead(err) { function onPipedFileHandleRead() {
const err = streamBaseState[kReadBytesOrError];
if (err < 0 && err !== UV_EOF) { if (err < 0 && err !== UV_EOF) {
this.stream.close(NGHTTP2_INTERNAL_ERROR); this.stream.close(NGHTTP2_INTERNAL_ERROR);
} }

View File

@ -1,7 +1,13 @@
'use strict'; 'use strict';
const { Buffer } = require('buffer'); const { Buffer } = require('buffer');
const { WriteWrap } = internalBinding('stream_wrap'); const { FastBuffer } = require('internal/buffer');
const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
streamBaseState
} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv'); const { UV_EOF } = internalBinding('uv');
const { errnoException } = require('internal/errors'); const { errnoException } = require('internal/errors');
const { owner_symbol } = require('internal/async_hooks').symbols; const { owner_symbol } = require('internal/async_hooks').symbols;
@ -84,13 +90,17 @@ function afterWriteDispatched(self, req, err, cb) {
} }
} }
function onStreamRead(nread, buf) { function onStreamRead(arrayBuffer) {
const nread = streamBaseState[kReadBytesOrError];
const handle = this; const handle = this;
const stream = this[owner_symbol]; const stream = this[owner_symbol];
stream[kUpdateTimer](); stream[kUpdateTimer]();
if (nread > 0 && !stream.destroyed) { if (nread > 0 && !stream.destroyed) {
const offset = streamBaseState[kArrayBufferOffset];
const buf = new FastBuffer(arrayBuffer, offset, nread);
if (!stream.push(buf)) { if (!stream.push(buf)) {
handle.reading = false; handle.reading = false;
if (!stream.destroyed) { if (!stream.destroyed) {

View File

@ -446,6 +446,11 @@ Environment::trace_category_state() {
return trace_category_state_; return trace_category_state_;
} }
inline AliasedBuffer<int32_t, v8::Int32Array>&
Environment::stream_base_state() {
return stream_base_state_;
}
inline uint32_t Environment::get_next_module_id() { inline uint32_t Environment::get_next_module_id() {
return module_id_counter_++; return module_id_counter_++;
} }

View File

@ -158,6 +158,7 @@ Environment::Environment(IsolateData* isolate_data,
makecallback_cntr_(0), makecallback_cntr_(0),
should_abort_on_uncaught_toggle_(isolate_, 1), should_abort_on_uncaught_toggle_(isolate_, 1),
trace_category_state_(isolate_, kTraceCategoryCount), trace_category_state_(isolate_, kTraceCategoryCount),
stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields),
http_parser_buffer_(nullptr), http_parser_buffer_(nullptr),
fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2), fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2),
fs_stats_field_bigint_array_(isolate_, kFsStatsFieldsLength * 2), fs_stats_field_bigint_array_(isolate_, kFsStatsFieldsLength * 2),

View File

@ -668,6 +668,7 @@ class Environment {
should_abort_on_uncaught_toggle(); should_abort_on_uncaught_toggle();
inline AliasedBuffer<uint8_t, v8::Uint8Array>& trace_category_state(); inline AliasedBuffer<uint8_t, v8::Uint8Array>& trace_category_state();
inline AliasedBuffer<int32_t, v8::Int32Array>& stream_base_state();
// The necessary API for async_hooks. // The necessary API for async_hooks.
inline double new_async_id(); inline double new_async_id();
@ -951,6 +952,8 @@ class Environment {
AliasedBuffer<uint8_t, v8::Uint8Array> trace_category_state_; AliasedBuffer<uint8_t, v8::Uint8Array> trace_category_state_;
std::unique_ptr<TrackingTraceStateObserver> trace_state_observer_; std::unique_ptr<TrackingTraceStateObserver> trace_state_observer_;
AliasedBuffer<int32_t, v8::Int32Array> stream_base_state_;
std::unique_ptr<performance::performance_state> performance_state_; std::unique_ptr<performance::performance_state> performance_state_;
std::unordered_map<std::string, uint64_t> performance_marks_; std::unordered_map<std::string, uint64_t> performance_marks_;

View File

@ -1256,10 +1256,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
CHECK_LE(offset, session->stream_buf_.len); CHECK_LE(offset, session->stream_buf_.len);
CHECK_LE(offset + buf.len, session->stream_buf_.len); CHECK_LE(offset + buf.len, session->stream_buf_.len);
Local<Object> buffer = stream->CallJSOnreadMethod(nread, session->stream_buf_ab_, offset);
Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked();
stream->CallJSOnreadMethod(nread, buffer);
} }

View File

@ -17,6 +17,7 @@
namespace node { namespace node {
using v8::Array; using v8::Array;
using v8::ArrayBuffer;
using v8::Boolean; using v8::Boolean;
using v8::Context; using v8::Context;
using v8::FunctionCallbackInfo; using v8::FunctionCallbackInfo;
@ -303,16 +304,28 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
} }
void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) { void StreamBase::CallJSOnreadMethod(ssize_t nread,
Local<ArrayBuffer> ab,
size_t offset) {
Environment* env = env_; Environment* env = env_;
Local<Value> argv[] = { #ifdef DEBUG
Integer::New(env->isolate(), nread), CHECK_EQ(static_cast<int32_t>(nread), nread);
buf CHECK_EQ(static_cast<int32_t>(offset), offset);
};
if (argv[1].IsEmpty()) if (ab.IsEmpty()) {
argv[1] = Undefined(env->isolate()); CHECK_EQ(offset, 0);
CHECK_LE(nread, 0);
} else {
CHECK_GE(nread, 0);
}
#endif
env->stream_base_state()[kReadBytesOrError] = nread;
env->stream_base_state()[kArrayBufferOffset] = offset;
Local<Value> argv[] = {
ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
};
AsyncWrap* wrap = GetAsyncWrap(); AsyncWrap* wrap = GetAsyncWrap();
CHECK_NOT_NULL(wrap); CHECK_NOT_NULL(wrap);
@ -366,14 +379,18 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
if (nread <= 0) { if (nread <= 0) {
free(buf.base); free(buf.base);
if (nread < 0) if (nread < 0)
stream->CallJSOnreadMethod(nread, Local<Object>()); stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
return; return;
} }
CHECK_LE(static_cast<size_t>(nread), buf.len); CHECK_LE(static_cast<size_t>(nread), buf.len);
char* base = Realloc(buf.base, nread); char* base = Realloc(buf.base, nread);
Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked(); Local<ArrayBuffer> obj = ArrayBuffer::New(
env->isolate(),
base,
nread,
v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8.
stream->CallJSOnreadMethod(nread, obj); stream->CallJSOnreadMethod(nread, obj);
} }

View File

@ -264,7 +264,9 @@ class StreamBase : public StreamResource {
virtual bool IsIPCPipe(); virtual bool IsIPCPipe();
virtual int GetFD(); virtual int GetFD();
void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf); void CallJSOnreadMethod(ssize_t nread,
v8::Local<v8::ArrayBuffer> ab,
size_t offset = 0);
// This is named `stream_env` to avoid name clashes, because a lot of // This is named `stream_env` to avoid name clashes, because a lot of
// subclasses are also `BaseObject`s. // subclasses are also `BaseObject`s.
@ -326,12 +328,20 @@ class StreamBase : public StreamResource {
const v8::FunctionCallbackInfo<v8::Value>& args)> const v8::FunctionCallbackInfo<v8::Value>& args)>
static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args); static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
// Internal, used only in StreamBase methods + env.cc.
enum StreamBaseStateFields {
kReadBytesOrError,
kArrayBufferOffset,
kNumStreamBaseStateFields
};
private: private:
Environment* env_; Environment* env_;
EmitToJSStreamListener default_listener_; EmitToJSStreamListener default_listener_;
friend class WriteWrap; friend class WriteWrap;
friend class ShutdownWrap; friend class ShutdownWrap;
friend class Environment; // For kNumStreamBaseStateFields.
}; };

View File

@ -80,6 +80,11 @@ void LibuvStreamWrap::Initialize(Local<Object> target,
target->Set(writeWrapString, target->Set(writeWrapString,
ww->GetFunction(env->context()).ToLocalChecked()); ww->GetFunction(env->context()).ToLocalChecked());
env->set_write_wrap_template(ww->InstanceTemplate()); env->set_write_wrap_template(ww->InstanceTemplate());
NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
env->stream_base_state().GetJSArray()).FromJust();
} }

View File

@ -6,11 +6,15 @@ const net = require('net');
const { internalBinding } = require('internal/test/binding'); const { internalBinding } = require('internal/test/binding');
const { UV_EOF } = internalBinding('uv'); const { UV_EOF } = internalBinding('uv');
const { streamBaseState, kReadBytesOrError } = internalBinding('stream_wrap');
const s = new net.Socket({ const s = new net.Socket({
handle: { handle: {
readStart: function() { readStart: function() {
setImmediate(() => this.onread(UV_EOF, null)); setImmediate(() => {
streamBaseState[kReadBytesOrError] = UV_EOF;
this.onread();
});
}, },
close: (cb) => setImmediate(cb) close: (cb) => setImmediate(cb)
}, },

View File

@ -44,11 +44,10 @@ p.onexit = function(exitCode, signal) {
processExited = true; processExited = true;
}; };
pipe.onread = function(err, b, off, len) { pipe.onread = function(arrayBuffer) {
assert.ok(processExited); assert.ok(processExited);
if (b) { if (arrayBuffer) {
gotPipeData = true; gotPipeData = true;
console.log('read %d', len);
} else { } else {
gotPipeEOF = true; gotPipeEOF = true;
pipe.close(); pipe.close();

View File

@ -5,7 +5,12 @@ const assert = require('assert');
const { internalBinding } = require('internal/test/binding'); const { internalBinding } = require('internal/test/binding');
const { TCP, constants: TCPConstants } = internalBinding('tcp_wrap'); const { TCP, constants: TCPConstants } = internalBinding('tcp_wrap');
const { WriteWrap } = internalBinding('stream_wrap'); const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
streamBaseState
} = internalBinding('stream_wrap');
const server = new TCP(TCPConstants.SOCKET); const server = new TCP(TCPConstants.SOCKET);
@ -30,8 +35,11 @@ server.onconnection = (err, client) => {
client.readStart(); client.readStart();
client.pendingWrites = []; client.pendingWrites = [];
client.onread = common.mustCall((err, buffer) => { client.onread = common.mustCall((arrayBuffer) => {
if (buffer) { if (arrayBuffer) {
const offset = streamBaseState[kArrayBufferOffset];
const nread = streamBaseState[kReadBytesOrError];
const buffer = Buffer.from(arrayBuffer, offset, nread);
assert.ok(buffer.length > 0); assert.ok(buffer.length > 0);
assert.strictEqual(client.writeQueueSize, 0); assert.strictEqual(client.writeQueueSize, 0);