MN: fix "raise on close"

Introduce `thread_io_wait_events()` so that there is a single function
that calls `thread_sched_wait_events()`.

In `thread_io_wait_events()`, manipulate `waiting_fd` so that an
exception is raised correctly when the IO is closed.
Koichi Sasada 2023-12-23 03:57:53 +09:00
parent 19d082dcfa
commit bbfc262c99

thread.c (104 changed lines)
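
The heart of the fix is the `waiting_fd` bookkeeping: a thread registers itself before blocking on an fd, `IO#close` uses that registration to interrupt the blocked thread, and the waiter wakes any pending closer on its way out so that the close can finish. As a rough, self-contained analogy (plain pthreads, every name invented for illustration; this is not CRuby's actual code), the handshake looks like this:

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct wfd_sketch {
    bool io_closed;    /* set by the closing thread */
    bool waiter_gone;  /* set by the waiter once it has unregistered */
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};

/* Analog of a thread blocked in an I/O wait: "register" under the lock,
 * block until woken, then unregister and wake the pending closer. */
static void *
waiting_thread(void *arg)
{
    struct wfd_sketch *w = arg;

    pthread_mutex_lock(&w->lock);
    while (!w->io_closed) {     /* real code also waits for fd readiness/timeout */
        pthread_cond_wait(&w->cond, &w->lock);
    }
    w->waiter_gone = true;      /* unregister; lets the closer finish */
    pthread_cond_broadcast(&w->cond);
    pthread_mutex_unlock(&w->lock);

    fprintf(stderr, "IO closed while waiting: here an IOError would be raised\n");
    return NULL;
}

/* Analog of IO#close: mark the IO closed, wake the waiter, then wait
 * until the waiter has actually left before completing the close. */
static void
closing_thread(struct wfd_sketch *w)
{
    pthread_mutex_lock(&w->lock);
    w->io_closed = true;
    pthread_cond_broadcast(&w->cond);
    while (!w->waiter_gone) {
        pthread_cond_wait(&w->cond, &w->lock);
    }
    pthread_mutex_unlock(&w->lock);
}

int
main(void)
{
    struct wfd_sketch w = { false, false, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
    pthread_t t;

    pthread_create(&t, NULL, waiting_thread, &w);
    closing_thread(&w);
    pthread_join(t, NULL);
    return 0;
}
```

In the diff below, the registration half is `thread_io_setup_wfd()` and the unregister/wake half is `thread_io_wake_pending_closer()`; the point of the new `thread_io_wait_events()` is that they now bracket every M:N scheduler wait instead of only some call sites.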

@@ -1646,8 +1646,14 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
     return rb_nogvl(func, data1, ubf, data2, 0);
 }
 
+static int
+waitfd_to_waiting_flag(int wfd_event)
+{
+    return wfd_event << 1;
+}
+
 static void
-rb_thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
+thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
 {
     wfd->fd = fd;
     wfd->th = th;
@@ -1661,7 +1667,7 @@ rb_thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
 }
 
 static void
-rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
+thread_io_wake_pending_closer(struct waiting_fd *wfd)
 {
     bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
     if (has_waiter) {
@@ -1682,9 +1688,37 @@ rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
 }
 
 static int
-waitfd_to_waiting_flag(int wfd_event)
+thread_io_wait_events(rb_thread_t *th, rb_execution_context_t *ec, int fd, int events, struct timeval *timeout, struct waiting_fd *wfd)
 {
-    return wfd_event << 1;
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+    if (!th_has_dedicated_nt(th) && (events || timeout)) {
+        int r;
+        rb_hrtime_t rel, *prel;
+
+        if (timeout) {
+            rel = rb_timeval2hrtime(timeout);
+            prel = &rel;
+        }
+        else {
+            prel = NULL;
+        }
+
+        VM_ASSERT(prel || events == RB_WAITFD_IN || events == RB_WAITFD_OUT);
+
+        thread_io_setup_wfd(th, fd, wfd);
+        {
+            // wait readable/writable
+            r = thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
+        }
+        thread_io_wake_pending_closer(wfd);
+
+        RUBY_VM_CHECK_INTS_BLOCKING(ec);
+
+        return r;
+    }
+#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
+
+    return 0;
 }
 
 VALUE
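
The timeout path above leans on `rb_timeval2hrtime()` to turn the caller's `struct timeval` into a relative high-resolution time for the scheduler. Assuming `rb_hrtime_t` is a nanosecond count, as in CRuby's internal hrtime.h, the conversion amounts to this sketch (the `my_`-prefixed names are hypothetical stand-ins):

```c
#include <stdint.h>
#include <sys/time.h>

typedef uint64_t my_hrtime_t;   /* stand-in for rb_hrtime_t (nanoseconds) */

/* Sketch of what rb_timeval2hrtime(timeout) is assumed to compute:
 * the relative timeout collapsed into a single nanosecond count. */
static my_hrtime_t
my_timeval2hrtime(const struct timeval *tv)
{
    return (my_hrtime_t)tv->tv_sec * 1000000000ULL
         + (my_hrtime_t)tv->tv_usec * 1000ULL;
}
```

This also explains the `VM_ASSERT`: when no timeout is given (`prel == NULL`), the caller must at least be waiting for readability or writability, otherwise `thread_sched_wait_events()` would have nothing to wake up on.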
@@ -1697,20 +1731,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
     struct waiting_fd waiting_fd;
 
-#ifdef RUBY_THREAD_PTHREAD_H
-    if (events && !th_has_dedicated_nt(th)) {
-        VM_ASSERT(events == RB_WAITFD_IN || events == RB_WAITFD_OUT);
-
-        rb_thread_io_setup_wfd(th, fd, &waiting_fd);
-        {
-            // wait readable/writable
-            thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), NULL);
-        }
-        rb_thread_io_wake_pending_closer(&waiting_fd);
-
-        RUBY_VM_CHECK_INTS_BLOCKING(ec);
-    }
-#endif
+    thread_io_wait_events(th, ec, fd, events, NULL, &waiting_fd);
 
     volatile VALUE val = Qundef; /* shouldn't be used */
     volatile int saved_errno = 0;
@@ -1722,7 +1743,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
     // `func` or not (as opposed to some previously set value).
     errno = 0;
 
-    rb_thread_io_setup_wfd(th, fd, &waiting_fd);
+    thread_io_setup_wfd(th, fd, &waiting_fd);
 
     EC_PUSH_TAG(ec);
     if ((state = EC_EXEC_TAG()) == TAG_NONE) {
@@ -1737,7 +1758,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
      * must be deleted before jump
      * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
      */
-    rb_thread_io_wake_pending_closer(&waiting_fd);
+    thread_io_wake_pending_closer(&waiting_fd);
 
     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -4265,27 +4286,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
     return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
 }
 
-static bool
-thread_sched_wait_events_timeval(rb_thread_t *th, int fd, int events, struct timeval *timeout)
-{
-#ifdef RUBY_THREAD_PTHREAD_H
-    if (!th->nt->dedicated) {
-        rb_hrtime_t rel, *prel;
-
-        if (timeout) {
-            rel = rb_timeval2hrtime(timeout);
-            prel = &rel;
-        }
-        else {
-            prel = NULL;
-        }
-
-        return thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
-    }
-#endif // RUBY_THREAD_PTHREAD_H
-
-    return 0;
-}
-
 #ifdef USE_POLL
 
 /* The same with linux kernel. TODO: make platform independent definition. */
@@ -4310,19 +4310,14 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     int state;
     volatile int lerrno;
 
-    rb_thread_t *th = wfd.th = GET_THREAD();
-    wfd.fd = fd;
-    wfd.busy = NULL;
+    rb_execution_context_t *ec = GET_EC();
+    rb_thread_t *th = rb_ec_thread_ptr(ec);
 
-    if (thread_sched_wait_events_timeval(th, fd, events, timeout)) {
+    if (thread_io_wait_events(th, ec, fd, events, timeout, &wfd)) {
         return 0; // timeout
     }
 
-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    thread_io_setup_wfd(th, fd, &wfd);
 
     EC_PUSH_TAG(wfd.th->ec);
     if ((state = EC_EXEC_TAG()) == TAG_NONE) {
@@ -4350,7 +4345,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     }
     EC_POP_TAG();
 
-    rb_thread_io_wake_pending_closer(&wfd);
+    thread_io_wake_pending_closer(&wfd);
 
     if (state) {
         EC_JUMP_TAG(wfd.th->ec, state);
@@ -4424,7 +4419,7 @@ select_single_cleanup(VALUE ptr)
 {
     struct select_args *args = (struct select_args *)ptr;
 
-    rb_thread_io_wake_pending_closer(&args->wfd);
+    thread_io_wake_pending_closer(&args->wfd);
     if (args->read) rb_fd_term(args->read);
     if (args->write) rb_fd_term(args->write);
     if (args->except) rb_fd_term(args->except);
@@ -4451,9 +4446,10 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     struct select_args args;
    int r;
     VALUE ptr = (VALUE)&args;
-    rb_thread_t *th = GET_THREAD();
+    rb_execution_context_t *ec = GET_EC();
+    rb_thread_t *th = rb_ec_thread_ptr(ec);
 
-    if (thread_sched_wait_events_timeval(th, fd, events, timeout)) {
+    if (thread_io_wait_events(th, ec, fd, events, timeout, &args.wfd)) {
         return 0; // timeout
     }
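
With both `rb_thread_wait_for_single_fd()` implementations routed through `thread_io_wait_events()`, callers see a single contract: a non-zero result means the M:N scheduler already performed the wait and timed out, hence the `return 0; // timeout` in each hunk. An illustrative caller, assuming the usual fd-wait convention that a negative return signals an error with `errno` set:

```c
#include <sys/time.h>

/* Prototype of the internal CRuby function patched above, reproduced here
 * for the sketch; it is not public API. RB_WAITFD_IN mirrors ruby/io.h. */
int rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout);
#define RB_WAITFD_IN 0x001

/* Wait up to one second for fd to become readable. */
static int
wait_readable_1s(int fd)
{
    struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
    int revents = rb_thread_wait_for_single_fd(fd, RB_WAITFD_IN, &tv);

    if (revents == 0) return 0;    /* timed out, as in the hunks above */
    if (revents < 0)  return -1;   /* error; errno assumed set */
    return 1;                      /* fd became readable */
}
```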