Fixed failures in rpl_parallel2
Problem was that we used same condition variable with 2 different mutex. Fixed by changing to use COND_rpl_thread_stop instead of COND_parallel_entry for stopping threads. Patch by Kristian Nielsen
This commit is contained in:
parent
72dc30f217
commit
b30a768e7b
@ -1001,7 +1001,7 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
|
|||||||
PSI_cond_key key_RELAYLOG_COND_queue_busy;
|
PSI_cond_key key_RELAYLOG_COND_queue_busy;
|
||||||
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
|
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
|
||||||
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
|
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
|
||||||
key_COND_rpl_thread_pool,
|
key_COND_rpl_thread_stop, key_COND_rpl_thread_pool,
|
||||||
key_COND_parallel_entry, key_COND_group_commit_orderer,
|
key_COND_parallel_entry, key_COND_group_commit_orderer,
|
||||||
key_COND_prepare_ordered, key_COND_slave_init;
|
key_COND_prepare_ordered, key_COND_slave_init;
|
||||||
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
|
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
|
||||||
@ -1048,6 +1048,7 @@ static PSI_cond_info all_server_conds[]=
|
|||||||
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
|
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
|
||||||
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
|
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
|
||||||
{ &key_COND_rpl_thread_queue, "COND_rpl_thread_queue", 0},
|
{ &key_COND_rpl_thread_queue, "COND_rpl_thread_queue", 0},
|
||||||
|
{ &key_COND_rpl_thread_stop, "COND_rpl_thread_stop", 0},
|
||||||
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
|
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
|
||||||
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
|
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
|
||||||
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
|
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
|
||||||
|
@ -306,7 +306,7 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
|
|||||||
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
|
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
|
||||||
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
|
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
|
||||||
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
|
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
|
||||||
key_COND_rpl_thread_pool,
|
key_COND_rpl_thread_stop, key_COND_rpl_thread_pool,
|
||||||
key_COND_parallel_entry, key_COND_group_commit_orderer;
|
key_COND_parallel_entry, key_COND_group_commit_orderer;
|
||||||
extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
|
extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
|
||||||
|
|
||||||
|
@ -352,6 +352,7 @@ do_ftwrl_wait(rpl_group_info *rgi,
|
|||||||
THD *thd= rgi->thd;
|
THD *thd= rgi->thd;
|
||||||
rpl_parallel_entry *entry= rgi->parallel_entry;
|
rpl_parallel_entry *entry= rgi->parallel_entry;
|
||||||
uint64 sub_id= rgi->gtid_sub_id;
|
uint64 sub_id= rgi->gtid_sub_id;
|
||||||
|
DBUG_ENTER("do_ftwrl_wait");
|
||||||
|
|
||||||
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
|
mysql_mutex_assert_owner(&entry->LOCK_parallel_entry);
|
||||||
|
|
||||||
@ -391,6 +392,8 @@ do_ftwrl_wait(rpl_group_info *rgi,
|
|||||||
|
|
||||||
if (sub_id > entry->largest_started_sub_id)
|
if (sub_id > entry->largest_started_sub_id)
|
||||||
entry->largest_started_sub_id= sub_id;
|
entry->largest_started_sub_id= sub_id;
|
||||||
|
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -454,6 +457,7 @@ rpl_unpause_after_ftwrl(THD *thd)
|
|||||||
{
|
{
|
||||||
uint32 i;
|
uint32 i;
|
||||||
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
|
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
|
||||||
|
DBUG_ENTER("rpl_unpause_after_ftwrl");
|
||||||
|
|
||||||
DBUG_ASSERT(pool->busy);
|
DBUG_ASSERT(pool->busy);
|
||||||
|
|
||||||
@ -478,6 +482,7 @@ rpl_unpause_after_ftwrl(THD *thd)
|
|||||||
}
|
}
|
||||||
|
|
||||||
pool_mark_not_busy(pool);
|
pool_mark_not_busy(pool);
|
||||||
|
DBUG_VOID_RETURN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -492,6 +497,7 @@ rpl_pause_for_ftwrl(THD *thd)
|
|||||||
uint32 i;
|
uint32 i;
|
||||||
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
|
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
|
||||||
int err;
|
int err;
|
||||||
|
DBUG_ENTER("rpl_pause_for_ftwrl");
|
||||||
|
|
||||||
/*
|
/*
|
||||||
While the count_pending_pause_for_ftwrl counter is non-zero, the pool
|
While the count_pending_pause_for_ftwrl counter is non-zero, the pool
|
||||||
@ -502,7 +508,7 @@ rpl_pause_for_ftwrl(THD *thd)
|
|||||||
as this can deadlock against release_thread()).
|
as this can deadlock against release_thread()).
|
||||||
*/
|
*/
|
||||||
if ((err= pool_mark_busy(pool, thd)))
|
if ((err= pool_mark_busy(pool, thd)))
|
||||||
return err;
|
DBUG_RETURN(err);
|
||||||
|
|
||||||
for (i= 0; i < pool->count; ++i)
|
for (i= 0; i < pool->count; ++i)
|
||||||
{
|
{
|
||||||
@ -549,7 +555,7 @@ rpl_pause_for_ftwrl(THD *thd)
|
|||||||
|
|
||||||
if (err)
|
if (err)
|
||||||
rpl_unpause_after_ftwrl(thd);
|
rpl_unpause_after_ftwrl(thd);
|
||||||
return err;
|
DBUG_RETURN(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1271,8 +1277,9 @@ handle_rpl_parallel_thread(void *arg)
|
|||||||
*/
|
*/
|
||||||
mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry);
|
mysql_mutex_lock(&rpt->current_entry->LOCK_parallel_entry);
|
||||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||||
mysql_cond_wait(&rpt->current_entry->COND_parallel_entry,
|
if (rpt->pause_for_ftwrl)
|
||||||
&rpt->current_entry->LOCK_parallel_entry);
|
mysql_cond_wait(&rpt->current_entry->COND_parallel_entry,
|
||||||
|
&rpt->current_entry->LOCK_parallel_entry);
|
||||||
mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry);
|
mysql_mutex_unlock(&rpt->current_entry->LOCK_parallel_entry);
|
||||||
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
||||||
/*
|
/*
|
||||||
@ -1309,7 +1316,7 @@ handle_rpl_parallel_thread(void *arg)
|
|||||||
/* Tell wait_for_done() that we are done, if it is waiting. */
|
/* Tell wait_for_done() that we are done, if it is waiting. */
|
||||||
if (likely(rpt->current_entry) &&
|
if (likely(rpt->current_entry) &&
|
||||||
unlikely(rpt->current_entry->force_abort))
|
unlikely(rpt->current_entry->force_abort))
|
||||||
mysql_cond_broadcast(&rpt->current_entry->COND_parallel_entry);
|
mysql_cond_broadcast(&rpt->COND_rpl_thread_stop);
|
||||||
rpt->current_entry= NULL;
|
rpt->current_entry= NULL;
|
||||||
if (!rpt->stop)
|
if (!rpt->stop)
|
||||||
rpt->pool->release_thread(rpt);
|
rpt->pool->release_thread(rpt);
|
||||||
@ -1389,6 +1396,8 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
|
|||||||
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
|
mysql_cond_init(key_COND_rpl_thread, &new_list[i]->COND_rpl_thread, NULL);
|
||||||
mysql_cond_init(key_COND_rpl_thread_queue,
|
mysql_cond_init(key_COND_rpl_thread_queue,
|
||||||
&new_list[i]->COND_rpl_thread_queue, NULL);
|
&new_list[i]->COND_rpl_thread_queue, NULL);
|
||||||
|
mysql_cond_init(key_COND_rpl_thread_stop,
|
||||||
|
&new_list[i]->COND_rpl_thread_stop, NULL);
|
||||||
new_list[i]->pool= pool;
|
new_list[i]->pool= pool;
|
||||||
if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
|
if (mysql_thread_create(key_rpl_parallel_thread, &th, &connection_attrib,
|
||||||
handle_rpl_parallel_thread, new_list[i]))
|
handle_rpl_parallel_thread, new_list[i]))
|
||||||
@ -2099,7 +2108,7 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
|
|||||||
{
|
{
|
||||||
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
|
||||||
while (rpt->current_owner == &e->rpl_threads[j])
|
while (rpt->current_owner == &e->rpl_threads[j])
|
||||||
mysql_cond_wait(&e->COND_parallel_entry, &rpt->LOCK_rpl_thread);
|
mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
|
||||||
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -74,6 +74,7 @@ struct rpl_parallel_thread {
|
|||||||
mysql_mutex_t LOCK_rpl_thread;
|
mysql_mutex_t LOCK_rpl_thread;
|
||||||
mysql_cond_t COND_rpl_thread;
|
mysql_cond_t COND_rpl_thread;
|
||||||
mysql_cond_t COND_rpl_thread_queue;
|
mysql_cond_t COND_rpl_thread_queue;
|
||||||
|
mysql_cond_t COND_rpl_thread_stop;
|
||||||
struct rpl_parallel_thread *next; /* For free list. */
|
struct rpl_parallel_thread *next; /* For free list. */
|
||||||
struct rpl_parallel_thread_pool *pool;
|
struct rpl_parallel_thread_pool *pool;
|
||||||
THD *thd;
|
THD *thd;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user