thread_pthread.c: restore timer-thread for now :<

[ruby-core:88306]

Revert "process.c: ensure th->interrupt lock is held when migrating"

This reverts commit 5ca416bdf6b6785cb20f139c2c514eda005fe42f (r64201)

Revert "process.c (rb_waitpid): reduce sigwait_fd bouncing"

This reverts commit 217bdd776fbeea3bfd0b9324eefbfcec3b1ccb3e (r64200).

Revert "test/ruby/test_thread.rb (test_thread_timer_and_interrupt): add timeouts"

This reverts commit 9f395f11202fc3c7edbd76f5aa6ce1f8a1e752a9 (r64199).

Revert "thread_pthread.c (native_sleep): reduce ppoll sleeps"

This reverts commit b3aa256c4d43d3d7e9975ec18eb127f45f623c9b (r64193).

Revert "thread.c (consume_communication_pipe): do not retry after short read"

This reverts commit 291a82f748de56e65fac10edefc51ec7a54a82d4 (r64185).

Revert "test/ruby/test_io.rb (test_race_gets_and_close): timeout each thread"

This reverts commit 3dbd8d1f66537f968f0461ed8547460b3b1241b3 (r64184).

Revert "thread_pthread.c (gvl_acquire_common): persist timeout across calls"

This reverts commit 8c2ae6e3ed072b06fc3cbc34fa8a14b2acbb49d5 (r64165).

Revert "test/ruby/test_io.rb (test_race_gets_and_close): use SIGABRT on timeout"

This reverts commit 931cda4db8afd6b544a8d85a6815765a9c417213 (r64135).

Revert "thread_pthread.c (gvl_yield): do ubf wakeups when uncontended"

This reverts commit 508f00314f46c08b6e9b0141c01355d24954260c (r64133).

Revert "thread_pthread.h (native_thread_data): split condvars on some platforms"

This reverts commit a038bf238bd9a24bf1e1622f618a27db261fc91b (r64124).

Revert "process.c (waitpid_nogvl): prevent conflicting use of sleep_cond"

This reverts commit 7018acc946882f21d519af7c42ccf84b22a46b27 (r64117).

Revert "thread_pthread.c (rb_sigwait_sleep): th may be 0 from MJIT"

This reverts commit 56491afc7916fb24f5c4dc2c632fb93fa7063992 (r64116).

Revert "thread*.c: waiting on sigwait_fd performs periodic ubf wakeups"

This reverts commit ab47a57a46e70634d049e4da20a5441c7a14cdec (r64115).

Revert "thread_pthread.c (gvl_destroy): make no-op on GVL bits"

This reverts commit 95cae748171f4754b97f4ba54da2ae62a8d484fd (r64114).

Revert "thread_pthread.c (rb_sigwait_sleep): fix uninitialized poll set in UBF case"

This reverts commit 4514362948fdb914c6138b12d961d92e9c0fee6c (r64113).

Revert "thread_pthread.c (rb_sigwait_sleep): re-fix [Bug #5343] harder"

This reverts commit 26b8a70bb309c7a367b9134045508b5b5a580a77 (r64111).

Revert "thread.c: move ppoll wrapper into thread_pthread.c"

This reverts commit 3dc7727d22fecbc355597edda25d2a245bf55ba1 (r64110).

Revert "thread.c: move ppoll wrapper before thread_pthread.c"

This reverts commit 2fa1e2e3c3c5c4b3ce84730dee4bcbe9d81b8e35 (r64109).

Revert "thread_pthread.c (ubf_select): refix [Bug #5343]"

This reverts commit 4c1ab82f0623eca91a95d2a44053be22bbce48ad (r64108).

Revert "thread_win32.c: suppress warnings by -Wsuggest-attribute"

This reverts commit 6a9b63e39075c53870933fbac5c1065f7d22047c (r64159).

Revert "thread_pthread: remove timer-thread by restructuring GVL"

This reverts commit 708bfd21156828526fe72de2cedecfaca6647dc1 (r64107).

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64203 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
normal 2018-08-06 05:22:00 +00:00
parent 828158704c
commit 194a6a2c68
11 changed files with 577 additions and 739 deletions

View File

@ -77,9 +77,6 @@ extern "C" {
# define __has_extension __has_feature # define __has_extension __has_feature
#endif #endif
/* Prevent compiler from reordering access */
#define ACCESS_ONCE(type,x) (*((volatile type *)&(x)))
#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 201112L) #if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 201112L)
# define STATIC_ASSERT(name, expr) _Static_assert(expr, #name ": " #expr) # define STATIC_ASSERT(name, expr) _Static_assert(expr, #name ": " #expr)
#elif GCC_VERSION_SINCE(4, 6, 0) || __has_extension(c_static_assert) #elif GCC_VERSION_SINCE(4, 6, 0) || __has_extension(c_static_assert)

140
process.c
View File

@ -928,7 +928,6 @@ struct waitpid_state {
int status; int status;
int options; int options;
int errnum; int errnum;
int sigwait_fd;
}; };
void rb_native_mutex_lock(rb_nativethread_lock_t *); void rb_native_mutex_lock(rb_nativethread_lock_t *);
@ -937,65 +936,13 @@ void rb_native_cond_signal(rb_nativethread_cond_t *);
void rb_native_cond_wait(rb_nativethread_cond_t *, rb_nativethread_lock_t *); void rb_native_cond_wait(rb_nativethread_cond_t *, rb_nativethread_lock_t *);
rb_nativethread_cond_t *rb_sleep_cond_get(const rb_execution_context_t *); rb_nativethread_cond_t *rb_sleep_cond_get(const rb_execution_context_t *);
void rb_sleep_cond_put(rb_nativethread_cond_t *); void rb_sleep_cond_put(rb_nativethread_cond_t *);
int rb_sigwait_fd_get(const rb_thread_t *);
void rb_sigwait_sleep(const rb_thread_t *, int fd, const struct timespec *);
void rb_sigwait_fd_put(const rb_thread_t *, int fd);
static int
sigwait_fd_migrate_signaled_p(struct waitpid_state *w)
{
int signaled = FALSE;
rb_thread_t *th = w->ec ? rb_ec_thread_ptr(w->ec) : 0;
if (th) rb_native_mutex_lock(&th->interrupt_lock);
if (w->cond) {
rb_native_cond_signal(w->cond);
signaled = TRUE;
}
if (th) rb_native_mutex_unlock(&th->interrupt_lock);
return signaled;
}
/*
* When a thread is done using sigwait_fd and there are other threads
* sleeping on waitpid, we must kick one of the threads out of
* rb_native_cond_wait so it can switch to rb_sigwait_sleep
*/
static void
sigwait_fd_migrate_sleeper(rb_vm_t *vm)
{
struct waitpid_state *w = 0;
list_for_each(&vm->waiting_pids, w, wnode) {
if (sigwait_fd_migrate_signaled_p(w)) return;
}
list_for_each(&vm->waiting_grps, w, wnode) {
if (sigwait_fd_migrate_signaled_p(w)) return;
}
}
void
rb_sigwait_fd_migrate(rb_vm_t *vm)
{
rb_native_mutex_lock(&vm->waitpid_lock);
sigwait_fd_migrate_sleeper(vm);
rb_native_mutex_unlock(&vm->waitpid_lock);
}
static void static void
waitpid_notify(struct waitpid_state *w, rb_pid_t ret) waitpid_notify(struct waitpid_state *w, rb_pid_t ret)
{ {
w->ret = ret; w->ret = ret;
list_del_init(&w->wnode); list_del_init(&w->wnode);
if (w->cond) { rb_native_cond_signal(w->cond);
rb_native_cond_signal(w->cond);
}
else {
/* w is owned by this thread */
}
} }
#ifdef _WIN32 /* for spawnvp result from mjit.c */ #ifdef _WIN32 /* for spawnvp result from mjit.c */
@ -1007,7 +954,7 @@ waitpid_notify(struct waitpid_state *w, rb_pid_t ret)
#endif #endif
extern volatile unsigned int ruby_nocldwait; /* signal.c */ extern volatile unsigned int ruby_nocldwait; /* signal.c */
/* called by timer thread or thread which acquired sigwait_fd */ /* called by timer thread */
static void static void
waitpid_each(struct list_head *head) waitpid_each(struct list_head *head)
{ {
@ -1061,17 +1008,6 @@ waitpid_state_init(struct waitpid_state *w, rb_pid_t pid, int options)
w->options = options; w->options = options;
} }
static const struct timespec *
sigwait_sleep_time(void)
{
if (SIGCHLD_LOSSY) {
static const struct timespec busy_wait = { 0, 100000000 };
return &busy_wait;
}
return 0;
}
/* /*
* must be called with vm->waitpid_lock held, this is not interruptible * must be called with vm->waitpid_lock held, this is not interruptible
*/ */
@ -1090,31 +1026,13 @@ ruby_waitpid_locked(rb_vm_t *vm, rb_pid_t pid, int *status, int options,
if (w.ret == -1) w.errnum = errno; if (w.ret == -1) w.errnum = errno;
} }
else { else {
w.cond = cond;
w.ec = 0; w.ec = 0;
w.sigwait_fd = -1;
list_add(w.pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w.wnode); list_add(w.pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w.wnode);
do { do {
if (w.sigwait_fd < 0) rb_native_cond_wait(w.cond, &vm->waitpid_lock);
w.sigwait_fd = rb_sigwait_fd_get(0);
if (w.sigwait_fd >= 0) {
w.cond = 0;
rb_native_mutex_unlock(&vm->waitpid_lock);
rb_sigwait_sleep(0, w.sigwait_fd, sigwait_sleep_time());
rb_native_mutex_lock(&vm->waitpid_lock);
}
else {
w.cond = cond;
rb_native_cond_wait(w.cond, &vm->waitpid_lock);
}
} while (!w.ret); } while (!w.ret);
list_del(&w.wnode); list_del(&w.wnode);
/* we're done, maybe other waitpid callers are not: */
if (w.sigwait_fd >= 0) {
rb_sigwait_fd_put(0, w.sigwait_fd);
sigwait_fd_migrate_sleeper(vm);
}
} }
if (status) { if (status) {
*status = w.status; *status = w.status;
@ -1129,10 +1047,7 @@ waitpid_wake(void *x)
struct waitpid_state *w = x; struct waitpid_state *w = x;
/* th->interrupt_lock is already held by rb_threadptr_interrupt_common */ /* th->interrupt_lock is already held by rb_threadptr_interrupt_common */
if (w->cond) rb_native_cond_signal(w->cond);
rb_native_cond_signal(w->cond);
else
rb_thread_wakeup_timer_thread(0); /* kick sigwait_fd */
} }
static void * static void *
@ -1147,40 +1062,11 @@ waitpid_nogvl(void *x)
* by the time we enter this. And we may also be interrupted. * by the time we enter this. And we may also be interrupted.
*/ */
if (!w->ret && !RUBY_VM_INTERRUPTED_ANY(w->ec)) { if (!w->ret && !RUBY_VM_INTERRUPTED_ANY(w->ec)) {
if (w->sigwait_fd < 0) if (SIGCHLD_LOSSY) {
w->sigwait_fd = rb_sigwait_fd_get(th); rb_thread_wakeup_timer_thread();
if (w->sigwait_fd >= 0) {
rb_nativethread_cond_t *cond = w->cond;
w->cond = 0;
rb_native_mutex_unlock(&th->interrupt_lock);
rb_sigwait_sleep(th, w->sigwait_fd, sigwait_sleep_time());
rb_native_mutex_lock(&th->interrupt_lock);
w->cond = cond;
}
else {
if (!w->cond)
w->cond = rb_sleep_cond_get(w->ec);
/* another thread calling rb_sigwait_sleep will process
* signals for us */
if (SIGCHLD_LOSSY) {
rb_thread_wakeup_timer_thread(0);
}
rb_native_cond_wait(w->cond, &th->interrupt_lock);
} }
rb_native_cond_wait(w->cond, &th->interrupt_lock);
} }
/*
* we must release th->native_thread_data.sleep_cond when
* re-acquiring GVL:
*/
if (w->cond) {
rb_sleep_cond_put(w->cond);
w->cond = 0;
}
rb_native_mutex_unlock(&th->interrupt_lock); rb_native_mutex_unlock(&th->interrupt_lock);
return 0; return 0;
@ -1210,15 +1096,8 @@ waitpid_cleanup(VALUE x)
list_del(&w->wnode); list_del(&w->wnode);
rb_native_mutex_unlock(&vm->waitpid_lock); rb_native_mutex_unlock(&vm->waitpid_lock);
} }
rb_sleep_cond_put(w->cond);
/* we may have never released and re-acquired GVL */
if (w->cond)
rb_sleep_cond_put(w->cond);
if (w->sigwait_fd >= 0) {
rb_sigwait_fd_put(rb_ec_thread_ptr(w->ec), w->sigwait_fd);
rb_sigwait_fd_migrate(rb_ec_vm_ptr(w->ec));
}
return Qfalse; return Qfalse;
} }
@ -1245,7 +1124,6 @@ waitpid_wait(struct waitpid_state *w)
} }
else { else {
w->cond = rb_sleep_cond_get(w->ec); w->cond = rb_sleep_cond_get(w->ec);
w->sigwait_fd = -1;
/* order matters, favor specified PIDs rather than -1 or 0 */ /* order matters, favor specified PIDs rather than -1 or 0 */
list_add(w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w->wnode); list_add(w->pid > 0 ? &vm->waiting_pids : &vm->waiting_grps, &w->wnode);
} }

View File

@ -709,6 +709,9 @@ signal_enque(int sig)
static rb_atomic_t sigchld_hit; static rb_atomic_t sigchld_hit;
/* Prevent compiler from reordering access */
#define ACCESS_ONCE(type,x) (*((volatile type *)&(x)))
static RETSIGTYPE static RETSIGTYPE
sighandler(int sig) sighandler(int sig)
{ {
@ -727,7 +730,7 @@ sighandler(int sig)
else { else {
signal_enque(sig); signal_enque(sig);
} }
rb_thread_wakeup_timer_thread(sig); rb_thread_wakeup_timer_thread();
#if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL) #if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL)
ruby_signal(sig, sighandler); ruby_signal(sig, sighandler);
#endif #endif
@ -761,6 +764,7 @@ rb_enable_interrupt(void)
#ifdef HAVE_PTHREAD_SIGMASK #ifdef HAVE_PTHREAD_SIGMASK
sigset_t mask; sigset_t mask;
sigemptyset(&mask); sigemptyset(&mask);
sigaddset(&mask, RUBY_SIGCHLD); /* timer-thread handles this */
pthread_sigmask(SIG_SETMASK, &mask, NULL); pthread_sigmask(SIG_SETMASK, &mask, NULL);
#endif #endif
} }
@ -1073,6 +1077,7 @@ rb_trap_exit(void)
void ruby_waitpid_all(rb_vm_t *); /* process.c */ void ruby_waitpid_all(rb_vm_t *); /* process.c */
/* only runs in the timer-thread */
void void
ruby_sigchld_handler(rb_vm_t *vm) ruby_sigchld_handler(rb_vm_t *vm)
{ {

View File

@ -3556,8 +3556,7 @@ __END__
end if File::BINARY != 0 end if File::BINARY != 0
def test_race_gets_and_close def test_race_gets_and_close
opt = { signal: :ABRT, timeout: 200 } assert_separately([], "#{<<-"begin;"}\n#{<<-"end;"}")
assert_separately([], "#{<<-"begin;"}\n#{<<-"end;"}", opt)
bug13076 = '[ruby-core:78845] [Bug #13076]' bug13076 = '[ruby-core:78845] [Bug #13076]'
begin; begin;
10.times do |i| 10.times do |i|
@ -3579,9 +3578,9 @@ __END__
w.close w.close
r.close r.close
end end
t.each do |th| assert_nothing_raised(IOError, bug13076) {
assert_same(th, th.join(2), bug13076) t.each(&:join)
end }
end end
end; end;
end end

View File

@ -1767,7 +1767,7 @@ class TestProcess < Test::Unit::TestCase
puts Dir.entries("/proc/self/task") - %W[. ..] puts Dir.entries("/proc/self/task") - %W[. ..]
end end
bug4920 = '[ruby-dev:43873]' bug4920 = '[ruby-dev:43873]'
assert_include(1..2, data.size, bug4920) assert_equal(2, data.size, bug4920)
assert_not_include(data.map(&:to_i), pid) assert_not_include(data.map(&:to_i), pid)
end end
else # darwin else # darwin

View File

@ -952,16 +952,15 @@ _eom
def test_thread_timer_and_interrupt def test_thread_timer_and_interrupt
bug5757 = '[ruby-dev:44985]' bug5757 = '[ruby-dev:44985]'
pid = nil pid = nil
cmd = 'Signal.trap(:INT, "DEFAULT"); pipe=IO.pipe; Thread.start {Thread.pass until Thread.main.stop?; puts; STDOUT.flush}; pipe[0].read' cmd = 'Signal.trap(:INT, "DEFAULT"); r,=IO.pipe; Thread.start {Thread.pass until Thread.main.stop?; puts; STDOUT.flush}; r.read'
opt = {} opt = {}
opt[:new_pgroup] = true if /mswin|mingw/ =~ RUBY_PLATFORM opt[:new_pgroup] = true if /mswin|mingw/ =~ RUBY_PLATFORM
s, t, _err = EnvUtil.invoke_ruby(['-e', cmd], "", true, true, opt) do |in_p, out_p, err_p, cpid| s, t, _err = EnvUtil.invoke_ruby(['-e', cmd], "", true, true, opt) do |in_p, out_p, err_p, cpid|
assert IO.select([out_p], nil, nil, 10), 'subprocess not ready'
out_p.gets out_p.gets
pid = cpid pid = cpid
t0 = Time.now.to_f t0 = Time.now.to_f
Process.kill(:SIGINT, pid) Process.kill(:SIGINT, pid)
Timeout.timeout(10) { Process.wait(pid) } Process.wait(pid)
t1 = Time.now.to_f t1 = Time.now.to_f
[$?, t1 - t0, err_p.read] [$?, t1 - t0, err_p.read]
end end

397
thread.c
View File

@ -106,13 +106,8 @@ static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
static const char *thread_status_name(rb_thread_t *th, int detail); static const char *thread_status_name(rb_thread_t *th, int detail);
static void timespec_add(struct timespec *, const struct timespec *); static void timespec_add(struct timespec *, const struct timespec *);
static void timespec_sub(struct timespec *, const struct timespec *); static void timespec_sub(struct timespec *, const struct timespec *);
static int timespec_cmp(const struct timespec *a, const struct timespec *b);
static int timespec_update_expire(struct timespec *, const struct timespec *); static int timespec_update_expire(struct timespec *, const struct timespec *);
static void getclockofday(struct timespec *); static void getclockofday(struct timespec *);
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
static int consume_communication_pipe(int fd);
static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
#define eKillSignal INT2FIX(0) #define eKillSignal INT2FIX(0)
#define eTerminateSignal INT2FIX(1) #define eTerminateSignal INT2FIX(1)
@ -353,14 +348,7 @@ rb_thread_s_debug_set(VALUE self, VALUE val)
#endif #endif
NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
VALUE *register_stack_start)); VALUE *register_stack_start));
static void timer_thread_function(void); static void timer_thread_function(void *);
void ruby_sigchld_handler(rb_vm_t *); /* signal.c */
static void
ubf_sigwait(void *ignore)
{
rb_thread_wakeup_timer_thread(0);
}
#if defined(_WIN32) #if defined(_WIN32)
#include "thread_win32.c" #include "thread_win32.c"
@ -385,15 +373,6 @@ ubf_sigwait(void *ignore)
#error "unsupported thread type" #error "unsupported thread type"
#endif #endif
/*
* TODO: somebody with win32 knowledge should be able to get rid of
* timer-thread by busy-waiting on signals. And it should be possible
* to make the GVL in thread_pthread.c be platform-independent.
*/
#ifndef BUSY_WAIT_SIGNALS
# define BUSY_WAIT_SIGNALS (0)
#endif
#if THREAD_DEBUG #if THREAD_DEBUG
static int debug_mutex_initialized = 1; static int debug_mutex_initialized = 1;
static rb_nativethread_lock_t debug_mutex; static rb_nativethread_lock_t debug_mutex;
@ -433,6 +412,7 @@ rb_vm_gvl_destroy(rb_vm_t *vm)
{ {
gvl_release(vm); gvl_release(vm);
gvl_destroy(vm); gvl_destroy(vm);
rb_native_mutex_destroy(&vm->thread_destruct_lock);
if (0) { if (0) {
/* may be held by running threads */ /* may be held by running threads */
rb_native_mutex_destroy(&vm->waitpid_lock); rb_native_mutex_destroy(&vm->waitpid_lock);
@ -793,6 +773,10 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s
rb_fiber_close(th->ec->fiber_ptr); rb_fiber_close(th->ec->fiber_ptr);
} }
rb_native_mutex_lock(&th->vm->thread_destruct_lock);
/* make sure vm->running_thread never point me after this point.*/
th->vm->running_thread = NULL;
rb_native_mutex_unlock(&th->vm->thread_destruct_lock);
thread_cleanup_func(th, FALSE); thread_cleanup_func(th, FALSE);
gvl_release(th->vm); gvl_release(th->vm);
@ -2179,14 +2163,6 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
/* signal handling */ /* signal handling */
if (trap_interrupt && (th == th->vm->main_thread)) { if (trap_interrupt && (th == th->vm->main_thread)) {
enum rb_thread_status prev_status = th->status; enum rb_thread_status prev_status = th->status;
int sigwait_fd = rb_sigwait_fd_get(th);
if (sigwait_fd >= 0) {
(void)consume_communication_pipe(sigwait_fd);
ruby_sigchld_handler(th->vm);
rb_sigwait_fd_put(th, sigwait_fd);
rb_sigwait_fd_migrate(th->vm);
}
th->status = THREAD_RUNNABLE; th->status = THREAD_RUNNABLE;
while ((sig = rb_get_next_signal()) != 0) { while ((sig = rb_get_next_signal()) != 0) {
rb_signal_exec(th, sig); rb_signal_exec(th, sig);
@ -3864,95 +3840,86 @@ wait_retryable(int *result, int errnum, struct timespec *timeout,
return FALSE; return FALSE;
} }
#define restore_fdset(fds1, fds2) \
((fds1) ? rb_fd_dup(fds1, fds2) : (void)0)
struct select_set { struct select_set {
int max; rb_fdset_t read;
int sigwait_fd; rb_fdset_t write;
rb_thread_t *th; rb_fdset_t except;
rb_fdset_t *rset;
rb_fdset_t *wset;
rb_fdset_t *eset;
rb_fdset_t orig_rset;
rb_fdset_t orig_wset;
rb_fdset_t orig_eset;
struct timeval *timeout;
}; };
static VALUE static size_t
select_set_free(VALUE p) select_set_memsize(const void *p)
{ {
struct select_set *set = (struct select_set *)p; return sizeof(struct select_set);
if (set->sigwait_fd >= 0) {
rb_sigwait_fd_put(set->th, set->sigwait_fd);
rb_sigwait_fd_migrate(set->th->vm);
}
rb_fd_term(&set->orig_rset);
rb_fd_term(&set->orig_wset);
rb_fd_term(&set->orig_eset);
return Qfalse;
} }
static const struct timespec * static void
sigwait_timeout(rb_thread_t *th, int sigwait_fd, const struct timespec *orig, select_set_free(void *p)
int *drained_p)
{ {
static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 }; struct select_set *orig = p;
if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { rb_fd_term(&orig->read);
*drained_p = check_signals_nogvl(th, sigwait_fd); rb_fd_term(&orig->write);
if (!orig || timespec_cmp(orig, &quantum) > 0) rb_fd_term(&orig->except);
return &quantum; xfree(orig);
}
return orig;
} }
static VALUE static const rb_data_type_t select_set_type = {
do_select(VALUE p) "select_set",
{NULL, select_set_free, select_set_memsize,},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
};
static int
do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds,
rb_fdset_t *const exceptfds, struct timeval *timeout)
{ {
struct select_set *set = (struct select_set *)p;
int MAYBE_UNUSED(result); int MAYBE_UNUSED(result);
int lerrno; int lerrno;
struct timespec ts, end, *tsp; struct timespec ts, end, *tsp;
const struct timespec *to; rb_thread_t *th = GET_THREAD();
struct timeval tv; VALUE o;
struct select_set *orig;
timeout_prepare(&tsp, &ts, &end, set->timeout); o = TypedData_Make_Struct(0, struct select_set, &select_set_type, orig);
#define restore_fdset(dst, src) \
((dst) ? rb_fd_dup(dst, src) : (void)0) timeout_prepare(&tsp, &ts, &end, timeout);
#define do_select_update() \ #define do_select_update() \
(restore_fdset(set->rset, &set->orig_rset), \ (restore_fdset(readfds, &orig->read), \
restore_fdset(set->wset, &set->orig_wset), \ restore_fdset(writefds, &orig->write), \
restore_fdset(set->eset, &set->orig_eset), \ restore_fdset(exceptfds, &orig->except), \
TRUE) TRUE)
#define fd_init_copy(f) \
(f##fds) ? rb_fd_init_copy(&orig->f, f##fds) : rb_fd_no_init(&orig->f)
fd_init_copy(read);
fd_init_copy(write);
fd_init_copy(except);
#undef fd_init_copy
do { do {
int drained;
lerrno = 0; lerrno = 0;
BLOCKING_REGION(set->th, { BLOCKING_REGION(th, {
to = sigwait_timeout(set->th, set->sigwait_fd, tsp, &drained); result = native_fd_select(n, readfds, writefds, exceptfds,
result = native_fd_select(set->max, set->rset, set->wset, set->eset, timeval_for(timeout, tsp), th);
timeval_for(&tv, to), set->th);
if (result < 0) lerrno = errno; if (result < 0) lerrno = errno;
}, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE); }, ubf_select, th, FALSE);
if (set->sigwait_fd >= 0) { RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may raise */
if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset))
result--;
(void)check_signals_nogvl(set->th, set->sigwait_fd);
}
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
} while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update()); } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update());
/* didn't raise, perform cleanup ourselves */
select_set_free(orig);
rb_gc_force_recycle(o);
if (result < 0) { if (result < 0) {
errno = lerrno; errno = lerrno;
} }
return (VALUE)result; return result;
} }
static void static void
@ -3988,42 +3955,11 @@ rb_thread_fd_writable(int fd)
return TRUE; return TRUE;
} }
static rb_fdset_t *
init_set_fd(int fd, rb_fdset_t *fds)
{
if (fd < 0) {
return 0;
}
rb_fd_init(fds);
rb_fd_set(fd, fds);
return fds;
}
int int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
struct timeval *timeout) struct timeval *timeout)
{ {
struct select_set set; if (!read && !write && !except) {
set.th = GET_THREAD();
set.max = max;
set.sigwait_fd = rb_sigwait_fd_get(set.th);
set.rset = read;
set.wset = write;
set.eset = except;
set.timeout = timeout;
if (set.sigwait_fd >= 0) {
if (set.rset)
rb_fd_set(set.sigwait_fd, set.rset);
else
set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
if (set.sigwait_fd > set.max) {
set.max = set.sigwait_fd + 1;
}
}
if (!set.rset && !set.wset && !set.eset) {
if (!timeout) { if (!timeout) {
rb_thread_sleep_forever(); rb_thread_sleep_forever();
return 0; return 0;
@ -4032,23 +3968,16 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return 0; return 0;
} }
#define fd_init_copy(f) do { \ if (read) {
if (set.f) { \ rb_fd_resize(max - 1, read);
rb_fd_resize(set.max - 1, set.f); \ }
if (&set.orig_##f != set.f) { /* sigwait_fd */ \ if (write) {
rb_fd_init_copy(&set.orig_##f, set.f); \ rb_fd_resize(max - 1, write);
} \ }
} \ if (except) {
else { \ rb_fd_resize(max - 1, except);
rb_fd_no_init(&set.orig_##f); \ }
} \ return do_select(max, read, write, except, timeout);
} while (0)
fd_init_copy(rset);
fd_init_copy(wset);
fd_init_copy(eset);
#undef fd_init_copy
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
} }
#ifdef USE_POLL #ifdef USE_POLL
@ -4062,64 +3991,68 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
# define POLLERR_SET (0) # define POLLERR_SET (0)
#endif #endif
#ifndef HAVE_PPOLL
/* TODO: don't ignore sigmask */
static int
ruby_ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *ts, const sigset_t *sigmask)
{
int timeout_ms;
if (ts) {
int tmp, tmp2;
if (ts->tv_sec > INT_MAX/1000)
timeout_ms = INT_MAX;
else {
tmp = (int)(ts->tv_sec * 1000);
/* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */
tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L));
if (INT_MAX - tmp < tmp2)
timeout_ms = INT_MAX;
else
timeout_ms = (int)(tmp + tmp2);
}
}
else
timeout_ms = -1;
return poll(fds, nfds, timeout_ms);
}
# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask))
#endif
/* /*
* returns a mask of events * returns a mask of events
*/ */
int int
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{ {
struct pollfd fds[2]; struct pollfd fds;
int result = 0, lerrno; int result = 0, lerrno;
struct timespec ts, end, *tsp; struct timespec ts, end, *tsp;
const struct timespec *to;
int drained;
rb_thread_t *th = GET_THREAD(); rb_thread_t *th = GET_THREAD();
nfds_t nfds;
rb_unblock_function_t *ubf;
timeout_prepare(&tsp, &ts, &end, timeout); timeout_prepare(&tsp, &ts, &end, timeout);
fds[0].fd = fd; fds.fd = fd;
fds[0].events = (short)events; fds.events = (short)events;
do { do {
fds[0].revents = 0; fds.revents = 0;
fds[1].fd = rb_sigwait_fd_get(th);
if (fds[1].fd >= 0) {
fds[1].events = POLLIN;
fds[1].revents = 0;
nfds = 2;
ubf = ubf_sigwait;
}
else {
nfds = 1;
ubf = ubf_select;
}
lerrno = 0; lerrno = 0;
BLOCKING_REGION(th, { BLOCKING_REGION(th, {
to = sigwait_timeout(th, fds[1].fd, tsp, &drained); result = ppoll(&fds, 1, tsp, NULL);
result = ppoll(fds, nfds, to, NULL);
if (result < 0) lerrno = errno; if (result < 0) lerrno = errno;
}, ubf, th, FALSE); }, ubf_select, th, FALSE);
if (fds[1].fd >= 0) {
if (result > 0 && fds[1].revents) {
result--;
fds[1].revents = 0;
}
(void)check_signals_nogvl(th, fds[1].fd);
rb_sigwait_fd_put(th, fds[1].fd);
rb_sigwait_fd_migrate(th->vm);
}
RUBY_VM_CHECK_INTS_BLOCKING(th->ec); RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
} while (wait_retryable(&result, lerrno, tsp, &end)); } while (wait_retryable(&result, lerrno, tsp, &end));
if (result < 0) { if (result < 0) {
errno = lerrno; errno = lerrno;
return -1; return -1;
} }
if (fds[0].revents & POLLNVAL) { if (fds.revents & POLLNVAL) {
errno = EBADF; errno = EBADF;
return -1; return -1;
} }
@ -4129,20 +4062,32 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
* Therefore we need to fix it up. * Therefore we need to fix it up.
*/ */
result = 0; result = 0;
if (fds[0].revents & POLLIN_SET) if (fds.revents & POLLIN_SET)
result |= RB_WAITFD_IN; result |= RB_WAITFD_IN;
if (fds[0].revents & POLLOUT_SET) if (fds.revents & POLLOUT_SET)
result |= RB_WAITFD_OUT; result |= RB_WAITFD_OUT;
if (fds[0].revents & POLLEX_SET) if (fds.revents & POLLEX_SET)
result |= RB_WAITFD_PRI; result |= RB_WAITFD_PRI;
/* all requested events are ready if there is an error */ /* all requested events are ready if there is an error */
if (fds[0].revents & POLLERR_SET) if (fds.revents & POLLERR_SET)
result |= events; result |= events;
return result; return result;
} }
#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
static rb_fdset_t *
init_set_fd(int fd, rb_fdset_t *fds)
{
if (fd < 0) {
return 0;
}
rb_fd_init(fds);
rb_fd_set(fd, fds);
return fds;
}
struct select_args { struct select_args {
union { union {
int fd; int fd;
@ -4223,6 +4168,10 @@ rb_gc_set_stack_end(VALUE **stack_end_p)
} }
#endif #endif
/* signal.c */
void ruby_sigchld_handler(rb_vm_t *);
/* /*
* *
*/ */
@ -4238,81 +4187,36 @@ rb_threadptr_check_signal(rb_thread_t *mth)
} }
static void static void
timer_thread_function(void) timer_thread_function(void *arg)
{ {
volatile rb_execution_context_t *ec; rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */
/*
* Tricky: thread_destruct_lock doesn't close a race against
* vm->running_thread switch. however it guarantees th->running_thread
* point to valid pointer or NULL.
*/
rb_native_mutex_lock(&vm->thread_destruct_lock);
/* for time slice */ /* for time slice */
ec = ACCESS_ONCE(rb_execution_context_t *, if (vm->running_thread) {
ruby_current_execution_context_ptr); RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread->ec);
if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec);
}
static void
async_bug_fd(const char *mesg, int errno_arg, int fd)
{
char buff[64];
size_t n = strlcpy(buff, mesg, sizeof(buff));
if (n < sizeof(buff)-3) {
ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
} }
rb_async_bug_errno(buff, errno_arg); rb_native_mutex_unlock(&vm->thread_destruct_lock);
}
/* VM-dependent API is not available for this function */ /* check signal */
static int
consume_communication_pipe(int fd)
{
#define CCP_READ_BUFF_SIZE 1024
/* buffer can be shared because no one refers to them. */
static char buff[CCP_READ_BUFF_SIZE];
ssize_t result;
int ret = FALSE; /* for rb_sigwait_sleep */
while (1) {
result = read(fd, buff, sizeof(buff));
if (result > 0) {
ret = TRUE;
if (result < (ssize_t)sizeof(buff)) {
return ret;
}
}
else if (result == 0) {
return ret;
}
else if (result < 0) {
int e = errno;
switch (e) {
case EINTR:
continue; /* retry */
case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
return ret;
default:
async_bug_fd("consume_communication_pipe: read", e, fd);
}
}
}
}
static int
check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
{
rb_vm_t *vm = GET_VM(); /* th may be 0 */
int ret = consume_communication_pipe(sigwait_fd);
ubf_wakeup_all_threads();
ruby_sigchld_handler(vm); ruby_sigchld_handler(vm);
if (rb_signal_buff_size()) { rb_threadptr_check_signal(vm->main_thread);
if (th == vm->main_thread)
/* no need to lock + wakeup if already in main thread */ #if 0
RUBY_VM_SET_TRAP_INTERRUPT(th->ec); /* prove profiler */
else if (vm->prove_profile.enable) {
threadptr_trap_interrupt(vm->main_thread); rb_thread_t *th = vm->running_thread;
ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
if (vm->during_gc) {
/* GC prove profiling */
}
} }
return ret; #endif
} }
void void
@ -5142,6 +5046,7 @@ Init_Thread(void)
/* acquire global vm lock */ /* acquire global vm lock */
gvl_init(th->vm); gvl_init(th->vm);
gvl_acquire(th->vm, th); gvl_acquire(th->vm, th);
rb_native_mutex_initialize(&th->vm->thread_destruct_lock);
rb_native_mutex_initialize(&th->vm->waitpid_lock); rb_native_mutex_initialize(&th->vm->waitpid_lock);
rb_native_mutex_initialize(&th->interrupt_lock); rb_native_mutex_initialize(&th->interrupt_lock);

View File

@ -45,21 +45,27 @@ void rb_native_cond_broadcast(rb_nativethread_cond_t *cond);
void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex); void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
void rb_native_cond_initialize(rb_nativethread_cond_t *cond); void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
void rb_native_cond_destroy(rb_nativethread_cond_t *cond); void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
static void rb_thread_wakeup_timer_thread_low(void);
static void clear_thread_cache_altstack(void); static void clear_thread_cache_altstack(void);
static void ubf_wakeup_all_threads(void);
static int ubf_threads_empty(void);
static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
const struct timespec *);
static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd,
const struct timespec *,
int *drained_p);
#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid()) #define TIMER_THREAD_MASK (1)
#define TIMER_THREAD_SLEEPY (2|TIMER_THREAD_MASK)
#define TIMER_THREAD_BUSY (4|TIMER_THREAD_MASK)
/* for testing, and in case we come across a platform w/o pipes: */ #if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && \
#define BUSY_WAIT_SIGNALS (0) defined(F_SETFL) && defined(O_NONBLOCK) && \
#define THREAD_INVALID ((const rb_thread_t *)-1) defined(F_GETFD) && defined(F_SETFD) && defined(FD_CLOEXEC)
static const rb_thread_t *sigwait_th; /* The timer thread sleeps while only one Ruby thread is running. */
# define TIMER_IMPL TIMER_THREAD_SLEEPY
#else
# define TIMER_IMPL TIMER_THREAD_BUSY
#endif
static struct {
pthread_t id;
int created;
} timer_thread;
#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0)
#ifdef HAVE_SCHED_YIELD #ifdef HAVE_SCHED_YIELD
#define native_thread_yield() (void)sched_yield() #define native_thread_yield() (void)sched_yield()
@ -76,96 +82,49 @@ static pthread_condattr_t *condattr_monotonic = &condattr_mono;
static const void *const condattr_monotonic = NULL; static const void *const condattr_monotonic = NULL;
#endif #endif
/* 100ms. 10ms is too small for user level thread scheduling
* on recent Linux (tested on 2.6.35)
*/
#define TIME_QUANTUM_USEC (100 * 1000)
static struct timespec native_cond_timeout(rb_nativethread_cond_t *,
struct timespec rel);
static void static void
gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th) gvl_acquire_common(rb_vm_t *vm)
{ {
if (vm->gvl.acquired) { if (vm->gvl.acquired) {
native_thread_data_t *nd = &th->native_thread_data;
VM_ASSERT(th->unblock.func == 0 && "we reuse ubf_list for GVL waitq"); if (!vm->gvl.waiting++) {
/*
* Wake up timer thread iff timer thread is slept.
* When timer thread is polling mode, we don't want to
* make confusing timer thread interval time.
*/
rb_thread_wakeup_timer_thread_low();
}
list_add_tail(&vm->gvl.waitq, &nd->ubf_list); while (vm->gvl.acquired) {
do { rb_native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
if (!vm->gvl.timer) { }
static struct timespec ts;
static int err = ETIMEDOUT;
/* --vm->gvl.waiting;
* become designated timer thread to kick vm->gvl.acquired
* periodically. Continue on old timeout if it expired:
*/
if (err == ETIMEDOUT) {
ts.tv_sec = 0;
ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
ts = native_cond_timeout(&nd->cond.gvlq, ts);
}
vm->gvl.timer = th;
err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &ts);
vm->gvl.timer = 0;
ubf_wakeup_all_threads();
/* if (vm->gvl.need_yield) {
* Timeslice. We can't touch thread_destruct_lock here, vm->gvl.need_yield = 0;
* as the process may fork while this thread is contending
* for GVL:
*/
if (vm->gvl.acquired) timer_thread_function();
}
else {
rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock);
}
} while (vm->gvl.acquired);
list_del_init(&nd->ubf_list);
if (vm->gvl.need_yield) {
vm->gvl.need_yield = 0;
rb_native_cond_signal(&vm->gvl.switch_cond); rb_native_cond_signal(&vm->gvl.switch_cond);
} }
} }
vm->gvl.acquired = th;
/*
* Designate the next gvl.timer thread, favor the last thread in
* the waitq since it will be in waitq longest
*/
if (!vm->gvl.timer) {
native_thread_data_t *last;
last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); vm->gvl.acquired = 1;
if (last) {
rb_native_cond_signal(&last->cond.gvlq);
}
else if (!ubf_threads_empty()) {
rb_thread_wakeup_timer_thread(0);
}
}
} }
static void static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th) gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{ {
rb_native_mutex_lock(&vm->gvl.lock); rb_native_mutex_lock(&vm->gvl.lock);
gvl_acquire_common(vm, th); gvl_acquire_common(vm);
rb_native_mutex_unlock(&vm->gvl.lock); rb_native_mutex_unlock(&vm->gvl.lock);
} }
static native_thread_data_t * static void
gvl_release_common(rb_vm_t *vm) gvl_release_common(rb_vm_t *vm)
{ {
native_thread_data_t *next;
vm->gvl.acquired = 0; vm->gvl.acquired = 0;
next = list_top(&vm->gvl.waitq, native_thread_data_t, ubf_list); if (vm->gvl.waiting > 0)
if (next) rb_native_cond_signal(&next->cond.gvlq); rb_native_cond_signal(&vm->gvl.cond);
return next;
} }
static void static void
@ -179,38 +138,34 @@ gvl_release(rb_vm_t *vm)
static void static void
gvl_yield(rb_vm_t *vm, rb_thread_t *th) gvl_yield(rb_vm_t *vm, rb_thread_t *th)
{ {
native_thread_data_t *next;
rb_native_mutex_lock(&vm->gvl.lock); rb_native_mutex_lock(&vm->gvl.lock);
next = gvl_release_common(vm);
gvl_release_common(vm);
/* An another thread is processing GVL yield. */ /* An another thread is processing GVL yield. */
if (UNLIKELY(vm->gvl.wait_yield)) { if (UNLIKELY(vm->gvl.wait_yield)) {
while (vm->gvl.wait_yield) while (vm->gvl.wait_yield)
rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock); rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
goto acquire;
} }
else if (next) {
/* Wait until another thread task takes GVL. */ if (vm->gvl.waiting > 0) {
vm->gvl.need_yield = 1; /* Wait until another thread task take GVL. */
vm->gvl.wait_yield = 1; vm->gvl.need_yield = 1;
while (vm->gvl.need_yield) vm->gvl.wait_yield = 1;
while (vm->gvl.need_yield)
rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock); rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
vm->gvl.wait_yield = 0; vm->gvl.wait_yield = 0;
rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
} }
else { else {
rb_native_mutex_unlock(&vm->gvl.lock); rb_native_mutex_unlock(&vm->gvl.lock);
/* sched_yield();
* GVL was not contended when we released, so we have no potential
* contenders for reacquisition. Perhaps they are stuck in blocking
* region w/o GVL, too, so we kick them:
*/
ubf_wakeup_all_threads();
native_thread_yield();
rb_native_mutex_lock(&vm->gvl.lock); rb_native_mutex_lock(&vm->gvl.lock);
rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
} }
gvl_acquire_common(vm, th);
rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
acquire:
gvl_acquire_common(vm);
rb_native_mutex_unlock(&vm->gvl.lock); rb_native_mutex_unlock(&vm->gvl.lock);
} }
@ -218,11 +173,11 @@ static void
gvl_init(rb_vm_t *vm) gvl_init(rb_vm_t *vm)
{ {
rb_native_mutex_initialize(&vm->gvl.lock); rb_native_mutex_initialize(&vm->gvl.lock);
rb_native_cond_initialize(&vm->gvl.cond);
rb_native_cond_initialize(&vm->gvl.switch_cond); rb_native_cond_initialize(&vm->gvl.switch_cond);
rb_native_cond_initialize(&vm->gvl.switch_wait_cond); rb_native_cond_initialize(&vm->gvl.switch_wait_cond);
list_head_init(&vm->gvl.waitq);
vm->gvl.acquired = 0; vm->gvl.acquired = 0;
vm->gvl.timer = 0; vm->gvl.waiting = 0;
vm->gvl.need_yield = 0; vm->gvl.need_yield = 0;
vm->gvl.wait_yield = 0; vm->gvl.wait_yield = 0;
} }
@ -230,16 +185,10 @@ gvl_init(rb_vm_t *vm)
static void static void
gvl_destroy(rb_vm_t *vm) gvl_destroy(rb_vm_t *vm)
{ {
/* rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
* only called once at VM shutdown (not atfork), another thread rb_native_cond_destroy(&vm->gvl.switch_cond);
* may still grab vm->gvl.lock when calling gvl_release at rb_native_cond_destroy(&vm->gvl.cond);
* the end of thread_start_func_2 rb_native_mutex_destroy(&vm->gvl.lock);
*/
if (0) {
rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
rb_native_cond_destroy(&vm->gvl.switch_cond);
rb_native_mutex_destroy(&vm->gvl.lock);
}
clear_thread_cache_altstack(); clear_thread_cache_altstack();
} }
@ -484,9 +433,7 @@ native_thread_init(rb_thread_t *th)
#ifdef USE_UBF_LIST #ifdef USE_UBF_LIST
list_node_init(&nd->ubf_list); list_node_init(&nd->ubf_list);
#endif #endif
rb_native_cond_initialize(&nd->cond.gvlq); rb_native_cond_initialize(&nd->sleep_cond);
if (&nd->cond.gvlq != &nd->cond.intr)
rb_native_cond_initialize(&nd->cond.intr);
ruby_thread_set_native(th); ruby_thread_set_native(th);
} }
@ -497,11 +444,7 @@ native_thread_init(rb_thread_t *th)
static void static void
native_thread_destroy(rb_thread_t *th) native_thread_destroy(rb_thread_t *th)
{ {
native_thread_data_t *nd = &th->native_thread_data; rb_native_cond_destroy(&th->native_thread_data.sleep_cond);
rb_native_cond_destroy(&nd->cond.gvlq);
if (&nd->cond.gvlq != &nd->cond.intr)
rb_native_cond_destroy(&nd->cond.intr);
/* /*
* prevent false positive from ruby_thread_has_gvl_p if that * prevent false positive from ruby_thread_has_gvl_p if that
@ -1069,6 +1012,17 @@ native_thread_create(rb_thread_t *th)
return err; return err;
} }
#if (TIMER_IMPL & TIMER_THREAD_MASK)
static void
native_thread_join(pthread_t th)
{
int err = pthread_join(th, 0);
if (err) {
rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err);
}
}
#endif /* TIMER_THREAD_MASK */
#if USE_NATIVE_THREAD_PRIORITY #if USE_NATIVE_THREAD_PRIORITY
static void static void
@ -1110,15 +1064,15 @@ ubf_pthread_cond_signal(void *ptr)
{ {
rb_thread_t *th = (rb_thread_t *)ptr; rb_thread_t *th = (rb_thread_t *)ptr;
thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th); thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th);
rb_native_cond_signal(&th->native_thread_data.cond.intr); rb_native_cond_signal(&th->native_thread_data.sleep_cond);
} }
static void static void
native_cond_sleep(rb_thread_t *th, struct timespec *timeout_rel) native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
{ {
struct timespec timeout; struct timespec timeout;
rb_nativethread_lock_t *lock = &th->interrupt_lock; rb_nativethread_lock_t *lock = &th->interrupt_lock;
rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr; rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond;
if (timeout_rel) { if (timeout_rel) {
/* Solaris cond_timedwait() return EINVAL if an argument is greater than /* Solaris cond_timedwait() return EINVAL if an argument is greater than
@ -1210,30 +1164,17 @@ static void
ubf_select(void *ptr) ubf_select(void *ptr)
{ {
rb_thread_t *th = (rb_thread_t *)ptr; rb_thread_t *th = (rb_thread_t *)ptr;
rb_vm_t *vm = th->vm;
register_ubf_list(th); register_ubf_list(th);
/* /*
* ubf_wakeup_thread() doesn't guarantee to wake up a target thread. * ubf_wakeup_thread() doesn't guarantee to wake up a target thread.
* Therefore, we repeatedly call ubf_wakeup_thread() until a target thread * Therefore, we repeatedly call ubf_wakeup_thread() until a target thread
* exit from ubf function. We must designate a timer-thread to perform * exit from ubf function.
* this operation. * In the other hands, we shouldn't call rb_thread_wakeup_timer_thread()
* if running on timer thread because it may make endless wakeups.
*/ */
rb_native_mutex_lock(&vm->gvl.lock); if (!pthread_equal(pthread_self(), timer_thread.id))
if (!vm->gvl.timer) { rb_thread_wakeup_timer_thread();
native_thread_data_t *last;
last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
if (last) {
rb_native_cond_signal(&last->cond.gvlq);
}
else {
rb_thread_wakeup_timer_thread(0);
}
}
rb_native_mutex_unlock(&vm->gvl.lock);
ubf_wakeup_thread(th); ubf_wakeup_thread(th);
} }
@ -1270,16 +1211,39 @@ static int ubf_threads_empty(void) { return 1; }
#define TT_DEBUG 0 #define TT_DEBUG 0
#define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0) #define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
/* 100ms. 10ms is too small for user level thread scheduling
* on recent Linux (tested on 2.6.35)
*/
#define TIME_QUANTUM_USEC (100 * 1000)
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
static struct { static struct {
/* pipes are closed in forked children when owner_process does not match */ /*
* Read end of each pipe is closed inside timer thread for shutdown
* Write ends are closed by a normal Ruby thread during shutdown
*/
int normal[2]; int normal[2];
int low[2];
/* volatile for signal handler use: */ /* volatile for signal handler use: */
volatile rb_pid_t owner_process; volatile rb_pid_t owner_process;
} timer_thread_pipe = { } timer_thread_pipe = {
{-1, -1}, {-1, -1},
{-1, -1}, /* low priority */
}; };
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
static void
async_bug_fd(const char *mesg, int errno_arg, int fd)
{
char buff[64];
size_t n = strlcpy(buff, mesg, sizeof(buff));
if (n < sizeof(buff)-3) {
ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
}
rb_async_bug_errno(buff, errno_arg);
}
/* only use signal-safe system calls here */ /* only use signal-safe system calls here */
static void static void
rb_thread_wakeup_timer_thread_fd(int fd) rb_thread_wakeup_timer_thread_fd(int fd)
@ -1311,33 +1275,49 @@ rb_thread_wakeup_timer_thread_fd(int fd)
} }
void void
rb_thread_wakeup_timer_thread(int sig) rb_thread_wakeup_timer_thread(void)
{ {
/* must be safe inside sighandler, so no mutex */ /* must be safe inside sighandler, so no mutex */
if (timer_thread_pipe.owner_process == getpid()) { if (timer_thread_pipe.owner_process == getpid()) {
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
}
}
/* static void
* system_working check is required because vm and main_thread are rb_thread_wakeup_timer_thread_low(void)
* freed during shutdown {
*/ if (timer_thread_pipe.owner_process == getpid()) {
if (sig && system_working) { rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]);
volatile rb_execution_context_t *ec; }
rb_vm_t *vm = GET_VM(); }
rb_thread_t *mth;
/* /* VM-dependent API is not available for this function */
* FIXME: root VM and main_thread should be static and not static void
* on heap for maximum safety (and startup/shutdown speed) consume_communication_pipe(int fd)
*/ {
if (!vm) return; #define CCP_READ_BUFF_SIZE 1024
mth = vm->main_thread; /* buffer can be shared because no one refers to them. */
if (!mth || !system_working) return; static char buff[CCP_READ_BUFF_SIZE];
ssize_t result;
/* this relies on GC for grace period before cont_free */ while (1) {
ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec); result = read(fd, buff, sizeof(buff));
if (result == 0) {
if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec); return;
}
else if (result < 0) {
int e = errno;
switch (e) {
case EINTR:
continue; /* retry */
case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
return;
default:
async_bug_fd("consume_communication_pipe: read", e, fd);
}
} }
} }
} }
@ -1370,7 +1350,6 @@ set_nonblock(int fd)
rb_sys_fail(0); rb_sys_fail(0);
} }
/* communication pipe with timer thread and signal handler */
static int static int
setup_communication_pipe_internal(int pipes[2]) setup_communication_pipe_internal(int pipes[2])
{ {
@ -1395,6 +1374,108 @@ setup_communication_pipe_internal(int pipes[2])
return 0; return 0;
} }
/* communication pipe with timer thread and signal handler */
static int
setup_communication_pipe(void)
{
rb_pid_t owner = timer_thread_pipe.owner_process;
if (owner && owner != getpid()) {
CLOSE_INVALIDATE(normal[0]);
CLOSE_INVALIDATE(normal[1]);
CLOSE_INVALIDATE(low[0]);
CLOSE_INVALIDATE(low[1]);
}
if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) {
return errno;
}
if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) {
return errno;
}
return 0;
}
/**
* Let the timer thread sleep a while.
*
* The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running.
* @pre the calling context is in the timer thread.
*/
static inline void
timer_thread_sleep(rb_vm_t *vm)
{
int result;
int need_polling;
struct pollfd pollfds[2];
pollfds[0].fd = timer_thread_pipe.normal[0];
pollfds[0].events = POLLIN;
pollfds[1].fd = timer_thread_pipe.low[0];
pollfds[1].events = POLLIN;
need_polling = !ubf_threads_empty();
if (SIGCHLD_LOSSY && !need_polling) {
rb_native_mutex_lock(&vm->waitpid_lock);
if (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)) {
need_polling = 1;
}
rb_native_mutex_unlock(&vm->waitpid_lock);
}
if (vm->gvl.waiting > 0 || need_polling) {
/* polling (TIME_QUANTUM_USEC usec) */
result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
}
else {
/* wait (infinite) */
result = poll(pollfds, numberof(pollfds), -1);
}
if (result == 0) {
/* maybe timeout */
}
else if (result > 0) {
consume_communication_pipe(timer_thread_pipe.normal[0]);
consume_communication_pipe(timer_thread_pipe.low[0]);
}
else { /* result < 0 */
int e = errno;
switch (e) {
case EBADF:
case EINVAL:
case ENOMEM: /* from Linux man */
case EFAULT: /* from FreeBSD man */
rb_async_bug_errno("thread_timer: select", e);
default:
/* ignore */;
}
}
}
#endif /* TIMER_THREAD_SLEEPY */
#if TIMER_IMPL == TIMER_THREAD_BUSY
# define PER_NANO 1000000000
void rb_thread_wakeup_timer_thread(void) {}
static void rb_thread_wakeup_timer_thread_low(void) {}
static rb_nativethread_lock_t timer_thread_lock;
static rb_nativethread_cond_t timer_thread_cond;
static inline void
timer_thread_sleep(rb_vm_t *unused)
{
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
ts = native_cond_timeout(&timer_thread_cond, ts);
native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts);
}
#endif /* TIMER_IMPL == TIMER_THREAD_BUSY */
#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME) #if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name) # define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
#endif #endif
@ -1445,26 +1526,137 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
return name; return name;
} }
static void *
thread_timer(void *p)
{
rb_vm_t *vm = p;
#ifdef HAVE_PTHREAD_SIGMASK /* mainly to enable SIGCHLD */
{
sigset_t mask;
sigemptyset(&mask);
pthread_sigmask(SIG_SETMASK, &mask, NULL);
}
#endif
if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n");
#ifdef SET_CURRENT_THREAD_NAME
SET_CURRENT_THREAD_NAME("ruby-timer-thr");
#endif
#if TIMER_IMPL == TIMER_THREAD_BUSY
rb_native_mutex_initialize(&timer_thread_lock);
rb_native_cond_initialize(&timer_thread_cond);
rb_native_mutex_lock(&timer_thread_lock);
#endif
while (system_working > 0) {
/* timer function */
ubf_wakeup_all_threads();
timer_thread_function(0);
if (TT_DEBUG) WRITE_CONST(2, "tick\n");
/* wait */
timer_thread_sleep(vm);
}
#if TIMER_IMPL == TIMER_THREAD_BUSY
rb_native_mutex_unlock(&timer_thread_lock);
rb_native_cond_destroy(&timer_thread_cond);
rb_native_mutex_destroy(&timer_thread_lock);
#endif
if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n");
return NULL;
}
#if (TIMER_IMPL & TIMER_THREAD_MASK)
static void static void
rb_thread_create_timer_thread(void) rb_thread_create_timer_thread(void)
{ {
/* we only create the pipe, and lazy-spawn */ if (!timer_thread.created) {
rb_pid_t current = getpid(); size_t stack_size = 0;
rb_pid_t owner = timer_thread_pipe.owner_process; int err;
pthread_attr_t attr;
rb_vm_t *vm = GET_VM();
if (owner && owner != current) { err = pthread_attr_init(&attr);
CLOSE_INVALIDATE(normal[0]); if (err != 0) {
CLOSE_INVALIDATE(normal[1]); rb_warn("pthread_attr_init failed for timer: %s, scheduling broken",
} strerror(err));
return;
}
# ifdef PTHREAD_STACK_MIN
{
size_t stack_min = PTHREAD_STACK_MIN; /* may be dynamic, get only once */
const size_t min_size = (4096 * 4);
/* Allocate the machine stack for the timer thread
* at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes
* machine stack overflow only with PTHREAD_STACK_MIN.
*/
enum {
needs_more_stack =
#if defined HAVE_VALGRIND_MEMCHECK_H && defined __APPLE__
1
#else
THREAD_DEBUG != 0
#endif
};
stack_size = stack_min;
if (stack_size < min_size) stack_size = min_size;
if (needs_more_stack) {
stack_size += +((BUFSIZ - 1) / stack_min + 1) * stack_min;
}
err = pthread_attr_setstacksize(&attr, stack_size);
if (err != 0) {
rb_bug("pthread_attr_setstacksize(.., %"PRIuSIZE") failed: %s",
stack_size, strerror(err));
}
}
# endif
if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return; #if TIMER_IMPL == TIMER_THREAD_SLEEPY
err = setup_communication_pipe();
if (err) return;
#endif /* TIMER_THREAD_SLEEPY */
if (owner != current) { /* create timer thread */
/* validate pipe on this process */ if (timer_thread.created) {
sigwait_th = THREAD_INVALID; rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
timer_thread_pipe.owner_process = current; }
err = pthread_create(&timer_thread.id, &attr, thread_timer, vm);
pthread_attr_destroy(&attr);
if (err == EINVAL) {
/*
* Even if we are careful with our own stack use in thread_timer(),
* any third-party libraries (eg libkqueue) which rely on __thread
* storage can cause small stack sizes to fail. So lets hope the
* default stack size is enough for them:
*/
stack_size = 0;
err = pthread_create(&timer_thread.id, NULL, thread_timer, vm);
}
if (err != 0) {
rb_warn("pthread_create failed for timer: %s, scheduling broken",
strerror(err));
if (stack_size) {
rb_warn("timer thread stack size: %"PRIuSIZE, stack_size);
}
else {
rb_warn("timer thread stack size: system default");
}
VM_ASSERT(err == 0);
return;
}
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
/* validate pipe on this process */
timer_thread_pipe.owner_process = getpid();
#endif /* TIMER_THREAD_SLEEPY */
timer_thread.created = 1;
} }
} }
#endif /* TIMER_IMPL & TIMER_THREAD_MASK */
static int static int
native_stop_timer_thread(void) native_stop_timer_thread(void)
@ -1473,6 +1665,24 @@ native_stop_timer_thread(void)
stopped = --system_working <= 0; stopped = --system_working <= 0;
if (TT_DEBUG) fprintf(stderr, "stop timer thread\n"); if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
if (stopped) {
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
/* kick timer thread out of sleep */
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
#endif
/* timer thread will stop looping when system_working <= 0: */
native_thread_join(timer_thread.id);
/*
* don't care if timer_thread_pipe may fill up at this point.
* If we restart timer thread, signals will be processed, if
* we don't, it's because we're in a different child
*/
if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
timer_thread.created = 0;
}
return stopped; return stopped;
} }
@ -1529,14 +1739,20 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
int int
rb_reserved_fd_p(int fd) rb_reserved_fd_p(int fd)
{ {
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
if ((fd == timer_thread_pipe.normal[0] || if ((fd == timer_thread_pipe.normal[0] ||
fd == timer_thread_pipe.normal[1]) && fd == timer_thread_pipe.normal[1] ||
fd == timer_thread_pipe.low[0] ||
fd == timer_thread_pipe.low[1]) &&
timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */ timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
return 1; return 1;
} }
else { else {
return 0; return 0;
} }
#else
return 0;
#endif
} }
rb_nativethread_id_t rb_nativethread_id_t
@ -1587,7 +1803,7 @@ rb_sleep_cond_get(const rb_execution_context_t *ec)
{ {
rb_thread_t *th = rb_ec_thread_ptr(ec); rb_thread_t *th = rb_ec_thread_ptr(ec);
return &th->native_thread_data.cond.intr; return &th->native_thread_data.sleep_cond;
} }
void void
@ -1597,126 +1813,4 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond)
} }
#endif /* USE_NATIVE_SLEEP_COND */ #endif /* USE_NATIVE_SLEEP_COND */
int
rb_sigwait_fd_get(const rb_thread_t *th)
{
if (timer_thread_pipe.owner_process == getpid() &&
timer_thread_pipe.normal[0] >= 0) {
if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) {
return timer_thread_pipe.normal[0];
}
}
return -1; /* avoid thundering herd */
}
void
rb_sigwait_fd_put(const rb_thread_t *th, int fd)
{
const rb_thread_t *old;
VM_ASSERT(timer_thread_pipe.normal[0] == fd);
old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID);
if (old != th) assert(old == th);
}
#ifndef HAVE_PPOLL
/* TODO: don't ignore sigmask */
static int
ruby_ppoll(struct pollfd *fds, nfds_t nfds,
const struct timespec *ts, const sigset_t *sigmask)
{
int timeout_ms;
if (ts) {
int tmp, tmp2;
if (ts->tv_sec > INT_MAX/1000)
timeout_ms = INT_MAX;
else {
tmp = (int)(ts->tv_sec * 1000);
/* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */
tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L));
if (INT_MAX - tmp < tmp2)
timeout_ms = INT_MAX;
else
timeout_ms = (int)(tmp + tmp2);
}
}
else
timeout_ms = -1;
return poll(fds, nfds, timeout_ms);
}
# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask))
#endif
void
rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts)
{
struct pollfd pfd;
pfd.fd = sigwait_fd;
pfd.events = POLLIN;
if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) {
(void)ppoll(&pfd, 1, ts, 0);
check_signals_nogvl(th, sigwait_fd);
}
else {
struct timespec end, diff;
const struct timespec *to;
int n = 0;
if (ts) {
getclockofday(&end);
timespec_add(&end, ts);
diff = *ts;
ts = &diff;
}
/*
* tricky: this needs to return on spurious wakeup (no auto-retry).
* But we also need to distinguish between periodic quantum
* wakeups, so we care about the result of consume_communication_pipe
*/
for (;;) {
to = sigwait_timeout(th, sigwait_fd, ts, &n);
if (n) return;
n = ppoll(&pfd, 1, to, 0);
if (check_signals_nogvl(th, sigwait_fd))
return;
if (n || (th && RUBY_VM_INTERRUPTED(th->ec)))
return;
if (ts && timespec_update_expire(&diff, &end))
return;
}
}
}
static void
native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
{
int sigwait_fd = rb_sigwait_fd_get(th);
if (sigwait_fd >= 0) {
rb_native_mutex_lock(&th->interrupt_lock);
th->unblock.func = ubf_sigwait;
rb_native_mutex_unlock(&th->interrupt_lock);
GVL_UNLOCK_BEGIN(th);
if (!RUBY_VM_INTERRUPTED(th->ec)) {
rb_sigwait_sleep(th, sigwait_fd, timeout_rel);
}
else {
check_signals_nogvl(th, sigwait_fd);
}
unblock_function_clear(th);
GVL_UNLOCK_END(th);
rb_sigwait_fd_put(th, sigwait_fd);
rb_sigwait_fd_migrate(th->vm);
}
else {
native_cond_sleep(th, timeout_rel);
}
}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */

View File

@ -22,19 +22,7 @@ typedef pthread_cond_t rb_nativethread_cond_t;
typedef struct native_thread_data_struct { typedef struct native_thread_data_struct {
struct list_node ubf_list; struct list_node ubf_list;
#if defined(__GLIBC__) || defined(__FreeBSD__) rb_nativethread_cond_t sleep_cond;
union
#else
/*
* assume the platform condvars are badly implemented and have a
* "memory" of which mutex they're associated with
*/
struct
#endif
{
rb_nativethread_cond_t intr; /* th->interrupt_lock */
rb_nativethread_cond_t gvlq; /* vm->gvl.lock */
} cond;
} native_thread_data_t; } native_thread_data_t;
#undef except #undef except
@ -44,12 +32,12 @@ typedef struct native_thread_data_struct {
typedef struct rb_global_vm_lock_struct { typedef struct rb_global_vm_lock_struct {
/* fast path */ /* fast path */
const struct rb_thread_struct *acquired; unsigned long acquired;
rb_nativethread_lock_t lock; rb_nativethread_lock_t lock;
/* slow path */ /* slow path */
struct list_head waitq; volatile unsigned long waiting;
const struct rb_thread_struct *timer; rb_nativethread_cond_t cond;
/* yield */ /* yield */
rb_nativethread_cond_t switch_cond; rb_nativethread_cond_t switch_cond;

View File

@ -20,8 +20,6 @@
#define native_thread_yield() Sleep(0) #define native_thread_yield() Sleep(0)
#define unregister_ubf_list(th) #define unregister_ubf_list(th)
#define ubf_wakeup_all_threads() do {} while (0)
#define ubf_threads_empty() (1)
static volatile DWORD ruby_native_thread_key = TLS_OUT_OF_INDEXES; static volatile DWORD ruby_native_thread_key = TLS_OUT_OF_INDEXES;
@ -682,21 +680,18 @@ static struct {
static unsigned long __stdcall static unsigned long __stdcall
timer_thread_func(void *dummy) timer_thread_func(void *dummy)
{ {
rb_vm_t *vm = GET_VM();
thread_debug("timer_thread\n"); thread_debug("timer_thread\n");
rb_w32_set_thread_description(GetCurrentThread(), L"ruby-timer-thread"); rb_w32_set_thread_description(GetCurrentThread(), L"ruby-timer-thread");
while (WaitForSingleObject(timer_thread.lock, TIME_QUANTUM_USEC/1000) == while (WaitForSingleObject(timer_thread.lock, TIME_QUANTUM_USEC/1000) ==
WAIT_TIMEOUT) { WAIT_TIMEOUT) {
timer_thread_function(); timer_thread_function(dummy);
ruby_sigchld_handler(vm); /* probably no-op */
rb_threadptr_check_signal(vm->main_thread);
} }
thread_debug("timer killed\n"); thread_debug("timer killed\n");
return 0; return 0;
} }
void void
rb_thread_wakeup_timer_thread(int sig) rb_thread_wakeup_timer_thread(void)
{ {
/* do nothing */ /* do nothing */
} }
@ -773,26 +768,6 @@ rb_reserved_fd_p(int fd)
return 0; return 0;
} }
int
rb_sigwait_fd_get(rb_thread_t *th)
{
return -1; /* TODO */
}
NORETURN(void rb_sigwait_fd_put(rb_thread_t *, int));
void
rb_sigwait_fd_put(rb_thread_t *th, int fd)
{
rb_bug("not implemented, should not be called");
}
NORETURN(void rb_sigwait_sleep(const rb_thread_t *, int, const struct timespec *));
void
rb_sigwait_sleep(const rb_thread_t *th, int fd, const struct timespec *ts)
{
rb_bug("not implemented, should not be called");
}
rb_nativethread_id_t rb_nativethread_id_t
rb_nativethread_self(void) rb_nativethread_self(void)
{ {

View File

@ -564,12 +564,10 @@ typedef struct rb_vm_struct {
VALUE self; VALUE self;
rb_global_vm_lock_t gvl; rb_global_vm_lock_t gvl;
rb_nativethread_lock_t thread_destruct_lock;
struct rb_thread_struct *main_thread; struct rb_thread_struct *main_thread;
struct rb_thread_struct *running_thread;
/* persists across uncontended GVL release/acquire for time slice */
const struct rb_thread_struct *running_thread;
#ifdef USE_SIGALTSTACK #ifdef USE_SIGALTSTACK
void *main_altstack; void *main_altstack;
#endif #endif
@ -1583,7 +1581,7 @@ void rb_vm_pop_frame(rb_execution_context_t *ec);
void rb_thread_start_timer_thread(void); void rb_thread_start_timer_thread(void);
void rb_thread_stop_timer_thread(void); void rb_thread_stop_timer_thread(void);
void rb_thread_reset_timer_thread(void); void rb_thread_reset_timer_thread(void);
void rb_thread_wakeup_timer_thread(int); void rb_thread_wakeup_timer_thread(void);
static inline void static inline void
rb_vm_living_threads_init(rb_vm_t *vm) rb_vm_living_threads_init(rb_vm_t *vm)