MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail.

Remove the temporary fix for MDEV-5914, which used READ COMMITTED for parallel
replication worker threads. Replace it with a better, more selective solution.

The issue is with certain edge cases of InnoDB gap locks, for example between
INSERT and a ranged DELETE. The gap lock set by the DELETE can block the
INSERT if the DELETE runs first, while the record lock set by the INSERT does
not block the DELETE if the INSERT runs first. Because of this asymmetry, the
two can conflict in parallel replication on the slave even though they ran
without conflicts on the master.
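
The asymmetry can be illustrated with a toy conflict check. This is a minimal sketch under simplifying assumptions: the lock kinds, `must_wait()` and `demo_gap_asymmetry()` are hypothetical, all locks are treated as exclusive, and InnoDB's real lock modes are collapsed into just enough detail to show that a plain gap-lock request never waits, while an insert-intention request waits for a gap lock held by another transaction.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, heavily simplified lock kinds (not InnoDB's real lock modes). */
enum lock_kind {
  REC_LOCK,         /* lock on an existing record only, no gap */
  GAP_LOCK,         /* lock on the gap before a record, e.g. from a ranged DELETE */
  INSERT_INTENTION  /* requested by an INSERT before it inserts into a gap */
};

/*
  Return true if a transaction requesting `req` must wait for a lock of kind
  `held` owned by another transaction on the same record/gap.
*/
bool must_wait(enum lock_kind req, enum lock_kind held)
{
  if (req == GAP_LOCK)
    return false;               /* plain gap locks never wait */
  if (req == INSERT_INTENTION)
    return held == GAP_LOCK;    /* an insert waits for another trx's gap lock */
  return held == REC_LOCK;      /* record vs. record on the same row conflicts */
}

/* The DELETE-vs-INSERT case from the text, in both execution orders. */
void demo_gap_asymmetry(void)
{
  /* DELETE first: the later INSERT must wait for the DELETE's gap lock. */
  assert(must_wait(INSERT_INTENTION, GAP_LOCK));
  /* INSERT first: the later DELETE's gap-lock request does not wait. */
  assert(!must_wait(GAP_LOCK, REC_LOCK));
  assert(!must_wait(GAP_LOCK, INSERT_INTENTION));
}
```

On the slave, this means the INSERT can end up waiting for a gap lock from the DELETE, while the DELETE in turn waits for the INSERT in wait_for_prior_commit(), which is the deadlock described in the removed READ COMMITTED comment further down.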

With this patch, InnoDB asks the server layer about the two involved
transactions before blocking on a gap lock. If the server layer tells InnoDB
that the commit order of the transactions is already fixed, as it is in
parallel replication, InnoDB ignores the gap lock and allows the two
transactions to proceed in parallel, avoiding the conflict.
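
A minimal sketch of that decision, loosely modelled on the new check in lock_rec_has_to_wait() and on the visible parts of thd_need_ordering_with(). The struct `trx_ctx`, its fields, and the functions `need_ordering_with()`, `lock_must_wait()` and `demo_gap_bypass()` are hypothetical stand-ins for THD/rpl_group_info; in particular, the exact conditions under which the real server reports a fixed commit order are an assumption here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the replication state of a transaction. */
struct trx_ctx {
  bool parallel_worker;   /* executed by a parallel replication worker */
  const void *channel;    /* replication channel (cf. rgi->rli) */
  uint32_t domain_id;     /* GTID replication domain */
};

/*
  Server-layer question: must the storage engine still order these two
  transactions itself?  Returns false only if both belong to the same
  parallel replication stream, whose commit order is already fixed.
*/
bool need_ordering_with(const struct trx_ctx *a, const struct trx_ctx *b)
{
  if (a == NULL || b == NULL)
    return true;
  if (!a->parallel_worker || !b->parallel_worker)
    return true;
  if (a->channel != b->channel || a->domain_id != b->domain_id)
    return true;
  return false;
}

/*
  Engine-side wait decision.  `conflict` is the result of the ordinary lock
  compatibility check; `gap_involved` is true if either lock is a gap lock.
*/
bool lock_must_wait(bool conflict, bool gap_involved,
                    const struct trx_ctx *requester,
                    const struct trx_ctx *holder)
{
  if (!conflict)
    return false;
  if (gap_involved && !need_ordering_with(requester, holder))
    return false;   /* commit order already fixed: ignore the gap lock */
  return true;
}

void demo_gap_bypass(void)
{
  int channel = 0;  /* any unique address identifies the channel */
  struct trx_ctx t1 = { true, &channel, 0 };
  struct trx_ctx t2 = { true, &channel, 0 };
  struct trx_ctx app = { false, NULL, 0 };

  assert(!lock_must_wait(true, true, &t1, &t2));  /* ordered workers: no wait */
  assert(lock_must_wait(true, true, &app, &t2));  /* ordinary client still waits */
  assert(lock_must_wait(true, false, &t1, &t2));  /* record locks still wait */
}
```

Only gap-lock waits are skipped: as the InnoDB comment added by this patch notes, such gap locks are mainly needed to get transactions binlogged in a serialisable order, and in parallel replication that order is already fixed by the master.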

Improve the fix for MDEV-6020. When InnoDB itself detects a deadlock, it now
asks the server layer for any preference about which transaction to roll
back. In parallel replication, when two transactions T1 and T2 are fixed to
commit T1 before T2, the server layer asks InnoDB to roll back T2 as the
deadlock victim, not T1. In some cases this avoids excessive deadlock
rollbacks, since T2 must in any case wait for T1 to complete before it can
itself commit.
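
The preference rule can be sketched as follows, using the return convention documented for thd_deadlock_victim_preference() in this patch (-1: first transaction preferred as victim, 1: second, 0: no preference). The struct `deadlock_party` and the functions `victim_preference()` and `demo_victim_preference()` are hypothetical simplifications of THD and rpl_group_info, not the server's actual types.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical summary of one transaction involved in a deadlock. */
struct deadlock_party {
  bool parallel_worker;     /* cf. rgi->is_parallel_exec */
  const void *channel;      /* cf. rgi->rli */
  uint32_t domain_id;       /* cf. rgi->current_gtid.domain_id */
  uint64_t sub_id;          /* position in the fixed commit order */
  bool modified_non_trans;  /* touched non-transactional (e.g. MyISAM) tables */
};

/* -1: roll back t1, 1: roll back t2, 0: let the storage engine choose. */
int victim_preference(const struct deadlock_party *t1,
                      const struct deadlock_party *t2)
{
  /* Same ordered replication stream: the later committer is the victim. */
  if (t1->parallel_worker && t2->parallel_worker &&
      t1->channel == t2->channel && t1->domain_id == t2->domain_id)
    return t1->sub_id < t2->sub_id ? 1 : -1;

  /* Otherwise prefer to roll back the purely transactional one. */
  if (t1->modified_non_trans && !t2->modified_non_trans)
    return 1;
  if (!t1->modified_non_trans && t2->modified_non_trans)
    return -1;
  return 0;
}

void demo_victim_preference(void)
{
  int channel = 0;
  struct deadlock_party T1 = { true, &channel, 0, 100, false };
  struct deadlock_party T2 = { true, &channel, 0, 101, false };

  assert(victim_preference(&T1, &T2) == 1);   /* T2 commits later: roll it back */
  assert(victim_preference(&T2, &T1) == -1);  /* same answer, arguments swapped */
}
```

Picking T1 instead would not break the cycle: after T1 is retried, T2 would still have to wait for T1 to commit.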

Also some misc. fixes found during development and testing:

 - Remove thd_rpl_is_parallel(); it is no longer used or needed.

 - Use KILL_CONNECTION instead of KILL_QUERY when a parallel replication
   worker thread is killed to resolve a deadlock with fixed commit
   ordering. In some cases, e.g. in sql/sql_parse.cc, a KILL_QUERY can be
   ignored if the query otherwise completed successfully, which could cause
   the deadlock kill to be lost, leaving the deadlock unresolved.

 - Fix a random test failure due to a missing wait_for_binlog_checkpoint.inc.

 - Make sure that deadlocks or other temporary errors during parallel
   replication are not printed to the error log; several places in the
   replication code did extra error logging. These conditions can occur
   occasionally and are handled automatically without breaking
   replication, so they should not pollute the error log.

 - Fix handling of rgi->gtid_sub_id. This value must also be accessible at
   the end of a transaction, to be able to detect and resolve deadlocks due to
   commit ordering. But it was also used as a flag to mark whether
   record_gtid() had been called, by being set to zero, which lost the value.
   Now a separate flag, rgi->gtid_pending, is introduced, so rgi->gtid_sub_id
   remains valid for the entire duration of the transaction.

 - Fix one place where the code to handle ignored errors called reset_killed()
   unconditionally, even if no error was caught that should be ignored. This
   could cause loss of a deadlock kill signal, breaking deadlock detection and
   resolution.

 - Add a couple of missing calls to mysql_reset_thd_for_next_command().
   Without them, a prior error condition could remain for the next event
   executed, triggering assertions about an error already being set and
   possibly giving incorrect error handling for subsequent events.

 - Fix code that cleared thd->rgi_slave in the parallel replication worker
   threads after each event execution; this prevented the deadlock detection
   and handling code from correctly recognising the associated transactions
   as belonging to replication worker threads.

 - Remove the unused errcode parameter of slave_background_kill_request().

 - Fix a bug where wfc->wakeup_error was not cleared in
   wait_for_commit::unregister_wait_for_prior_commit(). This could cause the
   error condition to wrongly propagate to a later wait_for_prior_commit(),
   causing spurious ER_PRIOR_COMMIT_FAILED errors.

 - Do not put the binlog background thread into the processlist. It causes
   too many result differences in mtr, and it is probably not useful to
   clutter the process list with a system thread that does not perform any
   user-visible tasks.
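
The gtid_sub_id fix in the list above amounts to no longer overloading a value as a flag. A minimal sketch, assuming a hypothetical `group_state` struct that mirrors only the two relevant rpl_group_info fields, and hypothetical helpers `group_start()` and `group_take_gtid()`:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical slice of per-event-group state (cf. rpl_group_info). */
struct group_state {
  uint64_t gtid_sub_id;  /* commit-order position; valid for the whole group */
  bool gtid_pending;     /* true until record_gtid() has been done */
};

/* A GTID event starts a new event group. */
void group_start(struct group_state *g, uint64_t sub_id)
{
  g->gtid_sub_id = sub_id;
  g->gtid_pending = true;
}

/*
  At COMMIT/XID: return the sub_id to record, or 0 if it was already
  recorded.  Only the flag is cleared; zeroing gtid_sub_id itself (the old
  way of marking "already recorded") would lose the value still needed for
  commit-order deadlock handling.
*/
uint64_t group_take_gtid(struct group_state *g)
{
  if (!g->gtid_pending)
    return 0;
  g->gtid_pending = false;
  return g->gtid_sub_id;
}

void demo_gtid_pending(void)
{
  struct group_state g;
  group_start(&g, 42);
  assert(group_take_gtid(&g) == 42);  /* first commit records the GTID */
  assert(group_take_gtid(&g) == 0);   /* not recorded twice */
  assert(g.gtid_sub_id == 42);        /* still usable for commit ordering */
}
```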

Author: unknown, 2014-06-10 10:13:15 +02:00 (committed by Kristian Nielsen)
Parent: 629b822913
Commit: bd4153a8c2
23 changed files with 273 additions and 179 deletions

@@ -622,7 +622,6 @@ void **thd_ha_data(const MYSQL_THD thd, const struct handlerton *hton);
 void thd_storage_lock_wait(MYSQL_THD thd, long long value);
 int thd_tx_isolation(const MYSQL_THD thd);
 int thd_tx_is_read_only(const MYSQL_THD thd);
-int thd_rpl_is_parallel(const MYSQL_THD thd);
 /**
 Create a temporary file.
@@ -782,6 +781,28 @@ int thd_need_wait_for(const MYSQL_THD thd);
 */
 int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
+/*
+If the storage engine detects a deadlock, and needs to choose a victim
+transaction to roll back, it can call this function to ask the upper
+server layer for which of two possible transactions is prefered to be
+aborted and rolled back.
+In parallel replication, if two transactions are running in parallel and
+one is fixed to commit before the other, then the one that commits later
+will be prefered as the victim - chosing the early transaction as a victim
+will not resolve the deadlock anyway, as the later transaction still needs
+to wait for the earlier to commit.
+Otherwise, a transaction that uses only transactional tables, and can thus
+be safely rolled back, will be prefered as a deadlock victim over a
+transaction that also modified non-transactional (eg. MyISAM) tables.
+The return value is -1 if the first transaction is prefered as a deadlock
+victim, 1 if the second transaction is prefered, or 0 for no preference (in
+which case the storage engine can make the choice as it prefers).
+*/
+int thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2);
 #ifdef __cplusplus
 }
 #endif

@@ -303,7 +303,6 @@ void **thd_ha_data(const void* thd, const struct handlerton *hton);
 void thd_storage_lock_wait(void* thd, long long value);
 int thd_tx_isolation(const void* thd);
 int thd_tx_is_read_only(const void* thd);
-int thd_rpl_is_parallel(const void* thd);
 int mysql_tmpfile(const char *prefix);
 unsigned long thd_get_thread_id(const void* thd);
 void thd_get_xid(const void* thd, MYSQL_XID *xid);
@@ -317,6 +316,7 @@ void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
 void thd_report_wait_for(const void* thd, void *other_thd);
 int thd_need_wait_for(const void* thd);
 int thd_need_ordering_with(const void* thd, const void* other_thd);
+int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
 struct mysql_event_general
 {
 unsigned int event_subclass;

@@ -303,7 +303,6 @@ void **thd_ha_data(const void* thd, const struct handlerton *hton);
 void thd_storage_lock_wait(void* thd, long long value);
 int thd_tx_isolation(const void* thd);
 int thd_tx_is_read_only(const void* thd);
-int thd_rpl_is_parallel(const void* thd);
 int mysql_tmpfile(const char *prefix);
 unsigned long thd_get_thread_id(const void* thd);
 void thd_get_xid(const void* thd, MYSQL_XID *xid);
@@ -317,6 +316,7 @@ void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
 void thd_report_wait_for(const void* thd, void *other_thd);
 int thd_need_wait_for(const void* thd);
 int thd_need_ordering_with(const void* thd, const void* other_thd);
+int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
 #include <mysql/plugin_auth_common.h>
 typedef struct st_plugin_vio_info
 {

@@ -256,7 +256,6 @@ void **thd_ha_data(const void* thd, const struct handlerton *hton);
 void thd_storage_lock_wait(void* thd, long long value);
 int thd_tx_isolation(const void* thd);
 int thd_tx_is_read_only(const void* thd);
-int thd_rpl_is_parallel(const void* thd);
 int mysql_tmpfile(const char *prefix);
 unsigned long thd_get_thread_id(const void* thd);
 void thd_get_xid(const void* thd, MYSQL_XID *xid);
@@ -270,6 +269,7 @@ void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
 void thd_report_wait_for(const void* thd, void *other_thd);
 int thd_need_wait_for(const void* thd);
 int thd_need_ordering_with(const void* thd, const void* other_thd);
+int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
 enum enum_ftparser_mode
 {
 MYSQL_FTPARSER_SIMPLE_MODE= 0,

@@ -314,7 +314,7 @@ SET debug_sync='now WAIT_FOR t1_ready';
 KILL THD_ID;
 SET debug_sync='now WAIT_FOR t2_killed';
 SET debug_sync='now SIGNAL t1_cont';
-include/wait_for_slave_sql_error.inc [errno=1317,1964]
+include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
 STOP SLAVE IO_THREAD;
 SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
 a b
@@ -398,7 +398,7 @@ SET debug_sync='now WAIT_FOR t1_ready';
 KILL THD_ID;
 SET debug_sync='now WAIT_FOR t2_killed';
 SET debug_sync='now SIGNAL t1_cont';
-include/wait_for_slave_sql_error.inc [errno=1317,1964]
+include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
 SET debug_sync='RESET';
 SET GLOBAL slave_parallel_threads=0;
 SET GLOBAL slave_parallel_threads=10;
@@ -481,7 +481,7 @@ SET debug_sync='now WAIT_FOR t1_ready';
 KILL THD_ID;
 SET debug_sync='now WAIT_FOR t2_killed';
 SET debug_sync='now SIGNAL t1_cont';
-include/wait_for_slave_sql_error.inc [errno=1317,1964]
+include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
 SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
 a b
 51 51

@@ -438,7 +438,7 @@ SET debug_sync='now WAIT_FOR t2_killed';
 # Now we can allow T1 to proceed.
 SET debug_sync='now SIGNAL t1_cont';
---let $slave_sql_errno= 1317,1964
+--let $slave_sql_errno= 1317,1927,1964
 --source include/wait_for_slave_sql_error.inc
 STOP SLAVE IO_THREAD;
 SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
@@ -573,7 +573,7 @@ SET debug_sync='now WAIT_FOR t2_killed';
 # Now we can allow T1 to proceed.
 SET debug_sync='now SIGNAL t1_cont';
---let $slave_sql_errno= 1317,1964
+--let $slave_sql_errno= 1317,1927,1964
 --source include/wait_for_slave_sql_error.inc
 # Now we have to disable the debug_sync statements, so they do not trigger
@@ -712,7 +712,7 @@ SET debug_sync='now WAIT_FOR t2_killed';
 # Now we can allow T1 to proceed.
 SET debug_sync='now SIGNAL t1_cont';
---let $slave_sql_errno= 1317,1964
+--let $slave_sql_errno= 1317,1927,1964
 --source include/wait_for_slave_sql_error.inc
 SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
@@ -1277,6 +1277,7 @@ eval SELECT IF('$io_pos' = '$sql_pos', "OK", "Not ok, $io_pos <> $sql_pos") AS t
 --connection server_1
 FLUSH LOGS;
+--source include/wait_for_binlog_checkpoint.inc
 --save_master_pos
 --connection server_2

@@ -6836,7 +6836,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
 /* Interrupted by kill. */
 DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior_killed");
 wfc->wakeup_error= orig_entry->thd->killed_errno();
-if (wfc->wakeup_error)
+if (!wfc->wakeup_error)
 wfc->wakeup_error= ER_QUERY_INTERRUPTED;
 my_message(wfc->wakeup_error, ER(wfc->wakeup_error), MYF(0));
 DBUG_RETURN(-1);

@@ -190,6 +190,28 @@ static const char *HA_ERR(int i)
 return "No Error!";
 }
+/*
+Return true if an error caught during event execution is a temporary error
+that will cause automatic retry of the event group during parallel
+replication, false otherwise.
+In parallel replication, conflicting transactions can occasionally cause
+deadlocks; such errors are handled automatically by rolling back re-trying
+the transactions, so should not pollute the error log.
+*/
+static bool
+is_parallel_retry_error(rpl_group_info *rgi, int err)
+{
+if (!rgi->is_parallel_exec)
+return false;
+if (rgi->killed_for_retry &&
+(err == ER_QUERY_INTERRUPTED || err == ER_CONNECTION_KILLED))
+return true;
+return has_temporary_error(rgi->thd);
+}
 /**
 Error reporting facility for Rows_log_event::do_apply_event
@@ -218,6 +240,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
 const Sql_condition *err;
 Relay_log_info const *rli= rgi->rli;
 buff[0]= 0;
+int errcode= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0;
 /*
 In parallel replication, deadlocks or other temporary errors can happen
@@ -225,8 +248,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
 automatically by re-trying the transactions. So do not pollute the error
 log with messages about them.
 */
-if (rgi->is_parallel_exec &&
-(rgi->killed_for_retry || has_temporary_error(thd)))
+if (is_parallel_retry_error(rgi, errcode))
 return;
 for (err= it++, slider= buff; err && slider < buff_end - 1;
@@ -238,8 +260,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
 }
 if (ha_error != 0)
-rli->report(level, thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0,
-rgi->gtid_info(),
+rli->report(level, errcode, rgi->gtid_info(),
 "Could not execute %s event on table %s.%s;"
 "%s handler error %s; "
 "the event's master log %s, end_log_pos %lu",
@@ -247,8 +268,7 @@ static void inline slave_rows_error_report(enum loglevel level, int ha_error,
 buff, handler_error == NULL ? "<unknown>" : handler_error,
 log_name, pos);
 else
-rli->report(level, thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0,
-rgi->gtid_info(),
+rli->report(level, errcode, rgi->gtid_info(),
 "Could not execute %s event on table %s.%s;"
 "%s the event's master log %s, end_log_pos %lu",
 type, table->s->db.str, table->s->table_name.str,
@@ -4098,7 +4118,8 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
 */
 int error;
 char llbuff[22];
-if ((error= rows_event_stmt_cleanup(rgi, thd)))
+if ((error= rows_event_stmt_cleanup(rgi, thd)) &&
+!is_parallel_retry_error(rgi, error))
 {
 rli->report(ERROR_LEVEL, error, rgi->gtid_info(),
 "Error in cleaning up after an event preceding the commit; "
@@ -4245,22 +4266,24 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
 Record any GTID in the same transaction, so slave state is
 transactionally consistent.
 */
-if (current_stmt_is_commit && (sub_id= rgi->gtid_sub_id))
+if (current_stmt_is_commit && rgi->gtid_pending)
 {
-/* Clear the GTID from the RLI so we don't accidentally reuse it. */
-rgi->gtid_sub_id= 0;
+sub_id= rgi->gtid_sub_id;
+rgi->gtid_pending= false;
 gtid= rgi->current_gtid;
 thd->variables.option_bits&= ~OPTION_GTID_BEGIN;
 if (rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false))
 {
-rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
-rgi->gtid_info(),
-"Error during COMMIT: failed to update GTID state in "
-"%s.%s: %d: %s",
-"mysql", rpl_gtid_slave_state_table_name.str,
-thd->get_stmt_da()->sql_errno(),
-thd->get_stmt_da()->message());
+int errcode= thd->get_stmt_da()->sql_errno();
+if (!is_parallel_retry_error(rgi, errcode))
+rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE,
+rgi->gtid_info(),
+"Error during COMMIT: failed to update GTID state in "
+"%s.%s: %d: %s",
+"mysql", rpl_gtid_slave_state_table_name.str,
+errcode,
+thd->get_stmt_da()->message());
 trans_rollback(thd);
 sub_id= 0;
 thd->is_slave_error= 1;
@@ -4407,18 +4430,21 @@ Default database: '%s'. Query: '%s'",
 {
 DBUG_PRINT("info",("error ignored"));
 clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
-thd->reset_killed();
+if (actual_error == ER_QUERY_INTERRUPTED ||
+actual_error == ER_CONNECTION_KILLED)
+thd->reset_killed();
 }
 /*
 Other cases: mostly we expected no error and get one.
 */
 else if (thd->is_slave_error || thd->is_fatal_error)
 {
-rli->report(ERROR_LEVEL, actual_error, rgi->gtid_info(),
-"Error '%s' on query. Default database: '%s'. Query: '%s'",
-(actual_error ? thd->get_stmt_da()->message() :
-"unexpected success or fatal error"),
-print_slave_db_safe(thd->db), query_arg);
+if (!is_parallel_retry_error(rgi, actual_error))
+rli->report(ERROR_LEVEL, actual_error, rgi->gtid_info(),
+"Error '%s' on query. Default database: '%s'. Query: '%s'",
+(actual_error ? thd->get_stmt_da()->message() :
+"unexpected success or fatal error"),
+print_slave_db_safe(thd->db), query_arg);
 thd->is_slave_error= 1;
 }
@@ -6518,12 +6544,10 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
 thd->variables.server_id= this->server_id;
 thd->variables.gtid_domain_id= this->domain_id;
 thd->variables.gtid_seq_no= this->seq_no;
+mysql_reset_thd_for_next_command(thd);
 if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates)
 {
-/* Need to reset prior "ok" status to give an error. */
-thd->clear_error();
-thd->get_stmt_da()->reset_diagnostics_area();
 if (mysql_bin_log.check_strict_gtid_sequence(this->domain_id,
 this->server_id, this->seq_no))
 return 1;
@@ -7301,35 +7325,34 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
 bool res;
 int err;
 rpl_gtid gtid;
-uint64 sub_id;
+uint64 sub_id= 0;
 Relay_log_info const *rli= rgi->rli;
+mysql_reset_thd_for_next_command(thd);
 /*
 Record any GTID in the same transaction, so slave state is transactionally
 consistent.
 */
-if ((sub_id= rgi->gtid_sub_id))
+if (rgi->gtid_pending)
 {
-/* Clear the GTID from the RLI so we don't accidentally reuse it. */
-rgi->gtid_sub_id= 0;
+sub_id= rgi->gtid_sub_id;
+rgi->gtid_pending= false;
 gtid= rgi->current_gtid;
 err= rpl_global_gtid_slave_state.record_gtid(thd, &gtid, sub_id, true, false);
 if (err)
 {
+int ec= thd->get_stmt_da()->sql_errno();
 /*
 Do not report an error if this is really a kill due to a deadlock.
 In this case, the transaction will be re-tried instead.
 */
-if (rgi->killed_for_retry &&
-thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED)
-return err;
-rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
-"Error during XID COMMIT: failed to update GTID state in "
-"%s.%s: %d: %s",
-"mysql", rpl_gtid_slave_state_table_name.str,
-thd->get_stmt_da()->sql_errno(),
-thd->get_stmt_da()->message());
+if (!is_parallel_retry_error(rgi, ec))
+rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, rgi->gtid_info(),
+"Error during XID COMMIT: failed to update GTID state in "
+"%s.%s: %d: %s",
+"mysql", rpl_gtid_slave_state_table_name.str, ec,
+thd->get_stmt_da()->message());
 trans_rollback(thd);
 thd->is_slave_error= 1;
 return err;
@@ -9650,7 +9673,7 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
 {
 uint actual_error= thd->get_stmt_da()->sql_errno();
 if ((thd->is_slave_error || thd->is_fatal_error) &&
-!(rgi->killed_for_retry && actual_error == ER_QUERY_INTERRUPTED))
+!is_parallel_retry_error(rgi, actual_error))
 {
 /*
 Error reporting borrowed from Query_log_event with many excessive

@@ -65,16 +65,16 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid,
 int
 rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
 {
-uint64 sub_id;
 DBUG_ENTER("rpl_slave_state::record_and_update_gtid");
 /*
 Update the GTID position, if we have it and did not already update
 it in a GTID transaction.
 */
-if ((sub_id= rgi->gtid_sub_id))
+if (rgi->gtid_pending)
 {
-rgi->gtid_sub_id= 0;
+uint64 sub_id= rgi->gtid_sub_id;
+rgi->gtid_pending= false;
 if (rgi->gtid_ignore_duplicate_state!=rpl_group_info::GTID_DUPLICATE_IGNORE)
 {
 if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))

@@ -23,7 +23,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
 Relay_log_info *rli= rgi->rli;
 THD *thd= rgi->thd;
-thd->rgi_slave= rgi;
 thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter;
 /* ToDo: Access to thd, and what about rli, split out a parallel part? */
@@ -35,7 +34,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
 rgi->future_event_relay_log_pos= qev->future_event_relay_log_pos;
 strcpy(rgi->future_event_master_log_name, qev->future_event_master_log_name);
 err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
-thd->rgi_slave= NULL;
 thread_safe_increment64(&rli->executed_entries,
 &slave_executed_entries_lock);
@@ -236,8 +234,9 @@ static void
 convert_kill_to_deadlock_error(rpl_group_info *rgi)
 {
 THD *thd= rgi->thd;
+int err_code= thd->get_stmt_da()->sql_errno();
-if (thd->get_stmt_da()->sql_errno() == ER_QUERY_INTERRUPTED &&
+if ((err_code == ER_QUERY_INTERRUPTED || err_code == ER_CONNECTION_KILLED) &&
 rgi->killed_for_retry)
 {
 thd->clear_error();
@@ -510,39 +509,6 @@ handle_rpl_parallel_thread(void *arg)
 thd->set_time();
 thd->variables.lock_wait_timeout= LONG_TIMEOUT;
 thd->system_thread_info.rpl_sql_info= &sql_info;
-/*
-For now, we need to run the replication parallel worker threads in
-READ COMMITTED. This is needed because gap locks are not symmetric.
-For example, a gap lock from a DELETE blocks an insert intention lock,
-but not vice versa. So an INSERT followed by DELETE can group commit
-on the master, but if we are unlucky with thread scheduling we can
-then deadlock on the slave because the INSERT ends up waiting for a
-gap lock from the DELETE (and the DELETE in turn waits for the INSERT
-in wait_for_prior_commit()). See also MDEV-5914.
-It should be mostly safe to run in READ COMMITTED in the slave anyway.
-The commit order is already fixed from on the master, so we do not
-risk logging into the binlog in an incorrect order between worker
-threads (one that would cause different results if executed on a
-lower-level slave that uses this slave as a master). The only
-potential problem is with transactions run in a different master
-connection (using multi-source replication), or run directly on the
-slave by an application; when using READ COMMITTED we are not
-guaranteed serialisability of binlogged statements.
-In practice, this is unlikely to be an issue. In GTID mode, such
-parallel transactions from multi-source or application must in any
-case use a different replication domain, in which case binlog order
-by definition must be independent between the different domain. Even
-in non-GTID mode, normally one will assume that the external
-transactions are not conflicting with those applied by the slave, so
-that isolation level should make no difference. It would be rather
-strange if the result of applying query events from one master would
-depend on the timing and nature of other queries executed from
-different multi-source connections or done directly on the slave by
-an application. Still, something to be aware of.
-*/
-thd->variables.tx_isolation= ISO_READ_COMMITTED;
 mysql_mutex_lock(&rpt->LOCK_rpl_thread);
 rpt->thd= thd;
@@ -598,7 +564,7 @@ handle_rpl_parallel_thread(void *arg)
 continue;
 }
-group_rgi= rgi;
+thd->rgi_slave= group_rgi= rgi;
 gco= rgi->gco;
 /* Handle a new event group, which will be initiated by a GTID event. */
 if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
@@ -607,7 +573,6 @@ handle_rpl_parallel_thread(void *arg)
 PSI_stage_info old_stage;
 uint64 wait_count;
-thd->tx_isolation= (enum_tx_isolation)thd->variables.tx_isolation;
 in_event_group= true;
 /*
 If the standalone flag is set, then this event group consists of a
@@ -618,9 +583,7 @@ handle_rpl_parallel_thread(void *arg)
 (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
 Gtid_log_event::FL_STANDALONE));
-/* Save this, as it gets cleared when the event group commits. */
 event_gtid_sub_id= rgi->gtid_sub_id;
 rgi->thd= thd;
 /*
@@ -796,7 +759,7 @@ handle_rpl_parallel_thread(void *arg)
 finish_event_group(thd, event_gtid_sub_id, entry, rgi);
 rgi->next= rgis_to_free;
 rgis_to_free= rgi;
-group_rgi= rgi= NULL;
+thd->rgi_slave= group_rgi= rgi= NULL;
 skip_event_group= false;
 DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
 }
@@ -879,7 +842,7 @@ handle_rpl_parallel_thread(void *arg)
 in_event_group= false;
 mysql_mutex_lock(&rpt->LOCK_rpl_thread);
 rpt->free_rgi(group_rgi);
-group_rgi= NULL;
+thd->rgi_slave= group_rgi= NULL;
 skip_event_group= false;
 }
 if (!in_event_group)

@@ -182,7 +182,7 @@ struct rpl_parallel_entry {
 Event groups commit in order, so the rpl_group_info for an event group
 will be alive (at least) as long as
-rpl_grou_info::gtid_sub_id > last_committed_sub_id. This can be used to
+rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
 safely refer back to previous event groups if they are still executing,
 and ignore them if they completed, without requiring explicit
 synchronisation between the threads.

@@ -1563,6 +1563,8 @@ rpl_group_info::reinit(Relay_log_info *rli)
 tables_to_lock_count= 0;
 trans_retries= 0;
 last_event_start_time= 0;
+gtid_sub_id= 0;
+gtid_pending= false;
 worker_error= 0;
 row_stmt_start_timestamp= 0;
 long_find_row_note_printed= false;
@@ -1572,7 +1574,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
 }
 rpl_group_info::rpl_group_info(Relay_log_info *rli)
-: thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
+: thd(0), wait_commit_sub_id(0),
 wait_commit_group_info(0), parallel_entry(0),
 deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
 {
@@ -1606,6 +1608,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
 rgi->current_gtid.server_id= gev->server_id;
 rgi->current_gtid.domain_id= gev->domain_id;
 rgi->current_gtid.seq_no= gev->seq_no;
+rgi->gtid_pending= true;
 return 0;
 }

@@ -609,6 +609,8 @@ struct rpl_group_info
 */
 char future_event_master_log_name[FN_REFLEN];
 bool is_parallel_exec;
+/* When gtid_pending is true, we have not yet done record_gtid(). */
+bool gtid_pending;
 int worker_error;
 /*
 Set true when we signalled that we reach the commit phase. Used to avoid

@@ -288,12 +288,12 @@ static void init_slave_psi_keys(void)
 static bool slave_background_thread_running;
+static bool slave_background_thread_stop;
 static bool slave_background_thread_gtid_loaded;
 struct slave_background_kill_t {
 slave_background_kill_t *next;
 THD *to_kill;
-int errcode;
 } *slave_background_kill_list;
@@ -323,24 +323,21 @@ handle_slave_background(void *arg __attribute__((unused)))
 thd->get_stmt_da()->sql_errno(),
 thd->get_stmt_da()->message());
-mysql_mutex_lock(&LOCK_thread_count);
-threads.append(thd);
+mysql_mutex_lock(&LOCK_slave_background);
 slave_background_thread_gtid_loaded= true;
-mysql_cond_broadcast(&COND_thread_count);
-mysql_mutex_unlock(&LOCK_thread_count);
+mysql_cond_broadcast(&COND_slave_background);
 THD_STAGE_INFO(thd, stage_slave_background_process_request);
 do
 {
 slave_background_kill_t *kill_list;
-mysql_mutex_lock(&LOCK_slave_background);
 thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
 &stage_slave_background_wait_request,
 &old_stage);
 for (;;)
 {
-stop= abort_loop || thd->killed;
+stop= abort_loop || thd->killed || slave_background_thread_stop;
 kill_list= slave_background_kill_list;
 if (stop || kill_list)
 break;
@@ -356,36 +353,34 @@ handle_slave_background(void *arg __attribute__((unused)))
 kill_list= p->next;
 mysql_mutex_lock(&p->to_kill->LOCK_thd_data);
-/* ToDo: mark the p->errcode error code somehow ... ? */
-p->to_kill->awake(KILL_QUERY);
+p->to_kill->awake(KILL_CONNECTION);
 mysql_mutex_unlock(&p->to_kill->LOCK_thd_data);
 my_free(p);
 }
+mysql_mutex_lock(&LOCK_slave_background);
 } while (!stop);
+slave_background_thread_running= false;
+mysql_cond_broadcast(&COND_slave_background);
+mysql_mutex_unlock(&LOCK_slave_background);
 mysql_mutex_lock(&LOCK_thread_count);
 delete thd;
 mysql_mutex_unlock(&LOCK_thread_count);
 my_thread_end();
-mysql_mutex_lock(&LOCK_thread_count);
-slave_background_thread_running= false;
-mysql_cond_broadcast(&COND_thread_count);
-mysql_mutex_unlock(&LOCK_thread_count);
 return 0;
 }
 void
-slave_background_kill_request(THD *to_kill, int errcode)
+slave_background_kill_request(THD *to_kill)
 {
 slave_background_kill_t *p=
 (slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
 if (p)
 {
 p->to_kill= to_kill;
-p->errcode= errcode;
 to_kill->rgi_slave->killed_for_retry= true;
 mysql_mutex_lock(&LOCK_slave_background);
 p->next= slave_background_kill_list;
@@ -417,6 +412,7 @@ start_slave_background_thread()
 pthread_t th;
 slave_background_thread_running= true;
+slave_background_thread_stop= false;
 slave_background_thread_gtid_loaded= false;
 if (mysql_thread_create(key_thread_slave_background,
 &th, &connection_attrib, handle_slave_background,
@@ -426,15 +422,27 @@ start_slave_background_thread()
 return 1;
 }
-mysql_mutex_lock(&LOCK_thread_count);
+mysql_mutex_lock(&LOCK_slave_background);
 while (!slave_background_thread_gtid_loaded)
-mysql_cond_wait(&COND_thread_count, &LOCK_thread_count);
-mysql_mutex_unlock(&LOCK_thread_count);
+mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+mysql_mutex_unlock(&LOCK_slave_background);
 return 0;
 }
+static void
+stop_slave_background_thread()
+{
+mysql_mutex_lock(&LOCK_slave_background);
+slave_background_thread_stop= true;
+mysql_cond_broadcast(&COND_slave_background);
+while (slave_background_thread_running)
+mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
+mysql_mutex_unlock(&LOCK_slave_background);
+}
 /* Initialize slave structures */
 int init_slave()
@@ -1076,6 +1084,9 @@ void end_slave()
 master_info_index= 0;
 active_mi= 0;
 mysql_mutex_unlock(&LOCK_active_mi);
+stop_slave_background_thread();
 global_rpl_thread_pool.destroy();
 free_all_rpl_filters();
 DBUG_VOID_RETURN;
@@ -3399,7 +3410,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
 Make sure we do not errorneously update gtid_slave_pos with a lingering
 GTID from this failed event group (MDEV-4906).
 */
-rgi->gtid_sub_id= 0;
+rgi->gtid_pending= false;
 }
 DBUG_RETURN(exec_res ? 1 : 0);
@@ -4557,6 +4568,7 @@ pthread_handler_t handle_slave_sql(void *arg)
 mysql_mutex_unlock(&rli->log_space_lock);
 serial_rgi->gtid_sub_id= 0;
+serial_rgi->gtid_pending= false;
 if (init_relay_log_pos(rli,
 rli->group_relay_log_name,
 rli->group_relay_log_pos,

@@ -238,7 +238,7 @@ pthread_handler_t handle_slave_io(void *arg);
 void slave_output_error_info(rpl_group_info *rgi, THD *thd);
 pthread_handler_t handle_slave_sql(void *arg);
 bool net_request_file(NET* net, const char* fname);
-void slave_background_kill_request(THD *to_kill, int errcode);
+void slave_background_kill_request(THD *to_kill);
 extern bool volatile abort_loop;
 extern Master_info main_mi, *active_mi; /* active_mi for multi-master */

@@ -4211,16 +4211,17 @@ extern "C" int thd_slave_thread(const MYSQL_THD thd)
 return(thd->slave_thread);
 }
-/* Returns true for a worker thread in parallel replication. */
-extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
-{
-return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
-}
 extern "C" int
 thd_need_wait_for(const MYSQL_THD thd)
 {
-return thd && thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
+rpl_group_info *rgi;
+if (!thd)
+return false;
+rgi= thd->rgi_slave;
+if (!rgi)
+return false;
+return rgi->is_parallel_exec;
 }
 extern "C" void
@@ -4239,7 +4240,7 @@ thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
 return;
 if (rgi->rli != other_rgi->rli)
 return;
-if (!rgi->gtid_sub_id)
+if (!rgi->gtid_sub_id || !other_rgi->gtid_sub_id)
 return;
 if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
 return;
@@ -4255,15 +4256,19 @@ thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
 */
 #ifdef HAVE_REPLICATION
-slave_background_kill_request(other_thd, ER_LOCK_DEADLOCK);
+slave_background_kill_request(other_thd);
 #endif
 }
 extern "C" int
 thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
 {
-rpl_group_info *rgi= thd->rgi_slave;
-rpl_group_info *other_rgi= other_thd->rgi_slave;
+rpl_group_info *rgi, *other_rgi;
+if (!thd || !other_thd)
+return 1;
+rgi= thd->rgi_slave;
+other_rgi= other_thd->rgi_slave;
 if (!rgi || !other_rgi)
 return 1;
 if (!rgi->is_parallel_exec)
@ -4281,6 +4286,46 @@ thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
return 0;
}
extern "C" int
thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2)
{
rpl_group_info *rgi1, *rgi2;
bool nontrans1, nontrans2;
if (!thd1 || !thd2)
return 0;
/*
If the transactions are participating in the same replication domain in
parallel replication, then request to select the one that will commit
later (in the fixed commit order from the master) as the deadlock victim.
*/
rgi1= thd1->rgi_slave;
rgi2= thd2->rgi_slave;
if (rgi1 && rgi2 &&
rgi1->is_parallel_exec &&
rgi1->rli == rgi2->rli &&
rgi1->current_gtid.domain_id == rgi2->current_gtid.domain_id)
return rgi1->gtid_sub_id < rgi2->gtid_sub_id ? 1 : -1;
/*
If one transaction has modified non-transactional tables (so that it
cannot be safely rolled back), and the other has not, then prefer to
select the purely transactional one as the victim.
*/
nontrans1= thd1->transaction.all.modified_non_trans_table;
nontrans2= thd2->transaction.all.modified_non_trans_table;
if (nontrans1 && !nontrans2)
return 1;
else if (!nontrans1 && nontrans2)
return -1;
/* No preferences, let the storage engine decide. */
return 0;
}
extern "C" int thd_non_transactional_update(const MYSQL_THD thd)
{
return(thd->transaction.all.modified_non_trans_table);
@@ -6457,6 +6502,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
this->waitee= NULL;
}
}
wakeup_error= 0;
mysql_mutex_unlock(&LOCK_wait_commit);
}

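The decision order in `thd_deadlock_victim_preference()` can be modelled outside the server. The sketch below is a simplified, hypothetical stand-in: `RplGroupInfo`, `Thd` and `deadlock_victim_preference()` are illustration-only types and names that mirror just the fields the real function reads (`rli`, `is_parallel_exec`, `current_gtid.domain_id`, `gtid_sub_id`, `transaction.all.modified_non_trans_table`); they are not the server's actual structures. A positive result means "prefer `thd2` as the victim", a negative one "prefer `thd1`", and 0 means no preference, which is how `trx_weight_ge()` interprets the value.

```cpp
#include <cstdint>

// Hypothetical, simplified stand-in for rpl_group_info: only the fields
// that thd_deadlock_victim_preference() inspects.
struct RplGroupInfo {
  const void *rli;        // replication channel (relay log info) identity
  bool is_parallel_exec;  // executed by a parallel replication worker
  uint32_t domain_id;     // current_gtid.domain_id
  uint64_t gtid_sub_id;   // position in the fixed commit order
};

// Hypothetical, simplified stand-in for THD.
struct Thd {
  const RplGroupInfo *rgi_slave;  // nullptr for non-replication threads
  bool modified_non_trans_table;  // transaction.all.modified_non_trans_table
};

// > 0: prefer thd2 as the deadlock victim
// < 0: prefer thd1 as the deadlock victim
//   0: no preference, let the storage engine decide
int deadlock_victim_preference(const Thd *thd1, const Thd *thd2) {
  if (!thd1 || !thd2)
    return 0;

  // Same channel and replication domain in parallel replication: roll back
  // the transaction that commits later in the master's order.
  const RplGroupInfo *rgi1 = thd1->rgi_slave;
  const RplGroupInfo *rgi2 = thd2->rgi_slave;
  if (rgi1 && rgi2 && rgi1->is_parallel_exec && rgi1->rli == rgi2->rli &&
      rgi1->domain_id == rgi2->domain_id)
    return rgi1->gtid_sub_id < rgi2->gtid_sub_id ? 1 : -1;

  // Otherwise prefer the purely transactional one as the victim, since a
  // transaction that touched non-transactional tables cannot be fully
  // rolled back.
  const bool nontrans1 = thd1->modified_non_trans_table;
  const bool nontrans2 = thd2->modified_non_trans_table;
  if (nontrans1 && !nontrans2)
    return 1;
  if (!nontrans1 && nontrans2)
    return -1;
  return 0;
}
```

For two workers on the same channel and domain with `gtid_sub_id` 10 and 11, calling the sketch with the first as `thd1` returns 1, so the worker that must commit later is the one rolled back; it would have had to wait for the other to commit anyway.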
@@ -1741,6 +1741,8 @@ struct wait_for_commit
{
if (waitee)
unregister_wait_for_prior_commit2();
else
wakeup_error= 0;
}
/*
Remove a waiter from the list in the waitee. Used to unregister a wait.

@@ -4274,14 +4274,11 @@ handler::Table_flags
ha_innobase::table_flags() const
/*============================*/
{
THD *thd = ha_thd();
/* Need to use tx_isolation here since table flags is (also)
called before prebuilt is inited. */
ulong const tx_isolation = thd_tx_isolation(thd);
ulong const tx_isolation = thd_tx_isolation(ha_thd());
if (tx_isolation <= ISO_READ_COMMITTED &&
!(tx_isolation == ISO_READ_COMMITTED &&
thd_rpl_is_parallel(thd))) {
if (tx_isolation <= ISO_READ_COMMITTED) {
return(int_table_flags);
}

@@ -1016,6 +1016,28 @@ lock_rec_has_to_wait(
return(FALSE);
}
if ((type_mode & LOCK_GAP || lock_rec_get_gap(lock2)) &&
!thd_need_ordering_with(trx->mysql_thd,
lock2->trx->mysql_thd)) {
/* If the upper server layer has already decided on the
commit order between the transaction requesting the
lock and the transaction owning the lock, we do not
need to wait for gap locks. Such ordering by the upper
server layer happens in parallel replication, where the
commit order is fixed to match the original order on the
master.
Such gap locks are mainly needed to get serialisability
between transactions so that they will be binlogged in
the correct order so that statement-based replication
will give the correct results. Since the right order
was already determined on the master, we do not need
to enforce it again here (and doing so could lead to
occasional deadlocks). */
return (FALSE);
}
return(TRUE);
}

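The gap-lock short-circuit added to `lock_rec_has_to_wait()` reduces to a small predicate. This is a sketch under stated assumptions: `rec_lock_has_to_wait()` is a hypothetical helper, `conflicts` stands for the outcome of the lock-mode compatibility checks that run before the new condition (not modelled here), and `commit_order_fixed` stands for `!thd_need_ordering_with(trx->mysql_thd, lock2->trx->mysql_thd)`.

```cpp
// Hypothetical helper modelling the condition added to lock_rec_has_to_wait().
//   conflicts          : outcome of the mode/compatibility checks that run
//                        before the new condition (not modelled here)
//   request_is_gap     : type_mode & LOCK_GAP on the requested lock
//   held_is_gap        : lock_rec_get_gap(lock2) on the held lock
//   commit_order_fixed : !thd_need_ordering_with(requester, holder)
bool rec_lock_has_to_wait(bool conflicts, bool request_is_gap,
                          bool held_is_gap, bool commit_order_fixed) {
  if (!conflicts)
    return false;
  // Gap locks are mainly there to serialise transactions for statement-based
  // binlogging; if the commit order is already fixed, skip the wait.
  if ((request_is_gap || held_is_gap) && commit_order_fixed)
    return false;
  return true;
}
```

In the INSERT vs. ranged DELETE case from the commit message, InnoDB flags the INSERT's insert-intention request with `LOCK_GAP`, so `request_is_gap` is true and, between two workers whose commit order is already fixed, the INSERT no longer blocks on the DELETE's lock. Ordinary record-lock conflicts are unaffected.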
@@ -1903,9 +1903,8 @@ trx_assert_started(
#endif /* UNIV_DEBUG */
/*******************************************************************//**
Compares the "weight" (or size) of two transactions. Transactions that
have edited non-transactional tables are considered heavier than ones
that have not.
Compares the "weight" (or size) of two transactions. The heavier the weight,
the more reluctant we will be to choose the transaction as a deadlock victim.
@return TRUE if weight(a) >= weight(b) */
UNIV_INTERN
ibool
@@ -1914,26 +1913,18 @@ trx_weight_ge(
const trx_t* a, /*!< in: the first transaction to be compared */
const trx_t* b) /*!< in: the second transaction to be compared */
{
ibool a_notrans_edit;
ibool b_notrans_edit;
int pref;
/* If mysql_thd is NULL for a transaction we assume that it has
not edited non-transactional tables. */
/* First ask the upper server layer if it has any preference for which
to prefer as a deadlock victim. */
pref= thd_deadlock_victim_preference(a->mysql_thd, b->mysql_thd);
if (pref < 0)
return FALSE;
else if (pref > 0)
return TRUE;
a_notrans_edit = a->mysql_thd != NULL
&& thd_has_edited_nontrans_tables(a->mysql_thd);
b_notrans_edit = b->mysql_thd != NULL
&& thd_has_edited_nontrans_tables(b->mysql_thd);
if (a_notrans_edit != b_notrans_edit) {
return(a_notrans_edit);
}
/* Either both had edited non-transactional tables or both had
not, we fall back to comparing the number of altered/locked
rows. */
/* Upper server layer had no preference, we fall back to comparing the
number of altered/locked rows. */
#if 0
fprintf(stderr,

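The new `trx_weight_ge()` first consults the server layer and only then falls back to comparing transaction sizes. The sketch below is a hypothetical simplification: `Trx`, `weight_ge()` and `choose_victim()` are illustration-only names, `pref` is the value `thd_deadlock_victim_preference(a->mysql_thd, b->mysql_thd)` would return, and the fallback weight is assumed to be modified rows plus locks held, approximating the "number of altered/locked rows" the comment mentions. The lighter transaction is the one chosen as the deadlock victim.

```cpp
#include <cstdint>

// Hypothetical, simplified transaction: only the sizes used by the fallback.
struct Trx {
  uint64_t modified_rows;  // rows altered (undo records)
  uint64_t locks_held;     // locks owned by the transaction
};

// pref is what thd_deadlock_victim_preference(a, b) returned:
// > 0 prefer b as victim, < 0 prefer a as victim, 0 no preference.
// Returns true if weight(a) >= weight(b); heavier means less likely to be
// chosen as the deadlock victim.
bool weight_ge(const Trx &a, const Trx &b, int pref) {
  if (pref < 0)
    return false;
  if (pref > 0)
    return true;
  // No server-layer preference: compare the number of altered/locked rows.
  return a.modified_rows + a.locks_held >= b.modified_rows + b.locks_held;
}

// Returns 0 if a should be rolled back, 1 if b should be.
int choose_victim(const Trx &a, const Trx &b, int pref) {
  return weight_ge(a, b, pref) ? 1 : 0;
}
```

Because the fallback uses `>=`, equal weights with no preference make `a` the heavier one, so `b` is rolled back. A non-zero `pref` overrides the size comparison entirely, which is what lets a small but later-committing replication worker be picked over a larger one.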
@@ -4732,14 +4732,11 @@ handler::Table_flags
ha_innobase::table_flags() const
/*============================*/
{
THD *thd = ha_thd();
/* Need to use tx_isolation here since table flags is (also)
called before prebuilt is inited. */
ulong const tx_isolation = thd_tx_isolation(thd);
ulong const tx_isolation = thd_tx_isolation(ha_thd());
if (tx_isolation <= ISO_READ_COMMITTED &&
!(tx_isolation == ISO_READ_COMMITTED &&
thd_rpl_is_parallel(thd))) {
if (tx_isolation <= ISO_READ_COMMITTED) {
return(int_table_flags);
}

@@ -1017,6 +1017,28 @@ lock_rec_has_to_wait(
return(FALSE);
}
if ((type_mode & LOCK_GAP || lock_rec_get_gap(lock2)) &&
!thd_need_ordering_with(trx->mysql_thd,
lock2->trx->mysql_thd)) {
/* If the upper server layer has already decided on the
commit order between the transaction requesting the
lock and the transaction owning the lock, we do not
need to wait for gap locks. Such ordering by the upper
server layer happens in parallel replication, where the
commit order is fixed to match the original order on the
master.
Such gap locks are mainly needed to get serialisability
between transactions so that they will be binlogged in
the correct order so that statement-based replication
will give the correct results. Since the right order
was already determined on the master, we do not need
to enforce it again here (and doing so could lead to
occasional deadlocks). */
return (FALSE);
}
return(TRUE);
}

@@ -2150,26 +2150,18 @@ trx_weight_ge(
const trx_t* a, /*!< in: the first transaction to be compared */
const trx_t* b) /*!< in: the second transaction to be compared */
{
ibool a_notrans_edit;
ibool b_notrans_edit;
int pref;
/* If mysql_thd is NULL for a transaction we assume that it has
not edited non-transactional tables. */
/* First ask the upper server layer if it has any preference for which
to prefer as a deadlock victim. */
pref= thd_deadlock_victim_preference(a->mysql_thd, b->mysql_thd);
if (pref < 0)
return FALSE;
else if (pref > 0)
return TRUE;
a_notrans_edit = a->mysql_thd != NULL
&& thd_has_edited_nontrans_tables(a->mysql_thd);
b_notrans_edit = b->mysql_thd != NULL
&& thd_has_edited_nontrans_tables(b->mysql_thd);
if (a_notrans_edit != b_notrans_edit) {
return(a_notrans_edit);
}
/* Either both had edited non-transactional tables or both had
not, we fall back to comparing the number of altered/locked
rows. */
/* Upper server layer had no preference, we fall back to comparing the
number of altered/locked rows. */
#if 0
fprintf(stderr,