Merge romeo.(none):/home/bkroot/mysql-5.1-new-rpl

into  romeo.(none):/home/bk/b22583-mysql-5.1-new-rpl


sql/log_event.h:
  Auto merged
sql/sql_class.cc:
  Auto merged
sql/log_event.cc:
  Manual merge
This commit is contained in:
unknown 2007-03-22 17:34:25 +01:00
commit d141cb0d19
16 changed files with 1008 additions and 403 deletions

View File

@ -18,6 +18,29 @@ create table t2 like t1;
load data local infile 'MYSQLTEST_VARDIR/master-data/test/rpl_misc_functions.outfile' into table t2;
select * from t1, t2 where (t1.id=t2.id) and not(t1.i=t2.i and t1.r1=t2.r1 and t1.r2=t2.r2 and t1.p=t2.p);
id i r1 r2 p id i r1 r2 p
stop slave;
drop table t1;
drop table t1;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (col_a double default NULL);
CREATE PROCEDURE test_replication_sp1()
BEGIN
INSERT INTO t1 VALUES (rand()), (rand());
INSERT INTO t1 VALUES (rand());
END|
CREATE PROCEDURE test_replication_sp2()
BEGIN
CALL test_replication_sp1();
CALL test_replication_sp1();
END|
CREATE FUNCTION test_replication_sf() RETURNS DOUBLE DETERMINISTIC
BEGIN
RETURN (rand() + rand());
END|
CALL test_replication_sp1();
CALL test_replication_sp2();
INSERT INTO t1 VALUES (test_replication_sf());
INSERT INTO t1 VALUES (test_replication_sf());
INSERT INTO t1 VALUES (test_replication_sf());
DROP PROCEDURE IF EXISTS test_replication_sp1;
DROP PROCEDURE IF EXISTS test_replication_sp2;
DROP FUNCTION IF EXISTS test_replication_sf;
DROP TABLE IF EXISTS t1;

View File

@ -97,8 +97,8 @@ Replicate_Do_Table
Replicate_Ignore_Table <Replicate_Ignore_Table>
Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table
Last_Errno 146
Last_Error Error in Write_rows event: error during transaction execution on table test.t1
Last_Errno 1105
Last_Error Unknown error
Skip_Counter 0
Exec_Master_Log_Pos <Exec_Master_Log_Pos>
Relay_Log_Space <Relay_Log_Space>

View File

@ -122,7 +122,7 @@ Replicate_Do_Table
Replicate_Ignore_Table
Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table
Last_Errno 1364
Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef
Skip_Counter 0
Exec_Master_Log_Pos #

View File

@ -122,7 +122,7 @@ Replicate_Do_Table
Replicate_Ignore_Table
Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table
Last_Errno 1364
Last_Errno 1105
Last_Error Error in Write_rows event: error during transaction execution on table test.t1_nodef
Skip_Counter 0
Exec_Master_Log_Pos #

View File

@ -28,10 +28,74 @@ create table t2 like t1;
eval load data local infile '$MYSQLTEST_VARDIR/master-data/test/rpl_misc_functions.outfile' into table t2;
# compare them with the replica; the SELECT below should return no row
select * from t1, t2 where (t1.id=t2.id) and not(t1.i=t2.i and t1.r1=t2.r1 and t1.r2=t2.r2 and t1.p=t2.p);
stop slave;
drop table t1;
connection master;
drop table t1;
# End of 4.1 tests
#
# BUG#25543 test calling rand() multiple times on the master in
# a stored procedure.
#
--disable_warnings
DROP TABLE IF EXISTS t1;
--enable_warnings
CREATE TABLE t1 (col_a double default NULL);
DELIMITER |;
# Use a SP that calls rand() multiple times
CREATE PROCEDURE test_replication_sp1()
BEGIN
INSERT INTO t1 VALUES (rand()), (rand());
INSERT INTO t1 VALUES (rand());
END|
# Use a SP that calls another SP to call rand() multiple times
CREATE PROCEDURE test_replication_sp2()
BEGIN
CALL test_replication_sp1();
CALL test_replication_sp1();
END|
# Use a SF that calls rand() multiple times
CREATE FUNCTION test_replication_sf() RETURNS DOUBLE DETERMINISTIC
BEGIN
RETURN (rand() + rand());
END|
DELIMITER ;|
# Exercise the functions and procedures then compare the results on
# the master to those on the slave.
CALL test_replication_sp1();
CALL test_replication_sp2();
INSERT INTO t1 VALUES (test_replication_sf());
INSERT INTO t1 VALUES (test_replication_sf());
INSERT INTO t1 VALUES (test_replication_sf());
# Record the results of the query on the master
--exec $MYSQL --port=$MASTER_MYPORT test -e "SELECT * FROM test.t1" > $MYSQLTEST_VARDIR/tmp/rpl_rand_master.sql
--sync_slave_with_master
# Record the results of the query on the slave
--exec $MYSQL --port=$SLAVE_MYPORT test -e "SELECT * FROM test.t1" > $MYSQLTEST_VARDIR/tmp/rpl_rand_slave.sql
# Compare the results from the master to the slave.
--exec diff $MYSQLTEST_VARDIR/tmp/rpl_rand_master.sql $MYSQLTEST_VARDIR/tmp/rpl_rand_slave.sql
# Cleanup
--disable_warnings
DROP PROCEDURE IF EXISTS test_replication_sp1;
DROP PROCEDURE IF EXISTS test_replication_sp2;
DROP FUNCTION IF EXISTS test_replication_sf;
DROP TABLE IF EXISTS t1;
--enable_warnings
# If all is good, when can cleanup our dump files.
--system rm $MYSQLTEST_VARDIR/tmp/rpl_rand_master.sql
--system rm $MYSQLTEST_VARDIR/tmp/rpl_rand_slave.sql

View File

@ -1548,7 +1548,13 @@ static int binlog_commit(handlerton *hton, THD *thd, bool all)
(binlog_trx_data*) thd->ha_data[binlog_hton->slot];
DBUG_ASSERT(mysql_bin_log.is_open());
if (all && trx_data->empty())
/*
The condition here has to be identical to the one inside
binlog_end_trans(), guarding the write of the transaction cache to
the binary log.
*/
if ((all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) &&
trx_data->empty())
{
// we're here because trans_log was flushed in MYSQL_BIN_LOG::log_xid()
trx_data->reset();
@ -2499,7 +2505,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/*
Set 'created' to 0, so that in next relay logs this event does not
trigger cleaning actions on the slave in
Format_description_log_event::exec_event().
Format_description_log_event::apply_event_impl().
*/
description_event_for_queue->created= 0;
/* Don't set log_pos in event header */
@ -3206,8 +3212,10 @@ void MYSQL_BIN_LOG::new_file_impl(bool need_lock)
{
tc_log_page_waits++;
pthread_mutex_lock(&LOCK_prep_xids);
while (prepared_xids)
while (prepared_xids) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_wait(&COND_prep_xids, &LOCK_prep_xids);
}
pthread_mutex_unlock(&LOCK_prep_xids);
}
@ -5061,8 +5069,10 @@ void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid)
{
pthread_mutex_lock(&LOCK_prep_xids);
DBUG_ASSERT(prepared_xids > 0);
if (--prepared_xids == 0)
if (--prepared_xids == 0) {
DBUG_PRINT("info", ("prepared_xids=%lu", prepared_xids));
pthread_cond_signal(&COND_prep_xids);
}
pthread_mutex_unlock(&LOCK_prep_xids);
rotate_and_purge(0); // as ::write() did not rotate
}

File diff suppressed because it is too large Load Diff

View File

@ -516,6 +516,7 @@ class THD;
class Format_description_log_event;
struct st_relay_log_info;
typedef st_relay_log_info RELAY_LOG_INFO;
#ifdef MYSQL_CLIENT
/*
@ -604,6 +605,33 @@ typedef struct st_print_event_info
class Log_event
{
public:
/**
Enumeration of what kinds of skipping (and non-skipping) that can
occur when the slave executes an event.
@see shall_skip
@see do_shall_skip
*/
enum enum_skip_reason {
/**
Don't skip event.
*/
EVENT_SKIP_NOT,
/**
Skip event by ignoring it.
This means that the slave skip counter will not be changed.
*/
EVENT_SKIP_IGNORE,
/**
Skip event and decrease skip counter.
*/
EVENT_SKIP_COUNT
};
/*
The following type definition is to be used whenever data is placed
and manipulated in a common buffer. Use this typedef for buffers
@ -685,16 +713,14 @@ public:
static void init_show_field_list(List<Item>* field_list);
#ifdef HAVE_REPLICATION
int net_send(Protocol *protocol, const char* log_name, my_off_t pos);
/*
pack_info() is used by SHOW BINLOG EVENTS; as print() it prepares and sends
a string to display to the user, so it resembles print().
*/
virtual void pack_info(Protocol *protocol);
/*
The SQL slave thread calls exec_event() to execute the event; this is where
the slave's data is modified.
*/
virtual int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
virtual const char* get_db()
{
@ -767,6 +793,127 @@ public:
*description_event);
/* returns the human readable name of the event's type */
const char* get_type_str();
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
public:
/**
Apply the event to the database.
This function represents the public interface for applying an
event.
@see do_apply_event
*/
int apply_event(RELAY_LOG_INFO const *rli) {
return do_apply_event(rli);
}
/**
Update the relay log position.
This function represents the public interface for "stepping over"
the event and will update the relay log information.
@see do_update_pos
*/
int update_pos(RELAY_LOG_INFO *rli)
{
return do_update_pos(rli);
}
/**
Decide if the event shall be skipped, and the reason for skipping
it.
@see do_shall_skip
*/
enum_skip_reason shall_skip(RELAY_LOG_INFO *rli)
{
return do_shall_skip(rli);
}
protected:
/**
Primitive to apply an event to the database.
This is where the change to the database is made.
@note The primitive is protected instead of private, since there
is a hierarchy of actions to be performed in some cases.
@see Format_description_log_event::do_apply_event()
@param rli Pointer to relay log info structure
@retval 0 Event applied successfully
@retval errno Error code if event application failed
*/
virtual int do_apply_event(RELAY_LOG_INFO const *rli)
{
return 0; /* Default implementation does nothing */
}
/**
Advance relay log coordinates.
This function is called to advance the relay log coordinates to
just after the event. It is essential that both the relay log
coordinate and the group log position is updated correctly, since
this function is used also for skipping events.
Normally, each implementation of do_update_pos() shall:
- Update the event position to refer to the position just after
the event.
- Update the group log position to refer to the position just
after the event <em>if the event is last in a group</em>
@param rli Pointer to relay log info structure
@retval 0 Coordinates changed successfully
@retval errno Error code if advancing failed (usually just
1). Observe that handler errors are returned by the
do_apply_event() function, and not by this one.
*/
virtual int do_update_pos(RELAY_LOG_INFO *rli);
/**
Decide if this event shall be skipped or not and the reason for
skipping it.
The default implementation decide that the event shall be skipped
if either:
- the server id of the event is the same as the server id of the
server and <code>rli->replicate_same_server_id</code> is true,
or
- if <code>rli->slave_skip_counter</code> is greater than zero.
@see do_apply_event
@see do_update_pos
@retval Log_event::EVENT_SKIP_NOT
The event shall not be skipped and should be applied.
@retval Log_event::EVENT_SKIP_IGNORE
The event shall be skipped by just ignoring it, i.e., the slave
skip counter shall not be changed. This happends if, for example,
the originating server id of the event is the same as the server
id of the slave.
@retval Log_event::EVENT_SKIP_COUNT
The event shall be skipped because the slave skip counter was
non-zero. The caller shall decrease the counter by one.
*/
virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
#endif
};
/*
@ -807,10 +954,10 @@ public:
uint16 error_code;
ulong thread_id;
/*
For events created by Query_log_event::exec_event (and
Load_log_event::exec_event()) we need the *original* thread id, to be able
to log the event with the original (=master's) thread id (fix for
BUG#1686).
For events created by Query_log_event::do_apply_event (and
Load_log_event::do_apply_event()) we need the *original* thread
id, to be able to log the event with the original (=master's)
thread id (fix for BUG#1686).
*/
ulong slave_proxy_id;
@ -873,9 +1020,6 @@ public:
const char* get_db() { return db; }
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
int exec_event(struct st_relay_log_info* rli, const char *query_arg,
uint32 q_len_arg);
#endif /* HAVE_REPLICATION */
#else
void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info);
@ -904,6 +1048,16 @@ public:
*/
virtual ulong get_post_header_size_for_derived() { return 0; }
/* Writes derived event-specific part of post header. */
public: /* !!! Public in this patch to allow old usage */
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
virtual int do_update_pos(RELAY_LOG_INFO *rli);
int do_apply_event(RELAY_LOG_INFO const *rli,
const char *query_arg,
uint32 q_len_arg);
#endif /* HAVE_REPLICATION */
};
@ -952,9 +1106,8 @@ public:
uint16 master_port;
#ifndef MYSQL_CLIENT
Slave_log_event(THD* thd_arg, struct st_relay_log_info* rli);
Slave_log_event(THD* thd_arg, RELAY_LOG_INFO* rli);
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif
@ -967,6 +1120,11 @@ public:
#ifndef MYSQL_CLIENT
bool write(IO_CACHE* file);
#endif
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const* rli);
#endif
};
#endif /* HAVE_REPLICATION */
@ -1036,12 +1194,6 @@ public:
const char* get_db() { return db; }
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli)
{
return exec_event(thd->slave_net,rli,0);
}
int exec_event(NET* net, struct st_relay_log_info* rli,
bool use_rli_only_for_errors);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1073,6 +1225,17 @@ public:
+ LOAD_HEADER_LEN
+ sql_ex.data_size() + field_block_len + num_fields);
}
public: /* !!! Public in this patch to allow old usage */
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const* rli)
{
return do_apply_event(thd->slave_net,rli,0);
}
int do_apply_event(NET *net, RELAY_LOG_INFO const *rli,
bool use_rli_only_for_errors);
#endif
};
extern char server_version[SERVER_VERSION_LENGTH];
@ -1130,7 +1293,6 @@ public:
Start_log_event_v3();
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
Start_log_event_v3() {}
@ -1150,6 +1312,22 @@ public:
return START_V3_HEADER_LEN; //no variable-sized part
}
virtual bool is_artificial_event() { return artificial_event; }
protected:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO*)
{
/*
Events from ourself should be skipped, but they should not
decrease the slave skip counter.
*/
if (this->server_id == ::server_id)
return Log_event::EVENT_SKIP_IGNORE;
else
return Log_event::EVENT_SKIP_NOT;
}
#endif
};
@ -1175,13 +1353,6 @@ public:
uchar server_version_split[3];
Format_description_log_event(uint8 binlog_ver, const char* server_ver=0);
#ifndef MYSQL_CLIENT
#ifdef HAVE_REPLICATION
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#endif
Format_description_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Format_description_log_event() { my_free((gptr)post_header_len, MYF(0)); }
@ -1204,7 +1375,15 @@ public:
*/
return FORMAT_DESCRIPTION_HEADER_LEN;
}
void calc_server_version_split();
protected:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
virtual int do_update_pos(RELAY_LOG_INFO *rli);
virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
#endif
};
@ -1228,7 +1407,6 @@ public:
{}
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1243,6 +1421,13 @@ public:
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
virtual int do_update_pos(RELAY_LOG_INFO *rli);
virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
#endif
};
@ -1269,7 +1454,6 @@ class Rand_log_event: public Log_event
{}
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1283,6 +1467,13 @@ class Rand_log_event: public Log_event
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
virtual int do_update_pos(RELAY_LOG_INFO *rli);
virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
#endif
};
/*****************************************************************************
@ -1306,7 +1497,6 @@ class Xid_log_event: public Log_event
Xid_log_event(THD* thd_arg, my_xid x): Log_event(thd_arg,0,0), xid(x) {}
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1320,6 +1510,11 @@ class Xid_log_event: public Log_event
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
#endif
};
/*****************************************************************************
@ -1349,7 +1544,6 @@ public:
val_len(val_len_arg), type(type_arg), charset_number(charset_number_arg)
{ is_null= !val; }
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif
@ -1361,6 +1555,13 @@ public:
bool write(IO_CACHE* file);
#endif
bool is_valid() const { return 1; }
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
virtual int do_update_pos(RELAY_LOG_INFO *rli);
virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
#endif
};
@ -1375,7 +1576,6 @@ public:
#ifndef MYSQL_CLIENT
Stop_log_event() :Log_event()
{}
int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif
@ -1386,6 +1586,22 @@ public:
~Stop_log_event() {}
Log_event_type get_type_code() { return STOP_EVENT;}
bool is_valid() const { return 1; }
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_update_pos(RELAY_LOG_INFO *rli);
virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli)
{
/*
Events from ourself should be skipped, but they should not
decrease the slave skip counter.
*/
if (this->server_id == ::server_id)
return Log_event::EVENT_SKIP_IGNORE;
else
return Log_event::EVENT_SKIP_NOT;
}
#endif
};
/*****************************************************************************
@ -1412,7 +1628,6 @@ public:
ulonglong pos_arg, uint flags);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1431,6 +1646,12 @@ public:
#ifndef MYSQL_CLIENT
bool write(IO_CACHE* file);
#endif
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_update_pos(RELAY_LOG_INFO *rli);
virtual enum_skip_reason do_shall_skip(RELAY_LOG_INFO *rli);
#endif
};
@ -1465,7 +1686,6 @@ public:
bool using_trans);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1499,6 +1719,11 @@ public:
*/
bool write_base(IO_CACHE* file);
#endif
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
#endif
};
@ -1531,7 +1756,6 @@ public:
Append_block_log_event(THD* thd, const char* db_arg, char* block_arg,
uint block_len_arg, bool using_trans);
#ifdef HAVE_REPLICATION
int exec_event(struct st_relay_log_info* rli);
void pack_info(Protocol* protocol);
virtual int get_create_or_append() const;
#endif /* HAVE_REPLICATION */
@ -1549,6 +1773,11 @@ public:
bool write(IO_CACHE* file);
const char* get_db() { return db; }
#endif
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
#endif
};
@ -1568,7 +1797,6 @@ public:
Delete_file_log_event(THD* thd, const char* db_arg, bool using_trans);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1585,6 +1813,11 @@ public:
bool write(IO_CACHE* file);
const char* get_db() { return db; }
#endif
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
#endif
};
@ -1604,7 +1837,6 @@ public:
Execute_load_log_event(THD* thd, const char* db_arg, bool using_trans);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1620,6 +1852,11 @@ public:
bool write(IO_CACHE* file);
const char* get_db() { return db; }
#endif
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
#endif
};
@ -1689,7 +1926,6 @@ public:
bool using_trans, bool suppress_use);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -1708,7 +1944,12 @@ public:
#ifndef MYSQL_CLIENT
bool write_post_header_for_derived(IO_CACHE* file);
#endif
};
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
#endif
};
#ifdef MYSQL_CLIENT
@ -1806,7 +2047,6 @@ public:
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int exec_event(struct st_relay_log_info *rli);
virtual void pack_info(Protocol *protocol);
#endif
@ -1816,6 +2056,11 @@ public:
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
virtual int do_update_pos(RELAY_LOG_INFO *rli);
#endif
#ifndef MYSQL_CLIENT
TABLE *m_table;
#endif
@ -1899,7 +2144,6 @@ public:
flag_set get_flags(flag_set flags) const { return m_flags & flags; }
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int exec_event(struct st_relay_log_info *rli);
virtual void pack_info(Protocol *protocol);
#endif
@ -1983,6 +2227,8 @@ protected:
private:
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
virtual int do_apply_event(RELAY_LOG_INFO const *rli);
/*
Primitive to prepare for a sequence of row executions.
@ -2030,7 +2276,7 @@ private:
RETURN VALUE
Error code, if something went wrong, 0 otherwise.
*/
virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
char const *row_start, char const **row_end) = 0;
/*
@ -2101,7 +2347,7 @@ private:
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
char const *row_start, char const **row_end);
virtual int do_exec_row(TABLE *table);
#endif
@ -2166,7 +2412,7 @@ private:
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
char const *row_start, char const **row_end);
virtual int do_exec_row(TABLE *table);
#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
@ -2237,7 +2483,7 @@ private:
virtual int do_before_row_operations(TABLE *table);
virtual int do_after_row_operations(TABLE *table, int error);
virtual int do_prepare_row(THD*, RELAY_LOG_INFO*, TABLE*,
virtual int do_prepare_row(THD*, RELAY_LOG_INFO const*, TABLE*,
char const *row_start, char const **row_end);
virtual int do_exec_row(TABLE *table);
#endif

View File

@ -29,14 +29,15 @@ int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
st_relay_log_info::st_relay_log_info()
:no_storage(FALSE), info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
:no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0),
abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
unsafe_to_stop_at(0)
last_event_start_time(0)
{
DBUG_ENTER("st_relay_log_info::st_relay_log_info");
@ -1001,6 +1002,22 @@ bool st_relay_log_info::is_until_satisfied()
log_pos= group_relay_log_pos;
}
#ifndef DBUG_OFF
{
char buf[32];
DBUG_PRINT("info", ("group_master_log_name='%s', group_master_log_pos=%s",
group_master_log_name, llstr(group_master_log_pos, buf)));
DBUG_PRINT("info", ("group_relay_log_name='%s', group_relay_log_pos=%s",
group_relay_log_name, llstr(group_relay_log_pos, buf)));
DBUG_PRINT("info", ("(%s) log_name='%s', log_pos=%s",
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
log_name, llstr(log_pos, buf)));
DBUG_PRINT("info", ("(%s) until_log_name='%s', until_log_pos=%s",
until_condition == UNTIL_MASTER_POS ? "master" : "relay",
until_log_name, llstr(until_log_pos, buf)));
}
#endif
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
{
/*
@ -1056,30 +1073,19 @@ void st_relay_log_info::cached_charset_invalidate()
}
bool st_relay_log_info::cached_charset_compare(char *charset)
bool st_relay_log_info::cached_charset_compare(char *charset) const
{
DBUG_ENTER("st_relay_log_info::cached_charset_compare");
if (bcmp(cached_charset, charset, sizeof(cached_charset)))
{
memcpy(cached_charset, charset, sizeof(cached_charset));
memcpy(const_cast<char*>(cached_charset), charset, sizeof(cached_charset));
DBUG_RETURN(1);
}
DBUG_RETURN(0);
}
void st_relay_log_info::transaction_end(THD* thd)
{
DBUG_ENTER("st_relay_log_info::transaction_end");
/*
Nothing to do here right now.
*/
DBUG_VOID_RETURN;
}
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
void st_relay_log_info::cleanup_context(THD *thd, bool error)
{
@ -1106,7 +1112,7 @@ void st_relay_log_info::cleanup_context(THD *thd, bool error)
m_table_map.clear_tables();
close_thread_tables(thd);
clear_tables_to_lock();
unsafe_to_stop_at= 0;
last_event_start_time= 0;
DBUG_VOID_RETURN;
}

View File

@ -58,6 +58,15 @@ typedef struct st_relay_log_info
*/
bool no_storage;
/*
If true, events with the same server id should be replicated. This
field is set on creation of a relay log info structure by copying
the value of ::replicate_same_server_id and can be overridden if
necessary. For example of when this is done, check sql_binlog.cc,
where the BINLOG statement can be used to execute "raw" events.
*/
bool replicate_same_server_id;
/*** The following variables can only be read when protect by data lock ****/
/*
@ -292,14 +301,19 @@ typedef struct st_relay_log_info
When the 6 bytes are equal to 0 is used to mean "cache is invalidated".
*/
void cached_charset_invalidate();
bool cached_charset_compare(char *charset);
void transaction_end(THD*);
bool cached_charset_compare(char *charset) const;
void cleanup_context(THD *, bool);
void clear_tables_to_lock();
time_t unsafe_to_stop_at;
/*
Used by row-based replication to detect that it should not stop at
this event, but give it a chance to send more events. The time
where the last event inside a group started is stored here. If the
variable is zero, we are not in a group (but may be in a
transaction).
*/
time_t last_event_start_time;
} RELAY_LOG_INFO;

View File

@ -108,7 +108,7 @@ field_length_from_packed(enum_field_types const field_type,
*/
int
table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table)
table_def::compatible_with(RELAY_LOG_INFO const *rli_arg, TABLE *table)
const
{
/*
@ -116,6 +116,7 @@ table_def::compatible_with(RELAY_LOG_INFO *rli, TABLE *table)
*/
uint const cols_to_check= min(table->s->fields, size());
int error= 0;
RELAY_LOG_INFO const *rli= const_cast<RELAY_LOG_INFO*>(rli_arg);
TABLE_SHARE const *const tsh= table->s;

View File

@ -117,7 +117,7 @@ public:
@retval 1 if the table definition is not compatible with @c table
@retval 0 if the table definition is compatible with @c table
*/
int compatible_with(RELAY_LOG_INFO *rli, TABLE *table) const;
int compatible_with(RELAY_LOG_INFO const *rli, TABLE *table) const;
private:
my_size_t m_size; // Number of elements in the types array

View File

@ -519,11 +519,11 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
really one minute of idleness, we don't timeout if the slave SQL thread
is actively working.
*/
if (!rli->unsafe_to_stop_at)
if (rli->last_event_start_time == 0)
DBUG_RETURN(1);
DBUG_PRINT("info", ("Slave SQL thread is in an unsafe situation, giving "
"it some grace period"));
if (difftime(time(0), rli->unsafe_to_stop_at) > 60)
if (difftime(time(0), rli->last_event_start_time) > 60)
{
slave_print_msg(ERROR_LEVEL, rli, 0,
"SQL thread had to stop in an unsafe situation, in "
@ -557,7 +557,7 @@ static bool sql_slave_killed(THD* thd, RELAY_LOG_INFO* rli)
void
*/
void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli,
int err_code, const char* msg, ...)
{
void (*report_function)(const char *, ...);
@ -579,9 +579,9 @@ void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
It's an error, it must be reported in Last_error and Last_errno in SHOW
SLAVE STATUS.
*/
pbuff= rli->last_slave_error;
pbuff= const_cast<RELAY_LOG_INFO*>(rli)->last_slave_error;
pbuffsize= sizeof(rli->last_slave_error);
rli->last_slave_errno = err_code;
const_cast<RELAY_LOG_INFO*>(rli)->last_slave_errno = err_code;
report_function= sql_print_error;
break;
case WARNING_LEVEL:
@ -813,7 +813,7 @@ do not trust column Seconds_Behind_Master of SHOW SLAVE STATUS");
{
if ((master_row= mysql_fetch_row(master_res)) &&
(::server_id == strtoul(master_row[1], 0, 10)) &&
!replicate_same_server_id)
!mi->rli.replicate_same_server_id)
errmsg= "The slave I/O thread stops because master and slave have equal \
MySQL server ids; these ids must be different for replication to work (or \
the --replicate-same-server-id option must be used on slave but this does \
@ -1390,7 +1390,7 @@ void set_slave_thread_options(THD* thd)
DBUG_VOID_RETURN;
}
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli)
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO const *rli)
{
DBUG_ENTER("set_slave_thread_default_charset");
@ -1401,7 +1401,14 @@ void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli)
thd->variables.collation_server=
global_system_variables.collation_server;
thd->update_charset();
rli->cached_charset_invalidate();
/*
We use a const cast here since the conceptual (and externally
visible) behavior of the function is to set the default charset of
the thread. That the cache has to be invalidated is a secondary
effect.
*/
const_cast<RELAY_LOG_INFO*>(rli)->cached_charset_invalidate();
DBUG_VOID_RETURN;
}
@ -1609,7 +1616,8 @@ static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
}
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli,
int expected_error)
{
DBUG_ENTER("check_expected_error");
@ -1715,77 +1723,42 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
if (ev)
{
int type_code = ev->get_type_code();
int exec_res;
int exec_res= 0;
/*
Queries originating from this server must be skipped.
Low-level events (Format_desc, Rotate, Stop) from this server
must also be skipped. But for those we don't want to modify
group_master_log_pos, because these events did not exist on the master.
Format_desc is not completely skipped.
Skip queries specified by the user in slave_skip_counter.
We can't however skip events that has something to do with the
log files themselves.
Filtering on own server id is extremely important, to ignore execution of
events created by the creation/rotation of the relay log (remember that
now the relay log starts with its Format_desc, has a Rotate etc).
*/
DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id));
DBUG_PRINT("info",("type_code=%d (%s), server_id=%d",
type_code, ev->get_type_str(), ev->server_id));
DBUG_PRINT("info", ("thd->options={ %s%s}",
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN)));
if ((ev->server_id == (uint32) ::server_id &&
!replicate_same_server_id &&
type_code != FORMAT_DESCRIPTION_EVENT) ||
(rli->slave_skip_counter &&
type_code != ROTATE_EVENT && type_code != STOP_EVENT &&
type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
{
DBUG_PRINT("info", ("event skipped"));
/*
We only skip the event here and do not increase the group log
position. In the event that we have to restart, this means
that we might have to skip the event again, but that is a
minor issue.
If we were to increase the group log position when skipping an
event, it might be that we are restarting at the wrong
position and have events before that we should have executed,
so not increasing the group log position is a sure bet in this
case.
In this way, we just step the group log position when we
*know* that we are at the end of a group.
*/
rli->inc_event_relay_log_pos();
/*
Execute the event to change the database and update the binary
log coordinates, but first we set some data that is needed for
the thread.
/*
Protect against common user error of setting the counter to 1
instead of 2 while recovering from an insert which used auto_increment,
rand or user var.
*/
if (rli->slave_skip_counter &&
!((type_code == INTVAR_EVENT ||
type_code == RAND_EVENT ||
type_code == USER_VAR_EVENT) &&
rli->slave_skip_counter == 1) &&
/*
The events from ourselves which have something to do with the relay
log itself must be skipped, true, but they mustn't decrement
rli->slave_skip_counter, because the user is supposed to not see
these events (they are not in the master's binlog) and if we
decremented, START SLAVE would for example decrement when it sees
the Rotate, so the event which the user probably wanted to skip
would not be skipped.
*/
!(ev->server_id == (uint32) ::server_id &&
(type_code == ROTATE_EVENT || type_code == STOP_EVENT ||
type_code == START_EVENT_V3 || type_code == FORMAT_DESCRIPTION_EVENT)))
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
delete ev;
DBUG_RETURN(0); // avoid infinite update loops
}
pthread_mutex_unlock(&rli->data_lock);
The event will be executed unless it is supposed to be skipped.
Queries originating from this server must be skipped. Low-level
events (Format_description_log_event, Rotate_log_event,
Stop_log_event) from this server must also be skipped. But for
those we don't want to modify 'group_master_log_pos', because
these events did not exist on the master.
Format_description_log_event is not completely skipped.
Skip queries specified by the user in 'slave_skip_counter'. We
can't however skip events that has something to do with the log
files themselves.
Filtering on own server id is extremely important, to ignore
execution of events created by the creation/rotation of the relay
log (remember that now the relay log starts with its Format_desc,
has a Rotate etc).
*/
thd->server_id = ev->server_id; // use the original server id for logging
thd->set_time(); // time the query
@ -1793,13 +1766,63 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
if (!ev->when)
ev->when = time(NULL);
ev->thd = thd; // because up to this point, ev->thd == 0
DBUG_PRINT("info", ("thd->options={ %s%s}",
FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT),
FLAGSTR(thd->options, OPTION_BEGIN)));
exec_res = ev->exec_event(rli);
DBUG_PRINT("info", ("exec_event result: %d", exec_res));
DBUG_ASSERT(rli->sql_thd==thd);
int reason= ev->shall_skip(rli);
if (reason == Log_event::EVENT_SKIP_COUNT)
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
#ifndef DBUG_OFF
else
{
/*
This only prints information to the debug trace.
TODO: Print an informational message to the error log?
*/
static const char *const explain[] = {
"event was not skipped", // EVENT_SKIP_NOT,
"event originated from this server", // EVENT_SKIP_IGNORE,
"event skip counter was non-zero" // EVENT_SKIP_COUNT
};
DBUG_PRINT("info", ("%s was skipped because %s",
ev->get_type_str(), explain[reason]));
}
#endif
DBUG_PRINT("info", ("apply_event error = %d", exec_res));
if (exec_res == 0)
{
int error= ev->update_pos(rli);
char buf[22];
DBUG_PRINT("info", ("update_pos error = %d", error));
DBUG_PRINT("info", ("group %s %s",
llstr(rli->group_relay_log_pos, buf),
rli->group_relay_log_name));
DBUG_PRINT("info", ("event %s %s",
llstr(rli->event_relay_log_pos, buf),
rli->event_relay_log_name));
/*
The update should not fail, so print an error message and
return an error code.
TODO: Replace this with a decent error message when merged
with BUG#24954 (which adds several new error message).
*/
if (error)
{
slave_print_msg(ERROR_LEVEL, rli, ER_UNKNOWN_ERROR,
"It was not possible to update the positions"
" of the relay log information: the slave may"
" be in an inconsistent state."
" Stopped in %s position %s",
rli->group_relay_log_name,
llstr(rli->group_relay_log_pos, buf));
DBUG_RETURN(1);
}
}
/*
Format_description_log_event should not be deleted because it will be
used to read info about the relay log's format; it will be deleted when
@ -2366,13 +2389,17 @@ Slave SQL thread aborted. Can't execute init_slave query");
THD_CHECK_SENTRY(thd);
if (exec_relay_log_event(thd,rli))
{
DBUG_PRINT("info", ("exec_relay_log_event() failed"));
// do not scare the user if SQL thread was simply killed or stopped
if (!sql_slave_killed(thd,rli))
{
/*
retrieve as much info as possible from the thd and, error codes and warnings
and print this to the error log as to allow the user to locate the error
retrieve as much info as possible from the thd and, error
codes and warnings and print this to the error log as to
allow the user to locate the error
*/
DBUG_PRINT("info", ("thd->net.last_errno=%d; rli->last_slave_errno=%d",
thd->net.last_errno, rli->last_slave_errno));
if (thd->net.last_errno != 0)
{
if (rli->last_slave_errno == 0)
@ -2699,6 +2726,7 @@ static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(1);
}
pthread_mutex_lock(&mi->data_lock);
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
switch (ev->get_type_code()) {
@ -2962,7 +2990,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
pthread_mutex_lock(log_lock);
if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
!replicate_same_server_id)
!mi->rli.replicate_same_server_id)
{
/*
Do not write it to the relay log.

View File

@ -162,9 +162,9 @@ bool show_binlog_info(THD* thd);
bool rpl_master_has_bug(RELAY_LOG_INFO *rli, uint bug_id);
const char *print_slave_db_safe(const char *db);
int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int error_code);
int check_expected_error(THD* thd, RELAY_LOG_INFO const *rli, int error_code);
void skip_load_data_infile(NET* net);
void slave_print_msg(enum loglevel level, RELAY_LOG_INFO* rli,
void slave_print_msg(enum loglevel level, RELAY_LOG_INFO const *rli,
int err_code, const char* msg, ...)
ATTRIBUTE_FORMAT(printf, 4, 5);
@ -182,7 +182,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos,
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
const char** errmsg);
void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD* thd, RELAY_LOG_INFO *rli);
void set_slave_thread_default_charset(THD *thd, RELAY_LOG_INFO const *rli);
void rotate_relay_log(MASTER_INFO* mi);
pthread_handler_t handle_slave_io(void *arg);

View File

@ -163,9 +163,17 @@ void mysql_client_binlog_statement(THD* thd)
(ulong) uint4korr(bufptr+EVENT_LEN_OFFSET)));
#endif
ev->thd= thd;
if (IF_DBUG(int err= ) ev->exec_event(thd->rli_fake))
/*
We go directly to the application phase, since we don't need
to check if the event shall be skipped or not.
Neither do we have to update the log positions, since that is
not used at all: the rli_fake instance is used only for error
reporting.
*/
if (IF_DBUG(int err= ) ev->apply_event(thd->rli_fake))
{
DBUG_PRINT("error", ("exec_event() returned: %d", err));
DBUG_PRINT("info", ("apply_event() returned: %d", err));
/*
TODO: Maybe a better error message since the BINLOG statement
now contains several events.

View File

@ -679,11 +679,22 @@ bool THD::store_globals()
void THD::cleanup_after_query()
{
/*
Reset rand_used so that detection of calls to rand() will save random
seeds if needed by the slave.
Do not reset rand_used if inside a stored function or trigger because
only the call to these operations is logged. Thus only the calling
statement needs to detect rand() calls made by its substatements. These
substatements must not set rand_used to 0 because it would remove the
detection of rand() by the calling statement.
*/
if (!in_sub_stmt) /* stored functions and triggers are a special case */
{
/* Forget those values, for next binlogger: */
stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
rand_used= 0;
}
if (first_successful_insert_id_in_cur_stmt > 0)
{