http2: use native pipe instead of synchronous I/O
This resolves the issue of using synchronous I/O for `respondWithFile()` and `respondWithFD()`, and enables scenarios in which the underlying file does not need to be a regular file. PR-URL: https://github.com/nodejs/node/pull/18936 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
parent
67f1d76956
commit
1eb6b01fca
@ -4,9 +4,13 @@
|
||||
|
||||
require('internal/util').assertCrypto();
|
||||
|
||||
const { internalBinding } = require('internal/bootstrap_loaders');
|
||||
const { async_id_symbol } = require('internal/async_hooks').symbols;
|
||||
const { UV_EOF } = process.binding('uv');
|
||||
const http = require('http');
|
||||
const binding = process.binding('http2');
|
||||
const { FileHandle } = process.binding('fs');
|
||||
const { StreamPipe } = internalBinding('stream_pipe');
|
||||
const assert = require('assert');
|
||||
const { Buffer } = require('buffer');
|
||||
const EventEmitter = require('events');
|
||||
@ -65,6 +69,7 @@ const { onServerStream,
|
||||
const { utcDate } = require('internal/http');
|
||||
const { promisify } = require('internal/util');
|
||||
const { isArrayBufferView } = require('internal/util/types');
|
||||
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
|
||||
const { _connectionListener: httpConnectionListener } = require('http');
|
||||
const { createPromise, promiseResolve } = process.binding('util');
|
||||
const debug = util.debuglog('http2');
|
||||
@ -345,9 +350,7 @@ function onStreamClose(code) {
|
||||
stream.end();
|
||||
}
|
||||
|
||||
if (state.fd !== undefined)
|
||||
tryClose(state.fd);
|
||||
|
||||
state.fd = -1;
|
||||
// Defer destroy we actually emit end.
|
||||
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
|
||||
// If errored or ended, we can destroy immediately.
|
||||
@ -1928,6 +1931,26 @@ function processHeaders(headers) {
|
||||
return headers;
|
||||
}
|
||||
|
||||
function onFileCloseError(stream, err) {
|
||||
stream.emit(err);
|
||||
}
|
||||
|
||||
function onFileUnpipe() {
|
||||
const stream = this.sink[kOwner];
|
||||
if (stream.ownsFd)
|
||||
this.source.close().catch(onFileCloseError.bind(stream));
|
||||
else
|
||||
this.source.releaseFD();
|
||||
}
|
||||
|
||||
// This is only called once the pipe has returned back control, so
|
||||
// it only has to handle errors and End-of-File.
|
||||
function onPipedFileHandleRead(err) {
|
||||
if (err < 0 && err !== UV_EOF) {
|
||||
this.stream.close(NGHTTP2_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
|
||||
streamOptions = 0) {
|
||||
const state = self[kState];
|
||||
@ -1940,18 +1963,32 @@ function processRespondWithFD(self, fd, headers, offset = 0, length = -1,
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// Close the writable side of the stream
|
||||
// Close the writable side of the stream, but only as far as the writable
|
||||
// stream implementation is concerned.
|
||||
self._final = null;
|
||||
self.end();
|
||||
|
||||
const ret = self[kHandle].respondFD(fd, headersList,
|
||||
offset, length,
|
||||
streamOptions);
|
||||
const ret = self[kHandle].respond(headersList, streamOptions);
|
||||
|
||||
if (ret < 0) {
|
||||
self.destroy(new NghttpError(ret));
|
||||
return;
|
||||
}
|
||||
|
||||
defaultTriggerAsyncIdScope(self[async_id_symbol], startFilePipe,
|
||||
self, fd, offset, length);
|
||||
}
|
||||
|
||||
function startFilePipe(self, fd, offset, length) {
|
||||
const handle = new FileHandle(fd, offset, length);
|
||||
handle.onread = onPipedFileHandleRead;
|
||||
handle.stream = self;
|
||||
|
||||
const pipe = new StreamPipe(handle._externalStream,
|
||||
self[kHandle]._externalStream);
|
||||
pipe.onunpipe = onFileUnpipe;
|
||||
pipe.start();
|
||||
|
||||
// exact length of the file doesn't matter here, since the
|
||||
// stream is closing anyway - just use 1 to signify that
|
||||
// a write does exist
|
||||
@ -2270,8 +2307,9 @@ class ServerHttp2Stream extends Http2Stream {
|
||||
throw new ERR_INVALID_ARG_TYPE('fd', 'number');
|
||||
|
||||
debug(`Http2Stream ${this[kID]} [Http2Session ` +
|
||||
`${sessionName(session[kType])}]: initiating response`);
|
||||
`${sessionName(session[kType])}]: initiating response from fd`);
|
||||
this[kUpdateTimer]();
|
||||
this.ownsFd = false;
|
||||
|
||||
headers = processHeaders(headers);
|
||||
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
|
||||
@ -2333,9 +2371,9 @@ class ServerHttp2Stream extends Http2Stream {
|
||||
|
||||
const session = this[kSession];
|
||||
debug(`Http2Stream ${this[kID]} [Http2Session ` +
|
||||
`${sessionName(session[kType])}]: initiating response`);
|
||||
`${sessionName(session[kType])}]: initiating response from file`);
|
||||
this[kUpdateTimer]();
|
||||
|
||||
this.ownsFd = true;
|
||||
|
||||
headers = processHeaders(headers);
|
||||
const statusCode = headers[HTTP2_HEADER_STATUS] |= 0;
|
||||
|
@ -1888,28 +1888,6 @@ inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
|
||||
}
|
||||
|
||||
|
||||
// Initiate a response that contains data read from a file descriptor.
|
||||
inline int Http2Stream::SubmitFile(int fd,
|
||||
nghttp2_nv* nva, size_t len,
|
||||
int64_t offset,
|
||||
int64_t length,
|
||||
int options) {
|
||||
CHECK(!this->IsDestroyed());
|
||||
Http2Scope h2scope(this);
|
||||
DEBUG_HTTP2STREAM(this, "submitting file");
|
||||
if (options & STREAM_OPTION_GET_TRAILERS)
|
||||
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
|
||||
|
||||
if (offset > 0) fd_offset_ = offset;
|
||||
if (length > -1) fd_length_ = length;
|
||||
|
||||
Http2Stream::Provider::FD prov(this, options, fd);
|
||||
int ret = nghttp2_submit_response(session_->session(), id_, nva, len, *prov);
|
||||
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
// Submit informational headers for a stream.
|
||||
inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
|
||||
CHECK(!this->IsDestroyed());
|
||||
@ -2085,87 +2063,6 @@ Http2Stream::Provider::~Provider() {
|
||||
provider_.source.ptr = nullptr;
|
||||
}
|
||||
|
||||
// The FD Provider pulls data from a file descriptor using libuv. All of the
|
||||
// data transfer occurs in C++, without any chunks being passed through JS
|
||||
// land.
|
||||
Http2Stream::Provider::FD::FD(Http2Stream* stream, int options, int fd)
|
||||
: Http2Stream::Provider(stream, options) {
|
||||
CHECK(!stream->IsDestroyed());
|
||||
provider_.source.fd = fd;
|
||||
provider_.read_callback = Http2Stream::Provider::FD::OnRead;
|
||||
}
|
||||
|
||||
Http2Stream::Provider::FD::FD(int options, int fd)
|
||||
: Http2Stream::Provider(options) {
|
||||
provider_.source.fd = fd;
|
||||
provider_.read_callback = Http2Stream::Provider::FD::OnRead;
|
||||
}
|
||||
|
||||
ssize_t Http2Stream::Provider::FD::OnRead(nghttp2_session* handle,
|
||||
int32_t id,
|
||||
uint8_t* buf,
|
||||
size_t length,
|
||||
uint32_t* flags,
|
||||
nghttp2_data_source* source,
|
||||
void* user_data) {
|
||||
Http2Session* session = static_cast<Http2Session*>(user_data);
|
||||
Http2Stream* stream = session->FindStream(id);
|
||||
if (stream->statistics_.first_byte_sent == 0)
|
||||
stream->statistics_.first_byte_sent = uv_hrtime();
|
||||
|
||||
DEBUG_HTTP2SESSION2(session, "reading outbound file data for stream %d", id);
|
||||
CHECK_EQ(id, stream->id());
|
||||
|
||||
int fd = source->fd;
|
||||
int64_t offset = stream->fd_offset_;
|
||||
ssize_t numchars = 0;
|
||||
|
||||
if (stream->fd_length_ >= 0 &&
|
||||
stream->fd_length_ < static_cast<int64_t>(length))
|
||||
length = stream->fd_length_;
|
||||
|
||||
uv_buf_t data;
|
||||
data.base = reinterpret_cast<char*>(buf);
|
||||
data.len = length;
|
||||
|
||||
uv_fs_t read_req;
|
||||
|
||||
if (length > 0) {
|
||||
// TODO(addaleax): Never use synchronous I/O on the main thread.
|
||||
numchars = uv_fs_read(session->event_loop(),
|
||||
&read_req,
|
||||
fd, &data, 1,
|
||||
offset, nullptr);
|
||||
uv_fs_req_cleanup(&read_req);
|
||||
}
|
||||
|
||||
// Close the stream with an error if reading fails
|
||||
if (numchars < 0)
|
||||
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
|
||||
|
||||
// Update the read offset for the next read
|
||||
stream->fd_offset_ += numchars;
|
||||
stream->fd_length_ -= numchars;
|
||||
|
||||
DEBUG_HTTP2SESSION2(session, "sending %d bytes", numchars);
|
||||
|
||||
// if numchars < length, assume that we are done.
|
||||
if (static_cast<size_t>(numchars) < length || length <= 0) {
|
||||
DEBUG_HTTP2SESSION2(session, "no more data for stream %d", id);
|
||||
*flags |= NGHTTP2_DATA_FLAG_EOF;
|
||||
session->GetTrailers(stream, flags);
|
||||
// If the stream or session gets destroyed during the GetTrailers
|
||||
// callback, check that here and close down the stream
|
||||
if (stream->IsDestroyed())
|
||||
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
|
||||
if (session->IsDestroyed())
|
||||
return NGHTTP2_ERR_CALLBACK_FAILURE;
|
||||
}
|
||||
|
||||
stream->statistics_.sent_bytes += numchars;
|
||||
return numchars;
|
||||
}
|
||||
|
||||
// The Stream Provider pulls data from a linked list of uv_buf_t structs
|
||||
// built via the StreamBase API and the Streams js API.
|
||||
Http2Stream::Provider::Stream::Stream(int options)
|
||||
@ -2508,27 +2405,6 @@ void Http2Stream::Respond(const FunctionCallbackInfo<Value>& args) {
|
||||
DEBUG_HTTP2STREAM(stream, "response submitted");
|
||||
}
|
||||
|
||||
// Initiates a response on the Http2Stream using a file descriptor to provide
|
||||
// outbound DATA frames.
|
||||
void Http2Stream::RespondFD(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args);
|
||||
Local<Context> context = env->context();
|
||||
Isolate* isolate = env->isolate();
|
||||
Http2Stream* stream;
|
||||
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
|
||||
|
||||
int fd = args[0]->Int32Value(context).ToChecked();
|
||||
Local<Array> headers = args[1].As<Array>();
|
||||
|
||||
int64_t offset = args[2]->IntegerValue(context).ToChecked();
|
||||
int64_t length = args[3]->IntegerValue(context).ToChecked();
|
||||
int options = args[4]->IntegerValue(context).ToChecked();
|
||||
|
||||
Headers list(isolate, context, headers);
|
||||
args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(),
|
||||
offset, length, options));
|
||||
DEBUG_HTTP2STREAM2(stream, "file response submitted for fd %d", fd);
|
||||
}
|
||||
|
||||
// Submits informational headers on the Http2Stream
|
||||
void Http2Stream::Info(const FunctionCallbackInfo<Value>& args) {
|
||||
@ -2891,7 +2767,6 @@ void Initialize(Local<Object> target,
|
||||
env->SetProtoMethod(stream, "priority", Http2Stream::Priority);
|
||||
env->SetProtoMethod(stream, "pushPromise", Http2Stream::PushPromise);
|
||||
env->SetProtoMethod(stream, "info", Http2Stream::Info);
|
||||
env->SetProtoMethod(stream, "respondFD", Http2Stream::RespondFD);
|
||||
env->SetProtoMethod(stream, "respond", Http2Stream::Respond);
|
||||
env->SetProtoMethod(stream, "rstStream", Http2Stream::RstStream);
|
||||
env->SetProtoMethod(stream, "refreshState", Http2Stream::RefreshState);
|
||||
|
@ -580,13 +580,6 @@ class Http2Stream : public AsyncWrap,
|
||||
size_t len,
|
||||
int options);
|
||||
|
||||
// Send data read from a file descriptor as the response on this stream.
|
||||
inline int SubmitFile(int fd,
|
||||
nghttp2_nv* nva, size_t len,
|
||||
int64_t offset,
|
||||
int64_t length,
|
||||
int options);
|
||||
|
||||
// Submit informational headers for this stream
|
||||
inline int SubmitInfo(nghttp2_nv* nva, size_t len);
|
||||
|
||||
@ -709,7 +702,6 @@ class Http2Stream : public AsyncWrap,
|
||||
static void PushPromise(const FunctionCallbackInfo<Value>& args);
|
||||
static void RefreshState(const FunctionCallbackInfo<Value>& args);
|
||||
static void Info(const FunctionCallbackInfo<Value>& args);
|
||||
static void RespondFD(const FunctionCallbackInfo<Value>& args);
|
||||
static void Respond(const FunctionCallbackInfo<Value>& args);
|
||||
static void RstStream(const FunctionCallbackInfo<Value>& args);
|
||||
|
||||
@ -753,8 +745,6 @@ class Http2Stream : public AsyncWrap,
|
||||
// waiting to be written out to the socket.
|
||||
std::queue<nghttp2_stream_write> queue_;
|
||||
size_t available_outbound_length_ = 0;
|
||||
int64_t fd_offset_ = 0;
|
||||
int64_t fd_length_ = -1;
|
||||
|
||||
Http2StreamListener stream_listener_;
|
||||
|
||||
@ -780,20 +770,6 @@ class Http2Stream::Provider {
|
||||
bool empty_ = false;
|
||||
};
|
||||
|
||||
class Http2Stream::Provider::FD : public Http2Stream::Provider {
|
||||
public:
|
||||
FD(int options, int fd);
|
||||
FD(Http2Stream* stream, int options, int fd);
|
||||
|
||||
static ssize_t OnRead(nghttp2_session* session,
|
||||
int32_t id,
|
||||
uint8_t* buf,
|
||||
size_t length,
|
||||
uint32_t* flags,
|
||||
nghttp2_data_source* source,
|
||||
void* user_data);
|
||||
};
|
||||
|
||||
class Http2Stream::Provider::Stream : public Http2Stream::Provider {
|
||||
public:
|
||||
Stream(Http2Stream* stream, int options);
|
||||
|
@ -46,8 +46,8 @@ const tests = specificTests.concat(genericTests);
|
||||
|
||||
let currentError;
|
||||
|
||||
// mock respondFD because we only care about testing error handling
|
||||
Http2Stream.prototype.respondFD = () => currentError.ngError;
|
||||
// mock `respond` because we only care about testing error handling
|
||||
Http2Stream.prototype.respond = () => currentError.ngError;
|
||||
|
||||
const server = http2.createServer();
|
||||
server.on('stream', common.mustCall((stream, headers) => {
|
||||
|
Loading…
x
Reference in New Issue
Block a user