diff --git a/NEWS.md b/NEWS.md
index ac9f282714..3896d23fe5 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -57,6 +57,11 @@ Note: We're only listing outstanding class updates.
       associated with the AST node. [[Feature #20624]]
     * Add RubyVM::AbstractSyntaxTree::Location class which holds location information. [[Feature #20624]]
 
+* Fiber::Scheduler
+
+    * An optional `Fiber::Scheduler#blocking_region` hook allows blocking operations to be moved out of the event loop
+      in order to reduce latency and improve multi-core processor utilization. [[Feature #20855]]
+
 ## Stdlib updates
 
 * Tempfile
@@ -212,3 +217,4 @@ details of the default gems or bundled gems.
 [Feature #20497]: https://bugs.ruby-lang.org/issues/20497
 [Feature #20624]: https://bugs.ruby-lang.org/issues/20624
 [Feature #20775]: https://bugs.ruby-lang.org/issues/20775
+[Feature #20855]: https://bugs.ruby-lang.org/issues/20855
diff --git a/common.mk b/common.mk
index 4c48f1ebd0..28bb2b7d60 100644
--- a/common.mk
+++ b/common.mk
@@ -16648,6 +16648,7 @@ scheduler.$(OBJEXT): {$(VPATH)}scheduler.c
 scheduler.$(OBJEXT): {$(VPATH)}shape.h
 scheduler.$(OBJEXT): {$(VPATH)}st.h
 scheduler.$(OBJEXT): {$(VPATH)}subst.h
+scheduler.$(OBJEXT): {$(VPATH)}thread.h
 scheduler.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
 scheduler.$(OBJEXT): {$(VPATH)}thread_native.h
 scheduler.$(OBJEXT): {$(VPATH)}vm_core.h
diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h
index 8f3d383330..bb18b5e01b 100644
--- a/include/ruby/fiber/scheduler.h
+++ b/include/ruby/fiber/scheduler.h
@@ -391,6 +391,32 @@ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io);
  */
 VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);
 
+/**
+ * Outcome of a function that was run through
+ * rb_fiber_scheduler_blocking_region().
+ */
+struct rb_fiber_scheduler_blocking_region_state {
+    /** Value that `rb_nogvl` returned for the function. */
+    void *result;
+    /** `rb_errno()` as read right after the function was run. */
+    int saved_errno;
+};
+
+/**
+ * Defer the execution of the passed function to the scheduler.
+ *
+ * @param[in]  scheduler         Target scheduler.
+ * @param[in]  function          The function to run.
+ * @param[in]  data              The data to pass to the function.
+ * @param[in]  unblock_function  The unblock function to use to interrupt the operation.
+ * @param[in]  data2             The data to pass to the unblock function.
+ * @param[in]  flags             Flags passed to `rb_nogvl`.
+ * @param[out] state             The result and errno of the operation.
+ * @retval     RUBY_Qundef       `scheduler` doesn't have `#blocking_region`.
+ * @return     otherwise         What `scheduler.blocking_region` returns.
+ */
+VALUE rb_fiber_scheduler_blocking_region(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_region_state *state);
+
 /**
  * Create and schedule a non-blocking fiber.
  *
diff --git a/scheduler.c b/scheduler.c
index 3159635dba..0d51a0d951 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -13,6 +13,9 @@
 #include "ruby/io.h"
 #include "ruby/io/buffer.h"
 
+#include "ruby/thread.h"
+
+// For `ruby_thread_has_gvl_p`.
 #include "internal/thread.h"
 
 static ID id_close;
@@ -33,6 +36,8 @@ static ID id_io_close;
 
 static ID id_address_resolve;
 
+static ID id_blocking_region;
+
 static ID id_fiber_schedule;
 
 /*
@@ -109,6 +114,8 @@ Init_Fiber_Scheduler(void)
 
     id_address_resolve = rb_intern_const("address_resolve");
 
+    id_blocking_region = rb_intern_const("blocking_region");
+
     id_fiber_schedule = rb_intern_const("fiber");
 
 #if 0 /* for RDoc */
@@ -693,6 +700,62 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
     return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
 }
 
+struct rb_blocking_region_arguments {
+    void *(*function)(void *);
+    void *data;
+    rb_unblock_function_t *unblock_function;
+    void *data2;
+    int flags;
+
+    struct rb_fiber_scheduler_blocking_region_state *state;
+};
+
+static VALUE
+rb_fiber_scheduler_blocking_region_proc(RB_BLOCK_CALL_FUNC_ARGLIST(value, _arguments))
+{
+    struct rb_blocking_region_arguments *arguments = (struct rb_blocking_region_arguments*)_arguments;
+
+    if (arguments->state == NULL) {
+        rb_raise(rb_eRuntimeError, "Blocking function was already invoked!");
+    }
+
+    arguments->state->result = rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
+    arguments->state->saved_errno = rb_errno();
+
+    // Make sure it's only invoked once.
+    arguments->state = NULL;
+
+    return Qnil;
+}
+
+/*
+ * Document-method: Fiber::Scheduler#blocking_region
+ * call-seq: blocking_region(work)
+ *
+ * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
+ *
+ * Minimal suggested implementation is:
+ *
+ *   def blocking_region(work)
+ *     Thread.new(&work).join
+ *   end
+ */
+VALUE rb_fiber_scheduler_blocking_region(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_region_state *state)
+{
+    struct rb_blocking_region_arguments arguments = {
+        .function = function,
+        .data = data,
+        .unblock_function = unblock_function,
+        .data2 = data2,
+        .flags = flags,
+        .state = state
+    };
+
+    VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_region_proc, (VALUE)&arguments);
+
+    return rb_check_funcall(scheduler, id_blocking_region, 1, &proc);
+}
+
 /*
  * Document-method: Fiber::Scheduler#fiber
  * call-seq: fiber(&block)
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 3926226ca3..91fba0476e 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -309,6 +309,10 @@ class Scheduler
       Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
     end.value
   end
+
+  def blocking_region(work)
+    Thread.new(&work).join
+  end
 end
 
 # This scheduler class implements `io_read` and `io_write` hooks which require
@@ -321,8 +325,7 @@ class IOBufferScheduler < Scheduler
     io.nonblock = true
 
     while true
-      maximum_size = buffer.size - offset
-      result = blocking{buffer.read(io, maximum_size, offset)}
+      result = blocking{buffer.read(io, 0, offset)}
 
       if result > 0
         total += result
@@ -349,8 +352,7 @@ class IOBufferScheduler < Scheduler
     io.nonblock = true
 
     while true
-      maximum_size = buffer.size - offset
-      result = blocking{buffer.write(io, maximum_size, offset)}
+      result = blocking{buffer.write(io, 0, offset)}
 
       if result > 0
         total += result
@@ -377,8 +379,7 @@ class IOBufferScheduler < Scheduler
     io.nonblock = true
 
     while true
-      maximum_size = buffer.size - offset
-      result = blocking{buffer.pread(io, from, maximum_size, offset)}
+      result = blocking{buffer.pread(io, from, 0, offset)}
 
       if result > 0
         total += result
@@ -406,8 +407,7 @@ class IOBufferScheduler < Scheduler
     io.nonblock = true
 
     while true
-      maximum_size = buffer.size - offset
-      result = blocking{buffer.pwrite(io, from, maximum_size, offset)}
+      result = blocking{buffer.pwrite(io, from, 0, offset)}
 
       if result > 0
         total += result
diff --git a/test/fiber/test_io.rb b/test/fiber/test_io.rb
index 4891c607f7..39e32c5987 100644
--- a/test/fiber/test_io.rb
+++ b/test/fiber/test_io.rb
@@ -153,12 +153,13 @@ class TestFiberIO < Test::Unit::TestCase
       Fiber.set_scheduler scheduler
 
       Fiber.schedule do
-        message = i.read(20)
+        # We add 1 here, to force the read to block (testing that specific code path).
+        message = i.read(MESSAGE.bytesize + 1)
         i.close
       end
 
       Fiber.schedule do
-        o.write("Hello World")
+        o.write(MESSAGE)
         o.close
       end
     end
diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb
index a08b1ce1a9..19e6c1f88e 100644
--- a/test/fiber/test_io_buffer.rb
+++ b/test/fiber/test_io_buffer.rb
@@ -21,7 +21,8 @@ class TestFiberIOBuffer < Test::Unit::TestCase
       Fiber.set_scheduler scheduler
 
       Fiber.schedule do
-        message = i.read(20)
+        # We add 1 here, to force the read to block (testing that specific code path).
+        message = i.read(MESSAGE.bytesize + 1)
         i.close
       end
 
diff --git a/test/fiber/test_process.rb b/test/fiber/test_process.rb
index a09b070c0a..f17f767704 100644
--- a/test/fiber/test_process.rb
+++ b/test/fiber/test_process.rb
@@ -59,12 +59,14 @@ class TestFiberProcess < Test::Unit::TestCase
 
   def test_fork
     omit 'fork not supported' unless Process.respond_to?(:fork)
+
+    pid = Process.fork{}
+
     Thread.new do
       scheduler = Scheduler.new
       Fiber.set_scheduler scheduler
 
       Fiber.schedule do
-        pid = Process.fork {}
         Process.wait(pid)
 
         assert_predicate $?, :success?
diff --git a/thread.c b/thread.c
index 2a937ca278..91278e718d 100644
--- a/thread.c
+++ b/thread.c
@@ -1523,6 +1523,20 @@ rb_nogvl(void *(*func)(void *), void *data1,
          rb_unblock_function_t *ubf, void *data2,
          int flags)
 {
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        // Zero-initialised so that `result` and `saved_errno` are well defined even
+        // if the scheduler's `blocking_region` hook returns without running the work.
+        struct rb_fiber_scheduler_blocking_region_state state = {0};
+
+        VALUE result = rb_fiber_scheduler_blocking_region(scheduler, func, data1, ubf, data2, flags, &state);
+
+        if (!UNDEF_P(result)) {
+            rb_errno_set(state.saved_errno);
+            return state.result;
+        }
+    }
+
     void *val = 0;
     rb_execution_context_t *ec = GET_EC();
     rb_thread_t *th = rb_ec_thread_ptr(ec);