Urgent notification pipe has same lifetime as scheduler.

This commit is contained in:
Samuel Williams 2020-11-08 19:55:27 +13:00
parent 57b83dad4c
commit bed4848661
Notes: git 2020-11-08 16:41:17 +09:00

View File

@ -25,7 +25,7 @@ class Scheduler
@blocking = 0 @blocking = 0
@ready = [] @ready = []
@urgent = nil @urgent = IO.pipe
end end
attr :readable attr :readable
@ -47,8 +47,6 @@ class Scheduler
end end
def run def run
@urgent = IO.pipe
while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive? while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive?
# Can only handle file descriptors up to 1024... # Can only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
@ -95,9 +93,6 @@ class Scheduler
end end
end end
end end
ensure
@urgent.each(&:close)
@urgent = nil
end end
def close def close
@ -105,6 +100,9 @@ class Scheduler
self.run self.run
ensure ensure
@urgent.each(&:close)
@urgent = nil
@closed = true @closed = true
# We freeze to detect any unintended modifications after the scheduler is closed: # We freeze to detect any unintended modifications after the scheduler is closed:
@ -142,7 +140,8 @@ class Scheduler
# Used when blocking on synchronization (Mutex#lock, Queue#pop, SizedQueue#push, ...) # Used when blocking on synchronization (Mutex#lock, Queue#pop, SizedQueue#push, ...)
def block(blocker, timeout = nil) def block(blocker, timeout = nil)
# p [__method__, blocker, timeout] # $stderr.puts [__method__, blocker, timeout].inspect
if timeout if timeout
@waiting[Fiber.current] = current_time + timeout @waiting[Fiber.current] = current_time + timeout
begin begin
@ -164,14 +163,14 @@ class Scheduler
# Used when synchronization wakes up a previously-blocked fiber (Mutex#unlock, Queue#push, ...). # Used when synchronization wakes up a previously-blocked fiber (Mutex#unlock, Queue#push, ...).
# This might be called from another thread. # This might be called from another thread.
def unblock(blocker, fiber) def unblock(blocker, fiber)
# p [__method__, blocker, fiber] # $stderr.puts [__method__, blocker, fiber].inspect
@lock.synchronize do @lock.synchronize do
@ready << fiber @ready << fiber
end end
if io = @urgent&.last io = @urgent.last
io.write_nonblock('.') io.write_nonblock('.')
end
end end
def fiber(&block) def fiber(&block)