inspector: Unify event queues
Current implementation tracks connected/disconnected status separately which potentially introduces race condition. This change introduces notion of session IDs and also posts connect/disconnect events into the same queue as the messages. This way Node knows what session given response belongs to and can discard messages if the frontend for that session had disconnected. This also fixes an issue when frontend was unable to attach to V8 instance that was running infinite loop. PR-URL: https://github.com/nodejs/node/pull/7271 Reviewed-By: bnoordhuis - Ben Noordhuis <info@bnoordhuis.nl>
This commit is contained in:
parent
6e15ae98fe
commit
4220c24b17
@ -17,6 +17,7 @@
|
|||||||
#include "libplatform/libplatform.h"
|
#include "libplatform/libplatform.h"
|
||||||
|
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
// We need pid to use as ID with Chrome
|
// We need pid to use as ID with Chrome
|
||||||
@ -31,6 +32,9 @@
|
|||||||
namespace node {
|
namespace node {
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
const char TAG_CONNECT[] = "#connect";
|
||||||
|
const char TAG_DISCONNECT[] = "#disconnect";
|
||||||
|
|
||||||
const char DEVTOOLS_PATH[] = "/node";
|
const char DEVTOOLS_PATH[] = "/node";
|
||||||
const char DEVTOOLS_HASH[] = "521e5b7e2b7cc66b4006a8a54cb9c4e57494a5ef";
|
const char DEVTOOLS_HASH[] = "521e5b7e2b7cc66b4006a8a54cb9c4e57494a5ef";
|
||||||
|
|
||||||
@ -154,7 +158,6 @@ bool RespondToGet(inspector_socket_t* socket, const char* path, int port) {
|
|||||||
namespace inspector {
|
namespace inspector {
|
||||||
|
|
||||||
using blink::protocol::DictionaryValue;
|
using blink::protocol::DictionaryValue;
|
||||||
using blink::protocol::String16;
|
|
||||||
|
|
||||||
class AgentImpl {
|
class AgentImpl {
|
||||||
public:
|
public:
|
||||||
@ -171,24 +174,27 @@ class AgentImpl {
|
|||||||
void WaitForDisconnect();
|
void WaitForDisconnect();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
using MessageQueue = std::vector<std::pair<int, String16>>;
|
||||||
|
|
||||||
static void ThreadCbIO(void* agent);
|
static void ThreadCbIO(void* agent);
|
||||||
static void OnSocketConnectionIO(uv_stream_t* server, int status);
|
static void OnSocketConnectionIO(uv_stream_t* server, int status);
|
||||||
static bool OnInspectorHandshakeIO(inspector_socket_t* socket,
|
static bool OnInspectorHandshakeIO(inspector_socket_t* socket,
|
||||||
enum inspector_handshake_event state,
|
enum inspector_handshake_event state,
|
||||||
const char* path);
|
const char* path);
|
||||||
static void OnRemoteDataIO(uv_stream_t* stream, ssize_t read,
|
|
||||||
const uv_buf_t* b);
|
|
||||||
static void WriteCbIO(uv_async_t* async);
|
static void WriteCbIO(uv_async_t* async);
|
||||||
|
|
||||||
void WorkerRunIO();
|
void WorkerRunIO();
|
||||||
void OnInspectorConnectionIO(inspector_socket_t* socket);
|
void OnInspectorConnectionIO(inspector_socket_t* socket);
|
||||||
void PushPendingMessage(std::vector<std::string>* queue,
|
void OnRemoteDataIO(inspector_socket_t* stream, ssize_t read,
|
||||||
const std::string& message);
|
const uv_buf_t* b);
|
||||||
void SwapBehindLock(std::vector<std::string> AgentImpl::*queue,
|
|
||||||
std::vector<std::string>* output);
|
|
||||||
void PostMessages();
|
void PostMessages();
|
||||||
void SetConnected(bool connected);
|
void SetConnected(bool connected);
|
||||||
void Write(const std::string& message);
|
void DispatchMessages();
|
||||||
|
void Write(int session_id, const String16& message);
|
||||||
|
void AppendMessage(MessageQueue* vector, int session_id,
|
||||||
|
const String16& message);
|
||||||
|
void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2);
|
||||||
|
void PostIncomingMessage(const String16& message);
|
||||||
|
|
||||||
uv_sem_t start_sem_;
|
uv_sem_t start_sem_;
|
||||||
ConditionVariable pause_cond_;
|
ConditionVariable pause_cond_;
|
||||||
@ -208,19 +214,28 @@ class AgentImpl {
|
|||||||
inspector_socket_t* client_socket_;
|
inspector_socket_t* client_socket_;
|
||||||
blink::V8Inspector* inspector_;
|
blink::V8Inspector* inspector_;
|
||||||
v8::Platform* platform_;
|
v8::Platform* platform_;
|
||||||
std::vector<std::string> message_queue_;
|
MessageQueue incoming_message_queue_;
|
||||||
std::vector<std::string> outgoing_message_queue_;
|
MessageQueue outgoing_message_queue_;
|
||||||
bool dispatching_messages_;
|
bool dispatching_messages_;
|
||||||
|
int frontend_session_id_;
|
||||||
|
int backend_session_id_;
|
||||||
|
|
||||||
friend class ChannelImpl;
|
friend class ChannelImpl;
|
||||||
friend class DispatchOnInspectorBackendTask;
|
friend class DispatchOnInspectorBackendTask;
|
||||||
friend class SetConnectedTask;
|
friend class SetConnectedTask;
|
||||||
friend class V8NodeInspector;
|
friend class V8NodeInspector;
|
||||||
friend void InterruptCallback(v8::Isolate*, void* agent);
|
friend void InterruptCallback(v8::Isolate*, void* agent);
|
||||||
|
friend void DataCallback(uv_stream_t* stream, ssize_t read,
|
||||||
|
const uv_buf_t* buf);
|
||||||
};
|
};
|
||||||
|
|
||||||
void InterruptCallback(v8::Isolate*, void* agent) {
|
void InterruptCallback(v8::Isolate*, void* agent) {
|
||||||
static_cast<AgentImpl*>(agent)->PostMessages();
|
static_cast<AgentImpl*>(agent)->DispatchMessages();
|
||||||
|
}
|
||||||
|
|
||||||
|
void DataCallback(uv_stream_t* stream, ssize_t read, const uv_buf_t* buf) {
|
||||||
|
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data);
|
||||||
|
static_cast<AgentImpl*>(socket->data)->OnRemoteDataIO(socket, read, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
class DispatchOnInspectorBackendTask : public v8::Task {
|
class DispatchOnInspectorBackendTask : public v8::Task {
|
||||||
@ -228,7 +243,7 @@ class DispatchOnInspectorBackendTask : public v8::Task {
|
|||||||
explicit DispatchOnInspectorBackendTask(AgentImpl* agent) : agent_(agent) {}
|
explicit DispatchOnInspectorBackendTask(AgentImpl* agent) : agent_(agent) {}
|
||||||
|
|
||||||
void Run() override {
|
void Run() override {
|
||||||
agent_->PostMessages();
|
agent_->DispatchMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -251,27 +266,12 @@ class ChannelImpl final : public blink::protocol::FrontendChannel {
|
|||||||
void flushProtocolNotifications() override { }
|
void flushProtocolNotifications() override { }
|
||||||
|
|
||||||
void sendMessageToFrontend(const String16& message) {
|
void sendMessageToFrontend(const String16& message) {
|
||||||
agent_->Write(message.utf8());
|
agent_->Write(agent_->frontend_session_id_, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
AgentImpl* const agent_;
|
AgentImpl* const agent_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class SetConnectedTask : public v8::Task {
|
|
||||||
public:
|
|
||||||
SetConnectedTask(AgentImpl* agent, bool connected)
|
|
||||||
: agent_(agent),
|
|
||||||
connected_(connected) {}
|
|
||||||
|
|
||||||
void Run() override {
|
|
||||||
agent_->SetConnected(connected_);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
AgentImpl* agent_;
|
|
||||||
bool connected_;
|
|
||||||
};
|
|
||||||
|
|
||||||
class V8NodeInspector : public blink::V8Inspector {
|
class V8NodeInspector : public blink::V8Inspector {
|
||||||
public:
|
public:
|
||||||
V8NodeInspector(AgentImpl* agent, node::Environment* env,
|
V8NodeInspector(AgentImpl* agent, node::Environment* env,
|
||||||
@ -320,7 +320,9 @@ AgentImpl::AgentImpl(Environment* env) : port_(0),
|
|||||||
client_socket_(nullptr),
|
client_socket_(nullptr),
|
||||||
inspector_(nullptr),
|
inspector_(nullptr),
|
||||||
platform_(nullptr),
|
platform_(nullptr),
|
||||||
dispatching_messages_(false) {
|
dispatching_messages_(false),
|
||||||
|
frontend_session_id_(0),
|
||||||
|
backend_session_id_(0) {
|
||||||
CHECK_EQ(0, uv_sem_init(&start_sem_, 0));
|
CHECK_EQ(0, uv_sem_init(&start_sem_, 0));
|
||||||
memset(&data_written_, 0, sizeof(data_written_));
|
memset(&data_written_, 0, sizeof(data_written_));
|
||||||
memset(&io_thread_req_, 0, sizeof(io_thread_req_));
|
memset(&io_thread_req_, 0, sizeof(io_thread_req_));
|
||||||
@ -355,10 +357,7 @@ void AgentImpl::Start(v8::Platform* platform, int port, bool wait) {
|
|||||||
uv_sem_wait(&start_sem_);
|
uv_sem_wait(&start_sem_);
|
||||||
|
|
||||||
if (wait) {
|
if (wait) {
|
||||||
// Flush messages in case of wait to connect, see OnRemoteDataIO on how it
|
DispatchMessages();
|
||||||
// should be fixed.
|
|
||||||
SetConnected(true);
|
|
||||||
PostMessages();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -424,55 +423,39 @@ bool AgentImpl::OnInspectorHandshakeIO(inspector_socket_t* socket,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// static
|
void AgentImpl::OnRemoteDataIO(inspector_socket_t* socket,
|
||||||
void AgentImpl::OnRemoteDataIO(uv_stream_t* stream,
|
ssize_t read,
|
||||||
ssize_t read,
|
const uv_buf_t* buf) {
|
||||||
const uv_buf_t* b) {
|
Mutex::ScopedLock scoped_lock(pause_lock_);
|
||||||
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data);
|
|
||||||
AgentImpl* agent = static_cast<AgentImpl*>(socket->data);
|
|
||||||
Mutex::ScopedLock scoped_lock(agent->pause_lock_);
|
|
||||||
if (read > 0) {
|
if (read > 0) {
|
||||||
std::string str(b->base, read);
|
String16 str = String16::fromUTF8(buf->base, read);
|
||||||
agent->PushPendingMessage(&agent->message_queue_, str);
|
PostIncomingMessage(str);
|
||||||
free(b->base);
|
|
||||||
|
|
||||||
// TODO(pfeldman): Instead of blocking execution while debugger
|
// TODO(pfeldman): Instead of blocking execution while debugger
|
||||||
// engages, node should wait for the run callback from the remote client
|
// engages, node should wait for the run callback from the remote client
|
||||||
// and initiate its startup. This is a change to node.cc that should be
|
// and initiate its startup. This is a change to node.cc that should be
|
||||||
// upstreamed separately.
|
// upstreamed separately.
|
||||||
if (agent->wait_ && str.find("\"Runtime.run\"") != std::string::npos) {
|
if (wait_ && str.find("\"Runtime.run\"") != std::string::npos) {
|
||||||
agent->wait_ = false;
|
wait_ = false;
|
||||||
uv_sem_post(&agent->start_sem_);
|
uv_sem_post(&start_sem_);
|
||||||
}
|
}
|
||||||
|
|
||||||
agent->platform_->CallOnForegroundThread(agent->parent_env_->isolate(),
|
platform_->CallOnForegroundThread(parent_env_->isolate(),
|
||||||
new DispatchOnInspectorBackendTask(agent));
|
new DispatchOnInspectorBackendTask(this));
|
||||||
agent->parent_env_->isolate()
|
parent_env_->isolate()->RequestInterrupt(InterruptCallback, this);
|
||||||
->RequestInterrupt(InterruptCallback, agent);
|
uv_async_send(&data_written_);
|
||||||
uv_async_send(&agent->data_written_);
|
|
||||||
} else if (read <= 0) {
|
} else if (read <= 0) {
|
||||||
// EOF
|
// EOF
|
||||||
if (agent->client_socket_ == socket) {
|
if (client_socket_ == socket) {
|
||||||
agent->client_socket_ = nullptr;
|
String16 message(TAG_DISCONNECT, sizeof(TAG_DISCONNECT) - 1);
|
||||||
agent->platform_->CallOnForegroundThread(agent->parent_env_->isolate(),
|
client_socket_ = nullptr;
|
||||||
new SetConnectedTask(agent, false));
|
PostIncomingMessage(message);
|
||||||
uv_async_send(&agent->data_written_);
|
|
||||||
}
|
}
|
||||||
DisconnectAndDisposeIO(socket);
|
DisconnectAndDisposeIO(socket);
|
||||||
}
|
}
|
||||||
agent->pause_cond_.Broadcast(scoped_lock);
|
if (buf) {
|
||||||
}
|
free(buf->base);
|
||||||
|
}
|
||||||
void AgentImpl::PushPendingMessage(std::vector<std::string>* queue,
|
pause_cond_.Broadcast(scoped_lock);
|
||||||
const std::string& message) {
|
|
||||||
Mutex::ScopedLock scoped_lock(queue_lock_);
|
|
||||||
queue->push_back(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AgentImpl::SwapBehindLock(std::vector<std::string> AgentImpl::*queue,
|
|
||||||
std::vector<std::string>* output) {
|
|
||||||
Mutex::ScopedLock scoped_lock(queue_lock_);
|
|
||||||
(this->*queue).swap(*output);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// static
|
// static
|
||||||
@ -480,11 +463,14 @@ void AgentImpl::WriteCbIO(uv_async_t* async) {
|
|||||||
AgentImpl* agent = static_cast<AgentImpl*>(async->data);
|
AgentImpl* agent = static_cast<AgentImpl*>(async->data);
|
||||||
inspector_socket_t* socket = agent->client_socket_;
|
inspector_socket_t* socket = agent->client_socket_;
|
||||||
if (socket) {
|
if (socket) {
|
||||||
std::vector<std::string> outgoing_messages;
|
MessageQueue outgoing_messages;
|
||||||
agent->SwapBehindLock(&AgentImpl::outgoing_message_queue_,
|
agent->SwapBehindLock(&agent->outgoing_message_queue_, &outgoing_messages);
|
||||||
&outgoing_messages);
|
for (const MessageQueue::value_type& outgoing : outgoing_messages) {
|
||||||
for (auto const& message : outgoing_messages)
|
if (outgoing.first == agent->frontend_session_id_) {
|
||||||
inspector_write(socket, message.c_str(), message.length());
|
std::string message = outgoing.second.utf8();
|
||||||
|
inspector_write(socket, message.c_str(), message.length());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -518,49 +504,70 @@ void AgentImpl::WorkerRunIO() {
|
|||||||
uv_run(&child_loop_, UV_RUN_DEFAULT);
|
uv_run(&child_loop_, UV_RUN_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void AgentImpl::AppendMessage(MessageQueue* queue, int session_id,
|
||||||
|
const String16& message) {
|
||||||
|
Mutex::ScopedLock scoped_lock(queue_lock_);
|
||||||
|
queue->push_back(std::make_pair(session_id, message));
|
||||||
|
}
|
||||||
|
|
||||||
|
void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) {
|
||||||
|
Mutex::ScopedLock scoped_lock(queue_lock_);
|
||||||
|
vector1->swap(*vector2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AgentImpl::PostIncomingMessage(const String16& message) {
|
||||||
|
AppendMessage(&incoming_message_queue_, frontend_session_id_, message);
|
||||||
|
v8::Isolate* isolate = parent_env_->isolate();
|
||||||
|
platform_->CallOnForegroundThread(isolate,
|
||||||
|
new DispatchOnInspectorBackendTask(this));
|
||||||
|
isolate->RequestInterrupt(InterruptCallback, this);
|
||||||
|
uv_async_send(&data_written_);
|
||||||
|
}
|
||||||
|
|
||||||
void AgentImpl::OnInspectorConnectionIO(inspector_socket_t* socket) {
|
void AgentImpl::OnInspectorConnectionIO(inspector_socket_t* socket) {
|
||||||
if (client_socket_) {
|
if (client_socket_) {
|
||||||
DisconnectAndDisposeIO(socket);
|
DisconnectAndDisposeIO(socket);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
client_socket_ = socket;
|
client_socket_ = socket;
|
||||||
inspector_read_start(socket, OnBufferAlloc, AgentImpl::OnRemoteDataIO);
|
inspector_read_start(socket, OnBufferAlloc, DataCallback);
|
||||||
platform_->CallOnForegroundThread(parent_env_->isolate(),
|
frontend_session_id_++;
|
||||||
new SetConnectedTask(this, true));
|
PostIncomingMessage(String16(TAG_CONNECT, sizeof(TAG_CONNECT) - 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
void AgentImpl::PostMessages() {
|
void AgentImpl::DispatchMessages() {
|
||||||
if (dispatching_messages_)
|
if (dispatching_messages_)
|
||||||
return;
|
return;
|
||||||
dispatching_messages_ = true;
|
dispatching_messages_ = true;
|
||||||
std::vector<std::string> messages;
|
MessageQueue tasks;
|
||||||
SwapBehindLock(&AgentImpl::message_queue_, &messages);
|
SwapBehindLock(&incoming_message_queue_, &tasks);
|
||||||
for (auto const& message : messages)
|
for (const MessageQueue::value_type& pair : tasks) {
|
||||||
inspector_->dispatchMessageFromFrontend(
|
const String16& message = pair.second;
|
||||||
String16::fromUTF8(message.c_str(), message.length()));
|
if (message == TAG_CONNECT) {
|
||||||
|
CHECK_EQ(false, connected_);
|
||||||
|
backend_session_id_++;
|
||||||
|
connected_ = true;
|
||||||
|
fprintf(stderr, "Debugger attached.\n");
|
||||||
|
inspector_->connectFrontend(new ChannelImpl(this));
|
||||||
|
} else if (message == TAG_DISCONNECT) {
|
||||||
|
CHECK(connected_);
|
||||||
|
connected_ = false;
|
||||||
|
if (!shutting_down_)
|
||||||
|
PrintDebuggerReadyMessage(port_);
|
||||||
|
inspector_->quitMessageLoopOnPause();
|
||||||
|
inspector_->disconnectFrontend();
|
||||||
|
} else {
|
||||||
|
inspector_->dispatchMessageFromFrontend(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
uv_async_send(&data_written_);
|
uv_async_send(&data_written_);
|
||||||
dispatching_messages_ = false;
|
dispatching_messages_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AgentImpl::SetConnected(bool connected) {
|
void AgentImpl::Write(int session_id, const String16& message) {
|
||||||
if (connected_ == connected)
|
AppendMessage(&outgoing_message_queue_, session_id, message);
|
||||||
return;
|
int err = uv_async_send(&io_thread_req_);
|
||||||
|
CHECK_EQ(0, err);
|
||||||
connected_ = connected;
|
|
||||||
if (connected) {
|
|
||||||
fprintf(stderr, "Debugger attached.\n");
|
|
||||||
inspector_->connectFrontend(new ChannelImpl(this));
|
|
||||||
} else {
|
|
||||||
if (!shutting_down_)
|
|
||||||
PrintDebuggerReadyMessage(port_);
|
|
||||||
inspector_->quitMessageLoopOnPause();
|
|
||||||
inspector_->disconnectFrontend();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void AgentImpl::Write(const std::string& message) {
|
|
||||||
PushPendingMessage(&outgoing_message_queue_, message);
|
|
||||||
ASSERT_EQ(0, uv_async_send(&io_thread_req_));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exported class Agent
|
// Exported class Agent
|
||||||
|
Loading…
x
Reference in New Issue
Block a user