From f0ee3948b66189f7a59f5558fcaf4a538602b305 Mon Sep 17 00:00:00 2001 From: Pekka Lampio Date: Thu, 6 Mar 2025 11:41:49 +0200 Subject: [PATCH] Galera feature: Retry applying writesets at slaves A new Galera feature that allows the applying of writesets to be retried at slave nodes (codership/mysql-wsrep-bugs/#1619). Currently, replication applying stops at the first non-ignored failure that occurs while applying events, and the node performs an emergency abort (or starts inconsistency voting). Some failures, however, can be concurrency-related, and applying may succeed if the operation is retried at a later time. This feature introduces a new dynamic global option variable "wsrep_applier_retry_count" that controls the retry-applying feature: a zero value disables retrying and a positive value sets the maximum number of retry attempts. The default value for this option is zero, which means that this feature is disabled by default. --- .../suite/galera/r/galera_defaults.result | 1 + .../suite/sys_vars/r/sysvars_wsrep.result | 15 ++++ sql/log_event_server.cc | 28 ++++++- sql/sys_vars.cc | 6 ++ sql/transaction.cc | 10 +++ sql/transaction.h | 3 + sql/wsrep_applier.cc | 84 +++++++++++++++++-- sql/wsrep_applier.h | 9 +- sql/wsrep_high_priority_service.cc | 23 +---- sql/wsrep_mysqld.cc | 1 + sql/wsrep_mysqld.h | 1 + sql/wsrep_schema.cc | 4 +- 12 files changed, 154 insertions(+), 31 deletions(-) diff --git a/mysql-test/suite/galera/r/galera_defaults.result b/mysql-test/suite/galera/r/galera_defaults.result index 3a82297d4a3..66193c1ebc2 100644 --- a/mysql-test/suite/galera/r/galera_defaults.result +++ b/mysql-test/suite/galera/r/galera_defaults.result @@ -21,6 +21,7 @@ AND VARIABLE_NAME NOT IN ( ORDER BY VARIABLE_NAME; VARIABLE_NAME VARIABLE_VALUE WSREP_ALLOWLIST +WSREP_APPLIER_RETRY_COUNT 0 WSREP_AUTO_INCREMENT_CONTROL ON WSREP_CERTIFICATION_RULES strict WSREP_CERTIFY_NONPK ON diff --git a/mysql-test/suite/sys_vars/r/sysvars_wsrep.result b/mysql-test/suite/sys_vars/r/sysvars_wsrep.result index
92b147a76ba..68c94dcd11d 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_wsrep.result +++ b/mysql-test/suite/sys_vars/r/sysvars_wsrep.result @@ -16,6 +16,21 @@ ENUM_VALUE_LIST NULL READ_ONLY YES COMMAND_LINE_ARGUMENT REQUIRED GLOBAL_VALUE_PATH NULL +VARIABLE_NAME WSREP_APPLIER_RETRY_COUNT +SESSION_VALUE NULL +GLOBAL_VALUE 0 +GLOBAL_VALUE_ORIGIN COMPILE-TIME +DEFAULT_VALUE 0 +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE INT UNSIGNED +VARIABLE_COMMENT Maximum number of applier retry attempts +NUMERIC_MIN_VALUE 0 +NUMERIC_MAX_VALUE 4294967295 +NUMERIC_BLOCK_SIZE 1 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL +GLOBAL_VALUE_PATH NULL VARIABLE_NAME WSREP_AUTO_INCREMENT_CONTROL SESSION_VALUE NULL GLOBAL_VALUE ON diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 69e3cbc3d5b..8ff4c236690 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -4993,7 +4993,33 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) /* remove trigger's tables */ goto err; } - +#ifdef WITH_WSREP + DBUG_EXECUTE_IF("apply_event_fail_once", { + if (WSREP(thd)) { + RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock); + error= HA_ERR_LOCK_WAIT_TIMEOUT; + slave_rows_error_report( + INFORMATION_LEVEL, error, rgi, thd, ptr->table, + get_type_str(), RPL_LOG_NAME, log_pos); + my_error(error, MYF(0)); + thd->is_slave_error= 1; + DBUG_SET("-d,apply_event_fail_once"); + goto err; + } + };); + DBUG_EXECUTE_IF("apply_event_fail_always", { + if (WSREP(thd)) { + RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock); + error= HA_ERR_LOCK_WAIT_TIMEOUT; + slave_rows_error_report( + INFORMATION_LEVEL, error, rgi, thd, ptr->table, + get_type_str(), RPL_LOG_NAME, log_pos); + my_error(error, MYF(0)); + thd->is_slave_error= 1; + goto err; + } + };); +#endif /* WITH_WSREP */ /* When the open and locking succeeded, we check all tables to ensure that they still have the correct type.
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index c090f373afa..cc18e663c27 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -6699,6 +6699,12 @@ static Sys_var_charptr Sys_wsrep_allowlist( READ_ONLY GLOBAL_VAR(wsrep_allowlist), CMD_LINE(REQUIRED_ARG), DEFAULT("")); +static Sys_var_uint Sys_wsrep_applier_retry_count ( + "wsrep_applier_retry_count", "Maximum number of applier retry attempts", + GLOBAL_VAR(wsrep_applier_retry_count), CMD_LINE(OPT_ARG), + VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG); + #endif /* WITH_WSREP */ static bool fix_host_cache_size(sys_var *, THD *, enum_var_type) diff --git a/sql/transaction.cc b/sql/transaction.cc index 2702cb192c7..4dee7bcbd0f 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -784,3 +784,13 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) DBUG_RETURN(MY_TEST(res)); } + +#ifdef WITH_WSREP +/* check if a named savepoint exists for the current transaction */ +bool trans_savepoint_exists(THD *thd, LEX_CSTRING name) +{ + SAVEPOINT **sv = find_savepoint(thd, Lex_ident_savepoint(name)); + + return (*sv != NULL); +} +#endif /* WITH_WSREP */ diff --git a/sql/transaction.h b/sql/transaction.h index 3b7e2df8807..3472205902e 100644 --- a/sql/transaction.h +++ b/sql/transaction.h @@ -34,6 +34,9 @@ bool trans_rollback_stmt(THD *thd); bool trans_savepoint(THD *thd, LEX_CSTRING name); bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name); bool trans_release_savepoint(THD *thd, LEX_CSTRING name); +#ifdef WITH_WSREP +bool trans_savepoint_exists(THD *thd, LEX_CSTRING name); +#endif /* WITH_WSREP */ void trans_reset_one_shot_chistics(THD *thd); diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index ee3da59d4f2..87ce66cbff4 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -128,17 +128,19 @@ void wsrep_store_error(const THD* const thd, dst.size(), dst.size() ? 
dst.data() : "(null)"); } -int wsrep_apply_events(THD* thd, - Relay_log_info* rli, - const void* events_buf, - size_t buf_len) +static int apply_events(THD* thd, + Relay_log_info* rli, + const void* events_buf, + size_t buf_len, + const LEX_STRING &savepoint, + bool set_savepoint) { char *buf= (char *)events_buf; int rcode= 0; int event= 1; Log_event_type typ; - DBUG_ENTER("wsrep_apply_events"); + DBUG_ENTER("apply_events"); if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", (long long) wsrep_thd_trx_seqno(thd)); @@ -148,6 +150,15 @@ int wsrep_apply_events(THD* thd, else thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + bool in_trans = thd->in_active_multi_stmt_transaction(); + if (in_trans && set_savepoint) { + if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() && + trans_savepoint(thd, savepoint)) { + rcode = 1; + goto error; + } + } + while (buf_len) { int exec_res; @@ -254,6 +265,19 @@ int wsrep_apply_events(THD* thd, delete ev; goto error; } + + /* Transaction was started by the event, set the savepoint to rollback to + * in case of failure. 
*/ + if (!in_trans && thd->in_active_multi_stmt_transaction()) { + in_trans = true; + if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() + && set_savepoint && trans_savepoint(thd, savepoint)) { + delete ev; + rcode = 1; + goto error; + } + } + event++; delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); @@ -267,3 +291,53 @@ error: DBUG_RETURN(rcode); } + +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err, + bool const include_msg) +{ + static char savepoint_name[20] = "wsrep_retry"; + static size_t savepoint_name_len = strlen(savepoint_name); + static const LEX_STRING savepoint= { savepoint_name, savepoint_name_len }; + uint n_retries = 0; + bool savepoint_exists = false; + + int ret= apply_events(thd, rli, data.data(), data.size(), savepoint, true); + + while (ret && n_retries < wsrep_applier_retry_count && + (savepoint_exists = trans_savepoint_exists(thd, savepoint))) { + /* applying failed, retry applying events */ + + /* rollback to savepoint without telling Wsrep-lib */ + thd->variables.wsrep_on = false; + if (FALSE != trans_rollback_to_savepoint(thd, savepoint)) { + thd->variables.wsrep_on = true; + break; + } + thd->variables.wsrep_on = true; + + /* reset THD object for retry */ + thd->clear_error(); + thd->reset_for_next_command(); + + /* retry applying events */ + ret= apply_events(thd, rli, data.data(), data.size(), savepoint, false); + n_retries++; + } + + if (savepoint_exists) { + trans_release_savepoint(thd, savepoint); + } + + if (ret || wsrep_thd_has_ignored_error(thd)) + { + if (ret) { + wsrep_store_error(thd, err, include_msg); + } + wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); + } + + return ret; +} diff --git a/sql/wsrep_applier.h b/sql/wsrep_applier.h index e633b1b9bf2..01dd496f7d8 100644 --- a/sql/wsrep_applier.h +++ b/sql/wsrep_applier.h @@ -20,10 +20,11 @@ #include "rpl_rli.h" // Relay_log_info #include "log_event.h" // 
Format_description_log_event -int wsrep_apply_events(THD* thd, - Relay_log_info* rli, - const void* events_buf, - size_t buf_len); +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err, + bool const include_msg); /* Applier error codes, when nothing better is available. */ #define WSREP_RET_SUCCESS 0 // Success diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index bb57aa59930..598251c26c6 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -123,23 +123,6 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd) thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; } -static int apply_events(THD* thd, - Relay_log_info* rli, - const wsrep::const_buffer& data, - wsrep::mutable_buffer& err, - bool const include_msg) -{ - int const ret= wsrep_apply_events(thd, rli, data.data(), data.size()); - if (ret || wsrep_thd_has_ignored_error(thd)) - { - if (ret) - { - wsrep_store_error(thd, err, include_msg); - } - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - } - return ret; -} /**************************************************************************** High priority service @@ -442,7 +425,7 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, #endif thd->set_time(); - int ret= apply_events(thd, m_rli, data, err, false); + int ret= wsrep_apply_events(thd, m_rli, data, err, false); wsrep_thd_set_ignored_error(thd, false); trans_commit(thd); @@ -610,7 +593,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, #endif /* ENABLED_DEBUG_SYNC */ wsrep_setup_uk_and_fk_checks(thd); - int ret= apply_events(thd, m_rli, data, err, true); + int ret= wsrep_apply_events(thd, m_rli, data, err, true); thd->close_temporary_tables(); if (!ret && !wsrep::commits_transaction(ws_meta.flags())) @@ -779,7 +762,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, 
ws_meta, thd->wsrep_sr().fragments()); } - ret= ret || apply_events(thd, m_rli, data, err, true); + ret= ret || wsrep_apply_events(thd, m_rli, data, err, true); thd->close_temporary_tables(); if (!ret && !wsrep::commits_transaction(ws_meta.flags())) { diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index d14c069dc1d..25028c3aebe 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -127,6 +127,7 @@ ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES; // unit for fragment size ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE; uint wsrep_ignore_apply_errors= 0; +uint wsrep_applier_retry_count= 0; std::atomic wsrep_thread_create_failed; diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 9be4769b5fc..75d4055499f 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -92,6 +92,7 @@ extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; extern std::atomic wsrep_thread_create_failed; extern ulonglong wsrep_mode; +extern uint wsrep_applier_retry_count; enum enum_wsrep_reject_types { WSREP_REJECT_NONE, /* nothing rejected */ diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index f8acd876a5f..29d0f8fbc31 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1396,7 +1396,9 @@ static int replay_transaction(THD* thd, { Wsrep_schema_impl::thd_context_switch thd_context_switch(thd, orig_thd); - ret= wsrep_apply_events(orig_thd, rli, buf.ptr(), buf.length()); + wsrep::mutable_buffer unused; + ret= wsrep_apply_events(orig_thd, rli, {buf.c_ptr_quick(), buf.length()}, + unused, false); if (ret) { WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");