MDEV-5262, MDEV-5914, MDEV-5941, MDEV-6020: Deadlocks during parallel replication causing replication to fail.

After-review changes.

For this patch in 10.0, we do not introduce a new public storage engine API,
we just fix the InnoDB/XtraDB issues. In 10.1, we will make a better public
API that can be used for all storage engines (MDEV-6429).

Eliminate the background thread that did deadlock kills asynchroneously.
Instead, we ensure that the InnoDB/XtraDB code can handle doing the kill from
inside the deadlock detection code (when thd_report_wait_for() needs to kill a
later thread to resolve a deadlock).

(We preserve the part of the original patch that introduces dedicated mutex
and condition for the slave init thread, to remove the abuse of
LOCK_thread_count for start/stop synchronisation of the slave init thread).
This commit is contained in:
Kristian Nielsen 2014-07-08 12:54:47 +02:00
parent e5149fa0d9
commit 98fc5b3af8
25 changed files with 382 additions and 265 deletions

View File

@ -622,6 +622,7 @@ void **thd_ha_data(const MYSQL_THD thd, const struct handlerton *hton);
void thd_storage_lock_wait(MYSQL_THD thd, long long value);
int thd_tx_isolation(const MYSQL_THD thd);
int thd_tx_is_read_only(const MYSQL_THD thd);
int thd_rpl_is_parallel(const MYSQL_THD thd);
/**
Create a temporary file.
@ -729,80 +730,6 @@ void thd_set_ha_data(MYSQL_THD thd, const struct handlerton *hton,
*/
void thd_wakeup_subsequent_commits(MYSQL_THD thd, int wakeup_error);
/*
Used by a storage engine to report that one transaction THD is about to
go to wait for a transactional lock held by another transactions OTHER_THD.
This is used for parallel replication, where transactions are required to
commit in the same order on the slave as they did on the master. If the
transactions on the slave can encounter lock conflicts on the slave that did
not exist on the master, this can cause deadlocks.
The storage engine can report such conflicting locks using this call. This
will allow parallel replication to detect such conflicts and resolve the
deadlock (by killing the second transaction to release the locks that the
first is waiting for, and then later re-try the second killed transaction).
The storage engine should not report false positives. That is, it should not
report any lock waits that do not actually require one transaction to wait
for the other. Nor should it report waits for locks that will be released
before the commit of the other transactions.
*/
void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);
/*
This function can optionally be called to check if thd_report_wait_for()
needs to be called for waits done by a given transaction.
If this function returns false for a given thd, there is no need to do any
calls to thd_report_wait_for() on that thd.
This call is optional; it is safe to call thd_report_wait_for() in any case.
This call can be used to save some redundant calls to thd_report_wait_for()
if desired. (This is unlikely to matter much unless there are _lots_ of
waits to report, as the overhead of thd_report_wait_for() is small).
*/
int thd_need_wait_for(const MYSQL_THD thd);
/*
This function can be called by storage engines to check if the commit order
of two transactions has already been decided by the upper layer. This
happens in parallel replication, where the commit order is forced to be the
same on the slave as it was originally on the master.
If this function returns false, it means that such commit order will be
enforced. This allows the storage engine to optionally omit gap lock waitss
or similar measures that would otherwise be needed to ensure that
transactions would be serialised in a way that would cause a commit order
that is correct for binlogging for statement-based replication.
If this function returns true, normal locking should be done as required by
the binlogging and transaction isolation level in effect.
*/
int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
/*
If the storage engine detects a deadlock, and needs to choose a victim
transaction to roll back, it can call this function to ask the upper
server layer for which of two possible transactions is prefered to be
aborted and rolled back.
In parallel replication, if two transactions are running in parallel and
one is fixed to commit before the other, then the one that commits later
will be prefered as the victim - chosing the early transaction as a victim
will not resolve the deadlock anyway, as the later transaction still needs
to wait for the earlier to commit.
Otherwise, a transaction that uses only transactional tables, and can thus
be safely rolled back, will be prefered as a deadlock victim over a
transaction that also modified non-transactional (eg. MyISAM) tables.
The return value is -1 if the first transaction is prefered as a deadlock
victim, 1 if the second transaction is prefered, or 0 for no preference (in
which case the storage engine can make the choice as it prefers).
*/
int thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2);
#ifdef __cplusplus
}
#endif

View File

@ -303,6 +303,7 @@ void **thd_ha_data(const void* thd, const struct handlerton *hton);
void thd_storage_lock_wait(void* thd, long long value);
int thd_tx_isolation(const void* thd);
int thd_tx_is_read_only(const void* thd);
int thd_rpl_is_parallel(const void* thd);
int mysql_tmpfile(const char *prefix);
unsigned long thd_get_thread_id(const void* thd);
void thd_get_xid(const void* thd, MYSQL_XID *xid);
@ -313,10 +314,6 @@ void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
struct mysql_event_general
{
unsigned int event_subclass;

View File

@ -303,6 +303,7 @@ void **thd_ha_data(const void* thd, const struct handlerton *hton);
void thd_storage_lock_wait(void* thd, long long value);
int thd_tx_isolation(const void* thd);
int thd_tx_is_read_only(const void* thd);
int thd_rpl_is_parallel(const void* thd);
int mysql_tmpfile(const char *prefix);
unsigned long thd_get_thread_id(const void* thd);
void thd_get_xid(const void* thd, MYSQL_XID *xid);
@ -313,10 +314,6 @@ void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
#include <mysql/plugin_auth_common.h>
typedef struct st_plugin_vio_info
{

View File

@ -256,6 +256,7 @@ void **thd_ha_data(const void* thd, const struct handlerton *hton);
void thd_storage_lock_wait(void* thd, long long value);
int thd_tx_isolation(const void* thd);
int thd_tx_is_read_only(const void* thd);
int thd_rpl_is_parallel(const void* thd);
int mysql_tmpfile(const char *prefix);
unsigned long thd_get_thread_id(const void* thd);
void thd_get_xid(const void* thd, MYSQL_XID *xid);
@ -266,10 +267,6 @@ void *thd_get_ha_data(const void* thd, const struct handlerton *hton);
void thd_set_ha_data(void* thd, const struct handlerton *hton,
const void *ha_data);
void thd_wakeup_subsequent_commits(void* thd, int wakeup_error);
void thd_report_wait_for(const void* thd, void *other_thd);
int thd_need_wait_for(const void* thd);
int thd_need_ordering_with(const void* thd, const void* other_thd);
int thd_deadlock_victim_preference(const void* thd1, const void* thd2);
enum enum_ftparser_mode
{
MYSQL_FTPARSER_SIMPLE_MODE= 0,

View File

@ -44,16 +44,6 @@ processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
name thread/sql/slave_background
type BACKGROUND
processlist_user NULL
processlist_host NULL
processlist_db NULL
processlist_command NULL
processlist_info NULL
unified_parent_thread_id unified parent_thread_id
role NULL
instrumented YES
CREATE TEMPORARY TABLE t1 AS
SELECT thread_id FROM performance_schema.threads
WHERE name LIKE 'thread/sql%';
@ -115,5 +105,4 @@ parent_thread_name child_thread_name
thread/sql/event_scheduler thread/sql/event_worker
thread/sql/main thread/sql/one_connection
thread/sql/main thread/sql/signal_handler
thread/sql/main thread/sql/slave_background
thread/sql/one_connection thread/sql/event_scheduler

View File

@ -4115,7 +4115,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay_log_info* rli, bool included)
included= false;
break;
}
if (!included && 0 == strcmp(ir->name, rli->group_relay_log_name))
if (!included && !strcmp(ir->name, rli->group_relay_log_name))
break;
if (!next)
{
@ -9369,7 +9369,7 @@ int TC_LOG_BINLOG::recover(LOG_INFO *linfo, const char *last_log_name,
file= -1;
}
if (0 == strcmp(linfo->log_file_name, last_log_name))
if (!strcmp(linfo->log_file_name, last_log_name))
break; // No more files to do
if ((file= open_binlog(&log, linfo->log_file_name, &errmsg)) < 0)
{

View File

@ -7328,6 +7328,13 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
uint64 sub_id= 0;
Relay_log_info const *rli= rgi->rli;
/*
XID_EVENT works like a COMMIT statement. And it also updates the
mysql.gtid_slave_pos table with the GTID of the current transaction.
Therefore, it acts much like a normal SQL statement, so we need to do
mysql_reset_thd_for_next_command() as if starting a new statement.
*/
mysql_reset_thd_for_next_command(thd);
/*
Record any GTID in the same transaction, so slave state is transactionally

View File

@ -368,7 +368,7 @@ static I_List<THD> thread_cache;
static bool binlog_format_used= false;
LEX_STRING opt_init_connect, opt_init_slave;
static mysql_cond_t COND_thread_cache, COND_flush_thread_cache;
mysql_cond_t COND_slave_background;
mysql_cond_t COND_slave_init;
static DYNAMIC_ARRAY all_options;
/* Global variables */
@ -707,7 +707,7 @@ mysql_mutex_t
LOCK_crypt,
LOCK_global_system_variables,
LOCK_user_conn, LOCK_slave_list, LOCK_active_mi,
LOCK_connection_count, LOCK_error_messages, LOCK_slave_background;
LOCK_connection_count, LOCK_error_messages, LOCK_slave_init;
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
LOCK_global_table_stats, LOCK_global_index_stats;
@ -882,7 +882,7 @@ PSI_mutex_key key_LOCK_stats,
PSI_mutex_key key_LOCK_gtid_waiting;
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered,
key_LOCK_slave_background;
key_LOCK_slave_init;
PSI_mutex_key key_TABLE_SHARE_LOCK_share;
static PSI_mutex_info all_server_mutexes[]=
@ -945,7 +945,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
{ &key_LOCK_prepare_ordered, "LOCK_prepare_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_commit_ordered, "LOCK_commit_ordered", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_background, "LOCK_slave_background", PSI_FLAG_GLOBAL},
{ &key_LOCK_slave_init, "LOCK_slave_init", PSI_FLAG_GLOBAL},
{ &key_LOG_INFO_lock, "LOG_INFO::lock", 0},
{ &key_LOCK_thread_count, "LOCK_thread_count", PSI_FLAG_GLOBAL},
{ &key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_GLOBAL},
@ -1000,7 +1000,7 @@ PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
key_COND_rpl_thread_pool,
key_COND_parallel_entry, key_COND_group_commit_orderer,
key_COND_prepare_ordered, key_COND_slave_background;
key_COND_prepare_ordered, key_COND_slave_init;
PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
static PSI_cond_info all_server_conds[]=
@ -1049,7 +1049,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
{ &key_COND_slave_background, "COND_slave_background", 0},
{ &key_COND_slave_init, "COND_slave_init", 0},
{ &key_COND_wait_gtid, "COND_wait_gtid", 0},
{ &key_COND_gtid_ignore_duplicates, "COND_gtid_ignore_duplicates", 0}
};
@ -1057,7 +1057,7 @@ static PSI_cond_info all_server_conds[]=
PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread;
key_thread_slave_init, key_rpl_parallel_thread;
static PSI_thread_info all_server_threads[]=
{
@ -1083,7 +1083,7 @@ static PSI_thread_info all_server_threads[]=
{ &key_thread_main, "main", PSI_FLAG_GLOBAL},
{ &key_thread_one_connection, "one_connection", 0},
{ &key_thread_signal_hand, "signal_handler", PSI_FLAG_GLOBAL},
{ &key_thread_slave_background, "slave_background", PSI_FLAG_GLOBAL},
{ &key_thread_slave_init, "slave_init", PSI_FLAG_GLOBAL},
{ &key_rpl_parallel_thread, "rpl_parallel_thread", 0}
};
@ -2177,8 +2177,8 @@ static void clean_up_mutexes()
mysql_mutex_destroy(&LOCK_prepare_ordered);
mysql_cond_destroy(&COND_prepare_ordered);
mysql_mutex_destroy(&LOCK_commit_ordered);
mysql_mutex_destroy(&LOCK_slave_background);
mysql_cond_destroy(&COND_slave_background);
mysql_mutex_destroy(&LOCK_slave_init);
mysql_cond_destroy(&COND_slave_init);
DBUG_VOID_RETURN;
}
@ -4393,9 +4393,9 @@ static int init_thread_environment()
mysql_cond_init(key_COND_prepare_ordered, &COND_prepare_ordered, NULL);
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_LOCK_slave_background, &LOCK_slave_background,
mysql_mutex_init(key_LOCK_slave_init, &LOCK_slave_init,
MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_slave_background, &COND_slave_background, NULL);
mysql_cond_init(key_COND_slave_init, &COND_slave_init, NULL);
#ifdef HAVE_OPENSSL
mysql_mutex_init(key_LOCK_des_key_file,
@ -9477,8 +9477,6 @@ PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room i
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other master connection to process GTID received on multiple master connections", 0};
PSI_stage_info stage_slave_background_process_request= { 0, "Processing requests", 0};
PSI_stage_info stage_slave_background_wait_request= { 0, "Waiting for requests", 0};
#ifdef HAVE_PSI_INTERFACE
@ -9602,9 +9600,7 @@ PSI_stage_info *all_server_stages[]=
& stage_waiting_to_get_readlock,
& stage_master_gtid_wait_primary,
& stage_master_gtid_wait,
& stage_gtid_wait_other_connection,
& stage_slave_background_process_request,
& stage_slave_background_wait_request
& stage_gtid_wait_other_connection
};
PSI_socket_key key_socket_tcpip, key_socket_unix, key_socket_client_connection;

View File

@ -309,8 +309,8 @@ extern PSI_cond_key key_COND_wait_gtid, key_COND_gtid_ignore_duplicates;
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
key_thread_handle_manager, key_thread_kill_server, key_thread_main,
key_thread_one_connection, key_thread_signal_hand,
key_thread_slave_background, key_rpl_parallel_thread;
key_thread_one_connection, key_thread_signal_hand, key_thread_slave_init,
key_rpl_parallel_thread;
extern PSI_file_key key_file_binlog, key_file_binlog_index, key_file_casetest,
key_file_dbopt, key_file_des_key_file, key_file_ERRMSG, key_select_to_file,
@ -451,8 +451,6 @@ extern PSI_stage_info stage_waiting_for_room_in_worker_thread;
extern PSI_stage_info stage_master_gtid_wait_primary;
extern PSI_stage_info stage_master_gtid_wait;
extern PSI_stage_info stage_gtid_wait_other_connection;
extern PSI_stage_info stage_slave_background_process_request;
extern PSI_stage_info stage_slave_background_wait_request;
#ifdef HAVE_PSI_STATEMENT_INTERFACE
/**
@ -521,7 +519,7 @@ extern mysql_mutex_t
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
LOCK_global_system_variables, LOCK_user_conn,
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count,
LOCK_slave_background;
LOCK_slave_init;
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
#ifdef HAVE_OPENSSL
extern mysql_mutex_t LOCK_des_key_file;
@ -532,7 +530,7 @@ extern mysql_rwlock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern mysql_rwlock_t LOCK_system_variables_hash;
extern mysql_cond_t COND_thread_count;
extern mysql_cond_t COND_manager;
extern mysql_cond_t COND_slave_background;
extern mysql_cond_t COND_slave_init;
extern int32 thread_running;
extern int32 thread_count;
extern my_atomic_rwlock_t thread_running_lock, thread_count_lock;

View File

@ -240,7 +240,6 @@ convert_kill_to_deadlock_error(rpl_group_info *rgi)
rgi->killed_for_retry)
{
thd->clear_error();
thd->get_stmt_da()->reset_diagnostics_area();
my_error(ER_LOCK_DEADLOCK, MYF(0));
rgi->killed_for_retry= false;
thd->reset_killed();
@ -325,7 +324,7 @@ do_retry:
register_wait_for_prior_event_group_commit(rgi, entry);
mysql_mutex_unlock(&entry->LOCK_parallel_entry);
strcpy(log_name, ir->name);
strmake_buf(log_name, ir->name);
if ((fd= open_binlog(&rlog, log_name, &errmsg)) <0)
{
err= 1;

View File

@ -1362,7 +1362,7 @@ Relay_log_info::alloc_inuse_relaylog(const char *name)
my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir));
return 1;
}
strcpy(ir->name, name);
strmake_buf(ir->name, name);
if (!inuse_relaylog_list)
inuse_relaylog_list= ir;
@ -1564,6 +1564,7 @@ rpl_group_info::reinit(Relay_log_info *rli)
trans_retries= 0;
last_event_start_time= 0;
gtid_sub_id= 0;
commit_id= 0;
gtid_pending= false;
worker_error= 0;
row_stmt_start_timestamp= 0;
@ -1608,6 +1609,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
rgi->current_gtid.server_id= gev->server_id;
rgi->current_gtid.domain_id= gev->domain_id;
rgi->current_gtid.seq_no= gev->seq_no;
rgi->commit_id= gev->commit_id;
rgi->gtid_pending= true;
return 0;
}

View File

@ -170,6 +170,7 @@ public:
*/
inuse_relaylog *inuse_relaylog_list;
inuse_relaylog *last_inuse_relaylog;
/* Lock used to protect inuse_relaylog::dequeued_count */
my_atomic_rwlock_t inuse_relaylog_atomic_lock;
/*
@ -532,6 +533,7 @@ struct rpl_group_info
*/
uint64 gtid_sub_id;
rpl_gtid current_gtid;
uint64 commit_id;
/*
This is used to keep transaction commit order.
We will signal this when we commit, and can register it to wait for the

View File

@ -287,22 +287,13 @@ static void init_slave_psi_keys(void)
#endif /* HAVE_PSI_INTERFACE */
static bool slave_background_thread_running;
static bool slave_background_thread_stop;
static bool slave_background_thread_gtid_loaded;
struct slave_background_kill_t {
slave_background_kill_t *next;
THD *to_kill;
} *slave_background_kill_list;
static bool slave_init_thread_running;
pthread_handler_t
handle_slave_background(void *arg __attribute__((unused)))
handle_slave_init(void *arg __attribute__((unused)))
{
THD *thd;
PSI_stage_info old_stage;
bool stop;
my_thread_init();
thd= new THD;
@ -310,7 +301,7 @@ handle_slave_background(void *arg __attribute__((unused)))
mysql_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
mysql_mutex_unlock(&LOCK_thread_count);
thd->system_thread = SYSTEM_THREAD_SLAVE_BACKGROUND;
thd->system_thread = SYSTEM_THREAD_SLAVE_INIT;
thd->store_globals();
thd->security_ctx->skip_grants();
thd->set_command(COM_DAEMON);
@ -323,126 +314,49 @@ handle_slave_background(void *arg __attribute__((unused)))
thd->get_stmt_da()->sql_errno(),
thd->get_stmt_da()->message());
mysql_mutex_lock(&LOCK_slave_background);
slave_background_thread_gtid_loaded= true;
mysql_cond_broadcast(&COND_slave_background);
THD_STAGE_INFO(thd, stage_slave_background_process_request);
do
{
slave_background_kill_t *kill_list;
thd->ENTER_COND(&COND_slave_background, &LOCK_slave_background,
&stage_slave_background_wait_request,
&old_stage);
for (;;)
{
stop= abort_loop || thd->killed || slave_background_thread_stop;
kill_list= slave_background_kill_list;
if (stop || kill_list)
break;
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
}
slave_background_kill_list= NULL;
thd->EXIT_COND(&old_stage);
while (kill_list)
{
slave_background_kill_t *p = kill_list;
kill_list= p->next;
mysql_mutex_lock(&p->to_kill->LOCK_thd_data);
p->to_kill->awake(KILL_CONNECTION);
mysql_mutex_unlock(&p->to_kill->LOCK_thd_data);
my_free(p);
}
mysql_mutex_lock(&LOCK_slave_background);
} while (!stop);
slave_background_thread_running= false;
mysql_cond_broadcast(&COND_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
mysql_mutex_lock(&LOCK_thread_count);
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
my_thread_end();
mysql_mutex_lock(&LOCK_slave_init);
slave_init_thread_running= false;
mysql_cond_broadcast(&COND_slave_init);
mysql_mutex_unlock(&LOCK_slave_init);
return 0;
}
void
slave_background_kill_request(THD *to_kill)
{
slave_background_kill_t *p=
(slave_background_kill_t *)my_malloc(sizeof(*p), MYF(MY_WME));
if (p)
{
p->to_kill= to_kill;
to_kill->rgi_slave->killed_for_retry= true;
mysql_mutex_lock(&LOCK_slave_background);
p->next= slave_background_kill_list;
slave_background_kill_list= p;
mysql_mutex_unlock(&LOCK_slave_background);
mysql_cond_signal(&COND_slave_background);
}
}
/*
Start the slave background thread.
Start the slave init thread.
This thread is currently used for two purposes:
1. To load the GTID state from mysql.gtid_slave_pos at server start; reading
from table requires valid THD, which is otherwise not available during
server init.
2. To kill worker thread transactions during parallel replication, when a
storage engine attempts to take an errorneous conflicting lock that would
cause a deadlock. Killing is done asynchroneously, as the kill may not
be safe within the context of a callback from inside storage engine
locking code.
This thread is used to load the GTID state from mysql.gtid_slave_pos at
server start; reading from table requires valid THD, which is otherwise not
available during server init.
*/
static int
start_slave_background_thread()
run_slave_init_thread()
{
pthread_t th;
slave_background_thread_running= true;
slave_background_thread_stop= false;
slave_background_thread_gtid_loaded= false;
if (mysql_thread_create(key_thread_slave_background,
&th, &connection_attrib, handle_slave_background,
NULL))
slave_init_thread_running= true;
if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
handle_slave_init, NULL))
{
sql_print_error("Failed to create thread while initialising slave");
return 1;
}
mysql_mutex_lock(&LOCK_slave_background);
while (!slave_background_thread_gtid_loaded)
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
mysql_mutex_lock(&LOCK_slave_init);
while (!slave_init_thread_running)
mysql_cond_wait(&COND_slave_init, &LOCK_slave_init);
mysql_mutex_unlock(&LOCK_slave_init);
return 0;
}
static void
stop_slave_background_thread()
{
mysql_mutex_lock(&LOCK_slave_background);
slave_background_thread_stop= true;
mysql_cond_broadcast(&COND_slave_background);
while (slave_background_thread_running)
mysql_cond_wait(&COND_slave_background, &LOCK_slave_background);
mysql_mutex_unlock(&LOCK_slave_background);
}
/* Initialize slave structures */
int init_slave()
@ -454,7 +368,7 @@ int init_slave()
init_slave_psi_keys();
#endif
if (start_slave_background_thread())
if (run_slave_init_thread())
return 1;
/*
@ -1084,9 +998,6 @@ void end_slave()
master_info_index= 0;
active_mi= 0;
mysql_mutex_unlock(&LOCK_active_mi);
stop_slave_background_thread();
global_rpl_thread_pool.destroy();
free_all_rpl_filters();
DBUG_VOID_RETURN;

View File

@ -238,7 +238,6 @@ pthread_handler_t handle_slave_io(void *arg);
void slave_output_error_info(rpl_group_info *rgi, THD *thd);
pthread_handler_t handle_slave_sql(void *arg);
bool net_request_file(NET* net, const char* fname);
void slave_background_kill_request(THD *to_kill);
extern bool volatile abort_loop;
extern Master_info main_mi, *active_mi; /* active_mi for multi-master */

View File

@ -4211,6 +4211,24 @@ extern "C" int thd_slave_thread(const MYSQL_THD thd)
return(thd->slave_thread);
}
/* Returns true for a worker thread in parallel replication. */
extern "C" int thd_rpl_is_parallel(const MYSQL_THD thd)
{
return thd->rgi_slave && thd->rgi_slave->is_parallel_exec;
}
/*
This function can optionally be called to check if thd_report_wait_for()
needs to be called for waits done by a given transaction.
If this function returns false for a given thd, there is no need to do any
calls to thd_report_wait_for() on that thd.
This call is optional; it is safe to call thd_report_wait_for() in any case.
This call can be used to save some redundant calls to thd_report_wait_for()
if desired. (This is unlikely to matter much unless there are _lots_ of
waits to report, as the overhead of thd_report_wait_for() is small).
*/
extern "C" int
thd_need_wait_for(const MYSQL_THD thd)
{
@ -4224,6 +4242,31 @@ thd_need_wait_for(const MYSQL_THD thd)
return rgi->is_parallel_exec;
}
/*
Used by InnoDB/XtraDB to report that one transaction THD is about to go to
wait for a transactional lock held by another transactions OTHER_THD.
This is used for parallel replication, where transactions are required to
commit in the same order on the slave as they did on the master. If the
transactions on the slave encounters lock conflicts on the slave that did
not exist on the master, this can cause deadlocks.
Normally, such conflicts will not occur, because the same conflict would
have prevented the two transactions from committing in parallel on the
master, thus preventing them from running in parallel on the slave in the
first place. However, it is possible in case when the optimizer chooses a
different plan on the slave than on the master (eg. table scan instead of
index scan).
InnoDB/XtraDB reports lock waits using this call. If a lock wait causes a
deadlock with the pre-determined commit order, we kill the later transaction,
and later re-try it, to resolve the deadlock.
This call need only receive reports about waits for locks that will remain
until the holding transaction commits. InnoDB/XtraDB auto-increment locks
are released earlier, and so need not be reported. (Such false positives are
not harmful, but could lead to unnecessary kill and retry, so best avoided).
*/
extern "C" void
thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
{
@ -4254,12 +4297,51 @@ thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd)
cause replication to rollback (and later re-try) the other transaction,
releasing the lock for this transaction so replication can proceed.
*/
#ifdef HAVE_REPLICATION
slave_background_kill_request(other_thd);
#endif
other_rgi->killed_for_retry= true;
mysql_mutex_lock(&other_thd->LOCK_thd_data);
other_thd->awake(KILL_CONNECTION);
mysql_mutex_unlock(&other_thd->LOCK_thd_data);
}
/*
This function is called from InnoDB/XtraDB to check if the commit order of
two transactions has already been decided by the upper layer. This happens
in parallel replication, where the commit order is forced to be the same on
the slave as it was originally on the master.
If this function returns false, it means that such commit order will be
enforced. This allows the storage engine to optionally omit gap lock waits
or similar measures that would otherwise be needed to ensure that
transactions would be serialised in a way that would cause a commit order
that is correct for binlogging for statement-based replication.
Since transactions are only run in parallel on the slave if they ran without
lock conflicts on the master, normally no lock conflicts on the slave happen
during parallel replication. However, there are a couple of corner cases
where it can happen, like these secondary-index operations:
T1: INSERT INTO t1 VALUES (7, NULL);
T2: DELETE FROM t1 WHERE b <= 3;
T1: UPDATE t1 SET secondary=NULL WHERE primary=1
T2: DELETE t1 WHERE secondary <= 3
The DELETE takes a gap lock that can block the INSERT/UPDATE, but the row
locks set by INSERT/UPDATE do not block the DELETE. Thus, the execution
order of the transactions determine whether a lock conflict occurs or
not. Thus a lock conflict can occur on the slave where it did not on the
master.
If this function returns true, normal locking should be done as required by
the binlogging and transaction isolation level in effect. But if it returns
false, the correct order will be enforced anyway, and InnoDB/XtraDB can
avoid taking the gap lock, preventing the lock conflict.
Calling this function is just an optimisation to avoid unnecessary
deadlocks. If it was not used, a gap lock would be set that could eventually
cause a deadlock; the deadlock would be caught by thd_report_wait_for() and
the transaction T2 killed and rolled back (and later re-tried).
*/
extern "C" int
thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
{
@ -4277,7 +4359,7 @@ thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
return 1;
if (rgi->current_gtid.domain_id != other_rgi->current_gtid.domain_id)
return 1;
if (rgi->commit_id != other_rgi->commit_id)
if (!rgi->commit_id || rgi->commit_id != other_rgi->commit_id)
return 1;
/*
These two threads are doing parallel replication within the same
@ -4289,6 +4371,26 @@ thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd)
}
/*
If the storage engine detects a deadlock, and needs to choose a victim
transaction to roll back, it can call this function to ask the upper
server layer for which of two possible transactions is prefered to be
aborted and rolled back.
In parallel replication, if two transactions are running in parallel and
one is fixed to commit before the other, then the one that commits later
will be prefered as the victim - chosing the early transaction as a victim
will not resolve the deadlock anyway, as the later transaction still needs
to wait for the earlier to commit.
Otherwise, a transaction that uses only transactional tables, and can thus
be safely rolled back, will be prefered as a deadlock victim over a
transaction that also modified non-transactional (eg. MyISAM) tables.
The return value is -1 if the first transaction is prefered as a deadlock
victim, 1 if the second transaction is prefered, or 0 for no preference (in
which case the storage engine can make the choice as it prefers).
*/
extern "C" int
thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2)
{

View File

@ -1358,7 +1358,7 @@ enum enum_thread_type
SYSTEM_THREAD_EVENT_SCHEDULER= 16,
SYSTEM_THREAD_EVENT_WORKER= 32,
SYSTEM_THREAD_BINLOG_BACKGROUND= 64,
SYSTEM_THREAD_SLAVE_BACKGROUND= 128,
SYSTEM_THREAD_SLAVE_INIT= 128,
};
inline char const *

View File

@ -156,7 +156,7 @@ static uchar *next_free_record_pos(HP_SHARE *info)
("record file full. records: %lu max_records: %lu "
"data_length: %llu index_length: %llu "
"max_table_size: %llu",
(unsigned long)info->records, info->max_records,
info->records, info->max_records,
info->data_length, info->index_length,
info->max_table_size));
my_errno=HA_ERR_RECORD_FILE_FULL;

View File

@ -4220,13 +4220,18 @@ innobase_kill_query(
if (trx)
{
THD *cur = current_thd;
THD *owner = trx->current_lock_mutex_owner;
/* Cancel a pending lock request. */
lock_mutex_enter();
if (owner != cur)
lock_mutex_enter();
trx_mutex_enter(trx);
if (trx->lock.wait_lock)
lock_cancel_waiting_and_release(trx->lock.wait_lock);
trx_mutex_exit(trx);
lock_mutex_exit();
if (owner != cur)
lock_mutex_exit();
}
DBUG_VOID_RETURN;

View File

@ -992,6 +992,11 @@ struct trx_t{
count of tables being flushed. */
/*------------------------------*/
THD* current_lock_mutex_owner;
/*!< If this is equal to current_thd,
then in innobase_kill_query() we know we
already hold the lock_sys->mutex. */
/*------------------------------*/
#ifdef UNIV_DEBUG
ulint start_line; /*!< Track where it was started from */
const char* start_file; /*!< Filename where it was started */

View File

@ -374,6 +374,11 @@ struct lock_stack_t {
ulint heap_no; /*!< heap number if rec lock */
};
extern "C" void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_for(const MYSQL_THD thd);
extern "C"
int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
/** Stack to use during DFS search. Currently only a single stack is required
because there is no parallel deadlock check. This stack is protected by
the lock_sys_t::mutex. */
@ -392,6 +397,14 @@ UNIV_INTERN mysql_pfs_key_t lock_sys_wait_mutex_key;
#ifdef UNIV_DEBUG
UNIV_INTERN ibool lock_print_waits = FALSE;
/* Buffer to collect THDs to report waits for. */
struct thd_wait_reports {
struct thd_wait_reports *next;
ulint used;
trx_t *waitees[64];
};
/*********************************************************************//**
Validates the lock system.
@return TRUE if ok */
@ -1032,8 +1045,12 @@ lock_rec_has_to_wait(
the correct order so that statement-based replication
will give the correct results. Since the right order
was already determined on the master, we do not need
to enforce it again here (and doing so could lead to
occasional deadlocks). */
to enforce it again here.
Skipping the locks is not essential for correctness,
since in case of deadlock we will just kill the later
transaction and retry it. But it can save some
unnecessary rollbacks and retries. */
return (FALSE);
}
@ -3821,7 +3838,8 @@ static
trx_id_t
lock_deadlock_search(
/*=================*/
lock_deadlock_ctx_t* ctx) /*!< in/out: deadlock context */
lock_deadlock_ctx_t* ctx, /*!< in/out: deadlock context */
struct thd_wait_reports*waitee_ptr) /*!< in/out: list of waitees */
{
const lock_t* lock;
ulint heap_no;
@ -3900,10 +3918,24 @@ lock_deadlock_search(
/* We do not need to report autoinc locks to the upper
layer. These locks are released before commit, so they can
not cause deadlocks with binlog-fixed commit order. */
if (lock_get_type_low(lock) != LOCK_TABLE ||
lock_get_mode(lock) != LOCK_AUTO_INC)
thd_report_wait_for(ctx->start->mysql_thd,
lock->trx->mysql_thd);
if (waitee_ptr && (lock_get_type_low(lock) != LOCK_TABLE ||
lock_get_mode(lock) != LOCK_AUTO_INC)) {
if (waitee_ptr->used == sizeof(waitee_ptr->waitees)/
sizeof(waitee_ptr->waitees[0])) {
waitee_ptr->next =
(struct thd_wait_reports *)
mem_alloc(sizeof(*waitee_ptr));
waitee_ptr = waitee_ptr->next;
if (!waitee_ptr) {
ctx->too_deep = TRUE;
return(ctx->start->id);
}
waitee_ptr->next = NULL;
waitee_ptr->used = 0;
}
waitee_ptr->waitees[waitee_ptr->used++] = lock->trx;
}
if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
/* Another trx ahead has requested a lock in an
@ -3996,6 +4028,41 @@ lock_deadlock_trx_rollback(
trx_mutex_exit(trx);
}
static void
mysql_report_waiters(struct thd_wait_reports *waitee_buf_ptr,
THD *mysql_thd,
trx_id_t victim_trx_id)
{
struct thd_wait_reports *p = waitee_buf_ptr;
while (p) {
struct thd_wait_reports *q;
ulint i = 0;
while (i < p->used) {
trx_t *w_trx = p->waitees[i];
/* There is no need to report waits to a trx already
selected as a victim. */
if (w_trx->id != victim_trx_id)
{
/* If thd_report_wait_for() decides to kill the
transaction, then we will get a call back into
innobase_kill_query. We mark this by setting
current_lock_mutex_owner, so we can avoid trying
to recursively take lock_sys->mutex. */
w_trx->current_lock_mutex_owner = mysql_thd;
thd_report_wait_for(mysql_thd, w_trx->mysql_thd);
w_trx->current_lock_mutex_owner = NULL;
}
++i;
}
q = p->next;
if (p != waitee_buf_ptr)
mem_free(q);
p = q;
}
}
/********************************************************************//**
Checks if a joining lock request results in a deadlock. If a deadlock is
found this function will resolve the dadlock by choosing a victim transaction
@ -4012,12 +4079,20 @@ lock_deadlock_check_and_resolve(
const trx_t* trx) /*!< in: transaction */
{
trx_id_t victim_trx_id;
struct thd_wait_reports waitee_buf, *waitee_buf_ptr;
THD* start_mysql_thd;
ut_ad(trx != NULL);
ut_ad(lock != NULL);
ut_ad(lock_mutex_own());
assert_trx_in_list(trx);
start_mysql_thd = trx->mysql_thd;
if (start_mysql_thd && thd_need_wait_for(start_mysql_thd))
waitee_buf_ptr = &waitee_buf;
else
waitee_buf_ptr = NULL;
/* Try and resolve as many deadlocks as possible. */
do {
lock_deadlock_ctx_t ctx;
@ -4030,7 +4105,17 @@ lock_deadlock_check_and_resolve(
ctx.wait_lock = lock;
ctx.mark_start = lock_mark_counter;
victim_trx_id = lock_deadlock_search(&ctx);
if (waitee_buf_ptr) {
waitee_buf_ptr->next = NULL;
waitee_buf_ptr->used = 0;
}
victim_trx_id = lock_deadlock_search(&ctx, waitee_buf_ptr);
/* Report waits to upper layer, as needed. */
if (waitee_buf_ptr)
mysql_report_waiters(waitee_buf_ptr, start_mysql_thd,
victim_trx_id);
/* Search too deep, we rollback the joining transaction. */
if (ctx.too_deep) {

View File

@ -50,6 +50,9 @@ Created 3/26/1996 Heikki Tuuri
#include<set>
extern "C"
int thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2);
/** Set of table_id */
typedef std::set<table_id_t> table_id_set;

View File

@ -4702,12 +4702,15 @@ innobase_kill_connection(
DBUG_ENTER("innobase_kill_connection");
DBUG_ASSERT(hton == innodb_hton_ptr);
lock_mutex_enter();
trx = thd_to_trx(thd);
if (trx)
{
THD *cur = current_thd;
THD *owner = trx->current_lock_mutex_owner;
if (owner != cur)
lock_mutex_enter();
trx_mutex_enter(trx);
/* Cancel a pending lock request. */
@ -4715,10 +4718,10 @@ innobase_kill_connection(
lock_cancel_waiting_and_release(trx->lock.wait_lock);
trx_mutex_exit(trx);
if (owner != cur)
lock_mutex_exit();
}
lock_mutex_exit();
DBUG_VOID_RETURN;
}

View File

@ -1019,6 +1019,11 @@ struct trx_t{
count of tables being flushed. */
/*------------------------------*/
THD* current_lock_mutex_owner;
/*!< If this is equal to current_thd,
then in innobase_kill_query() we know we
already hold the lock_sys->mutex. */
/*------------------------------*/
#ifdef UNIV_DEBUG
ulint start_line; /*!< Track where it was started from */
const char* start_file; /*!< Filename where it was started */

View File

@ -374,6 +374,11 @@ struct lock_stack_t {
ulint heap_no; /*!< heap number if rec lock */
};
extern "C" void thd_report_wait_for(const MYSQL_THD thd, MYSQL_THD other_thd);
extern "C" int thd_need_wait_for(const MYSQL_THD thd);
extern "C"
int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
/** Stack to use during DFS search. Currently only a single stack is required
because there is no parallel deadlock check. This stack is protected by
the lock_sys_t::mutex. */
@ -392,6 +397,14 @@ UNIV_INTERN mysql_pfs_key_t lock_sys_wait_mutex_key;
#ifdef UNIV_DEBUG
UNIV_INTERN ibool lock_print_waits = FALSE;
/* Buffer to collect THDs to report waits for. */
struct thd_wait_reports {
struct thd_wait_reports *next;
ulint used;
trx_t *waitees[64];
};
/*********************************************************************//**
Validates the lock system.
@return TRUE if ok */
@ -1033,8 +1046,12 @@ lock_rec_has_to_wait(
the correct order so that statement-based replication
will give the correct results. Since the right order
was already determined on the master, we do not need
to enforce it again here (and doing so could lead to
occasional deadlocks). */
to enforce it again here.
Skipping the locks is not essential for correctness,
since in case of deadlock we will just kill the later
transaction and retry it. But it can save some
unnecessary rollbacks and retries. */
return (FALSE);
}
@ -3844,7 +3861,8 @@ static
trx_id_t
lock_deadlock_search(
/*=================*/
lock_deadlock_ctx_t* ctx) /*!< in/out: deadlock context */
lock_deadlock_ctx_t* ctx, /*!< in/out: deadlock context */
struct thd_wait_reports*waitee_ptr) /*!< in/out: list of waitees */
{
const lock_t* lock;
ulint heap_no;
@ -3923,10 +3941,24 @@ lock_deadlock_search(
/* We do not need to report autoinc locks to the upper
layer. These locks are released before commit, so they can
not cause deadlocks with binlog-fixed commit order. */
if (lock_get_type_low(lock) != LOCK_TABLE ||
lock_get_mode(lock) != LOCK_AUTO_INC)
thd_report_wait_for(ctx->start->mysql_thd,
lock->trx->mysql_thd);
if (waitee_ptr && (lock_get_type_low(lock) != LOCK_TABLE ||
lock_get_mode(lock) != LOCK_AUTO_INC)) {
if (waitee_ptr->used == sizeof(waitee_ptr->waitees)/
sizeof(waitee_ptr->waitees[0])) {
waitee_ptr->next =
(struct thd_wait_reports *)
mem_alloc(sizeof(*waitee_ptr));
waitee_ptr = waitee_ptr->next;
if (!waitee_ptr) {
ctx->too_deep = TRUE;
return(ctx->start->id);
}
waitee_ptr->next = NULL;
waitee_ptr->used = 0;
}
waitee_ptr->waitees[waitee_ptr->used++] = lock->trx;
}
if (lock->trx->lock.que_state == TRX_QUE_LOCK_WAIT) {
/* Another trx ahead has requested a lock in an
@ -4019,6 +4051,41 @@ lock_deadlock_trx_rollback(
trx_mutex_exit(trx);
}
static void
mysql_report_waiters(struct thd_wait_reports *waitee_buf_ptr,
THD *mysql_thd,
trx_id_t victim_trx_id)
{
struct thd_wait_reports *p = waitee_buf_ptr;
while (p) {
struct thd_wait_reports *q;
ulint i = 0;
while (i < p->used) {
trx_t *w_trx = p->waitees[i];
/* There is no need to report waits to a trx already
selected as a victim. */
if (w_trx->id != victim_trx_id)
{
/* If thd_report_wait_for() decides to kill the
transaction, then we will get a call back into
innobase_kill_query. We mark this by setting
current_lock_mutex_owner, so we can avoid trying
to recursively take lock_sys->mutex. */
w_trx->current_lock_mutex_owner = mysql_thd;
thd_report_wait_for(mysql_thd, w_trx->mysql_thd);
w_trx->current_lock_mutex_owner = NULL;
}
++i;
}
q = p->next;
if (p != waitee_buf_ptr)
mem_free(q);
p = q;
}
}
/********************************************************************//**
Checks if a joining lock request results in a deadlock. If a deadlock is
found this function will resolve the dadlock by choosing a victim transaction
@ -4035,12 +4102,20 @@ lock_deadlock_check_and_resolve(
const trx_t* trx) /*!< in: transaction */
{
trx_id_t victim_trx_id;
struct thd_wait_reports waitee_buf, *waitee_buf_ptr;
THD* start_mysql_thd;
ut_ad(trx != NULL);
ut_ad(lock != NULL);
ut_ad(lock_mutex_own());
assert_trx_in_list(trx);
start_mysql_thd = trx->mysql_thd;
if (start_mysql_thd && thd_need_wait_for(start_mysql_thd))
waitee_buf_ptr = &waitee_buf;
else
waitee_buf_ptr = NULL;
/* Try and resolve as many deadlocks as possible. */
do {
lock_deadlock_ctx_t ctx;
@ -4053,7 +4128,17 @@ lock_deadlock_check_and_resolve(
ctx.wait_lock = lock;
ctx.mark_start = lock_mark_counter;
victim_trx_id = lock_deadlock_search(&ctx);
if (waitee_buf_ptr) {
waitee_buf_ptr->next = NULL;
waitee_buf_ptr->used = 0;
}
victim_trx_id = lock_deadlock_search(&ctx, waitee_buf_ptr);
/* Report waits to upper layer, as needed. */
if (waitee_buf_ptr)
mysql_report_waiters(waitee_buf_ptr, start_mysql_thd,
victim_trx_id);
/* Search too deep, we rollback the joining transaction. */
if (ctx.too_deep) {

View File

@ -51,6 +51,9 @@ Created 3/26/1996 Heikki Tuuri
#include<set>
extern "C"
int thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2);
/** Set of table_id */
typedef std::set<table_id_t> table_id_set;