Rewrite Ractor synchronization mechanism

This patch rewrites Ractor synchronization mechanism, send/receive
and take/yield.

* API
  * Ractor::Selector is introduced for lightweight waiting
    for many ractors.
* Data structure
  * remove `struct rb_ractor_waiting_list` and use
    `struct rb_ractor_queue takers_queue` to manage takers.
  * remove `rb_ractor_t::yield_atexit` and use
    `rb_ractor_t::sync::will_basket::type` to check the will.
  * add `rb_ractor_basket::p.take` to represent a taking ractor.
* Synchronization protocol
  * For the Ractor local GC, `take` can not make a copy object
    directly so ask to generate the copy from the yielding ractor.
  * The following steps shows what `r1.take` does on `r0`.
    * step1: (r0) register `r0` into `r1`'s takers.
    * step2: (r0) check `r1`'s status and wakeup r0 if `r1` is waiting
             for yielding a value.
    * step3: (r0) sleep until `r1` wakes up `r0`.
  * The following steps shows what `Ractor.yield(v)` on `r1`.
    * step1: (r1) check first takers of `r1` and if there is (`r0`),
             make a copy object of `v` and pass it to `r0` and
             wakes up `r0`.
    * step2: (r1) if there is no taker ractors, sleep until
             another ractor try to take.
This commit is contained in:
Koichi Sasada 2023-02-24 18:46:17 +09:00
parent 1abec43b5d
commit a4421bd73c
Notes: git 2023-03-02 05:32:21 +00:00
5 changed files with 1220 additions and 694 deletions

View File

@ -518,9 +518,9 @@ assert_equal '[true, true, true]', %q{
end
}
received = []
take = []
taken = []
yielded = []
until rs.empty?
until received.size == RN && taken.size == RN && yielded.size == RN
r, v = Ractor.select(CR, *rs, yield_value: 'yield')
case r
when :receive
@ -528,11 +528,17 @@ assert_equal '[true, true, true]', %q{
when :yield
yielded << v
else
take << v
taken << v
rs.delete r
end
end
[received.all?('sendyield'), yielded.all?(nil), take.all?('take')]
r = [received == ['sendyield'] * RN,
yielded == [nil] * RN,
taken == ['take'] * RN,
]
STDERR.puts [received, yielded, taken].inspect
r
}
# multiple Ractors can send to one Ractor

View File

@ -11525,6 +11525,7 @@ ractor.$(OBJEXT): {$(VPATH)}constant.h
ractor.$(OBJEXT): {$(VPATH)}debug_counter.h
ractor.$(OBJEXT): {$(VPATH)}defines.h
ractor.$(OBJEXT): {$(VPATH)}encoding.h
ractor.$(OBJEXT): {$(VPATH)}eval_intern.h
ractor.$(OBJEXT): {$(VPATH)}id.h
ractor.$(OBJEXT): {$(VPATH)}id_table.h
ractor.$(OBJEXT): {$(VPATH)}intern.h

1722
ractor.c

File diff suppressed because it is too large Load Diff

117
ractor.rb
View File

@ -358,14 +358,117 @@ class Ractor
def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?
__builtin_cstmt! %q{
const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors);
VALUE rv;
VALUE v = ractor_select(ec, rs, RARRAY_LENINT(ractors),
yield_unspecified == Qtrue ? Qundef : yield_value,
(bool)RTEST(move) ? true : false, &rv);
return rb_ary_new_from_args(2, rv, v);
begin
if ractors.delete Ractor.current
do_receive = true
else
do_receive = false
end
selector = Ractor::Selector.new(*ractors)
if yield_unspecified
selector.wait receive: do_receive
else
selector.wait receive: do_receive, yield_value: yield_value, move: move
end
ensure
selector.clear
end
end
#
# Ractor::Selector provides a functionality to wait multiple Ractor events.
# Ractor::Selector#wait is more lightweight than Ractor.select()
# because we don't have to specify all target ractors for each wait time.
#
# Ractor.select() uses Ractor::Selector internally to implement it.
#
class Selector
# call-seq:
# Ractor::Selector.new(*ractors)
#
# Creates a selector object.
#
# If a ractors parameter is given, it is same as the following code.
#
# selector = Ractor::Selector.new
# ractors.each{|r| selector.add r}
#
def self.new(*rs)
selector = __builtin_cexpr! %q{
ractor_selector_create(self);
}
rs.each{|r| selector.add(r) }
selector
end
# call-seq:
# selector.add(ractor)
#
# Registers a ractor as a taking target by the selector.
#
def add r
__builtin_ractor_selector_add r
end
# call-seq:
# selector.remove(ractor)
#
# Deregisters a ractor as a taking target by the selector.
#
def remove r
__builtin_ractor_selector_remove r
end
# call-seq:
# selector.clear
#
# Deregisters all ractors.
def clear
__builtin_ractor_selector_clear
end
# call-seq:
# selector.wait(receive: false, yield_value: yield_value, move: false) -> [ractor or symbol, value]
#
# Waits Ractor events. It is lighter than Ractor.select() for many ractors.
#
# The simplest form is waiting for taking a value from one of
# registerred ractors like that.
#
# selector = Ractor::Selector.new(r1, r2, r3)
# r, v = selector.wait
#
# On this case, when r1, r2 or r3 is ready to take (yielding a value),
# this method takes the value from the ready (yielded) ractor
# and returns [the yielded ractor, the taking value].
#
# Note that if a take target ractor is closed, the ractor will be removed
# automatically.
#
# If you also want to wait with receiving an object from other ractors,
# you can specify receive: true keyword like:
#
# r, v = selector.wait receive: true
#
# On this case, wait for taking from r1, r2 or r3 and waiting for receving
# a value from other ractors.
# If it successes the receiving, it returns an array object [:receive, the received value].
#
# If you also want to wait with yielding a value, you can specify
# :yield_value like:
#
# r, v = selector.wait yield_value: obj
#
# On this case wait for taking from r1, r2, or r3 and waiting for taking
# yielding value (obj) by another ractor.
# If antoher ractor takes the value (obj), it returns an array object [:yield, nil].
#
# You can specify a keyword parameter <tt>move: true</tt> like Ractor.yield(obj, move: true)
#
def wait receive: false, yield_value: yield_unspecified = true, move: false
__builtin_ractor_selector_wait receive, !yield_unspecified, yield_value, move
end
end
#

View File

@ -9,22 +9,66 @@
#endif
enum rb_ractor_basket_type {
// basket is empty
basket_type_none,
// value is available
basket_type_ref,
basket_type_copy,
basket_type_move,
basket_type_will,
// basket should be deleted
basket_type_deleted,
// basket is reserved
basket_type_reserved,
// take_basket is available
basket_type_take_basket,
// basket is keeping by yielding ractor
basket_type_yielding,
};
// per ractor taking configuration
struct rb_ractor_selector_take_config {
bool closed;
bool oneshot;
};
struct rb_ractor_basket {
bool exception;
enum rb_ractor_basket_type type;
VALUE v;
union {
enum rb_ractor_basket_type e;
rb_atomic_t atomic;
} type;
VALUE sender;
union {
struct {
VALUE v;
bool exception;
} send;
struct {
struct rb_ractor_basket *basket;
struct rb_ractor_selector_take_config *config;
} take;
} p; // payload
};
static inline bool
basket_type_p(struct rb_ractor_basket *b, enum rb_ractor_basket_type type)
{
return b->type.e == type;
}
static inline bool
basket_none_p(struct rb_ractor_basket *b)
{
return basket_type_p(b, basket_type_none);
}
struct rb_ractor_queue {
struct rb_ractor_basket *baskets;
int start;
@ -34,12 +78,6 @@ struct rb_ractor_queue {
unsigned int reserved_cnt;
};
struct rb_ractor_waiting_list {
int cnt;
int size;
rb_ractor_t **ractors;
};
enum rb_ractor_wait_status {
wait_none = 0x00,
wait_receiving = 0x01,
@ -66,18 +104,21 @@ struct rb_ractor_sync {
#endif
rb_nativethread_cond_t cond;
// communication
struct rb_ractor_queue incoming_queue;
struct rb_ractor_waiting_list taking_ractors;
bool incoming_port_closed;
bool outgoing_port_closed;
// All sent messages will be pushed into recv_queue
struct rb_ractor_queue recv_queue;
// The following ractors waiting for the yielding by this ractor
struct rb_ractor_queue takers_queue;
// Enabled if the ractor already terminated and not taken yet.
struct rb_ractor_basket will_basket;
struct ractor_wait {
enum rb_ractor_wait_status status;
enum rb_ractor_wakeup_status wakeup_status;
struct rb_ractor_basket yielded_basket;
struct rb_ractor_basket taken_basket;
} wait;
};
@ -107,7 +148,6 @@ struct rb_ractor_struct {
struct rb_ractor_sync sync;
VALUE receiving_mutex;
bool yield_atexit;
// vm wide barrier synchronization
rb_nativethread_cond_t barrier_wait_cond;