http2: add range support for respondWith{File|FD}

* respondWithFD now supports optional statCheck
* respondWithFD and respondWithFile both support offset/length for
  range requests
* Fix linting nits following most recent update

PR-URL: https://github.com/nodejs/node/pull/14239
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
James M Snell 2017-07-22 09:20:53 -07:00
parent 953458f645
commit d6a774b1bd
33 changed files with 357 additions and 79 deletions

View File

@ -998,13 +998,17 @@ server.on('stream', (stream) => {
}); });
``` ```
#### http2stream.respondWithFD(fd[, headers]) #### http2stream.respondWithFD(fd[, headers[, options]])
<!-- YAML <!-- YAML
added: REPLACEME added: REPLACEME
--> -->
* `fd` {number} A readable file descriptor * `fd` {number} A readable file descriptor
* `headers` {[Headers Object][]} * `headers` {[Headers Object][]}
* `options` {Object}
* `statCheck` {Function}
* `offset` {number} The offset position at which to begin reading
* `length` {number} The amount of data from the fd to send
Initiates a response whose data is read from the given file descriptor. No Initiates a response whose data is read from the given file descriptor. No
validation is performed on the given file descriptor. If an error occurs while validation is performed on the given file descriptor. If an error occurs while
@ -1034,6 +1038,16 @@ server.on('stream', (stream) => {
server.on('close', () => fs.closeSync(fd)); server.on('close', () => fs.closeSync(fd));
``` ```
The optional `options.statCheck` function may be specified to give user code
an opportunity to set additional content headers based on the `fs.Stat` details
of the given fd. If the `statCheck` function is provided, the
`http2stream.respondWithFD()` method will perform an `fs.fstat()` call to
collect details on the provided file descriptor.
The `offset` and `length` options may be used to limit the response to a
specific range subset. This can be used, for instance, to support HTTP Range
requests.
#### http2stream.respondWithFile(path[, headers[, options]]) #### http2stream.respondWithFile(path[, headers[, options]])
<!-- YAML <!-- YAML
added: REPLACEME added: REPLACEME
@ -1043,6 +1057,8 @@ added: REPLACEME
* `headers` {[Headers Object][]} * `headers` {[Headers Object][]}
* `options` {Object} * `options` {Object}
* `statCheck` {Function} * `statCheck` {Function}
* `offset` {number} The offset position at which to begin reading
* `length` {number} The amount of data from the fd to send
Sends a regular file as the response. The `path` must specify a regular file Sends a regular file as the response. The `path` must specify a regular file
or an `'error'` event will be emitted on the `Http2Stream` object. or an `'error'` event will be emitted on the `Http2Stream` object.
@ -1096,6 +1112,10 @@ server.on('stream', (stream) => {
The `content-length` header field will be automatically set. The `content-length` header field will be automatically set.
The `offset` and `length` options may be used to limit the response to a
specific range subset. This can be used, for instance, to support HTTP Range
requests.
### Class: Http2Server ### Class: Http2Server
<!-- YAML <!-- YAML
added: REPLACEME added: REPLACEME

View File

@ -1541,7 +1541,7 @@ function processHeaders(headers) {
return headers; return headers;
} }
function processRespondWithFD(fd, headers) { function processRespondWithFD(fd, headers, offset = 0, length = -1) {
const session = this[kSession]; const session = this[kSession];
const state = this[kState]; const state = this[kState];
state.headersSent = true; state.headersSent = true;
@ -1551,7 +1551,7 @@ function processRespondWithFD(fd, headers) {
const handle = session[kHandle]; const handle = session[kHandle];
const ret = const ret =
handle.submitFile(this[kID], fd, headers); handle.submitFile(this[kID], fd, headers, offset, length);
let err; let err;
switch (ret) { switch (ret) {
case NGHTTP2_ERR_NOMEM: case NGHTTP2_ERR_NOMEM:
@ -1575,26 +1575,71 @@ function doSendFD(session, options, fd, headers, err, stat) {
process.nextTick(() => this.emit('error', err)); process.nextTick(() => this.emit('error', err));
return; return;
} }
if (!stat.isFile()) {
err = new errors.Error('ERR_HTTP2_SEND_FILE');
process.nextTick(() => this.emit('error', err));
return;
}
// Set the content-length by default const statOptions = {
headers[HTTP2_HEADER_CONTENT_LENGTH] = stat.size; offset: options.offset !== undefined ? options.offset : 0,
length: options.length !== undefined ? options.length : -1
};
if (typeof options.statCheck === 'function' && if (typeof options.statCheck === 'function' &&
options.statCheck.call(this, stat, headers) === false) { options.statCheck.call(this, stat, headers, statOptions) === false) {
return; return;
} }
const headersList = mapToHeaders(headers, const headersList = mapToHeaders(headers,
assertValidPseudoHeaderResponse); assertValidPseudoHeaderResponse);
if (!Array.isArray(headersList)) { if (!Array.isArray(headersList)) {
throw headersList; process.nextTick(() => this.emit('error', headersList));
} }
processRespondWithFD.call(this, fd, headersList); processRespondWithFD.call(this, fd, headersList,
statOptions.offset,
statOptions.length);
}
function doSendFileFD(session, options, fd, headers, err, stat) {
if (this.destroyed || session.destroyed) {
abort(this);
return;
}
if (err) {
process.nextTick(() => this.emit('error', err));
return;
}
if (!stat.isFile()) {
err = new errors.Error('ERR_HTTP2_SEND_FILE');
process.nextTick(() => this.emit('error', err));
return;
}
const statOptions = {
offset: options.offset !== undefined ? options.offset : 0,
length: options.length !== undefined ? options.length : -1
};
// Set the content-length by default
if (typeof options.statCheck === 'function' &&
options.statCheck.call(this, stat, headers) === false) {
return;
}
statOptions.length =
statOptions.length < 0 ? stat.size - (+statOptions.offset) :
Math.min(stat.size - (+statOptions.offset),
statOptions.length);
if (headers[HTTP2_HEADER_CONTENT_LENGTH] === undefined)
headers[HTTP2_HEADER_CONTENT_LENGTH] = statOptions.length;
const headersList = mapToHeaders(headers,
assertValidPseudoHeaderResponse);
if (!Array.isArray(headersList)) {
process.nextTick(() => this.emit('error', headersList));
}
processRespondWithFD.call(this, fd, headersList,
options.offset,
options.length);
} }
function afterOpen(session, options, headers, err, fd) { function afterOpen(session, options, headers, err, fd) {
@ -1609,7 +1654,7 @@ function afterOpen(session, options, headers, err, fd) {
} }
state.fd = fd; state.fd = fd;
fs.fstat(fd, doSendFD.bind(this, session, options, fd, headers)); fs.fstat(fd, doSendFileFD.bind(this, session, options, fd, headers));
} }
@ -1786,12 +1831,12 @@ class ServerHttp2Stream extends Http2Stream {
} }
// Initiate a response using an open FD. Note that there are fewer // Initiate a response using an open FD. Note that there are fewer
// protections with this approach. For one, the fd is not validated. // protections with this approach. For one, the fd is not validated by
// In respondWithFile, the file is checked to make sure it is a // default. In respondWithFile, the file is checked to make sure it is a
// regular file, here the fd is passed directly. If the underlying // regular file, here the fd is passed directly. If the underlying
// mechanism is not able to read from the fd, then the stream will be // mechanism is not able to read from the fd, then the stream will be
// reset with an error code. // reset with an error code.
respondWithFD(fd, headers) { respondWithFD(fd, headers, options) {
const session = this[kSession]; const session = this[kSession];
if (this.destroyed) if (this.destroyed)
throw new errors.Error('ERR_HTTP2_INVALID_STREAM'); throw new errors.Error('ERR_HTTP2_INVALID_STREAM');
@ -1803,6 +1848,26 @@ class ServerHttp2Stream extends Http2Stream {
if (state.headersSent) if (state.headersSent)
throw new errors.Error('ERR_HTTP2_HEADERS_SENT'); throw new errors.Error('ERR_HTTP2_HEADERS_SENT');
assertIsObject(options, 'options');
options = Object.assign(Object.create(null), options);
if (options.offset !== undefined && typeof options.offset !== 'number')
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'offset',
options.offset);
if (options.length !== undefined && typeof options.length !== 'number')
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'length',
options.length);
if (options.statCheck !== undefined &&
typeof options.statCheck !== 'function') {
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'statCheck',
options.statCheck);
}
if (typeof fd !== 'number') if (typeof fd !== 'number')
throw new errors.TypeError('ERR_INVALID_ARG_TYPE', throw new errors.TypeError('ERR_INVALID_ARG_TYPE',
'fd', 'number'); 'fd', 'number');
@ -1816,13 +1881,20 @@ class ServerHttp2Stream extends Http2Stream {
throw new errors.Error('ERR_HTTP2_PAYLOAD_FORBIDDEN', statusCode); throw new errors.Error('ERR_HTTP2_PAYLOAD_FORBIDDEN', statusCode);
} }
if (options.statCheck !== undefined) {
fs.fstat(fd, doSendFD.bind(this, session, options, fd, headers));
return;
}
const headersList = mapToHeaders(headers, const headersList = mapToHeaders(headers,
assertValidPseudoHeaderResponse); assertValidPseudoHeaderResponse);
if (!Array.isArray(headersList)) { if (!Array.isArray(headersList)) {
throw headersList; process.nextTick(() => this.emit('error', headersList));
} }
processRespondWithFD.call(this, fd, headersList); processRespondWithFD.call(this, fd, headersList,
options.offset,
options.length);
} }
// Initiate a file response on this Http2Stream. The path is passed to // Initiate a file response on this Http2Stream. The path is passed to
@ -1847,6 +1919,16 @@ class ServerHttp2Stream extends Http2Stream {
assertIsObject(options, 'options'); assertIsObject(options, 'options');
options = Object.assign(Object.create(null), options); options = Object.assign(Object.create(null), options);
if (options.offset !== undefined && typeof options.offset !== 'number')
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'offset',
options.offset);
if (options.length !== undefined && typeof options.length !== 'number')
throw new errors.TypeError('ERR_INVALID_OPT_VALUE',
'length',
options.length);
if (options.statCheck !== undefined && if (options.statCheck !== undefined &&
typeof options.statCheck !== 'function') { typeof options.statCheck !== 'function') {
throw new errors.TypeError('ERR_INVALID_OPT_VALUE', throw new errors.TypeError('ERR_INVALID_OPT_VALUE',

View File

@ -605,6 +605,8 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsNumber()); // Stream ID CHECK(args[0]->IsNumber()); // Stream ID
CHECK(args[1]->IsNumber()); // File Descriptor CHECK(args[1]->IsNumber()); // File Descriptor
CHECK(args[2]->IsArray()); // Headers CHECK(args[2]->IsArray()); // Headers
CHECK(args[3]->IsNumber()); // Offset
CHECK(args[4]->IsNumber()); // Length
Http2Session* session; Http2Session* session;
Nghttp2Stream* stream; Nghttp2Stream* stream;
@ -618,6 +620,11 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
int fd = args[1]->Int32Value(context).ToChecked(); int fd = args[1]->Int32Value(context).ToChecked();
Local<Array> headers = args[2].As<Array>(); Local<Array> headers = args[2].As<Array>();
int64_t offset = args[3]->IntegerValue(context).ToChecked();
int64_t length = args[4]->IntegerValue(context).ToChecked();
CHECK_GE(offset, 0);
DEBUG_HTTP2("Http2Session: submitting file %d for stream %d: headers: %d, " DEBUG_HTTP2("Http2Session: submitting file %d for stream %d: headers: %d, "
"end-stream: %d\n", fd, id, headers->Length()); "end-stream: %d\n", fd, id, headers->Length());
@ -627,7 +634,8 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
Headers list(isolate, context, headers); Headers list(isolate, context, headers);
args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length())); args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(),
offset, length));
} }
void Http2Session::SendHeaders(const FunctionCallbackInfo<Value>& args) { void Http2Session::SendHeaders(const FunctionCallbackInfo<Value>& args) {

View File

@ -429,7 +429,10 @@ inline int Nghttp2Stream::SubmitResponse(nghttp2_nv* nva,
} }
// Initiate a response that contains data read from a file descriptor. // Initiate a response that contains data read from a file descriptor.
inline int Nghttp2Stream::SubmitFile(int fd, nghttp2_nv* nva, size_t len) { inline int Nghttp2Stream::SubmitFile(int fd,
nghttp2_nv* nva, size_t len,
int64_t offset,
int64_t length) {
CHECK_GT(len, 0); CHECK_GT(len, 0);
CHECK_GT(fd, 0); CHECK_GT(fd, 0);
DEBUG_HTTP2("Nghttp2Stream %d: submitting file\n", id_); DEBUG_HTTP2("Nghttp2Stream %d: submitting file\n", id_);
@ -438,6 +441,9 @@ inline int Nghttp2Stream::SubmitFile(int fd, nghttp2_nv* nva, size_t len) {
prov.source.fd = fd; prov.source.fd = fd;
prov.read_callback = Nghttp2Session::OnStreamReadFD; prov.read_callback = Nghttp2Session::OnStreamReadFD;
if (offset > 0) fd_offset_ = offset;
if (length > -1) fd_length_ = length;
return nghttp2_submit_response(session_->session(), id_, return nghttp2_submit_response(session_->session(), id_,
nva, len, &prov); nva, len, &prov);
} }

View File

@ -180,18 +180,25 @@ ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session* session,
int fd = source->fd; int fd = source->fd;
int64_t offset = stream->fd_offset_; int64_t offset = stream->fd_offset_;
ssize_t numchars; ssize_t numchars = 0;
if (stream->fd_length_ >= 0 &&
stream->fd_length_ < static_cast<int64_t>(length))
length = stream->fd_length_;
uv_buf_t data; uv_buf_t data;
data.base = reinterpret_cast<char*>(buf); data.base = reinterpret_cast<char*>(buf);
data.len = length; data.len = length;
uv_fs_t read_req; uv_fs_t read_req;
if (length > 0) {
numchars = uv_fs_read(handle->loop_, numchars = uv_fs_read(handle->loop_,
&read_req, &read_req,
fd, &data, 1, fd, &data, 1,
offset, nullptr); offset, nullptr);
uv_fs_req_cleanup(&read_req); uv_fs_req_cleanup(&read_req);
}
// Close the stream with an error if reading fails // Close the stream with an error if reading fails
if (numchars < 0) if (numchars < 0)
@ -199,9 +206,10 @@ ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session* session,
// Update the read offset for the next read // Update the read offset for the next read
stream->fd_offset_ += numchars; stream->fd_offset_ += numchars;
stream->fd_length_ -= numchars;
// if numchars < length, assume that we are done. // if numchars < length, assume that we are done.
if (static_cast<size_t>(numchars) < length) { if (static_cast<size_t>(numchars) < length || length <= 0) {
DEBUG_HTTP2("Nghttp2Session %d: no more data for stream %d\n", DEBUG_HTTP2("Nghttp2Session %d: no more data for stream %d\n",
handle->session_type_, id); handle->session_type_, id);
*flags |= NGHTTP2_DATA_FLAG_EOF; *flags |= NGHTTP2_DATA_FLAG_EOF;

View File

@ -310,7 +310,10 @@ class Nghttp2Stream {
bool emptyPayload = false); bool emptyPayload = false);
// Send data read from a file descriptor as the response on this stream. // Send data read from a file descriptor as the response on this stream.
inline int SubmitFile(int fd, nghttp2_nv* nva, size_t len); inline int SubmitFile(int fd,
nghttp2_nv* nva, size_t len,
int64_t offset,
int64_t length);
// Submit informational headers for this stream // Submit informational headers for this stream
inline int SubmitInfo(nghttp2_nv* nva, size_t len); inline int SubmitInfo(nghttp2_nv* nva, size_t len);
@ -420,7 +423,8 @@ class Nghttp2Stream {
nghttp2_stream_write_queue* queue_tail_ = nullptr; nghttp2_stream_write_queue* queue_tail_ = nullptr;
unsigned int queue_head_index_ = 0; unsigned int queue_head_index_ = 0;
size_t queue_head_offset_ = 0; size_t queue_head_offset_ = 0;
size_t fd_offset_ = 0; int64_t fd_offset_ = 0;
int64_t fd_length_ = -1;
// The Current Headers block... As headers are received for this stream, // The Current Headers block... As headers are received for this stream,
// they are temporarily stored here until the OnFrameReceived is called // they are temporarily stored here until the OnFrameReceived is called

View File

@ -0,0 +1,94 @@
// Flags: --expose-http2
'use strict';
// Tests the ability to minimally request a byte range with respondWithFD
const common = require('../common');
const http2 = require('http2');
const assert = require('assert');
const path = require('path');
const fs = require('fs');
const {
HTTP2_HEADER_CONTENT_TYPE,
HTTP2_HEADER_CONTENT_LENGTH
} = http2.constants;
const fname = path.resolve(common.fixturesDir, 'printA.js');
const data = fs.readFileSync(fname);
const fd = fs.openSync(fname, 'r');
// Note: this is not anywhere close to a proper implementation of the range
// header.
function getOffsetLength(range) {
if (range === undefined)
return [0, -1];
const r = /bytes=(\d+)-(\d+)/.exec(range);
return [+r[1], +r[2] - +r[1]];
}
const server = http2.createServer();
server.on('stream', (stream, headers) => {
const [ offset, length ] = getOffsetLength(headers.range);
stream.respondWithFD(fd, {
[HTTP2_HEADER_CONTENT_TYPE]: 'text/plain'
}, {
statCheck: common.mustCall((stat, headers, options) => {
assert.strictEqual(options.length, length);
assert.strictEqual(options.offset, offset);
headers[HTTP2_HEADER_CONTENT_LENGTH] =
Math.min(options.length, stat.size - offset);
}),
offset: offset,
length: length
});
});
server.on('close', common.mustCall(() => fs.closeSync(fd)));
server.listen(0, () => {
const client = http2.connect(`http://localhost:${server.address().port}`);
let remaining = 2;
function maybeClose() {
if (--remaining === 0) {
client.destroy();
server.close();
}
}
{
const req = client.request({ range: 'bytes=8-11' });
req.on('response', common.mustCall((headers) => {
assert.strictEqual(headers[HTTP2_HEADER_CONTENT_TYPE], 'text/plain');
assert.strictEqual(+headers[HTTP2_HEADER_CONTENT_LENGTH], 3);
}));
req.setEncoding('utf8');
let check = '';
req.on('data', (chunk) => check += chunk);
req.on('end', common.mustCall(() => {
assert.strictEqual(check, data.toString('utf8', 8, 11));
}));
req.on('streamClosed', common.mustCall(maybeClose));
req.end();
}
{
const req = client.request({ range: 'bytes=8-28' });
req.on('response', common.mustCall((headers) => {
assert.strictEqual(headers[HTTP2_HEADER_CONTENT_TYPE], 'text/plain');
assert.strictEqual(+headers[HTTP2_HEADER_CONTENT_LENGTH], 9);
}));
req.setEncoding('utf8');
let check = '';
req.on('data', (chunk) => check += chunk);
req.on('end', common.mustCall(() => {
assert.strictEqual(check, data.toString('utf8', 8, 28));
}));
req.on('streamClosed', common.mustCall(maybeClose));
req.end();
}
});

View File

@ -0,0 +1,52 @@
// Flags: --expose-http2
'use strict';
const common = require('../common');
const http2 = require('http2');
const assert = require('assert');
const path = require('path');
const fs = require('fs');
const {
HTTP2_HEADER_CONTENT_TYPE,
HTTP2_HEADER_CONTENT_LENGTH,
HTTP2_HEADER_LAST_MODIFIED
} = http2.constants;
const fname = path.resolve(common.fixturesDir, 'printA.js');
const data = fs.readFileSync(fname);
const stat = fs.statSync(fname);
const server = http2.createServer();
server.on('stream', (stream) => {
stream.respondWithFile(fname, {
[HTTP2_HEADER_CONTENT_TYPE]: 'text/plain'
}, {
statCheck: common.mustCall((stat, headers) => {
headers[HTTP2_HEADER_LAST_MODIFIED] = stat.mtime.toUTCString();
}),
offset: 8,
length: 3
});
});
server.listen(0, () => {
const client = http2.connect(`http://localhost:${server.address().port}`);
const req = client.request();
req.on('response', common.mustCall((headers) => {
assert.strictEqual(headers[HTTP2_HEADER_CONTENT_TYPE], 'text/plain');
assert.strictEqual(+headers[HTTP2_HEADER_CONTENT_LENGTH], 3);
assert.strictEqual(headers[HTTP2_HEADER_LAST_MODIFIED],
stat.mtime.toUTCString());
}));
req.setEncoding('utf8');
let check = '';
req.on('data', (chunk) => check += chunk);
req.on('end', common.mustCall(() => {
assert.strictEqual(check, data.toString('utf8', 8, 11));
client.destroy();
server.close();
}));
req.end();
});

View File

@ -39,8 +39,12 @@ const server = tls.Server(options, common.mustCall((socket) => {
server.listen(0, common.mustCall(() => { server.listen(0, common.mustCall(() => {
const port = server.address().port; const port = server.address().port;
const options = {
rejectUnauthorized: false,
port
};
const client = const client =
tls.connect({rejectUnauthorized: false, port: port}, common.mustCall(() => { tls.connect(options, common.mustCall(() => {
client.write(''); client.write('');
// Negotiation is still permitted for this first // Negotiation is still permitted for this first
// attempt. This should succeed. // attempt. This should succeed.