diff --git a/mysql-test/suite/rpl/r/rpl_parallel.result b/mysql-test/suite/rpl/r/rpl_parallel.result index fb86d46b01e..ac21e7a3e01 100644 --- a/mysql-test/suite/rpl/r/rpl_parallel.result +++ b/mysql-test/suite/rpl/r/rpl_parallel.result @@ -923,6 +923,55 @@ a 32 33 34 +*** MDEV-6775: Wrong binlog order in parallel replication *** +DELETE FROM t4; +INSERT INTO t4 VALUES (1,NULL), (3,NULL), (4,4), (5, NULL), (6, 6); +include/save_master_gtid.inc +include/sync_with_master_gtid.inc +include/stop_slave.inc +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,inject_binlog_commit_before_get_LOCK_log"; +SET @old_format=@@GLOBAL.binlog_format; +SET GLOBAL binlog_format=ROW; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +SET @old_format= @@binlog_format; +SET binlog_format= statement; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'; +UPDATE t4 SET b=NULL WHERE a=6; +SET debug_sync='now WAIT_FOR master_queued1'; +SET @old_format= @@binlog_format; +SET binlog_format= statement; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'; +DELETE FROM t4 WHERE b <= 3; +SET debug_sync='now WAIT_FOR master_queued2'; +SET debug_sync='now SIGNAL master_cont1'; +SET binlog_format= @old_format; +SET binlog_format= @old_format; +SET debug_sync='RESET'; +SELECT * FROM t4 ORDER BY a; +a b +1 NULL +3 NULL +4 4 +5 NULL +6 NULL +include/start_slave.inc +SET debug_sync= 'now WAIT_FOR waiting'; +SELECT * FROM t4 ORDER BY a; +a b +1 NULL +3 NULL +4 4 +5 NULL +6 NULL +SET debug_sync= 'now SIGNAL cont'; +include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +SET GLOBAL binlog_format= @old_format; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +include/start_slave.inc include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; include/start_slave.inc diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test 
b/mysql-test/suite/rpl/t/rpl_parallel.test index 4f01ef7765b..d3ec08f5508 100644 --- a/mysql-test/suite/rpl/t/rpl_parallel.test +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -1466,6 +1466,75 @@ SET sql_slave_skip_counter= 1; SELECT * FROM t2 WHERE a >= 30 ORDER BY a; +--echo *** MDEV-6775: Wrong binlog order in parallel replication *** +--connection server_1 +# A bit tricky bug to reproduce. On the master, we binlog in statement-mode +# two transactions, an UPDATE followed by a DELETE. On the slave, we replicate +# with binlog-mode set to ROW, which means the DELETE, which modifies no rows, +# is not binlogged. Then we inject a wait in the group commit code on the +# slave, shortly before the actual commit of the UPDATE. The bug was that the +# DELETE could wake up from wait_for_prior_commit() before the commit of the +# UPDATE. So the test could see the slave position updated to after DELETE, +# while the UPDATE was still not visible. +DELETE FROM t4; +INSERT INTO t4 VALUES (1,NULL), (3,NULL), (4,4), (5, NULL), (6, 6); +--source include/save_master_gtid.inc + +--connection server_2 +--source include/sync_with_master_gtid.inc +--source include/stop_slave.inc +SET @old_dbug= @@GLOBAL.debug_dbug; +SET GLOBAL debug_dbug="+d,inject_binlog_commit_before_get_LOCK_log"; +SET @old_format=@@GLOBAL.binlog_format; +SET GLOBAL binlog_format=ROW; +# Re-spawn the worker threads to be sure they pick up the new binlog format +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; + +--connection con1 +SET @old_format= @@binlog_format; +SET binlog_format= statement; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued1 WAIT_FOR master_cont1'; +send UPDATE t4 SET b=NULL WHERE a=6; +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued1'; + +--connection con2 +SET @old_format= @@binlog_format; +SET binlog_format= statement; +SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2'; +send DELETE 
FROM t4 WHERE b <= 3; + +--connection server_1 +SET debug_sync='now WAIT_FOR master_queued2'; +SET debug_sync='now SIGNAL master_cont1'; + +--connection con1 +REAP; +SET binlog_format= @old_format; +--connection con2 +REAP; +SET binlog_format= @old_format; +SET debug_sync='RESET'; +--save_master_pos +SELECT * FROM t4 ORDER BY a; + +--connection server_2 +--source include/start_slave.inc +SET debug_sync= 'now WAIT_FOR waiting'; +--sync_with_master +SELECT * FROM t4 ORDER BY a; +SET debug_sync= 'now SIGNAL cont'; + +# Re-spawn the worker threads to remove any DBUG injections or DEBUG_SYNC. +--source include/stop_slave.inc +SET GLOBAL debug_dbug=@old_dbug; +SET GLOBAL binlog_format= @old_format; +SET GLOBAL slave_parallel_threads=0; +SET GLOBAL slave_parallel_threads=10; +--source include/start_slave.inc + + --connection server_2 --source include/stop_slave.inc SET GLOBAL slave_parallel_threads=@old_parallel_threads; diff --git a/sql/log.cc b/sql/log.cc index 6a08bb48631..34795db66b8 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -6771,6 +6771,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, to commit. If so, we add those to the queue as well, transitively for all waiters. + And if a transaction is marked to wait for a prior transaction, but that + prior transaction is already queued for group commit, then we can queue the + new transaction directly to participate in the group commit. 
+ @retval < 0 Error @retval > 0 If queued as the first entry in the queue (meaning this is the leader) @@ -6780,8 +6784,8 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd, int MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) { - group_commit_entry *entry, *orig_queue; - wait_for_commit *cur, *last; + group_commit_entry *entry, *orig_queue, *last; + wait_for_commit *cur; wait_for_commit *wfc; DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit"); @@ -6798,8 +6802,17 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) if (wfc && wfc->waitee) { mysql_mutex_lock(&wfc->LOCK_wait_commit); - /* Do an extra check here, this time safely under lock. */ - if (wfc->waitee) + /* + Do an extra check here, this time safely under lock. + + If waitee->commit_started is set, it means that the transaction we need + to wait for has already queued up for group commit. In this case it is + safe for us to queue up immediately as well, increasing the opportunities + for group commit. Because waitee has taken the LOCK_prepare_ordered + before setting the flag, there is no risk that we can queue ahead of + it. + */ + if (wfc->waitee && !wfc->waitee->commit_started) { PSI_stage_info old_stage; wait_for_commit *loc_waitee; @@ -6812,6 +6825,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) This other transaction may then take over the commit process for us to get us included in its own group commit. If this happens, the queued_by_other flag is set. + + Setting this flag may or may not be seen by the other thread, but we + are safe in any case: The other thread will set queued_by_other under + its LOCK_wait_commit, and we will check queued_by_other only after + we have been woken up. 
*/ wfc->opaque_pointer= orig_entry; DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior"); @@ -6884,41 +6902,41 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) /* Iteratively process everything added to the queue, looking for waiters, and their waiters, and so on. If a waiter is ready to commit, we - immediately add it to the queue; if not we just wake it up. + immediately add it to the queue, and mark it as queued_by_other. This would be natural to do with recursion, but we want to avoid potentially unbounded recursion blowing the C stack, so we use the list approach instead. - We keep a list of all the waiters that need to be processed in `list', - linked through the next_subsequent_commit pointer. Initially this list - contains only the entry passed into this function. + We keep a list of the group_commit_entry of all the waiters that need to + be processed. Initially this list contains only the entry passed into this + function. We process entries in the list one by one. The element currently being - processed is pointed to by `cur`, and the element at the end of the list + processed is pointed to by `entry`, and the element at the end of the list is pointed to by `last` (we do not use NULL to terminate the list). - As we process an element, it is first added to the group_commit_queue. - Then any waiters for that element are added at the end of the list, to - be processed in subsequent iterations. This continues until the list - is exhausted, with all elements ever added eventually processed. + As we process an entry, any waiters for that entry are added at the end of + the list, to be processed in subsequent iterations. Then the entry is added + to the group_commit_queue. This continues until the list is exhausted, + with all entries ever added eventually processed. 
The end result is a breath-first traversal of the tree of waiters, - re-using the next_subsequent_commit pointers in place of extra stack - space in a recursive traversal. + re-using the `next' pointers of the group_commit_entry objects in place of + extra stack space in a recursive traversal. - The temporary list created in next_subsequent_commit is not - used by the caller or any other function. + The temporary list linked through these `next' pointers is not used by the + caller or any other function; it only exists while doing the iterative + tree traversal. After, all the processed entries are linked into the + group_commit_queue. */ cur= wfc; - last= wfc; + last= orig_entry; entry= orig_entry; for (;;) { - /* Add the entry to the group commit queue. */ - entry->next= group_commit_queue; - group_commit_queue= entry; + group_commit_entry *next_entry; if (entry->cache_mngr->using_xa) { @@ -6927,135 +6945,95 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) DEBUG_SYNC(entry->thd, "commit_after_prepare_ordered"); } - if (!cur) - break; // Can happen if initial entry has no wait_for_commit - - /* - Check if this transaction has other transaction waiting for it to commit. - - If so, process the waiting transactions, and their waiters and so on, - transitively. - */ - if (cur->subsequent_commits_list) + if (cur) { - wait_for_commit *waiter; - wait_for_commit *wakeup_list= NULL; - wait_for_commit **wakeup_next_ptr= &wakeup_list; - - mysql_mutex_lock(&cur->LOCK_wait_commit); /* - Grab the list, now safely under lock, and process it if still - non-empty. + Now that we have taken LOCK_prepare_ordered and will queue up in the + group commit queue, it is safe for following transactions to queue + themselves. We will grab here any transaction that is now ready to + queue up, but after that, more transactions may become ready while the + leader is waiting to start the group commit. 
So set the flag + `commit_started', so that later transactions can still participate in + the group commit. */ - waiter= cur->subsequent_commits_list; - cur->subsequent_commits_list= NULL; - while (waiter) + cur->commit_started= true; + + /* + Check if this transaction has other transactions waiting for it to + commit. + + If so, process the waiting transactions, and their waiters and so on, + transitively. + */ + if (cur->subsequent_commits_list) { - wait_for_commit *next= waiter->next_subsequent_commit; - group_commit_entry *entry2= - (group_commit_entry *)waiter->opaque_pointer; - if (entry2) - { - /* - This is another transaction ready to be written to the binary - log. We can put it into the queue directly, without needing a - separate context switch to the other thread. We just set a flag - so that the other thread will know when it wakes up that it was - already processed. + wait_for_commit *waiter, **waiter_ptr; - So put it at the end of the list to be processed in a subsequent - iteration of the outer loop. - */ - entry2->queued_by_other= true; - last->next_subsequent_commit= waiter; - last= waiter; - /* - As a small optimisation, we do not actually need to set - waiter->next_subsequent_commit to NULL, as we can use the - pointer `last' to check for end-of-list. - */ - } - else - { - /* - Wake up the waiting transaction. - - For this, we need to set the "wakeup running" flag and release - the waitee lock to avoid a deadlock, see comments on - THD::wakeup_subsequent_commits2() for details. - - So we need to put these on a list and delay the wakeup until we - have released the lock. - */ - *wakeup_next_ptr= waiter; - wakeup_next_ptr= &waiter->next_subsequent_commit; - } - waiter= next; - } - if (wakeup_list) - { - /* Now release our lock and do the wakeups that were delayed above. 
*/ - cur->wakeup_subsequent_commits_running= true; - mysql_mutex_unlock(&cur->LOCK_wait_commit); - for (;;) - { - wait_for_commit *next; - - /* - ToDo: We wakeup the waiter here, so that it can have the chance to - reach its own commit state and queue up for this same group commit, - if it is still pending. - - One problem with this is that if the waiter does not reach its own - commit state before this group commit starts, and then the group - commit fails (binlog write failure), we do not get to propagate - the error to the waiter. - - A solution for this could be to delay the wakeup until commit is - successful. But then we need to set a flag in the waitee that it is - already queued for group commit, so that the waiter can check this - flag and queue itself if it _does_ reach the commit state in time. - - (But error handling in case of binlog write failure is currently - broken in other ways, as well). - */ - if (&wakeup_list->next_subsequent_commit == wakeup_next_ptr) - { - /* The last one in the list. */ - wakeup_list->wakeup(0); - break; - } - /* - Important: don't access wakeup_list->next after the wakeup() call, - it may be invalidated by the other thread. - */ - next= wakeup_list->next_subsequent_commit; - wakeup_list->wakeup(0); - wakeup_list= next; - } + mysql_mutex_lock(&cur->LOCK_wait_commit); /* - We need a full memory barrier between walking the list and clearing - the flag wakeup_subsequent_commits_running. This barrier is needed - to ensure that no other thread will start to modify the list - pointers before we are done traversing the list. - - But wait_for_commit::wakeup(), which was called above, does a full - memory barrier already (it locks a mutex). + Grab the list, now safely under lock, and process it if still + non-empty. 
*/ - cur->wakeup_subsequent_commits_running= false; - } - else + waiter= cur->subsequent_commits_list; + waiter_ptr= &cur->subsequent_commits_list; + while (waiter) + { + wait_for_commit *next_waiter= waiter->next_subsequent_commit; + group_commit_entry *entry2= + (group_commit_entry *)waiter->opaque_pointer; + if (entry2) + { + /* + This is another transaction ready to be written to the binary + log. We can put it into the queue directly, without needing a + separate context switch to the other thread. We just set a flag + so that the other thread will know when it wakes up that it was + already processed. + + So remove it from the list of our waiters, and instead put it at + the end of the list to be processed in a subsequent iteration of + the outer loop. + */ + *waiter_ptr= next_waiter; + entry2->queued_by_other= true; + last->next= entry2; + last= entry2; + /* + As a small optimisation, we do not actually need to set + entry2->next to NULL, as we can use the pointer `last' to check + for end-of-list. + */ + } + else + { + /* + This transaction is not ready to participate in the group commit + yet, so leave it in the waiter list. It might join the group + commit later, if it completes soon enough to do so (it will see + our wfc->commit_started flag set), or it might commit later in a + later group commit. + */ + waiter_ptr= &waiter->next_subsequent_commit; + } + waiter= next_waiter; + } mysql_mutex_unlock(&cur->LOCK_wait_commit); + } } - if (cur == last) + + /* Add the entry to the group commit queue. */ + next_entry= entry->next; + entry->next= group_commit_queue; + group_commit_queue= entry; + if (entry == last) break; /* Move to the next entry in the flattened list of waiting transactions that still need to be processed transitively. 
*/ - cur= cur->next_subsequent_commit; - entry= (group_commit_entry *)cur->opaque_pointer; + entry= next_entry; DBUG_ASSERT(entry != NULL); + cur= entry->thd->wait_for_commit_ptr; } if (opt_binlog_commit_wait_count > 0) @@ -7111,6 +7089,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) DEBUG_SYNC(entry->thd, "commit_after_group_run_commit_ordered"); } mysql_mutex_unlock(&LOCK_commit_ordered); + entry->thd->wakeup_subsequent_commits(entry->error); if (next) { @@ -7144,7 +7123,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry) } if (likely(!entry->error)) - return 0; + return entry->thd->wait_for_prior_commit(); switch (entry->error) { @@ -7202,10 +7181,15 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) LINT_INIT(binlog_id); { + DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log", + DBUG_ASSERT(!debug_sync_set_action(leader->thd, STRING_WITH_LEN + ("commit_before_get_LOCK_log SIGNAL waiting WAIT_FOR cont TIMEOUT 1"))); + ); /* Lock the LOCK_log(), and once we get it, collect any additional writes that queued up while we were waiting. */ + DEBUG_SYNC(leader->thd, "commit_before_get_LOCK_log"); mysql_mutex_lock(&LOCK_log); DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log"); @@ -7412,6 +7396,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader) if (current->cache_mngr->using_xa && !current->error && DBUG_EVALUATE_IF("skip_commit_ordered", 0, 1)) run_commit_ordered(current->thd, current->all); + current->thd->wakeup_subsequent_commits(current->error); /* Careful not to access current->next after waking up the other thread! 
As diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 31772403f32..4bad191777c 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -6342,6 +6342,7 @@ wait_for_commit::reinit() opaque_pointer= NULL; wakeup_error= 0; wakeup_subsequent_commits_running= false; + commit_started= false; #ifdef SAFE_MUTEX /* When using SAFE_MUTEX, the ordering between taking the LOCK_wait_commit diff --git a/sql/sql_class.h b/sql/sql_class.h index 762328c1a23..4ec16bff33d 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1713,6 +1713,16 @@ struct wait_for_commit on that function for details. */ bool wakeup_subsequent_commits_running; + /* + This flag can be set when a commit starts, but has not completed yet. + It is used by binlog group commit to allow a waiting transaction T2 to + join the group commit of an earlier transaction T1. When T1 has queued + itself for group commit, it will set the commit_started flag. Then when + T2 becomes ready to commit and needs to wait for T1 to commit first, T2 + can queue itself before waiting, and thereby participate in the same + group commit as T1. + */ + bool commit_started; void register_wait_for_prior_commit(wait_for_commit *waitee); int wait_for_prior_commit(THD *thd)