Introduce Fiber::Scheduler#blocking_operation_wait
. (#12016)
Redirect `rb_nogvl` blocking operations to the fiber scheduler if possible to prevent stalling the event loop. [Feature #20876]
This commit is contained in:
parent
86b1c83857
commit
9c268302bf
Notes:
git
2024-11-20 06:40:38 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
6
NEWS.md
6
NEWS.md
@ -61,6 +61,11 @@ Note: We're only listing outstanding class updates.
|
||||
associated with the AST node. [[Feature #20624]]
|
||||
* Add RubyVM::AbstractSyntaxTree::Location class which holds location information. [[Feature #20624]]
|
||||
|
||||
* Fiber::Scheduler
|
||||
|
||||
* An optional `Fiber::Scheduler#blocking_operation_wait` hook allows blocking operations to be moved out of the
|
||||
event loop in order to reduce latency and improve multi-core processor utilization. [[Feature #20876]]
|
||||
|
||||
## Stdlib updates
|
||||
|
||||
* Tempfile
|
||||
@ -236,3 +241,4 @@ details of the default gems or bundled gems.
|
||||
[Feature #20497]: https://bugs.ruby-lang.org/issues/20497
|
||||
[Feature #20624]: https://bugs.ruby-lang.org/issues/20624
|
||||
[Feature #20775]: https://bugs.ruby-lang.org/issues/20775
|
||||
[Feature #20876]: https://bugs.ruby-lang.org/issues/20876
|
||||
|
@ -16689,6 +16689,7 @@ scheduler.$(OBJEXT): {$(VPATH)}scheduler.c
|
||||
scheduler.$(OBJEXT): {$(VPATH)}shape.h
|
||||
scheduler.$(OBJEXT): {$(VPATH)}st.h
|
||||
scheduler.$(OBJEXT): {$(VPATH)}subst.h
|
||||
scheduler.$(OBJEXT): {$(VPATH)}thread.h
|
||||
scheduler.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
|
||||
scheduler.$(OBJEXT): {$(VPATH)}thread_native.h
|
||||
scheduler.$(OBJEXT): {$(VPATH)}vm_core.h
|
||||
|
@ -391,6 +391,26 @@ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io);
|
||||
*/
|
||||
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);
|
||||
|
||||
struct rb_fiber_scheduler_blocking_operation_state {
|
||||
void *result;
|
||||
int saved_errno;
|
||||
};
|
||||
|
||||
/**
|
||||
* Defer the execution of the passed function to the scheduler.
|
||||
*
|
||||
* @param[in] scheduler Target scheduler.
|
||||
* @param[in] function The function to run.
|
||||
* @param[in] data The data to pass to the function.
|
||||
* @param[in] unblock_function The unblock function to use to interrupt the operation.
|
||||
* @param[in] data2 The data to pass to the unblock function.
|
||||
* @param[in] flags Flags passed to `rb_nogvl`.
|
||||
* @param[out] state The result and errno of the operation.
|
||||
* @retval RUBY_Qundef `scheduler` doesn't have `#blocking_operation_wait`.
|
||||
* @return otherwise What `scheduler.blocking_operation_wait` returns.
|
||||
*/
|
||||
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state);
|
||||
|
||||
/**
|
||||
* Create and schedule a non-blocking fiber.
|
||||
*
|
||||
|
@ -59,6 +59,19 @@
|
||||
*/
|
||||
#define RB_NOGVL_UBF_ASYNC_SAFE (0x2)
|
||||
|
||||
/**
|
||||
* Passing this flag to rb_nogvl() indicates that the passed function
|
||||
* is safe to offload to a background thread or work pool. In other words, the
|
||||
* function is safe to run using a fiber scheduler's `blocking_operation_wait`.
|
||||
* hook.
|
||||
*
|
||||
* If your function depends on thread-local storage, or thread-specific data
|
||||
* operations/data structures, you should not set this flag, as
|
||||
* these operations may behave differently (or fail) when run in a different
|
||||
* thread/context (e.g. unlocking a mutex).
|
||||
*/
|
||||
#define RB_NOGVL_OFFLOAD_SAFE (0x4)
|
||||
|
||||
/** @} */
|
||||
|
||||
RBIMPL_SYMBOL_EXPORT_BEGIN()
|
||||
|
@ -15,7 +15,7 @@ struct rb_io;
|
||||
|
||||
#include "ruby/io.h" /* for rb_io_t */
|
||||
|
||||
#define IO_WITHOUT_GVL(func, arg) rb_thread_call_without_gvl(func, arg, RUBY_UBF_IO, 0)
|
||||
#define IO_WITHOUT_GVL(func, arg) rb_nogvl(func, arg, RUBY_UBF_IO, 0, RB_NOGVL_OFFLOAD_SAFE)
|
||||
#define IO_WITHOUT_GVL_INT(func, arg) (int)(VALUE)IO_WITHOUT_GVL(func, arg)
|
||||
|
||||
/** Ruby's IO, metadata and buffers. */
|
||||
|
63
scheduler.c
63
scheduler.c
@ -13,6 +13,9 @@
|
||||
#include "ruby/io.h"
|
||||
#include "ruby/io/buffer.h"
|
||||
|
||||
#include "ruby/thread.h"
|
||||
|
||||
// For `ruby_thread_has_gvl_p`.
|
||||
#include "internal/thread.h"
|
||||
|
||||
static ID id_close;
|
||||
@ -33,6 +36,8 @@ static ID id_io_close;
|
||||
|
||||
static ID id_address_resolve;
|
||||
|
||||
static ID id_blocking_operation_wait;
|
||||
|
||||
static ID id_fiber_schedule;
|
||||
|
||||
/*
|
||||
@ -109,6 +114,8 @@ Init_Fiber_Scheduler(void)
|
||||
|
||||
id_address_resolve = rb_intern_const("address_resolve");
|
||||
|
||||
id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
|
||||
|
||||
id_fiber_schedule = rb_intern_const("fiber");
|
||||
|
||||
#if 0 /* for RDoc */
|
||||
@ -693,6 +700,62 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
|
||||
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
|
||||
}
|
||||
|
||||
struct rb_blocking_operation_wait_arguments {
|
||||
void *(*function)(void *);
|
||||
void *data;
|
||||
rb_unblock_function_t *unblock_function;
|
||||
void *data2;
|
||||
int flags;
|
||||
|
||||
struct rb_fiber_scheduler_blocking_operation_state *state;
|
||||
};
|
||||
|
||||
static VALUE
|
||||
rb_fiber_scheduler_blocking_operation_wait_proc(RB_BLOCK_CALL_FUNC_ARGLIST(value, _arguments))
|
||||
{
|
||||
struct rb_blocking_operation_wait_arguments *arguments = (struct rb_blocking_operation_wait_arguments*)_arguments;
|
||||
|
||||
if (arguments->state == NULL) {
|
||||
rb_raise(rb_eRuntimeError, "Blocking function was already invoked!");
|
||||
}
|
||||
|
||||
arguments->state->result = rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
|
||||
arguments->state->saved_errno = rb_errno();
|
||||
|
||||
// Make sure it's only invoked once.
|
||||
arguments->state = NULL;
|
||||
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Fiber::Scheduler#blocking_operation_wait
|
||||
* call-seq: blocking_operation_wait(work)
|
||||
*
|
||||
* Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
|
||||
*
|
||||
* Minimal suggested implementation is:
|
||||
*
|
||||
* def blocking_operation_wait(work)
|
||||
* Thread.new(&work).join
|
||||
* end
|
||||
*/
|
||||
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
|
||||
{
|
||||
struct rb_blocking_operation_wait_arguments arguments = {
|
||||
.function = function,
|
||||
.data = data,
|
||||
.unblock_function = unblock_function,
|
||||
.data2 = data2,
|
||||
.flags = flags,
|
||||
.state = state
|
||||
};
|
||||
|
||||
VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_operation_wait_proc, (VALUE)&arguments);
|
||||
|
||||
return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Fiber::Scheduler#fiber
|
||||
* call-seq: fiber(&block)
|
||||
|
@ -309,6 +309,16 @@ class Scheduler
|
||||
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
|
||||
end.value
|
||||
end
|
||||
|
||||
def blocking_operation_wait(work)
|
||||
thread = Thread.new(&work)
|
||||
|
||||
thread.join
|
||||
|
||||
thread = nil
|
||||
ensure
|
||||
thread&.kill
|
||||
end
|
||||
end
|
||||
|
||||
# This scheduler class implements `io_read` and `io_write` hooks which require
|
||||
|
14
thread.c
14
thread.c
@ -1539,6 +1539,20 @@ rb_nogvl(void *(*func)(void *), void *data1,
|
||||
rb_unblock_function_t *ubf, void *data2,
|
||||
int flags)
|
||||
{
|
||||
if (flags & RB_NOGVL_OFFLOAD_SAFE) {
|
||||
VALUE scheduler = rb_fiber_scheduler_current();
|
||||
if (scheduler != Qnil) {
|
||||
struct rb_fiber_scheduler_blocking_operation_state state;
|
||||
|
||||
VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state);
|
||||
|
||||
if (!UNDEF_P(result)) {
|
||||
rb_errno_set(state.saved_errno);
|
||||
return state.result;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void *val = 0;
|
||||
rb_execution_context_t *ec = GET_EC();
|
||||
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
||||
|
Loading…
x
Reference in New Issue
Block a user