rb_io_blocking_operation_exit
should not execute with pending interrupts.
This commit is contained in:
parent
e093c3145a
commit
81a23c5793
Notes:
git
2025-06-06 04:13:29 +00:00
@ -25,7 +25,7 @@ struct rb_io_blocking_operation {
|
||||
// The linked list data structure.
|
||||
struct ccan_list_node list;
|
||||
|
||||
// The execution context of the blocking operation:
|
||||
// The execution context of the blocking operation.
|
||||
struct rb_execution_context_struct *ec;
|
||||
};
|
||||
|
||||
|
14
scheduler.c
14
scheduler.c
@ -422,6 +422,13 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
|
||||
// If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
|
||||
int saved_errno = errno;
|
||||
|
||||
#ifdef RUBY_DEBUG
|
||||
rb_execution_context_t *ec = GET_EC();
|
||||
if (ec->interrupt_flag) {
|
||||
rb_bug("rb_fiber_scheduler_unblock called with interrupt flags set");
|
||||
}
|
||||
#endif
|
||||
|
||||
VALUE result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
|
||||
|
||||
errno = saved_errno;
|
||||
@ -853,6 +860,13 @@ VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exc
|
||||
fiber, exception
|
||||
};
|
||||
|
||||
#ifdef RUBY_DEBUG
|
||||
rb_execution_context_t *ec = GET_EC();
|
||||
if (ec->interrupt_flag) {
|
||||
rb_bug("rb_fiber_scheduler_fiber_interrupt called with interrupt flags set");
|
||||
}
|
||||
#endif
|
||||
|
||||
return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,7 @@ class TestFiberIO < Test::Unit::TestCase
|
||||
omit unless defined?(UNIXSocket)
|
||||
|
||||
i, o = UNIXSocket.pair
|
||||
if RUBY_PLATFORM=~/mswin|mingw/
|
||||
if RUBY_PLATFORM =~ /mswin|mingw/
|
||||
i.nonblock = true
|
||||
o.nonblock = true
|
||||
end
|
||||
@ -44,7 +44,7 @@ class TestFiberIO < Test::Unit::TestCase
|
||||
16.times.map do
|
||||
Thread.new do
|
||||
i, o = UNIXSocket.pair
|
||||
if RUBY_PLATFORM=~/mswin|mingw/
|
||||
if RUBY_PLATFORM =~ /mswin|mingw/
|
||||
i.nonblock = true
|
||||
o.nonblock = true
|
||||
end
|
||||
@ -67,7 +67,7 @@ class TestFiberIO < Test::Unit::TestCase
|
||||
|
||||
def test_epipe_on_read
|
||||
omit unless defined?(UNIXSocket)
|
||||
omit "nonblock=true isn't properly supported on Windows" if RUBY_PLATFORM=~/mswin|mingw/
|
||||
omit "nonblock=true isn't properly supported on Windows" if RUBY_PLATFORM =~ /mswin|mingw/
|
||||
|
||||
i, o = UNIXSocket.pair
|
||||
|
||||
@ -242,38 +242,37 @@ class TestFiberIO < Test::Unit::TestCase
|
||||
# Windows has UNIXSocket, but only with VS 2019+
|
||||
omit "UNIXSocket is not defined!" unless defined?(UNIXSocket)
|
||||
|
||||
i, o = Socket.pair(:UNIX, :STREAM)
|
||||
if RUBY_PLATFORM=~/mswin|mingw/
|
||||
i.nonblock = true
|
||||
o.nonblock = true
|
||||
end
|
||||
|
||||
reading_thread = Thread.new do
|
||||
Thread.current.report_on_exception = false
|
||||
i.wait_readable
|
||||
end
|
||||
|
||||
fs_thread = Thread.new do
|
||||
# Wait until the reading thread is blocked on read:
|
||||
Thread.pass until reading_thread.status == "sleep"
|
||||
|
||||
scheduler = Scheduler.new
|
||||
Fiber.set_scheduler scheduler
|
||||
Fiber.schedule do
|
||||
i.close
|
||||
Socket.pair(:UNIX, :STREAM) do |i, o|
|
||||
if RUBY_PLATFORM =~ /mswin|mingw/
|
||||
i.nonblock = true
|
||||
o.nonblock = true
|
||||
end
|
||||
|
||||
reading_thread = Thread.new do
|
||||
Thread.current.report_on_exception = false
|
||||
i.wait_readable
|
||||
end
|
||||
|
||||
scheduler_thread = Thread.new do
|
||||
# Wait until the reading thread is blocked on read:
|
||||
Thread.pass until reading_thread.status == "sleep"
|
||||
|
||||
scheduler = Scheduler.new
|
||||
Fiber.set_scheduler scheduler
|
||||
Fiber.schedule do
|
||||
i.close
|
||||
end
|
||||
end
|
||||
|
||||
assert_raise(IOError) { reading_thread.join }
|
||||
refute_nil scheduler_thread.join(5), "expected thread to terminate within 5 seconds"
|
||||
|
||||
assert_predicate(i, :closed?)
|
||||
ensure
|
||||
scheduler_thread&.kill
|
||||
scheduler_thread&.join rescue nil
|
||||
reading_thread&.kill
|
||||
reading_thread&.join rescue nil
|
||||
end
|
||||
|
||||
assert_raise(IOError) { reading_thread.join }
|
||||
refute_nil fs_thread.join(5), "expected thread to terminate within 5 seconds"
|
||||
|
||||
assert_predicate(i, :closed?)
|
||||
ensure
|
||||
fs_thread&.kill
|
||||
fs_thread&.join rescue nil
|
||||
reading_thread&.kill
|
||||
reading_thread&.join rescue nil
|
||||
i&.close
|
||||
o&.close
|
||||
end
|
||||
end
|
||||
|
110
test/fiber/test_io_close.rb
Normal file
110
test/fiber/test_io_close.rb
Normal file
@ -0,0 +1,110 @@
|
||||
# frozen_string_literal: true
|
||||
require 'test/unit'
|
||||
require_relative 'scheduler'
|
||||
|
||||
class TestFiberIOClose < Test::Unit::TestCase
|
||||
def with_socket_pair(&block)
|
||||
omit "UNIXSocket is not defined!" unless defined?(UNIXSocket)
|
||||
|
||||
UNIXSocket.pair do |i, o|
|
||||
if RUBY_PLATFORM =~ /mswin|mingw/
|
||||
i.nonblock = true
|
||||
o.nonblock = true
|
||||
end
|
||||
|
||||
yield i, o
|
||||
end
|
||||
end
|
||||
|
||||
# Problematic on Windows.
|
||||
def test_io_close_across_fibers
|
||||
omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
||||
|
||||
with_socket_pair do |i, o|
|
||||
error = nil
|
||||
|
||||
thread = Thread.new do
|
||||
scheduler = Scheduler.new
|
||||
Fiber.set_scheduler scheduler
|
||||
|
||||
Fiber.schedule do
|
||||
i.read
|
||||
rescue => error
|
||||
# Ignore.
|
||||
end
|
||||
|
||||
Fiber.schedule do
|
||||
i.close
|
||||
end
|
||||
end
|
||||
|
||||
thread.join
|
||||
|
||||
assert_instance_of IOError, error
|
||||
assert_match(/closed/, error.message)
|
||||
end
|
||||
end
|
||||
|
||||
# Okay on all platforms.
|
||||
def test_io_close_blocking_thread
|
||||
omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
||||
|
||||
with_socket_pair do |i, o|
|
||||
error = nil
|
||||
|
||||
reading_thread = Thread.new do
|
||||
i.read
|
||||
rescue => error
|
||||
# Ignore.
|
||||
end
|
||||
|
||||
Thread.pass until reading_thread.status == 'sleep'
|
||||
|
||||
thread = Thread.new do
|
||||
scheduler = Scheduler.new
|
||||
Fiber.set_scheduler scheduler
|
||||
|
||||
Fiber.schedule do
|
||||
i.close
|
||||
end
|
||||
end
|
||||
|
||||
thread.join
|
||||
reading_thread.join
|
||||
|
||||
assert_instance_of IOError, error
|
||||
assert_match(/closed/, error.message)
|
||||
end
|
||||
end
|
||||
|
||||
# Problematic on Windows.
|
||||
def test_io_close_blocking_fiber
|
||||
omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
|
||||
|
||||
with_socket_pair do |i, o|
|
||||
error = nil
|
||||
|
||||
thread = Thread.new do
|
||||
scheduler = Scheduler.new
|
||||
Fiber.set_scheduler scheduler
|
||||
|
||||
Fiber.schedule do
|
||||
begin
|
||||
i.read
|
||||
rescue => error
|
||||
# Ignore.
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Thread.pass until thread.status == 'sleep'
|
||||
|
||||
i.close
|
||||
|
||||
thread.join
|
||||
|
||||
assert_instance_of IOError, error
|
||||
assert_match(/closed/, error.message)
|
||||
end
|
||||
end
|
||||
end
|
15
thread.c
15
thread.c
@ -207,6 +207,10 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
|
||||
static inline int
|
||||
vm_check_ints_blocking(rb_execution_context_t *ec)
|
||||
{
|
||||
#ifdef RUBY_ASSERT_CRITICAL_SECTION
|
||||
VM_ASSERT(ruby_assert_critical_section_entered == 0);
|
||||
#endif
|
||||
|
||||
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
||||
|
||||
if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) {
|
||||
@ -1947,6 +1951,9 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||
goto retry;
|
||||
}
|
||||
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||
|
||||
state = saved_state;
|
||||
}
|
||||
EC_POP_TAG();
|
||||
@ -1961,9 +1968,6 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
|
||||
EC_JUMP_TAG(ec, state);
|
||||
}
|
||||
|
||||
/* TODO: check func() */
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||
|
||||
// If the error was a timeout, we raise a specific exception for that:
|
||||
if (saved_errno == ETIMEDOUT) {
|
||||
rb_raise(rb_eIOTimeoutError, "Blocking operation timed out!");
|
||||
@ -4471,6 +4475,8 @@ do_select(VALUE p)
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
|
||||
} while (wait_retryable(&result, lerrno, to, endtime) && do_select_update());
|
||||
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec);
|
||||
|
||||
if (result < 0) {
|
||||
errno = lerrno;
|
||||
}
|
||||
@ -4591,7 +4597,10 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
|
||||
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||
} while (wait_retryable(&result, lerrno, to, end));
|
||||
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||
}
|
||||
|
||||
EC_POP_TAG();
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user