MDEV-9573 'Stop slave' hangs on replication slave

The reason for this is that stop slave takes LOCK_active_mi over the
whole operation while some slave operations will also need LOCK_active_mi
which causes deadlocks.

Fixed by introducing object counting for Master_info and not taking
LOCK_active_mi over stop slave or even stop_all_slaves()

Another benefit of this approach is that it allows:
- Multiple threads can run SHOW SLAVE STATUS at the same time
- START/STOP/RESET/SLAVE STATUS on a slave will not block other slaves
- Simpler interface for handling get_master_info()
- Added some missing unlock of 'log_lock' in error condtions
- Moved rpl_parallel_inactivate_pool(&global_rpl_thread_pool) to end
  of stop_slave() to not have to use LOCK_active_mi inside
  terminate_slave_threads()
- Changed argument for remove_master_info() to Master_info, as we always
  have this available
- Fixed core dump when doing FLUSH TABLES WITH READ LOCK and parallel
  replication. Problem was that waiting for pause_for_ftwrl was not done
  when deleting rpt->current_owner after a force_abort.
This commit is contained in:
Monty 2017-01-29 22:10:56 +02:00 committed by Sergei Golubchik
parent d5c54f3990
commit e65f667bb6
13 changed files with 539 additions and 345 deletions

View File

@ -1815,6 +1815,7 @@ int my_b_flush_io_cache(IO_CACHE *info,
It's currently safe to call this if one has called init_io_cache() It's currently safe to call this if one has called init_io_cache()
on the 'info' object, even if init_io_cache() failed. on the 'info' object, even if init_io_cache() failed.
This function is also safe to call twice with the same handle. This function is also safe to call twice with the same handle.
Note that info->file is not reset as the caller may still use ut for my_close()
RETURN RETURN
0 ok 0 ok
@ -1850,10 +1851,12 @@ int end_io_cache(IO_CACHE *info)
if (info->type == SEQ_READ_APPEND) if (info->type == SEQ_READ_APPEND)
{ {
/* Destroy allocated mutex */ /* Destroy allocated mutex */
info->type= TYPE_NOT_SET;
mysql_mutex_destroy(&info->append_buffer_lock); mysql_mutex_destroy(&info->append_buffer_lock);
} }
info->share= 0; info->share= 0;
info->type= TYPE_NOT_SET; /* Ensure that flush_io_cache() does nothing */
info->write_end= 0; /* Ensure that my_b_write() fails */
info->write_function= 0; /* my_b_write will crash if used */
DBUG_RETURN(error); DBUG_RETURN(error);
} /* end_io_cache */ } /* end_io_cache */

View File

@ -3958,12 +3958,7 @@ longlong Item_master_pos_wait::val_int()
else else
connection_name= thd->variables.default_master_connection; connection_name= thd->variables.default_master_connection;
mysql_mutex_lock(&LOCK_active_mi); if (!(mi= get_master_info(&connection_name, Sql_condition::WARN_LEVEL_WARN)))
if (master_info_index) // master_info_index is set to NULL on shutdown.
mi= master_info_index->get_master_info(&connection_name,
Sql_condition::WARN_LEVEL_WARN);
mysql_mutex_unlock(&LOCK_active_mi);
if (!mi)
goto err; goto err;
if ((event_count = mi->rli.wait_for_pos(thd, log_name, pos, timeout)) == -2) if ((event_count = mi->rli.wait_for_pos(thd, log_name, pos, timeout)) == -2)
@ -3971,6 +3966,7 @@ longlong Item_master_pos_wait::val_int()
null_value = 1; null_value = 1;
event_count=0; event_count=0;
} }
mi->release();
#endif #endif
return event_count; return event_count;

View File

@ -702,12 +702,15 @@ mysql_mutex_t
LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create, LOCK_delayed_insert, LOCK_delayed_status, LOCK_delayed_create,
LOCK_crypt, LOCK_crypt,
LOCK_global_system_variables, LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi, LOCK_user_conn, LOCK_slave_list,
LOCK_connection_count, LOCK_error_messages, LOCK_slave_init; LOCK_connection_count, LOCK_error_messages, LOCK_slave_init;
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats, mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats; LOCK_global_table_stats, LOCK_global_index_stats;
/* This protects against changes in master_info_index */
mysql_mutex_t LOCK_active_mi;
/** /**
The below lock protects access to two global server variables: The below lock protects access to two global server variables:
max_prepared_stmt_count and prepared_stmt_count. These variables max_prepared_stmt_count and prepared_stmt_count. These variables
@ -1651,7 +1654,7 @@ static void close_connections(void)
mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list
Events::deinit(); Events::deinit();
end_slave(); slave_prepare_for_shutdown();
/* /*
Give threads time to die. Give threads time to die.
@ -1700,6 +1703,7 @@ static void close_connections(void)
DBUG_PRINT("quit",("Unlocking LOCK_thread_count")); DBUG_PRINT("quit",("Unlocking LOCK_thread_count"));
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
} }
end_slave();
/* All threads has now been aborted */ /* All threads has now been aborted */
DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count)); DBUG_PRINT("quit",("Waiting for threads to die (count=%u)",thread_count));
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
@ -7214,17 +7218,14 @@ static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff)
var->type= SHOW_MY_BOOL; var->type= SHOW_MY_BOOL;
var->value= buff; var->value= buff;
mysql_mutex_lock(&LOCK_active_mi);
if (master_info_index) if ((mi= get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_NOTE)))
{ {
mi= master_info_index->
get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_NOTE);
if (mi)
tmp= (my_bool) (mi->slave_running == MYSQL_SLAVE_RUN_READING && tmp= (my_bool) (mi->slave_running == MYSQL_SLAVE_RUN_READING &&
mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN); mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN);
mi->release();
} }
mysql_mutex_unlock(&LOCK_active_mi);
if (mi) if (mi)
*((my_bool *)buff)= tmp; *((my_bool *)buff)= tmp;
else else
@ -7256,38 +7257,26 @@ static int show_slaves_running(THD *thd, SHOW_VAR *var, char *buff)
{ {
var->type= SHOW_LONGLONG; var->type= SHOW_LONGLONG;
var->value= buff; var->value= buff;
mysql_mutex_lock(&LOCK_active_mi);
if (master_info_index) *((longlong *)buff)= any_slave_sql_running();
*((longlong *)buff)= master_info_index->any_slave_sql_running();
else
*((longlong *)buff)= 0;
mysql_mutex_unlock(&LOCK_active_mi);
return 0; return 0;
} }
static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff) static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff)
{ {
Master_info *mi= NULL; Master_info *mi;
longlong tmp;
LINT_INIT(tmp);
var->type= SHOW_LONGLONG; var->type= SHOW_LONGLONG;
var->value= buff; var->value= buff;
mysql_mutex_lock(&LOCK_active_mi);
if (master_info_index) if ((mi= get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_NOTE)))
{ {
mi= master_info_index-> *((longlong *)buff)= mi->received_heartbeats;
get_master_info(&thd->variables.default_master_connection, mi->release();
Sql_condition::WARN_LEVEL_NOTE);
if (mi)
tmp= mi->received_heartbeats;
} }
mysql_mutex_unlock(&LOCK_active_mi);
if (mi)
*((longlong *)buff)= tmp;
else else
var->type= SHOW_UNDEF; var->type= SHOW_UNDEF;
return 0; return 0;
@ -7297,23 +7286,16 @@ static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff)
static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff) static int show_heartbeat_period(THD *thd, SHOW_VAR *var, char *buff)
{ {
Master_info *mi= NULL; Master_info *mi= NULL;
float tmp;
LINT_INIT(tmp);
var->type= SHOW_CHAR; var->type= SHOW_CHAR;
var->value= buff; var->value= buff;
mysql_mutex_lock(&LOCK_active_mi);
if (master_info_index) if ((mi= get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_NOTE)))
{ {
mi= master_info_index-> sprintf(buff, "%.3f", mi->heartbeat_period);
get_master_info(&thd->variables.default_master_connection, mi->release();
Sql_condition::WARN_LEVEL_NOTE);
if (mi)
tmp= mi->heartbeat_period;
} }
mysql_mutex_unlock(&LOCK_active_mi);
if (mi)
sprintf(buff, "%.3f", tmp);
else else
var->type= SHOW_UNDEF; var->type= SHOW_UNDEF;
return 0; return 0;

View File

@ -40,7 +40,9 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
sync_counter(0), heartbeat_period(0), received_heartbeats(0), sync_counter(0), heartbeat_period(0), received_heartbeats(0),
master_id(0), prev_master_id(0), master_id(0), prev_master_id(0),
using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0),
gtid_reconnect_event_skip_count(0), gtid_event_seen(false) gtid_reconnect_event_skip_count(0), gtid_event_seen(false),
in_start_all_slaves(0), in_stop_all_slaves(0),
users(0), killed(0)
{ {
host[0] = 0; user[0] = 0; password[0] = 0; host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
@ -87,8 +89,27 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL); mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL);
} }
/**
Wait until no one is using Master_info
*/
void Master_info::wait_until_free()
{
mysql_mutex_lock(&sleep_lock);
killed= 1;
while (users)
mysql_cond_wait(&sleep_cond, &sleep_lock);
mysql_mutex_unlock(&sleep_lock);
}
/**
Delete master_info
*/
Master_info::~Master_info() Master_info::~Master_info()
{ {
wait_until_free();
rpl_filters.delete_element(connection_name.str, connection_name.length, rpl_filters.delete_element(connection_name.str, connection_name.length,
(void (*)(const char*, uchar*)) free_rpl_filter); (void (*)(const char*, uchar*)) free_rpl_filter);
my_free(connection_name.str); my_free(connection_name.str);
@ -709,8 +730,13 @@ uchar *get_key_master_info(Master_info *mi, size_t *length,
void free_key_master_info(Master_info *mi) void free_key_master_info(Master_info *mi)
{ {
DBUG_ENTER("free_key_master_info"); DBUG_ENTER("free_key_master_info");
/* Ensure that we are not in reset_slave while this is done */
lock_slave_threads(mi);
terminate_slave_threads(mi,SLAVE_FORCE_ALL); terminate_slave_threads(mi,SLAVE_FORCE_ALL);
/* We use 2 here instead of 1 just to make it easier when debugging */
mi->killed= 2;
end_master_info(mi); end_master_info(mi);
unlock_slave_threads(mi);
delete mi; delete mi;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
@ -867,9 +893,27 @@ Master_info_index::Master_info_index()
index_file.file= -1; index_file.file= -1;
} }
/**
Free all connection threads
This is done during early stages of shutdown
to give connection threads and slave threads time
to die before ~Master_info_index is called
*/
void Master_info_index::free_connections()
{
my_hash_reset(&master_info_hash);
}
/**
Free all connection threads and free structures
*/
Master_info_index::~Master_info_index() Master_info_index::~Master_info_index()
{ {
/* This will close connection for all objects in the cache */
my_hash_free(&master_info_hash); my_hash_free(&master_info_hash);
end_io_cache(&index_file); end_io_cache(&index_file);
if (index_file.file >= 0) if (index_file.file >= 0)
@ -1001,7 +1045,6 @@ bool Master_info_index::init_all_master_info()
if (master_info_index->add_master_info(mi, FALSE)) if (master_info_index->add_master_info(mi, FALSE))
DBUG_RETURN(1); DBUG_RETURN(1);
succ_num++; succ_num++;
unlock_slave_threads(mi);
if (!opt_skip_slave_start) if (!opt_skip_slave_start)
{ {
@ -1022,6 +1065,7 @@ bool Master_info_index::init_all_master_info()
(int) connection_name.length, (int) connection_name.length,
connection_name.str); connection_name.str);
} }
unlock_slave_threads(mi);
} }
} }
@ -1072,6 +1116,71 @@ bool Master_info_index::write_master_name_to_index_file(LEX_STRING *name,
} }
/**
Get Master_info for a connection and lock the object from deletion
@param
connection_name Connection name
warning WARN_LEVEL_NOTE -> Don't print anything
WARN_LEVEL_WARN -> Issue warning if not exists
WARN_LEVEL_ERROR-> Issue error if not exists
*/
Master_info *get_master_info(LEX_STRING *connection_name,
Sql_condition::enum_warning_level warning)
{
Master_info *mi;
DBUG_ENTER("get_master_info");
/* Protect against inserts into hash */
mysql_mutex_lock(&LOCK_active_mi);
/*
The following can only be true during shutdown when slave has been killed
but some other threads are still trying to access slave statistics.
*/
if (unlikely(!master_info_index))
{
if (warning != Sql_condition::WARN_LEVEL_NOTE)
my_error(WARN_NO_MASTER_INFO,
MYF(warning == Sql_condition::WARN_LEVEL_WARN ?
ME_JUST_WARNING : 0),
(int) connection_name->length, connection_name->str);
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(0);
}
if ((mi= master_info_index->get_master_info(connection_name, warning)))
{
/*
We have to use sleep_lock here. If we would use LOCK_active_mi
then we would take locks in wrong order in Master_info::release()
*/
mysql_mutex_lock(&mi->sleep_lock);
mi->users++;
DBUG_PRINT("info",("users: %d", mi->users));
mysql_mutex_unlock(&mi->sleep_lock);
}
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(mi);
}
/**
Release master info.
Signals ~Master_info that it's now safe to delete it
*/
void Master_info::release()
{
mysql_mutex_lock(&sleep_lock);
if (!--users && killed)
{
/* Signal ~Master_info that it's ok to now free it */
mysql_cond_signal(&sleep_cond);
}
mysql_mutex_unlock(&sleep_lock);
}
/** /**
Get Master_info for a connection Get Master_info for a connection
@ -1094,8 +1203,6 @@ Master_info_index::get_master_info(LEX_STRING *connection_name,
("connection_name: '%.*s'", (int) connection_name->length, ("connection_name: '%.*s'", (int) connection_name->length,
connection_name->str)); connection_name->str));
mysql_mutex_assert_owner(&LOCK_active_mi);
/* Make name lower case for comparison */ /* Make name lower case for comparison */
res= strmake(buff, connection_name->str, connection_name->length); res= strmake(buff, connection_name->str, connection_name->length);
my_casedn_str(system_charset_info, buff); my_casedn_str(system_charset_info, buff);
@ -1187,13 +1294,11 @@ bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file)
atomic atomic
*/ */
bool Master_info_index::remove_master_info(LEX_STRING *name) bool Master_info_index::remove_master_info(Master_info *mi)
{ {
Master_info* mi;
DBUG_ENTER("remove_master_info"); DBUG_ENTER("remove_master_info");
mysql_mutex_assert_owner(&LOCK_active_mi);
if ((mi= get_master_info(name, Sql_condition::WARN_LEVEL_WARN)))
{
// Delete Master_info and rewrite others to file // Delete Master_info and rewrite others to file
if (!my_hash_delete(&master_info_hash, (uchar*) mi)) if (!my_hash_delete(&master_info_hash, (uchar*) mi))
{ {
@ -1229,63 +1334,83 @@ bool Master_info_index::remove_master_info(LEX_STRING *name)
tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i); tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
write_master_name_to_index_file(&tmp_mi->connection_name, 0); write_master_name_to_index_file(&tmp_mi->connection_name, 0);
} }
my_sync(index_file_nr, MYF(MY_WME)); if (my_sync(index_file_nr, MYF(MY_WME)))
} DBUG_RETURN(TRUE);
} }
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
/** /**
Master_info_index::give_error_if_slave_running() give_error_if_slave_running()
@param
already_locked 0 if we need to lock, 1 if we have LOCK_active_mi_locked
@return @return
TRUE If some slave is running. An error is printed TRUE If some slave is running. An error is printed
FALSE No slave is running FALSE No slave is running
*/ */
bool Master_info_index::give_error_if_slave_running() bool give_error_if_slave_running(bool already_locked)
{ {
bool ret= 0;
DBUG_ENTER("give_error_if_slave_running"); DBUG_ENTER("give_error_if_slave_running");
mysql_mutex_assert_owner(&LOCK_active_mi);
for (uint i= 0; i< master_info_hash.records; ++i) if (!already_locked)
mysql_mutex_lock(&LOCK_active_mi);
if (!master_info_index)
{
my_error(ER_SERVER_SHUTDOWN, MYF(0));
ret= 1;
}
else
{
HASH *hash= &master_info_index->master_info_hash;
for (uint i= 0; i< hash->records; ++i)
{ {
Master_info *mi; Master_info *mi;
mi= (Master_info *) my_hash_element(&master_info_hash, i); mi= (Master_info *) my_hash_element(hash, i);
if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
{ {
my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length, my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
mi->connection_name.str); mi->connection_name.str);
DBUG_RETURN(TRUE); ret= 1;
break;
} }
} }
DBUG_RETURN(FALSE); }
if (!already_locked)
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(ret);
} }
/** /**
Master_info_index::any_slave_sql_running() any_slave_sql_running()
The LOCK_active_mi must be held while calling this function.
@return @return
0 No Slave SQL thread is running 0 No Slave SQL thread is running
# Number of slave SQL thread running # Number of slave SQL thread running
*/ */
uint Master_info_index::any_slave_sql_running() uint any_slave_sql_running()
{ {
uint count= 0; uint count= 0;
DBUG_ENTER("any_slave_sql_running"); DBUG_ENTER("any_slave_sql_running");
mysql_mutex_assert_owner(&LOCK_active_mi);
for (uint i= 0; i< master_info_hash.records; ++i) mysql_mutex_lock(&LOCK_active_mi);
if (likely(master_info_index)) // Not shutdown
{ {
Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i); HASH *hash= &master_info_index->master_info_hash;
for (uint i= 0; i< hash->records; ++i)
{
Master_info *mi= (Master_info *)my_hash_element(hash, i);
if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
count++; count++;
} }
}
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(count); DBUG_RETURN(count);
} }
@ -1298,15 +1423,25 @@ uint Master_info_index::any_slave_sql_running()
@return @return
TRUE Error TRUE Error
FALSE Everything ok. FALSE Everything ok.
This code is written so that we don't keep LOCK_active_mi active
while we are starting a slave.
*/ */
bool Master_info_index::start_all_slaves(THD *thd) bool Master_info_index::start_all_slaves(THD *thd)
{ {
bool result= FALSE; bool result= FALSE;
DBUG_ENTER("warn_if_slave_running"); DBUG_ENTER("start_all_slaves");
mysql_mutex_assert_owner(&LOCK_active_mi); mysql_mutex_assert_owner(&LOCK_active_mi);
for (uint i= 0; i< master_info_hash.records; ++i) for (uint i= 0; i< master_info_hash.records; i++)
{
Master_info *mi;
mi= (Master_info *) my_hash_element(&master_info_hash, i);
mi->in_start_all_slaves= 0;
}
for (uint i= 0; i< master_info_hash.records; )
{ {
int error; int error;
Master_info *mi; Master_info *mi;
@ -1316,10 +1451,23 @@ bool Master_info_index::start_all_slaves(THD *thd)
Try to start all slaves that are configured (host is defined) Try to start all slaves that are configured (host is defined)
and are not already running and are not already running
*/ */
if ((mi->slave_running == MYSQL_SLAVE_NOT_RUN || if (!((mi->slave_running == MYSQL_SLAVE_NOT_RUN ||
!mi->rli.slave_running) && *mi->host) !mi->rli.slave_running) && *mi->host) ||
mi->in_start_all_slaves)
{ {
if ((error= start_slave(thd, mi, 1))) i++;
continue;
}
mi->in_start_all_slaves= 1;
mysql_mutex_lock(&mi->sleep_lock);
mi->users++; // Mark used
mysql_mutex_unlock(&mi->sleep_lock);
mysql_mutex_unlock(&LOCK_active_mi);
error= start_slave(thd, mi, 1);
mi->release();
mysql_mutex_lock(&LOCK_active_mi);
if (error)
{ {
my_error(ER_CANT_START_STOP_SLAVE, MYF(0), my_error(ER_CANT_START_STOP_SLAVE, MYF(0),
"START", "START",
@ -1334,7 +1482,9 @@ bool Master_info_index::start_all_slaves(THD *thd)
ER_SLAVE_STARTED, ER(ER_SLAVE_STARTED), ER_SLAVE_STARTED, ER(ER_SLAVE_STARTED),
(int) mi->connection_name.length, (int) mi->connection_name.length,
mi->connection_name.str); mi->connection_name.str);
} /* Restart from first element as master_info_hash may have changed */
i= 0;
continue;
} }
DBUG_RETURN(result); DBUG_RETURN(result);
} }
@ -1348,23 +1498,46 @@ bool Master_info_index::start_all_slaves(THD *thd)
@return @return
TRUE Error TRUE Error
FALSE Everything ok. FALSE Everything ok.
This code is written so that we don't keep LOCK_active_mi active
while we are stopping a slave.
*/ */
bool Master_info_index::stop_all_slaves(THD *thd) bool Master_info_index::stop_all_slaves(THD *thd)
{ {
bool result= FALSE; bool result= FALSE;
DBUG_ENTER("warn_if_slave_running"); DBUG_ENTER("stop_all_slaves");
mysql_mutex_assert_owner(&LOCK_active_mi); mysql_mutex_assert_owner(&LOCK_active_mi);
for (uint i= 0; i< master_info_hash.records; ++i) for (uint i= 0; i< master_info_hash.records; i++)
{
Master_info *mi;
mi= (Master_info *) my_hash_element(&master_info_hash, i);
mi->in_stop_all_slaves= 0;
}
for (uint i= 0; i< master_info_hash.records ;)
{ {
int error; int error;
Master_info *mi; Master_info *mi;
mi= (Master_info *) my_hash_element(&master_info_hash, i); mi= (Master_info *) my_hash_element(&master_info_hash, i);
if ((mi->slave_running != MYSQL_SLAVE_NOT_RUN || if (!(mi->slave_running != MYSQL_SLAVE_NOT_RUN ||
mi->rli.slave_running)) mi->rli.slave_running) ||
mi->in_stop_all_slaves)
{ {
if ((error= stop_slave(thd, mi, 1))) i++;
continue;
}
mi->in_stop_all_slaves= 1; // Protection for loops
mysql_mutex_lock(&mi->sleep_lock);
mi->users++; // Mark used
mysql_mutex_unlock(&mi->sleep_lock);
mysql_mutex_unlock(&LOCK_active_mi);
error= stop_slave(thd, mi, 1);
mi->release();
mysql_mutex_lock(&LOCK_active_mi);
if (error)
{ {
my_error(ER_CANT_START_STOP_SLAVE, MYF(0), my_error(ER_CANT_START_STOP_SLAVE, MYF(0),
"STOP", "STOP",
@ -1379,7 +1552,9 @@ bool Master_info_index::stop_all_slaves(THD *thd)
ER_SLAVE_STOPPED, ER(ER_SLAVE_STOPPED), ER_SLAVE_STOPPED, ER(ER_SLAVE_STOPPED),
(int) mi->connection_name.length, (int) mi->connection_name.length,
mi->connection_name.str); mi->connection_name.str);
} /* Restart from first element as master_info_hash may have changed */
i= 0;
continue;
} }
DBUG_RETURN(result); DBUG_RETURN(result);
} }

View File

@ -79,6 +79,8 @@ class Master_info : public Slave_reporting_capability
{ {
return opt_slave_parallel_threads > 0; return opt_slave_parallel_threads > 0;
} }
void release();
void wait_until_free();
/* the variables below are needed because we can change masters on the fly */ /* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
@ -182,7 +184,11 @@ class Master_info : public Slave_reporting_capability
uint64 gtid_reconnect_event_skip_count; uint64 gtid_reconnect_event_skip_count;
/* gtid_event_seen is false until we receive first GTID event from master. */ /* gtid_event_seen is false until we receive first GTID event from master. */
bool gtid_event_seen; bool gtid_event_seen;
bool in_start_all_slaves, in_stop_all_slaves;
uint users; /* Active user for object */
uint killed;
}; };
int init_master_info(Master_info* mi, const char* master_info_fname, int init_master_info(Master_info* mi, const char* master_info_fname,
const char* slave_info_fname, const char* slave_info_fname,
bool abort_if_no_master_info_file, bool abort_if_no_master_info_file,
@ -218,13 +224,12 @@ public:
bool check_duplicate_master_info(LEX_STRING *connection_name, bool check_duplicate_master_info(LEX_STRING *connection_name,
const char *host, uint port); const char *host, uint port);
bool add_master_info(Master_info *mi, bool write_to_file); bool add_master_info(Master_info *mi, bool write_to_file);
bool remove_master_info(LEX_STRING *connection_name); bool remove_master_info(Master_info *mi);
Master_info *get_master_info(LEX_STRING *connection_name, Master_info *get_master_info(LEX_STRING *connection_name,
Sql_condition::enum_warning_level warning); Sql_condition::enum_warning_level warning);
bool give_error_if_slave_running();
uint any_slave_sql_running();
bool start_all_slaves(THD *thd); bool start_all_slaves(THD *thd);
bool stop_all_slaves(THD *thd); bool stop_all_slaves(THD *thd);
void free_connections();
}; };
@ -237,6 +242,8 @@ public:
}; };
Master_info *get_master_info(LEX_STRING *connection_name,
Sql_condition::enum_warning_level warning);
bool check_master_connection_name(LEX_STRING *name); bool check_master_connection_name(LEX_STRING *name);
void create_logfile_name_with_suffix(char *res_file_name, size_t length, void create_logfile_name_with_suffix(char *res_file_name, size_t length,
const char *info_file, const char *info_file,
@ -246,7 +253,8 @@ void create_logfile_name_with_suffix(char *res_file_name, size_t length,
uchar *get_key_master_info(Master_info *mi, size_t *length, uchar *get_key_master_info(Master_info *mi, size_t *length,
my_bool not_used __attribute__((unused))); my_bool not_used __attribute__((unused)));
void free_key_master_info(Master_info *mi); void free_key_master_info(Master_info *mi);
uint any_slave_sql_running();
bool give_error_if_slave_running(bool already_lock);
#endif /* HAVE_REPLICATION */ #endif /* HAVE_REPLICATION */
#endif /* RPL_MI_H */ #endif /* RPL_MI_H */

View File

@ -1312,6 +1312,29 @@ handle_rpl_parallel_thread(void *arg)
} }
if (!in_event_group) if (!in_event_group)
{ {
/* If we are in a FLUSH TABLES FOR READ LOCK, wait for it */
while (rpt->current_entry && rpt->pause_for_ftwrl)
{
/*
We are currently in the delicate process of pausing parallel
replication while FLUSH TABLES WITH READ LOCK is starting. We must
not de-allocate the thread (setting rpt->current_owner= NULL) until
rpl_unpause_after_ftwrl() has woken us up.
*/
rpl_parallel_entry *e= rpt->current_entry;
/*
Ensure that we will unblock rpl_pause_for_ftrwl()
e->pause_sub_id may be LONGLONG_MAX if rpt->current_entry has changed
*/
DBUG_ASSERT(e->pause_sub_id == (uint64)ULONGLONG_MAX ||
e->last_committed_sub_id >= e->pause_sub_id);
mysql_mutex_lock(&e->LOCK_parallel_entry);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
if (rpt->pause_for_ftwrl)
mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry);
mysql_mutex_unlock(&e->LOCK_parallel_entry);
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
}
rpt->current_owner= NULL; rpt->current_owner= NULL;
/* Tell wait_for_done() that we are done, if it is waiting. */ /* Tell wait_for_done() that we are done, if it is waiting. */
if (likely(rpt->current_entry) && if (likely(rpt->current_entry) &&
@ -1369,6 +1392,28 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
if ((res= pool_mark_busy(pool, current_thd))) if ((res= pool_mark_busy(pool, current_thd)))
return res; return res;
/* Protect against parallel pool resizes */
if (pool->count == new_count)
{
pool_mark_not_busy(pool);
return 0;
}
/*
If we are about to delete pool, do an extra check that there are no new
slave threads running since we marked pool busy
*/
if (!new_count)
{
if (any_slave_sql_running())
{
DBUG_PRINT("warning",
("SQL threads running while trying to reset parallel pool"));
pool_mark_not_busy(pool);
return 0; // Ok to not resize pool
}
}
/* /*
Allocate the new list of threads up-front. Allocate the new list of threads up-front.
That way, if we fail half-way, we only need to free whatever we managed That way, if we fail half-way, we only need to free whatever we managed
@ -1382,7 +1427,7 @@ rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool,
{ {
my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) + my_error(ER_OUTOFMEMORY, MYF(0), (int(new_count*sizeof(*new_list) +
new_count*sizeof(*rpt_array)))); new_count*sizeof(*rpt_array))));
goto err;; goto err;
} }
for (i= 0; i < new_count; ++i) for (i= 0; i < new_count; ++i)
@ -1503,6 +1548,20 @@ err:
return 1; return 1;
} }
/*
Deactivate the parallel replication thread pool, if there are now no more
SQL threads running.
*/
int rpl_parallel_resize_pool_if_no_slaves(void)
{
/* master_info_index is set to NULL on shutdown */
if (opt_slave_parallel_threads > 0 && !any_slave_sql_running() &&
master_info_index)
return rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
return 0;
}
int int
rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
@ -1814,6 +1873,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
{ {
rpl_parallel_thread *rpt; rpl_parallel_thread *rpt;
DBUG_ASSERT(count > 0);
mysql_mutex_lock(&LOCK_rpl_thread_pool); mysql_mutex_lock(&LOCK_rpl_thread_pool);
while (unlikely(busy) || !(rpt= free_list)) while (unlikely(busy) || !(rpt= free_list))
mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool);

View File

@ -342,6 +342,7 @@ struct rpl_parallel {
extern struct rpl_parallel_thread_pool global_rpl_thread_pool; extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
extern int rpl_parallel_resize_pool_if_no_slaves(void);
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool); extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool); extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);

View File

@ -614,6 +614,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
if (!mi->inited) if (!mi->inited)
DBUG_RETURN(0); /* successfully do nothing */ DBUG_RETURN(0); /* successfully do nothing */
int error,force_all = (thread_mask & SLAVE_FORCE_ALL); int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
int retval= 0;
mysql_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock; mysql_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock(); mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock();
@ -633,24 +634,19 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
skip_lock)) && skip_lock)) &&
!force_all) !force_all)
DBUG_RETURN(error); DBUG_RETURN(error);
retval= error;
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
DBUG_PRINT("info",("Flushing relay-log info file.")); DBUG_PRINT("info",("Flushing relay-log info file."));
if (current_thd) if (current_thd)
THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file); THD_STAGE_INFO(current_thd, stage_flushing_relay_log_info_file);
if (flush_relay_log_info(&mi->rli)) if (flush_relay_log_info(&mi->rli) ||
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); my_sync(mi->rli.info_fd, MYF(MY_WME)))
retval= ER_ERROR_DURING_FLUSH_LOGS;
if (my_sync(mi->rli.info_fd, MYF(MY_WME)))
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
} }
if (opt_slave_parallel_threads > 0 &&
master_info_index &&// master_info_index is set to NULL on server shutdown
!master_info_index->any_slave_sql_running())
rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)) if (thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL))
{ {
DBUG_PRINT("info",("Terminating IO thread")); DBUG_PRINT("info",("Terminating IO thread"));
@ -661,25 +657,26 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
skip_lock)) && skip_lock)) &&
!force_all) !force_all)
DBUG_RETURN(error); DBUG_RETURN(error);
if (!retval)
retval= error;
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
DBUG_PRINT("info",("Flushing relay log and master info file.")); DBUG_PRINT("info",("Flushing relay log and master info file."));
if (current_thd) if (current_thd)
THD_STAGE_INFO(current_thd, stage_flushing_relay_log_and_master_info_repository); THD_STAGE_INFO(current_thd, stage_flushing_relay_log_and_master_info_repository);
if (flush_master_info(mi, TRUE, FALSE)) if (likely(mi->fd >= 0))
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); {
if (flush_master_info(mi, TRUE, FALSE) || my_sync(mi->fd, MYF(MY_WME)))
retval= ER_ERROR_DURING_FLUSH_LOGS;
}
if (mi->rli.relay_log.is_open() && if (mi->rli.relay_log.is_open() &&
my_sync(mi->rli.relay_log.get_log_file()->file, MYF(MY_WME))) my_sync(mi->rli.relay_log.get_log_file()->file, MYF(MY_WME)))
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS); retval= ER_ERROR_DURING_FLUSH_LOGS;
if (my_sync(mi->fd, MYF(MY_WME)))
DBUG_RETURN(ER_ERROR_DURING_FLUSH_LOGS);
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
} }
DBUG_RETURN(0); DBUG_RETURN(retval);
} }
@ -956,9 +953,6 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
mi); mi);
if (!error && (thread_mask & SLAVE_SQL)) if (!error && (thread_mask & SLAVE_SQL))
{ {
if (opt_slave_parallel_threads > 0)
error= rpl_parallel_activate_pool(&global_rpl_thread_pool);
if (!error)
error= start_slave_thread( error= start_slave_thread(
#ifdef HAVE_PSI_INTERFACE #ifdef HAVE_PSI_INTERFACE
key_thread_slave_sql, key_thread_slave_sql,
@ -975,10 +969,18 @@ int start_slave_threads(bool need_slave_mutex, bool wait_for_start,
/* /*
Release slave threads at time of executing shutdown. Kill slaves preparing for shutdown
*/
SYNOPSIS void slave_prepare_for_shutdown()
end_slave() {
mysql_mutex_lock(&LOCK_active_mi);
master_info_index->free_connections();
mysql_mutex_unlock(&LOCK_active_mi);
}
/*
Release slave threads at time of executing shutdown.
*/ */
void end_slave() void end_slave()
@ -996,7 +998,10 @@ void end_slave()
startup parameter to the server was wrong. startup parameter to the server was wrong.
*/ */
mysql_mutex_lock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_active_mi);
/* This will call terminate_slave_threads() on all connections */ /*
master_info_index should not have any threads anymore as they where
killed as part of slave_prepare_for_shutdown()
*/
delete master_info_index; delete master_info_index;
master_info_index= 0; master_info_index= 0;
active_mi= 0; active_mi= 0;
@ -2657,7 +2662,9 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
mysql_mutex_lock(&mi->data_lock); mysql_mutex_lock(&mi->data_lock);
mysql_mutex_lock(&mi->rli.data_lock); mysql_mutex_lock(&mi->rli.data_lock);
/* err_lock is to protect mi->last_error() */
mysql_mutex_lock(&mi->err_lock); mysql_mutex_lock(&mi->err_lock);
/* err_lock is to protect mi->rli.last_error() */
mysql_mutex_lock(&mi->rli.err_lock); mysql_mutex_lock(&mi->rli.err_lock);
protocol->store(mi->host, &my_charset_bin); protocol->store(mi->host, &my_charset_bin);
protocol->store(mi->user, &my_charset_bin); protocol->store(mi->user, &my_charset_bin);
@ -4493,6 +4500,16 @@ pthread_handler_t handle_slave_sql(void *arg)
rli->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT; rli->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
pthread_detach_this_thread(); pthread_detach_this_thread();
if (opt_slave_parallel_threads > 0 &&
rpl_parallel_activate_pool(&global_rpl_thread_pool))
{
mysql_cond_broadcast(&rli->start_cond);
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
"Failed during parallel slave pool activation");
goto err_during_init;
}
if (init_slave_thread(thd, mi, SLAVE_THD_SQL)) if (init_slave_thread(thd, mi, SLAVE_THD_SQL))
{ {
/* /*
@ -4862,17 +4879,7 @@ err_during_init:
DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5);); DBUG_EXECUTE_IF("simulate_slave_delay_at_terminate_bug38694", sleep(5););
mysql_mutex_unlock(&rli->run_lock); // tell the world we are done mysql_mutex_unlock(&rli->run_lock); // tell the world we are done
/* rpl_parallel_resize_pool_if_no_slaves();
Deactivate the parallel replication thread pool, if there are now no more
SQL threads running. Do this here, when we have released all locks, but
while our THD (and current_thd) is still valid.
*/
mysql_mutex_lock(&LOCK_active_mi);
if (opt_slave_parallel_threads > 0 &&
master_info_index &&// master_info_index is set to NULL on server shutdown
!master_info_index->any_slave_sql_running())
rpl_parallel_inactivate_pool(&global_rpl_thread_pool);
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
delete thd; delete thd;

View File

@ -213,6 +213,7 @@ bool rpl_master_erroneous_autoinc(THD* thd);
const char *print_slave_db_safe(const char *db); const char *print_slave_db_safe(const char *db);
void skip_load_data_infile(NET* net); void skip_load_data_infile(NET* net);
void slave_prepare_for_shutdown();
void end_slave(); /* release slave threads */ void end_slave(); /* release slave threads */
void close_active_mi(); /* clean up slave threads data */ void close_active_mi(); /* clean up slave threads data */
void clear_until_condition(Relay_log_info* rli); void clear_until_condition(Relay_log_info* rli);

View File

@ -2207,7 +2207,7 @@ err:
int int
mysql_execute_command(THD *thd) mysql_execute_command(THD *thd)
{ {
int res= FALSE; int res= 0;
int up_result= 0; int up_result= 0;
LEX *lex= thd->lex; LEX *lex= thd->lex;
/* first SELECT_LEX (have special meaning for many of non-SELECTcommands) */ /* first SELECT_LEX (have special meaning for many of non-SELECTcommands) */
@ -2702,10 +2702,17 @@ case SQLCOM_PREPARE:
if (check_global_access(thd, SUPER_ACL)) if (check_global_access(thd, SUPER_ACL))
goto error; goto error;
/*
In this code it's ok to use LOCK_active_mi as we are adding new things
into master_info_index
*/
mysql_mutex_lock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_active_mi);
if (!master_info_index) if (!master_info_index)
{
mysql_mutex_unlock(&LOCK_active_mi);
my_error(ER_SERVER_SHUTDOWN, MYF(0));
goto error; goto error;
}
mi= master_info_index->get_master_info(&lex_mi->connection_name, mi= master_info_index->get_master_info(&lex_mi->connection_name,
Sql_condition::WARN_LEVEL_NOTE); Sql_condition::WARN_LEVEL_NOTE);
@ -2734,7 +2741,7 @@ case SQLCOM_PREPARE:
If new master was not added, we still need to free mi. If new master was not added, we still need to free mi.
*/ */
if (master_info_added) if (master_info_added)
master_info_index->remove_master_info(&lex_mi->connection_name); master_info_index->remove_master_info(mi);
else else
delete mi; delete mi;
} }
@ -2752,22 +2759,24 @@ case SQLCOM_PREPARE:
/* Accept one of two privileges */ /* Accept one of two privileges */
if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL)) if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL))
goto error; goto error;
mysql_mutex_lock(&LOCK_active_mi);
if (lex->verbose) if (lex->verbose)
{
mysql_mutex_lock(&LOCK_active_mi);
res= show_all_master_info(thd); res= show_all_master_info(thd);
mysql_mutex_unlock(&LOCK_active_mi);
}
else else
{ {
LEX_MASTER_INFO *lex_mi= &thd->lex->mi; LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
Master_info *mi; Master_info *mi;
mi= master_info_index->get_master_info(&lex_mi->connection_name, if ((mi= get_master_info(&lex_mi->connection_name,
Sql_condition::WARN_LEVEL_ERROR); Sql_condition::WARN_LEVEL_ERROR)))
if (mi != NULL)
{ {
res= show_master_info(thd, mi, 0); res= show_master_info(thd, mi, 0);
mi->release();
} }
} }
mysql_mutex_unlock(&LOCK_active_mi);
break; break;
} }
case SQLCOM_SHOW_MASTER_STAT: case SQLCOM_SHOW_MASTER_STAT:
@ -3091,22 +3100,23 @@ end_with_restore_list:
load_error= rpl_load_gtid_slave_state(thd); load_error= rpl_load_gtid_slave_state(thd);
mysql_mutex_lock(&LOCK_active_mi); /*
We don't need to ensure that only one user is using master_info
if ((mi= (master_info_index-> as start_slave is protected against simultaneous usage
get_master_info(&lex_mi->connection_name, */
Sql_condition::WARN_LEVEL_ERROR)))) if ((mi= get_master_info(&lex_mi->connection_name,
Sql_condition::WARN_LEVEL_ERROR)))
{ {
if (load_error) if (load_error)
{ {
/* /*
We cannot start a slave using GTID if we cannot load the GTID position We cannot start a slave using GTID if we cannot load the
from the mysql.gtid_slave_pos table. But we can allow non-GTID GTID position from the mysql.gtid_slave_pos table. But we
replication (useful eg. during upgrade). can allow non-GTID replication (useful eg. during upgrade).
*/ */
if (mi->using_gtid != Master_info::USE_GTID_NO) if (mi->using_gtid != Master_info::USE_GTID_NO)
{ {
mysql_mutex_unlock(&LOCK_active_mi); mi->release();
break; break;
} }
else else
@ -3114,8 +3124,8 @@ end_with_restore_list:
} }
if (!start_slave(thd, mi, 1 /* net report*/)) if (!start_slave(thd, mi, 1 /* net report*/))
my_ok(thd); my_ok(thd);
mi->release();
} }
mysql_mutex_unlock(&LOCK_active_mi);
break; break;
} }
case SQLCOM_SLAVE_STOP: case SQLCOM_SLAVE_STOP:
@ -3145,13 +3155,17 @@ end_with_restore_list:
} }
lex_mi= &thd->lex->mi; lex_mi= &thd->lex->mi;
mysql_mutex_lock(&LOCK_active_mi); if ((mi= get_master_info(&lex_mi->connection_name,
if ((mi= (master_info_index-> Sql_condition::WARN_LEVEL_ERROR)))
get_master_info(&lex_mi->connection_name, {
Sql_condition::WARN_LEVEL_ERROR)))) if (stop_slave(thd, mi, 1/* net report*/))
if (!stop_slave(thd, mi, 1/* net report*/)) res= 1;
mi->release();
if (rpl_parallel_resize_pool_if_no_slaves())
res= 1;
if (!res)
my_ok(thd); my_ok(thd);
mysql_mutex_unlock(&LOCK_active_mi); }
break; break;
} }
case SQLCOM_SLAVE_ALL_START: case SQLCOM_SLAVE_ALL_START:
@ -4322,6 +4336,8 @@ end_with_restore_list:
if (!res) if (!res)
my_ok(thd); my_ok(thd);
} }
else
res= 1; // reload_acl_and_cache failed
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
if (lex->type & REFRESH_READ_LOCK) if (lex->type & REFRESH_READ_LOCK)
rpl_unpause_after_ftwrl(thd); rpl_unpause_after_ftwrl(thd);

View File

@ -174,11 +174,8 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options,
slave is not likely to have the same connection names. slave is not likely to have the same connection names.
*/ */
tmp_write_to_binlog= 0; tmp_write_to_binlog= 0;
mysql_mutex_lock(&LOCK_active_mi);
if (master_info_index) if (!(mi= (get_master_info(&connection_name,
{
if (!(mi= (master_info_index->
get_master_info(&connection_name,
Sql_condition::WARN_LEVEL_ERROR)))) Sql_condition::WARN_LEVEL_ERROR))))
{ {
result= 1; result= 1;
@ -189,9 +186,8 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options,
if (rotate_relay_log(mi)) if (rotate_relay_log(mi))
*write_to_binlog= -1; *write_to_binlog= -1;
mysql_mutex_unlock(&mi->data_lock); mysql_mutex_unlock(&mi->data_lock);
mi->release();
} }
}
mysql_mutex_unlock(&LOCK_active_mi);
#endif #endif
} }
#ifdef HAVE_QUERY_CACHE #ifdef HAVE_QUERY_CACHE
@ -349,28 +345,34 @@ bool reload_acl_and_cache(THD *thd, unsigned long long options,
LEX_MASTER_INFO* lex_mi= &thd->lex->mi; LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
Master_info *mi; Master_info *mi;
tmp_write_to_binlog= 0; tmp_write_to_binlog= 0;
mysql_mutex_lock(&LOCK_active_mi);
if (master_info_index) if (!(mi= get_master_info(&lex_mi->connection_name,
{ Sql_condition::WARN_LEVEL_ERROR)))
if (!(mi= (master_info_index->
get_master_info(&lex_mi->connection_name,
Sql_condition::WARN_LEVEL_ERROR))))
{ {
result= 1; result= 1;
} }
else if (reset_slave(thd, mi)) else
{ {
/* The following will fail if slave is running */
if (reset_slave(thd, mi))
{
mi->release();
/* NOTE: my_error() has been already called by reset_slave(). */ /* NOTE: my_error() has been already called by reset_slave(). */
result= 1; result= 1;
} }
else if (mi->connection_name.length && thd->lex->reset_slave_info.all) else if (mi->connection_name.length && thd->lex->reset_slave_info.all)
{ {
/* If not default connection and 'all' is used */ /* If not default connection and 'all' is used */
master_info_index->remove_master_info(&mi->connection_name); mi->release();
} mysql_mutex_lock(&LOCK_active_mi);
} if (master_info_index->remove_master_info(mi))
result= 1;
mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_unlock(&LOCK_active_mi);
} }
else
mi->release();
}
}
#endif #endif
if (options & REFRESH_USER_RESOURCES) if (options & REFRESH_USER_RESOURCES)
reset_mqh((LEX_USER *) NULL, 0); /* purecov: inspected */ reset_mqh((LEX_USER *) NULL, 0); /* purecov: inspected */

View File

@ -3265,8 +3265,8 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
LEX_MASTER_INFO* lex_mi= &thd->lex->mi; LEX_MASTER_INFO* lex_mi= &thd->lex->mi;
DBUG_ENTER("change_master"); DBUG_ENTER("change_master");
mysql_mutex_assert_owner(&LOCK_active_mi);
DBUG_ASSERT(master_info_index); DBUG_ASSERT(master_info_index);
mysql_mutex_assert_owner(&LOCK_active_mi);
*master_info_added= false; *master_info_added= false;
/* /*
@ -3646,7 +3646,6 @@ bool mysql_show_binlog_events(THD* thd)
int old_max_allowed_packet= thd->variables.max_allowed_packet; int old_max_allowed_packet= thd->variables.max_allowed_packet;
Master_info *mi= 0; Master_info *mi= 0;
LOG_INFO linfo; LOG_INFO linfo;
DBUG_ENTER("mysql_show_binlog_events"); DBUG_ENTER("mysql_show_binlog_events");
Log_event::init_show_field_list(&field_list); Log_event::init_show_field_list(&field_list);
@ -3674,13 +3673,9 @@ bool mysql_show_binlog_events(THD* thd)
} }
else /* showing relay log contents */ else /* showing relay log contents */
{ {
mysql_mutex_lock(&LOCK_active_mi); if (!(mi= get_master_info(&thd->variables.default_master_connection,
if (!master_info_index ||
!(mi= master_info_index->
get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_ERROR))) Sql_condition::WARN_LEVEL_ERROR)))
{ {
mysql_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
} }
binary_log= &(mi->rli.relay_log); binary_log= &(mi->rli.relay_log);
@ -3700,7 +3695,7 @@ bool mysql_show_binlog_events(THD* thd)
if (mi) if (mi)
{ {
/* We can unlock the mutex as we have a lock on the file */ /* We can unlock the mutex as we have a lock on the file */
mysql_mutex_unlock(&LOCK_active_mi); mi->release();
mi= 0; mi= 0;
} }
@ -3722,6 +3717,7 @@ bool mysql_show_binlog_events(THD* thd)
goto err; goto err;
} }
/* These locks is here to enable syncronization with log_in_use() */
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = &linfo; thd->current_linfo = &linfo;
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
@ -3799,7 +3795,7 @@ bool mysql_show_binlog_events(THD* thd)
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
} }
else if (mi) else if (mi)
mysql_mutex_unlock(&LOCK_active_mi); mi->release();
// Check that linfo is still on the function scope. // Check that linfo is still on the function scope.
DEBUG_SYNC(thd, "after_show_binlog_events"); DEBUG_SYNC(thd, "after_show_binlog_events");
@ -3820,8 +3816,9 @@ err:
else else
my_eof(thd); my_eof(thd);
/* These locks is here to enable syncronization with log_in_use() */
mysql_mutex_lock(&LOCK_thread_count); mysql_mutex_lock(&LOCK_thread_count);
thd->current_linfo = 0; thd->current_linfo= 0;
mysql_mutex_unlock(&LOCK_thread_count); mysql_mutex_unlock(&LOCK_thread_count);
thd->variables.max_allowed_packet= old_max_allowed_packet; thd->variables.max_allowed_packet= old_max_allowed_packet;
DBUG_RETURN(ret); DBUG_RETURN(ret);

View File

@ -1527,7 +1527,6 @@ bool
Sys_var_gtid_slave_pos::do_check(THD *thd, set_var *var) Sys_var_gtid_slave_pos::do_check(THD *thd, set_var *var)
{ {
String str, *res; String str, *res;
bool running;
DBUG_ASSERT(var->type == OPT_GLOBAL); DBUG_ASSERT(var->type == OPT_GLOBAL);
@ -1538,11 +1537,7 @@ Sys_var_gtid_slave_pos::do_check(THD *thd, set_var *var)
return true; return true;
} }
mysql_mutex_lock(&LOCK_active_mi); if (give_error_if_slave_running(0))
running= (!master_info_index ||
master_info_index->give_error_if_slave_running());
mysql_mutex_unlock(&LOCK_active_mi);
if (running)
return true; return true;
if (!(res= var->value->val_str(&str))) if (!(res= var->value->val_str(&str)))
return true; return true;
@ -1580,7 +1575,7 @@ Sys_var_gtid_slave_pos::global_update(THD *thd, set_var *var)
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_active_mi);
if (!master_info_index || master_info_index->give_error_if_slave_running()) if (give_error_if_slave_running(1))
err= true; err= true;
else else
err= rpl_gtid_pos_update(thd, var->save_result.string_value.str, err= rpl_gtid_pos_update(thd, var->save_result.string_value.str,
@ -1766,16 +1761,7 @@ Sys_var_last_gtid::session_value_ptr(THD *thd, LEX_STRING *base)
static bool static bool
check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var) check_slave_parallel_threads(sys_var *self, THD *thd, set_var *var)
{ {
bool running; return give_error_if_slave_running(0);
mysql_mutex_lock(&LOCK_active_mi);
running= (!master_info_index ||
master_info_index->give_error_if_slave_running());
mysql_mutex_unlock(&LOCK_active_mi);
if (running)
return true;
return false;
} }
static bool static bool
@ -1784,10 +1770,7 @@ fix_slave_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
bool err; bool err;
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); err= give_error_if_slave_running(0);
err= (!master_info_index ||
master_info_index->give_error_if_slave_running());
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
return err; return err;
@ -1810,16 +1793,7 @@ static Sys_var_ulong Sys_slave_parallel_threads(
static bool static bool
check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var) check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var)
{ {
bool running; return give_error_if_slave_running(0);
mysql_mutex_lock(&LOCK_active_mi);
running= (!master_info_index ||
master_info_index->give_error_if_slave_running());
mysql_mutex_unlock(&LOCK_active_mi);
if (running)
return true;
return false;
} }
static bool static bool
@ -1828,13 +1802,10 @@ fix_slave_domain_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
bool running; bool running;
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); running= give_error_if_slave_running(0);
running= (!master_info_index ||
master_info_index->give_error_if_slave_running());
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
return running ? true : false; return running;
} }
@ -1865,16 +1836,7 @@ static Sys_var_ulong Sys_slave_parallel_max_queued(
static bool static bool
check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var) check_gtid_ignore_duplicates(sys_var *self, THD *thd, set_var *var)
{ {
bool running; return give_error_if_slave_running(0);
mysql_mutex_lock(&LOCK_active_mi);
running= (!master_info_index ||
master_info_index->give_error_if_slave_running());
mysql_mutex_unlock(&LOCK_active_mi);
if (running)
return true;
return false;
} }
static bool static bool
@ -1883,13 +1845,10 @@ fix_gtid_ignore_duplicates(sys_var *self, THD *thd, enum_var_type type)
bool running; bool running;
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); running= give_error_if_slave_running(0);
running= (!master_info_index ||
master_info_index->give_error_if_slave_running());
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
return running ? true : false; return running;
} }
@ -2837,10 +2796,8 @@ Sys_var_replicate_events_marked_for_skip::global_update(THD *thd, set_var *var)
DBUG_ENTER("Sys_var_replicate_events_marked_for_skip::global_update"); DBUG_ENTER("Sys_var_replicate_events_marked_for_skip::global_update");
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); if (!give_error_if_slave_running(0))
if (master_info_index && !master_info_index->give_error_if_slave_running())
result= Sys_var_enum::global_update(thd, var); result= Sys_var_enum::global_update(thd, var);
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
DBUG_RETURN(result); DBUG_RETURN(result);
} }
@ -4112,18 +4069,15 @@ bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var)
Master_info *mi; Master_info *mi;
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
if (!var->base.length) // no base name if (!var->base.length) // no base name
{ {
mi= master_info_index-> mi= get_master_info(&thd->variables.default_master_connection,
get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_ERROR); Sql_condition::WARN_LEVEL_ERROR);
} }
else // has base name else // has base name
{ {
mi= master_info_index-> mi= get_master_info(&var->base,
get_master_info(&var->base,
Sql_condition::WARN_LEVEL_WARN); Sql_condition::WARN_LEVEL_WARN);
} }
@ -4140,9 +4094,9 @@ bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var)
{ {
result= set_filter_value(var->save_result.string_value.str, mi); result= set_filter_value(var->save_result.string_value.str, mi);
} }
mi->release();
} }
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
return result; return result;
} }
@ -4150,8 +4104,10 @@ bool Sys_var_rpl_filter::global_update(THD *thd, set_var *var)
bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi) bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi)
{ {
bool status= true; bool status= true;
Rpl_filter* rpl_filter= mi ? mi->rpl_filter : global_rpl_filter; Rpl_filter* rpl_filter= mi->rpl_filter;
/* Proctect against other threads */
mysql_mutex_lock(&LOCK_active_mi);
switch (opt_id) { switch (opt_id) {
case OPT_REPLICATE_DO_DB: case OPT_REPLICATE_DO_DB:
status= rpl_filter->set_do_db(value); status= rpl_filter->set_do_db(value);
@ -4172,7 +4128,7 @@ bool Sys_var_rpl_filter::set_filter_value(const char *value, Master_info *mi)
status= rpl_filter->set_wild_ignore_table(value); status= rpl_filter->set_wild_ignore_table(value);
break; break;
} }
mysql_mutex_unlock(&LOCK_active_mi);
return status; return status;
} }
@ -4185,29 +4141,24 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base)
Rpl_filter *rpl_filter; Rpl_filter *rpl_filter;
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi);
if (!base->length) // no base name if (!base->length) // no base name
{ {
mi= master_info_index-> mi= get_master_info(&thd->variables.default_master_connection,
get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_ERROR); Sql_condition::WARN_LEVEL_ERROR);
} }
else // has base name else // has base name
{ mi= get_master_info(base, Sql_condition::WARN_LEVEL_WARN);
mi= master_info_index->
get_master_info(base,
Sql_condition::WARN_LEVEL_WARN);
}
mysql_mutex_lock(&LOCK_global_system_variables);
if (!mi) if (!mi)
{ {
mysql_mutex_unlock(&LOCK_active_mi); mysql_mutex_lock(&LOCK_global_system_variables);
return 0; return 0;
} }
rpl_filter= mi->rpl_filter; rpl_filter= mi->rpl_filter;
tmp.length(0); tmp.length(0);
mysql_mutex_lock(&LOCK_active_mi);
switch (opt_id) { switch (opt_id) {
case OPT_REPLICATE_DO_DB: case OPT_REPLICATE_DO_DB:
rpl_filter->get_do_db(&tmp); rpl_filter->get_do_db(&tmp);
@ -4228,9 +4179,12 @@ uchar *Sys_var_rpl_filter::global_value_ptr(THD *thd, LEX_STRING *base)
rpl_filter->get_wild_ignore_table(&tmp); rpl_filter->get_wild_ignore_table(&tmp);
break; break;
} }
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables);
mi->release();
ret= (uchar *) thd->strmake(tmp.ptr(), tmp.length()); ret= (uchar *) thd->strmake(tmp.ptr(), tmp.length());
mysql_mutex_unlock(&LOCK_active_mi);
return ret; return ret;
} }
@ -4301,17 +4255,12 @@ get_master_info_ulonglong_value(THD *thd, ptrdiff_t offset)
Master_info *mi; Master_info *mi;
ulonglong res= 0; // Default value ulonglong res= 0; // Default value
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); if ((mi= get_master_info(&thd->variables.default_master_connection,
mi= master_info_index-> Sql_condition::WARN_LEVEL_WARN)))
get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_WARN);
if (mi)
{ {
mysql_mutex_lock(&mi->rli.data_lock);
res= *((ulonglong*) (((uchar*) mi) + master_info_offset)); res= *((ulonglong*) (((uchar*) mi) + master_info_offset));
mysql_mutex_unlock(&mi->rli.data_lock); mi->release();
} }
mysql_mutex_unlock(&LOCK_active_mi);
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
return res; return res;
} }
@ -4326,19 +4275,16 @@ bool update_multi_source_variable(sys_var *self_var, THD *thd,
if (type == OPT_GLOBAL) if (type == OPT_GLOBAL)
mysql_mutex_unlock(&LOCK_global_system_variables); mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(&LOCK_active_mi); if ((mi= (get_master_info(&thd->variables.default_master_connection,
mi= master_info_index-> Sql_condition::WARN_LEVEL_ERROR))))
get_master_info(&thd->variables.default_master_connection,
Sql_condition::WARN_LEVEL_ERROR);
if (mi)
{ {
mysql_mutex_lock(&mi->rli.run_lock); mysql_mutex_lock(&mi->rli.run_lock);
mysql_mutex_lock(&mi->rli.data_lock); mysql_mutex_lock(&mi->rli.data_lock);
result= self->update_variable(thd, mi); result= self->update_variable(thd, mi);
mysql_mutex_unlock(&mi->rli.data_lock); mysql_mutex_unlock(&mi->rli.data_lock);
mysql_mutex_unlock(&mi->rli.run_lock); mysql_mutex_unlock(&mi->rli.run_lock);
mi->release();
} }
mysql_mutex_unlock(&LOCK_active_mi);
if (type == OPT_GLOBAL) if (type == OPT_GLOBAL)
mysql_mutex_lock(&LOCK_global_system_variables); mysql_mutex_lock(&LOCK_global_system_variables);
return result; return result;