thread/thread.c: non-blocking push on SizedQueue
* ext/thread/thread.c (rb_szqueue_push): add optional parameter, non_block defaulted to false. [ruby-core:63794] [Feature #10052] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@46852 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
06e70ae4f3
commit
4c849f0e62
@ -1,3 +1,8 @@
|
|||||||
|
Thu Jul 17 19:57:27 2014 Herwin <herwin@quarantainenet.nl>
|
||||||
|
|
||||||
|
* ext/thread/thread.c (rb_szqueue_push): add optional parameter,
|
||||||
|
non_block defaulted to false. [ruby-core:63794] [Feature #10052]
|
||||||
|
|
||||||
Wed Jul 16 23:01:43 2014 Masaki Suketa <masaki.suketa@nifty.ne.jp>
|
Wed Jul 16 23:01:43 2014 Masaki Suketa <masaki.suketa@nifty.ne.jp>
|
||||||
|
|
||||||
* ext/win32ole/win32ole.c (ole_variant2val): support array of
|
* ext/win32ole/win32ole.c (ole_variant2val): support array of
|
||||||
|
@ -443,30 +443,55 @@ rb_szqueue_max_set(VALUE self, VALUE vmax)
|
|||||||
return vmax;
|
return vmax;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static VALUE
|
||||||
|
szqueue_push_should_block(int argc, VALUE *argv)
|
||||||
|
{
|
||||||
|
VALUE should_block = Qtrue;
|
||||||
|
switch (argc) {
|
||||||
|
case 0:
|
||||||
|
rb_raise(rb_eArgError, "wrong number of arguments (0 for 1)");
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
should_block = RTEST(argv[1]) ? Qfalse : Qtrue;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
rb_raise(rb_eArgError, "wrong number of arguments (%d for 2)", argc);
|
||||||
|
}
|
||||||
|
return should_block;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Document-method: SizedQueue#push
|
* Document-method: SizedQueue#push
|
||||||
* call-seq:
|
* call-seq:
|
||||||
* push(object)
|
* push(object, non_block=false)
|
||||||
* enq(object)
|
* enq(object, non_block=false)
|
||||||
* <<(object)
|
* <<(object)
|
||||||
*
|
*
|
||||||
* Pushes +object+ to the queue.
|
* Pushes +object+ to the queue.
|
||||||
*
|
*
|
||||||
* If there is no space left in the queue, waits until space becomes available.
|
* If there is no space left in the queue, waits until space becomes
|
||||||
|
* available, unless +non_block+ is true. If +non_block+ is true, the
|
||||||
|
* thread isn't suspended, and an exception is raised.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_szqueue_push(VALUE self, VALUE obj)
|
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
|
||||||
{
|
{
|
||||||
struct waiting_delete args;
|
struct waiting_delete args;
|
||||||
|
VALUE should_block = szqueue_push_should_block(argc, argv);
|
||||||
args.waiting = GET_SZQUEUE_WAITERS(self);
|
args.waiting = GET_SZQUEUE_WAITERS(self);
|
||||||
args.th = rb_thread_current();
|
args.th = rb_thread_current();
|
||||||
|
|
||||||
while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
|
while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
|
||||||
|
if (!(int)should_block) {
|
||||||
|
rb_raise(rb_eThreadError, "queue full");
|
||||||
|
}
|
||||||
rb_ary_push(args.waiting, args.th);
|
rb_ary_push(args.waiting, args.th);
|
||||||
rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
|
||||||
}
|
}
|
||||||
return queue_do_push(self, obj);
|
return queue_do_push(self, argv[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
@ -609,7 +634,7 @@ Init_thread(void)
|
|||||||
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
|
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
|
||||||
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
||||||
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
||||||
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1);
|
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
|
||||||
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
|
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
|
||||||
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
|
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
|
||||||
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
|
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
|
||||||
|
@ -97,6 +97,14 @@ class TestQueue < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
|
|
||||||
def test_sized_queue_push_interrupt
|
def test_sized_queue_push_interrupt
|
||||||
|
q = SizedQueue.new(1)
|
||||||
|
q.push(1)
|
||||||
|
assert_raise_with_message(ThreadError, /full/) do
|
||||||
|
q.push(2, true)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_sized_queue_push_non_block
|
||||||
q = SizedQueue.new(1)
|
q = SizedQueue.new(1)
|
||||||
q.push(1)
|
q.push(1)
|
||||||
t1 = Thread.new { q.push(2) }
|
t1 = Thread.new { q.push(2) }
|
||||||
|
Loading…
x
Reference in New Issue
Block a user