From f4a8db647ae66621f5d37402f5a11a3d57c69bb0 Mon Sep 17 00:00:00 2001 From: ko1 Date: Thu, 19 Jul 2012 14:19:40 +0000 Subject: [PATCH] * thread.c (rb_thread_s_control_interrupt, rb_thread_s_check_interrupt): added for Thread.control_intgerrupt and Thread.check_interrupt. See details on rdoc. I'll make an ticket for this feature. * test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt. * thread.c (rb_threadptr_raise): make a new exception object even if argc is 0. * thread.c (rb_thread_kill): kill thread immediately if target thread is current thread. * vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added. CHECK_INTS while/after blocking operation. * vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr. * cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS(). * eval.c (ruby_cleanup): ditto. * insns.def: ditto. * process.c (rb_waitpid): ditto. * vm_eval.c (vm_call0): ditto. * vm_insnhelper.c (vm_call_method): ditto. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36470 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 33 +++ cont.c | 2 +- eval.c | 2 +- insns.def | 12 +- process.c | 2 +- test/ruby/test_thread.rb | 66 +++++ thread.c | 515 +++++++++++++++++++++++++++++++-------- vm_core.h | 14 +- vm_eval.c | 4 +- vm_insnhelper.c | 2 +- 10 files changed, 526 insertions(+), 126 deletions(-) diff --git a/ChangeLog b/ChangeLog index 600cdb958e..8d3f4a35a3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,36 @@ +Thu Jul 19 15:08:40 2012 Koichi Sasada + + * thread.c (rb_thread_s_control_interrupt, + rb_thread_s_check_interrupt): added for + Thread.control_intgerrupt and Thread.check_interrupt. + See details on rdoc. + I'll make an ticket for this feature. + + * test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt. + + * thread.c (rb_threadptr_raise): make a new exception object + even if argc is 0. + + * thread.c (rb_thread_kill): kill thread immediately if target thread + is current thread. + + * vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added. + CHECK_INTS while/after blocking operation. + + * vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr. + + * cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS(). + + * eval.c (ruby_cleanup): ditto. + + * insns.def: ditto. + + * process.c (rb_waitpid): ditto. + + * vm_eval.c (vm_call0): ditto. + + * vm_insnhelper.c (vm_call_method): ditto. + Thu Jul 19 22:46:48 2012 Tanaka Akira * test/ruby/test_io.rb: remove temporally files early. diff --git a/cont.c b/cont.c index 1da0ba4705..58b2d303ac 100644 --- a/cont.c +++ b/cont.c @@ -1328,7 +1328,7 @@ fiber_switch(VALUE fibval, int argc, VALUE *argv, int is_resume) rb_bug("rb_fiber_resume: unreachable"); } #endif - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); return value; } diff --git a/eval.c b/eval.c index 411e50727f..14e0c5ce53 100644 --- a/eval.c +++ b/eval.c @@ -160,7 +160,7 @@ ruby_cleanup(volatile int ex) rb_threadptr_check_signal(th); PUSH_TAG(); if ((state = EXEC_TAG()) == 0) { - SAVE_ROOT_JMPBUF(th, { RUBY_VM_CHECK_INTS(); }); + SAVE_ROOT_JMPBUF(th, { RUBY_VM_CHECK_INTS(th); }); } POP_TAG(); diff --git a/insns.def b/insns.def index aa5b0eaf9f..1a7bc93754 100644 --- a/insns.def +++ b/insns.def @@ -1086,7 +1086,7 @@ leave } } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); if (UNLIKELY(VM_FRAME_TYPE_FINISH_P(GET_CFP()))) { #if OPT_CALL_THREADED_CODE @@ -1117,7 +1117,7 @@ throw (VALUE throwobj) (VALUE val) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); val = vm_throw(th, GET_CFP(), throw_state, throwobj); THROW_EXCEPTION(val); /* unreachable */ @@ -1138,7 +1138,7 @@ jump () () { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } @@ -1154,7 +1154,7 @@ branchif () { if (RTEST(val)) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } } @@ -1171,7 +1171,7 @@ branchunless () { if (!RTEST(val)) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } } @@ -1220,7 +1220,7 @@ onceinlinecache } else if (ic->ic_value.value == Qundef) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); rb_thread_schedule(); goto retry; } diff --git a/process.c b/process.c index 90eaa0e2e7..3aacc5d60b 100644 --- a/process.c +++ b/process.c @@ -663,7 +663,7 @@ rb_waitpid(rb_pid_t pid, int *st, int flags) RUBY_UBF_PROCESS, 0); if (result < 0) { if (errno == EINTR) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(GET_THREAD()); goto retry; } return (rb_pid_t)-1; diff --git a/test/ruby/test_thread.rb b/test/ruby/test_thread.rb index 946d522cd9..ed22cf9b44 100644 --- a/test/ruby/test_thread.rb +++ b/test/ruby/test_thread.rb @@ -615,6 +615,72 @@ class TestThread < Test::Unit::TestCase end assert_equal("Can't call on top of Fiber or Thread", error.message, bug5083) end + + def make_control_interrupt_test_thread1 flag + r = [] + q = Queue.new + th = Thread.new{ + begin + Thread.control_interrupt(RuntimeError => flag){ + q << :go + begin + sleep 0.5 + rescue + r << :c1 + end + } + sleep 0.5 + rescue + r << :c2 + end + } + q.pop # wait + th.raise + begin + th.join + rescue + r << :c3 + end + r + end + + def test_control_interrupt + [[:never, :c2], + [:immediate, :c1], + [:on_blocking, :c1]].each{|(flag, c)| + assert_equal([flag, c], [flag] + make_control_interrupt_test_thread1(flag)) + } + # TODO: complex cases are needed. + end + + def test_check_interrupt + q = Queue.new + Thread.control_interrupt(RuntimeError => :never){ + th = Thread.new{ + q.push :e + begin + begin + sleep 0.5 + rescue => e + q.push :ng1 + end + begin + Thread.check_interrupt + rescue => e + q.push :ok + end + rescue => e + q.push :ng2 + ensure + q.push :ng3 + end + } + q.pop + th.raise + th.join + assert_equal(:ok, q.pop) + } + end end class TestThreadGroup < Test::Unit::TestCase diff --git a/thread.c b/thread.c index 6cd95696ef..acd9ac4a0b 100644 --- a/thread.c +++ b/thread.c @@ -265,7 +265,7 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, struct rb_unblock_callback *old) { check_ints: - RUBY_VM_CHECK_INTS(); /* check signal or so */ + RUBY_VM_CHECK_INTS(th); /* check signal or so */ native_mutex_lock(&th->interrupt_lock); if (th->interrupt_flag) { native_mutex_unlock(&th->interrupt_lock); @@ -545,7 +545,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s static VALUE thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) { - rb_thread_t *th; + rb_thread_t *th, *current_th = GET_THREAD(); int err; if (OBJ_FROZEN(GET_THREAD()->thgroup)) { @@ -559,12 +559,12 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) th->first_proc = fn ? Qfalse : rb_block_proc(); th->first_args = args; /* GC: shouldn't put before above line */ - th->priority = GET_THREAD()->priority; - th->thgroup = GET_THREAD()->thgroup; + th->priority = current_th->priority; + th->thgroup = current_th->thgroup; th->async_errinfo_queue = rb_ary_new(); th->async_errinfo_queue_checked = 0; - th->async_errinfo_mask_stack = rb_ary_new(); + th->async_errinfo_mask_stack = rb_ary_dup(current_th->async_errinfo_mask_stack); native_mutex_initialize(&th->interrupt_lock); if (GET_VM()->event_hooks != NULL) @@ -859,7 +859,7 @@ sleep_forever(rb_thread_t *th, int deadlockable) if (deadlockable) { th->vm->sleeper--; } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } while (th->status == status); th->status = prev_status; } @@ -896,7 +896,7 @@ sleep_timeval(rb_thread_t *th, struct timeval tv) th->status = THREAD_STOPPED; do { native_sleep(th, &tv); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); getclockofday(&tvn); if (to.tv_sec < tvn.tv_sec) break; if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; @@ -968,9 +968,9 @@ rb_thread_wait_for(struct timeval time) void rb_thread_polling(void) { - RUBY_VM_CHECK_INTS(); if (!rb_thread_alone()) { rb_thread_t *th = GET_THREAD(); + RUBY_VM_CHECK_INTS_BLOCKING(th); sleep_for_polling(th); } } @@ -985,7 +985,7 @@ rb_thread_polling(void) void rb_thread_check_ints(void) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD()); } /* @@ -1013,7 +1013,7 @@ rb_thread_sleep(int sec) rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); } -static void rb_threadptr_execute_interrupts_common(rb_thread_t *); +static void rb_threadptr_execute_interrupts_common(rb_thread_t *, int blocking); static void rb_thread_schedule_limits(unsigned long limits_us) @@ -1040,7 +1040,7 @@ rb_thread_schedule(void) rb_thread_schedule_limits(0); if (UNLIKELY(GET_THREAD()->interrupt_flag)) { - rb_threadptr_execute_interrupts_common(GET_THREAD()); + rb_threadptr_execute_interrupts_common(GET_THREAD(), 0); } } @@ -1076,7 +1076,7 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) rb_thread_t *th = GET_THREAD(); blocking_region_end(th, region); xfree(region); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = saved_errno; } @@ -1181,7 +1181,7 @@ rb_thread_call_without_gvl2(void *(*func)(void *data, int *skip_checkints), void }, ubf, data2); if (!skip_checkints) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } errno = saved_errno; @@ -1237,7 +1237,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) JUMP_TAG(state); } /* TODO: check func() */ - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = saved_errno; @@ -1350,86 +1350,6 @@ thread_s_pass(VALUE klass) return Qnil; } -/* - * - */ - -static void -rb_threadptr_execute_interrupts_common(rb_thread_t *th) -{ - rb_atomic_t interrupt; - - if (th->raised_flag) return; - - while ((interrupt = ATOMIC_EXCHANGE(th->interrupt_flag, 0)) != 0) { - enum rb_thread_status status = th->status; - int timer_interrupt = interrupt & 0x01; - int finalizer_interrupt = interrupt & 0x04; - int sig; - - th->status = THREAD_RUNNABLE; - - /* signal handling */ - if (th == th->vm->main_thread) { - while ((sig = rb_get_next_signal()) != 0) { - rb_signal_exec(th, sig); - } - } - - /* exception from another thread */ - if (rb_threadptr_async_errinfo_active_p(th)) { - VALUE err = rb_threadptr_async_errinfo_deque(th); - thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); - - if (err == eKillSignal /* Thread#kill receieved */ || - err == eTerminateSignal /* Terminate thread */ ) { - rb_threadptr_async_errinfo_clear(th); - th->status = THREAD_TO_KILL; - th->errinfo = INT2FIX(TAG_FATAL); - TH_JUMP_TAG(th, TAG_FATAL); - } - else { - rb_exc_raise(err); - } - } - th->status = status; - - if (finalizer_interrupt) { - rb_gc_finalize_deferred(); - } - - if (timer_interrupt) { - unsigned long limits_us = TIME_QUANTUM_USEC; - - if (th->priority > 0) - limits_us <<= th->priority; - else - limits_us >>= -th->priority; - - if (status == THREAD_RUNNABLE) - th->running_time_us += TIME_QUANTUM_USEC; - - EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0); - - rb_thread_schedule_limits(limits_us); - } - } -} - -void -rb_threadptr_execute_interrupts(rb_thread_t *th) -{ - rb_threadptr_execute_interrupts_common(th); -} - -void -rb_thread_execute_interrupts(VALUE thval) -{ - rb_thread_t *th; - GetThreadPtr(thval, th); - rb_threadptr_execute_interrupts_common(th); -} - /*****************************************************/ /* @@ -1461,27 +1381,391 @@ rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v) th->async_errinfo_queue_checked = 0; } -VALUE -rb_threadptr_async_errinfo_deque(rb_thread_t *th) +enum interrupt_timing { + INTERRUPT_NONE, + INTERRUPT_IMMEDIATE, + INTERRUPT_ON_BLOCKING, + INTERRUPT_NEVER +}; + +static enum interrupt_timing +rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err) { + VALUE mask; + long mask_stack_len = RARRAY_LEN(th->async_errinfo_mask_stack); + VALUE *mask_stack = RARRAY_PTR(th->async_errinfo_mask_stack); + VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */ + long ancestors_len = RARRAY_LEN(ancestors); + VALUE *ancestors_ptr = RARRAY_PTR(ancestors); + int i, j; + + for (i=0; iasync_errinfo_queue) == 0; +} + +static VALUE +rb_threadptr_async_errinfo_deque(rb_thread_t *th, enum interrupt_timing timing) +{ +#if 1 /* 1 to enable Thread#control_interrupt, 0 to ignore it */ + int i; + + for (i=0; iasync_errinfo_queue); i++) { + VALUE err = RARRAY_PTR(th->async_errinfo_queue)[i]; + + enum interrupt_timing mask_timing = rb_threadptr_async_errinfo_check_mask(th, CLASS_OF(err)); + + switch (mask_timing) { + case INTERRUPT_ON_BLOCKING: + if (timing != INTERRUPT_ON_BLOCKING) { + break; + } + /* fall through */ + case INTERRUPT_NONE: /* default: IMMEDIATE */ + case INTERRUPT_IMMEDIATE: + rb_ary_delete_at(th->async_errinfo_queue, i); + return err; + case INTERRUPT_NEVER: + break; + } + } + + th->async_errinfo_queue_checked = 1; + return Qundef; +#else VALUE err = rb_ary_shift(th->async_errinfo_queue); - if (RARRAY_LEN(th->async_errinfo_queue) == 0) { + if (rb_threadptr_async_errinfo_empty_p(th)) { th->async_errinfo_queue_checked = 1; } return err; +#endif } int rb_threadptr_async_errinfo_active_p(rb_thread_t *th) { - if (th->async_errinfo_queue_checked) { + if (th->async_errinfo_queue_checked || rb_threadptr_async_errinfo_empty_p(th)) { return 0; } else { - return RARRAY_LEN(th->async_errinfo_queue) > 0; + return 1; } } +static VALUE +rb_threadptr_interrupt_mask(rb_thread_t *th, VALUE mask, VALUE (*func)(rb_thread_t *th)) +{ + VALUE r = Qnil; + int state; + + rb_ary_push(th->async_errinfo_mask_stack, mask); + if (!rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th); + } + + TH_PUSH_TAG(th); + if ((state = EXEC_TAG()) == 0) { + r = func(th); + } + TH_POP_TAG(); + + rb_ary_pop(th->async_errinfo_mask_stack); + if (!rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th); + } + + if (state) { + JUMP_TAG(state); + } + + return r; +} + +/* + * call-seq: + * Thread.control_interrupt(hash) { ... } -> result of the block + * + * Thread.control_interrupt controls interrupt timing. + * + * _interrupt_ means asynchronous event and corresponding procedure + * by Thread#raise, Thread#kill, signal trap (not supported yet) + * and main thread termination (if main thread terminates, then all + * other thread will be killed). + * + * _hash_ has pairs of ExceptionClass and TimingSymbol. TimingSymbol + * is one of them: + * - :immediate Invoke interrupt immediately. + * - :on_blocking Invoke interrupt while _BlockingOperation_. + * - :never Never invoke interrupt. + * + * _BlockingOperation_ means that the operation will block the calling thread, + * such as read and write. On CRuby implementation, _BlockingOperation_ is + * operation executed without GVL. + * + * Masked interrupts are delayed until they are enabled. + * This method is similar to sigprocmask(3). + * + * TODO (DOC): control_interrupt is stacked. + * TODO (DOC): check ancestors. + * TODO (DOC): to prevent all interrupt, {Object => :never} works. + * + * NOTE: Asynchronous interrupts are difficult to use. + * If you need to communicate between threads, + * please consider to use another way such as Queue. + * Or use them with deep understanding about this method. + * + * + * # example: Guard from Thread#raise + * th = Thread.new do + * Thead.control_interrupt(RuntimeError => :never) { + * begin + * # Thread#raise doesn't interrupt here. + * # You can write resource allocation code safely. + * Thread.control_interrupt(RuntimeError => :immediate) { + * # ... + * # It is possible to be interrupted by Thread#raise. + * } + * ensure + * # Thread#raise doesn't interrupt here. + * # You can write resource dealocation code safely. + * end + * } + * end + * Thread.pass + * # ... + * th.raise "stop" + * + * # example: Guard from TimeoutError + * require 'timeout' + * Thread.control_interrupt(TimeoutError => :never) { + * timeout(10){ + * # TimeoutError doesn't occur here + * Thread.control_interrupt(TimeoutError => :on_blocking) { + * # possible to be killed by TimeoutError + * # while blocking operation + * } + * # TimeoutError doesn't occur here + * } + * } + * + * # example: Stack control settings + * Thread.control_interrupt(FooError => :never) { + * Thread.control_interrupt(BarError => :never) { + * # FooError and BarError are prohibited. + * } + * } + * + * # example: check ancestors + * Thread.control_interrupt(Exception => :never) { + * # all exceptions inherited from Exception are prohibited. + * } + * + */ + +static VALUE +control_interrupt_func(rb_thread_t *th) +{ + return rb_yield(Qnil); +} + +static VALUE +rb_thread_s_control_interrupt(VALUE self, VALUE mask_arg) +{ + if (!rb_block_given_p()) { + rb_raise(rb_eArgError, "block is needed."); + } + + return rb_threadptr_interrupt_mask(GET_THREAD(), + rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"), + control_interrupt_func); +} + +/* + * call-seq: + * Thread.check_interrupt() -> nil + * + * Check queued interrupts. + * + * If there are queued interrupts, process respective procedures. + * + * This method can be defined as the following Ruby code: + * + * def Thread.check_interrupt + * Thread.control_interrupt(Object => :immediate) { + * Thread.pass + * } + * end + * + * Examples: + * + * th = Thread.new{ + * Thread.control_interrupt(RuntimeError => :on_blocking){ + * while true + * ... + * # reach safe point to invoke interrupt + * Thread.check_interrupt + * ... + * end + * } + * } + * ... + * th.raise # stop thread + * + * NOTE: This example can be described by the another code. + * You need to keep to avoid asynchronous interrupts. + * + * flag = true + * th = Thread.new{ + * Thread.control_interrupt(RuntimeError => :on_blocking){ + * while true + * ... + * # reach safe point to invoke interrupt + * break if flag == false + * ... + * end + * } + * } + * ... + * flag = false # stop thread + */ + +static VALUE +check_interrupt_func(rb_thread_t *th) +{ + RUBY_VM_CHECK_INTS(th); + return Qnil; +} + +static VALUE +rb_thread_s_check_interrupt(VALUE self) +{ + rb_thread_t *th = GET_THREAD(); + + if (!rb_threadptr_async_errinfo_empty_p(th)) { + VALUE mask = rb_hash_new(); + rb_hash_aset(mask, rb_cObject, ID2SYM(rb_intern("immediate"))); + rb_threadptr_interrupt_mask(GET_THREAD(), mask, check_interrupt_func); + } + + return Qnil; +} + +static void +rb_threadptr_to_kill(rb_thread_t *th) +{ + rb_threadptr_async_errinfo_clear(th); + th->status = THREAD_TO_KILL; + th->errinfo = INT2FIX(TAG_FATAL); + TH_JUMP_TAG(th, TAG_FATAL); +} + +static void +rb_threadptr_execute_interrupts_common(rb_thread_t *th, int blocking_timing) +{ + rb_atomic_t interrupt; + + if (th->raised_flag) return; + + while ((interrupt = ATOMIC_EXCHANGE(th->interrupt_flag, 0)) != 0) { + enum rb_thread_status status = th->status; + int timer_interrupt = interrupt & 0x01; + int finalizer_interrupt = interrupt & 0x04; + int sig; + + th->status = THREAD_RUNNABLE; + + /* signal handling */ + if (th == th->vm->main_thread) { + while ((sig = rb_get_next_signal()) != 0) { + rb_signal_exec(th, sig); + } + } + + /* exception from another thread */ + if (rb_threadptr_async_errinfo_active_p(th)) { + VALUE err = rb_threadptr_async_errinfo_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); + thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); + + if (err == Qundef) { + /* no error */ + } + else if (err == eKillSignal /* Thread#kill receieved */ || + err == eTerminateSignal /* Terminate thread */ ) { + rb_threadptr_to_kill(th); + } + else { + rb_exc_raise(err); + } + } + th->status = status; + + if (finalizer_interrupt) { + rb_gc_finalize_deferred(); + } + + if (timer_interrupt) { + unsigned long limits_us = TIME_QUANTUM_USEC; + + if (th->priority > 0) + limits_us <<= th->priority; + else + limits_us >>= -th->priority; + + if (status == THREAD_RUNNABLE) + th->running_time_us += TIME_QUANTUM_USEC; + + EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0); + + rb_thread_schedule_limits(limits_us); + } + } +} + +void +rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) +{ + rb_threadptr_execute_interrupts_common(th, blocking_timing); +} + +void +rb_thread_execute_interrupts(VALUE thval) +{ + rb_thread_t *th; + GetThreadPtr(thval, th); + rb_threadptr_execute_interrupts_common(th, 1); +} + static void rb_threadptr_ready(rb_thread_t *th) { @@ -1497,7 +1781,12 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) return Qnil; } - exc = rb_make_exception(argc, argv); + if (argc == 0) { + exc = rb_exc_new(rb_eRuntimeError, 0, 0); + } + else { + exc = rb_make_exception(argc, argv); + } rb_threadptr_async_errinfo_enque(th, exc); rb_threadptr_interrupt(th); return Qnil; @@ -1642,9 +1931,15 @@ rb_thread_kill(VALUE thread) thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); - rb_threadptr_async_errinfo_enque(th, eKillSignal); - th->status = THREAD_TO_KILL; - rb_threadptr_interrupt(th); + if (th == GET_THREAD()) { + /* kill myself immediately */ + rb_threadptr_to_kill(th); + } + else { + rb_threadptr_async_errinfo_enque(th, eKillSignal); + th->status = THREAD_TO_KILL; + rb_threadptr_interrupt(th); + } return thread; } @@ -1683,7 +1978,8 @@ rb_thread_s_kill(VALUE obj, VALUE th) static VALUE rb_thread_exit(void) { - return rb_thread_kill(GET_THREAD()->self); + rb_thread_t *th = GET_THREAD(); + return rb_thread_kill(th->self); } @@ -2704,7 +3000,7 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, if (result < 0) lerrno = errno; }, ubf_select, th); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = lerrno; @@ -2910,6 +3206,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) double limit = 0; struct timespec ts; struct timespec *timeout = NULL; + rb_thread_t *th = GET_THREAD(); if (tv) { ts.tv_sec = tv->tv_sec; @@ -2927,9 +3224,9 @@ retry: BLOCKING_REGION({ result = ppoll(&fds, 1, timeout, NULL); if (result < 0) lerrno = errno; - }, ubf_select, GET_THREAD()); + }, ubf_select, th); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); if (result < 0) { errno = lerrno; @@ -3673,7 +3970,7 @@ rb_mutex_lock(VALUE self) if (mutex->th == th) mutex_locked(th, self); if (interrupted) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } } } @@ -4755,6 +5052,8 @@ Init_Thread(void) rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); #endif + rb_define_singleton_method(rb_cThread, "control_interrupt", rb_thread_s_control_interrupt, 1); + rb_define_singleton_method(rb_cThread, "check_interrupt", rb_thread_s_check_interrupt, 1); rb_define_method(rb_cThread, "initialize", thread_initialize, -2); rb_define_method(rb_cThread, "raise", thread_raise_m, -1); diff --git a/vm_core.h b/vm_core.h index b6dd7982df..97d4410d73 100644 --- a/vm_core.h +++ b/vm_core.h @@ -769,25 +769,27 @@ void rb_signal_exec(rb_thread_t *th, int sig); void rb_threadptr_check_signal(rb_thread_t *mth); void rb_threadptr_signal_raise(rb_thread_t *th, int sig); void rb_threadptr_signal_exit(rb_thread_t *th); -void rb_threadptr_execute_interrupts(rb_thread_t *); +void rb_threadptr_execute_interrupts(rb_thread_t *, int); void rb_threadptr_interrupt(rb_thread_t *th); void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th); void rb_threadptr_async_errinfo_clear(rb_thread_t *th); void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v); -VALUE rb_threadptr_async_errinfo_deque(rb_thread_t *th); int rb_threadptr_async_errinfo_active_p(rb_thread_t *th); void rb_thread_lock_unlock(rb_thread_lock_t *); void rb_thread_lock_destroy(rb_thread_lock_t *); -#define RUBY_VM_CHECK_INTS_TH(th) do { \ +#define RUBY_VM_CHECK_INTS_BLOCKING(th) do { \ if (UNLIKELY((th)->interrupt_flag)) { \ - rb_threadptr_execute_interrupts(th); \ + rb_threadptr_execute_interrupts(th, 1); \ } \ } while (0) -#define RUBY_VM_CHECK_INTS() \ - RUBY_VM_CHECK_INTS_TH(GET_THREAD()) +#define RUBY_VM_CHECK_INTS(th) do { \ + if (UNLIKELY((th)->interrupt_flag)) { \ + rb_threadptr_execute_interrupts(th, 0); \ + } \ +} while (0) /* tracer */ void diff --git a/vm_eval.c b/vm_eval.c index ea52273d52..12e4eb4e0b 100644 --- a/vm_eval.c +++ b/vm_eval.c @@ -103,7 +103,7 @@ vm_call0(rb_thread_t* th, VALUE recv, VALUE id, int argc, const VALUE *argv, if (!klass || !(me = rb_method_entry(klass, id))) { return method_missing(recv, id, argc, argv, NOEX_SUPER); } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); if (!(def = me->def)) return Qnil; goto again; } @@ -138,7 +138,7 @@ vm_call0(rb_thread_t* th, VALUE recv, VALUE id, int argc, const VALUE *argv, rb_bug("vm_call0: unsupported method type (%d)", def->type); val = Qundef; } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); return val; } diff --git a/vm_insnhelper.c b/vm_insnhelper.c index e4a45e669d..1b23ee6e18 100644 --- a/vm_insnhelper.c +++ b/vm_insnhelper.c @@ -706,7 +706,7 @@ vm_call_method(rb_thread_t *th, rb_control_frame_t *cfp, } } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); return val; }