MDEV-18983 Port rpl_semi_sync_master_wait_for_slave_count from MySQL

This commit is contained in:
ParadoxV5 2025-05-06 20:25:57 -06:00 committed by Sergei Golubchik
parent 031f15a3b4
commit f3b21cd6f6
6 changed files with 98 additions and 13 deletions

View File

@ -1282,9 +1282,14 @@ The following specify which files/extra groups are read (specified before remain
replication in the master
--rpl-semi-sync-master-trace-level=#
The tracing level for semi-sync replication
--rpl-semi-sync-master-wait-for-slave-count=#
The number of slaves that need to acknowledge that they
have received a transaction before the transaction can
complete on the master
--rpl-semi-sync-master-wait-no-slave
Wait until timeout when no semi-synchronous replication
slave is available
Wait until timeout when less than
`rpl_semi_sync_master_wait_for_slave_count`
semi-synchronous replication slaves are available
(Defaults to on; use --skip-rpl-semi-sync-master-wait-no-slave to disable.)
--rpl-semi-sync-master-wait-point=name
Should transaction wait for semi-sync ack after having
@ -1991,6 +1996,7 @@ rowid-merge-buff-size 8388608
rpl-semi-sync-master-enabled FALSE
rpl-semi-sync-master-timeout 10000
rpl-semi-sync-master-trace-level 32
rpl-semi-sync-master-wait-for-slave-count 1
rpl-semi-sync-master-wait-no-slave TRUE
rpl-semi-sync-master-wait-point AFTER_COMMIT
rpl-semi-sync-slave-delay-master FALSE

View File

@ -3892,10 +3892,20 @@ NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME RPL_SEMI_SYNC_MASTER_WAIT_FOR_SLAVE_COUNT
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE INT UNSIGNED
VARIABLE_COMMENT The number of slaves that need to acknowledge that they have received a transaction before the transaction can complete on the master
NUMERIC_MIN_VALUE 1
NUMERIC_MAX_VALUE 65535
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME RPL_SEMI_SYNC_MASTER_WAIT_NO_SLAVE
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BOOLEAN
VARIABLE_COMMENT Wait until timeout when no semi-synchronous replication slave is available
VARIABLE_COMMENT Wait until timeout when less than `rpl_semi_sync_master_wait_for_slave_count` semi-synchronous replication slaves are available
NUMERIC_MIN_VALUE NULL
NUMERIC_MAX_VALUE NULL
NUMERIC_BLOCK_SIZE NULL

View File

@ -520,6 +520,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_TRACE_LEVE
REPL_MASTER_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_WAIT_POINT=
REPL_MASTER_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_WAIT_FOR_SLAVE_COUNT=
REPL_MASTER_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MASTER_VERIFY_CHECKSUM=
REPL_MASTER_ADMIN_ACL;

View File

@ -49,6 +49,7 @@ ulonglong rpl_semi_sync_master_net_wait_num = 0;
ulong rpl_semi_sync_master_clients = 0;
ulonglong rpl_semi_sync_master_net_wait_time = 0;
ulonglong rpl_semi_sync_master_trx_wait_time = 0;
unsigned int rpl_semi_sync_master_wait_for_slave_count = 1;
Repl_semi_sync_master repl_semisync_master;
Ack_receiver ack_receiver;
@ -71,7 +72,9 @@ static ulonglong timespec_to_usec(const struct timespec *ts)
/** @return Should we revert to async because there not enough slaves? */
static bool is_no_slave()
{
return !rpl_semi_sync_master_clients && !rpl_semi_sync_master_wait_no_slave;
return rpl_semi_sync_master_clients <
rpl_semi_sync_master_wait_for_slave_count &&
!rpl_semi_sync_master_wait_no_slave;
}
int signal_waiting_transaction(THD *waiting_thd, const char *binlog_file,
@ -250,6 +253,15 @@ bool Active_tranx::is_tranx_end_pos(const char *log_file_name,
DBUG_RETURN(get_tranx_node(log_file_name, log_file_pos));
}
Tranx_node *Active_tranx::find_acked_tranx_node()
{
Tranx_node *new_front;
for (Tranx_node *entry= m_trx_front; entry; entry= entry->next)
if (entry->acks >= rpl_semi_sync_master_wait_for_slave_count)
new_front= entry;
return new_front;
}
void Active_tranx::clear_active_tranx_nodes(
const char *log_file_name, my_off_t log_file_pos,
active_tranx_action pre_delete_hook)
@ -677,16 +689,20 @@ int Repl_semi_sync_master::report_reply_binlog(uint32 server_id,
if (need_copy_send_pos)
{
Tranx_node *entry;
strmake_buf(m_reply_file_name, log_file_name);
m_reply_file_pos = log_file_pos;
m_reply_file_name_inited = true;
/* Remove all active transaction nodes before this point. */
DBUG_ASSERT(m_active_tranxs != NULL);
m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos,
signal_waiting_transaction);
if (m_active_tranxs->is_empty())
m_wait_file_name_inited= false;
entry= m_active_tranxs->get_tranx_node(log_file_name, log_file_pos);
if (entry && ++(entry->acks) >= rpl_semi_sync_master_wait_for_slave_count)
{
/* Remove all active transaction nodes before this point. */
m_active_tranxs->clear_active_tranx_nodes(log_file_name, log_file_pos,
signal_waiting_transaction);
if (m_active_tranxs->is_empty())
m_wait_file_name_inited= false;
}
DBUG_PRINT("semisync", ("%s: Got reply at (%s, %lu)",
"Repl_semi_sync_master::report_reply_binlog",
@ -1471,6 +1487,20 @@ void Repl_semi_sync_master::await_all_slave_replies(const char *msg)
DBUG_VOID_RETURN;
}
void Repl_semi_sync_master::refresh_wait_for_slave_count(uint32 server_id)
{
DBUG_ENTER("refresh_wait_for_slave_count");
lock();
if (get_master_enabled())
{
Tranx_node *entry;
DBUG_ASSERT(m_active_tranxs);
if ((entry= m_active_tranxs->find_acked_tranx_node()))
report_reply_binlog(server_id, entry->log_name, entry->log_pos);
}
unlock();
}
/* Get the waiting time given the wait's staring time.
*
* Return:

View File

@ -34,6 +34,7 @@ struct Tranx_node {
THD *thd; /* The thread awaiting an ACK */
struct Tranx_node *next; /* the next node in the sorted list */
struct Tranx_node *hash_next; /* the next node during hash collision */
unsigned int acks; ///< number of ACKs received
};
/**
@ -129,6 +130,7 @@ public:
trx_node->log_pos= 0;
trx_node->next= 0;
trx_node->hash_next= 0;
trx_node->acks= 0;
return trx_node;
}
@ -359,6 +361,13 @@ public:
int insert_tranx_node(THD *thd_to_wait, const char *log_file_name,
my_off_t log_file_pos);
/**
Find (if any) the (last) transaction node with at least
rpl_semi_sync_master_wait_for_slave_count Tranx_node::acks
@see Repl_semi_sync_master::refresh_wait_for_slave_count
*/
Tranx_node *find_acked_tranx_node();
/* Clear the active transaction nodes until(inclusive) the specified
* position.
* If log_file_name is NULL, everything will be cleared: the sorted
@ -396,7 +405,6 @@ public:
* if the internal linked list has no entries, false otherwise.
*/
bool is_empty() { return m_trx_front == NULL; }
};
/**
@ -693,6 +701,13 @@ class Repl_semi_sync_master
/*called before reset master*/
int before_reset_master();
/**
If `SET rpl_semi_sync_master_wait_for_slave_count` lowered the requirement,
the transaction queue `m_active_tranxs` needs to flush any that did not have
enough Tranx_node::acks before but now have.
*/
void refresh_wait_for_slave_count(uint32 server_id);
mysql_mutex_t LOCK_rpl_semi_sync_master_enabled;
};
@ -707,6 +722,7 @@ extern Ack_receiver ack_receiver;
/* System and status variables for the master component */
extern my_bool rpl_semi_sync_master_enabled;
extern my_bool rpl_semi_sync_master_status;
extern unsigned int rpl_semi_sync_master_wait_for_slave_count;
extern ulong rpl_semi_sync_master_wait_point;
extern ulong rpl_semi_sync_master_clients;
extern ulong rpl_semi_sync_master_timeout;

View File

@ -3826,6 +3826,15 @@ static bool fix_rpl_semi_sync_master_wait_point(sys_var *self, THD *thd,
return false;
}
static bool fix_rpl_semi_sync_master_wait_for_slave_count
(sys_var *self, THD *thd, enum_var_type type)
{
mysql_mutex_unlock(&LOCK_global_system_variables);
repl_semisync_master.refresh_wait_for_slave_count(thd->variables.server_id);
mysql_mutex_lock(&LOCK_global_system_variables);
return false;
}
static Sys_var_on_access_global<Sys_var_mybool,
PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_ENABLED>
Sys_semisync_master_enabled(
@ -3852,8 +3861,8 @@ static Sys_var_on_access_global<Sys_var_mybool,
PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_WAIT_NO_SLAVE>
Sys_semisync_master_wait_no_slave(
"rpl_semi_sync_master_wait_no_slave",
"Wait until timeout when no semi-synchronous replication slave is "
"available",
"Wait until timeout when less than `rpl_semi_sync_master_wait_for_"
"slave_count` semi-synchronous replication slaves are available",
GLOBAL_VAR(rpl_semi_sync_master_wait_no_slave),
CMD_LINE(OPT_ARG), DEFAULT(TRUE),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0));
@ -3883,6 +3892,18 @@ Sys_semisync_master_wait_point(
NO_MUTEX_GUARD, NOT_IN_BINLOG,ON_CHECK(0),
ON_UPDATE(fix_rpl_semi_sync_master_wait_point));
static Sys_var_on_access_global<Sys_var_uint,
PRIV_SET_SYSTEM_GLOBAL_VAR_RPL_SEMI_SYNC_MASTER_WAIT_FOR_SLAVE_COUNT>
Sys_semisync_master_wait_for_slave_count(
"rpl_semi_sync_master_wait_for_slave_count",
"The number of slaves that need to acknowledge that they have received "
"a transaction before the transaction can complete on the master",
GLOBAL_VAR(rpl_semi_sync_master_wait_for_slave_count),
CMD_LINE(REQUIRED_ARG), VALID_RANGE(1, 0xFFFF),
DEFAULT(rpl_semi_sync_master_wait_for_slave_count), BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),
ON_UPDATE(fix_rpl_semi_sync_master_wait_for_slave_count));
static bool fix_rpl_semi_sync_slave_trace_level(sys_var *self, THD *thd,
enum_var_type type)
{