inspector: address race conditions
Stress tests uncovered 2 race conditions, when IO events happened during V8 entering event loop on pause or during Node.js shutdown. Fixes: nodejs/node#8669 PR-URL: https://github.com/nodejs/node/pull/8672 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: Aleksey Kozyatinskiy <kozyatinskiy@chromium.org>
This commit is contained in:
parent
782620f03f
commit
5acbeb0272
@ -265,12 +265,13 @@ class AgentImpl {
|
|||||||
const String16& message);
|
const String16& message);
|
||||||
void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2);
|
void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2);
|
||||||
void PostIncomingMessage(const String16& message);
|
void PostIncomingMessage(const String16& message);
|
||||||
|
void WaitForFrontendMessage();
|
||||||
|
void NotifyMessageReceived();
|
||||||
State ToState(State state);
|
State ToState(State state);
|
||||||
|
|
||||||
uv_sem_t start_sem_;
|
uv_sem_t start_sem_;
|
||||||
ConditionVariable pause_cond_;
|
ConditionVariable incoming_message_cond_;
|
||||||
Mutex pause_lock_;
|
Mutex state_lock_;
|
||||||
Mutex queue_lock_;
|
|
||||||
uv_thread_t thread_;
|
uv_thread_t thread_;
|
||||||
uv_loop_t child_loop_;
|
uv_loop_t child_loop_;
|
||||||
|
|
||||||
@ -370,15 +371,11 @@ class V8NodeInspector : public v8_inspector::V8InspectorClient {
|
|||||||
return;
|
return;
|
||||||
terminated_ = false;
|
terminated_ = false;
|
||||||
running_nested_loop_ = true;
|
running_nested_loop_ = true;
|
||||||
agent_->DispatchMessages();
|
while (!terminated_) {
|
||||||
do {
|
agent_->WaitForFrontendMessage();
|
||||||
{
|
|
||||||
Mutex::ScopedLock scoped_lock(agent_->pause_lock_);
|
|
||||||
agent_->pause_cond_.Wait(scoped_lock);
|
|
||||||
}
|
|
||||||
while (v8::platform::PumpMessageLoop(platform_, env_->isolate()))
|
while (v8::platform::PumpMessageLoop(platform_, env_->isolate()))
|
||||||
{}
|
{}
|
||||||
} while (!terminated_);
|
}
|
||||||
terminated_ = false;
|
terminated_ = false;
|
||||||
running_nested_loop_ = false;
|
running_nested_loop_ = false;
|
||||||
}
|
}
|
||||||
@ -661,7 +658,6 @@ bool AgentImpl::OnInspectorHandshakeIO(InspectorSocket* socket,
|
|||||||
void AgentImpl::OnRemoteDataIO(InspectorSocket* socket,
|
void AgentImpl::OnRemoteDataIO(InspectorSocket* socket,
|
||||||
ssize_t read,
|
ssize_t read,
|
||||||
const uv_buf_t* buf) {
|
const uv_buf_t* buf) {
|
||||||
Mutex::ScopedLock scoped_lock(pause_lock_);
|
|
||||||
if (read > 0) {
|
if (read > 0) {
|
||||||
String16 str = String16::fromUTF8(buf->base, read);
|
String16 str = String16::fromUTF8(buf->base, read);
|
||||||
// TODO(pfeldman): Instead of blocking execution while debugger
|
// TODO(pfeldman): Instead of blocking execution while debugger
|
||||||
@ -686,7 +682,6 @@ void AgentImpl::OnRemoteDataIO(InspectorSocket* socket,
|
|||||||
if (buf) {
|
if (buf) {
|
||||||
delete[] buf->base;
|
delete[] buf->base;
|
||||||
}
|
}
|
||||||
pause_cond_.Broadcast(scoped_lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// static
|
// static
|
||||||
@ -752,14 +747,14 @@ void AgentImpl::WorkerRunIO() {
|
|||||||
|
|
||||||
bool AgentImpl::AppendMessage(MessageQueue* queue, int session_id,
|
bool AgentImpl::AppendMessage(MessageQueue* queue, int session_id,
|
||||||
const String16& message) {
|
const String16& message) {
|
||||||
Mutex::ScopedLock scoped_lock(queue_lock_);
|
Mutex::ScopedLock scoped_lock(state_lock_);
|
||||||
bool trigger_pumping = queue->empty();
|
bool trigger_pumping = queue->empty();
|
||||||
queue->push_back(std::make_pair(session_id, message));
|
queue->push_back(std::make_pair(session_id, message));
|
||||||
return trigger_pumping;
|
return trigger_pumping;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) {
|
void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) {
|
||||||
Mutex::ScopedLock scoped_lock(queue_lock_);
|
Mutex::ScopedLock scoped_lock(state_lock_);
|
||||||
vector1->swap(*vector2);
|
vector1->swap(*vector2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -771,6 +766,18 @@ void AgentImpl::PostIncomingMessage(const String16& message) {
|
|||||||
isolate->RequestInterrupt(InterruptCallback, this);
|
isolate->RequestInterrupt(InterruptCallback, this);
|
||||||
uv_async_send(data_written_);
|
uv_async_send(data_written_);
|
||||||
}
|
}
|
||||||
|
NotifyMessageReceived();
|
||||||
|
}
|
||||||
|
|
||||||
|
void AgentImpl::WaitForFrontendMessage() {
|
||||||
|
Mutex::ScopedLock scoped_lock(state_lock_);
|
||||||
|
if (incoming_message_queue_.empty())
|
||||||
|
incoming_message_cond_.Wait(scoped_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AgentImpl::NotifyMessageReceived() {
|
||||||
|
Mutex::ScopedLock scoped_lock(state_lock_);
|
||||||
|
incoming_message_cond_.Broadcast(scoped_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AgentImpl::OnInspectorConnectionIO(InspectorSocket* socket) {
|
void AgentImpl::OnInspectorConnectionIO(InspectorSocket* socket) {
|
||||||
|
@ -6,6 +6,8 @@ const http = require('http');
|
|||||||
const path = require('path');
|
const path = require('path');
|
||||||
const spawn = require('child_process').spawn;
|
const spawn = require('child_process').spawn;
|
||||||
|
|
||||||
|
const DEBUG = false;
|
||||||
|
|
||||||
const TIMEOUT = 15 * 1000;
|
const TIMEOUT = 15 * 1000;
|
||||||
|
|
||||||
const mainScript = path.join(common.fixturesDir, 'loop.js');
|
const mainScript = path.join(common.fixturesDir, 'loop.js');
|
||||||
@ -13,6 +15,8 @@ const mainScript = path.join(common.fixturesDir, 'loop.js');
|
|||||||
function send(socket, message, id, callback) {
|
function send(socket, message, id, callback) {
|
||||||
const msg = JSON.parse(JSON.stringify(message)); // Clone!
|
const msg = JSON.parse(JSON.stringify(message)); // Clone!
|
||||||
msg['id'] = id;
|
msg['id'] = id;
|
||||||
|
if (DEBUG)
|
||||||
|
console.log('[sent]', JSON.stringify(msg));
|
||||||
const messageBuf = Buffer.from(JSON.stringify(msg));
|
const messageBuf = Buffer.from(JSON.stringify(msg));
|
||||||
|
|
||||||
const wsHeaderBuf = Buffer.allocUnsafe(16);
|
const wsHeaderBuf = Buffer.allocUnsafe(16);
|
||||||
@ -61,6 +65,8 @@ function parseWSFrame(buffer, handler) {
|
|||||||
return 0;
|
return 0;
|
||||||
const message = JSON.parse(
|
const message = JSON.parse(
|
||||||
buffer.slice(bodyOffset, bodyOffset + dataLen).toString('utf8'));
|
buffer.slice(bodyOffset, bodyOffset + dataLen).toString('utf8'));
|
||||||
|
if (DEBUG)
|
||||||
|
console.log('[received]', JSON.stringify(message));
|
||||||
handler(message);
|
handler(message);
|
||||||
return bodyOffset + dataLen;
|
return bodyOffset + dataLen;
|
||||||
}
|
}
|
||||||
@ -117,7 +123,7 @@ const TestSession = function(socket, harness) {
|
|||||||
this.expectedId_ = 1;
|
this.expectedId_ = 1;
|
||||||
this.lastMessageResponseCallback_ = null;
|
this.lastMessageResponseCallback_ = null;
|
||||||
|
|
||||||
let buffer = Buffer.from('');
|
let buffer = new Buffer(0);
|
||||||
socket.on('data', (data) => {
|
socket.on('data', (data) => {
|
||||||
buffer = Buffer.concat([buffer, data]);
|
buffer = Buffer.concat([buffer, data]);
|
||||||
let consumed;
|
let consumed;
|
||||||
@ -195,9 +201,10 @@ TestSession.prototype.sendInspectorCommands = function(commands) {
|
|||||||
timeoutId = setTimeout(() => {
|
timeoutId = setTimeout(() => {
|
||||||
let s = '';
|
let s = '';
|
||||||
for (const id in this.messages_) {
|
for (const id in this.messages_) {
|
||||||
s += this.messages_[id] + '\n';
|
s += id + ', ';
|
||||||
}
|
}
|
||||||
common.fail(s.substring(0, s.length - 1));
|
common.fail('Messages without response: ' +
|
||||||
|
s.substring(0, s.length - 2));
|
||||||
}, TIMEOUT);
|
}, TIMEOUT);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -269,6 +276,7 @@ TestSession.prototype.disconnect = function(childDone) {
|
|||||||
this.harness_.childInstanceDone =
|
this.harness_.childInstanceDone =
|
||||||
this.harness_.childInstanceDone || childDone;
|
this.harness_.childInstanceDone || childDone;
|
||||||
this.socket_.end();
|
this.socket_.end();
|
||||||
|
console.log('[test]', 'Connection terminated');
|
||||||
callback();
|
callback();
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
@ -293,7 +301,7 @@ const Harness = function(port, childProcess) {
|
|||||||
if (!filter(message)) pending.push(filter);
|
if (!filter(message)) pending.push(filter);
|
||||||
this.stderrFilters_ = pending;
|
this.stderrFilters_ = pending;
|
||||||
}));
|
}));
|
||||||
childProcess.on('close', (code, signal) => {
|
childProcess.on('exit', (code, signal) => {
|
||||||
assert(this.childInstanceDone, 'Child instance died prematurely');
|
assert(this.childInstanceDone, 'Child instance died prematurely');
|
||||||
this.returnCode_ = code;
|
this.returnCode_ = code;
|
||||||
this.running_ = false;
|
this.running_ = false;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user