Galera feature: Retry applying writesets at slaves

This is a new Galera feature that allows slave nodes to retry applying
writesets (codership/mysql-wsrep-bugs/#1619). Currently, replication
applying stops at the first non-ignored failure that occurs while
applying events, and the node performs an emergency abort (or starts
inconsistency voting). Some failures, however, can be concurrency
related, and applying may succeed if the operation is retried at a
later time.

This feature introduces a new dynamic global option variable
"wsrep_applier_retry_count" that controls the retry-applying feature:
a zero value disables retrying and a positive value sets the maximum
number of retry attempts. The default value for this option is zero,
which means that this feature is disabled by default.
This commit is contained in:
Pekka Lampio 2025-03-06 11:41:49 +02:00 committed by Sergei Golubchik
parent 612c653d6a
commit f0ee3948b6
12 changed files with 154 additions and 31 deletions

View File

@ -21,6 +21,7 @@ AND VARIABLE_NAME NOT IN (
ORDER BY VARIABLE_NAME;
VARIABLE_NAME VARIABLE_VALUE
WSREP_ALLOWLIST
WSREP_APPLIER_RETRY_COUNT 0
WSREP_AUTO_INCREMENT_CONTROL ON
WSREP_CERTIFICATION_RULES strict
WSREP_CERTIFY_NONPK ON

View File

@ -16,6 +16,21 @@ ENUM_VALUE_LIST NULL
READ_ONLY YES
COMMAND_LINE_ARGUMENT REQUIRED
GLOBAL_VALUE_PATH NULL
VARIABLE_NAME WSREP_APPLIER_RETRY_COUNT
SESSION_VALUE NULL
GLOBAL_VALUE 0
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 0
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE INT UNSIGNED
VARIABLE_COMMENT Maximum number of applier retry attempts
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 4294967295
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT OPTIONAL
GLOBAL_VALUE_PATH NULL
VARIABLE_NAME WSREP_AUTO_INCREMENT_CONTROL
SESSION_VALUE NULL
GLOBAL_VALUE ON

View File

@ -4993,7 +4993,33 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
/* remove trigger's tables */
goto err;
}
#ifdef WITH_WSREP
DBUG_EXECUTE_IF("apply_event_fail_once", {
if (WSREP(thd)) {
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock);
error= HA_ERR_LOCK_WAIT_TIMEOUT;
slave_rows_error_report(
INFORMATION_LEVEL, error, rgi, thd, ptr->table,
get_type_str(), RPL_LOG_NAME, log_pos);
my_error(error, MYF(0));
thd->is_slave_error= 1;
DBUG_SET("-d,apply_event_fail_once");
goto err;
}
};);
DBUG_EXECUTE_IF("apply_event_fail_always", {
if (WSREP(thd)) {
RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock);
error= HA_ERR_LOCK_WAIT_TIMEOUT;
slave_rows_error_report(
INFORMATION_LEVEL, error, rgi, thd, ptr->table,
get_type_str(), RPL_LOG_NAME, log_pos);
my_error(error, MYF(0));
thd->is_slave_error= 1;
goto err;
}
};);
#endif /* WITH_WSREP */
/*
When the open and locking succeeded, we check all tables to
ensure that they still have the correct type.

View File

@ -6699,6 +6699,12 @@ static Sys_var_charptr Sys_wsrep_allowlist(
READ_ONLY GLOBAL_VAR(wsrep_allowlist), CMD_LINE(REQUIRED_ARG),
DEFAULT(""));
/*
  wsrep_applier_retry_count: global unsigned integer variable bound to the
  wsrep_applier_retry_count C++ global. It holds the maximum number of
  times a failed writeset apply is retried. Range is 0..UINT_MAX and the
  default is 0, which leaves retrying disabled.
*/
static Sys_var_uint Sys_wsrep_applier_retry_count (
"wsrep_applier_retry_count", "Maximum number of applier retry attempts",
GLOBAL_VAR(wsrep_applier_retry_count), CMD_LINE(OPT_ARG),
VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1),
NO_MUTEX_GUARD, NOT_IN_BINLOG);
#endif /* WITH_WSREP */
static bool fix_host_cache_size(sys_var *, THD *, enum_var_type)

View File

@ -784,3 +784,13 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name)
DBUG_RETURN(MY_TEST(res));
}
#ifdef WITH_WSREP
/**
  Tell whether the current transaction of @c thd has a savepoint called
  @c name.

  @param thd   Connection whose transaction is inspected.
  @param name  Savepoint name to look up.

  @return true when the slot returned by find_savepoint() holds a
          savepoint, false when that slot is empty.
*/
bool trans_savepoint_exists(THD *thd, LEX_CSTRING name)
{
  return *find_savepoint(thd, Lex_ident_savepoint(name)) != NULL;
}
#endif /* WITH_WSREP */

View File

@ -34,6 +34,9 @@ bool trans_rollback_stmt(THD *thd);
bool trans_savepoint(THD *thd, LEX_CSTRING name);
bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name);
bool trans_release_savepoint(THD *thd, LEX_CSTRING name);
#ifdef WITH_WSREP
bool trans_savepoint_exists(THD *thd, LEX_CSTRING name);
#endif /* WITH_WSREP */
void trans_reset_one_shot_chistics(THD *thd);

View File

@ -128,17 +128,19 @@ void wsrep_store_error(const THD* const thd,
dst.size(), dst.size() ? dst.data() : "(null)");
}
int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const void* events_buf,
size_t buf_len)
static int apply_events(THD* thd,
Relay_log_info* rli,
const void* events_buf,
size_t buf_len,
const LEX_STRING &savepoint,
bool set_savepoint)
{
char *buf= (char *)events_buf;
int rcode= 0;
int event= 1;
Log_event_type typ;
DBUG_ENTER("wsrep_apply_events");
DBUG_ENTER("apply_events");
if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld",
(long long) wsrep_thd_trx_seqno(thd));
@ -148,6 +150,15 @@ int wsrep_apply_events(THD* thd,
else
thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id;
bool in_trans = thd->in_active_multi_stmt_transaction();
if (in_trans && set_savepoint) {
if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() &&
trans_savepoint(thd, savepoint)) {
rcode = 1;
goto error;
}
}
while (buf_len)
{
int exec_res;
@ -254,6 +265,19 @@ int wsrep_apply_events(THD* thd,
delete ev;
goto error;
}
/* Transaction was started by the event, set the savepoint to rollback to
* in case of failure. */
if (!in_trans && thd->in_active_multi_stmt_transaction()) {
in_trans = true;
if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming()
&& set_savepoint && trans_savepoint(thd, savepoint)) {
delete ev;
rcode = 1;
goto error;
}
}
event++;
delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev);
@ -267,3 +291,53 @@ error:
DBUG_RETURN(rcode);
}
/**
  Apply the events contained in @c data, retrying on failure.

  The first attempt asks apply_events() to set the "wsrep_retry"
  savepoint. While an attempt fails, fewer than wsrep_applier_retry_count
  retries have been made and the savepoint is present, the transaction is
  rolled back to the savepoint and the events are applied again (this time
  without setting the savepoint).

  @param thd          Applying thread.
  @param rli          Relay log info handed through to apply_events().
  @param data         Buffer holding the events to apply.
  @param err          Receives the error stored by wsrep_store_error() when
                      the final attempt failed.
  @param include_msg  Passed through to wsrep_store_error().

  @return Result of the last apply_events() call; zero means success.
*/
int wsrep_apply_events(THD* thd,
                       Relay_log_info* rli,
                       const wsrep::const_buffer& data,
                       wsrep::mutable_buffer& err,
                       bool const include_msg)
{
  static char savepoint_name[20] = "wsrep_retry";
  static const LEX_STRING savepoint=
    { savepoint_name, strlen(savepoint_name) };

  /*
    Becomes true only once the retry loop has found the savepoint. When the
    first attempt succeeds the loop body is never entered, so the flag stays
    false and the savepoint is not released by this function.
  */
  bool savepoint_exists= false;

  int ret= apply_events(thd, rli, data.data(), data.size(), savepoint, true);

  for (uint n_retries= 0;
       ret && n_retries < wsrep_applier_retry_count;
       n_retries++)
  {
    /* Without a savepoint there is nothing to roll back to: give up. */
    savepoint_exists= trans_savepoint_exists(thd, savepoint);
    if (!savepoint_exists)
      break;

    /*
      Roll back to the savepoint without telling Wsrep-lib: wsrep_on is
      switched off just for the duration of the rollback call.
    */
    thd->variables.wsrep_on= false;
    const bool rollback_failed= trans_rollback_to_savepoint(thd, savepoint);
    thd->variables.wsrep_on= true;
    if (rollback_failed)
      break;

    /* Bring the THD back to a clean state before the next attempt. */
    thd->clear_error();
    thd->reset_for_next_command();

    /* Savepoint is already in place, so do not set it again. */
    ret= apply_events(thd, rli, data.data(), data.size(), savepoint, false);
  }

  if (savepoint_exists)
    trans_release_savepoint(thd, savepoint);

  /* The error is recorded only for a real failure ... */
  if (ret)
    wsrep_store_error(thd, err, include_msg);

  /* ... while the buffer is dumped for failures and ignored errors alike. */
  if (ret || wsrep_thd_has_ignored_error(thd))
    wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());

  return ret;
}

View File

@ -20,10 +20,11 @@
#include "rpl_rli.h" // Relay_log_info
#include "log_event.h" // Format_description_log_event
int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const void* events_buf,
size_t buf_len);
int wsrep_apply_events(THD* thd,
Relay_log_info* rli,
const wsrep::const_buffer& data,
wsrep::mutable_buffer& err,
bool const include_msg);
/* Applier error codes, when nothing better is available. */
#define WSREP_RET_SUCCESS 0 // Success

View File

@ -123,23 +123,6 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd)
thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
}
/**
  Apply the events in @c data through wsrep_apply_events() and do the
  error bookkeeping.

  @param thd          Applying thread.
  @param rli          Relay log info handed to wsrep_apply_events().
  @param data         Buffer holding the events to apply.
  @param err          Receives the error stored by wsrep_store_error() on
                      failure.
  @param include_msg  Passed through to wsrep_store_error().

  @return Result of wsrep_apply_events(); zero means success.
*/
static int apply_events(THD* thd,
                        Relay_log_info* rli,
                        const wsrep::const_buffer& data,
                        wsrep::mutable_buffer& err,
                        bool const include_msg)
{
  const int rcode= wsrep_apply_events(thd, rli, data.data(), data.size());

  /* The error is recorded only for a real failure ... */
  if (rcode)
    wsrep_store_error(thd, err, include_msg);

  /* ... while the buffer is dumped for failures and ignored errors alike. */
  if (rcode || wsrep_thd_has_ignored_error(thd))
    wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size());

  return rcode;
}
/****************************************************************************
High priority service
@ -442,7 +425,7 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta,
#endif
thd->set_time();
int ret= apply_events(thd, m_rli, data, err, false);
int ret= wsrep_apply_events(thd, m_rli, data, err, false);
wsrep_thd_set_ignored_error(thd, false);
trans_commit(thd);
@ -610,7 +593,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta,
#endif /* ENABLED_DEBUG_SYNC */
wsrep_setup_uk_and_fk_checks(thd);
int ret= apply_events(thd, m_rli, data, err, true);
int ret= wsrep_apply_events(thd, m_rli, data, err, true);
thd->close_temporary_tables();
if (!ret && !wsrep::commits_transaction(ws_meta.flags()))
@ -779,7 +762,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta,
ws_meta,
thd->wsrep_sr().fragments());
}
ret= ret || apply_events(thd, m_rli, data, err, true);
ret= ret || wsrep_apply_events(thd, m_rli, data, err, true);
thd->close_temporary_tables();
if (!ret && !wsrep::commits_transaction(ws_meta.flags()))
{

View File

@ -127,6 +127,7 @@ ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES;
// unit for fragment size
ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE;
uint wsrep_ignore_apply_errors= 0;
uint wsrep_applier_retry_count= 0;
std::atomic <bool> wsrep_thread_create_failed;

View File

@ -92,6 +92,7 @@ extern bool wsrep_gtid_mode;
extern uint32 wsrep_gtid_domain_id;
extern std::atomic <bool > wsrep_thread_create_failed;
extern ulonglong wsrep_mode;
extern uint wsrep_applier_retry_count;
enum enum_wsrep_reject_types {
WSREP_REJECT_NONE, /* nothing rejected */

View File

@ -1396,7 +1396,9 @@ static int replay_transaction(THD* thd,
{
Wsrep_schema_impl::thd_context_switch thd_context_switch(thd, orig_thd);
ret= wsrep_apply_events(orig_thd, rli, buf.ptr(), buf.length());
wsrep::mutable_buffer unused;
ret= wsrep_apply_events(orig_thd, rli, {buf.c_ptr_quick(), buf.length()},
unused, false);
if (ret)
{
WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");