Cleanup of slave code:

- Added testing if connection is killed to shortcut reading of connection data
  This will allow us later in 10.2 to do a cleaner shutdown of slaves (less errors in the log)
- Add new status variables: Slaves_connected, Slaves_running and Slave_connections.
- Use MYSQL_SLAVE_NOT_RUN instead of 0 with slave_running.
- Don't print obvious extra warnings to the error log when slave is shut down normally.
This commit is contained in:
Monty 2016-01-03 13:20:07 +02:00
parent 4b4777ab63
commit 661a6d8906
9 changed files with 77 additions and 22 deletions

View File

@ -7299,6 +7299,38 @@ static int show_slave_running(THD *thd, SHOW_VAR *var, char *buff)
} }
/* How many slaves are connected to this master */
static int show_slaves_connected(THD *thd, SHOW_VAR *var, char *buff)
{
var->type= SHOW_LONGLONG;
var->value= buff;
mysql_mutex_lock(&LOCK_slave_list);
*((longlong *)buff)= slave_list.records;
mysql_mutex_unlock(&LOCK_slave_list);
return 0;
}
/* How many masters this slave is connected to */
static int show_slaves_running(THD *thd, SHOW_VAR *var, char *buff)
{
var->type= SHOW_LONGLONG;
var->value= buff;
mysql_mutex_lock(&LOCK_active_mi);
*((longlong *)buff)= master_info_index->any_slave_sql_running();
mysql_mutex_unlock(&LOCK_active_mi);
return 0;
}
static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff) static int show_slave_received_heartbeats(THD *thd, SHOW_VAR *var, char *buff)
{ {
Master_info *mi= NULL; Master_info *mi= NULL;
@ -7950,6 +7982,9 @@ SHOW_VAR status_vars[]= {
{"Select_scan", (char*) offsetof(STATUS_VAR, select_scan_count_), SHOW_LONG_STATUS}, {"Select_scan", (char*) offsetof(STATUS_VAR, select_scan_count_), SHOW_LONG_STATUS},
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_INT}, {"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_INT},
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
{"Slaves_connected", (char*) &show_slaves_connected, SHOW_SIMPLE_FUNC },
{"Slaves_running", (char*) &show_slaves_running, SHOW_SIMPLE_FUNC },
{"Slave_connections", (char*) offsetof(STATUS_VAR, com_register_slave), SHOW_LONG_STATUS},
{"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_SIMPLE_FUNC}, {"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_SIMPLE_FUNC},
{"Slave_received_heartbeats",(char*) &show_slave_received_heartbeats, SHOW_SIMPLE_FUNC}, {"Slave_received_heartbeats",(char*) &show_slave_received_heartbeats, SHOW_SIMPLE_FUNC},
{"Slave_retried_transactions",(char*)&slave_retried_transactions, SHOW_LONG}, {"Slave_retried_transactions",(char*)&slave_retried_transactions, SHOW_LONG},

View File

@ -34,6 +34,7 @@
HFTODO this must be hidden if we don't want client capabilities in HFTODO this must be hidden if we don't want client capabilities in
embedded library embedded library
*/ */
#include <my_global.h> #include <my_global.h>
#include <mysql.h> #include <mysql.h>
#include <mysql_com.h> #include <mysql_com.h>
@ -107,13 +108,12 @@ extern void query_cache_insert(const char *packet, ulong length,
unsigned pkt_nr); unsigned pkt_nr);
#endif // HAVE_QUERY_CACHE #endif // HAVE_QUERY_CACHE
#define update_statistics(A) A #define update_statistics(A) A
#else extern my_bool thd_net_is_killed();
#define update_statistics(A)
#endif
#ifdef MYSQL_SERVER
/* Additional instrumentation hooks for the server */ /* Additional instrumentation hooks for the server */
#include "mysql_com_server.h" #include "mysql_com_server.h"
#else
#define update_statistics(A)
#define thd_net_is_killed() 0
#endif #endif
#define TEST_BLOCKING 8 #define TEST_BLOCKING 8
@ -875,6 +875,16 @@ my_real_read(NET *net, size_t *complen,
DBUG_PRINT("info",("vio_read returned %ld errno: %d", DBUG_PRINT("info",("vio_read returned %ld errno: %d",
(long) length, vio_errno(net->vio))); (long) length, vio_errno(net->vio)));
if (i== 0 && thd_net_is_killed())
{
len= packet_error;
net->error= 0;
net->last_errno= ER_CONNECTION_KILLED;
MYSQL_SERVER_my_error(net->last_errno, MYF(0));
goto end;
}
#if !defined(__WIN__) && defined(MYSQL_SERVER) #if !defined(__WIN__) && defined(MYSQL_SERVER)
/* /*
We got an error that there was no data on the socket. We now set up We got an error that there was no data on the socket. We now set up

View File

@ -35,7 +35,8 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
rli(is_slave_recovery), port(MYSQL_PORT), rli(is_slave_recovery), port(MYSQL_PORT),
checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF), checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF),
connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0), connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0),
slave_running(0), slave_run_id(0), clock_diff_with_master(0), slave_running(MYSQL_SLAVE_NOT_RUN), slave_run_id(0),
clock_diff_with_master(0),
sync_counter(0), heartbeat_period(0), received_heartbeats(0), sync_counter(0), heartbeat_period(0), received_heartbeats(0),
master_id(0), prev_master_id(0), master_id(0), prev_master_id(0),
using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0),
@ -1273,23 +1274,24 @@ bool Master_info_index::give_error_if_slave_running()
The LOCK_active_mi must be held while calling this function. The LOCK_active_mi must be held while calling this function.
@return @return
TRUE If some slave SQL thread is running. 0 No Slave SQL thread is running
FALSE No slave SQL thread is running # Number of slave SQL thread running
*/ */
bool Master_info_index::any_slave_sql_running() uint Master_info_index::any_slave_sql_running()
{ {
uint count= 0;
DBUG_ENTER("any_slave_sql_running"); DBUG_ENTER("any_slave_sql_running");
if (!this) // master_info_index is set to NULL on server shutdown if (!this) // master_info_index is set to NULL on server shutdown
DBUG_RETURN(TRUE); DBUG_RETURN(count);
for (uint i= 0; i< master_info_hash.records; ++i) for (uint i= 0; i< master_info_hash.records; ++i)
{ {
Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i); Master_info *mi= (Master_info *)my_hash_element(&master_info_hash, i);
if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN) if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
DBUG_RETURN(TRUE); count++;
} }
DBUG_RETURN(FALSE); DBUG_RETURN(count);
} }

View File

@ -218,7 +218,7 @@ public:
Master_info *get_master_info(LEX_STRING *connection_name, Master_info *get_master_info(LEX_STRING *connection_name,
Sql_condition::enum_warning_level warning); Sql_condition::enum_warning_level warning);
bool give_error_if_slave_running(); bool give_error_if_slave_running();
bool any_slave_sql_running(); uint any_slave_sql_running();
bool start_all_slaves(THD *thd); bool start_all_slaves(THD *thd);
bool stop_all_slaves(THD *thd); bool stop_all_slaves(THD *thd);
}; };

View File

@ -63,7 +63,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0), last_master_timestamp(0), sql_thread_caught_up(true), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(), abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0), gtid_skip_flag(GTID_SKIP_NOT), inited(0), abort_slave(0), stop_for_until(0),
slave_running(0), until_condition(UNTIL_NONE), slave_running(MYSQL_SLAVE_NOT_RUN), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0), until_log_pos(0), retried_trans(0), executed_entries(0),
m_flags(0) m_flags(0)
{ {

View File

@ -112,7 +112,7 @@ static const char *reconnect_messages[SLAVE_RECON_ACT_MAX][SLAVE_RECON_MSG_MAX]=
{ {
{ {
"Waiting to reconnect after a failed registration on master", "Waiting to reconnect after a failed registration on master",
"Slave I/O thread killed while waitnig to reconnect after a failed \ "Slave I/O thread killed while waiting to reconnect after a failed \
registration on master", registration on master",
"Reconnecting after a failed registration on master", "Reconnecting after a failed registration on master",
"failed registering on master, reconnecting to try again, \ "failed registering on master, reconnecting to try again, \
@ -4040,8 +4040,7 @@ connected:
if (request_dump(thd, mysql, mi, &suppress_warnings)) if (request_dump(thd, mysql, mi, &suppress_warnings))
{ {
sql_print_error("Failed on request_dump()"); sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(mi, "Slave I/O thread killed while \ if (check_io_slave_killed(mi, NullS) ||
requesting master dump") ||
try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings, try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
reconnect_messages[SLAVE_RECON_ACT_DUMP])) reconnect_messages[SLAVE_RECON_ACT_DUMP]))
goto err; goto err;
@ -4059,6 +4058,7 @@ requesting master dump") ||
}); });
const char *event_buf; const char *event_buf;
mi->slave_running= MYSQL_SLAVE_RUN_READING;
DBUG_ASSERT(mi->last_error().number == 0); DBUG_ASSERT(mi->last_error().number == 0);
while (!io_slave_killed(mi)) while (!io_slave_killed(mi))
{ {
@ -4071,8 +4071,7 @@ requesting master dump") ||
*/ */
THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event);
event_len= read_event(mysql, mi, &suppress_warnings); event_len= read_event(mysql, mi, &suppress_warnings);
if (check_io_slave_killed(mi, "Slave I/O thread killed while \ if (check_io_slave_killed(mi, NullS))
reading event"))
goto err; goto err;
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT", DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT",
if (!retry_count_event) if (!retry_count_event)

View File

@ -1743,7 +1743,8 @@ void add_diff_to_status(STATUS_VAR *to_var, STATUS_VAR *from_var,
void THD::awake(killed_state state_to_set) void THD::awake(killed_state state_to_set)
{ {
DBUG_ENTER("THD::awake"); DBUG_ENTER("THD::awake");
DBUG_PRINT("enter", ("this: %p current_thd: %p", this, current_thd)); DBUG_PRINT("enter", ("this: %p current_thd: %p state: %d",
this, current_thd, (int) state_to_set));
THD_CHECK_SENTRY(this); THD_CHECK_SENTRY(this);
mysql_mutex_assert_owner(&LOCK_thd_data); mysql_mutex_assert_owner(&LOCK_thd_data);
@ -3799,6 +3800,12 @@ void thd_increment_bytes_sent(ulong length)
} }
} }
my_bool thd_net_is_killed()
{
THD *thd= current_thd;
return thd && thd->killed ? 1 : 0;
}
void thd_increment_bytes_received(ulong length) void thd_increment_bytes_received(ulong length)
{ {

View File

@ -656,6 +656,7 @@ typedef struct system_status_var
{ {
ulong com_other; ulong com_other;
ulong com_stat[(uint) SQLCOM_END]; ulong com_stat[(uint) SQLCOM_END];
ulong com_register_slave;
ulong created_tmp_disk_tables_; ulong created_tmp_disk_tables_;
ulong created_tmp_tables_; ulong created_tmp_tables_;
ulong ha_commit_count; ulong ha_commit_count;

View File

@ -1177,6 +1177,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
#ifdef HAVE_REPLICATION #ifdef HAVE_REPLICATION
case COM_REGISTER_SLAVE: case COM_REGISTER_SLAVE:
{ {
status_var_increment(thd->status_var.com_register_slave);
if (!register_slave(thd, (uchar*)packet, packet_length)) if (!register_slave(thd, (uchar*)packet, packet_length))
my_ok(thd); my_ok(thd);
break; break;