Improve handling of urgent notification pipe.

This commit is contained in:
Samuel Williams 2020-09-06 14:48:52 +12:00
parent 3dc0fc11f0
commit 1a0cfe2839
Notes: git 2020-09-14 13:44:36 +09:00
2 changed files with 58 additions and 64 deletions

View File

@ -15,11 +15,9 @@ class Scheduler
@writable = {} @writable = {}
@waiting = {} @waiting = {}
@urgent = nil
@lock = Mutex.new @lock = Mutex.new
@locking = 0 @locking = 0
@ready = [] @ready = Array.new
end end
attr :readable attr :readable
@ -51,11 +49,17 @@ class Scheduler
# puts "writable: #{writable}" if writable&.any? # puts "writable: #{writable}" if writable&.any?
readable&.each do |io| readable&.each do |io|
@readable[io]&.resume if fiber = @readable.delete(io)
fiber.resume
elsif io == @urgent.first
@urgent.first.read_nonblock(1024)
end
end end
writable&.each do |io| writable&.each do |io|
@writable[io]&.resume if fiber = @writable.delete(io)
fiber.resume
end
end end
if @waiting.any? if @waiting.any?
@ -73,9 +77,6 @@ class Scheduler
end end
if @ready.any? if @ready.any?
# Clear out the urgent notification pipe.
@urgent.first.read_nonblock(1024)
ready = nil ready = nil
@lock.synchronize do @lock.synchronize do
@ -114,9 +115,6 @@ class Scheduler
Fiber.yield Fiber.yield
@readable.delete(io)
@writable.delete(io)
return true return true
end end
@ -130,10 +128,10 @@ class Scheduler
def mutex_unlock(mutex, fiber) def mutex_unlock(mutex, fiber)
@lock.synchronize do @lock.synchronize do
@ready << fiber @ready << fiber
end
if @urgent if io = @urgent&.last
@urgent.last.write('.') @urgent.last.write_nonblock('.')
end
end end
end end

View File

@ -264,12 +264,12 @@ do_mutex_lock(VALUE self, int interruptible_p)
.fiber = fiber .fiber = fiber
}; };
if (mutex->fiber == fiber) { if (mutex->fiber == fiber) {
rb_raise(rb_eThreadError, "deadlock; recursive locking"); rb_raise(rb_eThreadError, "deadlock; recursive locking");
} }
VALUE scheduler = rb_thread_current_scheduler(); while (mutex->fiber != fiber) {
while (mutex->fiber != fiber) { VALUE scheduler = rb_thread_current_scheduler();
if (scheduler != Qnil) { if (scheduler != Qnil) {
list_add_tail(&mutex->waitq, &w.node); list_add_tail(&mutex->waitq, &w.node);
@ -279,52 +279,48 @@ do_mutex_lock(VALUE self, int interruptible_p)
if (!mutex->fiber) { if (!mutex->fiber) {
mutex->fiber = fiber; mutex->fiber = fiber;
break;
} else {
// Try again...
continue;
} }
} else {
enum rb_thread_status prev_status = th->status;
rb_hrtime_t *timeout = 0;
rb_hrtime_t rel = rb_msec2hrtime(100);
th->status = THREAD_STOPPED_FOREVER;
th->locking_mutex = self;
rb_ractor_sleeper_threads_inc(th->ractor);
/*
* Carefully! while some contended threads are in native_sleep(),
* ractor->sleeper is unstable value. we have to avoid both deadlock
* and busy loop.
*/
if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
!patrol_thread) {
timeout = &rel;
patrol_thread = th;
}
list_add_tail(&mutex->waitq, &w.node);
native_sleep(th, timeout); /* release GVL */
list_del(&w.node);
if (!mutex->fiber) {
mutex->fiber = fiber;
}
if (patrol_thread == th)
patrol_thread = NULL;
th->locking_mutex = Qfalse;
if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
rb_check_deadlock(th->ractor);
}
if (th->status == THREAD_STOPPED_FOREVER) {
th->status = prev_status;
}
rb_ractor_sleeper_threads_dec(th->ractor);
} }
enum rb_thread_status prev_status = th->status;
rb_hrtime_t *timeout = 0;
rb_hrtime_t rel = rb_msec2hrtime(100);
th->status = THREAD_STOPPED_FOREVER;
th->locking_mutex = self;
rb_ractor_sleeper_threads_inc(th->ractor);
/*
* Carefully! while some contended threads are in native_sleep(),
* ractor->sleeper is unstable value. we have to avoid both deadlock
* and busy loop.
*/
if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
!patrol_thread) {
timeout = &rel;
patrol_thread = th;
}
list_add_tail(&mutex->waitq, &w.node);
native_sleep(th, timeout); /* release GVL */
list_del(&w.node);
if (!mutex->fiber) {
mutex->fiber = fiber;
}
if (patrol_thread == th)
patrol_thread = NULL;
th->locking_mutex = Qfalse;
if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
rb_check_deadlock(th->ractor);
}
if (th->status == THREAD_STOPPED_FOREVER) {
th->status = prev_status;
}
rb_ractor_sleeper_threads_dec(th->ractor);
if (interruptible_p) { if (interruptible_p) {
/* release mutex before checking for interrupts...as interrupt checking /* release mutex before checking for interrupts...as interrupt checking
@ -335,7 +331,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
mutex->fiber = fiber; mutex->fiber = fiber;
} }
} }
} }
if (mutex->fiber == fiber) mutex_locked(th, self); if (mutex->fiber == fiber) mutex_locked(th, self);
} }