Introduce Fiber Scheduler blocking_region
hook. (#11963)
This commit is contained in:
parent
550ac2f2ed
commit
87fb44dff6
Notes:
git
2024-10-31 04:26:56 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
6
NEWS.md
6
NEWS.md
@ -57,6 +57,11 @@ Note: We're only listing outstanding class updates.
|
|||||||
associated with the AST node. [[Feature #20624]]
|
associated with the AST node. [[Feature #20624]]
|
||||||
* Add RubyVM::AbstractSyntaxTree::Location class which holds location information. [[Feature #20624]]
|
* Add RubyVM::AbstractSyntaxTree::Location class which holds location information. [[Feature #20624]]
|
||||||
|
|
||||||
|
* Fiber::Scheduler
|
||||||
|
|
||||||
|
* An optional `Fiber::Scheduler#blocking_region` hook allows blocking operations to be moved out of the event loop
|
||||||
|
in order to reduce latency and improve multi-core processor utilization. [[Feature #20855]]
|
||||||
|
|
||||||
## Stdlib updates
|
## Stdlib updates
|
||||||
|
|
||||||
* Tempfile
|
* Tempfile
|
||||||
@ -212,3 +217,4 @@ details of the default gems or bundled gems.
|
|||||||
[Feature #20497]: https://bugs.ruby-lang.org/issues/20497
|
[Feature #20497]: https://bugs.ruby-lang.org/issues/20497
|
||||||
[Feature #20624]: https://bugs.ruby-lang.org/issues/20624
|
[Feature #20624]: https://bugs.ruby-lang.org/issues/20624
|
||||||
[Feature #20775]: https://bugs.ruby-lang.org/issues/20775
|
[Feature #20775]: https://bugs.ruby-lang.org/issues/20775
|
||||||
|
[Feature #20855]: https://bugs.ruby-lang.org/issues/20855
|
||||||
|
@ -16648,6 +16648,7 @@ scheduler.$(OBJEXT): {$(VPATH)}scheduler.c
|
|||||||
scheduler.$(OBJEXT): {$(VPATH)}shape.h
|
scheduler.$(OBJEXT): {$(VPATH)}shape.h
|
||||||
scheduler.$(OBJEXT): {$(VPATH)}st.h
|
scheduler.$(OBJEXT): {$(VPATH)}st.h
|
||||||
scheduler.$(OBJEXT): {$(VPATH)}subst.h
|
scheduler.$(OBJEXT): {$(VPATH)}subst.h
|
||||||
|
scheduler.$(OBJEXT): {$(VPATH)}thread.h
|
||||||
scheduler.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
|
scheduler.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
|
||||||
scheduler.$(OBJEXT): {$(VPATH)}thread_native.h
|
scheduler.$(OBJEXT): {$(VPATH)}thread_native.h
|
||||||
scheduler.$(OBJEXT): {$(VPATH)}vm_core.h
|
scheduler.$(OBJEXT): {$(VPATH)}vm_core.h
|
||||||
|
@ -391,6 +391,26 @@ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io);
|
|||||||
*/
|
*/
|
||||||
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);
|
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);
|
||||||
|
|
||||||
|
struct rb_fiber_scheduler_blocking_region_state {
|
||||||
|
void *result;
|
||||||
|
int saved_errno;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defer the execution of the passed function to the scheduler.
|
||||||
|
*
|
||||||
|
* @param[in] scheduler Target scheduler.
|
||||||
|
* @param[in] function The function to run.
|
||||||
|
* @param[in] data The data to pass to the function.
|
||||||
|
* @param[in] unblock_function The unblock function to use to interrupt the operation.
|
||||||
|
* @param[in] data2 The data to pass to the unblock function.
|
||||||
|
* @param[in] flags Flags passed to `rb_nogvl`.
|
||||||
|
* @param[out] state The result and errno of the operation.
|
||||||
|
* @retval RUBY_Qundef `scheduler` doesn't have `#blocking_region`.
|
||||||
|
* @return otherwise What `scheduler.blocking_region` returns.
|
||||||
|
*/
|
||||||
|
VALUE rb_fiber_scheduler_blocking_region(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_region_state *state);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create and schedule a non-blocking fiber.
|
* Create and schedule a non-blocking fiber.
|
||||||
*
|
*
|
||||||
|
63
scheduler.c
63
scheduler.c
@ -13,6 +13,9 @@
|
|||||||
#include "ruby/io.h"
|
#include "ruby/io.h"
|
||||||
#include "ruby/io/buffer.h"
|
#include "ruby/io/buffer.h"
|
||||||
|
|
||||||
|
#include "ruby/thread.h"
|
||||||
|
|
||||||
|
// For `ruby_thread_has_gvl_p`.
|
||||||
#include "internal/thread.h"
|
#include "internal/thread.h"
|
||||||
|
|
||||||
static ID id_close;
|
static ID id_close;
|
||||||
@ -33,6 +36,8 @@ static ID id_io_close;
|
|||||||
|
|
||||||
static ID id_address_resolve;
|
static ID id_address_resolve;
|
||||||
|
|
||||||
|
static ID id_blocking_region;
|
||||||
|
|
||||||
static ID id_fiber_schedule;
|
static ID id_fiber_schedule;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -109,6 +114,8 @@ Init_Fiber_Scheduler(void)
|
|||||||
|
|
||||||
id_address_resolve = rb_intern_const("address_resolve");
|
id_address_resolve = rb_intern_const("address_resolve");
|
||||||
|
|
||||||
|
id_blocking_region = rb_intern_const("blocking_region");
|
||||||
|
|
||||||
id_fiber_schedule = rb_intern_const("fiber");
|
id_fiber_schedule = rb_intern_const("fiber");
|
||||||
|
|
||||||
#if 0 /* for RDoc */
|
#if 0 /* for RDoc */
|
||||||
@ -693,6 +700,62 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
|
|||||||
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
|
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct rb_blocking_region_arguments {
|
||||||
|
void *(*function)(void *);
|
||||||
|
void *data;
|
||||||
|
rb_unblock_function_t *unblock_function;
|
||||||
|
void *data2;
|
||||||
|
int flags;
|
||||||
|
|
||||||
|
struct rb_fiber_scheduler_blocking_region_state *state;
|
||||||
|
};
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
rb_fiber_scheduler_blocking_region_proc(RB_BLOCK_CALL_FUNC_ARGLIST(value, _arguments))
|
||||||
|
{
|
||||||
|
struct rb_blocking_region_arguments *arguments = (struct rb_blocking_region_arguments*)_arguments;
|
||||||
|
|
||||||
|
if (arguments->state == NULL) {
|
||||||
|
rb_raise(rb_eRuntimeError, "Blocking function was already invoked!");
|
||||||
|
}
|
||||||
|
|
||||||
|
arguments->state->result = rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
|
||||||
|
arguments->state->saved_errno = rb_errno();
|
||||||
|
|
||||||
|
// Make sure it's only invoked once.
|
||||||
|
arguments->state = NULL;
|
||||||
|
|
||||||
|
return Qnil;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Document-method: Fiber::Scheduler#blocking_region
|
||||||
|
* call-seq: blocking_region(work)
|
||||||
|
*
|
||||||
|
* Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
|
||||||
|
*
|
||||||
|
* Minimal suggested implementation is:
|
||||||
|
*
|
||||||
|
* def blocking_region(work)
|
||||||
|
* Thread.new(&work).join
|
||||||
|
* end
|
||||||
|
*/
|
||||||
|
VALUE rb_fiber_scheduler_blocking_region(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_region_state *state)
|
||||||
|
{
|
||||||
|
struct rb_blocking_region_arguments arguments = {
|
||||||
|
.function = function,
|
||||||
|
.data = data,
|
||||||
|
.unblock_function = unblock_function,
|
||||||
|
.data2 = data2,
|
||||||
|
.flags = flags,
|
||||||
|
.state = state
|
||||||
|
};
|
||||||
|
|
||||||
|
VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_region_proc, (VALUE)&arguments);
|
||||||
|
|
||||||
|
return rb_check_funcall(scheduler, id_blocking_region, 1, &proc);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Document-method: Fiber::Scheduler#fiber
|
* Document-method: Fiber::Scheduler#fiber
|
||||||
* call-seq: fiber(&block)
|
* call-seq: fiber(&block)
|
||||||
|
@ -309,6 +309,10 @@ class Scheduler
|
|||||||
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
|
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
|
||||||
end.value
|
end.value
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def blocking_region(work)
|
||||||
|
Thread.new(&work).join
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# This scheduler class implements `io_read` and `io_write` hooks which require
|
# This scheduler class implements `io_read` and `io_write` hooks which require
|
||||||
@ -321,8 +325,7 @@ class IOBufferScheduler < Scheduler
|
|||||||
io.nonblock = true
|
io.nonblock = true
|
||||||
|
|
||||||
while true
|
while true
|
||||||
maximum_size = buffer.size - offset
|
result = blocking{buffer.read(io, 0, offset)}
|
||||||
result = blocking{buffer.read(io, maximum_size, offset)}
|
|
||||||
|
|
||||||
if result > 0
|
if result > 0
|
||||||
total += result
|
total += result
|
||||||
@ -349,8 +352,7 @@ class IOBufferScheduler < Scheduler
|
|||||||
io.nonblock = true
|
io.nonblock = true
|
||||||
|
|
||||||
while true
|
while true
|
||||||
maximum_size = buffer.size - offset
|
result = blocking{buffer.write(io, 0, offset)}
|
||||||
result = blocking{buffer.write(io, maximum_size, offset)}
|
|
||||||
|
|
||||||
if result > 0
|
if result > 0
|
||||||
total += result
|
total += result
|
||||||
@ -377,8 +379,7 @@ class IOBufferScheduler < Scheduler
|
|||||||
io.nonblock = true
|
io.nonblock = true
|
||||||
|
|
||||||
while true
|
while true
|
||||||
maximum_size = buffer.size - offset
|
result = blocking{buffer.pread(io, from, 0, offset)}
|
||||||
result = blocking{buffer.pread(io, from, maximum_size, offset)}
|
|
||||||
|
|
||||||
if result > 0
|
if result > 0
|
||||||
total += result
|
total += result
|
||||||
@ -406,8 +407,7 @@ class IOBufferScheduler < Scheduler
|
|||||||
io.nonblock = true
|
io.nonblock = true
|
||||||
|
|
||||||
while true
|
while true
|
||||||
maximum_size = buffer.size - offset
|
result = blocking{buffer.pwrite(io, from, 0, offset)}
|
||||||
result = blocking{buffer.pwrite(io, from, maximum_size, offset)}
|
|
||||||
|
|
||||||
if result > 0
|
if result > 0
|
||||||
total += result
|
total += result
|
||||||
|
@ -153,12 +153,13 @@ class TestFiberIO < Test::Unit::TestCase
|
|||||||
Fiber.set_scheduler scheduler
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
Fiber.schedule do
|
Fiber.schedule do
|
||||||
message = i.read(20)
|
# We add 1 here, to force the read to block (testing that specific code path).
|
||||||
|
message = i.read(MESSAGE.bytesize + 1)
|
||||||
i.close
|
i.close
|
||||||
end
|
end
|
||||||
|
|
||||||
Fiber.schedule do
|
Fiber.schedule do
|
||||||
o.write("Hello World")
|
o.write(MESSAGE)
|
||||||
o.close
|
o.close
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -21,7 +21,8 @@ class TestFiberIOBuffer < Test::Unit::TestCase
|
|||||||
Fiber.set_scheduler scheduler
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
Fiber.schedule do
|
Fiber.schedule do
|
||||||
message = i.read(20)
|
# We add 1 here, to force the read to block (testing that specific code path).
|
||||||
|
message = i.read(MESSAGE.bytesize + 1)
|
||||||
i.close
|
i.close
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -59,12 +59,14 @@ class TestFiberProcess < Test::Unit::TestCase
|
|||||||
|
|
||||||
def test_fork
|
def test_fork
|
||||||
omit 'fork not supported' unless Process.respond_to?(:fork)
|
omit 'fork not supported' unless Process.respond_to?(:fork)
|
||||||
|
|
||||||
|
pid = Process.fork{}
|
||||||
|
|
||||||
Thread.new do
|
Thread.new do
|
||||||
scheduler = Scheduler.new
|
scheduler = Scheduler.new
|
||||||
Fiber.set_scheduler scheduler
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
Fiber.schedule do
|
Fiber.schedule do
|
||||||
pid = Process.fork {}
|
|
||||||
Process.wait(pid)
|
Process.wait(pid)
|
||||||
|
|
||||||
assert_predicate $?, :success?
|
assert_predicate $?, :success?
|
||||||
|
12
thread.c
12
thread.c
@ -1523,6 +1523,18 @@ rb_nogvl(void *(*func)(void *), void *data1,
|
|||||||
rb_unblock_function_t *ubf, void *data2,
|
rb_unblock_function_t *ubf, void *data2,
|
||||||
int flags)
|
int flags)
|
||||||
{
|
{
|
||||||
|
VALUE scheduler = rb_fiber_scheduler_current();
|
||||||
|
if (scheduler != Qnil) {
|
||||||
|
struct rb_fiber_scheduler_blocking_region_state state;
|
||||||
|
|
||||||
|
VALUE result = rb_fiber_scheduler_blocking_region(scheduler, func, data1, ubf, data2, flags, &state);
|
||||||
|
|
||||||
|
if (!UNDEF_P(result)) {
|
||||||
|
rb_errno_set(state.saved_errno);
|
||||||
|
return state.result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void *val = 0;
|
void *val = 0;
|
||||||
rb_execution_context_t *ec = GET_EC();
|
rb_execution_context_t *ec = GET_EC();
|
||||||
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user