diff --git a/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result b/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result new file mode 100644 index 00000000000..47bf2ff887f --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel_multilevel2.result @@ -0,0 +1,52 @@ +include/rpl_init.inc [topology=1->2->3] +*** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure *** +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +include/stop_slave.inc +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count; +SET GLOBAL binlog_commit_wait_count=2; +SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec; +SET GLOBAL binlog_commit_wait_usec=2000000; +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +CHANGE MASTER TO master_use_gtid=current_pos; +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=current_pos; +BEGIN; +CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY; +COMMIT; +INSERT INTO t2 VALUES (1); +INSERT INTO t1 SELECT a, a*10 FROM t2; +DROP TABLE t2; +include/save_master_gtid.inc +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a b +1 10 +include/start_slave.inc +include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; +a b +1 10 +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +SET GLOBAL binlog_commit_wait_count=@old_commit_count; +SET GLOBAL binlog_commit_wait_usec=@old_commit_usec; +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +include/start_slave.inc +include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +include/start_slave.inc +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table"); +DROP TABLE t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf new file mode 100644 index 00000000000..4e1d3878f03 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.cnf @@ -0,0 +1,17 @@ +!include ../my.cnf + +[mysqld.1] +log-slave-updates +loose-innodb + +[mysqld.2] +log-slave-updates +loose-innodb + +[mysqld.3] +log-slave-updates +loose-innodb + +[ENV] +SERVER_MYPORT_3= @mysqld.3.port +SERVER_MYSOCK_3= @mysqld.3.socket diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test new file mode 100644 index 00000000000..4125394ef80 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_multilevel2.test @@ -0,0 +1,80 @@ +--source include/have_innodb.inc +--let $rpl_topology=1->2->3 +--source include/rpl_init.inc + +--echo *** MDEV-7668: Intermediate master groups CREATE with INSERT, causing parallel replication failure *** + +--connection server_1 +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +--save_master_pos + +--connection server_2 +--sync_with_master +--save_master_pos +--source include/stop_slave.inc +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +SET GLOBAL slave_parallel_threads=10; +SET @old_commit_count=@@GLOBAL.binlog_commit_wait_count; +SET GLOBAL binlog_commit_wait_count=2; +SET @old_commit_usec=@@GLOBAL.binlog_commit_wait_usec; +SET GLOBAL binlog_commit_wait_usec=2000000; +SET @old_updates= @@GLOBAL.binlog_direct_non_transactional_updates; +SET GLOBAL binlog_direct_non_transactional_updates=OFF; +SET SESSION binlog_direct_non_transactional_updates=OFF; +CHANGE MASTER TO master_use_gtid=current_pos; + +--connection server_3 +--sync_with_master +--save_master_pos +SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=10; +CHANGE MASTER TO master_use_gtid=current_pos; + + +--connection server_1 + +BEGIN; +CREATE TEMPORARY TABLE t2 (a INT PRIMARY KEY) ENGINE=MEMORY; +COMMIT; +INSERT INTO t2 VALUES (1); +INSERT INTO t1 SELECT a, a*10 FROM t2; +DROP TABLE t2; +--source include/save_master_gtid.inc + +--connection server_2 +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + +--connection server_3 +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +SELECT * FROM t1 ORDER BY a; + + +# Clean up + +--connection server_2 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +SET GLOBAL binlog_commit_wait_count=@old_commit_count; +SET GLOBAL binlog_commit_wait_usec=@old_commit_usec; +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +--source include/start_slave.inc + +--connection server_3 +--source include/stop_slave.inc +SET GLOBAL slave_parallel_threads=@old_parallel_threads; +--source include/start_slave.inc + +--connection server_1 +SET GLOBAL binlog_direct_non_transactional_updates= @old_updates; +CALL mtr.add_suppression("Statement accesses nontransactional table as well as transactional or temporary table"); +DROP TABLE t1; + +--source include/rpl_end.inc diff --git a/sql/sql_base.cc b/sql/sql_base.cc index d4fb2a8b8bb..9aa9d1e0266 100644 --- a/sql/sql_base.cc +++ b/sql/sql_base.cc @@ -661,20 +661,23 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd) DBUG_VOID_RETURN; } - thd->lock_temporary_tables(); - for (TABLE *table= thd->temporary_tables ; table ; table= table->next) + if (thd->temporary_tables) { - if ((table->query_id == thd->query_id) && ! table->open_by_handler) - mark_tmp_table_for_reuse(table); - } - thd->unlock_temporary_tables(); - if (thd->rgi_slave) - { - /* - Temporary tables are shared with other by sql execution threads. - As a safety messure, clear the pointer to the common area. - */ - thd->temporary_tables= 0; + thd->lock_temporary_tables(); + for (TABLE *table= thd->temporary_tables ; table ; table= table->next) + { + if ((table->query_id == thd->query_id) && ! table->open_by_handler) + mark_tmp_table_for_reuse(table); + } + thd->unlock_temporary_tables(); + if (thd->rgi_slave) + { + /* + Temporary tables are shared with other by sql execution threads. + As a safety messure, clear the pointer to the common area. + */ + thd->temporary_tables= 0; + } } DBUG_VOID_RETURN; } @@ -5586,6 +5589,14 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton, (uint) thd->variables.server_id, (ulong) thd->variables.pseudo_thread_id)); + if (add_to_temporary_tables_list) + { + /* Temporary tables are not safe for parallel replication. */ + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_prior_commit()) + return NULL; + } + /* Create the cache_key for temporary tables */ key_length= create_tmp_table_def_key(thd, cache_key, db, table_name); @@ -5822,6 +5833,25 @@ bool open_temporary_table(THD *thd, TABLE_LIST *tl) DBUG_RETURN(FALSE); } + /* + Temporary tables are not safe for parallel replication. They were + designed to be visible to one thread only, so have no table locking. + Thus there is no protection against two conflicting transactions + committing in parallel and things like that. + + So for now, anything that uses temporary tables will be serialised + with anything before it, when using parallel replication. + + ToDo: We might be able to introduce a reference count or something + on temp tables, and have slave worker threads wait for it to reach + zero before being allowed to use the temp table. Might not be worth + it though, as statement-based replication using temporary tables is + in any case rather fragile. + */ + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_prior_commit()) + DBUG_RETURN(true); + #ifdef WITH_PARTITION_STORAGE_ENGINE if (tl->partition_names) {