Deduplicate code in sql/semisync_master.cc

* Create reüsable functions
  `is_no_slave()` & `Active_tranx::get_tranx_node()`
* Replace `Active_tranx::is_thd_waiter()`
  with equivalent method `is_tranx_end_pos()`
This commit is contained in:
ParadoxV5 2025-05-06 19:52:41 -06:00 committed by Sergei Golubchik
parent dd8f5b92ad
commit 031f15a3b4
2 changed files with 28 additions and 62 deletions

View File

@ -68,6 +68,12 @@ static ulonglong timespec_to_usec(const struct timespec *ts)
return (ulonglong) ts->tv_sec * TIME_MILLION + ts->tv_nsec / TIME_THOUSAND;
}
/** @return Should we revert to async because there not enough slaves? */
static bool is_no_slave()
{
return !rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave;
}
int signal_waiting_transaction(THD *waiting_thd, const char *binlog_file,
my_off_t binlog_pos)
{
@ -157,6 +163,17 @@ int Active_tranx::compare(const char *log_file_name1, my_off_t log_file_pos1,
return 0;
}
Tranx_node *
Active_tranx::get_tranx_node(const char *log_file_name, my_off_t log_file_pos)
{
Tranx_node *entry;
mysql_mutex_assert_owner(m_lock);
for (entry= m_trx_htb[get_hash_value(log_file_name, log_file_pos)];
entry && compare(entry, log_file_name, log_file_pos);
entry= entry->hash_next);
return entry;
}
int Active_tranx::insert_tranx_node(THD *thd_to_wait,
const char *log_file_name,
my_off_t log_file_pos)
@ -230,23 +247,7 @@ bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
my_off_t log_file_pos)
{
DBUG_ENTER("Active_tranx::is_tranx_end_pos");
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
Tranx_node *entry = m_trx_htb[hash_val];
while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;
entry = entry->hash_next;
}
DBUG_PRINT("semisync", ("%s: probe (%s, %lu) in entry(%u)",
"Active_tranx::is_tranx_end_pos",
log_file_name, (ulong)log_file_pos, hash_val));
DBUG_RETURN(entry != NULL);
DBUG_RETURN(get_tranx_node(log_file_name, log_file_pos));
}
void Active_tranx::clear_active_tranx_nodes(
@ -341,45 +342,14 @@ void Active_tranx::unlink_thd_as_waiter(const char *log_file_name,
my_off_t log_file_pos)
{
DBUG_ENTER("Active_tranx::unlink_thd_as_waiter");
mysql_mutex_assert_owner(m_lock);
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
Tranx_node *entry = m_trx_htb[hash_val];
while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;
entry = entry->hash_next;
}
Tranx_node *entry= get_tranx_node(log_file_name, log_file_pos);
if (entry)
entry->thd= NULL;
DBUG_VOID_RETURN;
}
bool Active_tranx::is_thd_waiter(THD *thd_to_check, const char *log_file_name,
my_off_t log_file_pos)
{
DBUG_ENTER("Active_tranx::assert_thd_is_waiter");
mysql_mutex_assert_owner(m_lock);
unsigned int hash_val = get_hash_value(log_file_name, log_file_pos);
Tranx_node *entry = m_trx_htb[hash_val];
while (entry != NULL)
{
if (compare(entry, log_file_name, log_file_pos) == 0)
break;
entry = entry->hash_next;
}
DBUG_RETURN(static_cast<bool>(entry));
}
/*******************************************************************************
*
* <Repl_semi_sync_master> class: the basic code layer for semisync master.
@ -565,8 +535,8 @@ void Repl_semi_sync_master::remove_slave()
{
lock();
DBUG_ASSERT(rpl_semi_sync_master_clients > 0);
if (!(--rpl_semi_sync_master_clients) && !rpl_semi_sync_master_wait_no_slave
&& get_master_enabled())
--rpl_semi_sync_master_clients;
if (is_no_slave() && get_master_enabled())
{
/*
Signal transactions waiting in commit_trx() that they do not have to
@ -861,7 +831,7 @@ int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name,
bool success= 0;
DBUG_ENTER("Repl_semi_sync_master::commit_trx");
if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave)
if (is_no_slave())
{
rpl_semi_sync_master_no_transactions++;
DBUG_RETURN(0);
@ -897,7 +867,7 @@ int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name,
while (is_on() && !(aborted= thd_killed(thd)))
{
/* We have to check these again as things may have changed */
if (!rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave)
if (is_no_slave())
{
aborted= 1;
break;
@ -932,8 +902,8 @@ int Repl_semi_sync_master::commit_trx(const char *trx_wait_binlog_name,
* rpl_semi_sync_master_yes/no_tx consistent with it, we check for a
* semi-sync restart _after_ checking the reply state.
*/
if (unlikely(!m_active_tranxs->is_thd_waiter(thd, trx_wait_binlog_name,
trx_wait_binlog_pos)))
if (unlikely(!m_active_tranxs->is_tranx_end_pos(trx_wait_binlog_name,
trx_wait_binlog_pos)))
{
DBUG_EXECUTE_IF(
"semisync_log_skip_trx_wait",

View File

@ -348,6 +348,9 @@ public:
unsigned long trace_level);
~Active_tranx();
/** Find (if any) the active transaction node with the specified position */
Tranx_node *get_tranx_node(const char *log_file_name, my_off_t log_file_pos);
/* Insert an active transaction node with the specified position.
*
* Return:
@ -377,13 +380,6 @@ public:
*/
void unlink_thd_as_waiter(const char *log_file_name, my_off_t log_file_pos);
/* Uses DBUG_ASSERT statements to ensure that the argument thd_to_check
* matches the thread of the respective Tranx_node::thd of the passed in
* log_file_name and log_file_pos.
*/
bool is_thd_waiter(THD *thd_to_check, const char *log_file_name,
my_off_t log_file_pos);
/* Given a position, check to see whether the position is an active
* transaction's ending position by probing the hash table.
*/