speed up IO#close with many threads
Today, it increases IO#close performance with many threads: Execution time (sec) name trunk after vm_thread_close 4.276 3.018 Speedup ratio: compare with the result of `trunk' (greater is better) name after vm_thread_close 1.417 This speedup comes because rb_notify_fd_close only scans threads inside rb_thread_io_blocking_region, not all threads in the VM. In the future, this type data structure may allow us to notify waiters of multiple FDs on a single thread (when using Fibers). * thread.c (struct waiting_fd): declare (rb_thread_io_blocking_region): use on-stack list waiter (rb_notify_fd_close): walk vm->waiting_fds instead (call_without_gvl): remove old field setting (th_init): ditto * vm_core.h (typedef struct rb_vm_struct): add waiting_fds list * (typedef struct rb_thread_struct): remove waiting_fd field (rb_vm_living_threads_init): initialize waiting_fds list I am now kicking myself for not thinking about this 3 years ago when I introduced ccan/list in [Feature #9632] to optimize this same function :< git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58812 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
9cd66d7022
commit
508091d9cc
24
thread.c
24
thread.c
@ -101,6 +101,12 @@ static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th);
|
||||
#define eTerminateSignal INT2FIX(1)
|
||||
static volatile int system_working = 1;
|
||||
|
||||
struct waiting_fd {
|
||||
struct list_node wfd_node; /* <=> vm.waiting_fds */
|
||||
rb_thread_t *th;
|
||||
int fd;
|
||||
};
|
||||
|
||||
inline static void
|
||||
st_delete_wrap(st_table *table, st_data_t key)
|
||||
{
|
||||
@ -1316,7 +1322,6 @@ call_without_gvl(void *(*func)(void *), void *data1,
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
int saved_errno = 0;
|
||||
|
||||
th->waiting_fd = -1;
|
||||
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
|
||||
ubf = ubf_select;
|
||||
data2 = th;
|
||||
@ -1439,11 +1444,15 @@ VALUE
|
||||
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
||||
{
|
||||
volatile VALUE val = Qundef; /* shouldn't be used */
|
||||
rb_vm_t *vm = GET_VM();
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
volatile int saved_errno = 0;
|
||||
int state;
|
||||
struct waiting_fd wfd;
|
||||
|
||||
th->waiting_fd = fd;
|
||||
wfd.fd = fd;
|
||||
wfd.th = th;
|
||||
list_add(&vm->waiting_fds, &wfd.wfd_node);
|
||||
|
||||
TH_PUSH_TAG(th);
|
||||
if ((state = EXEC_TAG()) == 0) {
|
||||
@ -1454,8 +1463,8 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
|
||||
}
|
||||
TH_POP_TAG();
|
||||
|
||||
/* clear waiting_fd anytime */
|
||||
th->waiting_fd = -1;
|
||||
/* must be deleted before jump */
|
||||
list_del(&wfd.wfd_node);
|
||||
|
||||
if (state) {
|
||||
TH_JUMP_TAG(th, state);
|
||||
@ -2196,12 +2205,13 @@ int
|
||||
rb_notify_fd_close(int fd)
|
||||
{
|
||||
rb_vm_t *vm = GET_THREAD()->vm;
|
||||
rb_thread_t *th = 0;
|
||||
struct waiting_fd *wfd = 0;
|
||||
int busy;
|
||||
|
||||
busy = 0;
|
||||
list_for_each(&vm->living_threads, th, vmlt_node) {
|
||||
if (th->waiting_fd == fd) {
|
||||
list_for_each(&vm->waiting_fds, wfd, wfd_node) {
|
||||
if (wfd->fd == fd) {
|
||||
rb_thread_t *th = wfd->th;
|
||||
VALUE err = th->vm->special_exceptions[ruby_error_stream_closed];
|
||||
rb_threadptr_pending_interrupt_enque(th, err);
|
||||
rb_threadptr_interrupt(th);
|
||||
|
1
vm.c
1
vm.c
@ -2521,7 +2521,6 @@ th_init(rb_thread_t *th, VALUE self)
|
||||
th->status = THREAD_RUNNABLE;
|
||||
th->errinfo = Qnil;
|
||||
th->last_status = Qnil;
|
||||
th->waiting_fd = -1;
|
||||
th->root_svar = Qfalse;
|
||||
th->local_storage_recursive_hash = Qnil;
|
||||
th->local_storage_recursive_hash_for_trace = Qnil;
|
||||
|
@ -490,6 +490,7 @@ typedef struct rb_vm_struct {
|
||||
struct rb_thread_struct *main_thread;
|
||||
struct rb_thread_struct *running_thread;
|
||||
|
||||
struct list_head waiting_fds; /* <=> struct waiting_fd */
|
||||
struct list_head living_threads;
|
||||
size_t living_thread_num;
|
||||
VALUE thgroup_default;
|
||||
@ -716,8 +717,6 @@ typedef struct rb_thread_struct {
|
||||
/* passing state */
|
||||
int state;
|
||||
|
||||
int waiting_fd;
|
||||
|
||||
/* for rb_iterate */
|
||||
VALUE passed_block_handler;
|
||||
|
||||
@ -1442,6 +1441,7 @@ void rb_thread_wakeup_timer_thread(void);
|
||||
static inline void
|
||||
rb_vm_living_threads_init(rb_vm_t *vm)
|
||||
{
|
||||
list_head_init(&vm->waiting_fds);
|
||||
list_head_init(&vm->living_threads);
|
||||
vm->living_thread_num = 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user