re-layout rb_ractor_t

separate synchronization data and ractor local data.
This commit is contained in:
Koichi Sasada 2020-12-08 00:42:20 +09:00
parent 2749123e21
commit ee194af2aa
Notes: git 2020-12-09 01:41:20 +09:00
3 changed files with 123 additions and 122 deletions

224
ractor.c
View File

@ -37,7 +37,7 @@ ASSERT_ractor_unlocking(rb_ractor_t *r)
{ {
#if RACTOR_CHECK_MODE > 0 #if RACTOR_CHECK_MODE > 0
// GET_EC is NULL in an MJIT worker // GET_EC is NULL in an MJIT worker
if (GET_EC() != NULL && r->locked_by == GET_RACTOR()->self) { if (GET_EC() != NULL && r->sync.locked_by == GET_RACTOR()->self) {
rb_bug("recursive ractor locking"); rb_bug("recursive ractor locking");
} }
#endif #endif
@ -48,8 +48,8 @@ ASSERT_ractor_locking(rb_ractor_t *r)
{ {
#if RACTOR_CHECK_MODE > 0 #if RACTOR_CHECK_MODE > 0
// GET_EC is NULL in an MJIT worker // GET_EC is NULL in an MJIT worker
if (GET_EC() != NULL && r->locked_by != GET_RACTOR()->self) { if (GET_EC() != NULL && r->sync.locked_by != GET_RACTOR()->self) {
rp(r->locked_by); rp(r->sync.locked_by);
rb_bug("ractor lock is not acquired."); rb_bug("ractor lock is not acquired.");
} }
#endif #endif
@ -61,11 +61,11 @@ ractor_lock(rb_ractor_t *r, const char *file, int line)
RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : ""); RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : "");
ASSERT_ractor_unlocking(r); ASSERT_ractor_unlocking(r);
rb_native_mutex_lock(&r->lock); rb_native_mutex_lock(&r->sync.lock);
#if RACTOR_CHECK_MODE > 0 #if RACTOR_CHECK_MODE > 0
if (GET_EC() != NULL) { // GET_EC is NULL in an MJIT worker if (GET_EC() != NULL) { // GET_EC is NULL in an MJIT worker
r->locked_by = GET_RACTOR()->self; r->sync.locked_by = GET_RACTOR()->self;
} }
#endif #endif
@ -76,7 +76,7 @@ static void
ractor_lock_self(rb_ractor_t *cr, const char *file, int line) ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
{ {
VM_ASSERT(cr == GET_RACTOR()); VM_ASSERT(cr == GET_RACTOR());
VM_ASSERT(cr->locked_by != cr->self); VM_ASSERT(cr->sync.locked_by != cr->self);
ractor_lock(cr, file, line); ractor_lock(cr, file, line);
} }
@ -85,9 +85,9 @@ ractor_unlock(rb_ractor_t *r, const char *file, int line)
{ {
ASSERT_ractor_locking(r); ASSERT_ractor_locking(r);
#if RACTOR_CHECK_MODE > 0 #if RACTOR_CHECK_MODE > 0
r->locked_by = Qnil; r->sync.locked_by = Qnil;
#endif #endif
rb_native_mutex_unlock(&r->lock); rb_native_mutex_unlock(&r->sync.lock);
RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : ""); RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : "");
} }
@ -96,7 +96,7 @@ static void
ractor_unlock_self(rb_ractor_t *cr, const char *file, int line) ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
{ {
VM_ASSERT(cr == GET_RACTOR()); VM_ASSERT(cr == GET_RACTOR());
VM_ASSERT(cr->locked_by == cr->self); VM_ASSERT(cr->sync.locked_by == cr->self);
ractor_unlock(cr, file, line); ractor_unlock(cr, file, line);
} }
@ -109,13 +109,13 @@ static void
ractor_cond_wait(rb_ractor_t *r) ractor_cond_wait(rb_ractor_t *r)
{ {
#if RACTOR_CHECK_MODE > 0 #if RACTOR_CHECK_MODE > 0
VALUE locked_by = r->locked_by; VALUE locked_by = r->sync.locked_by;
r->locked_by = Qnil; r->sync.locked_by = Qnil;
#endif #endif
rb_native_cond_wait(&r->wait.cond, &r->lock); rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
#if RACTOR_CHECK_MODE > 0 #if RACTOR_CHECK_MODE > 0
r->locked_by = locked_by; r->sync.locked_by = locked_by;
#endif #endif
} }
@ -186,11 +186,11 @@ ractor_mark(void *ptr)
{ {
rb_ractor_t *r = (rb_ractor_t *)ptr; rb_ractor_t *r = (rb_ractor_t *)ptr;
ractor_queue_mark(&r->incoming_queue); ractor_queue_mark(&r->sync.incoming_queue);
rb_gc_mark(r->wait.taken_basket.v); rb_gc_mark(r->sync.wait.taken_basket.v);
rb_gc_mark(r->wait.taken_basket.sender); rb_gc_mark(r->sync.wait.taken_basket.sender);
rb_gc_mark(r->wait.yielded_basket.v); rb_gc_mark(r->sync.wait.yielded_basket.v);
rb_gc_mark(r->wait.yielded_basket.sender); rb_gc_mark(r->sync.wait.yielded_basket.sender);
rb_gc_mark(r->loc); rb_gc_mark(r->loc);
rb_gc_mark(r->name); rb_gc_mark(r->name);
rb_gc_mark(r->r_stdin); rb_gc_mark(r->r_stdin);
@ -224,10 +224,10 @@ static void
ractor_free(void *ptr) ractor_free(void *ptr)
{ {
rb_ractor_t *r = (rb_ractor_t *)ptr; rb_ractor_t *r = (rb_ractor_t *)ptr;
rb_native_mutex_destroy(&r->lock); rb_native_mutex_destroy(&r->sync.lock);
rb_native_cond_destroy(&r->wait.cond); rb_native_cond_destroy(&r->sync.cond);
ractor_queue_free(&r->incoming_queue); ractor_queue_free(&r->sync.incoming_queue);
ractor_waiting_list_free(&r->taking_ractors); ractor_waiting_list_free(&r->sync.taking_ractors);
ractor_local_storage_free(r); ractor_local_storage_free(r);
ruby_xfree(r); ruby_xfree(r);
} }
@ -251,8 +251,8 @@ ractor_memsize(const void *ptr)
// TODO // TODO
return sizeof(rb_ractor_t) + return sizeof(rb_ractor_t) +
ractor_queue_memsize(&r->incoming_queue) + ractor_queue_memsize(&r->sync.incoming_queue) +
ractor_waiting_list_memsize(&r->taking_ractors); ractor_waiting_list_memsize(&r->sync.taking_ractors);
} }
static const rb_data_type_t ractor_data_type = { static const rb_data_type_t ractor_data_type = {
@ -407,11 +407,11 @@ ractor_basket_accept(struct rb_ractor_basket *b)
static VALUE static VALUE
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
{ {
struct rb_ractor_queue *rq = &r->incoming_queue; struct rb_ractor_queue *rq = &r->sync.incoming_queue;
struct rb_ractor_basket basket; struct rb_ractor_basket basket;
if (ractor_queue_deq(r, rq, &basket) == false) { if (ractor_queue_deq(r, rq, &basket) == false) {
if (r->incoming_port_closed) { if (r->sync.incoming_port_closed) {
rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
} }
else { else {
@ -427,11 +427,11 @@ ractor_sleep_wo_gvl(void *ptr)
{ {
rb_ractor_t *cr = ptr; rb_ractor_t *cr = ptr;
RACTOR_LOCK_SELF(cr); RACTOR_LOCK_SELF(cr);
VM_ASSERT(cr->wait.status != wait_none); VM_ASSERT(cr->sync.wait.status != wait_none);
if (cr->wait.wakeup_status == wakeup_none) { if (cr->sync.wait.wakeup_status == wakeup_none) {
ractor_cond_wait(cr); ractor_cond_wait(cr);
} }
cr->wait.status = wait_none; cr->sync.wait.status = wait_none;
RACTOR_UNLOCK_SELF(cr); RACTOR_UNLOCK_SELF(cr);
return NULL; return NULL;
} }
@ -442,9 +442,9 @@ ractor_sleep_interrupt(void *ptr)
rb_ractor_t *r = ptr; rb_ractor_t *r = ptr;
RACTOR_LOCK(r); RACTOR_LOCK(r);
if (r->wait.wakeup_status == wakeup_none) { if (r->sync.wait.wakeup_status == wakeup_none) {
r->wait.wakeup_status = wakeup_by_interrupt; r->sync.wait.wakeup_status = wakeup_by_interrupt;
rb_native_cond_signal(&r->wait.cond); rb_native_cond_signal(&r->sync.cond);
} }
RACTOR_UNLOCK(r); RACTOR_UNLOCK(r);
} }
@ -486,9 +486,9 @@ static void
ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr) ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr)
{ {
VM_ASSERT(GET_RACTOR() == cr); VM_ASSERT(GET_RACTOR() == cr);
VM_ASSERT(cr->wait.status != wait_none); VM_ASSERT(cr->sync.wait.status != wait_none);
// fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", __func__, cr, // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", __func__, cr,
// wait_status_str(cr->wait.status), wakeup_status_str(cr->wait.wakeup_status)); // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
RACTOR_UNLOCK(cr); RACTOR_UNLOCK(cr);
rb_nogvl(ractor_sleep_wo_gvl, cr, rb_nogvl(ractor_sleep_wo_gvl, cr,
@ -500,7 +500,7 @@ ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr)
static bool static bool
ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status) ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status)
{ {
return (r->wait.status & wait_status) && r->wait.wakeup_status == wakeup_none; return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
} }
static bool static bool
@ -509,12 +509,12 @@ ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_w
ASSERT_ractor_locking(r); ASSERT_ractor_locking(r);
// fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", __func__, r, // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", __func__, r,
// wait_status_str(r->wait.status), wait_status_str(wait_status), // wait_status_str(r->sync.wait.status), wait_status_str(wait_status),
// wakeup_status_str(r->wait.wakeup_status), wakeup_status_str(wakeup_status)); // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status));
if (ractor_sleeping_by(r, wait_status)) { if (ractor_sleeping_by(r, wait_status)) {
r->wait.wakeup_status = wakeup_status; r->sync.wait.wakeup_status = wakeup_status;
rb_native_cond_signal(&r->wait.cond); rb_native_cond_signal(&r->sync.cond);
return true; return true;
} }
else { else {
@ -536,12 +536,12 @@ ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr)
} }
else { else {
// insert cr into taking list // insert cr into taking list
struct rb_ractor_waiting_list *wl = &r->taking_ractors; struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors;
for (int i=0; i<wl->cnt; i++) { for (int i=0; i<wl->cnt; i++) {
if (wl->ractors[i] == cr) { if (wl->ractors[i] == cr) {
// TODO: make it clean code. // TODO: make it clean code.
rb_native_mutex_unlock(&r->lock); rb_native_mutex_unlock(&r->sync.lock);
rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting."); rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting.");
} }
} }
@ -564,11 +564,11 @@ ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr)
if (retry_try) { if (retry_try) {
RACTOR_LOCK(cr); RACTOR_LOCK(cr);
{ {
if (cr->wait.wakeup_status == wakeup_none) { if (cr->sync.wait.wakeup_status == wakeup_none) {
VM_ASSERT(cr->wait.status != wait_none); VM_ASSERT(cr->sync.wait.status != wait_none);
cr->wait.wakeup_status = wakeup_by_retry; cr->sync.wait.wakeup_status = wakeup_by_retry;
cr->wait.status = wait_none; cr->sync.wait.status = wait_none;
} }
} }
RACTOR_UNLOCK(cr); RACTOR_UNLOCK(cr);
@ -601,7 +601,7 @@ static rb_ractor_t *
ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
{ {
ASSERT_ractor_locking(r); ASSERT_ractor_locking(r);
VM_ASSERT(&r->taking_ractors == wl); VM_ASSERT(&r->sync.taking_ractors == wl);
if (wl->cnt > 0) { if (wl->cnt > 0) {
rb_ractor_t *tr = wl->ractors[0]; rb_ractor_t *tr = wl->ractors[0];
@ -625,14 +625,14 @@ ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r)
while ((v = ractor_try_receive(ec, r)) == Qundef) { while ((v = ractor_try_receive(ec, r)) == Qundef) {
RACTOR_LOCK(r); RACTOR_LOCK(r);
{ {
if (ractor_queue_empty_p(r, &r->incoming_queue)) { if (ractor_queue_empty_p(r, &r->sync.incoming_queue)) {
VM_ASSERT(r->wait.status == wait_none); VM_ASSERT(r->sync.wait.status == wait_none);
r->wait.status = wait_receiving; r->sync.wait.status = wait_receiving;
r->wait.wakeup_status = wakeup_none; r->sync.wait.wakeup_status = wakeup_none;
ractor_sleep(ec, r); ractor_sleep(ec, r);
r->wait.wakeup_status = wakeup_none; r->sync.wait.wakeup_status = wakeup_none;
} }
} }
RACTOR_UNLOCK(r); RACTOR_UNLOCK(r);
@ -645,11 +645,11 @@ static void
ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
{ {
bool closed = false; bool closed = false;
struct rb_ractor_queue *rq = &r->incoming_queue; struct rb_ractor_queue *rq = &r->sync.incoming_queue;
RACTOR_LOCK(r); RACTOR_LOCK(r);
{ {
if (r->incoming_port_closed) { if (r->sync.incoming_port_closed) {
closed = true; closed = true;
} }
else { else {
@ -713,11 +713,11 @@ ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r)
RACTOR_LOCK(r); RACTOR_LOCK(r);
{ {
if (ractor_wakeup(r, wait_yielding, wakeup_by_take)) { if (ractor_wakeup(r, wait_yielding, wakeup_by_take)) {
VM_ASSERT(r->wait.yielded_basket.type != basket_type_none); VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none);
basket = r->wait.yielded_basket; basket = r->sync.wait.yielded_basket;
ractor_basket_clear(&r->wait.yielded_basket); ractor_basket_clear(&r->sync.wait.yielded_basket);
} }
else if (r->outgoing_port_closed) { else if (r->sync.outgoing_port_closed) {
closed = true; closed = true;
} }
else { else {
@ -745,7 +745,7 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b
ASSERT_ractor_unlocking(cr); ASSERT_ractor_unlocking(cr);
VM_ASSERT(basket->type != basket_type_none); VM_ASSERT(basket->type != basket_type_none);
if (cr->outgoing_port_closed) { if (cr->sync.outgoing_port_closed) {
rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
} }
@ -754,7 +754,7 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b
retry_shift: retry_shift:
RACTOR_LOCK(cr); RACTOR_LOCK(cr);
{ {
r = ractor_waiting_list_shift(cr, &cr->taking_ractors); r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors);
} }
RACTOR_UNLOCK(cr); RACTOR_UNLOCK(cr);
@ -764,8 +764,8 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b
RACTOR_LOCK(r); RACTOR_LOCK(r);
{ {
if (ractor_wakeup(r, wait_taking, wakeup_by_yield)) { if (ractor_wakeup(r, wait_taking, wakeup_by_yield)) {
VM_ASSERT(r->wait.taken_basket.type == basket_type_none); VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none);
r->wait.taken_basket = *basket; r->sync.wait.taken_basket = *basket;
} }
else { else {
retry_shift = true; retry_shift = true;
@ -809,10 +809,10 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
VALUE v; VALUE v;
} *actions = ALLOCA_N(struct ractor_select_action, alen + (yield_p ? 1 : 0)); } *actions = ALLOCA_N(struct ractor_select_action, alen + (yield_p ? 1 : 0));
VM_ASSERT(cr->wait.status == wait_none); VM_ASSERT(cr->sync.wait.status == wait_none);
VM_ASSERT(cr->wait.wakeup_status == wakeup_none); VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
VM_ASSERT(cr->wait.taken_basket.type == basket_type_none); VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
VM_ASSERT(cr->wait.yielded_basket.type == basket_type_none); VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
// setup actions // setup actions
for (i=0; i<alen; i++) { for (i=0; i<alen; i++) {
@ -842,7 +842,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
wait_status |= wait_yielding; wait_status |= wait_yielding;
alen++; alen++;
ractor_basket_setup(ec, &cr->wait.yielded_basket, yielded_value, move, false, false); ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false);
} }
// TODO: shuffle actions // TODO: shuffle actions
@ -872,7 +872,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
break; break;
case ractor_select_action_yield: case ractor_select_action_yield:
{ {
if (ractor_try_yield(ec, cr, &cr->wait.yielded_basket)) { if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) {
*ret_r = ID2SYM(rb_intern("yield")); *ret_r = ID2SYM(rb_intern("yield"));
ret = Qnil; ret = Qnil;
goto cleanup; goto cleanup;
@ -886,9 +886,9 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
RACTOR_LOCK(cr); RACTOR_LOCK(cr);
{ {
VM_ASSERT(cr->wait.status == wait_none); VM_ASSERT(cr->sync.wait.status == wait_none);
cr->wait.status = wait_status; cr->sync.wait.status = wait_status;
cr->wait.wakeup_status == wakeup_none; cr->sync.wait.wakeup_status == wakeup_none;
} }
RACTOR_UNLOCK(cr); RACTOR_UNLOCK(cr);
@ -909,7 +909,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
// wait // wait
RACTOR_LOCK(cr); RACTOR_LOCK(cr);
{ {
if (cr->wait.wakeup_status == wakeup_none) { if (cr->sync.wait.wakeup_status == wakeup_none) {
for (i=0; i<alen; i++) { for (i=0; i<alen; i++) {
rb_ractor_t *r; rb_ractor_t *r;
@ -918,41 +918,41 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
r = RACTOR_PTR(actions[i].v); r = RACTOR_PTR(actions[i].v);
if (ractor_sleeping_by(r, wait_yielding)) { if (ractor_sleeping_by(r, wait_yielding)) {
RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->id); RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->id);
cr->wait.wakeup_status = wakeup_by_retry; cr->sync.wait.wakeup_status = wakeup_by_retry;
goto skip_sleep; goto skip_sleep;
} }
break; break;
case ractor_select_action_receive: case ractor_select_action_receive:
if (cr->incoming_queue.cnt > 0) { if (cr->sync.incoming_queue.cnt > 0) {
RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->incoming_queue.cnt); RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt);
cr->wait.wakeup_status = wakeup_by_retry; cr->sync.wait.wakeup_status = wakeup_by_retry;
goto skip_sleep; goto skip_sleep;
} }
break; break;
case ractor_select_action_yield: case ractor_select_action_yield:
if (cr->taking_ractors.cnt > 0) { if (cr->sync.taking_ractors.cnt > 0) {
RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->taking_ractors.cnt); RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt);
cr->wait.wakeup_status = wakeup_by_retry; cr->sync.wait.wakeup_status = wakeup_by_retry;
goto skip_sleep; goto skip_sleep;
} }
else if (cr->outgoing_port_closed) { else if (cr->sync.outgoing_port_closed) {
cr->wait.wakeup_status = wakeup_by_close; cr->sync.wait.wakeup_status = wakeup_by_close;
goto skip_sleep; goto skip_sleep;
} }
break; break;
} }
} }
RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->wait.status)); RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status));
ractor_sleep(ec, cr); ractor_sleep(ec, cr);
RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->wait.wakeup_status)); RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status));
} }
else { else {
skip_sleep: skip_sleep:
RUBY_DEBUG_LOG("no need to sleep %s->%s", RUBY_DEBUG_LOG("no need to sleep %s->%s",
wait_status_str(cr->wait.status), wait_status_str(cr->sync.wait.status),
wakeup_status_str(cr->wait.wakeup_status)); wakeup_status_str(cr->sync.wait.wakeup_status));
cr->wait.status = wait_none; cr->sync.wait.status = wait_none;
} }
} }
RACTOR_UNLOCK(cr); RACTOR_UNLOCK(cr);
@ -963,7 +963,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
switch (actions[i].type) { switch (actions[i].type) {
case ractor_select_action_take: case ractor_select_action_take:
r = RACTOR_PTR(actions[i].v); r = RACTOR_PTR(actions[i].v);
ractor_waiting_list_del(r, &r->taking_ractors, cr); ractor_waiting_list_del(r, &r->sync.taking_ractors, cr);
break; break;
case ractor_select_action_receive: case ractor_select_action_receive:
case ractor_select_action_yield: case ractor_select_action_yield:
@ -972,8 +972,8 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
} }
// check results // check results
enum ractor_wakeup_status wakeup_status = cr->wait.wakeup_status; enum ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status;
cr->wait.wakeup_status = wakeup_none; cr->sync.wait.wakeup_status = wakeup_none;
switch (wakeup_status) { switch (wakeup_status) {
case wakeup_none: case wakeup_none:
@ -990,10 +990,10 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
case wakeup_by_yield: case wakeup_by_yield:
// take was succeeded! // take was succeeded!
// cr.wait.taken_basket contains passed block // cr.wait.taken_basket contains passed block
VM_ASSERT(cr->wait.taken_basket.type != basket_type_none); VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none);
*ret_r = cr->wait.taken_basket.sender; *ret_r = cr->sync.wait.taken_basket.sender;
VM_ASSERT(rb_ractor_p(*ret_r)); VM_ASSERT(rb_ractor_p(*ret_r));
ret = ractor_basket_accept(&cr->wait.taken_basket); ret = ractor_basket_accept(&cr->sync.wait.taken_basket);
goto cleanup; goto cleanup;
case wakeup_by_take: case wakeup_by_take:
*ret_r = ID2SYM(rb_intern("yield")); *ret_r = ID2SYM(rb_intern("yield"));
@ -1013,14 +1013,14 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
cleanup: cleanup:
RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status)); RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status));
if (cr->wait.yielded_basket.type != basket_type_none) { if (cr->sync.wait.yielded_basket.type != basket_type_none) {
ractor_basket_clear(&cr->wait.yielded_basket); ractor_basket_clear(&cr->sync.wait.yielded_basket);
} }
VM_ASSERT(cr->wait.status == wait_none); VM_ASSERT(cr->sync.wait.status == wait_none);
VM_ASSERT(cr->wait.wakeup_status == wakeup_none); VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
VM_ASSERT(cr->wait.taken_basket.type == basket_type_none); VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
VM_ASSERT(cr->wait.yielded_basket.type == basket_type_none); VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
if (interrupted) { if (interrupted) {
rb_vm_check_ints_blocking(ec); rb_vm_check_ints_blocking(ec);
@ -1055,11 +1055,11 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
RACTOR_LOCK(r); RACTOR_LOCK(r);
{ {
if (!r->incoming_port_closed) { if (!r->sync.incoming_port_closed) {
prev = Qfalse; prev = Qfalse;
r->incoming_port_closed = true; r->sync.incoming_port_closed = true;
if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) { if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
VM_ASSERT(r->incoming_queue.cnt == 0); VM_ASSERT(r->sync.incoming_queue.cnt == 0);
RUBY_DEBUG_LOG("cancel receiving", 0); RUBY_DEBUG_LOG("cancel receiving", 0);
} }
} }
@ -1078,9 +1078,9 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
RACTOR_LOCK(r); RACTOR_LOCK(r);
{ {
if (!r->outgoing_port_closed) { if (!r->sync.outgoing_port_closed) {
prev = Qfalse; prev = Qfalse;
r->outgoing_port_closed = true; r->sync.outgoing_port_closed = true;
} }
else { else {
prev = Qtrue; prev = Qtrue;
@ -1088,7 +1088,7 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
// wakeup all taking ractors // wakeup all taking ractors
rb_ractor_t *taking_ractor; rb_ractor_t *taking_ractor;
while ((taking_ractor = ractor_waiting_list_shift(r, &r->taking_ractors)) != NULL) { while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) {
RACTOR_LOCK(taking_ractor); RACTOR_LOCK(taking_ractor);
ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close); ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close);
RACTOR_UNLOCK(taking_ractor); RACTOR_UNLOCK(taking_ractor);
@ -1242,9 +1242,9 @@ rb_ractor_living_threads_init(rb_ractor_t *r)
static void static void
ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
{ {
ractor_queue_setup(&r->incoming_queue); ractor_queue_setup(&r->sync.incoming_queue);
rb_native_mutex_initialize(&r->lock); rb_native_mutex_initialize(&r->sync.lock);
rb_native_cond_initialize(&r->wait.cond); rb_native_cond_initialize(&r->sync.cond);
rb_native_cond_initialize(&r->barrier_wait_cond); rb_native_cond_initialize(&r->barrier_wait_cond);
// thread management // thread management
@ -1309,7 +1309,7 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL
static void static void
ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc) ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
{ {
if (cr->outgoing_port_closed) { if (cr->sync.outgoing_port_closed) {
return; return;
} }
@ -1326,12 +1326,12 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e
bool retry = false; bool retry = false;
RACTOR_LOCK(cr); RACTOR_LOCK(cr);
{ {
if (cr->taking_ractors.cnt == 0) { if (cr->sync.taking_ractors.cnt == 0) {
cr->wait.yielded_basket = basket; cr->sync.wait.yielded_basket = basket;
VM_ASSERT(cr->wait.status == wait_none); VM_ASSERT(cr->sync.wait.status == wait_none);
cr->wait.status = wait_yielding; cr->sync.wait.status = wait_yielding;
cr->wait.wakeup_status = wakeup_none; cr->sync.wait.wakeup_status = wakeup_none;
VM_ASSERT(cr->yield_atexit == false); VM_ASSERT(cr->yield_atexit == false);
cr->yield_atexit = true; cr->yield_atexit = true;

View File

@ -36,23 +36,20 @@ struct rb_ractor_waiting_list {
rb_ractor_t **ractors; rb_ractor_t **ractors;
}; };
struct rb_random_struct; // c.f. ruby/random.h struct rb_ractor_sync {
struct rb_ractor_struct {
// ractor lock // ractor lock
rb_nativethread_lock_t lock; rb_nativethread_lock_t lock;
#if RACTOR_CHECK_MODE > 0 #if RACTOR_CHECK_MODE > 0
VALUE locked_by; VALUE locked_by;
#endif #endif
rb_nativethread_cond_t cond;
// communication // communication
struct rb_ractor_queue incoming_queue; struct rb_ractor_queue incoming_queue;
struct rb_ractor_waiting_list taking_ractors;
bool incoming_port_closed; bool incoming_port_closed;
bool outgoing_port_closed; bool outgoing_port_closed;
bool yield_atexit;
struct rb_ractor_waiting_list taking_ractors;
struct ractor_wait { struct ractor_wait {
enum ractor_wait_status { enum ractor_wait_status {
@ -72,11 +69,15 @@ struct rb_ractor_struct {
wakeup_by_retry, wakeup_by_retry,
} wakeup_status; } wakeup_status;
struct rb_ractor_basket taken_basket;
struct rb_ractor_basket yielded_basket; struct rb_ractor_basket yielded_basket;
struct rb_ractor_basket taken_basket;
rb_nativethread_cond_t cond;
} wait; } wait;
};
struct rb_ractor_struct {
struct rb_ractor_sync sync;
bool yield_atexit;
// vm wide barrier synchronization // vm wide barrier synchronization
rb_nativethread_cond_t barrier_wait_cond; rb_nativethread_cond_t barrier_wait_cond;

View File

@ -49,7 +49,7 @@ vm_lock_enter(rb_ractor_t *cr, rb_vm_t *vm, bool locked, bool no_barrier, unsign
else { else {
#if RACTOR_CHECK_MODE #if RACTOR_CHECK_MODE
// locking ractor and acquire VM lock will cause deadlock // locking ractor and acquire VM lock will cause deadlock
VM_ASSERT(cr->locked_by != cr->self); VM_ASSERT(cr->sync.locked_by != cr->self);
#endif #endif
// lock // lock