Merge MDEV-5657 (parallel replication) to 10.0
This commit is contained in:
commit
20959fa09c
@ -867,6 +867,14 @@ The following options may be given as the first argument:
|
||||
operations that are idempotent. This means that CREATE
|
||||
TABLE is treated CREATE TABLE OR REPLACE and DROP TABLE
|
||||
is threated as DROP TABLE IF EXISTS.
|
||||
--slave-domain-parallel-threads=#
|
||||
Maximum number of parallel threads to use on slave for
|
||||
events in a single replication domain. When using
|
||||
multiple domains, this can be used to limit a single
|
||||
domain from grabbing all threads and thus stalling other
|
||||
domains. The default of 0 means to allow a domain to grab
|
||||
as many threads as it wants, up to the value of
|
||||
slave_parallel_threads.
|
||||
--slave-exec-mode=name
|
||||
Modes for how replication events should be executed.
|
||||
Legal values are STRICT (default) and IDEMPOTENT. In
|
||||
@ -1275,6 +1283,7 @@ skip-show-database FALSE
|
||||
skip-slave-start FALSE
|
||||
slave-compressed-protocol FALSE
|
||||
slave-ddl-exec-mode IDEMPOTENT
|
||||
slave-domain-parallel-threads 0
|
||||
slave-exec-mode STRICT
|
||||
slave-max-allowed-packet 1073741824
|
||||
slave-net-timeout 3600
|
||||
|
@ -37,15 +37,15 @@ where name like 'Wait/Synch/Cond/sql/%'
|
||||
order by name limit 10;
|
||||
NAME ENABLED TIMED
|
||||
wait/synch/cond/sql/COND_flush_thread_cache YES YES
|
||||
wait/synch/cond/sql/COND_group_commit_orderer YES YES
|
||||
wait/synch/cond/sql/COND_manager YES YES
|
||||
wait/synch/cond/sql/COND_parallel_entry YES YES
|
||||
wait/synch/cond/sql/COND_prepare_ordered YES YES
|
||||
wait/synch/cond/sql/COND_queue_state YES YES
|
||||
wait/synch/cond/sql/COND_rpl_thread YES YES
|
||||
wait/synch/cond/sql/COND_rpl_thread_pool YES YES
|
||||
wait/synch/cond/sql/COND_rpl_thread_queue YES YES
|
||||
wait/synch/cond/sql/COND_server_started YES YES
|
||||
wait/synch/cond/sql/COND_thread_cache YES YES
|
||||
wait/synch/cond/sql/COND_thread_count YES YES
|
||||
select * from performance_schema.setup_instruments
|
||||
where name='Wait';
|
||||
select * from performance_schema.setup_instruments
|
||||
|
@ -115,6 +115,7 @@ SET GLOBAL slave_parallel_threads=10;
|
||||
SET debug_sync='RESET';
|
||||
include/start_slave.inc
|
||||
*** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
|
||||
SET debug_sync='RESET';
|
||||
FLUSH LOGS;
|
||||
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
|
||||
INSERT INTO t3 VALUES (1,1), (3,3), (5,5), (7,7);
|
||||
@ -141,6 +142,7 @@ INSERT INTO t3 VALUES (6, foo(16,
|
||||
''));
|
||||
SET debug_sync='now WAIT_FOR master_queued3';
|
||||
SET debug_sync='now SIGNAL master_cont1';
|
||||
SET debug_sync='RESET';
|
||||
SELECT * FROM t3 ORDER BY a;
|
||||
a b
|
||||
1 1
|
||||
@ -213,6 +215,9 @@ slave-bin.000003 # Query # # use `test`; INSERT INTO t3 VALUES (6, foo(16,
|
||||
slave-bin.000003 # Xid # # COMMIT /* XID */
|
||||
*** Test STOP SLAVE in parallel mode ***
|
||||
include/stop_slave.inc
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET binlog_direct_non_transactional_updates=0;
|
||||
SET sql_log_bin=0;
|
||||
CALL mtr.add_suppression("Statement is unsafe because it accesses a non-transactional table after accessing a transactional table within the same transaction");
|
||||
@ -227,10 +232,15 @@ INSERT INTO t3 VALUES(21, 21);
|
||||
INSERT INTO t3 VALUES(22, 22);
|
||||
SET binlog_format=@old_format;
|
||||
BEGIN;
|
||||
INSERT INTO t2 VALUES (21);
|
||||
INSERT INTO t2 VALUES (21);
|
||||
START SLAVE;
|
||||
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
|
||||
STOP SLAVE;
|
||||
SET debug_sync='now WAIT_FOR wait_for_done_waiting';
|
||||
ROLLBACK;
|
||||
SET GLOBAL debug_dbug=@old_dbug;
|
||||
SET debug_sync='RESET';
|
||||
include/wait_for_slave_to_stop.inc
|
||||
SELECT * FROM t1 WHERE a >= 20 ORDER BY a;
|
||||
a
|
||||
@ -292,6 +302,7 @@ a b
|
||||
32 32
|
||||
33 33
|
||||
34 34
|
||||
SET debug_sync='RESET';
|
||||
SET sql_log_bin=0;
|
||||
CALL mtr.add_suppression("Query execution was interrupted");
|
||||
CALL mtr.add_suppression("Commit failed due to failure of an earlier commit on which this one depends");
|
||||
@ -308,6 +319,7 @@ STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
|
||||
a b
|
||||
31 31
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@ -379,6 +391,7 @@ a b
|
||||
42 42
|
||||
43 43
|
||||
44 44
|
||||
SET debug_sync='RESET';
|
||||
SET debug_sync='now WAIT_FOR t2_query';
|
||||
SET debug_sync='now SIGNAL t2_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_ready';
|
||||
@ -386,9 +399,7 @@ KILL THD_ID;
|
||||
SET debug_sync='now WAIT_FOR t2_killed';
|
||||
SET debug_sync='now SIGNAL t1_cont';
|
||||
include/wait_for_slave_sql_error.inc [errno=1317,1964]
|
||||
SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
|
||||
a b
|
||||
41 41
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@ -463,6 +474,7 @@ a b
|
||||
52 52
|
||||
53 53
|
||||
54 54
|
||||
SET debug_sync='RESET';
|
||||
SET debug_sync='now WAIT_FOR t2_query';
|
||||
SET debug_sync='now SIGNAL t2_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_ready';
|
||||
@ -473,6 +485,7 @@ include/wait_for_slave_sql_error.inc [errno=1317,1964]
|
||||
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
|
||||
a b
|
||||
51 51
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@ -514,14 +527,18 @@ include/start_slave.inc
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=3;
|
||||
SET GLOBAL slave_parallel_threads=4;
|
||||
include/start_slave.inc
|
||||
*** 4. Test killing thread that is waiting to start transaction until previous transaction commits ***
|
||||
SET binlog_format=statement;
|
||||
SET gtid_domain_id=2;
|
||||
BEGIN;
|
||||
INSERT INTO t3 VALUES (70, foo(70,
|
||||
'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
|
||||
INSERT INTO t3 VALUES (60, foo(60,
|
||||
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
|
||||
COMMIT;
|
||||
SET gtid_domain_id=0;
|
||||
SET debug_sync='now WAIT_FOR d2_query';
|
||||
SET gtid_domain_id=1;
|
||||
@ -540,15 +557,27 @@ INSERT INTO t3 VALUES (63, foo(63,
|
||||
'ha_write_row_end SIGNAL d0_query WAIT_FOR d0_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d0_done WAIT_FOR d0_cont'));
|
||||
SET debug_sync='now WAIT_FOR d0_query';
|
||||
SET gtid_domain_id=3;
|
||||
BEGIN;
|
||||
INSERT INTO t3 VALUES (68, foo(68,
|
||||
'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
|
||||
INSERT INTO t3 VALUES (69, foo(69,
|
||||
'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
|
||||
COMMIT;
|
||||
SET gtid_domain_id=0;
|
||||
SET debug_sync='now WAIT_FOR d3_query';
|
||||
SET debug_sync='now SIGNAL d2_cont2';
|
||||
SET debug_sync='now WAIT_FOR d2_done';
|
||||
SET debug_sync='now SIGNAL d1_cont2';
|
||||
SET debug_sync='now WAIT_FOR d1_done';
|
||||
SET debug_sync='now SIGNAL d0_cont2';
|
||||
SET debug_sync='now WAIT_FOR d0_done';
|
||||
SET debug_sync='now SIGNAL d3_cont2';
|
||||
SET debug_sync='now WAIT_FOR d3_done';
|
||||
SET binlog_format=statement;
|
||||
INSERT INTO t3 VALUES (64, foo(64,
|
||||
'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
|
||||
'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
|
||||
INSERT INTO t3 VALUES (65, foo(65, '', ''));
|
||||
SET debug_sync='now WAIT_FOR master_queued2';
|
||||
@ -569,23 +598,34 @@ a b
|
||||
65 65
|
||||
66 66
|
||||
67 67
|
||||
68 68
|
||||
69 69
|
||||
70 70
|
||||
SET debug_sync='RESET';
|
||||
SET debug_sync='now SIGNAL d0_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_waiting';
|
||||
SET debug_sync='now SIGNAL d3_cont';
|
||||
SET debug_sync='now WAIT_FOR t2_waiting';
|
||||
SET debug_sync='now SIGNAL d1_cont';
|
||||
SET debug_sync='now WAIT_FOR t3_waiting';
|
||||
SET debug_sync='now SIGNAL d2_cont';
|
||||
SET debug_sync='now WAIT_FOR t4_waiting';
|
||||
KILL THD_ID;
|
||||
SET debug_sync='now WAIT_FOR t3_killed';
|
||||
SET debug_sync='now SIGNAL t1_cont';
|
||||
include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
|
||||
STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
|
||||
a b
|
||||
60 60
|
||||
61 61
|
||||
62 62
|
||||
63 63
|
||||
64 64
|
||||
68 68
|
||||
69 69
|
||||
70 70
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@ -597,11 +637,11 @@ RETURN x;
|
||||
END
|
||||
||
|
||||
SET sql_log_bin=1;
|
||||
INSERT INTO t3 VALUES (69,0);
|
||||
UPDATE t3 SET b=b+1 WHERE a=60;
|
||||
include/start_slave.inc
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
a b
|
||||
60 60
|
||||
60 61
|
||||
61 61
|
||||
62 62
|
||||
63 63
|
||||
@ -609,7 +649,9 @@ a b
|
||||
65 65
|
||||
66 66
|
||||
67 67
|
||||
69 0
|
||||
68 68
|
||||
69 69
|
||||
70 70
|
||||
SET sql_log_bin=0;
|
||||
DROP FUNCTION foo;
|
||||
CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
@ -634,37 +676,31 @@ include/start_slave.inc
|
||||
SET @old_max_queued= @@GLOBAL.slave_parallel_max_queued;
|
||||
SET GLOBAL slave_parallel_max_queued=9000;
|
||||
SET binlog_format=statement;
|
||||
INSERT INTO t3 VALUES (70, foo(0,
|
||||
INSERT INTO t3 VALUES (80, foo(0,
|
||||
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
|
||||
SET debug_sync='now WAIT_FOR query_waiting';
|
||||
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
|
||||
INSERT INTO t3 VALUES (72, 0);
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
|
||||
a b
|
||||
70 0
|
||||
71 10000
|
||||
72 0
|
||||
80 0
|
||||
81 10000
|
||||
SET debug_sync='now WAIT_FOR wait_queue_ready';
|
||||
KILL THD_ID;
|
||||
SET debug_sync='now WAIT_FOR wait_queue_killed';
|
||||
SET debug_sync='now SIGNAL query_cont';
|
||||
include/wait_for_slave_sql_error.inc [errno=1317,1927,1964]
|
||||
STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
a b
|
||||
70 0
|
||||
71 10000
|
||||
SET GLOBAL debug_dbug=@old_dbug;
|
||||
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
|
||||
INSERT INTO t3 VALUES (73,0);
|
||||
INSERT INTO t3 VALUES (82,0);
|
||||
SET debug_sync='RESET';
|
||||
include/start_slave.inc
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
|
||||
a b
|
||||
70 0
|
||||
71 10000
|
||||
72 0
|
||||
73 0
|
||||
80 0
|
||||
81 10000
|
||||
82 0
|
||||
include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
|
@ -163,6 +163,7 @@ SET debug_sync='RESET';
|
||||
|
||||
--echo *** Test that group-committed transactions on the master can replicate in parallel on the slave. ***
|
||||
--connection server_1
|
||||
SET debug_sync='RESET';
|
||||
FLUSH LOGS;
|
||||
--source include/wait_for_binlog_checkpoint.inc
|
||||
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
|
||||
@ -230,6 +231,7 @@ REAP;
|
||||
REAP;
|
||||
--connection con_temp5
|
||||
REAP;
|
||||
SET debug_sync='RESET';
|
||||
|
||||
--connection server_1
|
||||
SELECT * FROM t3 ORDER BY a;
|
||||
@ -270,6 +272,10 @@ SELECT * FROM t3 ORDER BY a;
|
||||
--echo *** Test STOP SLAVE in parallel mode ***
|
||||
--connection server_2
|
||||
--source include/stop_slave.inc
|
||||
# Respawn all worker threads to clear any left-over debug_sync or other stuff.
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
|
||||
--connection server_1
|
||||
# Set up a couple of transactions. The first will be blocked halfway
|
||||
@ -288,7 +294,7 @@ BEGIN;
|
||||
INSERT INTO t2 VALUES (20);
|
||||
--disable_warnings
|
||||
INSERT INTO t1 VALUES (20);
|
||||
--disable_warnings
|
||||
--enable_warnings
|
||||
INSERT INTO t2 VALUES (21);
|
||||
INSERT INTO t3 VALUES (20, 20);
|
||||
COMMIT;
|
||||
@ -300,7 +306,7 @@ SET binlog_format=@old_format;
|
||||
# Start a connection that will block the replicated transaction halfway.
|
||||
--connection con_temp1
|
||||
BEGIN;
|
||||
INSERT INTO t2 VALUES (21);
|
||||
INSERT INTO t2 VALUES (21);
|
||||
|
||||
--connection server_2
|
||||
START SLAVE;
|
||||
@ -312,13 +318,20 @@ START SLAVE;
|
||||
--connection con_temp2
|
||||
# Initiate slave stop. It will have to wait for the current event group
|
||||
# to complete.
|
||||
# The dbug injection causes debug_sync to signal 'wait_for_done_waiting'
|
||||
# when the SQL driver thread is ready.
|
||||
SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_for_done_trigger";
|
||||
send STOP SLAVE;
|
||||
|
||||
--connection con_temp1
|
||||
SET debug_sync='now WAIT_FOR wait_for_done_waiting';
|
||||
ROLLBACK;
|
||||
|
||||
--connection con_temp2
|
||||
reap;
|
||||
SET GLOBAL debug_dbug=@old_dbug;
|
||||
SET debug_sync='RESET';
|
||||
|
||||
--connection server_2
|
||||
--source include/wait_for_slave_to_stop.inc
|
||||
@ -397,6 +410,7 @@ REAP;
|
||||
|
||||
--connection server_1
|
||||
SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
|
||||
SET debug_sync='RESET';
|
||||
|
||||
--connection server_2
|
||||
SET sql_log_bin=0;
|
||||
@ -431,6 +445,7 @@ SELECT * FROM t3 WHERE a >= 30 ORDER BY a;
|
||||
|
||||
# Now we have to disable the debug_sync statements, so they do not trigger
|
||||
# when the events are retried.
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@ -535,6 +550,7 @@ REAP;
|
||||
|
||||
--connection server_1
|
||||
SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
|
||||
SET debug_sync='RESET';
|
||||
|
||||
--connection server_2
|
||||
# Wait until T2 is inside executing its insert of 42, then find it in SHOW
|
||||
@ -559,10 +575,10 @@ SET debug_sync='now SIGNAL t1_cont';
|
||||
|
||||
--let $slave_sql_errno= 1317,1964
|
||||
--source include/wait_for_slave_sql_error.inc
|
||||
SELECT * FROM t3 WHERE a >= 40 ORDER BY a;
|
||||
|
||||
# Now we have to disable the debug_sync statements, so they do not trigger
|
||||
# when the events are retried.
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@ -673,6 +689,7 @@ REAP;
|
||||
|
||||
--connection server_1
|
||||
SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
|
||||
SET debug_sync='RESET';
|
||||
|
||||
--connection server_2
|
||||
# Wait until T2 is inside executing its insert of 52, then find it in SHOW
|
||||
@ -701,6 +718,7 @@ SELECT * FROM t3 WHERE a >= 50 ORDER BY a;
|
||||
|
||||
# Now we have to disable the debug_sync statements, so they do not trigger
|
||||
# when the events are retried.
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@ -752,7 +770,7 @@ CHANGE MASTER TO master_use_gtid=slave_pos;
|
||||
--source include/stop_slave.inc
|
||||
SET GLOBAL binlog_format=@old_format;
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=3;
|
||||
SET GLOBAL slave_parallel_threads=4;
|
||||
--source include/start_slave.inc
|
||||
|
||||
|
||||
@ -762,24 +780,29 @@ SET GLOBAL slave_parallel_threads=3;
|
||||
# can run in parallel with each other (same group commit and commit id),
|
||||
# but not in parallel with T1.
|
||||
#
|
||||
# We use three worker threads. T1 and T2 will be queued on the first, T3 on
|
||||
# the second, and T4 on the third. We will delay T1 commit, T3 will wait for
|
||||
# T1 to commit before it can start. We will kill T3 during this wait, and
|
||||
# We use four worker threads, each Ti will be queued on each their own
|
||||
# worker thread. We will delay T1 commit, T3 will wait for T1 to begin
|
||||
# commit before it can start. We will kill T3 during this wait, and
|
||||
# check that everything works correctly.
|
||||
#
|
||||
# It is rather tricky to get the correct thread id of the worker to kill.
|
||||
# We start by injecting three dummy transactions in a debug_sync-controlled
|
||||
# We start by injecting four dummy transactions in a debug_sync-controlled
|
||||
# manner to be able to get known thread ids for the workers in a pool with
|
||||
# just 3 worker threads. Then we let in each of the real test transactions
|
||||
# just 4 worker threads. Then we let in each of the real test transactions
|
||||
# T1-T4 one at a time in a way which allows us to know which transaction
|
||||
# ends up with which thread id.
|
||||
|
||||
--connection server_1
|
||||
SET binlog_format=statement;
|
||||
SET gtid_domain_id=2;
|
||||
BEGIN;
|
||||
# This debug_sync will linger on and be used to control T4 later.
|
||||
INSERT INTO t3 VALUES (70, foo(70,
|
||||
'rpl_parallel_start_waiting_for_prior SIGNAL t4_waiting', ''));
|
||||
INSERT INTO t3 VALUES (60, foo(60,
|
||||
'ha_write_row_end SIGNAL d2_query WAIT_FOR d2_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d2_done WAIT_FOR d2_cont'));
|
||||
COMMIT;
|
||||
SET gtid_domain_id=0;
|
||||
|
||||
--connection server_2
|
||||
@ -813,12 +836,30 @@ INSERT INTO t3 VALUES (63, foo(63,
|
||||
SET debug_sync='now WAIT_FOR d0_query';
|
||||
--let $d0_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(63%' AND INFO NOT LIKE '%LIKE%'`
|
||||
|
||||
--connection server_1
|
||||
SET gtid_domain_id=3;
|
||||
BEGIN;
|
||||
# These debug_sync's will linger on and be used to control T2 later.
|
||||
INSERT INTO t3 VALUES (68, foo(68,
|
||||
'rpl_parallel_start_waiting_for_prior SIGNAL t2_waiting', ''));
|
||||
INSERT INTO t3 VALUES (69, foo(69,
|
||||
'ha_write_row_end SIGNAL d3_query WAIT_FOR d3_cont2',
|
||||
'rpl_parallel_end_of_group SIGNAL d3_done WAIT_FOR d3_cont'));
|
||||
COMMIT;
|
||||
SET gtid_domain_id=0;
|
||||
|
||||
--connection server_2
|
||||
SET debug_sync='now WAIT_FOR d3_query';
|
||||
--let $d3_thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE INFO LIKE '%foo(69%' AND INFO NOT LIKE '%LIKE%'`
|
||||
|
||||
SET debug_sync='now SIGNAL d2_cont2';
|
||||
SET debug_sync='now WAIT_FOR d2_done';
|
||||
SET debug_sync='now SIGNAL d1_cont2';
|
||||
SET debug_sync='now WAIT_FOR d1_done';
|
||||
SET debug_sync='now SIGNAL d0_cont2';
|
||||
SET debug_sync='now WAIT_FOR d0_done';
|
||||
SET debug_sync='now SIGNAL d3_cont2';
|
||||
SET debug_sync='now WAIT_FOR d3_done';
|
||||
|
||||
# Now prepare the real transactions T1, T2, T3, T4 on the master.
|
||||
|
||||
@ -826,7 +867,7 @@ SET debug_sync='now WAIT_FOR d0_done';
|
||||
# Create transaction T1.
|
||||
SET binlog_format=statement;
|
||||
INSERT INTO t3 VALUES (64, foo(64,
|
||||
'commit_before_prepare_ordered SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
|
||||
'rpl_parallel_before_mark_start_commit SIGNAL t1_waiting WAIT_FOR t1_cont', ''));
|
||||
|
||||
# Create transaction T2, as a group commit leader on the master.
|
||||
SET debug_sync='commit_after_release_LOCK_prepare_ordered SIGNAL master_queued2 WAIT_FOR master_cont2';
|
||||
@ -861,6 +902,7 @@ REAP;
|
||||
|
||||
--connection server_1
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
SET debug_sync='RESET';
|
||||
|
||||
--connection server_2
|
||||
# Now we have the four transactions pending for replication on the slave.
|
||||
@ -872,15 +914,20 @@ SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
SET debug_sync='now SIGNAL d0_cont';
|
||||
SET debug_sync='now WAIT_FOR t1_waiting';
|
||||
|
||||
# T2 will be queued on the same worker D0 as T1.
|
||||
# Make the worker D3 free, and wait for T2 to be queued in it.
|
||||
SET debug_sync='now SIGNAL d3_cont';
|
||||
SET debug_sync='now WAIT_FOR t2_waiting';
|
||||
|
||||
# Now release worker D1, and wait for T3 to be queued in it.
|
||||
# T3 will wait for T1 to commit before it can start.
|
||||
SET debug_sync='now SIGNAL d1_cont';
|
||||
SET debug_sync='now WAIT_FOR t3_waiting';
|
||||
|
||||
# Release worker D2. T4 may or may not have time to be queued on it, but
|
||||
# it will not be able to complete due to T3 being killed.
|
||||
# Release worker D2. Wait for T4 to be queued, so we are sure it has
|
||||
# received the debug_sync signal (else we might overwrite it with the
|
||||
# next debug_sync).
|
||||
SET debug_sync='now SIGNAL d2_cont';
|
||||
SET debug_sync='now WAIT_FOR t4_waiting';
|
||||
|
||||
# Now we kill the waiting transaction T3 in worker D1.
|
||||
--replace_result $d1_thd_id THD_ID
|
||||
@ -895,10 +942,15 @@ SET debug_sync='now SIGNAL t1_cont';
|
||||
--let $slave_sql_errno= 1317,1927,1964
|
||||
--source include/wait_for_slave_sql_error.inc
|
||||
STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 60 ORDER BY a;
|
||||
# Since T2, T3, and T4 run in parallel, we can not be sure if T2 will have time
|
||||
# to commit or not before the stop. However, T1 should commit, and T3/T4 may
|
||||
# not have committed. (After slave restart we check that all become committed
|
||||
# eventually).
|
||||
SELECT * FROM t3 WHERE a >= 60 AND a != 65 ORDER BY a;
|
||||
|
||||
# Now we have to disable the debug_sync statements, so they do not trigger
|
||||
# when the events are retried.
|
||||
SET debug_sync='RESET';
|
||||
SET GLOBAL slave_parallel_threads=0;
|
||||
SET GLOBAL slave_parallel_threads=10;
|
||||
SET sql_log_bin=0;
|
||||
@ -914,7 +966,7 @@ CREATE FUNCTION foo(x INT, d1 VARCHAR(500), d2 VARCHAR(500))
|
||||
SET sql_log_bin=1;
|
||||
|
||||
--connection server_1
|
||||
INSERT INTO t3 VALUES (69,0);
|
||||
UPDATE t3 SET b=b+1 WHERE a=60;
|
||||
--save_master_pos
|
||||
|
||||
--connection server_2
|
||||
@ -951,6 +1003,7 @@ SET GLOBAL slave_parallel_threads=10;
|
||||
|
||||
--echo *** 5. Test killing thread that is waiting for queue of max length to shorten ***
|
||||
|
||||
# Find the thread id of the driver SQL thread that we want to kill.
|
||||
--let $wait_condition= SELECT COUNT(*) = 1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'
|
||||
--source include/wait_condition.inc
|
||||
--let $thd_id= `SELECT ID FROM INFORMATION_SCHEMA.PROCESSLIST WHERE STATE LIKE '%Slave has read all relay log%'`
|
||||
@ -961,12 +1014,8 @@ SET GLOBAL slave_parallel_max_queued=9000;
|
||||
--let bigstring= `SELECT REPEAT('x', 10000)`
|
||||
SET binlog_format=statement;
|
||||
# Create an event that will wait to be signalled.
|
||||
INSERT INTO t3 VALUES (70, foo(0,
|
||||
INSERT INTO t3 VALUES (80, foo(0,
|
||||
'ha_write_row_end SIGNAL query_waiting WAIT_FOR query_cont', ''));
|
||||
--disable_query_log
|
||||
# Create an event that will fill up the queue.
|
||||
eval INSERT INTO t3 VALUES (71, LENGTH('$bigstring'));
|
||||
--enable_query_log
|
||||
|
||||
--connection server_2
|
||||
SET debug_sync='now WAIT_FOR query_waiting';
|
||||
@ -977,11 +1026,14 @@ SET @old_dbug= @@GLOBAL.debug_dbug;
|
||||
SET GLOBAL debug_dbug="+d,rpl_parallel_wait_queue_max";
|
||||
|
||||
--connection server_1
|
||||
# This event will have to wait for the queue to become shorter before it can
|
||||
# be queued. We will test that things work when we kill the SQL driver thread
|
||||
# during this wait.
|
||||
INSERT INTO t3 VALUES (72, 0);
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
--disable_query_log
|
||||
# Create an event that will fill up the queue.
|
||||
# The Xid event at the end of the event group will have to wait for the Query
|
||||
# event with the INSERT to drain so the queue becomes shorter. However that in
|
||||
# turn waits for the prior event group to continue.
|
||||
eval INSERT INTO t3 VALUES (81, LENGTH('$bigstring'));
|
||||
--enable_query_log
|
||||
SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
|
||||
|
||||
--connection server_2
|
||||
SET debug_sync='now WAIT_FOR wait_queue_ready';
|
||||
@ -995,19 +1047,19 @@ SET debug_sync='now SIGNAL query_cont';
|
||||
--let $slave_sql_errno= 1317,1927,1964
|
||||
--source include/wait_for_slave_sql_error.inc
|
||||
STOP SLAVE IO_THREAD;
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
|
||||
SET GLOBAL debug_dbug=@old_dbug;
|
||||
SET GLOBAL slave_parallel_max_queued= @old_max_queued;
|
||||
|
||||
--connection server_1
|
||||
INSERT INTO t3 VALUES (73,0);
|
||||
INSERT INTO t3 VALUES (82,0);
|
||||
--save_master_pos
|
||||
|
||||
--connection server_2
|
||||
SET debug_sync='RESET';
|
||||
--source include/start_slave.inc
|
||||
--sync_with_master
|
||||
SELECT * FROM t3 WHERE a >= 70 ORDER BY a;
|
||||
SELECT * FROM t3 WHERE a >= 80 ORDER BY a;
|
||||
|
||||
|
||||
--connection server_2
|
||||
|
@ -0,0 +1,13 @@
|
||||
SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads;
|
||||
SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default';
|
||||
must be zero because of default
|
||||
0
|
||||
SELECT @@SESSION.slave_domain_parallel_threads as 'no session var';
|
||||
ERROR HY000: Variable 'slave_domain_parallel_threads' is a GLOBAL variable
|
||||
SET GLOBAL slave_domain_parallel_threads= 0;
|
||||
SET GLOBAL slave_domain_parallel_threads= DEFAULT;
|
||||
SET GLOBAL slave_domain_parallel_threads= 10;
|
||||
SELECT @@GLOBAL.slave_domain_parallel_threads;
|
||||
@@GLOBAL.slave_domain_parallel_threads
|
||||
10
|
||||
SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads;
|
@ -0,0 +1,14 @@
|
||||
--source include/not_embedded.inc
|
||||
|
||||
SET @save_slave_domain_parallel_threads= @@GLOBAL.slave_domain_parallel_threads;
|
||||
|
||||
SELECT @@GLOBAL.slave_domain_parallel_threads as 'must be zero because of default';
|
||||
--error ER_INCORRECT_GLOBAL_LOCAL_VAR
|
||||
SELECT @@SESSION.slave_domain_parallel_threads as 'no session var';
|
||||
|
||||
SET GLOBAL slave_domain_parallel_threads= 0;
|
||||
SET GLOBAL slave_domain_parallel_threads= DEFAULT;
|
||||
SET GLOBAL slave_domain_parallel_threads= 10;
|
||||
SELECT @@GLOBAL.slave_domain_parallel_threads;
|
||||
|
||||
SET GLOBAL slave_domain_parallel_threads = @save_slave_domain_parallel_threads;
|
@ -1281,10 +1281,6 @@ read_append_buffer:
|
||||
size_t transfer_len;
|
||||
|
||||
DBUG_ASSERT(info->append_read_pos <= info->write_pos);
|
||||
/*
|
||||
TODO: figure out if the assert below is needed or correct.
|
||||
*/
|
||||
DBUG_ASSERT(pos_in_file == info->end_of_file);
|
||||
copy_len=MY_MIN(Count, len_in_buff);
|
||||
memcpy(Buffer, info->append_read_pos, copy_len);
|
||||
info->append_read_pos += copy_len;
|
||||
|
29
sql/log.cc
29
sql/log.cc
@ -6710,13 +6710,15 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
|
||||
*/
|
||||
wfc= orig_entry->thd->wait_for_commit_ptr;
|
||||
orig_entry->queued_by_other= false;
|
||||
if (wfc && wfc->waiting_for_commit)
|
||||
if (wfc && wfc->waitee)
|
||||
{
|
||||
mysql_mutex_lock(&wfc->LOCK_wait_commit);
|
||||
/* Do an extra check here, this time safely under lock. */
|
||||
if (wfc->waiting_for_commit)
|
||||
if (wfc->waitee)
|
||||
{
|
||||
PSI_stage_info old_stage;
|
||||
wait_for_commit *loc_waitee;
|
||||
|
||||
/*
|
||||
By setting wfc->opaque_pointer to our own entry, we mark that we are
|
||||
ready to commit, but waiting for another transaction to commit before
|
||||
@ -6727,21 +6729,20 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
|
||||
queued_by_other flag is set.
|
||||
*/
|
||||
wfc->opaque_pointer= orig_entry;
|
||||
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
|
||||
orig_entry->thd->ENTER_COND(&wfc->COND_wait_commit,
|
||||
&wfc->LOCK_wait_commit,
|
||||
&stage_waiting_for_prior_transaction_to_commit,
|
||||
&old_stage);
|
||||
DEBUG_SYNC(orig_entry->thd, "group_commit_waiting_for_prior");
|
||||
while (wfc->waiting_for_commit && !orig_entry->thd->check_killed())
|
||||
while ((loc_waitee= wfc->waitee) && !orig_entry->thd->check_killed())
|
||||
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
|
||||
wfc->opaque_pointer= NULL;
|
||||
DBUG_PRINT("info", ("After waiting for prior commit, queued_by_other=%d",
|
||||
orig_entry->queued_by_other));
|
||||
|
||||
if (wfc->waiting_for_commit)
|
||||
if (loc_waitee)
|
||||
{
|
||||
/* Wait terminated due to kill. */
|
||||
wait_for_commit *loc_waitee= wfc->waitee;
|
||||
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
|
||||
if (loc_waitee->wakeup_subsequent_commits_running ||
|
||||
orig_entry->queued_by_other)
|
||||
@ -6751,13 +6752,14 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
|
||||
do
|
||||
{
|
||||
mysql_cond_wait(&wfc->COND_wait_commit, &wfc->LOCK_wait_commit);
|
||||
} while (wfc->waiting_for_commit);
|
||||
} while (wfc->waitee);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* We were killed, so remove us from the list of waitee. */
|
||||
wfc->remove_from_list(&loc_waitee->subsequent_commits_list);
|
||||
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
|
||||
wfc->waitee= NULL;
|
||||
|
||||
orig_entry->thd->EXIT_COND(&old_stage);
|
||||
/* Interrupted by kill. */
|
||||
@ -6773,12 +6775,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
|
||||
}
|
||||
else
|
||||
mysql_mutex_unlock(&wfc->LOCK_wait_commit);
|
||||
|
||||
if (wfc->wakeup_error)
|
||||
{
|
||||
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
}
|
||||
if (wfc && wfc->wakeup_error)
|
||||
{
|
||||
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -9110,7 +9111,7 @@ start_binlog_background_thread()
|
||||
array_elements(all_binlog_threads));
|
||||
#endif
|
||||
|
||||
if (mysql_thread_create(key_thread_binlog, &th, NULL,
|
||||
if (mysql_thread_create(key_thread_binlog, &th, &connection_attrib,
|
||||
binlog_background_thread, NULL))
|
||||
return 1;
|
||||
|
||||
|
@ -549,6 +549,7 @@ ulong rpl_recovery_rank=0;
|
||||
ulong stored_program_cache_size= 0;
|
||||
|
||||
ulong opt_slave_parallel_threads= 0;
|
||||
ulong opt_slave_domain_parallel_threads= 0;
|
||||
ulong opt_binlog_commit_wait_count= 0;
|
||||
ulong opt_binlog_commit_wait_usec= 0;
|
||||
ulong opt_slave_parallel_max_queued= 131072;
|
||||
@ -982,8 +983,10 @@ PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
|
||||
key_COND_wait_commit;
|
||||
PSI_cond_key key_RELAYLOG_COND_queue_busy;
|
||||
PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
|
||||
PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
|
||||
key_COND_parallel_entry, key_COND_prepare_ordered;
|
||||
PSI_cond_key key_COND_rpl_thread_queue, key_COND_rpl_thread,
|
||||
key_COND_rpl_thread_pool,
|
||||
key_COND_parallel_entry, key_COND_group_commit_orderer,
|
||||
key_COND_prepare_ordered;
|
||||
PSI_cond_key key_COND_wait_gtid;
|
||||
|
||||
static PSI_cond_info all_server_conds[]=
|
||||
@ -1027,8 +1030,10 @@ static PSI_cond_info all_server_conds[]=
|
||||
{ &key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_GLOBAL},
|
||||
{ &key_COND_flush_thread_cache, "COND_flush_thread_cache", PSI_FLAG_GLOBAL},
|
||||
{ &key_COND_rpl_thread, "COND_rpl_thread", 0},
|
||||
{ &key_COND_rpl_thread_queue, "COND_rpl_thread_queue", 0},
|
||||
{ &key_COND_rpl_thread_pool, "COND_rpl_thread_pool", 0},
|
||||
{ &key_COND_parallel_entry, "COND_parallel_entry", 0},
|
||||
{ &key_COND_group_commit_orderer, "COND_group_commit_orderer", 0},
|
||||
{ &key_COND_prepare_ordered, "COND_prepare_ordered", 0},
|
||||
{ &key_COND_wait_gtid, "COND_wait_gtid", 0}
|
||||
};
|
||||
@ -9436,7 +9441,7 @@ PSI_stage_info stage_binlog_waiting_background_tasks= { 0, "Waiting for backgrou
|
||||
PSI_stage_info stage_binlog_processing_checkpoint_notify= { 0, "Processing binlog checkpoint notification", 0};
|
||||
PSI_stage_info stage_binlog_stopping_background_thread= { 0, "Stopping binlog background thread", 0};
|
||||
PSI_stage_info stage_waiting_for_work_from_sql_thread= { 0, "Waiting for work from SQL thread", 0};
|
||||
PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to commit", 0};
|
||||
PSI_stage_info stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for prior transaction to start commit before starting next transaction", 0};
|
||||
PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for room in worker thread event queue", 0};
|
||||
PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in MASTER_GTID_WAIT() (primary waiter)", 0};
|
||||
PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", 0};
|
||||
|
@ -180,6 +180,7 @@ extern ulong opt_binlog_rows_event_max_size;
|
||||
extern ulong rpl_recovery_rank, thread_cache_size;
|
||||
extern ulong stored_program_cache_size;
|
||||
extern ulong opt_slave_parallel_threads;
|
||||
extern ulong opt_slave_domain_parallel_threads;
|
||||
extern ulong opt_slave_parallel_max_queued;
|
||||
extern ulong opt_binlog_commit_wait_count;
|
||||
extern ulong opt_binlog_commit_wait_usec;
|
||||
@ -295,8 +296,9 @@ extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,
|
||||
key_COND_wait_commit;
|
||||
extern PSI_cond_key key_RELAYLOG_COND_queue_busy;
|
||||
extern PSI_cond_key key_TC_LOG_MMAP_COND_queue_busy;
|
||||
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_pool,
|
||||
key_COND_parallel_entry;
|
||||
extern PSI_cond_key key_COND_rpl_thread, key_COND_rpl_thread_queue,
|
||||
key_COND_rpl_thread_pool,
|
||||
key_COND_parallel_entry, key_COND_group_commit_orderer;
|
||||
extern PSI_cond_key key_COND_wait_gtid;
|
||||
|
||||
extern PSI_thread_key key_thread_bootstrap, key_thread_delayed_insert,
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -9,16 +9,66 @@ struct rpl_parallel_entry;
|
||||
struct rpl_parallel_thread_pool;
|
||||
|
||||
class Relay_log_info;
|
||||
|
||||
|
||||
/*
|
||||
Structure used to keep track of the parallel replication of a batch of
|
||||
event-groups that group-committed together on the master.
|
||||
|
||||
It is used to ensure that every event group in one batch has reached the
|
||||
commit stage before the next batch starts executing.
|
||||
|
||||
Note the lifetime of this structure:
|
||||
|
||||
- It is allocated when the first event in a new batch of group commits
|
||||
is queued, from the free list rpl_parallel_entry::gco_free_list.
|
||||
|
||||
- The gco for the batch currently being queued is owned by
|
||||
rpl_parallel_entry::current_gco. The gco for a previous batch that has
|
||||
been fully queued is owned by the gco->prev_gco pointer of the gco for
|
||||
the following batch.
|
||||
|
||||
- The worker thread waits on gco->COND_group_commit_orderer for
|
||||
rpl_parallel_entry::count_committing_event_groups to reach wait_count
|
||||
before starting; the first waiter links the gco into the next_gco
|
||||
pointer of the gco of the previous batch for signalling.
|
||||
|
||||
- When an event group reaches the commit stage, it signals the
|
||||
COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
|
||||
rpl_parallel_entry::count_committing_event_groups has reached
|
||||
gco->next_gco->wait_count.
|
||||
|
||||
- When gco->wait_count is reached for a worker and the wait completes,
|
||||
the worker frees gco->prev_gco; at this point it is guaranteed not to
|
||||
be needed any longer.
|
||||
*/
|
||||
struct group_commit_orderer {
|
||||
/* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
|
||||
mysql_cond_t COND_group_commit_orderer;
|
||||
uint64 wait_count;
|
||||
group_commit_orderer *prev_gco;
|
||||
group_commit_orderer *next_gco;
|
||||
bool installed;
|
||||
};
|
||||
|
||||
|
||||
struct rpl_parallel_thread {
|
||||
bool delay_start;
|
||||
bool running;
|
||||
bool stop;
|
||||
mysql_mutex_t LOCK_rpl_thread;
|
||||
mysql_cond_t COND_rpl_thread;
|
||||
mysql_cond_t COND_rpl_thread_queue;
|
||||
struct rpl_parallel_thread *next; /* For free list. */
|
||||
struct rpl_parallel_thread_pool *pool;
|
||||
THD *thd;
|
||||
struct rpl_parallel_entry *current_entry;
|
||||
/*
|
||||
Who owns the thread, if any (it's a pointer into the
|
||||
rpl_parallel_entry::rpl_threads array.
|
||||
*/
|
||||
struct rpl_parallel_thread **current_owner;
|
||||
/* The rpl_parallel_entry of the owner. */
|
||||
rpl_parallel_entry *current_entry;
|
||||
struct queued_event {
|
||||
queued_event *next;
|
||||
Log_event *ev;
|
||||
@ -31,6 +81,9 @@ struct rpl_parallel_thread {
|
||||
size_t event_size;
|
||||
} *event_queue, *last_in_queue;
|
||||
uint64 queued_size;
|
||||
queued_event *qev_free_list;
|
||||
rpl_group_info *rgi_free_list;
|
||||
group_commit_orderer *gco_free_list;
|
||||
|
||||
void enqueue(queued_event *qev)
|
||||
{
|
||||
@ -42,15 +95,25 @@ struct rpl_parallel_thread {
|
||||
queued_size+= qev->event_size;
|
||||
}
|
||||
|
||||
void dequeue(queued_event *list)
|
||||
void dequeue1(queued_event *list)
|
||||
{
|
||||
queued_event *tmp;
|
||||
|
||||
DBUG_ASSERT(list == event_queue);
|
||||
event_queue= last_in_queue= NULL;
|
||||
for (tmp= list; tmp; tmp= tmp->next)
|
||||
queued_size-= tmp->event_size;
|
||||
}
|
||||
|
||||
void dequeue2(size_t dequeue_size)
|
||||
{
|
||||
queued_size-= dequeue_size;
|
||||
}
|
||||
|
||||
queued_event *get_qev(Log_event *ev, ulonglong event_size,
|
||||
Relay_log_info *rli);
|
||||
void free_qev(queued_event *qev);
|
||||
rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
|
||||
rpl_parallel_entry *e);
|
||||
void free_rgi(rpl_group_info *rgi);
|
||||
group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev);
|
||||
void free_gco(group_commit_orderer *gco);
|
||||
};
|
||||
|
||||
|
||||
@ -66,14 +129,16 @@ struct rpl_parallel_thread_pool {
|
||||
rpl_parallel_thread_pool();
|
||||
int init(uint32 size);
|
||||
void destroy();
|
||||
struct rpl_parallel_thread *get_thread(rpl_parallel_entry *entry);
|
||||
struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
|
||||
rpl_parallel_entry *entry);
|
||||
void release_thread(rpl_parallel_thread *rpt);
|
||||
};
|
||||
|
||||
|
||||
struct rpl_parallel_entry {
|
||||
mysql_mutex_t LOCK_parallel_entry;
|
||||
mysql_cond_t COND_parallel_entry;
|
||||
uint32 domain_id;
|
||||
uint32 last_server_id;
|
||||
uint64 last_seq_no;
|
||||
uint64 last_commit_id;
|
||||
bool active;
|
||||
/*
|
||||
@ -82,15 +147,41 @@ struct rpl_parallel_entry {
|
||||
waiting for event groups to complete.
|
||||
*/
|
||||
bool force_abort;
|
||||
/*
|
||||
At STOP SLAVE (force_abort=true), we do not want to process all events in
|
||||
the queue (which could unnecessarily delay stop, if a lot of events happen
|
||||
to be queued). The stop_count provides a safe point at which to stop, so
|
||||
that everything before becomes committed and nothing after does. The value
|
||||
corresponds to group_commit_orderer::wait_count; if wait_count is less than
|
||||
or equal to stop_count, we execute the associated event group, else we
|
||||
skip it (and all following) and stop.
|
||||
*/
|
||||
uint64 stop_count;
|
||||
|
||||
rpl_parallel_thread *rpl_thread;
|
||||
/*
|
||||
Cyclic array recording the last rpl_thread_max worker threads that we
|
||||
queued event for. This is used to limit how many workers a single domain
|
||||
can occupy (--slave-domain-parallel-threads).
|
||||
|
||||
Note that workers are never explicitly deleted from the array. Instead,
|
||||
we need to check (under LOCK_rpl_thread) that the thread still belongs
|
||||
to us before re-using (rpl_thread::current_owner).
|
||||
*/
|
||||
rpl_parallel_thread **rpl_threads;
|
||||
uint32 rpl_thread_max;
|
||||
uint32 rpl_thread_idx;
|
||||
/*
|
||||
The sub_id of the last transaction to commit within this domain_id.
|
||||
Must be accessed under LOCK_parallel_entry protection.
|
||||
|
||||
Event groups commit in order, so the rpl_group_info for an event group
|
||||
will be alive (at least) as long as
|
||||
rpl_grou_info::gtid_sub_id > last_committed_sub_id. This can be used to
|
||||
safely refer back to previous event groups if they are still executing,
|
||||
and ignore them if they completed, without requiring explicit
|
||||
synchronisation between the threads.
|
||||
*/
|
||||
uint64 last_committed_sub_id;
|
||||
mysql_mutex_t LOCK_parallel_entry;
|
||||
mysql_cond_t COND_parallel_entry;
|
||||
/*
|
||||
The sub_id of the last event group in this replication domain that was
|
||||
queued for execution by a worker thread.
|
||||
@ -98,14 +189,29 @@ struct rpl_parallel_entry {
|
||||
uint64 current_sub_id;
|
||||
rpl_group_info *current_group_info;
|
||||
/*
|
||||
The sub_id of the last event group in the previous batch of group-committed
|
||||
transactions.
|
||||
|
||||
When we spawn parallel worker threads for the next group-committed batch,
|
||||
they first need to wait for this sub_id to be committed before it is safe
|
||||
to start executing them.
|
||||
If we get an error in some event group, we set the sub_id of that event
|
||||
group here. Then later event groups (with higher sub_id) can know not to
|
||||
try to start (event groups that already started will be rolled back when
|
||||
wait_for_prior_commit() returns error).
|
||||
The value is ULONGLONG_MAX when no error occured.
|
||||
*/
|
||||
uint64 prev_groupcommit_sub_id;
|
||||
uint64 stop_on_error_sub_id;
|
||||
/* Total count of event groups queued so far. */
|
||||
uint64 count_queued_event_groups;
|
||||
/*
|
||||
Count of event groups that have started (but not necessarily completed)
|
||||
the commit phase. We use this to know when every event group in a previous
|
||||
batch of master group commits have started committing on the slave, so
|
||||
that it is safe to start executing the events in the following batch.
|
||||
*/
|
||||
uint64 count_committing_event_groups;
|
||||
/* The group_commit_orderer object for the events currently being queued. */
|
||||
group_commit_orderer *current_gco;
|
||||
|
||||
rpl_parallel_thread * choose_thread(Relay_log_info *rli, bool *did_enter_cond,
|
||||
PSI_stage_info *old_stage, bool reuse);
|
||||
group_commit_orderer *get_gco();
|
||||
void free_gco(group_commit_orderer *gco);
|
||||
};
|
||||
struct rpl_parallel {
|
||||
HASH domain_hash;
|
||||
@ -116,7 +222,7 @@ struct rpl_parallel {
|
||||
~rpl_parallel();
|
||||
void reset();
|
||||
rpl_parallel_entry *find(uint32 domain_id);
|
||||
void wait_for_done();
|
||||
void wait_for_done(THD *thd);
|
||||
bool workers_idle();
|
||||
bool do_event(rpl_group_info *serial_rgi, Log_event *ev,
|
||||
ulonglong event_size);
|
||||
|
@ -1479,14 +1479,27 @@ end:
|
||||
}
|
||||
|
||||
|
||||
rpl_group_info::rpl_group_info(Relay_log_info *rli_)
|
||||
: rli(rli_), thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
|
||||
wait_commit_group_info(0), wait_start_sub_id(0), parallel_entry(0),
|
||||
deferred_events(NULL), m_annotate_event(0), tables_to_lock(0),
|
||||
tables_to_lock_count(0), trans_retries(0), last_event_start_time(0),
|
||||
is_parallel_exec(false), is_error(false),
|
||||
row_stmt_start_timestamp(0), long_find_row_note_printed(false)
|
||||
void
|
||||
rpl_group_info::reinit(Relay_log_info *rli)
|
||||
{
|
||||
this->rli= rli;
|
||||
tables_to_lock= NULL;
|
||||
tables_to_lock_count= 0;
|
||||
trans_retries= 0;
|
||||
last_event_start_time= 0;
|
||||
is_error= false;
|
||||
row_stmt_start_timestamp= 0;
|
||||
long_find_row_note_printed= false;
|
||||
did_mark_start_commit= false;
|
||||
commit_orderer.reinit();
|
||||
}
|
||||
|
||||
rpl_group_info::rpl_group_info(Relay_log_info *rli)
|
||||
: thd(0), gtid_sub_id(0), wait_commit_sub_id(0),
|
||||
wait_commit_group_info(0), parallel_entry(0),
|
||||
deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
|
||||
{
|
||||
reinit(rli);
|
||||
bzero(¤t_gtid, sizeof(current_gtid));
|
||||
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
|
||||
MY_MUTEX_INIT_FAST);
|
||||
@ -1710,4 +1723,40 @@ void rpl_group_info::slave_close_thread_tables(THD *thd)
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void
|
||||
mark_start_commit_inner(rpl_parallel_entry *e, group_commit_orderer *gco)
|
||||
{
|
||||
uint64 count= ++e->count_committing_event_groups;
|
||||
if (gco->next_gco && gco->next_gco->wait_count == count)
|
||||
mysql_cond_broadcast(&gco->next_gco->COND_group_commit_orderer);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
rpl_group_info::mark_start_commit_no_lock()
|
||||
{
|
||||
if (did_mark_start_commit)
|
||||
return;
|
||||
mark_start_commit_inner(parallel_entry, gco);
|
||||
did_mark_start_commit= true;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
rpl_group_info::mark_start_commit()
|
||||
{
|
||||
rpl_parallel_entry *e;
|
||||
|
||||
if (did_mark_start_commit)
|
||||
return;
|
||||
|
||||
e= this->parallel_entry;
|
||||
mysql_mutex_lock(&e->LOCK_parallel_entry);
|
||||
mark_start_commit_inner(e, gco);
|
||||
mysql_mutex_unlock(&e->LOCK_parallel_entry);
|
||||
did_mark_start_commit= true;
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
@ -481,6 +481,7 @@ private:
|
||||
|
||||
struct rpl_group_info
|
||||
{
|
||||
rpl_group_info *next; /* For free list in rpl_parallel_thread */
|
||||
Relay_log_info *rli;
|
||||
THD *thd;
|
||||
/*
|
||||
@ -510,14 +511,15 @@ struct rpl_group_info
|
||||
uint64 wait_commit_sub_id;
|
||||
rpl_group_info *wait_commit_group_info;
|
||||
/*
|
||||
If non-zero, the event group must wait for this sub_id to be committed
|
||||
before the execution of the event group is allowed to start.
|
||||
This holds a pointer to a struct that keeps track of the need to wait
|
||||
for the previous batch of event groups to reach the commit stage, before
|
||||
this batch can start to execute.
|
||||
|
||||
(When we execute in parallel the transactions that group committed
|
||||
together on the master, we still need to wait for any prior transactions
|
||||
to have commtted).
|
||||
to have reached the commit stage).
|
||||
*/
|
||||
uint64 wait_start_sub_id;
|
||||
group_commit_orderer *gco;
|
||||
|
||||
struct rpl_parallel_entry *parallel_entry;
|
||||
|
||||
@ -567,18 +569,22 @@ struct rpl_group_info
|
||||
char future_event_master_log_name[FN_REFLEN];
|
||||
bool is_parallel_exec;
|
||||
bool is_error;
|
||||
/*
|
||||
Set true when we signalled that we reach the commit phase. Used to avoid
|
||||
counting one event group twice.
|
||||
*/
|
||||
bool did_mark_start_commit;
|
||||
|
||||
private:
|
||||
/*
|
||||
Runtime state for printing a note when slave is taking
|
||||
too long while processing a row event.
|
||||
*/
|
||||
time_t row_stmt_start_timestamp;
|
||||
bool long_find_row_note_printed;
|
||||
public:
|
||||
|
||||
rpl_group_info(Relay_log_info *rli_);
|
||||
~rpl_group_info();
|
||||
void reinit(Relay_log_info *rli);
|
||||
|
||||
/*
|
||||
Returns true if the argument event resides in the containter;
|
||||
@ -661,6 +667,8 @@ public:
|
||||
void clear_tables_to_lock();
|
||||
void cleanup_context(THD *, bool);
|
||||
void slave_close_thread_tables(THD *);
|
||||
void mark_start_commit_no_lock();
|
||||
void mark_start_commit();
|
||||
|
||||
time_t get_row_stmt_start_timestamp()
|
||||
{
|
||||
|
@ -331,7 +331,7 @@ run_slave_init_thread()
|
||||
pthread_t th;
|
||||
|
||||
slave_init_thread_running= true;
|
||||
if (mysql_thread_create(key_thread_slave_init, &th, NULL,
|
||||
if (mysql_thread_create(key_thread_slave_init, &th, &connection_attrib,
|
||||
handle_slave_init, NULL))
|
||||
{
|
||||
sql_print_error("Failed to create thread while initialising slave");
|
||||
@ -4542,7 +4542,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
|
||||
}
|
||||
|
||||
if (opt_slave_parallel_threads > 0)
|
||||
rli->parallel.wait_for_done();
|
||||
rli->parallel.wait_for_done(thd);
|
||||
|
||||
/* Thread stopped. Print the current replication position to the log */
|
||||
{
|
||||
@ -4568,7 +4568,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
|
||||
get the correct position printed.)
|
||||
*/
|
||||
if (opt_slave_parallel_threads > 0)
|
||||
rli->parallel.wait_for_done();
|
||||
rli->parallel.wait_for_done(thd);
|
||||
|
||||
/*
|
||||
Some events set some playgrounds, which won't be cleared because thread
|
||||
|
@ -6118,14 +6118,23 @@ bool THD::rgi_have_temporary_tables()
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
wait_for_commit::reinit()
|
||||
{
|
||||
subsequent_commits_list= NULL;
|
||||
next_subsequent_commit= NULL;
|
||||
waitee= NULL;
|
||||
opaque_pointer= NULL;
|
||||
wakeup_error= 0;
|
||||
wakeup_subsequent_commits_running= false;
|
||||
}
|
||||
|
||||
|
||||
wait_for_commit::wait_for_commit()
|
||||
: subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
|
||||
opaque_pointer(0),
|
||||
waiting_for_commit(false), wakeup_error(0),
|
||||
wakeup_subsequent_commits_running(false)
|
||||
{
|
||||
mysql_mutex_init(key_LOCK_wait_commit, &LOCK_wait_commit, MY_MUTEX_INIT_FAST);
|
||||
mysql_cond_init(key_COND_wait_commit, &COND_wait_commit, 0);
|
||||
reinit();
|
||||
}
|
||||
|
||||
|
||||
@ -6173,7 +6182,7 @@ wait_for_commit::wakeup(int wakeup_error)
|
||||
|
||||
*/
|
||||
mysql_mutex_lock(&LOCK_wait_commit);
|
||||
waiting_for_commit= false;
|
||||
waitee= NULL;
|
||||
this->wakeup_error= wakeup_error;
|
||||
/*
|
||||
Note that it is critical that the mysql_cond_signal() here is done while
|
||||
@ -6205,9 +6214,8 @@ wait_for_commit::wakeup(int wakeup_error)
|
||||
void
|
||||
wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
|
||||
{
|
||||
waiting_for_commit= true;
|
||||
wakeup_error= 0;
|
||||
DBUG_ASSERT(!this->waitee /* No prior registration allowed */);
|
||||
wakeup_error= 0;
|
||||
this->waitee= waitee;
|
||||
|
||||
mysql_mutex_lock(&waitee->LOCK_wait_commit);
|
||||
@ -6217,7 +6225,7 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
|
||||
see comments on wakeup_subsequent_commits2() for details.
|
||||
*/
|
||||
if (waitee->wakeup_subsequent_commits_running)
|
||||
waiting_for_commit= false;
|
||||
this->waitee= NULL;
|
||||
else
|
||||
{
|
||||
/*
|
||||
@ -6247,9 +6255,9 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
|
||||
thd->ENTER_COND(&COND_wait_commit, &LOCK_wait_commit,
|
||||
&stage_waiting_for_prior_transaction_to_commit,
|
||||
&old_stage);
|
||||
while (waiting_for_commit && !thd->check_killed())
|
||||
while ((loc_waitee= this->waitee) && !thd->check_killed())
|
||||
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
|
||||
if (!waiting_for_commit)
|
||||
if (!loc_waitee)
|
||||
{
|
||||
if (wakeup_error)
|
||||
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
|
||||
@ -6262,7 +6270,6 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
|
||||
waiter as to whether we succeed or fail (eg. we may roll back but waitee
|
||||
might attempt to commit both us and any subsequent commits waiting for us).
|
||||
*/
|
||||
loc_waitee= this->waitee;
|
||||
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
|
||||
if (loc_waitee->wakeup_subsequent_commits_running)
|
||||
{
|
||||
@ -6271,21 +6278,29 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
|
||||
do
|
||||
{
|
||||
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
|
||||
} while (waiting_for_commit);
|
||||
} while (this->waitee);
|
||||
if (wakeup_error)
|
||||
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
|
||||
goto end;
|
||||
}
|
||||
remove_from_list(&loc_waitee->subsequent_commits_list);
|
||||
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
|
||||
this->waitee= NULL;
|
||||
|
||||
DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
|
||||
wakeup_error= thd->killed_errno();
|
||||
if (!wakeup_error)
|
||||
wakeup_error= ER_QUERY_INTERRUPTED;
|
||||
my_message(wakeup_error, ER(wakeup_error), MYF(0));
|
||||
thd->EXIT_COND(&old_stage);
|
||||
/*
|
||||
Must do the DEBUG_SYNC() _after_ exit_cond(), as DEBUG_SYNC is not safe to
|
||||
use within enter_cond/exit_cond.
|
||||
*/
|
||||
DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
|
||||
return wakeup_error;
|
||||
|
||||
end:
|
||||
thd->EXIT_COND(&old_stage);
|
||||
waitee= NULL;
|
||||
return wakeup_error;
|
||||
}
|
||||
|
||||
@ -6368,10 +6383,11 @@ wait_for_commit::wakeup_subsequent_commits2(int wakeup_error)
|
||||
void
|
||||
wait_for_commit::unregister_wait_for_prior_commit2()
|
||||
{
|
||||
wait_for_commit *loc_waitee;
|
||||
|
||||
mysql_mutex_lock(&LOCK_wait_commit);
|
||||
if (waiting_for_commit)
|
||||
if ((loc_waitee= this->waitee))
|
||||
{
|
||||
wait_for_commit *loc_waitee= this->waitee;
|
||||
mysql_mutex_lock(&loc_waitee->LOCK_wait_commit);
|
||||
if (loc_waitee->wakeup_subsequent_commits_running)
|
||||
{
|
||||
@ -6383,7 +6399,7 @@ wait_for_commit::unregister_wait_for_prior_commit2()
|
||||
See comments on wakeup_subsequent_commits2() for more details.
|
||||
*/
|
||||
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
|
||||
while (waiting_for_commit)
|
||||
while (this->waitee)
|
||||
mysql_cond_wait(&COND_wait_commit, &LOCK_wait_commit);
|
||||
}
|
||||
else
|
||||
@ -6391,10 +6407,10 @@ wait_for_commit::unregister_wait_for_prior_commit2()
|
||||
/* Remove ourselves from the list in the waitee. */
|
||||
remove_from_list(&loc_waitee->subsequent_commits_list);
|
||||
mysql_mutex_unlock(&loc_waitee->LOCK_wait_commit);
|
||||
this->waitee= NULL;
|
||||
}
|
||||
}
|
||||
mysql_mutex_unlock(&LOCK_wait_commit);
|
||||
this->waitee= NULL;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1656,8 +1656,8 @@ struct wait_for_commit
|
||||
{
|
||||
/*
|
||||
The LOCK_wait_commit protects the fields subsequent_commits_list and
|
||||
wakeup_subsequent_commits_running (for a waitee), and the flag
|
||||
waiting_for_commit and associated COND_wait_commit (for a waiter).
|
||||
wakeup_subsequent_commits_running (for a waitee), and the pointer
|
||||
waiterr and associated COND_wait_commit (for a waiter).
|
||||
*/
|
||||
mysql_mutex_t LOCK_wait_commit;
|
||||
mysql_cond_t COND_wait_commit;
|
||||
@ -1665,7 +1665,13 @@ struct wait_for_commit
|
||||
wait_for_commit *subsequent_commits_list;
|
||||
/* Link field for entries in subsequent_commits_list. */
|
||||
wait_for_commit *next_subsequent_commit;
|
||||
/* Our waitee, if we did register_wait_for_prior_commit(), else NULL. */
|
||||
/*
|
||||
Our waitee, if we did register_wait_for_prior_commit(), and were not
|
||||
yet woken up. Else NULL.
|
||||
|
||||
When this is cleared for wakeup, the COND_wait_commit condition is
|
||||
signalled.
|
||||
*/
|
||||
wait_for_commit *waitee;
|
||||
/*
|
||||
Generic pointer for use by the transaction coordinator to optimise the
|
||||
@ -1676,12 +1682,6 @@ struct wait_for_commit
|
||||
used by another transaction coordinator for similar purposes.
|
||||
*/
|
||||
void *opaque_pointer;
|
||||
/*
|
||||
The waiting_for_commit flag is cleared when a waiter has been woken
|
||||
up. The COND_wait_commit condition is signalled when this has been
|
||||
cleared.
|
||||
*/
|
||||
bool waiting_for_commit;
|
||||
/* The wakeup error code from the waitee. 0 means no error. */
|
||||
int wakeup_error;
|
||||
/*
|
||||
@ -1697,10 +1697,14 @@ struct wait_for_commit
|
||||
Quick inline check, to avoid function call and locking in the common case
|
||||
where no wakeup is registered, or a registered wait was already signalled.
|
||||
*/
|
||||
if (waiting_for_commit)
|
||||
if (waitee)
|
||||
return wait_for_prior_commit2(thd);
|
||||
else
|
||||
{
|
||||
if (wakeup_error)
|
||||
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
|
||||
return wakeup_error;
|
||||
}
|
||||
}
|
||||
void wakeup_subsequent_commits(int wakeup_error)
|
||||
{
|
||||
@ -1721,7 +1725,7 @@ struct wait_for_commit
|
||||
}
|
||||
void unregister_wait_for_prior_commit()
|
||||
{
|
||||
if (waiting_for_commit)
|
||||
if (waitee)
|
||||
unregister_wait_for_prior_commit2();
|
||||
}
|
||||
/*
|
||||
@ -1741,7 +1745,7 @@ struct wait_for_commit
|
||||
}
|
||||
next_ptr_ptr= &cur->next_subsequent_commit;
|
||||
}
|
||||
waiting_for_commit= false;
|
||||
waitee= NULL;
|
||||
}
|
||||
|
||||
void wakeup(int wakeup_error);
|
||||
@ -1752,6 +1756,7 @@ struct wait_for_commit
|
||||
|
||||
wait_for_commit();
|
||||
~wait_for_commit();
|
||||
void reinit();
|
||||
};
|
||||
|
||||
|
||||
|
@ -1768,6 +1768,49 @@ static Sys_var_ulong Sys_slave_parallel_threads(
|
||||
ON_UPDATE(fix_slave_parallel_threads));
|
||||
|
||||
|
||||
static bool
|
||||
check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var)
|
||||
{
|
||||
bool running;
|
||||
|
||||
mysql_mutex_lock(&LOCK_active_mi);
|
||||
running= master_info_index->give_error_if_slave_running();
|
||||
mysql_mutex_unlock(&LOCK_active_mi);
|
||||
if (running)
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool
|
||||
fix_slave_domain_parallel_threads(sys_var *self, THD *thd, enum_var_type type)
|
||||
{
|
||||
bool running;
|
||||
|
||||
mysql_mutex_unlock(&LOCK_global_system_variables);
|
||||
mysql_mutex_lock(&LOCK_active_mi);
|
||||
running= master_info_index->give_error_if_slave_running();
|
||||
mysql_mutex_unlock(&LOCK_active_mi);
|
||||
mysql_mutex_lock(&LOCK_global_system_variables);
|
||||
|
||||
return running ? true : false;
|
||||
}
|
||||
|
||||
|
||||
static Sys_var_ulong Sys_slave_domain_parallel_threads(
|
||||
"slave_domain_parallel_threads",
|
||||
"Maximum number of parallel threads to use on slave for events in a "
|
||||
"single replication domain. When using multiple domains, this can be "
|
||||
"used to limit a single domain from grabbing all threads and thus "
|
||||
"stalling other domains. The default of 0 means to allow a domain to "
|
||||
"grab as many threads as it wants, up to the value of "
|
||||
"slave_parallel_threads.",
|
||||
GLOBAL_VAR(opt_slave_domain_parallel_threads), CMD_LINE(REQUIRED_ARG),
|
||||
VALID_RANGE(0,16383), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
|
||||
NOT_IN_BINLOG, ON_CHECK(check_slave_domain_parallel_threads),
|
||||
ON_UPDATE(fix_slave_domain_parallel_threads));
|
||||
|
||||
|
||||
static Sys_var_ulong Sys_slave_parallel_max_queued(
|
||||
"slave_parallel_max_queued",
|
||||
"Limit on how much memory SQL threads should use per parallel "
|
||||
|
Loading…
x
Reference in New Issue
Block a user