diff --git a/mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf b/mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf new file mode 100644 index 00000000000..b47ebb2cf30 --- /dev/null +++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.cnf @@ -0,0 +1,24 @@ +!include my.cnf + +[mysqld.1] +log-slave-updates +loose-innodb + +[mysqld.2] +log-slave-updates +loose-innodb + +[mysqld.3] +log-bin=server3-bin +log-slave-updates +loose-innodb + +[mysqld.4] +server-id=4 +log-bin=server4-bin +log-slave-updates +loose-innodb + +[ENV] +SERVER_MYPORT_4= @mysqld.4.port +SERVER_MYSOCK_4= @mysqld.4.socket diff --git a/mysql-test/suite/multi_source/gtid_ignore_duplicates.result b/mysql-test/suite/multi_source/gtid_ignore_duplicates.result new file mode 100644 index 00000000000..bac522af76b --- /dev/null +++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.result @@ -0,0 +1,188 @@ +*** Test all-to-all replication with --gtid-ignore-duplicates *** +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates; +SET GLOBAL gtid_ignore_duplicates=1; +SET GLOBAL gtid_domain_id= 1; +SET SESSION gtid_domain_id= 1; +CHANGE MASTER 'b2a' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +CHANGE MASTER 'c2a' TO master_port=MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +set default_master_connection = 'b2a'; +START SLAVE; +include/wait_for_slave_to_start.inc +set default_master_connection = 'c2a'; +START SLAVE; +include/wait_for_slave_to_start.inc +set default_master_connection = ''; +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates; +SET GLOBAL gtid_ignore_duplicates=1; +SET GLOBAL gtid_domain_id= 2; +SET SESSION gtid_domain_id= 2; +CHANGE MASTER 'a2b' TO master_port=MYPORT_1, master_host='127.0.0.1', 
master_user='root', master_use_gtid=slave_pos; +CHANGE MASTER 'c2b' TO master_port=MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +set default_master_connection = 'a2b'; +START SLAVE; +include/wait_for_slave_to_start.inc +set default_master_connection = 'c2b'; +START SLAVE; +include/wait_for_slave_to_start.inc +set default_master_connection = ''; +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates; +SET GLOBAL gtid_ignore_duplicates=1; +SET GLOBAL gtid_domain_id= 3; +SET SESSION gtid_domain_id= 3; +CHANGE MASTER 'a2c' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +CHANGE MASTER 'b2c' TO master_port=MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +set default_master_connection = 'a2c'; +START SLAVE; +include/wait_for_slave_to_start.inc +set default_master_connection = 'b2c'; +START SLAVE; +include/wait_for_slave_to_start.inc +set default_master_connection = ''; +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates; +SET GLOBAL gtid_ignore_duplicates=1; +SET GLOBAL gtid_domain_id= 1; +SET SESSION gtid_domain_id= 1; +CHANGE MASTER 'a2d' TO master_port=MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +set default_master_connection = 'a2d'; +START SLAVE; +include/wait_for_slave_to_start.inc +set default_master_connection = ''; +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1); +BEGIN; +INSERT INTO t1 VALUES (2); +INSERT INTO t1 VALUES (3); +COMMIT; +INSERT INTO t1 VALUES (4), (5); +INSERT INTO t1 VALUES (6); +include/save_master_gtid.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 
+include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a +1 +2 +3 +4 +5 +6 +INSERT INTO t1 VALUES (10); +include/save_master_gtid.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; +a +10 +STOP SLAVE "c2b"; +SET default_master_connection = "c2b"; +include/wait_for_slave_to_stop.inc +STOP SLAVE "a2b"; +SET default_master_connection = "a2b"; +include/wait_for_slave_to_stop.inc +INSERT INTO t1 VALUES (11); +include/save_master_gtid.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; +a +10 +11 +SET default_master_connection = "b2a"; +STOP SLAVE; +include/wait_for_slave_to_stop.inc +INSERT INTO t1 VALUES (12); +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; +a +10 +12 +include/save_master_gtid.inc +START SLAVE "b2a"; +SET default_master_connection = "b2a"; +include/wait_for_slave_to_start.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; +a +10 +11 +12 +START SLAVE "c2b"; +SET default_master_connection = "c2b"; +include/wait_for_slave_to_start.inc +START SLAVE "a2b"; +SET default_master_connection = "a2b"; +include/wait_for_slave_to_start.inc +include/save_master_gtid.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; +a +10 +11 +12 +SET GLOBAL gtid_domain_id=0; +STOP ALL SLAVES; +Warnings: +Note 1938 SLAVE 'c2a' stopped +Note 1938 SLAVE 'b2a' stopped +include/reset_master_slave.inc +SET GLOBAL slave_parallel_threads= @old_parallel; +SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; +DROP TABLE t1; +SET GLOBAL gtid_domain_id=0; +STOP ALL SLAVES; +Warnings: +Note 1938 SLAVE 'a2b' stopped +Note 1938 SLAVE 'c2b' stopped +include/reset_master_slave.inc +SET GLOBAL slave_parallel_threads= @old_parallel; +SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; 
+DROP TABLE t1; +SET GLOBAL gtid_domain_id=0; +STOP ALL SLAVES; +Warnings: +Note 1938 SLAVE 'a2c' stopped +Note 1938 SLAVE 'b2c' stopped +include/reset_master_slave.inc +SET GLOBAL slave_parallel_threads= @old_parallel; +SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; +DROP TABLE t1; +SET GLOBAL gtid_domain_id=0; +STOP ALL SLAVES; +Warnings: +Note 1938 SLAVE 'a2d' stopped +include/reset_master_slave.inc +SET GLOBAL slave_parallel_threads= @old_parallel; +SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; +DROP TABLE t1; diff --git a/mysql-test/suite/multi_source/gtid_ignore_duplicates.test b/mysql-test/suite/multi_source/gtid_ignore_duplicates.test new file mode 100644 index 00000000000..16db4d82ddd --- /dev/null +++ b/mysql-test/suite/multi_source/gtid_ignore_duplicates.test @@ -0,0 +1,208 @@ +--source include/not_embedded.inc +--source include/have_innodb.inc + +--echo *** Test all-to-all replication with --gtid-ignore-duplicates *** + +--connect (server_1,127.0.0.1,root,,,$SERVER_MYPORT_1) +--connect (server_2,127.0.0.1,root,,,$SERVER_MYPORT_2) +--connect (server_3,127.0.0.1,root,,,$SERVER_MYPORT_3) +--connect (server_4,127.0.0.1,root,,,$SERVER_MYPORT_4) + +# Setup A <-> B, B <-> C, C <-> A, and A -> D. 
+ +--connection server_1 +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates; +SET GLOBAL gtid_ignore_duplicates=1; +SET GLOBAL gtid_domain_id= 1; +SET SESSION gtid_domain_id= 1; +--replace_result $SERVER_MYPORT_2 MYPORT_2 +eval CHANGE MASTER 'b2a' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +--replace_result $SERVER_MYPORT_3 MYPORT_3 +eval CHANGE MASTER 'c2a' TO master_port=$SERVER_MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +set default_master_connection = 'b2a'; +START SLAVE; +--source include/wait_for_slave_to_start.inc +set default_master_connection = 'c2a'; +START SLAVE; +--source include/wait_for_slave_to_start.inc +set default_master_connection = ''; + +--connection server_2 +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates; +SET GLOBAL gtid_ignore_duplicates=1; +SET GLOBAL gtid_domain_id= 2; +SET SESSION gtid_domain_id= 2; +--replace_result $SERVER_MYPORT_1 MYPORT_1 +eval CHANGE MASTER 'a2b' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +--replace_result $SERVER_MYPORT_3 MYPORT_3 +eval CHANGE MASTER 'c2b' TO master_port=$SERVER_MYPORT_3, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +set default_master_connection = 'a2b'; +START SLAVE; +--source include/wait_for_slave_to_start.inc +set default_master_connection = 'c2b'; +START SLAVE; +--source include/wait_for_slave_to_start.inc +set default_master_connection = ''; + +--connection server_3 +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates; +SET GLOBAL gtid_ignore_duplicates=1; +SET GLOBAL gtid_domain_id= 3; +SET SESSION 
gtid_domain_id= 3; +--replace_result $SERVER_MYPORT_1 MYPORT_1 +eval CHANGE MASTER 'a2c' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +--replace_result $SERVER_MYPORT_2 MYPORT_2 +eval CHANGE MASTER 'b2c' TO master_port=$SERVER_MYPORT_2, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +set default_master_connection = 'a2c'; +START SLAVE; +--source include/wait_for_slave_to_start.inc +set default_master_connection = 'b2c'; +START SLAVE; +--source include/wait_for_slave_to_start.inc +set default_master_connection = ''; + +--connection server_4 +SET @old_parallel= @@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=5; +SET @old_ignore_duplicates= @@GLOBAL.gtid_ignore_duplicates; +SET GLOBAL gtid_ignore_duplicates=1; +SET GLOBAL gtid_domain_id= 1; +SET SESSION gtid_domain_id= 1; +--replace_result $SERVER_MYPORT_1 MYPORT_1 +eval CHANGE MASTER 'a2d' TO master_port=$SERVER_MYPORT_1, master_host='127.0.0.1', master_user='root', master_use_gtid=slave_pos; +set default_master_connection = 'a2d'; +START SLAVE; +--source include/wait_for_slave_to_start.inc +set default_master_connection = ''; + + +--connection server_1 +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1); +BEGIN; +INSERT INTO t1 VALUES (2); +INSERT INTO t1 VALUES (3); +COMMIT; +INSERT INTO t1 VALUES (4), (5); +INSERT INTO t1 VALUES (6); + +--source include/save_master_gtid.inc + +--connection server_2 +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + +--connection server_3 +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + +--connection server_4 +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + +--connection server_1 +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + +# Test that we can connect at a GTID position that has not yet reached 
+# that master server. +# We stop the connections C->B and A->B, create an event on C, Check that +# the event has reached A (but not B). Then let A stop and re-connect to +# B, which will connect at the new event, which is in the future for B. + +--connection server_3 +INSERT INTO t1 VALUES (10); +--source include/save_master_gtid.inc + +--connection server_2 +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; +STOP SLAVE "c2b"; +SET default_master_connection = "c2b"; +--source include/wait_for_slave_to_stop.inc +STOP SLAVE "a2b"; +SET default_master_connection = "a2b"; +--source include/wait_for_slave_to_stop.inc + +--connection server_3 +INSERT INTO t1 VALUES (11); +--source include/save_master_gtid.inc + +--connection server_1 +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; +SET default_master_connection = "b2a"; +STOP SLAVE; +--source include/wait_for_slave_to_stop.inc + +--connection server_2 +INSERT INTO t1 VALUES (12); +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; +--source include/save_master_gtid.inc + +--connection server_1 +START SLAVE "b2a"; +SET default_master_connection = "b2a"; +--source include/wait_for_slave_to_start.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; + +--connection server_2 +START SLAVE "c2b"; +SET default_master_connection = "c2b"; +--source include/wait_for_slave_to_start.inc +START SLAVE "a2b"; +SET default_master_connection = "a2b"; +--source include/wait_for_slave_to_start.inc + +--connection server_1 +--source include/save_master_gtid.inc + +--connection server_2 +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 WHERE a >= 10 ORDER BY a; + + +# Clean up. 
+--connection server_1 +SET GLOBAL gtid_domain_id=0; +STOP ALL SLAVES; +--source reset_master_slave.inc +SET GLOBAL slave_parallel_threads= @old_parallel; +SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; +DROP TABLE t1; +--disconnect server_1 + +--connection server_2 +SET GLOBAL gtid_domain_id=0; +STOP ALL SLAVES; +--source reset_master_slave.inc +SET GLOBAL slave_parallel_threads= @old_parallel; +SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; +DROP TABLE t1; +--disconnect server_2 + +--connection server_3 +SET GLOBAL gtid_domain_id=0; +STOP ALL SLAVES; +--source reset_master_slave.inc +SET GLOBAL slave_parallel_threads= @old_parallel; +SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; +DROP TABLE t1; +--disconnect server_3 + +--connection server_4 +SET GLOBAL gtid_domain_id=0; +STOP ALL SLAVES; +--source reset_master_slave.inc +SET GLOBAL slave_parallel_threads= @old_parallel; +SET GLOBAL gtid_ignore_duplicates= @old_ignore_duplicates; +DROP TABLE t1; +--disconnect server_4 diff --git a/sql/log_event.cc b/sql/log_event.cc index f3f6d7a5d38..98524d73433 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4440,7 +4440,7 @@ Default database: '%s'. 
Query: '%s'", end: if (sub_id && !thd->is_slave_error) - rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid); + rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli); /* Probably we have set thd->query, thd->db, thd->catalog to point to places @@ -6806,7 +6806,8 @@ Gtid_list_log_event::do_apply_event(rpl_group_info *rgi) sub_id_list[i], false, false))) return ret; - rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i]); + rpl_global_gtid_slave_state.update_state_hash(sub_id_list[i], &list[i], + NULL); } } ret= Log_event::do_apply_event(rgi); @@ -7326,7 +7327,7 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi) thd->mdl_context.release_transactional_locks(); if (!res && sub_id) - rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid); + rpl_global_gtid_slave_state.update_state_hash(sub_id, &gtid, rli); /* Increment the global status commit count variable diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 36d0edee660..d292cc86cfb 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -553,6 +553,7 @@ ulong opt_slave_domain_parallel_threads= 0; ulong opt_binlog_commit_wait_count= 0; ulong opt_binlog_commit_wait_usec= 0; ulong opt_slave_parallel_max_queued= 131072; +my_bool opt_gtid_ignore_duplicates= FALSE; const double log_10[] = { 1e000, 1e001, 1e002, 1e003, 1e004, 1e005, 1e006, 1e007, 1e008, 1e009, @@ -987,7 +988,7 @@ PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread, key_COND_rpl_thread_pool, key_COND_parallel_entry, key_COND_group_commit_orderer, key_COND_prepare_ordered; -PSI_cond_key key_COND_wait_gtid; +PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates; static PSI_cond_info all_server_conds[]= { @@ -1035,7 +1036,8 @@ static PSI_cond_info all_server_conds[]= { &key_COND_parallel_entry, "COND_parallel_entry", 0}, { &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0}, { &key_COND_prepare_ordered, "COND_prepare_ordered", 0}, - { &key_COND_wait_gtid, "COND_wait_gtid", 0} + { 
&key_COND_wait_gtid, "COND_wait_gtid", 0}, + { &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0} }; PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, diff --git a/sql/mysqld.h b/sql/mysqld.h index 0d4b23b12e7..28b9c061945 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -184,6 +184,7 @@ extern ulong opt_slave_domain_parallel_threads; extern ulong opt_slave_parallel_max_queued; extern ulong opt_binlog_commit_wait_count; extern ulong opt_binlog_commit_wait_usec; +extern my_bool opt_gtid_ignore_duplicates; extern ulong back_log; extern ulong executed_events; extern char language[FN_REFLEN]; @@ -299,7 +300,7 @@ extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy; extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue, key_COND_rpl_thread_pool, key_COND_parallel_entry, key_COND_group_commit_orderer; -extern PSI_cond_key key_COND_wait_gtid; +extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates; extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert, key_thread_handle_manager, key_thread_kill_server, key_thread_main, diff --git a/sql/rpl_gtid.cc b/sql/rpl_gtid.cc index e51dee20c19..b66651ae5fd 100644 --- a/sql/rpl_gtid.cc +++ b/sql/rpl_gtid.cc @@ -33,7 +33,8 @@ const LEX_STRING rpl_gtid_slave_state_table_name= void -rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) +rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid, + const Relay_log_info *rli) { int err; /* @@ -44,7 +45,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid) it is even committed. 
*/ mysql_mutex_lock(&LOCK_slave_state); - err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no); + err= update(gtid->domain_id, gtid->server_id, sub_id, gtid->seq_no, rli); mysql_mutex_unlock(&LOCK_slave_state); if (err) { @@ -76,17 +77,102 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi) rgi->gtid_sub_id= 0; if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false)) DBUG_RETURN(1); - update_state_hash(sub_id, &rgi->current_gtid); + update_state_hash(sub_id, &rgi->current_gtid, rgi->rli); } DBUG_RETURN(0); } +/* + Check GTID event execution when --gtid-ignore-duplicates. + + The idea with --gtid-ignore-duplicates is that we allow multiple master + connections (in multi-source replication) to all receive the same GTIDs and + event groups. Only one instance of each is applied; we use the sequence + number in the GTID to decide whether a GTID has already been applied. + + So if the seq_no of a GTID (or a higher sequence number) has already been + applied, then the event should be skipped. If not then the event should be + applied. + + To avoid two master connections trying to apply the same event + simultaneously, only one is allowed to work in any given domain at any point + in time. The associated Relay_log_info object is called the owner of the + domain (and there can be multiple parallel worker threads working in that + domain for that Relay_log_info). Any other Relay_log_info/master connection + must wait for the domain to become free, or for their GTID to have been + applied, before being allowed to proceed. + + Returns: + 0 This GTID is already applied, it should be skipped. + 1 The GTID is not yet applied; this rli is now the owner, and must apply + the event and release the domain afterwards. + -1 Error (out of memory to allocate a new element for the domain). 
+*/ +int +rpl_slave_state::check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli) +{ + uint32 domain_id= gtid->domain_id; + uint64 seq_no= gtid->seq_no; /* Must be 64-bit, same width update() takes. */ + rpl_slave_state::element *elem; + int res; + + mysql_mutex_lock(&LOCK_slave_state); + if (!(elem= get_element(domain_id))) + { + res= -1; + goto err; + } + /* + Note that the elem pointer does not change once inserted in the hash. So + we can re-use the pointer without looking it up again in the hash after + each lock release and re-take. + */ + + /* ToDo: Make this wait killable. */ + for (;;) + { + if (elem->highest_seq_no >= seq_no) + { + /* This sequence number is already applied, ignore it. */ + res= 0; + break; + } + if (!elem->owner_rli) + { + /* The domain became free, grab it and apply the event. */ + elem->owner_rli= rli; + elem->owner_count= 1; + res= 1; + break; + } + if (elem->owner_rli == rli) + { + /* Already own this domain, increment reference count and apply event. */ + ++elem->owner_count; + res= 1; + break; + } + /* + Someone else is currently processing this GTID (or an earlier one). + Wait for them to complete (or fail), and then check again. 
+ */ + mysql_cond_wait(&elem->COND_gtid_ignore_duplicates, + &LOCK_slave_state); + } + +err: + mysql_mutex_unlock(&LOCK_slave_state); + return res; +} + + static void rpl_slave_state_free_element(void *arg) { struct rpl_slave_state::element *elem= (struct rpl_slave_state::element *)arg; mysql_cond_destroy(&elem->COND_wait_gtid); + mysql_cond_destroy(&elem->COND_gtid_ignore_duplicates); my_free(elem); } @@ -147,7 +233,7 @@ rpl_slave_state::deinit() int rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, - uint64 seq_no) + uint64 seq_no, const Relay_log_info *rli) { element *elem= NULL; list_element *list_elem= NULL; @@ -170,6 +256,20 @@ rpl_slave_state::update(uint32 domain_id, uint32 server_id, uint64 sub_id, mysql_cond_broadcast(&elem->COND_wait_gtid); } + if (opt_gtid_ignore_duplicates && rli) + { + uint32 count= elem->owner_count; + DBUG_ASSERT(count > 0); + DBUG_ASSERT(elem->owner_rli == rli); + --count; + elem->owner_count= count; + if (count == 0) + { + elem->owner_rli= NULL; + mysql_cond_broadcast(&elem->COND_gtid_ignore_duplicates); + } + } + if (!(list_elem= (list_element *)my_malloc(sizeof(*list_elem), MYF(MY_WME)))) return 1; list_elem->server_id= server_id; @@ -199,7 +299,11 @@ rpl_slave_state::get_element(uint32 domain_id) elem->domain_id= domain_id; elem->highest_seq_no= 0; elem->gtid_waiter= NULL; + elem->owner_rli= NULL; + elem->owner_count= 0; mysql_cond_init(key_COND_wait_gtid, &elem->COND_wait_gtid, 0); + mysql_cond_init(key_COND_gtid_ignore_duplicates, + &elem->COND_gtid_ignore_duplicates, 0); if (my_hash_insert(&hash, (uchar *)elem)) { my_free(elem); @@ -821,7 +925,7 @@ rpl_slave_state::load(THD *thd, char *state_from_master, size_t len, if (gtid_parser_helper(&state_from_master, end, &gtid) || !(sub_id= next_sub_id(gtid.domain_id)) || record_gtid(thd, &gtid, sub_id, false, in_statement) || - update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no)) + update(gtid.domain_id, gtid.server_id, sub_id, gtid.seq_no, NULL)) 
return 1; if (state_from_master == end) break; diff --git a/sql/rpl_gtid.h b/sql/rpl_gtid.h index 54f352661a7..aef1ca9e403 100644 --- a/sql/rpl_gtid.h +++ b/sql/rpl_gtid.h @@ -91,6 +91,8 @@ struct gtid_waiting { }; +class Relay_log_info; + /* Replication slave state. @@ -131,6 +133,19 @@ struct rpl_slave_state uint64 min_wait_seq_no; mysql_cond_t COND_wait_gtid; + /* + For --gtid-ignore-duplicates. The Relay_log_info that currently owns + this domain, and the number of worker threads that are active in it. + + The idea is that only one of multiple master connections is allowed to + actively apply events for a given domain. Other connections must either + discard the events (if the seq_no in GTID shows they have already been + applied), or wait to see if the current owner will apply it. + */ + const Relay_log_info *owner_rli; + uint32 owner_count; + mysql_cond_t COND_gtid_ignore_duplicates; + list_element *grab_list() { list_element *l= list; list= NULL; return l; } void add(list_element *l) { @@ -155,7 +170,8 @@ struct rpl_slave_state void deinit(); void truncate_hash(); ulong count() const { return hash.records; } - int update(uint32 domain_id, uint32 server_id, uint64 sub_id, uint64 seq_no); + int update(uint32 domain_id, uint32 server_id, uint64 sub_id, + uint64 seq_no, const Relay_log_info *rli); int truncate_state_table(THD *thd); int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id, bool in_transaction, bool in_statement); @@ -171,8 +187,10 @@ struct rpl_slave_state element *get_element(uint32 domain_id); int put_back_list(uint32 domain_id, list_element *list); - void update_state_hash(uint64 sub_id, rpl_gtid *gtid); + void update_state_hash(uint64 sub_id, rpl_gtid *gtid, + const Relay_log_info *rli); int record_and_update_gtid(THD *thd, struct rpl_group_info *rgi); + int check_duplicate_gtid(rpl_gtid *gtid, const Relay_log_info *rli); }; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 27f17668849..d1e0ca518f1 100644 --- 
a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -202,7 +202,7 @@ handle_rpl_parallel_thread(void *arg) struct rpl_parallel_thread::queued_event *events; bool group_standalone= true; bool in_event_group= false; - bool group_skip_for_stop= false; + bool skip_event_group= false; rpl_group_info *group_rgi= NULL; group_commit_orderer *gco, *tmp_gco; uint64 event_gtid_sub_id= 0; @@ -385,13 +385,13 @@ handle_rpl_parallel_thread(void *arg) point where we can safely stop. So set a flag that will cause us to skip, rather than execute, the following events. */ - group_skip_for_stop= true; + skip_event_group= true; } else - group_skip_for_stop= false; + skip_event_group= false; if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) - group_skip_for_stop= true; + skip_event_group= true; else if (rgi->wait_commit_sub_id > entry->last_committed_sub_id) { /* @@ -420,6 +420,16 @@ handle_rpl_parallel_thread(void *arg) thd->wait_for_commit_ptr->wakeup_subsequent_commits(err); } thd->wait_for_commit_ptr= &rgi->commit_orderer; + + if (opt_gtid_ignore_duplicates) + { + int res= + rpl_global_gtid_slave_state.check_duplicate_gtid(&rgi->current_gtid, + rgi->rli); + /* ToDo: Handle res==-1 error. */ + if (!res) + skip_event_group= true; + } } group_ending= event_type == XID_EVENT || @@ -438,7 +448,7 @@ handle_rpl_parallel_thread(void *arg) processing between the event groups as a simple way to ensure that everything is stopped and cleaned up correctly. 
*/ - if (!rgi->is_error && !group_skip_for_stop) + if (!rgi->is_error && !skip_event_group) err= rpt_handle_event(events, rpt); else err= thd->wait_for_prior_commit(); @@ -464,7 +474,7 @@ handle_rpl_parallel_thread(void *arg) rgi->next= rgis_to_free; rgis_to_free= rgi; group_rgi= rgi= NULL; - group_skip_for_stop= false; + skip_event_group= false; DEBUG_SYNC(thd, "rpl_parallel_end_of_group"); } @@ -526,7 +536,7 @@ handle_rpl_parallel_thread(void *arg) mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->free_rgi(group_rgi); group_rgi= NULL; - group_skip_for_stop= false; + skip_event_group= false; } if (!in_event_group) { diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 0fae3a3bb89..020f984ad50 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1435,7 +1435,8 @@ rpl_load_gtid_slave_state(THD *thd) if ((err= rpl_global_gtid_slave_state.update(tmp_entry.gtid.domain_id, tmp_entry.gtid.server_id, tmp_entry.sub_id, - tmp_entry.gtid.seq_no))) + tmp_entry.gtid.seq_no, + NULL))) { mysql_mutex_unlock(&rpl_global_gtid_slave_state.LOCK_slave_state); my_error(ER_OUT_OF_RESOURCES, MYF(0)); diff --git a/sql/slave.cc b/sql/slave.cc index 74955c09ced..cf741ccedc0 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -2047,6 +2047,39 @@ after_set_capability: } } + query_str.length(0); + if (query_str.append(STRING_WITH_LEN("SET @slave_gtid_ignore_duplicates="), + system_charset_info) || + query_str.append_ulonglong(opt_gtid_ignore_duplicates != false)) + { + err_code= ER_OUTOFMEMORY; + errmsg= "The slave I/O thread stops because a fatal out-of-memory error " + "is encountered when it tries to set @slave_gtid_ignore_duplicates."; + sprintf(err_buff, "%s Error: Out of memory", errmsg); + goto err; + } + + rc= mysql_real_query(mysql, query_str.ptr(), query_str.length()); + if (rc) + { + err_code= mysql_errno(mysql); + if (is_network_error(err_code)) + { + mi->report(ERROR_LEVEL, err_code, + "Setting @slave_gtid_ignore_duplicates failed with " + "error: %s", mysql_error(mysql)); + goto 
network_err; + } + else + { + /* Fatal error */ + errmsg= "The slave I/O thread stops because a fatal error is " + "encountered when it tries to set @slave_gtid_ignore_duplicates."; + sprintf(err_buff, "%s Error: %s", errmsg, mysql_error(mysql)); + goto err; + } + } + if (mi->rli.until_condition == Relay_log_info::UNTIL_GTID) { query_str.length(0); diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 516e5d7567a..c7bd28259ae 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -114,6 +114,39 @@ fake_event_write(NET *net, String *packet, const char **errmsg) } +/* + Helper structure, used to pass miscellaneous info from mysql_binlog_send() + into the helper functions that it calls. +*/ +struct binlog_send_info { + rpl_binlog_state until_binlog_state; + slave_connection_state gtid_state; + THD *thd; + NET *net; + String *packet; + char *log_file_name; + slave_connection_state *until_gtid_state; + Format_description_log_event *fdev; + int mariadb_slave_capability; + enum_gtid_skip_type gtid_skip_group; + enum_gtid_until_state gtid_until_group; + ushort flags; + uint8 current_checksum_alg; + bool slave_gtid_strict_mode; + bool send_fake_gtid_list; + bool slave_gtid_ignore_duplicates; + bool using_gtid_state; + + binlog_send_info(THD *thd_arg, String *packet_arg, ushort flags_arg, char *lfn) + : thd(thd_arg), net(&thd_arg->net), packet(packet_arg), + log_file_name(lfn), until_gtid_state(NULL), fdev(NULL), + gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE), + flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), + slave_gtid_strict_mode(false), send_fake_gtid_list(false), + slave_gtid_ignore_duplicates(false) + { } +}; + /* fake_rotate_event() builds a fake (=which does not exist physically in any binlog) Rotate event, which contains the name of the binlog we are going to @@ -132,16 +165,16 @@ fake_event_write(NET *net, String *packet, const char **errmsg) part. 
*/ -static int fake_rotate_event(NET* net, String* packet, char* log_file_name, - ulonglong position, const char** errmsg, - uint8 checksum_alg_arg) +static int fake_rotate_event(binlog_send_info *info, ulonglong position, + const char** errmsg, uint8 checksum_alg_arg) { DBUG_ENTER("fake_rotate_event"); char buf[ROTATE_HEADER_LEN+100]; my_bool do_checksum; int err; - char* p = log_file_name+dirname_length(log_file_name); + char* p = info->log_file_name+dirname_length(info->log_file_name); uint ident_len = (uint) strlen(p); + String *packet= info->packet; ha_checksum crc; if ((err= fake_event_header(packet, ROTATE_EVENT, @@ -160,22 +193,23 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, } if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || - (err= fake_event_write(net, packet, errmsg))) + (err= fake_event_write(info->net, packet, errmsg))) DBUG_RETURN(err); DBUG_RETURN(0); } -static int fake_gtid_list_event(NET* net, String* packet, +static int fake_gtid_list_event(binlog_send_info *info, Gtid_list_log_event *glev, const char** errmsg, - uint8 checksum_alg_arg, uint32 current_pos) + uint32 current_pos) { my_bool do_checksum; int err; ha_checksum crc; char buf[128]; String str(buf, sizeof(buf), system_charset_info); + String* packet= info->packet; str.length(0); if (glev->to_packet(&str)) @@ -185,7 +219,7 @@ static int fake_gtid_list_event(NET* net, String* packet, } if ((err= fake_event_header(packet, GTID_LIST_EVENT, str.length(), &do_checksum, &crc, - errmsg, checksum_alg_arg, current_pos))) + errmsg, info->current_checksum_alg, current_pos))) return err; packet->append(str); @@ -195,7 +229,7 @@ static int fake_gtid_list_event(NET* net, String* packet, } if ((err= fake_event_footer(packet, do_checksum, crc, errmsg)) || - (err= fake_event_write(net, packet, errmsg))) + (err= fake_event_write(info->net, packet, errmsg))) return err; return 0; @@ -627,6 +661,19 @@ get_slave_gtid_strict_mode(THD *thd) } +static bool 
+get_slave_gtid_ignore_duplicates(THD *thd) +{ + bool null_value; + + const LEX_STRING name= { C_STRING_WITH_LEN("slave_gtid_ignore_duplicates") }; + user_var_entry *entry= + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name.str, + name.length); + return entry && entry->val_int(&null_value) && !null_value; +} + + /* Get the value of the @slave_until_gtid user variable into the supplied String (this is the GTID position specified for START SLAVE UNTIL @@ -914,16 +961,16 @@ give_error_start_pos_missing_in_binlog(int *err, const char **errormsg, */ static int -check_slave_start_position(THD *thd, slave_connection_state *st, - const char **errormsg, rpl_gtid *error_gtid, - slave_connection_state *until_gtid_state) +check_slave_start_position(binlog_send_info *info, const char **errormsg, + rpl_gtid *error_gtid) { uint32 i; int err; slave_connection_state::entry **delete_list= NULL; uint32 delete_idx= 0; + slave_connection_state *st= &info->gtid_state; - if (rpl_load_gtid_slave_state(thd)) + if (rpl_load_gtid_slave_state(info->thd)) { *errormsg= "Failed to load replication slave GTID state"; err= ER_CANNOT_LOAD_SLAVE_GTID_STATE; @@ -963,6 +1010,7 @@ check_slave_start_position(THD *thd, slave_connection_state *st, if (!start_at_own_slave_pos) { rpl_gtid domain_gtid; + slave_connection_state *until_gtid_state= info->until_gtid_state; rpl_gtid *until_gtid; if (!mysql_bin_log.lookup_domain_in_binlog_state(slave_gtid->domain_id, @@ -981,6 +1029,17 @@ check_slave_start_position(THD *thd, slave_connection_state *st, continue; } + if (info->slave_gtid_ignore_duplicates && + domain_gtid.seq_no < slave_gtid->seq_no) + { + /* + When --gtid-ignore-duplicates, it is ok for the slave to request + something that we do not have (yet) - they might already have gotten + it through another path in a multi-path replication hierarchy. 
+ */ + continue; + } + if (until_gtid_state && ( !(until_gtid= until_gtid_state->find(slave_gtid->domain_id)) || (mysql_bin_log.find_in_binlog_state(until_gtid->domain_id, @@ -1462,13 +1521,11 @@ gtid_state_from_binlog_pos(const char *in_name, uint32 pos, String *out_str) static bool -is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset, - enum_gtid_until_state gtid_until_group, - Log_event_type event_type, uint8 current_checksum_alg, - ushort flags, const char **errmsg, - rpl_binlog_state *until_binlog_state, uint32 current_pos) +is_until_reached(binlog_send_info *info, ulong *ev_offset, + Log_event_type event_type, const char **errmsg, + uint32 current_pos) { - switch (gtid_until_group) + switch (info->gtid_until_group) { case GTID_UNTIL_NOT_DONE: return false; @@ -1479,9 +1536,10 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset, case GTID_UNTIL_STOP_AFTER_TRANSACTION: if (event_type != XID_EVENT && (event_type != QUERY_EVENT || - !Query_log_event::peek_is_commit_rollback(packet->ptr()+*ev_offset, - packet->length()-*ev_offset, - current_checksum_alg))) + !Query_log_event::peek_is_commit_rollback + (info->packet->ptr()+*ev_offset, + info->packet->length()-*ev_offset, + info->current_checksum_alg))) return false; break; } @@ -1493,12 +1551,11 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset, Send a last fake Gtid_list_log_event with a flag set to mark that we stop due to UNTIL condition. 
*/ - if (reset_transmit_packet(thd, flags, ev_offset, errmsg)) + if (reset_transmit_packet(info->thd, info->flags, ev_offset, errmsg)) return true; - Gtid_list_log_event glev(until_binlog_state, + Gtid_list_log_event glev(&info->until_binlog_state, Gtid_list_log_event::FLAG_UNTIL_REACHED); - if (fake_gtid_list_event(net, packet, &glev, errmsg, current_checksum_alg, - current_pos)) + if (fake_gtid_list_event(info, &glev, errmsg, current_pos)) return true; *errmsg= NULL; return true; @@ -1512,23 +1569,19 @@ is_until_reached(THD *thd, NET *net, String *packet, ulong *ev_offset, Returns NULL on success, error message string on error. */ static const char * -send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, - Log_event_type event_type, char *log_file_name, - IO_CACHE *log, int mariadb_slave_capability, - ulong ev_offset, uint8 current_checksum_alg, - bool using_gtid_state, slave_connection_state *gtid_state, - enum_gtid_skip_type *gtid_skip_group, - slave_connection_state *until_gtid_state, - enum_gtid_until_state *gtid_until_group, - rpl_binlog_state *until_binlog_state, - bool slave_gtid_strict_mode, rpl_gtid *error_gtid, - bool *send_fake_gtid_list, - Format_description_log_event *fdev) +send_event_to_slave(binlog_send_info *info, Log_event_type event_type, + IO_CACHE *log, ulong ev_offset, rpl_gtid *error_gtid) { my_off_t pos; + String* const packet= info->packet; size_t len= packet->length(); + int mariadb_slave_capability= info->mariadb_slave_capability; + uint8 current_checksum_alg= info->current_checksum_alg; + slave_connection_state *gtid_state= &info->gtid_state; + slave_connection_state *until_gtid_state= info->until_gtid_state; - if (event_type == GTID_LIST_EVENT && using_gtid_state && until_gtid_state) + if (event_type == GTID_LIST_EVENT && + info->using_gtid_state && until_gtid_state) { rpl_gtid *gtid_list; uint32 list_len; @@ -1537,12 +1590,12 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, if 
(ev_offset > len || Gtid_list_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, current_checksum_alg, - &gtid_list, &list_len, fdev)) + &gtid_list, &list_len, info->fdev)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to read Gtid_list_log_event: corrupt binlog"; } - err= until_binlog_state->load(gtid_list, list_len); + err= info->until_binlog_state.load(gtid_list, list_len); my_free(gtid_list); if (err) { @@ -1552,7 +1605,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, } /* Skip GTID event groups until we reach slave position within a domain_id. */ - if (event_type == GTID_EVENT && using_gtid_state) + if (event_type == GTID_EVENT && info->using_gtid_state) { uchar flags2; slave_connection_state::entry *gtid_entry; @@ -1566,7 +1619,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Gtid_log_event::peek(packet->ptr()+ev_offset, len - ev_offset, current_checksum_alg, &event_gtid.domain_id, &event_gtid.server_id, - &event_gtid.seq_no, &flags2, fdev)) + &event_gtid.seq_no, &flags2, info->fdev)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed to read Gtid_log_event: corrupt binlog"; @@ -1575,7 +1628,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, DBUG_EXECUTE_IF("gtid_force_reconnect_at_10_1_100", { rpl_gtid *dbug_gtid; - if ((dbug_gtid= until_binlog_state->find_nolock(10,1)) && + if ((dbug_gtid= info->until_binlog_state.find_nolock(10,1)) && dbug_gtid->seq_no == 100) { DBUG_SET("-d,gtid_force_reconnect_at_10_1_100"); @@ -1585,7 +1638,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, } }); - if (until_binlog_state->update_nolock(&event_gtid, false)) + if (info->until_binlog_state.update_nolock(&event_gtid, false)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; return "Failed in internal GTID book-keeping: Out of memory"; @@ -1618,12 +1671,13 @@ send_event_to_slave(THD *thd, NET *net, String* const
packet, ushort flags, /* Skip this event group if we have not yet reached slave start pos. */ if (event_gtid.server_id != gtid->server_id || event_gtid.seq_no <= gtid->seq_no) - *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); if (event_gtid.server_id == gtid->server_id && event_gtid.seq_no >= gtid->seq_no) { - if (slave_gtid_strict_mode && event_gtid.seq_no > gtid->seq_no && + if (info->slave_gtid_strict_mode && + event_gtid.seq_no > gtid->seq_no && !(gtid_entry->flags & slave_connection_state::START_OWN_SLAVE_POS)) { /* @@ -1645,7 +1699,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, so MASTER_POS_WAIT() and MASTER_GTID_WAIT() can work. The fake event will be sent at the end of this event group. */ - *send_fake_gtid_list= true; + info->send_fake_gtid_list= true; /* Delete this entry if we have reached slave start position (so we @@ -1666,7 +1720,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, This domain already reached the START SLAVE UNTIL stop condition, so skip this event group. */ - *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); } else if (event_gtid.server_id == gtid->server_id && @@ -1681,9 +1735,9 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, uint64 until_seq_no= gtid->seq_no; until_gtid_state->remove(gtid); if (until_gtid_state->count() == 0) - *gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ? - GTID_UNTIL_STOP_AFTER_STANDALONE : - GTID_UNTIL_STOP_AFTER_TRANSACTION); + info->gtid_until_group= (flags2 & Gtid_log_event::FL_STANDALONE ? 
+ GTID_UNTIL_STOP_AFTER_STANDALONE : + GTID_UNTIL_STOP_AFTER_TRANSACTION); if (event_gtid.seq_no > until_seq_no) { /* @@ -1693,7 +1747,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, should be in, we can just stop now. And we also need to skip this event group (as it is beyond the UNTIL condition). */ - *gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? + info->gtid_skip_group = (flags2 & Gtid_log_event::FL_STANDALONE ? GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION); } } @@ -1707,11 +1761,11 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Note that slave that understands GTID can also tolerate holes, so there is no need to supply dummy event. */ - switch (*gtid_skip_group) + switch (info->gtid_skip_group) { case GTID_SKIP_STANDALONE: if (!Log_event::is_part_of_group(event_type)) - *gtid_skip_group= GTID_SKIP_NOT; + info->gtid_skip_group= GTID_SKIP_NOT; return NULL; case GTID_SKIP_TRANSACTION: if (event_type == XID_EVENT || @@ -1719,14 +1773,15 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Query_log_event::peek_is_commit_rollback(packet->ptr() + ev_offset, len - ev_offset, current_checksum_alg))) - *gtid_skip_group= GTID_SKIP_NOT; + info->gtid_skip_group= GTID_SKIP_NOT; return NULL; case GTID_SKIP_NOT: break; } /* Do not send annotate_rows events unless slave requested it. */ - if (event_type == ANNOTATE_ROWS_EVENT && !(flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) + if (event_type == ANNOTATE_ROWS_EVENT && + !(info->flags & BINLOG_SEND_ANNOTATE_ROWS_EVENT)) { if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES) { @@ -1820,7 +1875,7 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, Skip events with the @@skip_replication flag set, if slave requested skipping of such events. 
*/ - if (thd->variables.option_bits & OPTION_SKIP_REPLICATION) + if (info->thd->variables.option_bits & OPTION_SKIP_REPLICATION) { /* The first byte of the packet is a '\0' to distinguish it from an error @@ -1831,17 +1886,17 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, return NULL; } - THD_STAGE_INFO(thd, stage_sending_binlog_event_to_slave); + THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave); pos= my_b_tell(log); if (RUN_HOOK(binlog_transmit, before_send_event, - (thd, flags, packet, log_file_name, pos))) + (info->thd, info->flags, packet, info->log_file_name, pos))) { my_errno= ER_UNKNOWN_ERROR; return "run 'before_send_event' hook failed"; } - if (my_net_write(net, (uchar*) packet->ptr(), len)) + if (my_net_write(info->net, (uchar*) packet->ptr(), len)) { my_errno= ER_UNKNOWN_ERROR; return "Failed on my_net_write()"; @@ -1850,14 +1905,15 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags, DBUG_PRINT("info", ("log event code %d", (*packet)[LOG_EVENT_OFFSET+1] )); if (event_type == LOAD_EVENT) { - if (send_file(thd)) + if (send_file(info->thd)) { my_errno= ER_UNKNOWN_ERROR; return "failed in send_file()"; } } - if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + if (RUN_HOOK(binlog_transmit, after_send_event, + (info->thd, info->flags, packet))) { my_errno= ER_UNKNOWN_ERROR; return "Failed to run hook 'after_send_event'"; @@ -1878,31 +1934,21 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, IO_CACHE log; File file = -1; - String* const packet = &thd->packet; + String* const packet= &thd->packet; int error; const char *errmsg = "Unknown error", *tmp_msg; char error_text[MAX_SLAVE_ERRMSG]; // to be send to slave via my_message() - NET* net = &thd->net; mysql_mutex_t *log_lock; mysql_cond_t *log_cond; - int mariadb_slave_capability; char str_buf[128]; String connect_gtid_state(str_buf, sizeof(str_buf), system_charset_info); - bool using_gtid_state; char 
str_buf2[128]; String slave_until_gtid_str(str_buf2, sizeof(str_buf2), system_charset_info); - slave_connection_state gtid_state, until_gtid_state_obj; - slave_connection_state *until_gtid_state= NULL; + slave_connection_state until_gtid_state_obj; rpl_gtid error_gtid; - enum_gtid_skip_type gtid_skip_group= GTID_SKIP_NOT; - enum_gtid_until_state gtid_until_group= GTID_UNTIL_NOT_DONE; - rpl_binlog_state until_binlog_state; - bool slave_gtid_strict_mode= false; - bool send_fake_gtid_list= false; + binlog_send_info info(thd, packet, flags, log_file_name); - uint8 current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; int old_max_allowed_packet= thd->variables.max_allowed_packet; - Format_description_log_event *fdev= NULL; #ifndef DBUG_OFF int left_events = max_binlog_dump_events; @@ -1928,16 +1974,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, heartbeat_ts= &heartbeat_buf; set_timespec_nsec(*heartbeat_ts, 0); } - mariadb_slave_capability= get_mariadb_slave_capability(thd); + info.mariadb_slave_capability= get_mariadb_slave_capability(thd); connect_gtid_state.length(0); - using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); - DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", using_gtid_state= false;); - if (using_gtid_state) + info.using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state); + DBUG_EXECUTE_IF("simulate_non_gtid_aware_master", info.using_gtid_state= false;); + if (info.using_gtid_state) { - slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd); + info.slave_gtid_strict_mode= get_slave_gtid_strict_mode(thd); + info.slave_gtid_ignore_duplicates= get_slave_gtid_ignore_duplicates(thd); if(get_slave_until_gtid(thd, &slave_until_gtid_str)) - until_gtid_state= &until_gtid_state_obj; + info.until_gtid_state= &until_gtid_state_obj; } DBUG_EXECUTE_IF("binlog_force_reconnect_after_22_events", @@ -1978,7 +2025,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, } #endif - if (!(fdev= new 
Format_description_log_event(3))) + if (!(info.fdev= new Format_description_log_event(3))) { errmsg= "Out of memory initializing format_description event"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; @@ -1999,33 +2046,32 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, } name=search_file_name; - if (using_gtid_state) + if (info.using_gtid_state) { - if (gtid_state.load(connect_gtid_state.c_ptr_quick(), - connect_gtid_state.length())) + if (info.gtid_state.load(connect_gtid_state.c_ptr_quick(), + connect_gtid_state.length())) { errmsg= "Out of memory or malformed slave request when obtaining start " "position from GTID state"; my_errno= ER_UNKNOWN_ERROR; goto err; } - if (until_gtid_state && - until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), - slave_until_gtid_str.length())) + if (info.until_gtid_state && + info.until_gtid_state->load(slave_until_gtid_str.c_ptr_quick(), + slave_until_gtid_str.length())) { errmsg= "Out of memory or malformed slave request when obtaining UNTIL " "position sent from slave"; my_errno= ER_UNKNOWN_ERROR; goto err; } - if ((error= check_slave_start_position(thd, &gtid_state, &errmsg, - &error_gtid, until_gtid_state))) + if ((error= check_slave_start_position(&info, &errmsg, &error_gtid))) { my_errno= error; goto err; } - if ((errmsg= gtid_find_binlog_file(&gtid_state, search_file_name, - until_gtid_state))) + if ((errmsg= gtid_find_binlog_file(&info.gtid_state, search_file_name, + info.until_gtid_state))) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; @@ -2098,7 +2144,7 @@ impossible position"; given that we want minimum modification of 4.0, we send the normal and fake Rotates.
*/ - if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg, + if (fake_rotate_event(&info, pos, &errmsg, get_binlog_checksum_value_at_connect(thd))) { /* @@ -2150,14 +2196,14 @@ impossible position"; { Format_description_log_event *tmp; - current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, - packet->length() - ev_offset); - DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || - current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || - current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, + packet->length() - ev_offset); + DBUG_ASSERT(info.current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || + info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); if (!is_slave_checksum_aware(thd) && - current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; errmsg= "Slave can not handle replication events with the checksum " @@ -2170,14 +2216,14 @@ impossible position"; if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, packet->length()-ev_offset, - fdev))) + info.fdev))) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; errmsg= "Corrupt Format_description event found or out-of-memory"; goto err; } - delete fdev; - fdev= tmp; + delete info.fdev; + info.fdev= tmp; (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* @@ -2194,12 +2240,12 @@ impossible position"; ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* fix the checksum due to latest changes in header */ - if (current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + if (info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) 
fix_checksum(packet, ev_offset); /* send it */ - if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) + if (my_net_write(info.net, (uchar*) packet->ptr(), packet->length())) { errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; @@ -2235,13 +2281,13 @@ impossible position"; We will send one event, the format_description, and then stop. */ - if (until_gtid_state && until_gtid_state->count() == 0) - gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; + if (info.until_gtid_state && info.until_gtid_state->count() == 0) + info.gtid_until_group= GTID_UNTIL_STOP_AFTER_STANDALONE; /* seek to the requested position, to start the requested dump */ my_b_seek(&log, pos); // Seek will done on next read - while (!net->error && net->vio != 0 && !thd->killed) + while (!info.net->error && info.net->vio != 0 && !thd->killed) { Log_event_type event_type= UNKNOWN_EVENT; killed_state killed; @@ -2254,14 +2300,14 @@ impossible position"; bool is_active_binlog= false; while (!(killed= thd->killed) && !(error = Log_event::read_log_event(&log, packet, log_lock, - current_checksum_alg, + info.current_checksum_alg, log_file_name, &is_active_binlog))) { #ifndef DBUG_OFF if (max_binlog_dump_events && !left_events--) { - net_flush(net); + net_flush(info.net); errmsg = "Debugging binlog dump abort"; my_errno= ER_UNKNOWN_ERROR; goto err; @@ -2279,7 +2325,7 @@ impossible position"; { if (event_type == XID_EVENT) { - net_flush(net); + net_flush(info.net); const char act[]= "now " "wait_for signal.continue"; @@ -2298,14 +2344,14 @@ impossible position"; { Format_description_log_event *tmp; - current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, + info.current_checksum_alg= get_checksum_alg(packet->ptr() + ev_offset, packet->length() - ev_offset); - DBUG_ASSERT(current_checksum_alg == BINLOG_CHECKSUM_ALG_OFF || - current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || - current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); + DBUG_ASSERT(info.current_checksum_alg == 
BINLOG_CHECKSUM_ALG_OFF || + info.current_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF || + info.current_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32); if (!is_slave_checksum_aware(thd) && - current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && - current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_OFF && + info.current_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; errmsg= "Slave can not handle replication events with the checksum " @@ -2318,14 +2364,14 @@ impossible position"; if (!(tmp= new Format_description_log_event(packet->ptr()+ev_offset, packet->length()-ev_offset, - fdev))) + info.fdev))) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; errmsg= "Corrupt Format_description event found or out-of-memory"; goto err; } - delete fdev; - fdev= tmp; + delete info.fdev; + info.fdev= tmp; (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; } @@ -2343,36 +2389,28 @@ impossible position"; } #endif - if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, - log_file_name, &log, - mariadb_slave_capability, ev_offset, - current_checksum_alg, using_gtid_state, - &gtid_state, &gtid_skip_group, - until_gtid_state, &gtid_until_group, - &until_binlog_state, - slave_gtid_strict_mode, &error_gtid, - &send_fake_gtid_list, fdev))) + if ((tmp_msg= send_event_to_slave(&info, event_type, &log, + ev_offset, &error_gtid))) { errmsg= tmp_msg; goto err; } - if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT) + if (unlikely(info.send_fake_gtid_list) && + info.gtid_skip_group == GTID_SKIP_NOT) { - Gtid_list_log_event glev(&until_binlog_state, 0); + Gtid_list_log_event glev(&info.until_binlog_state, 0); if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) || - fake_gtid_list_event(net, packet, &glev, &errmsg, - current_checksum_alg, my_b_tell(&log))) + fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log))) { my_errno= ER_UNKNOWN_ERROR; goto err; } -
send_fake_gtid_list= false; + info.send_fake_gtid_list= false; } - if (until_gtid_state && - is_until_reached(thd, net, packet, &ev_offset, gtid_until_group, - event_type, current_checksum_alg, flags, &errmsg, - &until_binlog_state, my_b_tell(&log))) + if (info.until_gtid_state && + is_until_reached(&info, &ev_offset, event_type, &errmsg, + my_b_tell(&log))) { if (errmsg) { @@ -2386,7 +2424,7 @@ impossible position"; { if (event_type == XID_EVENT) { - net_flush(net); + net_flush(info.net); } }); @@ -2423,7 +2461,7 @@ impossible position"; /* Block until there is more data in the log */ - if (net_flush(net)) + if (net_flush(info.net)) { errmsg = "failed on net_flush()"; my_errno= ER_UNKNOWN_ERROR; @@ -2466,7 +2504,7 @@ impossible position"; mysql_mutex_lock(log_lock); switch (error= Log_event::read_log_event(&log, packet, (mysql_mutex_t*) 0, - current_checksum_alg)) { + info.current_checksum_alg)) { case 0: /* we read successfully, so we'll need to send it to the slave */ mysql_mutex_unlock(log_lock); @@ -2524,7 +2562,8 @@ impossible position"; thd->EXIT_COND(&old_stage); goto err; } - if (send_heartbeat_event(net, packet, p_coord, current_checksum_alg)) + if (send_heartbeat_event(info.net, packet, p_coord, + info.current_checksum_alg)) { errmsg = "Failed on my_net_write()"; my_errno= ER_UNKNOWN_ERROR; @@ -2549,36 +2588,28 @@ impossible position"; if (read_packet) { - if ((tmp_msg= send_event_to_slave(thd, net, packet, flags, event_type, - log_file_name, &log, - mariadb_slave_capability, ev_offset, - current_checksum_alg, - using_gtid_state, &gtid_state, - &gtid_skip_group, until_gtid_state, - &gtid_until_group, &until_binlog_state, - slave_gtid_strict_mode, &error_gtid, - &send_fake_gtid_list, fdev))) + if ((tmp_msg= send_event_to_slave(&info, event_type, &log, + ev_offset, &error_gtid))) { errmsg= tmp_msg; goto err; } - if (unlikely(send_fake_gtid_list) && gtid_skip_group == GTID_SKIP_NOT) + if (unlikely(info.send_fake_gtid_list) + && info.gtid_skip_group ==
GTID_SKIP_NOT) { - Gtid_list_log_event glev(&until_binlog_state, 0); + Gtid_list_log_event glev(&info.until_binlog_state, 0); if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg) || - fake_gtid_list_event(net, packet, &glev, &errmsg, - current_checksum_alg, my_b_tell(&log))) + fake_gtid_list_event(&info, &glev, &errmsg, my_b_tell(&log))) { my_errno= ER_UNKNOWN_ERROR; goto err; } - send_fake_gtid_list= false; + info.send_fake_gtid_list= false; } - if (until_gtid_state && - is_until_reached(thd, net, packet, &ev_offset, gtid_until_group, - event_type, current_checksum_alg, flags, &errmsg, - &until_binlog_state, my_b_tell(&log))) + if (info.until_gtid_state && + is_until_reached(&info, &ev_offset, event_type, &errmsg, + my_b_tell(&log))) { if (errmsg) { @@ -2633,8 +2664,8 @@ impossible position"; read and send is Format_description_log_event. */ if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 || - fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE, - &errmsg, current_checksum_alg)) + fake_rotate_event(&info, BIN_LOG_HEADER_SIZE, &errmsg, + info.current_checksum_alg)) { my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; @@ -2655,7 +2686,7 @@ end: thd->current_linfo = 0; mysql_mutex_unlock(&LOCK_thread_count); thd->variables.max_allowed_packet= old_max_allowed_packet; - delete fdev; + delete info.fdev; DBUG_VOID_RETURN; err: @@ -2731,7 +2762,7 @@ err: if (file >= 0) mysql_file_close(file, MYF(MY_WME)); thd->variables.max_allowed_packet= old_max_allowed_packet; - delete fdev; + delete info.fdev; my_message(my_errno, error_text, MYF(0)); DBUG_VOID_RETURN; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index fbf0c624a88..8aa202b381b 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -1819,6 +1819,54 @@ static Sys_var_ulong Sys_slave_parallel_max_queued( "--slave-parallel-threads > 0.", GLOBAL_VAR(opt_slave_parallel_max_queued), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1)); + + +static bool 
+check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var) +{ + bool running; + + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + mysql_mutex_unlock(&LOCK_active_mi); + if (running) + return true; + + return false; +} + +static bool +fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type) +{ + bool running; + bool err= false; + + mysql_mutex_unlock(&LOCK_global_system_variables); + mysql_mutex_lock(&LOCK_active_mi); + running= master_info_index->give_error_if_slave_running(); + mysql_mutex_unlock(&LOCK_active_mi); + if (running) + err= true; + mysql_mutex_lock(&LOCK_global_system_variables); + + /* ToDo: Isn't there a race here? The slave-running check above is made while holding LOCK_active_mi, but that mutex is released again before this function returns. The variable should be changed only while LOCK_active_mi is held, and only if running is false. */ + return err; +} + + +static Sys_var_mybool Sys_gtid_ignore_duplicates( + "gtid_ignore_duplicates", + "When set, different master connections in multi-source replication are " + "allowed to receive and process event groups with the same GTID (when " + "using GTID mode). Only one will be applied, any others will be " + "ignored. Within a given replication domain, just the sequence number " + "will be used to decide whether a given GTID has been already applied; " + "this means it is the responsibility of the user to ensure that GTID " + "sequence numbers are strictly increasing.", + GLOBAL_VAR(opt_gtid_ignore_duplicates), CMD_LINE(OPT_ARG), + DEFAULT(FALSE), NO_MUTEX_GUARD, + NOT_IN_BINLOG, ON_CHECK(check_gtid_ignore_duplicates), + ON_UPDATE(fix_gtid_ignore_duplicates)); #endif