Merge MDEV-6589 and MDEV-6403 into 10.1.
Conflicts: sql/log.cc sql/rpl_rli.cc sql/sql_repl.cc
This commit is contained in:
commit
95d7208859
25
mysql-test/suite/rpl/r/rpl_gtid_misc.result
Normal file
25
mysql-test/suite/rpl/r/rpl_gtid_misc.result
Normal file
@ -0,0 +1,25 @@
|
||||
include/master-slave.inc
|
||||
[connection master]
|
||||
*** MDEV-6403: Temporary tables lost at STOP SLAVE in GTID mode if master has not rotated binlog since restart ***
|
||||
CREATE TABLE t1 (a INT PRIMARY KEY);
|
||||
include/stop_slave.inc
|
||||
SET sql_log_bin= 0;
|
||||
INSERT INTO t1 VALUES (1);
|
||||
SET sql_log_bin= 1;
|
||||
CHANGE MASTER TO master_use_gtid= current_pos;
|
||||
CREATE TEMPORARY TABLE t2 LIKE t1;
|
||||
INSERT INTO t2 VALUE (1);
|
||||
INSERT INTO t1 SELECT * FROM t2;
|
||||
DROP TEMPORARY TABLE t2;
|
||||
START SLAVE;
|
||||
include/wait_for_slave_sql_error.inc [errno=1062]
|
||||
STOP SLAVE IO_THREAD;
|
||||
SET sql_log_bin= 0;
|
||||
DELETE FROM t1 WHERE a=1;
|
||||
SET sql_log_bin= 1;
|
||||
include/start_slave.inc
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
a
|
||||
1
|
||||
DROP TABLE t1;
|
||||
include/rpl_end.inc
|
147
mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result
Normal file
147
mysql-test/suite/rpl/r/rpl_parallel_mdev6589.result
Normal file
@ -0,0 +1,147 @@
|
||||
include/master-slave.inc
|
||||
[connection master]
|
||||
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
CHANGE MASTER TO master_use_gtid=current_pos;
|
||||
include/start_slave.inc
|
||||
*** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication ***
|
||||
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
|
||||
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
|
||||
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
|
||||
INSERT INTO t1 VALUES (1);
|
||||
INSERT INTO t2 VALUES (1);
|
||||
SELECT * FROM t1;
|
||||
a
|
||||
1
|
||||
SELECT * FROM t2;
|
||||
a
|
||||
1
|
||||
SET sql_log_bin=0;
|
||||
BEGIN;
|
||||
INSERT INTO t2 VALUES (5);
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t1 VALUES (2);
|
||||
INSERT INTO t2 VALUES (3);
|
||||
FLUSH LOGS;
|
||||
INSERT INTO t1 VALUES (4);
|
||||
SET gtid_domain_id=1;
|
||||
INSERT INTO t2 VALUES (5);
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t1 VALUES (6);
|
||||
INSERT INTO t1 VALUES (7);
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t2 VALUES (8);
|
||||
INSERT INTO t1 VALUES (9);
|
||||
FLUSH LOGS;
|
||||
SET gtid_domain_id=3;
|
||||
INSERT INTO t2 VALUES (10);
|
||||
INSERT INTO t1 VALUES (11);
|
||||
SET gtid_domain_id=1;
|
||||
INSERT INTO t1 VALUES (12);
|
||||
INSERT INTO t2 VALUES (13);
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t2 VALUES (14);
|
||||
FLUSH LOGS;
|
||||
SET gtid_domain_id=3;
|
||||
INSERT INTO t2 VALUES (15);
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t2 VALUES (16);
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t1 VALUES (17);
|
||||
SET @gtid0 = @@last_gtid;
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t1 VALUES (18);
|
||||
SET @gtid2 = @@last_gtid;
|
||||
SET gtid_domain_id=3;
|
||||
INSERT INTO t1 VALUES (19);
|
||||
SET @gtid3 = @@last_gtid;
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
a
|
||||
1
|
||||
2
|
||||
4
|
||||
6
|
||||
7
|
||||
9
|
||||
11
|
||||
12
|
||||
17
|
||||
18
|
||||
19
|
||||
SELECT * FROM t2 ORDER BY a;
|
||||
a
|
||||
1
|
||||
3
|
||||
5
|
||||
8
|
||||
10
|
||||
13
|
||||
14
|
||||
15
|
||||
16
|
||||
include/save_master_gtid.inc
|
||||
SELECT MASTER_GTID_WAIT('WAIT_POS');
|
||||
MASTER_GTID_WAIT('WAIT_POS')
|
||||
0
|
||||
COMMIT;
|
||||
SET sql_log_bin=1;
|
||||
include/wait_for_slave_sql_error.inc [errno=1062]
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
a
|
||||
1
|
||||
2
|
||||
4
|
||||
6
|
||||
7
|
||||
9
|
||||
11
|
||||
17
|
||||
18
|
||||
19
|
||||
SELECT * FROM t2 ORDER BY a;
|
||||
a
|
||||
1
|
||||
3
|
||||
5
|
||||
8
|
||||
10
|
||||
14
|
||||
15
|
||||
16
|
||||
SET sql_log_bin=0;
|
||||
DELETE FROM t2 WHERE a=5;
|
||||
SET sql_log_bin=1;
|
||||
include/start_slave.inc
|
||||
include/sync_with_master_gtid.inc
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
a
|
||||
1
|
||||
2
|
||||
4
|
||||
6
|
||||
7
|
||||
9
|
||||
11
|
||||
12
|
||||
17
|
||||
18
|
||||
19
|
||||
SELECT * FROM t2 ORDER BY a;
|
||||
a
|
||||
1
|
||||
3
|
||||
5
|
||||
8
|
||||
10
|
||||
13
|
||||
14
|
||||
15
|
||||
16
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
||||
include/start_slave.inc
|
||||
SET DEBUG_SYNC= 'RESET';
|
||||
DROP TABLE t1,t2;
|
||||
SET DEBUG_SYNC= 'RESET';
|
||||
include/rpl_end.inc
|
50
mysql-test/suite/rpl/t/rpl_gtid_misc.test
Normal file
50
mysql-test/suite/rpl/t/rpl_gtid_misc.test
Normal file
@ -0,0 +1,50 @@
|
||||
--source include/master-slave.inc
|
||||
|
||||
--echo *** MDEV-6403: Temporary tables lost at STOP SLAVE in GTID mode if master has not rotated binlog since restart ***
|
||||
|
||||
--connection master
|
||||
CREATE TABLE t1 (a INT PRIMARY KEY);
|
||||
--sync_slave_with_master
|
||||
|
||||
--connection slave
|
||||
--source include/stop_slave.inc
|
||||
# Inject a duplicate key error that will make the slave stop in the middle of
|
||||
# a sequence of transactions that use a temporary table.
|
||||
SET sql_log_bin= 0;
|
||||
INSERT INTO t1 VALUES (1);
|
||||
SET sql_log_bin= 1;
|
||||
CHANGE MASTER TO master_use_gtid= current_pos;
|
||||
|
||||
--connection master
|
||||
|
||||
# Make some queries that use a temporary table.
|
||||
CREATE TEMPORARY TABLE t2 LIKE t1;
|
||||
INSERT INTO t2 VALUE (1);
|
||||
INSERT INTO t1 SELECT * FROM t2;
|
||||
DROP TEMPORARY TABLE t2;
|
||||
--save_master_pos
|
||||
|
||||
--connection slave
|
||||
START SLAVE;
|
||||
--let $slave_sql_errno=1062
|
||||
--source include/wait_for_slave_sql_error.inc
|
||||
|
||||
# Restart the slave.
|
||||
# The bug was that the IO thread would receive again the restart
|
||||
# format_description event at the start of the master's binlog, and this
|
||||
# event would cause the SQL thread to discard all active temporary tables.
|
||||
|
||||
STOP SLAVE IO_THREAD;
|
||||
|
||||
SET sql_log_bin= 0;
|
||||
DELETE FROM t1 WHERE a=1;
|
||||
SET sql_log_bin= 1;
|
||||
|
||||
--source include/start_slave.inc
|
||||
--sync_with_master
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
|
||||
--connection master
|
||||
DROP TABLE t1;
|
||||
|
||||
--source include/rpl_end.inc
|
132
mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test
Normal file
132
mysql-test/suite/rpl/t/rpl_parallel_mdev6589.test
Normal file
@ -0,0 +1,132 @@
|
||||
--source include/have_innodb.inc
|
||||
--source include/have_debug.inc
|
||||
--source include/have_debug_sync.inc
|
||||
--source include/master-slave.inc
|
||||
|
||||
--connection server_2
|
||||
SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads;
|
||||
--source include/stop_slave.inc
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
CHANGE MASTER TO master_use_gtid=current_pos;
|
||||
--source include/start_slave.inc
|
||||
|
||||
|
||||
--echo *** MDEV-6589: Incorrect relay log start position when restarting SQL thread after error in parallel replication ***
|
||||
|
||||
--connection server_1
|
||||
ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB;
|
||||
CREATE TABLE t1 (a int PRIMARY KEY) ENGINE=MyISAM;
|
||||
CREATE TABLE t2 (a int PRIMARY KEY) ENGINE=InnoDB;
|
||||
INSERT INTO t1 VALUES (1);
|
||||
INSERT INTO t2 VALUES (1);
|
||||
--save_master_pos
|
||||
|
||||
--connection server_2
|
||||
--sync_with_master
|
||||
SELECT * FROM t1;
|
||||
SELECT * FROM t2;
|
||||
|
||||
# Block one domain, which we will later cause to give an error. And let some
|
||||
# other domains proceed so we can check that after restart, the slave is able
|
||||
# to correctly restart each domain in a separate position.
|
||||
|
||||
--connect (con_temp1,127.0.0.1,root,,test,$SERVER_MYPORT_2,)
|
||||
SET sql_log_bin=0;
|
||||
BEGIN;
|
||||
INSERT INTO t2 VALUES (5);
|
||||
|
||||
--connection server_1
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t1 VALUES (2);
|
||||
INSERT INTO t2 VALUES (3);
|
||||
FLUSH LOGS;
|
||||
INSERT INTO t1 VALUES (4);
|
||||
|
||||
SET gtid_domain_id=1;
|
||||
# This query will be blocked on the slave, and later give a duplicate key error.
|
||||
INSERT INTO t2 VALUES (5);
|
||||
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t1 VALUES (6);
|
||||
INSERT INTO t1 VALUES (7);
|
||||
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t2 VALUES (8);
|
||||
INSERT INTO t1 VALUES (9);
|
||||
FLUSH LOGS;
|
||||
|
||||
SET gtid_domain_id=3;
|
||||
INSERT INTO t2 VALUES (10);
|
||||
INSERT INTO t1 VALUES (11);
|
||||
|
||||
# These cannot be replicated before the error, as a prior commit is blocked.
|
||||
SET gtid_domain_id=1;
|
||||
INSERT INTO t1 VALUES (12);
|
||||
INSERT INTO t2 VALUES (13);
|
||||
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t2 VALUES (14);
|
||||
FLUSH LOGS;
|
||||
|
||||
SET gtid_domain_id=3;
|
||||
INSERT INTO t2 VALUES (15);
|
||||
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t2 VALUES (16);
|
||||
|
||||
SET gtid_domain_id=0;
|
||||
INSERT INTO t1 VALUES (17);
|
||||
SET @gtid0 = @@last_gtid;
|
||||
SET gtid_domain_id=2;
|
||||
INSERT INTO t1 VALUES (18);
|
||||
SET @gtid2 = @@last_gtid;
|
||||
SET gtid_domain_id=3;
|
||||
INSERT INTO t1 VALUES (19);
|
||||
SET @gtid3 = @@last_gtid;
|
||||
--let $wait_pos= `SELECT CONCAT(@gtid0, ",", @gtid2, ",", @gtid3)`
|
||||
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
SELECT * FROM t2 ORDER BY a;
|
||||
--source include/save_master_gtid.inc
|
||||
|
||||
|
||||
--connection server_2
|
||||
# First wait for domains 0, 2, and 3 to complete.
|
||||
--replace_result $wait_pos WAIT_POS
|
||||
eval SELECT MASTER_GTID_WAIT('$wait_pos');
|
||||
|
||||
# Then release the row lock, and wait for the domain 1 to fail with
|
||||
# duplicate key error.
|
||||
--connection con_temp1
|
||||
COMMIT;
|
||||
SET sql_log_bin=1;
|
||||
|
||||
--connection server_2
|
||||
--let $slave_sql_errno= 1062
|
||||
--source include/wait_for_slave_sql_error.inc
|
||||
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
SELECT * FROM t2 ORDER BY a;
|
||||
|
||||
SET sql_log_bin=0;
|
||||
DELETE FROM t2 WHERE a=5;
|
||||
SET sql_log_bin=1;
|
||||
--source include/start_slave.inc
|
||||
--source include/sync_with_master_gtid.inc
|
||||
|
||||
SELECT * FROM t1 ORDER BY a;
|
||||
SELECT * FROM t2 ORDER BY a;
|
||||
|
||||
|
||||
# Clean up.
|
||||
--connection server_2
|
||||
--source include/stop_slave.inc
|
||||
SET GLOBAL slave_parallel_threads=@old_parallel_threads;
|
||||
--source include/start_slave.inc
|
||||
SET DEBUG_SYNC= 'RESET';
|
||||
|
||||
--connection server_1
|
||||
DROP TABLE t1,t2;
|
||||
SET DEBUG_SYNC= 'RESET';
|
||||
|
||||
--source include/rpl_end.inc
|
@ -4188,7 +4188,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
|
||||
included= 1;
|
||||
to_purge_if_included= my_strdup(ir->name, MYF(0));
|
||||
}
|
||||
my_free(ir);
|
||||
rli->free_inuse_relaylog(ir);
|
||||
ir= next;
|
||||
}
|
||||
rli->inuse_relaylog_list= ir;
|
||||
|
@ -1164,6 +1164,27 @@ rpl_binlog_state::load(struct rpl_gtid *list, uint32 count)
|
||||
}
|
||||
|
||||
|
||||
static int rpl_binlog_state_load_cb(rpl_gtid *gtid, void *data)
|
||||
{
|
||||
rpl_binlog_state *self= (rpl_binlog_state *)data;
|
||||
return self->update_nolock(gtid, false);
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
rpl_binlog_state::load(rpl_slave_state *slave_pos)
|
||||
{
|
||||
bool res= false;
|
||||
|
||||
mysql_mutex_lock(&LOCK_binlog_state);
|
||||
reset_nolock();
|
||||
if (slave_pos->iterate(rpl_binlog_state_load_cb, this, NULL, 0))
|
||||
res= true;
|
||||
mysql_mutex_unlock(&LOCK_binlog_state);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
rpl_binlog_state::~rpl_binlog_state()
|
||||
{
|
||||
free();
|
||||
@ -1932,6 +1953,31 @@ slave_connection_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Check if the GTID position has been reached, for mysql_binlog_send().
|
||||
|
||||
The position has not been reached if we have anything in the state, unless
|
||||
it has either the START_ON_EMPTY_DOMAIN flag set (which means it does not
|
||||
belong to this master at all), or the START_OWN_SLAVE_POS (which means that
|
||||
we start on an old position from when the server was a slave with
|
||||
--log-slave-updates=0).
|
||||
*/
|
||||
bool
|
||||
slave_connection_state::is_pos_reached()
|
||||
{
|
||||
uint32 i;
|
||||
|
||||
for (i= 0; i < hash.records; ++i)
|
||||
{
|
||||
entry *e= (entry *)my_hash_element(&hash, i);
|
||||
if (!(e->flags & (START_OWN_SLAVE_POS|START_ON_EMPTY_DOMAIN)))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Execute a MASTER_GTID_WAIT().
|
||||
The position to wait for is in gtid_str in string form.
|
||||
|
@ -241,6 +241,7 @@ struct rpl_binlog_state
|
||||
void reset();
|
||||
void free();
|
||||
bool load(struct rpl_gtid *list, uint32 count);
|
||||
bool load(rpl_slave_state *slave_pos);
|
||||
int update_nolock(const struct rpl_gtid *gtid, bool strict);
|
||||
int update(const struct rpl_gtid *gtid, bool strict);
|
||||
int update_with_next_gtid(uint32 domain_id, uint32 server_id,
|
||||
@ -296,6 +297,7 @@ struct slave_connection_state
|
||||
int to_string(String *out_str);
|
||||
int append_to_string(String *out_str);
|
||||
int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
|
||||
bool is_pos_reached();
|
||||
};
|
||||
|
||||
|
||||
|
@ -1893,6 +1893,41 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Handle seeing a GTID during slave restart in GTID mode. If we stopped with
|
||||
different replication domains having reached different positions in the relay
|
||||
log, we need to skip event groups in domains that are further progressed.
|
||||
|
||||
Updates the state with the seen GTID, and returns true if this GTID should
|
||||
be skipped, false otherwise.
|
||||
*/
|
||||
bool
|
||||
process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid)
|
||||
{
|
||||
slave_connection_state::entry *gtid_entry;
|
||||
slave_connection_state *state= &rli->restart_gtid_pos;
|
||||
|
||||
if (likely(state->count() == 0) ||
|
||||
!(gtid_entry= state->find_entry(gtid->domain_id)))
|
||||
return false;
|
||||
if (gtid->server_id == gtid_entry->gtid.server_id)
|
||||
{
|
||||
uint64 seq_no= gtid_entry->gtid.seq_no;
|
||||
if (gtid->seq_no >= seq_no)
|
||||
{
|
||||
/*
|
||||
This domain has reached its start position. So remove it, so that
|
||||
further events will be processed normally.
|
||||
*/
|
||||
state->remove(>id_entry->gtid);
|
||||
}
|
||||
return gtid->seq_no <= seq_no;
|
||||
}
|
||||
else
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
This is used when we get an error during processing in do_event();
|
||||
We will not queue any event to the thread, but we still need to wake it up
|
||||
@ -1955,13 +1990,15 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
return -1;
|
||||
|
||||
/* Execute pre-10.0 event, which have no GTID, in single-threaded mode. */
|
||||
if (unlikely(!current) && typ != GTID_EVENT)
|
||||
is_group_event= Log_event::is_group_event(typ);
|
||||
if (unlikely(!current) && typ != GTID_EVENT &&
|
||||
!(unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event))
|
||||
return -1;
|
||||
|
||||
/* ToDo: what to do with this lock?!? */
|
||||
mysql_mutex_unlock(&rli->data_lock);
|
||||
|
||||
if (typ == FORMAT_DESCRIPTION_EVENT)
|
||||
if (unlikely(typ == FORMAT_DESCRIPTION_EVENT))
|
||||
{
|
||||
Format_description_log_event *fdev=
|
||||
static_cast<Format_description_log_event *>(ev);
|
||||
@ -1987,6 +2024,19 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (unlikely(typ == GTID_LIST_EVENT))
|
||||
{
|
||||
Gtid_list_log_event *glev= static_cast<Gtid_list_log_event *>(ev);
|
||||
rpl_gtid *list= glev->list;
|
||||
uint32 count= glev->count;
|
||||
rli->update_relay_log_state(list, count);
|
||||
while (count)
|
||||
{
|
||||
process_gtid_for_restart_pos(rli, list);
|
||||
++list;
|
||||
--count;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Stop queueing additional event groups once the SQL thread is requested to
|
||||
@ -1996,7 +2046,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
been partially queued, but after that we will just ignore any further
|
||||
events the SQL driver thread may try to queue, and eventually it will stop.
|
||||
*/
|
||||
is_group_event= Log_event::is_group_event(typ);
|
||||
if ((typ == GTID_EVENT || !is_group_event) && rli->abort_slave)
|
||||
sql_thread_stopping= true;
|
||||
if (sql_thread_stopping)
|
||||
@ -2009,8 +2058,34 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (unlikely(rli->gtid_skip_flag != GTID_SKIP_NOT) && is_group_event)
|
||||
{
|
||||
if (typ == GTID_EVENT)
|
||||
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
||||
else
|
||||
{
|
||||
if (rli->gtid_skip_flag == GTID_SKIP_STANDALONE)
|
||||
{
|
||||
if (!Log_event::is_part_of_group(typ))
|
||||
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
||||
}
|
||||
else
|
||||
{
|
||||
DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
|
||||
if (typ == XID_EVENT ||
|
||||
(typ == QUERY_EVENT &&
|
||||
(((Query_log_event *)ev)->is_commit() ||
|
||||
((Query_log_event *)ev)->is_rollback())))
|
||||
rli->gtid_skip_flag= GTID_SKIP_NOT;
|
||||
}
|
||||
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (typ == GTID_EVENT)
|
||||
{
|
||||
rpl_gtid gtid;
|
||||
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
|
||||
uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
|
||||
rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
|
||||
@ -2022,6 +2097,23 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
return 1;
|
||||
}
|
||||
current= e;
|
||||
|
||||
gtid.domain_id= gtid_ev->domain_id;
|
||||
gtid.server_id= gtid_ev->server_id;
|
||||
gtid.seq_no= gtid_ev->seq_no;
|
||||
rli->update_relay_log_state(>id, 1);
|
||||
if (process_gtid_for_restart_pos(rli, >id))
|
||||
{
|
||||
/*
|
||||
This domain has progressed further into the relay log before the last
|
||||
SQL thread restart. So we need to skip this event group to not doubly
|
||||
apply it.
|
||||
*/
|
||||
rli->gtid_skip_flag= ((gtid_ev->flags2 & Gtid_log_event::FL_STANDALONE) ?
|
||||
GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
|
||||
delete_or_keep_event_post_apply(serial_rgi, typ, ev);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
e= current;
|
||||
|
@ -339,5 +339,6 @@ extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
|
||||
extern int rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
||||
uint32 new_count,
|
||||
bool skip_check= false);
|
||||
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
|
||||
|
||||
#endif /* RPL_PARALLEL_H */
|
||||
|
@ -62,7 +62,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
|
||||
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
|
||||
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
|
||||
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
|
||||
inited(0), abort_slave(0), stop_for_until(0),
|
||||
gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
|
||||
slave_running(0), until_condition(UNTIL_NONE),
|
||||
until_log_pos(0), retried_trans(0), executed_entries(0),
|
||||
m_flags(0)
|
||||
@ -100,17 +100,9 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
|
||||
|
||||
Relay_log_info::~Relay_log_info()
|
||||
{
|
||||
inuse_relaylog *cur;
|
||||
DBUG_ENTER("Relay_log_info::~Relay_log_info");
|
||||
|
||||
cur= inuse_relaylog_list;
|
||||
while (cur)
|
||||
{
|
||||
DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
|
||||
inuse_relaylog *next= cur->next;
|
||||
my_free(cur);
|
||||
cur= next;
|
||||
}
|
||||
reset_inuse_relaylog();
|
||||
mysql_mutex_destroy(&run_lock);
|
||||
mysql_mutex_destroy(&data_lock);
|
||||
mysql_mutex_destroy(&log_space_lock);
|
||||
@ -1383,14 +1375,34 @@ int
|
||||
Relay_log_info::alloc_inuse_relaylog(const char *name)
|
||||
{
|
||||
inuse_relaylog *ir;
|
||||
uint32 gtid_count;
|
||||
rpl_gtid *gtid_list;
|
||||
|
||||
if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL))))
|
||||
{
|
||||
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
|
||||
return 1;
|
||||
}
|
||||
gtid_count= relay_log_state.count();
|
||||
if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count,
|
||||
MYF(MY_WME))))
|
||||
{
|
||||
my_free(ir);
|
||||
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count);
|
||||
return 1;
|
||||
}
|
||||
if (relay_log_state.get_gtid_list(gtid_list, gtid_count))
|
||||
{
|
||||
my_free(gtid_list);
|
||||
my_free(ir);
|
||||
DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */);
|
||||
my_error(ER_OUT_OF_RESOURCES, MYF(0));
|
||||
return 1;
|
||||
}
|
||||
ir->rli= this;
|
||||
strmake_buf(ir->name, name);
|
||||
ir->relay_log_state= gtid_list;
|
||||
ir->relay_log_state_count= gtid_count;
|
||||
|
||||
if (!inuse_relaylog_list)
|
||||
inuse_relaylog_list= ir;
|
||||
@ -1405,6 +1417,45 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir)
|
||||
{
|
||||
my_free(ir->relay_log_state);
|
||||
my_atomic_rwlock_destroy(&ir->inuse_relaylog_atomic_lock);
|
||||
my_free(ir);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
Relay_log_info::reset_inuse_relaylog()
|
||||
{
|
||||
inuse_relaylog *cur= inuse_relaylog_list;
|
||||
while (cur)
|
||||
{
|
||||
DBUG_ASSERT(cur->queued_count == cur->dequeued_count);
|
||||
inuse_relaylog *next= cur->next;
|
||||
free_inuse_relaylog(cur);
|
||||
cur= next;
|
||||
}
|
||||
inuse_relaylog_list= last_inuse_relaylog= NULL;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
Relay_log_info::update_relay_log_state(rpl_gtid *gtid_list, uint32 count)
|
||||
{
|
||||
int res= 0;
|
||||
while (count)
|
||||
{
|
||||
if (relay_log_state.update_nolock(gtid_list, false))
|
||||
res= 1;
|
||||
++gtid_list;
|
||||
--count;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
|
||||
int
|
||||
rpl_load_gtid_slave_state(THD *thd)
|
||||
|
@ -269,6 +269,8 @@ public:
|
||||
int events_till_abort;
|
||||
#endif
|
||||
|
||||
enum_gtid_skip_type gtid_skip_flag;
|
||||
|
||||
/*
|
||||
inited changes its value within LOCK_active_mi-guarded critical
|
||||
sections at times of start_slave_threads() (0->1) and end_slave() (1->0).
|
||||
@ -344,6 +346,21 @@ public:
|
||||
size_t slave_patternload_file_size;
|
||||
|
||||
rpl_parallel parallel;
|
||||
/*
|
||||
The relay_log_state keeps track of the current binlog state of the execution
|
||||
of the relay log. This is used to know where to resume current GTID position
|
||||
if the slave thread is stopped and restarted.
|
||||
It is only accessed from the SQL thread, so it does not need any locking.
|
||||
*/
|
||||
rpl_binlog_state relay_log_state;
|
||||
/*
|
||||
The restart_gtid_state is used when the SQL thread restarts on a relay log
|
||||
in GTID mode. In multi-domain parallel replication, each domain may have a
|
||||
separat position, so some events in more progressed domains may need to be
|
||||
skipped. This keeps track of the domains that have not yet reached their
|
||||
starting event.
|
||||
*/
|
||||
slave_connection_state restart_gtid_pos;
|
||||
|
||||
Relay_log_info(bool is_slave_recovery);
|
||||
~Relay_log_info();
|
||||
@ -408,6 +425,9 @@ public:
|
||||
time_t event_creation_time, THD *thd,
|
||||
rpl_group_info *rgi);
|
||||
int alloc_inuse_relaylog(const char *name);
|
||||
void free_inuse_relaylog(inuse_relaylog *ir);
|
||||
void reset_inuse_relaylog();
|
||||
int update_relay_log_state(rpl_gtid *gtid_list, uint32 count);
|
||||
|
||||
/**
|
||||
Is the replication inside a group?
|
||||
@ -497,6 +517,12 @@ private:
|
||||
struct inuse_relaylog {
|
||||
inuse_relaylog *next;
|
||||
Relay_log_info *rli;
|
||||
/*
|
||||
relay_log_state holds the binlog state corresponding to the start of this
|
||||
relay log file. It is an array with relay_log_state_count elements.
|
||||
*/
|
||||
rpl_gtid *relay_log_state;
|
||||
uint32 relay_log_state_count;
|
||||
/* Number of events in this relay log queued for worker threads. */
|
||||
int64 queued_count;
|
||||
/* Number of events completed by worker threads. */
|
||||
|
56
sql/slave.cc
56
sql/slave.cc
@ -943,6 +943,8 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
|
||||
Master_info::USE_GTID_CURRENT_POS);
|
||||
mi->events_queued_since_last_gtid= 0;
|
||||
mi->gtid_reconnect_event_skip_count= 0;
|
||||
|
||||
mi->rli.restart_gtid_pos.reset();
|
||||
}
|
||||
|
||||
if (!error && (thread_mask & SLAVE_IO))
|
||||
@ -4479,6 +4481,16 @@ pthread_handler_t handle_slave_sql(void *arg)
|
||||
|
||||
serial_rgi->gtid_sub_id= 0;
|
||||
serial_rgi->gtid_pending= false;
|
||||
if (mi->using_gtid != Master_info::USE_GTID_NO)
|
||||
{
|
||||
/*
|
||||
We initialize the relay log state from the know starting position.
|
||||
It will then be updated as required by GTID and GTID_LIST events found
|
||||
while applying events read from relay logs.
|
||||
*/
|
||||
rli->relay_log_state.load(&rpl_global_gtid_slave_state);
|
||||
}
|
||||
rli->gtid_skip_flag = GTID_SKIP_NOT;
|
||||
if (init_relay_log_pos(rli,
|
||||
rli->group_relay_log_name,
|
||||
rli->group_relay_log_pos,
|
||||
@ -4489,6 +4501,7 @@ pthread_handler_t handle_slave_sql(void *arg)
|
||||
"Error initializing relay log position: %s", errmsg);
|
||||
goto err;
|
||||
}
|
||||
rli->reset_inuse_relaylog();
|
||||
if (rli->alloc_inuse_relaylog(rli->group_relay_log_name))
|
||||
goto err;
|
||||
|
||||
@ -4705,7 +4718,49 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
|
||||
thd->reset_query();
|
||||
thd->reset_db(NULL, 0);
|
||||
if (rli->mi->using_gtid != Master_info::USE_GTID_NO)
|
||||
{
|
||||
ulong domain_count;
|
||||
|
||||
flush_relay_log_info(rli);
|
||||
if (opt_slave_parallel_threads > 0)
|
||||
{
|
||||
/*
|
||||
In parallel replication GTID mode, we may stop with different domains
|
||||
at different positions in the relay log.
|
||||
|
||||
To handle this when we restart the SQL thread, mark the current
|
||||
per-domain position in the Relay_log_info.
|
||||
*/
|
||||
mysql_mutex_lock(&rpl_global_gtid_slave_state.LOCK_slave_state);
|
||||
domain_count= rpl_global_gtid_slave_state.count();
|
||||
mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state);
|
||||
if (domain_count > 1)
|
||||
{
|
||||
inuse_relaylog *ir;
|
||||
|
||||
/*
|
||||
Load the starting GTID position, so that we can skip already applied
|
||||
GTIDs when we restart the SQL thread. And set the start position in
|
||||
the relay log back to a known safe place to start (prior to any not
|
||||
yet applied transaction in any domain).
|
||||
*/
|
||||
rli->restart_gtid_pos.load(&rpl_global_gtid_slave_state, NULL, 0);
|
||||
if ((ir= rli->inuse_relaylog_list))
|
||||
{
|
||||
rpl_gtid *gtid= ir->relay_log_state;
|
||||
uint32 count= ir->relay_log_state_count;
|
||||
while (count > 0)
|
||||
{
|
||||
process_gtid_for_restart_pos(rli, gtid);
|
||||
++gtid;
|
||||
--count;
|
||||
}
|
||||
strmake_buf(rli->group_relay_log_name, ir->name);
|
||||
rli->group_relay_log_pos= BIN_LOG_HEADER_SIZE;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
|
||||
thd->add_status_to_global();
|
||||
mysql_mutex_lock(&rli->run_lock);
|
||||
@ -4718,6 +4773,7 @@ err_during_init:
|
||||
/* Forget the relay log's format */
|
||||
delete rli->relay_log.description_event_for_exec;
|
||||
rli->relay_log.description_event_for_exec= 0;
|
||||
rli->reset_inuse_relaylog();
|
||||
/* Wake up master_pos_wait() */
|
||||
mysql_mutex_unlock(&rli->data_lock);
|
||||
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
|
||||
|
@ -2320,6 +2320,30 @@ static int send_format_descriptor_event(binlog_send_info *info,
|
||||
info->current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
|
||||
fix_checksum(packet, ev_offset);
|
||||
}
|
||||
else if (info->using_gtid_state)
|
||||
{
|
||||
/*
|
||||
If this event has the field `created' set, then it will cause the
|
||||
slave to delete all active temporary tables. This must not happen
|
||||
if the slave received any later GTIDs in a previous connect, as
|
||||
those GTIDs might have created new temporary tables that are still
|
||||
needed.
|
||||
|
||||
So here, we check if the starting GTID position was already
|
||||
reached before this format description event. If not, we clear the
|
||||
`created' flag to preserve temporary tables on the slave. (If the
|
||||
slave connects at a position past this event, it means that it
|
||||
already received and handled it in a previous connect).
|
||||
*/
|
||||
if (!info.gtid_state.is_pos_reached())
|
||||
{
|
||||
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
|
||||
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
|
||||
if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
|
||||
info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
|
||||
fix_checksum(packet, ev_offset);
|
||||
}
|
||||
}
|
||||
|
||||
/* send it */
|
||||
if (my_net_write(info->net, (uchar*) packet->ptr(), packet->length()))
|
||||
|
Loading…
x
Reference in New Issue
Block a user