diff --git a/mysql-test/suite/rpl/r/rpl_parallel_multi_domain_xa.result b/mysql-test/suite/rpl/r/rpl_parallel_multi_domain_xa.result new file mode 100644 index 00000000000..defab6aef52 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_parallel_multi_domain_xa.result @@ -0,0 +1,55 @@ +include/master-slave.inc +[connection master] +call mtr.add_suppression("Deadlock found when trying to get lock; try restarting transaction"); +call mtr.add_suppression("WSREP: handlerton rollback failed"); +connection master; +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +connection slave; +include/stop_slave.inc +SET @old_parallel_threads = @@GLOBAL.slave_parallel_threads; +SET @old_slave_domain_parallel_threads = @@GLOBAL.slave_domain_parallel_threads; +SET @@global.slave_parallel_threads = 5; +SET @@global.slave_domain_parallel_threads = 3; +SET @old_parallel_mode = @@GLOBAL.slave_parallel_mode; +CHANGE MASTER TO master_use_gtid=slave_pos; +connection master; +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1, 0); +include/save_master_gtid.inc +connection slave; +include/start_slave.inc +include/sync_with_master_gtid.inc +include/stop_slave.inc +SET @@global.slave_parallel_mode ='optimistic'; +connection master; +include/save_master_gtid.inc +connection slave; +include/start_slave.inc +include/sync_with_master_gtid.inc +include/stop_slave.inc +connection master; +include/save_master_gtid.inc +connection slave; +SET @@global.slave_parallel_mode ='conservative'; +include/start_slave.inc +include/sync_with_master_gtid.inc +include/stop_slave.inc +include/save_master_gtid.inc +connection slave; +SET @@global.slave_parallel_mode = 'optimistic'; +include/start_slave.inc +include/sync_with_master_gtid.inc +include/diff_tables.inc [master:t1, slave:t1] +connection slave; +include/stop_slave.inc +SET @@global.slave_parallel_mode = @old_parallel_mode; +SET @@global.slave_parallel_threads = @old_parallel_threads; +SET @@global.slave_domain_parallel_threads = @old_slave_domain_parallel_threads; +include/start_slave.inc +connection master; +DROP TABLE t1; +include/save_master_gtid.inc +connection slave; +include/sync_with_master_gtid.inc +connection master; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel_multi_domain_xa.test b/mysql-test/suite/rpl/t/rpl_parallel_multi_domain_xa.test new file mode 100644 index 00000000000..da1aaea130f --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_domain_xa.test @@ -0,0 +1,173 @@ +# Similar to rpl_parallel_optimistic_xa to verify XA +# parallel execution with multiple gtid domain. +# References: +# MDEV-33668 Adapt parallel slave's round-robin scheduling to XA events + +--source include/have_innodb.inc +--source include/have_perfschema.inc +--source include/master-slave.inc + +# Tests' global declarations +--let $trx = _trx_ + +call mtr.add_suppression("Deadlock found when trying to get lock; try restarting transaction"); +call mtr.add_suppression("WSREP: handlerton rollback failed"); + +--connection master +ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; +--save_master_pos + +# Prepare to restart slave into optimistic parallel mode +--connection slave +--sync_with_master +--source include/stop_slave.inc +SET @old_parallel_threads = @@GLOBAL.slave_parallel_threads; +SET @old_slave_domain_parallel_threads = @@GLOBAL.slave_domain_parallel_threads; +SET @@global.slave_parallel_threads = 5; +SET @@global.slave_domain_parallel_threads = 3; +SET @old_parallel_mode = @@GLOBAL.slave_parallel_mode; + +CHANGE MASTER TO master_use_gtid=slave_pos; + +--connection master +CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; +INSERT INTO t1 VALUES (1, 0); +--source include/save_master_gtid.inc + +--connection slave +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc +--source include/stop_slave.inc + +--let $mode = 2 +# mode = 2 is optimistic +SET @@global.slave_parallel_mode ='optimistic'; +while ($mode) +{ + --connection master + # + # create XA events alternating gtid domains to run them in parallel on slave. + # + --let $domain_num = 3 + --let $trx_num = 777 + --let $i = $trx_num + --let $conn = master + --disable_query_log + while($i > 0) + { + --let $domain_id = `SELECT $i % $domain_num` + --eval set @@gtid_domain_id = $domain_id + # 'decision' to commit 0, or rollback 1 + --let $decision = `SELECT $i % 2` + --eval XA START '$conn$trx$i' + --eval UPDATE t1 SET b = 1 - 2 * $decision WHERE a = 1 + --eval XA END '$conn$trx$i' + --eval XA PREPARE '$conn$trx$i' + --let $term = COMMIT + if ($decision) + { + --let $term = ROLLBACK + } + --eval XA $term '$conn$trx$i' + + --dec $i + } + --enable_query_log + --source include/save_master_gtid.inc + + --connection slave + if (`select $mode = 1`) + { + SET @@global.slave_parallel_mode ='conservative'; + } + --source include/start_slave.inc + --source include/sync_with_master_gtid.inc + --source include/stop_slave.inc + + --dec $mode +} + + +# Generations test. +# Create few ranges of XAP groups length of greater than +# 3 * slave_parallel_threads + 1 +# terminated upon each range. +--let $iter = 3 +--let $generation_len = @@global.slave_parallel_threads +--let $domain_num = 3 +--disable_query_log +--connection master +while ($iter) +{ + --let $k = `select 3 * 3 * $generation_len` + --let $_k = $k + while ($k) + { + --source include/count_sessions.inc + --connect(con$k,localhost,root,,) + # + # create XA events alternating gtid domains to run them in parallel on slave. + # + --let $domain_id = `SELECT $k % $domain_num` + --eval set @@gtid_domain_id = $domain_id + --eval XA START '$trx$k' + --eval INSERT INTO t1 VALUES ($k + 1, $iter) + --eval XA END '$trx$k' + --eval XA PREPARE '$trx$k' + + --disconnect con$k + --connection master + --source include/wait_until_count_sessions.inc + + --dec $k + } + + --connection master + --let $k = $_k + while ($k) + { + --let $term = COMMIT + --let $decision = `SELECT $k % 2` + if ($decision) + { + --let $term = ROLLBACK + } + --eval XA $term '$trx$k' + } + --dec $iter +} +--enable_query_log +--source include/save_master_gtid.inc + +--connection slave +SET @@global.slave_parallel_mode = 'optimistic'; +--source include/start_slave.inc +--source include/sync_with_master_gtid.inc + + +# +# Overall consistency check +# +--let $diff_tables= master:t1, slave:t1 +--source include/diff_tables.inc + + +# +# Clean up. +# +--connection slave +--source include/stop_slave.inc +SET @@global.slave_parallel_mode = @old_parallel_mode; +SET @@global.slave_parallel_threads = @old_parallel_threads; +SET @@global.slave_domain_parallel_threads = @old_slave_domain_parallel_threads; +--source include/start_slave.inc + +--connection master +DROP TABLE t1; +--source include/save_master_gtid.inc + +--connection slave +--source include/sync_with_master_gtid.inc + +--connection master +--source include/rpl_end.inc diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index f4fddcd1f65..262c571c771 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -4199,7 +4199,8 @@ int XA_prepare_log_event::do_commit() thd->lex->xid= &xid; if (!one_phase) { - if ((res= thd->wait_for_prior_commit())) + if (thd->is_current_stmt_binlog_disabled() && + (res= thd->wait_for_prior_commit())) return res; thd->lex->sql_command= SQLCOM_XA_PREPARE; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 68e42ecd03f..fdc960a8fc6 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -2325,6 +2325,80 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) } } + +/* + Check when we have done a complete round of scheduling for workers + 0, 1, ..., (rpl_thread_max-1), in this order. + This often occurs every rpl_thread_max event group, but XA XID dependency + restrictions can cause insertion of extra out-of-order worker scheduling + in-between the normal round-robin scheduling. +*/ +void +rpl_parallel_entry::check_scheduling_generation(sched_bucket *cur) +{ + uint32 idx= static_cast(cur - rpl_threads); + DBUG_ASSERT(cur >= rpl_threads); + DBUG_ASSERT(cur < rpl_threads + rpl_thread_max); + if (idx == current_generation_idx) + { + ++idx; + if (idx >= rpl_thread_max) + { + /* A new generation; all workers have been scheduled at least once. */ + idx= 0; + ++current_generation; + } + current_generation_idx= idx; + } +} + + +rpl_parallel_entry::sched_bucket * +rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid) +{ + uint64 cur_gen= current_generation; + my_off_t i= 0; + while (i < maybe_active_xid.elements) + { + /* + Purge no longer active XID from the list: + + - In generation N, XID might have been scheduled for worker W. + - Events in generation (N+1) might run freely in parallel with W. + - Events in generation (N+2) will have done wait_for_prior_commit for + the event group with XID (or a later one), but the XID might still be + active for a bit longer after wakeup_prior_commit(). + - Events in generation (N+3) will have done wait_for_prior_commit() for + an event in W _after_ the XID, so are sure not to see the XID active. + + Therefore, XID can be safely scheduled to a different worker in + generation (N+3) when last prior use was in generation N (or earlier). + */ + xid_active_generation *a= + dynamic_element(&maybe_active_xid, i, xid_active_generation *); + if (a->generation + 3 <= cur_gen) + { + *a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid)); + continue; + } + if (xid->eq(&a->xid)) + { + /* Update the last used generation and return the match. */ + a->generation= cur_gen; + return a->thr; + } + ++i; + } + /* try to keep allocated memory in the range of [2,10] * initial_chunk_size */ + if (maybe_active_xid.elements <= 2 * active_xid_init_alloc() && + maybe_active_xid.max_element > 10 * active_xid_init_alloc()) + freeze_size(&maybe_active_xid); + + /* No matching XID conflicts. */ + return nullptr; +} + + /* Obtain a worker thread that we can queue an event to. @@ -2369,17 +2443,36 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond, if (gtid_ev->flags2 & (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA)) { - /* - For XA COMMIT/ROLLBACK, choose the same bucket as the XA PREPARE, - overriding the round-robin scheduling. - */ - uint32 idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(), - gtid_ev->xid.key_length()) % rpl_thread_max; - rpl_threads[idx].unlink(); - thread_sched_fifo->append(rpl_threads + idx); + if ((cur_thr= check_xa_xid_dependency(>id_ev->xid))) + { + /* + A previously scheduled event group with the same XID might still be + active in a worker, so schedule this event group in the same worker + to avoid a conflict. + */ + cur_thr->unlink(); + thread_sched_fifo->append(cur_thr); + } + else + { + /* Record this XID now active. */ + xid_active_generation *a= + (xid_active_generation *)alloc_dynamic(&maybe_active_xid); + if (!a) + return NULL; + a->thr= cur_thr= thread_sched_fifo->head(); + a->generation= current_generation; + a->xid.set(>id_ev->xid); + } } + else + cur_thr= thread_sched_fifo->head(); + + check_scheduling_generation(cur_thr); } - cur_thr= thread_sched_fifo->head(); + else + cur_thr= thread_sched_fifo->head(); + thr= cur_thr->thr; if (thr) @@ -2471,6 +2564,7 @@ free_rpl_parallel_entry(void *element) dealloc_gco(e->current_gco); e->current_gco= prev_gco; } + delete_dynamic(&e->maybe_active_xid); mysql_cond_destroy(&e->COND_parallel_entry); mysql_mutex_destroy(&e->LOCK_parallel_entry); my_free(e); @@ -2524,11 +2618,26 @@ rpl_parallel::find(uint32 domain_id) my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p))); return NULL; } + /* Initialize a FIFO of scheduled worker threads. */ e->thread_sched_fifo = new (fifo) I_List; - for (ulong i= 0; i < count; ++i) - e->thread_sched_fifo->push_back(::new (p+i) rpl_parallel_entry::sched_bucket); + /* + (We cycle the FIFO _before_ allocating next entry in + rpl_parallel_entry::choose_thread(). So initialize the FIFO with the + highest element at the front, just so that the first event group gets + scheduled on entry 0). + */ + e->thread_sched_fifo-> + push_back(::new (p+count-1) rpl_parallel_entry::sched_bucket); + for (ulong i= 0; i < count-1; ++i) + e->thread_sched_fifo-> + push_back(::new (p+i) rpl_parallel_entry::sched_bucket); e->rpl_threads= p; e->rpl_thread_max= count; + e->current_generation = 0; + e->current_generation_idx = 0; + init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid, + sizeof(rpl_parallel_entry::xid_active_generation), + 0, e->active_xid_init_alloc(), 0, MYF(0)); e->domain_id= domain_id; e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX; e->pause_sub_id= (uint64)ULONGLONG_MAX; diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 9ba86453d06..a605b977473 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -326,10 +326,26 @@ struct rpl_parallel_thread_pool { struct rpl_parallel_entry { + /* + A small struct to put worker threads references into a FIFO (using an + I_List) for round-robin scheduling. + */ struct sched_bucket : public ilink { sched_bucket() : thr(nullptr) { } rpl_parallel_thread *thr; }; + /* + A struct to keep track of into which "generation" an XA XID was last + scheduled. A "generation" means that we know that every worker thread + slot in the rpl_parallel_entry was scheduled at least once. When more + that two generations have passed, we can safely reuse the XID in a + different worker. + */ + struct xid_active_generation { + uint64 generation; + sched_bucket *thr; + xid_t xid; + }; mysql_mutex_t LOCK_parallel_entry; mysql_cond_t COND_parallel_entry; @@ -373,6 +389,23 @@ struct rpl_parallel_entry { sched_bucket *rpl_threads; I_List *thread_sched_fifo; uint32 rpl_thread_max; + /* + Keep track of all XA XIDs that may still be active in a worker thread. + The elements are of type xid_active_generation. + */ + DYNAMIC_ARRAY maybe_active_xid; + /* + Keeping track of the current scheduling generation. + + A new generation means that every worker thread in the rpl_threads array + have been scheduled at least one event group. + + When we have scheduled to slot current_generation_idx= 0, 1, ..., N-1 in this + order, we know that (at least) one generation has passed. + */ + uint64 current_generation; + uint32 current_generation_idx; + /* The sub_id of the last transaction to commit within this domain_id. Must be accessed under LOCK_parallel_entry protection. @@ -426,11 +459,19 @@ struct rpl_parallel_entry { /* The group_commit_orderer object for the events currently being queued. */ group_commit_orderer *current_gco; + void check_scheduling_generation(sched_bucket *cur); + sched_bucket *check_xa_xid_dependency(xid_t *xid); rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, PSI_stage_info *old_stage, Gtid_log_event *gtid_ev); int queue_master_restart(rpl_group_info *rgi, Format_description_log_event *fdev); + /* + the initial size of maybe_ array corresponds to the case of + each worker receives perhaps unlikely XA-PREPARE and XA-COMMIT within + the same generation. + */ + inline uint active_xid_init_alloc() { return 3 * 2 * rpl_thread_max; } }; struct rpl_parallel { HASH domain_hash;