* lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable.

* lib/thread.rb (SizedQueue#push): ditto.
* lib/thread.rb (SizedQueue#max): ditto.
* lib/thread.rb (Queue#pop): ditto.
* lib/thread.rb (Queue#push): ditto.

* lib/thread.rb (SizedQueue#num_waiting): adopt the above changes.
* lib/thread.rb (SizedQueue#initialize): ditto.
* lib/thread.rb (Queue#num_waiting): ditto.
* lib/thread.rb (Queue#initialize): ditto.
* test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto.

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@38087 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
kosaki 2012-11-30 18:55:09 +00:00
parent e742a446c8
commit 3357d88ada
3 changed files with 58 additions and 58 deletions

View File

@ -1,3 +1,17 @@
Sat Dec 1 03:29:52 2012 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable.
* lib/thread.rb (SizedQueue#push): ditto.
* lib/thread.rb (SizedQueue#max): ditto.
* lib/thread.rb (Queue#pop): ditto.
* lib/thread.rb (Queue#push): ditto.
* lib/thread.rb (SizedQueue#num_waiting): adopt the above changes.
* lib/thread.rb (SizedQueue#initialize): ditto.
* lib/thread.rb (Queue#num_waiting): ditto.
* lib/thread.rb (Queue#initialize): ditto.
* test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto.
Sat Dec 1 03:45:47 2012 Koichi Sasada <ko1@atdot.net> Sat Dec 1 03:45:47 2012 Koichi Sasada <ko1@atdot.net>
* thread.c (Thread.async_interrupt_timing): fix RDoc. * thread.c (Thread.async_interrupt_timing): fix RDoc.

View File

@ -149,26 +149,23 @@ class Queue
# #
def initialize def initialize
@que = [] @que = []
@waiting = []
@que.taint # enable tainted communication @que.taint # enable tainted communication
@waiting.taint @num_waiting = 0
self.taint self.taint
@mutex = Mutex.new @mutex = Mutex.new
@cond = ConditionVariable.new
end end
# #
# Pushes +obj+ to the queue. # Pushes +obj+ to the queue.
# #
def push(obj) def push(obj)
@mutex.synchronize{ Thread.async_interrupt_timing(StandardError => :on_blocking) do
@mutex.synchronize do
@que.push obj @que.push obj
begin @cond.signal
t = @waiting.shift end
t.wakeup if t
rescue ThreadError
retry
end end
}
end end
# #
@ -187,23 +184,26 @@ class Queue
# thread isn't suspended, and an exception is raised. # thread isn't suspended, and an exception is raised.
# #
def pop(non_block=false) def pop(non_block=false)
@mutex.synchronize{ Thread.async_interrupt_timing(StandardError => :on_blocking) do
begin @mutex.synchronize do
while true while true
if @que.empty? if @que.empty?
raise ThreadError, "queue empty" if non_block if non_block
# @waiting.include? check is necessary for avoiding a race against raise ThreadError, "queue empty"
# Thread.wakeup [Bug 5195] else
@waiting.push Thread.current unless @waiting.include?(Thread.current) begin
@mutex.sleep @num_waiting += 1
@cond.wait @mutex
ensure
@num_waiting -= 1
end
end
else else
return @que.shift return @que.shift
end end
end end
ensure
@waiting.delete(Thread.current)
end end
} end
end end
# #
@ -246,7 +246,7 @@ class Queue
# Returns the number of threads waiting on the queue. # Returns the number of threads waiting on the queue.
# #
def num_waiting def num_waiting
@waiting.size @num_waiting
end end
end end
@ -263,8 +263,8 @@ class SizedQueue < Queue
def initialize(max) def initialize(max)
raise ArgumentError, "queue size must be positive" unless max > 0 raise ArgumentError, "queue size must be positive" unless max > 0
@max = max @max = max
@queue_wait = [] @enque_cond = ConditionVariable.new
@queue_wait.taint # enable tainted comunication @num_enqueue_waiting = 0
super() super()
end end
@ -280,22 +280,15 @@ class SizedQueue < Queue
# #
def max=(max) def max=(max)
raise ArgumentError, "queue size must be positive" unless max > 0 raise ArgumentError, "queue size must be positive" unless max > 0
diff = nil
@mutex.synchronize { @mutex.synchronize do
if max <= @max if max <= @max
@max = max @max = max
else else
diff = max - @max diff = max - @max
@max = max @max = max
end
}
if diff
diff.times do diff.times do
begin @enque_cond.signal
t = @queue_wait.shift
t.run if t
rescue ThreadError
retry
end end
end end
end end
@ -307,25 +300,22 @@ class SizedQueue < Queue
# until space becomes available. # until space becomes available.
# #
def push(obj) def push(obj)
@mutex.synchronize{ Thread.async_interrupt_timing(RuntimeError => :on_blocking) do
begin @mutex.synchronize do
while true while true
break if @que.length < @max break if @que.length < @max
@queue_wait.push Thread.current unless @queue_wait.include?(Thread.current) @num_enqueue_waiting += 1
@mutex.sleep begin
end @enque_cond.wait @mutex
ensure ensure
@queue_wait.delete(Thread.current) @num_enqueue_waiting -= 1
end
end end
@que.push obj @que.push obj
begin @cond.signal
t = @waiting.shift end
t.wakeup if t
rescue ThreadError
retry
end end
}
end end
# #
@ -343,16 +333,11 @@ class SizedQueue < Queue
# #
def pop(*args) def pop(*args)
retval = super retval = super
@mutex.synchronize { @mutex.synchronize do
if @que.length < @max if @que.length < @max
begin @enque_cond.signal
t = @queue_wait.shift
t.wakeup if t
rescue ThreadError
retry
end end
end end
}
retval retval
end end
@ -370,7 +355,7 @@ class SizedQueue < Queue
# Returns the number of threads waiting on the queue. # Returns the number of threads waiting on the queue.
# #
def num_waiting def num_waiting
@waiting.size + @queue_wait.size @num_waiting + @num_enqueue_waiting
end end
end end

View File

@ -69,7 +69,8 @@ class TestQueue < Test::Unit::TestCase
t2 = Thread.start { sq.push(2) } t2 = Thread.start { sq.push(2) }
sleep 0.1 until t1.stop? && t2.stop? sleep 0.1 until t1.stop? && t2.stop?
queue_wait = sq.instance_eval{ @queue_wait } enque_cond = sq.instance_eval{ @enque_cond }
queue_wait = enque_cond.instance_eval { @waiters }
assert_equal(queue_wait.uniq, queue_wait) assert_equal(queue_wait.uniq, queue_wait)
end end