Fixes for parallel slave:

- Made slaves temporary table multi-thread slave safe by adding mutex around save_temporary_table usage.
  - rli->save_temporary_tables is the active list of all used temporary tables
  - This is copied to THD->temporary_tables when temporary tables are opened and updated when temporary tables are closed
  - Added THD->lock_temporary_tables() and THD->unlock_temporary_tables() to simplify this.
- Relay_log_info->sql_thd renamed to Relay_log_info->sql_driver_thd to avoid wrong usage for merged code.
- Added is_part_of_group() to mark functions that are part of the next function. This replaces setting IN_STMT when events are executed.
- Added is_begin(), is_commit() and is_rollback() functions to Query_log_event to simplify code.
- If slave_skip_counter is set run things in single threaded mode. This simplifies code for skipping events.
- Updating state of relay log (IN_STMT and IN_TRANSACTION) is moved to one single function: update_state_of_relay_log()
  We can't use OPTION_BEGIN to check for the state anymore as the sql_driver and sql execution threads may be different.
  Clear IN_STMT and IN_TRANSACTION in init_relay_log_pos() and Relay_log_info::cleanup_context() to ensure the flags doesn't survive slave restarts
  is_in_group() is now independent of state of executed transaction.
- Reset thd->transaction.all.modified_non_trans_table() if we did set it for single table row events.
  This was mainly for keeping the flag as documented.
- Changed slave_open_temp_tables to uint32 to be able to use atomic operators on it.
- Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock
- Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond
- Changed some functions to take rpl_group_info instead of Relay_log_info to make them multi-slave safe and to simplify usage
  - do_shall_skip()
  - continue_group()
  - sql_slave_killed()
  - next_event()
- Simplifed arguments to io_salve_killed(), check_io_slave_killed() and sql_slave_killed(); No reason to supply THD as this is part of the given structure.
- set_thd_in_use_temporary_tables() removed as in_use is set on usage
- Added information to thd_proc_info() which thread is waiting for slave mutex to exit.
- In open_table() reuse code from find_temporary_table()

Other things:
- More DBUG statements
- Fixed the rpl_incident.test can be run with --debug
- More comments
- Disabled not used function rpl_connect_master()

mysql-test/suite/perfschema/r/all_instances.result:
  Moved sleep_lock and sleep_cond to rpl_group_info
mysql-test/suite/rpl/r/rpl_incident.result:
  Updated result
mysql-test/suite/rpl/t/rpl_incident-master.opt:
  Not needed anymore
mysql-test/suite/rpl/t/rpl_incident.test:
  Fixed that test can be run with --debug
sql/handler.cc:
  More DBUG_PRINT
sql/log.cc:
  More comments
sql/log_event.cc:
  Added DBUG statements
  do_shall_skip(), continue_group() now takes rpl_group_info param
  Use is_begin(), is_commit() and is_rollback() functions instead of inspecting query string
  We don't have set slaves temporary tables 'in_use' as this is now done when tables are opened.
  Removed IN_STMT flag setting. This is now done in update_state_of_relay_log()
  Use IN_TRANSACTION flag to test state of relay log.
  In rows_event_stmt_cleanup() reset thd->transaction.all.modified_non_trans_table if we had set this before.
sql/log_event.h:
  do_shall_skip(), continue_group() now takes rpl_group_info param
  Added is_part_of_group() to mark events that are part of the next event. This replaces setting IN_STMT when events are executed.
  Added is_begin(), is_commit() and is_rollback() functions to Query_log_event to simplify code.
sql/log_event_old.cc:
  Removed IN_STMT flag setting. This is now done in update_state_of_relay_log()
  do_shall_skip(), continue_group() now takes rpl_group_info param
sql/log_event_old.h:
  Added is_part_of_group() to mark events that are part of the next event.
  do_shall_skip(), continue_group() now takes rpl_group_info param
sql/mysqld.cc:
  Changed slave_open_temp_tables to uint32 to be able to use atomic operators on it.
  Relay_log_info::sleep_lock -> Rpl_group_info::sleep_lock
  Relay_log_info::sleep_cond -> Rpl_group_info::sleep_cond
sql/mysqld.h:
  Updated types and names
sql/rpl_gtid.cc:
  More DBUG
sql/rpl_parallel.cc:
  Updated TODO section
  Set thd for event that is execution
  Use new  is_begin(), is_commit() and is_rollback() functions.
  More comments
sql/rpl_rli.cc:
  sql_thd -> sql_driver_thd
  Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock
  Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond
  Clear IN_STMT and IN_TRANSACTION in init_relay_log_pos() and Relay_log_info::cleanup_context() to ensure the flags doesn't survive slave restarts.
  Reset table->in_use for temporary tables as the table may have been used by another THD.
  Use IN_TRANSACTION instead of OPTION_BEGIN to check state of relay log.
  Removed IN_STMT flag setting. This is now done in update_state_of_relay_log()
sql/rpl_rli.h:
  Changed relay log state flags to bit masks instead of bit positions (most other code we have uses bit masks)
  Added IN_TRANSACTION to mark if we are in a BEGIN ... COMMIT section.
  save_temporary_tables is now thread safe
  Relay_log_info::sleep_lock -> rpl_group_info::sleep_lock
  Relay_log_info::sleep_cond -> rpl_group_info::sleep_cond
  Relay_log_info->sql_thd renamed to Relay_log_info->sql_driver_thd to avoid wrong usage for merged code
  is_in_group() is now independent of state of executed transaction.
sql/slave.cc:
  Simplifed arguments to io_salve_killed(), sql_slave_killed() and check_io_slave_killed(); No reason to supply THD as this is part of the given structure.
  set_thd_in_use_temporary_tables() removed as in_use is set on usage in sql_base.cc
  sql_thd -> sql_driver_thd
  More DBUG
  Added update_state_of_relay_log() which will calculate the IN_STMT and IN_TRANSACTION state of the relay log after the current element is executed.
  If slave_skip_counter is set run things in single threaded mode.
  Simplifed arguments to io_salve_killed(), check_io_slave_killed() and sql_slave_killed(); No reason to supply THD as this is part of the given structure.
  Added information to thd_proc_info() which thread is waiting for slave mutex to exit.
  Disabled not used function rpl_connect_master()
  Updated argument to next_event()
sql/sql_base.cc:
  Added mutex around usage of slave's temporary tables. The active list is always kept up to date in sql->rgi_slave->save_temporary_tables.
  Clear thd->temporary_tables after query (safety)
  More DBUG
  When using temporary table, set table->in_use to current thd as the THD may be different for slave threads.
  Some code is ifdef:ed with REMOVE_AFTER_MERGE_WITH_10 as the given code in 10.0 is not yet in this tree.
  In open_table() reuse code from find_temporary_table()
sql/sql_binlog.cc:
  rli->sql_thd -> rli->sql_driver_thd
  Remove duplicate setting of rgi->rli
sql/sql_class.cc:
  Added helper functions rgi_lock_temporary_tables() and rgi_unlock_temporary_tables()
  Would have been nicer to have these inline, but there was no easy way to do that
sql/sql_class.h:
  Added functions to protect slaves temporary tables
sql/sql_parse.cc:
  Added DBUG_PRINT
sql/transaction.cc:
  Added comment
This commit is contained in:
Michael Widenius 2013-10-14 00:24:05 +03:00
parent 3784432256
commit 2e100cc5a4
23 changed files with 603 additions and 359 deletions

View File

@ -88,7 +88,6 @@ wait/synch/mutex/sql/Query_cache::structure_guard_mutex
wait/synch/mutex/sql/Relay_log_info::data_lock
wait/synch/mutex/sql/Relay_log_info::log_space_lock
wait/synch/mutex/sql/Relay_log_info::run_lock
wait/synch/mutex/sql/Relay_log_info::sleep_lock
wait/synch/mutex/sql/Slave_reporting_capability::err_lock
wait/synch/mutex/sql/TABLE_SHARE::LOCK_ha_data
wait/synch/mutex/sql/THD::LOCK_thd_data
@ -146,7 +145,6 @@ wait/synch/cond/sql/MYSQL_RELAY_LOG::update_cond
wait/synch/cond/sql/Query_cache::COND_cache_status_changed
wait/synch/cond/sql/Relay_log_info::data_cond
wait/synch/cond/sql/Relay_log_info::log_space_cond
wait/synch/cond/sql/Relay_log_info::sleep_cond
wait/synch/cond/sql/Relay_log_info::start_cond
wait/synch/cond/sql/Relay_log_info::stop_cond
wait/synch/cond/sql/THD::COND_wakeup_ready

View File

@ -8,6 +8,7 @@ a
1
2
3
SET GLOBAL debug_dbug= '+d,incident_database_resync_on_replace,*';
REPLACE INTO t1 VALUES (4);
SELECT * FROM t1;
a

View File

@ -1 +0,0 @@
--loose-debug=+d,incident_database_resync_on_replace

View File

@ -7,12 +7,19 @@ CREATE TABLE t1 (a INT);
INSERT INTO t1 VALUES (1),(2),(3);
SELECT * FROM t1;
let $debug_save= `SELECT @@GLOBAL.debug`;
SET GLOBAL debug_dbug= '+d,incident_database_resync_on_replace,*';
# This will generate an incident log event and store it in the binary
# log before the replace statement.
REPLACE INTO t1 VALUES (4);
--save_master_pos
SELECT * FROM t1;
--disable_query_log
eval SET GLOBAL debug_dbug= '$debug_save';
--enable_query_log
connection slave;
# Wait until SQL thread stops with error LOST_EVENT on master
call mtr.add_suppression("Slave SQL.*The incident LOST_EVENTS occured on the master.* 1590");

View File

@ -1247,6 +1247,8 @@ int ha_commit_trans(THD *thd, bool all)
bool need_prepare_ordered, need_commit_ordered;
my_xid xid;
DBUG_ENTER("ha_commit_trans");
DBUG_PRINT("info",("thd: %p option_bits: %lu all: %d",
thd, (ulong) thd->variables.option_bits, all));
/* Just a random warning to test warnings pushed during autocommit. */
DBUG_EXECUTE_IF("warn_during_ha_commit_trans",
@ -1306,6 +1308,8 @@ int ha_commit_trans(THD *thd, bool all)
/* rw_trans is TRUE when we in a transaction changing data */
bool rw_trans= is_real_trans && (rw_ha_count > 0);
MDL_request mdl_request;
DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
is_real_trans, rw_trans, rw_ha_count));
if (rw_trans)
{

View File

@ -6554,9 +6554,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
the commit and wake them up. This way, all transactions in the queue get
committed in a single disk operation.
The return value of this function is TRUE if queued as the first entry in
the queue (meaning this is the leader), FALSE otherwise.
The main work in this function is when the commit in one transaction has
been marked to wait for the commit of another transaction to happen
first. This is used to support in-order parallel replication, where
@ -6570,6 +6567,10 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
transactions already prepared to commit but just waiting for the first one
to commit. If so, we add those to the queue as well, transitively for all
waiters.
@retval TRUE If queued as the first entry in the queue (meaning this
is the leader)
@retval FALSE Otherwise
*/
bool
@ -6657,7 +6658,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
The end result is a breath-first traversal of the tree of waiters,
re-using the next_subsequent_commit pointers in place of extra stack
space in a recursive traversal.
The temporary list created in next_subsequent_commit is not
used by the caller or any other function.
*/
list= wfc;
cur= list;
last= list;
@ -7239,6 +7244,7 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
Note that this function may release and re-acquire LOCK_log and
LOCK_prepare_ordered if it needs to wait.
*/
void
MYSQL_BIN_LOG::wait_for_sufficient_commits()
{

View File

@ -940,6 +940,8 @@ Log_event::Log_event(const char* buf,
int Log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Log_event::do_update_pos");
/*
rli is null when (as far as I (Guilhem) know) the caller is
Load_log_event::do_apply_event *and* that one is called from
@ -973,13 +975,14 @@ int Log_event::do_update_pos(rpl_group_info *rgi)
if (debug_not_change_ts_if_art_event == 0)
debug_not_change_ts_if_art_event= 2; );
}
return 0; // Cannot fail currently
DBUG_RETURN(0); // Cannot fail currently
}
Log_event::enum_skip_reason
Log_event::do_shall_skip(Relay_log_info *rli)
Log_event::do_shall_skip(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu,"
" rli->replicate_same_server_id: %d,"
" rli->slave_skip_counter: %lu",
@ -2525,11 +2528,11 @@ void Log_event::print_timestamp(IO_CACHE* file, time_t* ts)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
inline Log_event::enum_skip_reason
Log_event::continue_group(Relay_log_info *rli)
Log_event::continue_group(rpl_group_info *rgi)
{
if (rli->slave_skip_counter == 1)
if (rgi->rli->slave_skip_counter == 1)
return Log_event::EVENT_SKIP_IGNORE;
return Log_event::do_shall_skip(rli);
return Log_event::do_shall_skip(rgi);
}
#endif
@ -4263,11 +4266,13 @@ int Query_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
Query_log_event::do_shall_skip(Relay_log_info *rli)
Query_log_event::do_shall_skip(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Query_log_event::do_shall_skip");
DBUG_PRINT("debug", ("query: %s; q_len: %d", query, q_len));
DBUG_ASSERT(query && q_len > 0);
DBUG_ASSERT(thd == rgi->thd);
/*
An event skipped due to @@skip_replication must not be counted towards the
@ -4279,19 +4284,19 @@ Query_log_event::do_shall_skip(Relay_log_info *rli)
if (rli->slave_skip_counter > 0)
{
if (strcmp("BEGIN", query) == 0)
if (is_begin())
{
thd->variables.option_bits|= OPTION_BEGIN;
DBUG_RETURN(Log_event::continue_group(rli));
DBUG_RETURN(Log_event::continue_group(rgi));
}
if (strcmp("COMMIT", query) == 0 || strcmp("ROLLBACK", query) == 0)
if (is_commit() || is_rollback())
{
thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
}
DBUG_RETURN(Log_event::do_shall_skip(rli));
DBUG_RETURN(Log_event::do_shall_skip(rgi));
}
@ -4465,7 +4470,7 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
{
DBUG_ENTER("Start_log_event_v3::do_apply_event");
int error= 0;
Relay_log_info const *rli= rgi->rli;
Relay_log_info *rli= rgi->rli;
switch (binlog_version)
{
@ -4479,24 +4484,14 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
*/
if (created)
{
error= close_temporary_tables(thd);
rli->close_temporary_tables();
/*
The following is only false if we get here with a BINLOG statement
*/
if (rli->mi)
cleanup_load_tmpdir(&rli->mi->cmp_connection_name);
}
else
{
/*
Set all temporary tables thread references to the current thread
as they may point to the "old" SQL slave thread in case of its
restart.
*/
TABLE *table;
for (table= thd->temporary_tables; table; table= table->next)
table->in_use= thd;
}
break;
/*
@ -4511,7 +4506,7 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi)
Can distinguish, based on the value of 'created': this event was
generated at master startup.
*/
error= close_temporary_tables(thd);
rli->close_temporary_tables();
}
/*
Otherwise, can't distinguish a Start_log_event generated at
@ -4895,7 +4890,7 @@ int Format_description_log_event::do_update_pos(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
Format_description_log_event::do_shall_skip(Relay_log_info *rli)
Format_description_log_event::do_shall_skip(rpl_group_info *rgi)
{
return Log_event::EVENT_SKIP_NOT;
}
@ -5970,8 +5965,8 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
flush_relay_log_info(rli);
/*
Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of
a master's downgrade from 5.0 to 4.0.
Reset thd->variables.option_bits and sql_mode etc, because this could
be the signal of a master's downgrade from 5.0 to 4.0.
However, no need to reset description_event_for_exec: indeed, if the next
master is 5.0 (even 5.0.1) we will soon get a Format_desc; if the next
master is 4.0 then the events are in the slave's format (conversion).
@ -5991,9 +5986,9 @@ int Rotate_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
Rotate_log_event::do_shall_skip(Relay_log_info *rli)
Rotate_log_event::do_shall_skip(rpl_group_info *rgi)
{
enum_skip_reason reason= Log_event::do_shall_skip(rli);
enum_skip_reason reason= Log_event::do_shall_skip(rgi);
switch (reason) {
case Log_event::EVENT_SKIP_NOT:
@ -6302,8 +6297,9 @@ Gtid_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
Gtid_log_event::do_shall_skip(Relay_log_info *rli)
Gtid_log_event::do_shall_skip(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
An event skipped due to @@skip_replication must not be counted towards the
number of events to be skipped due to @@sql_slave_skip_counter.
@ -6315,10 +6311,13 @@ Gtid_log_event::do_shall_skip(Relay_log_info *rli)
if (rli->slave_skip_counter > 0)
{
if (!(flags2 & FL_STANDALONE))
{
thd->variables.option_bits|= OPTION_BEGIN;
return Log_event::continue_group(rli);
DBUG_ASSERT(rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION));
}
return Log_event::do_shall_skip(rli);
return Log_event::continue_group(rgi);
}
return Log_event::do_shall_skip(rgi);
}
@ -6707,13 +6706,6 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
int Intvar_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
/*
We are now in a statement until the associated query log event has
been processed.
*/
rli->set_flag(Relay_log_info::IN_STMT);
if (rgi->deferred_events_collecting)
return rgi->deferred_events->add(this);
@ -6738,7 +6730,7 @@ int Intvar_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
Intvar_log_event::do_shall_skip(Relay_log_info *rli)
Intvar_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
It is a common error to set the slave skip counter to 1 instead of
@ -6748,7 +6740,7 @@ Intvar_log_event::do_shall_skip(Relay_log_info *rli)
that we do not change the value of the slave skip counter since it
will be decreased by the following insert event.
*/
return continue_group(rli);
return continue_group(rgi);
}
#endif
@ -6818,13 +6810,6 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Rand_log_event::do_apply_event(rpl_group_info *rgi)
{
Relay_log_info const *rli= rgi->rli;
/*
We are now in a statement until the associated query log event has
been processed.
*/
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
if (rgi->deferred_events_collecting)
return rgi->deferred_events->add(this);
@ -6842,7 +6827,7 @@ int Rand_log_event::do_update_pos(rpl_group_info *rgi)
Log_event::enum_skip_reason
Rand_log_event::do_shall_skip(Relay_log_info *rli)
Rand_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
It is a common error to set the slave skip counter to 1 instead of
@ -6852,7 +6837,7 @@ Rand_log_event::do_shall_skip(Relay_log_info *rli)
that we do not change the value of the slave skip counter since it
will be decreased by the following insert event.
*/
return continue_group(rli);
return continue_group(rgi);
}
/**
@ -6998,14 +6983,16 @@ int Xid_log_event::do_apply_event(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
Xid_log_event::do_shall_skip(Relay_log_info *rli)
Xid_log_event::do_shall_skip(rpl_group_info *rgi)
{
DBUG_ENTER("Xid_log_event::do_shall_skip");
if (rli->slave_skip_counter > 0) {
if (rgi->rli->slave_skip_counter > 0)
{
DBUG_ASSERT(!rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION));
thd->variables.option_bits&= ~OPTION_BEGIN;
DBUG_RETURN(Log_event::EVENT_SKIP_COUNT);
}
DBUG_RETURN(Log_event::do_shall_skip(rli));
DBUG_RETURN(Log_event::do_shall_skip(rgi));
}
#endif /* !MYSQL_CLIENT */
@ -7418,7 +7405,6 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
{
Item *it= 0;
CHARSET_INFO *charset;
Relay_log_info const *rli= rgi->rli;
DBUG_ENTER("User_var_log_event::do_apply_event");
if (rgi->deferred_events_collecting)
@ -7435,12 +7421,6 @@ int User_var_log_event::do_apply_event(rpl_group_info *rgi)
double real_val;
longlong int_val;
/*
We are now in a statement until the associated query log event has
been processed.
*/
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
if (is_null)
{
it= new Item_null();
@ -7511,7 +7491,7 @@ int User_var_log_event::do_update_pos(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
User_var_log_event::do_shall_skip(Relay_log_info *rli)
User_var_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
It is a common error to set the slave skip counter to 1 instead
@ -7521,7 +7501,7 @@ User_var_log_event::do_shall_skip(Relay_log_info *rli)
that we do not change the value of the slave skip counter since it
will be decreased by the following insert event.
*/
return continue_group(rli);
return continue_group(rgi);
}
#endif /* !MYSQL_CLIENT */
@ -7724,9 +7704,11 @@ void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
Start_log_event_v3::do_apply_event(), not here. Because if we come
here, the master was sane.
*/
int Stop_log_event::do_update_pos(rpl_group_info *rgi)
{
Relay_log_info *rli= rgi->rli;
DBUG_ENTER("Stop_log_event::do_update_pos");
/*
We do not want to update master_log pos because we get a rotate event
before stop, so by now group_master_log_name is set to the next log.
@ -7734,7 +7716,7 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi)
could give false triggers in MASTER_POS_WAIT() that we have reached
the target position when in fact we have not.
*/
if (thd->variables.option_bits & OPTION_BEGIN)
if (rli->get_flag(Relay_log_info::IN_TRANSACTION))
rli->inc_event_relay_log_pos();
else
{
@ -7742,7 +7724,7 @@ int Stop_log_event::do_update_pos(rpl_group_info *rgi)
rli->inc_group_relay_log_pos(0);
flush_relay_log_info(rli);
}
return 0;
DBUG_RETURN(0);
}
#endif /* !MYSQL_CLIENT */
@ -8514,13 +8496,13 @@ int Begin_load_query_log_event::get_create_or_append() const
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
Log_event::enum_skip_reason
Begin_load_query_log_event::do_shall_skip(Relay_log_info *rli)
Begin_load_query_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1, then we should not start executing
on the next event.
*/
return continue_group(rli);
return continue_group(rgi);
}
#endif
@ -9272,17 +9254,6 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
*/
thd->set_time(when, when_sec_part);
/*
Now we are in a statement and will stay in a statement until we
see a STMT_END_F.
We set this flag here, before actually applying any rows, in
case the SQL thread is stopped and we need to detect that we're
inside a statement and halting abruptly might cause problems
when restarting.
*/
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
set_flags(COMPLETE_ROWS_F);
@ -9442,17 +9413,17 @@ int Rows_log_event::do_apply_event(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
Rows_log_event::do_shall_skip(Relay_log_info *rli)
Rows_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1 and this event does not end a
statement, then we should not start executing on the next event.
Otherwise, we defer the decision to the normal skipping logic.
*/
if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
return Log_event::EVENT_SKIP_IGNORE;
else
return Log_event::do_shall_skip(rli);
return Log_event::do_shall_skip(rgi);
}
/**
@ -9469,6 +9440,8 @@ Rows_log_event::do_shall_skip(Relay_log_info *rli)
static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD * thd)
{
int error;
DBUG_ENTER("rows_event_stmt_cleanup");
{
/*
This is the end of a statement or transaction, so close (and
@ -9520,9 +9493,16 @@ static int rows_event_stmt_cleanup(rpl_group_info *rgi, THD * thd)
*/
thd->reset_current_stmt_binlog_format_row();
/*
Reset modified_non_trans_table that we have set in
rows_log_event::do_apply_event()
*/
if (!thd->in_multi_stmt_transaction_mode())
thd->transaction.all.modified_non_trans_table= 0;
rgi->cleanup_context(thd, 0);
}
return error;
DBUG_RETURN(error);
}
/**
@ -9795,9 +9775,9 @@ int Annotate_rows_log_event::do_update_pos(rpl_group_info *rgi)
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
Log_event::enum_skip_reason
Annotate_rows_log_event::do_shall_skip(Relay_log_info *rli)
Annotate_rows_log_event::do_shall_skip(rpl_group_info *rgi)
{
return continue_group(rli);
return continue_group(rgi);
}
#endif
@ -10265,7 +10245,7 @@ check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list)
enum_tbl_map_status res= OK_TO_PROCESS;
Relay_log_info *rli= rgi->rli;
if (rli->sql_thd->slave_thread /* filtering is for slave only */ &&
if (rgi->thd->slave_thread /* filtering is for slave only */ &&
(!rli->mi->rpl_filter->db_ok(table_list->db) ||
(rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
res= FILTERED_OUT;
@ -10316,7 +10296,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
/* call from mysql_client_binlog_statement() will not set rli->mi */
filter= rli->sql_thd->slave_thread ? rli->mi->rpl_filter : global_rpl_filter;
filter= rgi->thd->slave_thread ? rli->mi->rpl_filter : global_rpl_filter;
strmov(db_mem, filter->get_rewrite_db(m_dbnam, &dummy_len));
strmov(tname_mem, m_tblnam);
@ -10404,13 +10384,13 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
}
Log_event::enum_skip_reason
Table_map_log_event::do_shall_skip(Relay_log_info *rli)
Table_map_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1, then we should not start executing
on the next event.
*/
return continue_group(rli);
return continue_group(rgi);
}
int Table_map_log_event::do_update_pos(rpl_group_info *rgi)

View File

@ -1342,9 +1342,9 @@ public:
@see do_shall_skip
*/
enum_skip_reason shall_skip(Relay_log_info *rli)
enum_skip_reason shall_skip(rpl_group_info *rgi)
{
return do_shall_skip(rli);
return do_shall_skip(rgi);
}
@ -1352,6 +1352,7 @@ public:
Check if an event is non-final part of a stand-alone event group,
such as Intvar_log_event (such events should be processed as part
of the following event group, not individually).
See also is_part_of_group()
*/
static bool is_part_of_group(enum Log_event_type ev_type)
{
@ -1375,6 +1376,11 @@ public:
return false;
}
}
/*
Same as above, but works on the object. In addition this is true for all
rows event except the last one.
*/
virtual bool is_part_of_group() { return 0; }
static bool is_group_event(enum Log_event_type ev_type)
{
@ -1408,14 +1414,14 @@ protected:
A typical usage is:
@code
enum_skip_reason do_shall_skip(Relay_log_info *rli) {
return continue_group(rli);
enum_skip_reason do_shall_skip(rpl_group_info *rgi) {
return continue_group(rgi);
}
@endcode
@return Skip reason
*/
enum_skip_reason continue_group(Relay_log_info *rli);
enum_skip_reason continue_group(rpl_group_info *rgi);
/**
Primitive to apply an event to the database.
@ -1493,7 +1499,7 @@ protected:
The event shall be skipped because the slave skip counter was
non-zero. The caller shall decrease the counter by one.
*/
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -1985,7 +1991,7 @@ public:
public: /* !!! Public in this patch to allow old usage */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
@ -2017,6 +2023,9 @@ public: /* !!! Public in this patch to allow old usage */
!strncasecmp(query, "SAVEPOINT", 9) ||
!strncasecmp(query, "ROLLBACK", 8);
}
bool is_begin() { return !strcmp(query, "BEGIN"); }
bool is_commit() { return !strcmp(query, "COMMIT"); }
bool is_rollback() { return !strcmp(query, "ROLLBACK"); }
};
@ -2501,7 +2510,7 @@ public:
protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info*)
virtual enum_skip_reason do_shall_skip(rpl_group_info*)
{
/*
Events from ourself should be skipped, but they should not
@ -2598,7 +2607,7 @@ protected:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -2672,12 +2681,13 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
bool is_part_of_group() { return 1; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -2751,12 +2761,13 @@ class Rand_log_event: public Log_event
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
bool is_part_of_group() { return 1; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -2804,7 +2815,7 @@ class Xid_log_event: public Log_event
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
enum_skip_reason do_shall_skip(Relay_log_info *rli);
enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -2867,12 +2878,13 @@ public:
void set_deferred() { deferred= true; }
#endif
bool is_valid() const { return name != 0; }
bool is_part_of_group() { return 1; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -2906,7 +2918,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli)
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi)
{
/*
Events from ourself should be skipped, but they should not
@ -3008,7 +3020,7 @@ public:
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -3121,7 +3133,7 @@ public:
void pack_info(THD *thd, Protocol *protocol);
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
#else
void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
@ -3497,7 +3509,7 @@ public:
Log_event_type get_type_code() { return BEGIN_LOAD_QUERY_EVENT; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -3619,6 +3631,7 @@ public:
virtual int get_data_size();
virtual Log_event_type get_type_code();
virtual bool is_valid() const;
virtual bool is_part_of_group() { return 1; }
#ifndef MYSQL_CLIENT
virtual bool write_data_header(IO_CACHE*);
@ -3637,7 +3650,7 @@ public:
private:
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info*);
virtual enum_skip_reason do_shall_skip(rpl_group_info*);
#endif
private:
@ -4030,6 +4043,7 @@ public:
virtual Log_event_type get_type_code() { return TABLE_MAP_EVENT; }
virtual bool is_valid() const { return m_memory != NULL; /* we check malloc */ }
virtual bool is_part_of_group() { return 1; }
virtual int get_data_size() { return (uint) m_data_size; }
#ifdef MYSQL_SERVER
@ -4052,7 +4066,7 @@ private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
#ifdef MYSQL_SERVER
@ -4195,6 +4209,7 @@ public:
{
return m_rows_buf && m_cols.bitmap;
}
bool is_part_of_group() { return get_flags(STMT_END_F) != 0; }
uint m_row_count; /* The number of rows added to the event */
@ -4280,7 +4295,7 @@ private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.

View File

@ -205,17 +205,6 @@ Old_rows_log_event::do_apply_event(Old_rows_log_event *ev, rpl_group_info *rgi)
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(ev_thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
/*
Now we are in a statement and will stay in a statement until we
see a STMT_END_F.
We set this flag here, before actually applying any rows, in
case the SQL thread is stopped and we need to detect that we're
inside a statement and halting abruptly might cause problems
when restarting.
*/
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
error= do_before_row_operations(table);
while (error == 0 && row_start < ev->m_rows_end)
{
@ -1613,17 +1602,6 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
/*
Now we are in a statement and will stay in a statement until we
see a STMT_END_F.
We set this flag here, before actually applying any rows, in
case the SQL thread is stopped and we need to detect that we're
inside a statement and halting abruptly might cause problems
when restarting.
*/
const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
set_flags(COMPLETE_ROWS_F);
@ -1820,17 +1798,17 @@ int Old_rows_log_event::do_apply_event(rpl_group_info *rgi)
Log_event::enum_skip_reason
Old_rows_log_event::do_shall_skip(Relay_log_info *rli)
Old_rows_log_event::do_shall_skip(rpl_group_info *rgi)
{
/*
If the slave skip counter is 1 and this event does not end a
statement, then we should not start executing on the next event.
Otherwise, we defer the decision to the normal skipping logic.
*/
if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
if (rgi->rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
return Log_event::EVENT_SKIP_IGNORE;
else
return Log_event::do_shall_skip(rli);
return Log_event::do_shall_skip(rgi);
}
int

View File

@ -145,6 +145,7 @@ public:
{
return m_rows_buf && m_cols.bitmap;
}
bool is_part_of_group() { return 1; }
uint m_row_count; /* The number of rows added to the event */
@ -216,7 +217,7 @@ private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(rpl_group_info *rgi);
virtual int do_update_pos(rpl_group_info *rgi);
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
virtual enum_skip_reason do_shall_skip(rpl_group_info *rgi);
/*
Primitive to prepare for a sequence of row executions.

View File

@ -467,11 +467,12 @@ uint lower_case_table_names;
ulong tc_heuristic_recover= 0;
int32 thread_count;
int32 thread_running;
int32 slave_open_temp_tables;
ulong thread_created;
ulong back_log, connect_timeout, concurrency, server_id;
ulong table_cache_size, table_def_size;
ulong what_to_log;
ulong slow_launch_time, slave_open_temp_tables;
ulong slow_launch_time;
ulong open_files_limit, max_binlog_size;
ulong slave_trans_retries;
uint slave_net_timeout;
@ -767,7 +768,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_master_info_sleep_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_relay_log_info_sleep_lock,
key_rpl_group_info_sleep_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOG_INFO_lock,
key_LOCK_thread_count, key_LOCK_thread_cache,
@ -839,7 +840,7 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_relay_log_info_data_lock, "Relay_log_info::data_lock", 0},
{ &key_relay_log_info_log_space_lock, "Relay_log_info::log_space_lock", 0},
{ &key_relay_log_info_run_lock, "Relay_log_info::run_lock", 0},
{ &key_relay_log_info_sleep_lock, "Relay_log_info::sleep_lock", 0},
{ &key_rpl_group_info_sleep_lock, "Rpl_group_info::sleep_lock", 0},
{ &key_structure_guard_mutex, "Query_cache::structure_guard_mutex", 0},
{ &key_TABLE_SHARE_LOCK_ha_data, "TABLE_SHARE::LOCK_ha_data", 0},
{ &key_LOCK_error_messages, "LOCK_error_messages", PSI_FLAG_GLOBAL},
@ -888,7 +889,7 @@ PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_master_info_sleep_cond,
key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
key_relay_log_info_sleep_cond,
key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache,
key_BINLOG_COND_queue_busy;
@ -934,7 +935,7 @@ static PSI_cond_info all_server_conds[]=
{ &key_relay_log_info_log_space_cond, "Relay_log_info::log_space_cond", 0},
{ &key_relay_log_info_start_cond, "Relay_log_info::start_cond", 0},
{ &key_relay_log_info_stop_cond, "Relay_log_info::stop_cond", 0},
{ &key_relay_log_info_sleep_cond, "Relay_log_info::sleep_cond", 0},
{ &key_rpl_group_info_sleep_cond, "Rpl_group_info::sleep_cond", 0},
{ &key_TABLE_SHARE_cond, "TABLE_SHARE::cond", 0},
{ &key_user_level_lock_cond, "User_level_lock::cond", 0},
{ &key_COND_thread_count, "COND_thread_count", PSI_FLAG_GLOBAL},
@ -7285,7 +7286,7 @@ SHOW_VAR status_vars[]= {
{"Select_range", (char*) offsetof(STATUS_VAR, select_range_count), SHOW_LONG_STATUS},
{"Select_range_check", (char*) offsetof(STATUS_VAR, select_range_check_count), SHOW_LONG_STATUS},
{"Select_scan", (char*) offsetof(STATUS_VAR, select_scan_count), SHOW_LONG_STATUS},
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_LONG},
{"Slave_open_temp_tables", (char*) &slave_open_temp_tables, SHOW_INT},
#ifdef HAVE_REPLICATION
{"Slave_retried_transactions",(char*)&slave_retried_transactions, SHOW_LONG},
{"Slave_heartbeat_period", (char*) &show_heartbeat_period, SHOW_SIMPLE_FUNC},

View File

@ -153,7 +153,7 @@ extern ulong delayed_insert_timeout;
extern ulong delayed_insert_limit, delayed_queue_size;
extern ulong delayed_insert_threads, delayed_insert_writes;
extern ulong delayed_rows_in_use,delayed_insert_errors;
extern ulong slave_open_temp_tables;
extern int32 slave_open_temp_tables;
extern ulonglong query_cache_size;
extern ulong query_cache_min_res_unit;
extern ulong slow_launch_threads, slow_launch_time;
@ -246,7 +246,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_master_info_sleep_lock,
key_mutex_slave_reporting_capability_err_lock, key_relay_log_info_data_lock,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_relay_log_info_sleep_lock,
key_rpl_group_info_sleep_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index;
@ -278,7 +278,7 @@ extern PSI_cond_key key_BINLOG_COND_xid_list, key_BINLOG_update_cond,
key_master_info_sleep_cond,
key_relay_log_info_data_cond, key_relay_log_info_log_space_cond,
key_relay_log_info_start_cond, key_relay_log_info_stop_cond,
key_relay_log_info_sleep_cond,
key_rpl_group_info_sleep_cond,
key_TABLE_SHARE_cond, key_user_level_lock_cond,
key_COND_thread_count, key_COND_thread_cache, key_COND_flush_thread_cache;
extern PSI_cond_key key_RELAYLOG_update_cond, key_COND_wakeup_ready,

View File

@ -65,6 +65,7 @@ int
rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
uint64 sub_id;
DBUG_ENTER("rpl_slave_state::record_and_update_gtid");
/*
Update the GTID position, if we have it and did not already update
@ -74,10 +75,10 @@ rpl_slave_state::record_and_update_gtid(THD *thd, rpl_group_info *rgi)
{
rgi->gtid_sub_id= 0;
if (record_gtid(thd, &rgi->current_gtid, sub_id, false, false))
return 1;
DBUG_RETURN(1);
update_state_hash(sub_id, &rgi->current_gtid);
}
return 0;
DBUG_RETURN(0);
}
@ -310,6 +311,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
element *elem;
ulonglong thd_saved_option= thd->variables.option_bits;
Query_tables_list lex_backup;
DBUG_ENTER("record_gtid");
if (unlikely(!loaded))
{
@ -320,7 +322,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
We already complained loudly about this, but we can try to continue
until the DBA fixes it.
*/
return 0;
DBUG_RETURN(0);
}
if (!in_statement)
@ -329,7 +331,7 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
DBUG_EXECUTE_IF("gtid_inject_record_gtid",
{
my_error(ER_CANNOT_UPDATE_GTID_STATE, MYF(0));
return 1;
DBUG_RETURN(1);
} );
thd->lex->reset_n_backup_query_tables_list(&lex_backup);
@ -347,8 +349,11 @@ rpl_slave_state::record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
table->no_replicate= 1;
if (!in_transaction)
{
DBUG_PRINT("info", ("resetting OPTION_BEGIN"));
thd->variables.option_bits&=
~(ulonglong)(OPTION_NOT_AUTOCOMMIT|OPTION_BEGIN);
}
bitmap_set_all(table->write_set);
@ -457,7 +462,7 @@ end:
}
thd->lex->restore_backup_query_tables_list(&lex_backup);
thd->variables.option_bits= thd_saved_option;
return err;
DBUG_RETURN(err);
}

View File

@ -43,11 +43,6 @@
slave rolls back the transaction; parallel execution needs to be able
to deal with this wrt. commit_orderer and such.
- Relay_log_info::is_in_group(). This needs to be handled correctly in all
callers. I think it needs to be split into two, one version in
Relay_log_info to be used from next_event() in slave.cc, one to be used in
per-transaction stuff.
- We should fail if we connect to the master with opt_slave_parallel_threads
greater than zero and master does not support GTID. Just to avoid a bunch
of potential problems, we won't be able to do any parallel replication
@ -71,6 +66,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev,
/* ToDo: Access to thd, and what about rli, split out a parallel part? */
mysql_mutex_lock(&rli->data_lock);
qev->ev->thd= thd;
err= apply_event_and_update_pos(qev->ev, thd, rgi, rpt);
thd->rgi_slave= NULL;
/* ToDo: error handling. */
@ -234,8 +230,8 @@ handle_rpl_parallel_thread(void *arg)
((group_standalone && !Log_event::is_part_of_group(event_type)) ||
event_type == XID_EVENT ||
(event_type == QUERY_EVENT &&
(!strcmp("COMMIT", ((Query_log_event *)events->ev)->query) ||
!strcmp("ROLLBACK", ((Query_log_event *)events->ev)->query))));
(((Query_log_event *)events->ev)->is_commit() ||
((Query_log_event *)events->ev)->is_rollback())));
delete_or_keep_event_post_apply(rgi, event_type, events->ev);
my_free(events);
@ -612,6 +608,11 @@ rpl_parallel::wait_for_done()
}
/*
do_event() is executed by the sql_driver_thd thread.
It's main purpose is to find a thread that can exectue the query.
*/
bool
rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
{
@ -718,9 +719,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev)
if (!cur_thread)
{
/*
Nothing else is currently running in this domain. We can spawn a new
thread to do this event group in parallel with anything else that might
be running in other domains.
Nothing else is currently running in this domain. We can
spawn a new thread to do this event group in parallel with
anything else that might be running in other domains.
*/
cur_thread= e->rpl_thread= global_rpl_thread_pool.get_thread(e);
/* get_thread() returns with the LOCK_rpl_thread locked. */

View File

@ -56,7 +56,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
#endif
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_thd(0),
abort_pos_wait(0), slave_run_id(0), sql_driver_thd(),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0), executed_entries(0),
last_event_start_time(0), m_flags(0),
@ -85,12 +85,10 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery)
&data_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_log_space_lock,
&log_space_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_relay_log_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_relay_log_info_data_cond, &data_cond, NULL);
mysql_cond_init(key_relay_log_info_start_cond, &start_cond, NULL);
mysql_cond_init(key_relay_log_info_stop_cond, &stop_cond, NULL);
mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond, NULL);
mysql_cond_init(key_relay_log_info_sleep_cond, &sleep_cond, NULL);
relay_log.init_pthread_objects();
DBUG_VOID_RETURN;
}
@ -103,12 +101,10 @@ Relay_log_info::~Relay_log_info()
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
mysql_mutex_destroy(&log_space_lock);
mysql_mutex_destroy(&sleep_lock);
mysql_cond_destroy(&data_cond);
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
mysql_cond_destroy(&log_space_cond);
mysql_cond_destroy(&sleep_cond);
relay_log.cleanup();
DBUG_VOID_RETURN;
}
@ -523,6 +519,8 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log,
}
rli->group_relay_log_pos = rli->event_relay_log_pos = pos;
rli->clear_flag(Relay_log_info::IN_STMT);
rli->clear_flag(Relay_log_info::IN_TRANSACTION);
/*
Test to see if the previous run was with the skip of purging
@ -935,6 +933,9 @@ void Relay_log_info::close_temporary_tables()
for (table=save_temporary_tables ; table ; table=next)
{
next=table->next;
/* Reset in_use as the table may have been created by another thd */
table->in_use=0;
/*
Don't ask for disk deletion. For now, anyway they will be deleted when
slave restarts, but it is a better intention to not delete them.
@ -1094,8 +1095,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
!replicate_same_server_id)
DBUG_RETURN(FALSE);
log_name= group_master_log_name;
log_pos= (!ev)? group_master_log_pos :
((thd->variables.option_bits & OPTION_BEGIN || !ev->log_pos) ?
log_pos= ((!ev)? group_master_log_pos :
(get_flag(IN_TRANSACTION) || !ev->log_pos) ?
group_master_log_pos : ev->log_pos - ev->data_written);
}
else
@ -1195,7 +1196,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
#ifndef DBUG_OFF
extern uint debug_not_change_ts_if_art_event;
#endif
clear_flag(IN_STMT);
DBUG_ENTER("Relay_log_info::stmt_done");
DBUG_ASSERT(rgi->rli == this);
/*
@ -1204,6 +1205,9 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
(not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
BEGIN/COMMIT, not with SET AUTOCOMMIT= .
We can't use rgi->rli->get_flag(IN_TRANSACTION) here as OPTION_BEGIN
is also used for single row transactions.
CAUTION: opt_using_transactions means innodb || bdb ; suppose the
master supports InnoDB and BDB, but the slave supports only BDB,
problems will arise: - suppose an InnoDB table is created on the
@ -1221,7 +1225,8 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
if ((rgi->thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
if ((rgi->thd->variables.option_bits & OPTION_BEGIN) &&
opt_using_transactions)
inc_event_relay_log_pos();
else
{
@ -1255,6 +1260,7 @@ void Relay_log_info::stmt_done(my_off_t event_master_log_pos,
IF_DBUG(debug_not_change_ts_if_art_event > 0, 1)))
last_master_timestamp= event_creation_time;
}
DBUG_VOID_RETURN;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@ -1417,12 +1423,17 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli_)
tables_to_lock_count(0)
{
bzero(&current_gtid, sizeof(current_gtid));
mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_rpl_group_info_sleep_cond, &sleep_cond, NULL);
}
rpl_group_info::~rpl_group_info()
{
free_annotate_event();
mysql_mutex_destroy(&sleep_lock);
mysql_cond_destroy(&sleep_cond);
}
@ -1492,6 +1503,7 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi,
void rpl_group_info::cleanup_context(THD *thd, bool error)
{
DBUG_ENTER("Relay_log_info::cleanup_context");
DBUG_PRINT("enter", ("error: %d", (int) error));
DBUG_ASSERT(this->thd == thd);
/*
@ -1514,9 +1526,20 @@ void rpl_group_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables();
slave_close_thread_tables(thd);
if (error)
{
thd->mdl_context.release_transactional_locks();
/* ToDo: This must clear the flag in rgi, not rli. */
if (thd == rli->sql_driver_thd)
{
/*
Reset flags. This is needed to handle incident events and errors in
the relay log noticed by the sql driver thread.
*/
rli->clear_flag(Relay_log_info::IN_STMT);
rli->clear_flag(Relay_log_info::IN_TRANSACTION);
}
}
/*
Cleanup for the flags that have been set at do_apply_event.
*/

View File

@ -59,14 +59,14 @@ class Relay_log_info : public Slave_reporting_capability
{
public:
/**
Flags for the state of the replication.
Flags for the state of reading the relay log. Note that these are
bit masks.
*/
enum enum_state_flag {
/** The replication thread is inside a statement */
IN_STMT,
/** Flag counter. Should always be last */
STATE_FLAGS_COUNT
/** We are inside a group of events forming a statement */
IN_STMT=1,
/** We have inside a transaction */
IN_TRANSACTION=2
};
/*
@ -131,9 +131,14 @@ public:
IO_CACHE info_file;
/*
When we restart slave thread we need to have access to the previously
created temporary tables. Modified only on init/end and by the SQL
thread, read only by SQL thread.
List of temporary tables used by this connection.
This is updated when a temporary table is created or dropped by
a replication thread.
Not reset when replication ends, to allow one to access the tables
when replication restarts.
Protected by data_lock.
*/
TABLE *save_temporary_tables;
@ -141,13 +146,13 @@ public:
standard lock acquisition order to avoid deadlocks:
run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
*/
mysql_mutex_t data_lock, run_lock, sleep_lock;
mysql_mutex_t data_lock, run_lock;
/*
start_cond is broadcast when SQL thread is started
stop_cond - when stopped
data_cond - when data protected by data_lock changes
*/
mysql_cond_t start_cond, stop_cond, data_cond, sleep_cond;
mysql_cond_t start_cond, stop_cond, data_cond;
/* parent Master_info structure */
Master_info *mi;
@ -164,8 +169,8 @@ public:
- an autocommiting query + its associated events (INSERT_ID,
TIMESTAMP...)
We need these rli coordinates :
- relay log name and position of the beginning of the group we currently are
executing. Needed to know where we have to restart when replication has
- relay log name and position of the beginning of the group we currently
are executing. Needed to know where we have to restart when replication has
stopped in the middle of a group (which has been rolled back by the slave).
- relay log name and position just after the event we have just
executed. This event is part of the current group.
@ -239,7 +244,13 @@ public:
ulong max_relay_log_size;
mysql_mutex_t log_space_lock;
mysql_cond_t log_space_cond;
THD * sql_thd;
/*
THD for the main sql thread, the one that starts threads to process
slave requests. If there is only one thread, then this THD is also
used for SQL processing.
A kill sent to this THD will kill the replication.
*/
THD *sql_driver_thd;
#ifndef DBUG_OFF
int events_till_abort;
#endif
@ -399,6 +410,25 @@ public:
time_t event_creation_time, THD *thd,
rpl_group_info *rgi);
/**
Is the replication inside a group?
The reader of the relay log is inside a group if either:
- The IN_TRANSACTION flag is set, meaning we're inside a transaction
- The IN_STMT flag is set, meaning we have read at least one row from
a multi-event entry.
This flag reflects the state of the log 'just now', ie after the last
read event would be executed.
This allow us to test if we can stop replication before reading
the next entry.
@retval true Replication thread is currently inside a group
@retval false Replication thread is currently not inside a group
*/
bool is_in_group() const {
return (m_flags & (IN_STMT | IN_TRANSACTION));
}
/**
Set the value of a replication state flag.
@ -407,7 +437,7 @@ public:
*/
void set_flag(enum_state_flag flag)
{
m_flags |= (1UL << flag);
m_flags|= flag;
}
/**
@ -419,7 +449,7 @@ public:
*/
bool get_flag(enum_state_flag flag)
{
return m_flags & (1UL << flag);
return m_flags & flag;
}
/**
@ -429,22 +459,7 @@ public:
*/
void clear_flag(enum_state_flag flag)
{
m_flags &= ~(1UL << flag);
}
/**
Is the replication inside a group?
Replication is inside a group if either:
- The OPTION_BEGIN flag is set, meaning we're inside a transaction
- The RLI_IN_STMT flag is set, meaning we're inside a statement
@retval true Replication thread is currently inside a group
@retval false Replication thread is currently not inside a group
*/
bool is_in_group() const {
return (sql_thd->variables.option_bits & OPTION_BEGIN) ||
(m_flags & (1UL << IN_STMT));
m_flags&= ~flag;
}
time_t get_row_stmt_start_timestamp()
@ -482,7 +497,12 @@ public:
private:
/* ToDo: This must be moved to rpl_group_info. */
/*
Holds the state of the data in the relay log.
We need this to ensure that we are not in the middle of a
statement or inside BEGIN ... COMMIT when should rotate the
relay log.
*/
uint32 m_flags;
/*
@ -503,8 +523,11 @@ private:
together.
In parallel replication, there will be one rpl_group_info object for
each running thd. All rpl_group_info will share the same Relay_log_info.
each running sql thread, each having their own thd.
All rpl_group_info will share the same Relay_log_info.
*/
struct rpl_group_info
{
Relay_log_info *rli;
@ -566,6 +589,8 @@ struct rpl_group_info
RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
uint tables_to_lock_count; /* RBR: Count of tables to lock */
table_mapping m_table_map; /* RBR: Mapping table-id to table */
mysql_mutex_t sleep_lock;
mysql_cond_t sleep_cond;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();

View File

@ -146,8 +146,8 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
static bool wait_for_relay_log_space(Relay_log_info* rli);
static inline bool io_slave_killed(THD* thd,Master_info* mi);
static inline bool sql_slave_killed(THD* thd,Relay_log_info* rli);
static bool io_slave_killed(Master_info* mi);
static bool sql_slave_killed(rpl_group_info *rgi);
static int init_slave_thread(THD* thd, Master_info *mi,
SLAVE_THD_TYPE thd_type);
static void print_slave_skip_errors(void);
@ -156,14 +156,14 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
bool suppress_warnings);
static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
bool reconnect, bool suppress_warnings);
static Log_event* next_event(Relay_log_info* rli);
static Log_event* next_event(rpl_group_info* rgi);
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
static int terminate_slave_thread(THD *thd,
mysql_mutex_t *term_lock,
mysql_cond_t *term_cond,
volatile uint *slave_running,
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
static bool check_io_slave_killed(Master_info *mi, const char *info);
static bool send_show_master_info_header(THD *thd, bool full,
size_t gtid_pos_length);
static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
@ -570,13 +570,6 @@ void init_slave_skip_errors(const char* arg)
DBUG_VOID_RETURN;
}
static void set_thd_in_use_temporary_tables(Relay_log_info *rli)
{
TABLE *table;
for (table= rli->save_temporary_tables ; table ; table= table->next)
table->in_use= rli->sql_thd;
}
int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
{
@ -592,7 +585,7 @@ int terminate_slave_threads(Master_info* mi,int thread_mask,bool skip_lock)
{
DBUG_PRINT("info",("Terminating SQL thread"));
mi->rli.abort_slave=1;
if ((error=terminate_slave_thread(mi->rli.sql_thd, sql_lock,
if ((error=terminate_slave_thread(mi->rli.sql_driver_thd, sql_lock,
&mi->rli.stop_cond,
&mi->rli.slave_running,
skip_lock)) &&
@ -957,13 +950,12 @@ void end_slave()
DBUG_VOID_RETURN;
}
static bool io_slave_killed(THD* thd, Master_info* mi)
static bool io_slave_killed(Master_info* mi)
{
DBUG_ENTER("io_slave_killed");
DBUG_ASSERT(mi->io_thd == thd);
DBUG_ASSERT(mi->slave_running); // tracking buffer overrun
DBUG_RETURN(mi->abort_slave || abort_loop || thd->killed);
DBUG_RETURN(mi->abort_slave || abort_loop || mi->io_thd->killed);
}
/**
@ -979,26 +971,36 @@ static bool io_slave_killed(THD* thd, Master_info* mi)
@return TRUE the killed status is recognized, FALSE a possible killed
status is deferred.
*/
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
static bool sql_slave_killed(rpl_group_info *rgi)
{
bool ret= FALSE;
Relay_log_info *rli= rgi->rli;
THD *thd= rgi->thd;
DBUG_ENTER("sql_slave_killed");
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_ASSERT(rli->sql_driver_thd == thd);
DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
if (abort_loop || thd->killed || rli->abort_slave)
if (abort_loop || rli->sql_driver_thd->killed || rli->abort_slave)
{
/*
The transaction should always be binlogged if OPTION_KEEP_LOG is set
(it implies that something can not be rolled back). And such case
should be regarded similarly as modifing a non-transactional table
because retrying of the transaction will lead to an error or inconsistency
as well.
Example: OPTION_KEEP_LOG is set if a temporary table is created or dropped.
The transaction should always be binlogged if OPTION_KEEP_LOG is
set (it implies that something can not be rolled back). And such
case should be regarded similarly as modifing a
non-transactional table because retrying of the transaction will
lead to an error or inconsistency as well.
Example: OPTION_KEEP_LOG is set if a temporary table is created
or dropped.
Note that transaction.all.modified_non_trans_table may be 1
if last statement was a single row transaction without begin/end.
Testing this flag must always be done in connection with
rli->is_in_group().
*/
if ((thd->transaction.all.modified_non_trans_table ||
(thd->variables.option_bits & OPTION_KEEP_LOG))
&& rli->is_in_group())
(thd->variables.option_bits & OPTION_KEEP_LOG)) &&
rli->is_in_group())
{
char msg_stopped[]=
"... Slave SQL Thread stopped with incomplete event group "
@ -1008,20 +1010,28 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
"ignores duplicate key, key not found, and similar errors (see "
"documentation for details).";
DBUG_PRINT("info", ("modified_non_trans_table: %d OPTION_BEGIN: %d "
"is_in_group: %d",
thd->transaction.all.modified_non_trans_table,
test(thd->variables.option_bits & OPTION_BEGIN),
rli->is_in_group()));
if (rli->abort_slave)
{
DBUG_PRINT("info", ("Request to stop slave SQL Thread received while "
DBUG_PRINT("info",
("Request to stop slave SQL Thread received while "
"applying a group that has non-transactional "
"changes; waiting for completion of the group ... "));
/*
Slave sql thread shutdown in face of unfinished group modified
Non-trans table is handled via a timer. The slave may eventually
give out to complete the current group and in that case there
might be issues at consequent slave restart, see the error message.
WL#2975 offers a robust solution requiring to store the last exectuted
event's coordinates along with the group's coordianates
instead of waiting with @c last_event_start_time the timer.
Slave sql thread shutdown in face of unfinished group
modified Non-trans table is handled via a timer. The slave
may eventually give out to complete the current group and in
that case there might be issues at consequent slave restart,
see the error message. WL#2975 offers a robust solution
requiring to store the last exectuted event's coordinates
along with the group's coordianates instead of waiting with
@c last_event_start_time the timer.
*/
if (rli->last_event_start_time == 0)
@ -1049,7 +1059,8 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
else
{
ret= TRUE;
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER(ER_SLAVE_FATAL_ERROR),
msg_stopped);
}
}
@ -1461,7 +1472,7 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
mi->clock_diff_with_master=
(long) (time((time_t*) 0) - strtoul(master_row[0], 0, 10));
}
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@ -1526,7 +1537,7 @@ not always make sense; please check the manual before using it).";
}
else if (mysql_errno(mysql))
{
if (check_io_slave_killed(mi->io_thd, mi, NULL))
if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@ -1599,7 +1610,7 @@ be equal for the Statement-format replication to work";
goto err;
}
}
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@ -1662,7 +1673,7 @@ be equal for the Statement-format replication to work";
goto err;
}
}
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(err_code= mysql_errno(mysql)))
{
@ -1707,7 +1718,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
sprintf(query, query_format, llbuf);
if (mysql_real_query(mysql, query, strlen(query))
&& !check_io_slave_killed(mi->io_thd, mi, NULL))
&& !check_io_slave_killed(mi, NULL))
{
errmsg= "The slave I/O thread stops because SET @master_heartbeat_period "
"on master failed.";
@ -1742,7 +1753,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
rc= mysql_real_query(mysql, query, strlen(query));
if (rc != 0)
{
if (check_io_slave_killed(mi->io_thd, mi, NULL))
if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
if (mysql_errno(mysql) == ER_UNKNOWN_SYSTEM_VARIABLE)
@ -1788,7 +1799,7 @@ when it try to get the value of TIME_ZONE global variable from master.";
DBUG_ASSERT(mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_OFF ||
mi->checksum_alg_before_fd == BINLOG_CHECKSUM_ALG_CRC32);
}
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@ -2052,7 +2063,7 @@ after_set_capability:
rpl_global_gtid_slave_state.load(mi->io_thd, master_row[0],
strlen(master_row[0]), false, false);
}
else if (check_io_slave_killed(mi->io_thd, mi, NULL))
else if (check_io_slave_killed(mi, NULL))
goto slave_killed_err;
else if (is_network_error(mysql_errno(mysql)))
{
@ -2118,7 +2129,7 @@ static bool wait_for_relay_log_space(Relay_log_info* rli)
"\
Waiting for the slave SQL thread to free enough relay log space");
while (rli->log_space_limit < rli->log_space_total &&
!(slave_killed=io_slave_killed(thd,mi)) &&
!(slave_killed=io_slave_killed(mi)) &&
!rli->ignore_log_space_limit)
mysql_cond_wait(&rli->log_space_cond, &rli->log_space_lock);
@ -2293,7 +2304,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
{
*suppress_warnings= TRUE; // Suppress reconnect warning
}
else if (!check_io_slave_killed(mi->io_thd, mi, NULL))
else if (!check_io_slave_killed(mi, NULL))
{
char buf[256];
my_snprintf(buf, sizeof(buf), "%s (Errno: %d)", mysql_error(mysql),
@ -2463,8 +2474,15 @@ static bool send_show_master_info_data(THD *thd, Master_info *mi, bool full,
&my_charset_bin);
mysql_mutex_lock(&mi->run_lock);
if (full)
protocol->store(mi->rli.sql_thd ? mi->rli.sql_thd->proc_info : "",
{
/*
Show what the sql driver replication thread is doing
This is only meaningful if there is only one slave thread.
*/
protocol->store(mi->rli.sql_driver_thd ?
mi->rli.sql_driver_thd->proc_info : "",
&my_charset_bin);
}
protocol->store(mi->io_thd ? mi->io_thd->proc_info : "", &my_charset_bin);
mysql_mutex_unlock(&mi->run_lock);
@ -2797,7 +2815,7 @@ static int init_slave_thread(THD* thd, Master_info *mi,
@retval True if the thread has been killed, false otherwise.
*/
template <typename killed_func, typename rpl_info>
static inline bool slave_sleep(THD *thd, time_t seconds,
static bool slave_sleep(THD *thd, time_t seconds,
killed_func func, rpl_info info)
{
@ -2813,7 +2831,7 @@ static inline bool slave_sleep(THD *thd, time_t seconds,
mysql_mutex_lock(lock);
old_proc_info= thd->enter_cond(cond, lock, thd->proc_info);
while (! (ret= func(thd, info)))
while (! (ret= func(info)))
{
int error= mysql_cond_timedwait(cond, lock, &abstime);
if (error == ETIMEDOUT || error == ETIME)
@ -3024,7 +3042,6 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
{
int exec_res= 0;
Relay_log_info* rli= rgi->rli;
DBUG_ENTER("apply_event_and_update_pos");
DBUG_PRINT("exec_event",("%s(type_code: %d; server_id: %d)",
@ -3074,7 +3091,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
(ev->flags & LOG_EVENT_SKIP_REPLICATION_F ? OPTION_SKIP_REPLICATION : 0);
ev->thd = thd; // because up to this point, ev->thd == 0
int reason= ev->shall_skip(rli);
int reason= ev->shall_skip(rgi);
if (reason == Log_event::EVENT_SKIP_COUNT)
{
DBUG_ASSERT(rli->slave_skip_counter > 0);
@ -3098,9 +3115,10 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
// EVENT_SKIP_COUNT
"skipped because event skip counter was non-zero"
};
DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
DBUG_PRINT("info", ("OPTION_BEGIN: %d IN_STMT: %d IN_TRANSACTION: %d",
test(thd->variables.option_bits & OPTION_BEGIN),
rli->get_flag(Relay_log_info::IN_STMT)));
rli->get_flag(Relay_log_info::IN_STMT),
rli->get_flag(Relay_log_info::IN_TRANSACTION)));
DBUG_PRINT("skip_event", ("%s event was %s",
ev->get_type_str(), explain[reason]));
#endif
@ -3149,6 +3167,80 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
}
/**
Keep the relay log transaction state up to date.
The state reflects how things are after the given event, that has just been
read from the relay log, is executed.
This is only needed to ensure we:
- Don't abort the sql driver thread in the middle of an event group.
- Don't rotate the io thread in the middle of a statement or transaction.
The mechanism is that the io thread, when it needs to rotate the relay
log, will wait until the sql driver has read all the cached events
and then continue reading events one by one from the master until
the sql threads signals that log doesn't have an active group anymore.
There are two possible cases. We keep them as 2 separate flags mainly
to make debugging easier.
- IN_STMT is set when we have read an event that should be used
together with the next event. This is for example setting a
variable that is used when executing the next statement.
- IN_TRANSACTION is set when we are inside a BEGIN...COMMIT group
To test the state one should use the is_in_group() function.
*/
inline void update_state_of_relay_log(Relay_log_info *rli, Log_event *ev)
{
Log_event_type typ= ev->get_type_code();
/* check if we are in a multi part event */
if (ev->is_part_of_group())
rli->set_flag(Relay_log_info::IN_STMT);
else if (Log_event::is_group_event(typ))
{
/*
If it was not a is_part_of_group() and not a group event (like
rotate) then we can reset the IN_STMT flag. We have the above
if only to allow us to have a rotate element anywhere.
*/
rli->clear_flag(Relay_log_info::IN_STMT);
}
/* Check for an event that starts or stops a transaction */
if (typ == QUERY_EVENT)
{
Query_log_event *qev= (Query_log_event*) ev;
/*
Trivial optimization to avoid the following somewhat expensive
checks.
*/
if (qev->q_len <= sizeof("ROLLBACK"))
{
if (qev->is_begin())
rli->set_flag(Relay_log_info::IN_TRANSACTION);
if (qev->is_commit() || qev->is_rollback())
rli->clear_flag(Relay_log_info::IN_TRANSACTION);
}
}
if (typ == XID_EVENT)
rli->clear_flag(Relay_log_info::IN_TRANSACTION);
if (typ == GTID_EVENT &&
!(((Gtid_log_event*) ev)->flags2 & Gtid_log_event::FL_STANDALONE))
{
/* This GTID_EVENT will generate a BEGIN event */
rli->set_flag(Relay_log_info::IN_TRANSACTION);
}
DBUG_PRINT("info", ("event: %u IN_STMT: %d IN_TRANSACTION: %d",
(uint) typ,
rli->get_flag(Relay_log_info::IN_STMT),
rli->get_flag(Relay_log_info::IN_TRANSACTION)));
}
/**
Top-level function for executing the next event from the relay log.
@ -3177,6 +3269,7 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd,
@retval 1 The event was not applied.
*/
static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
rpl_group_info *serial_rgi)
{
@ -3189,11 +3282,9 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
*/
mysql_mutex_lock(&rli->data_lock);
Log_event * ev = next_event(rli);
Log_event * ev = next_event(serial_rgi);
DBUG_ASSERT(rli->sql_thd==thd);
if (sql_slave_killed(thd,rli))
if (sql_slave_killed(serial_rgi))
{
mysql_mutex_unlock(&rli->data_lock);
delete ev;
@ -3216,8 +3307,8 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
sql_print_information("Slave SQL thread stopped because it reached its"
" UNTIL position %s", llstr(rli->until_pos(), buf));
/*
Setting abort_slave flag because we do not want additional message about
error in query execution to be printed.
Setting abort_slave flag because we do not want additional
message about error in query execution to be printed.
*/
rli->abort_slave= 1;
mysql_mutex_unlock(&rli->data_lock);
@ -3245,7 +3336,14 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
};);
}
if (opt_slave_parallel_threads > 0)
update_state_of_relay_log(rli, ev);
/*
Execute queries in parallel, except if slave_skip_counter is set,
as it's is easier to skip queries in single threaded mode.
*/
if (opt_slave_parallel_threads > 0 && rli->slave_skip_counter == 0)
DBUG_RETURN(rli->parallel.do_event(serial_rgi, ev));
/*
@ -3310,7 +3408,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli,
serial_rgi->cleanup_context(thd, 1);
/* chance for concurrent connection to get more locks */
slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
sql_slave_killed, rli);
sql_slave_killed, serial_rgi);
mysql_mutex_lock(&rli->data_lock); // because of SHOW STATUS
rli->trans_retries++;
rli->retried_trans++;
@ -3358,9 +3456,9 @@ on this slave.\
}
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info)
static bool check_io_slave_killed(Master_info *mi, const char *info)
{
if (io_slave_killed(thd, mi))
if (io_slave_killed(mi))
{
if (info && global_system_variables.log_warnings)
sql_print_information("%s", info);
@ -3411,7 +3509,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
return 1; // Don't retry forever
slave_sleep(thd, mi->connect_retry, io_slave_killed, mi);
}
if (check_io_slave_killed(thd, mi, messages[SLAVE_RECON_MSG_KILLED_WAITING]))
if (check_io_slave_killed(mi, messages[SLAVE_RECON_MSG_KILLED_WAITING]))
return 1;
thd->proc_info = messages[SLAVE_RECON_MSG_AFTER];
if (!suppress_warnings)
@ -3448,7 +3546,7 @@ static int try_to_reconnect(THD *thd, MYSQL *mysql, Master_info *mi,
sql_print_information("%s", buf);
}
}
if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(thd, mi))
if (safe_reconnect(thd, mysql, mi, 1) || io_slave_killed(mi))
{
if (global_system_variables.log_warnings)
sql_print_information("%s", messages[SLAVE_RECON_MSG_KILLED_AFTER]);
@ -3631,11 +3729,14 @@ connected:
if (ret == 2)
{
if (check_io_slave_killed(mi->io_thd, mi, "Slave I/O thread killed"
if (check_io_slave_killed(mi, "Slave I/O thread killed"
"while calling get_master_version_and_clock(...)"))
goto err;
suppress_warnings= FALSE;
/* Try to reconnect because the error was caused by a transient network problem */
/*
Try to reconnect because the error was caused by a transient network
problem
*/
if (try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
reconnect_messages[SLAVE_RECON_ACT_REG]))
goto err;
@ -3650,7 +3751,7 @@ connected:
thd_proc_info(thd, "Registering slave on master");
if (register_slave_on_master(mysql, mi, &suppress_warnings))
{
if (!check_io_slave_killed(thd, mi, "Slave I/O thread killed "
if (!check_io_slave_killed(mi, "Slave I/O thread killed "
"while registering slave on master"))
{
sql_print_error("Slave I/O thread couldn't register on master");
@ -3675,13 +3776,13 @@ connected:
}
DBUG_PRINT("info",("Starting reading binary log from master"));
while (!io_slave_killed(thd,mi))
while (!io_slave_killed(mi))
{
thd_proc_info(thd, "Requesting binlog dump");
if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
if (check_io_slave_killed(mi, "Slave I/O thread killed while \
requesting master dump") ||
try_to_reconnect(thd, mysql, mi, &retry_count, suppress_warnings,
reconnect_messages[SLAVE_RECON_ACT_DUMP]))
@ -3701,7 +3802,7 @@ requesting master dump") ||
const char *event_buf;
DBUG_ASSERT(mi->last_error().number == 0);
while (!io_slave_killed(thd,mi))
while (!io_slave_killed(mi))
{
ulong event_len;
/*
@ -3712,7 +3813,7 @@ requesting master dump") ||
*/
thd_proc_info(thd, "Waiting for master to send event");
event_len= read_event(mysql, mi, &suppress_warnings);
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
if (check_io_slave_killed(mi, "Slave I/O thread killed while \
reading event"))
goto err;
DBUG_EXECUTE_IF("FORCE_SLAVE_TO_RECONNECT_EVENT",
@ -3802,10 +3903,11 @@ Stopping slave I/O thread due to out-of-memory error from master");
- if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
the clean value is 0), then we are reading only one more event as we
should, and we'll block only at the next event. No big deal.
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
the clean value is 1), then we are going into wait_for_relay_log_space()
for no reason, but this function will do a clean read, notice the clean
value and exit immediately.
- if mi->rli.ignore_log_space_limit is 0 but becomes 1 just
after (so the clean value is 1), then we are going into
wait_for_relay_log_space() for no reason, but this function
will do a clean read, notice the clean value and exit
immediately.
*/
#ifndef DBUG_OFF
{
@ -3866,7 +3968,7 @@ err:
mi->mysql=0;
}
write_ignored_events_info_to_relay_log(thd, mi);
thd_proc_info(thd, "Waiting for slave mutex on exit");
thd_proc_info(thd, "Slave io thread waiting for slave mutex on exit");
mysql_mutex_lock(&mi->run_lock);
err_during_init:
@ -3996,7 +4098,6 @@ pthread_handler_t handle_slave_sql(void *arg)
thd = new THD; // note that contructor of THD uses DBUG_ !
thd->thread_stack = (char*)&thd; // remember where our stack is
thd->rpl_filter = mi->rpl_filter;
serial_rgi->thd= thd;
DBUG_ASSERT(rli->inited);
DBUG_ASSERT(rli->mi == mi);
@ -4007,7 +4108,15 @@ pthread_handler_t handle_slave_sql(void *arg)
rli->events_till_abort = abort_slave_event_count;
#endif
rli->sql_thd= thd;
/*
THD for the sql driver thd. In parallel replication this is the thread
that reads things from the relay log and calls rpl_parallel::do_event()
to execute queries.
In single thread replication this is the THD for the thread that is
executing SQL queries too.
*/
serial_rgi->thd= rli->sql_driver_thd= thd;
/* Inform waiting threads that slave has started */
rli->slave_run_id++;
@ -4032,8 +4141,6 @@ pthread_handler_t handle_slave_sql(void *arg)
serial_rgi->deferred_events= new Deferred_log_events(rli);
}
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
/*
binlog_annotate_row_events must be TRUE only after an Annotate_rows event
has been recieved and only till the last corresponding rbr event has been
@ -4110,7 +4217,6 @@ pthread_handler_t handle_slave_sql(void *arg)
#endif
}
#endif
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
rli->group_master_log_name,
@ -4193,10 +4299,9 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
/* Read queries from the IO/THREAD until this thread is killed */
while (!sql_slave_killed(thd,rli))
while (!sql_slave_killed(serial_rgi))
{
thd_proc_info(thd, "Reading event from the relay log");
DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
if (saved_skip && rli->slave_skip_counter == 0)
@ -4217,7 +4322,7 @@ log '%s' at position %s, relay log '%s' position: %s%s", RPL_LOG_NAME,
{
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(thd,rli))
if (!sql_slave_killed(serial_rgi))
{
/*
retrieve as much info as possible from the thd and, error
@ -4349,7 +4454,7 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
thd->catalog= 0;
thd->reset_query();
thd->reset_db(NULL, 0);
thd_proc_info(thd, "Waiting for slave mutex on exit");
thd_proc_info(thd, "Sql driver thread waiting for slave mutex on exit");
mysql_mutex_lock(&rli->run_lock);
err_during_init:
/* We need data_lock, at least to wake up any waiting master_pos_wait() */
@ -4367,17 +4472,14 @@ err_during_init:
rli->ignore_log_space_limit= 0; /* don't need any lock */
/* we die so won't remember charset - re-update them on next thread start */
rli->cached_charset_invalidate();
rli->save_temporary_tables = thd->temporary_tables;
/*
TODO: see if we can do this conditionally in next_event() instead
to avoid unneeded position re-init
*/
thd->temporary_tables = 0; // remove tempation from destructor to close them
DBUG_ASSERT(rli->sql_thd == thd);
THD_CHECK_SENTRY(thd);
rli->sql_thd= 0;
set_thd_in_use_temporary_tables(rli); // (re)set sql_thd in use for saved temp tables
serial_rgi->thd= rli->sql_driver_thd= 0;
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
delete thd;
@ -5474,7 +5576,7 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
"terminated.");
DBUG_RETURN(1);
}
while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
while (!(slave_was_killed = io_slave_killed(mi)) &&
(reconnect ? mysql_reconnect(mysql) != 0 :
mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, client_flag) == 0))
@ -5552,19 +5654,20 @@ static int safe_reconnect(THD* thd, MYSQL* mysql, Master_info* mi,
}
#ifdef NOT_USED
MYSQL *rpl_connect_master(MYSQL *mysql)
{
THD *thd= current_thd;
Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
bool allocated= false;
my_bool my_true= 1;
THD *thd;
if (!mi)
{
sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
return NULL;
}
thd= mi->io_thd;
if (!mysql)
{
if(!(mysql= mysql_init(NULL)))
@ -5607,11 +5710,11 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
if (mi->user == NULL
|| mi->user[0] == 0
|| io_slave_killed(thd, mi)
|| io_slave_killed( mi)
|| !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
mi->port, 0, 0))
{
if (!io_slave_killed(thd, mi))
if (!io_slave_killed( mi))
sql_print_error("rpl_connect_master: error connecting to master: %s (server_error: %d)",
mysql_error(mysql), mysql_errno(mysql));
@ -5621,6 +5724,7 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
}
return mysql;
}
#endif
/*
Store the file and position where the execute-slave thread are in the
@ -5727,16 +5831,17 @@ static IO_CACHE *reopen_relay_log(Relay_log_info *rli, const char **errmsg)
error is reported through the sql_print_information() or
sql_print_error() functions.
*/
static Log_event* next_event(Relay_log_info* rli)
static Log_event* next_event(rpl_group_info *rgi)
{
Log_event* ev;
Relay_log_info *rli= rgi->rli;
IO_CACHE* cur_log = rli->cur_log;
mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
THD* thd = rli->sql_thd;
THD *thd = rgi->thd;
DBUG_ENTER("next_event");
DBUG_ASSERT(thd != 0);
DBUG_ASSERT(thd != 0 && thd == rli->sql_driver_thd);
#ifndef DBUG_OFF
if (abort_slave_event_count && !rli->events_till_abort--)
@ -5752,7 +5857,7 @@ static Log_event* next_event(Relay_log_info* rli)
*/
mysql_mutex_assert_owner(&rli->data_lock);
while (!sql_slave_killed(thd,rli))
while (!sql_slave_killed(rgi))
{
/*
We can have two kinds of log reading:
@ -5821,7 +5926,6 @@ static Log_event* next_event(Relay_log_info* rli)
opt_slave_sql_verify_checksum)))
{
DBUG_ASSERT(thd==rli->sql_thd);
/*
read it while we have a lock, to avoid a mutex lock in
inc_event_relay_log_pos()
@ -5832,7 +5936,6 @@ static Log_event* next_event(Relay_log_info* rli)
mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev);
}
DBUG_ASSERT(thd==rli->sql_thd);
if (opt_reckless_slave) // For mysql-test
cur_log->error = 0;
if (cur_log->error < 0)
@ -5920,14 +6023,15 @@ static Log_event* next_event(Relay_log_info* rli)
and reads one more event and starts honoring log_space_limit again.
If the SQL thread needs more events to be able to rotate the log (it
might need to finish the current group first), then it can ask for one
more at a time. Thus we don't outgrow the relay log indefinitely,
might need to finish the current group first), then it can ask for
one more at a time. Thus we don't outgrow the relay log indefinitely,
but rather in a controlled manner, until the next rotate.
When the SQL thread starts it sets ignore_log_space_limit to false.
We should also reset ignore_log_space_limit to 0 when the user does
RESET SLAVE, but in fact, no need as RESET SLAVE requires that the slave
be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
RESET SLAVE, but in fact, no need as RESET SLAVE requires that the
slave be stopped, and the SQL thread sets ignore_log_space_limit
to 0 when
it stops.
*/
mysql_mutex_lock(&rli->log_space_lock);
@ -5965,7 +6069,7 @@ static Log_event* next_event(Relay_log_info* rli)
mysql_mutex_unlock(&rli->log_space_lock);
mysql_cond_broadcast(&rli->log_space_cond);
// Note that wait_for_update_relay_log unlocks lock_log !
rli->relay_log.wait_for_update_relay_log(rli->sql_thd);
rli->relay_log.wait_for_update_relay_log(rli->sql_driver_thd);
// re-acquire data lock since we released it earlier
mysql_mutex_lock(&rli->data_lock);
rli->last_master_timestamp= save_timestamp;

View File

@ -57,6 +57,7 @@
#include "sql_table.h" // build_table_filename
#include "datadict.h" // dd_frm_is_view()
#include "sql_hset.h" // Hash_set
#include "rpl_rli.h" // rpl_group_info
#ifdef __WIN__
#include <io.h>
#endif
@ -1230,11 +1231,24 @@ bool close_cached_connection_tables(THD *thd, LEX_STRING *connection)
static void mark_temp_tables_as_free_for_reuse(THD *thd)
{
DBUG_ENTER("mark_temp_tables_as_free_for_reuse");
thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
}
thd->unlock_temporary_tables();
if (thd->rgi_slave)
{
/*
Temporary tables are shared with other by sql execution threads.
As a safety messure, clear the pointer to the common area.
*/
thd->temporary_tables= 0;
}
DBUG_VOID_RETURN;
}
@ -1248,6 +1262,7 @@ static void mark_temp_tables_as_free_for_reuse(THD *thd)
void mark_tmp_table_for_reuse(TABLE *table)
{
DBUG_ENTER("mark_tmp_table_for_reuse");
DBUG_ASSERT(table->s->tmp_table);
table->query_id= 0;
@ -1278,6 +1293,7 @@ void mark_tmp_table_for_reuse(TABLE *table)
LOCK TABLES is allowed (but ignored) for a temporary table.
*/
table->reginfo.lock_type= TL_WRITE;
DBUG_VOID_RETURN;
}
@ -1628,6 +1644,10 @@ static inline uint tmpkeyval(THD *thd, TABLE *table)
/*
Close all temporary tables created by 'CREATE TEMPORARY TABLE' for thread
creates one DROP TEMPORARY TABLE binlog event for each pseudo-thread
Temporary tables created in a sql slave is closed by
Relay_log_info::close_temporary_tables()
*/
bool close_temporary_tables(THD *thd)
@ -1642,6 +1662,7 @@ bool close_temporary_tables(THD *thd)
if (!thd->temporary_tables)
DBUG_RETURN(FALSE);
DBUG_ASSERT(!thd->rgi_slave);
if (!mysql_bin_log.is_open())
{
@ -2096,16 +2117,42 @@ TABLE *find_temporary_table(THD *thd,
const char *table_key,
uint table_key_length)
{
TABLE *result= 0;
if (!thd->have_temporary_tables())
return NULL;
thd->lock_temporary_tables();
for (TABLE *table= thd->temporary_tables; table; table= table->next)
{
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
return table;
/*
We need to set the THD as it may be different in case of
parallel replication
*/
if (table->in_use != thd)
{
table->in_use= thd;
#ifdef REMOVE_AFTER_MERGE_WITH_10
if (thd->rgi_slave)
{
/*
We may be stealing an opened temporary tables from one slave
thread to another, we need to let the performance schema know that,
for aggregates per thread to work properly.
*/
table->file->unbind_psi();
table->file->rebind_psi();
}
#endif
}
result= table;
break;
}
}
return NULL;
thd->unlock_temporary_tables();
return result;
}
@ -2153,6 +2200,9 @@ int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
/* Table might be in use by some outer statement. */
if (table->query_id && table->query_id != thd->query_id)
{
DBUG_PRINT("info", ("table->query_id: %lu thd->query_id: %lu",
(ulong) table->query_id, (ulong) thd->query_id));
my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
DBUG_RETURN(-1);
}
@ -2181,6 +2231,7 @@ void close_temporary_table(THD *thd, TABLE *table,
table->s->db.str, table->s->table_name.str,
(long) table, table->alias.c_ptr()));
thd->lock_temporary_tables();
if (table->prev)
{
table->prev->next= table->next;
@ -2200,12 +2251,14 @@ void close_temporary_table(THD *thd, TABLE *table,
if (thd->temporary_tables)
table->next->prev= 0;
}
if (thd->slave_thread)
if (thd->rgi_slave)
{
/* natural invariant of temporary_tables */
DBUG_ASSERT(slave_open_temp_tables || !thd->temporary_tables);
slave_open_temp_tables--;
thread_safe_decrement32(&slave_open_temp_tables, &thread_running_lock);
table->in_use= 0; // No statistics
}
thd->unlock_temporary_tables();
close_temporary(table, free_share, delete_table);
DBUG_VOID_RETURN;
}
@ -2651,17 +2704,13 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
TODO: move this block into a separate function.
*/
if (table_list->open_type != OT_BASE_ONLY &&
! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
! (flags & MYSQL_OPEN_SKIP_TEMPORARY) && thd->have_temporary_tables())
{
for (table= thd->temporary_tables; table ; table=table->next)
{
if (table->s->table_cache_key.length == key_length +
TMP_TABLE_KEY_EXTRA &&
!memcmp(table->s->table_cache_key.str, key,
key_length + TMP_TABLE_KEY_EXTRA))
if ((table= find_temporary_table(thd, key,
key_length + TMP_TABLE_KEY_EXTRA)))
{
/*
We're trying to use the same temporary table twice in a query.
Check if we're trying to use the same temporary table twice in a query.
Right now we don't support this because a temporary table
is always represented by only one TABLE object in THD, and
it can not be cloned. Emit an error for an unsupported behaviour.
@ -2681,7 +2730,6 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
goto reset;
}
}
}
if (table_list->open_type == OT_TEMPORARY_ONLY ||
(flags & MYSQL_OPEN_TEMPORARY_ONLY))
@ -5987,14 +6035,18 @@ TABLE *open_table_uncached(THD *thd, handlerton *hton,
if (add_to_temporary_tables_list)
{
thd->lock_temporary_tables();
/* growing temp list at the head */
tmp_table->next= thd->temporary_tables;
if (tmp_table->next)
tmp_table->next->prev= tmp_table;
thd->temporary_tables= tmp_table;
thd->temporary_tables->prev= 0;
if (thd->slave_thread)
slave_open_temp_tables++;
if (thd->rgi_slave)
{
thread_safe_increment32(&slave_open_temp_tables, &thread_running_lock);
}
thd->unlock_temporary_tables();
}
tmp_table->pos_in_table_list= 0;
DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,

View File

@ -99,6 +99,7 @@ void mysql_client_binlog_statement(THD* thd)
}
if (!(rgi= thd->rgi_fake))
rgi= thd->rgi_fake= new rpl_group_info(rli);
rgi->thd= thd;
const char *error= 0;
char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME));
@ -115,7 +116,7 @@ void mysql_client_binlog_statement(THD* thd)
goto end;
}
rli->sql_thd= thd;
rli->sql_driver_thd= thd;
rli->no_storage= TRUE;
for (char const *strptr= thd->lex->comment.str ;
@ -200,8 +201,6 @@ void mysql_client_binlog_statement(THD* thd)
}
}
rgi->rli= rli;
rgi->thd= thd;
ev= Log_event::read_log_event(bufptr, event_len, &error,
rli->relay_log.description_event_for_exec,
0);

View File

@ -5597,6 +5597,24 @@ THD::signal_wakeup_ready()
}
void THD::rgi_lock_temporary_tables()
{
mysql_mutex_lock(&rgi_slave->rli->data_lock);
temporary_tables= rgi_slave->rli->save_temporary_tables;
}
void THD::rgi_unlock_temporary_tables()
{
rgi_slave->rli->save_temporary_tables= temporary_tables;
mysql_mutex_unlock(&rgi_slave->rli->data_lock);
}
bool THD::rgi_have_temporary_tables()
{
return rgi_slave->rli->save_temporary_tables != 0;
}
wait_for_commit::wait_for_commit()
: subsequent_commits_list(0), next_subsequent_commit(0), waitee(0),
opaque_pointer(0),

View File

@ -3371,6 +3371,27 @@ private:
bool wakeup_ready;
mysql_mutex_t LOCK_wakeup_ready;
mysql_cond_t COND_wakeup_ready;
/* Protect against add/delete of temporary tables in parallel replication */
void rgi_lock_temporary_tables();
void rgi_unlock_temporary_tables();
bool rgi_have_temporary_tables();
public:
inline void lock_temporary_tables()
{
if (rgi_slave)
rgi_lock_temporary_tables();
}
inline void unlock_temporary_tables()
{
if (rgi_slave)
rgi_unlock_temporary_tables();
}
inline bool have_temporary_tables()
{
return (temporary_tables ||
(rgi_slave && rgi_have_temporary_tables()));
}
};

View File

@ -3955,6 +3955,7 @@ end_with_restore_list:
break;
case SQLCOM_BEGIN:
DBUG_PRINT("info", ("Executing SQLCOM_BEGIN thd: %p", thd));
if (trans_begin(thd, lex->start_transaction_opt))
goto error;
my_ok(thd);

View File

@ -139,6 +139,11 @@ bool trans_begin(THD *thd, uint flags)
}
thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG);
/*
The following set should not be needed as the flag should always be 0
when we come here. We should at some point change this to an assert.
*/
thd->transaction.all.modified_non_trans_table= FALSE;
if (res)