diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 21c3dcf6d90..e5c700041ef 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -16,6 +16,7 @@ - Error handling. If we fail in one of multiple parallel executions, we need to make a best effort to complete prior transactions and roll back following transactions, so slave binlog position will be correct. + And all the retry logic for temporary errors like deadlock. - Stopping the slave needs to handle stopping all parallel executions. And the logic in sql_slave_killed() that waits for current event group to @@ -73,7 +74,6 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, mysql_mutex_lock(&rli->data_lock); err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt); /* ToDo: error handling. */ - /* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */ } @@ -85,6 +85,7 @@ handle_rpl_parallel_thread(void *arg) struct rpl_parallel_thread::queued_event *events; bool group_standalone= true; bool in_event_group= false; + uint64 event_gtid_sub_id= 0; struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg; @@ -142,6 +143,7 @@ handle_rpl_parallel_thread(void *arg) rpl_group_info *rgi= events->rgi; rpl_parallel_entry *entry= rgi->parallel_entry; uint64 wait_for_sub_id; + bool end_of_group; if (event_type == GTID_EVENT) { @@ -150,6 +152,9 @@ handle_rpl_parallel_thread(void *arg) (0 != (static_cast(events->ev)->flags2 & Gtid_log_event::FL_STANDALONE)); + /* Save this, as it gets cleared once event group commits. */ + event_gtid_sub_id= rgi->gtid_sub_id; + /* Register ourself to wait for the previous commit, if we need to do such registration _and_ that previous commit has not already @@ -173,43 +178,47 @@ handle_rpl_parallel_thread(void *arg) rpt_handle_event(events, thd, rpt); - if (in_event_group) + end_of_group= + in_event_group && + ((group_standalone && !Log_event::is_part_of_group(event_type)) || + event_type == XID_EVENT || + (event_type == QUERY_EVENT && + (!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) || + !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query)))); + + /* ToDo: must use rgi here, not rli, for thread safety. */ + delete_or_keep_event_post_apply(rgi->rli, event_type, events->ev); + my_free(events); + + if (end_of_group) { - if ((group_standalone && !Log_event::is_part_of_group(event_type)) || - event_type == XID_EVENT || - (event_type == QUERY_EVENT && - (!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) || - !strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query)))) + in_event_group= false; + + rgi->commit_orderer.unregister_wait_for_prior_commit(); + thd->wait_for_commit_ptr= NULL; + + /* + Record that we have finished, so other event groups will no + longer attempt to wait for us to commit. + + We can race here with the next transactions, but that is fine, as + long as we check that we do not decrease last_committed_sub_id. If + this commit is done, then any prior commits will also have been + done and also no longer need waiting for. + */ + mysql_mutex_lock(&entry->LOCK_parallel_entry); + if (entry->last_committed_sub_id < event_gtid_sub_id) { - in_event_group= false; - - rgi->commit_orderer.unregister_wait_for_prior_commit(); - thd->wait_for_commit_ptr= NULL; - - /* - Record that we have finished, so other event groups will no - longer attempt to wait for us to commit. - - We can race here with the next transactions, but that is fine, as - long as we check that we do not decrease last_committed_sub_id. If - this commit is done, then any prior commits will also have been - done and also no longer need waiting for. - */ - mysql_mutex_lock(&entry->LOCK_parallel_entry); - if (entry->last_committed_sub_id < rgi->gtid_sub_id) - { - entry->last_committed_sub_id= rgi->gtid_sub_id; - if (entry->need_signal) - mysql_cond_broadcast(&entry->COND_parallel_entry); - } - mysql_mutex_unlock(&entry->LOCK_parallel_entry); - - rgi->commit_orderer.wakeup_subsequent_commits(); - delete rgi; + entry->last_committed_sub_id= event_gtid_sub_id; + if (entry->need_signal) + mysql_cond_broadcast(&entry->COND_parallel_entry); } + mysql_mutex_unlock(&entry->LOCK_parallel_entry); + + rgi->commit_orderer.wakeup_subsequent_commits(); + delete rgi; } - my_free(events); events= next; } @@ -487,7 +496,7 @@ rpl_parallel::wait_for_done() { e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); mysql_mutex_lock(&e->LOCK_parallel_entry); - while (e->current_sub_id > e->last_commit_id) + while (e->current_sub_id > e->last_committed_sub_id) mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); mysql_mutex_unlock(&e->LOCK_parallel_entry); } @@ -605,6 +614,8 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev, */ qev->rgi= serial_rgi; rpt_handle_event(qev, parent_thd, NULL); + delete_or_keep_event_post_apply(rli, typ, qev->ev); + return false; } else diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index bbf10dbcd51..f189f9adffa 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1556,4 +1556,50 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev) return 0; } + +void +delete_or_keep_event_post_apply(Relay_log_info *rli, + Log_event_type typ, Log_event *ev) +{ + /* + ToDo: This needs to work on rpl_group_info, not Relay_log_info, to be + thread-safe for parallel replication. + */ + + switch (typ) { + case FORMAT_DESCRIPTION_EVENT: + /* + Format_description_log_event should not be deleted because it + will be used to read info about the relay log's format; + it will be deleted when the SQL thread does not need it, + i.e. when this thread terminates. + */ + break; + case ANNOTATE_ROWS_EVENT: + /* + Annotate_rows event should not be deleted because after it has + been applied, thd->query points to the string inside this event. + The thd->query will be used to generate new Annotate_rows event + during applying the subsequent Rows events. + */ + rli->set_annotate_event((Annotate_rows_log_event*) ev); + break; + case DELETE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case WRITE_ROWS_EVENT: + /* + After the last Rows event has been applied, the saved Annotate_rows + event (if any) is not needed anymore and can be deleted. + */ + if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) + rli->free_annotate_event(); + /* fall through */ + default: + DBUG_PRINT("info", ("Deleting the event after it has been executed")); + if (!rli->is_deferred_event(ev)) + delete ev; + break; + } +} + #endif diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index b4daecadea8..c22773f9810 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -646,5 +646,7 @@ extern struct rpl_slave_state rpl_global_gtid_slave_state; int rpl_load_gtid_slave_state(THD *thd); int event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev); +void delete_or_keep_event_post_apply(Relay_log_info *rli, + Log_event_type typ, Log_event *ev); #endif /* RPL_RLI_H */ diff --git a/sql/slave.cc b/sql/slave.cc index 9b3df653384..474a6f902d2 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3264,41 +3264,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli, exec_res= apply_event_and_update_pos(ev, thd, serial_rgi, NULL); - switch (typ) { - case FORMAT_DESCRIPTION_EVENT: - /* - Format_description_log_event should not be deleted because it - will be used to read info about the relay log's format; - it will be deleted when the SQL thread does not need it, - i.e. when this thread terminates. - */ - break; - case ANNOTATE_ROWS_EVENT: - /* - Annotate_rows event should not be deleted because after it has - been applied, thd->query points to the string inside this event. - The thd->query will be used to generate new Annotate_rows event - during applying the subsequent Rows events. - */ - rli->set_annotate_event((Annotate_rows_log_event*) ev); - break; - case DELETE_ROWS_EVENT: - case UPDATE_ROWS_EVENT: - case WRITE_ROWS_EVENT: - /* - After the last Rows event has been applied, the saved Annotate_rows - event (if any) is not needed anymore and can be deleted. - */ - if (((Rows_log_event*)ev)->get_flags(Rows_log_event::STMT_END_F)) - rli->free_annotate_event(); - /* fall through */ - default: - DBUG_PRINT("info", ("Deleting the event after it has been executed")); - if (!rli->is_deferred_event(ev)) - delete ev; - break; - } - + delete_or_keep_event_post_apply(rli, typ, ev); /* update_log_pos failed: this should not happen, so we don't @@ -4363,6 +4329,14 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \ err: + /* + Once again, in case we aborted with an error and skipped the first one. + (We want the first one to be before the printout of stop position to + get the correct position printed.) + */ + if (opt_slave_parallel_threads > 0) + rli->parallel.wait_for_done(); + /* Some events set some playgrounds, which won't be cleared because thread stops. Stopping of this thread may not be known to these events ("stop"