http2: refactor read mechanism

Refactor the read mechanism to completely avoid copying.

Instead of copying individual `DATA` frame contents into buffers,
create `ArrayBuffer` instances for all socket reads and emit
slices of those `ArrayBuffer`s to JS.

PR-URL: https://github.com/nodejs/node/pull/18030
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Anna Henningsen 2018-01-08 03:50:51 +01:00
parent da3078835a
commit 0625627d82
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
7 changed files with 174 additions and 127 deletions

View File

@ -279,7 +279,7 @@ function submitRstStream(code) {
// point, close them. If there is an open fd for file send, close that also.
// At this point the underlying node::http2:Http2Stream handle is no
// longer usable so destroy it also.
function onStreamClose(code, hasData) {
function onStreamClose(code) {
const stream = this[kOwner];
if (stream.destroyed)
return;
@ -287,8 +287,7 @@ function onStreamClose(code, hasData) {
const state = stream[kState];
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: closed with code ${code}` +
` [has data? ${hasData}]`);
`${sessionName(stream[kSession][kType])}]: closed with code ${code}`);
if (!stream.closed) {
// Clear timeout and remove timeout listeners
@ -306,13 +305,14 @@ function onStreamClose(code, hasData) {
if (state.fd !== undefined)
tryClose(state.fd);
stream[kMaybeDestroy](null, code, hasData);
stream.push(null);
stream[kMaybeDestroy](null, code);
}
// Receives a chunk of data for a given stream and forwards it on
// to the Http2Stream Duplex for processing.
function onStreamRead(nread, buf, handle) {
const stream = handle[kOwner];
function onStreamRead(nread, buf) {
const stream = this[kOwner];
if (nread >= 0 && !stream.destroyed) {
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: receiving data chunk ` +
@ -320,7 +320,7 @@ function onStreamRead(nread, buf, handle) {
stream[kUpdateTimer]();
if (!stream.push(buf)) {
if (!stream.destroyed) // we have to check a second time
handle.readStop();
this.readStop();
}
return;
}
@ -1431,13 +1431,8 @@ function streamOnResume() {
}
function streamOnPause() {
// if (!this.destroyed && !this.pending)
// this[kHandle].readStop();
}
function handleFlushData(self) {
if (!this.destroyed && !this.pending)
this[kHandle].flushData();
this[kHandle].readStop();
}
// If the writable side of the Http2Stream is still open, emit the
@ -1686,11 +1681,10 @@ class Http2Stream extends Duplex {
this.push(null);
return;
}
const flushfn = handleFlushData.bind(this);
if (!this.pending) {
flushfn();
streamOnResume.call(this);
} else {
this.once('ready', flushfn);
this.once('ready', streamOnResume);
}
}
@ -1831,10 +1825,10 @@ class Http2Stream extends Duplex {
// The Http2Stream can be destroyed if it has closed and if the readable
// side has received the final chunk.
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR, hasData = true) {
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
if (error == null) {
if (code === NGHTTP2_NO_ERROR &&
((!this._readableState.ended && hasData) ||
(!this._readableState.ended ||
!this._writableState.ended ||
this._writableState.pendingcb > 0 ||
!this.closed)) {

View File

@ -9,6 +9,7 @@
namespace node {
using v8::ArrayBuffer;
using v8::Boolean;
using v8::Context;
using v8::Float64Array;
@ -978,7 +979,6 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
// Intentionally ignore the callback if the stream does not exist or has
// already been destroyed
if (stream != nullptr && !stream->IsDestroyed()) {
stream->AddChunk(nullptr, 0);
stream->Close(code);
// It is possible for the stream close to occur before the stream is
// ever passed on to the javascript side. If that happens, skip straight
@ -989,9 +989,8 @@ inline int Http2Session::OnStreamClose(nghttp2_session* handle,
stream->object()->Get(context, env->onstreamclose_string())
.ToLocalChecked();
if (fn->IsFunction()) {
Local<Value> argv[2] = {
Integer::NewFromUnsigned(isolate, code),
Boolean::New(isolate, stream->HasDataChunks(true))
Local<Value> argv[] = {
Integer::NewFromUnsigned(isolate, code)
};
stream->MakeCallback(fn.As<Function>(), arraysize(argv), argv);
} else {
@ -1028,6 +1027,8 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
Http2Session* session = static_cast<Http2Session*>(user_data);
DEBUG_HTTP2SESSION2(session, "buffering data chunk for stream %d, size: "
"%d, flags: %d", id, len, flags);
Environment* env = session->env();
HandleScope scope(env->isolate());
// We should never actually get a 0-length chunk so this check is
// only a precaution at this point.
if (len > 0) {
@ -1039,8 +1040,25 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
// If the stream has been destroyed, ignore this chunk
if (stream->IsDestroyed())
return 0;
stream->statistics_.received_bytes += len;
stream->AddChunk(data, len);
// There is a single large array buffer for the entire data read from the
// network; create a slice of that array buffer and emit it as the
// received data buffer.
CHECK(!session->stream_buf_ab_.IsEmpty());
size_t offset = reinterpret_cast<const char*>(data) - session->stream_buf_;
// Verify that the data offset is inside the current read buffer.
CHECK_LE(offset, session->stream_buf_size_);
Local<Object> buf =
Buffer::New(env, session->stream_buf_ab_, offset, len).ToLocalChecked();
stream->EmitData(len, buf, Local<Object>());
if (!stream->IsReading())
stream->inbound_consumed_data_while_paused_ += len;
else
nghttp2_session_consume_stream(handle, id, len);
}
return 0;
}
@ -1226,9 +1244,8 @@ inline void Http2Session::HandlePriorityFrame(const nghttp2_frame* frame) {
// Called by OnFrameReceived when a complete DATA frame has been received.
// If we know that this is the last DATA frame (because the END_STREAM flag
// is set), then we'll terminate the readable side of the StreamBase. If
// the StreamBase is flowing, we'll push the chunks of data out to JS land.
// If we know that this was the last DATA frame (because the END_STREAM flag
// is set), then we'll terminate the readable side of the StreamBase.
inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
int32_t id = GetFrameID(frame);
DEBUG_HTTP2SESSION2(this, "handling data frame for stream %d", id);
@ -1239,11 +1256,8 @@ inline void Http2Session::HandleDataFrame(const nghttp2_frame* frame) {
return;
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
stream->AddChunk(nullptr, 0);
stream->EmitData(UV_EOF, Local<Object>(), Local<Object>());
}
if (stream->IsReading())
stream->FlushDataChunks();
}
@ -1618,45 +1632,67 @@ void Http2Session::OnStreamAllocImpl(size_t suggested_size,
uv_buf_t* buf,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
buf->base = session->stream_alloc();
buf->len = kAllocBufferSize;
CHECK_EQ(session->stream_buf_, nullptr);
CHECK_EQ(session->stream_buf_size_, 0);
buf->base = session->stream_buf_ = Malloc(suggested_size);
buf->len = session->stream_buf_size_ = suggested_size;
session->IncrementCurrentSessionMemory(suggested_size);
}
// Callback used to receive inbound data from the i/o stream
void Http2Session::OnStreamReadImpl(ssize_t nread,
const uv_buf_t* bufs,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
Http2Scope h2scope(session);
CHECK_NE(session->stream_, nullptr);
DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread);
if (nread < 0) {
uv_buf_t tmp_buf;
tmp_buf.base = nullptr;
tmp_buf.len = 0;
session->prev_read_cb_.fn(nread,
&tmp_buf,
pending,
session->prev_read_cb_.ctx);
return;
}
if (bufs->len > 0) {
if (nread <= 0) {
free(session->stream_buf_);
if (nread < 0) {
uv_buf_t tmp_buf = uv_buf_init(nullptr, 0);
session->prev_read_cb_.fn(nread,
&tmp_buf,
pending,
session->prev_read_cb_.ctx);
}
} else {
// Only pass data on if nread > 0
uv_buf_t buf[] { uv_buf_init((*bufs).base, nread) };
// Verify that currently: There is memory allocated into which
// the data has been read, and that memory buffer is at least as large
// as the amount of data we have read, but we have not yet made an
// ArrayBuffer out of it.
CHECK_NE(session->stream_buf_, nullptr);
CHECK_EQ(session->stream_buf_, buf->base);
CHECK_EQ(session->stream_buf_size_, buf->len);
CHECK_GE(session->stream_buf_size_, static_cast<size_t>(nread));
CHECK(session->stream_buf_ab_.IsEmpty());
Environment* env = session->env();
Isolate* isolate = env->isolate();
HandleScope scope(isolate);
Local<Context> context = env->context();
Context::Scope context_scope(context);
// Create an array buffer for the read data. DATA frames will be emitted
// as slices of this array buffer to avoid having to copy memory.
session->stream_buf_ab_ =
ArrayBuffer::New(isolate,
session->stream_buf_,
session->stream_buf_size_,
v8::ArrayBufferCreationMode::kInternalized);
uv_buf_t buf_ = uv_buf_init(buf->base, nread);
session->statistics_.data_received += nread;
ssize_t ret = session->Write(buf, 1);
ssize_t ret = session->Write(&buf_, 1);
// Note: if ssize_t is not defined (e.g. on Win32), nghttp2 will typedef
// ssize_t to int. Cast here so that the < 0 check actually works on
// Windows.
if (static_cast<int>(ret) < 0) {
DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret);
Environment* env = session->env();
Isolate* isolate = env->isolate();
HandleScope scope(isolate);
Local<Context> context = env->context();
Context::Scope context_scope(context);
Local<Value> argv[1] = {
Integer::New(isolate, ret),
@ -1667,6 +1703,13 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
nghttp2_session_want_read(**session));
}
}
// Since we are finished handling this write, reset the stream buffer.
// The memory has either been free()d or was handed over to V8.
session->DecrementCurrentSessionMemory(session->stream_buf_size_);
session->stream_buf_ = nullptr;
session->stream_buf_size_ = 0;
session->stream_buf_ab_ = Local<ArrayBuffer>();
}
void Http2Session::OnStreamDestructImpl(void* ctx) {
@ -1781,30 +1824,6 @@ void Http2Stream::OnTrailers(const SubmitTrailers& submit_trailers) {
}
}
inline bool Http2Stream::HasDataChunks(bool ignore_eos) {
return data_chunks_.size() > (ignore_eos ? 1 : 0);
}
// Appends a chunk of received DATA frame data to this Http2Streams internal
// queue. Note that we must memcpy each chunk because of the way that nghttp2
// handles it's internal memory`.
inline void Http2Stream::AddChunk(const uint8_t* data, size_t len) {
CHECK(!this->IsDestroyed());
if (this->statistics_.first_byte == 0)
this->statistics_.first_byte = uv_hrtime();
if (flags_ & NGHTTP2_STREAM_FLAG_EOS)
return;
char* buf = nullptr;
if (len > 0 && data != nullptr) {
buf = Malloc<char>(len);
memcpy(buf, data, len);
} else if (data == nullptr) {
flags_ |= NGHTTP2_STREAM_FLAG_EOS;
}
data_chunks_.emplace(uv_buf_init(buf, len));
}
inline void Http2Stream::Close(int32_t code) {
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
@ -1841,13 +1860,6 @@ inline void Http2Stream::Destroy() {
DEBUG_HTTP2STREAM(this, "destroying stream");
// Free any remaining incoming data chunks.
while (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
free(buf.base);
data_chunks_.pop();
}
// Wait until the start of the next loop to delete because there
// may still be some pending operations queued for this stream.
env()->SetImmediate([](Environment* env, void* data) {
@ -1873,39 +1885,6 @@ inline void Http2Stream::Destroy() {
}
// Uses the StreamBase API to push a single chunk of queued inbound DATA
// to JS land.
void Http2Stream::OnDataChunk(uv_buf_t* chunk) {
CHECK(!this->IsDestroyed());
Isolate* isolate = env()->isolate();
HandleScope scope(isolate);
ssize_t len = -1;
Local<Object> buf;
if (chunk != nullptr) {
len = chunk->len;
buf = Buffer::New(isolate, chunk->base, len).ToLocalChecked();
}
EmitData(len, buf, this->object());
}
inline void Http2Stream::FlushDataChunks() {
CHECK(!this->IsDestroyed());
Http2Scope h2scope(this);
if (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
data_chunks_.pop();
if (buf.len > 0) {
CHECK_EQ(nghttp2_session_consume_stream(session_->session(),
id_, buf.len), 0);
OnDataChunk(&buf);
} else {
OnDataChunk(nullptr);
}
}
}
// Initiates a response on the Http2Stream using data provided via the
// StreamBase Streams API.
inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
@ -2012,13 +1991,20 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
// Switch the StreamBase into flowing mode to begin pushing chunks of data
// out to JS land.
inline int Http2Stream::ReadStart() {
Http2Scope h2scope(this);
CHECK(!this->IsDestroyed());
flags_ |= NGHTTP2_STREAM_FLAG_READ_START;
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
// Flush any queued data chunks immediately out to the JS layer
FlushDataChunks();
DEBUG_HTTP2STREAM(this, "reading starting");
// Tell nghttp2 about our consumption of the data that was handed
// off to JS land.
nghttp2_session_consume_stream(session_->session(),
id_,
inbound_consumed_data_while_paused_);
inbound_consumed_data_while_paused_ = 0;
return 0;
}

View File

@ -550,12 +550,6 @@ class Http2Stream : public AsyncWrap,
inline void EmitStatistics();
inline bool HasDataChunks(bool ignore_eos = false);
inline void AddChunk(const uint8_t* data, size_t len);
inline void FlushDataChunks();
// Process a Data Chunk
void OnDataChunk(uv_buf_t* chunk);
@ -740,8 +734,11 @@ class Http2Stream : public AsyncWrap,
uint32_t current_headers_length_ = 0; // total number of octets
std::vector<nghttp2_header> current_headers_;
// Inbound Data... This is the data received via DATA frames for this stream.
std::queue<uv_buf_t> data_chunks_;
// This keeps track of the amount of data read from the socket while the
// socket was in paused mode. When `ReadStart()` is called (and not before
// then), we tell nghttp2 that we consumed that data to get proper
// backpressure handling.
size_t inbound_consumed_data_while_paused_ = 0;
// Outbound Data... This is the data written by the JS layer that is
// waiting to be written out to the socket.
@ -1085,8 +1082,9 @@ class Http2Session : public AsyncWrap {
// use this to allow timeout tracking during long-lasting writes
uint32_t chunks_sent_since_last_write_ = 0;
uv_prepare_t* prep_ = nullptr;
char stream_buf_[kAllocBufferSize];
char* stream_buf_ = nullptr;
size_t stream_buf_size_ = 0;
v8::Local<v8::ArrayBuffer> stream_buf_ab_;
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
std::queue<Http2Ping*> outstanding_pings_;

View File

@ -268,6 +268,17 @@ fail.
If `fn` is not provided, an empty function will be used.
### mustCallAsync([fn][, exact])
* `fn` [&lt;Function>]
* `exact` [&lt;Number>] default = 1
* return [&lt;Function>]
The same as `mustCall()`, except that it is also checked that the Promise
returned by the function is fulfilled for each invocation of the function.
The return value of the wrapped function is the return value of the original
function, if necessary wrapped as a promise.
### mustCallAtLeast([fn][, minimum])
* `fn` [&lt;Function>] default = () => {}
* `minimum` [&lt;Number>] default = 1

View File

@ -501,6 +501,12 @@ exports.mustCallAtLeast = function(fn, minimum) {
return _mustCallInner(fn, minimum, 'minimum');
};
exports.mustCallAsync = function(fn, exact) {
return exports.mustCall((...args) => {
return Promise.resolve(fn(...args)).then(exports.mustCall((val) => val));
}, exact);
};
function _mustCallInner(fn, criteria = 1, field) {
if (process._exiting)
throw new Error('Cannot use common.mustCall*() in process exit handler');

View File

@ -0,0 +1,49 @@
'use strict';
// Verifies that a full HTTP2 pipeline handles backpressure.
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');
const makeDuplexPair = require('../common/duplexpair');
common.crashOnUnhandledRejection();
{
let req;
const server = http2.createServer();
server.on('stream', common.mustCallAsync(async (stream, headers) => {
stream.respond({
'content-type': 'text/html',
':status': 200
});
req._readableState.highWaterMark = 20;
stream._writableState.highWaterMark = 20;
assert.strictEqual(stream.write('A'.repeat(5)), true);
assert.strictEqual(stream.write('A'.repeat(40)), false);
assert.strictEqual(await event(req, 'data'), 'A'.repeat(5));
assert.strictEqual(await event(req, 'data'), 'A'.repeat(40));
await event(stream, 'drain');
assert.strictEqual(stream.write('A'.repeat(5)), true);
assert.strictEqual(stream.write('A'.repeat(40)), false);
}));
const { clientSide, serverSide } = makeDuplexPair();
server.emit('connection', serverSide);
const client = http2.connect('http://localhost:80', {
createConnection: common.mustCall(() => clientSide)
});
req = client.request({ ':path': '/' });
req.setEncoding('utf8');
req.end();
}
function event(ee, eventName) {
return new Promise((resolve) => {
ee.once(eventName, common.mustCall(resolve));
});
}

View File

@ -56,6 +56,9 @@ let client;
const server = h2.createServer({ settings: { initialWindowSize: 36 } });
server.on('stream', (stream) => {
// Set the high water mark to zero, since otherwise we still accept
// reads from the source stream (if we can consume them).
stream._readableState.highWaterMark = 0;
stream.pause();
stream.on('error', common.expectsError({
code: 'ERR_HTTP2_STREAM_ERROR',