http2: major update to internals

This update does several significant things:

1. It eliminates the base Nghttp2* classes and folds those
   in to node::http2::Http2Session and node::http2::Http2Stream
2. It makes node::http2::Http2Stream a StreamBase instance and
   sends that out to JS-land to act as the [kHandle] for the
   JavaScript Http2Stream class.
3. It shifts some of the callbacks from C++ off of the JavaScript
   Http2Session class to the Http2Stream class.
4. It refactors the data provider structure for FD and Stream
   based sending to help encapsulate those functions easier
5. It streamlines some of the functions at the C++ layer to
   eliminate now unnecessary redirections
6. It cleans up node_http2.cc for better readability and
   maintainability
7. It refactors some of the debug output
8. Because Http2Stream instances are now StreamBases, they are
   now also trackable using async-hooks
9. The Stream::OnRead algorithm has been simplified with a
   couple bugs fixed.
10. I've eliminated node_http2_core.h and node_http2_core-inl.h
11. Detect invalid handshake a report protocol error to session
12. Refactor out of memory error, improve other errors
13. Add Http2Session.prototype.ping

PR-URL: https://github.com/nodejs/node/pull/17105
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Sebastiaan Deckers <sebdeckers83@gmail.com>
This commit is contained in:
James M Snell 2017-11-15 10:55:31 -08:00 committed by Anna Henningsen
parent 2ba93f6ed7
commit 69e6c5a212
No known key found for this signature in database
GPG Key ID: 9C63F3A6CD2AD8F9
37 changed files with 3103 additions and 3410 deletions

View File

@ -908,6 +908,16 @@ limit.
A message payload was specified for an HTTP response code for which a payload is
forbidden.
<a id="ERR_HTTP2_PING_CANCEL"></a>
### ERR_HTTP2_PING_CANCEL
An HTTP/2 ping was cancelled.
<a id="ERR_HTTP2_PING_LENGTH"></a>
### ERR_HTTP2_PING_LENGTH
HTTP/2 ping payloads must be exactly 8 bytes in length.
<a id="ERR_HTTP2_PSEUDOHEADER_NOT_ALLOWED"></a>
### ERR_HTTP2_PSEUDOHEADER_NOT_ALLOWED

View File

@ -344,6 +344,44 @@ acknowledgement for a sent SETTINGS frame. Will be `true` after calling the
`http2session.settings()` method. Will be `false` once all sent SETTINGS
frames have been acknowledged.
#### http2session.ping([payload, ]callback)
<!-- YAML
added: REPLACEME
-->
* `payload` {Buffer|TypedArray|DataView} Optional ping payload.
* `callback` {Function}
* Returns: {boolean}
Sends a `PING` frame to the connected HTTP/2 peer. A `callback` function must
be provided. The method will return `true` if the `PING` was sent, `false`
otherwise.
The maximum number of outstanding (unacknowledged) pings is determined by the
`maxOutstandingPings` configuration option. The default maximum is 10.
If provided, the `payload` must be a `Buffer`, `TypedArray`, or `DataView`
containing 8 bytes of data that will be transmitted with the `PING` and
returned with the ping acknowledgement.
The callback will be invoked with three arguments: an error argument that will
be `null` if the `PING` was successfully acknowledged, a `duration` argument
that reports the number of milliseconds elapsed since the ping was sent and the
acknowledgement was received, and a `Buffer` containing the 8-byte `PING`
payload.
```js
session.ping(Buffer.from('abcdefgh'), (err, duration, payload) => {
if (!err) {
console.log(`Ping acknowledged in ${duration} milliseconds`);
console.log(`With payload '${payload.toString()}`);
}
});
```
If the `payload` argument is not specified, the default payload will be the
64-bit timestamp (little endian) marking the start of the `PING` duration.
#### http2session.remoteSettings
<!-- YAML
added: v8.4.0
@ -411,19 +449,6 @@ the trailing header fields to send to the peer.
will be emitted if the `getTrailers` callback attempts to set such header
fields.
#### http2session.rstStream(stream, code)
<!-- YAML
added: v8.4.0
-->
* stream {Http2Stream}
* code {number} Unsigned 32-bit integer identifying the error code. **Default:**
`http2.constant.NGHTTP2_NO_ERROR` (`0x00`)
* Returns: {undefined}
Sends an `RST_STREAM` frame to the connected HTTP/2 peer, causing the given
`Http2Stream` to be closed on both sides using [error code][] `code`.
#### http2session.setTimeout(msecs, callback)
<!-- YAML
added: v8.4.0
@ -515,28 +540,6 @@ added: v8.4.0
An object describing the current status of this `Http2Session`.
#### http2session.priority(stream, options)
<!-- YAML
added: v8.4.0
-->
* `stream` {Http2Stream}
* `options` {Object}
* `exclusive` {boolean} When `true` and `parent` identifies a parent Stream,
the given stream is made the sole direct dependency of the parent, with
all other existing dependents made a dependent of the given stream. **Default:**
`false`
* `parent` {number} Specifies the numeric identifier of a stream the given
stream is dependent on.
* `weight` {number} Specifies the relative dependency of a stream in relation
to other streams with the same `parent`. The value is a number between `1`
and `256` (inclusive).
* `silent` {boolean} When `true`, changes the priority locally without
sending a `PRIORITY` frame to the connected peer.
* Returns: {undefined}
Updates the priority for the given `Http2Stream` instance.
#### http2session.settings(settings)
<!-- YAML
added: v8.4.0
@ -624,8 +627,7 @@ is not yet ready for use.
All [`Http2Stream`][] instances are destroyed either when:
* An `RST_STREAM` frame for the stream is received by the connected peer.
* The `http2stream.rstStream()` or `http2session.rstStream()` methods are
called.
* The `http2stream.rstStream()` methods is called.
* The `http2stream.destroy()` or `http2session.destroy()` methods are called.
When an `Http2Stream` instance is destroyed, an attempt will be made to send an
@ -1473,6 +1475,10 @@ not be emitted.
<!-- YAML
added: v8.4.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/17105
description: Added the `maxOutstandingPings` option with a default limit of
10.
- version: v9.1.0
pr-url: https://github.com/nodejs/node/pull/16676
description: Added the `maxHeaderListPairs` option with a default limit of
@ -1484,6 +1490,8 @@ changes:
for deflating header fields. **Default:** `4Kib`
* `maxHeaderListPairs` {number} Sets the maximum number of header entries.
**Default:** `128`. The minimum value is `4`.
* `maxOutstandingPings` {number} Sets the maximum number of outstanding,
unacknowledged pings. The default is `10`.
* `maxSendHeaderBlockLength` {number} Sets the maximum allowed size for a
serialized, compressed block of headers. Attempts to send headers that
exceed this limit will result in a `'frameError'` event being emitted
@ -1535,6 +1543,10 @@ server.listen(80);
<!-- YAML
added: v8.4.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/17105
description: Added the `maxOutstandingPings` option with a default limit of
10.
- version: v9.1.0
pr-url: https://github.com/nodejs/node/pull/16676
description: Added the `maxHeaderListPairs` option with a default limit of
@ -1549,6 +1561,8 @@ changes:
for deflating header fields. **Default:** `4Kib`
* `maxHeaderListPairs` {number} Sets the maximum number of header entries.
**Default:** `128`. The minimum value is `4`.
* `maxOutstandingPings` {number} Sets the maximum number of outstanding,
unacknowledged pings. The default is `10`.
* `maxSendHeaderBlockLength` {number} Sets the maximum allowed size for a
serialized, compressed block of headers. Attempts to send headers that
exceed this limit will result in a `'frameError'` event being emitted
@ -1607,6 +1621,10 @@ server.listen(80);
<!-- YAML
added: v8.4.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/17105
description: Added the `maxOutstandingPings` option with a default limit of
10.
- version: v9.1.0
pr-url: https://github.com/nodejs/node/pull/16676
description: Added the `maxHeaderListPairs` option with a default limit of
@ -1619,6 +1637,8 @@ changes:
for deflating header fields. **Default:** `4Kib`
* `maxHeaderListPairs` {number} Sets the maximum number of header entries.
**Default:** `128`. The minimum value is `1`.
* `maxOutstandingPings` {number} Sets the maximum number of outstanding,
unacknowledged pings. The default is `10`.
* `maxReservedRemoteStreams` {number} Sets the maximum number of reserved push
streams the client will accept at any given time. Once the current number of
currently reserved push streams exceeds reaches this limit, new push streams

View File

@ -299,6 +299,8 @@ E('ERR_HTTP2_OUT_OF_STREAMS',
'No stream ID is available because maximum stream ID has been reached');
E('ERR_HTTP2_PAYLOAD_FORBIDDEN',
'Responses with %s status must not have a payload');
E('ERR_HTTP2_PING_CANCEL', 'HTTP2 ping cancelled');
E('ERR_HTTP2_PING_LENGTH', 'HTTP2 ping payload must be 8 bytes');
E('ERR_HTTP2_PSEUDOHEADER_NOT_ALLOWED', 'Cannot set HTTP/2 pseudo-headers');
E('ERR_HTTP2_PUSH_DISABLED', 'HTTP/2 client has disabled push streams');
E('ERR_HTTP2_SEND_FILE', 'Only regular files can be sent');

File diff suppressed because it is too large Load Diff

View File

@ -173,7 +173,8 @@ const IDX_OPTIONS_MAX_SEND_HEADER_BLOCK_LENGTH = 2;
const IDX_OPTIONS_PEER_MAX_CONCURRENT_STREAMS = 3;
const IDX_OPTIONS_PADDING_STRATEGY = 4;
const IDX_OPTIONS_MAX_HEADER_LIST_PAIRS = 5;
const IDX_OPTIONS_FLAGS = 6;
const IDX_OPTIONS_MAX_OUTSTANDING_PINGS = 6;
const IDX_OPTIONS_FLAGS = 7;
function updateOptionsBuffer(options) {
var flags = 0;
@ -207,6 +208,11 @@ function updateOptionsBuffer(options) {
optionsBuffer[IDX_OPTIONS_MAX_HEADER_LIST_PAIRS] =
options.maxHeaderListPairs;
}
if (typeof options.maxOutstandingPings === 'number') {
flags |= (1 << IDX_OPTIONS_MAX_OUTSTANDING_PINGS);
optionsBuffer[IDX_OPTIONS_MAX_OUTSTANDING_PINGS] =
options.maxOutstandingPings;
}
optionsBuffer[IDX_OPTIONS_FLAGS] = flags;
}
@ -259,25 +265,19 @@ function getDefaultSettings() {
// remote is a boolean. true to fetch remote settings, false to fetch local.
// this is only called internally
function getSettings(session, remote) {
const holder = Object.create(null);
if (remote)
session.refreshRemoteSettings();
session.remoteSettings();
else
session.refreshLocalSettings();
session.localSettings();
holder.headerTableSize =
settingsBuffer[IDX_SETTINGS_HEADER_TABLE_SIZE];
holder.enablePush =
!!settingsBuffer[IDX_SETTINGS_ENABLE_PUSH];
holder.initialWindowSize =
settingsBuffer[IDX_SETTINGS_INITIAL_WINDOW_SIZE];
holder.maxFrameSize =
settingsBuffer[IDX_SETTINGS_MAX_FRAME_SIZE];
holder.maxConcurrentStreams =
settingsBuffer[IDX_SETTINGS_MAX_CONCURRENT_STREAMS];
holder.maxHeaderListSize =
settingsBuffer[IDX_SETTINGS_MAX_HEADER_LIST_SIZE];
return holder;
return {
headerTableSize: settingsBuffer[IDX_SETTINGS_HEADER_TABLE_SIZE],
enablePush: !!settingsBuffer[IDX_SETTINGS_ENABLE_PUSH],
initialWindowSize: settingsBuffer[IDX_SETTINGS_INITIAL_WINDOW_SIZE],
maxFrameSize: settingsBuffer[IDX_SETTINGS_MAX_FRAME_SIZE],
maxConcurrentStreams: settingsBuffer[IDX_SETTINGS_MAX_CONCURRENT_STREAMS],
maxHeaderListSize: settingsBuffer[IDX_SETTINGS_MAX_HEADER_LIST_SIZE]
};
}
function updateSettingsBuffer(settings) {
@ -316,45 +316,39 @@ function updateSettingsBuffer(settings) {
}
function getSessionState(session) {
const holder = Object.create(null);
binding.refreshSessionState(session);
holder.effectiveLocalWindowSize =
sessionState[IDX_SESSION_STATE_EFFECTIVE_LOCAL_WINDOW_SIZE];
holder.effectiveRecvDataLength =
sessionState[IDX_SESSION_STATE_EFFECTIVE_RECV_DATA_LENGTH];
holder.nextStreamID =
sessionState[IDX_SESSION_STATE_NEXT_STREAM_ID];
holder.localWindowSize =
sessionState[IDX_SESSION_STATE_LOCAL_WINDOW_SIZE];
holder.lastProcStreamID =
sessionState[IDX_SESSION_STATE_LAST_PROC_STREAM_ID];
holder.remoteWindowSize =
sessionState[IDX_SESSION_STATE_REMOTE_WINDOW_SIZE];
holder.outboundQueueSize =
sessionState[IDX_SESSION_STATE_OUTBOUND_QUEUE_SIZE];
holder.deflateDynamicTableSize =
sessionState[IDX_SESSION_STATE_HD_DEFLATE_DYNAMIC_TABLE_SIZE];
holder.inflateDynamicTableSize =
sessionState[IDX_SESSION_STATE_HD_INFLATE_DYNAMIC_TABLE_SIZE];
return holder;
session.refreshState();
return {
effectiveLocalWindowSize:
sessionState[IDX_SESSION_STATE_EFFECTIVE_LOCAL_WINDOW_SIZE],
effectiveRecvDataLength:
sessionState[IDX_SESSION_STATE_EFFECTIVE_RECV_DATA_LENGTH],
nextStreamID:
sessionState[IDX_SESSION_STATE_NEXT_STREAM_ID],
localWindowSize:
sessionState[IDX_SESSION_STATE_LOCAL_WINDOW_SIZE],
lastProcStreamID:
sessionState[IDX_SESSION_STATE_LAST_PROC_STREAM_ID],
remoteWindowSize:
sessionState[IDX_SESSION_STATE_REMOTE_WINDOW_SIZE],
outboundQueueSize:
sessionState[IDX_SESSION_STATE_OUTBOUND_QUEUE_SIZE],
deflateDynamicTableSize:
sessionState[IDX_SESSION_STATE_HD_DEFLATE_DYNAMIC_TABLE_SIZE],
inflateDynamicTableSize:
sessionState[IDX_SESSION_STATE_HD_INFLATE_DYNAMIC_TABLE_SIZE]
};
}
function getStreamState(session, stream) {
const holder = Object.create(null);
binding.refreshStreamState(session, stream);
holder.state =
streamState[IDX_STREAM_STATE];
holder.weight =
streamState[IDX_STREAM_STATE_WEIGHT];
holder.sumDependencyWeight =
streamState[IDX_STREAM_STATE_SUM_DEPENDENCY_WEIGHT];
holder.localClose =
streamState[IDX_STREAM_STATE_LOCAL_CLOSE];
holder.remoteClose =
streamState[IDX_STREAM_STATE_REMOTE_CLOSE];
holder.localWindowSize =
streamState[IDX_STREAM_STATE_LOCAL_WINDOW_SIZE];
return holder;
function getStreamState(stream) {
stream.refreshState();
return {
state: streamState[IDX_STREAM_STATE],
weight: streamState[IDX_STREAM_STATE_WEIGHT],
sumDependencyWeight: streamState[IDX_STREAM_STATE_SUM_DEPENDENCY_WEIGHT],
localClose: streamState[IDX_STREAM_STATE_LOCAL_CLOSE],
remoteClose: streamState[IDX_STREAM_STATE_REMOTE_CLOSE],
localWindowSize: streamState[IDX_STREAM_STATE_LOCAL_WINDOW_SIZE]
};
}
function isIllegalConnectionSpecificHeader(name, value) {

View File

@ -257,8 +257,6 @@
'src/js_stream.h',
'src/module_wrap.h',
'src/node.h',
'src/node_http2_core.h',
'src/node_http2_core-inl.h',
'src/node_buffer.h',
'src/node_constants.h',
'src/node_debug_options.h',

View File

@ -41,7 +41,8 @@ namespace node {
V(GETADDRINFOREQWRAP) \
V(GETNAMEINFOREQWRAP) \
V(HTTP2SESSION) \
V(HTTP2SESSIONSHUTDOWNWRAP) \
V(HTTP2STREAM) \
V(HTTP2PING) \
V(HTTPPARSER) \
V(JSSTREAM) \
V(PIPECONNECTWRAP) \

View File

@ -310,6 +310,8 @@ class ModuleWrap;
V(buffer_prototype_object, v8::Object) \
V(context, v8::Context) \
V(domains_stack_array, v8::Array) \
V(http2ping_constructor_template, v8::ObjectTemplate) \
V(http2stream_constructor_template, v8::ObjectTemplate) \
V(inspector_console_api_object, v8::Object) \
V(module_load_list_array, v8::Array) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,7 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "node_http2_core-inl.h"
#include "nghttp2/nghttp2.h"
#include "node_http2_state.h"
#include "stream_base-inl.h"
#include "string_bytes.h"
@ -19,6 +19,129 @@ using v8::EscapableHandleScope;
using v8::Isolate;
using v8::MaybeLocal;
#ifdef NODE_DEBUG_HTTP2
// Adapted from nghttp2 own debug printer
static inline void _debug_vfprintf(const char* fmt, va_list args) {
vfprintf(stderr, fmt, args);
}
void inline debug_vfprintf(const char* format, ...) {
va_list args;
va_start(args, format);
_debug_vfprintf(format, args);
va_end(args);
}
#define DEBUG_HTTP2(...) debug_vfprintf(__VA_ARGS__);
#define DEBUG_HTTP2SESSION(session, message) \
do { \
DEBUG_HTTP2("Http2Session %s (%.0lf) " message "\n", \
session->TypeName(), \
session->get_async_id()); \
} while (0)
#define DEBUG_HTTP2SESSION2(session, message, ...) \
do { \
DEBUG_HTTP2("Http2Session %s (%.0lf) " message "\n", \
session->TypeName(), \
session->get_async_id(), \
__VA_ARGS__); \
} while (0)
#define DEBUG_HTTP2STREAM(stream, message) \
do { \
DEBUG_HTTP2("Http2Stream %d (%.0lf) [Http2Session %s (%.0lf)] " message \
"\n", stream->id(), stream->get_async_id(), \
stream->session()->TypeName(), \
stream->session()->get_async_id()); \
} while (0)
#define DEBUG_HTTP2STREAM2(stream, message, ...) \
do { \
DEBUG_HTTP2("Http2Stream %d (%.0lf) [Http2Session %s (%.0lf)] " message \
"\n", stream->id(), stream->get_async_id(), \
stream->session()->TypeName(), \
stream->session()->get_async_id(), \
__VA_ARGS__); \
} while (0)
#else
#define DEBUG_HTTP2(...) do {} while (0)
#define DEBUG_HTTP2SESSION(...) do {} while (0)
#define DEBUG_HTTP2SESSION2(...) do {} while (0)
#define DEBUG_HTTP2STREAM(...) do {} while (0)
#define DEBUG_HTTP2STREAM2(...) do {} while (0)
#endif
#define DEFAULT_MAX_PINGS 10
#define DEFAULT_SETTINGS_HEADER_TABLE_SIZE 4096
#define DEFAULT_SETTINGS_ENABLE_PUSH 1
#define DEFAULT_SETTINGS_INITIAL_WINDOW_SIZE 65535
#define DEFAULT_SETTINGS_MAX_FRAME_SIZE 16384
#define DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE 65535
#define MAX_MAX_FRAME_SIZE 16777215
#define MIN_MAX_FRAME_SIZE DEFAULT_SETTINGS_MAX_FRAME_SIZE
#define MAX_INITIAL_WINDOW_SIZE 2147483647
#define MAX_MAX_HEADER_LIST_SIZE 16777215u
#define DEFAULT_MAX_HEADER_LIST_PAIRS 128u
struct nghttp2_stream_write_t;
#define MAX_BUFFER_COUNT 16
enum nghttp2_session_type {
NGHTTP2_SESSION_SERVER,
NGHTTP2_SESSION_CLIENT
};
enum nghttp2_shutdown_flags {
NGHTTP2_SHUTDOWN_FLAG_GRACEFUL
};
enum nghttp2_stream_flags {
NGHTTP2_STREAM_FLAG_NONE = 0x0,
// Writable side has ended
NGHTTP2_STREAM_FLAG_SHUT = 0x1,
// Reading has started
NGHTTP2_STREAM_FLAG_READ_START = 0x2,
// Reading is paused
NGHTTP2_STREAM_FLAG_READ_PAUSED = 0x4,
// Stream is closed
NGHTTP2_STREAM_FLAG_CLOSED = 0x8,
// Stream is destroyed
NGHTTP2_STREAM_FLAG_DESTROYED = 0x10,
// Stream has trailers
NGHTTP2_STREAM_FLAG_TRAILERS = 0x20
};
enum nghttp2_stream_options {
STREAM_OPTION_EMPTY_PAYLOAD = 0x1,
STREAM_OPTION_GET_TRAILERS = 0x2,
};
// Callbacks
typedef void (*nghttp2_stream_write_cb)(
nghttp2_stream_write_t* req,
int status);
struct nghttp2_stream_write {
unsigned int nbufs = 0;
nghttp2_stream_write_t* req = nullptr;
nghttp2_stream_write_cb cb = nullptr;
MaybeStackBuffer<uv_buf_t, MAX_BUFFER_COUNT> bufs;
};
struct nghttp2_header {
nghttp2_rcbuf* name = nullptr;
nghttp2_rcbuf* value = nullptr;
uint8_t flags = 0;
};
struct nghttp2_stream_write_t {
void* data;
int status;
};
// Unlike the HTTP/1 implementation, the HTTP/2 implementation is not limited
// to a fixed number of known supported HTTP methods. These constants, therefore
// are provided strictly as a convenience to users and are exposed via the
@ -292,6 +415,11 @@ const char* nghttp2_errname(int rv) {
}
}
enum session_state_flags {
SESSION_STATE_NONE = 0x0,
SESSION_STATE_DESTROYING = 0x1
};
// This allows for 4 default-sized frames with their frame headers
static const size_t kAllocBufferSize = 4 * (16384 + 9);
@ -299,6 +427,7 @@ typedef uint32_t(*get_setting)(nghttp2_session* session,
nghttp2_settings_id id);
class Http2Session;
class Http2Stream;
// The Http2Options class is used to parse the options object passed in to
// a Http2Session object and convert those into an appropriate nghttp2_option
@ -332,10 +461,19 @@ class Http2Options {
return padding_strategy_;
}
void SetMaxOutstandingPings(size_t max) {
max_outstanding_pings_ = max;
}
size_t GetMaxOutstandingPings() {
return max_outstanding_pings_;
}
private:
nghttp2_option* options_;
uint32_t max_header_pairs_ = DEFAULT_MAX_HEADER_LIST_PAIRS;
padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE;
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
};
// The Http2Settings class is used to parse the settings passed in for
@ -382,82 +520,133 @@ class Http2Priority {
nghttp2_priority_spec spec;
};
class Http2Session : public AsyncWrap,
public StreamBase,
public Nghttp2Session {
class Http2Stream : public AsyncWrap,
public StreamBase {
public:
Http2Session(Environment* env,
Local<Object> wrap,
nghttp2_session_type type);
~Http2Session() override;
Http2Stream(Http2Session* session,
int32_t id,
nghttp2_headers_category category = NGHTTP2_HCAT_HEADERS,
int options = 0);
~Http2Stream() override;
static void OnStreamAllocImpl(size_t suggested_size,
uv_buf_t* buf,
void* ctx);
static void OnStreamReadImpl(ssize_t nread,
const uv_buf_t* bufs,
uv_handle_type pending,
void* ctx);
nghttp2_stream* operator*();
protected:
ssize_t OnMaxFrameSizePadding(size_t frameLength,
size_t maxPayloadLen);
Http2Session* session() { return session_; }
ssize_t OnCallbackPadding(size_t frame,
size_t maxPayloadLen);
// Queue outbound chunks of data to be sent on this stream
inline int Write(
nghttp2_stream_write_t* req,
const uv_buf_t bufs[],
unsigned int nbufs,
nghttp2_stream_write_cb cb);
bool HasGetPaddingCallback() override {
return padding_strategy_ == PADDING_STRATEGY_MAX ||
padding_strategy_ == PADDING_STRATEGY_CALLBACK;
inline void AddChunk(const uint8_t* data, size_t len);
inline void FlushDataChunks();
// Process a Data Chunk
void OnDataChunk(uv_buf_t* chunk);
// Required for StreamBase
int ReadStart() override;
// Required for StreamBase
int ReadStop() override;
// Required for StreamBase
int DoShutdown(ShutdownWrap* req_wrap);
// Initiate a response on this stream.
inline int SubmitResponse(nghttp2_nv* nva,
size_t len,
int options);
// Send data read from a file descriptor as the response on this stream.
inline int SubmitFile(int fd,
nghttp2_nv* nva, size_t len,
int64_t offset,
int64_t length,
int options);
// Submit informational headers for this stream
inline int SubmitInfo(nghttp2_nv* nva, size_t len);
// Submit a PRIORITY frame for this stream
inline int SubmitPriority(nghttp2_priority_spec* prispec,
bool silent = false);
// Submits an RST_STREAM frame using the given code
inline int SubmitRstStream(const uint32_t code);
// Submits a PUSH_PROMISE frame with this stream as the parent.
inline Http2Stream* SubmitPushPromise(
nghttp2_nv* nva,
size_t len,
int32_t* ret,
int options = 0);
inline void Close(int32_t code);
// Shutdown the writable side of the stream
inline void Shutdown();
// Destroy this stream instance and free all held memory.
inline void Destroy();
inline bool IsDestroyed() const {
return flags_ & NGHTTP2_STREAM_FLAG_DESTROYED;
}
ssize_t GetPadding(size_t frameLength, size_t maxPayloadLen) override {
if (padding_strategy_ == PADDING_STRATEGY_MAX) {
return OnMaxFrameSizePadding(frameLength, maxPayloadLen);
inline bool IsWritable() const {
return !(flags_ & NGHTTP2_STREAM_FLAG_SHUT);
}
inline bool IsPaused() const {
return flags_ & NGHTTP2_STREAM_FLAG_READ_PAUSED;
}
inline bool IsClosed() const {
return flags_ & NGHTTP2_STREAM_FLAG_CLOSED;
}
#if defined(DEBUG) && DEBUG
CHECK_EQ(padding_strategy_, PADDING_STRATEGY_CALLBACK);
#endif
return OnCallbackPadding(frameLength, maxPayloadLen);
inline bool HasTrailers() const {
return flags_ & NGHTTP2_STREAM_FLAG_TRAILERS;
}
void OnHeaders(
Nghttp2Stream* stream,
nghttp2_header* headers,
size_t count,
nghttp2_headers_category cat,
uint8_t flags) override;
void OnStreamClose(int32_t id, uint32_t code) override;
void OnDataChunk(Nghttp2Stream* stream, uv_buf_t* chunk) override;
void OnSettings(bool ack) override;
void OnPriority(int32_t stream,
int32_t parent,
int32_t weight,
int8_t exclusive) override;
void OnGoAway(int32_t lastStreamID,
uint32_t errorCode,
uint8_t* data,
size_t length) override;
void OnFrameError(int32_t id, uint8_t type, int error_code) override;
void OnTrailers(Nghttp2Stream* stream,
const SubmitTrailers& submit_trailers) override;
void Send(WriteWrap* req, char* buf, size_t length) override;
WriteWrap* AllocateSend() override;
int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count,
uv_stream_t* send_handle) override;
AsyncWrap* GetAsyncWrap() override {
return static_cast<AsyncWrap*>(this);
// Returns true if this stream is in the reading state, which occurs when
// the NGHTTP2_STREAM_FLAG_READ_START flag has been set and the
// NGHTTP2_STREAM_FLAG_READ_PAUSED flag is *not* set.
inline bool IsReading() const {
return flags_ & NGHTTP2_STREAM_FLAG_READ_START &&
!(flags_ & NGHTTP2_STREAM_FLAG_READ_PAUSED);
}
void* Cast() override {
return reinterpret_cast<void*>(this);
// Returns the RST_STREAM code used to close this stream
inline int32_t code() const { return code_; }
// Returns the stream identifier for this stream
inline int32_t id() const { return id_; }
inline bool AddHeader(nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags);
inline nghttp2_header* headers() {
return current_headers_.data();
}
inline nghttp2_headers_category headers_category() const {
return current_headers_category_;
}
inline size_t headers_count() const {
return current_headers_.size();
}
void StartHeaders(nghttp2_headers_category category);
// Required for StreamBase
bool IsAlive() override {
return true;
@ -468,47 +657,215 @@ class Http2Session : public AsyncWrap,
return false;
}
// Required for StreamBase
int ReadStart() override { return 0; }
AsyncWrap* GetAsyncWrap() override { return static_cast<AsyncWrap*>(this); }
void* Cast() override { return reinterpret_cast<void*>(this); }
// Required for StreamBase
int ReadStop() override { return 0; }
int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count,
uv_stream_t* send_handle) override;
// Required for StreamBase
int DoShutdown(ShutdownWrap* req_wrap) override {
return 0;
}
size_t self_size() const override { return sizeof(*this); }
uv_loop_t* event_loop() const override {
return env()->event_loop();
}
// Handling Trailer Headers
class SubmitTrailers {
public:
inline void Submit(nghttp2_nv* trailers, size_t length) const;
inline SubmitTrailers(Http2Session* sesion,
Http2Stream* stream,
uint32_t* flags);
private:
Http2Session* const session_;
Http2Stream* const stream_;
uint32_t* const flags_;
friend class Http2Stream;
};
void OnTrailers(const SubmitTrailers& submit_trailers);
// JavaScript API
static void GetID(const FunctionCallbackInfo<Value>& args);
static void Destroy(const FunctionCallbackInfo<Value>& args);
static void FlushData(const FunctionCallbackInfo<Value>& args);
static void Priority(const FunctionCallbackInfo<Value>& args);
static void PushPromise(const FunctionCallbackInfo<Value>& args);
static void RefreshState(const FunctionCallbackInfo<Value>& args);
static void Info(const FunctionCallbackInfo<Value>& args);
static void RespondFD(const FunctionCallbackInfo<Value>& args);
static void Respond(const FunctionCallbackInfo<Value>& args);
static void RstStream(const FunctionCallbackInfo<Value>& args);
class Provider;
private:
Http2Session* session_; // The Parent HTTP/2 Session
int32_t id_; // The Stream Identifier
int32_t code_ = NGHTTP2_NO_ERROR; // The RST_STREAM code (if any)
int flags_ = NGHTTP2_STREAM_FLAG_NONE; // Internal state flags
uint32_t max_header_pairs_ = DEFAULT_MAX_HEADER_LIST_PAIRS;
uint32_t max_header_length_ = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
// The Current Headers block... As headers are received for this stream,
// they are temporarily stored here until the OnFrameReceived is called
// signalling the end of the HEADERS frame
nghttp2_headers_category current_headers_category_ = NGHTTP2_HCAT_HEADERS;
uint32_t current_headers_length_ = 0; // total number of octets
std::vector<nghttp2_header> current_headers_;
// Inbound Data... This is the data received via DATA frames for this stream.
std::queue<uv_buf_t> data_chunks_;
// Outbound Data... This is the data written by the JS layer that is
// waiting to be written out to the socket.
std::queue<nghttp2_stream_write*> queue_;
unsigned int queue_index_ = 0;
size_t queue_offset_ = 0;
int64_t fd_offset_ = 0;
int64_t fd_length_ = -1;
};
class Http2Stream::Provider {
public:
Provider(Http2Stream* stream, int options);
explicit Provider(int options);
virtual ~Provider();
nghttp2_data_provider* operator*() {
return !empty_ ? &provider_ : nullptr;
}
class FD;
class Stream;
protected:
nghttp2_data_provider provider_;
private:
bool empty_ = false;
};
class Http2Stream::Provider::FD : public Http2Stream::Provider {
public:
FD(int options, int fd);
FD(Http2Stream* stream, int options, int fd);
static ssize_t OnRead(nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data);
};
class Http2Stream::Provider::Stream : public Http2Stream::Provider {
public:
Stream(Http2Stream* stream, int options);
explicit Stream(int options);
static ssize_t OnRead(nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data);
};
class Http2Session : public AsyncWrap {
public:
Http2Session(Environment* env,
Local<Object> wrap,
nghttp2_session_type type = NGHTTP2_SESSION_SERVER);
~Http2Session() override;
class Http2Ping;
void Start();
void Stop();
void Close();
void Consume(Local<External> external);
void Unconsume();
bool Ping(v8::Local<v8::Function> function);
inline void SendPendingData();
// Submits a new request. If the request is a success, assigned
// will be a pointer to the Http2Stream instance assigned.
// This only works if the session is a client session.
inline Http2Stream* SubmitRequest(
nghttp2_priority_spec* prispec,
nghttp2_nv* nva,
size_t len,
int32_t* ret,
int options = 0);
nghttp2_session_type type() const { return session_type_; }
inline nghttp2_session* session() const { return session_; }
nghttp2_session* operator*() { return session_; }
uint32_t GetMaxHeaderPairs() const { return max_header_pairs_; }
inline const char* TypeName();
inline void MarkDestroying() { flags_ |= SESSION_STATE_DESTROYING; }
inline bool IsDestroying() { return flags_ & SESSION_STATE_DESTROYING; }
// Returns pointer to the stream, or nullptr if stream does not exist
inline Http2Stream* FindStream(int32_t id);
// Adds a stream instance to this session
inline void AddStream(Http2Stream* stream);
// Removes a stream instance from this session
inline void RemoveStream(int32_t id);
// Sends a notice to the connected peer that the session is shutting down.
inline void SubmitShutdownNotice();
// Submits a SETTINGS frame to the connected peer.
inline void Settings(const nghttp2_settings_entry iv[], size_t niv);
// Write data to the session
inline ssize_t Write(const uv_buf_t* bufs, size_t nbufs);
inline void SetChunksSinceLastWrite(size_t n = 0);
size_t self_size() const override { return sizeof(*this); }
char* stream_alloc() {
return stream_buf_;
}
inline void GetTrailers(Http2Stream* stream, uint32_t* flags);
static void OnStreamAllocImpl(size_t suggested_size,
uv_buf_t* buf,
void* ctx);
static void OnStreamReadImpl(ssize_t nread,
const uv_buf_t* bufs,
uv_handle_type pending,
void* ctx);
// The JavaScript API
static void New(const FunctionCallbackInfo<Value>& args);
static void Consume(const FunctionCallbackInfo<Value>& args);
static void Unconsume(const FunctionCallbackInfo<Value>& args);
static void Destroying(const FunctionCallbackInfo<Value>& args);
static void Destroy(const FunctionCallbackInfo<Value>& args);
static void SubmitSettings(const FunctionCallbackInfo<Value>& args);
static void SubmitRstStream(const FunctionCallbackInfo<Value>& args);
static void SubmitResponse(const FunctionCallbackInfo<Value>& args);
static void SubmitFile(const FunctionCallbackInfo<Value>& args);
static void SubmitRequest(const FunctionCallbackInfo<Value>& args);
static void SubmitPushPromise(const FunctionCallbackInfo<Value>& args);
static void SubmitPriority(const FunctionCallbackInfo<Value>& args);
static void SendHeaders(const FunctionCallbackInfo<Value>& args);
static void ShutdownStream(const FunctionCallbackInfo<Value>& args);
static void StreamWrite(const FunctionCallbackInfo<Value>& args);
static void StreamReadStart(const FunctionCallbackInfo<Value>& args);
static void StreamReadStop(const FunctionCallbackInfo<Value>& args);
static void Settings(const FunctionCallbackInfo<Value>& args);
static void Request(const FunctionCallbackInfo<Value>& args);
static void SetNextStreamID(const FunctionCallbackInfo<Value>& args);
static void SendShutdownNotice(const FunctionCallbackInfo<Value>& args);
static void SubmitGoaway(const FunctionCallbackInfo<Value>& args);
static void DestroyStream(const FunctionCallbackInfo<Value>& args);
static void FlushData(const FunctionCallbackInfo<Value>& args);
static void ShutdownNotice(const FunctionCallbackInfo<Value>& args);
static void Goaway(const FunctionCallbackInfo<Value>& args);
static void UpdateChunksSent(const FunctionCallbackInfo<Value>& args);
static void RefreshState(const FunctionCallbackInfo<Value>& args);
static void Ping(const FunctionCallbackInfo<Value>& args);
template <get_setting fn>
static void RefreshSettings(const FunctionCallbackInfo<Value>& args);
@ -516,17 +873,125 @@ class Http2Session : public AsyncWrap,
template <get_setting fn>
static void GetSettings(const FunctionCallbackInfo<Value>& args);
size_t self_size() const override {
return sizeof(*this);
void Send(WriteWrap* req, char* buf, size_t length);
WriteWrap* AllocateSend();
uv_loop_t* event_loop() const {
return env()->event_loop();
}
char* stream_alloc() {
return stream_buf_;
}
void Close() override;
Http2Ping* PopPing();
bool AddPing(Http2Ping* ping);
private:
// Frame Padding Strategies
inline ssize_t OnMaxFrameSizePadding(size_t frameLength,
size_t maxPayloadLen);
inline ssize_t OnCallbackPadding(size_t frame,
size_t maxPayloadLen);
// Frame Handler
inline void HandleDataFrame(const nghttp2_frame* frame);
inline void HandleGoawayFrame(const nghttp2_frame* frame);
inline void HandleHeadersFrame(const nghttp2_frame* frame);
inline void HandlePriorityFrame(const nghttp2_frame* frame);
inline void HandleSettingsFrame(const nghttp2_frame* frame);
inline void HandlePingFrame(const nghttp2_frame* frame);
// nghttp2 callbacks
static inline int OnBeginHeadersCallback(
nghttp2_session* session,
const nghttp2_frame* frame,
void* user_data);
static inline int OnHeaderCallback(
nghttp2_session* session,
const nghttp2_frame* frame,
nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags,
void* user_data);
static inline int OnFrameReceive(
nghttp2_session* session,
const nghttp2_frame* frame,
void* user_data);
static inline int OnFrameNotSent(
nghttp2_session* session,
const nghttp2_frame* frame,
int error_code,
void* user_data);
static inline int OnStreamClose(
nghttp2_session* session,
int32_t id,
uint32_t code,
void* user_data);
static inline int OnInvalidHeader(
nghttp2_session* session,
const nghttp2_frame* frame,
nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags,
void* user_data);
static inline int OnDataChunkReceived(
nghttp2_session* session,
uint8_t flags,
int32_t id,
const uint8_t* data,
size_t len,
void* user_data);
static inline ssize_t OnSelectPadding(
nghttp2_session* session,
const nghttp2_frame* frame,
size_t maxPayloadLen,
void* user_data);
static inline int OnNghttpError(
nghttp2_session* session,
const char* message,
size_t len,
void* user_data);
static inline ssize_t OnStreamReadFD(
nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data);
static inline ssize_t OnStreamRead(
nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data);
struct Callbacks {
inline explicit Callbacks(bool kHasGetPaddingCallback);
inline ~Callbacks();
nghttp2_session_callbacks* callbacks;
};
/* Use callback_struct_saved[kHasGetPaddingCallback ? 1 : 0] */
static const Callbacks callback_struct_saved[2];
// The underlying nghttp2_session handle
nghttp2_session* session_;
// The session type: client or server
nghttp2_session_type session_type_;
// The maximum number of header pairs permitted for streams on this session
uint32_t max_header_pairs_ = DEFAULT_MAX_HEADER_LIST_PAIRS;
// The collection of active Http2Streams associated with this session
std::unordered_map<int32_t, Http2Stream*> streams_;
int flags_ = SESSION_STATE_NONE;
// The StreamBase instance being used for i/o
StreamBase* stream_;
StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
@ -534,9 +999,27 @@ class Http2Session : public AsyncWrap,
// use this to allow timeout tracking during long-lasting writes
uint32_t chunks_sent_since_last_write_ = 0;
uv_prepare_t* prep_ = nullptr;
uv_prepare_t* prep_ = nullptr;
char stream_buf_[kAllocBufferSize];
size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
std::queue<Http2Ping*> outstanding_pings_;
};
class Http2Session::Http2Ping : public AsyncWrap {
public:
explicit Http2Ping(Http2Session* session);
~Http2Ping();
size_t self_size() const override { return sizeof(*this); }
void Send(uint8_t* payload);
void Done(bool ack, const uint8_t* payload = nullptr);
private:
Http2Session* session_;
uint64_t startTime_;
};
class ExternalHeader :

View File

@ -1,925 +0,0 @@
#ifndef SRC_NODE_HTTP2_CORE_INL_H_
#define SRC_NODE_HTTP2_CORE_INL_H_
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "node_http2_core.h"
#include "node_internals.h" // arraysize
#include <algorithm>
namespace node {
namespace http2 {
#ifdef NODE_DEBUG_HTTP2
inline int Nghttp2Session::OnNghttpError(nghttp2_session* session,
const char* message,
size_t len,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: Error '%.*s'\n",
handle->TypeName(), len, message);
return 0;
}
#endif
inline int32_t GetFrameID(const nghttp2_frame* frame) {
// If this is a push promise, we want to grab the id of the promised stream
return (frame->hd.type == NGHTTP2_PUSH_PROMISE) ?
frame->push_promise.promised_stream_id :
frame->hd.stream_id;
}
// nghttp2 calls this at the beginning a new HEADERS or PUSH_PROMISE frame.
// We use it to ensure that an Nghttp2Stream instance is allocated to store
// the state.
inline int Nghttp2Session::OnBeginHeadersCallback(nghttp2_session* session,
const nghttp2_frame* frame,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
int32_t id = GetFrameID(frame);
DEBUG_HTTP2("Nghttp2Session %s: beginning headers for stream %d\n",
handle->TypeName(), id);
Nghttp2Stream* stream = handle->FindStream(id);
if (stream == nullptr) {
new Nghttp2Stream(id, handle, frame->headers.cat);
} else {
stream->StartHeaders(frame->headers.cat);
}
return 0;
}
inline size_t GetBufferLength(nghttp2_rcbuf* buf) {
return nghttp2_rcbuf_get_buf(buf).len;
}
inline bool Nghttp2Stream::AddHeader(nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags) {
size_t length = GetBufferLength(name) + GetBufferLength(value) + 32;
if (current_headers_.size() == max_header_pairs_ ||
current_headers_length_ + length > max_header_length_) {
return false;
}
nghttp2_header header;
header.name = name;
header.value = value;
header.flags = flags;
current_headers_.push_back(header);
nghttp2_rcbuf_incref(name);
nghttp2_rcbuf_incref(value);
current_headers_length_ += length;
return true;
}
// nghttp2 calls this once for every header name-value pair in a HEADERS
// or PUSH_PROMISE block. CONTINUATION frames are handled automatically
// and transparently so we do not need to worry about those at all.
inline int Nghttp2Session::OnHeaderCallback(nghttp2_session* session,
const nghttp2_frame* frame,
nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
int32_t id = GetFrameID(frame);
Nghttp2Stream* stream = handle->FindStream(id);
if (!stream->AddHeader(name, value, flags)) {
// This will only happen if the connected peer sends us more
// than the allowed number of header items at any given time
stream->SubmitRstStream(NGHTTP2_ENHANCE_YOUR_CALM);
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
return 0;
}
// When nghttp2 has completely processed a frame, it calls OnFrameReceive.
// It is our responsibility to delegate out from there. We can ignore most
// control frames since nghttp2 will handle those for us.
inline int Nghttp2Session::OnFrameReceive(nghttp2_session* session,
const nghttp2_frame* frame,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: complete frame received: type: %d\n",
handle->TypeName(), frame->hd.type);
bool ack;
switch (frame->hd.type) {
case NGHTTP2_DATA:
handle->HandleDataFrame(frame);
break;
case NGHTTP2_PUSH_PROMISE:
// Intentional fall-through, handled just like headers frames
case NGHTTP2_HEADERS:
handle->HandleHeadersFrame(frame);
break;
case NGHTTP2_SETTINGS:
ack = (frame->hd.flags & NGHTTP2_FLAG_ACK) == NGHTTP2_FLAG_ACK;
handle->OnSettings(ack);
break;
case NGHTTP2_PRIORITY:
handle->HandlePriorityFrame(frame);
break;
case NGHTTP2_GOAWAY:
handle->HandleGoawayFrame(frame);
break;
default:
break;
}
return 0;
}
// nghttp2 will call this if an error occurs attempting to send a frame.
// Unless the stream or session is closed, this really should not happen
// unless there is a serious flaw in our implementation.
inline int Nghttp2Session::OnFrameNotSent(nghttp2_session* session,
const nghttp2_frame* frame,
int error_code,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: frame type %d was not sent, code: %d\n",
handle->TypeName(), frame->hd.type, error_code);
// Do not report if the frame was not sent due to the session closing
if (error_code != NGHTTP2_ERR_SESSION_CLOSING &&
error_code != NGHTTP2_ERR_STREAM_CLOSED &&
error_code != NGHTTP2_ERR_STREAM_CLOSING) {
handle->OnFrameError(frame->hd.stream_id,
frame->hd.type,
error_code);
}
return 0;
}
inline int Nghttp2Session::OnInvalidHeader(nghttp2_session* session,
const nghttp2_frame* frame,
nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags,
void* user_data) {
// Ignore invalid header fields by default.
return 0;
}
// Called when nghttp2 closes a stream, either in response to an RST_STREAM
// frame or the stream closing naturally on it's own
inline int Nghttp2Session::OnStreamClose(nghttp2_session* session,
int32_t id,
uint32_t code,
void* user_data) {
Nghttp2Session*handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: stream %d closed, code: %d\n",
handle->TypeName(), id, code);
Nghttp2Stream* stream = handle->FindStream(id);
// Intentionally ignore the callback if the stream does not exist
if (stream != nullptr)
stream->Close(code);
return 0;
}
// Called by nghttp2 to collect the data while a file response is sent.
// The buf is the DATA frame buffer that needs to be filled with at most
// length bytes. flags is used to control what nghttp2 does next.
inline ssize_t Nghttp2Session::OnStreamReadFD(nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: reading outbound file data for stream %d\n",
handle->TypeName(), id);
Nghttp2Stream* stream = handle->FindStream(id);
int fd = source->fd;
int64_t offset = stream->fd_offset_;
ssize_t numchars = 0;
if (stream->fd_length_ >= 0 &&
stream->fd_length_ < static_cast<int64_t>(length))
length = stream->fd_length_;
uv_buf_t data;
data.base = reinterpret_cast<char*>(buf);
data.len = length;
uv_fs_t read_req;
if (length > 0) {
// TODO(addaleax): Never use synchronous I/O on the main thread.
numchars = uv_fs_read(handle->event_loop(),
&read_req,
fd, &data, 1,
offset, nullptr);
uv_fs_req_cleanup(&read_req);
}
// Close the stream with an error if reading fails
if (numchars < 0)
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
// Update the read offset for the next read
stream->fd_offset_ += numchars;
stream->fd_length_ -= numchars;
// if numchars < length, assume that we are done.
if (static_cast<size_t>(numchars) < length || length <= 0) {
DEBUG_HTTP2("Nghttp2Session %s: no more data for stream %d\n",
handle->TypeName(), id);
*flags |= NGHTTP2_DATA_FLAG_EOF;
GetTrailers(session, handle, stream, flags);
}
return numchars;
}
// Called by nghttp2 to collect the data to pack within a DATA frame.
// The buf is the DATA frame buffer that needs to be filled with at most
// length bytes. flags is used to control what nghttp2 does next.
inline ssize_t Nghttp2Session::OnStreamRead(nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: reading outbound data for stream %d\n",
handle->TypeName(), id);
Nghttp2Stream* stream = handle->FindStream(id);
size_t remaining = length;
size_t offset = 0;
// While there is data in the queue, copy data into buf until it is full.
// There may be data left over, which will be sent the next time nghttp
// calls this callback.
while (!stream->queue_.empty()) {
DEBUG_HTTP2("Nghttp2Session %s: processing outbound data chunk\n",
handle->TypeName());
nghttp2_stream_write* head = stream->queue_.front();
while (stream->queue_index_ < head->nbufs) {
if (remaining == 0)
goto end;
unsigned int n = stream->queue_index_;
// len is the number of bytes in head->bufs[n] that are yet to be written
size_t len = head->bufs[n].len - stream->queue_offset_;
size_t bytes_to_write = len < remaining ? len : remaining;
memcpy(buf + offset,
head->bufs[n].base + stream->queue_offset_,
bytes_to_write);
offset += bytes_to_write;
remaining -= bytes_to_write;
if (bytes_to_write < len) {
stream->queue_offset_ += bytes_to_write;
} else {
stream->queue_index_++;
stream->queue_offset_ = 0;
}
}
stream->queue_offset_ = 0;
stream->queue_index_ = 0;
head->cb(head->req, 0);
delete head;
stream->queue_.pop();
}
end:
// If we are no longer writable and there is no more data in the queue,
// then we need to set the NGHTTP2_DATA_FLAG_EOF flag.
// If we are still writable but there is not yet any data to send, set the
// NGHTTP2_ERR_DEFERRED flag. This will put the stream into a pending state
// that will wait for data to become available.
// If neither of these flags are set, then nghttp2 will call this callback
// again to get the data for the next DATA frame.
int writable = !stream->queue_.empty() || stream->IsWritable();
if (offset == 0 && writable && stream->queue_.empty()) {
DEBUG_HTTP2("Nghttp2Session %s: deferring stream %d\n",
handle->TypeName(), id);
return NGHTTP2_ERR_DEFERRED;
}
if (!writable) {
DEBUG_HTTP2("Nghttp2Session %s: no more data for stream %d\n",
handle->TypeName(), id);
*flags |= NGHTTP2_DATA_FLAG_EOF;
GetTrailers(session, handle, stream, flags);
}
#if defined(DEBUG) && DEBUG
CHECK(offset <= length);
#endif
return offset;
}
// Called by nghttp2 when it needs to determine how much padding to apply
// to a DATA or HEADERS frame
inline ssize_t Nghttp2Session::OnSelectPadding(nghttp2_session* session,
const nghttp2_frame* frame,
size_t maxPayloadLen,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
#if defined(DEBUG) && DEBUG
CHECK(handle->HasGetPaddingCallback());
#endif
ssize_t padding = handle->GetPadding(frame->hd.length, maxPayloadLen);
DEBUG_HTTP2("Nghttp2Session %s: using padding, size: %d\n",
handle->TypeName(), padding);
return padding;
}
// While nghttp2 is processing a DATA frame, it will call the
// OnDataChunkReceived callback multiple times, passing along individual
// chunks of data from the DATA frame payload. These *must* be memcpy'd
// out because the pointer to the data will quickly become invalid.
inline int Nghttp2Session::OnDataChunkReceived(nghttp2_session* session,
uint8_t flags,
int32_t id,
const uint8_t* data,
size_t len,
void* user_data) {
Nghttp2Session* handle = static_cast<Nghttp2Session*>(user_data);
DEBUG_HTTP2("Nghttp2Session %s: buffering data chunk for stream %d, size: "
"%d, flags: %d\n", handle->TypeName(),
id, len, flags);
// We should never actually get a 0-length chunk so this check is
// only a precaution at this point.
if (len > 0) {
nghttp2_session_consume_connection(session, len);
Nghttp2Stream* stream = handle->FindStream(id);
char* buf = Malloc<char>(len);
memcpy(buf, data, len);
stream->data_chunks_.emplace(uv_buf_init(buf, len));
}
return 0;
}
// Only when we are done sending the last chunk of data do we check for
// any trailing headers that are to be sent. This is the only opportunity
// we have to make this check. If there are trailers, then the
// NGHTTP2_DATA_FLAG_NO_END_STREAM flag must be set.
inline void Nghttp2Session::GetTrailers(nghttp2_session* session,
Nghttp2Session* handle,
Nghttp2Stream* stream,
uint32_t* flags) {
if (stream->GetTrailers()) {
SubmitTrailers submit_trailers{handle, stream, flags};
handle->OnTrailers(stream, submit_trailers);
}
}
// Submits any trailing header fields that have been collected
inline void Nghttp2Session::SubmitTrailers::Submit(nghttp2_nv* trailers,
size_t length) const {
if (length == 0)
return;
DEBUG_HTTP2("Nghttp2Session %s: sending trailers for stream %d, "
"count: %d\n", handle_->TypeName(),
stream_->id(), length);
*flags_ |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
nghttp2_submit_trailer(handle_->session_,
stream_->id(),
trailers,
length);
}
// Submits a graceful shutdown notice to nghttp
// See: https://nghttp2.org/documentation/nghttp2_submit_shutdown_notice.html
inline void Nghttp2Session::SubmitShutdownNotice() {
DEBUG_HTTP2("Nghttp2Session %s: submitting shutdown notice\n",
TypeName());
nghttp2_submit_shutdown_notice(session_);
}
// Sends a SETTINGS frame on the current session
// Note that this *should* send a SETTINGS frame even if niv == 0 and there
// are no settings entries to send.
inline int Nghttp2Session::SubmitSettings(const nghttp2_settings_entry iv[],
size_t niv) {
DEBUG_HTTP2("Nghttp2Session %s: submitting settings, count: %d\n",
TypeName(), niv);
return nghttp2_submit_settings(session_, NGHTTP2_FLAG_NONE, iv, niv);
}
// Returns the Nghttp2Stream associated with the given id, or nullptr if none
inline Nghttp2Stream* Nghttp2Session::FindStream(int32_t id) {
auto s = streams_.find(id);
if (s != streams_.end()) {
DEBUG_HTTP2("Nghttp2Session %s: stream %d found\n",
TypeName(), id);
return s->second;
} else {
DEBUG_HTTP2("Nghttp2Session %s: stream %d not found\n", TypeName(), id);
return nullptr;
}
}
// Flushes one buffered data chunk at a time.
inline void Nghttp2Stream::FlushDataChunks() {
if (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
data_chunks_.pop();
if (buf.len > 0) {
nghttp2_session_consume_stream(session_->session(), id_, buf.len);
session_->OnDataChunk(this, &buf);
} else {
session_->OnDataChunk(this, nullptr);
}
}
}
// Called when a DATA frame has been completely processed. Will check to
// see if the END_STREAM flag is set, and will flush the queued data chunks
// to JS if the stream is flowing
inline void Nghttp2Session::HandleDataFrame(const nghttp2_frame* frame) {
int32_t id = GetFrameID(frame);
DEBUG_HTTP2("Nghttp2Session %s: handling data frame for stream %d\n",
TypeName(), id);
Nghttp2Stream* stream = this->FindStream(id);
// If the stream does not exist, something really bad happened
#if defined(DEBUG) && DEBUG
CHECK_NE(stream, nullptr);
#endif
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
stream->data_chunks_.emplace(uv_buf_init(0, 0));
if (stream->IsReading())
stream->FlushDataChunks();
}
// Passes all of the collected headers for a HEADERS frame out to the JS layer.
// The headers are collected as the frame is being processed and sent out
// to the JS side only when the frame is fully processed.
inline void Nghttp2Session::HandleHeadersFrame(const nghttp2_frame* frame) {
int32_t id = GetFrameID(frame);
DEBUG_HTTP2("Nghttp2Session %s: handling headers frame for stream %d\n",
TypeName(), id);
Nghttp2Stream* stream = FindStream(id);
// If the stream does not exist, something really bad happened
#if defined(DEBUG) && DEBUG
CHECK_NE(stream, nullptr);
#endif
OnHeaders(stream,
stream->headers(),
stream->headers_count(),
stream->headers_category(),
frame->hd.flags);
}
// Notifies the JS layer that a PRIORITY frame has been received
inline void Nghttp2Session::HandlePriorityFrame(const nghttp2_frame* frame) {
nghttp2_priority priority_frame = frame->priority;
int32_t id = GetFrameID(frame);
DEBUG_HTTP2("Nghttp2Session %s: handling priority frame for stream %d\n",
TypeName(), id);
// Priority frame stream ID should never be <= 0. nghttp2 handles this
// as an error condition that terminates the session, so we should be
// good here
#if defined(DEBUG) && DEBUG
CHECK_GT(id, 0);
#endif
nghttp2_priority_spec spec = priority_frame.pri_spec;
OnPriority(id, spec.stream_id, spec.weight, spec.exclusive);
}
// Notifies the JS layer that a GOAWAY frame has been received
inline void Nghttp2Session::HandleGoawayFrame(const nghttp2_frame* frame) {
nghttp2_goaway goaway_frame = frame->goaway;
DEBUG_HTTP2("Nghttp2Session %s: handling goaway frame\n", TypeName());
OnGoAway(goaway_frame.last_stream_id,
goaway_frame.error_code,
goaway_frame.opaque_data,
goaway_frame.opaque_data_len);
}
// Prompts nghttp2 to flush the queue of pending data frames
inline void Nghttp2Session::SendPendingData() {
DEBUG_HTTP2("Nghttp2Session %s: Sending pending data\n", TypeName());
// Do not attempt to send data on the socket if the destroying flag has
// been set. That means everything is shutting down and the socket
// will not be usable.
if (IsDestroying())
return;
WriteWrap* req = nullptr;
char* dest = nullptr;
size_t destRemaining = 0;
size_t destLength = 0; // amount of data stored in dest
size_t destOffset = 0; // current write offset of dest
const uint8_t* src; // pointer to the serialized data
ssize_t srcLength = 0; // length of serialized data chunk
// While srcLength is greater than zero
while ((srcLength = nghttp2_session_mem_send(session_, &src)) > 0) {
if (req == nullptr) {
req = AllocateSend();
destRemaining = req->ExtraSize();
dest = req->Extra();
}
DEBUG_HTTP2("Nghttp2Session %s: nghttp2 has %d bytes to send\n",
TypeName(), srcLength);
size_t srcRemaining = srcLength;
size_t srcOffset = 0;
// The amount of data we have to copy is greater than the space
// remaining. Copy what we can into the remaining space, send it,
// the proceed with the rest.
while (srcRemaining > destRemaining) {
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
TypeName(), destLength + destRemaining);
memcpy(dest + destOffset, src + srcOffset, destRemaining);
destLength += destRemaining;
Send(req, dest, destLength);
destOffset = 0;
destLength = 0;
srcRemaining -= destRemaining;
srcOffset += destRemaining;
req = AllocateSend();
destRemaining = req->ExtraSize();
dest = req->Extra();
}
if (srcRemaining > 0) {
memcpy(dest + destOffset, src + srcOffset, srcRemaining);
destLength += srcRemaining;
destOffset += srcRemaining;
destRemaining -= srcRemaining;
srcRemaining = 0;
srcOffset = 0;
}
}
if (destLength > 0) {
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
TypeName(), destLength);
Send(req, dest, destLength);
}
}
// Initialize the Nghttp2Session handle by creating and
// assigning the Nghttp2Session instance and associated
// uv_loop_t.
inline int Nghttp2Session::Init(const nghttp2_session_type type,
nghttp2_option* options,
nghttp2_mem* mem,
uint32_t maxHeaderPairs) {
session_type_ = type;
DEBUG_HTTP2("Nghttp2Session %s: initializing session\n", TypeName());
destroying_ = false;
max_header_pairs_ = maxHeaderPairs;
nghttp2_session_callbacks* callbacks
= callback_struct_saved[HasGetPaddingCallback() ? 1 : 0].callbacks;
CHECK_NE(options, nullptr);
typedef int (*init_fn)(nghttp2_session** session,
const nghttp2_session_callbacks* callbacks,
void* user_data,
const nghttp2_option* options,
nghttp2_mem* mem);
init_fn fn = type == NGHTTP2_SESSION_SERVER ?
nghttp2_session_server_new3 :
nghttp2_session_client_new3;
return fn(&session_, callbacks, this, options, mem);
}
inline void Nghttp2Session::MarkDestroying() {
destroying_ = true;
}
inline Nghttp2Session::~Nghttp2Session() {
Close();
}
inline void Nghttp2Session::Close() {
if (IsClosed())
return;
DEBUG_HTTP2("Nghttp2Session %s: freeing session\n", TypeName());
nghttp2_session_terminate_session(session_, NGHTTP2_NO_ERROR);
nghttp2_session_del(session_);
session_ = nullptr;
DEBUG_HTTP2("Nghttp2Session %s: session freed\n", TypeName());
}
// Write data received from the socket to the underlying nghttp2_session.
inline ssize_t Nghttp2Session::Write(const uv_buf_t* bufs, unsigned int nbufs) {
size_t total = 0;
for (unsigned int n = 0; n < nbufs; n++) {
ssize_t ret =
nghttp2_session_mem_recv(session_,
reinterpret_cast<uint8_t*>(bufs[n].base),
bufs[n].len);
if (ret < 0) {
return ret;
} else {
total += ret;
}
}
SendPendingData();
return total;
}
inline void Nghttp2Session::AddStream(Nghttp2Stream* stream) {
streams_[stream->id()] = stream;
}
// Removes a stream instance from this session
inline void Nghttp2Session::RemoveStream(int32_t id) {
streams_.erase(id);
}
// Implementation for Nghttp2Stream functions
Nghttp2Stream::Nghttp2Stream(
int32_t id,
Nghttp2Session* session,
nghttp2_headers_category category,
int options) : id_(id),
session_(session),
current_headers_category_(category) {
// Limit the number of header pairs
max_header_pairs_ = session->GetMaxHeaderPairs();
if (max_header_pairs_ == 0)
max_header_pairs_ = DEFAULT_MAX_HEADER_LIST_PAIRS;
current_headers_.reserve(max_header_pairs_);
// Limit the number of header octets
max_header_length_ =
std::min(
nghttp2_session_get_local_settings(
session->session(),
NGHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE),
MAX_MAX_HEADER_LIST_SIZE);
getTrailers_ = options & STREAM_OPTION_GET_TRAILERS;
if (options & STREAM_OPTION_EMPTY_PAYLOAD)
Shutdown();
session->AddStream(this);
}
inline void Nghttp2Stream::Destroy() {
DEBUG_HTTP2("Nghttp2Stream %d: destroying stream\n", id_);
// Do nothing if this stream instance is already destroyed
if (IsDestroyed())
return;
flags_ |= NGHTTP2_STREAM_FLAG_DESTROYED;
Nghttp2Session* session = this->session_;
if (session != nullptr) {
session_->RemoveStream(this->id());
session_ = nullptr;
}
// Free any remaining incoming data chunks.
while (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
free(buf.base);
data_chunks_.pop();
}
// Free any remaining outgoing data chunks.
while (!queue_.empty()) {
nghttp2_stream_write* head = queue_.front();
head->cb(head->req, UV_ECANCELED);
delete head;
queue_.pop();
}
delete this;
}
// Submit informational headers for a stream.
inline int Nghttp2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
DEBUG_HTTP2("Nghttp2Stream %d: sending informational headers, count: %d\n",
id_, len);
CHECK_GT(len, 0);
return nghttp2_submit_headers(session_->session(),
NGHTTP2_FLAG_NONE,
id_, nullptr,
nva, len, nullptr);
}
inline int Nghttp2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
bool silent) {
DEBUG_HTTP2("Nghttp2Stream %d: sending priority spec\n", id_);
return silent ?
nghttp2_session_change_stream_priority(session_->session(),
id_, prispec) :
nghttp2_submit_priority(session_->session(),
NGHTTP2_FLAG_NONE,
id_, prispec);
}
// Submit an RST_STREAM frame
inline int Nghttp2Stream::SubmitRstStream(const uint32_t code) {
DEBUG_HTTP2("Nghttp2Stream %d: sending rst-stream, code: %d\n", id_, code);
session_->SendPendingData();
return nghttp2_submit_rst_stream(session_->session(),
NGHTTP2_FLAG_NONE,
id_,
code);
}
// Submit a push promise.
inline int32_t Nghttp2Stream::SubmitPushPromise(
nghttp2_nv* nva,
size_t len,
Nghttp2Stream** assigned,
int options) {
#if defined(DEBUG) && DEBUG
CHECK_GT(len, 0);
#endif
DEBUG_HTTP2("Nghttp2Stream %d: sending push promise\n", id_);
int32_t ret = nghttp2_submit_push_promise(session_->session(),
NGHTTP2_FLAG_NONE,
id_, nva, len,
nullptr);
if (ret > 0) {
auto stream = new Nghttp2Stream(ret, session_,
NGHTTP2_HCAT_HEADERS,
options);
if (assigned != nullptr) *assigned = stream;
}
return ret;
}
// Initiate a response. If the nghttp2_stream is still writable by
// the time this is called, then an nghttp2_data_provider will be
// initialized, causing at least one (possibly empty) data frame to
// be sent.
inline int Nghttp2Stream::SubmitResponse(nghttp2_nv* nva,
size_t len,
int options) {
#if defined(DEBUG) && DEBUG
CHECK_GT(len, 0);
#endif
DEBUG_HTTP2("Nghttp2Stream %d: submitting response\n", id_);
getTrailers_ = options & STREAM_OPTION_GET_TRAILERS;
nghttp2_data_provider* provider = nullptr;
nghttp2_data_provider prov;
prov.source.ptr = this;
prov.read_callback = Nghttp2Session::OnStreamRead;
if (IsWritable() && !(options & STREAM_OPTION_EMPTY_PAYLOAD))
provider = &prov;
return nghttp2_submit_response(session_->session(), id_,
nva, len, provider);
}
// Initiate a response that contains data read from a file descriptor.
inline int Nghttp2Stream::SubmitFile(int fd,
nghttp2_nv* nva, size_t len,
int64_t offset,
int64_t length,
int options) {
#if defined(DEBUG) && DEBUG
CHECK_GT(len, 0);
CHECK_GT(fd, 0);
#endif
DEBUG_HTTP2("Nghttp2Stream %d: submitting file\n", id_);
getTrailers_ = options & STREAM_OPTION_GET_TRAILERS;
nghttp2_data_provider prov;
prov.source.fd = fd;
prov.read_callback = Nghttp2Session::OnStreamReadFD;
if (offset > 0) fd_offset_ = offset;
if (length > -1) fd_length_ = length;
return nghttp2_submit_response(session_->session(), id_,
nva, len, &prov);
}
// Initiate a request. If writable is true (the default), then
// an nghttp2_data_provider will be initialized, causing at
// least one (possibly empty) data frame to to be sent.
inline int32_t Nghttp2Session::SubmitRequest(
nghttp2_priority_spec* prispec,
nghttp2_nv* nva,
size_t len,
Nghttp2Stream** assigned,
int options) {
#if defined(DEBUG) && DEBUG
CHECK_GT(len, 0);
#endif
DEBUG_HTTP2("Nghttp2Session: submitting request\n");
nghttp2_data_provider* provider = nullptr;
nghttp2_data_provider prov;
prov.source.ptr = this;
prov.read_callback = OnStreamRead;
if (!(options & STREAM_OPTION_EMPTY_PAYLOAD))
provider = &prov;
int32_t ret = nghttp2_submit_request(session_,
prispec, nva, len,
provider, nullptr);
// Assign the Nghttp2Stream handle
if (ret > 0) {
auto stream = new Nghttp2Stream(ret, this, NGHTTP2_HCAT_HEADERS, options);
if (assigned != nullptr) *assigned = stream;
}
return ret;
}
// Queue the given set of uv_but_t handles for writing to an
// nghttp2_stream. The callback will be invoked once the chunks
// of data have been flushed to the underlying nghttp2_session.
// Note that this does *not* mean that the data has been flushed
// to the socket yet.
inline int Nghttp2Stream::Write(nghttp2_stream_write_t* req,
const uv_buf_t bufs[],
unsigned int nbufs,
nghttp2_stream_write_cb cb) {
if (!IsWritable()) {
if (cb != nullptr)
cb(req, UV_EOF);
return 0;
}
DEBUG_HTTP2("Nghttp2Stream %d: queuing buffers to send, count: %d\n",
id_, nbufs);
nghttp2_stream_write* item = new nghttp2_stream_write;
item->cb = cb;
item->req = req;
item->nbufs = nbufs;
item->bufs.AllocateSufficientStorage(nbufs);
memcpy(*(item->bufs), bufs, nbufs * sizeof(*bufs));
queue_.push(item);
nghttp2_session_resume_data(session_->session(), id_);
return 0;
}
inline void Nghttp2Stream::ReadStart() {
if (IsReading())
return;
DEBUG_HTTP2("Nghttp2Stream %d: start reading\n", id_);
flags_ |= NGHTTP2_STREAM_FLAG_READ_START;
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
// Flush any queued data chunks immediately out to the JS layer
FlushDataChunks();
}
inline void Nghttp2Stream::ReadResume() {
DEBUG_HTTP2("Nghttp2Stream %d: resume reading\n", id_);
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
// Flush any queued data chunks immediately out to the JS layer
FlushDataChunks();
}
inline void Nghttp2Stream::ReadStop() {
DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_);
if (!IsReading())
return;
flags_ |= NGHTTP2_STREAM_FLAG_READ_PAUSED;
}
Nghttp2Session::Callbacks::Callbacks(bool kHasGetPaddingCallback) {
nghttp2_session_callbacks_new(&callbacks);
nghttp2_session_callbacks_set_on_begin_headers_callback(
callbacks, OnBeginHeadersCallback);
nghttp2_session_callbacks_set_on_header_callback2(
callbacks, OnHeaderCallback);
nghttp2_session_callbacks_set_on_frame_recv_callback(
callbacks, OnFrameReceive);
nghttp2_session_callbacks_set_on_stream_close_callback(
callbacks, OnStreamClose);
nghttp2_session_callbacks_set_on_data_chunk_recv_callback(
callbacks, OnDataChunkReceived);
nghttp2_session_callbacks_set_on_frame_not_send_callback(
callbacks, OnFrameNotSent);
nghttp2_session_callbacks_set_on_invalid_header_callback2(
callbacks, OnInvalidHeader);
#ifdef NODE_DEBUG_HTTP2
nghttp2_session_callbacks_set_error_callback(
callbacks, OnNghttpError);
#endif
if (kHasGetPaddingCallback) {
nghttp2_session_callbacks_set_select_padding_callback(
callbacks, OnSelectPadding);
}
}
Nghttp2Session::Callbacks::~Callbacks() {
nghttp2_session_callbacks_del(callbacks);
}
Nghttp2Session::SubmitTrailers::SubmitTrailers(
Nghttp2Session* handle,
Nghttp2Stream* stream,
uint32_t* flags)
: handle_(handle), stream_(stream), flags_(flags) { }
} // namespace http2
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#endif // SRC_NODE_HTTP2_CORE_INL_H_

View File

@ -1,516 +0,0 @@
#ifndef SRC_NODE_HTTP2_CORE_H_
#define SRC_NODE_HTTP2_CORE_H_
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "stream_base.h"
#include "util-inl.h"
#include "uv.h"
#include "nghttp2/nghttp2.h"
#include <queue>
#include <vector>
#include <stdio.h>
#include <unordered_map>
namespace node {
namespace http2 {
#ifdef NODE_DEBUG_HTTP2
// Adapted from nghttp2 own debug printer
static inline void _debug_vfprintf(const char* fmt, va_list args) {
vfprintf(stderr, fmt, args);
}
void inline debug_vfprintf(const char* format, ...) {
va_list args;
va_start(args, format);
_debug_vfprintf(format, args);
va_end(args);
}
#define DEBUG_HTTP2(...) debug_vfprintf(__VA_ARGS__);
#else
#define DEBUG_HTTP2(...) \
do { \
} while (0)
#endif
#define DEFAULT_SETTINGS_HEADER_TABLE_SIZE 4096
#define DEFAULT_SETTINGS_ENABLE_PUSH 1
#define DEFAULT_SETTINGS_INITIAL_WINDOW_SIZE 65535
#define DEFAULT_SETTINGS_MAX_FRAME_SIZE 16384
#define DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE 65535
#define MAX_MAX_FRAME_SIZE 16777215
#define MIN_MAX_FRAME_SIZE DEFAULT_SETTINGS_MAX_FRAME_SIZE
#define MAX_INITIAL_WINDOW_SIZE 2147483647
#define MAX_MAX_HEADER_LIST_SIZE 16777215u
#define DEFAULT_MAX_HEADER_LIST_PAIRS 128u
class Nghttp2Session;
class Nghttp2Stream;
struct nghttp2_stream_write_t;
#define MAX_BUFFER_COUNT 16
enum nghttp2_session_type {
NGHTTP2_SESSION_SERVER,
NGHTTP2_SESSION_CLIENT
};
enum nghttp2_shutdown_flags {
NGHTTP2_SHUTDOWN_FLAG_GRACEFUL
};
enum nghttp2_stream_flags {
NGHTTP2_STREAM_FLAG_NONE = 0x0,
// Writable side has ended
NGHTTP2_STREAM_FLAG_SHUT = 0x1,
// Reading has started
NGHTTP2_STREAM_FLAG_READ_START = 0x2,
// Reading is paused
NGHTTP2_STREAM_FLAG_READ_PAUSED = 0x4,
// Stream is closed
NGHTTP2_STREAM_FLAG_CLOSED = 0x8,
// Stream is destroyed
NGHTTP2_STREAM_FLAG_DESTROYED = 0x10
};
enum nghttp2_stream_options {
STREAM_OPTION_EMPTY_PAYLOAD = 0x1,
STREAM_OPTION_GET_TRAILERS = 0x2,
};
// Callbacks
typedef void (*nghttp2_stream_write_cb)(
nghttp2_stream_write_t* req,
int status);
struct nghttp2_stream_write {
unsigned int nbufs = 0;
nghttp2_stream_write_t* req = nullptr;
nghttp2_stream_write_cb cb = nullptr;
MaybeStackBuffer<uv_buf_t, MAX_BUFFER_COUNT> bufs;
};
struct nghttp2_header {
nghttp2_rcbuf* name = nullptr;
nghttp2_rcbuf* value = nullptr;
uint8_t flags = 0;
};
// Handle Types
class Nghttp2Session {
public:
// Initializes the session instance
inline int Init(
const nghttp2_session_type type = NGHTTP2_SESSION_SERVER,
nghttp2_option* options = nullptr,
nghttp2_mem* mem = nullptr,
uint32_t maxHeaderPairs = DEFAULT_MAX_HEADER_LIST_PAIRS);
// Frees this session instance
inline ~Nghttp2Session();
inline void MarkDestroying();
bool IsDestroying() {
return destroying_;
}
uint32_t GetMaxHeaderPairs() const {
return max_header_pairs_;
}
inline const char* TypeName() {
switch (session_type_) {
case NGHTTP2_SESSION_SERVER: return "server";
case NGHTTP2_SESSION_CLIENT: return "client";
default:
// This should never happen
ABORT();
}
}
// Returns the pointer to the identified stream, or nullptr if
// the stream does not exist
inline Nghttp2Stream* FindStream(int32_t id);
// Submits a new request. If the request is a success, assigned
// will be a pointer to the Nghttp2Stream instance assigned.
// This only works if the session is a client session.
inline int32_t SubmitRequest(
nghttp2_priority_spec* prispec,
nghttp2_nv* nva,
size_t len,
Nghttp2Stream** assigned = nullptr,
int options = 0);
// Submits a notice to the connected peer that the session is in the
// process of shutting down.
inline void SubmitShutdownNotice();
// Submits a SETTINGS frame to the connected peer.
inline int SubmitSettings(const nghttp2_settings_entry iv[], size_t niv);
// Write data to the session
inline ssize_t Write(const uv_buf_t* bufs, unsigned int nbufs);
// Returns the nghttp2 library session
inline nghttp2_session* session() const { return session_; }
inline bool IsClosed() const { return session_ == nullptr; }
nghttp2_session_type type() const {
return session_type_;
}
protected:
// Adds a stream instance to this session
inline void AddStream(Nghttp2Stream* stream);
// Removes a stream instance from this session
inline void RemoveStream(int32_t id);
virtual void OnHeaders(
Nghttp2Stream* stream,
nghttp2_header* headers,
size_t count,
nghttp2_headers_category cat,
uint8_t flags) {}
virtual void OnStreamClose(int32_t id, uint32_t code) {}
virtual void OnDataChunk(Nghttp2Stream* stream,
uv_buf_t* chunk) {}
virtual void OnSettings(bool ack) {}
virtual void OnPriority(int32_t id,
int32_t parent,
int32_t weight,
int8_t exclusive) {}
virtual void OnGoAway(int32_t lastStreamID,
uint32_t errorCode,
uint8_t* data,
size_t length) {}
virtual void OnFrameError(int32_t id,
uint8_t type,
int error_code) {}
virtual ssize_t GetPadding(size_t frameLength,
size_t maxFrameLength) { return 0; }
inline void SendPendingData();
virtual void Send(WriteWrap* req, char* buf, size_t length) = 0;
virtual WriteWrap* AllocateSend() = 0;
virtual bool HasGetPaddingCallback() { return false; }
class SubmitTrailers {
public:
inline void Submit(nghttp2_nv* trailers, size_t length) const;
private:
inline SubmitTrailers(Nghttp2Session* handle,
Nghttp2Stream* stream,
uint32_t* flags);
Nghttp2Session* const handle_;
Nghttp2Stream* const stream_;
uint32_t* const flags_;
friend class Nghttp2Session;
};
virtual void OnTrailers(Nghttp2Stream* stream,
const SubmitTrailers& submit_trailers) {}
virtual uv_loop_t* event_loop() const = 0;
virtual void Close();
private:
inline void HandleHeadersFrame(const nghttp2_frame* frame);
inline void HandlePriorityFrame(const nghttp2_frame* frame);
inline void HandleDataFrame(const nghttp2_frame* frame);
inline void HandleGoawayFrame(const nghttp2_frame* frame);
static inline void GetTrailers(nghttp2_session* session,
Nghttp2Session* handle,
Nghttp2Stream* stream,
uint32_t* flags);
/* callbacks for nghttp2 */
#ifdef NODE_DEBUG_HTTP2
static inline int OnNghttpError(nghttp2_session* session,
const char* message,
size_t len,
void* user_data);
#endif
static inline int OnBeginHeadersCallback(nghttp2_session* session,
const nghttp2_frame* frame,
void* user_data);
static inline int OnHeaderCallback(nghttp2_session* session,
const nghttp2_frame* frame,
nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags,
void* user_data);
static inline int OnFrameReceive(nghttp2_session* session,
const nghttp2_frame* frame,
void* user_data);
static inline int OnFrameNotSent(nghttp2_session* session,
const nghttp2_frame* frame,
int error_code,
void* user_data);
static inline int OnStreamClose(nghttp2_session* session,
int32_t id,
uint32_t code,
void* user_data);
static inline int OnInvalidHeader(nghttp2_session* session,
const nghttp2_frame* frame,
nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags,
void* user_data);
static inline int OnDataChunkReceived(nghttp2_session* session,
uint8_t flags,
int32_t id,
const uint8_t* data,
size_t len,
void* user_data);
static inline ssize_t OnStreamReadFD(nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data);
static inline ssize_t OnStreamRead(nghttp2_session* session,
int32_t id,
uint8_t* buf,
size_t length,
uint32_t* flags,
nghttp2_data_source* source,
void* user_data);
static inline ssize_t OnSelectPadding(nghttp2_session* session,
const nghttp2_frame* frame,
size_t maxPayloadLen,
void* user_data);
struct Callbacks {
inline explicit Callbacks(bool kHasGetPaddingCallback);
inline ~Callbacks();
nghttp2_session_callbacks* callbacks;
};
/* Use callback_struct_saved[kHasGetPaddingCallback ? 1 : 0] */
static Callbacks callback_struct_saved[2];
nghttp2_session* session_;
nghttp2_session_type session_type_;
uint32_t max_header_pairs_ = DEFAULT_MAX_HEADER_LIST_PAIRS;
std::unordered_map<int32_t, Nghttp2Stream*> streams_;
bool destroying_ = false;
friend class Nghttp2Stream;
};
class Nghttp2Stream {
public:
// Resets the state of the stream instance to defaults
Nghttp2Stream(
int32_t id,
Nghttp2Session* session,
nghttp2_headers_category category = NGHTTP2_HCAT_HEADERS,
int options = 0);
inline ~Nghttp2Stream() {}
inline void FlushDataChunks();
// Destroy this stream instance and free all held memory.
// Note that this will free queued outbound and inbound
// data chunks and inbound headers, so it's important not
// to call this until those are fully consumed.
inline void Destroy();
// Returns true if this stream has been destroyed
inline bool IsDestroyed() const {
return flags_ & NGHTTP2_STREAM_FLAG_DESTROYED;
}
// Queue outbound chunks of data to be sent on this stream
inline int Write(
nghttp2_stream_write_t* req,
const uv_buf_t bufs[],
unsigned int nbufs,
nghttp2_stream_write_cb cb);
// Initiate a response on this stream.
inline int SubmitResponse(nghttp2_nv* nva,
size_t len,
int options);
// Send data read from a file descriptor as the response on this stream.
inline int SubmitFile(int fd,
nghttp2_nv* nva, size_t len,
int64_t offset,
int64_t length,
int options);
// Submit informational headers for this stream
inline int SubmitInfo(nghttp2_nv* nva, size_t len);
// Submit a PRIORITY frame for this stream
inline int SubmitPriority(nghttp2_priority_spec* prispec,
bool silent = false);
// Submits an RST_STREAM frame using the given code
inline int SubmitRstStream(const uint32_t code);
// Submits a PUSH_PROMISE frame with this stream as the parent.
inline int SubmitPushPromise(
nghttp2_nv* nva,
size_t len,
Nghttp2Stream** assigned = nullptr,
int options = 0);
// Marks the Writable side of the stream as being shutdown
inline void Shutdown() {
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
nghttp2_session_resume_data(session_->session(), id_);
}
// Returns true if this stream is writable.
inline bool IsWritable() const {
return !(flags_ & NGHTTP2_STREAM_FLAG_SHUT);
}
// Start Reading. If there are queued data chunks, they are pushed into
// the session to be emitted at the JS side
inline void ReadStart();
// Resume Reading
inline void ReadResume();
// Stop/Pause Reading.
inline void ReadStop();
// Returns true if reading is paused
inline bool IsPaused() const {
return flags_ & NGHTTP2_STREAM_FLAG_READ_PAUSED;
}
inline bool GetTrailers() const {
return getTrailers_;
}
// Returns true if this stream is in the reading state, which occurs when
// the NGHTTP2_STREAM_FLAG_READ_START flag has been set and the
// NGHTTP2_STREAM_FLAG_READ_PAUSED flag is *not* set.
inline bool IsReading() const {
return flags_ & NGHTTP2_STREAM_FLAG_READ_START &&
!(flags_ & NGHTTP2_STREAM_FLAG_READ_PAUSED);
}
inline void Close(int32_t code) {
DEBUG_HTTP2("Nghttp2Stream %d: closing with code %d\n", id_, code);
flags_ |= NGHTTP2_STREAM_FLAG_CLOSED;
code_ = code;
session_->OnStreamClose(id_, code);
DEBUG_HTTP2("Nghttp2Stream %d: closed\n", id_);
}
// Returns true if this stream has been closed either by receiving or
// sending an RST_STREAM frame.
inline bool IsClosed() const {
return flags_ & NGHTTP2_STREAM_FLAG_CLOSED;
}
// Returns the RST_STREAM code used to close this stream
inline int32_t code() const {
return code_;
}
// Returns the stream identifier for this stream
inline int32_t id() const {
return id_;
}
inline bool AddHeader(nghttp2_rcbuf* name,
nghttp2_rcbuf* value,
uint8_t flags);
inline nghttp2_header* headers() {
return current_headers_.data();
}
inline nghttp2_headers_category headers_category() const {
return current_headers_category_;
}
inline size_t headers_count() const {
return current_headers_.size();
}
void StartHeaders(nghttp2_headers_category category) {
DEBUG_HTTP2("Nghttp2Stream %d: starting headers, category: %d\n",
id_, category);
current_headers_length_ = 0;
current_headers_.clear();
current_headers_category_ = category;
}
private:
// The Stream Identifier
int32_t id_;
// The Parent HTTP/2 Session
Nghttp2Session* session_;
// Internal state flags
int flags_ = NGHTTP2_STREAM_FLAG_NONE;
uint32_t max_header_pairs_ = DEFAULT_MAX_HEADER_LIST_PAIRS;
uint32_t max_header_length_ = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE;
// The RST_STREAM code used to close this stream
int32_t code_ = NGHTTP2_NO_ERROR;
// Outbound Data... This is the data written by the JS layer that is
// waiting to be written out to the socket.
std::queue<nghttp2_stream_write*> queue_;
unsigned int queue_index_ = 0;
size_t queue_offset_ = 0;
int64_t fd_offset_ = 0;
int64_t fd_length_ = -1;
// True if this stream will have outbound trailers
bool getTrailers_ = false;
// The Current Headers block... As headers are received for this stream,
// they are temporarily stored here until the OnFrameReceived is called
// signalling the end of the HEADERS frame
nghttp2_headers_category current_headers_category_ = NGHTTP2_HCAT_HEADERS;
uint32_t current_headers_length_ = 0; // total number of octets
std::vector<nghttp2_header> current_headers_;
// Inbound Data... This is the data received via DATA frames for this stream.
std::queue<uv_buf_t> data_chunks_;
friend class Nghttp2Session;
};
struct nghttp2_stream_write_t {
void* data;
int status;
};
} // namespace http2
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#endif // SRC_NODE_HTTP2_CORE_H_

View File

@ -48,6 +48,7 @@ namespace http2 {
IDX_OPTIONS_PEER_MAX_CONCURRENT_STREAMS,
IDX_OPTIONS_PADDING_STRATEGY,
IDX_OPTIONS_MAX_HEADER_LIST_PAIRS,
IDX_OPTIONS_MAX_OUTSTANDING_PINGS,
IDX_OPTIONS_FLAGS
};

View File

@ -82,7 +82,7 @@ server.listen(0, common.mustCall(() => {
let data = '';
req.setEncoding('utf8');
req.on('data', common.mustCall((d) => data += d));
req.on('data', common.mustCallAtLeast((d) => data += d));
req.on('end', common.mustCall(() => {
assert.strictEqual(data, 'test');
maybeClose();

View File

@ -95,7 +95,6 @@ const { kSocket } = require('internal/http2/util');
common.expectsError(() => client.request(), sessionError);
common.expectsError(() => client.settings({}), sessionError);
common.expectsError(() => client.priority(req, {}), sessionError);
common.expectsError(() => client.shutdown(), sessionError);
// Wait for setImmediate call from destroy() to complete
@ -103,9 +102,7 @@ const { kSocket } = require('internal/http2/util');
setImmediate(() => {
common.expectsError(() => client.request(), sessionError);
common.expectsError(() => client.settings({}), sessionError);
common.expectsError(() => client.priority(req, {}), sessionError);
common.expectsError(() => client.shutdown(), sessionError);
common.expectsError(() => client.rstStream(req), sessionError);
});
req.on(

View File

@ -0,0 +1,27 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const http = require('http');
const http2 = require('http2');
const server = http.createServer(common.mustNotCall());
server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`);
const req = client.request();
req.on('streamClosed', common.mustCall());
client.on('error', common.expectsError({
code: 'ERR_HTTP2_ERROR',
type: Error,
message: 'Protocol error'
}));
client.on('close', (...args) => {
server.close();
});
}));

View File

@ -11,27 +11,16 @@ if (!common.hasCrypto)
const http2 = require('http2');
// tests error handling within requestOnConnect
// - NGHTTP2_ERR_NOMEM (should emit session error)
// - NGHTTP2_ERR_STREAM_ID_NOT_AVAILABLE (should emit session error)
// - NGHTTP2_ERR_INVALID_ARGUMENT (should emit stream error)
// - every other NGHTTP2 error from binding (should emit session error)
const specificTestKeys = [
'NGHTTP2_ERR_NOMEM',
'NGHTTP2_ERR_STREAM_ID_NOT_AVAILABLE',
'NGHTTP2_ERR_INVALID_ARGUMENT'
];
const specificTests = [
{
ngError: constants.NGHTTP2_ERR_NOMEM,
error: {
code: 'ERR_OUTOFMEMORY',
type: Error,
message: 'Out of memory'
},
type: 'session'
},
{
ngError: constants.NGHTTP2_ERR_STREAM_ID_NOT_AVAILABLE,
error: {
@ -40,7 +29,7 @@ const specificTests = [
message: 'No stream ID is available because ' +
'maximum stream ID has been reached'
},
type: 'session'
type: 'stream'
},
{
ngError: constants.NGHTTP2_ERR_INVALID_ARGUMENT,
@ -72,24 +61,15 @@ const tests = specificTests.concat(genericTests);
let currentError;
// mock submitRequest because we only care about testing error handling
Http2Session.prototype.submitRequest = () => currentError;
Http2Session.prototype.request = () => currentError;
const server = http2.createServer(common.mustNotCall());
server.listen(0, common.mustCall(() => runTest(tests.shift())));
function runTest(test) {
const port = server.address().port;
const url = `http://localhost:${port}`;
const headers = {
':path': '/',
':method': 'POST',
':scheme': 'http',
':authority': `localhost:${port}`
};
const client = http2.connect(url);
const req = client.request(headers);
const client = http2.connect(`http://localhost:${server.address().port}`);
const req = client.request({ ':method': 'POST' });
currentError = test.ngError;
req.resume();

View File

@ -25,7 +25,7 @@ server.on('listening', common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
const req = client.request({ ':path': '/' });
client.priority(req, {});
req.priority({});
req.on('response', common.mustCall());
req.resume();

View File

@ -7,6 +7,10 @@ const assert = require('assert');
const h2 = require('http2');
const server = h2.createServer();
server.on('stream', (stream) => {
stream.respond();
stream.end('ok');
});
server.listen(0);
@ -15,15 +19,13 @@ server.on('listening', common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
const req = client.request({ ':path': '/' });
client.rstStream(req, 0);
assert.strictEqual(req.rstCode, 0);
req.rstStream(0);
// make sure that destroy is called
req._destroy = common.mustCall(req._destroy.bind(req));
// second call doesn't do anything
assert.doesNotThrow(() => client.rstStream(req, 8));
assert.strictEqual(req.rstCode, 0);
assert.doesNotThrow(() => req.rstStream(8));
req.on('streamClosed', common.mustCall((code) => {
assert.strictEqual(req.destroyed, true);

View File

@ -1,84 +0,0 @@
'use strict';
const {
constants,
Http2Session,
nghttp2ErrorString
} = process.binding('http2');
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const http2 = require('http2');
// tests error handling within requestOnConnect
// - NGHTTP2_ERR_NOMEM (should emit session error)
// - every other NGHTTP2 error from binding (should emit session error)
const specificTestKeys = [
'NGHTTP2_ERR_NOMEM'
];
const specificTests = [
{
ngError: constants.NGHTTP2_ERR_NOMEM,
error: {
code: 'ERR_OUTOFMEMORY',
type: Error,
message: 'Out of memory'
}
}
];
const genericTests = Object.getOwnPropertyNames(constants)
.filter((key) => (
key.indexOf('NGHTTP2_ERR') === 0 && specificTestKeys.indexOf(key) < 0
))
.map((key) => ({
ngError: constants[key],
error: {
code: 'ERR_HTTP2_ERROR',
type: Error,
message: nghttp2ErrorString(constants[key])
}
}));
const tests = specificTests.concat(genericTests);
const server = http2.createServer(common.mustNotCall());
server.on('sessionError', () => {}); // not being tested
server.listen(0, common.mustCall(() => runTest(tests.shift())));
function runTest(test) {
// mock submitSettings because we only care about testing error handling
Http2Session.prototype.submitSettings = () => test.ngError;
const errorMustCall = common.expectsError(test.error);
const errorMustNotCall = common.mustNotCall(
`${test.error.code} should emit on session`
);
const url = `http://localhost:${server.address().port}`;
const client = http2.connect(url, {
settings: {
maxHeaderListSize: 1
}
});
const req = client.request();
req.resume();
req.end();
client.on('error', errorMustCall);
req.on('error', errorMustNotCall);
req.on('end', common.mustCall(() => {
client.destroy();
if (!tests.length) {
server.close();
} else {
runTest(tests.shift());
}
}));
}

View File

@ -19,7 +19,7 @@ server.listen(0, common.mustCall(function() {
server.once('request', common.mustCall(function(request, response) {
let data = '';
request.setEncoding('utf8');
request.on('data', common.mustCall((chunk) => data += chunk));
request.on('data', common.mustCallAtLeast((chunk) => data += chunk));
request.on('end', common.mustCall(() => {
const trailers = request.trailers;
for (const [name, value] of Object.entries(expectedTrailers)) {

View File

@ -104,7 +104,8 @@ assert.doesNotThrow(() => http2.getPackedSettings({ enablePush: false }));
}, common.expectsError({
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError,
message: 'The "buf" argument must be one of type Buffer or Uint8Array'
message:
'The "buf" argument must be one of type Buffer, TypedArray, or DataView'
}));
});

View File

@ -6,29 +6,15 @@ if (!common.hasCrypto)
const http2 = require('http2');
const {
constants,
Http2Session,
Http2Stream,
nghttp2ErrorString
} = process.binding('http2');
// tests error handling within additionalHeaders
// - NGHTTP2_ERR_NOMEM (should emit session error)
// - every other NGHTTP2 error from binding (should emit stream error)
const specificTestKeys = [
'NGHTTP2_ERR_NOMEM'
];
const specificTests = [
{
ngError: constants.NGHTTP2_ERR_NOMEM,
error: {
code: 'ERR_OUTOFMEMORY',
type: Error,
message: 'Out of memory'
},
type: 'session'
}
];
const specificTestKeys = [];
const specificTests = [];
const genericTests = Object.getOwnPropertyNames(constants)
.filter((key) => (
@ -50,7 +36,7 @@ const tests = specificTests.concat(genericTests);
let currentError;
// mock sendHeaders because we only care about testing error handling
Http2Session.prototype.sendHeaders = () => currentError.ngError;
Http2Stream.prototype.info = () => currentError.ngError;
const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {

View File

@ -16,15 +16,7 @@ server.on(
message: `The "${param}" argument must be of type ${type}`
});
common.expectsError(
() => stream.session.priority(undefined, {}),
invalidArgTypeError('stream', 'Http2Stream')
);
common.expectsError(
() => stream.session.rstStream(undefined),
invalidArgTypeError('stream', 'Http2Stream')
);
common.expectsError(
() => stream.session.rstStream(stream, 'string'),
() => stream.rstStream('string'),
invalidArgTypeError('code', 'number')
);
stream.session.destroy();

View File

@ -0,0 +1,53 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');
const Countdown = require('../common/countdown');
const server = http2.createServer();
server.on('stream', (stream) => {
stream.respond();
stream.end('ok');
});
server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`);
const nextID = 2 ** 31 - 1;
client.on('connect', () => {
client.setNextStreamID(nextID);
assert.strictEqual(client.state.nextStreamID, nextID);
const countdown = new Countdown(2, common.mustCall(() => {
server.close();
client.destroy();
}));
{
// This one will be ok
const req = client.request();
assert.strictEqual(req.id, nextID);
req.on('error', common.mustNotCall());
req.resume();
req.on('end', () => countdown.dec());
}
{
// This one will error because there are no more stream IDs available
const req = client.request();
req.on('error', common.expectsError({
code: 'ERR_HTTP2_OUT_OF_STREAMS',
type: Error,
message:
'No stream ID is available because maximum stream ID has been reached'
}));
req.on('error', () => countdown.dec());
}
});
}));

View File

@ -0,0 +1,87 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const async_hooks = require('async_hooks');
const assert = require('assert');
const http2 = require('http2');
const pings = new Set();
const events = [0, 0, 0, 0];
const hook = async_hooks.createHook({
init(id, type, trigger, resource) {
if (type === 'HTTP2PING') {
pings.add(id);
events[0]++;
}
},
before(id) {
if (pings.has(id)) {
events[1]++;
}
},
after(id) {
if (pings.has(id)) {
events[2]++;
}
},
destroy(id) {
if (pings.has(id)) {
events[3]++;
}
}
});
hook.enable();
process.on('exit', () => {
assert.deepStrictEqual(events, [4, 4, 4, 4]);
});
const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
assert(stream.session.ping(common.mustCall((err, duration, ret) => {
assert.strictEqual(err, null);
assert.strictEqual(typeof duration, 'number');
assert.strictEqual(ret.length, 8);
stream.end('ok');
})));
stream.respond();
}));
server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`,
{ maxOutstandingPings: 2 });
client.on('connect', common.mustCall(() => {
{
const payload = Buffer.from('abcdefgh');
assert(client.ping(payload, common.mustCall((err, duration, ret) => {
assert.strictEqual(err, null);
assert.strictEqual(typeof duration, 'number');
assert.deepStrictEqual(payload, ret);
})));
}
{
const payload = Buffer.from('abcdefgi');
assert(client.ping(payload, common.mustCall((err, duration, ret) => {
assert.strictEqual(err, null);
assert.strictEqual(typeof duration, 'number');
assert.deepStrictEqual(payload, ret);
})));
}
// Only max 2 pings at a time based on the maxOutstandingPings option
assert(!client.ping(common.expectsError({
code: 'ERR_HTTP2_PING_CANCEL',
type: Error,
message: 'HTTP2 ping cancelled'
})));
const req = client.request();
req.resume();
req.on('end', common.mustCall(() => {
client.destroy();
server.close();
}));
}));
}));

View File

@ -8,6 +8,7 @@ const assert = require('assert');
const http2 = require('http2');
const fs = require('fs');
const path = require('path');
const Countdown = require('../common/countdown');
// piping should work as expected with createWriteStream
@ -31,19 +32,16 @@ server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);
let remaining = 2;
function maybeClose() {
if (--remaining === 0) {
server.close();
client.destroy();
}
}
const countdown = new Countdown(2, common.mustCall(() => {
server.close();
client.destroy();
}));
const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall());
req.resume();
req.on('end', common.mustCall(maybeClose));
req.on('end', common.mustCall(() => countdown.dec()));
const str = fs.createReadStream(loc);
str.on('end', common.mustCall(maybeClose));
str.on('end', common.mustCall(() => countdown.dec()));
str.pipe(req);
}));

View File

@ -1,109 +0,0 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const http2 = require('http2');
const {
constants,
Http2Session,
nghttp2ErrorString
} = process.binding('http2');
// tests error handling within priority
// - NGHTTP2_ERR_NOMEM (should emit session error)
// - every other NGHTTP2 error from binding (should emit stream error)
const specificTestKeys = [
'NGHTTP2_ERR_NOMEM'
];
const specificTests = [
{
ngError: constants.NGHTTP2_ERR_NOMEM,
error: {
code: 'ERR_OUTOFMEMORY',
type: Error,
message: 'Out of memory'
},
type: 'session'
}
];
const genericTests = Object.getOwnPropertyNames(constants)
.filter((key) => (
key.indexOf('NGHTTP2_ERR') === 0 && specificTestKeys.indexOf(key) < 0
))
.map((key) => ({
ngError: constants[key],
error: {
code: 'ERR_HTTP2_ERROR',
type: Error,
message: nghttp2ErrorString(constants[key])
},
type: 'stream'
}));
const tests = specificTests.concat(genericTests);
let currentError;
// mock submitPriority because we only care about testing error handling
Http2Session.prototype.submitPriority = () => currentError.ngError;
const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {
const errorMustCall = common.expectsError(currentError.error);
const errorMustNotCall = common.mustNotCall(
`${currentError.error.code} should emit on ${currentError.type}`
);
if (currentError.type === 'stream') {
stream.session.on('error', errorMustNotCall);
stream.on('error', errorMustCall);
stream.on('error', common.mustCall(() => {
stream.respond();
stream.end();
}));
} else {
stream.session.once('error', errorMustCall);
stream.on('error', errorMustNotCall);
}
stream.priority({
parent: 0,
weight: 1,
exclusive: false
});
}, tests.length));
server.listen(0, common.mustCall(() => runTest(tests.shift())));
function runTest(test) {
const port = server.address().port;
const url = `http://localhost:${port}`;
const headers = {
':path': '/',
':method': 'POST',
':scheme': 'http',
':authority': `localhost:${port}`
};
const client = http2.connect(url);
const req = client.request(headers);
currentError = test;
req.resume();
req.end();
req.on('end', common.mustCall(() => {
client.destroy();
if (!tests.length) {
server.close();
} else {
runTest(tests.shift());
}
}));
}

View File

@ -6,29 +6,16 @@ if (!common.hasCrypto)
const http2 = require('http2');
const {
constants,
Http2Session,
Http2Stream,
nghttp2ErrorString
} = process.binding('http2');
// tests error handling within respond
// - NGHTTP2_ERR_NOMEM (should emit session error)
// - every other NGHTTP2 error from binding (should emit stream error)
const specificTestKeys = [
'NGHTTP2_ERR_NOMEM'
];
const specificTestKeys = [];
const specificTests = [
{
ngError: constants.NGHTTP2_ERR_NOMEM,
error: {
code: 'ERR_OUTOFMEMORY',
type: Error,
message: 'Out of memory'
},
type: 'session'
}
];
const specificTests = [];
const genericTests = Object.getOwnPropertyNames(constants)
.filter((key) => (
@ -50,7 +37,7 @@ const tests = specificTests.concat(genericTests);
let currentError;
// mock submitResponse because we only care about testing error handling
Http2Session.prototype.submitResponse = () => currentError.ngError;
Http2Stream.prototype.respond = () => currentError.ngError;
const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {

View File

@ -11,32 +11,18 @@ const http2 = require('http2');
const {
constants,
Http2Session,
Http2Stream,
nghttp2ErrorString
} = process.binding('http2');
// tests error handling within processRespondWithFD
// (called by respondWithFD & respondWithFile)
// - NGHTTP2_ERR_NOMEM (should emit session error)
// - every other NGHTTP2 error from binding (should emit stream error)
const fname = fixtures.path('elipses.txt');
const specificTestKeys = [
'NGHTTP2_ERR_NOMEM'
];
const specificTests = [
{
ngError: constants.NGHTTP2_ERR_NOMEM,
error: {
code: 'ERR_OUTOFMEMORY',
type: Error,
message: 'Out of memory'
},
type: 'session'
}
];
const specificTestKeys = [];
const specificTests = [];
const genericTests = Object.getOwnPropertyNames(constants)
.filter((key) => (
@ -57,8 +43,8 @@ const tests = specificTests.concat(genericTests);
let currentError;
// mock submitFile because we only care about testing error handling
Http2Session.prototype.submitFile = () => currentError.ngError;
// mock respondFD because we only care about testing error handling
Http2Stream.prototype.respondFD = () => currentError.ngError;
const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {

View File

@ -6,29 +6,15 @@ if (!common.hasCrypto)
const http2 = require('http2');
const {
constants,
Http2Session,
Http2Stream,
nghttp2ErrorString
} = process.binding('http2');
// tests error handling within rstStream
// - NGHTTP2_ERR_NOMEM (should emit session error)
// - every other NGHTTP2 error from binding (should emit stream error)
const specificTestKeys = [
'NGHTTP2_ERR_NOMEM'
];
const specificTests = [
{
ngError: constants.NGHTTP2_ERR_NOMEM,
error: {
code: 'ERR_OUTOFMEMORY',
type: Error,
message: 'Out of memory'
},
type: 'session'
}
];
const specificTestKeys = [];
const specificTests = [];
const genericTests = Object.getOwnPropertyNames(constants)
.filter((key) => (
@ -50,7 +36,7 @@ const tests = specificTests.concat(genericTests);
let currentError;
// mock submitRstStream because we only care about testing error handling
Http2Session.prototype.submitRstStream = () => currentError.ngError;
Http2Stream.prototype.rstStream = () => currentError.ngError;
const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {

View File

@ -0,0 +1,22 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const http = require('http');
const http2 = require('http2');
const server = http2.createServer();
server.on('stream', common.mustNotCall());
server.on('session', common.mustCall((session) => {
session.on('close', common.mustCall());
}));
server.listen(0, common.mustCall(() => {
const req = http.get(`http://localhost:${server.address().port}`);
req.on('error', (error) => {
server.close();
});
}));

View File

@ -6,32 +6,21 @@ if (!common.hasCrypto)
const http2 = require('http2');
const {
constants,
Http2Session,
Http2Stream,
nghttp2ErrorString
} = process.binding('http2');
// tests error handling within pushStream
// - NGHTTP2_ERR_NOMEM (should emit session error)
// - NGHTTP2_ERR_STREAM_ID_NOT_AVAILABLE (should emit session error)
// - NGHTTP2_ERR_STREAM_CLOSED (should emit stream error)
// - every other NGHTTP2 error from binding (should emit stream error)
const specificTestKeys = [
'NGHTTP2_ERR_NOMEM',
'NGHTTP2_ERR_STREAM_ID_NOT_AVAILABLE',
'NGHTTP2_ERR_STREAM_CLOSED'
];
const specificTests = [
{
ngError: constants.NGHTTP2_ERR_NOMEM,
error: {
code: 'ERR_OUTOFMEMORY',
type: Error,
message: 'Out of memory'
},
type: 'session'
},
{
ngError: constants.NGHTTP2_ERR_STREAM_ID_NOT_AVAILABLE,
error: {
@ -40,7 +29,7 @@ const specificTests = [
message: 'No stream ID is available because ' +
'maximum stream ID has been reached'
},
type: 'session'
type: 'stream'
},
{
ngError: constants.NGHTTP2_ERR_STREAM_CLOSED,
@ -73,7 +62,7 @@ const tests = specificTests.concat(genericTests);
let currentError;
// mock submitPushPromise because we only care about testing error handling
Http2Session.prototype.submitPushPromise = () => currentError.ngError;
Http2Stream.prototype.pushPromise = () => currentError.ngError;
const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {

View File

@ -15,7 +15,7 @@ server.on(
// Test that stream.state getter returns an empty object
// when the stream session has been destroyed
assert.deepStrictEqual(Object.create(null), stream.state);
assert.deepStrictEqual({}, stream.state);
// Test that ERR_HTTP2_INVALID_STREAM is thrown while calling
// stream operations after the stream session has been destroyed
@ -31,7 +31,6 @@ server.on(
invalidStreamError
);
common.expectsError(() => stream.respond(), invalidStreamError);
common.expectsError(() => stream.rstStream(), invalidStreamError);
common.expectsError(() => stream.write('data'), invalidStreamError);
// Test that ERR_HTTP2_INVALID_SESSION is thrown while calling
@ -41,17 +40,14 @@ server.on(
code: 'ERR_HTTP2_INVALID_SESSION',
message: 'The session has been destroyed'
};
common.expectsError(() => stream.session.priority(), invalidSessionError);
common.expectsError(() => stream.session.settings(), invalidSessionError);
common.expectsError(() => stream.session.shutdown(), invalidSessionError);
// Wait for setImmediate call from destroy() to complete
// so that state.destroyed is set to true
setImmediate((session) => {
common.expectsError(() => session.priority(), invalidSessionError);
common.expectsError(() => session.settings(), invalidSessionError);
common.expectsError(() => session.shutdown(), invalidSessionError);
common.expectsError(() => session.rstStream(), invalidSessionError);
}, stream.session);
})
);

View File

@ -29,7 +29,7 @@ const tests = Object.getOwnPropertyNames(constants)
let currentError;
// mock submitGoaway because we only care about testing error handling
Http2Session.prototype.submitGoaway = () => currentError.ngError;
Http2Session.prototype.goaway = () => currentError.ngError;
const server = http2.createServer();
server.on('stream', common.mustCall((stream, headers) => {

View File

@ -16,7 +16,8 @@ const IDX_OPTIONS_MAX_SEND_HEADER_BLOCK_LENGTH = 2;
const IDX_OPTIONS_PEER_MAX_CONCURRENT_STREAMS = 3;
const IDX_OPTIONS_PADDING_STRATEGY = 4;
const IDX_OPTIONS_MAX_HEADER_LIST_PAIRS = 5;
const IDX_OPTIONS_FLAGS = 6;
const IDX_OPTIONS_MAX_OUTSTANDING_PINGS = 6;
const IDX_OPTIONS_FLAGS = 7;
{
updateOptionsBuffer({
@ -25,7 +26,8 @@ const IDX_OPTIONS_FLAGS = 6;
maxSendHeaderBlockLength: 3,
peerMaxConcurrentStreams: 4,
paddingStrategy: 5,
maxHeaderListPairs: 6
maxHeaderListPairs: 6,
maxOutstandingPings: 7
});
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_DEFLATE_DYNAMIC_TABLE_SIZE], 1);
@ -34,6 +36,7 @@ const IDX_OPTIONS_FLAGS = 6;
strictEqual(optionsBuffer[IDX_OPTIONS_PEER_MAX_CONCURRENT_STREAMS], 4);
strictEqual(optionsBuffer[IDX_OPTIONS_PADDING_STRATEGY], 5);
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_HEADER_LIST_PAIRS], 6);
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_OUTSTANDING_PINGS], 7);
const flags = optionsBuffer[IDX_OPTIONS_FLAGS];
@ -43,10 +46,12 @@ const IDX_OPTIONS_FLAGS = 6;
ok(flags & (1 << IDX_OPTIONS_PEER_MAX_CONCURRENT_STREAMS));
ok(flags & (1 << IDX_OPTIONS_PADDING_STRATEGY));
ok(flags & (1 << IDX_OPTIONS_MAX_HEADER_LIST_PAIRS));
ok(flags & (1 << IDX_OPTIONS_MAX_OUTSTANDING_PINGS));
}
{
optionsBuffer[IDX_OPTIONS_MAX_SEND_HEADER_BLOCK_LENGTH] = 0;
optionsBuffer[IDX_OPTIONS_MAX_OUTSTANDING_PINGS] = 0;
updateOptionsBuffer({
maxDeflateDynamicTableSize: 1,
@ -58,17 +63,20 @@ const IDX_OPTIONS_FLAGS = 6;
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_DEFLATE_DYNAMIC_TABLE_SIZE], 1);
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_RESERVED_REMOTE_STREAMS], 2);
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_SEND_HEADER_BLOCK_LENGTH], 0);
strictEqual(optionsBuffer[IDX_OPTIONS_PEER_MAX_CONCURRENT_STREAMS], 4);
strictEqual(optionsBuffer[IDX_OPTIONS_PADDING_STRATEGY], 5);
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_HEADER_LIST_PAIRS], 6);
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_SEND_HEADER_BLOCK_LENGTH], 0);
strictEqual(optionsBuffer[IDX_OPTIONS_MAX_OUTSTANDING_PINGS], 0);
const flags = optionsBuffer[IDX_OPTIONS_FLAGS];
ok(flags & (1 << IDX_OPTIONS_MAX_DEFLATE_DYNAMIC_TABLE_SIZE));
ok(flags & (1 << IDX_OPTIONS_MAX_RESERVED_REMOTE_STREAMS));
ok(!(flags & (1 << IDX_OPTIONS_MAX_SEND_HEADER_BLOCK_LENGTH)));
ok(flags & (1 << IDX_OPTIONS_PEER_MAX_CONCURRENT_STREAMS));
ok(flags & (1 << IDX_OPTIONS_PADDING_STRATEGY));
ok(flags & (1 << IDX_OPTIONS_MAX_HEADER_LIST_PAIRS));
ok(!(flags & (1 << IDX_OPTIONS_MAX_SEND_HEADER_BLOCK_LENGTH)));
ok(!(flags & (1 << IDX_OPTIONS_MAX_OUTSTANDING_PINGS)));
}

View File

@ -23,7 +23,8 @@ const fixtures = require('../common/fixtures');
// TODO(jasnell): Test for these
delete providers.HTTP2SESSION;
delete providers.HTTP2SESSIONSHUTDOWNWRAP;
delete providers.HTTP2STREAM;
delete providers.HTTP2PING;
const obj_keys = Object.keys(providers);
if (obj_keys.length > 0)