diff --git a/mysql-test/suite/rpl/t/rpl_parallel.test b/mysql-test/suite/rpl/t/rpl_parallel.test new file mode 100644 index 00000000000..5a58b9d0f50 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_parallel.test @@ -0,0 +1,44 @@ +--source include/have_binlog_format_statement.inc + +connect (s1,127.0.0.1,root,,test,$MASTER_MYPORT,); +connect (s2,127.0.0.1,root,,test,$SLAVE_MYPORT,); + +--connection s1 +SELECT @@server_id; +SET sql_log_bin=0; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM; +SET sql_log_bin=1; + +--connection s2 +SELECT @@server_id; +SET sql_log_bin=0; +CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=MyISAM; +SET sql_log_bin=1; + +--replace_result $MASTER_MYPORT MASTER_PORT +eval CHANGE MASTER TO master_host = '127.0.0.1', master_port = $MASTER_MYPORT, + master_user='root', master_use_gtid=current_pos; + +--connection s1 +INSERT INTO t1 VALUES (1); + +--connection s2 +query_vertical SHOW SLAVE STATUS; + +--source include/start_slave.inc +SELECT * FROM t1; +--sleep 1 +SELECT * FROM t1; + +--source include/stop_slave.inc + +--connection s1 +SET sql_log_bin=0; +DROP TABLE t1; +SET sql_log_bin=1; + +--connection s2 +RESET SLAVE ALL; +SET sql_log_bin=0; +DROP TABLE t1; +SET sql_log_bin=1; diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index a4a87c1e92e..65ae2b87179 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -13,9 +13,13 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, struct rpl_parallel_thread *rpt) { int err; + Relay_log_info *rli= qev->rli; + thd->rli_slave= rli; + thd->rpl_filter = rli->mi->rpl_filter; /* ToDo: Access to thd, and what about rli, split out a parallel part? */ - err= apply_event_and_update_pos(qev->ev, thd, qev->rli, rpt); + mysql_mutex_lock(&rli->data_lock); + err= apply_event_and_update_pos(qev->ev, thd, rli, rpt); /* ToDo: error handling. */ /* ToDo: also free qev->ev, or hold on to it for a bit if necessary. */ } @@ -108,7 +112,7 @@ handle_rpl_parallel_thread(void *arg) } } rpt_handle_event(events, thd, rpt); - free(events); + my_free(events); events= next; } @@ -313,6 +317,7 @@ rpl_parallel_thread_pool::destroy() rpl_parallel_change_thread_count(this, 0, true); mysql_mutex_destroy(&LOCK_rpl_thread_pool); mysql_cond_destroy(&COND_rpl_thread_pool); + inited= false; } @@ -325,8 +330,8 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_entry *entry) while ((rpt= free_list) == NULL) mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); free_list= rpt->next; - mysql_mutex_lock(&rpt->LOCK_rpl_thread); mysql_mutex_unlock(&LOCK_rpl_thread_pool); + mysql_mutex_lock(&rpt->LOCK_rpl_thread); rpt->current_entry= entry; return rpt; @@ -383,6 +388,9 @@ rpl_parallel::do_event(Relay_log_info *rli, Log_event *ev, THD *parent_thd) rpl_parallel_thread *cur_thread; rpl_parallel_thread::queued_event *qev; + /* ToDo: what to do with this lock?!? */ + mysql_mutex_unlock(&rli->data_lock); + if (!(qev= (rpl_parallel_thread::queued_event *)my_malloc(sizeof(*qev), MYF(0)))) { diff --git a/sql/slave.cc b/sql/slave.cc index 419fa579a09..d7e4d9a25ed 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -5800,7 +5800,8 @@ static Log_event* next_event(Relay_log_info* rli) llstr(my_b_tell(cur_log),llbuf1), llstr(rli->event_relay_log_pos,llbuf2))); DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); - DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos); + DBUG_ASSERT(opt_slave_parallel_threads > 0 || + my_b_tell(cur_log) == rli->event_relay_log_pos); } #endif /*