make monitor.so for performance. (#2576)

Recent monitor.rb has performance problem because of interrupt
handlers. 'Monitor#synchronize' is frequently used primitive
so the performance of this method is important.

This patch rewrite 'monitor.rb' with 'monitor.so' (C-extension)
and make it faster. See [Feature #16255] for details.

Monitor class objects are normal object which include MonitorMixin.
This patch introduce a Monitor class which is implemented on C
and MonitorMixin uses Monitor object as re-entrant (recursive)
Mutex. This technique improve performance because we don't need
to care atomicity and we don't need accesses to instance variables
any more on Monitor class.
This commit is contained in:
Koichi Sasada 2019-10-20 04:52:20 +09:00 committed by GitHub
parent 434966bffd
commit caac5f777a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
Notes: git 2019-10-20 04:52:45 +09:00
Merged-By: ko1 <ko1@atdot.net>
6 changed files with 250 additions and 79 deletions

3
NEWS
View File

@ -511,6 +511,9 @@ File::
* File.realpath now uses realpath(3) on many platforms, which can * File.realpath now uses realpath(3) on many platforms, which can
significantly improve performance. significantly improve performance.
Monitor::
* Monitor class is written in C-extension. [Feature #16255]
Thread:: Thread::
* VM stack memory allocation is now combined with native thread stack, * VM stack memory allocation is now combined with native thread stack,

13
ext/monitor/depend Normal file
View File

@ -0,0 +1,13 @@
# AUTOGENERATED DEPENDENCIES START
monitor.o: $(RUBY_EXTCONF_H)
monitor.o: $(arch_hdrdir)/ruby/config.h
monitor.o: $(hdrdir)/ruby/assert.h
monitor.o: $(hdrdir)/ruby/backward.h
monitor.o: $(hdrdir)/ruby/defines.h
monitor.o: $(hdrdir)/ruby/intern.h
monitor.o: $(hdrdir)/ruby/missing.h
monitor.o: $(hdrdir)/ruby/ruby.h
monitor.o: $(hdrdir)/ruby/st.h
monitor.o: $(hdrdir)/ruby/subst.h
monitor.o: monitor.c
# AUTOGENERATED DEPENDENCIES END

2
ext/monitor/extconf.rb Normal file
View File

@ -0,0 +1,2 @@
require 'mkmf'
create_makefile('monitor')

View File

@ -86,10 +86,10 @@
# This Class is implemented as subclass of Array which includes the # This Class is implemented as subclass of Array which includes the
# MonitorMixin module. # MonitorMixin module.
# #
module MonitorMixin
EXCEPTION_NEVER = {Exception => :never}.freeze
EXCEPTION_IMMEDIATE = {Exception => :immediate}.freeze
require 'monitor.so'
module MonitorMixin
# #
# FIXME: This isn't documented in Nutshell. # FIXME: This isn't documented in Nutshell.
# #
@ -97,8 +97,6 @@ module MonitorMixin
# above calls while_wait and signal, this class should be documented. # above calls while_wait and signal, this class should be documented.
# #
class ConditionVariable class ConditionVariable
class Timeout < Exception; end
# #
# Releases the lock held in the associated monitor and waits; reacquires the lock on wakeup. # Releases the lock held in the associated monitor and waits; reacquires the lock on wakeup.
# #
@ -106,17 +104,13 @@ module MonitorMixin
# even if no other thread doesn't signal. # even if no other thread doesn't signal.
# #
def wait(timeout = nil) def wait(timeout = nil)
Thread.handle_interrupt(EXCEPTION_NEVER) do @monitor.mon_check_owner
@monitor.__send__(:mon_check_owner) count = @monitor.__send__(:exit_for_cond)
count = @monitor.__send__(:mon_exit_for_cond)
begin begin
Thread.handle_interrupt(EXCEPTION_IMMEDIATE) do @cond.wait(@monitor.__send__(:mutex_for_cond), timeout)
@cond.wait(@monitor.instance_variable_get(:@mon_mutex), timeout)
end
return true return true
ensure ensure
@monitor.__send__(:mon_enter_for_cond, count) @monitor.__send__(:enter_for_cond, count)
end
end end
end end
@ -142,7 +136,7 @@ module MonitorMixin
# Wakes up the first thread in line waiting for this lock. # Wakes up the first thread in line waiting for this lock.
# #
def signal def signal
@monitor.__send__(:mon_check_owner) @monitor.mon_check_owner
@cond.signal @cond.signal
end end
@ -150,7 +144,7 @@ module MonitorMixin
# Wakes up all threads waiting for this lock. # Wakes up all threads waiting for this lock.
# #
def broadcast def broadcast
@monitor.__send__(:mon_check_owner) @monitor.mon_check_owner
@cond.broadcast @cond.broadcast
end end
@ -171,15 +165,7 @@ module MonitorMixin
# Attempts to enter exclusive section. Returns +false+ if lock fails. # Attempts to enter exclusive section. Returns +false+ if lock fails.
# #
def mon_try_enter def mon_try_enter
if @mon_owner != Thread.current @mon_data.try_enter
unless @mon_mutex.try_lock
return false
end
@mon_owner = Thread.current
@mon_count = 0
end
@mon_count += 1
return true
end end
# For backward compatibility # For backward compatibility
alias try_mon_enter mon_try_enter alias try_mon_enter mon_try_enter
@ -188,12 +174,7 @@ module MonitorMixin
# Enters exclusive section. # Enters exclusive section.
# #
def mon_enter def mon_enter
if @mon_owner != Thread.current @mon_data.enter
@mon_mutex.lock
@mon_owner = Thread.current
@mon_count = 0
end
@mon_count += 1
end end
# #
@ -201,25 +182,21 @@ module MonitorMixin
# #
def mon_exit def mon_exit
mon_check_owner mon_check_owner
@mon_count -=1 @mon_data.exit
if @mon_count == 0
@mon_owner = nil
@mon_mutex.unlock
end
end end
# #
# Returns true if this monitor is locked by any thread # Returns true if this monitor is locked by any thread
# #
def mon_locked? def mon_locked?
@mon_mutex.locked? @mon_data.mon_locked?
end end
# #
# Returns true if this monitor is locked by current thread. # Returns true if this monitor is locked by current thread.
# #
def mon_owned? def mon_owned?
@mon_mutex.locked? && @mon_owner == Thread.current @mon_data.mon_owned?
end end
# #
@ -227,14 +204,12 @@ module MonitorMixin
# section automatically when the block exits. See example under # section automatically when the block exits. See example under
# +MonitorMixin+. # +MonitorMixin+.
# #
def mon_synchronize def mon_synchronize(&b)
# Prevent interrupt on handling interrupts; for example timeout errors @mon_data.enter
# it may break locking state.
Thread.handle_interrupt(EXCEPTION_NEVER){ mon_enter }
begin begin
yield yield
ensure ensure
Thread.handle_interrupt(EXCEPTION_NEVER){ mon_exit } @mon_data.exit
end end
end end
alias synchronize mon_synchronize alias synchronize mon_synchronize
@ -244,7 +219,7 @@ module MonitorMixin
# receiver. # receiver.
# #
def new_cond def new_cond
return ConditionVariable.new(self) return ConditionVariable.new(@mon_data)
end end
private private
@ -260,31 +235,15 @@ module MonitorMixin
# Initializes the MonitorMixin after being included in a class or when an # Initializes the MonitorMixin after being included in a class or when an
# object has been extended with the MonitorMixin # object has been extended with the MonitorMixin
def mon_initialize def mon_initialize
if defined?(@mon_mutex) && @mon_mutex_owner_object_id == object_id if defined?(@mon_data) && @mon_data_owner_object_id == self.object_id
raise ThreadError, "already initialized" raise ThreadError, "already initialized"
end end
@mon_mutex = Thread::Mutex.new @mon_data = ::Monitor.new
@mon_mutex_owner_object_id = object_id @mon_data_owner_object_id = self.object_id
@mon_owner = nil
@mon_count = 0
end end
def mon_check_owner def mon_check_owner
if @mon_owner != Thread.current @mon_data.mon_check_owner
raise ThreadError, "current thread not owner"
end
end
def mon_enter_for_cond(count)
@mon_owner = Thread.current
@mon_count = count
end
def mon_exit_for_cond
count = @mon_count
@mon_owner = nil
@mon_count = 0
return count
end end
end end
@ -299,12 +258,17 @@ end
# end # end
# #
class Monitor class Monitor
include MonitorMixin def new_cond
alias try_enter try_mon_enter ::MonitorMixin::ConditionVariable.new(self)
alias enter mon_enter end
alias exit mon_exit
end
# for compatibility
alias try_mon_enter try_enter
alias mon_try_enter try_enter
alias mon_enter enter
alias mon_exit exit
alias mon_synchronize synchronize
end
# Documentation comments: # Documentation comments:
# - All documentation comes from Nutshell. # - All documentation comes from Nutshell.

189
ext/monitor/monitor.c Normal file
View File

@ -0,0 +1,189 @@
#include "ruby/ruby.h"
/* Thread::Monitor */
struct rb_monitor {
long count;
const VALUE owner;
const VALUE mutex;
};
static void
monitor_mark(void *ptr)
{
struct rb_monitor *mc = ptr;
rb_gc_mark(mc->owner);
rb_gc_mark(mc->mutex);
}
static size_t
monitor_memsize(const void *ptr)
{
return sizeof(struct rb_monitor);
}
static const rb_data_type_t monitor_data_type = {
"monitor",
{monitor_mark, RUBY_TYPED_DEFAULT_FREE, monitor_memsize,},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};
static VALUE
monitor_alloc(VALUE klass)
{
struct rb_monitor *mc;
VALUE obj;
obj = TypedData_Make_Struct(klass, struct rb_monitor, &monitor_data_type, mc);
RB_OBJ_WRITE(obj, &mc->mutex, rb_mutex_new());
RB_OBJ_WRITE(obj, &mc->owner, Qnil);
mc->count = 0;
return obj;
}
static struct rb_monitor *
monitor_ptr(VALUE monitor)
{
struct rb_monitor *mc;
TypedData_Get_Struct(monitor, struct rb_monitor, &monitor_data_type, mc);
return mc;
}
static int
mc_owner_p(struct rb_monitor *mc)
{
return mc->owner == rb_thread_current();
}
static VALUE
monitor_try_enter(VALUE monitor)
{
struct rb_monitor *mc = monitor_ptr(monitor);
if (!mc_owner_p(mc)) {
if (!rb_mutex_trylock(mc->mutex)) {
return Qfalse;
}
RB_OBJ_WRITE(monitor, &mc->owner, rb_thread_current());
mc->count = 0;
}
mc->count += 1;
return Qtrue;
}
static VALUE
monitor_enter(VALUE monitor)
{
struct rb_monitor *mc = monitor_ptr(monitor);
if (!mc_owner_p(mc)) {
rb_mutex_lock(mc->mutex);
RB_OBJ_WRITE(monitor, &mc->owner, rb_thread_current());
mc->count = 0;
}
mc->count++;
return Qnil;
}
static VALUE
monitor_exit(VALUE monitor)
{
struct rb_monitor *mc = monitor_ptr(monitor);
mc->count--;
if (mc->count == 0) {
RB_OBJ_WRITE(monitor, &mc->owner, Qnil);
rb_mutex_unlock(mc->mutex);
}
return Qnil;
}
static VALUE
monitor_locked_p(VALUE monitor)
{
struct rb_monitor *mc = monitor_ptr(monitor);
return rb_mutex_locked_p(mc->mutex);
}
static VALUE
monitor_owned_p(VALUE monitor)
{
struct rb_monitor *mc = monitor_ptr(monitor);
return (rb_mutex_locked_p(mc->mutex) && mc_owner_p(mc)) ? Qtrue : Qfalse;
}
static VALUE
monitor_check_owner(VALUE monitor)
{
struct rb_monitor *mc = monitor_ptr(monitor);
if (!mc_owner_p(mc)) {
rb_raise(rb_eThreadError, "current thread not owner");
}
return Qnil;
}
static VALUE
monitor_enter_for_cond(VALUE monitor, VALUE count)
{
struct rb_monitor *mc = monitor_ptr(monitor);
RB_OBJ_WRITE(monitor, &mc->owner, rb_thread_current());
mc->count = NUM2LONG(count);
return Qnil;
}
static VALUE
monitor_exit_for_cond(VALUE monitor)
{
struct rb_monitor *mc = monitor_ptr(monitor);
long cnt = mc->count;
RB_OBJ_WRITE(monitor, &mc->owner, Qnil);
mc->count = 0;
return LONG2NUM(cnt);
}
static VALUE
monitor_mutex_for_cond(VALUE monitor)
{
struct rb_monitor *mc = monitor_ptr(monitor);
return mc->mutex;
}
static VALUE
monitor_sync_body(VALUE monitor)
{
return rb_yield_values(0);
}
static VALUE
monitor_sync_ensure(VALUE monitor)
{
return monitor_exit(monitor);
}
static VALUE
monitor_synchronize(VALUE monitor)
{
monitor_enter(monitor);
return rb_ensure(monitor_sync_body, monitor, monitor_sync_ensure, monitor);
}
void
Init_monitor(void)
{
VALUE rb_cMonitor = rb_define_class("Monitor", rb_cObject);
rb_define_alloc_func(rb_cMonitor, monitor_alloc);
rb_define_method(rb_cMonitor, "try_enter", monitor_try_enter, 0);
rb_define_method(rb_cMonitor, "enter", monitor_enter, 0);
rb_define_method(rb_cMonitor, "exit", monitor_exit, 0);
rb_define_method(rb_cMonitor, "synchronize", monitor_synchronize, 0);
/* internal methods for MonitorMixin */
rb_define_method(rb_cMonitor, "mon_locked?", monitor_locked_p, 0);
rb_define_method(rb_cMonitor, "mon_check_owner", monitor_check_owner, 0);
rb_define_method(rb_cMonitor, "mon_owned?", monitor_owned_p, 0);
/* internal methods for MonitorMixin::ConditionalVariable */
rb_define_private_method(rb_cMonitor, "enter_for_cond", monitor_enter_for_cond, 1);
rb_define_private_method(rb_cMonitor, "exit_for_cond", monitor_exit_for_cond, 0);
rb_define_private_method(rb_cMonitor, "mutex_for_cond", monitor_mutex_for_cond, 0);
}

View File

@ -273,24 +273,24 @@ class TestMonitor < Test::Unit::TestCase
end end
def test_wait_interruption def test_wait_interruption
queue = Queue.new
cond = @monitor.new_cond cond = @monitor.new_cond
@monitor.define_singleton_method(:mon_enter_for_cond) do |*args|
queue.deq
super(*args)
end
th = Thread.start { th = Thread.start {
@monitor.synchronize do @monitor.synchronize do
begin begin
cond.wait(0.1) cond.wait(0.1)
@monitor.mon_owned?
rescue Interrupt rescue Interrupt
@monitor.instance_variable_get(:@mon_owner) @monitor.mon_owned?
end end
end end
} }
sleep(0.1) sleep(0.1)
th.raise(Interrupt) th.raise(Interrupt)
queue.enq(nil)
assert_equal th, th.value begin
assert_equal true, th.value
rescue Interrupt
end
end end
end end