Improve robustness of io_wait
implementation. (#7456)
- Restore correct handling of `duration`. - Don't delete from `@readable` or `@writable` unless it was added. - A little more documentation.
This commit is contained in:
parent
66c4dc1592
commit
7abe47b85a
Notes:
git
2023-03-07 06:39:18 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
@ -1,8 +1,13 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
# This is an example and simplified scheduler for test purposes.
|
||||
# It is not efficient for a large number of file descriptors as it uses IO.select().
|
||||
# Production Fiber schedulers should use epoll/kqueue/etc.
|
||||
# - It is not efficient for a large number of file descriptors as it uses
|
||||
# IO.select().
|
||||
# - It does not correctly handle multiple calls to `wait` with the same file
|
||||
# descriptor and overlapping events.
|
||||
# - Production fiber schedulers should use epoll/kqueue/etc. Consider using the
|
||||
# [`io-event`](https://github.com/socketry/io-event) gem instead of this
|
||||
# scheduler if you want something simple to build on.
|
||||
|
||||
require 'fiber'
|
||||
require 'socket'
|
||||
@ -58,7 +63,7 @@ class Scheduler
|
||||
# $stderr.puts [__method__, Fiber.current].inspect
|
||||
|
||||
while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
|
||||
# Can only handle file descriptors up to 1024...
|
||||
# May only handle file descriptors up to 1024...
|
||||
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
|
||||
|
||||
# puts "readable: #{readable}" if readable&.any?
|
||||
@ -115,10 +120,16 @@ class Scheduler
|
||||
end
|
||||
end
|
||||
|
||||
# A fiber scheduler hook, invoked when the scheduler goes out of scope.
|
||||
def scheduler_close
|
||||
close(true)
|
||||
end
|
||||
|
||||
# If the `scheduler_close` hook does not exist, this method `close` will be
|
||||
# invoked instead when the fiber scheduler goes out of scope. This is legacy
|
||||
# behaviour, you should almost certainly use `scheduler_close`. The reason for
|
||||
# this, is `scheduler_close` is called when the scheduler goes out of scope,
|
||||
# while `close` may be called by the user.
|
||||
def close(internal = false)
|
||||
# $stderr.puts [__method__, Fiber.current].inspect
|
||||
|
||||
@ -153,6 +164,7 @@ class Scheduler
|
||||
Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
end
|
||||
|
||||
# This hook is invoked by `Timeout.timeout` and related code.
|
||||
def timeout_after(duration, klass, message, &block)
|
||||
fiber = Fiber.current
|
||||
|
||||
@ -171,6 +183,7 @@ class Scheduler
|
||||
end
|
||||
end
|
||||
|
||||
# This hook is invoked by `Process.wait`, `system`, and backticks.
|
||||
def process_wait(pid, flags)
|
||||
# $stderr.puts [__method__, pid, flags, Fiber.current].inspect
|
||||
|
||||
@ -180,23 +193,39 @@ class Scheduler
|
||||
end.value
|
||||
end
|
||||
|
||||
# This hook is invoked by `IO#read` and `IO#write` in the case that `io_read`
|
||||
# and `io_write` hooks are not available. This implementation is not
|
||||
# completely general, in the sense that calling `io_wait` multiple times with
|
||||
# the same `io` and `events` will not work, which is okay for tests but not
|
||||
# for real code. Correct fiber schedulers should not have this limitation.
|
||||
def io_wait(io, events, duration)
|
||||
# $stderr.puts [__method__, io, events, duration, Fiber.current].inspect
|
||||
|
||||
fiber = Fiber.current
|
||||
|
||||
unless (events & IO::READABLE).zero?
|
||||
@readable[io] = Fiber.current
|
||||
@readable[io] = fiber
|
||||
readable = true
|
||||
end
|
||||
|
||||
unless (events & IO::WRITABLE).zero?
|
||||
@writable[io] = Fiber.current
|
||||
@writable[io] = fiber
|
||||
writable = true
|
||||
end
|
||||
|
||||
if duration
|
||||
@waiting[fiber] = current_time + duration
|
||||
end
|
||||
|
||||
Fiber.yield
|
||||
ensure
|
||||
@readable.delete(io)
|
||||
@writable.delete(io)
|
||||
@waiting.delete(fiber) if duration
|
||||
@readable.delete(io) if readable
|
||||
@writable.delete(io) if writable
|
||||
end
|
||||
|
||||
# This hook is invoked by `IO.select`. Using a thread ensures that the
|
||||
# operation does not block the fiber scheduler.
|
||||
def io_select(...)
|
||||
# Emulate the operation using a non-blocking thread:
|
||||
Thread.new do
|
||||
@ -204,7 +233,7 @@ class Scheduler
|
||||
end.value
|
||||
end
|
||||
|
||||
# Used for Kernel#sleep and Thread::Mutex#sleep
|
||||
# This hook is invoked by `Kernel#sleep` and `Thread::Mutex#sleep`.
|
||||
def kernel_sleep(duration = nil)
|
||||
# $stderr.puts [__method__, duration, Fiber.current].inspect
|
||||
|
||||
@ -213,8 +242,10 @@ class Scheduler
|
||||
return true
|
||||
end
|
||||
|
||||
# Used when blocking on synchronization (Thread::Mutex#lock,
|
||||
# Thread::Queue#pop, Thread::SizedQueue#push, ...)
|
||||
# This hook is invoked by blocking options such as `Thread::Mutex#lock`,
|
||||
# `Thread::Queue#pop` and `Thread::SizedQueue#push`, which are unblocked by
|
||||
# other threads/fibers. To unblock a blocked fiber, you should call `unblock`
|
||||
# with the same `blocker` and `fiber` arguments.
|
||||
def block(blocker, timeout = nil)
|
||||
# $stderr.puts [__method__, blocker, timeout].inspect
|
||||
|
||||
@ -238,9 +269,8 @@ class Scheduler
|
||||
end
|
||||
end
|
||||
|
||||
# Used when synchronization wakes up a previously-blocked fiber
|
||||
# (Thread::Mutex#unlock, Thread::Queue#push, ...).
|
||||
# This might be called from another thread.
|
||||
# This method is invoked from a thread or fiber to unblock a fiber that is
|
||||
# blocked by `block`. It is expected to be thread safe.
|
||||
def unblock(blocker, fiber)
|
||||
# $stderr.puts [__method__, blocker, fiber].inspect
|
||||
# $stderr.puts blocker.backtrace.inspect
|
||||
@ -254,6 +284,9 @@ class Scheduler
|
||||
io.write_nonblock('.')
|
||||
end
|
||||
|
||||
# This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
|
||||
# it to create scheduled fibers, but it is not required in practice;
|
||||
# `Fiber.new` is usually sufficient.
|
||||
def fiber(&block)
|
||||
fiber = Fiber.new(blocking: false, &block)
|
||||
|
||||
@ -262,6 +295,9 @@ class Scheduler
|
||||
return fiber
|
||||
end
|
||||
|
||||
# This hook is invoked by `Addrinfo.getaddrinfo`. Using a thread ensures that
|
||||
# the operation does not block the fiber scheduler, since `getaddrinfo` is
|
||||
# usually provided by `libc` and is blocking.
|
||||
def address_resolve(hostname)
|
||||
Thread.new do
|
||||
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
|
||||
@ -269,6 +305,8 @@ class Scheduler
|
||||
end
|
||||
end
|
||||
|
||||
# This scheduler class implements `io_read` and `io_write` hooks which require
|
||||
# `IO::Buffer`.
|
||||
class IOBufferScheduler < Scheduler
|
||||
EAGAIN = -Errno::EAGAIN::Errno
|
||||
|
||||
@ -333,6 +371,9 @@ class IOBufferScheduler < Scheduler
|
||||
end
|
||||
end
|
||||
|
||||
# This scheduler has a broken implementation of `unblock`` in the sense that it
|
||||
# raises an exception. This is used to test the behavior of the scheduler when
|
||||
# unblock raises an exception.
|
||||
class BrokenUnblockScheduler < Scheduler
|
||||
def unblock(blocker, fiber)
|
||||
super
|
||||
@ -341,6 +382,9 @@ class BrokenUnblockScheduler < Scheduler
|
||||
end
|
||||
end
|
||||
|
||||
# This scheduler has a broken implementation of `unblock` in the sense that it
|
||||
# sleeps. This is used to test the behavior of the scheduler when unblock
|
||||
# messes with the internal thread state in an unexpected way.
|
||||
class SleepingUnblockScheduler < Scheduler
|
||||
# This method is invoked when the thread is exiting.
|
||||
def unblock(blocker, fiber)
|
||||
@ -351,6 +395,8 @@ class SleepingUnblockScheduler < Scheduler
|
||||
end
|
||||
end
|
||||
|
||||
# This scheduler has a broken implementation of `kernel_sleep` in the sense that
|
||||
# it invokes a blocking sleep which can cause a deadlock in some cases.
|
||||
class SleepingBlockingScheduler < Scheduler
|
||||
def kernel_sleep(duration = nil)
|
||||
# Deliberaly sleep in a blocking state which can trigger a deadlock if the implementation is not correct.
|
||||
|
Loading…
x
Reference in New Issue
Block a user