Use a proper mutex for autoloading features. (#5788)
Object#autoload implements a custom per-thread "mutex" for blocking threads waiting on autoloading a feature. This causes problems when used with the fiber scheduler. We swap the implementation to use a Ruby mutex which is fiber aware.
This commit is contained in:
parent
679b6e43c7
commit
fd6cef79f5
Notes:
git
2022-05-08 07:23:27 +09:00
Merged-By: ioquatix <samuel@codeotaku.com>
3
test/fiber/autoload.rb
Normal file
3
test/fiber/autoload.rb
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
sleep 0.01
|
||||||
|
module TestFiberSchedulerAutoload
|
||||||
|
end
|
@ -104,4 +104,23 @@ class TestFiberScheduler < Test::Unit::TestCase
|
|||||||
|
|
||||||
thread.join
|
thread.join
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_autoload
|
||||||
|
Object.autoload(:TestFiberSchedulerAutoload, File.expand_path("autoload.rb", __dir__))
|
||||||
|
|
||||||
|
thread = Thread.new do
|
||||||
|
scheduler = Scheduler.new
|
||||||
|
Fiber.set_scheduler scheduler
|
||||||
|
|
||||||
|
10.times do
|
||||||
|
Fiber.schedule do
|
||||||
|
Object.const_get(:TestFiberSchedulerAutoload)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
thread.join
|
||||||
|
ensure
|
||||||
|
Object.send(:remove_const, :TestFiberSchedulerAutoload)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
109
variable.c
109
variable.c
@ -2118,8 +2118,7 @@ struct autoload_const {
|
|||||||
struct autoload_state {
|
struct autoload_state {
|
||||||
struct autoload_const *ac;
|
struct autoload_const *ac;
|
||||||
VALUE result;
|
VALUE result;
|
||||||
VALUE thread;
|
VALUE mutex;
|
||||||
struct ccan_list_head waitq;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct autoload_data_i {
|
struct autoload_data_i {
|
||||||
@ -2348,6 +2347,11 @@ autoload_delete(VALUE mod, ID id)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
autoload_by_someone_else(struct autoload_data_i *ele) {
|
||||||
|
return ele->state && ele->state->mutex != Qnil && !rb_mutex_owned_p(ele->state->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
check_autoload_required(VALUE mod, ID id, const char **loadingpath)
|
check_autoload_required(VALUE mod, ID id, const char **loadingpath)
|
||||||
{
|
{
|
||||||
@ -2359,7 +2363,9 @@ check_autoload_required(VALUE mod, ID id, const char **loadingpath)
|
|||||||
if (!load || !(ele = get_autoload_data(load, 0))) {
|
if (!load || !(ele = get_autoload_data(load, 0))) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
file = ele->feature;
|
file = ele->feature;
|
||||||
|
|
||||||
Check_Type(file, T_STRING);
|
Check_Type(file, T_STRING);
|
||||||
if (!RSTRING_LEN(file) || !*RSTRING_PTR(file)) {
|
if (!RSTRING_LEN(file) || !*RSTRING_PTR(file)) {
|
||||||
rb_raise(rb_eArgError, "empty file name");
|
rb_raise(rb_eArgError, "empty file name");
|
||||||
@ -2371,18 +2377,21 @@ check_autoload_required(VALUE mod, ID id, const char **loadingpath)
|
|||||||
* completes. We must wait until autoload_const_set finishes in
|
* completes. We must wait until autoload_const_set finishes in
|
||||||
* the other thread.
|
* the other thread.
|
||||||
*/
|
*/
|
||||||
if (ele->state && ele->state->thread != rb_thread_current()) {
|
if (autoload_by_someone_else(ele)) {
|
||||||
return load;
|
return load;
|
||||||
}
|
}
|
||||||
|
|
||||||
loading = RSTRING_PTR(file);
|
loading = RSTRING_PTR(file);
|
||||||
|
|
||||||
if (!rb_feature_provided(loading, &loading)) {
|
if (!rb_feature_provided(loading, &loading)) {
|
||||||
return load;
|
return load;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (loadingpath && loading) {
|
if (loadingpath && loading) {
|
||||||
*loadingpath = loading;
|
*loadingpath = loading;
|
||||||
return load;
|
return load;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2403,6 +2412,11 @@ rb_autoloading_value(VALUE mod, ID id, VALUE* value, rb_const_flag_t *flag)
|
|||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
autoload_by_current(struct autoload_data_i *ele) {
|
||||||
|
return ele->state && ele->state->mutex != Qnil && rb_mutex_owned_p(ele->state->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
struct autoload_const *
|
struct autoload_const *
|
||||||
autoloading_const_entry(VALUE mod, ID id)
|
autoloading_const_entry(VALUE mod, ID id)
|
||||||
{
|
{
|
||||||
@ -2414,11 +2428,12 @@ autoloading_const_entry(VALUE mod, ID id)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ele->state && ele->state->thread == rb_thread_current()) {
|
if (autoload_by_current(ele)) {
|
||||||
if (ac->value != Qundef) {
|
if (ac->value != Qundef) {
|
||||||
return ac;
|
return ac;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2460,8 +2475,7 @@ autoload_require(VALUE arg)
|
|||||||
|
|
||||||
ele = rb_check_typeddata(ac->ad, &autoload_data_i_type);
|
ele = rb_check_typeddata(ac->ad, &autoload_data_i_type);
|
||||||
/* this may release GVL and switch threads: */
|
/* this may release GVL and switch threads: */
|
||||||
state->result = rb_funcall(rb_vm_top_self(), rb_intern("require"), 1,
|
state->result = rb_funcall(rb_vm_top_self(), rb_intern("require"), 1, ele->feature);
|
||||||
ele->feature);
|
|
||||||
|
|
||||||
return state->result;
|
return state->result;
|
||||||
}
|
}
|
||||||
@ -2470,17 +2484,19 @@ static VALUE
|
|||||||
autoload_reset(VALUE arg)
|
autoload_reset(VALUE arg)
|
||||||
{
|
{
|
||||||
struct autoload_state *state = (struct autoload_state *)arg;
|
struct autoload_state *state = (struct autoload_state *)arg;
|
||||||
int need_wakeups = 0;
|
|
||||||
struct autoload_const *ac = state->ac;
|
struct autoload_const *ac = state->ac;
|
||||||
struct autoload_data_i *ele;
|
struct autoload_data_i *ele;
|
||||||
|
|
||||||
ele = rb_check_typeddata(ac->ad, &autoload_data_i_type);
|
ele = rb_check_typeddata(ac->ad, &autoload_data_i_type);
|
||||||
|
VALUE mutex = state->mutex;
|
||||||
|
|
||||||
if (ele->state == state) {
|
if (ele->state == state) {
|
||||||
need_wakeups = 1;
|
|
||||||
ele->state = 0;
|
ele->state = 0;
|
||||||
ele->fork_gen = 0;
|
ele->fork_gen = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rb_mutex_unlock(mutex);
|
||||||
|
|
||||||
/* At the last, move a value defined in autoload to constant table */
|
/* At the last, move a value defined in autoload to constant table */
|
||||||
if (RTEST(state->result)) {
|
if (RTEST(state->result)) {
|
||||||
struct autoload_const *next;
|
struct autoload_const *next;
|
||||||
@ -2492,59 +2508,13 @@ autoload_reset(VALUE arg)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* wakeup any waiters we had */
|
|
||||||
if (need_wakeups) {
|
|
||||||
struct autoload_state *cur = 0, *nxt;
|
|
||||||
|
|
||||||
ccan_list_for_each_safe(&state->waitq, cur, nxt, waitq.n) {
|
|
||||||
VALUE th = cur->thread;
|
|
||||||
|
|
||||||
cur->thread = Qfalse;
|
|
||||||
ccan_list_del_init(&cur->waitq.n); /* idempotent */
|
|
||||||
|
|
||||||
/*
|
|
||||||
* cur is stored on the stack of cur->waiting_th,
|
|
||||||
* do not touch cur after waking up waiting_th
|
|
||||||
*/
|
|
||||||
rb_thread_wakeup_alive(th);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0; /* ignored */
|
return 0; /* ignored */
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
|
||||||
autoload_sleep(VALUE arg)
|
|
||||||
{
|
|
||||||
struct autoload_state *state = (struct autoload_state *)arg;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* autoload_reset in other thread will resume us and remove us
|
|
||||||
* from the waitq list
|
|
||||||
*/
|
|
||||||
do {
|
|
||||||
rb_thread_sleep_deadly();
|
|
||||||
} while (state->thread != Qfalse);
|
|
||||||
|
|
||||||
return Qfalse;
|
|
||||||
}
|
|
||||||
|
|
||||||
static VALUE
|
|
||||||
autoload_sleep_done(VALUE arg)
|
|
||||||
{
|
|
||||||
struct autoload_state *state = (struct autoload_state *)arg;
|
|
||||||
|
|
||||||
if (state->thread != Qfalse && rb_thread_to_be_killed(state->thread)) {
|
|
||||||
ccan_list_del(&state->waitq.n); /* idempotent after list_del_init */
|
|
||||||
}
|
|
||||||
|
|
||||||
return Qfalse;
|
|
||||||
}
|
|
||||||
|
|
||||||
VALUE
|
VALUE
|
||||||
rb_autoload_load(VALUE mod, ID id)
|
rb_autoload_load(VALUE mod, ID id)
|
||||||
{
|
{
|
||||||
VALUE load, result;
|
VALUE load;
|
||||||
const char *loading = 0, *src;
|
const char *loading = 0, *src;
|
||||||
struct autoload_data_i *ele;
|
struct autoload_data_i *ele;
|
||||||
struct autoload_const *ac;
|
struct autoload_const *ac;
|
||||||
@ -2570,32 +2540,26 @@ rb_autoload_load(VALUE mod, ID id)
|
|||||||
if (!(ele = get_autoload_data(load, &ac))) {
|
if (!(ele = get_autoload_data(load, &ac))) {
|
||||||
return Qfalse;
|
return Qfalse;
|
||||||
}
|
}
|
||||||
|
|
||||||
state.ac = ac;
|
state.ac = ac;
|
||||||
state.thread = rb_thread_current();
|
|
||||||
if (!ele->state) {
|
if (!ele->state) {
|
||||||
ele->state = &state;
|
ele->state = &state;
|
||||||
|
ele->state->mutex = rb_mutex_new();
|
||||||
ele->fork_gen = GET_VM()->fork_gen;
|
ele->fork_gen = GET_VM()->fork_gen;
|
||||||
|
|
||||||
/*
|
|
||||||
* autoload_reset will wake up any threads added to this
|
|
||||||
* if and only if the GVL is released during autoload_require
|
|
||||||
*/
|
|
||||||
ccan_list_head_init(&state.waitq);
|
|
||||||
}
|
}
|
||||||
else if (state.thread == ele->state->thread) {
|
else if (rb_mutex_owned_p(ele->state->mutex)) {
|
||||||
return Qfalse;
|
return Qfalse;
|
||||||
|
} else {
|
||||||
|
state.mutex = ele->state->mutex;
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
ccan_list_add_tail(&ele->state->waitq, &state.waitq.n);
|
|
||||||
|
|
||||||
rb_ensure(autoload_sleep, (VALUE)&state,
|
// Block all other threads that come here until we are done in autoload_reset. At that point, all threads can continue. Current implementation prevents threads from executing in parallel even though at that point there are no data races.
|
||||||
autoload_sleep_done, (VALUE)&state);
|
rb_mutex_lock(state.mutex);
|
||||||
}
|
|
||||||
|
|
||||||
/* autoload_data_i can be deleted by another thread while require */
|
/* autoload_data_i can be deleted by another thread while require */
|
||||||
state.result = Qfalse;
|
state.result = Qfalse;
|
||||||
result = rb_ensure(autoload_require, (VALUE)&state,
|
VALUE result = rb_ensure(autoload_require, (VALUE)&state, autoload_reset, (VALUE)&state);
|
||||||
autoload_reset, (VALUE)&state);
|
|
||||||
|
|
||||||
if (!(ce = rb_const_lookup(mod, id)) || ce->value == Qundef) {
|
if (!(ce = rb_const_lookup(mod, id)) || ce->value == Qundef) {
|
||||||
rb_const_remove(mod, id);
|
rb_const_remove(mod, id);
|
||||||
@ -2603,7 +2567,9 @@ rb_autoload_load(VALUE mod, ID id)
|
|||||||
else if (flag > 0) {
|
else if (flag > 0) {
|
||||||
ce->flag |= flag;
|
ce->flag |= flag;
|
||||||
}
|
}
|
||||||
|
|
||||||
RB_GC_GUARD(load);
|
RB_GC_GUARD(load);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3197,8 +3163,9 @@ current_autoload_data(VALUE mod, ID id, struct autoload_const **acp)
|
|||||||
if (!load) return 0;
|
if (!load) return 0;
|
||||||
ele = get_autoload_data(load, acp);
|
ele = get_autoload_data(load, acp);
|
||||||
if (!ele) return 0;
|
if (!ele) return 0;
|
||||||
|
|
||||||
/* for autoloading thread, keep the defined value to autoloading storage */
|
/* for autoloading thread, keep the defined value to autoloading storage */
|
||||||
if (ele->state && (ele->state->thread == rb_thread_current())) {
|
if (autoload_by_current(ele)) {
|
||||||
return ele;
|
return ele;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user