From 8782e02138e6fe18b6c0dcc29bb877d6cdae57e5 Mon Sep 17 00:00:00 2001
From: JP Camara
Date: Wed, 6 Dec 2023 20:01:14 -0500
Subject: [PATCH] KQueue support for M:N threads

* Allows macOS users to use M:N threads (and technically FreeBSD, though it has not been verified on FreeBSD)
* Include a sys/event.h header check for the macros, and include sys/event.h when present
* Rename epoll_fd to the more generic kq_fd (Kernel event Queue) for use by both epoll and kqueue
* MAP_STACK is not available on macOS, so conditionally apply it to the mmap flags
* Set the fd to close on exec
* Log debug messages specific to kqueue and epoll on creation
* close_invalidate raises an error for the kqueue fd on child process fork. It's unclear right now whether that's a bug or kqueue-specific behavior

Use kq with rb_thread_wait_for_single_fd

* Only platforms with `USE_POLL` (Linux) had changes applied to take advantage of kernel event queues. The changes also needed to be applied to the `select` path so that kqueue could be used properly
* Clean up kqueue-specific code and make sure only flags that were actually set are removed (or an error is raised)
* Also handle kevent-specific errnos, since most don't carry over from epoll to kqueue
* Use the more platform-standard close-on-exec approach of `fcntl` and `FD_CLOEXEC`. The io-event gem uses `ioctl`, but fcntl seems to be the recommended choice. It is also what Go, Bun, and libuv use
* We're making changes in this file anyway - may as well fix a couple of spelling mistakes while here

Make sure FD_CLOEXEC carries over in dup

* Otherwise the kqueue descriptor should have FD_CLOEXEC, but doesn't, and fails in assert_close_on_exec
---
 configure.ac        |   1 +
 process.c           |   7 ++
 thread.c            |  44 ++++++---
 thread_pthread.c    |  24 +++--
 thread_pthread_mn.c | 216 +++++++++++++++++++++++++++++++++++++++-----
 5 files changed, 250 insertions(+), 42 deletions(-)

diff --git a/configure.ac b/configure.ac
index a57e506d55..9286946fc1 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1352,6 +1352,7 @@ AC_CHECK_HEADERS(time.h)
 AC_CHECK_HEADERS(ucontext.h)
 AC_CHECK_HEADERS(utime.h)
 AC_CHECK_HEADERS(sys/epoll.h)
+AC_CHECK_HEADERS(sys/event.h)
 
 AS_CASE("$target_cpu", [x64|x86_64|i[3-6]86*], [
   AC_CHECK_HEADERS(x86intrin.h)
diff --git a/process.c b/process.c
index 395c69e08f..3b254ddecb 100644
--- a/process.c
+++ b/process.c
@@ -3354,6 +3354,13 @@ run_exec_dup2(VALUE ary, VALUE tmpbuf, struct rb_execarg *sargp, char *errmsg, s
                 ERRMSG("dup");
                 goto fail;
             }
+            // without this, kqueue timer_th.event_fd fails with a reserved FD did not have close-on-exec
+            // in #assert_close_on_exec because the FD_CLOEXEC is not dup'd by default
+            if (fd_get_cloexec(pairs[i].oldfd, errmsg, errmsg_buflen)) {
+                if (fd_set_cloexec(extra_fd, errmsg, errmsg_buflen)) {
+                    goto fail;
+                }
+            }
             rb_update_max_fd(extra_fd);
         }
         else {
diff --git a/thread.c b/thread.c
index 30555c6f7f..7c53f958fe 100644
--- a/thread.c
+++ b/thread.c
@@ -4265,6 +4265,27 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
     return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
 }
 
+#ifdef RUBY_THREAD_PTHREAD_H
+
+static bool
+thread_sched_wait_events_timeval(int fd, int events, struct timeval *timeout)
+{
+    rb_thread_t *th = GET_THREAD();
+    rb_hrtime_t rel, *prel;
+
+    if (timeout) {
+        rel = rb_timeval2hrtime(timeout);
+        prel = &rel;
+    }
+    else {
+        prel = NULL;
+    }
+
+    return thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
+}
+
+#endif
+
 #ifdef USE_POLL
 
 /* The same with linux kernel. TODO: make platform independent definition. */
@@ -4294,18 +4315,8 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     wfd.busy = NULL;
 
 #ifdef RUBY_THREAD_PTHREAD_H
-    if (!th->nt->dedicated) {
-        rb_hrtime_t rel, *prel;
-
-        if (timeout) {
-            rel = rb_timeval2hrtime(timeout);
-            prel = &rel;
-        }
-        else {
-            prel = NULL;
-        }
-
-        if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
+    if (!th_has_dedicated_nt(th)) {
+        if (thread_sched_wait_events_timeval(fd, events, timeout)) {
             return 0; // timeout
         }
     }
@@ -4445,6 +4456,15 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     int r;
     VALUE ptr = (VALUE)&args;
 
+#ifdef RUBY_THREAD_PTHREAD_H
+    rb_thread_t *th = GET_THREAD();
+    if (!th_has_dedicated_nt(th)) {
+        if (thread_sched_wait_events_timeval(fd, events, timeout)) {
+            return 0; // timeout
+        }
+    }
+#endif
+
     args.as.fd = fd;
     args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
     args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
diff --git a/thread_pthread.c b/thread_pthread.c
index e7e827793b..d8ca403be6 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -62,6 +62,10 @@ static const void *const condattr_monotonic = NULL;
 
 #include COROUTINE_H
 
+#ifndef HAVE_SYS_EVENT_H
+#define HAVE_SYS_EVENT_H 0
+#endif
+
 #ifndef HAVE_SYS_EPOLL_H
 #define HAVE_SYS_EPOLL_H 0
 #else
@@ -78,6 +82,9 @@ static const void *const condattr_monotonic = NULL;
   #elif HAVE_SYS_EPOLL_H
     #include <sys/epoll.h>
     #define USE_MN_THREADS 1
+  #elif HAVE_SYS_EVENT_H
+    #include <sys/event.h>
+    #define USE_MN_THREADS 1
   #else
     #define USE_MN_THREADS 0
   #endif
@@ -2799,10 +2806,15 @@ static struct {
 
     int comm_fds[2]; // r, w
 
+#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
+    int event_fd; // kernel event queue fd (epoll/kqueue)
+#endif
 #if HAVE_SYS_EPOLL_H && USE_MN_THREADS
 #define EPOLL_EVENTS_MAX 0x10
-    int epoll_fd;
     struct epoll_event finished_events[EPOLL_EVENTS_MAX];
+#elif HAVE_SYS_EVENT_H && USE_MN_THREADS
+#define KQUEUE_EVENTS_MAX 0x10
+    struct kevent finished_events[KQUEUE_EVENTS_MAX];
 #endif
 
     // waiting threads list
@@ -3088,7 +3100,7 @@ rb_thread_create_timer_thread(void)
             CLOSE_INVALIDATE_PAIR(timer_th.comm_fds);
 #if HAVE_SYS_EPOLL_H && USE_MN_THREADS
-            close_invalidate(&timer_th.epoll_fd, "close epoll_fd");
+            close_invalidate(&timer_th.event_fd, "close event_fd");
 #endif
             rb_native_mutex_destroy(&timer_th.waiting_lock);
         }
@@ -3099,8 +3111,8 @@
         // open communication channel
         setup_communication_pipe_internal(timer_th.comm_fds);
 
-        // open epoll fd
-        timer_thread_setup_nm();
+        // open event fd
+        timer_thread_setup_mn();
     }
 
     pthread_create(&timer_th.pthread_id, NULL, timer_thread_func, GET_VM());
@@ -3181,8 +3193,8 @@ rb_reserved_fd_p(int fd)
 
     if (fd == timer_th.comm_fds[0] ||
         fd == timer_th.comm_fds[1]
-#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
-        || fd == timer_th.epoll_fd
+#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
+        || fd == timer_th.event_fd
 #endif
         ) {
         goto check_fork_gen;
diff --git a/thread_pthread_mn.c b/thread_pthread_mn.c
index e516d787b3..93bd189aae 100644
--- a/thread_pthread_mn.c
+++ b/thread_pthread_mn.c
@@ -2,7 +2,7 @@
 
 #if USE_MN_THREADS
 
-static void timer_thread_unregister_waiting(rb_thread_t *th, int fd);
+static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);
 
 static bool
 timer_thread_cancel_waiting(rb_thread_t *th)
@@ -16,7 +16,7 @@ timer_thread_cancel_waiting(rb_thread_t *th)
         canceled = true;
         ccan_list_del_init(&th->sched.waiting_reason.node);
         if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
-            timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd);
+            timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
         }
         th->sched.waiting_reason.flags = thread_sched_waiting_none;
     }
@@ -169,7 +169,7 @@ static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT;
 
 // vm_stack_size + machine_stack_size + 1 * (guard page size)
 static inline size_t
-nt_therad_stack_size(void)
+nt_thread_stack_size(void)
 {
     static size_t msz;
     if (LIKELY(msz > 0)) return msz;
@@ -184,12 +184,17 @@ nt_therad_stack_size(void)
 static struct nt_stack_chunk_header *
 nt_alloc_thread_stack_chunk(void)
 {
-    const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_STACK, -1, 0);
+    int mmap_flags = MAP_ANONYMOUS | MAP_PRIVATE;
+#ifdef MAP_STACK // not available on macOS
+    mmap_flags |= MAP_STACK;
+#endif
+
+    const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
     if (m == MAP_FAILED) {
         return NULL;
     }
 
-    size_t msz = nt_therad_stack_size();
+    size_t msz = nt_thread_stack_size();
     int header_page_cnt = 1;
     int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
     int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count;
@@ -218,7 +223,7 @@ static void *
 nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx)
 {
     const char *m = (char *)ch;
-    return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_therad_stack_size());
+    return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_thread_stack_size());
 }
 
 static struct nt_machine_stack_footer *
@@ -354,9 +359,9 @@ nt_free_stack(void *mstack)
 
         // clear the stack pages
 #if defined(MADV_FREE)
-        int r = madvise(stack, nt_therad_stack_size(), MADV_FREE);
+        int r = madvise(stack, nt_thread_stack_size(), MADV_FREE);
 #elif defined(MADV_DONTNEED)
-        int r = madvise(stack, nt_therad_stack_size(), MADV_DONTNEED);
+        int r = madvise(stack, nt_thread_stack_size(), MADV_DONTNEED);
 #else
         int r = 0;
 #endif
@@ -501,8 +506,8 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd,
 
 #endif // USE_MN_THREADS
 
-/// EPOLL specific code
-#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
+/// EPOLL/KQUEUE specific code
+#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
 
 static bool
 fd_readable_nonblock(int fd)
@@ -541,6 +546,78 @@ verify_waiting_list(void)
 #endif
 }
 
+#if HAVE_SYS_EVENT_H // kqueue helpers
+
+static enum thread_sched_waiting_flag
+kqueue_translate_filter_to_flags(int16_t filter)
+{
+    switch (filter) {
+      case EVFILT_READ:
+        return thread_sched_waiting_io_read;
+      case EVFILT_WRITE:
+        return thread_sched_waiting_io_write;
+      case EVFILT_TIMER:
+        return thread_sched_waiting_timeout;
+      default:
+        rb_bug("kevent filter:%d not supported", filter);
+    }
+}
+
+static int
+kqueue_wait(rb_vm_t *vm)
+{
+    struct timespec calculated_timeout;
+    struct timespec *timeout = NULL;
+    int timeout_ms = timer_thread_set_timeout(vm);
+
+    if (timeout_ms >= 0) {
+        calculated_timeout.tv_sec = timeout_ms / 1000;
+        calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
+        timeout = &calculated_timeout;
+    }
+
+    return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
+}
+
+static void
+kqueue_create(void)
+{
+    if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno);
+    int flags = fcntl(timer_th.event_fd, F_GETFD);
+    if (flags == -1) {
+        rb_bug("kqueue GETFD failed (errno:%d)", errno);
+    }
+
+    flags |= FD_CLOEXEC;
+    if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
+        rb_bug("kqueue SETFD failed (errno:%d)", errno);
+    }
+}
+
+static void
+kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
+{
+    if (flags) {
+        struct kevent ke[2];
+        int num_events = 0;
+
+        if (flags & thread_sched_waiting_io_read) {
+            EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+            num_events++;
+        }
+        if (flags & thread_sched_waiting_io_write) {
+            EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+            num_events++;
+        }
+        if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
+            perror("kevent");
+            rb_bug("unregister/kevent fails. errno:%d", errno);
+        }
+    }
+}
+
+#endif // HAVE_SYS_EVENT_H
+
 // return false if the fd is not waitable or not need to wait.
 static bool
 timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
@@ -565,7 +642,12 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
         flags |= thread_sched_waiting_timeout;
     }
 
+#if HAVE_SYS_EVENT_H
+    struct kevent ke[2];
+    int num_events = 0;
+#else
     uint32_t epoll_events = 0;
+#endif
     if (flags & thread_sched_waiting_timeout) {
         VM_ASSERT(rel != NULL);
         abs = rb_hrtime_add(rb_hrtime_now(), *rel);
@@ -578,7 +660,12 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
         }
         else {
             VM_ASSERT(fd >= 0);
+#if HAVE_SYS_EVENT_H
+            EV_SET(&ke[num_events], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)th);
+            num_events++;
+#else
             epoll_events |= EPOLLIN;
+#endif
         }
     }
 
@@ -589,12 +676,35 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
         }
         else {
             VM_ASSERT(fd >= 0);
+#if HAVE_SYS_EVENT_H
+            EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)th);
+            num_events++;
+#else
             epoll_events |= EPOLLOUT;
+#endif
         }
     }
 
     rb_native_mutex_lock(&timer_th.waiting_lock);
     {
+#if HAVE_SYS_EVENT_H
+        if (num_events > 0) {
+            if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
+                RUBY_DEBUG_LOG("failed (%d)", errno);
+
+                switch (errno) {
+                  case EBADF:
+                    // the fd is closed?
+                  case EINTR:
+                    // signal received? is there a sensible way to handle this?
+                  default:
+                    perror("kevent");
+                    rb_bug("register/kevent failed(fd:%d, errno:%d)", fd, errno);
+                }
+            }
+            RUBY_DEBUG_LOG("kevent(add, fd:%d) success", fd);
+        }
+#else
         if (epoll_events) {
             struct epoll_event event = {
                 .events = epoll_events,
@@ -602,7 +712,7 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
                     .ptr = (void *)th,
                 },
             };
-            if (epoll_ctl(timer_th.epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
+            if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
                 RUBY_DEBUG_LOG("failed (%d)", errno);
 
                 switch (errno) {
@@ -621,6 +731,7 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
             }
             RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
         }
+#endif
 
         if (th) {
             VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);
@@ -677,12 +788,14 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
 }
 
 static void
-timer_thread_unregister_waiting(rb_thread_t *th, int fd)
+timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
 {
     RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
-
+#if HAVE_SYS_EVENT_H
+    kqueue_unregister_waiting(fd, flags);
+#else
     // Linux 2.6.9 or later is needed to pass NULL as data.
-    if (epoll_ctl(timer_th.epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
+    if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
         switch (errno) {
           case EBADF:
             // just ignore. maybe fd is closed.
@@ -692,18 +805,35 @@ timer_thread_unregister_waiting(rb_thread_t *th, int fd)
             rb_bug("unregister/epoll_ctl fails. errno:%d", errno);
         }
     }
+#endif
 }
 
 static void
-timer_thread_setup_nm(void)
+timer_thread_setup_mn(void)
 {
-    if ((timer_th.epoll_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
+#if HAVE_SYS_EVENT_H
+    kqueue_create();
+    RUBY_DEBUG_LOG("kqueue_fd:%d", timer_th.event_fd);
+#else
+    if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
+    RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.event_fd);
+#endif
     RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);
-    RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.epoll_fd);
 
     timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
 }
 
+static int
+event_wait(rb_vm_t *vm)
+{
+#if HAVE_SYS_EVENT_H
+    int r = kqueue_wait(vm);
+#else
+    int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
+#endif
+    return r;
+}
+
 /*
  * The purpose of the timer thread:
 *
@@ -721,7 +851,7 @@ timer_thread_setup_nm(void)
 static void
 timer_thread_polling(rb_vm_t *vm)
 {
-    int r = epoll_wait(timer_th.epoll_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
+    int r = event_wait(vm);
 
     RUBY_DEBUG_LOG("r:%d errno:%d", r, errno);
 
@@ -753,14 +883,51 @@ timer_thread_polling(rb_vm_t *vm)
            // simply retry
            break;
          default:
-            perror("epoll_wait");
-            rb_bug("epoll_wait errno:%d", errno);
+            perror("kq");
+            rb_bug("kq errno:%d", errno);
        }
        break;
 
      default:
        RUBY_DEBUG_LOG("%d event(s)", r);
 
+#if HAVE_SYS_EVENT_H
+        for (int i=0; i<r; i++) {
+            rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].udata;
+            int fd = (int)timer_th.finished_events[i].ident;
+            int16_t filter = timer_th.finished_events[i].filter;
+
+            if (th == NULL) {
+                // wakeup timerthread
+                consume_communication_pipe(timer_th.comm_fds[0]);
+            }
+            else {
+                // wakeup specific thread by IO (pre-registered)
+                rb_native_mutex_lock(&timer_th.waiting_lock);
+                {
+                    if (th->sched.waiting_reason.flags) {
+                        // delete from chain
+                        ccan_list_del_init(&th->sched.waiting_reason.node);
+                        timer_thread_unregister_waiting(th, fd, kqueue_translate_filter_to_flags(filter));
+
+                        th->sched.waiting_reason.flags = thread_sched_waiting_none;
+                        th->sched.waiting_reason.data.fd = -1;
+                        th->sched.waiting_reason.data.result = filter;
+
+                        timer_thread_wakeup_thread(th);
+                    } else {
+                        // already released
+                    }
+                }
+                rb_native_mutex_unlock(&timer_th.waiting_lock);
+            }
+        }
+#else
         for (int i=0; i<r; i++) {
             rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr;
 
             if (th == NULL) {
                 // wakeup timerthread
                 consume_communication_pipe(timer_th.comm_fds[0]);
             }
             else {
                 // wakeup specific thread by IO (pre-registered)
                 rb_native_mutex_lock(&timer_th.waiting_lock);
                 {
                     if (th->sched.waiting_reason.flags) {
                         // delete from chain
                         ccan_list_del_init(&th->sched.waiting_reason.node);
-                        timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd);
+                        timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
 
                         th->sched.waiting_reason.flags = thread_sched_waiting_none;
                         th->sched.waiting_reason.data.fd = -1;
@@ -802,13 +969,14 @@ timer_thread_polling(rb_vm_t *vm)
                 rb_native_mutex_unlock(&timer_th.waiting_lock);
             }
         }
+#endif
     }
 }
 
-#else // HAVE_SYS_EPOLL_H
+#else // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H
 
 static void
-timer_thread_setup_nm(void)
+timer_thread_setup_mn(void)
 {
     // do nothing
 }
@@ -855,4 +1023,4 @@ timer_thread_polling(rb_vm_t *vm)
     }
 }
 
-#endif // HAVE_SYS_EPOLL_H
+#endif // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H
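
For readers who want to experiment with the kqueue pattern this patch relies on outside of the Ruby tree, here is a minimal standalone C sketch (not part of the patch): it creates a kqueue, marks it close-on-exec with fcntl/FD_CLOEXEC as the commit message describes, registers read interest with EV_SET/EV_ADD, waits with a struct timespec timeout the way kqueue_wait builds one from milliseconds, and removes the registration with EV_DELETE the way kqueue_unregister_waiting does. The use of STDIN_FILENO and the 5-second timeout are illustrative assumptions, not anything taken from the patch.

// kq_sketch.c - illustrative only; build with: cc kq_sketch.c -o kq_sketch (macOS/FreeBSD)
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(void) {
    // create the kernel event queue
    int kq = kqueue();
    if (kq == -1) { perror("kqueue"); return 1; }

    // close-on-exec via fcntl, the approach the patch prefers over ioctl
    int flags = fcntl(kq, F_GETFD);
    if (flags == -1 || fcntl(kq, F_SETFD, flags | FD_CLOEXEC) == -1) { perror("fcntl"); return 1; }

    // register read-readiness for stdin (EV_SET + EV_ADD, as in timer_thread_register_waiting)
    struct kevent change;
    EV_SET(&change, STDIN_FILENO, EVFILT_READ, EV_ADD, 0, 0, NULL);
    if (kevent(kq, &change, 1, NULL, 0, NULL) == -1) { perror("kevent add"); return 1; }

    // wait up to 5 seconds for a single event (kevent takes a timespec, not milliseconds)
    struct timespec timeout = { .tv_sec = 5, .tv_nsec = 0 };
    struct kevent event;
    int r = kevent(kq, NULL, 0, &event, 1, &timeout);
    if (r == -1) { perror("kevent wait"); return 1; }
    if (r == 0) printf("timed out\n");
    else printf("fd %d ready (filter=%d)\n", (int)event.ident, (int)event.filter);

    // unregister the filter that was actually set (EV_DELETE, as in kqueue_unregister_waiting)
    EV_SET(&change, STDIN_FILENO, EVFILT_READ, EV_DELETE, 0, 0, NULL);
    kevent(kq, &change, 1, NULL, 0, NULL);

    close(kq);
    return 0;
}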