diff --git a/thread.c b/thread.c index 1a857a12ca..da55879824 100644 --- a/thread.c +++ b/thread.c @@ -1646,8 +1646,14 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1, return rb_nogvl(func, data1, ubf, data2, 0); } +static int +waitfd_to_waiting_flag(int wfd_event) +{ + return wfd_event << 1; +} + static void -rb_thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd) +thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd) { wfd->fd = fd; wfd->th = th; @@ -1661,7 +1667,7 @@ rb_thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd) } static void -rb_thread_io_wake_pending_closer(struct waiting_fd *wfd) +thread_io_wake_pending_closer(struct waiting_fd *wfd) { bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex); if (has_waiter) { @@ -1682,9 +1688,37 @@ rb_thread_io_wake_pending_closer(struct waiting_fd *wfd) } static int -waitfd_to_waiting_flag(int wfd_event) +thread_io_wait_events(rb_thread_t *th, rb_execution_context_t *ec, int fd, int events, struct timeval *timeout, struct waiting_fd *wfd) { - return wfd_event << 1; +#if defined(USE_MN_THREADS) && USE_MN_THREADS + if (!th_has_dedicated_nt(th) && (events || timeout)) { + int r; + rb_hrtime_t rel, *prel; + + if (timeout) { + rel = rb_timeval2hrtime(timeout); + prel = &rel; + } + else { + prel = NULL; + } + + VM_ASSERT(prel || events == RB_WAITFD_IN || events == RB_WAITFD_OUT); + + thread_io_setup_wfd(th, fd, wfd); + { + // wait readable/writable + r = thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel); + } + thread_io_wake_pending_closer(wfd); + + RUBY_VM_CHECK_INTS_BLOCKING(ec); + + return r; + } +#endif // defined(USE_MN_THREADS) && USE_MN_THREADS + + return 0; } VALUE @@ -1697,20 +1731,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in struct waiting_fd waiting_fd; -#ifdef RUBY_THREAD_PTHREAD_H - if (events && !th_has_dedicated_nt(th)) { - VM_ASSERT(events == RB_WAITFD_IN || events == RB_WAITFD_OUT); - - rb_thread_io_setup_wfd(th, fd, &waiting_fd); - { - // wait readable/writable - thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), NULL); - } - rb_thread_io_wake_pending_closer(&waiting_fd); - - RUBY_VM_CHECK_INTS_BLOCKING(ec); - } -#endif + thread_io_wait_events(th, ec, fd, events, NULL, &waiting_fd); volatile VALUE val = Qundef; /* shouldn't be used */ volatile int saved_errno = 0; @@ -1722,7 +1743,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in // `func` or not (as opposed to some previously set value). errno = 0; - rb_thread_io_setup_wfd(th, fd, &waiting_fd); + thread_io_setup_wfd(th, fd, &waiting_fd); EC_PUSH_TAG(ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { @@ -1737,7 +1758,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in * must be deleted before jump * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list */ - rb_thread_io_wake_pending_closer(&waiting_fd); + thread_io_wake_pending_closer(&waiting_fd); if (state) { EC_JUMP_TAG(ec, state); @@ -4265,27 +4286,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set); } -static bool -thread_sched_wait_events_timeval(rb_thread_t *th, int fd, int events, struct timeval *timeout) -{ -#ifdef RUBY_THREAD_PTHREAD_H - if (!th->nt->dedicated) { - rb_hrtime_t rel, *prel; - - if (timeout) { - rel = rb_timeval2hrtime(timeout); - prel = &rel; - } - else { - prel = NULL; - } - - return thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel); - } -#endif // RUBY_THREAD_PTHREAD_H - return 0; -} - #ifdef USE_POLL /* The same with linux kernel. TODO: make platform independent definition. */ @@ -4310,19 +4310,14 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) int state; volatile int lerrno; - rb_thread_t *th = wfd.th = GET_THREAD(); - wfd.fd = fd; - wfd.busy = NULL; + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = rb_ec_thread_ptr(ec); - if (thread_sched_wait_events_timeval(th, fd, events, timeout)) { + if (thread_io_wait_events(th, ec, fd, events, timeout, &wfd)) { return 0; // timeout } - RB_VM_LOCK_ENTER(); - { - ccan_list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node); - } - RB_VM_LOCK_LEAVE(); + thread_io_setup_wfd(th, fd, &wfd); EC_PUSH_TAG(wfd.th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { @@ -4350,7 +4345,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) } EC_POP_TAG(); - rb_thread_io_wake_pending_closer(&wfd); + thread_io_wake_pending_closer(&wfd); if (state) { EC_JUMP_TAG(wfd.th->ec, state); @@ -4424,7 +4419,7 @@ select_single_cleanup(VALUE ptr) { struct select_args *args = (struct select_args *)ptr; - rb_thread_io_wake_pending_closer(&args->wfd); + thread_io_wake_pending_closer(&args->wfd); if (args->read) rb_fd_term(args->read); if (args->write) rb_fd_term(args->write); if (args->except) rb_fd_term(args->except); @@ -4451,9 +4446,10 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) struct select_args args; int r; VALUE ptr = (VALUE)&args; - rb_thread_t *th = GET_THREAD(); + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = rb_ec_thread_ptr(ec); - if (thread_sched_wait_events_timeval(th, fd, events, timeout)) { + if (thread_io_wait_events(th, ec, fd, events, timeout, &args.wfd)) { return 0; // timeout }