MDEV-4506, parallel replication.

Some after-review fixes.
This commit is contained in:
unknown 2013-09-13 15:09:57 +02:00
parent 13fddb32de
commit d107bdaa01
16 changed files with 311 additions and 191 deletions

View File

@ -6542,26 +6542,87 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
}
}
/*
Put a transaction that is ready to commit in the group commit queue.
The transaction is identified by the ENTRY object passed into this function.
To facilitate group commit for the binlog, we first queue up ourselves in
this function. Then later the first thread to enter the queue waits for
the LOCK_log mutex, and commits for everyone in the queue once it gets the
lock. Any other threads in the queue just wait for the first one to finish
the commit and wake them up. This way, all transactions in the queue get
committed in a single disk operation.
The return value of this function is TRUE if queued as the first entry in
the queue (meaning this is the leader), FALSE otherwise.
The main work in this function is when the commit in one transaction has
been marked to wait for the commit of another transaction to happen
first. This is used to support in-order parallel replication, where
transactions can execute out-of-order but need to be committed in-order with
how they happened on the master. The waiting of one commit on another needs
to be integrated with the group commit queue, to ensure that the waiting
transaction can participate in the same group commit as the waited-for
transaction.
So when we put a transaction in the queue, we check if there were other
transactions already prepared to commit but just waiting for the first one
to commit. If so, we add those to the queue as well, transitively for all
waiters.
*/
bool
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
wait_for_commit *wfc)
MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry)
{
group_commit_entry *orig_queue;
wait_for_commit *list, *cur, *last;
wait_for_commit *wfc;
/*
To facilitate group commit for the binlog, we first queue up ourselves in
the group commit queue. Then the first thread to enter the queue waits for
the LOCK_log mutex, and commits for everyone in the queue once it gets the
lock. Any other threads in the queue just wait for the first one to finish
the commit and wake them up.
Check if we need to wait for another transaction to commit before us.
To support in-order parallel replication with group commit, after we add
some transaction to the queue, we check if there were other transactions
already prepared to commit but just waiting for the first one to commit.
If so, we add those to the queue as well, transitively for all waiters.
It is safe to do a quick check without lock first in the case where we do
not have to wait. But if the quick check shows we need to wait, we must do
another safe check under lock, to avoid the race where the other
transaction wakes us up between the check and the wait.
*/
wfc= entry->thd->wait_for_commit_ptr;
entry->queued_by_other= false;
if (wfc && wfc->waiting_for_commit)
{
mysql_mutex_lock(&wfc->LOCK_wait_commit);
/* Do an extra check here, this time safely under lock. */
if (wfc->waiting_for_commit)
{
/*
By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before
us.
This other transaction may then take over the commit process for us to
get us included in its own group commit. If this happens, the
queued_by_other flag is set.
*/
wfc->opaque_pointer= entry;
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waiting_for_commit);
wfc->opaque_pointer= NULL;
}
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
}
/*
If the transaction we were waiting for has already put us into the group
commit queue (and possibly already done the entire binlog commit for us),
then there is nothing else to do.
*/
if (entry->queued_by_other)
return false;
/* Now enqueue ourselves in the group commit queue. */
entry->thd->clear_wakeup_ready();
mysql_mutex_lock(&LOCK_prepare_ordered);
orig_queue= group_commit_queue;
@ -6574,6 +6635,23 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
This would be natural to do with recursion, but we want to avoid
potentially unbounded recursion blowing the C stack, so we use the list
approach instead.
We keep a list of all the waiters that need to be processed in `list',
linked through the next_subsequent_commit pointer. Initially this list
contains only the entry passed into this function.
We process entries in the list one by one. The element currently being
processed is pointed to by `cur`, and the element at the end of the list
is pointed to by `last` (we do not use NULL to terminate the list).
As we process an element, it is first added to the group_commit_queue.
Then any waiters for that element are added at the end of the list, to
be processed in subsequent iterations. This continues until the list
is exhausted, with all elements ever added eventually processed.
The end result is a breath-first traversal of the tree of waiters,
re-using the next_subsequent_commit pointers in place of extra stack
space in a recursive traversal.
*/
list= wfc;
cur= list;
@ -6594,6 +6672,12 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
if (!cur)
break; // Can happen if initial entry has no wait_for_commit
/*
Check if this transaction has other transaction waiting for it to commit.
If so, process the waiting transactions, and their waiters and so on,
transitively.
*/
if (cur->subsequent_commits_list)
{
bool have_lock;
@ -6601,63 +6685,66 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
mysql_mutex_lock(&cur->LOCK_wait_commit);
have_lock= true;
/*
Grab the list, now safely under lock, and process it if still
non-empty.
*/
waiter= cur->subsequent_commits_list;
/* Check again, now safely under lock. */
if (waiter)
cur->subsequent_commits_list= NULL;
while (waiter)
{
/* Grab the list of waiters and process it. */
cur->subsequent_commits_list= NULL;
do
wait_for_commit *next= waiter->next_subsequent_commit;
group_commit_entry *entry2=
(group_commit_entry *)waiter->opaque_pointer;
if (entry2)
{
wait_for_commit *next= waiter->next_subsequent_commit;
group_commit_entry *entry2=
(group_commit_entry *)waiter->opaque_pointer;
if (entry2)
{
/*
This is another transaction ready to be written to the binary
log. We can put it into the queue directly, without needing a
separate context switch to the other thread. We just set a flag
so that the other thread will know when it wakes up that it was
already processed.
/*
This is another transaction ready to be written to the binary
log. We can put it into the queue directly, without needing a
separate context switch to the other thread. We just set a flag
so that the other thread will know when it wakes up that it was
already processed.
So put it at the end of the list to be processed in a subsequent
iteration of the outer loop.
*/
entry2->queued_by_other= true;
last->next_subsequent_commit= waiter;
last= waiter;
/*
As a small optimisation, we do not actually need to set
waiter->next_subsequent_commit to NULL, as we can use the
pointer `last' to check for end-of-list.
*/
}
else
{
/*
Wake up the waiting transaction.
So put it at the end of the list to be processed in a subsequent
iteration of the outer loop.
*/
entry2->queued_by_other= true;
last->next_subsequent_commit= waiter;
last= waiter;
/*
As a small optimisation, we do not actually need to set
waiter->next_subsequent_commit to NULL, as we can use the
pointer `last' to check for end-of-list.
*/
}
else
{
/*
Wake up the waiting transaction.
For this, we need to set the "wakeup running" flag and release
the waitee lock to avoid a deadlock, see comments on
THD::wakeup_subsequent_commits2() for details.
*/
if (have_lock)
{
cur->wakeup_subsequent_commits_running= true;
mysql_mutex_unlock(&cur->LOCK_wait_commit);
have_lock= false;
}
waiter->wakeup();
For this, we need to set the "wakeup running" flag and release
the waitee lock to avoid a deadlock, see comments on
THD::wakeup_subsequent_commits2() for details.
*/
if (have_lock)
{
have_lock= false;
cur->wakeup_subsequent_commits_running= true;
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
waiter= next;
} while (waiter);
waiter->wakeup();
}
waiter= next;
}
if (have_lock)
mysql_mutex_unlock(&cur->LOCK_wait_commit);
}
if (cur == last)
break;
/*
Move to the next entry in the flattened list of waiting transactions
that still need to be processed transitively.
*/
cur= cur->next_subsequent_commit;
entry= (group_commit_entry *)cur->opaque_pointer;
DBUG_ASSERT(entry != NULL);
@ -6691,31 +6778,7 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *entry,
bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
wait_for_commit *wfc;
bool is_leader;
wfc= entry->thd->wait_for_commit_ptr;
entry->queued_by_other= false;
if (wfc && wfc->waiting_for_commit)
{
mysql_mutex_lock(&wfc->LOCK_wait_commit);
/* Do an extra check here, this time safely under lock. */
if (wfc->waiting_for_commit)
{
wfc->opaque_pointer= entry;
do
{
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
} while (wfc->waiting_for_commit);
wfc->opaque_pointer= NULL;
}
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
}
if (entry->queued_by_other)
is_leader= false;
else
is_leader= queue_for_group_commit(entry, wfc);
bool is_leader= queue_for_group_commit(entry);
/*
The first in the queue handles group commit for all; the others just wait
@ -6756,6 +6819,16 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
if (next)
{
/*
Wake up the next thread in the group commit.
The next thread can be waiting in two different ways, depending on
whether it put itself in the queue, or if it was put in queue by us
because it had to wait for us to commit first.
So execute the appropriate wakeup, identified by the queued_by_other
field.
*/
if (next->queued_by_other)
next->thd->wait_for_commit_ptr->wakeup();
else
@ -6840,14 +6913,18 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
mysql_mutex_lock(&LOCK_log);
DEBUG_SYNC(leader->thd, "commit_after_get_LOCK_log");
binlog_id= current_binlog_id;
mysql_mutex_lock(&LOCK_prepare_ordered);
if (opt_binlog_commit_wait_count)
wait_for_sufficient_commits();
/*
Note that wait_for_sufficient_commits() may have released and
re-acquired the LOCK_log and LOCK_prepare_ordered if it needed to wait.
*/
current= group_commit_queue;
group_commit_queue= NULL;
mysql_mutex_unlock(&LOCK_prepare_ordered);
binlog_id= current_binlog_id;
/* As the queue is in reverse order of entering, reverse it. */
last_in_queue= current;
@ -7141,6 +7218,13 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
}
/*
Wait for sufficient commits to queue up for group commit, according to the
values of binlog_commit_wait_count and binlog_commit_wait_usec.
Note that this function may release and re-acquire LOCK_log and
LOCK_prepare_ordered if it needs to wait.
*/
void
MYSQL_BIN_LOG::wait_for_sufficient_commits()
{
@ -7152,11 +7236,9 @@ MYSQL_BIN_LOG::wait_for_sufficient_commits()
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_prepare_ordered);
count= 0;
for (e= last_head= group_commit_queue; e; e= e->next)
++count;
if (count >= opt_binlog_commit_wait_count)
return;
for (e= last_head= group_commit_queue, count= 0; e; e= e->next)
if (++count >= opt_binlog_commit_wait_count)
return;
mysql_mutex_unlock(&LOCK_log);
set_timespec_nsec(wait_until, (ulonglong)1000*opt_binlog_commit_wait_usec);
@ -7178,7 +7260,25 @@ MYSQL_BIN_LOG::wait_for_sufficient_commits()
last_head= head;
}
mysql_mutex_lock(&LOCK_log);
/*
We must not wait for LOCK_log while holding LOCK_prepare_ordered.
LOCK_log can be held for long periods (eg. we do I/O under it), while
LOCK_prepare_ordered must only be held for short periods.
In addition, waiting for LOCK_log while holding LOCK_prepare_ordered would
violate locking order of LOCK_log-before-LOCK_prepare_ordered. This could
cause SAFEMUTEX warnings (even if it cannot actually deadlock with current
code, as there can be at most one group commit leader thread at a time).
So release and re-acquire LOCK_prepare_ordered if we need to wait for the
LOCK_log.
*/
if (mysql_mutex_trylock(&LOCK_log))
{
mysql_mutex_unlock(&LOCK_prepare_ordered);
mysql_mutex_lock(&LOCK_log);
mysql_mutex_lock(&LOCK_prepare_ordered);
}
}

View File

@ -540,7 +540,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
void do_checkpoint_request(ulong binlog_id);
void purge();
int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
bool queue_for_group_commit(group_commit_entry *entry, wait_for_commit *wfc);
bool queue_for_group_commit(group_commit_entry *entry);
bool write_transaction_to_binlog_events(group_commit_entry *entry);
void trx_group_commit_leader(group_commit_entry *leader);
bool is_xidlist_idle_nolock();

View File

@ -937,7 +937,7 @@ Log_event::Log_event(const char* buf,
#ifndef MYSQL_CLIENT
#ifdef HAVE_REPLICATION
int Log_event::do_update_pos(struct rpl_group_info *rgi)
int Log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
@ -3756,7 +3756,7 @@ void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Query_log_event::do_apply_event(struct rpl_group_info *rgi)
int Query_log_event::do_apply_event(rpl_group_info *rgi)
{
return do_apply_event(rgi, query, q_len);
}
@ -3807,8 +3807,8 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error)
mismatch. This mismatch could be implemented with a new ER_ code, and
to ignore it you would use --slave-skip-errors...
*/
int Query_log_event::do_apply_event(struct rpl_group_info *rgi,
const char *query_arg, uint32 q_len_arg)
int Query_log_event::do_apply_event(rpl_group_info *rgi,
const char *query_arg, uint32 q_len_arg)
{
LEX_STRING new_db;
int expected_error,actual_error= 0;
@ -4244,7 +4244,7 @@ end:
DBUG_RETURN(thd->is_slave_error);
}
int Query_log_event::do_update_pos(struct rpl_group_info *rgi)
int Query_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
@ -4461,7 +4461,7 @@ bool Start_log_event_v3::write(IO_CACHE* file)
other words, no deadlock problem.
*/
int Start_log_event_v3::do_apply_event(struct rpl_group_info *rgi)
int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
{
DBUG_ENTER("Start_log_event_v3::do_apply_event");
int error= 0;
@ -4810,7 +4810,7 @@ bool Format_description_log_event::write(IO_CACHE* file)
#endif
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Format_description_log_event::do_apply_event(struct rpl_group_info *rgi)
int Format_description_log_event::do_apply_event(rpl_group_info *rgi)
{
int ret= 0;
Relay_log_info const *rli= rgi->rli;
@ -4867,7 +4867,7 @@ int Format_description_log_event::do_apply_event(struct rpl_group_info *rgi)
DBUG_RETURN(ret);
}
int Format_description_log_event::do_update_pos(struct rpl_group_info *rgi)
int Format_description_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
if (server_id == (uint32) global_system_variables.server_id)
@ -5516,7 +5516,7 @@ void Load_log_event::set_fields(const char* affected_db,
1 Failure
*/
int Load_log_event::do_apply_event(NET* net, struct rpl_group_info *rgi,
int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi,
bool use_rli_only_for_errors)
{
LEX_STRING new_db;
@ -5919,7 +5919,7 @@ bool Rotate_log_event::write(IO_CACHE* file)
@retval
0 ok
*/
int Rotate_log_event::do_update_pos(struct rpl_group_info *rgi)
int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Rotate_log_event::do_update_pos");
@ -6096,7 +6096,7 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
const Format_description_log_event *description_event)
: Log_event(buf, description_event), seq_no(0)
: Log_event(buf, description_event), seq_no(0), commit_id(0)
{
uint8 header_size= description_event->common_header_len;
uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
@ -6120,8 +6120,6 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
++buf;
commit_id= uint8korr(buf);
}
else
commit_id= 0;
}
@ -6254,7 +6252,7 @@ Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
static char gtid_begin_string[] = "BEGIN";
int
Gtid_log_event::do_apply_event(struct rpl_group_info *rgi)
Gtid_log_event::do_apply_event(rpl_group_info *rgi)
{
thd->variables.server_id= this->server_id;
thd->variables.gtid_domain_id= this->domain_id;
@ -6295,7 +6293,7 @@ Gtid_log_event::do_apply_event(struct rpl_group_info *rgi)
int
Gtid_log_event::do_update_pos(struct rpl_group_info *rgi)
Gtid_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@ -6477,7 +6475,7 @@ Gtid_list_log_event::write(IO_CACHE *file)
int
Gtid_list_log_event::do_apply_event(struct rpl_group_info *rgi)
Gtid_list_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
int ret= Log_event::do_apply_event(rgi);
@ -6707,7 +6705,7 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
Intvar_log_event::do_apply_event()
*/
int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi)
int Intvar_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
@ -6731,7 +6729,7 @@ int Intvar_log_event::do_apply_event(struct rpl_group_info *rgi)
return 0;
}
int Intvar_log_event::do_update_pos(struct rpl_group_info *rgi)
int Intvar_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@ -6818,7 +6816,7 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Rand_log_event::do_apply_event(struct rpl_group_info *rgi)
int Rand_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
/*
@ -6835,7 +6833,7 @@ int Rand_log_event::do_apply_event(struct rpl_group_info *rgi)
return 0;
}
int Rand_log_event::do_update_pos(struct rpl_group_info *rgi)
int Rand_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@ -6950,7 +6948,7 @@ void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Xid_log_event::do_apply_event(struct rpl_group_info *rgi)
int Xid_log_event::do_apply_event(rpl_group_info *rgi)
{
bool res;
int err;
@ -7416,7 +7414,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int User_var_log_event::do_apply_event(struct rpl_group_info *rgi)
int User_var_log_event::do_apply_event(rpl_group_info *rgi)
{
Item *it= 0;
CHARSET_INFO *charset;
@ -7505,7 +7503,7 @@ int User_var_log_event::do_apply_event(struct rpl_group_info *rgi)
DBUG_RETURN(0);
}
int User_var_log_event::do_update_pos(struct rpl_group_info *rgi)
int User_var_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@ -7682,7 +7680,7 @@ Slave_log_event::Slave_log_event(const char* buf,
#ifndef MYSQL_CLIENT
int Slave_log_event::do_apply_event(struct rpl_group_info *rgi)
int Slave_log_event::do_apply_event(rpl_group_info *rgi)
{
if (mysql_bin_log.is_open())
return mysql_bin_log.write(this);
@ -7726,7 +7724,7 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
Start_log_event_v3::do_apply_event(), not here. Because if we come
here, the master was sane.
*/
int Stop_log_event::do_update_pos(struct rpl_group_info *rgi)
int Stop_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
@ -7958,7 +7956,7 @@ void Create_file_log_event::pack_info(THD *thd, Protocol *protocol)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Create_file_log_event::do_apply_event(struct rpl_group_info *rgi)
int Create_file_log_event::do_apply_event(rpl_group_info *rgi)
{
char proc_info[17+FN_REFLEN+10], *fname_buf;
char *ext;
@ -8140,7 +8138,7 @@ int Append_block_log_event::get_create_or_append() const
Append_block_log_event::do_apply_event()
*/
int Append_block_log_event::do_apply_event(struct rpl_group_info *rgi)
int Append_block_log_event::do_apply_event(rpl_group_info *rgi)
{
char proc_info[17+FN_REFLEN+10], *fname= proc_info+17;
int fd;
@ -8291,7 +8289,7 @@ void Delete_file_log_event::pack_info(THD *thd, Protocol *protocol)
*/
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Delete_file_log_event::do_apply_event(struct rpl_group_info *rgi)
int Delete_file_log_event::do_apply_event(rpl_group_info *rgi)
{
char fname[FN_REFLEN+10];
Relay_log_info const *rli= rgi->rli;
@ -8391,7 +8389,7 @@ void Execute_load_log_event::pack_info(THD *thd, Protocol *protocol)
Execute_load_log_event::do_apply_event()
*/
int Execute_load_log_event::do_apply_event(struct rpl_group_info *rgi)
int Execute_load_log_event::do_apply_event(rpl_group_info *rgi)
{
char fname[FN_REFLEN+10];
char *ext;
@ -8664,7 +8662,7 @@ void Execute_load_query_log_event::pack_info(THD *thd, Protocol *protocol)
int
Execute_load_query_log_event::do_apply_event(struct rpl_group_info *rgi)
Execute_load_query_log_event::do_apply_event(rpl_group_info *rgi)
{
char *p;
char *buf;
@ -9072,7 +9070,7 @@ int Rows_log_event::do_add_row_data(uchar *row_data, size_t length)
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int Rows_log_event::do_apply_event(struct rpl_group_info *rgi)
int Rows_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
@ -9538,7 +9536,7 @@ static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
@retval non-zero Error in the statement commit
*/
int
Rows_log_event::do_update_pos(struct rpl_group_info *rgi)
Rows_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Rows_log_event::do_update_pos");
@ -9777,7 +9775,7 @@ void Annotate_rows_log_event::print(FILE *file, PRINT_EVENT_INFO *pinfo)
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int Annotate_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
int Annotate_rows_log_event::do_apply_event(rpl_group_info *rgi)
{
m_save_thd_query_txt= thd->query();
m_save_thd_query_len= thd->query_length();
@ -9787,7 +9785,7 @@ int Annotate_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int Annotate_rows_log_event::do_update_pos(struct rpl_group_info *rgi)
int Annotate_rows_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@ -10296,7 +10294,7 @@ check_table_map(Relay_log_info const *rli, RPL_TABLE_LIST *table_list)
DBUG_RETURN(res);
}
int Table_map_log_event::do_apply_event(struct rpl_group_info *rgi)
int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
{
RPL_TABLE_LIST *table_list;
char *db_mem, *tname_mem;
@ -10415,7 +10413,7 @@ Table_map_log_event::do_shall_skip(Relay_log_info *rli)
return continue_group(rli);
}
int Table_map_log_event::do_update_pos(struct rpl_group_info *rgi)
int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
rli->inc_event_relay_log_pos();
@ -11847,7 +11845,7 @@ Incident_log_event::print(FILE *file,
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int
Incident_log_event::do_apply_event(struct rpl_group_info *rgi)
Incident_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("Incident_log_event::do_apply_event");

View File

@ -1317,7 +1317,7 @@ public:
@see do_apply_event
*/
int apply_event(struct rpl_group_info *rgi)
int apply_event(rpl_group_info *rgi)
{
return do_apply_event(rgi);
}
@ -1331,7 +1331,7 @@ public:
@see do_update_pos
*/
int update_pos(struct rpl_group_info *rgi)
int update_pos(rpl_group_info *rgi)
{
return do_update_pos(rgi);
}
@ -1432,7 +1432,7 @@ protected:
@retval 0 Event applied successfully
@retval errno Error code if event application failed
*/
virtual int do_apply_event(struct rpl_group_info *rgi)
virtual int do_apply_event(rpl_group_info *rgi)
{
return 0; /* Default implementation does nothing */
}
@ -1461,7 +1461,7 @@ protected:
1). Observe that handler errors are returned by the
do_apply_event() function, and not by this one.
*/
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
/**
@ -1986,10 +1986,10 @@ public:
public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
int do_apply_event(struct rpl_group_info *rgi,
int do_apply_event(rpl_group_info *rgi,
const char *query_arg,
uint32 q_len_arg);
static bool peek_is_commit_rollback(const char *event_start,
@ -2103,7 +2103,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@ -2416,12 +2416,12 @@ public:
public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi)
virtual int do_apply_event(rpl_group_info *rgi)
{
return do_apply_event(thd->slave_net,rgi,0);
}
int do_apply_event(NET *net, struct rpl_group_info *rgi,
int do_apply_event(NET *net, rpl_group_info *rgi,
bool use_rli_only_for_errors);
#endif
};
@ -2500,7 +2500,7 @@ public:
protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info*)
{
/*
@ -2596,8 +2596,8 @@ public:
static bool is_version_before_checksum(const master_version_split *version_split);
protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@ -2675,8 +2675,8 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@ -2754,8 +2754,8 @@ class Rand_log_event: public Log_event
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@ -2803,7 +2803,7 @@ class Xid_log_event: public Log_event
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@ -2870,8 +2870,8 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@ -2905,7 +2905,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli)
{
/*
@ -3007,7 +3007,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
@ -3119,8 +3119,8 @@ public:
uint16 flags, bool is_transactional, uint64 commit_id);
#ifdef HAVE_REPLICATION
void pack_info(THD *thd, Protocol *protocol);
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
#else
@ -3249,7 +3249,7 @@ public:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
bool to_packet(String *packet);
bool write(IO_CACHE *file);
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
#endif
static bool peek(const char *event_start, uint32 event_len,
uint8 checksum_alg,
@ -3328,7 +3328,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@ -3383,7 +3383,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@ -3424,7 +3424,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@ -3464,7 +3464,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@ -3563,7 +3563,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
#endif
};
@ -3635,8 +3635,8 @@ public:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
private:
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info*);
#endif
@ -4050,8 +4050,8 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
@ -4278,8 +4278,8 @@ protected:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
/*
@ -4612,7 +4612,7 @@ public:
#endif
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
#endif
virtual bool write_data_header(IO_CACHE *file);

View File

@ -36,7 +36,7 @@
// Old implementation of do_apply_event()
int
Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info *rgi)
Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
{
DBUG_ENTER("Old_rows_log_event::do_apply_event(st_relay_log_info*)");
int error= 0;
@ -1451,7 +1451,7 @@ int Old_rows_log_event::do_add_row_data(uchar *row_data, size_t length)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
{
DBUG_ENTER("Old_rows_log_event::do_apply_event(Relay_log_info*)");
int error= 0;
@ -1834,7 +1834,7 @@ Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
}
int
Old_rows_log_event::do_update_pos(struct rpl_group_info *rgi)
Old_rows_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Old_rows_log_event::do_update_pos");

View File

@ -214,8 +214,8 @@ protected:
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(struct rpl_group_info *rgi);
virtual int do_update_pos(struct rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
/*
@ -275,7 +275,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
int do_apply_event(Old_rows_log_event*, struct rpl_group_info *rgi);
int do_apply_event(Old_rows_log_event*, rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.
@ -403,7 +403,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(struct rpl_group_info *rgi)
virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
@ -481,7 +481,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(struct rpl_group_info *rgi)
virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()
@ -556,7 +556,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
// use old definition of do_apply_event()
virtual int do_apply_event(struct rpl_group_info *rgi)
virtual int do_apply_event(rpl_group_info *rgi)
{ return Old_rows_log_event::do_apply_event(this, rgi); }
// primitives for old version of do_apply_event()

View File

@ -62,7 +62,7 @@ rpl_slave_state::update_state_hash(uint64 sub_id, rpl_gtid *gtid)
int
rpl_slave_state::record_and_update_gtid(THD *thd, struct rpl_group_info *rgi)
rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
uint64 sub_id;

View File

@ -62,7 +62,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
struct rpl_parallel_thread *rpt)
{
int err;
struct rpl_group_info *rgi= qev->rgi;
rpl_group_info *rgi= qev->rgi;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
@ -128,8 +128,9 @@ handle_rpl_parallel_thread(void *arg)
old_msg= thd->proc_info;
thd->enter_cond(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread,
"Waiting for work from SQL thread");
while (!rpt->stop && !thd->killed && !(events= rpt->event_queue))
while (!(events= rpt->event_queue) && !rpt->stop && !thd->killed)
mysql_cond_wait(&rpt->COND_rpl_thread, &rpt->LOCK_rpl_thread);
/* Mark that this thread is now executing */
rpt->free= false;
rpt->event_queue= rpt->last_in_queue= NULL;
thd->exit_cond(old_msg);
@ -145,9 +146,15 @@ handle_rpl_parallel_thread(void *arg)
uint64 wait_start_sub_id;
bool end_of_group;
/* Handle a new event group, which will be initiated by a GTID event. */
if (event_type == GTID_EVENT)
{
in_event_group= true;
/*
If the standalone flag is set, then this event group consists of a
single statement (possibly preceeded by some Intvar_log_event and
similar), without any terminating COMMIT/ROLLBACK/XID.
*/
group_standalone=
(0 != (static_cast<Gtid_log_event *>(events->ev)->flags2 &
Gtid_log_event::FL_STANDALONE));
@ -540,12 +547,12 @@ rpl_parallel::wait_for_done()
bool
rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
{
rpl_parallel_entry *e;
rpl_parallel_thread *cur_thread;
rpl_parallel_thread::queued_event *qev;
struct rpl_group_info *rgi;
rpl_group_info *rgi;
Relay_log_info *rli= serial_rgi->rli;
enum Log_event_type typ;

View File

@ -23,7 +23,7 @@ struct rpl_parallel_thread {
struct queued_event {
queued_event *next;
Log_event *ev;
struct rpl_group_info *rgi;
rpl_group_info *rgi;
} *event_queue, *last_in_queue;
};
@ -59,7 +59,7 @@ struct rpl_parallel_entry {
mysql_mutex_t LOCK_parallel_entry;
mysql_cond_t COND_parallel_entry;
uint64 current_sub_id;
struct rpl_group_info *current_group_info;
rpl_group_info *current_group_info;
/*
The sub_id of the last event group in the previous batch of group-committed
transactions.
@ -78,7 +78,7 @@ struct rpl_parallel {
~rpl_parallel();
rpl_parallel_entry *find(uint32 domain_id);
void wait_for_done();
bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev);
bool do_event(rpl_group_info *serial_rgi, Log_event *ev);
};

View File

@ -1193,7 +1193,7 @@ bool Relay_log_info::cached_charset_compare(char *charset) const
void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
time_t event_creation_time, THD *thd,
struct rpl_group_info *rgi)
rpl_group_info *rgi)
{
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
@ -1265,6 +1265,11 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
/*
In parallel replication, different THDs can be used from different
parallel threads. But in single-threaded mode, only the THD of the main
SQL thread is allowed.
*/
DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd);
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
@ -1552,6 +1557,7 @@ event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
uint64 sub_id= rpl_global_gtid_slave_state.next_subid(gev->domain_id);
if (!sub_id)
{
/* Out of memory caused hash insertion to fail. */
return 1;
}
rgi->gtid_sub_id= sub_id;

View File

@ -422,7 +422,7 @@ public:
*/
void stmt_done(my_off_t event_log_pos,
time_t event_creation_time, THD *thd,
struct rpl_group_info *rgi);
rpl_group_info *rgi);
/**
@ -521,10 +521,14 @@ private:
/*
This is data for various state needed to be kept for the processing of
one event group in the SQL thread.
one event group (transaction) during replication.
For single-threaded replication it is linked from the RLI, for parallel
replication it is linked into each event group being executed in parallel.
In single-threaded replication, there will be one global rpl_group_info and
one global Relay_log_info per master connection. They will be linked
together.
In parallel replication, there will be one rpl_group_info object for
each running thd. All rpl_group_info will share the same Relay_log_info.
*/
struct rpl_group_info
{
@ -555,7 +559,7 @@ struct rpl_group_info
for the wrong commit).
*/
uint64 wait_commit_sub_id;
struct rpl_group_info *wait_commit_group_info;
rpl_group_info *wait_commit_group_info;
/*
If non-zero, the event group must wait for this sub_id to be committed
before the execution of the event group is allowed to start.

View File

@ -1143,7 +1143,7 @@ bool Deferred_log_events::is_empty()
return array.elements == 0;
}
bool Deferred_log_events::execute(struct rpl_group_info *rgi)
bool Deferred_log_events::execute(rpl_group_info *rgi)
{
bool res= false;

View File

@ -3019,7 +3019,7 @@ static int has_temporary_error(THD *thd)
ev->update_pos().
*/
int apply_event_and_update_pos(Log_event* ev, THD* thd,
struct rpl_group_info *rgi,
rpl_group_info *rgi,
rpl_parallel_thread *rpt)
{
int exec_res= 0;

View File

@ -80,7 +80,7 @@ void mysql_client_binlog_statement(THD* thd)
my_bool have_fd_event= TRUE;
int err;
Relay_log_info *rli;
struct rpl_group_info *rgi;
rpl_group_info *rgi;
rli= thd->rli_fake;
if (!rli)

View File

@ -5666,6 +5666,10 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
waiting_for_commit= false;
else
{
/*
Put ourself at the head of the waitee's list of transactions that must
wait for it to commit first.
*/
this->next_subsequent_commit= waitee->subsequent_commits_list;
waitee->subsequent_commits_list= this;
}
@ -5704,7 +5708,7 @@ wait_for_commit::wait_for_prior_commit2()
The waiter needs to lock the waitee to delete itself from the list in
unregister_wait_for_prior_commit(). Thus wakeup_subsequent_commits() can not
hold its own lock while locking waiters, lest we deadlock.
hold its own lock while locking waiters, as this could lead to deadlock.
So we need to prevent unregister_wait_for_prior_commit() running while wakeup
is in progress - otherwise the unregister could complete before the wakeup,
@ -5727,6 +5731,7 @@ wait_for_commit::wait_for_prior_commit2()
would not be woken up until next wakeup, which could be potentially much
later than necessary.
*/
void
wait_for_commit::wakeup_subsequent_commits2()
{

View File

@ -1615,7 +1615,7 @@ struct wait_for_commit
*/
bool waiting_for_commit;
/*
Flag set when wakeup_subsequent_commits_running() is active, see commonts
Flag set when wakeup_subsequent_commits_running() is active, see comments
on that function for details.
*/
bool wakeup_subsequent_commits_running;