src: avoid manual memory management in inspector

Make the inspector code easier to reason about by restructuring it
to avoid manual memory allocation and copying as much as possible.

An amusing side effect is that it reduces the total amount of memory
used in the test suite.

Before:

    $ valgrind ./out/Release/cctest 2>&1 | grep 'total heap' | cut -c31-
    1,017 allocs, 1,017 frees, 21,695,456 allocated

After:

    $ valgrind ./out/Release/cctest 2>&1 | grep 'total heap' | cut -c31-
    869 allocs, 869 frees, 14,484,641 bytes allocated

PR-URL: https://github.com/nodejs/node/pull/7906
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
This commit is contained in:
Ben Noordhuis 2016-07-28 12:14:11 +02:00
parent 0190db44ac
commit c8c1f96abe
4 changed files with 161 additions and 228 deletions

View File

@ -48,8 +48,8 @@ void PrintDebuggerReadyMessage(int port) {
port, DEVTOOLS_HASH, port); port, DEVTOOLS_HASH, port);
} }
bool AcceptsConnection(inspector_socket_t* socket, const char* path) { bool AcceptsConnection(inspector_socket_t* socket, const std::string& path) {
return strncmp(DEVTOOLS_PATH, path, sizeof(DEVTOOLS_PATH)) == 0; return 0 == path.compare(0, sizeof(DEVTOOLS_PATH) - 1, DEVTOOLS_PATH);
} }
void DisposeInspector(inspector_socket_t* socket, int status) { void DisposeInspector(inspector_socket_t* socket, int status) {
@ -63,10 +63,7 @@ void DisconnectAndDisposeIO(inspector_socket_t* socket) {
} }
void OnBufferAlloc(uv_handle_t* handle, size_t len, uv_buf_t* buf) { void OnBufferAlloc(uv_handle_t* handle, size_t len, uv_buf_t* buf) {
if (len > 0) { buf->base = new char[len];
buf->base = static_cast<char*>(malloc(len));
CHECK_NE(buf->base, nullptr);
}
buf->len = len; buf->len = len;
} }
@ -133,18 +130,19 @@ void SendTargentsListResponse(inspector_socket_t* socket, int port) {
SendHttpResponse(socket, buffer, len); SendHttpResponse(socket, buffer, len);
} }
bool RespondToGet(inspector_socket_t* socket, const char* path, int port) { bool RespondToGet(inspector_socket_t* socket, const std::string& path,
int port) {
const char PATH[] = "/json"; const char PATH[] = "/json";
const char PATH_LIST[] = "/json/list"; const char PATH_LIST[] = "/json/list";
const char PATH_VERSION[] = "/json/version"; const char PATH_VERSION[] = "/json/version";
const char PATH_ACTIVATE[] = "/json/activate/"; const char PATH_ACTIVATE[] = "/json/activate/";
if (!strncmp(PATH_VERSION, path, sizeof(PATH_VERSION))) { if (0 == path.compare(0, sizeof(PATH_VERSION) - 1, PATH_VERSION)) {
SendVersionResponse(socket); SendVersionResponse(socket);
} else if (!strncmp(PATH_LIST, path, sizeof(PATH_LIST)) || } else if (0 == path.compare(0, sizeof(PATH_LIST) - 1, PATH_LIST) ||
!strncmp(PATH, path, sizeof(PATH))) { 0 == path.compare(0, sizeof(PATH) - 1, PATH)) {
SendTargentsListResponse(socket, port); SendTargentsListResponse(socket, port);
} else if (!strncmp(path, PATH_ACTIVATE, sizeof(PATH_ACTIVATE) - 1) && } else if (0 == path.compare(0, sizeof(PATH_ACTIVATE) - 1, PATH_ACTIVATE) &&
atoi(path + (sizeof(PATH_ACTIVATE) - 1)) == getpid()) { atoi(path.substr(sizeof(PATH_ACTIVATE) - 1).c_str()) == getpid()) {
const char TARGET_ACTIVATED[] = "Target activated"; const char TARGET_ACTIVATED[] = "Target activated";
SendHttpResponse(socket, TARGET_ACTIVATED, sizeof(TARGET_ACTIVATED) - 1); SendHttpResponse(socket, TARGET_ACTIVATED, sizeof(TARGET_ACTIVATED) - 1);
} else { } else {
@ -181,7 +179,7 @@ class AgentImpl {
static void OnSocketConnectionIO(uv_stream_t* server, int status); static void OnSocketConnectionIO(uv_stream_t* server, int status);
static bool OnInspectorHandshakeIO(inspector_socket_t* socket, static bool OnInspectorHandshakeIO(inspector_socket_t* socket,
enum inspector_handshake_event state, enum inspector_handshake_event state,
const char* path); const std::string& path);
static void WriteCbIO(uv_async_t* async); static void WriteCbIO(uv_async_t* async);
void WorkerRunIO(); void WorkerRunIO();
@ -388,7 +386,6 @@ void AgentImpl::ThreadCbIO(void* agent) {
void AgentImpl::OnSocketConnectionIO(uv_stream_t* server, int status) { void AgentImpl::OnSocketConnectionIO(uv_stream_t* server, int status) {
if (status == 0) { if (status == 0) {
inspector_socket_t* socket = new inspector_socket_t(); inspector_socket_t* socket = new inspector_socket_t();
memset(socket, 0, sizeof(*socket));
socket->data = server->data; socket->data = server->data;
if (inspector_accept(server, socket, if (inspector_accept(server, socket,
AgentImpl::OnInspectorHandshakeIO) != 0) { AgentImpl::OnInspectorHandshakeIO) != 0) {
@ -399,8 +396,8 @@ void AgentImpl::OnSocketConnectionIO(uv_stream_t* server, int status) {
// static // static
bool AgentImpl::OnInspectorHandshakeIO(inspector_socket_t* socket, bool AgentImpl::OnInspectorHandshakeIO(inspector_socket_t* socket,
enum inspector_handshake_event state, enum inspector_handshake_event state,
const char* path) { const std::string& path) {
AgentImpl* agent = static_cast<AgentImpl*>(socket->data); AgentImpl* agent = static_cast<AgentImpl*>(socket->data);
switch (state) { switch (state) {
case kInspectorHandshakeHttpGet: case kInspectorHandshakeHttpGet:
@ -443,7 +440,7 @@ void AgentImpl::OnRemoteDataIO(inspector_socket_t* socket,
DisconnectAndDisposeIO(socket); DisconnectAndDisposeIO(socket);
} }
if (buf) { if (buf) {
free(buf->base); delete[] buf->base;
} }
pause_cond_.Broadcast(scoped_lock); pause_cond_.Broadcast(scoped_lock);
} }

View File

@ -18,25 +18,6 @@
static const char CLOSE_FRAME[] = {'\x88', '\x00'}; static const char CLOSE_FRAME[] = {'\x88', '\x00'};
struct http_parsing_state_s {
http_parser parser;
http_parser_settings parser_settings;
handshake_cb callback;
bool done;
bool parsing_value;
char* ws_key;
char* path;
char* current_header;
};
struct ws_state_s {
uv_alloc_cb alloc_cb;
uv_read_cb read_cb;
inspector_cb close_cb;
bool close_sent;
bool received_close;
};
enum ws_decode_result { enum ws_decode_result {
FRAME_OK, FRAME_INCOMPLETE, FRAME_CLOSE, FRAME_ERROR FRAME_OK, FRAME_INCOMPLETE, FRAME_CLOSE, FRAME_ERROR
}; };
@ -72,11 +53,9 @@ static void dispose_inspector(uv_handle_t* handle) {
reinterpret_cast<inspector_socket_t*>(handle->data); reinterpret_cast<inspector_socket_t*>(handle->data);
inspector_cb close = inspector_cb close =
inspector->ws_mode ? inspector->ws_state->close_cb : nullptr; inspector->ws_mode ? inspector->ws_state->close_cb : nullptr;
free(inspector->buffer); inspector->buffer.clear();
free(inspector->ws_state); delete inspector->ws_state;
inspector->ws_state = nullptr; inspector->ws_state = nullptr;
inspector->buffer = nullptr;
inspector->buffer_size = 0;
inspector->data_len = 0; inspector->data_len = 0;
inspector->last_read_end = 0; inspector->last_read_end = 0;
if (close) { if (close) {
@ -92,11 +71,25 @@ static void close_connection(inspector_socket_t* inspector) {
} }
} }
struct WriteRequest {
WriteRequest(inspector_socket_t* inspector, const char* data, size_t size)
: inspector(inspector)
, storage(data, data + size)
, buf(uv_buf_init(&storage[0], storage.size())) {}
static WriteRequest* from_write_req(uv_write_t* req) {
return node::ContainerOf(&WriteRequest::req, req);
}
inspector_socket_t* const inspector;
std::vector<char> storage;
uv_write_t req;
uv_buf_t buf;
};
// Cleanup // Cleanup
static void write_request_cleanup(uv_write_t* req, int status) { static void write_request_cleanup(uv_write_t* req, int status) {
free((reinterpret_cast<uv_buf_t*>(req->data))->base); delete WriteRequest::from_write_req(req);
free(req->data);
free(req);
} }
static int write_to_client(inspector_socket_t* inspector, static int write_to_client(inspector_socket_t* inspector,
@ -109,21 +102,9 @@ static int write_to_client(inspector_socket_t* inspector,
#endif #endif
// Freed in write_request_cleanup // Freed in write_request_cleanup
uv_buf_t* buf = reinterpret_cast<uv_buf_t*>(malloc(sizeof(uv_buf_t))); WriteRequest* wr = new WriteRequest(inspector, msg, len);
uv_write_t* req = reinterpret_cast<uv_write_t*>(malloc(sizeof(uv_write_t)));
CHECK_NE(buf, nullptr);
CHECK_NE(req, nullptr);
memset(req, 0, sizeof(*req));
buf->base = reinterpret_cast<char*>(malloc(len));
CHECK_NE(buf->base, nullptr);
memcpy(buf->base, msg, len);
buf->len = len;
req->data = buf;
uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&inspector->client); uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&inspector->client);
return uv_write(req, stream, buf, 1, write_cb) < 0; return uv_write(&wr->req, stream, &wr->buf, 1, write_cb) < 0;
} }
// Constants for hybi-10 frame format. // Constants for hybi-10 frame format.
@ -278,10 +259,10 @@ static void shutdown_complete(inspector_socket_t* inspector) {
close_connection(inspector); close_connection(inspector);
} }
static void on_close_frame_written(uv_write_t* write, int status) { static void on_close_frame_written(uv_write_t* req, int status) {
inspector_socket_t* inspector = WriteRequest* wr = WriteRequest::from_write_req(req);
reinterpret_cast<inspector_socket_t*>(write->handle->data); inspector_socket_t* inspector = wr->inspector;
write_request_cleanup(write, status); delete wr;
inspector->ws_state->close_sent = true; inspector->ws_state->close_sent = true;
if (inspector->ws_state->received_close) { if (inspector->ws_state->received_close) {
shutdown_complete(inspector); shutdown_complete(inspector);
@ -304,7 +285,7 @@ static int parse_ws_frames(inspector_socket_t* inspector, size_t len) {
std::vector<char> output; std::vector<char> output;
bool compressed = false; bool compressed = false;
ws_decode_result r = decode_frame_hybi17(inspector->buffer, ws_decode_result r = decode_frame_hybi17(&inspector->buffer[0],
len, true /* client_frame */, len, true /* client_frame */,
&bytes_consumed, &output, &bytes_consumed, &output,
&compressed); &compressed);
@ -334,16 +315,13 @@ static void prepare_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) {
inspector_socket_t* inspector = inspector_socket_t* inspector =
reinterpret_cast<inspector_socket_t*>(stream->data); reinterpret_cast<inspector_socket_t*>(stream->data);
if (len > (inspector->buffer_size - inspector->data_len)) { if (len > (inspector->buffer.size() - inspector->data_len)) {
int new_size = (inspector->data_len + len + BUFFER_GROWTH_CHUNK_SIZE - 1) / int new_size = (inspector->data_len + len + BUFFER_GROWTH_CHUNK_SIZE - 1) /
BUFFER_GROWTH_CHUNK_SIZE * BUFFER_GROWTH_CHUNK_SIZE *
BUFFER_GROWTH_CHUNK_SIZE; BUFFER_GROWTH_CHUNK_SIZE;
inspector->buffer_size = new_size; inspector->buffer.resize(new_size);
inspector->buffer = reinterpret_cast<char*>(realloc(inspector->buffer,
inspector->buffer_size));
ASSERT_NE(inspector->buffer, nullptr);
} }
buf->base = inspector->buffer + inspector->data_len; buf->base = &inspector->buffer[inspector->data_len];
buf->len = len; buf->len = len;
inspector->data_len += len; inspector->data_len += len;
} }
@ -366,10 +344,10 @@ static void websockets_data_cb(uv_stream_t* stream, ssize_t nread,
#endif #endif
// 1. Move read bytes to continue the buffer // 1. Move read bytes to continue the buffer
// Should be same as this is supposedly last buffer // Should be same as this is supposedly last buffer
ASSERT_EQ(buf->base + buf->len, inspector->buffer + inspector->data_len); ASSERT_EQ(buf->base + buf->len, &inspector->buffer[inspector->data_len]);
// Should be noop... // Should be noop...
memmove(inspector->buffer + inspector->last_read_end, buf->base, nread); memmove(&inspector->buffer[inspector->last_read_end], buf->base, nread);
inspector->last_read_end += nread; inspector->last_read_end += nread;
// 2. Parse. // 2. Parse.
@ -378,8 +356,8 @@ static void websockets_data_cb(uv_stream_t* stream, ssize_t nread,
processed = parse_ws_frames(inspector, inspector->last_read_end); processed = parse_ws_frames(inspector, inspector->last_read_end);
// 3. Fix the buffer size & length // 3. Fix the buffer size & length
if (processed > 0) { if (processed > 0) {
memmove(inspector->buffer, inspector->buffer + processed, memmove(&inspector->buffer[0], &inspector->buffer[processed],
inspector->last_read_end - processed); inspector->last_read_end - processed);
inspector->last_read_end -= processed; inspector->last_read_end -= processed;
inspector->data_len = inspector->last_read_end; inspector->data_len = inspector->last_read_end;
} }
@ -410,73 +388,53 @@ void inspector_read_stop(inspector_socket_t* inspector) {
inspector->ws_state->read_cb = nullptr; inspector->ws_state->read_cb = nullptr;
} }
static void generate_accept_string(const char* client_key, char* buffer) { static void generate_accept_string(const std::string& client_key,
char (*buffer)[ACCEPT_KEY_LENGTH]) {
// Magic string from websockets spec. // Magic string from websockets spec.
const char ws_magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; static const char ws_magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
size_t key_len = strlen(client_key); std::string input(client_key + ws_magic);
size_t magic_len = sizeof(ws_magic) - 1; char hash[SHA_DIGEST_LENGTH];
SHA1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(),
char* buf = reinterpret_cast<char*>(malloc(key_len + magic_len)); reinterpret_cast<unsigned char*>(hash));
CHECK_NE(buf, nullptr); node::base64_encode(hash, sizeof(hash), *buffer, sizeof(*buffer));
memcpy(buf, client_key, key_len);
memcpy(buf + key_len, ws_magic, magic_len);
char hash[20];
SHA1((unsigned char*) buf, key_len + magic_len, (unsigned char*) hash);
free(buf);
node::base64_encode(hash, 20, buffer, ACCEPT_KEY_LENGTH);
buffer[ACCEPT_KEY_LENGTH] = '\0';
}
static void append(char** value, const char* string, size_t length) {
const size_t INCREMENT = 500; // There should never be more then 1 chunk...
int current_len = *value ? strlen(*value) : 0;
int new_len = current_len + length;
int adjusted = (new_len / INCREMENT + 1) * INCREMENT;
*value = reinterpret_cast<char*>(realloc(*value, adjusted));
memcpy(*value + current_len, string, length);
(*value)[new_len] = '\0';
} }
static int header_value_cb(http_parser* parser, const char* at, size_t length) { static int header_value_cb(http_parser* parser, const char* at, size_t length) {
char SEC_WEBSOCKET_KEY_HEADER[] = "Sec-WebSocket-Key"; static const char SEC_WEBSOCKET_KEY_HEADER[] = "Sec-WebSocket-Key";
struct http_parsing_state_s* state = (struct http_parsing_state_s*) auto inspector = static_cast<inspector_socket_t*>(parser->data);
(reinterpret_cast<inspector_socket_t*>(parser->data))->http_parsing_state; auto state = inspector->http_parsing_state;
state->parsing_value = true; state->parsing_value = true;
if (state->current_header && if (state->current_header.size() == sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1 &&
node::StringEqualNoCaseN(state->current_header, node::StringEqualNoCaseN(state->current_header.data(),
SEC_WEBSOCKET_KEY_HEADER, SEC_WEBSOCKET_KEY_HEADER,
sizeof(SEC_WEBSOCKET_KEY_HEADER))) { sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1)) {
append(&state->ws_key, at, length); state->ws_key.append(at, length);
} }
return 0; return 0;
} }
static int header_field_cb(http_parser* parser, const char* at, size_t length) { static int header_field_cb(http_parser* parser, const char* at, size_t length) {
struct http_parsing_state_s* state = (struct http_parsing_state_s*) auto inspector = static_cast<inspector_socket_t*>(parser->data);
(reinterpret_cast<inspector_socket_t*>(parser->data))->http_parsing_state; auto state = inspector->http_parsing_state;
if (state->parsing_value) { if (state->parsing_value) {
state->parsing_value = false; state->parsing_value = false;
if (state->current_header) state->current_header.clear();
state->current_header[0] = '\0';
} }
append(&state->current_header, at, length); state->current_header.append(at, length);
return 0; return 0;
} }
static int path_cb(http_parser* parser, const char* at, size_t length) { static int path_cb(http_parser* parser, const char* at, size_t length) {
struct http_parsing_state_s* state = (struct http_parsing_state_s*) auto inspector = static_cast<inspector_socket_t*>(parser->data);
(reinterpret_cast<inspector_socket_t*>(parser->data))->http_parsing_state; auto state = inspector->http_parsing_state;
append(&state->path, at, length); state->path.append(at, length);
return 0; return 0;
} }
static void handshake_complete(inspector_socket_t* inspector) { static void handshake_complete(inspector_socket_t* inspector) {
uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->client)); uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->client));
handshake_cb callback = inspector->http_parsing_state->callback; handshake_cb callback = inspector->http_parsing_state->callback;
inspector->ws_state = (struct ws_state_s*) malloc(sizeof(struct ws_state_s)); inspector->ws_state = new ws_state_s();
ASSERT_NE(nullptr, inspector->ws_state);
memset(inspector->ws_state, 0, sizeof(struct ws_state_s));
inspector->last_read_end = 0; inspector->last_read_end = 0;
inspector->ws_mode = true; inspector->ws_mode = true;
callback(inspector, kInspectorHandshakeUpgraded, callback(inspector, kInspectorHandshakeUpgraded,
@ -484,11 +442,7 @@ static void handshake_complete(inspector_socket_t* inspector) {
} }
static void cleanup_http_parsing_state(inspector_socket_t* inspector) { static void cleanup_http_parsing_state(inspector_socket_t* inspector) {
struct http_parsing_state_s* state = inspector->http_parsing_state; delete inspector->http_parsing_state;
free(state->current_header);
free(state->path);
free(state->ws_key);
free(state);
inspector->http_parsing_state = nullptr; inspector->http_parsing_state = nullptr;
} }
@ -498,7 +452,7 @@ static void report_handshake_failure_cb(uv_handle_t* handle) {
static_cast<inspector_socket_t*>(handle->data); static_cast<inspector_socket_t*>(handle->data);
handshake_cb cb = inspector->http_parsing_state->callback; handshake_cb cb = inspector->http_parsing_state->callback;
cleanup_http_parsing_state(inspector); cleanup_http_parsing_state(inspector);
cb(inspector, kInspectorHandshakeFailed, nullptr); cb(inspector, kInspectorHandshakeFailed, std::string());
} }
static void close_and_report_handshake_failure(inspector_socket_t* inspector) { static void close_and_report_handshake_failure(inspector_socket_t* inspector) {
@ -537,29 +491,21 @@ static int message_complete_cb(http_parser* parser) {
} else { } else {
handshake_failed(inspector); handshake_failed(inspector);
} }
} else if (!state->ws_key) { } else if (state->ws_key.empty()) {
handshake_failed(inspector); handshake_failed(inspector);
} else if (state->callback(inspector, kInspectorHandshakeUpgrading, } else if (state->callback(inspector, kInspectorHandshakeUpgrading,
state->path)) { state->path)) {
char accept_string[ACCEPT_KEY_LENGTH + 1]; char accept_string[ACCEPT_KEY_LENGTH];
generate_accept_string(state->ws_key, accept_string); generate_accept_string(state->ws_key, &accept_string);
const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n" const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n" "Upgrade: websocket\r\n"
"Connection: Upgrade\r\n" "Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: "; "Sec-WebSocket-Accept: ";
const char accept_ws_suffix[] = "\r\n\r\n"; const char accept_ws_suffix[] = "\r\n\r\n";
// Format has two chars (%s) that are replaced with actual key std::string reply(accept_ws_prefix, sizeof(accept_ws_prefix) - 1);
char accept_response[sizeof(accept_ws_prefix) - 1 + reply.append(accept_string, sizeof(accept_string));
sizeof(accept_ws_suffix) - 1 + reply.append(accept_ws_suffix, sizeof(accept_ws_suffix) - 1);
ACCEPT_KEY_LENGTH]; if (write_to_client(inspector, &reply[0], reply.size()) >= 0) {
memcpy(accept_response, accept_ws_prefix, sizeof(accept_ws_prefix) - 1);
memcpy(accept_response + sizeof(accept_ws_prefix) - 1,
accept_string, ACCEPT_KEY_LENGTH);
memcpy(accept_response + sizeof(accept_ws_prefix) - 1 + ACCEPT_KEY_LENGTH,
accept_ws_suffix, sizeof(accept_ws_suffix) - 1);
int len = sizeof(accept_response);
if (write_to_client(inspector, accept_response, len) >= 0) {
handshake_complete(inspector); handshake_complete(inspector);
inspector->http_parsing_state->done = true; inspector->http_parsing_state->done = true;
} else { } else {
@ -588,7 +534,7 @@ static void data_received_cb(uv_stream_s* client, ssize_t nread,
} else { } else {
http_parsing_state_s* state = inspector->http_parsing_state; http_parsing_state_s* state = inspector->http_parsing_state;
http_parser* parser = &state->parser; http_parser* parser = &state->parser;
http_parser_execute(parser, &state->parser_settings, inspector->buffer, http_parser_execute(parser, &state->parser_settings, &inspector->buffer[0],
nread); nread);
if (parser->http_errno != HPE_OK) { if (parser->http_errno != HPE_OK) {
handshake_failed(inspector); handshake_failed(inspector);
@ -603,15 +549,9 @@ static void data_received_cb(uv_stream_s* client, ssize_t nread,
static void init_handshake(inspector_socket_t* inspector) { static void init_handshake(inspector_socket_t* inspector) {
http_parsing_state_s* state = inspector->http_parsing_state; http_parsing_state_s* state = inspector->http_parsing_state;
CHECK_NE(state, nullptr); CHECK_NE(state, nullptr);
if (state->current_header) { state->current_header.clear();
state->current_header[0] = '\0'; state->ws_key.clear();
} state->path.clear();
if (state->ws_key) {
state->ws_key[0] = '\0';
}
if (state->path) {
state->path[0] = '\0';
}
state->done = false; state->done = false;
http_parser_init(&state->parser, HTTP_REQUEST); http_parser_init(&state->parser, HTTP_REQUEST);
state->parser.data = inspector; state->parser.data = inspector;
@ -626,15 +566,8 @@ static void init_handshake(inspector_socket_t* inspector) {
int inspector_accept(uv_stream_t* server, inspector_socket_t* inspector, int inspector_accept(uv_stream_t* server, inspector_socket_t* inspector,
handshake_cb callback) { handshake_cb callback) {
ASSERT_NE(callback, nullptr); ASSERT_NE(callback, nullptr);
// The only field that users should care about. CHECK_EQ(inspector->http_parsing_state, nullptr);
void* data = inspector->data; inspector->http_parsing_state = new http_parsing_state_s();
memset(inspector, 0, sizeof(*inspector));
inspector->data = data;
inspector->http_parsing_state = (struct http_parsing_state_s*)
malloc(sizeof(struct http_parsing_state_s));
ASSERT_NE(nullptr, inspector->http_parsing_state);
memset(inspector->http_parsing_state, 0, sizeof(struct http_parsing_state_s));
uv_stream_t* client = reinterpret_cast<uv_stream_t*>(&inspector->client); uv_stream_t* client = reinterpret_cast<uv_stream_t*>(&inspector->client);
CHECK_NE(client, nullptr); CHECK_NE(client, nullptr);
int err = uv_tcp_init(server->loop, &inspector->client); int err = uv_tcp_init(server->loop, &inspector->client);

View File

@ -4,6 +4,9 @@
#include "http_parser.h" #include "http_parser.h"
#include "uv.h" #include "uv.h"
#include <string>
#include <vector>
enum inspector_handshake_event { enum inspector_handshake_event {
kInspectorHandshakeUpgrading, kInspectorHandshakeUpgrading,
kInspectorHandshakeUpgraded, kInspectorHandshakeUpgraded,
@ -19,17 +22,32 @@ typedef void (*inspector_cb)(struct inspector_socket_s*, int);
// the connection. inspector_write can be used from the callback. // the connection. inspector_write can be used from the callback.
typedef bool (*handshake_cb)(struct inspector_socket_s*, typedef bool (*handshake_cb)(struct inspector_socket_s*,
enum inspector_handshake_event state, enum inspector_handshake_event state,
const char* path); const std::string& path);
struct http_parsing_state_s; struct http_parsing_state_s {
struct ws_state_s; http_parser parser;
http_parser_settings parser_settings;
handshake_cb callback;
bool done;
bool parsing_value;
std::string ws_key;
std::string path;
std::string current_header;
};
struct ws_state_s {
uv_alloc_cb alloc_cb;
uv_read_cb read_cb;
inspector_cb close_cb;
bool close_sent;
bool received_close;
};
struct inspector_socket_s { struct inspector_socket_s {
void* data; void* data;
struct http_parsing_state_s* http_parsing_state; struct http_parsing_state_s* http_parsing_state;
struct ws_state_s* ws_state; struct ws_state_s* ws_state;
char* buffer; std::vector<char> buffer;
size_t buffer_size;
size_t data_len; size_t data_len;
size_t last_read_end; size_t last_read_end;
uv_tcp_t client; uv_tcp_t client;

View File

@ -26,9 +26,10 @@ static enum inspector_handshake_event last_event = kInspectorHandshakeHttpGet;
static uv_loop_t loop; static uv_loop_t loop;
static uv_tcp_t server, client_socket; static uv_tcp_t server, client_socket;
static inspector_socket_t inspector; static inspector_socket_t inspector;
static char last_path[100]; static std::string last_path;
static void (*handshake_delegate)(enum inspector_handshake_event state, static void (*handshake_delegate)(enum inspector_handshake_event state,
const char* path, bool* should_continue); const std::string& path,
bool* should_continue);
struct read_expects { struct read_expects {
const char* expected; const char* expected;
@ -50,19 +51,19 @@ static void set_timeout_flag(uv_timer_t* timer) {
} }
static void stop_if_stop_path(enum inspector_handshake_event state, static void stop_if_stop_path(enum inspector_handshake_event state,
const char* path, bool* cont) { const std::string& path, bool* cont) {
*cont = path == nullptr || strcmp(path, "/close") != 0; *cont = path.empty() || path != "/close";
} }
static bool connected_cb(inspector_socket_t* socket, static bool connected_cb(inspector_socket_t* socket,
enum inspector_handshake_event state, enum inspector_handshake_event state,
const char* path) { const std::string& path) {
inspector_ready = state == kInspectorHandshakeUpgraded; inspector_ready = state == kInspectorHandshakeUpgraded;
last_event = state; last_event = state;
if (!path) { if (path.empty()) {
strcpy(last_path, "@@@ Nothing Recieved @@@"); last_path = "@@@ Nothing received @@@";
} else { } else {
strncpy(last_path, path, sizeof(last_path) - 1); last_path = path;
} }
handshake_events++; handshake_events++;
bool should_continue = true; bool should_continue = true;
@ -92,7 +93,7 @@ static void do_write(const char* data, int len) {
} }
static void buffer_alloc_cb(uv_handle_t* stream, size_t len, uv_buf_t* buf) { static void buffer_alloc_cb(uv_handle_t* stream, size_t len, uv_buf_t* buf) {
buf->base = static_cast<char*>(malloc(len)); buf->base = new char[len];
buf->len = len; buf->len = len;
} }
@ -113,7 +114,7 @@ static void check_data_cb(read_expects* expectation, ssize_t nread,
} }
} }
GTEST_ASSERT_EQ(i, nread); GTEST_ASSERT_EQ(i, nread);
free(buf->base); delete[] buf->base;
if (expectation->pos == expectation->expected_len) { if (expectation->pos == expectation->expected_len) {
expectation->read_expected = true; expectation->read_expected = true;
*retval = true; *retval = true;
@ -169,7 +170,7 @@ static void expect_on_client(const char* data, size_t len) {
} }
struct expectations { struct expectations {
char* actual_data; std::string actual_data;
size_t actual_offset; size_t actual_offset;
size_t actual_end; size_t actual_end;
int err_code; int err_code;
@ -181,9 +182,8 @@ static void grow_expects_buffer(uv_handle_t* stream, size_t size, uv_buf_t* b) {
size_t end = expects->actual_end; size_t end = expects->actual_end;
// Grow the buffer in chunks of 64k. // Grow the buffer in chunks of 64k.
size_t new_length = (end + size + 65535) & ~((size_t) 0xFFFF); size_t new_length = (end + size + 65535) & ~((size_t) 0xFFFF);
expects->actual_data = expects->actual_data.resize(new_length);
static_cast<char*>(realloc(expects->actual_data, new_length)); *b = uv_buf_init(&expects->actual_data[end], new_length - end);
*b = uv_buf_init(expects->actual_data + end, new_length - end);
} }
// static void dump_hex(const char* buf, size_t len) { // static void dump_hex(const char* buf, size_t len) {
@ -224,8 +224,7 @@ static void setup_inspector_expecting() {
if (inspector.data) { if (inspector.data) {
return; return;
} }
expectations* expects = static_cast<expectations*>(malloc(sizeof(*expects))); expectations* expects = new expectations();
memset(expects, 0, sizeof(*expects));
inspector.data = expects; inspector.data = expects;
inspector_read_start(&inspector, grow_expects_buffer, save_read_data); inspector_read_start(&inspector, grow_expects_buffer, save_read_data);
} }
@ -246,8 +245,8 @@ static void expect_on_server(const char* data, size_t len) {
} }
expects->actual_end -= expects->actual_offset; expects->actual_end -= expects->actual_offset;
if (!expects->actual_end) { if (!expects->actual_end) {
memmove(expects->actual_data, memmove(&expects->actual_data[0],
expects->actual_data + expects->actual_offset, &expects->actual_data[expects->actual_offset],
expects->actual_end); expects->actual_end);
} }
expects->actual_offset = 0; expects->actual_offset = 0;
@ -301,10 +300,11 @@ static void manual_inspector_socket_cleanup() {
EXPECT_EQ(0, uv_is_active( EXPECT_EQ(0, uv_is_active(
reinterpret_cast<uv_handle_t*>(&inspector.client))); reinterpret_cast<uv_handle_t*>(&inspector.client)));
really_close(reinterpret_cast<uv_handle_t*>(&inspector.client)); really_close(reinterpret_cast<uv_handle_t*>(&inspector.client));
free(inspector.ws_state); delete inspector.ws_state;
free(inspector.http_parsing_state); inspector.ws_state = nullptr;
free(inspector.buffer); delete inspector.http_parsing_state;
inspector.buffer = nullptr; inspector.http_parsing_state = nullptr;
inspector.buffer.clear();
} }
static void on_connection(uv_connect_t* connect, int status) { static void on_connection(uv_connect_t* connect, int status) {
@ -321,9 +321,9 @@ protected:
inspector_ready = false; inspector_ready = false;
last_event = kInspectorHandshakeHttpGet; last_event = kInspectorHandshakeHttpGet;
uv_loop_init(&loop); uv_loop_init(&loop);
memset(&inspector, 0, sizeof(inspector)); inspector = inspector_socket_t();
memset(&server, 0, sizeof(server)); server = uv_tcp_t();
memset(&client_socket, 0, sizeof(client_socket)); client_socket = uv_tcp_t();
server.data = &inspector; server.data = &inspector;
sockaddr_in addr; sockaddr_in addr;
uv_timer_init(&loop, &timeout_timer); uv_timer_init(&loop, &timeout_timer);
@ -346,13 +346,11 @@ protected:
virtual void TearDown() { virtual void TearDown() {
really_close(reinterpret_cast<uv_handle_t*>(&client_socket)); really_close(reinterpret_cast<uv_handle_t*>(&client_socket));
really_close(reinterpret_cast<uv_handle_t*>(&timeout_timer)); really_close(reinterpret_cast<uv_handle_t*>(&timeout_timer));
EXPECT_EQ(nullptr, inspector.buffer); EXPECT_TRUE(inspector.buffer.empty());
expectations* expects = static_cast<expectations*>(inspector.data); expectations* expects = static_cast<expectations*>(inspector.data);
if (expects != nullptr) { if (expects != nullptr) {
GTEST_ASSERT_EQ(expects->actual_end, expects->actual_offset); GTEST_ASSERT_EQ(expects->actual_end, expects->actual_offset);
free(expects->actual_data); delete expects;
expects->actual_data = nullptr;
free(expects);
inspector.data = nullptr; inspector.data = nullptr;
} }
const int err = uv_loop_close(&loop); const int err = uv_loop_close(&loop);
@ -608,10 +606,10 @@ static void send_in_chunks(const char* data, size_t len) {
static const char TEST_SUCCESS[] = "Test Success\n\n"; static const char TEST_SUCCESS[] = "Test Success\n\n";
static void ReportsHttpGet_handshake(enum inspector_handshake_event state, static void ReportsHttpGet_handshake(enum inspector_handshake_event state,
const char* path, bool* cont) { const std::string& path, bool* cont) {
*cont = true; *cont = true;
enum inspector_handshake_event expected_state = kInspectorHandshakeHttpGet; enum inspector_handshake_event expected_state = kInspectorHandshakeHttpGet;
const char* expected_path; std::string expected_path;
switch (handshake_events) { switch (handshake_events) {
case 1: case 1:
expected_path = "/some/path"; expected_path = "/some/path";
@ -625,18 +623,16 @@ static void ReportsHttpGet_handshake(enum inspector_handshake_event state,
break; break;
case 5: case 5:
expected_state = kInspectorHandshakeFailed; expected_state = kInspectorHandshakeFailed;
expected_path = nullptr;
break; break;
case 4: case 4:
expected_path = "/close"; expected_path = "/close";
*cont = false; *cont = false;
break; break;
default: default:
expected_path = nullptr;
ASSERT_TRUE(false); ASSERT_TRUE(false);
} }
EXPECT_EQ(expected_state, state); EXPECT_EQ(expected_state, state);
EXPECT_STREQ(expected_path, path); EXPECT_EQ(expected_path, path);
} }
TEST_F(InspectorSocketTest, ReportsHttpGet) { TEST_F(InspectorSocketTest, ReportsHttpGet) {
@ -672,15 +668,15 @@ TEST_F(InspectorSocketTest, ReportsHttpGet) {
static void static void
HandshakeCanBeCanceled_handshake(enum inspector_handshake_event state, HandshakeCanBeCanceled_handshake(enum inspector_handshake_event state,
const char* path, bool* cont) { const std::string& path, bool* cont) {
switch (handshake_events - 1) { switch (handshake_events - 1) {
case 0: case 0:
EXPECT_EQ(kInspectorHandshakeUpgrading, state); EXPECT_EQ(kInspectorHandshakeUpgrading, state);
EXPECT_STREQ("/ws/path", path); EXPECT_EQ("/ws/path", path);
break; break;
case 1: case 1:
EXPECT_EQ(kInspectorHandshakeFailed, state); EXPECT_EQ(kInspectorHandshakeFailed, state);
EXPECT_STREQ(nullptr, path); EXPECT_TRUE(path.empty());
break; break;
default: default:
EXPECT_TRUE(false); EXPECT_TRUE(false);
@ -699,9 +695,9 @@ TEST_F(InspectorSocketTest, HandshakeCanBeCanceled) {
} }
static void GetThenHandshake_handshake(enum inspector_handshake_event state, static void GetThenHandshake_handshake(enum inspector_handshake_event state,
const char* path, bool* cont) { const std::string& path, bool* cont) {
*cont = true; *cont = true;
const char* expected_path = "/ws/path"; std::string expected_path = "/ws/path";
switch (handshake_events - 1) { switch (handshake_events - 1) {
case 0: case 0:
EXPECT_EQ(kInspectorHandshakeHttpGet, state); EXPECT_EQ(kInspectorHandshakeHttpGet, state);
@ -718,7 +714,7 @@ static void GetThenHandshake_handshake(enum inspector_handshake_event state,
EXPECT_TRUE(false); EXPECT_TRUE(false);
break; break;
} }
EXPECT_STREQ(expected_path, path); EXPECT_EQ(expected_path, path);
} }
TEST_F(InspectorSocketTest, GetThenHandshake) { TEST_F(InspectorSocketTest, GetThenHandshake) {
@ -793,19 +789,17 @@ TEST_F(InspectorSocketTest, EOFBeforeHandshake) {
SPIN_WHILE(last_event != kInspectorHandshakeFailed); SPIN_WHILE(last_event != kInspectorHandshakeFailed);
} }
static void fill_message(char* buffer, size_t len) { static void fill_message(std::string* buffer) {
buffer[len - 1] = '\0'; for (size_t i = 0; i < buffer->size(); i += 1) {
for (size_t i = 0; i < len - 1; i++) { (*buffer)[i] = 'a' + (i % ('z' - 'a'));
buffer[i] = 'a' + (i % ('z' - 'a'));
} }
} }
static void mask_message(const char* message, static void mask_message(const std::string& message,
char* buffer, const char mask[]) { char* buffer, const char mask[]) {
const size_t mask_len = 4; const size_t mask_len = 4;
int i = 0; for (size_t i = 0; i < message.size(); i += 1) {
while (*message != '\0') { buffer[i] = message[i] ^ mask[i % mask_len];
*buffer++ = *message++ ^ mask[i++ % mask_len];
} }
} }
@ -816,25 +810,20 @@ TEST_F(InspectorSocketTest, Send1Mb) {
SPIN_WHILE(!inspector_ready); SPIN_WHILE(!inspector_ready);
expect_handshake(); expect_handshake();
const size_t message_len = 1000000;
// 2. Brief exchange // 2. Brief exchange
char* message = static_cast<char*>(malloc(message_len + 1)); std::string message(1000000, '\0');
fill_message(message, message_len + 1); fill_message(&message);
// 1000000 is 0xF4240 hex // 1000000 is 0xF4240 hex
const char EXPECTED_FRAME_HEADER[] = { const char EXPECTED_FRAME_HEADER[] = {
'\x81', '\x7f', '\x00', '\x00', '\x00', '\x00', '\x00', '\x0F', '\x81', '\x7f', '\x00', '\x00', '\x00', '\x00', '\x00', '\x0F',
'\x42', '\x40' '\x42', '\x40'
}; };
char* expected = std::string expected(EXPECTED_FRAME_HEADER, sizeof(EXPECTED_FRAME_HEADER));
static_cast<char*>(malloc(sizeof(EXPECTED_FRAME_HEADER) + message_len)); expected.append(message);
memcpy(expected, EXPECTED_FRAME_HEADER, sizeof(EXPECTED_FRAME_HEADER)); inspector_write(&inspector, &message[0], message.size());
memcpy(expected + sizeof(EXPECTED_FRAME_HEADER), message, message_len); expect_on_client(&expected[0], expected.size());
inspector_write(&inspector, message, message_len);
expect_on_client(expected, sizeof(EXPECTED_FRAME_HEADER) + message_len);
char MASK[4] = {'W', 'h', 'O', 'a'}; char MASK[4] = {'W', 'h', 'O', 'a'};
@ -843,14 +832,13 @@ TEST_F(InspectorSocketTest, Send1Mb) {
'\x42', '\x40', MASK[0], MASK[1], MASK[2], MASK[3] '\x42', '\x40', MASK[0], MASK[1], MASK[2], MASK[3]
}; };
const size_t outgoing_len = sizeof(FRAME_TO_SERVER_HEADER) + message_len; std::string outgoing(FRAME_TO_SERVER_HEADER, sizeof(FRAME_TO_SERVER_HEADER));
char* outgoing = static_cast<char*>(malloc(outgoing_len)); outgoing.resize(outgoing.size() + message.size());
memcpy(outgoing, FRAME_TO_SERVER_HEADER, sizeof(FRAME_TO_SERVER_HEADER)); mask_message(message, &outgoing[sizeof(FRAME_TO_SERVER_HEADER)], MASK);
mask_message(message, outgoing + sizeof(FRAME_TO_SERVER_HEADER), MASK);
setup_inspector_expecting(); // Buffer on the client side. setup_inspector_expecting(); // Buffer on the client side.
do_write(outgoing, outgoing_len); do_write(&outgoing[0], outgoing.size());
expect_on_server(message, message_len); expect_on_server(&message[0], message.size());
// 3. Close // 3. Close
const char CLIENT_CLOSE_FRAME[] = {'\x88', '\x80', '\x2D', const char CLIENT_CLOSE_FRAME[] = {'\x88', '\x80', '\x2D',
@ -860,9 +848,6 @@ TEST_F(InspectorSocketTest, Send1Mb) {
expect_on_client(SERVER_CLOSE_FRAME, sizeof(SERVER_CLOSE_FRAME)); expect_on_client(SERVER_CLOSE_FRAME, sizeof(SERVER_CLOSE_FRAME));
GTEST_ASSERT_EQ(0, uv_is_active( GTEST_ASSERT_EQ(0, uv_is_active(
reinterpret_cast<uv_handle_t*>(&client_socket))); reinterpret_cast<uv_handle_t*>(&client_socket)));
free(outgoing);
free(expected);
free(message);
} }
static ssize_t err; static ssize_t err;