GVL Instrumentation API: add STARTED and EXITED events

[Feature #18339]

After experimenting with the initial version of the API I figured there is a need
for an exit event to cleanup instrumentation data. e.g. if you record data in a
{thread_id -> data} table, you need to free associated data when a thread goes away.
This commit is contained in:
Jean Boussier 2022-06-15 14:37:41 +02:00
parent 20d4168250
commit b6c1e1158d
Notes: git 2022-06-17 16:08:50 +09:00
7 changed files with 57 additions and 31 deletions

View File

@ -2,22 +2,30 @@
#include "ruby/atomic.h" #include "ruby/atomic.h"
#include "ruby/thread.h" #include "ruby/thread.h"
static rb_atomic_t acquire_enter_count = 0; static rb_atomic_t started_count = 0;
static rb_atomic_t acquire_exit_count = 0; static rb_atomic_t ready_count = 0;
static rb_atomic_t release_count = 0; static rb_atomic_t resumed_count = 0;
static rb_atomic_t suspended_count = 0;
static rb_atomic_t exited_count = 0;
void void
ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data) ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data)
{ {
switch(event) { switch(event) {
case RUBY_INTERNAL_THREAD_EVENT_STARTED:
RUBY_ATOMIC_INC(started_count);
break;
case RUBY_INTERNAL_THREAD_EVENT_READY: case RUBY_INTERNAL_THREAD_EVENT_READY:
RUBY_ATOMIC_INC(acquire_enter_count); RUBY_ATOMIC_INC(ready_count);
break; break;
case RUBY_INTERNAL_THREAD_EVENT_RESUMED: case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
RUBY_ATOMIC_INC(acquire_exit_count); RUBY_ATOMIC_INC(resumed_count);
break; break;
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED: case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
RUBY_ATOMIC_INC(release_count); RUBY_ATOMIC_INC(suspended_count);
break;
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
RUBY_ATOMIC_INC(exited_count);
break; break;
} }
} }
@ -27,19 +35,23 @@ static rb_internal_thread_event_hook_t * single_hook = NULL;
static VALUE static VALUE
thread_counters(VALUE thread) thread_counters(VALUE thread)
{ {
VALUE array = rb_ary_new2(3); VALUE array = rb_ary_new2(5);
rb_ary_push(array, UINT2NUM(acquire_enter_count)); rb_ary_push(array, UINT2NUM(started_count));
rb_ary_push(array, UINT2NUM(acquire_exit_count)); rb_ary_push(array, UINT2NUM(ready_count));
rb_ary_push(array, UINT2NUM(release_count)); rb_ary_push(array, UINT2NUM(resumed_count));
rb_ary_push(array, UINT2NUM(suspended_count));
rb_ary_push(array, UINT2NUM(exited_count));
return array; return array;
} }
static VALUE static VALUE
thread_reset_counters(VALUE thread) thread_reset_counters(VALUE thread)
{ {
RUBY_ATOMIC_SET(acquire_enter_count, 0); RUBY_ATOMIC_SET(started_count, 0);
RUBY_ATOMIC_SET(acquire_exit_count, 0); RUBY_ATOMIC_SET(ready_count, 0);
RUBY_ATOMIC_SET(release_count, 0); RUBY_ATOMIC_SET(resumed_count, 0);
RUBY_ATOMIC_SET(suspended_count, 0);
RUBY_ATOMIC_SET(exited_count, 0);
return Qtrue; return Qtrue;
} }
@ -48,7 +60,11 @@ thread_register_callback(VALUE thread)
{ {
single_hook = rb_internal_thread_add_event_hook( single_hook = rb_internal_thread_add_event_hook(
*ex_callback, *ex_callback,
RUBY_INTERNAL_THREAD_EVENT_READY | RUBY_INTERNAL_THREAD_EVENT_RESUMED | RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, RUBY_INTERNAL_THREAD_EVENT_STARTED |
RUBY_INTERNAL_THREAD_EVENT_READY |
RUBY_INTERNAL_THREAD_EVENT_RESUMED |
RUBY_INTERNAL_THREAD_EVENT_SUSPENDED |
RUBY_INTERNAL_THREAD_EVENT_EXITED,
NULL NULL
); );

View File

@ -190,10 +190,12 @@ void *rb_nogvl(void *(*func)(void *), void *data1,
*/ */
#define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS_ #define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS_
#define RUBY_INTERNAL_THREAD_EVENT_READY 0x01 /** acquiring GVL */ #define RUBY_INTERNAL_THREAD_EVENT_STARTED 1 << 0 /** thread started */
#define RUBY_INTERNAL_THREAD_EVENT_RESUMED 0x02 /** acquired GVL */ #define RUBY_INTERNAL_THREAD_EVENT_READY 1 << 1 /** acquiring GVL */
#define RUBY_INTERNAL_THREAD_EVENT_SUSPENDED 0x04 /** released GVL */ #define RUBY_INTERNAL_THREAD_EVENT_RESUMED 1 << 2 /** acquired GVL */
#define RUBY_INTERNAL_THREAD_EVENT_MASK 0x07 /** All Thread events */ #define RUBY_INTERNAL_THREAD_EVENT_SUSPENDED 1 << 3 /** released GVL */
#define RUBY_INTERNAL_THREAD_EVENT_EXITED 1 << 4 /** thread terminated */
#define RUBY_INTERNAL_THREAD_EVENT_MASK 0xff /** All Thread events */
typedef void rb_internal_thread_event_data_t; // for future extension. typedef void rb_internal_thread_event_data_t; // for future extension.

View File

@ -44,8 +44,10 @@ class TestThreadInstrumentation < Test::Unit::TestCase
counters = Marshal.load(read_pipe) counters = Marshal.load(read_pipe)
read_pipe.close read_pipe.close
counters.each do |c| counters.each do |c|
assert_predicate c,:nonzero?, "Call counters: #{counters.inspect}" assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}"
end end
assert_equal counters.first, counters.last # exited as many times as we entered
ensure ensure
Bug::ThreadInstrumentation::unregister_callback Bug::ThreadInstrumentation::unregister_callback
end end

View File

@ -631,7 +631,6 @@ thread_do_start(rb_thread_t *th)
} }
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec); void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
#define thread_sched_to_dead thread_sched_to_waiting
static int static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start) thread_start_func_2(rb_thread_t *th, VALUE *stack_start)

View File

@ -30,6 +30,8 @@ thread_sched_to_waiting(struct rb_thread_sched *sched)
{ {
} }
#define thread_sched_to_dead thread_sched_to_waiting
static void static void
thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th) thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
{ {

View File

@ -109,6 +109,8 @@ struct rb_internal_thread_event_hook {
static rb_internal_thread_event_hook_t *rb_internal_thread_event_hooks = NULL; static rb_internal_thread_event_hook_t *rb_internal_thread_event_hooks = NULL;
static pthread_rwlock_t rb_internal_thread_event_hooks_rw_lock = PTHREAD_RWLOCK_INITIALIZER; static pthread_rwlock_t rb_internal_thread_event_hooks_rw_lock = PTHREAD_RWLOCK_INITIALIZER;
#define RB_INTERNAL_THREAD_HOOK(event) if (rb_internal_thread_event_hooks) { rb_thread_execute_hooks(event); }
rb_internal_thread_event_hook_t * rb_internal_thread_event_hook_t *
rb_internal_thread_add_event_hook(rb_internal_thread_event_callback callback, rb_event_flag_t internal_event, void *user_data) rb_internal_thread_add_event_hook(rb_internal_thread_event_callback callback, rb_event_flag_t internal_event, void *user_data)
{ {
@ -377,10 +379,7 @@ thread_sched_to_ready_common(struct rb_thread_sched *sched, rb_thread_t *th)
static void static void
thread_sched_to_running_common(struct rb_thread_sched *sched, rb_thread_t *th) thread_sched_to_running_common(struct rb_thread_sched *sched, rb_thread_t *th)
{ {
if (rb_internal_thread_event_hooks) { RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_READY);
rb_thread_execute_hooks(RUBY_INTERNAL_THREAD_EVENT_READY);
}
if (sched->running) { if (sched->running) {
VM_ASSERT(th->unblock.func == 0 && VM_ASSERT(th->unblock.func == 0 &&
"we must not be in ubf_list and GVL readyq at the same time"); "we must not be in ubf_list and GVL readyq at the same time");
@ -412,9 +411,7 @@ thread_sched_to_running_common(struct rb_thread_sched *sched, rb_thread_t *th)
// ready -> running // ready -> running
sched->running = th; sched->running = th;
if (rb_internal_thread_event_hooks) { RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED);
rb_thread_execute_hooks(RUBY_INTERNAL_THREAD_EVENT_RESUMED);
}
if (!sched->timer) { if (!sched->timer) {
if (!designate_timer_thread(sched) && !ubf_threads_empty()) { if (!designate_timer_thread(sched) && !ubf_threads_empty()) {
@ -434,10 +431,6 @@ thread_sched_to_running(struct rb_thread_sched *sched, rb_thread_t *th)
static rb_thread_t * static rb_thread_t *
thread_sched_to_waiting_common(struct rb_thread_sched *sched) thread_sched_to_waiting_common(struct rb_thread_sched *sched)
{ {
if (rb_internal_thread_event_hooks) {
rb_thread_execute_hooks(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED);
}
rb_thread_t *next; rb_thread_t *next;
sched->running = NULL; sched->running = NULL;
next = ccan_list_top(&sched->readyq, rb_thread_t, sched.node.readyq); next = ccan_list_top(&sched->readyq, rb_thread_t, sched.node.readyq);
@ -449,11 +442,19 @@ thread_sched_to_waiting_common(struct rb_thread_sched *sched)
static void static void
thread_sched_to_waiting(struct rb_thread_sched *sched) thread_sched_to_waiting(struct rb_thread_sched *sched)
{ {
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED);
rb_native_mutex_lock(&sched->lock); rb_native_mutex_lock(&sched->lock);
thread_sched_to_waiting_common(sched); thread_sched_to_waiting_common(sched);
rb_native_mutex_unlock(&sched->lock); rb_native_mutex_unlock(&sched->lock);
} }
static void
thread_sched_to_dead(struct rb_thread_sched *sched)
{
thread_sched_to_waiting(sched);
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_EXITED);
}
static void static void
thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th) thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
{ {
@ -1173,6 +1174,8 @@ thread_start_func_1(void *th_ptr)
native_thread_init(th->nt); native_thread_init(th->nt);
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_STARTED);
/* run */ /* run */
#if defined USE_NATIVE_THREAD_INIT #if defined USE_NATIVE_THREAD_INIT
thread_start_func_2(th, th->ec->machine.stack_start); thread_start_func_2(th, th->ec->machine.stack_start);

View File

@ -135,6 +135,8 @@ thread_sched_to_waiting(struct rb_thread_sched *sched)
ReleaseMutex(sched->lock); ReleaseMutex(sched->lock);
} }
#define thread_sched_to_dead thread_sched_to_waiting
static void static void
thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th) thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th)
{ {