Bug#46640: output from mysqlbinlog command in 5.1 breaks replication

The BINLOG statement was sharing too much code with the slave SQL thread, introduced with
the patch for Bug#32407. This caused statements to be logged with the wrong server_id, the
id stored inside the events of the BINLOG statement rather than the id of the running 
server.
      
Fix by rearranging code a bit so that only relevant parts of the code are executed by
the BINLOG statement, and the server_id of the server executing the statements will 
not be overrided by the server_id stored in the 'format description BINLOG statement'.

mysql-test/extra/binlog_tests/binlog.test:
  Added test to verify if the server_id stored in the 'format 
  description BINLOG statement' will override the server_id
  of the server executing the statements.
mysql-test/suite/binlog/r/binlog_row_binlog.result:
  Test result for bug#46640
mysql-test/suite/binlog/r/binlog_stm_binlog.result:
  Test result for bug#46640
sql/log_event.cc:
  Moved rows_event_stmt_clean() call from update_pos() to apply_event(). This in any case
  makes more sense, and is needed as update_pos() is no longer called when executing
  BINLOG statements.
  
  Moved setting of rli->relay_log.description_event_for_exec from 
  Format_description_log_event::do_update_pos() to 
  Format_description_log_event::do_apply_event()
sql/log_event_old.cc:
  Moved rows_event_stmt_clean() call from update_pos() to apply_event(). This in any case
  makes more sense, and is needed as update_pos() is no longer called when executing
  BINLOG statements.
sql/slave.cc:
  The skip flag is no longer needed, as the code path for BINLOG statement has been 
  cleaned up.
sql/sql_binlog.cc:
  Don't invoke the update_pos() code path for the BINLOG statement, as it contains code 
  that is redundant and/or harmful (especially setting thd->server_id).
This commit is contained in:
unknown 2009-10-14 09:39:05 +08:00
parent 610213149c
commit f9c6730258
8 changed files with 233 additions and 156 deletions

View File

@ -270,3 +270,42 @@ INSERT INTO test.t1 VALUES (1), (2);
CREATE TABLE test.t2 SELECT * FROM test.t1; CREATE TABLE test.t2 SELECT * FROM test.t1;
USE test; USE test;
DROP TABLES t1, t2; DROP TABLES t1, t2;
#
# Bug#46640
# This test verifies if the server_id stored in the "format
# description BINLOG statement" will override the server_id
# of the server executing the statements.
#
connect (fresh,localhost,root,,test);
connection fresh;
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY);
# Format description event, with server_id = 10;
BINLOG '
3u9kSA8KAAAAZgAAAGoAAAABAAQANS4xLjM1LW1hcmlhLWJldGExLWRlYnVnLWxvZwAAAAAAAAAA
AAAAAAAAAAAAAAAAAADe72RIEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC
';
# What server_id is logged for a statement? Should be our own, not the
# one from the format description event.
INSERT INTO t1 VALUES (1);
# INSERT INTO t1 VALUES (2), with server_id=20. Check that this is logged
# with our own server id, not the 20 from the BINLOG statement.
BINLOG '
3u9kSBMUAAAAKQAAAJEBAAAAABoAAAAAAAAABHRlc3QAAnQxAAEDAAA=
3u9kSBcUAAAAIgAAALMBAAAQABoAAAAAAAEAAf/+AgAAAA==
';
# Show binlog events to check that server ids are correct.
--replace_column 1 # 2 # 5 #
--replace_regex /Server ver: .*, Binlog ver: .*/Server ver: #, Binlog ver: #/ /table_id: [0-9]+/table_id: #/
SHOW BINLOG EVENTS;
DROP TABLE t1;
disconnect fresh;

View File

@ -1309,3 +1309,27 @@ INSERT INTO test.t1 VALUES (1), (2);
CREATE TABLE test.t2 SELECT * FROM test.t1; CREATE TABLE test.t2 SELECT * FROM test.t1;
USE test; USE test;
DROP TABLES t1, t2; DROP TABLES t1, t2;
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY);
BINLOG '
3u9kSA8KAAAAZgAAAGoAAAABAAQANS4xLjM1LW1hcmlhLWJldGExLWRlYnVnLWxvZwAAAAAAAAAA
AAAAAAAAAAAAAAAAAADe72RIEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC
';
INSERT INTO t1 VALUES (1);
BINLOG '
3u9kSBMUAAAAKQAAAJEBAAAAABoAAAAAAAAABHRlc3QAAnQxAAEDAAA=
3u9kSBcUAAAAIgAAALMBAAAQABoAAAAAAAEAAf/+AgAAAA==
';
SHOW BINLOG EVENTS;
Log_name Pos Event_type Server_id End_log_pos Info
# # Format_desc 1 # Server ver: #, Binlog ver: #
# # Query 1 # use `test`; CREATE TABLE t1 (a INT PRIMARY KEY)
# # Query 1 # BEGIN
# # Table_map 1 # table_id: # (test.t1)
# # Write_rows 1 # table_id: # flags: STMT_END_F
# # Query 1 # COMMIT
# # Query 1 # BEGIN
# # Table_map 1 # table_id: # (test.t1)
# # Write_rows 1 # table_id: # flags: STMT_END_F
# # Query 1 # COMMIT
DROP TABLE t1;

View File

@ -784,3 +784,24 @@ INSERT INTO test.t1 VALUES (1), (2);
CREATE TABLE test.t2 SELECT * FROM test.t1; CREATE TABLE test.t2 SELECT * FROM test.t1;
USE test; USE test;
DROP TABLES t1, t2; DROP TABLES t1, t2;
RESET MASTER;
CREATE TABLE t1 (a INT PRIMARY KEY);
BINLOG '
3u9kSA8KAAAAZgAAAGoAAAABAAQANS4xLjM1LW1hcmlhLWJldGExLWRlYnVnLWxvZwAAAAAAAAAA
AAAAAAAAAAAAAAAAAADe72RIEzgNAAgAEgAEBAQEEgAAUwAEGggAAAAICAgC
';
INSERT INTO t1 VALUES (1);
BINLOG '
3u9kSBMUAAAAKQAAAJEBAAAAABoAAAAAAAAABHRlc3QAAnQxAAEDAAA=
3u9kSBcUAAAAIgAAALMBAAAQABoAAAAAAAEAAf/+AgAAAA==
';
SHOW BINLOG EVENTS;
Log_name Pos Event_type Server_id End_log_pos Info
# # Format_desc 1 # Server ver: #, Binlog ver: #
# # Query 1 # use `test`; CREATE TABLE t1 (a INT PRIMARY KEY)
# # Query 1 # use `test`; INSERT INTO t1 VALUES (1)
# # Query 1 # BEGIN
# # Table_map 1 # table_id: # (test.t1)
# # Write_rows 1 # table_id: # flags: STMT_END_F
# # Query 1 # COMMIT
DROP TABLE t1;

View File

@ -3859,6 +3859,7 @@ bool Format_description_log_event::write(IO_CACHE* file)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Format_description_log_event::do_apply_event(Relay_log_info const *rli) int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
{ {
int ret= 0;
DBUG_ENTER("Format_description_log_event::do_apply_event"); DBUG_ENTER("Format_description_log_event::do_apply_event");
#ifdef USING_TRANSACTIONS #ifdef USING_TRANSACTIONS
@ -3900,17 +3901,21 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
0, then 96, then jump to first really asked event (which is 0, then 96, then jump to first really asked event (which is
>96). So this is ok. >96). So this is ok.
*/ */
DBUG_RETURN(Start_log_event_v3::do_apply_event(rli)); ret= Start_log_event_v3::do_apply_event(rli);
} }
DBUG_RETURN(0);
if (!ret)
{
/* Save the information describing this binlog */
delete rli->relay_log.description_event_for_exec;
const_cast<Relay_log_info *>(rli)->relay_log.description_event_for_exec= this;
}
DBUG_RETURN(ret);
} }
int Format_description_log_event::do_update_pos(Relay_log_info *rli) int Format_description_log_event::do_update_pos(Relay_log_info *rli)
{ {
/* save the information describing this binlog */
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= this;
if (server_id == (uint32) ::server_id) if (server_id == (uint32) ::server_id)
{ {
/* /*
@ -7506,6 +7511,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->reset_current_stmt_binlog_row_based(); thd->reset_current_stmt_binlog_row_based();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error); const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
thd->is_slave_error= 1; thd->is_slave_error= 1;
DBUG_RETURN(error);
} }
/* /*
This code would ideally be placed in do_update_pos() instead, but This code would ideally be placed in do_update_pos() instead, but
@ -7534,6 +7540,14 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
} }
if (get_flags(STMT_END_F))
if (error= rows_event_stmt_cleanup(rli, thd))
rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
get_type_str(), m_table->s->db.str,
m_table->s->table_name.str);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
@ -7632,33 +7646,19 @@ Rows_log_event::do_update_pos(Relay_log_info *rli)
if (get_flags(STMT_END_F)) if (get_flags(STMT_END_F))
{ {
if ((error= rows_event_stmt_cleanup(rli, thd)) == 0) /*
{ Indicate that a statement is finished.
/* Step the group log position if we are not in a transaction,
Indicate that a statement is finished. otherwise increase the event log position.
Step the group log position if we are not in a transaction, */
otherwise increase the event log position. rli->stmt_done(log_pos, when);
*/ /*
rli->stmt_done(log_pos, when); Clear any errors in thd->net.last_err*. It is not known if this is
needed or not. It is believed that any errors that may exist in
/* thd->net.last_err* are allowed. Examples of errors are "key not
Clear any errors pushed in thd->net.last_err* if for example "no key found", which is produced in the test case rpl_row_conflicts.test
found" (as this is allowed). This is a safety measure; apparently */
those errors (e.g. when executing a Delete_rows_log_event of a thd->clear_error();
non-existing row, like in rpl_row_mystery22.test,
thd->net.last_error = "Can't find record in 't1'" and last_errno=1032)
do not become visible. We still prefer to wipe them out.
*/
thd->clear_error();
}
else
{
rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
get_type_str(), m_table->s->db.str,
m_table->s->table_name.str);
}
} }
else else
{ {

View File

@ -1814,7 +1814,56 @@ int Old_rows_log_event::do_apply_event(Relay_log_info const *rli)
const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0); const_cast<Relay_log_info*>(rli)->last_event_start_time= my_time(0);
} }
DBUG_RETURN(0); if (get_flags(STMT_END_F))
{
/*
This is the end of a statement or transaction, so close (and
unlock) the tables we opened when processing the
Table_map_log_event starting the statement.
OBSERVER. This will clear *all* mappings, not only those that
are open for the table. There is not good handle for on-close
actions for tables.
NOTE. Even if we have no table ('table' == 0) we still need to be
here, so that we increase the group relay log position. If we didn't, we
could have a group relay log position which lags behind "forever"
(assume the last master's transaction is ignored by the slave because of
replicate-ignore rules).
*/
thd->binlog_flush_pending_rows_event(true);
/*
If this event is not in a transaction, the call below will, if some
transactional storage engines are involved, commit the statement into
them and flush the pending event to binlog.
If this event is in a transaction, the call will do nothing, but a
Xid_log_event will come next which will, if some transactional engines
are involved, commit the transaction and flush the pending event to the
binlog.
*/
if (error= ha_autocommit_or_rollback(thd, 0))
rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
get_type_str(), m_table->s->db.str,
m_table->s->table_name.str);
/*
Now what if this is not a transactional engine? we still need to
flush the pending event to the binlog; we did it with
thd->binlog_flush_pending_rows_event(). Note that we imitate
what is done for real queries: a call to
ha_autocommit_or_rollback() (sometimes only if involves a
transactional engine), and a call to be sure to have the pending
event flushed.
*/
thd->reset_current_stmt_binlog_row_based();
const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
}
DBUG_RETURN(error);
} }
@ -1844,71 +1893,18 @@ Old_rows_log_event::do_update_pos(Relay_log_info *rli)
if (get_flags(STMT_END_F)) if (get_flags(STMT_END_F))
{ {
/* /*
This is the end of a statement or transaction, so close (and Indicate that a statement is finished.
unlock) the tables we opened when processing the Step the group log position if we are not in a transaction,
Table_map_log_event starting the statement. otherwise increase the event log position.
*/
OBSERVER. This will clear *all* mappings, not only those that rli->stmt_done(log_pos, when);
are open for the table. There is not good handle for on-close
actions for tables.
NOTE. Even if we have no table ('table' == 0) we still need to be
here, so that we increase the group relay log position. If we didn't, we
could have a group relay log position which lags behind "forever"
(assume the last master's transaction is ignored by the slave because of
replicate-ignore rules).
*/
thd->binlog_flush_pending_rows_event(true);
/* /*
If this event is not in a transaction, the call below will, if some Clear any errors in thd->net.last_err*. It is not known if this is
transactional storage engines are involved, commit the statement into needed or not. It is believed that any errors that may exist in
them and flush the pending event to binlog. thd->net.last_err* are allowed. Examples of errors are "key not
If this event is in a transaction, the call will do nothing, but a found", which is produced in the test case rpl_row_conflicts.test
Xid_log_event will come next which will, if some transactional engines
are involved, commit the transaction and flush the pending event to the
binlog.
*/ */
error= ha_autocommit_or_rollback(thd, 0); thd->clear_error();
/*
Now what if this is not a transactional engine? we still need to
flush the pending event to the binlog; we did it with
thd->binlog_flush_pending_rows_event(). Note that we imitate
what is done for real queries: a call to
ha_autocommit_or_rollback() (sometimes only if involves a
transactional engine), and a call to be sure to have the pending
event flushed.
*/
thd->reset_current_stmt_binlog_row_based();
rli->cleanup_context(thd, 0);
if (error == 0)
{
/*
Indicate that a statement is finished.
Step the group log position if we are not in a transaction,
otherwise increase the event log position.
*/
rli->stmt_done(log_pos, when);
/*
Clear any errors pushed in thd->net.client_last_err* if for
example "no key found" (as this is allowed). This is a safety
measure; apparently those errors (e.g. when executing a
Delete_rows_log_event_old of a non-existing row, like in
rpl_row_mystery22.test, thd->net.last_error = "Can't
find record in 't1'" and last_errno=1032) do not become
visible. We still prefer to wipe them out.
*/
thd->clear_error();
}
else
rli->report(ERROR_LEVEL, error,
"Error in %s event: commit of row events failed, "
"table `%s`.`%s`",
get_type_str(), m_table->s->db.str,
m_table->s->table_name.str);
} }
else else
{ {

View File

@ -2082,8 +2082,7 @@ static int has_temporary_error(THD *thd)
@retval 2 No error calling ev->apply_event(), but error calling @retval 2 No error calling ev->apply_event(), but error calling
ev->update_pos(). ev->update_pos().
*/ */
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
bool skip)
{ {
int exec_res= 0; int exec_res= 0;
@ -2128,38 +2127,34 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli,
ev->when= my_time(0); ev->when= my_time(0);
ev->thd = thd; // because up to this point, ev->thd == 0 ev->thd = thd; // because up to this point, ev->thd == 0
if (skip) int reason= ev->shall_skip(rli);
{ if (reason == Log_event::EVENT_SKIP_COUNT)
int reason= ev->shall_skip(rli); --rli->slave_skip_counter;
if (reason == Log_event::EVENT_SKIP_COUNT) pthread_mutex_unlock(&rli->data_lock);
--rli->slave_skip_counter; if (reason == Log_event::EVENT_SKIP_NOT)
pthread_mutex_unlock(&rli->data_lock);
if (reason == Log_event::EVENT_SKIP_NOT)
exec_res= ev->apply_event(rli);
#ifndef DBUG_OFF
/*
This only prints information to the debug trace.
TODO: Print an informational message to the error log?
*/
static const char *const explain[] = {
// EVENT_SKIP_NOT,
"not skipped",
// EVENT_SKIP_IGNORE,
"skipped because event should be ignored",
// EVENT_SKIP_COUNT
"skipped because event skip counter was non-zero"
};
DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
thd->options & OPTION_BEGIN ? 1 : 0,
rli->get_flag(Relay_log_info::IN_STMT)));
DBUG_PRINT("skip_event", ("%s event was %s",
ev->get_type_str(), explain[reason]));
#endif
}
else
exec_res= ev->apply_event(rli); exec_res= ev->apply_event(rli);
#ifndef DBUG_OFF
/*
This only prints information to the debug trace.
TODO: Print an informational message to the error log?
*/
static const char *const explain[] = {
// EVENT_SKIP_NOT,
"not skipped",
// EVENT_SKIP_IGNORE,
"skipped because event should be ignored",
// EVENT_SKIP_COUNT
"skipped because event skip counter was non-zero"
};
DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
thd->options & OPTION_BEGIN ? 1 : 0,
rli->get_flag(Relay_log_info::IN_STMT)));
DBUG_PRINT("skip_event", ("%s event was %s",
ev->get_type_str(), explain[reason]));
#endif
DBUG_PRINT("info", ("apply_event error = %d", exec_res)); DBUG_PRINT("info", ("apply_event error = %d", exec_res));
if (exec_res == 0) if (exec_res == 0)
{ {
@ -2278,7 +2273,7 @@ static int exec_relay_log_event(THD* thd, Relay_log_info* rli)
delete ev; delete ev;
DBUG_RETURN(1); DBUG_RETURN(1);
} }
exec_res= apply_event_and_update_pos(ev, thd, rli, TRUE); exec_res= apply_event_and_update_pos(ev, thd, rli);
/* /*
Format_description_log_event should not be deleted because it will be Format_description_log_event should not be deleted because it will be

View File

@ -190,8 +190,7 @@ int purge_relay_logs(Relay_log_info* rli, THD *thd, bool just_reset,
void set_slave_thread_options(THD* thd); void set_slave_thread_options(THD* thd);
void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli); void set_slave_thread_default_charset(THD *thd, Relay_log_info const *rli);
void rotate_relay_log(Master_info* mi); void rotate_relay_log(Master_info* mi);
int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli, int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli);
bool skip);
pthread_handler_t handle_slave_io(void *arg); pthread_handler_t handle_slave_io(void *arg);
pthread_handler_t handle_slave_sql(void *arg); pthread_handler_t handle_slave_sql(void *arg);

View File

@ -56,17 +56,20 @@ void mysql_client_binlog_statement(THD* thd)
Format_description_event. Format_description_event.
*/ */
my_bool have_fd_event= TRUE; my_bool have_fd_event= TRUE;
if (!thd->rli_fake) int err;
Relay_log_info *rli;
rli= thd->rli_fake;
if (!rli)
{ {
thd->rli_fake= new Relay_log_info; rli= thd->rli_fake= new Relay_log_info;
#ifdef HAVE_purify #ifdef HAVE_purify
thd->rli_fake->is_fake= TRUE; rli->is_fake= TRUE;
#endif #endif
have_fd_event= FALSE; have_fd_event= FALSE;
} }
if (thd->rli_fake && !thd->rli_fake->relay_log.description_event_for_exec) if (rli && !rli->relay_log.description_event_for_exec)
{ {
thd->rli_fake->relay_log.description_event_for_exec= rli->relay_log.description_event_for_exec=
new Format_description_log_event(4); new Format_description_log_event(4);
have_fd_event= FALSE; have_fd_event= FALSE;
} }
@ -78,16 +81,16 @@ void mysql_client_binlog_statement(THD* thd)
/* /*
Out of memory check Out of memory check
*/ */
if (!(thd->rli_fake && if (!(rli &&
thd->rli_fake->relay_log.description_event_for_exec && rli->relay_log.description_event_for_exec &&
buf)) buf))
{ {
my_error(ER_OUTOFMEMORY, MYF(0), 1); /* needed 1 bytes */ my_error(ER_OUTOFMEMORY, MYF(0), 1); /* needed 1 bytes */
goto end; goto end;
} }
thd->rli_fake->sql_thd= thd; rli->sql_thd= thd;
thd->rli_fake->no_storage= TRUE; rli->no_storage= TRUE;
for (char const *strptr= thd->lex->comment.str ; for (char const *strptr= thd->lex->comment.str ;
strptr < thd->lex->comment.str + thd->lex->comment.length ; ) strptr < thd->lex->comment.str + thd->lex->comment.length ; )
@ -170,8 +173,7 @@ void mysql_client_binlog_statement(THD* thd)
} }
ev= Log_event::read_log_event(bufptr, event_len, &error, ev= Log_event::read_log_event(bufptr, event_len, &error,
thd->rli_fake->relay_log. rli->relay_log.description_event_for_exec);
description_event_for_exec);
DBUG_PRINT("info",("binlog base64 err=%s", error)); DBUG_PRINT("info",("binlog base64 err=%s", error));
if (!ev) if (!ev)
@ -209,18 +211,10 @@ void mysql_client_binlog_statement(THD* thd)
reporting. reporting.
*/ */
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
if (apply_event_and_update_pos(ev, thd, thd->rli_fake, FALSE)) err= ev->apply_event(rli);
{ #else
delete ev; err= 0;
/*
TODO: Maybe a better error message since the BINLOG statement
now contains several events.
*/
my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement");
goto end;
}
#endif #endif
/* /*
Format_description_log_event should not be deleted because it Format_description_log_event should not be deleted because it
will be used to read info about the relay log's format; it will be used to read info about the relay log's format; it
@ -228,8 +222,17 @@ void mysql_client_binlog_statement(THD* thd)
i.e. when this thread terminates. i.e. when this thread terminates.
*/ */
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT) if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
delete ev; delete ev;
ev= 0; ev= 0;
if (err)
{
/*
TODO: Maybe a better error message since the BINLOG statement
now contains several events.
*/
my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement");
goto end;
}
} }
} }
@ -238,7 +241,7 @@ void mysql_client_binlog_statement(THD* thd)
my_ok(thd); my_ok(thd);
end: end:
thd->rli_fake->clear_tables_to_lock(); rli->clear_tables_to_lock();
my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }