From 612c653d6afdf91c0ba4d3ce93c35cfef1ac8f33 Mon Sep 17 00:00:00 2001 From: Teemu Ollakka Date: Thu, 21 Nov 2024 14:31:04 +0200 Subject: [PATCH] MDEV-20065 parallel replication for galera slave When replicating transactions from parallel slave replication processing, Galera must respect the commit order of the parallel slave replication. In the current implementation this is done by calling `wait_for_prior_commit()` before the write set is replicated and certified in before-prepare processing. This however establishes a critical section which is held over whole Galera replication step, and the commit rate will be limited by Galera replication latency. In order to allow concurrency in Galera replication step, the critical section must be released at earliest point where Galera can guarantee sequential consistency for replicated write sets. This change passes a callback to release the critical section by calling `wakeup_subsequent_commits()` to Galera library, which will call the callback once the correct replication order can be established. This functionality will be available from Galera 26.4.22 onwards. Note that call to `wakeup_subsequent_commits()` at this stage is safe from group commit point of view as Galera uses separate `wait_for_commit` context to control commit ordering. Signed-off-by: Julius Goryavsky --- sql/wsrep_mysqld.cc | 10 ++++++++++ sql/wsrep_trans_observer.h | 10 +++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index b7674568939..d14c069dc1d 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -3997,6 +3997,16 @@ bool THD::wsrep_parallel_slave_wait_for_prior_commit() return false; } +void wsrep_parallel_slave_wakeup_subsequent_commits(void *thd_ptr) +{ + THD *thd = (THD*)thd_ptr; + if (thd->rgi_slave && thd->rgi_slave->is_parallel_exec && + thd->wait_for_commit_ptr) + { + thd->wait_for_commit_ptr->wakeup_subsequent_commits(0); + } +} + /***** callbacks for wsrep service ************/ my_bool get_wsrep_recovery() diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index 99c7a30f88a..c09c65f4228 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -27,6 +27,7 @@ class THD; void wsrep_commit_empty(THD* thd, bool all); +void wsrep_parallel_slave_wakeup_subsequent_commits(void *); /* Return true if THD has active wsrep transaction. @@ -265,8 +266,9 @@ static inline int wsrep_before_prepare(THD* thd, bool all) { DBUG_RETURN(ret); } - - if ((ret= thd->wsrep_cs().before_prepare()) == 0) + wsrep::provider::seq_cb_t seq_cb{ + thd, wsrep_parallel_slave_wakeup_subsequent_commits}; + if ((ret= thd->wsrep_cs().before_prepare(&seq_cb)) == 0) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); /* Here we init xid with UUID and wsrep seqno. GTID is @@ -319,7 +321,9 @@ static inline int wsrep_before_commit(THD* thd, bool all) int ret= 0; DBUG_ASSERT(wsrep_run_commit_hook(thd, all)); - if ((ret= thd->wsrep_cs().before_commit()) == 0) + wsrep::provider::seq_cb_t seq_cb{ + thd, wsrep_parallel_slave_wakeup_subsequent_commits}; + if ((ret= thd->wsrep_cs().before_commit(&seq_cb)) == 0) { DBUG_ASSERT(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); if (!thd->variables.gtid_seq_no &&