* thread_tools.c: add Queue#close(exception=false) and
SizedQueue#close(exception=false). [Feature #10600] Trying to deq from a closed empty queue return nil if exception parameter equals to false (default). If exception parameter is truthy, it raises ClosedQueueError (< StopIteration). ClosedQueueError inherits StopIteration so that you can write: loop{ e = q.deq; (using e) } Trying to close a closed queue raises ClosedQueueError. Blocking threads to wait deq for Queue and SizedQueue will be restarted immediately by returning nil (exception=false) or raising a ClosedQueueError (exception=true). Blocking threads to wait enq for SizedQueue will be restarted by raising a ClosedQueueError immediately. The above specification is not proposed specification, so that we need to continue discussion to conclude specification this method. * test/thread/test_queue.rb: add tests originally written by John Anderson and modify detailed behavior. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@51699 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
b79d7910f5
commit
fd7ac9f3c9
31
ChangeLog
31
ChangeLog
@ -1,3 +1,34 @@
|
|||||||
|
Thu Aug 27 07:45:34 2015 Koichi Sasada <ko1@atdot.net>
|
||||||
|
|
||||||
|
* thread_tools.c: add Queue#close(exception=false) and
|
||||||
|
SizedQueue#close(exception=false).
|
||||||
|
[Feature #10600]
|
||||||
|
|
||||||
|
Trying to deq from a closed empty queue return nil
|
||||||
|
if exception parameter equals to false (default).
|
||||||
|
|
||||||
|
If exception parameter is truthy, it raises
|
||||||
|
ClosedQueueError (< StopIteration).
|
||||||
|
ClosedQueueError inherits StopIteration so that you can write:
|
||||||
|
|
||||||
|
loop{ e = q.deq; (using e) }
|
||||||
|
|
||||||
|
Trying to close a closed queue raises ClosedQueueError.
|
||||||
|
|
||||||
|
Blocking threads to wait deq for Queue and SizedQueue will be
|
||||||
|
restarted immediately by returning nil (exception=false) or
|
||||||
|
raising a ClosedQueueError (exception=true).
|
||||||
|
|
||||||
|
Blocking threads to wait enq for SizedQueue will be
|
||||||
|
restarted by raising a ClosedQueueError immediately.
|
||||||
|
|
||||||
|
The above specification is not proposed specification, so that
|
||||||
|
we need to continue discussion to conclude specification this
|
||||||
|
method.
|
||||||
|
|
||||||
|
* test/thread/test_queue.rb: add tests originally written by
|
||||||
|
John Anderson and modify detailed behavior.
|
||||||
|
|
||||||
Wed Aug 26 10:52:02 2015 Nobuyoshi Nakada <nobu@ruby-lang.org>
|
Wed Aug 26 10:52:02 2015 Nobuyoshi Nakada <nobu@ruby-lang.org>
|
||||||
|
|
||||||
* re.c (rb_memsearch_wchar, rb_memsearch_qchar): test matching
|
* re.c (rb_memsearch_wchar, rb_memsearch_qchar): test matching
|
||||||
|
@ -43,6 +43,7 @@ class TestQueue < Test::Unit::TestCase
|
|||||||
}
|
}
|
||||||
}.join
|
}.join
|
||||||
|
|
||||||
|
# close the queue the old way to test for backwards-compatibility
|
||||||
num_threads.times { to_workers.push nil }
|
num_threads.times { to_workers.push nil }
|
||||||
workers.each { |t| t.join }
|
workers.each { |t| t.join }
|
||||||
|
|
||||||
@ -277,4 +278,293 @@ class TestQueue < Test::Unit::TestCase
|
|||||||
Marshal.dump(q)
|
Marshal.dump(q)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_close
|
||||||
|
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
||||||
|
q = qcreate.call
|
||||||
|
assert_equal false, q.closed?
|
||||||
|
q << :something
|
||||||
|
assert_equal q, q.close
|
||||||
|
assert q.closed?
|
||||||
|
assert_raise_with_message(ClosedQueueError, /closed/){q << :nothing}
|
||||||
|
assert_equal q.pop, :something
|
||||||
|
assert_nil q.pop
|
||||||
|
# non-blocking
|
||||||
|
assert_raise_with_message(ThreadError, /queue empty/){q.pop(non_block=true)}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# test that waiting producers are woken up on close
|
||||||
|
def close_wakeup( num_items, num_threads, &qcreate )
|
||||||
|
raise "This test won't work with num_items(#{num_items}) >= num_threads(#{num_threads})" if num_items >= num_threads
|
||||||
|
|
||||||
|
# create the Queue
|
||||||
|
q = yield
|
||||||
|
threads = num_threads.times.map{Thread.new{q.pop}}
|
||||||
|
num_items.times{|i| q << i}
|
||||||
|
|
||||||
|
# wait until queue empty
|
||||||
|
(Thread.pass; sleep 0.01) until q.size == 0
|
||||||
|
|
||||||
|
# now there should be some waiting consumers
|
||||||
|
assert_equal num_threads - num_items, threads.count{|thr| thr.status}
|
||||||
|
|
||||||
|
# tell them all to go away
|
||||||
|
q.close
|
||||||
|
|
||||||
|
# wait for them to go away
|
||||||
|
Thread.pass until threads.all?{|thr| thr.status == false}
|
||||||
|
|
||||||
|
# check that they've gone away. Convert nil to -1 so we can sort and do the comparison
|
||||||
|
expected_values = [-1] * (num_threads - num_items) + num_items.times.to_a
|
||||||
|
assert_equal expected_values, threads.map{|thr| thr.value || -1 }.sort
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_queue_close_wakeup
|
||||||
|
close_wakeup(15, 18){Queue.new}
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_size_queue_close_wakeup
|
||||||
|
close_wakeup(5, 8){SizedQueue.new 9}
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_sized_queue_one_closed_interrupt
|
||||||
|
q = SizedQueue.new 1
|
||||||
|
q << :one
|
||||||
|
t1 = Thread.new { q << :two }
|
||||||
|
sleep 0.01 until t1.stop?
|
||||||
|
q.close
|
||||||
|
|
||||||
|
t1.kill.join
|
||||||
|
assert_equal 1, q.size
|
||||||
|
assert_equal :one, q.pop
|
||||||
|
assert q.empty?, "queue not empty"
|
||||||
|
end
|
||||||
|
|
||||||
|
# make sure that shutdown state is handled properly by empty? for the non-blocking case
|
||||||
|
def test_empty_non_blocking
|
||||||
|
return
|
||||||
|
q = SizedQueue.new 3
|
||||||
|
3.times{|i| q << i}
|
||||||
|
|
||||||
|
# these all block cos the queue is full
|
||||||
|
prod_threads = 4.times.map{|i| Thread.new{q << 3+i}}
|
||||||
|
sleep 0.01 until prod_threads.all?{|thr| thr.status == 'sleep'}
|
||||||
|
q.close
|
||||||
|
|
||||||
|
items = []
|
||||||
|
# sometimes empty? is false but pop will raise ThreadError('empty'),
|
||||||
|
# meaning a value is not immediately available but will be soon.
|
||||||
|
until q.empty?
|
||||||
|
items << q.pop(non_block=true) rescue nil
|
||||||
|
end
|
||||||
|
items.compact!
|
||||||
|
|
||||||
|
assert_equal 7.times.to_a, items.sort
|
||||||
|
assert q.empty?
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_sized_queue_closed_push_non_blocking
|
||||||
|
q = SizedQueue.new 7
|
||||||
|
q.close
|
||||||
|
assert_raise_with_message(ClosedQueueError, /queue closed/){q.push(non_block=true)}
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_blocked_pushers
|
||||||
|
q = SizedQueue.new 3
|
||||||
|
prod_threads = 6.times.map do |i|
|
||||||
|
thr = Thread.new{q << i}; thr[:pc] = i; thr
|
||||||
|
end
|
||||||
|
|
||||||
|
# wait until some producer threads have finished, and the other 3 are blocked
|
||||||
|
sleep 0.01 while prod_threads.reject{|t| t.status}.count < 3
|
||||||
|
# this would ensure that all producer threads call push before close
|
||||||
|
# sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
|
||||||
|
q.close
|
||||||
|
|
||||||
|
# more than prod_threads
|
||||||
|
cons_threads = 10.times.map do |i|
|
||||||
|
thr = Thread.new{q.pop}; thr[:pc] = i; thr
|
||||||
|
end
|
||||||
|
|
||||||
|
# values that came from the queue
|
||||||
|
popped_values = cons_threads.map &:value
|
||||||
|
|
||||||
|
# wait untl all threads have finished
|
||||||
|
sleep 0.01 until prod_threads.find_all{|t| t.status}.count == 0
|
||||||
|
|
||||||
|
# pick only the producer threads that got in before close
|
||||||
|
successful_prod_threads = prod_threads.reject{|thr| thr.status == nil}
|
||||||
|
assert_nothing_raised{ successful_prod_threads.map(&:value) }
|
||||||
|
|
||||||
|
# the producer threads that tried to push after q.close should all fail
|
||||||
|
unsuccessful_prod_threads = prod_threads - successful_prod_threads
|
||||||
|
unsuccessful_prod_threads.each do |thr|
|
||||||
|
assert_raise(ClosedQueueError){ thr.value }
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_equal cons_threads.size, popped_values.size
|
||||||
|
assert_equal 0, q.size
|
||||||
|
|
||||||
|
# check that consumer threads with values match producers that called push before close
|
||||||
|
assert_equal successful_prod_threads.map{|thr| thr[:pc]}, popped_values.compact.sort
|
||||||
|
assert_nil q.pop
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_deny_pushers
|
||||||
|
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
||||||
|
prod_threads = nil
|
||||||
|
q = qcreate[]
|
||||||
|
synq = Queue.new
|
||||||
|
producers_start = Thread.new do
|
||||||
|
prod_threads = 20.times.map do |i|
|
||||||
|
Thread.new{ synq.pop; q << i }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
q.close
|
||||||
|
synq.close # start producer threads
|
||||||
|
|
||||||
|
# wait for all threads to be finished, because of exceptions
|
||||||
|
# NOTE: thr.status will be nil (raised) or false (terminated)
|
||||||
|
sleep 0.01 until prod_threads && prod_threads.all?{|thr| !thr.status}
|
||||||
|
|
||||||
|
# check that all threads failed to call push
|
||||||
|
prod_threads.each do |thr|
|
||||||
|
assert_kind_of ClosedQueueError, (thr.value rescue $!)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# size should account for waiting pushers during shutdown
|
||||||
|
def sized_queue_size_close
|
||||||
|
q = SizedQueue.new 4
|
||||||
|
4.times{|i| q << i}
|
||||||
|
Thread.new{ q << 5 }
|
||||||
|
Thread.new{ q << 6 }
|
||||||
|
assert_equal 4, q.size
|
||||||
|
assert_equal 4, q.items
|
||||||
|
q.close
|
||||||
|
assert_equal 6, q.size
|
||||||
|
assert_equal 4, q.items
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_blocked_pushers_empty
|
||||||
|
q = SizedQueue.new 3
|
||||||
|
prod_threads = 6.times.map do |i|
|
||||||
|
Thread.new{ q << i}
|
||||||
|
end
|
||||||
|
|
||||||
|
# this ensures that all producer threads call push before close
|
||||||
|
sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3
|
||||||
|
q.close
|
||||||
|
|
||||||
|
ary = []
|
||||||
|
until q.empty?
|
||||||
|
ary << q.pop
|
||||||
|
end
|
||||||
|
assert_equal 0, q.size
|
||||||
|
|
||||||
|
assert_equal 3, ary.size
|
||||||
|
ary.each{|e| assert [0,1,2,3,4,5].include?(e)}
|
||||||
|
assert_nil q.pop
|
||||||
|
|
||||||
|
prod_threads.each{|t|
|
||||||
|
begin
|
||||||
|
t.join
|
||||||
|
rescue => e
|
||||||
|
end
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
# test thread wakeup on one-element SizedQueue with close
|
||||||
|
def test_one_element_sized_queue
|
||||||
|
q = SizedQueue.new 1
|
||||||
|
t = Thread.new{ q.pop }
|
||||||
|
q.close
|
||||||
|
assert_nil t.value
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_close_token_exception
|
||||||
|
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
||||||
|
q = qcreate[]
|
||||||
|
q.close true
|
||||||
|
assert_raise(ClosedQueueError){q.pop}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_close_token_loop
|
||||||
|
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
||||||
|
q = qcreate[]
|
||||||
|
popped_items = []
|
||||||
|
consumer_thread = Thread.new{loop{popped_items << q.pop}; :done}
|
||||||
|
7.times{|i| q << i}
|
||||||
|
q.close true
|
||||||
|
sleep 0.1 unless q.empty?
|
||||||
|
assert_equal :done, consumer_thread.value
|
||||||
|
assert_equal 7.times.to_a, popped_items
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_close_twice
|
||||||
|
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
||||||
|
q = qcreate[]
|
||||||
|
q.close
|
||||||
|
assert_raise(ClosedQueueError){q.close}
|
||||||
|
|
||||||
|
q = qcreate[]
|
||||||
|
q.close(true)
|
||||||
|
assert_raise(ClosedQueueError){q.close(false)}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_queue_close_multi_multi
|
||||||
|
q = SizedQueue.new rand(800..1200)
|
||||||
|
|
||||||
|
count_items = rand(3000..5000)
|
||||||
|
count_producers = rand(10..20)
|
||||||
|
|
||||||
|
producers = count_producers.times.map do
|
||||||
|
Thread.new do
|
||||||
|
sleep(rand / 100)
|
||||||
|
count_items.times{|i| q << [i,"#{i} for #{Thread.current.inspect}"]}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
consumers = rand(7..12).times.map do
|
||||||
|
Thread.new do
|
||||||
|
count = 0
|
||||||
|
loop do
|
||||||
|
i, st = q.pop
|
||||||
|
count += 1 if i.is_a?(Fixnum) && st.is_a?(String)
|
||||||
|
end
|
||||||
|
count
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# No dead or finished threads
|
||||||
|
assert (consumers + producers).all?{|thr| thr.status =~ /\Arun|sleep\Z/}, 'no threads runnning'
|
||||||
|
|
||||||
|
# just exercising the concurrency of the support methods.
|
||||||
|
counter = Thread.new do
|
||||||
|
until q.closed? && q.empty?
|
||||||
|
raise if q.size > q.max
|
||||||
|
# otherwise this exercise causes too much contention on the lock
|
||||||
|
sleep 0.01
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
producers.each &:join
|
||||||
|
q.close true
|
||||||
|
|
||||||
|
# results not randomly distributed. Not sure why.
|
||||||
|
# consumers.map{|thr| thr.value}.each do |x|
|
||||||
|
# assert_not_equal 0, x
|
||||||
|
# end
|
||||||
|
|
||||||
|
all_items_count = consumers.map{|thr| thr.value}.inject(:+)
|
||||||
|
assert_equal count_items * count_producers, all_items_count
|
||||||
|
|
||||||
|
# don't leak this thread
|
||||||
|
assert_nothing_raised{counter.join}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
215
thread_tools.c
215
thread_tools.c
@ -1,6 +1,7 @@
|
|||||||
/* included by thraed.c */
|
/* included by thraed.c */
|
||||||
|
|
||||||
VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
|
VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
|
||||||
|
VALUE rb_eClosedQueueError;
|
||||||
|
|
||||||
/* Mutex */
|
/* Mutex */
|
||||||
|
|
||||||
@ -521,6 +522,9 @@ enum {
|
|||||||
END_QUEUE
|
END_QUEUE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#define QUEUE_CLOSED FL_USER5
|
||||||
|
#define QUEUE_CLOSE_EXCEPTION FL_USER6
|
||||||
|
|
||||||
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
|
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
|
||||||
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
|
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
|
||||||
#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
|
#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
|
||||||
@ -566,6 +570,77 @@ wakeup_all_threads(VALUE list)
|
|||||||
rb_ary_clear(list);
|
rb_ary_clear(list);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static unsigned long
|
||||||
|
queue_length(VALUE self)
|
||||||
|
{
|
||||||
|
VALUE que = GET_QUEUE_QUE(self);
|
||||||
|
return RARRAY_LEN(que);
|
||||||
|
}
|
||||||
|
|
||||||
|
static unsigned long
|
||||||
|
queue_num_waiting(VALUE self)
|
||||||
|
{
|
||||||
|
VALUE waiters = GET_QUEUE_WAITERS(self);
|
||||||
|
return RARRAY_LEN(waiters);
|
||||||
|
}
|
||||||
|
|
||||||
|
static unsigned long
|
||||||
|
szqueue_num_waiting_producer(VALUE self)
|
||||||
|
{
|
||||||
|
VALUE waiters = GET_SZQUEUE_WAITERS(self);
|
||||||
|
return RARRAY_LEN(waiters);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
queue_closed_p(VALUE self)
|
||||||
|
{
|
||||||
|
return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
raise_closed_queue_error(VALUE self)
|
||||||
|
{
|
||||||
|
rb_raise(rb_eClosedQueueError, "queue closed");
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
queue_closed_result(VALUE self)
|
||||||
|
{
|
||||||
|
assert(queue_length(self) == 0);
|
||||||
|
|
||||||
|
if (FL_TEST(self, QUEUE_CLOSE_EXCEPTION)) {
|
||||||
|
raise_closed_queue_error(self);
|
||||||
|
}
|
||||||
|
return Qnil;
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
queue_do_close(VALUE self, int argc, VALUE *argv, int is_szq)
|
||||||
|
{
|
||||||
|
VALUE exception = Qfalse;
|
||||||
|
|
||||||
|
if (queue_closed_p(self)) raise_closed_queue_error(self);
|
||||||
|
|
||||||
|
rb_scan_args(argc, argv, "01", &exception);
|
||||||
|
FL_SET(self, QUEUE_CLOSED);
|
||||||
|
|
||||||
|
if (RTEST(exception)) {
|
||||||
|
FL_SET(self, QUEUE_CLOSE_EXCEPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (queue_num_waiting(self) > 0) {
|
||||||
|
VALUE waiters = GET_QUEUE_WAITERS(self);
|
||||||
|
wakeup_all_threads(waiters);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_szq && szqueue_num_waiting_producer(self) > 0) {
|
||||||
|
VALUE waiters = GET_SZQUEUE_WAITERS(self);
|
||||||
|
wakeup_all_threads(waiters);
|
||||||
|
}
|
||||||
|
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Document-class: Queue
|
* Document-class: Queue
|
||||||
*
|
*
|
||||||
@ -611,11 +686,74 @@ rb_queue_initialize(VALUE self)
|
|||||||
static VALUE
|
static VALUE
|
||||||
queue_do_push(VALUE self, VALUE obj)
|
queue_do_push(VALUE self, VALUE obj)
|
||||||
{
|
{
|
||||||
|
if (queue_closed_p(self)) {
|
||||||
|
raise_closed_queue_error(self);
|
||||||
|
}
|
||||||
rb_ary_push(GET_QUEUE_QUE(self), obj);
|
rb_ary_push(GET_QUEUE_QUE(self), obj);
|
||||||
wakeup_first_thread(GET_QUEUE_WAITERS(self));
|
wakeup_first_thread(GET_QUEUE_WAITERS(self));
|
||||||
return self;
|
return self;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: Queue#close
|
||||||
|
* call-seq:
|
||||||
|
* close(exception=false)
|
||||||
|
*
|
||||||
|
* Closes the queue. A closed queue cannot be re-opened.
|
||||||
|
*
|
||||||
|
* After the call to close completes, the following are true:
|
||||||
|
*
|
||||||
|
* - +closed?+ will return true
|
||||||
|
*
|
||||||
|
* - calling enq/push/<< will raise ClosedQueueError('queue closed')
|
||||||
|
*
|
||||||
|
* - when +empty?+ is false, calling deq/pop/shift will return an object
|
||||||
|
* from the queue as usual.
|
||||||
|
*
|
||||||
|
* - when +empty?+ is true, deq(non_block=false) will not suspend and
|
||||||
|
* will either return nil. If +exception+ parameter is true, raise ClosedQueueError error.
|
||||||
|
* deq(non_block=true) will ignore the parameter and raise a ThreadError('queue empty').
|
||||||
|
*
|
||||||
|
* ClosedQueueError is inherited from StopIteration, so that you can break loop block.
|
||||||
|
*
|
||||||
|
* Example:
|
||||||
|
*
|
||||||
|
* q = Queue.new
|
||||||
|
* Thread.new{
|
||||||
|
* while e = q.deq # wait for nil to break loop
|
||||||
|
* # ...
|
||||||
|
* end
|
||||||
|
* }
|
||||||
|
* q.close # equals to q.close(false)
|
||||||
|
*
|
||||||
|
* q = Queue.new
|
||||||
|
* Thread.new{
|
||||||
|
* loop{
|
||||||
|
* e = q.deq; ... # braek with ClosedQueueError
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* q.close(true)
|
||||||
|
*/
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
rb_queue_close(int argc, VALUE *argv, VALUE self)
|
||||||
|
{
|
||||||
|
return queue_do_close(self, argc, argv, FALSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: Queue#closed?
|
||||||
|
* call-seq: closed?
|
||||||
|
*
|
||||||
|
* Returns +true+ if the queue is closed.
|
||||||
|
*/
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
rb_queue_closed_p(VALUE self)
|
||||||
|
{
|
||||||
|
return queue_closed_p(self) ? Qtrue : Qfalse;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Document-method: Queue#push
|
* Document-method: Queue#push
|
||||||
* call-seq:
|
* call-seq:
|
||||||
@ -632,20 +770,6 @@ rb_queue_push(VALUE self, VALUE obj)
|
|||||||
return queue_do_push(self, obj);
|
return queue_do_push(self, obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
static unsigned long
|
|
||||||
queue_length(VALUE self)
|
|
||||||
{
|
|
||||||
VALUE que = GET_QUEUE_QUE(self);
|
|
||||||
return RARRAY_LEN(que);
|
|
||||||
}
|
|
||||||
|
|
||||||
static unsigned long
|
|
||||||
queue_num_waiting(VALUE self)
|
|
||||||
{
|
|
||||||
VALUE waiters = GET_QUEUE_WAITERS(self);
|
|
||||||
return RARRAY_LEN(waiters);
|
|
||||||
}
|
|
||||||
|
|
||||||
struct waiting_delete {
|
struct waiting_delete {
|
||||||
VALUE waiting;
|
VALUE waiting;
|
||||||
VALUE th;
|
VALUE th;
|
||||||
@ -676,8 +800,16 @@ queue_do_pop(VALUE self, int should_block)
|
|||||||
if (!should_block) {
|
if (!should_block) {
|
||||||
rb_raise(rb_eThreadError, "queue empty");
|
rb_raise(rb_eThreadError, "queue empty");
|
||||||
}
|
}
|
||||||
rb_ary_push(args.waiting, args.th);
|
else if (queue_closed_p(self)) {
|
||||||
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
return queue_closed_result(self);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
assert(queue_length(self) == 0);
|
||||||
|
assert(queue_closed_p(self) == 0);
|
||||||
|
|
||||||
|
rb_ary_push(args.waiting, args.th);
|
||||||
|
rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rb_ary_shift(GET_QUEUE_QUE(self));
|
return rb_ary_shift(GET_QUEUE_QUE(self));
|
||||||
@ -804,6 +936,24 @@ rb_szqueue_initialize(VALUE self, VALUE vmax)
|
|||||||
return self;
|
return self;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: SizedQueue#close
|
||||||
|
* call-seq:
|
||||||
|
* close(exception=false)
|
||||||
|
*
|
||||||
|
* Similar to Queue#close.
|
||||||
|
*
|
||||||
|
* The difference is behavior with waiting enqueuing threads.
|
||||||
|
*
|
||||||
|
* If there are waiting enqueuing threads, they are interrupted by
|
||||||
|
* raising ClosedQueueError('queue closed').
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_szqueue_close(int argc, VALUE *argv, VALUE self)
|
||||||
|
{
|
||||||
|
return queue_do_close(self, argc, argv, TRUE);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Document-method: SizedQueue#max
|
* Document-method: SizedQueue#max
|
||||||
*
|
*
|
||||||
@ -879,9 +1029,20 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
|
|||||||
if (!should_block) {
|
if (!should_block) {
|
||||||
rb_raise(rb_eThreadError, "queue full");
|
rb_raise(rb_eThreadError, "queue full");
|
||||||
}
|
}
|
||||||
rb_ary_push(args.waiting, args.th);
|
else if (queue_closed_p(self)) {
|
||||||
rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
goto closed;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
rb_ary_push(args.waiting, args.th);
|
||||||
|
rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (queue_closed_p(self)) {
|
||||||
|
closed:
|
||||||
|
raise_closed_queue_error(self);
|
||||||
|
}
|
||||||
|
|
||||||
return queue_do_push(self, argv[0]);
|
return queue_do_push(self, argv[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -941,9 +1102,7 @@ rb_szqueue_clear(VALUE self)
|
|||||||
static VALUE
|
static VALUE
|
||||||
rb_szqueue_num_waiting(VALUE self)
|
rb_szqueue_num_waiting(VALUE self)
|
||||||
{
|
{
|
||||||
long len = queue_num_waiting(self);
|
long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self);
|
||||||
VALUE waiters = GET_SZQUEUE_WAITERS(self);
|
|
||||||
len += RARRAY_LEN(waiters);
|
|
||||||
return ULONG2NUM(len);
|
return ULONG2NUM(len);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1106,14 +1265,14 @@ Init_thread_tools(void)
|
|||||||
rb_cThread,
|
rb_cThread,
|
||||||
"Queue", rb_cObject, rb_struct_alloc_noinit,
|
"Queue", rb_cObject, rb_struct_alloc_noinit,
|
||||||
"que", "waiters", NULL);
|
"que", "waiters", NULL);
|
||||||
rb_cSizedQueue = rb_struct_define_without_accessor_under(
|
|
||||||
rb_cThread,
|
rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
|
||||||
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
|
|
||||||
"que", "waiters", "queue_waiters", "size", NULL);
|
|
||||||
|
|
||||||
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
|
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
|
||||||
rb_undef_method(rb_cQueue, "initialize_copy");
|
rb_undef_method(rb_cQueue, "initialize_copy");
|
||||||
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
|
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
|
||||||
|
rb_define_method(rb_cQueue, "close", rb_queue_close, -1);
|
||||||
|
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
||||||
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
|
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
|
||||||
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
|
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
|
||||||
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
|
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
|
||||||
@ -1127,7 +1286,13 @@ Init_thread_tools(void)
|
|||||||
rb_define_alias(rb_cQueue, "shift", "pop"); /* Alias for #pop. */
|
rb_define_alias(rb_cQueue, "shift", "pop"); /* Alias for #pop. */
|
||||||
rb_define_alias(rb_cQueue, "size", "length"); /* Alias for #length. */
|
rb_define_alias(rb_cQueue, "size", "length"); /* Alias for #length. */
|
||||||
|
|
||||||
|
rb_cSizedQueue = rb_struct_define_without_accessor_under(
|
||||||
|
rb_cThread,
|
||||||
|
"SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
|
||||||
|
"que", "waiters", "queue_waiters", "size", NULL);
|
||||||
|
|
||||||
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
|
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
|
||||||
|
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1);
|
||||||
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
||||||
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
||||||
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
|
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user