MDEV-30780 optimistic parallel slave hangs after hit an error
The hang could be seen as show slave status displaying an error like Last_Error: Could not execute Write_rows_v1 along with Slave_SQL_Running: Yes accompanied with one of the replication threads in show-processlist characteristically having status like 2394 | system user | | NULL | Slave_worker | 50852| closing tables It turns out that closing tables worker got entrapped in endless looping in mark_start_commit_inner() across already garbage-collected gco items. The reclaimed gco links are explained with actually possible out-of-order groups of events termination due to the Last_Error. This patch reinforces the correct ordering to perform finish_event_group's cleanup actions, incl unlinking gco:s from the active list.
This commit is contained in:
parent
dfdcd7ffab
commit
d4339620be
@ -0,0 +1,81 @@
|
|||||||
|
include/rpl_init.inc [topology=1->2]
|
||||||
|
call mtr.add_suppression("Slave: Commit failed due to failure of an earlier commit");
|
||||||
|
call mtr.add_suppression("Slave: Duplicate entry '99'");
|
||||||
|
connection server_1;
|
||||||
|
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
|
||||||
|
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
|
||||||
|
INSERT INTO t1 VALUES(1,1);
|
||||||
|
INSERT INTO t1 VALUES(2,1);
|
||||||
|
INSERT INTO t1 VALUES(3,1);
|
||||||
|
INSERT INTO t1 VALUES(4,1);
|
||||||
|
connection server_2;
|
||||||
|
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
|
||||||
|
include/stop_slave.inc
|
||||||
|
SET @old_debug_dbug = @@global.debug_dbug;
|
||||||
|
SET @@global.debug_dbug = "d,hold_worker2_favor_worker3";
|
||||||
|
SET GLOBAL slave_parallel_threads=4;
|
||||||
|
CHANGE MASTER TO master_use_gtid=slave_pos;
|
||||||
|
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
|
||||||
|
SET GLOBAL slave_parallel_mode='optimistic';
|
||||||
|
connection server_1;
|
||||||
|
SET @@gtid_seq_no = 2001;
|
||||||
|
BEGIN;
|
||||||
|
UPDATE t1 SET b = 11 WHERE a = 4;
|
||||||
|
UPDATE t1 SET b = 11 WHERE a = 3;
|
||||||
|
UPDATE t1 SET a = 99 WHERE a = 1;
|
||||||
|
COMMIT;
|
||||||
|
UPDATE t1 SET b = 2 WHERE a = 2;
|
||||||
|
UPDATE t1 SET b = 3 WHERE a = 3;
|
||||||
|
DROP TABLE IF EXISTS phantom_1;
|
||||||
|
Warnings:
|
||||||
|
Note 1051 Unknown table 'test.phantom_1'
|
||||||
|
include/save_master_gtid.inc
|
||||||
|
connect slave_local_0, 127.0.0.1, root,, test, $SLAVE_MYPORT,;
|
||||||
|
begin;
|
||||||
|
UPDATE t1 set b = 11 where a = 4;
|
||||||
|
connect slave_local_1, 127.0.0.1, root,, test, $SLAVE_MYPORT,;
|
||||||
|
begin;
|
||||||
|
INSERT INTO t1 VALUES (99, 11);
|
||||||
|
connect slave_local_2, 127.0.0.1, root,, test, $SLAVE_MYPORT,;
|
||||||
|
begin;
|
||||||
|
UPDATE t1 SET b = 12 WHERE a = 2;
|
||||||
|
connect slave_local_3, 127.0.0.1, root,, test, $SLAVE_MYPORT,;
|
||||||
|
begin;
|
||||||
|
UPDATE t1 SET b = 13 WHERE a = 3;
|
||||||
|
connection server_2;
|
||||||
|
include/start_slave.inc
|
||||||
|
# W4 is waiting to start its DROP
|
||||||
|
connection slave_local_3;
|
||||||
|
rollback;
|
||||||
|
connection slave_local_0;
|
||||||
|
rollback;
|
||||||
|
SELECT count(*) = 0 as "W3 undid its commit state" FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to commit%";
|
||||||
|
W3 undid its commit state
|
||||||
|
1
|
||||||
|
connection slave_local_2;
|
||||||
|
rollback;
|
||||||
|
connection slave_local_1;
|
||||||
|
commit;
|
||||||
|
SELECT COUNT(*) = 1 as "W4 remains with the same status" FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to start commit%";
|
||||||
|
W4 remains with the same status
|
||||||
|
1
|
||||||
|
# Slave_SQL_Running YES = Yes
|
||||||
|
# while W2 is held back ...
|
||||||
|
SET DEBUG_SYNC = 'now SIGNAL cont_worker2';
|
||||||
|
include/wait_for_slave_sql_error.inc [errno=1062]
|
||||||
|
DELETE FROM t1 WHERE a=99;
|
||||||
|
include/start_slave.inc
|
||||||
|
include/sync_with_master_gtid.inc
|
||||||
|
connection server_2;
|
||||||
|
include/stop_slave.inc
|
||||||
|
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
|
||||||
|
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
||||||
|
SET @@global.debug_dbug = @old_debug_dbug;
|
||||||
|
SET debug_sync = RESET;
|
||||||
|
include/start_slave.inc
|
||||||
|
connection server_1;
|
||||||
|
DROP TABLE t1;
|
||||||
|
include/save_master_gtid.inc
|
||||||
|
connection server_2;
|
||||||
|
include/sync_with_master_gtid.inc
|
||||||
|
include/rpl_end.inc
|
180
mysql-test/suite/rpl/t/rpl_parallel_optimistic_error_stop.test
Normal file
180
mysql-test/suite/rpl/t/rpl_parallel_optimistic_error_stop.test
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
--source include/have_innodb.inc
|
||||||
|
--source include/have_debug.inc
|
||||||
|
--source include/have_debug_sync.inc
|
||||||
|
--let $rpl_topology=1->2
|
||||||
|
--source include/rpl_init.inc
|
||||||
|
|
||||||
|
call mtr.add_suppression("Slave: Commit failed due to failure of an earlier commit");
|
||||||
|
call mtr.add_suppression("Slave: Duplicate entry '99'");
|
||||||
|
|
||||||
|
--connection server_1
|
||||||
|
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
|
||||||
|
CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB;
|
||||||
|
INSERT INTO t1 VALUES(1,1); # hit a dup entry on slave
|
||||||
|
INSERT INTO t1 VALUES(2,1); # races to "win" the last exit
|
||||||
|
INSERT INTO t1 VALUES(3,1);
|
||||||
|
INSERT INTO t1 VALUES(4,1); # make W3 race over W1
|
||||||
|
--save_master_pos
|
||||||
|
|
||||||
|
--connection server_2
|
||||||
|
--sync_with_master
|
||||||
|
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
|
||||||
|
--source include/stop_slave.inc
|
||||||
|
SET @old_debug_dbug = @@global.debug_dbug;
|
||||||
|
# In a group of W1,W2,W3 of the same batch W2 simulates slowness.
|
||||||
|
SET @@global.debug_dbug = "d,hold_worker2_favor_worker3";
|
||||||
|
SET GLOBAL slave_parallel_threads=4;
|
||||||
|
CHANGE MASTER TO master_use_gtid=slave_pos;
|
||||||
|
SET @old_parallel_mode=@@GLOBAL.slave_parallel_mode;
|
||||||
|
SET GLOBAL slave_parallel_mode='optimistic';
|
||||||
|
|
||||||
|
# MDEV-30780 optimistic parallel slave hangs after hit an error
|
||||||
|
# Test workers hang scenario to prove it's no more neither
|
||||||
|
# out-of-order access to the active gco list.
|
||||||
|
#
|
||||||
|
# The test provides how to reproduce on the OLD version, false by default.
|
||||||
|
# That branch approximates the original hang with an assert that
|
||||||
|
# confirms the OLD version indeed could access already reclaimed gco.
|
||||||
|
--let $old_version_regression=0
|
||||||
|
|
||||||
|
|
||||||
|
--connection server_1
|
||||||
|
|
||||||
|
# Let W1,W2,W3,W4 parallel workers that are going to execute
|
||||||
|
# the following transaction.
|
||||||
|
# W1 holds on with the 1st statement
|
||||||
|
# then crashes W3 with the 2nd into retry,
|
||||||
|
# finally hits with the 3rd a dup entry, on slave.
|
||||||
|
SET @@gtid_seq_no = 2001;
|
||||||
|
BEGIN;
|
||||||
|
UPDATE t1 SET b = 11 WHERE a = 4;
|
||||||
|
UPDATE t1 SET b = 11 WHERE a = 3;
|
||||||
|
UPDATE t1 SET a = 99 WHERE a = 1;
|
||||||
|
COMMIT;
|
||||||
|
# In the buggy version W2 races to "win" the exit last (of W1..3)
|
||||||
|
# and by that to access last a gco struct, garbage-collected.
|
||||||
|
UPDATE t1 SET b = 2 WHERE a = 2;
|
||||||
|
# W3 garbage-collects the gco struct in the buggy version.
|
||||||
|
UPDATE t1 SET b = 3 WHERE a = 3;
|
||||||
|
# W4 resides in following "singleton" batch to a W2 replacement
|
||||||
|
# in the buggy version to allow W3 reclaim the batch's gco.
|
||||||
|
DROP TABLE IF EXISTS phantom_1;
|
||||||
|
|
||||||
|
--source include/save_master_gtid.inc
|
||||||
|
|
||||||
|
--connect (slave_local_0, 127.0.0.1, root,, test, $SLAVE_MYPORT,)
|
||||||
|
begin;
|
||||||
|
UPDATE t1 set b = 11 where a = 4;
|
||||||
|
--connect (slave_local_1, 127.0.0.1, root,, test, $SLAVE_MYPORT,)
|
||||||
|
begin;
|
||||||
|
INSERT INTO t1 VALUES (99, 11);
|
||||||
|
|
||||||
|
--connect (slave_local_2, 127.0.0.1, root,, test, $SLAVE_MYPORT,)
|
||||||
|
begin;
|
||||||
|
UPDATE t1 SET b = 12 WHERE a = 2;
|
||||||
|
|
||||||
|
--connect (slave_local_3, 127.0.0.1, root,, test, $SLAVE_MYPORT,)
|
||||||
|
begin;
|
||||||
|
UPDATE t1 SET b = 13 WHERE a = 3;
|
||||||
|
|
||||||
|
--connection server_2
|
||||||
|
--source include/start_slave.inc
|
||||||
|
|
||||||
|
--echo # W4 is waiting to start its DROP
|
||||||
|
|
||||||
|
--let $wait_condition= SELECT count(*) = 1 FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to start commit%"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
|
||||||
|
--connection slave_local_3
|
||||||
|
# make W3 to set E.cc <- 1
|
||||||
|
rollback;
|
||||||
|
--let $wait_condition= SELECT count(*) = 1 FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to commit%"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
|
||||||
|
--connection slave_local_0
|
||||||
|
# make W3 into retry and delay it to let W1 hit a dupicate error first,
|
||||||
|
# see 'commit' by slave_local_1.
|
||||||
|
rollback;
|
||||||
|
--let $wait_condition= SELECT count(*) = 1 FROM information_schema.processlist WHERE state LIKE "debug sync point: now"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
SELECT count(*) = 0 as "W3 undid its commit state" FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to commit%";
|
||||||
|
|
||||||
|
|
||||||
|
--connection slave_local_2
|
||||||
|
rollback;
|
||||||
|
# wait for W2 to start committing E.cc <- 2
|
||||||
|
--let $wait_condition= SELECT count(*) = 1 FROM information_schema.processlist WHERE state like "Waiting for prior transaction to commit"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
|
||||||
|
--connection slave_local_1
|
||||||
|
|
||||||
|
# W1 errors out
|
||||||
|
# A. to alert W3
|
||||||
|
# B. W3 will *not* wake up W4 in the fixed version, having to wait for W2 demise.
|
||||||
|
# C. W2 will notify W3 that releases W4 as it would do in normal cases.
|
||||||
|
commit;
|
||||||
|
|
||||||
|
if (!$old_version_regression)
|
||||||
|
{
|
||||||
|
# A. In the fixed version show-processlist W4 is still in the ordered waiting
|
||||||
|
SELECT COUNT(*) = 1 as "W4 remains with the same status" FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to start commit%";
|
||||||
|
--let $status= query_get_value("show slave status", Slave_SQL_Running, 1)
|
||||||
|
--echo # Slave_SQL_Running YES = $status
|
||||||
|
|
||||||
|
# B. In the fixed version W3 is waiting for W2,...
|
||||||
|
--let $wait_condition= SELECT count(*) = 1 as "W4 is waiting" FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to commit%"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
--echo # while W2 is held back ...
|
||||||
|
--let $wait_condition= SELECT count(*) = 1 as "W2 simulates slowness" FROM information_schema.processlist WHERE state LIKE "debug sync point: now"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
|
||||||
|
# C. # ...until NOW.
|
||||||
|
SET DEBUG_SYNC = 'now SIGNAL cont_worker2';
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
# To reproduce the hang on the OLD version ...
|
||||||
|
if ($old_version_regression)
|
||||||
|
{
|
||||||
|
# replace the actual fixes block with checking W3,W4 have actually committed,
|
||||||
|
# followed by signaling to W2 like on behalf of W4 which would end up in the hang.
|
||||||
|
--let $wait_condition= SELECT COUNT(*) = 0 as "W4 has moved on" FROM information_schema.processlist WHERE state like "Waiting for prior transaction to start commit"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
--let $wait_condition= SELECT count(*) = 0 as "W3 does not wait on W2" FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to commit%"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
|
||||||
|
--let $wait_condition= SELECT count(*) = 1 as "W2 simulates slowness" FROM information_schema.processlist WHERE state LIKE "debug sync point: now"
|
||||||
|
--source include/wait_condition.inc
|
||||||
|
|
||||||
|
# Like above, but signaling is done after W4 is done to violate the commit order
|
||||||
|
# that must fire a debug assert.
|
||||||
|
SET DEBUG_SYNC = 'now SIGNAL cont_worker2';
|
||||||
|
}
|
||||||
|
|
||||||
|
--let $slave_sql_errno= 1062
|
||||||
|
--source include/wait_for_slave_sql_error.inc
|
||||||
|
|
||||||
|
# Restore the slave data and resume with replication
|
||||||
|
DELETE FROM t1 WHERE a=99;
|
||||||
|
--source include/start_slave.inc
|
||||||
|
--source include/sync_with_master_gtid.inc
|
||||||
|
|
||||||
|
#
|
||||||
|
# Clean up.
|
||||||
|
#
|
||||||
|
--connection server_2
|
||||||
|
--source include/stop_slave.inc
|
||||||
|
SET GLOBAL slave_parallel_mode=@old_parallel_mode;
|
||||||
|
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
||||||
|
SET @@global.debug_dbug = @old_debug_dbug;
|
||||||
|
SET debug_sync = RESET;
|
||||||
|
--source include/start_slave.inc
|
||||||
|
|
||||||
|
--connection server_1
|
||||||
|
DROP TABLE t1;
|
||||||
|
--source include/save_master_gtid.inc
|
||||||
|
|
||||||
|
--connection server_2
|
||||||
|
--source include/sync_with_master_gtid.inc
|
||||||
|
|
||||||
|
--source include/rpl_end.inc
|
@ -261,6 +261,12 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
|
|||||||
STRING_WITH_LEN("now WAIT_FOR proceed_by_1000"));
|
STRING_WITH_LEN("now WAIT_FOR proceed_by_1000"));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
DBUG_EXECUTE_IF("hold_worker2_favor_worker3", {
|
||||||
|
if (rgi->current_gtid.seq_no == 2001) {
|
||||||
|
DBUG_ASSERT(!rgi->worker_error || entry->stop_on_error_sub_id == sub_id);
|
||||||
|
debug_sync_set_action(thd, STRING_WITH_LEN("now SIGNAL cont_worker3"));
|
||||||
|
}
|
||||||
|
});
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
|
if (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
|
||||||
@ -284,6 +290,11 @@ signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi, int err)
|
|||||||
In case we get an error during commit, inform following transactions that
|
In case we get an error during commit, inform following transactions that
|
||||||
we aborted our commit.
|
we aborted our commit.
|
||||||
*/
|
*/
|
||||||
|
DBUG_EXECUTE_IF("hold_worker2_favor_worker3", {
|
||||||
|
if (rgi->current_gtid.seq_no == 2002) {
|
||||||
|
debug_sync_set_action(thd, STRING_WITH_LEN("now WAIT_FOR cont_worker2"));
|
||||||
|
}});
|
||||||
|
|
||||||
rgi->unmark_start_commit();
|
rgi->unmark_start_commit();
|
||||||
rgi->cleanup_context(thd, true);
|
rgi->cleanup_context(thd, true);
|
||||||
rgi->rli->abort_slave= true;
|
rgi->rli->abort_slave= true;
|
||||||
@ -788,7 +799,14 @@ do_retry:
|
|||||||
thd->reset_killed();
|
thd->reset_killed();
|
||||||
thd->clear_error();
|
thd->clear_error();
|
||||||
rgi->killed_for_retry = rpl_group_info::RETRY_KILL_NONE;
|
rgi->killed_for_retry = rpl_group_info::RETRY_KILL_NONE;
|
||||||
|
#ifdef ENABLED_DEBUG_SYNC
|
||||||
|
DBUG_EXECUTE_IF("hold_worker2_favor_worker3", {
|
||||||
|
if (rgi->current_gtid.seq_no == 2003) {
|
||||||
|
debug_sync_set_action(thd,
|
||||||
|
STRING_WITH_LEN("now WAIT_FOR cont_worker3"));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
#endif
|
||||||
/*
|
/*
|
||||||
If we retry due to a deadlock kill that occurred during the commit step, we
|
If we retry due to a deadlock kill that occurred during the commit step, we
|
||||||
might have already updated (but not committed) an update of table
|
might have already updated (but not committed) an update of table
|
||||||
@ -806,15 +824,12 @@ do_retry:
|
|||||||
for (;;)
|
for (;;)
|
||||||
{
|
{
|
||||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||||
if (entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
|
register_wait_for_prior_event_group_commit(rgi, entry);
|
||||||
|
if (!(entry->stop_on_error_sub_id == (uint64) ULONGLONG_MAX ||
|
||||||
#ifndef DBUG_OFF
|
#ifndef DBUG_OFF
|
||||||
(DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)) ||
|
(DBUG_EVALUATE_IF("simulate_mdev_12746", 1, 0)) ||
|
||||||
#endif
|
#endif
|
||||||
rgi->gtid_sub_id < entry->stop_on_error_sub_id)
|
rgi->gtid_sub_id < entry->stop_on_error_sub_id))
|
||||||
{
|
|
||||||
register_wait_for_prior_event_group_commit(rgi, entry);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
A failure of a preceeding "parent" transaction may not be
|
A failure of a preceeding "parent" transaction may not be
|
||||||
@ -1991,6 +2006,9 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
|
|||||||
gco->prior_sub_id= prior_sub_id;
|
gco->prior_sub_id= prior_sub_id;
|
||||||
gco->installed= false;
|
gco->installed= false;
|
||||||
gco->flags= 0;
|
gco->flags= 0;
|
||||||
|
#ifndef DBUG_OFF
|
||||||
|
gco->gc_done= false;
|
||||||
|
#endif
|
||||||
return gco;
|
return gco;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1998,6 +2016,10 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
|
|||||||
void
|
void
|
||||||
rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
|
rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
|
||||||
{
|
{
|
||||||
|
#ifndef DBUG_OFF
|
||||||
|
DBUG_ASSERT(!gco->gc_done);
|
||||||
|
gco->gc_done= true;
|
||||||
|
#endif
|
||||||
if (!loc_gco_list)
|
if (!loc_gco_list)
|
||||||
loc_gco_last_ptr_ptr= &gco->next_gco;
|
loc_gco_last_ptr_ptr= &gco->next_gco;
|
||||||
else
|
else
|
||||||
|
@ -90,6 +90,9 @@ struct group_commit_orderer {
|
|||||||
FORCE_SWITCH= 2
|
FORCE_SWITCH= 2
|
||||||
};
|
};
|
||||||
uint8 flags;
|
uint8 flags;
|
||||||
|
#ifndef DBUG_OFF
|
||||||
|
bool gc_done;
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -2417,8 +2417,13 @@ mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco,
|
|||||||
uint64 count= ++e->count_committing_event_groups;
|
uint64 count= ++e->count_committing_event_groups;
|
||||||
/* Signal any following GCO whose wait_count has been reached now. */
|
/* Signal any following GCO whose wait_count has been reached now. */
|
||||||
tmp= gco;
|
tmp= gco;
|
||||||
|
|
||||||
|
DBUG_ASSERT(!tmp->gc_done);
|
||||||
|
|
||||||
while ((tmp= tmp->next_gco))
|
while ((tmp= tmp->next_gco))
|
||||||
{
|
{
|
||||||
|
DBUG_ASSERT(!tmp->gc_done);
|
||||||
|
|
||||||
uint64 wait_count= tmp->wait_count;
|
uint64 wait_count= tmp->wait_count;
|
||||||
if (wait_count > count)
|
if (wait_count > count)
|
||||||
break;
|
break;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user