Call scheduler.block instead of scheduler.kernel_sleep for blocking Queue/SizedQueue operations
* scheduler.unblock was already already called before but with no corresponding scheduler.block * add test that Queue#pop makes the scheduler wait until it gets an element.
This commit is contained in:
parent
738a089b3a
commit
9472d16061
@ -93,6 +93,7 @@ class Scheduler
|
|||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
@urgent.each(&:close)
|
@urgent.each(&:close)
|
||||||
|
@urgent = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
def current_time
|
def current_time
|
||||||
@ -139,7 +140,7 @@ class Scheduler
|
|||||||
end
|
end
|
||||||
|
|
||||||
if io = @urgent&.last
|
if io = @urgent&.last
|
||||||
@urgent.last.write_nonblock('.')
|
io.write_nonblock('.')
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -115,6 +115,31 @@ class TestFiberMutex < Test::Unit::TestCase
|
|||||||
assert processed == 3
|
assert processed == 3
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_queue_pop_waits
|
||||||
|
queue = Queue.new
|
||||||
|
running = false
|
||||||
|
|
||||||
|
thread = Thread.new do
|
||||||
|
scheduler = Scheduler.new
|
||||||
|
Thread.current.scheduler = scheduler
|
||||||
|
|
||||||
|
result = nil
|
||||||
|
Fiber.schedule do
|
||||||
|
result = queue.pop
|
||||||
|
end
|
||||||
|
|
||||||
|
running = true
|
||||||
|
scheduler.run
|
||||||
|
result
|
||||||
|
end
|
||||||
|
|
||||||
|
Thread.pass until running
|
||||||
|
sleep 0.1
|
||||||
|
|
||||||
|
queue << :done
|
||||||
|
assert_equal :done, thread.value
|
||||||
|
end
|
||||||
|
|
||||||
def test_mutex_deadlock
|
def test_mutex_deadlock
|
||||||
err = /No live threads left. Deadlock\?/
|
err = /No live threads left. Deadlock\?/
|
||||||
assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
|
assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
|
||||||
|
6
thread.c
6
thread.c
@ -134,7 +134,7 @@ rb_thread_local_storage(VALUE thread)
|
|||||||
|
|
||||||
static void sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
|
static void sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
|
||||||
static void sleep_forever(rb_thread_t *th, unsigned int fl);
|
static void sleep_forever(rb_thread_t *th, unsigned int fl);
|
||||||
static void rb_thread_sleep_deadly_allow_spurious_wakeup(void);
|
static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker);
|
||||||
static int rb_threadptr_dead(rb_thread_t *th);
|
static int rb_threadptr_dead(rb_thread_t *th);
|
||||||
static void rb_check_deadlock(rb_ractor_t *r);
|
static void rb_check_deadlock(rb_ractor_t *r);
|
||||||
static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
|
static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
|
||||||
@ -1485,11 +1485,11 @@ rb_thread_sleep_interruptible(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
rb_thread_sleep_deadly_allow_spurious_wakeup(void)
|
rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker)
|
||||||
{
|
{
|
||||||
VALUE scheduler = rb_thread_current_scheduler();
|
VALUE scheduler = rb_thread_current_scheduler();
|
||||||
if (scheduler != Qnil) {
|
if (scheduler != Qnil) {
|
||||||
rb_scheduler_kernel_sleepv(scheduler, 0, NULL);
|
rb_scheduler_block(scheduler, blocker);
|
||||||
} else {
|
} else {
|
||||||
thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
|
thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
|
||||||
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
|
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
|
||||||
|
@ -483,9 +483,9 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_mutex_sleep_forever(VALUE time)
|
rb_mutex_sleep_forever(VALUE self)
|
||||||
{
|
{
|
||||||
rb_thread_sleep_deadly_allow_spurious_wakeup();
|
rb_thread_sleep_deadly_allow_spurious_wakeup(self);
|
||||||
return Qnil;
|
return Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -516,7 +516,7 @@ rb_mutex_sleep(VALUE self, VALUE timeout)
|
|||||||
mutex_lock_uninterruptible(self);
|
mutex_lock_uninterruptible(self);
|
||||||
} else {
|
} else {
|
||||||
if (NIL_P(timeout)) {
|
if (NIL_P(timeout)) {
|
||||||
rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
|
rb_ensure(rb_mutex_sleep_forever, self, mutex_lock_uninterruptible, self);
|
||||||
} else {
|
} else {
|
||||||
rb_hrtime_t rel = rb_timeval2hrtime(&t);
|
rb_hrtime_t rel = rb_timeval2hrtime(&t);
|
||||||
rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
|
rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
|
||||||
@ -904,9 +904,9 @@ rb_queue_push(VALUE self, VALUE obj)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
queue_sleep(VALUE arg)
|
queue_sleep(VALUE self)
|
||||||
{
|
{
|
||||||
rb_thread_sleep_deadly_allow_spurious_wakeup();
|
rb_thread_sleep_deadly_allow_spurious_wakeup(self);
|
||||||
return Qnil;
|
return Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user