Ractor#receive_if to receive only matched messages
Instead of Ractor.receive, Ractor.receive_if can provide a pattern by a block and you can choose the receiving message. [Feature #17378]
This commit is contained in:
parent
ddb93c3d64
commit
a9a7f4d8b8
Notes:
git
2020-12-16 19:13:11 +09:00
@ -100,6 +100,64 @@ assert_equal 'ok', %q{
|
|||||||
r.take
|
r.take
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Ractor#receive_if can filter the message
|
||||||
|
assert_equal '[2, 3, 1]', %q{
|
||||||
|
r = Ractor.new Ractor.current do |main|
|
||||||
|
main << 1
|
||||||
|
main << 2
|
||||||
|
main << 3
|
||||||
|
end
|
||||||
|
a = []
|
||||||
|
a << Ractor.receive_if{|msg| msg == 2}
|
||||||
|
a << Ractor.receive_if{|msg| msg == 3}
|
||||||
|
a << Ractor.receive
|
||||||
|
}
|
||||||
|
|
||||||
|
# Ractor#receive_if with break
|
||||||
|
assert_equal '[2, [1, :break], 3]', %q{
|
||||||
|
r = Ractor.new Ractor.current do |main|
|
||||||
|
main << 1
|
||||||
|
main << 2
|
||||||
|
main << 3
|
||||||
|
end
|
||||||
|
|
||||||
|
a = []
|
||||||
|
a << Ractor.receive_if{|msg| msg == 2}
|
||||||
|
a << Ractor.receive_if{|msg| break [msg, :break]}
|
||||||
|
a << Ractor.receive
|
||||||
|
}
|
||||||
|
|
||||||
|
# Ractor#receive_if can't be called recursively
|
||||||
|
assert_equal '[[:e1, 1], [:e2, 2]]', %q{
|
||||||
|
r = Ractor.new Ractor.current do |main|
|
||||||
|
main << 1
|
||||||
|
main << 2
|
||||||
|
main << 3
|
||||||
|
end
|
||||||
|
|
||||||
|
a = []
|
||||||
|
|
||||||
|
Ractor.receive_if do |msg|
|
||||||
|
begin
|
||||||
|
Ractor.receive
|
||||||
|
rescue Ractor::Error
|
||||||
|
a << [:e1, msg]
|
||||||
|
end
|
||||||
|
true # delete 1 from queue
|
||||||
|
end
|
||||||
|
|
||||||
|
Ractor.receive_if do |msg|
|
||||||
|
begin
|
||||||
|
Ractor.receive_if{}
|
||||||
|
rescue Ractor::Error
|
||||||
|
a << [:e2, msg]
|
||||||
|
end
|
||||||
|
true # delete 2 from queue
|
||||||
|
end
|
||||||
|
|
||||||
|
a #
|
||||||
|
}
|
||||||
|
|
||||||
###
|
###
|
||||||
###
|
###
|
||||||
# Ractor still has several memory corruption so skip huge number of tests
|
# Ractor still has several memory corruption so skip huge number of tests
|
||||||
|
310
ractor.c
310
ractor.c
@ -168,13 +168,15 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status)
|
|||||||
return rb_ractor_status_p(r, status);
|
return rb_ractor_status_p(r, status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i);
|
||||||
|
|
||||||
static void
|
static void
|
||||||
ractor_queue_mark(struct rb_ractor_queue *rq)
|
ractor_queue_mark(struct rb_ractor_queue *rq)
|
||||||
{
|
{
|
||||||
for (int i=0; i<rq->cnt; i++) {
|
for (int i=0; i<rq->cnt; i++) {
|
||||||
int idx = (rq->start + i) % rq->size;
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
||||||
rb_gc_mark(rq->baskets[idx].v);
|
rb_gc_mark(b->v);
|
||||||
rb_gc_mark(rq->baskets[idx].sender);
|
rb_gc_mark(b->sender);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,6 +193,8 @@ ractor_mark(void *ptr)
|
|||||||
rb_gc_mark(r->sync.wait.taken_basket.sender);
|
rb_gc_mark(r->sync.wait.taken_basket.sender);
|
||||||
rb_gc_mark(r->sync.wait.yielded_basket.v);
|
rb_gc_mark(r->sync.wait.yielded_basket.v);
|
||||||
rb_gc_mark(r->sync.wait.yielded_basket.sender);
|
rb_gc_mark(r->sync.wait.yielded_basket.sender);
|
||||||
|
rb_gc_mark(r->receiving_mutex);
|
||||||
|
|
||||||
rb_gc_mark(r->loc);
|
rb_gc_mark(r->loc);
|
||||||
rb_gc_mark(r->name);
|
rb_gc_mark(r->name);
|
||||||
rb_gc_mark(r->r_stdin);
|
rb_gc_mark(r->r_stdin);
|
||||||
@ -317,33 +321,90 @@ ractor_queue_setup(struct rb_ractor_queue *rq)
|
|||||||
rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
|
rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static struct rb_ractor_basket *
|
||||||
|
ractor_queue_at(struct rb_ractor_queue *rq, int i)
|
||||||
|
{
|
||||||
|
return &rq->baskets[(rq->start + i) % rq->size];
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
ractor_queue_advance(struct rb_ractor_queue *rq)
|
||||||
|
{
|
||||||
|
ASSERT_ractor_locking(GET_RACTOR());
|
||||||
|
|
||||||
|
if (rq->reserved_cnt == 0) {
|
||||||
|
rq->cnt--;
|
||||||
|
rq->start = (rq->start + 1) % rq->size;
|
||||||
|
rq->serial++;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
ractor_queue_at(rq, 0)->type = basket_type_deleted;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
|
ractor_queue_skip_p(struct rb_ractor_queue *rq, int i)
|
||||||
|
{
|
||||||
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
||||||
|
return b->type == basket_type_deleted ||
|
||||||
|
b->type == basket_type_reserved;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
||||||
|
{
|
||||||
|
ASSERT_ractor_locking(r);
|
||||||
|
|
||||||
|
while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) {
|
||||||
|
ractor_queue_advance(rq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
|
||||||
{
|
{
|
||||||
ASSERT_ractor_locking(r);
|
ASSERT_ractor_locking(r);
|
||||||
return rq->cnt == 0;
|
|
||||||
|
if (rq->cnt == 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
ractor_queue_compact(r, rq);
|
||||||
|
|
||||||
|
for (int i=0; i<rq->cnt; i++) {
|
||||||
|
if (!ractor_queue_skip_p(rq, i)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
|
ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
|
||||||
{
|
{
|
||||||
bool b;
|
bool found = false;
|
||||||
|
|
||||||
RACTOR_LOCK(r);
|
RACTOR_LOCK(r);
|
||||||
{
|
{
|
||||||
if (!ractor_queue_empty_p(r, rq)) {
|
if (!ractor_queue_empty_p(r, rq)) {
|
||||||
*basket = rq->baskets[rq->start];
|
for (int i=0; i<rq->cnt; i++) {
|
||||||
rq->cnt--;
|
if (!ractor_queue_skip_p(rq, i)) {
|
||||||
rq->start = (rq->start + 1) % rq->size;
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
||||||
b = true;
|
*basket = *b;
|
||||||
|
|
||||||
|
// remove from queue
|
||||||
|
b->type = basket_type_deleted;
|
||||||
|
ractor_queue_compact(r, rq);
|
||||||
|
found = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
b = false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
RACTOR_UNLOCK(r);
|
RACTOR_UNLOCK(r);
|
||||||
|
|
||||||
return b;
|
return found;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
@ -373,24 +434,29 @@ ractor_basket_clear(struct rb_ractor_basket *b)
|
|||||||
static VALUE ractor_reset_belonging(VALUE obj); // in this file
|
static VALUE ractor_reset_belonging(VALUE obj); // in this file
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
ractor_basket_accept(struct rb_ractor_basket *b)
|
ractor_basket_value(struct rb_ractor_basket *b)
|
||||||
{
|
{
|
||||||
VALUE v;
|
|
||||||
|
|
||||||
switch (b->type) {
|
switch (b->type) {
|
||||||
case basket_type_ref:
|
case basket_type_ref:
|
||||||
VM_ASSERT(rb_ractor_shareable_p(b->v));
|
|
||||||
v = b->v;
|
|
||||||
break;
|
break;
|
||||||
case basket_type_copy:
|
case basket_type_copy:
|
||||||
case basket_type_move:
|
case basket_type_move:
|
||||||
case basket_type_will:
|
case basket_type_will:
|
||||||
v = ractor_reset_belonging(b->v);
|
b->type = basket_type_ref;
|
||||||
|
b->v = ractor_reset_belonging(b->v);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
rb_bug("unreachable");
|
rb_bug("unreachable");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return b->v;
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
ractor_basket_accept(struct rb_ractor_basket *b)
|
||||||
|
{
|
||||||
|
VALUE v = ractor_basket_value(b);
|
||||||
|
|
||||||
if (b->exception) {
|
if (b->exception) {
|
||||||
VALUE cause = v;
|
VALUE cause = v;
|
||||||
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
|
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
|
||||||
@ -404,12 +470,22 @@ ractor_basket_accept(struct rb_ractor_basket *b)
|
|||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
ractor_recursive_receive_if(rb_ractor_t *r)
|
||||||
|
{
|
||||||
|
if (r->receiving_mutex && rb_mutex_locked_p(r->receiving_mutex)) {
|
||||||
|
rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
|
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
|
||||||
{
|
{
|
||||||
struct rb_ractor_queue *rq = &r->sync.incoming_queue;
|
struct rb_ractor_queue *rq = &r->sync.incoming_queue;
|
||||||
struct rb_ractor_basket basket;
|
struct rb_ractor_basket basket;
|
||||||
|
|
||||||
|
ractor_recursive_receive_if(r);
|
||||||
|
|
||||||
if (ractor_queue_deq(r, rq, &basket) == false) {
|
if (ractor_queue_deq(r, rq, &basket) == false) {
|
||||||
if (r->sync.incoming_port_closed) {
|
if (r->sync.incoming_port_closed) {
|
||||||
rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
|
rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
|
||||||
@ -616,31 +692,195 @@ ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
static void
|
||||||
ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r)
|
ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
|
||||||
{
|
{
|
||||||
VM_ASSERT(r == rb_ec_ractor_ptr(ec));
|
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
|
||||||
|
ractor_recursive_receive_if(cr);
|
||||||
|
|
||||||
|
RACTOR_LOCK(cr);
|
||||||
|
{
|
||||||
|
if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) {
|
||||||
|
VM_ASSERT(cr->sync.wait.status == wait_none);
|
||||||
|
cr->sync.wait.status = wait_receiving;
|
||||||
|
cr->sync.wait.wakeup_status = wakeup_none;
|
||||||
|
ractor_sleep(ec, cr);
|
||||||
|
cr->sync.wait.wakeup_status = wakeup_none;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RACTOR_UNLOCK(cr);
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
|
||||||
|
{
|
||||||
|
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
|
||||||
VALUE v;
|
VALUE v;
|
||||||
|
|
||||||
while ((v = ractor_try_receive(ec, r)) == Qundef) {
|
while ((v = ractor_try_receive(ec, cr)) == Qundef) {
|
||||||
RACTOR_LOCK(r);
|
ractor_receive_wait(ec, cr);
|
||||||
{
|
|
||||||
if (ractor_queue_empty_p(r, &r->sync.incoming_queue)) {
|
|
||||||
VM_ASSERT(r->sync.wait.status == wait_none);
|
|
||||||
r->sync.wait.status = wait_receiving;
|
|
||||||
r->sync.wait.wakeup_status = wakeup_none;
|
|
||||||
|
|
||||||
ractor_sleep(ec, r);
|
|
||||||
|
|
||||||
r->sync.wait.wakeup_status = wakeup_none;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
RACTOR_UNLOCK(r);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
// for debug
|
||||||
|
static const char *
|
||||||
|
basket_type_name(enum rb_ractor_basket_type type)
|
||||||
|
{
|
||||||
|
switch (type) {
|
||||||
|
#define T(t) case basket_type_##t: return #t
|
||||||
|
T(none);
|
||||||
|
T(ref);
|
||||||
|
T(copy);
|
||||||
|
T(move);
|
||||||
|
T(will);
|
||||||
|
T(deleted);
|
||||||
|
T(reserved);
|
||||||
|
default: rb_bug("unreachable");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
rq_dump(struct rb_ractor_queue *rq)
|
||||||
|
{
|
||||||
|
bool bug = false;
|
||||||
|
for (int i=0; i<rq->cnt; i++) {
|
||||||
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
||||||
|
fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
|
||||||
|
if (b->type == basket_type_reserved) bug = true;
|
||||||
|
}
|
||||||
|
if (bug) rb_bug("!!");
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
struct receive_block_data {
|
||||||
|
rb_ractor_t *cr;
|
||||||
|
struct rb_ractor_queue *rq;
|
||||||
|
VALUE v;
|
||||||
|
int index;
|
||||||
|
bool success;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void
|
||||||
|
ractor_receive_if_lock(rb_ractor_t *cr)
|
||||||
|
{
|
||||||
|
VALUE m = cr->receiving_mutex;
|
||||||
|
if (m == Qfalse) {
|
||||||
|
m = cr->receiving_mutex = rb_mutex_new();
|
||||||
|
}
|
||||||
|
rb_mutex_lock(m);
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
receive_if_body(VALUE ptr)
|
||||||
|
{
|
||||||
|
struct receive_block_data *data = (struct receive_block_data *)ptr;
|
||||||
|
|
||||||
|
ractor_receive_if_lock(data->cr);
|
||||||
|
VALUE block_result = rb_yield(data->v);
|
||||||
|
|
||||||
|
RACTOR_LOCK_SELF(data->cr);
|
||||||
|
{
|
||||||
|
struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
|
||||||
|
VM_ASSERT(b->type == basket_type_reserved);
|
||||||
|
data->rq->reserved_cnt--;
|
||||||
|
|
||||||
|
if (RTEST(block_result)) {
|
||||||
|
b->type = basket_type_deleted;
|
||||||
|
ractor_queue_compact(data->cr, data->rq);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
b->type = basket_type_ref;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RACTOR_UNLOCK_SELF(data->cr);
|
||||||
|
|
||||||
|
data->success = true;
|
||||||
|
|
||||||
|
if (RTEST(block_result)) {
|
||||||
|
return data->v;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return Qundef;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
receive_if_ensure(VALUE v)
|
||||||
|
{
|
||||||
|
struct receive_block_data *data = (struct receive_block_data *)v;
|
||||||
|
|
||||||
|
if (!data->success) {
|
||||||
|
RACTOR_LOCK_SELF(data->cr);
|
||||||
|
{
|
||||||
|
struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
|
||||||
|
VM_ASSERT(b->type == basket_type_reserved);
|
||||||
|
b->type = basket_type_deleted;
|
||||||
|
data->rq->reserved_cnt--;
|
||||||
|
}
|
||||||
|
RACTOR_UNLOCK_SELF(data->cr);
|
||||||
|
}
|
||||||
|
|
||||||
|
rb_mutex_unlock(data->cr->receiving_mutex);
|
||||||
|
return Qnil;
|
||||||
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
|
||||||
|
{
|
||||||
|
if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
|
||||||
|
|
||||||
|
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
|
||||||
|
unsigned int serial = (unsigned int)-1;
|
||||||
|
int index = 0;
|
||||||
|
struct rb_ractor_queue *rq = &cr->sync.incoming_queue;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
VALUE v = Qundef;
|
||||||
|
|
||||||
|
ractor_receive_wait(ec, cr);
|
||||||
|
|
||||||
|
RACTOR_LOCK_SELF(cr);
|
||||||
|
{
|
||||||
|
if (serial != rq->serial) {
|
||||||
|
serial = rq->serial;
|
||||||
|
index = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check newer version
|
||||||
|
for (int i=index; i<rq->cnt; i++) {
|
||||||
|
if (!ractor_queue_skip_p(rq, i)) {
|
||||||
|
struct rb_ractor_basket *b = ractor_queue_at(rq, i);
|
||||||
|
v = ractor_basket_value(b);
|
||||||
|
b->type = basket_type_reserved;
|
||||||
|
rq->reserved_cnt++;
|
||||||
|
index = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
RACTOR_UNLOCK_SELF(cr);
|
||||||
|
|
||||||
|
if (v != Qundef) {
|
||||||
|
struct receive_block_data data = {
|
||||||
|
.cr = cr,
|
||||||
|
.rq = rq,
|
||||||
|
.v = v,
|
||||||
|
.index = index,
|
||||||
|
.success = false,
|
||||||
|
};
|
||||||
|
|
||||||
|
VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
|
||||||
|
receive_if_ensure, (VALUE)&data);
|
||||||
|
|
||||||
|
if (result != Qundef) return result;
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
|
ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
|
||||||
{
|
{
|
||||||
|
41
ractor.rb
41
ractor.rb
@ -102,6 +102,47 @@ class Ractor
|
|||||||
end
|
end
|
||||||
alias recv receive
|
alias recv receive
|
||||||
|
|
||||||
|
# Receive only a specific message.
|
||||||
|
#
|
||||||
|
# Instead of Ractor.receive, Ractor.receive_if can provide a pattern
|
||||||
|
# by a block and you can choose the receiving message.
|
||||||
|
#
|
||||||
|
# # Example:
|
||||||
|
# r = Ractor.new do
|
||||||
|
# p Ractor.receive_if{|msg| /foo/ =~ msg} #=> "foo3"
|
||||||
|
# p Ractor.receive_if{|msg| /bar/ =~ msg} #=> "bar1"
|
||||||
|
# p Ractor.receive_if{|msg| /baz/ =~ msg} #=> "baz2"
|
||||||
|
# end
|
||||||
|
# r << "bar1"
|
||||||
|
# r << "baz2"
|
||||||
|
# r << "foo3"
|
||||||
|
# r.take
|
||||||
|
#
|
||||||
|
# If the block returns truthy, the message will be removed from incoming queue
|
||||||
|
# and return this method with the message.
|
||||||
|
# When the block is escaped by break/return/exception and so on, the message also
|
||||||
|
# removed from the incoming queue.
|
||||||
|
# Otherwise, the messsage is remained in the incoming queue and check next received
|
||||||
|
# message by the given block.
|
||||||
|
#
|
||||||
|
# If there is no messages in the incoming queue, wait until arrival of other messages.
|
||||||
|
#
|
||||||
|
# Note that you can not call receive/receive_if in the given block recursively.
|
||||||
|
# It means that you should not do any tasks in the block.
|
||||||
|
#
|
||||||
|
# # Example:
|
||||||
|
# Ractor.current << true
|
||||||
|
# Ractor.receive_if{|msg| Ractor.receive}
|
||||||
|
# #=> `receive': can not call receive/receive_if recursively (Ractor::Error)
|
||||||
|
#
|
||||||
|
def self.receive_if &b
|
||||||
|
Primitive.ractor_receive_if b
|
||||||
|
end
|
||||||
|
|
||||||
|
def receive_if &b
|
||||||
|
Primitive.ractor_receive_if b
|
||||||
|
end
|
||||||
|
|
||||||
# Send a message to a Ractor's incoming queue.
|
# Send a message to a Ractor's incoming queue.
|
||||||
#
|
#
|
||||||
# # Example:
|
# # Example:
|
||||||
|
@ -14,11 +14,13 @@ enum rb_ractor_basket_type {
|
|||||||
basket_type_copy,
|
basket_type_copy,
|
||||||
basket_type_move,
|
basket_type_move,
|
||||||
basket_type_will,
|
basket_type_will,
|
||||||
|
basket_type_deleted,
|
||||||
|
basket_type_reserved,
|
||||||
};
|
};
|
||||||
|
|
||||||
struct rb_ractor_basket {
|
struct rb_ractor_basket {
|
||||||
enum rb_ractor_basket_type type;
|
|
||||||
bool exception;
|
bool exception;
|
||||||
|
enum rb_ractor_basket_type type;
|
||||||
VALUE v;
|
VALUE v;
|
||||||
VALUE sender;
|
VALUE sender;
|
||||||
};
|
};
|
||||||
@ -28,6 +30,8 @@ struct rb_ractor_queue {
|
|||||||
int start;
|
int start;
|
||||||
int cnt;
|
int cnt;
|
||||||
int size;
|
int size;
|
||||||
|
unsigned int serial;
|
||||||
|
unsigned int reserved_cnt;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct rb_ractor_waiting_list {
|
struct rb_ractor_waiting_list {
|
||||||
@ -76,7 +80,7 @@ struct rb_ractor_sync {
|
|||||||
|
|
||||||
struct rb_ractor_struct {
|
struct rb_ractor_struct {
|
||||||
struct rb_ractor_sync sync;
|
struct rb_ractor_sync sync;
|
||||||
|
VALUE receiving_mutex;
|
||||||
bool yield_atexit;
|
bool yield_atexit;
|
||||||
|
|
||||||
// vm wide barrier synchronization
|
// vm wide barrier synchronization
|
||||||
|
Loading…
x
Reference in New Issue
Block a user