interrupt_exec
Introduce
- rb_threadptr_interrupt_exec
- rb_ractor_interrupt_exec
to intercept thread/ractor execution.
commit c8297c3eed (parent 29578773c2), 2024-11-08 09:03:10 +00:00
internal/thread.h
@@ -82,4 +82,25 @@ RUBY_SYMBOL_EXPORT_END
 int rb_threadptr_execute_interrupts(struct rb_thread_struct *th, int blocking_timing);
 bool rb_thread_mn_schedulable(VALUE thread);
 
+// interrupt exec
+
+typedef VALUE (rb_interrupt_exec_func_t)(void *data);
+
+enum rb_interrupt_exec_flag {
+    rb_interrupt_exec_flag_none = 0x00,
+    rb_interrupt_exec_flag_value_data = 0x01,
+};
+
+// interrupt the target_th and run func.
+struct rb_ractor_struct;
+
+void rb_threadptr_interrupt_exec(struct rb_thread_struct *target_th,
+                                 rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags);
+
+// create a thread in the target_r and run func on the created thread.
+void rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r,
+                              rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags);
+
+void rb_threadptr_interrupt_exec_task_mark(struct rb_thread_struct *th);
+
 #endif /* INTERNAL_THREAD_H */
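For orientation, a minimal usage sketch of the API declared above, written against the signatures in this hunk. It is not part of the commit: the callback and caller names (notify_target, queue_notify_on) are hypothetical, and the includes assume the sketch is compiled inside the CRuby source tree where internal/thread.h and rb_thread_t are visible.

#include "internal/thread.h"   // declarations added in the hunk above
#include "vm_core.h"           // rb_thread_t (struct rb_thread_struct)

// Hypothetical callback: runs on target_th the next time it checks its interrupts.
static VALUE
notify_target(void *data)
{
    // data arrives unchanged; with rb_interrupt_exec_flag_value_data it is
    // treated as a VALUE and GC-marked while the task is queued.
    rb_p((VALUE)data);
    return Qnil;
}

// Hypothetical caller: queue notify_target for execution on target_th.
static void
queue_notify_on(rb_thread_t *target_th, VALUE message)
{
    rb_threadptr_interrupt_exec(target_th, notify_target, (void *)message,
                                rb_interrupt_exec_flag_value_data);
}

Passing rb_interrupt_exec_flag_value_data tells the VM to treat data as a VALUE and keep it GC-marked while the task sits in the queue (see rb_threadptr_interrupt_exec_task_mark further down).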
thread.c (190 changed lines)
@@ -342,25 +342,33 @@ unblock_function_clear(rb_thread_t *th)
 }
 
 static void
-rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
+threadptr_interrupt_locked(rb_thread_t *th, bool trap)
 {
+    // th->interrupt_lock should be acquired here
+
     RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap);
+
+    if (trap) {
+        RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+    }
+    else {
+        RUBY_VM_SET_INTERRUPT(th->ec);
+    }
+
+    if (th->unblock.func != NULL) {
+        (th->unblock.func)(th->unblock.arg);
+    }
+    else {
+        /* none */
+    }
+}
+
+static void
+threadptr_interrupt(rb_thread_t *th, int trap)
+{
     rb_native_mutex_lock(&th->interrupt_lock);
     {
-        if (trap) {
-            RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
-        }
-        else {
-            RUBY_VM_SET_INTERRUPT(th->ec);
-        }
-
-        if (th->unblock.func != NULL) {
-            (th->unblock.func)(th->unblock.arg);
-        }
-        else {
-            /* none */
-        }
+        threadptr_interrupt_locked(th, trap);
     }
     rb_native_mutex_unlock(&th->interrupt_lock);
 }
@@ -369,13 +377,13 @@ void
 rb_threadptr_interrupt(rb_thread_t *th)
 {
     RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
-    rb_threadptr_interrupt_common(th, 0);
+    threadptr_interrupt(th, false);
 }
 
 static void
 threadptr_trap_interrupt(rb_thread_t *th)
 {
-    rb_threadptr_interrupt_common(th, 1);
+    threadptr_interrupt(th, true);
 }
 
 static void
@@ -490,6 +498,7 @@ rb_thread_terminate_all(rb_thread_t *th)
 }
 
 void rb_threadptr_root_fiber_terminate(rb_thread_t *th);
+static void threadptr_interrupt_exec_cleanup(rb_thread_t *th);
 
 static void
 thread_cleanup_func_before_exec(void *th_ptr)
@@ -500,6 +509,7 @@ thread_cleanup_func_before_exec(void *th_ptr)
     // The thread stack doesn't exist in the forked process:
     th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
 
+    threadptr_interrupt_exec_cleanup(th);
     rb_threadptr_root_fiber_terminate(th);
 }
 
@@ -2418,6 +2428,8 @@ threadptr_get_interrupts(rb_thread_t *th)
     return interrupt & (rb_atomic_t)~ec->interrupt_mask;
 }
 
+static void threadptr_interrupt_exec_exec(rb_thread_t *th);
+
 int
 rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
 {
@@ -2449,17 +2461,29 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
         rb_postponed_job_flush(th->vm);
     }
 
-    /* signal handling */
-    if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
-        enum rb_thread_status prev_status = th->status;
-
-        th->status = THREAD_RUNNABLE;
-        {
-            while ((sig = rb_get_next_signal()) != 0) {
-                ret |= rb_signal_exec(th, sig);
-            }
-        }
-        th->status = prev_status;
+    if (trap_interrupt) {
+        /* signal handling */
+        if (th == th->vm->ractor.main_thread) {
+            enum rb_thread_status prev_status = th->status;
+
+            th->status = THREAD_RUNNABLE;
+            {
+                while ((sig = rb_get_next_signal()) != 0) {
+                    ret |= rb_signal_exec(th, sig);
+                }
+            }
+            th->status = prev_status;
+        }
+
+        if (!ccan_list_empty(&th->interrupt_exec_tasks)) {
+            enum rb_thread_status prev_status = th->status;
+
+            th->status = THREAD_RUNNABLE;
+            {
+                threadptr_interrupt_exec_exec(th);
+            }
+            th->status = prev_status;
+        }
     }
 
     /* exception from another thread */
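The hunk above is where queued tasks actually run: inside the target thread's own interrupt check, with the status temporarily set to THREAD_RUNNABLE while the queue is drained. Stripped of VM details, the producer/consumer shape looks roughly like the standalone sketch below; it uses plain pthreads, and every name in it is illustrative rather than CRuby code.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

// Illustrative stand-ins for th->interrupt_lock / th->interrupt_exec_tasks.
struct task {
    void (*func)(void *data);
    void *data;
    struct task *next;
};

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static struct task *head, *tail;
static volatile bool interrupt_flag;   // stands in for the trap-interrupt flag

// Producer side: roughly what rb_threadptr_interrupt_exec does.
static void
enqueue(void (*func)(void *), void *data)
{
    struct task *t = malloc(sizeof(*t));
    t->func = func; t->data = data; t->next = NULL;
    pthread_mutex_lock(&lock);
    if (tail) tail->next = t; else head = t;
    tail = t;
    interrupt_flag = true;
    pthread_mutex_unlock(&lock);
}

// Consumer side: roughly what threadptr_interrupt_exec_exec does, called
// from the target thread's own interrupt check.
static void
drain(void)
{
    while (1) {
        struct task *t;
        pthread_mutex_lock(&lock);
        t = head;
        if (t) { head = t->next; if (!head) tail = NULL; }
        pthread_mutex_unlock(&lock);
        if (!t) break;
        t->func(t->data);   // runs on the target thread, outside the lock
        free(t);
    }
}

static void say(void *data) { puts((const char *)data); }

int
main(void)
{
    enqueue(say, "ran on the target thread");
    if (interrupt_flag) { interrupt_flag = false; drain(); }  // the target's interrupt check
    return 0;
}

The property mirrored here is that the callback executes on the target thread and outside the lock, so it can block or enqueue further work without deadlocking the producer.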
@@ -4700,6 +4724,7 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
 
     /* may be held by any thread in parent */
     rb_native_mutex_initialize(&th->interrupt_lock);
+    ccan_list_head_init(&th->interrupt_exec_tasks);
 
     vm->fork_gen++;
     rb_ractor_sleeper_threads_clear(th->ractor);
@@ -5920,3 +5945,120 @@ rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_ke
 
     th->specific_storage[key] = data;
 }
+
+// interrupt_exec
+
+struct rb_interrupt_exec_task {
+    struct ccan_list_node node;
+
+    rb_interrupt_exec_func_t *func;
+    void *data;
+    enum rb_interrupt_exec_flag flags;
+};
+
+void
+rb_threadptr_interrupt_exec_task_mark(rb_thread_t *th)
+{
+    struct rb_interrupt_exec_task *task;
+
+    ccan_list_for_each(&th->interrupt_exec_tasks, task, node) {
+        if (task->flags & rb_interrupt_exec_flag_value_data) {
+            rb_gc_mark((VALUE)task->data);
+        }
+    }
+}
+
+// native thread safe
+// th should be available
+void
+rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
+{
+    // should not use ALLOC
+    struct rb_interrupt_exec_task *task = ALLOC(struct rb_interrupt_exec_task);
+    *task = (struct rb_interrupt_exec_task) {
+        .flags = flags,
+        .func = func,
+        .data = data,
+    };
+
+    rb_native_mutex_lock(&th->interrupt_lock);
+    {
+        ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node);
+        threadptr_interrupt_locked(th, true);
+    }
+    rb_native_mutex_unlock(&th->interrupt_lock);
+}
+
+static void
+threadptr_interrupt_exec_exec(rb_thread_t *th)
+{
+    while (1) {
+        struct rb_interrupt_exec_task *task;
+
+        rb_native_mutex_lock(&th->interrupt_lock);
+        {
+            task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node);
+        }
+        rb_native_mutex_unlock(&th->interrupt_lock);
+
+        if (task) {
+            (*task->func)(task->data);
+            ruby_xfree(task);
+        }
+        else {
+            break;
+        }
+    }
+}
+
+static void
+threadptr_interrupt_exec_cleanup(rb_thread_t *th)
+{
+    rb_native_mutex_lock(&th->interrupt_lock);
+    {
+        struct rb_interrupt_exec_task *task;
+
+        while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) {
+            ruby_xfree(task);
+        }
+    }
+    rb_native_mutex_unlock(&th->interrupt_lock);
+}
+
+struct interrupt_ractor_new_thread_data {
+    rb_interrupt_exec_func_t *func;
+    void *data;
+};
+
+static VALUE
+interrupt_ractor_new_thread_func(void *data)
+{
+    struct interrupt_ractor_new_thread_data d = *(struct interrupt_ractor_new_thread_data *)data;
+    ruby_xfree(data);
+
+    d.func(d.data);
+    return Qnil;
+}
+
+static VALUE
+interrupt_ractor_func(void *data)
+{
+    rb_thread_create(interrupt_ractor_new_thread_func, data);
+    return Qnil;
+}
+
+// native thread safe
+// func/data should be native thread safe
+void
+rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r,
+                         rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
+{
+    struct interrupt_ractor_new_thread_data *d = ALLOC(struct interrupt_ractor_new_thread_data);
+
+    d->func = func;
+    d->data = data;
+    rb_thread_t *main_th = target_r->threads.main;
+    rb_threadptr_interrupt_exec(main_th, interrupt_ractor_func, d, flags);
+
+    // TODO MEMO: we can create a new thread in a ractor, but not sure how to do that now.
+}
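As the trailing TODO notes, rb_ractor_interrupt_exec does not yet create the thread directly: it queues interrupt_ractor_func on the target ractor's main thread, which then calls rb_thread_create so the caller-supplied func runs on a fresh thread inside that ractor. A hedged usage sketch follows (hypothetical names, CRuby-internal includes assumed).

// Hypothetical caller, assuming CRuby-internal context; ractor_core.h is
// assumed to provide struct rb_ractor_struct, and rb_p/rb_str_new_cstr are public C API.
#include "internal/thread.h"
#include "ractor_core.h"

// Runs on a thread newly created inside the target ractor.
static VALUE
run_in_ractor(void *data)
{
    // data is handed over as-is, so it must be native-thread safe
    // (here: a static string literal).
    rb_p(rb_str_new_cstr((const char *)data));
    return Qnil;
}

static void
ask_ractor(struct rb_ractor_struct *target_r)
{
    rb_ractor_interrupt_exec(target_r, run_in_ractor, (void *)"hello from another ractor",
                             rb_interrupt_exec_flag_none);
}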
vm.c (4 changed lines)
@@ -3491,6 +3491,8 @@ thread_mark(void *ptr)
 
     rb_gc_mark(th->scheduler);
 
+    rb_threadptr_interrupt_exec_task_mark(th);
+
     RUBY_MARK_LEAVE("thread");
 }
 
@@ -3644,6 +3646,8 @@ th_init(rb_thread_t *th, VALUE self, rb_vm_t *vm)
     th->report_on_exception = vm->thread_report_on_exception;
     th->ext_config.ractor_safe = true;
 
+    ccan_list_head_init(&th->interrupt_exec_tasks);
+
 #if USE_RUBY_DEBUG_LOG
     static rb_atomic_t thread_serial = 1;
     th->serial = RUBY_ATOMIC_FETCH_ADD(thread_serial, 1);
vm_core.h
@@ -1156,6 +1156,7 @@ typedef struct rb_thread_struct {
     struct rb_unblock_callback unblock;
     VALUE locking_mutex;
     struct rb_mutex_struct *keeping_mutexes;
+    struct ccan_list_head interrupt_exec_tasks;
 
     struct rb_waiting_list *join_list;
 