diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 1ad2fa782ad..154a8c0f98f 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -8,6 +8,15 @@
   Code for optional parallel execution of replicated events on the slave.
 */
+
+/*
+  Maximum number of queued events to accumulate in a local free list, before
+  moving them to the global free list. There is additionally a limit on how
+  much to accumulate, based on opt_slave_parallel_max_queued.
+*/
+#define QEV_BATCH_FREE 200
+
+
 struct rpl_parallel_thread_pool global_rpl_thread_pool;
 
 static void signal_error_to_sql_driver_thread(THD *thd, rpl_group_info *rgi,
@@ -510,14 +519,8 @@ handle_rpl_parallel_thread(void *arg)
   rpl_group_info *group_rgi= NULL;
   group_commit_orderer *gco, *tmp_gco;
   uint64 event_gtid_sub_id= 0;
-  rpl_parallel_thread::queued_event *qevs_to_free;
-  rpl_group_info *rgis_to_free;
-  group_commit_orderer *gcos_to_free;
   rpl_sql_thread_info sql_info(NULL);
-  size_t total_event_size;
   int err;
-  inuse_relaylog *last_ir;
-  uint64 accumulated_ir_count;
 
   struct rpl_parallel_thread *rpt= (struct rpl_parallel_thread *)arg;
@@ -559,6 +562,8 @@ handle_rpl_parallel_thread(void *arg)
   while (!rpt->stop)
   {
+    rpl_parallel_thread::queued_event *qev, *next_qev;
+
     thd->ENTER_COND(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
                     &stage_waiting_for_work_from_sql_thread, &old_stage);
     /*
@@ -580,28 +585,21 @@ handle_rpl_parallel_thread(void *arg)
     thd->EXIT_COND(&old_stage);
 
   more_events:
-    qevs_to_free= NULL;
-    rgis_to_free= NULL;
-    gcos_to_free= NULL;
-    total_event_size= 0;
-    while (events)
+    for (qev= events; qev; qev= next_qev)
     {
-      struct rpl_parallel_thread::queued_event *next= events->next;
       Log_event_type event_type;
-      rpl_group_info *rgi= events->rgi;
+      rpl_group_info *rgi= qev->rgi;
       rpl_parallel_entry *entry= rgi->parallel_entry;
       bool end_of_group, group_ending;
 
-      total_event_size+= events->event_size;
-      if (events->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
+      next_qev= qev->next;
+      if (qev->typ == rpl_parallel_thread::queued_event::QUEUED_POS_UPDATE)
      {
-        handle_queued_pos_update(thd, events);
-        events->next= qevs_to_free;
-        qevs_to_free= events;
-        events= next;
+        handle_queued_pos_update(thd, qev);
+        rpt->loc_free_qev(qev);
         continue;
       }
-      else if (events->typ ==
+      else if (qev->typ ==
               rpl_parallel_thread::queued_event::QUEUED_MASTER_RESTART)
       {
         if (in_event_group)
@@ -613,24 +611,21 @@ handle_rpl_parallel_thread(void *arg)
           group_rgi->cleanup_context(thd, 1);
           in_event_group= false;
           finish_event_group(thd, group_rgi->gtid_sub_id,
-                             events->entry_for_queued, group_rgi);
+                             qev->entry_for_queued, group_rgi);
-          group_rgi->next= rgis_to_free;
-          rgis_to_free= group_rgi;
+          rpt->loc_free_rgi(group_rgi);
           thd->rgi_slave= group_rgi= NULL;
         }
-        events->next= qevs_to_free;
-        qevs_to_free= events;
-        events= next;
+        rpt->loc_free_qev(qev);
         continue;
       }
 
-      DBUG_ASSERT(events->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
+      DBUG_ASSERT(qev->typ==rpl_parallel_thread::queued_event::QUEUED_EVENT);
       thd->rgi_slave= group_rgi= rgi;
       gco= rgi->gco;
       /* Handle a new event group, which will be initiated by a GTID event. */
-      if ((event_type= events->ev->get_type_code()) == GTID_EVENT)
+      if ((event_type= qev->ev->get_type_code()) == GTID_EVENT)
       {
         bool did_enter_cond= false;
         PSI_stage_info old_stage;
@@ -643,7 +638,7 @@ handle_rpl_parallel_thread(void *arg)
           similar), without any terminating COMMIT/ROLLBACK/XID.
         */
         group_standalone=
-          (0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
+          (0 != (static_cast<Gtid_log_event *>(qev->ev)->flags2 &
                  Gtid_log_event::FL_STANDALONE));
 
         event_gtid_sub_id= rgi->gtid_sub_id;
@@ -704,8 +699,7 @@ handle_rpl_parallel_thread(void *arg)
             */
             DBUG_ASSERT(!tmp_gco->prev_gco);
             gco->prev_gco= NULL;
-            tmp_gco->next_gco= gcos_to_free;
-            gcos_to_free= tmp_gco;
+            rpt->loc_free_gco(tmp_gco);
           }
 
           if (entry->force_abort && wait_count > entry->stop_count)
@@ -766,7 +760,7 @@ handle_rpl_parallel_thread(void *arg)
         }
       }
 
-      group_ending= is_group_ending(events->ev, event_type);
+      group_ending= is_group_ending(qev->ev, event_type);
       if (group_ending && likely(!rgi->worker_error))
       {
         DEBUG_SYNC(thd, "rpl_parallel_before_mark_start_commit");
@@ -782,20 +776,20 @@ handle_rpl_parallel_thread(void *arg)
       if (likely(!rgi->worker_error) && !skip_event_group)
       {
         ++rgi->retry_event_count;
-        err= rpt_handle_event(events, rpt);
-        delete_or_keep_event_post_apply(rgi, event_type, events->ev);
+        err= rpt_handle_event(qev, rpt);
+        delete_or_keep_event_post_apply(rgi, event_type, qev->ev);
         DBUG_EXECUTE_IF("rpl_parallel_simulate_temp_err_gtid_0_x_100",
                         err= dbug_simulate_tmp_error(rgi, thd););
         if (err)
         {
           convert_kill_to_deadlock_error(rgi);
           if (has_temporary_error(thd) && slave_trans_retries > 0)
-            err= retry_event_group(rgi, rpt, events);
+            err= retry_event_group(rgi, rpt, qev);
         }
       }
       else
       {
-        delete events->ev;
+        delete qev->ev;
         err= thd->wait_for_prior_commit();
       }
@@ -804,8 +798,7 @@ handle_rpl_parallel_thread(void *arg)
         ((group_standalone && !Log_event::is_part_of_group(event_type)) ||
          group_ending);
 
-      events->next= qevs_to_free;
-      qevs_to_free= events;
+      rpt->loc_free_qev(qev);
 
      if (unlikely(err))
       {
@@ -820,61 +813,20 @@ handle_rpl_parallel_thread(void *arg)
       {
         in_event_group= false;
         finish_event_group(thd, event_gtid_sub_id, entry, rgi);
-        rgi->next= rgis_to_free;
-        rgis_to_free= rgi;
+        rpt->loc_free_rgi(rgi);
         thd->rgi_slave= group_rgi= rgi= NULL;
         skip_event_group= false;
         DEBUG_SYNC(thd, "rpl_parallel_end_of_group");
       }
-
-      events= next;
     }
 
     mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-    /* Signal that our queue can now accept more events. */
-    rpt->dequeue2(total_event_size);
-    mysql_cond_signal(&rpt->COND_rpl_thread_queue);
-    /* We need to delay the free here, to when we have the lock. */
-    while (gcos_to_free)
-    {
-      group_commit_orderer *next= gcos_to_free->next_gco;
-      rpt->free_gco(gcos_to_free);
-      gcos_to_free= next;
-    }
-    while (rgis_to_free)
-    {
-      rpl_group_info *next= rgis_to_free->next;
-      rpt->free_rgi(rgis_to_free);
-      rgis_to_free= next;
-    }
-    last_ir= NULL;
-    accumulated_ir_count= 0;
-    while (qevs_to_free)
-    {
-      rpl_parallel_thread::queued_event *next= qevs_to_free->next;
-      inuse_relaylog *ir= qevs_to_free->ir;
-      /* Batch up refcount update to reduce use of synchronised operations. */
-      if (last_ir != ir)
-      {
-        if (last_ir)
-        {
-          my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
-          my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
-          my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
-          accumulated_ir_count= 0;
-        }
-        last_ir= ir;
-      }
-      ++accumulated_ir_count;
-      rpt->free_qev(qevs_to_free);
-      qevs_to_free= next;
-    }
-    if (last_ir)
-    {
-      my_atomic_rwlock_wrlock(&last_ir->inuse_relaylog_atomic_lock);
-      my_atomic_add64(&last_ir->dequeued_count, accumulated_ir_count);
-      my_atomic_rwlock_wrunlock(&last_ir->inuse_relaylog_atomic_lock);
-    }
+    /*
+      Now that we have the lock, we can move everything from our local free
+      lists to the real free lists that are also accessible from the SQL
+      driver thread.
+    */
+    rpt->batch_free();
 
     if ((events= rpt->event_queue) != NULL)
     {
@@ -887,6 +839,7 @@ handle_rpl_parallel_thread(void *arg)
       mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
       goto more_events;
     }
+    rpt->inuse_relaylog_refcount_update();
 
     if (in_event_group && group_rgi->parallel_entry->force_abort)
     {
@@ -1122,6 +1075,51 @@ err:
 }
 
 
+void
+rpl_parallel_thread::batch_free()
+{
+  mysql_mutex_assert_owner(&LOCK_rpl_thread);
+  if (loc_qev_list)
+  {
+    *loc_qev_last_ptr_ptr= qev_free_list;
+    qev_free_list= loc_qev_list;
+    loc_qev_list= NULL;
+    dequeue2(loc_qev_size);
+    /* Signal that our queue can now accept more events. */
+    mysql_cond_signal(&COND_rpl_thread_queue);
+    loc_qev_size= 0;
+    qev_free_pending= 0;
+  }
+  if (loc_rgi_list)
+  {
+    *loc_rgi_last_ptr_ptr= rgi_free_list;
+    rgi_free_list= loc_rgi_list;
+    loc_rgi_list= NULL;
+  }
+  if (loc_gco_list)
+  {
+    *loc_gco_last_ptr_ptr= gco_free_list;
+    gco_free_list= loc_gco_list;
+    loc_gco_list= NULL;
+  }
+}
+
+
+void
+rpl_parallel_thread::inuse_relaylog_refcount_update()
+{
+  inuse_relaylog *ir= accumulated_ir_last;
+  if (ir)
+  {
+    my_atomic_rwlock_wrlock(&ir->rli->inuse_relaylog_atomic_lock);
+    my_atomic_add64(&ir->dequeued_count, accumulated_ir_count);
+    my_atomic_rwlock_wrunlock(&ir->rli->inuse_relaylog_atomic_lock);
+    accumulated_ir_count= 0;
+    accumulated_ir_last= NULL;
+  }
+}
+
+
 rpl_parallel_thread::queued_event *
 rpl_parallel_thread::get_qev_common(Log_event *ev, ulonglong event_size)
 {
@@ -1175,6 +1173,43 @@ rpl_parallel_thread::retry_get_qev(Log_event *ev, queued_event *orig_qev,
 }
 
 
+void
+rpl_parallel_thread::loc_free_qev(rpl_parallel_thread::queued_event *qev)
+{
+  inuse_relaylog *ir= qev->ir;
+  inuse_relaylog *last_ir= accumulated_ir_last;
+  if (ir != last_ir)
+  {
+    if (last_ir)
+      inuse_relaylog_refcount_update();
+    accumulated_ir_last= ir;
+  }
+  ++accumulated_ir_count;
+
+  if (!loc_qev_list)
+    loc_qev_last_ptr_ptr= &qev->next;
+  else
+    qev->next= loc_qev_list;
+  loc_qev_list= qev;
+  loc_qev_size+= qev->event_size;
+  /*
+    We want to release to the global free list only occasionally, to avoid
+    having to take the LOCK_rpl_thread mutex too many times.
+
+    However, we do need to release regularly. If we let the unreleased part
+    grow too large, then the SQL driver thread may go to sleep waiting for
+    the queue to drop below opt_slave_parallel_max_queued, and this in turn
+    can stall all other worker threads that are waiting for more stuff to do.
+  */
+  if (++qev_free_pending >= QEV_BATCH_FREE ||
+      loc_qev_size >= opt_slave_parallel_max_queued/3)
+  {
+    mysql_mutex_lock(&LOCK_rpl_thread);
+    batch_free();
+    mysql_mutex_unlock(&LOCK_rpl_thread);
+  }
+}
+
+
 void
 rpl_parallel_thread::free_qev(rpl_parallel_thread::queued_event *qev)
 {
@@ -1223,6 +1258,19 @@ rpl_parallel_thread::get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
 }
 
 
+void
+rpl_parallel_thread::loc_free_rgi(rpl_group_info *rgi)
+{
+  DBUG_ASSERT(rgi->commit_orderer.waitee == NULL);
+  rgi->free_annotate_event();
+  if (!loc_rgi_list)
+    loc_rgi_last_ptr_ptr= &rgi->next;
+  else
+    rgi->next= loc_rgi_list;
+  loc_rgi_list= rgi;
+}
+
+
 void
 rpl_parallel_thread::free_rgi(rpl_group_info *rgi)
 {
@@ -1257,12 +1305,14 @@ rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev)
 }
 
 
 void
-rpl_parallel_thread::free_gco(group_commit_orderer *gco)
+rpl_parallel_thread::loc_free_gco(group_commit_orderer *gco)
 {
-  mysql_mutex_assert_owner(&LOCK_rpl_thread);
   DBUG_ASSERT(!gco->prev_gco /* Must not free until wait has completed. */);
-  gco->next_gco= gco_free_list;
-  gco_free_list= gco;
+  if (!loc_gco_list)
+    loc_gco_last_ptr_ptr= &gco->next_gco;
+  else
+    gco->next_gco= loc_gco_list;
+  loc_gco_list= gco;
 }
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
index b114ee4ebcb..239818855b8 100644
--- a/sql/rpl_parallel.h
+++ b/sql/rpl_parallel.h
@@ -96,9 +96,28 @@ struct rpl_parallel_thread {
     size_t event_size;
   } *event_queue, *last_in_queue;
   uint64 queued_size;
+  /* These free lists are protected by LOCK_rpl_thread. */
   queued_event *qev_free_list;
   rpl_group_info *rgi_free_list;
   group_commit_orderer *gco_free_list;
+  /*
+    These free lists are local to the thread, so they need not be protected
+    by any lock. They are moved to the global free lists in batches by
+    batch_free(), to reduce LOCK_rpl_thread contention.
+
+    The lists are not NULL-terminated (as we do not need to traverse them).
+    Instead, if a list is non-NULL, its loc_XXX_last_ptr_ptr points to the
+    `next' pointer of the last element, which is used to link the batch into
+    the front of the corresponding global free list.
+  */
+  queued_event *loc_qev_list, **loc_qev_last_ptr_ptr;
+  size_t loc_qev_size;
+  uint64 qev_free_pending;
+  rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr;
+  group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr;
+  /* These keep track of batched updates to the inuse_relaylog refcounts. */
+  inuse_relaylog *accumulated_ir_last;
+  uint64 accumulated_ir_count;
 
   void enqueue(queued_event *qev)
   {
@@ -127,12 +146,41 @@ struct rpl_parallel_thread {
   queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
                               const char *relay_log_name,
                               ulonglong event_pos, ulonglong event_size);
+  /*
+    Put a qev on the local free list, to be later released to the global free
+    list by batch_free().
+  */
+  void loc_free_qev(queued_event *qev);
+  /*
+    Release a qev immediately to the global free list. Requires holding the
+    LOCK_rpl_thread mutex.
+  */
   void free_qev(queued_event *qev);
   rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
                           rpl_parallel_entry *e, ulonglong event_size);
+  /*
+    Put an rgi on the local free list, to be later released to the global
+    free list by batch_free().
+  */
+  void loc_free_rgi(rpl_group_info *rgi);
+  /*
+    Release an rgi immediately to the global free list. Requires holding the
+    LOCK_rpl_thread mutex.
+  */
   void free_rgi(rpl_group_info *rgi);
   group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
-  void free_gco(group_commit_orderer *gco);
+  /*
+    Put a gco on the local free list, to be later released to the global free
+    list by batch_free().
+  */
+  void loc_free_gco(group_commit_orderer *gco);
+  /*
+    Move all local free lists to the global ones. Requires holding
+    LOCK_rpl_thread.
+  */
+  void batch_free();
+  /* Update inuse_relaylog refcounts with what we have accumulated so far. */
+  void inuse_relaylog_refcount_update();
 };
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index bb8528c5a98..d21ebd494c1 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -1390,6 +1390,7 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
     my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
     return 1;
   }
+  ir->rli= this;
   strmake_buf(ir->name, name);
 
   if (!inuse_relaylog_list)
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 3a8d87030ad..9885417aa3f 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -496,6 +496,7 @@ private:
 */
 struct inuse_relaylog {
   inuse_relaylog *next;
+  Relay_log_info *rli;
   /* Number of events in this relay log queued for worker threads. */
   int64 queued_count;
   /* Number of events completed by worker threads. */
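
--
Reviewer note: the core pattern in batch_free()/loc_free_qev() above is an
intrusive, thread-local free list that is spliced onto the shared free list
with a single LOCK_rpl_thread acquisition. The local list is deliberately not
NULL-terminated; loc_XXX_last_ptr_ptr remembers the address of the last
element's `next' pointer, so the whole batch links onto the head of the global
list in O(1). A minimal standalone C++ sketch of the same pattern (Node,
Worker and BATCH_FREE are illustrative names, not MariaDB's types):

#include <cstddef>
#include <mutex>

struct Node { Node *next; };

struct Worker {
  /* Local list: no lock needed; not NULL-terminated. loc_last_ptr points at
     the `next' field of the last element, for O(1) splicing. */
  Node *loc_list= nullptr;
  Node **loc_last_ptr= nullptr;
  size_t pending= 0;

  std::mutex lock;
  Node *global_free_list= nullptr;      /* protected by lock */

  static constexpr size_t BATCH_FREE= 200;

  void loc_free(Node *n)
  {
    if (!loc_list)
      loc_last_ptr= &n->next;           /* n will be the last element */
    else
      n->next= loc_list;                /* push onto front of local list */
    loc_list= n;
    if (++pending >= BATCH_FREE)
    {
      std::lock_guard<std::mutex> guard(lock);
      batch_free();
    }
  }

  void batch_free()                     /* caller must hold lock */
  {
    if (loc_list)
    {
      *loc_last_ptr= global_free_list;  /* link batch tail to global head */
      global_free_list= loc_list;
      loc_list= nullptr;
      pending= 0;
    }
  }
};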
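The second pattern is the batched inuse_relaylog refcount update: rather than
doing one synchronised operation per freed event, the worker counts events per
relay log locally and flushes the total with a single atomic add whenever the
relay log changes or the batch ends. A sketch under the same caveats, using
std::atomic in place of the my_atomic_* wrappers:

#include <atomic>
#include <cstdint>

struct RelayLog {                       /* stand-in for inuse_relaylog */
  std::atomic<int64_t> dequeued_count{0};
};

struct RefcountBatcher {
  RelayLog *accumulated_last= nullptr;
  int64_t accumulated_count= 0;

  /* Flush the locally accumulated count with one atomic add. */
  void refcount_update()
  {
    if (RelayLog *ir= accumulated_last)
    {
      ir->dequeued_count.fetch_add(accumulated_count,
                                   std::memory_order_relaxed);
      accumulated_count= 0;
      accumulated_last= nullptr;
    }
  }

  /* Called once per freed event; usually just a local increment. */
  void note_dequeued(RelayLog *ir)
  {
    if (ir != accumulated_last)
    {
      if (accumulated_last)
        refcount_update();              /* flush when the relay log changes */
      accumulated_last= ir;
    }
    ++accumulated_count;
  }
};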