diff --git a/mysql-test/suite/galera/r/galera_defaults.result b/mysql-test/suite/galera/r/galera_defaults.result index 3a82297d4a3..66193c1ebc2 100644 --- a/mysql-test/suite/galera/r/galera_defaults.result +++ b/mysql-test/suite/galera/r/galera_defaults.result @@ -21,6 +21,7 @@ AND VARIABLE_NAME NOT IN ( ORDER BY VARIABLE_NAME; VARIABLE_NAME VARIABLE_VALUE WSREP_ALLOWLIST +WSREP_APPLIER_RETRY_COUNT 0 WSREP_AUTO_INCREMENT_CONTROL ON WSREP_CERTIFICATION_RULES strict WSREP_CERTIFY_NONPK ON diff --git a/mysql-test/suite/sys_vars/r/sysvars_wsrep.result b/mysql-test/suite/sys_vars/r/sysvars_wsrep.result index 92b147a76ba..68c94dcd11d 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_wsrep.result +++ b/mysql-test/suite/sys_vars/r/sysvars_wsrep.result @@ -16,6 +16,21 @@ ENUM_VALUE_LIST NULL READ_ONLY YES COMMAND_LINE_ARGUMENT REQUIRED GLOBAL_VALUE_PATH NULL +VARIABLE_NAME WSREP_APPLIER_RETRY_COUNT +SESSION_VALUE NULL +GLOBAL_VALUE 0 +GLOBAL_VALUE_ORIGIN COMPILE-TIME +DEFAULT_VALUE 0 +VARIABLE_SCOPE GLOBAL +VARIABLE_TYPE INT UNSIGNED +VARIABLE_COMMENT Maximum number of applier retry attempts +NUMERIC_MIN_VALUE 0 +NUMERIC_MAX_VALUE 4294967295 +NUMERIC_BLOCK_SIZE 1 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT OPTIONAL +GLOBAL_VALUE_PATH NULL VARIABLE_NAME WSREP_AUTO_INCREMENT_CONTROL SESSION_VALUE NULL GLOBAL_VALUE ON diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 69e3cbc3d5b..8ff4c236690 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -4993,7 +4993,33 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi) /* remove trigger's tables */ goto err; } - +#ifdef WITH_WSREP + DBUG_EXECUTE_IF("apply_event_fail_once", { + if (WSREP(thd)) { + RPL_TABLE_LIST *ptr= static_cast(rgi->tables_to_lock); + error= HA_ERR_LOCK_WAIT_TIMEOUT; + slave_rows_error_report( + INFORMATION_LEVEL, error, rgi, thd, ptr->table, + get_type_str(), RPL_LOG_NAME, log_pos); + my_error(error, MYF(0)); + thd->is_slave_error= 1; + DBUG_SET("-d,apply_event_fail_once"); + goto err; + } + };); + DBUG_EXECUTE_IF("apply_event_fail_always", { + if (WSREP(thd)) { + RPL_TABLE_LIST *ptr= static_cast(rgi->tables_to_lock); + error= HA_ERR_LOCK_WAIT_TIMEOUT; + slave_rows_error_report( + INFORMATION_LEVEL, error, rgi, thd, ptr->table, + get_type_str(), RPL_LOG_NAME, log_pos); + my_error(error, MYF(0)); + thd->is_slave_error= 1; + goto err; + } + };); +#endif /* WITH_WSREP */ /* When the open and locking succeeded, we check all tables to ensure that they still have the correct type. diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index c090f373afa..cc18e663c27 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -6699,6 +6699,12 @@ static Sys_var_charptr Sys_wsrep_allowlist( READ_ONLY GLOBAL_VAR(wsrep_allowlist), CMD_LINE(REQUIRED_ARG), DEFAULT("")); +static Sys_var_uint Sys_wsrep_applier_retry_count ( + "wsrep_applier_retry_count", "Maximum number of applier retry attempts", + GLOBAL_VAR(wsrep_applier_retry_count), CMD_LINE(OPT_ARG), + VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG); + #endif /* WITH_WSREP */ static bool fix_host_cache_size(sys_var *, THD *, enum_var_type) diff --git a/sql/transaction.cc b/sql/transaction.cc index 2702cb192c7..4dee7bcbd0f 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -784,3 +784,13 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) DBUG_RETURN(MY_TEST(res)); } + +#ifdef WITH_WSREP +/* check if a named savepoint exists for the current transaction */ +bool trans_savepoint_exists(THD *thd, LEX_CSTRING name) +{ + SAVEPOINT **sv = find_savepoint(thd, Lex_ident_savepoint(name)); + + return (*sv != NULL); +} +#endif /* WITH_WSREP */ diff --git a/sql/transaction.h b/sql/transaction.h index 3b7e2df8807..3472205902e 100644 --- a/sql/transaction.h +++ b/sql/transaction.h @@ -34,6 +34,9 @@ bool trans_rollback_stmt(THD *thd); bool trans_savepoint(THD *thd, LEX_CSTRING name); bool trans_rollback_to_savepoint(THD *thd, LEX_CSTRING name); bool trans_release_savepoint(THD *thd, LEX_CSTRING name); +#ifdef WITH_WSREP +bool trans_savepoint_exists(THD *thd, LEX_CSTRING name); +#endif /* WITH_WSREP */ void trans_reset_one_shot_chistics(THD *thd); diff --git a/sql/wsrep_applier.cc b/sql/wsrep_applier.cc index ee3da59d4f2..87ce66cbff4 100644 --- a/sql/wsrep_applier.cc +++ b/sql/wsrep_applier.cc @@ -128,17 +128,19 @@ void wsrep_store_error(const THD* const thd, dst.size(), dst.size() ? dst.data() : "(null)"); } -int wsrep_apply_events(THD* thd, - Relay_log_info* rli, - const void* events_buf, - size_t buf_len) +static int apply_events(THD* thd, + Relay_log_info* rli, + const void* events_buf, + size_t buf_len, + const LEX_STRING &savepoint, + bool set_savepoint) { char *buf= (char *)events_buf; int rcode= 0; int event= 1; Log_event_type typ; - DBUG_ENTER("wsrep_apply_events"); + DBUG_ENTER("apply_events"); if (!buf_len) WSREP_DEBUG("empty rbr buffer to apply: %lld", (long long) wsrep_thd_trx_seqno(thd)); @@ -148,6 +150,15 @@ int wsrep_apply_events(THD* thd, else thd->variables.gtid_domain_id= global_system_variables.gtid_domain_id; + bool in_trans = thd->in_active_multi_stmt_transaction(); + if (in_trans && set_savepoint) { + if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() && + trans_savepoint(thd, savepoint)) { + rcode = 1; + goto error; + } + } + while (buf_len) { int exec_res; @@ -254,6 +265,19 @@ int wsrep_apply_events(THD* thd, delete ev; goto error; } + + /* Transaction was started by the event, set the savepoint to rollback to + * in case of failure. */ + if (!in_trans && thd->in_active_multi_stmt_transaction()) { + in_trans = true; + if (wsrep_applier_retry_count > 0 && !thd->wsrep_trx().is_streaming() + && set_savepoint && trans_savepoint(thd, savepoint)) { + delete ev; + rcode = 1; + goto error; + } + } + event++; delete_or_keep_event_post_apply(thd->wsrep_rgi, typ, ev); @@ -267,3 +291,53 @@ error: DBUG_RETURN(rcode); } + +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err, + bool const include_msg) +{ + static char savepoint_name[20] = "wsrep_retry"; + static size_t savepoint_name_len = strlen(savepoint_name); + static const LEX_STRING savepoint= { savepoint_name, savepoint_name_len }; + uint n_retries = 0; + bool savepoint_exists = false; + + int ret= apply_events(thd, rli, data.data(), data.size(), savepoint, true); + + while (ret && n_retries < wsrep_applier_retry_count && + (savepoint_exists = trans_savepoint_exists(thd, savepoint))) { + /* applying failed, retry applying events */ + + /* rollback to savepoint without telling Wsrep-lib */ + thd->variables.wsrep_on = false; + if (FALSE != trans_rollback_to_savepoint(thd, savepoint)) { + thd->variables.wsrep_on = true; + break; + } + thd->variables.wsrep_on = true; + + /* reset THD object for retry */ + thd->clear_error(); + thd->reset_for_next_command(); + + /* retry applying events */ + ret= apply_events(thd, rli, data.data(), data.size(), savepoint, false); + n_retries++; + } + + if (savepoint_exists) { + trans_release_savepoint(thd, savepoint); + } + + if (ret || wsrep_thd_has_ignored_error(thd)) + { + if (ret) { + wsrep_store_error(thd, err, include_msg); + } + wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); + } + + return ret; +} diff --git a/sql/wsrep_applier.h b/sql/wsrep_applier.h index e633b1b9bf2..01dd496f7d8 100644 --- a/sql/wsrep_applier.h +++ b/sql/wsrep_applier.h @@ -20,10 +20,11 @@ #include "rpl_rli.h" // Relay_log_info #include "log_event.h" // Format_description_log_event -int wsrep_apply_events(THD* thd, - Relay_log_info* rli, - const void* events_buf, - size_t buf_len); +int wsrep_apply_events(THD* thd, + Relay_log_info* rli, + const wsrep::const_buffer& data, + wsrep::mutable_buffer& err, + bool const include_msg); /* Applier error codes, when nothing better is available. */ #define WSREP_RET_SUCCESS 0 // Success diff --git a/sql/wsrep_high_priority_service.cc b/sql/wsrep_high_priority_service.cc index bb57aa59930..598251c26c6 100644 --- a/sql/wsrep_high_priority_service.cc +++ b/sql/wsrep_high_priority_service.cc @@ -123,23 +123,6 @@ static void wsrep_setup_uk_and_fk_checks(THD* thd) thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS; } -static int apply_events(THD* thd, - Relay_log_info* rli, - const wsrep::const_buffer& data, - wsrep::mutable_buffer& err, - bool const include_msg) -{ - int const ret= wsrep_apply_events(thd, rli, data.data(), data.size()); - if (ret || wsrep_thd_has_ignored_error(thd)) - { - if (ret) - { - wsrep_store_error(thd, err, include_msg); - } - wsrep_dump_rbr_buf_with_header(thd, data.data(), data.size()); - } - return ret; -} /**************************************************************************** High priority service @@ -442,7 +425,7 @@ int Wsrep_high_priority_service::apply_toi(const wsrep::ws_meta& ws_meta, #endif thd->set_time(); - int ret= apply_events(thd, m_rli, data, err, false); + int ret= wsrep_apply_events(thd, m_rli, data, err, false); wsrep_thd_set_ignored_error(thd, false); trans_commit(thd); @@ -610,7 +593,7 @@ int Wsrep_applier_service::apply_write_set(const wsrep::ws_meta& ws_meta, #endif /* ENABLED_DEBUG_SYNC */ wsrep_setup_uk_and_fk_checks(thd); - int ret= apply_events(thd, m_rli, data, err, true); + int ret= wsrep_apply_events(thd, m_rli, data, err, true); thd->close_temporary_tables(); if (!ret && !wsrep::commits_transaction(ws_meta.flags())) @@ -779,7 +762,7 @@ int Wsrep_replayer_service::apply_write_set(const wsrep::ws_meta& ws_meta, ws_meta, thd->wsrep_sr().fragments()); } - ret= ret || apply_events(thd, m_rli, data, err, true); + ret= ret || wsrep_apply_events(thd, m_rli, data, err, true); thd->close_temporary_tables(); if (!ret && !wsrep::commits_transaction(ws_meta.flags())) { diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index d14c069dc1d..25028c3aebe 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -127,6 +127,7 @@ ulong wsrep_trx_fragment_unit= WSREP_FRAG_BYTES; // unit for fragment size ulong wsrep_SR_store_type= WSREP_SR_STORE_TABLE; uint wsrep_ignore_apply_errors= 0; +uint wsrep_applier_retry_count= 0; std::atomic wsrep_thread_create_failed; diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 9be4769b5fc..75d4055499f 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -92,6 +92,7 @@ extern bool wsrep_gtid_mode; extern uint32 wsrep_gtid_domain_id; extern std::atomic wsrep_thread_create_failed; extern ulonglong wsrep_mode; +extern uint wsrep_applier_retry_count; enum enum_wsrep_reject_types { WSREP_REJECT_NONE, /* nothing rejected */ diff --git a/sql/wsrep_schema.cc b/sql/wsrep_schema.cc index f8acd876a5f..29d0f8fbc31 100644 --- a/sql/wsrep_schema.cc +++ b/sql/wsrep_schema.cc @@ -1396,7 +1396,9 @@ static int replay_transaction(THD* thd, { Wsrep_schema_impl::thd_context_switch thd_context_switch(thd, orig_thd); - ret= wsrep_apply_events(orig_thd, rli, buf.ptr(), buf.length()); + wsrep::mutable_buffer unused; + ret= wsrep_apply_events(orig_thd, rli, {buf.c_ptr_quick(), buf.length()}, + unused, false); if (ret) { WSREP_WARN("Wsrep_schema::replay_transaction: failed to apply fragments");