Use a real Ruby mutex in rb_io_close_wait_list (#7884)
Because a thread calling IO#close now blocks in a native condvar wait, it's possible for there to be _no_ threads left to actually handle incoming signals/ubf calls/etc.

This manifested as failing tests on Solaris 10 (SPARC), because:

* One thread called IO#close, which sent a SIGVTALRM to the other thread to interrupt it, and then waited on the condvar to be notified that the reading thread was done.
* The other thread was calling IO#read, but it hadn't yet reached the actual call to select(2) when the SIGVTALRM arrived, so it never unblocked itself.

This results in a deadlock. The fix is to use a real Ruby mutex for the close lock; that way, the closing thread goes into sigwait-sleep and can keep trying to interrupt the select(2) thread. See the discussion in: https://github.com/ruby/ruby/pull/7865/
commit edee9b6a12 (parent d8f333491e)
2023-06-01 08:37:41 +00:00
Notes (git): Merged-By: ioquatix <samuel@codeotaku.com>
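In outline, the fixed protocol pairs an interruptible closer-side sleep with a Ruby-mutex-protected wakeup on the reader side. The sketch below is condensed from the thread.c hunks that follow; `close_wait` and `fd_user_done` are illustrative names (the real functions are rb_notify_fd_close_wait and rb_thread_io_wake_pending_closer), and it omits the RB_VM_LOCK protection and the no-waiter fast path that the real code needs.

/* Closer side: rb_mutex_sleep() puts the thread into sigwait-sleep, so
 * unlike rb_native_cond_wait() it keeps servicing interrupts and can
 * keep re-signalling a reader that hasn't reached select(2) yet. */
static void
close_wait(struct rb_io_close_wait_list *busy)
{
    rb_mutex_lock(busy->wakeup_mutex);
    while (!ccan_list_empty(&busy->pending_fd_users)) {
        rb_mutex_sleep(busy->wakeup_mutex, Qnil);   /* Qnil: no timeout */
    }
    rb_mutex_unlock(busy->wakeup_mutex);
}

/* Reader side: on leaving the blocking region, unlink this fd user and
 * wake the closing thread under the same Ruby mutex. */
static void
fd_user_done(struct waiting_fd *wfd)
{
    rb_mutex_lock(wfd->busy->wakeup_mutex);
    ccan_list_del(&wfd->wfd_node);
    rb_thread_wakeup(wfd->busy->closing_thread);
    rb_mutex_unlock(wfd->busy->wakeup_mutex);
}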
7 internal/thread.h

@@ -11,7 +11,6 @@
 #include "ruby/ruby.h"          /* for VALUE */
 #include "ruby/intern.h"        /* for rb_blocking_function_t */
 #include "ccan/list/list.h"     /* for list in rb_io_close_wait_list */
-#include "ruby/thread_native.h" /* for mutexes in rb_io_close_wait_list */
 
 struct rb_thread_struct;        /* in vm_core.h */
 
@@ -55,9 +54,9 @@ VALUE rb_exec_recursive_outer_mid(VALUE (*f)(VALUE g, VALUE h, int r), VALUE g,
 int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);
 
 struct rb_io_close_wait_list {
-    struct ccan_list_head list;
-    rb_nativethread_lock_t mu;
-    rb_nativethread_cond_t cv;
+    struct ccan_list_head pending_fd_users;
+    VALUE closing_thread;
+    VALUE wakeup_mutex;
 };
 int rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy);
 void rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy);
10 io.c

@@ -5423,14 +5423,6 @@ maygvl_fclose(FILE *file, int keepgvl)
 static void free_io_buffer(rb_io_buffer_t *buf);
 static void clear_codeconv(rb_io_t *fptr);
 
-static void*
-call_close_wait_nogvl(void *arg)
-{
-    struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
-    rb_notify_fd_close_wait(busy);
-    return NULL;
-}
-
 static void
 fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
                     struct rb_io_close_wait_list *busy)
@@ -5476,7 +5468,7 @@ fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
     // Ensure waiting_fd users do not hit EBADF.
     if (busy) {
         // Wait for them to exit before we call close().
-        (void)rb_thread_call_without_gvl(call_close_wait_nogvl, busy, RUBY_UBF_IO, 0);
+        rb_notify_fd_close_wait(busy);
     }
 
     // Disable for now.
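The deleted call_close_wait_nogvl trampoline existed only so the old native-condvar wait could run with the GVL released, via rb_thread_call_without_gvl. Since rb_notify_fd_close_wait now sleeps through rb_mutex_sleep, which releases the GVL itself, callers invoke it directly while holding the GVL. A minimal sketch of the resulting calling convention (`close_fd_waiting_for_users` is an illustrative name; the real caller is rb_thread_fd_close in the thread.c diff below):

/* Close an fd, waiting for any threads blocked on it to leave first.
 * GVL held throughout; no nogvl trampoline needed anymore. */
static void
close_fd_waiting_for_users(int fd)
{
    struct rb_io_close_wait_list busy;

    if (rb_notify_fd_close(fd, &busy)) {
        /* sleeps via rb_mutex_sleep, which drops the GVL itself */
        rb_notify_fd_close_wait(&busy);
    }
}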
96 thread.c

@@ -1663,6 +1663,27 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
     return rb_nogvl(func, data1, ubf, data2, 0);
 }
 
+static void
+rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
+{
+    bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
+    if (has_waiter) {
+        rb_mutex_lock(wfd->busy->wakeup_mutex);
+    }
+
+    /* Needs to be protected with RB_VM_LOCK because we don't know if
+       wfd is on the global list of pending FD ops or if it's on a
+       struct rb_io_close_wait_list close-waiter. */
+    RB_VM_LOCK_ENTER();
+    ccan_list_del(&wfd->wfd_node);
+    RB_VM_LOCK_LEAVE();
+
+    if (has_waiter) {
+        rb_thread_wakeup(wfd->busy->closing_thread);
+        rb_mutex_unlock(wfd->busy->wakeup_mutex);
+    }
+}
+
 VALUE
 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 {
@@ -1700,20 +1721,9 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 
     /*
      * must be deleted before jump
-     * this will delete either from waiting_fds or on-stack CCAN_LIST_HEAD(busy)
+     * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
      */
-    RB_VM_LOCK_ENTER();
-    {
-        if (waiting_fd.busy) {
-            rb_native_mutex_lock(&waiting_fd.busy->mu);
-        }
-        ccan_list_del(&waiting_fd.wfd_node);
-        if (waiting_fd.busy) {
-            rb_native_cond_broadcast(&waiting_fd.busy->cv);
-            rb_native_mutex_unlock(&waiting_fd.busy->mu);
-        }
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&waiting_fd);
 
     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -2474,8 +2484,9 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
 {
     rb_vm_t *vm = GET_THREAD()->vm;
     struct waiting_fd *wfd = 0, *next;
-    ccan_list_head_init(&busy->list);
+    ccan_list_head_init(&busy->pending_fd_users);
     int has_any;
+    VALUE wakeup_mutex;
 
     RB_VM_LOCK_ENTER();
     {
@@ -2485,7 +2496,7 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
             VALUE err;
 
             ccan_list_del(&wfd->wfd_node);
-            ccan_list_add(&busy->list, &wfd->wfd_node);
+            ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
 
             wfd->busy = busy;
             err = th->vm->special_exceptions[ruby_error_stream_closed];
@@ -2494,34 +2505,39 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
             }
         }
     }
-    has_any = !ccan_list_empty(&busy->list);
+    has_any = !ccan_list_empty(&busy->pending_fd_users);
+    busy->closing_thread = rb_thread_current();
+    wakeup_mutex = Qnil;
     if (has_any) {
-        rb_native_mutex_initialize(&busy->mu);
-        rb_native_cond_initialize(&busy->cv);
+        wakeup_mutex = rb_mutex_new();
+        RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
     }
+    busy->wakeup_mutex = wakeup_mutex;
+
     RB_VM_LOCK_LEAVE();
+
+    /* If the caller didn't pass *busy as a pointer to something on the stack,
+       we need to guard this mutex object on _our_ C stack for the duration
+       of this function. */
+    RB_GC_GUARD(wakeup_mutex);
     return has_any;
 }
 
 void
 rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
 {
-    rb_native_mutex_lock(&busy->mu);
-    while (!ccan_list_empty(&busy->list)) {
-        rb_native_cond_wait(&busy->cv, &busy->mu);
-    };
-    rb_native_mutex_unlock(&busy->mu);
-    rb_native_mutex_destroy(&busy->mu);
-    rb_native_cond_destroy(&busy->cv);
-}
-
-static void*
-call_notify_fd_close_wait_nogvl(void *arg)
-{
-    struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
-    rb_notify_fd_close_wait(busy);
-    return NULL;
+    if (!RB_TEST(busy->wakeup_mutex)) {
+        /* There was nobody else using this file when we closed it, so we
+           never bothered to allocate a mutex*/
+        return;
+    }
+
+    rb_mutex_lock(busy->wakeup_mutex);
+    while (!ccan_list_empty(&busy->pending_fd_users)) {
+        rb_mutex_sleep(busy->wakeup_mutex, Qnil);
+    }
+    rb_mutex_unlock(busy->wakeup_mutex);
 }
 
 void
@@ -2530,7 +2546,7 @@ rb_thread_fd_close(int fd)
     struct rb_io_close_wait_list busy;
 
     if (rb_notify_fd_close(fd, &busy)) {
-        rb_thread_call_without_gvl(call_notify_fd_close_wait_nogvl, &busy, RUBY_UBF_IO, 0);
+        rb_notify_fd_close_wait(&busy);
     }
 }
 
@@ -4273,6 +4289,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
 
     wfd.th = GET_THREAD();
    wfd.fd = fd;
+    wfd.busy = NULL;
 
     RB_VM_LOCK_ENTER();
     {
@@ -4324,11 +4341,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     }
     EC_POP_TAG();
 
-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&wfd);
 
     if (state) {
         EC_JUMP_TAG(wfd.th->ec, state);
@@ -4402,11 +4415,7 @@ select_single_cleanup(VALUE ptr)
 {
     struct select_args *args = (struct select_args *)ptr;
 
-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&args->wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&args->wfd);
     if (args->read) rb_fd_term(args->read);
     if (args->write) rb_fd_term(args->write);
     if (args->except) rb_fd_term(args->except);
@@ -4429,6 +4438,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     args.tv = timeout;
     args.wfd.fd = fd;
     args.wfd.th = GET_THREAD();
+    args.wfd.busy = NULL;
 
     RB_VM_LOCK_ENTER();
     {
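One subtlety in rb_notify_fd_close above deserves a gloss: the wakeup mutex is a heap-allocated Ruby object stored into a struct that may live on another thread's C stack, where this thread's conservative GC scan cannot see it. Hence both RBASIC_CLEAR_CLASS (the anonymous mutex never shows up in ObjectSpace) and RB_GC_GUARD (pinning it to the closer's own frame). A condensed view of those lines, with `publish_wakeup_mutex` as an illustrative name; in the commit they sit inline at the end of rb_notify_fd_close, and this compiles only inside the ruby tree:

static void
publish_wakeup_mutex(struct rb_io_close_wait_list *busy, int has_any)
{
    VALUE wakeup_mutex = Qnil;
    if (has_any) {
        wakeup_mutex = rb_mutex_new();
        RBASIC_CLEAR_CLASS(wakeup_mutex);  /* hide from ObjectSpace */
    }
    busy->wakeup_mutex = wakeup_mutex;

    /* *busy may sit on a different thread's stack; keep the mutex
     * reachable from _this_ C stack so GC cannot collect it early. */
    RB_GC_GUARD(wakeup_mutex);
}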