MDEV-20065 parallel replication for galera slave

When replicating transactions from parallel slave replication
processing, Galera must respect the commit order of the parallel
slave replication. In the current implementation this is done by
calling `wait_for_prior_commit()` before the write set is
replicated and certified in before-prepare processing. This
however establishes a critical section which is held over
whole Galera replication step, and the commit rate will be
limited by Galera replication latency.

In order to allow concurrency in Galera replication step, the
critical section must be released at earliest point where Galera
can guarantee sequential consistency for replicated write sets.
This change passes a callback to release the critical section
by calling `wakeup_subsequent_commits()` to Galera library, which will
call the callback once the correct replication order can be established.
This functionality will be available from Galera 26.4.22 onwards.

Note that call to `wakeup_subsequent_commits()` at this stage is
safe from group commit point of view as Galera uses separate
`wait_for_commit` context to control commit ordering.

Signed-off-by: Julius Goryavsky <julius.goryavsky@mariadb.com>
This commit is contained in:
Teemu Ollakka 2024-11-21 14:31:04 +02:00 committed by Sergei Golubchik
parent 2d77e21758
commit 612c653d6a
2 changed files with 17 additions and 3 deletions

View File

@ -3997,6 +3997,16 @@ bool THD::wsrep_parallel_slave_wait_for_prior_commit()
return false;
}
void wsrep_parallel_slave_wakeup_subsequent_commits(void *thd_ptr)
{
THD *thd = (THD*)thd_ptr;
if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec &&
thd->wait_for_commit_ptr)
{
thd->wait_for_commit_ptr->wakeup_subsequent_commits(0);
}
}
/***** callbacks for wsrep service ************/
my_bool get_wsrep_recovery()

View File

@ -27,6 +27,7 @@
class THD;
void wsrep_commit_empty(THD* thd, bool all);
void wsrep_parallel_slave_wakeup_subsequent_commits(void *);
/*
Return true if THD has active wsrep transaction.
@ -265,8 +266,9 @@ static inline int wsrep_before_prepare(THD* thd, bool all)
{
DBUG_RETURN(ret);
}
if ((ret= thd->wsrep_cs().before_prepare()) == 0)
wsrep::provider::seq_cb_t seq_cb{
thd, wsrep_parallel_slave_wakeup_subsequent_commits};
if ((ret= thd->wsrep_cs().before_prepare(&seq_cb)) == 0)
{
DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
/* Here we init xid with UUID and wsrep seqno. GTID is
@ -319,7 +321,9 @@ static inline int wsrep_before_commit(THD* thd, bool all)
int ret= 0;
DBUG_ASSERT(wsrep_run_commit_hook(thd, all));
if ((ret= thd->wsrep_cs().before_commit()) == 0)
wsrep::provider::seq_cb_t seq_cb{
thd, wsrep_parallel_slave_wakeup_subsequent_commits};
if ((ret= thd->wsrep_cs().before_commit(&seq_cb)) == 0)
{
DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
if (!thd->variables.gtid_seq_no &&