Ensure that forked processes do not see invalid blocking operations. (#13343)
parent 49b306ecb9
commit 87261c2d95
Notes (git): 2025-05-15 06:50:28 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
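After fork(2), the child inherits each IO's blocking_operations list, but the entries in that list are stack allocated by threads that only exist in the parent, so the child must not walk or unlink them. This change gives struct rb_io a fork_generation field: rb_io_blocking_operations() compares it against GET_VM()->fork_gen and re-initializes (clears) the list whenever the generations differ, and list traversal and insertion now go through that accessor. Below is a minimal Ruby sketch of the scenario this protects, adapted from the test added later in this commit (requires a platform with Process.fork):

    r, w = IO.pipe

    # The parent's reader thread blocks in r.read, registering a blocking
    # operation on r that lives on that thread's stack.
    reader = Thread.new { r.read }
    Thread.pass until reader.status == "sleep"

    # The child inherits r but not the parent's threads or their stacks, so
    # closing r in the child must not touch the inherited (stale) entries.
    pid = fork { r.close }

    w.close            # let the parent's reader finish
    Process.wait(pid)
    reader.join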
@@ -15,6 +15,7 @@ struct rb_io;
 
 #include "ruby/io.h" /* for rb_io_t */
 #include "ccan/list/list.h"
+#include "serial.h"
 
 #define IO_WITHOUT_GVL(func, arg) rb_nogvl(func, arg, RUBY_UBF_IO, 0, RB_NOGVL_OFFLOAD_SAFE)
 #define IO_WITHOUT_GVL_INT(func, arg) (int)(VALUE)IO_WITHOUT_GVL(func, arg)
@@ -130,6 +131,9 @@ struct rb_io {
     struct ccan_list_head blocking_operations;
     struct rb_execution_context_struct *closing_ec;
     VALUE wakeup_mutex;
+
+    // The fork generation of the blocking operations list.
+    rb_serial_t fork_generation;
 };
 
 /* io.c */
io.c (4 changes)
@@ -8570,6 +8570,7 @@ rb_io_init_copy(VALUE dest, VALUE io)
     ccan_list_head_init(&fptr->blocking_operations);
     fptr->closing_ec = NULL;
     fptr->wakeup_mutex = Qnil;
+    fptr->fork_generation = GET_VM()->fork_gen;
 
     if (!NIL_P(orig->pathv)) fptr->pathv = orig->pathv;
     fptr_copy_finalizer(fptr, orig);
@@ -9311,6 +9312,7 @@ rb_io_open_descriptor(VALUE klass, int descriptor, int mode, VALUE path, VALUE t
     ccan_list_head_init(&io->blocking_operations);
     io->closing_ec = NULL;
     io->wakeup_mutex = Qnil;
+    io->fork_generation = GET_VM()->fork_gen;
 
     if (encoding) {
         io->encs = *encoding;
@@ -9454,6 +9456,7 @@ rb_io_fptr_new(void)
     ccan_list_head_init(&fp->blocking_operations);
     fp->closing_ec = NULL;
     fp->wakeup_mutex = Qnil;
+    fp->fork_generation = GET_VM()->fork_gen;
     return fp;
 }
 
@@ -9587,6 +9590,7 @@ io_initialize(VALUE io, VALUE fnum, VALUE vmode, VALUE opt)
     ccan_list_head_init(&fp->blocking_operations);
     fp->closing_ec = NULL;
     fp->wakeup_mutex = Qnil;
+    fp->fork_generation = GET_VM()->fork_gen;
     clear_codeconv(fp);
     io_check_tty(fp);
     if (fileno(stdin) == fd)
@@ -4416,4 +4416,29 @@ __END__
       end
     RUBY
   end
+
+  def test_fork_close
+    omit "fork is not supported" unless Process.respond_to?(:fork)
+
+    assert_separately([], <<~'RUBY')
+      r, w = IO.pipe
+
+      thread = Thread.new do
+        r.read
+      end
+
+      Thread.pass until thread.status == "sleep"
+
+      pid = fork do
+        r.close
+      end
+
+      w.close
+
+      status = Process.wait2(pid).last
+      thread.join
+
+      assert_predicate(status, :success?)
+    RUBY
+  end
 end
thread.c (44 changes)
@@ -1693,13 +1693,32 @@ waitfd_to_waiting_flag(int wfd_event)
     return wfd_event << 1;
 }
 
+static struct ccan_list_head *
+rb_io_blocking_operations(struct rb_io *io)
+{
+    rb_serial_t fork_generation = GET_VM()->fork_gen;
+
+    // On fork, all existing entries in this list (which are stack allocated) become invalid. Therefore, we re-initialize the list which clears it.
+    if (io->fork_generation != fork_generation) {
+        ccan_list_head_init(&io->blocking_operations);
+        io->fork_generation = fork_generation;
+    }
+
+    return &io->blocking_operations;
+}
+
+static void
+rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) {
+    ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list);
+}
+
 struct io_blocking_operation_arguments {
     struct rb_io *io;
     struct rb_io_blocking_operation *blocking_operation;
 };
 
 static VALUE
-io_blocking_operation_release(VALUE _arguments) {
+io_blocking_operation_exit(VALUE _arguments) {
     struct io_blocking_operation_arguments *arguments = (void*)_arguments;
     struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation;
 
@@ -1719,7 +1738,7 @@ io_blocking_operation_release(VALUE _arguments) {
 }
 
 static void
-rb_io_blocking_operation_release(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
 {
     VALUE wakeup_mutex = io->wakeup_mutex;
 
@@ -1729,7 +1748,7 @@ rb_io_blocking_operation_release(struct rb_io *io, struct rb_io_blocking_operati
             .blocking_operation = blocking_operation
         };
 
-        rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_release, (VALUE)&arguments);
+        rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments);
     } else {
         ccan_list_del(&blocking_operation->list);
     }
@@ -1824,7 +1843,7 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
     struct rb_io_blocking_operation blocking_operation = {
         .ec = ec,
     };
-    ccan_list_add(&io->blocking_operations, &blocking_operation.list);
+    rb_io_blocking_operation_enter(io, &blocking_operation);
 
     {
         EC_PUSH_TAG(ec);
@@ -1851,7 +1870,7 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
         th->mn_schedulable = prev_mn_schedulable;
     }
 
-    rb_io_blocking_operation_release(io, &blocking_operation);
+    rb_io_blocking_operation_exit(io, &blocking_operation);
 
     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -2658,10 +2677,11 @@ thread_io_close_notify_all(struct rb_io *io)
     VALUE error = vm->special_exceptions[ruby_error_stream_closed];
 
     struct rb_io_blocking_operation *blocking_operation;
-    ccan_list_for_each(&io->blocking_operations, blocking_operation, list) {
+    ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) {
         rb_execution_context_t *ec = blocking_operation->ec;
+
         rb_thread_t *thread = ec->thread_ptr;
 
         rb_threadptr_pending_interrupt_enque(thread, error);
 
         // This operation is slow:
@@ -2684,7 +2704,7 @@ rb_thread_io_close_interrupt(struct rb_io *io)
     }
 
     // If there are no blocking operations, we are done:
-    if (ccan_list_empty(&io->blocking_operations)) {
+    if (ccan_list_empty(rb_io_blocking_operations(io))) {
         return 0;
     }
 
@@ -2709,7 +2729,7 @@ rb_thread_io_close_wait(struct rb_io* io)
     }
 
     rb_mutex_lock(wakeup_mutex);
-    while (!ccan_list_empty(&io->blocking_operations)) {
+    while (!ccan_list_empty(rb_io_blocking_operations(io))) {
         rb_mutex_sleep(wakeup_mutex, Qnil);
     }
     rb_mutex_unlock(wakeup_mutex);
@@ -4435,7 +4455,7 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
 
     if (io) {
         blocking_operation.ec = ec;
-        ccan_list_add(&io->blocking_operations, &blocking_operation.list);
+        rb_io_blocking_operation_enter(io, &blocking_operation);
     }
 
     if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
@@ -4461,7 +4481,7 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
     }
 
     if (io) {
-        rb_io_blocking_operation_release(io, &blocking_operation);
+        rb_io_blocking_operation_exit(io, &blocking_operation);
     }
 
     if (state) {
@@ -4539,7 +4559,7 @@ select_single_cleanup(VALUE ptr)
     struct select_args *args = (struct select_args *)ptr;
 
     if (args->blocking_operation) {
-        rb_io_blocking_operation_release(args->io, args->blocking_operation);
+        rb_io_blocking_operation_exit(args->io, args->blocking_operation);
     }
 
     if (args->read) rb_fd_term(args->read);
@@ -4572,7 +4592,7 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
     if (io) {
         args.io = io;
         blocking_operation.ec = GET_EC();
-        ccan_list_add(&io->blocking_operations, &blocking_operation.list);
+        rb_io_blocking_operation_enter(io, &blocking_operation);
        args.blocking_operation = &blocking_operation;
     } else {
         args.io = NULL;
@@ -730,7 +730,6 @@ typedef struct rb_vm_struct {
 #endif
 
     rb_serial_t fork_gen;
-    struct ccan_list_head waiting_fds; /* <=> struct waiting_fd */
 
     /* set in single-threaded processes only: */
     volatile int ubf_async_safe;