Get ractor message passing working with > 1 thread sending/receiving values in same ractor

Rework ractors so that any ractor action (Ractor.receive, Ractor#send, Ractor.yield, Ractor#take,
Ractor.select) will operate on the thread that called the action. It will put that thread to sleep if
it's a blocking function and it needs to put it to sleep, and the awakening action (Ractor.yield,
Ractor#send) will wake up the blocked thread.

Before this change, every blocking ractor action was associated with the ractor struct and its fields.
If a ractor called Ractor.receive, its wait status was wait_receiving, and when another ractor called
r.send on it, it would look for that status in the ractor struct fields and wake it up. The problem: what
happens if 2 threads call blocking ractor actions in the same ractor? Imagine 1 thread has called Ractor.receive
and another r.take. Then, when a different ractor calls r.send on it, it doesn't know which ruby thread is associated
with which ractor action, so which ruby thread should it schedule? This change moves some fields onto the ruby thread
itself so that ruby threads are the ones that have ractor blocking statuses, and threads are then specifically scheduled
when unblocked.

Fixes [#17624]
Fixes [#21037]
This commit is contained in:
Luke Gruber 2025-05-12 18:03:22 -04:00 committed by Aaron Patterson
parent 2fee379f8f
commit 1d4822a175
Notes: git 2025-05-13 20:24:09 +00:00
8 changed files with 352 additions and 137 deletions

View File

@ -544,7 +544,7 @@ assert_equal '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]', %q{
}.sort
}
# an exception in a Ractor will be re-raised at Ractor#receive
# an exception in a Ractor main thread will be re-raised at Ractor#receive
assert_equal '[RuntimeError, "ok", true]', %q{
r = Ractor.new do
raise 'ok' # exception will be transferred to the receiver
@ -558,6 +558,18 @@ assert_equal '[RuntimeError, "ok", true]', %q{
end
}
# an exception in a Ractor non-main thread will not be re-raised at Ractor#receive
assert_equal 'ok', %q{
r = Ractor.new do
Thread.new do
raise 'ng'
end
sleep 0.1
'ok'
end
r.take
}
# threads in a ractor will be killed
assert_equal '{ok: 3}', %q{
Ractor.new Ractor.current do |main|
@ -2294,3 +2306,97 @@ assert_equal 'ok', %q{
'ok'
}
# There are some bugs in Windows with multiple threads in same ractor calling ractor actions
# Ex: https://github.com/ruby/ruby/actions/runs/14998660285/job/42139383905
unless /mswin/ =~ RUBY_PLATFORM
# r.send and r.take from multiple threads
# [Bug #21037]
assert_equal '[true, true]', %q{
class Map
def initialize
@r = Ractor.new {
loop do
key = Ractor.receive
Ractor.yield key
end
}
end
def fetch(key)
@r.send key
@r.take
end
end
tm = Map.new
t1 = Thread.new { 10.times.map { tm.fetch("t1") } }
t2 = Thread.new { 10.times.map { tm.fetch("t2") } }
vals = t1.value + t2.value
[
vals.first(10).all? { |v| v == "t1" },
vals.last(10).all? { |v| v == "t2" }
]
}
# r.send and Ractor.select from multiple threads
assert_equal '[true, true]', %q{
class Map
def initialize
@r = Ractor.new {
loop do
key = Ractor.receive
Ractor.yield key
end
}
end
def fetch(key)
@r.send key
_r, val = Ractor.select(@r)
val
end
end
tm = Map.new
t1 = Thread.new { 10.times.map { tm.fetch("t1") } }
t2 = Thread.new { 10.times.map { tm.fetch("t2") } }
vals = t1.value + t2.value
[
vals.first(10).all? { |v| v == "t1" },
vals.last(10).all? { |v| v == "t2" }
]
}
# Ractor.receive in multiple threads in same ractor
# [Bug #17624]
assert_equal '["T1 received", "T2 received"]', %q{
r1 = Ractor.new do
output = []
m = Mutex.new
# Start two listener threads
t1 = Thread.new do
Ractor.receive
m.synchronize do
output << "T1 received"
end
end
t2 = Thread.new do
Ractor.receive
m.synchronize do
output << "T2 received"
end
end
sleep 0.1 until [t1,t2].all? { |t| t.status == "sleep" }
Ractor.main.send(:both_blocking)
[t1, t2].each(&:join)
output
end
Ractor.receive # wait until both threads have blocked
r1.send(1)
r1.send(2)
r1.take.sort
}
end

291
ractor.c
View File

@ -210,8 +210,6 @@ ractor_mark(void *ptr)
ractor_queue_mark(&r->sync.recv_queue);
ractor_queue_mark(&r->sync.takers_queue);
rb_gc_mark(r->receiving_mutex);
rb_gc_mark(r->loc);
rb_gc_mark(r->name);
rb_gc_mark(r->r_stdin);
@ -242,9 +240,6 @@ ractor_free(void *ptr)
rb_ractor_t *r = (rb_ractor_t *)ptr;
RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
rb_native_mutex_destroy(&r->sync.lock);
#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_destroy(&r->sync.cond);
#endif
ractor_queue_free(&r->sync.recv_queue);
ractor_queue_free(&r->sync.takers_queue);
ractor_local_storage_free(r);
@ -432,6 +427,7 @@ ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_ba
}
rq->size *= 2;
}
// copy basket into queue
rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
// fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
}
@ -470,6 +466,7 @@ ractor_basket_accept(struct rb_ractor_basket *b)
{
VALUE v = ractor_basket_value(b);
// a ractor's main thread had an error and yielded us this exception during its dying moments
if (b->p.send.exception) {
VALUE cause = v;
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
@ -534,39 +531,59 @@ basket_type_name(enum rb_ractor_basket_type type)
}
#endif // USE_RUBY_DEBUG_LOG
static bool
ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status)
static rb_thread_t *
ractor_sleeping_by(const rb_ractor_t *r, rb_thread_t *th, enum rb_ractor_wait_status wait_status)
{
return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
if (th) {
if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) {
return th;
}
} else {
// find any thread that has this ractor wait status that is blocked
ccan_list_for_each(&r->sync.wait.waiting_threads, th, ractor_waiting.waiting_node) {
if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) {
return th;
}
}
}
return NULL;
}
#ifdef RUBY_THREAD_PTHREAD_H
// thread_*.c
void rb_ractor_sched_wakeup(rb_ractor_t *r);
void rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th);
#else
// win32
static void
rb_ractor_sched_wakeup(rb_ractor_t *r)
rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th)
{
rb_native_cond_broadcast(&r->sync.cond);
(void)r;
ASSERT_ractor_locking(r);
rb_native_cond_signal(&th->ractor_waiting.cond);
}
#endif
/*
* Wakeup `r` if the given `th` is blocked and has the given ractor `wait_status`.
* Wakeup any blocked thread in `r` with the given ractor `wait_status` if `th` is NULL.
*/
static bool
ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
ractor_wakeup(rb_ractor_t *r, rb_thread_t *th /* can be NULL */, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
{
ASSERT_ractor_locking(r);
RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s",
rb_ractor_id(r),
wait_status_str(r->sync.wait.status),
wait_status_str(th->ractor_waiting.wait_status),
wait_status_str(wait_status),
wakeup_status_str(wakeup_status));
if (ractor_sleeping_by(r, wait_status)) {
r->sync.wait.wakeup_status = wakeup_status;
rb_ractor_sched_wakeup(r);
if ((th = ractor_sleeping_by(r, th, wait_status)) != NULL) {
th->ractor_waiting.wakeup_status = wakeup_status;
rb_ractor_sched_wakeup(r, th);
return true;
}
else {
@ -574,27 +591,33 @@ ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ra
}
}
// unblock function (UBF). This gets called when another thread on this or another ractor sets our thread's interrupt flag.
// This is not async-safe.
static void
ractor_sleep_interrupt(void *ptr)
{
rb_ractor_t *r = ptr;
rb_execution_context_t *ec = ptr;
rb_ractor_t *r = rb_ec_ractor_ptr(ec);
rb_thread_t *th = rb_ec_thread_ptr(ec);
RACTOR_LOCK(r);
{
ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
ractor_wakeup(r, th, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
}
RACTOR_UNLOCK(r);
}
typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
// Checks the current thread for ruby interrupts and runs the cleanup function `cf_func` with `cf_data` if
// `rb_ec_check_ints` is going to raise. See the `rb_threadptr_execute_interrupts` for info on when it can raise.
static void
ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_cleanup_function cf_func, void *cf_data)
ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, ractor_sleep_cleanup_function cf_func, void *cf_data)
{
if (cr->sync.wait.status != wait_none) {
enum rb_ractor_wait_status prev_wait_status = cr->sync.wait.status;
cr->sync.wait.status = wait_none;
cr->sync.wait.wakeup_status = wakeup_by_interrupt;
if (cur_th->ractor_waiting.wait_status != wait_none) {
enum rb_ractor_wait_status prev_wait_status = cur_th->ractor_waiting.wait_status;
cur_th->ractor_waiting.wait_status = wait_none;
cur_th->ractor_waiting.wakeup_status = wakeup_by_interrupt;
RACTOR_UNLOCK(cr);
{
@ -607,7 +630,7 @@ ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_clea
EC_POP_TAG();
if (state) {
(*cf_func)(cr, cf_data);
(*cf_func)(cr, cf_data); // cleanup function is run after the ubf, if it had ubf
EC_JUMP_TAG(ec, state);
}
}
@ -616,9 +639,8 @@ ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_clea
}
}
// reachable?
RACTOR_LOCK(cr);
cr->sync.wait.status = prev_wait_status;
cur_th->ractor_waiting.wait_status = prev_wait_status;
}
}
@ -626,15 +648,14 @@ ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_clea
void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf);
#else
// win32
static void
ractor_cond_wait(rb_ractor_t *r)
ractor_cond_wait(rb_ractor_t *r, rb_thread_t *th)
{
#if RACTOR_CHECK_MODE > 0
VALUE locked_by = r->sync.locked_by;
r->sync.locked_by = Qnil;
#endif
rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
rb_native_cond_wait(&th->ractor_waiting.cond, &r->sync.lock);
#if RACTOR_CHECK_MODE > 0
r->sync.locked_by = locked_by;
@ -645,77 +666,99 @@ static void *
ractor_sleep_wo_gvl(void *ptr)
{
rb_ractor_t *cr = ptr;
rb_execution_context_t *ec = cr->threads.running_ec;
VM_ASSERT(GET_EC() == ec);
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
RACTOR_LOCK_SELF(cr);
{
VM_ASSERT(cr->sync.wait.status != wait_none);
if (cr->sync.wait.wakeup_status == wakeup_none) {
ractor_cond_wait(cr);
VM_ASSERT(cur_th->ractor_waiting.wait_status != wait_none);
// it's possible that another ractor has woken us up (ractor_wakeup),
// so check this condition
if (cur_th->ractor_waiting.wakeup_status == wakeup_none) {
cur_th->status = THREAD_STOPPED_FOREVER;
ractor_cond_wait(cr, cur_th);
cur_th->status = THREAD_RUNNABLE;
VM_ASSERT(cur_th->ractor_waiting.wakeup_status != wakeup_none);
} else {
RUBY_DEBUG_LOG("rare timing, no cond wait");
}
cr->sync.wait.status = wait_none;
cur_th->ractor_waiting.wait_status = wait_none;
}
RACTOR_UNLOCK_SELF(cr);
return NULL;
}
static void
rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf)
rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_ractor_sleep_interrupt)
{
ASSERT_ractor_locking(cr);
rb_thread_t *th = rb_ec_thread_ptr(ec);
struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node;
VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked
ccan_list_add(&cr->sync.wait.waiting_threads, waitn);
RACTOR_UNLOCK(cr);
{
rb_nogvl(ractor_sleep_wo_gvl, cr,
ubf, cr,
RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
rb_nogvl(ractor_sleep_wo_gvl, cr, ubf_ractor_sleep_interrupt, ec, RB_NOGVL_INTR_FAIL);
}
RACTOR_LOCK(cr);
ccan_list_del_init(waitn);
}
#endif
/*
* Sleep the current ractor's current thread until another ractor wakes us up or another thread calls our unblock function.
* The following ractor actions can cause this function to be called:
* Ractor#take (wait_taking)
* Ractor.yield (wait_yielding)
* Ractor.receive (wait_receiving)
* Ractor.select (can be a combination of the above wait states, depending on the states of the ractors passed to Ractor.select)
*/
static enum rb_ractor_wakeup_status
ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status,
ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status,
ractor_sleep_cleanup_function cf_func, void *cf_data)
{
ASSERT_ractor_locking(cr);
enum rb_ractor_wakeup_status wakeup_status;
VM_ASSERT(GET_RACTOR() == cr);
// TODO: multi-threads
VM_ASSERT(cr->sync.wait.status == wait_none);
VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none);
VM_ASSERT(wait_status != wait_none);
cr->sync.wait.status = wait_status;
cr->sync.wait.wakeup_status = wakeup_none;
cur_th->ractor_waiting.wait_status = wait_status;
cur_th->ractor_waiting.wakeup_status = wakeup_none;
// fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
// wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
while (cr->sync.wait.wakeup_status == wakeup_none) {
while (cur_th->ractor_waiting.wakeup_status == wakeup_none) {
rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt);
ractor_check_ints(ec, cr, cf_func, cf_data);
ractor_check_ints(ec, cr, cur_th, cf_func, cf_data);
}
cr->sync.wait.status = wait_none;
cur_th->ractor_waiting.wait_status = wait_none;
// TODO: multi-thread
wakeup_status = cr->sync.wait.wakeup_status;
cr->sync.wait.wakeup_status = wakeup_none;
wakeup_status = cur_th->ractor_waiting.wakeup_status;
cur_th->ractor_waiting.wakeup_status = wakeup_none;
RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status));
ASSERT_ractor_locking(cr);
return wakeup_status;
}
static enum rb_ractor_wakeup_status
ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status)
ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status)
{
return ractor_sleep_with_cleanup(ec, cr, wait_status, 0, NULL);
return ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, 0, NULL);
}
// Ractor.receive
static void
ractor_recursive_receive_if(rb_ractor_t *r)
ractor_recursive_receive_if(rb_thread_t *th)
{
if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) {
if (th->ractor_waiting.receiving_mutex && rb_mutex_owned_p(th->ractor_waiting.receiving_mutex)) {
rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
}
}
@ -724,7 +767,7 @@ static VALUE
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
{
struct rb_ractor_basket basket;
ractor_recursive_receive_if(cr);
ractor_recursive_receive_if(rb_ec_thread_ptr(ec));
bool received = false;
RACTOR_LOCK_SELF(cr);
@ -749,12 +792,13 @@ static void
ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
{
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
ractor_recursive_receive_if(cr);
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
ractor_recursive_receive_if(cur_th);
RACTOR_LOCK(cr);
{
while (ractor_queue_empty_p(cr, rq) && !cr->sync.incoming_port_closed) {
ractor_sleep(ec, cr, wait_receiving);
ractor_sleep(ec, cr, cur_th, wait_receiving);
}
}
RACTOR_UNLOCK(cr);
@ -791,6 +835,7 @@ rq_dump(struct rb_ractor_queue *rq)
struct receive_block_data {
rb_ractor_t *cr;
rb_thread_t *th;
struct rb_ractor_queue *rq;
VALUE v;
int index;
@ -798,11 +843,11 @@ struct receive_block_data {
};
static void
ractor_receive_if_lock(rb_ractor_t *cr)
ractor_receive_if_lock(rb_thread_t *th)
{
VALUE m = cr->receiving_mutex;
VALUE m = th->ractor_waiting.receiving_mutex;
if (m == Qfalse) {
m = cr->receiving_mutex = rb_mutex_new();
m = th->ractor_waiting.receiving_mutex = rb_mutex_new();
}
rb_mutex_lock(m);
}
@ -812,7 +857,7 @@ receive_if_body(VALUE ptr)
{
struct receive_block_data *data = (struct receive_block_data *)ptr;
ractor_receive_if_lock(data->cr);
ractor_receive_if_lock(data->th);
VALUE block_result = rb_yield(data->v);
rb_ractor_t *cr = data->cr;
@ -847,6 +892,7 @@ receive_if_ensure(VALUE v)
{
struct receive_block_data *data = (struct receive_block_data *)v;
rb_ractor_t *cr = data->cr;
rb_thread_t *cur_th = data->th;
if (!data->success) {
RACTOR_LOCK_SELF(cr);
@ -859,7 +905,7 @@ receive_if_ensure(VALUE v)
RACTOR_UNLOCK_SELF(cr);
}
rb_mutex_unlock(cr->receiving_mutex);
rb_mutex_unlock(cur_th->ractor_waiting.receiving_mutex);
return Qnil;
}
@ -869,6 +915,7 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
unsigned int serial = (unsigned int)-1;
int index = 0;
struct rb_ractor_queue *rq = &cr->sync.recv_queue;
@ -902,6 +949,7 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
if (!UNDEF_P(v)) {
struct receive_block_data data = {
.cr = cr,
.th = cur_th,
.rq = rq,
.v = v,
.index = index,
@ -931,7 +979,8 @@ ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_
}
else {
ractor_queue_enq(r, &r->sync.recv_queue, b);
ractor_wakeup(r, wait_receiving, wakeup_by_send);
// wakeup any receiving thread in `r`
ractor_wakeup(r, NULL, wait_receiving, wakeup_by_send);
}
}
RACTOR_UNLOCK(r);
@ -970,40 +1019,43 @@ ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum
}
static void
ractor_basket_fill_(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
ractor_basket_fill_(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc)
{
VM_ASSERT(cr == GET_RACTOR());
basket->sender = cr->pub.self;
basket->sending_th = cur_th;
basket->p.send.exception = exc;
basket->p.send.v = obj;
}
static void
ractor_basket_fill(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc)
ractor_basket_fill(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc)
{
VALUE v;
enum rb_ractor_basket_type type;
ractor_basket_prepare_contents(obj, move, &v, &type);
ractor_basket_fill_(cr, basket, v, exc);
ractor_basket_fill_(cr, cur_th, basket, v, exc);
basket->type.e = type;
}
static void
ractor_basket_fill_will(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
ractor_basket_fill_will(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc)
{
ractor_basket_fill_(cr, basket, obj, exc);
ractor_basket_fill_(cr, cur_th, basket, obj, exc);
basket->type.e = basket_type_will;
}
static VALUE
ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
ractor_send(rb_execution_context_t *ec, rb_ractor_t *recv_r, VALUE obj, VALUE move)
{
struct rb_ractor_basket basket;
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
// TODO: Ractor local GC
ractor_basket_fill(rb_ec_ractor_ptr(ec), &basket, obj, move, false);
ractor_send_basket(ec, r, &basket);
return r->pub.self;
ractor_basket_fill(cr, cur_th, &basket, obj, move, false);
ractor_send_basket(ec, recv_r, &basket);
return recv_r->pub.self;
}
// Ractor#take
@ -1048,15 +1100,16 @@ ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b)
}
static bool
ractor_register_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket,
ractor_register_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket,
bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error)
{
struct rb_ractor_basket b = {
.type.e = basket_type_take_basket,
.sender = cr->pub.self,
.sending_th = cur_th,
.p = {
.take = {
.basket = take_basket,
.basket = take_basket, // pointer to our stack value saved in ractor `r` queue
.config = config,
},
},
@ -1081,7 +1134,8 @@ ractor_register_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *t
ractor_queue_enq(r, &r->sync.takers_queue, &b);
if (basket_none_p(take_basket)) {
ractor_wakeup(r, wait_yielding, wakeup_by_take);
// wakeup any thread in `r` that has yielded, if there is any.
ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take);
}
}
}
@ -1126,17 +1180,18 @@ ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket)
}
static VALUE
ractor_try_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
ractor_try_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *recv_r, struct rb_ractor_basket *take_basket)
{
bool taken;
RACTOR_LOCK_SELF(cr);
{
// If it hasn't yielded yet or is currently in the process of yielding, sleep more
if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
taken = false;
}
else {
taken = true;
taken = true; // basket type might be, for ex, basket_type_copy if value was copied during yield
}
}
RACTOR_UNLOCK_SELF(cr);
@ -1144,7 +1199,7 @@ ractor_try_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_b
if (taken) {
RUBY_DEBUG_LOG("taken");
if (basket_type_p(take_basket, basket_type_deleted)) {
VM_ASSERT(r->sync.outgoing_port_closed);
VM_ASSERT(recv_r->sync.outgoing_port_closed);
rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
}
return ractor_basket_accept(take_basket);
@ -1179,6 +1234,7 @@ ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *
}
#endif
// cleanup function, cr is unlocked
static void
ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb)
{
@ -1208,7 +1264,7 @@ ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr)
}
static void
ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
{
struct take_wait_take_cleanup_data data = {
.r = r,
@ -1218,32 +1274,33 @@ ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_ractor_t *r, st
RACTOR_LOCK_SELF(cr);
{
if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
ractor_sleep_with_cleanup(ec, cr, wait_taking, ractor_wait_take_cleanup, &data);
ractor_sleep_with_cleanup(ec, cr, cur_th, wait_taking, ractor_wait_take_cleanup, &data);
}
}
RACTOR_UNLOCK_SELF(cr);
}
static VALUE
ractor_take(rb_execution_context_t *ec, rb_ractor_t *r)
ractor_take(rb_execution_context_t *ec, rb_ractor_t *recv_r)
{
RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(r));
RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(recv_r));
VALUE v;
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
struct rb_ractor_basket take_basket = {
.type.e = basket_type_none,
.sender = 0,
};
ractor_register_take(cr, r, &take_basket, true, NULL, false);
ractor_register_take(cr, cur_th, recv_r, &take_basket, true, NULL, false);
while (UNDEF_P(v = ractor_try_take(cr, r, &take_basket))) {
ractor_wait_take(ec, cr, r, &take_basket);
while (UNDEF_P(v = ractor_try_take(cr, cur_th, recv_r, &take_basket))) {
ractor_wait_take(ec, cr, cur_th, recv_r, &take_basket);
}
VM_ASSERT(!basket_none_p(&take_basket));
VM_ASSERT(!ractor_check_specific_take_basket_lock(r, &take_basket));
VM_ASSERT(!basket_none_p(&take_basket)); // might be, for ex, basket_type_copy
VM_ASSERT(!ractor_check_specific_take_basket_lock(recv_r, &take_basket));
return v;
}
@ -1266,6 +1323,7 @@ ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs)
return false;
}
// Find another ractor that is taking from this ractor, so we can yield to it
static bool
ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b)
{
@ -1276,11 +1334,11 @@ ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ra
RACTOR_LOCK_SELF(cr);
{
while (ractor_queue_deq(cr, rs, b)) {
if (basket_type_p(b, basket_type_take_basket)) {
if (basket_type_p(b, basket_type_take_basket)) { // some other ractor is taking
struct rb_ractor_basket *tb = b->p.take.basket;
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
found = true;
found = true; // payload basket is now "yielding" type
break;
}
else {
@ -1307,25 +1365,30 @@ ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ra
return found;
}
// Try yielding to a taking ractor
static bool
ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will)
{
// Don't lock yielding ractor at same time as taking ractor. This could deadlock due to timing
// issue because we don't have a lock hierarchy.
ASSERT_ractor_unlocking(cr);
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
struct rb_ractor_basket b;
if (ractor_deq_take_basket(cr, ts, &b)) {
if (ractor_deq_take_basket(cr, ts, &b)) { // deq a take basket from takers queue of `cr` into `b`
VM_ASSERT(basket_type_p(&b, basket_type_take_basket));
VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding));
rb_ractor_t *tr = RACTOR_PTR(b.sender);
struct rb_ractor_basket *tb = b.p.take.basket;
rb_ractor_t *tr = RACTOR_PTR(b.sender); // taking ractor
rb_thread_t *tr_th = b.sending_th; // taking thread
struct rb_ractor_basket *tb = b.p.take.basket; // payload basket
enum rb_ractor_basket_type type;
RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr));
if (is_will) {
type = basket_type_will;
type = basket_type_will; // last message
}
else {
enum ruby_tag_type state;
@ -1337,7 +1400,7 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_q
ractor_basket_prepare_contents(obj, move, &obj, &type);
}
EC_POP_TAG();
// rescue
// rescue ractor copy/move error, then re-raise
if (state) {
RACTOR_LOCK_SELF(cr);
{
@ -1354,11 +1417,11 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_q
VM_ASSERT(basket_type_p(tb, basket_type_yielding));
// fill atomic
RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr));
ractor_basket_fill_(cr, tb, obj, exc);
ractor_basket_fill_(cr, cur_th, tb, obj, exc); // fill the take basket payload
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) {
rb_bug("unreachable");
}
ractor_wakeup(tr, wait_taking, wakeup_by_yield);
ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_yield);
}
RACTOR_UNLOCK(tr);
@ -1376,15 +1439,17 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_q
static void
ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts)
{
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
RACTOR_LOCK_SELF(cr);
{
while (!ractor_check_take_basket(cr, ts) && !cr->sync.outgoing_port_closed) {
ractor_sleep(ec, cr, wait_yielding);
ractor_sleep(ec, cr, cur_th, wait_yielding);
}
}
RACTOR_UNLOCK_SELF(cr);
}
// In order to yield, we wait until our takers queue has at least one element. Then, we wakeup a taker.
static VALUE
ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move)
{
@ -1525,7 +1590,7 @@ ractor_selector_add(VALUE selv, VALUE rv)
config->closed = false;
config->oneshot = false;
if (ractor_register_take(GET_RACTOR(), r, &s->take_basket, false, config, true)) {
if (ractor_register_take(GET_RACTOR(), GET_THREAD(), r, &s->take_basket, false, config, true)) {
st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
}
@ -1649,7 +1714,7 @@ ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat)
}
else {
RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r));
ractor_wakeup(r, wait_yielding, wakeup_by_take);
ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take);
ret = ST_CONTINUE;
}
}
@ -1660,14 +1725,21 @@ ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat)
// Ractor::Selector#wait
// cleanup function, cr is unlocked
static void
ractor_selector_wait_cleaup(rb_ractor_t *cr, void *ptr)
ractor_selector_wait_cleanup(rb_ractor_t *cr, void *ptr)
{
struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr;
RACTOR_LOCK_SELF(cr);
{
while (basket_type_p(tb, basket_type_yielding)) rb_thread_sleep(0);
while (basket_type_p(tb, basket_type_yielding)) {
RACTOR_UNLOCK_SELF(cr);
{
rb_thread_sleep(0);
}
RACTOR_LOCK_SELF(cr);
}
// if tb->type is not none, the take succeeded, but the interruption unfortunately discards it.
tb->type.e = basket_type_reserved;
}
@ -1683,6 +1755,7 @@ ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yiel
struct rb_ractor_basket *tb = &s->take_basket;
struct rb_ractor_basket taken_basket;
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
bool do_receive = !!RTEST(do_receivev);
bool do_yield = !!RTEST(do_yieldv);
VALUE ret_v, ret_r;
@ -1744,7 +1817,7 @@ ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yiel
break;
}
ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb);
ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, ractor_selector_wait_cleanup, tb);
}
taken_basket = *tb;
@ -1870,13 +1943,17 @@ static VALUE
ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
{
VALUE prev;
rb_thread_t *r_th = NULL;
if (r == rb_ec_ractor_ptr(ec)) {
r_th = rb_ec_thread_ptr(ec);
}
RACTOR_LOCK(r);
{
if (!r->sync.incoming_port_closed) {
prev = Qfalse;
r->sync.incoming_port_closed = true;
if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
if (ractor_wakeup(r, r_th, wait_receiving, wakeup_by_close)) {
VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue));
RUBY_DEBUG_LOG("cancel receiving");
}
@ -1915,6 +1992,7 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
while (ractor_queue_deq(r, ts, &b)) {
if (basket_type_p(&b, basket_type_take_basket)) {
tr = RACTOR_PTR(b.sender);
rb_thread_t *tr_th = b.sending_th;
struct rb_ractor_basket *tb = b.p.take.basket;
if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
@ -1932,14 +2010,14 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
// TODO: deadlock-able?
RACTOR_LOCK(tr);
{
ractor_wakeup(tr, wait_taking, wakeup_by_close);
ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_close);
}
RACTOR_UNLOCK(tr);
}
}
// raising yielding Ractor
ractor_wakeup(r, wait_yielding, wakeup_by_close);
ractor_wakeup(r, NULL, wait_yielding, wakeup_by_close);
VM_ASSERT(ractor_queue_empty_p(r, ts));
}
@ -2121,9 +2199,9 @@ ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
rb_native_cond_initialize(&r->barrier_wait_cond);
#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_initialize(&r->sync.cond);
rb_native_cond_initialize(&r->barrier_wait_cond);
#endif
ccan_list_head_init(&r->sync.wait.waiting_threads);
// thread management
rb_thread_sched_init(&r->threads.sched, false);
@ -2193,6 +2271,7 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e
ASSERT_ractor_unlocking(cr);
struct rb_ractor_queue *ts = &cr->sync.takers_queue;
rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
retry:
if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) {
@ -2203,9 +2282,9 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e
RACTOR_LOCK(cr);
{
if (!ractor_check_take_basket(cr, ts)) {
VM_ASSERT(cr->sync.wait.status == wait_none);
VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none);
RUBY_DEBUG_LOG("leave a will");
ractor_basket_fill_will(cr, &cr->sync.will_basket, v, exc);
ractor_basket_fill_will(cr, cur_th, &cr->sync.will_basket, v, exc);
}
else {
RUBY_DEBUG_LOG("rare timing!");

View File

@ -43,7 +43,8 @@ struct rb_ractor_basket {
enum rb_ractor_basket_type e;
rb_atomic_t atomic;
} type;
VALUE sender;
VALUE sender; // Ractor object sending message
rb_thread_t *sending_th;
union {
struct {
@ -117,14 +118,9 @@ struct rb_ractor_sync {
struct rb_ractor_basket will_basket;
struct ractor_wait {
enum rb_ractor_wait_status status;
enum rb_ractor_wakeup_status wakeup_status;
rb_thread_t *waiting_thread;
struct ccan_list_head waiting_threads;
// each thread has struct ccan_list_node ractor_waiting.waiting_node
} wait;
#ifndef RUBY_THREAD_PTHREAD_H
rb_nativethread_cond_t cond;
#endif
};
// created
@@ -152,7 +148,6 @@ struct rb_ractor_struct {
struct rb_ractor_pub pub;
struct rb_ractor_sync sync;
VALUE receiving_mutex;
// vm wide barrier synchronization
rb_nativethread_cond_t barrier_wait_cond;

View File

@@ -676,6 +676,7 @@ signal_ignored(int sig)
if (sigaction(sig, NULL, &old) < 0) return FALSE;
func = old.sa_handler;
#else
// TODO: this is not a thread-safe way to do it. Needs lock.
sighandler_t old = signal(sig, SIG_DFL);
signal(sig, old);
func = old;

View File

@@ -335,7 +335,7 @@ unblock_function_clear(rb_thread_t *th)
}
static void
threadptr_interrupt_locked(rb_thread_t *th, bool trap)
threadptr_set_interrupt_locked(rb_thread_t *th, bool trap)
{
// th->interrupt_lock should be acquired here
@@ -357,26 +357,27 @@ threadptr_interrupt_locked(rb_thread_t *th, bool trap)
}
static void
threadptr_interrupt(rb_thread_t *th, int trap)
threadptr_set_interrupt(rb_thread_t *th, int trap)
{
rb_native_mutex_lock(&th->interrupt_lock);
{
threadptr_interrupt_locked(th, trap);
threadptr_set_interrupt_locked(th, trap);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}
/* Set interrupt flag on another thread or current thread, and call its UBF if it has one set */
void
rb_threadptr_interrupt(rb_thread_t *th)
{
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
threadptr_interrupt(th, false);
threadptr_set_interrupt(th, false);
}
static void
threadptr_trap_interrupt(rb_thread_t *th)
{
threadptr_interrupt(th, true);
threadptr_set_interrupt(th, true);
}
static void
@@ -525,6 +526,9 @@ thread_cleanup_func(void *th_ptr, int atfork)
}
rb_native_mutex_destroy(&th->interrupt_lock);
#ifndef RUBY_THREAD_PTHREAD_H
rb_native_cond_destroy(&th->ractor_waiting.cond);
#endif
}
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
@@ -2423,6 +2427,7 @@ NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th));
static void
rb_threadptr_to_kill(rb_thread_t *th)
{
VM_ASSERT(GET_THREAD() == th);
rb_threadptr_pending_interrupt_clear(th);
th->status = THREAD_RUNNABLE;
th->to_kill = 1;
@@ -2446,6 +2451,11 @@ threadptr_get_interrupts(rb_thread_t *th)
static void threadptr_interrupt_exec_exec(rb_thread_t *th);
// Execute interrupts on currently running thread
// In certain situations, calling this function will raise an exception. Some examples are:
// * during VM shutdown (`rb_ractor_terminate_all`)
// * Call to Thread#exit for current thread (`rb_thread_kill`)
// * Call to Thread#raise for current thread
int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
@@ -2453,6 +2463,8 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
int postponed_job_interrupt = 0;
int ret = FALSE;
VM_ASSERT(GET_THREAD() == th);
if (th->ec->raised_flag) return ret;
while ((interrupt = threadptr_get_interrupts(th)) != 0) {
@@ -6033,7 +6045,7 @@ rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, voi
rb_native_mutex_lock(&th->interrupt_lock);
{
ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node);
threadptr_interrupt_locked(th, true);
threadptr_set_interrupt_locked(th, true);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}

View File

@@ -1309,16 +1309,20 @@ ractor_sched_deq(rb_vm_t *vm, rb_ractor_t *cr)
void rb_ractor_lock_self(rb_ractor_t *r);
void rb_ractor_unlock_self(rb_ractor_t *r);
// The current thread for a ractor is put to "sleep" (descheduled in the STOPPED_FOREVER state) waiting for
// a ractor action to wake it up. See docs for `ractor_sched_sleep_with_cleanup` for more info.
void
rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf)
rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_schedule_ractor_th)
{
// ractor lock of cr is acquired
// r is sleeping status
rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
struct rb_thread_sched *sched = TH_SCHED(th);
cr->sync.wait.waiting_thread = th; // TODO: multi-thread
struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node;
VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked
ccan_list_add(&cr->sync.wait.waiting_threads, waitn);
setup_ubf(th, ubf, (void *)cr);
setup_ubf(th, ubf_schedule_ractor_th, (void *)ec);
thread_sched_lock(sched, th);
{
@@ -1327,8 +1331,8 @@ rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_fu
if (RUBY_VM_INTERRUPTED(th->ec)) {
RUBY_DEBUG_LOG("interrupted");
}
else if (cr->sync.wait.wakeup_status != wakeup_none) {
RUBY_DEBUG_LOG("awaken:%d", (int)cr->sync.wait.wakeup_status);
else if (th->ractor_waiting.wakeup_status != wakeup_none) {
RUBY_DEBUG_LOG("awaken:%d", (int)th->ractor_waiting.wakeup_status);
}
else {
// sleep
@@ -1350,25 +1354,24 @@ rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_fu
setup_ubf(th, NULL, NULL);
rb_ractor_lock_self(cr);
cr->sync.wait.waiting_thread = NULL;
ccan_list_del_init(waitn);
}
void
rb_ractor_sched_wakeup(rb_ractor_t *r)
rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th)
{
rb_thread_t *r_th = r->sync.wait.waiting_thread;
// ractor lock of r is acquired
struct rb_thread_sched *sched = TH_SCHED(r_th);
struct rb_thread_sched *sched = TH_SCHED(th);
VM_ASSERT(r->sync.wait.wakeup_status != 0);
VM_ASSERT(th->ractor_waiting.wakeup_status != 0);
thread_sched_lock(sched, r_th);
thread_sched_lock(sched, th);
{
if (r_th->status == THREAD_STOPPED_FOREVER) {
thread_sched_to_ready_common(sched, r_th, true, false);
if (th->status == THREAD_STOPPED_FOREVER) {
thread_sched_to_ready_common(sched, th, true, false);
}
}
thread_sched_unlock(sched, r_th);
thread_sched_unlock(sched, th);
}
static bool

5
vm.c
View File

@@ -3556,6 +3556,7 @@ thread_mark(void *ptr)
rb_gc_mark(th->last_status);
rb_gc_mark(th->locking_mutex);
rb_gc_mark(th->name);
rb_gc_mark(th->ractor_waiting.receiving_mutex);
rb_gc_mark(th->scheduler);
@@ -3717,6 +3718,10 @@ th_init(rb_thread_t *th, VALUE self, rb_vm_t *vm)
th->ext_config.ractor_safe = true;
ccan_list_head_init(&th->interrupt_exec_tasks);
ccan_list_node_init(&th->ractor_waiting.waiting_node);
#ifndef RUBY_THREAD_PTHREAD_H
rb_native_cond_initialize(&th->ractor_waiting.cond);
#endif
#if USE_RUBY_DEBUG_LOG
static rb_atomic_t thread_serial = 1;

View File

@@ -1106,8 +1106,20 @@ typedef struct rb_ractor_struct rb_ractor_t;
struct rb_native_thread;
struct rb_thread_ractor_waiting {
//enum rb_ractor_wait_status wait_status;
int wait_status;
//enum rb_ractor_wakeup_status wakeup_status;
int wakeup_status;
struct ccan_list_node waiting_node; // the rb_thread_t
VALUE receiving_mutex; // protects Ractor.receive_if
#ifndef RUBY_THREAD_PTHREAD_H
rb_nativethread_cond_t cond;
#endif
};
typedef struct rb_thread_struct {
struct ccan_list_node lt_node; // managed by a ractor
struct ccan_list_node lt_node; // managed by a ractor (r->threads.set)
VALUE self;
rb_ractor_t *ractor;
rb_vm_t *vm;
@@ -1118,6 +1130,8 @@ typedef struct rb_thread_struct {
bool mn_schedulable;
rb_atomic_t serial; // only for RUBY_DEBUG_LOG()
struct rb_thread_ractor_waiting ractor_waiting;
VALUE last_status; /* $? */
/* for cfunc */