Add support for Queue & SizedQueue.

Samuel Williams 2020-09-14 11:10:02 +12:00
parent 0f613cc5f1
commit 8eea66a0ca
Notes: git 2020-09-14 13:44:35 +09:00
3 changed files with 77 additions and 33 deletions

test/fiber/test_mutex.rb

@@ -84,6 +84,37 @@ class TestFiberMutex < Test::Unit::TestCase
     assert signalled > 1
   end
 
+  def test_queue
+    queue = Queue.new
+    processed = 0
+
+    thread = Thread.new do
+      scheduler = Scheduler.new
+      Thread.current.scheduler = scheduler
+
+      Fiber.schedule do
+        3.times do |i|
+          queue << i
+          sleep 0.1
+        end
+
+        queue.close
+      end
+
+      Fiber.schedule do
+        while item = queue.pop
+          processed += 1
+        end
+      end
+
+      scheduler.run
+    end
+
+    thread.join
+
+    assert processed == 3
+  end
+
   def test_mutex_deadlock
     err = /No live threads left. Deadlock\?/
     assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
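The test drives everything through a Scheduler object supplied by the test harness (pulled in via the -I#{__dir__} include path), which also implements IO hooks. For orientation, here is a minimal sketch of the fiber-scheduler hooks this test depends on. It is illustrative only: ToyScheduler is a hypothetical name, not the harness class, and its run loop busy-waits for simplicity.

# Illustrative sketch only: a stripped-down fiber scheduler with just the
# hooks this test exercises. Not the harness's actual Scheduler class.
class ToyScheduler
  def initialize
    @ready = []     # fibers made runnable by #unblock
    @sleeping = {}  # fiber => monotonic wakeup time (timed sleeps)
    @blocked = {}   # fiber => blocker, e.g. the Queue being waited on
  end

  # Kernel#sleep in a scheduled fiber lands here; a nil duration means
  # "park until woken", which is how a waiter on an empty Queue sleeps.
  def kernel_sleep(duration = nil)
    if duration
      @sleeping[Fiber.current] = now + duration
    else
      @blocked[Fiber.current] = :forever
    end
    Fiber.yield
  end

  # Called when a fiber must wait on a primitive identified by blocker.
  def block(blocker, timeout = nil)
    @blocked[Fiber.current] = blocker
    Fiber.yield
  end

  # Called, possibly from another thread, when a waiter becomes runnable.
  def unblock(blocker, fiber)
    @blocked.delete(fiber)
    @ready << fiber
  end

  # Fiber.schedule delegates fiber creation to the scheduler.
  def fiber(&block)
    Fiber.new(blocking: false, &block).tap(&:resume)
  end

  # Busy-waiting event loop; assumes every parked fiber is eventually
  # unblocked or has its timed sleep expire.
  def run
    until @ready.empty? && @sleeping.empty? && @blocked.empty?
      current = now
      @sleeping.select { |_, at| at <= current }.each_key do |fiber|
        @sleeping.delete(fiber)
        @ready << fiber
      end
      while (fiber = @ready.shift)
        fiber.resume if fiber.alive?
      end
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end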

thread.c

@@ -1481,8 +1481,13 @@ rb_thread_sleep_interruptible(void)
 static void
 rb_thread_sleep_deadly_allow_spurious_wakeup(void)
 {
-    thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
-    sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+    VALUE scheduler = rb_thread_current_scheduler();
+    if (scheduler != Qnil) {
+        rb_scheduler_kernel_sleepv(scheduler, 0, NULL);
+    } else {
+        thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
+        sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+    }
 }
 
 void
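rb_thread_sleep_deadly_allow_spurious_wakeup is the primitive that the queue_sleep callback (visible in the thread_sync.c hunks below) parks on. With a scheduler installed, it now defers to rb_scheduler_kernel_sleepv(scheduler, 0, NULL), which calls the scheduler's kernel_sleep with no arguments: a sleep with no deadline that ends when the waiter is woken. In Ruby terms the control flow is roughly:

# Rough Ruby rendering of the C change above. native_sleep_forever is a
# hypothetical stand-in for sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE).
def sleep_deadly_allow_spurious_wakeup
  if (scheduler = Thread.current.scheduler)
    scheduler.kernel_sleep  # no duration: park only this fiber until woken
  else
    native_sleep_forever    # legacy path: block the entire native thread
  end
end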

thread_sync.c

@@ -946,25 +946,29 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
     check_array(self, q->que);
 
     while (RARRAY_LEN(q->que) == 0) {
         if (!should_block) {
             rb_raise(rb_eThreadError, "queue empty");
         }
         else if (queue_closed_p(self)) {
             return queue_closed_result(self, q);
         }
         else {
-            struct queue_waiter qw;
+            rb_execution_context_t *ec = GET_EC();
+            struct queue_waiter qw;
 
             assert(RARRAY_LEN(q->que) == 0);
             assert(queue_closed_p(self) == 0);
 
-            qw.w.th = GET_THREAD();
-            qw.as.q = q;
-            list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
-            qw.as.q->num_waiting++;
+            qw.w.self = self;
+            qw.w.th = ec->thread_ptr;
+            qw.w.fiber = ec->fiber_ptr;
+
+            qw.as.q = q;
+            list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
+            qw.as.q->num_waiting++;
 
             rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
         }
     }
 
     return rb_ary_shift(q->que);
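The waiter record now carries three identifiers instead of one: qw.w.self stores the queue itself (the "blocker"), qw.w.th the owning thread, and qw.w.fiber the waiting fiber. The wakeup side is not among the hunks shown here, but the new fields only pay off if it distinguishes fiber waiters from plain thread waiters, roughly along these lines (a hypothetical sketch, not this commit's C code):

# Hypothetical sketch of how a wakeup routine could use the new fields.
# w.self is the blocker (the queue), w.fiber the parked fiber, w.th its thread.
def wakeup_one(waiters)
  while (w = waiters.shift)
    if w.fiber && (scheduler = w.th.scheduler)
      scheduler.unblock(w.self, w.fiber)  # resume via the fiber scheduler
      return
    elsif w.th.alive?
      w.th.wakeup                         # classic native-thread wakeup
      return
    end
  end
end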
@@ -1188,27 +1192,31 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
     int should_block = szqueue_push_should_block(argc, argv);
 
     while (queue_length(self, &sq->q) >= sq->max) {
         if (!should_block) {
             rb_raise(rb_eThreadError, "queue full");
         }
         else if (queue_closed_p(self)) {
             break;
         }
         else {
-            struct queue_waiter qw;
-            struct list_head *pushq = szqueue_pushq(sq);
+            rb_execution_context_t *ec = GET_EC();
+            struct queue_waiter qw;
+            struct list_head *pushq = szqueue_pushq(sq);
 
-            qw.w.th = GET_THREAD();
-            qw.as.sq = sq;
-            list_add_tail(pushq, &qw.w.node);
-            sq->num_waiting_push++;
+            qw.w.self = self;
+            qw.w.th = ec->thread_ptr;
+            qw.w.fiber = ec->fiber_ptr;
+
+            qw.as.sq = sq;
+            list_add_tail(pushq, &qw.w.node);
+            sq->num_waiting_push++;
 
             rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
         }
     }
 
     if (queue_closed_p(self)) {
         raise_closed_queue_error(self);
     }
 
     return queue_do_push(self, &sq->q, argv[0]);
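The rb_szqueue_push change mirrors the pop side: a producer blocked on a full SizedQueue now records its fiber, so only that fiber is suspended rather than the whole native thread. A usage sketch, assuming a scheduler object with the same interface the test above relies on:

# Producer and consumer fibers sharing one thread. Because the blocked
# push suspends only the producer fiber, the consumer keeps draining the
# queue and the thread as a whole never stalls.
Thread.new do
  scheduler = Scheduler.new  # any object implementing the scheduler hooks
  Thread.current.scheduler = scheduler

  queue = SizedQueue.new(1)

  Fiber.schedule do
    5.times do |i|
      queue << i             # parks this fiber whenever the queue is full
    end
    queue.close
  end

  Fiber.schedule do
    while (item = queue.pop) # returns nil once closed and drained
      puts "popped #{item}"
    end
  end

  scheduler.run
end.join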