* thread.c (rb_threadptr_async_errinfo_*): manage async errors queue.
Async events such as an exception throwed by Thread#raise, Thread#kill and thread termination (after main thread termination) will be queued to th->async_errinfo_queue. - clear: clear the queue. - enque: enque err object into queue. - deque: deque err object from queue. - active_p: return 1 if the queue should be checked. rb_thread_t#thrown_errinfo was removed. * vm_core.h: add declarations of rb_threadptr_async_errinfo_*. remove rb_thread_t#thrown_errinfo field and add rb_thread_t#async_errinfo_queue (queue body: Array), rb_thread_t#async_errinfo_queue_checked (flag), rb_thread_t#async_errinfo_mask_stack(Array, not used yet). * vm.c (rb_thread_mark): fix a mark function. * cont.c (rb_fiber_start): enque an error. * process.c (after_fork): clear async errinfo queue. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36430 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
18c04b880a
commit
28144433b2
24
ChangeLog
24
ChangeLog
@ -1,3 +1,27 @@
|
|||||||
|
Wed Jul 18 14:16:51 2012 Koichi Sasada <ko1@atdot.net>
|
||||||
|
|
||||||
|
* thread.c (rb_threadptr_async_errinfo_*): manage async errors queue.
|
||||||
|
Async events such as an exception throwed by Thread#raise,
|
||||||
|
Thread#kill and thread termination (after main thread termination)
|
||||||
|
will be queued to th->async_errinfo_queue.
|
||||||
|
- clear: clear the queue.
|
||||||
|
- enque: enque err object into queue.
|
||||||
|
- deque: deque err object from queue.
|
||||||
|
- active_p: return 1 if the queue should be checked.
|
||||||
|
rb_thread_t#thrown_errinfo was removed.
|
||||||
|
|
||||||
|
* vm_core.h: add declarations of rb_threadptr_async_errinfo_*.
|
||||||
|
remove rb_thread_t#thrown_errinfo field and
|
||||||
|
add rb_thread_t#async_errinfo_queue (queue body: Array),
|
||||||
|
rb_thread_t#async_errinfo_queue_checked (flag),
|
||||||
|
rb_thread_t#async_errinfo_mask_stack(Array, not used yet).
|
||||||
|
|
||||||
|
* vm.c (rb_thread_mark): fix a mark function.
|
||||||
|
|
||||||
|
* cont.c (rb_fiber_start): enque an error.
|
||||||
|
|
||||||
|
* process.c (after_fork): clear async errinfo queue.
|
||||||
|
|
||||||
Wed Jul 18 14:25:55 2012 URABE Shyouhei <shyouhei@ruby-lang.org>
|
Wed Jul 18 14:25:55 2012 URABE Shyouhei <shyouhei@ruby-lang.org>
|
||||||
|
|
||||||
* pack.c: (ditto) bitwise operations are not char. Apply explicit
|
* pack.c: (ditto) bitwise operations are not char. Apply explicit
|
||||||
|
6
cont.c
6
cont.c
@ -1164,11 +1164,11 @@ rb_fiber_start(void)
|
|||||||
|
|
||||||
if (state) {
|
if (state) {
|
||||||
if (state == TAG_RAISE) {
|
if (state == TAG_RAISE) {
|
||||||
th->thrown_errinfo = th->errinfo;
|
rb_threadptr_async_errinfo_enque(th, th->errinfo);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
th->thrown_errinfo =
|
VALUE err = rb_vm_make_jump_tag_but_local_jump(state, th->errinfo);
|
||||||
rb_vm_make_jump_tag_but_local_jump(state, th->errinfo);
|
rb_threadptr_async_errinfo_enque(th, err);
|
||||||
}
|
}
|
||||||
RUBY_VM_SET_INTERRUPT(th);
|
RUBY_VM_SET_INTERRUPT(th);
|
||||||
}
|
}
|
||||||
|
@ -1055,7 +1055,7 @@ after_exec(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#define before_fork() before_exec()
|
#define before_fork() before_exec()
|
||||||
#define after_fork() (GET_THREAD()->thrown_errinfo = 0, after_exec())
|
#define after_fork() (rb_threadptr_async_errinfo_clear(GET_THREAD()), after_exec())
|
||||||
|
|
||||||
#include "dln.h"
|
#include "dln.h"
|
||||||
|
|
||||||
|
143
thread.c
143
thread.c
@ -68,7 +68,6 @@ static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec);
|
|||||||
static void sleep_forever(rb_thread_t *th, int nodeadlock);
|
static void sleep_forever(rb_thread_t *th, int nodeadlock);
|
||||||
static double timeofday(void);
|
static double timeofday(void);
|
||||||
static int rb_threadptr_dead(rb_thread_t *th);
|
static int rb_threadptr_dead(rb_thread_t *th);
|
||||||
|
|
||||||
static void rb_check_deadlock(rb_vm_t *vm);
|
static void rb_check_deadlock(rb_vm_t *vm);
|
||||||
|
|
||||||
#define eKillSignal INT2FIX(0)
|
#define eKillSignal INT2FIX(0)
|
||||||
@ -131,7 +130,6 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
|
|||||||
blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \
|
blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \
|
||||||
exec; \
|
exec; \
|
||||||
blocking_region_end(__th, &__region); \
|
blocking_region_end(__th, &__region); \
|
||||||
RUBY_VM_CHECK_INTS(); \
|
|
||||||
} while(0)
|
} while(0)
|
||||||
|
|
||||||
#if THREAD_DEBUG
|
#if THREAD_DEBUG
|
||||||
@ -313,9 +311,9 @@ terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread)
|
|||||||
|
|
||||||
if (th != main_thread) {
|
if (th != main_thread) {
|
||||||
thread_debug("terminate_i: %p\n", (void *)th);
|
thread_debug("terminate_i: %p\n", (void *)th);
|
||||||
rb_threadptr_interrupt(th);
|
rb_threadptr_async_errinfo_enque(th, eTerminateSignal);
|
||||||
th->thrown_errinfo = eTerminateSignal;
|
|
||||||
th->status = THREAD_TO_KILL;
|
th->status = THREAD_TO_KILL;
|
||||||
|
rb_threadptr_interrupt(th);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
thread_debug("terminate_i: main thread (%p)\n", (void *)th);
|
thread_debug("terminate_i: main thread (%p)\n", (void *)th);
|
||||||
@ -564,6 +562,10 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
|
|||||||
th->priority = GET_THREAD()->priority;
|
th->priority = GET_THREAD()->priority;
|
||||||
th->thgroup = GET_THREAD()->thgroup;
|
th->thgroup = GET_THREAD()->thgroup;
|
||||||
|
|
||||||
|
th->async_errinfo_queue = rb_ary_new();
|
||||||
|
th->async_errinfo_queue_checked = 0;
|
||||||
|
th->async_errinfo_mask_stack = rb_ary_new();
|
||||||
|
|
||||||
native_mutex_initialize(&th->interrupt_lock);
|
native_mutex_initialize(&th->interrupt_lock);
|
||||||
if (GET_VM()->event_hooks != NULL)
|
if (GET_VM()->event_hooks != NULL)
|
||||||
th->event_flags |= RUBY_EVENT_VM;
|
th->event_flags |= RUBY_EVENT_VM;
|
||||||
@ -1133,6 +1135,10 @@ rb_thread_call_without_gvl(void *(*func)(void *), void *data1,
|
|||||||
val = func(data1);
|
val = func(data1);
|
||||||
saved_errno = errno;
|
saved_errno = errno;
|
||||||
}, ubf, data2);
|
}, ubf, data2);
|
||||||
|
|
||||||
|
/* TODO: check */
|
||||||
|
RUBY_VM_CHECK_INTS();
|
||||||
|
|
||||||
errno = saved_errno;
|
errno = saved_errno;
|
||||||
|
|
||||||
return val;
|
return val;
|
||||||
@ -1144,13 +1150,28 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
|||||||
VALUE val;
|
VALUE val;
|
||||||
rb_thread_t *th = GET_THREAD();
|
rb_thread_t *th = GET_THREAD();
|
||||||
int saved_errno = 0;
|
int saved_errno = 0;
|
||||||
|
int state;
|
||||||
|
|
||||||
th->waiting_fd = fd;
|
th->waiting_fd = fd;
|
||||||
BLOCKING_REGION({
|
|
||||||
val = func(data1);
|
TH_PUSH_TAG(th);
|
||||||
saved_errno = errno;
|
if ((state = EXEC_TAG()) == 0) {
|
||||||
}, ubf_select, th);
|
BLOCKING_REGION({
|
||||||
|
val = func(data1);
|
||||||
|
saved_errno = errno;
|
||||||
|
}, ubf_select, th);
|
||||||
|
}
|
||||||
|
TH_POP_TAG();
|
||||||
|
|
||||||
|
/* clear waitinf_fd anytime */
|
||||||
th->waiting_fd = -1;
|
th->waiting_fd = -1;
|
||||||
|
|
||||||
|
if (state) {
|
||||||
|
JUMP_TAG(state);
|
||||||
|
}
|
||||||
|
/* TODO: check func() */
|
||||||
|
RUBY_VM_CHECK_INTS();
|
||||||
|
|
||||||
errno = saved_errno;
|
errno = saved_errno;
|
||||||
|
|
||||||
return val;
|
return val;
|
||||||
@ -1294,12 +1315,14 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* exception from another thread */
|
/* exception from another thread */
|
||||||
if (th->thrown_errinfo) {
|
if (rb_threadptr_async_errinfo_active_p(th)) {
|
||||||
VALUE err = th->thrown_errinfo;
|
VALUE err = rb_threadptr_async_errinfo_deque(th);
|
||||||
th->thrown_errinfo = 0;
|
|
||||||
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
|
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
|
||||||
|
|
||||||
if (err == eKillSignal || err == eTerminateSignal) {
|
if (err == eKillSignal /* Thread#kill receieved */ ||
|
||||||
|
err == eTerminateSignal /* Terminate thread */ ) {
|
||||||
|
rb_threadptr_async_errinfo_clear(th);
|
||||||
|
th->status = THREAD_TO_KILL;
|
||||||
th->errinfo = INT2FIX(TAG_FATAL);
|
th->errinfo = INT2FIX(TAG_FATAL);
|
||||||
TH_JUMP_TAG(th, TAG_FATAL);
|
TH_JUMP_TAG(th, TAG_FATAL);
|
||||||
}
|
}
|
||||||
@ -1353,6 +1376,59 @@ rb_gc_mark_threads(void)
|
|||||||
|
|
||||||
/*****************************************************/
|
/*****************************************************/
|
||||||
|
|
||||||
|
/*
|
||||||
|
* rb_threadptr_async_errinfo_* - manage async errors queue
|
||||||
|
*
|
||||||
|
* Async events such as an exception throwed by Thread#raise,
|
||||||
|
* Thread#kill and thread termination (after main thread termination)
|
||||||
|
* will be queued to th->async_errinfo_queue.
|
||||||
|
* - clear: clear the queue.
|
||||||
|
* - enque: enque err object into queue.
|
||||||
|
* - deque: deque err object from queue.
|
||||||
|
* - active_p: return 1 if the queue should be checked.
|
||||||
|
*
|
||||||
|
* All rb_threadptr_async_errinfo_* functions are called by
|
||||||
|
* a GVL acquired thread, of course.
|
||||||
|
* Note that all "rb_" prefix APIs need GVL to call.
|
||||||
|
*/
|
||||||
|
|
||||||
|
void
|
||||||
|
rb_threadptr_async_errinfo_clear(rb_thread_t *th)
|
||||||
|
{
|
||||||
|
rb_ary_clear(th->async_errinfo_queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v)
|
||||||
|
{
|
||||||
|
rb_ary_push(th->async_errinfo_queue, v);
|
||||||
|
th->async_errinfo_queue_checked = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
VALUE
|
||||||
|
rb_threadptr_async_errinfo_deque(rb_thread_t *th)
|
||||||
|
{
|
||||||
|
VALUE err = rb_ary_shift(th->async_errinfo_queue);
|
||||||
|
if (RARRAY_LEN(th->async_errinfo_queue) == 0) {
|
||||||
|
th->async_errinfo_queue_checked = 1;
|
||||||
|
}
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
|
||||||
|
{
|
||||||
|
if (th->async_errinfo_queue_checked) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return RARRAY_LEN(th->async_errinfo_queue) > 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
VALUE
|
||||||
|
rb_thread
|
||||||
|
|
||||||
static void
|
static void
|
||||||
rb_threadptr_ready(rb_thread_t *th)
|
rb_threadptr_ready(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
@ -1364,19 +1440,13 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
|
|||||||
{
|
{
|
||||||
VALUE exc;
|
VALUE exc;
|
||||||
|
|
||||||
again:
|
|
||||||
if (rb_threadptr_dead(th)) {
|
if (rb_threadptr_dead(th)) {
|
||||||
return Qnil;
|
return Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (th->thrown_errinfo != 0 || th->raised_flag) {
|
|
||||||
rb_thread_schedule();
|
|
||||||
goto again;
|
|
||||||
}
|
|
||||||
|
|
||||||
exc = rb_make_exception(argc, argv);
|
exc = rb_make_exception(argc, argv);
|
||||||
th->thrown_errinfo = exc;
|
rb_threadptr_async_errinfo_enque(th, exc);
|
||||||
rb_threadptr_ready(th);
|
rb_threadptr_interrupt(th);
|
||||||
return Qnil;
|
return Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1436,13 +1506,6 @@ rb_threadptr_reset_raised(rb_thread_t *th)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define THREAD_IO_WAITING_P(th) ( \
|
|
||||||
((th)->status == THREAD_STOPPED || \
|
|
||||||
(th)->status == THREAD_STOPPED_FOREVER) && \
|
|
||||||
(th)->blocking_region_buffer && \
|
|
||||||
(th)->unblock.func == ubf_select && \
|
|
||||||
1)
|
|
||||||
|
|
||||||
static int
|
static int
|
||||||
thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
|
thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
|
||||||
{
|
{
|
||||||
@ -1450,14 +1513,10 @@ thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
|
|||||||
rb_thread_t *th;
|
rb_thread_t *th;
|
||||||
GetThreadPtr((VALUE)key, th);
|
GetThreadPtr((VALUE)key, th);
|
||||||
|
|
||||||
if (THREAD_IO_WAITING_P(th)) {
|
if (th->waiting_fd == fd) {
|
||||||
native_mutex_lock(&th->interrupt_lock);
|
VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
|
||||||
if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) {
|
rb_threadptr_async_errinfo_enque(th, err);
|
||||||
th->thrown_errinfo = th->vm->special_exceptions[ruby_error_closed_stream];
|
rb_threadptr_interrupt(th);
|
||||||
RUBY_VM_SET_INTERRUPT(th);
|
|
||||||
(th->unblock.func)(th->unblock.arg);
|
|
||||||
}
|
|
||||||
native_mutex_unlock(&th->interrupt_lock);
|
|
||||||
}
|
}
|
||||||
return ST_CONTINUE;
|
return ST_CONTINUE;
|
||||||
}
|
}
|
||||||
@ -1530,10 +1589,9 @@ rb_thread_kill(VALUE thread)
|
|||||||
|
|
||||||
thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);
|
thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);
|
||||||
|
|
||||||
rb_threadptr_interrupt(th);
|
rb_threadptr_async_errinfo_enque(th, eKillSignal);
|
||||||
th->thrown_errinfo = eKillSignal;
|
|
||||||
th->status = THREAD_TO_KILL;
|
th->status = THREAD_TO_KILL;
|
||||||
|
rb_threadptr_interrupt(th);
|
||||||
return thread;
|
return thread;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2592,6 +2650,9 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except,
|
|||||||
result = native_fd_select(n, read, write, except, timeout, th);
|
result = native_fd_select(n, read, write, except, timeout, th);
|
||||||
if (result < 0) lerrno = errno;
|
if (result < 0) lerrno = errno;
|
||||||
}, ubf_select, th);
|
}, ubf_select, th);
|
||||||
|
|
||||||
|
RUBY_VM_CHECK_INTS();
|
||||||
|
|
||||||
errno = lerrno;
|
errno = lerrno;
|
||||||
|
|
||||||
if (result < 0) {
|
if (result < 0) {
|
||||||
@ -2815,6 +2876,8 @@ retry:
|
|||||||
if (result < 0) lerrno = errno;
|
if (result < 0) lerrno = errno;
|
||||||
}, ubf_select, GET_THREAD());
|
}, ubf_select, GET_THREAD());
|
||||||
|
|
||||||
|
RUBY_VM_CHECK_INTS();
|
||||||
|
|
||||||
if (result < 0) {
|
if (result < 0) {
|
||||||
errno = lerrno;
|
errno = lerrno;
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
@ -4707,6 +4770,10 @@ Init_Thread(void)
|
|||||||
gvl_init(th->vm);
|
gvl_init(th->vm);
|
||||||
gvl_acquire(th->vm, th);
|
gvl_acquire(th->vm, th);
|
||||||
native_mutex_initialize(&th->interrupt_lock);
|
native_mutex_initialize(&th->interrupt_lock);
|
||||||
|
|
||||||
|
th->async_errinfo_queue = rb_ary_new();
|
||||||
|
th->async_errinfo_queue_checked = 0;
|
||||||
|
th->async_errinfo_mask_stack = rb_ary_new();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
3
vm.c
3
vm.c
@ -1645,7 +1645,8 @@ rb_thread_mark(void *ptr)
|
|||||||
RUBY_MARK_UNLESS_NULL(th->thgroup);
|
RUBY_MARK_UNLESS_NULL(th->thgroup);
|
||||||
RUBY_MARK_UNLESS_NULL(th->value);
|
RUBY_MARK_UNLESS_NULL(th->value);
|
||||||
RUBY_MARK_UNLESS_NULL(th->errinfo);
|
RUBY_MARK_UNLESS_NULL(th->errinfo);
|
||||||
RUBY_MARK_UNLESS_NULL(th->thrown_errinfo);
|
RUBY_MARK_UNLESS_NULL(th->async_errinfo_queue);
|
||||||
|
RUBY_MARK_UNLESS_NULL(th->async_errinfo_mask_stack);
|
||||||
RUBY_MARK_UNLESS_NULL(th->root_svar);
|
RUBY_MARK_UNLESS_NULL(th->root_svar);
|
||||||
RUBY_MARK_UNLESS_NULL(th->top_self);
|
RUBY_MARK_UNLESS_NULL(th->top_self);
|
||||||
RUBY_MARK_UNLESS_NULL(th->top_wrapper);
|
RUBY_MARK_UNLESS_NULL(th->top_wrapper);
|
||||||
|
11
vm_core.h
11
vm_core.h
@ -446,8 +446,13 @@ typedef struct rb_thread_struct {
|
|||||||
VALUE thgroup;
|
VALUE thgroup;
|
||||||
VALUE value;
|
VALUE value;
|
||||||
|
|
||||||
|
/* temporary place of errinfo */
|
||||||
VALUE errinfo;
|
VALUE errinfo;
|
||||||
VALUE thrown_errinfo;
|
|
||||||
|
/* async errinfo queue */
|
||||||
|
VALUE async_errinfo_queue;
|
||||||
|
int async_errinfo_queue_checked;
|
||||||
|
VALUE async_errinfo_mask_stack;
|
||||||
|
|
||||||
rb_atomic_t interrupt_flag;
|
rb_atomic_t interrupt_flag;
|
||||||
rb_thread_lock_t interrupt_lock;
|
rb_thread_lock_t interrupt_lock;
|
||||||
@ -767,6 +772,10 @@ void rb_threadptr_signal_exit(rb_thread_t *th);
|
|||||||
void rb_threadptr_execute_interrupts(rb_thread_t *);
|
void rb_threadptr_execute_interrupts(rb_thread_t *);
|
||||||
void rb_threadptr_interrupt(rb_thread_t *th);
|
void rb_threadptr_interrupt(rb_thread_t *th);
|
||||||
void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th);
|
void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th);
|
||||||
|
void rb_threadptr_async_errinfo_clear(rb_thread_t *th);
|
||||||
|
void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v);
|
||||||
|
VALUE rb_threadptr_async_errinfo_deque(rb_thread_t *th);
|
||||||
|
int rb_threadptr_async_errinfo_active_p(rb_thread_t *th);
|
||||||
|
|
||||||
void rb_thread_lock_unlock(rb_thread_lock_t *);
|
void rb_thread_lock_unlock(rb_thread_lock_t *);
|
||||||
void rb_thread_lock_destroy(rb_thread_lock_t *);
|
void rb_thread_lock_destroy(rb_thread_lock_t *);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user