MDEV-4506: Parallel replication: intermediate commit.
Fix a bunch of issues found with locking, ordering, and non-thread-safe stuff in Relay_log_info. Now able to do a simple benchmark, showing 4.5 times speedup for applying a binlog with 10000 REPLACE statements.
This commit is contained in:
parent
e654be3865
commit
a99356fbe7
@ -408,7 +408,7 @@ private:
|
|||||||
|
|
||||||
class binlog_cache_mngr;
|
class binlog_cache_mngr;
|
||||||
struct rpl_gtid;
|
struct rpl_gtid;
|
||||||
class wait_for_commit;
|
struct wait_for_commit;
|
||||||
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
|
class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
|
@ -9101,7 +9101,7 @@ int Rows_log_event::do_apply_event(struct rpl_group_info *rgi)
|
|||||||
do_apply_event(). We still check here to prevent future coding
|
do_apply_event(). We still check here to prevent future coding
|
||||||
errors.
|
errors.
|
||||||
*/
|
*/
|
||||||
DBUG_ASSERT(rli->sql_thd == thd);
|
DBUG_ASSERT(rgi->thd == thd);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
If there is no locks taken, this is the first binrow event seen
|
If there is no locks taken, this is the first binrow event seen
|
||||||
|
@ -68,7 +68,7 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, struct rpl_group_info
|
|||||||
do_apply_event(). We still check here to prevent future coding
|
do_apply_event(). We still check here to prevent future coding
|
||||||
errors.
|
errors.
|
||||||
*/
|
*/
|
||||||
DBUG_ASSERT(rli->sql_thd == ev_thd);
|
DBUG_ASSERT(rgi->thd == ev_thd);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
If there is no locks taken, this is the first binrow event seen
|
If there is no locks taken, this is the first binrow event seen
|
||||||
@ -1481,7 +1481,7 @@ int Old_rows_log_event::do_apply_event(struct rpl_group_info *rgi)
|
|||||||
do_apply_event(). We still check here to prevent future coding
|
do_apply_event(). We still check here to prevent future coding
|
||||||
errors.
|
errors.
|
||||||
*/
|
*/
|
||||||
DBUG_ASSERT(rli->sql_thd == thd);
|
DBUG_ASSERT(rgi->thd == thd);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
If there is no locks taken, this is the first binrow event seen
|
If there is no locks taken, this is the first binrow event seen
|
||||||
|
@ -38,15 +38,16 @@
|
|||||||
everything needs to be correctly rolled back and stopped in all threads,
|
everything needs to be correctly rolled back and stopped in all threads,
|
||||||
to ensure a consistent slave replication state.
|
to ensure a consistent slave replication state.
|
||||||
|
|
||||||
- We need some knob on the master to allow the user to deliberately delay
|
|
||||||
commits waiting for more transactions to join group commit, to increase
|
|
||||||
potential for parallel execution on the slave.
|
|
||||||
|
|
||||||
- Handle the case of a partial event group. This occurs when the master
|
- Handle the case of a partial event group. This occurs when the master
|
||||||
crashes in the middle of writing the event group to the binlog. The
|
crashes in the middle of writing the event group to the binlog. The
|
||||||
slave rolls back the transaction; parallel execution needs to be able
|
slave rolls back the transaction; parallel execution needs to be able
|
||||||
to deal with this wrt. commit_orderer and such.
|
to deal with this wrt. commit_orderer and such.
|
||||||
|
|
||||||
|
- Relay_log_info::is_in_group(). This needs to be handled correctly in all
|
||||||
|
callers. I think it needs to be split into two, one version in
|
||||||
|
Relay_log_info to be used from next_event() in slave.cc, one to be used in
|
||||||
|
per-transaction stuff.
|
||||||
|
|
||||||
- We should fail if we connect to the master with opt_slave_parallel_threads
|
- We should fail if we connect to the master with opt_slave_parallel_threads
|
||||||
greater than zero and master does not support GTID. Just to avoid a bunch
|
greater than zero and master does not support GTID. Just to avoid a bunch
|
||||||
of potential problems, we won't be able to do any parallel replication
|
of potential problems, we won't be able to do any parallel replication
|
||||||
@ -58,12 +59,12 @@ struct rpl_parallel_thread_pool global_rpl_thread_pool;
|
|||||||
|
|
||||||
static void
|
static void
|
||||||
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
|
rpt_handle_event(rpl_parallel_thread::queued_event *qev,
|
||||||
THD *thd,
|
|
||||||
struct rpl_parallel_thread *rpt)
|
struct rpl_parallel_thread *rpt)
|
||||||
{
|
{
|
||||||
int err;
|
int err;
|
||||||
struct rpl_group_info *rgi= qev->rgi;
|
struct rpl_group_info *rgi= qev->rgi;
|
||||||
Relay_log_info *rli= rgi->rli;
|
Relay_log_info *rli= rgi->rli;
|
||||||
|
THD *thd= rgi->thd;
|
||||||
|
|
||||||
thd->rli_slave= rli;
|
thd->rli_slave= rli;
|
||||||
thd->rpl_filter = rli->mi->rpl_filter;
|
thd->rpl_filter = rli->mi->rpl_filter;
|
||||||
@ -143,6 +144,7 @@ handle_rpl_parallel_thread(void *arg)
|
|||||||
rpl_group_info *rgi= events->rgi;
|
rpl_group_info *rgi= events->rgi;
|
||||||
rpl_parallel_entry *entry= rgi->parallel_entry;
|
rpl_parallel_entry *entry= rgi->parallel_entry;
|
||||||
uint64 wait_for_sub_id;
|
uint64 wait_for_sub_id;
|
||||||
|
uint64 wait_start_sub_id;
|
||||||
bool end_of_group;
|
bool end_of_group;
|
||||||
|
|
||||||
if (event_type == GTID_EVENT)
|
if (event_type == GTID_EVENT)
|
||||||
@ -155,14 +157,28 @@ handle_rpl_parallel_thread(void *arg)
|
|||||||
/* Save this, as it gets cleared once event group commits. */
|
/* Save this, as it gets cleared once event group commits. */
|
||||||
event_gtid_sub_id= rgi->gtid_sub_id;
|
event_gtid_sub_id= rgi->gtid_sub_id;
|
||||||
|
|
||||||
|
rgi->thd= thd;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Register ourself to wait for the previous commit, if we need to do
|
Register ourself to wait for the previous commit, if we need to do
|
||||||
such registration _and_ that previous commit has not already
|
such registration _and_ that previous commit has not already
|
||||||
occured.
|
occured.
|
||||||
|
|
||||||
|
Also do not start parallel execution of this event group until all
|
||||||
|
prior groups have committed that are not safe to run in parallel with.
|
||||||
*/
|
*/
|
||||||
if ((wait_for_sub_id= rgi->wait_commit_sub_id))
|
wait_for_sub_id= rgi->wait_commit_sub_id;
|
||||||
|
wait_start_sub_id= rgi->wait_start_sub_id;
|
||||||
|
if (wait_for_sub_id || wait_start_sub_id)
|
||||||
{
|
{
|
||||||
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
mysql_mutex_lock(&entry->LOCK_parallel_entry);
|
||||||
|
if (wait_start_sub_id)
|
||||||
|
{
|
||||||
|
while (wait_start_sub_id > entry->last_committed_sub_id)
|
||||||
|
mysql_cond_wait(&entry->COND_parallel_entry,
|
||||||
|
&entry->LOCK_parallel_entry);
|
||||||
|
}
|
||||||
|
rgi->wait_start_sub_id= 0; /* No need to check again. */
|
||||||
if (wait_for_sub_id > entry->last_committed_sub_id)
|
if (wait_for_sub_id > entry->last_committed_sub_id)
|
||||||
{
|
{
|
||||||
wait_for_commit *waitee=
|
wait_for_commit *waitee=
|
||||||
@ -176,7 +192,7 @@ handle_rpl_parallel_thread(void *arg)
|
|||||||
thd->wait_for_commit_ptr= &rgi->commit_orderer;
|
thd->wait_for_commit_ptr= &rgi->commit_orderer;
|
||||||
}
|
}
|
||||||
|
|
||||||
rpt_handle_event(events, thd, rpt);
|
rpt_handle_event(events, rpt);
|
||||||
|
|
||||||
end_of_group=
|
end_of_group=
|
||||||
in_event_group &&
|
in_event_group &&
|
||||||
@ -376,6 +392,7 @@ err:
|
|||||||
while (new_free_list->running)
|
while (new_free_list->running)
|
||||||
mysql_cond_wait(&new_free_list->COND_rpl_thread,
|
mysql_cond_wait(&new_free_list->COND_rpl_thread,
|
||||||
&new_free_list->LOCK_rpl_thread);
|
&new_free_list->LOCK_rpl_thread);
|
||||||
|
mysql_mutex_unlock(&new_free_list->LOCK_rpl_thread);
|
||||||
my_free(new_free_list);
|
my_free(new_free_list);
|
||||||
new_free_list= next;
|
new_free_list= next;
|
||||||
}
|
}
|
||||||
@ -503,8 +520,7 @@ rpl_parallel::wait_for_done()
|
|||||||
|
|
||||||
|
|
||||||
bool
|
bool
|
||||||
rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
|
rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev)
|
||||||
THD *parent_thd)
|
|
||||||
{
|
{
|
||||||
rpl_parallel_entry *e;
|
rpl_parallel_entry *e;
|
||||||
rpl_parallel_thread *cur_thread;
|
rpl_parallel_thread *cur_thread;
|
||||||
@ -530,13 +546,34 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
|
|||||||
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
|
Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
|
||||||
|
|
||||||
if (!(e= find(gtid_ev->domain_id)) ||
|
if (!(e= find(gtid_ev->domain_id)) ||
|
||||||
!(e->current_group_info= rgi= new rpl_group_info(rli)) ||
|
!(rgi= new rpl_group_info(rli)) ||
|
||||||
event_group_new_gtid(rgi, gtid_ev))
|
event_group_new_gtid(rgi, gtid_ev))
|
||||||
{
|
{
|
||||||
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
my_error(ER_OUT_OF_RESOURCES, MYF(MY_WME));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
|
||||||
|
e->last_commit_id == gtid_ev->commit_id)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
We are already executing something else in this domain. But the two
|
||||||
|
event groups were committed together in the same group commit on the
|
||||||
|
master, so we can still do them in parallel here on the slave.
|
||||||
|
|
||||||
|
However, the commit of this event must wait for the commit of the prior
|
||||||
|
event, to preserve binlog commit order and visibility across all
|
||||||
|
servers in the replication hierarchy.
|
||||||
|
*/
|
||||||
|
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
|
||||||
|
rgi->wait_commit_sub_id= e->current_sub_id;
|
||||||
|
rgi->wait_commit_group_info= e->current_group_info;
|
||||||
|
rgi->wait_start_sub_id= e->prev_groupcommit_sub_id;
|
||||||
|
e->rpl_thread= cur_thread= rpt;
|
||||||
|
/* get_thread() returns with the LOCK_rpl_thread locked. */
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
/* Check if we already have a worker thread for this entry. */
|
/* Check if we already have a worker thread for this entry. */
|
||||||
cur_thread= e->rpl_thread;
|
cur_thread= e->rpl_thread;
|
||||||
if (cur_thread)
|
if (cur_thread)
|
||||||
@ -557,7 +594,24 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
|
|||||||
thread to do this event group in parallel with anything else that might
|
thread to do this event group in parallel with anything else that might
|
||||||
be running in other domains.
|
be running in other domains.
|
||||||
*/
|
*/
|
||||||
if (gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID)
|
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
|
||||||
|
/* get_thread() returns with the LOCK_rpl_thread locked. */
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
We are still executing the previous event group for this replication
|
||||||
|
domain, and we have to wait for that to finish before we can start on
|
||||||
|
the next one. So just re-use the thread.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
rgi->wait_commit_sub_id= 0;
|
||||||
|
rgi->wait_start_sub_id= 0;
|
||||||
|
e->prev_groupcommit_sub_id= e->current_sub_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (gtid_ev->flags2 & Gtid_log_event::FL_GROUP_COMMIT_ID)
|
||||||
{
|
{
|
||||||
e->last_server_id= gtid_ev->server_id;
|
e->last_server_id= gtid_ev->server_id;
|
||||||
e->last_seq_no= gtid_ev->seq_no;
|
e->last_seq_no= gtid_ev->seq_no;
|
||||||
@ -569,38 +623,8 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
|
|||||||
e->last_seq_no= 0;
|
e->last_seq_no= 0;
|
||||||
e->last_commit_id= 0;
|
e->last_commit_id= 0;
|
||||||
}
|
}
|
||||||
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
|
|
||||||
rgi->wait_commit_sub_id= 0;
|
|
||||||
/* get_thread() returns with the LOCK_rpl_thread locked. */
|
|
||||||
}
|
|
||||||
else if ((gtid_ev->flags & Gtid_log_event::FL_GROUP_COMMIT_ID) &&
|
|
||||||
e->last_commit_id == gtid_ev->commit_id)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
We are already executing something else in this domain. But the two
|
|
||||||
event groups were committed together in the same group commit on the
|
|
||||||
master, so we can still do them in parallel here on the slave.
|
|
||||||
|
|
||||||
However, the commit of this event must wait for the commit of the prior
|
|
||||||
event, to preserve binlog commit order and visibility across all
|
|
||||||
servers in the replication hierarchy.
|
|
||||||
*/
|
|
||||||
rpl_parallel_thread *rpt= global_rpl_thread_pool.get_thread(e);
|
|
||||||
rgi->wait_commit_sub_id= e->current_sub_id;
|
|
||||||
rgi->wait_commit_group_info= e->current_group_info;
|
|
||||||
e->rpl_thread= cur_thread= rpt;
|
|
||||||
/* get_thread() returns with the LOCK_rpl_thread locked. */
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
We are still executing the previous event group for this replication
|
|
||||||
domain, and we have to wait for that to finish before we can start on
|
|
||||||
the next one. So just re-use the thread.
|
|
||||||
*/
|
|
||||||
rgi->wait_commit_sub_id= 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
e->current_group_info= rgi;
|
||||||
e->current_sub_id= rgi->gtid_sub_id;
|
e->current_sub_id= rgi->gtid_sub_id;
|
||||||
current= rgi->parallel_entry= e;
|
current= rgi->parallel_entry= e;
|
||||||
}
|
}
|
||||||
@ -612,7 +636,7 @@ rpl_parallel::do_event(struct rpl_group_info *serial_rgi, Log_event *ev,
|
|||||||
but they might be from an old master).
|
but they might be from an old master).
|
||||||
*/
|
*/
|
||||||
qev->rgi= serial_rgi;
|
qev->rgi= serial_rgi;
|
||||||
rpt_handle_event(qev, parent_thd, NULL);
|
rpt_handle_event(qev, NULL);
|
||||||
delete_or_keep_event_post_apply(rli, typ, qev->ev);
|
delete_or_keep_event_post_apply(rli, typ, qev->ev);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
@ -60,6 +60,15 @@ struct rpl_parallel_entry {
|
|||||||
mysql_cond_t COND_parallel_entry;
|
mysql_cond_t COND_parallel_entry;
|
||||||
uint64 current_sub_id;
|
uint64 current_sub_id;
|
||||||
struct rpl_group_info *current_group_info;
|
struct rpl_group_info *current_group_info;
|
||||||
|
/*
|
||||||
|
The sub_id of the last event group in the previous batch of group-committed
|
||||||
|
transactions.
|
||||||
|
|
||||||
|
When we spawn parallel worker threads for the next group-committed batch,
|
||||||
|
they first need to wait for this sub_id to be committed before it is safe
|
||||||
|
to start executing them.
|
||||||
|
*/
|
||||||
|
uint64 prev_groupcommit_sub_id;
|
||||||
};
|
};
|
||||||
struct rpl_parallel {
|
struct rpl_parallel {
|
||||||
HASH domain_hash;
|
HASH domain_hash;
|
||||||
@ -69,7 +78,7 @@ struct rpl_parallel {
|
|||||||
~rpl_parallel();
|
~rpl_parallel();
|
||||||
rpl_parallel_entry *find(uint32 domain_id);
|
rpl_parallel_entry *find(uint32 domain_id);
|
||||||
void wait_for_done();
|
void wait_for_done();
|
||||||
bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev, THD *thd);
|
bool do_event(struct rpl_group_info *serial_rgi, Log_event *ev);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -1226,7 +1226,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
|
|||||||
middle of the "transaction". START SLAVE will resume at BEGIN
|
middle of the "transaction". START SLAVE will resume at BEGIN
|
||||||
while the MyISAM table has already been updated.
|
while the MyISAM table has already been updated.
|
||||||
*/
|
*/
|
||||||
if ((sql_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
|
if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
|
||||||
inc_event_relay_log_pos();
|
inc_event_relay_log_pos();
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1267,7 +1267,7 @@ void Relay_log_info::cleanup_context(THD *thd, bool error)
|
|||||||
{
|
{
|
||||||
DBUG_ENTER("Relay_log_info::cleanup_context");
|
DBUG_ENTER("Relay_log_info::cleanup_context");
|
||||||
|
|
||||||
DBUG_ASSERT(sql_thd == thd);
|
DBUG_ASSERT(opt_slave_parallel_threads > 0 || sql_thd == thd);
|
||||||
/*
|
/*
|
||||||
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
|
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
|
||||||
may have opened tables, which we cannot be sure have been closed (because
|
may have opened tables, which we cannot be sure have been closed (because
|
||||||
@ -1534,8 +1534,8 @@ end:
|
|||||||
|
|
||||||
|
|
||||||
rpl_group_info::rpl_group_info(Relay_log_info *rli_)
|
rpl_group_info::rpl_group_info(Relay_log_info *rli_)
|
||||||
: rli(rli_), gtid_sub_id(0), wait_commit_sub_id(0), wait_commit_group_info(0),
|
: rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
|
||||||
parallel_entry(0)
|
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0)
|
||||||
{
|
{
|
||||||
bzero(¤t_gtid, sizeof(current_gtid));
|
bzero(¤t_gtid, sizeof(current_gtid));
|
||||||
}
|
}
|
||||||
|
@ -604,6 +604,7 @@ private:
|
|||||||
struct rpl_group_info
|
struct rpl_group_info
|
||||||
{
|
{
|
||||||
Relay_log_info *rli;
|
Relay_log_info *rli;
|
||||||
|
THD *thd;
|
||||||
/*
|
/*
|
||||||
Current GTID being processed.
|
Current GTID being processed.
|
||||||
The sub_id gives the binlog order within one domain_id. A zero sub_id
|
The sub_id gives the binlog order within one domain_id. A zero sub_id
|
||||||
@ -630,10 +631,19 @@ struct rpl_group_info
|
|||||||
*/
|
*/
|
||||||
uint64 wait_commit_sub_id;
|
uint64 wait_commit_sub_id;
|
||||||
struct rpl_group_info *wait_commit_group_info;
|
struct rpl_group_info *wait_commit_group_info;
|
||||||
|
/*
|
||||||
|
If non-zero, the event group must wait for this sub_id to be committed
|
||||||
|
before the execution of the event group is allowed to start.
|
||||||
|
|
||||||
|
(When we execute in parallel the transactions that group committed
|
||||||
|
together on the master, we still need to wait for any prior transactions
|
||||||
|
to have commtted).
|
||||||
|
*/
|
||||||
|
uint64 wait_start_sub_id;
|
||||||
|
|
||||||
struct rpl_parallel_entry *parallel_entry;
|
struct rpl_parallel_entry *parallel_entry;
|
||||||
|
|
||||||
rpl_group_info(Relay_log_info *rli);
|
rpl_group_info(Relay_log_info *rli_);
|
||||||
~rpl_group_info() { };
|
~rpl_group_info() { };
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -3246,7 +3246,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (opt_slave_parallel_threads > 0)
|
if (opt_slave_parallel_threads > 0)
|
||||||
DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev, thd));
|
DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
For GTID, allocate a new sub_id for the given domain_id.
|
For GTID, allocate a new sub_id for the given domain_id.
|
||||||
@ -3995,6 +3995,7 @@ pthread_handler_t handle_slave_sql(void *arg)
|
|||||||
thd = new THD; // note that contructor of THD uses DBUG_ !
|
thd = new THD; // note that contructor of THD uses DBUG_ !
|
||||||
thd->thread_stack = (char*)&thd; // remember where our stack is
|
thd->thread_stack = (char*)&thd; // remember where our stack is
|
||||||
thd->rpl_filter = mi->rpl_filter;
|
thd->rpl_filter = mi->rpl_filter;
|
||||||
|
serial_rgi.thd= thd;
|
||||||
|
|
||||||
DBUG_ASSERT(rli->inited);
|
DBUG_ASSERT(rli->inited);
|
||||||
DBUG_ASSERT(rli->mi == mi);
|
DBUG_ASSERT(rli->mi == mi);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user