rb_thread_lock_native_thread()

Introduce `rb_thread_lock_native_thread()`, which allocates a dedicated
native thread to the current Ruby thread under the M:N thread scheduler.
This C API is similar to Go's `runtime.LockOSThread()`.

Accepted at https://github.com/ruby/dev-meeting-log/blob/master/2023/DevMeeting-2023-08-24.md
(but the implementation missed Ruby 3.3.0).
Koichi Sasada 2024-02-20 19:09:23 +09:00
parent 91cb303531
commit d578684989
8 changed files with 165 additions and 14 deletions
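
For orientation (a sketch, not part of this commit), here is the call pattern the new API enables from a C extension, analogous to pinning with Go's `runtime.LockOSThread()`. The method name and the TLS-dependent call are hypothetical:

#include "ruby/ruby.h"
#include "ruby/thread.h"    // declares rb_thread_lock_native_thread()

// Hypothetical extension method: pin the calling Ruby thread to one
// native thread before calling into a library with native-thread
// affinity (e.g. one that caches state in thread-local storage).
static VALUE
ext_pinned_call(VALUE self)
{
    // After this call the current Ruby thread keeps its native thread,
    // even under the M:N scheduler (RUBY_MN_THREADS=1).
    rb_thread_lock_native_thread();

    // use_tls_dependent_library();  // hypothetical TLS-dependent work

    return Qnil;
}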

ext/-test-/thread/lock_native_thread/extconf.rb

@@ -0,0 +1,2 @@
# frozen_string_literal: false
create_makefile("-test-/thread/lock_native_thread")

ext/-test-/thread/lock_native_thread/lock_native_thread.c

@@ -0,0 +1,49 @@
#include "ruby/ruby.h"
#include "ruby/thread.h"
#ifdef HAVE_PTHREAD_H
#include <pthread.h>
static pthread_key_t tls_key;
static VALUE
get_tls(VALUE self)
{
return (VALUE)pthread_getspecific(tls_key);
}
static VALUE
set_tls(VALUE self, VALUE vn)
{
pthread_setspecific(tls_key, (void *)vn);
return Qnil;
}
static VALUE
lock_native_thread(VALUE self)
{
return rb_thread_lock_native_thread() ? Qtrue : Qfalse;
}
void
Init_lock_native_thread(void)
{
int r;
if ((r = pthread_key_create(&tls_key, NULL)) != 0) {
rb_bug("pthread_key_create() returns %d", r);
}
pthread_setspecific(tls_key, NULL);
rb_define_method(rb_cThread, "lock_native_thread", lock_native_thread, 0);
rb_define_method(rb_cThread, "get_tls", get_tls, 0);
rb_define_method(rb_cThread, "set_tls", set_tls, 1);
}
#else // HAVE_PTHREAD_H
Init_lock_native_thread(void)
{
// do nothing
}
#endif // HAVE_PTHREAD_H

include/ruby/thread.h

@@ -190,6 +190,19 @@ void *rb_nogvl(void *(*func)(void *), void *data1,
*/
#define RUBY_CALL_WO_GVL_FLAG_SKIP_CHECK_INTS_
/**
 * Declares that the current Ruby thread should acquire a dedicated
 * native thread on the M:N thread scheduler.
 *
 * If a C extension (or a library that the extension relies on) must
 * keep running on the same native thread (e.g. because it uses
 * thread-local storage), this function allocates a dedicated native
 * thread for the current Ruby thread.
 *
 * @return `false` if the thread is already running on a dedicated
 *   native thread; otherwise `true`.
 */
bool rb_thread_lock_native_thread(void);
/**
* Triggered when a new thread is started.
*

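As a quick illustration of the contract above (a sketch, not code from this commit; the helper name is hypothetical):

// Hypothetical helper showing the documented return values.
static void
pin_current_thread(void)
{
    if (rb_thread_lock_native_thread()) {
        // A native thread was newly dedicated to this Ruby thread
        // (it had been running on a shared native thread).
    }
    else {
        // Already on a dedicated native thread: nothing changed.
        // This is the case for the main thread and for repeated calls.
    }
}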
test/-ext-/thread/test_lock_native_thread.rb

@@ -0,0 +1,50 @@
# frozen_string_literal: false
require 'envutil'

mn_supported_p = -> do
  out, *_ = EnvUtil.invoke_ruby([{'RUBY_MN_THREADS' => '1'}, '-v'], '', true)
  return /\+MN/ =~ out
end

if mn_supported_p.call
  # test only on MN threads
else
  return
end

class TestThreadLockNativeThread < Test::Unit::TestCase
  def test_lock_native_thread
    assert_separately([{'RUBY_MN_THREADS' => '1'}], <<-RUBY)
      require '-test-/thread/lock_native_thread'

      Thread.new{
        assert_equal true, Thread.current.lock_native_thread
      }.join

      # main thread already has DNT
      assert_equal false, Thread.current.lock_native_thread
    RUBY
  end

  def test_lock_native_thread_tls
    assert_separately([{'RUBY_MN_THREADS' => '1'}], <<-RUBY)
      require '-test-/thread/lock_native_thread'
      tn = 10
      ln = 1_000

      ts = tn.times.map{|i|
        Thread.new(i){|i|
          Thread.current.set_tls i
          assert_equal true, Thread.current.lock_native_thread

          ln.times{
            assert_equal i, Thread.current.get_tls
            Thread.pass
          }
        }
      }
      ts.each(&:join)
    RUBY
  end
end

thread_none.c

@@ -318,4 +318,10 @@ rb_thread_sched_mark_zombies(rb_vm_t *vm)
// do nothing
}

bool
rb_thread_lock_native_thread(void)
{
    return false;
}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */

thread_pthread.c

@@ -2304,6 +2304,11 @@ nt_start(void *ptr)
            // timeout -> deleted.
            break;
        }

        if (nt->dedicated) {
            // an SNT (shared native thread) becomes a DNT (dedicated native thread) while running
            break;
        }
    }
}

@@ -3422,4 +3427,16 @@ rb_thread_execute_hooks(rb_event_flag_t event, rb_thread_t *th)
}
}

// Returns true if the current thread newly acquires a DNT (dedicated native thread).
// Returns false if the current thread already has one.
bool
rb_thread_lock_native_thread(void)
{
    rb_thread_t *th = GET_THREAD();
    bool is_snt = th->nt->dedicated == 0;
    native_thread_dedicated_inc(th->vm, th->ractor, th->nt);

    return is_snt;
}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */

thread_pthread_mn.c

@@ -439,27 +439,35 @@ co_start(struct coroutine_context *from, struct coroutine_context *self)
        // Thread is terminated

        struct rb_native_thread *nt = th->nt;
        bool is_dnt = th_has_dedicated_nt(th);
        native_thread_assign(NULL, th);
        rb_ractor_set_current_ec(th->ractor, NULL);

        if (is_dnt) {
            // an SNT became a DNT while running. Just return to the nt_context
            th->sched.finished = true;
            coroutine_transfer0(self, nt->nt_context, true);
        }
        else {
            rb_vm_t *vm = th->vm;
            bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued
            rb_thread_t *next_th = sched->running;

            if (!has_ready_ractor && next_th && !next_th->nt) {
                // switch to the next thread
                thread_sched_set_lock_owner(sched, NULL);
                thread_sched_switch0(th->sched.context, next_th, nt, true);
                th->sched.finished = true;
            }
            else {
                // switch to the next Ractor
                th->sched.finished = true;
                coroutine_transfer0(self, nt->nt_context, true);
            }
        }

        rb_bug("unreachable");
    }

thread_win32.c

@@ -1003,4 +1003,10 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr)
vm->ractor.sync.lock_owner = NULL;
}

bool
rb_thread_lock_native_thread(void)
{
    return false;
}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */