diff --git a/common.mk b/common.mk index 1e9349d464..ec13d25ca2 100644 --- a/common.mk +++ b/common.mk @@ -6576,6 +6576,10 @@ explicit_bzero.$(OBJEXT): {$(VPATH)}internal/config.h explicit_bzero.$(OBJEXT): {$(VPATH)}internal/dllexport.h explicit_bzero.$(OBJEXT): {$(VPATH)}internal/has/attribute.h explicit_bzero.$(OBJEXT): {$(VPATH)}missing.h +file.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h +file.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h +file.$(OBJEXT): $(CCAN_DIR)/list/list.h +file.$(OBJEXT): $(CCAN_DIR)/str/str.h file.$(OBJEXT): $(hdrdir)/ruby/ruby.h file.$(OBJEXT): $(top_srcdir)/internal/array.h file.$(OBJEXT): $(top_srcdir)/internal/class.h @@ -6773,6 +6777,7 @@ file.$(OBJEXT): {$(VPATH)}shape.h file.$(OBJEXT): {$(VPATH)}st.h file.$(OBJEXT): {$(VPATH)}subst.h file.$(OBJEXT): {$(VPATH)}thread.h +file.$(OBJEXT): {$(VPATH)}thread_native.h file.$(OBJEXT): {$(VPATH)}util.h gc.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h gc.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h @@ -7810,6 +7815,10 @@ io.$(OBJEXT): {$(VPATH)}thread_native.h io.$(OBJEXT): {$(VPATH)}util.h io.$(OBJEXT): {$(VPATH)}vm_core.h io.$(OBJEXT): {$(VPATH)}vm_opts.h +io_buffer.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h +io_buffer.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h +io_buffer.$(OBJEXT): $(CCAN_DIR)/list/list.h +io_buffer.$(OBJEXT): $(CCAN_DIR)/str/str.h io_buffer.$(OBJEXT): $(hdrdir)/ruby/ruby.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/array.h io_buffer.$(OBJEXT): $(top_srcdir)/internal/bignum.h @@ -7995,6 +8004,7 @@ io_buffer.$(OBJEXT): {$(VPATH)}onigmo.h io_buffer.$(OBJEXT): {$(VPATH)}oniguruma.h io_buffer.$(OBJEXT): {$(VPATH)}st.h io_buffer.$(OBJEXT): {$(VPATH)}subst.h +io_buffer.$(OBJEXT): {$(VPATH)}thread_native.h iseq.$(OBJEXT): $(CCAN_DIR)/check_type/check_type.h iseq.$(OBJEXT): $(CCAN_DIR)/container_of/container_of.h iseq.$(OBJEXT): $(CCAN_DIR)/list/list.h diff --git a/internal/thread.h b/internal/thread.h index ac7e46b9be..7a6a860aeb 100644 --- a/internal/thread.h +++ b/internal/thread.h @@ -10,6 +10,8 @@ */ #include "ruby/ruby.h" /* for VALUE */ #include "ruby/intern.h" /* for rb_blocking_function_t */ +#include "ccan/list/list.h" /* for list in rb_io_close_wait_list */ +#include "ruby/thread_native.h" /* for mutexes in rb_io_close_wait_list */ struct rb_thread_struct; /* in vm_core.h */ @@ -52,6 +54,14 @@ VALUE rb_exec_recursive_outer_mid(VALUE (*f)(VALUE g, VALUE h, int r), VALUE g, int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout); +struct rb_io_close_wait_list { + struct ccan_list_head list; + rb_nativethread_lock_t mu; + rb_nativethread_cond_t cv; +}; +int rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy); +void rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy); + RUBY_SYMBOL_EXPORT_BEGIN /* Temporary. This API will be removed (renamed). */ VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd); diff --git a/io.c b/io.c index 13965c7610..fea1a9cbcd 100644 --- a/io.c +++ b/io.c @@ -5422,9 +5422,17 @@ maygvl_fclose(FILE *file, int keepgvl) static void free_io_buffer(rb_io_buffer_t *buf); static void clear_codeconv(rb_io_t *fptr); +static void* +call_close_wait_nogvl(void *arg) +{ + struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg; + rb_notify_fd_close_wait(busy); + return NULL; +} + static void fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl, - struct ccan_list_head *busy) + struct rb_io_close_wait_list *busy) { VALUE error = Qnil; int fd = fptr->fd; @@ -5467,7 +5475,7 @@ fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl, // Ensure waiting_fd users do not hit EBADF. if (busy) { // Wait for them to exit before we call close(). - do rb_thread_schedule(); while (!ccan_list_empty(busy)); + (void)rb_thread_call_without_gvl(call_close_wait_nogvl, busy, RUBY_UBF_IO, 0); } // Disable for now. @@ -5618,16 +5626,14 @@ rb_io_memsize(const rb_io_t *fptr) # define KEEPGVL FALSE #endif -int rb_notify_fd_close(int fd, struct ccan_list_head *); static rb_io_t * io_close_fptr(VALUE io) { rb_io_t *fptr; VALUE write_io; rb_io_t *write_fptr; - struct ccan_list_head busy; + struct rb_io_close_wait_list busy; - ccan_list_head_init(&busy); write_io = GetWriteIO(io); if (io != write_io) { write_fptr = RFILE(write_io)->fptr; diff --git a/thread.c b/thread.c index a99e2318ab..409ec7435a 100644 --- a/thread.c +++ b/thread.c @@ -155,6 +155,7 @@ struct waiting_fd { struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */ rb_thread_t *th; int fd; + struct rb_io_close_wait_list *busy; }; /********************************************************************************/ @@ -1672,7 +1673,8 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) struct waiting_fd waiting_fd = { .fd = fd, - .th = rb_ec_thread_ptr(ec) + .th = rb_ec_thread_ptr(ec), + .busy = NULL, }; // `errno` is only valid when there is an actual error - but we can't @@ -1702,7 +1704,14 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) */ RB_VM_LOCK_ENTER(); { + if (waiting_fd.busy) { + rb_native_mutex_lock(&waiting_fd.busy->mu); + } ccan_list_del(&waiting_fd.wfd_node); + if (waiting_fd.busy) { + rb_native_cond_broadcast(&waiting_fd.busy->cv); + rb_native_mutex_unlock(&waiting_fd.busy->mu); + } } RB_VM_LOCK_LEAVE(); @@ -2461,10 +2470,12 @@ rb_ec_reset_raised(rb_execution_context_t *ec) } int -rb_notify_fd_close(int fd, struct ccan_list_head *busy) +rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy) { rb_vm_t *vm = GET_THREAD()->vm; struct waiting_fd *wfd = 0, *next; + ccan_list_head_init(&busy->list); + int has_any; RB_VM_LOCK_ENTER(); { @@ -2474,27 +2485,52 @@ rb_notify_fd_close(int fd, struct ccan_list_head *busy) VALUE err; ccan_list_del(&wfd->wfd_node); - ccan_list_add(busy, &wfd->wfd_node); + ccan_list_add(&busy->list, &wfd->wfd_node); + wfd->busy = busy; err = th->vm->special_exceptions[ruby_error_stream_closed]; rb_threadptr_pending_interrupt_enque(th, err); rb_threadptr_interrupt(th); } } } + has_any = !ccan_list_empty(&busy->list); + if (has_any) { + rb_native_mutex_initialize(&busy->mu); + rb_native_cond_initialize(&busy->cv); + } RB_VM_LOCK_LEAVE(); - return !ccan_list_empty(busy); + return has_any; +} + +void +rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy) +{ + rb_native_mutex_lock(&busy->mu); + while (!ccan_list_empty(&busy->list)) { + rb_native_cond_wait(&busy->cv, &busy->mu); + }; + rb_native_mutex_unlock(&busy->mu); + rb_native_mutex_destroy(&busy->mu); + rb_native_cond_destroy(&busy->cv); +} + +static void* +call_notify_fd_close_wait_nogvl(void *arg) +{ + struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg; + rb_notify_fd_close_wait(busy); + return NULL; } void rb_thread_fd_close(int fd) { - struct ccan_list_head busy; + struct rb_io_close_wait_list busy; - ccan_list_head_init(&busy); if (rb_notify_fd_close(fd, &busy)) { - do rb_thread_schedule(); while (!ccan_list_empty(&busy)); + rb_thread_call_without_gvl(call_notify_fd_close_wait_nogvl, &busy, RUBY_UBF_IO, 0); } }