diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 6ca582e4f21..98256118909 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -395,12 +395,13 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco, } while (wait_count > entry->count_committing_event_groups); } - if (entry->force_abort && wait_count > entry->stop_count) + if (entry->force_abort && rgi->gtid_sub_id > entry->stop_sub_id) { /* - We are stopping (STOP SLAVE), and this event group is beyond the point - where we can safely stop. So return a flag that will cause us to skip, - rather than execute, the following events. + We are stopping (STOP SLAVE), and this event group need not be applied + before we can safely stop. So return a flag that will cause us to skip, + rather than execute, the following events. Once all queued events have + been skipped, the STOP SLAVE is complete (for this thread). */ return true; } @@ -2357,20 +2358,18 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli) are also executed, so that we stop at a consistent point in the binlog stream (per replication domain). - All event groups wait for e->count_committing_event_groups to reach - the value of group_commit_orderer::wait_count before starting to - execute. Thus, at this point we know that any event group with a - strictly larger wait_count are safe to skip, none of them can have - started executing yet. So we set e->stop_count here and use it to - decide in the worker threads whether to continue executing an event - group or whether to skip it, when force_abort is set. + At this point, we are holding LOCK_parallel_entry, and we know that no + event group after e->largest_started_sub_id has started running yet. We + record this value in e->stop_sub_id, and then each event group can check + their own sub_id against it. If their sub_id is strictly larger, then + that event group will be skipped. If we stop due to reaching the START SLAVE UNTIL condition, then we need to continue executing any queued events up to that point. */ e->force_abort= true; - e->stop_count= rli->stop_for_until ? - e->count_queued_event_groups : e->count_committing_event_groups; + e->stop_sub_id= rli->stop_for_until ? + e->current_sub_id : e->largest_started_sub_id; mysql_mutex_unlock(&e->LOCK_parallel_entry); for (j= 0; j < e->rpl_thread_max; ++j) { @@ -2426,7 +2425,7 @@ rpl_parallel::stop_during_until() e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); mysql_mutex_lock(&e->LOCK_parallel_entry); if (e->force_abort) - e->stop_count= e->count_committing_event_groups; + e->stop_sub_id= e->largest_started_sub_id; mysql_mutex_unlock(&e->LOCK_parallel_entry); } } diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h index 650aa06e504..b7304d204ee 100644 --- a/sql/rpl_parallel.h +++ b/sql/rpl_parallel.h @@ -276,13 +276,13 @@ struct rpl_parallel_entry { /* At STOP SLAVE (force_abort=true), we do not want to process all events in the queue (which could unnecessarily delay stop, if a lot of events happen - to be queued). The stop_count provides a safe point at which to stop, so + to be queued). The stop_sub_id provides a safe point at which to stop, so that everything before becomes committed and nothing after does. The value - corresponds to group_commit_orderer::wait_count; if wait_count is less than - or equal to stop_count, we execute the associated event group, else we - skip it (and all following) and stop. + corresponds to rpl_group_info::gtid_sub_id; if that is less than or equal + to stop_sub_id, we execute the associated event group, else we skip it (and + all following) and stop. */ - uint64 stop_count; + uint64 stop_sub_id; /* Cyclic array recording the last rpl_thread_max worker threads that we