* lib/thread.rb (Queue#pop): Fixed double registration issue when
mutex.sleep is interrupted. [Bug #5258] [ruby-dev:44448] * lib/thread.rb (SizedQueue#push): ditto. * test/thread/test_queue.rb (test_sized_queue_and_wakeup, test_queue_pop_interrupt, test_sized_queue_pop_interrupt, test_sized_queue_push_interrupt): new tests. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36938 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
61f530500e
commit
7198053a49
10
ChangeLog
10
ChangeLog
@ -1,3 +1,13 @@
|
|||||||
|
Sun Sep 9 21:21:15 2012 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
|
||||||
|
|
||||||
|
* lib/thread.rb (Queue#pop): Fixed double registration issue when
|
||||||
|
mutex.sleep is interrupted. [Bug #5258] [ruby-dev:44448]
|
||||||
|
* lib/thread.rb (SizedQueue#push): ditto.
|
||||||
|
|
||||||
|
* test/thread/test_queue.rb (test_sized_queue_and_wakeup,
|
||||||
|
test_queue_pop_interrupt, test_sized_queue_pop_interrupt,
|
||||||
|
test_sized_queue_push_interrupt): new tests.
|
||||||
|
|
||||||
Sun Sep 9 20:20:31 2012 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
|
Sun Sep 9 20:20:31 2012 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
|
||||||
|
|
||||||
* lib/sync.rb (Sync_m#sync_lock): Fixed wakeup/raise unsafe code.
|
* lib/sync.rb (Sync_m#sync_lock): Fixed wakeup/raise unsafe code.
|
||||||
|
@ -182,16 +182,20 @@ class Queue
|
|||||||
#
|
#
|
||||||
def pop(non_block=false)
|
def pop(non_block=false)
|
||||||
@mutex.synchronize{
|
@mutex.synchronize{
|
||||||
while true
|
begin
|
||||||
if @que.empty?
|
while true
|
||||||
raise ThreadError, "queue empty" if non_block
|
if @que.empty?
|
||||||
# @waiting.include? check is necessary for avoiding a race against
|
raise ThreadError, "queue empty" if non_block
|
||||||
# Thread.wakeup [Bug 5195]
|
# @waiting.include? check is necessary for avoiding a race against
|
||||||
@waiting.push Thread.current unless @waiting.include?(Thread.current)
|
# Thread.wakeup [Bug 5195]
|
||||||
@mutex.sleep
|
@waiting.push Thread.current unless @waiting.include?(Thread.current)
|
||||||
else
|
@mutex.sleep
|
||||||
return @que.shift
|
else
|
||||||
|
return @que.shift
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
ensure
|
||||||
|
@waiting.delete(Thread.current)
|
||||||
end
|
end
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
@ -298,10 +302,14 @@ class SizedQueue < Queue
|
|||||||
#
|
#
|
||||||
def push(obj)
|
def push(obj)
|
||||||
@mutex.synchronize{
|
@mutex.synchronize{
|
||||||
while true
|
begin
|
||||||
break if @que.length < @max
|
while true
|
||||||
@queue_wait.push Thread.current
|
break if @que.length < @max
|
||||||
@mutex.sleep
|
@queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
|
||||||
|
@mutex.sleep
|
||||||
|
end
|
||||||
|
ensure
|
||||||
|
@queue_wait.delete(Thread.current)
|
||||||
end
|
end
|
||||||
|
|
||||||
@que.push obj
|
@que.push obj
|
||||||
|
@ -56,6 +56,48 @@ class TestQueue < Test::Unit::TestCase
|
|||||||
assert_equal(1, q.max)
|
assert_equal(1, q.max)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_sized_queue_and_wakeup
|
||||||
|
sq = SizedQueue.new(1)
|
||||||
|
sq.push(0)
|
||||||
|
|
||||||
|
t1 = Thread.start { sq.push(1) ; sleep }
|
||||||
|
|
||||||
|
sleep 0.1 until t1.stop?
|
||||||
|
t1.wakeup
|
||||||
|
sleep 0.1 until t1.stop?
|
||||||
|
|
||||||
|
t2 = Thread.start { sq.push(2) }
|
||||||
|
sleep 0.1 until t1.stop? && t2.stop?
|
||||||
|
|
||||||
|
queue_wait = sq.instance_eval{ @queue_wait }
|
||||||
|
assert_equal(queue_wait.uniq, queue_wait)
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_queue_pop_interrupt
|
||||||
|
q = Queue.new
|
||||||
|
t1 = Thread.new { q.pop }
|
||||||
|
sleep 0.01 until t1.stop?
|
||||||
|
t1.kill.join
|
||||||
|
assert_equal(0, q.num_waiting)
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_sized_queue_pop_interrupt
|
||||||
|
q = SizedQueue.new(1)
|
||||||
|
t1 = Thread.new { q.pop }
|
||||||
|
sleep 0.01 until t1.stop?
|
||||||
|
t1.kill.join
|
||||||
|
assert_equal(0, q.num_waiting)
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_sized_queue_push_interrupt
|
||||||
|
q = SizedQueue.new(1)
|
||||||
|
q.push(1)
|
||||||
|
t1 = Thread.new { q.push(2) }
|
||||||
|
sleep 0.01 until t1.stop?
|
||||||
|
t1.kill.join
|
||||||
|
assert_equal(0, q.num_waiting)
|
||||||
|
end
|
||||||
|
|
||||||
def test_thr_kill
|
def test_thr_kill
|
||||||
bug5343 = '[ruby-core:39634]'
|
bug5343 = '[ruby-core:39634]'
|
||||||
Dir.mktmpdir {|d|
|
Dir.mktmpdir {|d|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user