setup waiting_fd
for thread_sched_wait_events()
`thread_sched_wait_events()` suspend the thread until the target fd is ready. Howver, other threads can close the target fd and suspended thread should be awake. To support it, setup `waiting_fd` before `thread_sched_wait_events()`. `rb_thread_io_wake_pending_closer()` should be called before `RUBY_VM_CHECK_INTS_BLOCKING()` because it can return this function. This patch introduces additional overhead (setup/cleanup `waiting_fd`) and maybe we can reduce the overhead.
This commit is contained in:
parent
28a6e4ea9d
commit
2fe5fc176b
33
thread.c
33
thread.c
@ -1646,6 +1646,20 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
|
|||||||
return rb_nogvl(func, data1, ubf, data2, 0);
|
return rb_nogvl(func, data1, ubf, data2, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
rb_thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
|
||||||
|
{
|
||||||
|
wfd->fd = fd;
|
||||||
|
wfd->th = th;
|
||||||
|
wfd->busy = NULL;
|
||||||
|
|
||||||
|
RB_VM_LOCK_ENTER();
|
||||||
|
{
|
||||||
|
ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
|
||||||
|
}
|
||||||
|
RB_VM_LOCK_LEAVE();
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
|
rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
|
||||||
{
|
{
|
||||||
@ -1681,12 +1695,19 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
|
|||||||
|
|
||||||
RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
|
RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
|
||||||
|
|
||||||
|
struct waiting_fd waiting_fd;
|
||||||
|
|
||||||
#ifdef RUBY_THREAD_PTHREAD_H
|
#ifdef RUBY_THREAD_PTHREAD_H
|
||||||
if (events && !th_has_dedicated_nt(th)) {
|
if (events && !th_has_dedicated_nt(th)) {
|
||||||
VM_ASSERT(events == RB_WAITFD_IN || events == RB_WAITFD_OUT);
|
VM_ASSERT(events == RB_WAITFD_IN || events == RB_WAITFD_OUT);
|
||||||
|
|
||||||
|
rb_thread_io_setup_wfd(th, fd, &waiting_fd);
|
||||||
|
{
|
||||||
// wait readable/writable
|
// wait readable/writable
|
||||||
thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), NULL);
|
thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), NULL);
|
||||||
|
}
|
||||||
|
rb_thread_io_wake_pending_closer(&waiting_fd);
|
||||||
|
|
||||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@ -1695,23 +1716,13 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
|
|||||||
volatile int saved_errno = 0;
|
volatile int saved_errno = 0;
|
||||||
enum ruby_tag_type state;
|
enum ruby_tag_type state;
|
||||||
|
|
||||||
struct waiting_fd waiting_fd = {
|
|
||||||
.fd = fd,
|
|
||||||
.th = rb_ec_thread_ptr(ec),
|
|
||||||
.busy = NULL,
|
|
||||||
};
|
|
||||||
|
|
||||||
// `errno` is only valid when there is an actual error - but we can't
|
// `errno` is only valid when there is an actual error - but we can't
|
||||||
// extract that from the return value of `func` alone, so we clear any
|
// extract that from the return value of `func` alone, so we clear any
|
||||||
// prior `errno` value here so that we can later check if it was set by
|
// prior `errno` value here so that we can later check if it was set by
|
||||||
// `func` or not (as opposed to some previously set value).
|
// `func` or not (as opposed to some previously set value).
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
|
||||||
RB_VM_LOCK_ENTER();
|
rb_thread_io_setup_wfd(th, fd, &waiting_fd);
|
||||||
{
|
|
||||||
ccan_list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &waiting_fd.wfd_node);
|
|
||||||
}
|
|
||||||
RB_VM_LOCK_LEAVE();
|
|
||||||
|
|
||||||
EC_PUSH_TAG(ec);
|
EC_PUSH_TAG(ec);
|
||||||
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user