Fix compatibility with fiber schedulers that don't implement #fiber_interrupt
. (#13492)
This commit is contained in:
parent
20d7db8cba
commit
9a29252830
Notes:
git
2025-06-02 09:50:37 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
34
scheduler.c
34
scheduler.c
@ -170,6 +170,10 @@ verify_interface(VALUE scheduler)
|
|||||||
if (!rb_respond_to(scheduler, id_io_wait)) {
|
if (!rb_respond_to(scheduler, id_io_wait)) {
|
||||||
rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
|
rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
|
||||||
|
rb_warn("Scheduler should implement #fiber_interrupt");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
@ -458,7 +462,11 @@ rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeou
|
|||||||
scheduler, io, events, timeout
|
scheduler, io, events, timeout
|
||||||
};
|
};
|
||||||
|
|
||||||
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
|
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
|
||||||
|
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
|
||||||
|
} else {
|
||||||
|
return fiber_scheduler_io_wait((VALUE)&arguments);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
VALUE
|
VALUE
|
||||||
@ -546,7 +554,11 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
|
|||||||
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
|
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
|
||||||
};
|
};
|
||||||
|
|
||||||
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
|
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
|
||||||
|
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
|
||||||
|
} else {
|
||||||
|
return fiber_scheduler_io_read((VALUE)&arguments);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -581,7 +593,11 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
|
|||||||
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
|
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
|
||||||
};
|
};
|
||||||
|
|
||||||
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
|
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
|
||||||
|
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
|
||||||
|
} else {
|
||||||
|
return fiber_scheduler_io_pread((VALUE)&arguments);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -630,7 +646,11 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
|
|||||||
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
|
scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
|
||||||
};
|
};
|
||||||
|
|
||||||
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
|
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
|
||||||
|
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
|
||||||
|
} else {
|
||||||
|
return fiber_scheduler_io_write((VALUE)&arguments);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -666,7 +686,11 @@ rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buf
|
|||||||
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
|
scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
|
||||||
};
|
};
|
||||||
|
|
||||||
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
|
if (rb_respond_to(scheduler, id_fiber_interrupt)) {
|
||||||
|
return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
|
||||||
|
} else {
|
||||||
|
return fiber_scheduler_io_pwrite((VALUE)&arguments);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
VALUE
|
VALUE
|
||||||
|
35
thread.c
35
thread.c
@ -1721,6 +1721,12 @@ rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation
|
|||||||
ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list);
|
ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
rb_io_blocking_operation_pop(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
|
||||||
|
{
|
||||||
|
ccan_list_del(&blocking_operation->list);
|
||||||
|
}
|
||||||
|
|
||||||
struct io_blocking_operation_arguments {
|
struct io_blocking_operation_arguments {
|
||||||
struct rb_io *io;
|
struct rb_io *io;
|
||||||
struct rb_io_blocking_operation *blocking_operation;
|
struct rb_io_blocking_operation *blocking_operation;
|
||||||
@ -1732,7 +1738,7 @@ io_blocking_operation_exit(VALUE _arguments)
|
|||||||
struct io_blocking_operation_arguments *arguments = (void*)_arguments;
|
struct io_blocking_operation_arguments *arguments = (void*)_arguments;
|
||||||
struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation;
|
struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation;
|
||||||
|
|
||||||
ccan_list_del(&blocking_operation->list);
|
rb_io_blocking_operation_pop(arguments->io, blocking_operation);
|
||||||
|
|
||||||
rb_io_t *io = arguments->io;
|
rb_io_t *io = arguments->io;
|
||||||
rb_thread_t *thread = io->closing_ec->thread_ptr;
|
rb_thread_t *thread = io->closing_ec->thread_ptr;
|
||||||
@ -1763,6 +1769,9 @@ rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation
|
|||||||
{
|
{
|
||||||
VALUE wakeup_mutex = io->wakeup_mutex;
|
VALUE wakeup_mutex = io->wakeup_mutex;
|
||||||
|
|
||||||
|
// Indicate that the blocking operation is no longer active:
|
||||||
|
blocking_operation->ec = NULL;
|
||||||
|
|
||||||
if (RB_TEST(wakeup_mutex)) {
|
if (RB_TEST(wakeup_mutex)) {
|
||||||
struct io_blocking_operation_arguments arguments = {
|
struct io_blocking_operation_arguments arguments = {
|
||||||
.io = io,
|
.io = io,
|
||||||
@ -1772,7 +1781,8 @@ rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation
|
|||||||
rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments);
|
rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
ccan_list_del(&blocking_operation->list);
|
// If there's no wakeup_mutex, we can safely remove the operation directly:
|
||||||
|
rb_io_blocking_operation_pop(io, blocking_operation);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1809,7 +1819,7 @@ rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argum
|
|||||||
struct rb_io_blocking_operation blocking_operation = {
|
struct rb_io_blocking_operation blocking_operation = {
|
||||||
.ec = ec,
|
.ec = ec,
|
||||||
};
|
};
|
||||||
ccan_list_add(&io->blocking_operations, &blocking_operation.list);
|
rb_io_blocking_operation_enter(io, &blocking_operation);
|
||||||
|
|
||||||
struct io_blocking_operation_arguments io_blocking_operation_arguments = {
|
struct io_blocking_operation_arguments io_blocking_operation_arguments = {
|
||||||
.io = io,
|
.io = io,
|
||||||
@ -2765,13 +2775,20 @@ thread_io_close_notify_all(VALUE _io)
|
|||||||
ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) {
|
ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) {
|
||||||
rb_execution_context_t *ec = blocking_operation->ec;
|
rb_execution_context_t *ec = blocking_operation->ec;
|
||||||
|
|
||||||
rb_thread_t *thread = ec->thread_ptr;
|
// If the operation is in progress, we need to interrupt it:
|
||||||
|
if (ec) {
|
||||||
|
rb_thread_t *thread = ec->thread_ptr;
|
||||||
|
|
||||||
if (thread->scheduler != Qnil) {
|
VALUE result = RUBY_Qundef;
|
||||||
rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
|
if (thread->scheduler != Qnil) {
|
||||||
} else {
|
result = rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
|
||||||
rb_threadptr_pending_interrupt_enque(thread, error);
|
}
|
||||||
rb_threadptr_interrupt(thread);
|
|
||||||
|
if (result == RUBY_Qundef) {
|
||||||
|
// If the thread is not the current thread, we need to enqueue an error:
|
||||||
|
rb_threadptr_pending_interrupt_enque(thread, error);
|
||||||
|
rb_threadptr_interrupt(thread);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user