MDEV-5262: Missing retry after temp error in parallel replication
If the first retry of an event group also fails with a temporary error, make further attempts instead of giving up. Add test cases for a multi-retry that succeeds on the second attempt, and for a multi-retry that eventually fails because it exceeds slave_trans_retries.
This commit is contained in:
parent
b0b60f2498
commit
d60915692c
@ -28,23 +28,21 @@ END IF;
|
|||||||
RETURN x;
|
RETURN x;
|
||||||
END
|
END
|
||||||
||
|
||
|
||||||
|
SET sql_log_bin=1;
|
||||||
include/stop_slave.inc
|
include/stop_slave.inc
|
||||||
SET @old_format= @@SESSION.binlog_format;
|
|
||||||
SET binlog_format='statement';
|
|
||||||
SET gtid_seq_no = 100;
|
SET gtid_seq_no = 100;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO t1 VALUES (2,1);
|
INSERT INTO t1 VALUES (2,1);
|
||||||
UPDATE t1 SET b=b+1 WHERE a=1;
|
UPDATE t1 SET b=b+1 WHERE a=1;
|
||||||
INSERT INTO t1 VALUES (3,1);
|
INSERT INTO t1 VALUES (3,1);
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SET binlog_format=@old_format;
|
|
||||||
SELECT * FROM t1 ORDER BY a;
|
SELECT * FROM t1 ORDER BY a;
|
||||||
a b
|
a b
|
||||||
1 2
|
1 2
|
||||||
2 1
|
2 1
|
||||||
3 1
|
3 1
|
||||||
SET @old_dbug= @@GLOBAL.debug_dbug;
|
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||||
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100";
|
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
|
||||||
include/start_slave.inc
|
include/start_slave.inc
|
||||||
SET GLOBAL debug_dbug=@old_dbug;
|
SET GLOBAL debug_dbug=@old_dbug;
|
||||||
retries
|
retries
|
||||||
@ -54,6 +52,95 @@ a b
|
|||||||
1 2
|
1 2
|
||||||
2 1
|
2 1
|
||||||
3 1
|
3 1
|
||||||
|
*** Test that double retry works when the first retry also fails with temp error ***
|
||||||
|
include/stop_slave.inc
|
||||||
|
SET gtid_seq_no = 100;
|
||||||
|
SET @old_server_id = @@server_id;
|
||||||
|
SET server_id = 10;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO t1 VALUES (4,1);
|
||||||
|
UPDATE t1 SET b=b+1 WHERE a=1;
|
||||||
|
INSERT INTO t1 VALUES (5,1);
|
||||||
|
INSERT INTO t1 VALUES (6,1);
|
||||||
|
COMMIT;
|
||||||
|
SET server_id = @old_server_id;
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
a b
|
||||||
|
1 3
|
||||||
|
2 1
|
||||||
|
3 1
|
||||||
|
4 1
|
||||||
|
5 1
|
||||||
|
6 1
|
||||||
|
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||||
|
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_double_temp_err_gtid_0_x_100";
|
||||||
|
include/start_slave.inc
|
||||||
|
SET GLOBAL debug_dbug=@old_dbug;
|
||||||
|
retries
|
||||||
|
2
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
a b
|
||||||
|
1 3
|
||||||
|
2 1
|
||||||
|
3 1
|
||||||
|
4 1
|
||||||
|
5 1
|
||||||
|
6 1
|
||||||
|
*** Test too many retries, eventually causing failure. ***
|
||||||
|
include/stop_slave.inc
|
||||||
|
SET gtid_seq_no = 100;
|
||||||
|
SET @old_server_id = @@server_id;
|
||||||
|
SET server_id = 11;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO t1 VALUES (7,1);
|
||||||
|
UPDATE t1 SET b=b+1 WHERE a=1;
|
||||||
|
INSERT INTO t1 VALUES (8,1);
|
||||||
|
INSERT INTO t1 VALUES (9,1);
|
||||||
|
COMMIT;
|
||||||
|
SET server_id = @old_server_id;
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
a b
|
||||||
|
1 4
|
||||||
|
2 1
|
||||||
|
3 1
|
||||||
|
4 1
|
||||||
|
5 1
|
||||||
|
6 1
|
||||||
|
7 1
|
||||||
|
8 1
|
||||||
|
9 1
|
||||||
|
SET sql_log_bin=0;
|
||||||
|
CALL mtr.add_suppression("Slave worker thread retried transaction 10 time\\(s\\) in vain, giving up");
|
||||||
|
CALL mtr.add_suppression("Slave: Deadlock found when trying to get lock; try restarting transaction");
|
||||||
|
SET sql_log_bin=1;
|
||||||
|
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||||
|
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100";
|
||||||
|
START SLAVE;
|
||||||
|
include/wait_for_slave_sql_error.inc [errno=1213]
|
||||||
|
SET GLOBAL debug_dbug=@old_dbug;
|
||||||
|
retries
|
||||||
|
10
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
a b
|
||||||
|
1 3
|
||||||
|
2 1
|
||||||
|
3 1
|
||||||
|
4 1
|
||||||
|
5 1
|
||||||
|
6 1
|
||||||
|
STOP SLAVE IO_THREAD;
|
||||||
|
include/start_slave.inc
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
a b
|
||||||
|
1 4
|
||||||
|
2 1
|
||||||
|
3 1
|
||||||
|
4 1
|
||||||
|
5 1
|
||||||
|
6 1
|
||||||
|
7 1
|
||||||
|
8 1
|
||||||
|
9 1
|
||||||
include/stop_slave.inc
|
include/stop_slave.inc
|
||||||
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
||||||
include/start_slave.inc
|
include/start_slave.inc
|
||||||
|
@ -47,27 +47,22 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
|||||||
END
|
END
|
||||||
||
|
||
|
||||||
--delimiter ;
|
--delimiter ;
|
||||||
|
SET sql_log_bin=1;
|
||||||
--source include/stop_slave.inc
|
--source include/stop_slave.inc
|
||||||
|
|
||||||
--connection server_1
|
--connection server_1
|
||||||
SET @old_format= @@SESSION.binlog_format;
|
|
||||||
SET binlog_format='statement';
|
|
||||||
SET gtid_seq_no = 100;
|
SET gtid_seq_no = 100;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
INSERT INTO t1 VALUES (2,1);
|
INSERT INTO t1 VALUES (2,1);
|
||||||
UPDATE t1 SET b=b+1 WHERE a=1;
|
UPDATE t1 SET b=b+1 WHERE a=1;
|
||||||
#INSERT INTO t1 VALUES (3,foo(1,
|
|
||||||
# "ha_write_row_end SIGNAL q1_ready WAIT_FOR q1_cont",
|
|
||||||
# ""));
|
|
||||||
INSERT INTO t1 VALUES (3,1);
|
INSERT INTO t1 VALUES (3,1);
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SET binlog_format=@old_format;
|
|
||||||
SELECT * FROM t1 ORDER BY a;
|
SELECT * FROM t1 ORDER BY a;
|
||||||
--save_master_pos
|
--save_master_pos
|
||||||
|
|
||||||
--connection server_2
|
--connection server_2
|
||||||
SET @old_dbug= @@GLOBAL.debug_dbug;
|
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||||
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_1_100";
|
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100";
|
||||||
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
|
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
|
||||||
--source include/start_slave.inc
|
--source include/start_slave.inc
|
||||||
--sync_with_master
|
--sync_with_master
|
||||||
@ -79,6 +74,82 @@ eval SELECT $new_retry - $old_retry AS retries;
|
|||||||
|
|
||||||
SELECT * FROM t1 ORDER BY a;
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
|
||||||
|
|
||||||
|
--echo *** Test that double retry works when the first retry also fails with temp error ***
|
||||||
|
--source include/stop_slave.inc
|
||||||
|
|
||||||
|
--connection server_1
|
||||||
|
SET gtid_seq_no = 100;
|
||||||
|
SET @old_server_id = @@server_id;
|
||||||
|
SET server_id = 10;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO t1 VALUES (4,1);
|
||||||
|
UPDATE t1 SET b=b+1 WHERE a=1;
|
||||||
|
INSERT INTO t1 VALUES (5,1);
|
||||||
|
INSERT INTO t1 VALUES (6,1);
|
||||||
|
COMMIT;
|
||||||
|
SET server_id = @old_server_id;
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
--save_master_pos
|
||||||
|
|
||||||
|
--connection server_2
|
||||||
|
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||||
|
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_double_temp_err_gtid_0_x_100";
|
||||||
|
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
|
||||||
|
--source include/start_slave.inc
|
||||||
|
--sync_with_master
|
||||||
|
SET GLOBAL debug_dbug=@old_dbug;
|
||||||
|
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
|
||||||
|
--disable_query_log
|
||||||
|
eval SELECT $new_retry - $old_retry AS retries;
|
||||||
|
--enable_query_log
|
||||||
|
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
|
||||||
|
|
||||||
|
--echo *** Test too many retries, eventually causing failure. ***
|
||||||
|
--source include/stop_slave.inc
|
||||||
|
|
||||||
|
--connection server_1
|
||||||
|
SET gtid_seq_no = 100;
|
||||||
|
SET @old_server_id = @@server_id;
|
||||||
|
SET server_id = 11;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO t1 VALUES (7,1);
|
||||||
|
UPDATE t1 SET b=b+1 WHERE a=1;
|
||||||
|
INSERT INTO t1 VALUES (8,1);
|
||||||
|
INSERT INTO t1 VALUES (9,1);
|
||||||
|
COMMIT;
|
||||||
|
SET server_id = @old_server_id;
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
--save_master_pos
|
||||||
|
|
||||||
|
--connection server_2
|
||||||
|
SET sql_log_bin=0;
|
||||||
|
CALL mtr.add_suppression("Slave worker thread retried transaction 10 time\\(s\\) in vain, giving up");
|
||||||
|
CALL mtr.add_suppression("Slave: Deadlock found when trying to get lock; try restarting transaction");
|
||||||
|
SET sql_log_bin=1;
|
||||||
|
|
||||||
|
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||||
|
SET GLOBAL debug_dbug="+d,rpl_parallel_simulate_temp_err_gtid_0_x_100,rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100";
|
||||||
|
let $old_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
|
||||||
|
START SLAVE;
|
||||||
|
--let $slave_sql_errno= 1213
|
||||||
|
--let $slave_timeout= 10
|
||||||
|
--source include/wait_for_slave_sql_error.inc
|
||||||
|
SET GLOBAL debug_dbug=@old_dbug;
|
||||||
|
let $new_retry= query_get_value(SHOW STATUS LIKE 'Slave_retried_transactions', Value, 1);
|
||||||
|
--disable_query_log
|
||||||
|
eval SELECT $new_retry - $old_retry AS retries;
|
||||||
|
--enable_query_log
|
||||||
|
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
STOP SLAVE IO_THREAD;
|
||||||
|
--source include/start_slave.inc
|
||||||
|
--sync_with_master
|
||||||
|
SELECT * FROM t1 ORDER BY a;
|
||||||
|
|
||||||
|
|
||||||
--connection server_2
|
--connection server_2
|
||||||
--source include/stop_slave.inc
|
--source include/stop_slave.inc
|
||||||
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
||||||
|
@ -188,6 +188,22 @@ unlock_or_exit_cond(THD *thd, mysql_mutex_t *lock, bool *did_enter_cond,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifndef DBUG_OFF
|
||||||
|
static int
|
||||||
|
dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd)
|
||||||
|
{
|
||||||
|
if (rgi->current_gtid.domain_id == 0 && rgi->current_gtid.seq_no == 100 &&
|
||||||
|
rgi->retry_event_count == 4)
|
||||||
|
{
|
||||||
|
thd->clear_error();
|
||||||
|
thd->get_stmt_da()->reset_diagnostics_area();
|
||||||
|
my_error(ER_LOCK_DEADLOCK, MYF(0));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static int
|
static int
|
||||||
retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
|
retry_handle_relay_log_rotate(Log_event *ev, IO_CACHE *rlog)
|
||||||
{
|
{
|
||||||
@ -204,15 +220,18 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt,
|
|||||||
File fd;
|
File fd;
|
||||||
const char *errmsg= NULL;
|
const char *errmsg= NULL;
|
||||||
inuse_relaylog *ir= rgi->relay_log;
|
inuse_relaylog *ir= rgi->relay_log;
|
||||||
uint64 event_count= 0;
|
uint64 event_count;
|
||||||
uint64 events_to_execute= rgi->retry_event_count;
|
uint64 events_to_execute= rgi->retry_event_count;
|
||||||
Relay_log_info *rli= rgi->rli;
|
Relay_log_info *rli= rgi->rli;
|
||||||
int err= 0;
|
int err;
|
||||||
ulonglong cur_offset, old_offset;
|
ulonglong cur_offset, old_offset;
|
||||||
char log_name[FN_REFLEN];
|
char log_name[FN_REFLEN];
|
||||||
THD *thd= rgi->thd;
|
THD *thd= rgi->thd;
|
||||||
|
ulong retries= 0;
|
||||||
|
|
||||||
do_retry:
|
do_retry:
|
||||||
|
event_count= 0;
|
||||||
|
err= 0;
|
||||||
rgi->cleanup_context(thd, 1);
|
rgi->cleanup_context(thd, 1);
|
||||||
|
|
||||||
mysql_mutex_lock(&rli->data_lock);
|
mysql_mutex_lock(&rli->data_lock);
|
||||||
@ -268,10 +287,26 @@ do_retry:
|
|||||||
else
|
else
|
||||||
err= retry_handle_relay_log_rotate(ev, &rlog);
|
err= retry_handle_relay_log_rotate(ev, &rlog);
|
||||||
delete_or_keep_event_post_apply(rgi, event_type, ev);
|
delete_or_keep_event_post_apply(rgi, event_type, ev);
|
||||||
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_double_temp_err_gtid_0_x_100",
|
||||||
|
if (retries == 0) err= dbug_simulate_tmp_error(rgi, thd););
|
||||||
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_infinite_temp_err_gtid_0_x_100",
|
||||||
|
err= dbug_simulate_tmp_error(rgi, thd););
|
||||||
if (err)
|
if (err)
|
||||||
{
|
{
|
||||||
/* ToDo: Need to here also handle second retry. */
|
if (has_temporary_error(thd))
|
||||||
|
{
|
||||||
|
++retries;
|
||||||
|
if (retries < slave_trans_retries)
|
||||||
|
{
|
||||||
|
end_io_cache(&rlog);
|
||||||
|
mysql_file_close(fd, MYF(MY_WME));
|
||||||
|
goto do_retry;
|
||||||
|
}
|
||||||
|
sql_print_error("Slave worker thread retried transaction %lu time(s) "
|
||||||
|
"in vain, giving up. Consider raising the value of "
|
||||||
|
"the slave_transaction_retries variable.",
|
||||||
|
slave_trans_retries);
|
||||||
|
}
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -592,29 +627,23 @@ handle_rpl_parallel_thread(void *arg)
|
|||||||
{
|
{
|
||||||
++rgi->retry_event_count;
|
++rgi->retry_event_count;
|
||||||
err= rpt_handle_event(events, rpt);
|
err= rpt_handle_event(events, rpt);
|
||||||
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_1_100",
|
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
|
||||||
if (rgi->current_gtid.domain_id == 0 &&
|
DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
|
||||||
rgi->current_gtid.server_id == 1 &&
|
err= dbug_simulate_tmp_error(rgi, thd););
|
||||||
rgi->current_gtid.seq_no == 100 &&
|
|
||||||
rgi->retry_event_count == 4)
|
|
||||||
{
|
|
||||||
thd->clear_error();
|
|
||||||
thd->get_stmt_da()->reset_diagnostics_area();
|
|
||||||
my_error(ER_LOCK_DEADLOCK, MYF(0));
|
|
||||||
err= 1;
|
|
||||||
};);
|
|
||||||
if (err && has_temporary_error(thd))
|
if (err && has_temporary_error(thd))
|
||||||
err= retry_event_group(rgi, rpt, events);
|
err= retry_event_group(rgi, rpt, events);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
|
delete events->ev;
|
||||||
err= thd->wait_for_prior_commit();
|
err= thd->wait_for_prior_commit();
|
||||||
|
}
|
||||||
|
|
||||||
end_of_group=
|
end_of_group=
|
||||||
in_event_group &&
|
in_event_group &&
|
||||||
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
|
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
|
||||||
group_ending);
|
group_ending);
|
||||||
|
|
||||||
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
|
|
||||||
events->next= qevs_to_free;
|
events->next= qevs_to_free;
|
||||||
qevs_to_free= events;
|
qevs_to_free= events;
|
||||||
|
|
||||||
@ -1527,16 +1556,10 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (typ == GTID_EVENT)
|
if (typ == GTID_EVENT)
|
||||||
{
|
|
||||||
uint32 domain_id;
|
|
||||||
if (likely(typ == GTID_EVENT))
|
|
||||||
{
|
{
|
||||||
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
|
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
|
||||||
domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
|
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ?
|
||||||
0 : gtid_ev->domain_id);
|
0 : gtid_ev->domain_id);
|
||||||
}
|
|
||||||
else
|
|
||||||
domain_id= 0;
|
|
||||||
if (!(e= find(domain_id)))
|
if (!(e= find(domain_id)))
|
||||||
{
|
{
|
||||||
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user