Merge gbichot@bk-internal.mysql.com:/home/bk/mysql-5.0

into mysql.com:/home/mysql_src/mysql-5.0-new-binlog-format
This commit is contained in:
unknown 2003-12-18 01:16:35 +01:00
commit 540e94f9dd
17 changed files with 2418 additions and 587 deletions

View File

@ -39,7 +39,7 @@ mysqlbinlog_SOURCES = mysqlbinlog.cc ../mysys/mf_tempdir.c
mysqlbinlog_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)
mysqlmanagerc_SOURCES = mysqlmanagerc.c
mysqlmanagerc_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES)
sql_src=log_event.h log_event.cc
sql_src=log_event.h mysql_priv.h log_event.cc
# Fix for mit-threads
DEFS = -DUNDEF_THREADS_HACK

View File

@ -14,12 +14,28 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
TODO: print the catalog (some USE catalog.db ????).
Standalone program to read a MySQL binary log (or relay log);
can read files produced by 3.23, 4.x, 5.0 servers.
Can read binlogs from 3.23/4.x/5.0 and relay logs from 4.x/5.0.
Should be able to read any file of these categories, even with --position.
An important fact: the Format_desc event of the log is at most the 3rd event
of the log; if it is the 3rd then there is this combination:
Format_desc_of_slave, Rotate_of_master, Format_desc_of_master.
*/
#define MYSQL_CLIENT
#undef MYSQL_SERVER
#include "client_priv.h"
#include <time.h>
#include <assert.h>
#include "log_event.h"
/* That one is necessary for defines of OPTION_NO_FOREIGN_KEY_CHECKS etc */
#include "mysql_priv.h"
#define BIN_LOG_HEADER_SIZE 4
#define PROBE_HEADER_LEN (EVENT_LEN_OFFSET+4)
@ -481,21 +497,26 @@ static int check_master_version(MYSQL* mysql)
}
/*
TODO fix this for new format (like local log); this will be done when 4.0 is
merged here (Victor's fixes are needed to make dump_remote_log_entries()
work).
*/
static void dump_remote_log_entries(const char* logname)
{
char buf[128];
char last_db[FN_REFLEN+1] = "";
LAST_EVENT_INFO last_event_info;
uint len;
NET* net = &mysql->net;
int old_format;
old_format = check_master_version(mysql);
if (!position)
position = BIN_LOG_HEADER_SIZE; // protect the innocent from spam
position = BIN_LOG_HEADER_SIZE;
if (position < BIN_LOG_HEADER_SIZE)
{
position = BIN_LOG_HEADER_SIZE;
// warn the guity
sql_print_error("Warning: The position in the binary log can't be less than %d.\nStarting from position %d\n", BIN_LOG_HEADER_SIZE, BIN_LOG_HEADER_SIZE);
}
int4store(buf, position);
@ -517,10 +538,11 @@ static void dump_remote_log_entries(const char* logname)
DBUG_PRINT("info",( "len= %u, net->read_pos[5] = %d\n",
len, net->read_pos[5]));
Log_event *ev = Log_event::read_log_event((const char*) net->read_pos + 1 ,
len - 1, &error, old_format);
len - 1, &error, 0);
//TODO this ,0) : we need to store the description_event like for local_log
if (ev)
{
ev->print(result_file, short_form, last_db);
ev->print(result_file, short_form, &last_event_info);
if (ev->get_type_code() == LOAD_EVENT)
dump_remote_file(net, ((Load_log_event*)ev)->fname);
delete ev;
@ -531,29 +553,98 @@ static void dump_remote_log_entries(const char* logname)
}
static int check_header(IO_CACHE* file)
static void check_header(IO_CACHE* file,
Format_description_log_event **description_event)
{
byte header[BIN_LOG_HEADER_SIZE];
byte buf[PROBE_HEADER_LEN];
int old_format=0;
*description_event= new Format_description_log_event(3);
my_off_t tmp_pos;
my_off_t pos = my_b_tell(file);
my_b_seek(file, (my_off_t)0);
if (my_b_read(file, header, sizeof(header)))
die("Failed reading header; Probably an empty file");
if (memcmp(header, BINLOG_MAGIC, sizeof(header)))
die("File is not a binary log file");
if (!my_b_read(file, buf, sizeof(buf)))
/*
Imagine we are running with --position=1000. We still need to know the
binlog format's. So we still need to find, if there is one, the Format_desc
event, or to know if this is a 3.23 binlog. So we need to first read the
first events of the log, those around offset 4.
Even if we are reading a 3.23 binlog from the start (no --position): we need
to know the header length (which is 13 in 3.23, 19 in 4.x) to be able to
successfully print the first event (Start_log_event_v3). So even in this
case, we need to "probe" the first bytes of the log *before* we do a real
read_log_event(). Because read_log_event() needs to know the header's length
to work fine.
*/
for(;;)
{
if (buf[4] == START_EVENT)
tmp_pos= my_b_tell(file); /* should be 4 the first time */
if (my_b_read(file, buf, sizeof(buf)))
{
uint event_len;
event_len = uint4korr(buf + EVENT_LEN_OFFSET);
old_format = (event_len < (LOG_EVENT_HEADER_LEN + START_HEADER_LEN));
if (file->error)
die("\
Could not read entry at offset %lu : Error in log format or read error",
tmp_pos);
/*
Otherwise this is just EOF : this log currently contains 0-2 events.
Maybe it's going to be filled in the next milliseconds; then we are
going to have a problem if this a 3.23 log (imagine we are locally
reading a 3.23 binlog which is being written presently): we won't know
it in read_log_event() and will fail().
Similar problems could happen with hot relay logs if --position is used
(but a --position which is posterior to the current size of the log).
These are rare problems anyway (reading a hot log + when we read the
first events there are not all there yet + when we read a bit later
there are more events + using a strange --position).
*/
break;
}
else
{
DBUG_PRINT("info",("buf[4]=%d", buf[4]));
/* always test for a Start_v3, even if no --position */
if (buf[4] == START_EVENT_V3) /* This is 3.23 or 4.x */
{
if (uint4korr(buf + EVENT_LEN_OFFSET) <
(LOG_EVENT_MINIMAL_HEADER_LEN + START_V3_HEADER_LEN))
{
/* This is 3.23 (format 1) */
delete *description_event;
*description_event= new Format_description_log_event(1);
}
break;
}
else if (tmp_pos>=position)
break;
else if (buf[4] == FORMAT_DESCRIPTION_EVENT) /* This is 5.0 */
{
my_b_seek(file, tmp_pos); /* seek back to event's start */
if (!(*description_event= (Format_description_log_event*)
Log_event::read_log_event(file, *description_event)))
/* EOF can't be hit here normally, so it's a real error */
die("Could not read a Format_description_log_event event \
at offset %lu ; this could be a log format error or read error",
tmp_pos);
DBUG_PRINT("info",("Setting description_event"));
}
else if (buf[4] == ROTATE_EVENT)
{
my_b_seek(file, tmp_pos); /* seek back to event's start */
if (!Log_event::read_log_event(file, *description_event))
/* EOF can't be hit here normally, so it's a real error */
die("Could not read a Rotate_log_event event \
at offset %lu ; this could be a log format error or read error",
tmp_pos);
}
else
break;
}
}
my_b_seek(file, pos);
return old_format;
}
@ -562,11 +653,15 @@ static void dump_local_log_entries(const char* logname)
File fd = -1;
IO_CACHE cache,*file= &cache;
ulonglong rec_count = 0;
char last_db[FN_REFLEN+1];
LAST_EVENT_INFO last_event_info;
byte tmp_buff[BIN_LOG_HEADER_SIZE];
bool old_format = 0;
last_db[0]=0;
/*
check_header() will set the pointer below.
Why do we need here a pointer on an event instead of an event ?
This is because the event will be created (alloced) in read_log_event()
(which returns a pointer) in check_header().
*/
Format_description_log_event* description_event;
if (logname && logname[0] != '-')
{
@ -575,14 +670,14 @@ static void dump_local_log_entries(const char* logname)
if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0,
MYF(MY_WME | MY_NABP)))
exit(1);
old_format = check_header(file);
check_header(file, &description_event);
}
else
else // reading from stdin; TODO: check that it works
{
if (init_io_cache(file, fileno(result_file), 0, READ_CACHE, (my_off_t) 0,
0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE)))
exit(1);
old_format = check_header(file);
check_header(file, &description_event);
if (position)
{
/* skip 'position' characters from stdout */
@ -599,6 +694,9 @@ static void dump_local_log_entries(const char* logname)
file->seek_not_done=0;
}
if (!description_event->is_valid())
die("Invalid Format_description log event; could be out of memory");
if (!position)
my_b_read(file, tmp_buff, BIN_LOG_HEADER_SIZE); // Skip header
for (;;)
@ -606,7 +704,7 @@ static void dump_local_log_entries(const char* logname)
char llbuff[21];
my_off_t old_off = my_b_tell(file);
Log_event* ev = Log_event::read_log_event(file, old_format);
Log_event* ev = Log_event::read_log_event(file, description_event);
if (!ev)
{
if (file->error)
@ -633,7 +731,7 @@ Could not read entry at offset %s : Error in log format or read error",
continue; // next
}
}
ev->print(result_file, short_form, last_db);
ev->print(result_file, short_form, &last_event_info);
break;
case CREATE_FILE_EVENT:
{
@ -661,18 +759,18 @@ Could not read entry at offset %s : Error in log format or read error",
filename and use LOCAL), prepared in the 'case EXEC_LOAD_EVENT'
below.
*/
ce->print(result_file, short_form, last_db, true);
ce->print(result_file, short_form, &last_event_info, true);
load_processor.process(ce);
ev= 0;
break;
}
case APPEND_BLOCK_EVENT:
ev->print(result_file, short_form, last_db);
ev->print(result_file, short_form, &last_event_info);
load_processor.process((Append_block_log_event*)ev);
break;
case EXEC_LOAD_EVENT:
{
ev->print(result_file, short_form, last_db);
ev->print(result_file, short_form, &last_event_info);
Execute_load_log_event *exv= (Execute_load_log_event*)ev;
Create_file_log_event *ce= load_processor.grab_event(exv->file_id);
/*
@ -682,7 +780,7 @@ Could not read entry at offset %s : Error in log format or read error",
*/
if (ce)
{
ce->print(result_file, short_form, last_db,true);
ce->print(result_file, short_form, &last_event_info,true);
my_free((char*)ce->fname,MYF(MY_WME));
delete ce;
}
@ -691,17 +789,23 @@ Could not read entry at offset %s : Error in log format or read error",
Create_file event for file_id: %u\n",exv->file_id);
break;
}
case FORMAT_DESCRIPTION_EVENT:
delete description_event;
description_event= (Format_description_log_event*) ev;
ev->print(result_file, short_form, &last_event_info);
break;
default:
ev->print(result_file, short_form, last_db);
ev->print(result_file, short_form, &last_event_info);
}
}
rec_count++;
if (ev)
delete ev;
if (ev && ev->get_type_code()!=FORMAT_DESCRIPTION_EVENT)
delete ev; /* otherwise, deleted in the end */
}
if (fd >= 0)
my_close(fd, MYF(MY_WME));
end_io_cache(file);
delete description_event;
}

View File

@ -498,6 +498,7 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
/* tell write offset in the SEQ_APPEND cache */
my_off_t my_b_append_tell(IO_CACHE* info);
my_off_t my_b_safe_tell(IO_CACHE* info); /* picks the correct tell() */
#define my_b_bytes_in_cache(info) (uint) (*(info)->current_end - \
*(info)->current_pos)

View File

@ -0,0 +1,43 @@
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
drop table if exists t1;
Warnings:
Note 1051 Unknown table 't1'
create table t1(a varchar(10),b int);
set @@session.sql_mode=pipes_as_concat;
insert into t1 values('My'||'SQL', 1);
set @@session.sql_mode=default;
insert into t1 values('My'||'SQL', 2);
select * from t1 where b<3 order by a;
a b
0 2
MySQL 1
select * from t1 where b<3 order by a;
a b
0 2
MySQL 1
set @@session.sql_mode=ignore_space;
insert into t1 values(password ('MySQL'), 3);
set @@session.sql_mode=ansi_quotes;
create table "t2" ("a" int);
drop table t1, t2;
set @@session.sql_mode=default;
create table t1(a int auto_increment primary key);
create table t2(b int, a int);
set @@session.sql_auto_is_null=1;
insert into t1 values(null);
insert into t2 select 1,a from t1 where a is null;
set @@session.sql_auto_is_null=0;
insert into t1 values(null);
insert into t2 select 2,a from t1 where a is null;
select * from t2 order by b;
b a
1 1
select * from t2 order by b;
b a
1 1
drop table t1,t2;

View File

@ -0,0 +1,42 @@
# Replication of session variables.
# FOREIGN_KEY_CHECKS is tested in rpl_insert_id.test
source include/master-slave.inc;
drop table if exists t1;
create table t1(a varchar(10),b int);
set @@session.sql_mode=pipes_as_concat;
insert into t1 values('My'||'SQL', 1);
set @@session.sql_mode=default;
insert into t1 values('My'||'SQL', 2);
select * from t1 where b<3 order by a;
save_master_pos;
connection slave;
sync_with_master;
select * from t1 where b<3 order by a;
connection master;
# if the slave does the next sync_with_master fine, then it means it accepts the
# two lines of ANSI syntax below, which is what we want to check.
set @@session.sql_mode=ignore_space;
insert into t1 values(password ('MySQL'), 3);
set @@session.sql_mode=ansi_quotes;
create table "t2" ("a" int);
drop table t1, t2;
set @@session.sql_mode=default;
create table t1(a int auto_increment primary key);
create table t2(b int, a int);
set @@session.sql_auto_is_null=1;
insert into t1 values(null);
insert into t2 select 1,a from t1 where a is null;
set @@session.sql_auto_is_null=0;
insert into t1 values(null);
insert into t2 select 2,a from t1 where a is null;
select * from t2 order by b;
save_master_pos;
connection slave;
sync_with_master;
select * from t2 order by b;
connection master;
drop table t1,t2;
save_master_pos;
connection slave;
sync_with_master;

View File

@ -66,6 +66,13 @@ my_off_t my_b_append_tell(IO_CACHE* info)
return res;
}
my_off_t my_b_safe_tell(IO_CACHE *info)
{
if (unlikely(info->type == SEQ_READ_APPEND))
return my_b_append_tell(info);
return my_b_tell(info);
}
/*
Make next read happen at the given position
For write cache, make next write happen at the given position

View File

@ -987,10 +987,19 @@ innobase_commit_low(
trx->mysql_master_log_file_name
= active_mi->rli.group_master_log_name;
/*
Guilhem to Heikki: in 5.0 we don't need to do a computation
(old_pos+len) to get the end_pos, because we already have the
end_pos under hand in the replication code
(Query_log_event::exec_event()).
I tested the code change below (simulated a crash with kill
-9) and got the good (binlog, position) displayed by InnoDB at
crash recovery, so this code change is ok.
*/
trx->mysql_master_log_pos = ((ib_longlong)
(active_mi->rli.group_master_log_pos +
active_mi->rli.event_len
));
(active_mi->rli.future_group_master_log_pos
));
}
#endif /* HAVE_REPLICATION */

View File

@ -84,7 +84,8 @@ static int find_uniq_filename(char *name)
MYSQL_LOG::MYSQL_LOG()
:bytes_written(0), last_time(0), query_start(0), name(0),
file_id(1), open_count(1), log_type(LOG_CLOSED), write_error(0), inited(0),
need_start_event(1)
need_start_event(1), description_event_for_exec(0),
description_event_for_queue(0)
{
/*
We don't want to initialize LOCK_Log here as such initialization depends on
@ -111,6 +112,8 @@ void MYSQL_LOG::cleanup()
{
inited= 0;
close(LOG_CLOSE_INDEX);
delete description_event_for_queue;
delete description_event_for_exec;
(void) pthread_mutex_destroy(&LOCK_log);
(void) pthread_mutex_destroy(&LOCK_index);
(void) pthread_cond_destroy(&update_cond);
@ -179,7 +182,8 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
const char *new_name, const char *index_file_name_arg,
enum cache_type io_cache_type_arg,
bool no_auto_events_arg,
ulong max_size_arg)
ulong max_size_arg,
bool null_created_arg)
{
char buff[512];
File file= -1, index_file_nr= -1;
@ -272,8 +276,8 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
if (my_b_safe_write(&log_file, (byte*) BINLOG_MAGIC,
BIN_LOG_HEADER_SIZE))
goto err;
bytes_written += BIN_LOG_HEADER_SIZE;
write_file_name_to_index_file=1;
bytes_written+= BIN_LOG_HEADER_SIZE;
write_file_name_to_index_file= 1;
}
if (!my_b_inited(&index_file))
@ -302,10 +306,42 @@ bool MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
}
if (need_start_event && !no_auto_events)
{
need_start_event=0;
Start_log_event s;
/*
In 4.x we set need_start_event=0 here, but in 5.0 we want a Start event
even if this is not the very first binlog.
*/
Format_description_log_event s(BINLOG_VERSION);
if (!s.is_valid())
goto err;
s.set_log_pos(this);
s.write(&log_file);
if (null_created_arg)
s.created= 0;
if (s.write(&log_file))
goto err;
bytes_written+= s.get_event_len();
}
if (description_event_for_queue &&
description_event_for_queue->binlog_version>=4)
{
/*
This is a relay log written to by the I/O slave thread.
Write the event so that others can later know the format of this relay
log.
Note that this event is very close to the original event from the
master (it has binlog version of the master, event types of the
master), so this is suitable to parse the next relay log's event. It
has been produced by
Format_description_log_event::Format_description_log_event(char*
buf,).
Why don't we want to write the description_event_for_queue if this event
is for format<4 (3.23 or 4.x): this is because in that case, the
description_event_for_queue describes the data received from the master,
but not the data written to the relay log (*conversion*), which is in
format 4 (slave's).
*/
if (description_event_for_queue->write(&log_file))
goto err;
bytes_written+= description_event_for_queue->get_event_len();
}
if (flush_io_cache(&log_file))
goto err;
@ -596,7 +632,7 @@ bool MYSQL_LOG::reset_logs(THD* thd)
if (!thd->slave_thread)
need_start_event=1;
open(save_name, save_log_type, 0, index_file_name,
io_cache_type, no_auto_events, max_size);
io_cache_type, no_auto_events, max_size, 0);
my_free((gptr) save_name, MYF(0));
err:
@ -986,8 +1022,17 @@ void MYSQL_LOG::new_file(bool need_lock)
Note that at this point, log_type != LOG_CLOSED (important for is_open()).
*/
/*
new_file() is only used for rotation (in FLUSH LOGS or because size >
max_binlog_size or max_relay_log_size).
If this is a binary log, the Format_description_log_event at the beginning of
the new file should have created=0 (to distinguish with the
Format_description_log_event written at server startup, which should
trigger temp tables deletion on slaves.
*/
open(old_name, save_log_type, new_name_ptr, index_file_name, io_cache_type,
no_auto_events, max_size);
no_auto_events, max_size, 1);
my_free(old_name,MYF(0));
end:
@ -1282,6 +1327,12 @@ bool MYSQL_LOG::write(Log_event* event_info)
}
#endif
#if MYSQL_VERSION_ID < 50000
/*
In 5.0 this is not needed anymore as we store the value of
FOREIGN_KEY_CHECKS in a binary way in the Query event's header.
The code below was enabled in 4.0 and 4.1.
*/
/*
If the user has set FOREIGN_KEY_CHECKS=0 we wrap every SQL
command in the binlog inside:
@ -1297,6 +1348,7 @@ bool MYSQL_LOG::write(Log_event* event_info)
if (e.write(file))
goto err;
}
#endif
}
/* Write the SQL command */
@ -1307,6 +1359,7 @@ bool MYSQL_LOG::write(Log_event* event_info)
/* Write log events to reset the 'run environment' of the SQL command */
#if MYSQL_VERSION_ID < 50000
if (thd && thd->options & OPTION_NO_FOREIGN_KEY_CHECKS)
{
Query_log_event e(thd, "SET FOREIGN_KEY_CHECKS=1", 24, 0);
@ -1314,6 +1367,7 @@ bool MYSQL_LOG::write(Log_event* event_info)
if (e.write(file))
goto err;
}
#endif
/*
Tell for transactional table handlers up to which position in the
@ -1720,6 +1774,7 @@ void MYSQL_LOG::close(uint exiting)
Stop_log_event s;
s.set_log_pos(this);
s.write(&log_file);
bytes_written+= s.get_event_len();
signal_update();
}
#endif /* HAVE_REPLICATION */

File diff suppressed because it is too large Load Diff

View File

@ -34,13 +34,43 @@
#define LOG_READ_TOO_LARGE -7
#define LOG_EVENT_OFFSET 4
#define BINLOG_VERSION 3
/*
3 is MySQL 4.x; 4 is MySQL 5.0.0.
Compared to version 3, version 4 has:
- a different Start_log_event, which includes info about the binary log
(sizes of headers); this info is included for better compatibility if the
master's MySQL version is different from the slave's.
- all events have a unique ID (the triplet (server_id, timestamp at server
start, other) to be sure an event is not executed more than once in a
multimaster setup, example:
M1
/ \
v v
M2 M3
\ /
v v
S
if a query is run on M1, it will arrive twice on S, so we need that S
remembers the last unique ID it has processed, to compare and know if the
event should be skipped or not. Example of ID: we already have the server id
(4 bytes), plus:
timestamp_when_the_master_started (4 bytes), a counter (a sequence number
which increments every time we write an event to the binlog) (3 bytes).
Q: how do we handle when the counter is overflowed and restarts from 0 ?
- Query and Load (Create or Execute) events may have a more precise timestamp
(with microseconds), number of matched/affected/warnings rows
and fields of session variables: SQL_MODE,
FOREIGN_KEY_CHECKS, UNIQUE_CHECKS, SQL_AUTO_IS_NULL, the collations and
charsets, the PASSWORD() version (old/new/...).
*/
#define BINLOG_VERSION 4
/*
We could have used SERVER_VERSION_LENGTH, but this introduces an
obscure dependency - if somebody decided to change SERVER_VERSION_LENGTH
this would have broken the replication protocol
this would break the replication protocol
*/
#define ST_SERVER_VER_LEN 50
@ -49,6 +79,12 @@
TERMINATED etc).
*/
/*
These are flags and structs to handle all the LOAD DATA INFILE options (LINES
TERMINATED etc).
DUMPFILE_FLAG is probably useless (DUMPFILE is a clause of SELECT, not of LOAD
DATA).
*/
#define DUMPFILE_FLAG 0x1
#define OPT_ENCLOSED_FLAG 0x2
#define REPLACE_FLAG 0x4
@ -136,16 +172,28 @@ struct sql_ex_info
#define LOG_EVENT_HEADER_LEN 19 /* the fixed header length */
#define OLD_HEADER_LEN 13 /* the fixed header length in 3.23 */
/*
Fixed header length, where 4.x and 5.0 agree. That is, 5.0 may have a longer
header (it will for sure when we have the unique event's ID), but at least
the first 19 bytes are the same in 4.x and 5.0. So when we have the unique
event's ID, LOG_EVENT_HEADER_LEN will be something like 26, but
LOG_EVENT_MINIMAL_HEADER_LEN will remain 19.
*/
#define LOG_EVENT_MINIMAL_HEADER_LEN 19
/* event-specific post-header sizes */
#define QUERY_HEADER_LEN (4 + 4 + 1 + 2)
// where 3.23, 4.x and 5.0 agree
#define QUERY_HEADER_MINIMAL_LEN (4 + 4 + 1 + 2)
// where 5.0 differs: 2 for len of N-bytes vars.
#define QUERY_HEADER_LEN (QUERY_HEADER_MINIMAL_LEN + 2)
#define LOAD_HEADER_LEN (4 + 4 + 4 + 1 +1 + 4)
#define START_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4)
#define ROTATE_HEADER_LEN 8
#define START_V3_HEADER_LEN (2 + ST_SERVER_VER_LEN + 4)
#define ROTATE_HEADER_LEN 8 // this is FROZEN (the Rotate post-header is frozen)
#define CREATE_FILE_HEADER_LEN 4
#define APPEND_BLOCK_HEADER_LEN 4
#define EXEC_LOAD_HEADER_LEN 4
#define DELETE_FILE_HEADER_LEN 4
#define FORMAT_DESCRIPTION_HEADER_LEN (START_V3_HEADER_LEN+1+LOG_EVENT_TYPES)
/*
Event header offsets;
@ -158,11 +206,12 @@ struct sql_ex_info
#define LOG_POS_OFFSET 13
#define FLAGS_OFFSET 17
/* start event post-header */
/* start event post-header (for v3 and v4) */
#define ST_BINLOG_VER_OFFSET 0
#define ST_SERVER_VER_OFFSET 2
#define ST_CREATED_OFFSET (ST_SERVER_VER_OFFSET + ST_SERVER_VER_LEN)
#define ST_COMMON_HEADER_LEN_OFFSET (ST_CREATED_OFFSET + 4)
/* slave event post-header (this event is never written) */
@ -176,7 +225,13 @@ struct sql_ex_info
#define Q_EXEC_TIME_OFFSET 4
#define Q_DB_LEN_OFFSET 8
#define Q_ERR_CODE_OFFSET 9
#define Q_STATUS_VARS_LEN_OFFSET 11
#define Q_DATA_OFFSET QUERY_HEADER_LEN
/* these are codes, not offsets; not more than 256 values (1 byte). */
#define Q_FLAGS2_CODE 0
#define Q_SQL_MODE_CODE 1
#define Q_CATALOG_CODE 2
/* Intvar event post-header */
@ -228,16 +283,6 @@ struct sql_ex_info
/* DF = "Delete File" */
#define DF_FILE_ID_OFFSET 0
#define QUERY_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN)
#define QUERY_DATA_OFFSET (LOG_EVENT_HEADER_LEN+QUERY_HEADER_LEN)
#define ROTATE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+ROTATE_HEADER_LEN)
#define LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+LOAD_HEADER_LEN)
#define CREATE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+\
+LOAD_HEADER_LEN+CREATE_FILE_HEADER_LEN)
#define DELETE_FILE_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+DELETE_FILE_HEADER_LEN)
#define EXEC_LOAD_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+EXEC_LOAD_HEADER_LEN)
#define APPEND_BLOCK_EVENT_OVERHEAD (LOG_EVENT_HEADER_LEN+APPEND_BLOCK_HEADER_LEN)
/* 4 bytes which all binlogs should begin with */
#define BINLOG_MAGIC "\xfe\x62\x69\x6e"
@ -264,15 +309,54 @@ struct sql_ex_info
*/
#define LOG_EVENT_THREAD_SPECIFIC_F 0x4
/*
OPTIONS_WRITTEN_TO_BIN_LOG are the bits of thd->options which must be written
to the binlog. OPTIONS_WRITTEN_TO_BINLOG could be written into the
Format_description_log_event, so that if later we don't want to replicate a
variable we did replicate, or the contrary, it's doable. But it should not be
too hard to decide once for all of what we replicate and what we don't, among
the fixed 32 bits of thd->options.
I (Guilhem) have read through every option's usage, and it looks like
OPTION_AUTO_IS_NULL and OPTION_NO_FOREIGN_KEYS are the only ones which alter
how the query modifies the table. It's good to replicate
OPTION_RELAXED_UNIQUE_CHECKS too because otherwise, the slave may insert data
slower than the master, in InnoDB.
OPTION_BIG_SELECTS is not needed (the slave thread runs with
max_join_size=HA_POS_ERROR) and OPTION_BIG_TABLES is not needed either, as
the manual says (because a too big in-memory temp table is automatically
written to disk).
*/
#define OPTIONS_WRITTEN_TO_BIN_LOG (OPTION_AUTO_IS_NULL | \
OPTION_NO_FOREIGN_KEY_CHECKS | OPTION_RELAXED_UNIQUE_CHECKS)
enum Log_event_type
{
UNKNOWN_EVENT= 0, START_EVENT= 1, QUERY_EVENT= 2, STOP_EVENT= 3,
ROTATE_EVENT= 4, INTVAR_EVENT= 5, LOAD_EVENT=6, SLAVE_EVENT= 7,
CREATE_FILE_EVENT= 8, APPEND_BLOCK_EVENT= 9, EXEC_LOAD_EVENT= 10,
DELETE_FILE_EVENT= 11, NEW_LOAD_EVENT= 12, RAND_EVENT= 13,
USER_VAR_EVENT= 14
/*
Every time you update this enum (when you add a type), you have to
update the code of Format_description_log_event::Format_description_log_event().
Make sure you always insert new types ***BEFORE*** ENUM_END_EVENT.
*/
UNKNOWN_EVENT= 0, START_EVENT_V3, QUERY_EVENT, STOP_EVENT, ROTATE_EVENT,
INTVAR_EVENT, LOAD_EVENT, SLAVE_EVENT, CREATE_FILE_EVENT,
APPEND_BLOCK_EVENT, EXEC_LOAD_EVENT, DELETE_FILE_EVENT,
/*
NEW_LOAD_EVENT is like LOAD_EVENT except that it has a longer sql_ex,
allowing multibyte TERMINATED BY etc; both types share the same class
(Load_log_event)
*/
NEW_LOAD_EVENT,
RAND_EVENT, USER_VAR_EVENT,
FORMAT_DESCRIPTION_EVENT,
ENUM_END_EVENT /* end marker */
};
/*
The number of types we handle in Format_description_log_event (UNKNOWN_EVENT
is not to be handled, it does not exist in binlogs, it does not have a
format).
*/
#define LOG_EVENT_TYPES (ENUM_END_EVENT-1)
enum Int_event_type
{
INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2
@ -285,8 +369,33 @@ class MYSQL_LOG;
class THD;
#endif
class Format_description_log_event;
struct st_relay_log_info;
#ifdef MYSQL_CLIENT
/*
A structure for mysqlbinlog to remember the last db, flags2, sql_mode etc; it
is passed to events' print() methods, so that they print only the necessary
USE and SET commands.
*/
typedef struct st_last_event_info
{
// TODO: have the last catalog here ??
char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
bool flags2_inited;
uint32 flags2;
bool sql_mode_inited;
ulonglong sql_mode;
st_last_event_info()
: flags2_inited(0), flags2(0), sql_mode_inited(0), sql_mode(0)
{
db[0]= 0; /* initially, the db is unknown */
}
} LAST_EVENT_INFO;
#endif
/*****************************************************************************
Log_event class
@ -337,21 +446,26 @@ public:
uint16 flags;
bool cache_stmt;
#ifndef MYSQL_CLIENT
THD* thd;
Log_event(THD* thd_arg, uint16 flags_arg, bool cache_stmt);
Log_event();
Log_event(THD* thd_arg, uint16 flags_arg, bool cache_stmt);
/*
read_log_event() functions read an event from a binlog or relay log; used by
SHOW BINLOG EVENTS, the binlog_dump thread on the master (reads master's
binlog), the slave IO thread (reads the event sent by binlog_dump), the
slave SQL thread (reads the event from the relay log).
If mutex is 0, the read will proceed without mutex.
We need the description_event to be able to parse the event (to know the
post-header's size); in fact in read_log_event we detect the event's type,
then call the specific event's constructor and pass description_event as an
argument.
*/
// if mutex is 0, the read will proceed without mutex
static Log_event* read_log_event(IO_CACHE* file,
pthread_mutex_t* log_lock,
bool old_format);
const Format_description_log_event *description_event);
static int read_log_event(IO_CACHE* file, String* packet,
pthread_mutex_t* log_lock);
/* set_log_pos() is used to fill log_pos with tell(log). */
@ -379,10 +493,12 @@ public:
return thd ? thd->db : 0;
}
#else
Log_event() : temp_buf(0) {}
// avoid having to link mysqlbinlog against libpthread
static Log_event* read_log_event(IO_CACHE* file, bool old_format);
static Log_event* read_log_event(IO_CACHE* file,
const Format_description_log_event *description_event);
/* print*() functions are used by mysqlbinlog */
virtual void print(FILE* file, bool short_form = 0, char* last_db = 0) = 0;
virtual void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0) = 0;
void print_timestamp(FILE* file, time_t *ts = 0);
void print_header(FILE* file);
#endif
@ -405,9 +521,9 @@ public:
virtual int write_data_body(IO_CACHE* file __attribute__((unused)))
{ return 0; }
virtual Log_event_type get_type_code() = 0;
virtual bool is_valid() = 0;
virtual const bool is_valid() = 0;
inline bool get_cache_stmt() { return cache_stmt; }
Log_event(const char* buf, bool old_format);
Log_event(const char* buf, const Format_description_log_event* description_event);
virtual ~Log_event() { free_temp_buf();}
void register_temp_buf(char* buf) { temp_buf = buf; }
void free_temp_buf()
@ -419,18 +535,37 @@ public:
}
}
virtual int get_data_size() { return 0;}
virtual int get_data_body_offset() { return 0; }
int get_event_len()
{
return (cached_event_len ? cached_event_len :
(cached_event_len = LOG_EVENT_HEADER_LEN + get_data_size()));
/*
We don't re-use the cached event's length anymore (we did in 4.x) because
this leads to nasty problems: when the 5.0 slave reads an event from a 4.0
master, it caches the event's length, then this event is converted before
it goes into the relay log, so it would be written to the relay log with
its old length, which is garbage.
*/
return (cached_event_len=(LOG_EVENT_HEADER_LEN + get_data_size()));
}
static Log_event* read_log_event(const char* buf, int event_len,
const char **error, bool old_format);
static Log_event* read_log_event(const char* buf, uint event_len,
const char **error,
const Format_description_log_event
*description_event);
/* returns the human readable name of the event's type */
const char* get_type_str();
};
/*
One class for each type of event.
Two constructors for each class:
- one to create the event for logging (when the server acts as a master),
called after an update to the database is done,
which accepts parameters like the query, the database, the options for LOAD
DATA INFILE...
- one to create the event from a packet (when the server acts as a slave),
called before reproducing the update, which accepts parameters (like a
buffer). Used to read from the master, from the relay log, and in
mysqlbinlog. This constructor must be format-tolerant.
*/
/*****************************************************************************
@ -445,6 +580,7 @@ protected:
char* data_buf;
public:
const char* query;
const char* catalog;
const char* db;
/*
If we already know the length of the query string
@ -462,6 +598,52 @@ public:
BUG#1686).
*/
ulong slave_proxy_id;
/*
Binlog format 3 and 4 start to differ (as far as class members are
concerned) from here.
*/
uint32 catalog_len;
/*
We want to be able to store a variable number of N-bit status vars:
(generally N=32; but N=64 for SQL_MODE) a user may want to log the number of
affected rows (for debugging) while another does not want to lose 4 bytes in
this.
The storage on disk is the following:
status_vars_len is part of the post-header,
status_vars are in the variable-length part, after the post-header, before
the db & query.
status_vars on disk is a sequence of pairs (code, value) where 'code' means
'sql_mode', 'affected' etc. Sometimes 'value' must be a short string, so its
first byte is its length. For now the order of status vars is:
flags2 - sql_mode - catalog.
We should add the same thing to Load_log_event, but in fact
LOAD DATA INFILE is going to be logged with a new type of event (logging of
the plain text query), so Load_log_event would be frozen, so no need. The
new way of logging LOAD DATA INFILE would use a derived class of
Query_log_event, so automatically benefit from the work already done for
status variables in Query_log_event.
*/
uint16 status_vars_len;
/*
'flags2' is a second set of flags (on top of those in Log_event), for
session variables. These are thd->options which is & against a mask
(OPTIONS_WRITTEN_TO_BINLOG).
flags2_inited helps make a difference between flags2==0 (3.23 or 4.x
master, we don't know flags2, so use the slave server's global options) and
flags2==0 (5.0 master, we know this has a meaning of flags all down which
must influence the query).
*/
bool flags2_inited;
bool sql_mode_inited;
uint32 flags2;
/* In connections sql_mode is 32 bits now but will be 64 bits soon */
ulonglong sql_mode;
#ifndef MYSQL_CLIENT
Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
@ -472,10 +654,11 @@ public:
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Query_log_event(const char* buf, int event_len, bool old_format);
Query_log_event(const char* buf, uint event_len,
const Format_description_log_event *description_event);
~Query_log_event()
{
if (data_buf)
@ -486,14 +669,11 @@ public:
Log_event_type get_type_code() { return QUERY_EVENT; }
int write(IO_CACHE* file);
int write_data(IO_CACHE* file); // returns 0 on success, -1 on error
bool is_valid() { return query != 0; }
const bool is_valid() { return query != 0; }
int get_data_size()
{
return (q_len + db_len + 2
+ 4 // thread_id
+ 4 // exec_time
+ 2 // error_code
);
/* Note that the "1" below is the db's length. */
return (q_len + db_len + 1 + status_vars_len + QUERY_HEADER_LEN);
}
};
@ -504,6 +684,7 @@ public:
Slave Log Event class
Note that this class is currently not used at all; no code writes a
Slave_log_event (though some code in repl_failsafe.cc reads Slave_log_event).
So it's not a problem if this code is not maintained.
****************************************************************************/
class Slave_log_event: public Log_event
@ -524,13 +705,13 @@ public:
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Slave_log_event(const char* buf, int event_len);
Slave_log_event(const char* buf, uint event_len);
~Slave_log_event();
int get_data_size();
bool is_valid() { return master_host != 0; }
const bool is_valid() { return master_host != 0; }
Log_event_type get_type_code() { return SLAVE_EVENT; }
int write_data(IO_CACHE* file );
};
@ -546,12 +727,18 @@ public:
class Load_log_event: public Log_event
{
protected:
int copy_log_event(const char *buf, ulong event_len, bool old_format);
int copy_log_event(const char *buf, ulong event_len,
int body_offset, const Format_description_log_event* description_event);
public:
ulong thread_id;
ulong slave_proxy_id;
uint32 table_name_len;
/*
No need to have a catalog, as these events can only come from 4.x.
TODO: this may become false if Dmitri pushes his new LOAD DATA INFILE in
5.0 only (not in 4.x).
*/
uint32 db_len;
uint32 fname_len;
uint32 num_fields;
@ -597,11 +784,18 @@ public:
bool use_rli_only_for_errors);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form, char* last_db, bool commented);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info = 0);
void print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info, bool commented);
#endif
Load_log_event(const char* buf, int event_len, bool old_format);
/*
Note that for all the events related to LOAD DATA (Load_log_event,
Create_file/Append/Exec/Delete, we pass description_event; however as
logging of LOAD DATA is going to be changed in 4.1 or 5.0, this is only used
for the common_header_len (post_header_len will not be changed).
*/
Load_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Load_log_event()
{}
Log_event_type get_type_code()
@ -610,27 +804,31 @@ public:
}
int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file);
bool is_valid() { return table_name != 0; }
const bool is_valid() { return table_name != 0; }
int get_data_size()
{
return (table_name_len + 2 + db_len + 2 + fname_len
+ 4 // thread_id
+ 4 // exec_time
+ 4 // skip_lines
+ 4 // field block len
return (table_name_len + db_len + 2 + fname_len
+ LOAD_HEADER_LEN
+ sql_ex.data_size() + field_block_len + num_fields);
}
int get_data_body_offset() { return LOAD_EVENT_OVERHEAD; }
};
extern char server_version[SERVER_VERSION_LENGTH];
/*****************************************************************************
Start Log Event class
Start Log Event_v3 class
Start_log_event_v3 is the Start_log_event of binlog format 3 (MySQL 3.23 and
4.x).
Format_description_log_event derives from Start_log_event_v3; it is the
Start_log_event of binlog format 4 (MySQL 5.0), that is, the event that
describes the other events' header/postheader lengths. This event is sent by
MySQL 5.0 whenever it starts sending a new binlog if the requested position
is >4 (otherwise if ==4 the event will be sent naturally).
****************************************************************************/
class Start_log_event: public Log_event
class Start_log_event_v3: public Log_event
{
public:
/*
@ -658,27 +856,81 @@ public:
char server_version[ST_SERVER_VER_LEN];
#ifndef MYSQL_CLIENT
Start_log_event() :Log_event(), binlog_version(BINLOG_VERSION)
{
created = (time_t) when;
memcpy(server_version, ::server_version, ST_SERVER_VER_LEN);
}
Start_log_event_v3();
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
Start_log_event_v3() {}
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Start_log_event(const char* buf, bool old_format);
~Start_log_event() {}
Log_event_type get_type_code() { return START_EVENT;}
Start_log_event_v3(const char* buf,
const Format_description_log_event* description_event);
~Start_log_event_v3() {}
Log_event_type get_type_code() { return START_EVENT_V3;}
int write_data(IO_CACHE* file);
bool is_valid() { return 1; }
const bool is_valid() { return 1; }
int get_data_size()
{
return START_HEADER_LEN;
return START_V3_HEADER_LEN; //no variable-sized part
}
};
/*
For binlog version 4.
This event is saved by threads which read it, as they need it for future
use (to decode the ordinary events).
*/
class Format_description_log_event: public Start_log_event_v3
{
public:
/*
The size of the fixed header which _all_ events have
(for binlogs written by this version, this is equal to
LOG_EVENT_HEADER_LEN), except FORMAT_DESCRIPTION_EVENT and ROTATE_EVENT
(those have a header of size LOG_EVENT_MINIMAL_HEADER_LEN).
*/
uint8 common_header_len;
uint8 number_of_event_types;
/* The list of post-headers' lengthes */
uint8 *post_header_len;
Format_description_log_event(uint8 binlog_ver, const char* server_ver=0);
#ifndef MYSQL_CLIENT
#ifdef HAVE_REPLICATION
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#endif
Format_description_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Format_description_log_event() { my_free((gptr)post_header_len, MYF(0)); }
Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;}
int write_data(IO_CACHE* file);
const bool is_valid()
{
return ((common_header_len >= ((binlog_version==1) ? OLD_HEADER_LEN :
LOG_EVENT_MINIMAL_HEADER_LEN)) &&
(post_header_len != NULL));
}
int get_event_len()
{
int i= LOG_EVENT_MINIMAL_HEADER_LEN + get_data_size();
DBUG_PRINT("info",("event_len=%d",i));
return i;
}
int get_data_size()
{
/*
The vector of post-header lengths is considered as part of the
post-header, because in a given version it never changes (contrary to the
query in a Query_log_event).
*/
return FORMAT_DESCRIPTION_HEADER_LEN;
}
};
@ -705,23 +957,26 @@ public:
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Intvar_log_event(const char* buf, bool old_format);
Intvar_log_event(const char* buf, const Format_description_log_event* description_event);
~Intvar_log_event() {}
Log_event_type get_type_code() { return INTVAR_EVENT;}
const char* get_var_type_name();
int get_data_size() { return 9; /* sizeof(type) + sizeof(val) */;}
int write_data(IO_CACHE* file);
bool is_valid() { return 1; }
const bool is_valid() { return 1; }
};
/*****************************************************************************
Rand Log Event class
Logs random seed used by the next RAND(), and by PASSWORD() in 4.1.
Logs random seed used by the next RAND(), and by PASSWORD() in 4.1.0.
4.1.1 does not need it (it's repeatable again) so this event needn't be
written in 4.1.1 for PASSWORD() (but the fact that it is written is just a
waste, it does not cause bugs).
****************************************************************************/
class Rand_log_event: public Log_event
@ -739,15 +994,15 @@ class Rand_log_event: public Log_event
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Rand_log_event(const char* buf, bool old_format);
Rand_log_event(const char* buf, const Format_description_log_event* description_event);
~Rand_log_event() {}
Log_event_type get_type_code() { return RAND_EVENT;}
int get_data_size() { return 16; /* sizeof(ulonglong) * 2*/ }
int write_data(IO_CACHE* file);
bool is_valid() { return 1; }
const bool is_valid() { return 1; }
};
/*****************************************************************************
@ -757,6 +1012,9 @@ class Rand_log_event: public Log_event
Every time a query uses the value of a user variable, a User_var_log_event is
written before the Query_log_event, to set the user variable.
Every time a query uses the value of a user variable, a User_var_log_event is
written before the Query_log_event, to set the user variable.
****************************************************************************/
class User_var_log_event: public Log_event
{
@ -778,10 +1036,10 @@ public:
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
User_var_log_event(const char* buf, bool old_format);
User_var_log_event(const char* buf, const Format_description_log_event* description_event);
~User_var_log_event() {}
Log_event_type get_type_code() { return USER_VAR_EVENT;}
int get_data_size()
@ -791,7 +1049,7 @@ public:
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE + val_len);
}
int write_data(IO_CACHE* file);
bool is_valid() { return 1; }
const bool is_valid() { return 1; }
};
/*****************************************************************************
@ -809,15 +1067,15 @@ public:
{}
int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Stop_log_event(const char* buf, bool old_format):
Log_event(buf, old_format)
Stop_log_event(const char* buf, const Format_description_log_event* description_event):
Log_event(buf, description_event)
{}
~Stop_log_event() {}
Log_event_type get_type_code() { return STOP_EVENT;}
bool is_valid() { return 1; }
const bool is_valid() { return 1; }
};
#endif /* HAVE_REPLICATION */
@ -850,18 +1108,23 @@ public:
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Rotate_log_event(const char* buf, int event_len, bool old_format);
Rotate_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Rotate_log_event()
{
if (alloced)
my_free((gptr) new_log_ident, MYF(0));
}
Log_event_type get_type_code() { return ROTATE_EVENT;}
int get_event_len()
{
return (LOG_EVENT_MINIMAL_HEADER_LEN + get_data_size());
}
int get_data_size() { return ident_len + ROTATE_HEADER_LEN;}
bool is_valid() { return new_log_ident != 0; }
const bool is_valid() { return new_log_ident != 0; }
int write_data(IO_CACHE* file);
};
@ -899,11 +1162,12 @@ public:
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form, char* last_db, bool enable_local);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
void print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info, bool enable_local);
#endif
Create_file_log_event(const char* buf, int event_len, bool old_format);
Create_file_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Create_file_log_event() {}
Log_event_type get_type_code()
@ -916,12 +1180,7 @@ public:
Load_log_event::get_data_size() +
4 + 1 + block_len);
}
int get_data_body_offset()
{
return (fake_base ? LOAD_EVENT_OVERHEAD:
LOAD_EVENT_OVERHEAD + CREATE_FILE_HEADER_LEN);
}
bool is_valid() { return inited_from_old || block != 0; }
const bool is_valid() { return inited_from_old || block != 0; }
int write_data_header(IO_CACHE* file);
int write_data_body(IO_CACHE* file);
/*
@ -963,14 +1222,15 @@ public:
void pack_info(Protocol* protocol);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Append_block_log_event(const char* buf, int event_len);
Append_block_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Append_block_log_event() {}
Log_event_type get_type_code() { return APPEND_BLOCK_EVENT;}
int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;}
bool is_valid() { return block != 0; }
const bool is_valid() { return block != 0; }
int write_data(IO_CACHE* file);
const char* get_db() { return db; }
};
@ -993,15 +1253,16 @@ public:
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form, char* last_db, bool enable_local);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
void print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info, bool enable_local);
#endif
Delete_file_log_event(const char* buf, int event_len);
Delete_file_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Delete_file_log_event() {}
Log_event_type get_type_code() { return DELETE_FILE_EVENT;}
int get_data_size() { return DELETE_FILE_HEADER_LEN ;}
bool is_valid() { return file_id != 0; }
const bool is_valid() { return file_id != 0; }
int write_data(IO_CACHE* file);
const char* get_db() { return db; }
};
@ -1024,14 +1285,15 @@ public:
int exec_event(struct st_relay_log_info* rli);
#endif /* HAVE_REPLICATION */
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
void print(FILE* file, bool short_form = 0, LAST_EVENT_INFO* last_event_info= 0);
#endif
Execute_load_log_event(const char* buf, int event_len);
Execute_load_log_event(const char* buf, uint event_len,
const Format_description_log_event* description_event);
~Execute_load_log_event() {}
Log_event_type get_type_code() { return EXEC_LOAD_EVENT;}
int get_data_size() { return EXEC_LOAD_HEADER_LEN ;}
bool is_valid() { return file_id != 0; }
const bool is_valid() { return file_id != 0; }
int write_data(IO_CACHE* file);
const char* get_db() { return db; }
};
@ -1040,13 +1302,18 @@ public:
class Unknown_log_event: public Log_event
{
public:
Unknown_log_event(const char* buf, bool old_format):
Log_event(buf, old_format)
/*
Even if this is an unknown event, we still pass description_event to
Log_event's ctor, this way we can extract maximum information from the
event's header (the unique ID for example).
*/
Unknown_log_event(const char* buf, const Format_description_log_event* description_event):
Log_event(buf, description_event)
{}
~Unknown_log_event() {}
void print(FILE* file, bool short_form= 0, char* last_db= 0);
void print(FILE* file, bool short_form= 0, LAST_EVENT_INFO* last_event_info= 0);
Log_event_type get_type_code() { return UNKNOWN_EVENT;}
bool is_valid() { return 1; }
const bool is_valid() { return 1; }
};
#endif

View File

@ -14,6 +14,15 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
Mostly this file is used in the server. But a little part of it is used in
mysqlbinlog too (definition of SELECT_DISTINCT and others).
The consequence is that 90% of the file is wrapped in #ifndef MYSQL_CLIENT,
except the part which must be in the server and in the client.
*/
#ifndef MYSQL_CLIENT
#include <my_global.h>
#include <assert.h>
#include <mysql_version.h>
@ -176,7 +185,15 @@ extern CHARSET_INFO *national_charset_info, *table_alias_charset;
#define TEST_NO_STACKTRACE 512
#define TEST_SIGINT 1024 /* Allow sigint on threads */
/* options for select set by the yacc parser (stored in lex->options) */
#endif
/*
This is included in the server and in the client.
Options for select set by the yacc parser (stored in lex->options).
None of the 32 defines below should have its value changed, or this will
break replication.
*/
#define SELECT_DISTINCT (1L << 0)
#define SELECT_STRAIGHT_JOIN (1L << 1)
#define SELECT_DESCRIBE (1L << 2)
@ -214,6 +231,9 @@ extern CHARSET_INFO *national_charset_info, *table_alias_charset;
#define OPTION_RELAXED_UNIQUE_CHECKS (1L << 27)
#define SELECT_NO_UNLOCK (1L << 28)
/* The rest of the file is included in the server only */
#ifndef MYSQL_CLIENT
/* options for UNION set by the yacc parser (stored in unit->union_option) */
#define UNION_ALL 1
@ -1102,3 +1122,5 @@ inline void setup_table_map(TABLE *table, TABLE_LIST *table_list, uint tablenr)
table->map= (table_map) 1 << tablenr;
table->force_index= table_list->force_index;
}
#endif /* MYSQL_CLIENT */

View File

@ -2023,7 +2023,7 @@ bool open_log(MYSQL_LOG *log, const char *hostname,
}
return log->open(opt_name, type, 0, index_file_name,
(read_append) ? SEQ_READ_APPEND : WRITE_CACHE,
no_auto_events, max_size);
no_auto_events, max_size, 0);
}

View File

@ -73,8 +73,6 @@ static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
static int request_table_dump(MYSQL* mysql, const char* db, const char* table);
static int create_table_from_dump(THD* thd, MYSQL *mysql, const char* db,
const char* table_name, bool overwrite);
static int check_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi);
/*
Find out which replications threads are running
@ -215,6 +213,12 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
pos Position in relay log file
need_data_lock Set to 1 if this functions should do mutex locks
errmsg Store pointer to error message here
look_for_description_event
1 if we should look for such an event. We only need
this when the SQL thread starts and opens an existing
relay log and has to execute it (possibly from an offset
>4); then we need to read the first event of the relay
log to be able to parse the events we have to execute.
DESCRIPTION
- Close old open relay log files.
@ -232,15 +236,35 @@ static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
ulonglong pos, bool need_data_lock,
const char** errmsg)
const char** errmsg,
bool look_for_description_event)
{
DBUG_ENTER("init_relay_log_pos");
DBUG_PRINT("info", ("pos=%lu", pos));
*errmsg=0;
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
if (need_data_lock)
pthread_mutex_lock(&rli->data_lock);
/*
Slave threads are not the only users of init_relay_log_pos(). CHANGE MASTER
is, too, and init_slave() too; these 2 functions allocate a description
event in init_relay_log_pos, which is not freed by the terminating SQL slave
thread as that thread is not started by these functions. So we have to free
the description_event here, in case, so that there is no memory leak in
running, say, CHANGE MASTER.
*/
delete rli->relay_log.description_event_for_exec;
/*
By default the relay log is in binlog format 3 (4.0).
Even if format is 4, this will work enough to read the first event
(Format_desc) (remember that format 4 is just lenghtened compared to format
3; format 3 is a prefix of format 4).
*/
rli->relay_log.description_event_for_exec= new
Format_description_log_event(3);
pthread_mutex_lock(log_lock);
@ -280,8 +304,8 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
In this case, we will use the same IO_CACHE pointer to
read data as the IO thread is using to write data.
*/
if (my_b_tell((rli->cur_log=rli->relay_log.get_log_file())) == 0 &&
check_binlog_magic(rli->cur_log,errmsg))
my_b_seek((rli->cur_log=rli->relay_log.get_log_file()), (off_t)0);
if (check_binlog_magic(rli->cur_log,errmsg))
goto err;
rli->cur_log_old_open_count=rli->relay_log.get_open_count();
}
@ -295,8 +319,85 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
goto err;
rli->cur_log = &rli->cache_buf;
}
if (pos >= BIN_LOG_HEADER_SIZE)
/*
In all cases, check_binlog_magic() has been called so we're at offset 4 for
sure.
*/
if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */
{
Log_event* ev;
while (look_for_description_event)
{
/*
Read the possible Format_description_log_event; if position was 4, no need, it will
be read naturally.
*/
DBUG_PRINT("info",("looking for a Format_description_log_event"));
if (my_b_tell(rli->cur_log) >= pos)
break;
/*
Because of we have rli->data_lock and log_lock, we can safely read an
event
*/
if (!(ev=Log_event::read_log_event(rli->cur_log,0,
rli->relay_log.description_event_for_exec)))
{
DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d",
rli->cur_log->error));
if (rli->cur_log->error) /* not EOF */
{
*errmsg= "I/O error reading event at position 4";
goto err;
}
break;
}
else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
{
DBUG_PRINT("info",("found Format_description_log_event"));
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev;
/*
As ev was returned by read_log_event, it has passed is_valid(), so
my_malloc() in ctor worked, no need to check again.
*/
/*
Ok, we found a Format_description event. But it is not sure that this
describes the whole relay log; indeed, one can have this sequence
(starting from position 4):
Format_desc (of slave)
Rotate (of master)
Format_desc (of slave)
So the Format_desc which really describes the rest of the relay log is
the 3rd event (it can't be further than that, because we rotate the
relay log when we queue a Rotate event from the master).
But what describes the Rotate is the first Format_desc.
So what we do is:
go on searching for Format_description events, until you exceed the
position (argument 'pos') or until you find another event than Rotate
or Format_desc.
*/
}
else
{
DBUG_PRINT("info",("found event of another type=%d",
ev->get_type_code()));
look_for_description_event= (ev->get_type_code() == ROTATE_EVENT);
delete ev;
}
}
my_b_seek(rli->cur_log,(off_t)pos);
#ifndef DBUG_OFF
{
char llbuf1[22], llbuf2[22];
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
llstr(my_b_tell(rli->cur_log),llbuf1),
llstr(rli->event_relay_log_pos,llbuf2)));
}
#endif
}
err:
/*
@ -311,6 +412,8 @@ err:
if (need_data_lock)
pthread_mutex_unlock(&rli->data_lock);
if (!rli->relay_log.description_event_for_exec->is_valid() && !*errmsg)
*errmsg= "Invalid Format_description log event; could be out of memory";
DBUG_RETURN ((*errmsg) ? 1 : 0);
}
@ -428,13 +531,15 @@ int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
sizeof(rli->group_relay_log_name)-1);
strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
sizeof(rli->event_relay_log_name)-1);
// Just first log with magic number and nothing else
rli->log_space_total= BIN_LOG_HEADER_SIZE;
rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
rli->relay_log.reset_bytes_written();
if (count_relay_log_space(rli))
{
*errmsg= "Error counting relay log space";
goto err;
}
if (!just_reset)
error= init_relay_log_pos(rli, rli->group_relay_log_name, rli->group_relay_log_pos,
0 /* do not need data lock */, errmsg);
0 /* do not need data lock */, errmsg, 0);
err:
#ifndef DBUG_OFF
@ -693,6 +798,10 @@ static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
different results. Note also the order of precedence of the do/ignore
rules (see code below). For that reason, users should not set conflicting
rules because they may get unpredicted results.
Thought which arose from a question of a big customer "I want to include all
tables like "abc.%" except the "%.EFG"". This can't be done now. If we
supported Perl regexps we could do it with this pattern: /^abc\.(?!EFG)/
(I could not find an equivalent in the regex library MySQL uses).
RETURN VALUES
0 should not be logged/replicated
@ -1087,30 +1196,74 @@ static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
return 1;
}
/*
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
relying on the binlog's version. This is not perfect: imagine an upgrade
of the master without waiting that all slaves are in sync with the master;
then a slave could be fooled about the binlog's format. This is what happens
when people upgrade a 3.23 master to 4.0 without doing RESET MASTER: 4.0
slaves are fooled. So we do this only to distinguish between 3.23 and more
recent masters (it's too late to change things for 3.23).
RETURNS
0 ok
1 error
*/
static int get_master_version_and_clock(MYSQL* mysql, MASTER_INFO* mi)
{
const char* errmsg= 0;
/*
Note the following switch will bug when we have MySQL branch 30 ;)
*/
switch (*mysql->server_version) {
case '3':
mi->old_format =
(strncmp(mysql->server_version, "3.23.57", 7) < 0) /* < .57 */ ?
BINLOG_FORMAT_323_LESS_57 :
BINLOG_FORMAT_323_GEQ_57 ;
break;
case '4':
case '5':
mi->old_format = BINLOG_FORMAT_CURRENT;
break;
default:
if (!my_isdigit(&my_charset_bin,*mysql->server_version))
errmsg = "Master reported unrecognized MySQL version";
break;
else
{
/*
Note the following switch will bug when we have MySQL branch 30 ;)
*/
switch (*mysql->server_version)
{
case '0':
case '1':
case '2':
errmsg = "Master reported unrecognized MySQL version";
break;
case '3':
mi->rli.relay_log.description_event_for_queue= new
Format_description_log_event(1, mysql->server_version);
break;
case '4':
mi->rli.relay_log.description_event_for_queue= new
Format_description_log_event(3, mysql->server_version);
break;
default:
/*
Master is MySQL >=5.0. Give a default Format_desc event, so that we can
take the early steps (like tests for "is this a 3.23 master") which we
have to take before we receive the real master's Format_desc which will
override this one. Note that the Format_desc we create below is garbage
(it has the format of the *slave*); it's only good to help know if the
master is 3.23, 4.0, etc.
*/
mi->rli.relay_log.description_event_for_queue= new
Format_description_log_event(4, mysql->server_version);
break;
}
}
/*
This does not mean that a 5.0 slave will be able to read a 6.0 master; but
as we don't know yet, we don't want to forbid this for now. If a 5.0 slave
can't read a 6.0 master, this will show up when the slave can't read some
events sent by the master, and there will be error messages.
*/
if (errmsg)
{
sql_print_error(errmsg);
return 1;
}
MYSQL_RES *master_clock_res;
MYSQL_ROW master_clock_row;
time_t slave_clock;
@ -1385,7 +1538,7 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
if (open_log(&rli->relay_log, glob_hostname, opt_relay_logname,
"-relay-bin", opt_relaylog_index_name,
LOG_BIN, 1 /* read_append cache */,
1 /* no auto events */,
0 /* starting from 5.0 we want relay logs to have auto events */,
max_relay_log_size ? max_relay_log_size : max_binlog_size))
{
sql_print_error("Failed in open_log() called from init_relay_log_info()");
@ -1419,7 +1572,7 @@ file '%s', errno %d)", fname, my_errno);
/* Init relay log with first entry in the relay index file */
if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
&msg))
&msg, 0))
{
sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
goto err;
@ -1484,7 +1637,7 @@ Failed to open the existing relay log info file '%s' (errno %d)",
rli->group_relay_log_name,
rli->group_relay_log_pos,
0 /* no data lock*/,
&msg))
&msg, 0))
{
char llbuf[22];
sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
@ -1493,8 +1646,18 @@ Failed to open the existing relay log info file '%s' (errno %d)",
goto err;
}
}
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
#ifndef DBUG_OFF
{
char llbuf1[22], llbuf2[22];
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
llstr(my_b_tell(rli->cur_log),llbuf1),
llstr(rli->event_relay_log_pos,llbuf2)));
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
}
#endif
/*
Now change the cache from READ to WRITE - must do this
before flush_relay_log_info
@ -2251,14 +2414,18 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
goto err;
}
int cmp_result;
/* The "compare and wait" main loop */
while (!thd->killed &&
init_abort_pos_wait == abort_pos_wait &&
slave_running)
{
bool pos_reached;
int cmp_result= 0;
DBUG_ASSERT(*group_master_log_name || group_master_log_pos == 0);
/*
If we are after RESET SLAVE, and the SQL slave thread has not processed
any event yet, it could be that group_master_log_name is "". In that case,
just wait for more events (as there is no sensible comparison to do).
*/
if (*group_master_log_name)
{
char *basename= group_master_log_name + dirname_length(group_master_log_name);
@ -2280,13 +2447,12 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
if (group_master_log_name_extension < log_name_extension)
cmp_result = -1 ;
else
cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
cmp_result= (group_master_log_name_extension > log_name_extension) ?
1 : 0 ;
if (((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
cmp_result > 0) || thd->killed)
break;
}
pos_reached = ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
cmp_result > 0);
if (pos_reached || thd->killed)
break;
//wait for master update, with optional timeout.
DBUG_PRINT("info",("Waiting for master update"));
@ -2343,6 +2509,11 @@ improper_arguments: %d timed_out: %d",
DBUG_RETURN( error ? error : event_count );
}
void set_slave_thread_options(THD* thd)
{
thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
OPTION_AUTO_IS_NULL;
}
/*
init_slave_thread()
@ -2359,8 +2530,7 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
thd->master_access= ~0;
thd->priv_user = 0;
thd->slave_thread = 1;
thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
OPTION_AUTO_IS_NULL;
set_slave_thread_options(thd);
/*
It's nonsense to constrain the slave threads with max_join_size; if a
query succeeded on master, we HAVE to execute it.
@ -2623,13 +2793,14 @@ bool st_relay_log_info::is_until_satisfied()
if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
{
/*
We have no cached comaprison results so we should compare log names
and cache result
/*
We have no cached comparison results so we should compare log names
and cache result.
If we are after RESET SLAVE, and the SQL slave thread has not processed
any event yet, it could be that group_master_log_name is "". In that case,
just wait for more events (as there is no sensible comparison to do).
*/
DBUG_ASSERT(*log_name || log_pos == 0);
if (*log_name)
{
const char *basename= log_name + dirname_length(log_name);
@ -2704,28 +2875,45 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
int exec_res;
/*
Skip queries originating from this server or number of
queries specified by the user in slave_skip_counter
We can't however skip event's that has something to do with the
Queries originating from this server must be skipped.
Low-level events (Format_desc, Rotate, Stop) from this server
must also be skipped. But for those we don't want to modify
group_master_log_pos, because these events did not exist on the master.
Format_desc is not completely skipped.
Skip queries specified by the user in slave_skip_counter.
We can't however skip events that has something to do with the
log files themselves.
Filtering on own server id is extremely important, to ignore execution of
events created by the creation/rotation of the relay log (remember that
now the relay log starts with its Format_desc, has a Rotate etc).
*/
if (ev->server_id == (uint32) ::server_id ||
(rli->slave_skip_counter && type_code != ROTATE_EVENT))
DBUG_PRINT("info",("type_code=%d, server_id=%d",type_code,ev->server_id));
if ((ev->server_id == (uint32) ::server_id &&
type_code!= FORMAT_DESCRIPTION_EVENT) ||
(rli->slave_skip_counter &&
type_code != ROTATE_EVENT && type_code != STOP_EVENT &&
type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT))
{
/* TODO: I/O thread should not even log events with the same server id */
rli->inc_group_relay_log_pos(ev->get_event_len(),
type_code != STOP_EVENT ? ev->log_pos : LL(0),
1/* skip lock*/);
DBUG_PRINT("info", ("event skipped"));
rli->inc_group_relay_log_pos((type_code == ROTATE_EVENT ||
type_code == STOP_EVENT ||
type_code == FORMAT_DESCRIPTION_EVENT) ?
LL(0) : ev->log_pos,
1/* skip lock*/);
flush_relay_log_info(rli);
/*
Protect against common user error of setting the counter to 1
instead of 2 while recovering from an failed auto-increment insert
Protect against common user error of setting the counter to 1
instead of 2 while recovering from an insert which used auto_increment,
rand or user var.
*/
if (rli->slave_skip_counter &&
!((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
rli->slave_skip_counter == 1))
!((type_code == INTVAR_EVENT ||
type_code == RAND_EVENT ||
type_code == USER_VAR_EVENT) &&
rli->slave_skip_counter == 1))
--rli->slave_skip_counter;
pthread_mutex_unlock(&rli->data_lock);
delete ev;
@ -2741,7 +2929,16 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
ev->thd = thd;
exec_res = ev->exec_event(rli);
DBUG_ASSERT(rli->sql_thd==thd);
delete ev;
/*
Format_description_log_event should not be deleted because it will be
used to read info about the relay log's format; it will be deleted when
the SQL thread does not need it, i.e. when this thread terminates.
*/
if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
{
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
delete ev;
}
return exec_res;
}
else
@ -2842,7 +3039,8 @@ connected:
thd->proc_info = "Checking master version";
if (get_master_version_and_clock(mysql, mi))
goto err;
if (!mi->old_format)
if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
{
/*
Register ourselves with the master.
@ -3043,6 +3241,9 @@ err:
pthread_mutex_lock(&mi->run_lock);
mi->slave_running = 0;
mi->io_thd = 0;
/* Forget the relay log's format */
delete mi->rli.relay_log.description_event_for_queue;
mi->rli.relay_log.description_event_for_queue= 0;
// TODO: make rpl_status part of MASTER_INFO
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
mi->abort_slave = 0; // TODO: check if this is needed
@ -3137,15 +3338,38 @@ slave_begin:
if (init_relay_log_pos(rli,
rli->group_relay_log_name,
rli->group_relay_log_pos,
1 /*need data lock*/, &errmsg))
1 /*need data lock*/, &errmsg,
1 /*look for a description_event*/))
{
sql_print_error("Error initializing relay log position: %s",
errmsg);
goto err;
}
THD_CHECK_SENTRY(thd);
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
#ifndef DBUG_OFF
{
char llbuf1[22], llbuf2[22];
DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
llstr(my_b_tell(rli->cur_log),llbuf1),
llstr(rli->event_relay_log_pos,llbuf2)));
DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
/*
Wonder if this is correct. I (Guilhem) wonder if my_b_tell() returns the
correct position when it's called just after my_b_seek() (the questionable
stuff is those "seek is done on next read" comments in the my_b_seek()
source code).
The crude reality is that this assertion randomly fails whereas
replication seems to work fine. And there is no easy explanation why it
fails (as we my_b_seek(rli->event_relay_log_pos) at the very end of
init_relay_log_pos() called above). Maybe the assertion would be
meaningful if we held rli->data_lock between the my_b_seek() and the
DBUG_ASSERT().
*/
#ifdef SHOULD_BE_CHECKED
DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
#endif
}
#endif
DBUG_ASSERT(rli->sql_thd == thd);
DBUG_PRINT("master_info",("log_file_name: %s position: %s",
@ -3205,11 +3429,9 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
/* When master_pos_wait() wakes up it will check this and terminate */
rli->slave_running= 0;
/*
Going out of the transaction. Necessary to mark it, in case the user
restarts replication from a non-transactional statement (with CHANGE
MASTER).
*/
/* Forget the relay log's format */
delete rli->relay_log.description_event_for_exec;
rli->relay_log.description_event_for_exec= 0;
/* Wake up master_pos_wait() */
pthread_mutex_unlock(&rli->data_lock);
DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
@ -3299,7 +3521,7 @@ static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
{
net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */
Execute_load_log_event xev(thd,0,0);
xev.log_pos = mi->master_log_pos;
xev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&xev)))
{
sql_print_error("Slave I/O: error writing Exec_load event to \
@ -3313,7 +3535,6 @@ relay log");
{
cev->block = (char*)net->read_pos;
cev->block_len = num_bytes;
cev->log_pos = mi->master_log_pos;
if (unlikely(mi->rli.relay_log.append(cev)))
{
sql_print_error("Slave I/O: error writing Create_file event to \
@ -3327,7 +3548,7 @@ relay log");
{
aev.block = (char*)net->read_pos;
aev.block_len = num_bytes;
aev.log_pos = mi->master_log_pos;
aev.log_pos = cev->log_pos;
if (unlikely(mi->rli.relay_log.append(&aev)))
{
sql_print_error("Slave I/O: error writing Append_block event to \
@ -3355,6 +3576,7 @@ err:
DESCRIPTION
Updates the master info with the place in the next binary
log where we should start reading.
Rotate the relay log to avoid mixed-format relay logs.
NOTES
We assume we already locked mi->data_lock
@ -3386,21 +3608,30 @@ static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
if (disconnect_slave_event_count)
events_till_disconnect++;
#endif
/*
If description_event_for_queue is format <4, there is conversion in the
relay log to the slave's format (4). And Rotate can mean upgrade or
nothing. If upgrade, it's to 5.0 or newer, so we will get a Format_desc, so
no need to reset description_event_for_queue now. And if it's nothing (same
master version as before), no need (still using the slave's format).
*/
if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
{
delete mi->rli.relay_log.description_event_for_queue;
/* start from format 3 (MySQL 4.0) again */
mi->rli.relay_log.description_event_for_queue= new
Format_description_log_event(3);
}
rotate_relay_log(mi); /* will take the right mutexes */
DBUG_RETURN(0);
}
/*
queue_old_event()
Writes a 3.23 event to the relay log.
TODO:
Test this code before release - it has to be tested on a separate
setup with 3.23 master
Reads a 3.23 event and converts it to the slave's format. This code was copied
from MySQL 4.0.
*/
static int queue_old_event(MASTER_INFO *mi, const char *buf,
static int queue_binlog_ver_1_event(MASTER_INFO *mi, const char *buf,
ulong event_len)
{
const char *errmsg = 0;
@ -3408,7 +3639,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
bool ignore_event= 0;
char *tmp_buf = 0;
RELAY_LOG_INFO *rli= &mi->rli;
DBUG_ENTER("queue_old_event");
DBUG_ENTER("queue_binlog_ver_1_event");
/*
If we get Load event, we need to pass a non-reusable buffer
@ -3432,7 +3663,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
connected to the master).
*/
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
1 /*old format*/ );
mi->rli.relay_log.description_event_for_queue);
if (unlikely(!ev))
{
sql_print_error("Read invalid event from master: '%s',\
@ -3442,7 +3673,7 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_RETURN(1);
}
pthread_mutex_lock(&mi->data_lock);
ev->log_pos = mi->master_log_pos;
ev->log_pos= mi->master_log_pos; /* 3.23 events don't contain log_pos */
switch (ev->get_type_code()) {
case STOP_EVENT:
ignore_event= 1;
@ -3467,9 +3698,11 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
{
/* We come here when and only when tmp_buf != 0 */
DBUG_ASSERT(tmp_buf);
inc_pos=event_len;
ev->log_pos+= inc_pos;
int error = process_io_create_file(mi,(Create_file_log_event*)ev);
delete ev;
mi->master_log_pos += event_len;
mi->master_log_pos += inc_pos;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
pthread_mutex_unlock(&mi->data_lock);
my_free((char*)tmp_buf, MYF(0));
@ -3481,6 +3714,12 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
}
if (likely(!ignore_event))
{
if (ev->log_pos)
/*
Don't do it for fake Rotate events (see comment in
Log_event::Log_event(const char* buf...) in log_event.cc).
*/
ev->log_pos+= event_len; /* make log_pos be the pos of the end of the event */
if (unlikely(rli->relay_log.append(ev)))
{
delete ev;
@ -3496,10 +3735,98 @@ static int queue_old_event(MASTER_INFO *mi, const char *buf,
DBUG_RETURN(0);
}
/*
Reads a 4.0 event and converts it to the slave's format. This code was copied
from queue_binlog_ver_1_event(), with some affordable simplifications.
*/
static int queue_binlog_ver_3_event(MASTER_INFO *mi, const char *buf,
ulong event_len)
{
const char *errmsg = 0;
ulong inc_pos;
char *tmp_buf = 0;
RELAY_LOG_INFO *rli= &mi->rli;
DBUG_ENTER("queue_binlog_ver_3_event");
/* read_log_event() will adjust log_pos to be end_log_pos */
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
mi->rli.relay_log.description_event_for_queue);
if (unlikely(!ev))
{
sql_print_error("Read invalid event from master: '%s',\
master could be corrupt but a more likely cause of this is a bug",
errmsg);
my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(1);
}
pthread_mutex_lock(&mi->data_lock);
switch (ev->get_type_code()) {
case STOP_EVENT:
goto err;
case ROTATE_EVENT:
if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
{
delete ev;
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(1);
}
inc_pos= 0;
break;
default:
inc_pos= event_len;
break;
}
if (unlikely(rli->relay_log.append(ev)))
{
delete ev;
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(1);
}
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
delete ev;
mi->master_log_pos+= inc_pos;
err:
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(0);
}
/*
queue_old_event()
Writes a 3.23 or 4.0 event to the relay log, after converting it to the 5.0
(exactly, slave's) format. To do the conversion, we create a 5.0 event from
the 3.23/4.0 bytes, then write this event to the relay log.
TODO:
Test this code before release - it has to be tested on a separate
setup with 3.23 master or 4.0 master
*/
static int queue_old_event(MASTER_INFO *mi, const char *buf,
ulong event_len)
{
switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
{
case 1:
return queue_binlog_ver_1_event(mi,buf,event_len);
case 3:
return queue_binlog_ver_3_event(mi,buf,event_len);
default: /* unsupported format; eg version 2 */
DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
mi->rli.relay_log.description_event_for_queue->binlog_version));
return 1;
}
}
/*
queue_event()
If the event is 3.23/4.0, passes it to queue_old_event() which will convert
it. Otherwise, writes a 5.0 (or newer) event to the relay log. Then there is
no format conversion, it's pure read/write of bytes.
So a 5.0.0 slave's relay log can contain events in the slave's format or in
any >=5.0.0 format.
*/
int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
@ -3509,7 +3836,8 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
RELAY_LOG_INFO *rli= &mi->rli;
DBUG_ENTER("queue_event");
if (mi->old_format)
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
pthread_mutex_lock(&mi->data_lock);
@ -3536,7 +3864,7 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
goto err;
case ROTATE_EVENT:
{
Rotate_log_event rev(buf,event_len,0);
Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
if (unlikely(process_io_rotate(mi,&rev)))
{
error= 1;
@ -3549,6 +3877,47 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
inc_pos= 0;
break;
}
case FORMAT_DESCRIPTION_EVENT:
{
/*
Create an event, and save it (when we rotate the relay log, we will have
to write this event again).
*/
/*
We are the only thread which reads/writes description_event_for_queue. The
relay_log struct does not move (though some members of it can change), so
we needn't any lock (no rli->data_lock, no log lock).
*/
Format_description_log_event* tmp= mi->rli.relay_log.description_event_for_queue;
const char* errmsg;
if (!(mi->rli.relay_log.description_event_for_queue= (Format_description_log_event*)
Log_event::read_log_event(buf, event_len, &errmsg,
mi->rli.relay_log.description_event_for_queue)))
{
delete tmp;
error= 2;
goto err;
}
delete tmp;
/*
Set 'created' to 0, so that in next relay logs this event does not trigger
cleaning actions on the slave in Format_description_log_event::exec_event().
*/
mi->rli.relay_log.description_event_for_queue->created= 0;
/*
Though this does some conversion to the slave's format, this will
preserve the master's binlog format version, and number of event types.
*/
/*
If the event was not requested by the slave (the slave did not ask for
it), i.e. has end_log_pos=0, we do not increment mi->master_log_pos
*/
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
DBUG_PRINT("info",("binlog format is now %d",
mi->rli.relay_log.description_event_for_queue->binlog_version));
}
break;
default:
inc_pos= event_len;
break;
@ -3574,20 +3943,29 @@ int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
We still want to increment, so that we won't re-read this event from the
master if the slave IO thread is now stopped/restarted (more efficient if
the events we are ignoring are big LOAD DATA INFILE).
But events which were generated by this slave and which do not exist in
the master's binlog (i.e. Format_desc, Rotate & Stop) should not increment
mi->master_log_pos.
*/
mi->master_log_pos+= inc_pos;
if (buf[EVENT_TYPE_OFFSET]!=FORMAT_DESCRIPTION_EVENT &&
buf[EVENT_TYPE_OFFSET]!=ROTATE_EVENT &&
buf[EVENT_TYPE_OFFSET]!=STOP_EVENT)
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %d, event originating from the same server, ignored", (ulong) mi->master_log_pos));
}
else /* write the event to the relay log */
if (likely(!(error= rli->relay_log.appendv(buf,event_len,0))))
if (likely(!(rli->relay_log.appendv(buf,event_len,0))))
{
mi->master_log_pos+= inc_pos;
DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
}
else
error=3;
err:
pthread_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error=%d", error));
DBUG_RETURN(error);
}
@ -3612,6 +3990,7 @@ void end_relay_log_info(RELAY_LOG_INFO* rli)
}
rli->inited = 0;
rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
/*
Delete the slave's temporary tables from memory.
In the future there will be other actions than this, to ensure persistance
@ -3832,6 +4211,7 @@ static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
relay_log_pos Current log pos
pending Number of bytes already processed from the event
*/
rli->event_relay_log_pos= max(rli->event_relay_log_pos, BIN_LOG_HEADER_SIZE);
my_b_seek(cur_log,rli->event_relay_log_pos);
DBUG_RETURN(cur_log);
}
@ -3890,28 +4270,40 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
hot_log=0; // Using old binary log
}
}
#ifndef DBUG_OFF
{
/* This is an assertion which sometimes fails, let's try to track it */
char llbuf1[22], llbuf2[22];
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
/*
The next assertion sometimes (very rarely) fails, let's try to track
it
*/
DBUG_PRINT("info", ("\
Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
DBUG_PRINT("info", ("my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
llstr(my_b_tell(cur_log),llbuf1),
llstr(rli->group_relay_log_pos,llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
llstr(rli->event_relay_log_pos,llbuf2)));
DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
}
#endif
/*
Relay log is always in new format - if the master is 3.23, the
I/O thread will convert the format for us
I/O thread will convert the format for us.
A problem: the description event may be in a previous relay log. So if the
slave has been shutdown meanwhile, we would have to look in old relay
logs, which may even have been deleted. So we need to write this
description event at the beginning of the relay log.
When the relay log is created when the I/O thread starts, easy: the master
will send the description event and we will queue it.
But if the relay log is created by new_file(): then the solution is:
MYSQL_LOG::open() will write the buffered description event.
*/
if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */)))
if ((ev=Log_event::read_log_event(cur_log,0,
rli->relay_log.description_event_for_exec)))
{
DBUG_ASSERT(thd==rli->sql_thd);
/*
read it while we have a lock, to avoid a mutex lock in
inc_event_relay_log_pos()
*/
rli->future_event_relay_log_pos= my_b_tell(cur_log);
if (hot_log)
pthread_mutex_unlock(log_lock);
DBUG_RETURN(ev);
@ -4106,8 +4498,9 @@ void rotate_relay_log(MASTER_INFO* mi)
DBUG_ENTER("rotate_relay_log");
RELAY_LOG_INFO* rli= &mi->rli;
lock_slave_threads(mi);
pthread_mutex_lock(&rli->data_lock);
/* We don't lock rli->run_lock. This would lead to deadlocks. */
pthread_mutex_lock(&mi->run_lock);
/*
We need to test inited because otherwise, new_file() will attempt to lock
LOCK_log, which may not be inited (if we're not a slave).
@ -4136,8 +4529,7 @@ void rotate_relay_log(MASTER_INFO* mi)
*/
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
end:
pthread_mutex_unlock(&rli->data_lock);
unlock_slave_threads(mi);
pthread_mutex_unlock(&mi->run_lock);
DBUG_VOID_RETURN;
}

View File

@ -67,11 +67,6 @@ extern my_bool opt_log_slave_updates;
extern ulonglong relay_log_space_limit;
struct st_master_info;
enum enum_binlog_formats {
BINLOG_FORMAT_CURRENT=0, /* 0 is important for easy 'if (mi->old_format)' */
BINLOG_FORMAT_323_LESS_57,
BINLOG_FORMAT_323_GEQ_57 };
/*
TODO: this needs to be redone, but for now it does not matter since
we do not have multi-master yet.
@ -186,6 +181,8 @@ typedef struct st_relay_log_info
ulonglong group_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
ulonglong event_relay_log_pos;
ulonglong future_event_relay_log_pos;
/*
Original log name and position of the group we're currently executing
(whose coordinates are group_relay_log_name/pos in the relay log)
@ -207,11 +204,13 @@ typedef struct st_relay_log_info
/*
InnoDB internally stores the master log position it has processed
so far; the position to store is really the sum of
pos + pending + event_len here since we must store the pos of the
END of the current log event
so far; when the InnoDB code to store this position is called, we have not
updated rli->group_master_log_pos yet. So the position is the event's
log_pos (the position of the end of the event); we save it in the variable
below. It's the *coming* group_master_log_pos (the one which will be
group_master_log_pos in the coming milliseconds).
*/
int event_len;
ulonglong future_group_master_log_pos;
time_t last_master_timestamp;
@ -285,16 +284,17 @@ typedef struct st_relay_log_info
until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_UNKNOWN;
}
inline void inc_event_relay_log_pos(ulonglong val)
inline void inc_event_relay_log_pos()
{
event_relay_log_pos+= val;
event_relay_log_pos= future_event_relay_log_pos;
}
void inc_group_relay_log_pos(ulonglong val, ulonglong log_pos, bool skip_lock=0)
void inc_group_relay_log_pos(ulonglong log_pos,
bool skip_lock=0)
{
if (!skip_lock)
pthread_mutex_lock(&data_lock);
inc_event_relay_log_pos(val);
inc_event_relay_log_pos();
group_relay_log_pos= event_relay_log_pos;
strmake(group_relay_log_name,event_relay_log_name,
sizeof(group_relay_log_name)-1);
@ -311,8 +311,31 @@ typedef struct st_relay_log_info
not advance as it should on the non-transactional slave (it advances by
big leaps, whereas it should advance by small leaps).
*/
if (log_pos) // 3.23 binlogs don't have log_posx
group_master_log_pos= log_pos+ val;
/*
In 4.x we used the event's len to compute the positions here. This is
wrong if the event was 3.23/4.0 and has been converted to 5.0, because
then the event's len is not what is was in the master's binlog, so this
will make a wrong group_master_log_pos (yes it's a bug in 3.23->4.0
replication: Exec_master_log_pos is wrong). Only way to solve this is to
have the original offset of the end of the event the relay log. This is
what we do in 5.0: log_pos has become "end_log_pos" (because the real use
of log_pos in 4.0 was to compute the end_log_pos; so better to store
end_log_pos instead of begin_log_pos.
If we had not done this fix here, the problem would also have appeared
when the slave and master are 5.0 but with different event length (for
example the slave is more recent than the master and features the event
UID). It would give false MASTER_POS_WAIT, false Exec_master_log_pos in
SHOW SLAVE STATUS, and so the user would do some CHANGE MASTER using this
value which would lead to badly broken replication.
Even the relay_log_pos will be corrupted in this case, because the len is
the relay log is not "val".
With the end_log_pos solution, we avoid computations involving lengthes.
*/
DBUG_PRINT("info", ("log_pos=%lld group_master_log_pos=%lld",
log_pos,group_master_log_pos));
if (log_pos) // some events (like fake Rotate) don't have log_pos
// when we are here, log_pos is the end of the event
group_master_log_pos= log_pos;
pthread_cond_broadcast(&data_cond);
if (!skip_lock)
pthread_mutex_unlock(&data_lock);
@ -389,7 +412,6 @@ typedef struct st_master_info
int events_till_abort;
#endif
bool inited;
enum enum_binlog_formats old_format;
volatile bool abort_slave, slave_running;
volatile ulong slave_run_id;
/*
@ -404,7 +426,7 @@ typedef struct st_master_info
long clock_diff_with_master;
st_master_info()
:ssl(0), fd(-1), io_thd(0), inited(0), old_format(BINLOG_FORMAT_CURRENT),
:ssl(0), fd(-1), io_thd(0), inited(0),
abort_slave(0),slave_running(0), slave_run_id(0)
{
host[0] = 0; user[0] = 0; password[0] = 0;
@ -535,10 +557,12 @@ void lock_slave_threads(MASTER_INFO* mi);
void unlock_slave_threads(MASTER_INFO* mi);
void init_thread_mask(int* mask,MASTER_INFO* mi,bool inverse);
int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,ulonglong pos,
bool need_data_lock, const char** errmsg);
bool need_data_lock, const char** errmsg,
bool look_for_description_event);
int purge_relay_logs(RELAY_LOG_INFO* rli, THD *thd, bool just_reset,
const char** errmsg);
void set_slave_thread_options(THD* thd);
void rotate_relay_log(MASTER_INFO* mi);
extern "C" pthread_handler_decl(handle_slave_io,arg);

View File

@ -92,6 +92,7 @@ THD::THD():user_time(0), is_fatal_error(0),
global_read_lock(0), bootstrap(0), spcont(NULL)
{
host= user= priv_user= db= ip= 0;
catalog= (char*)"std"; // the only catalog we have for now
host_or_ip= "connecting host";
locked=some_tables_deleted=no_errors=password= 0;
query_start_used= 0;

View File

@ -26,6 +26,7 @@
class Query_log_event;
class Load_log_event;
class Slave_log_event;
class Format_description_log_event;
class sp_rcontext;
class sp_cache;
@ -99,7 +100,14 @@ class MYSQL_LOG
enum cache_type io_cache_type;
bool write_error, inited;
bool need_start_event;
bool no_auto_events; // For relay binlog
/*
no_auto_events means we don't want any of these automatic events :
Start/Rotate/Stop. That is, in 4.x when we rotate a relay log, we don't want
a Rotate_log event to be written to the relay log. When we start a relay log
etc. So in 4.x this is 1 for relay logs, 0 for binlogs.
In 5.0 it's 0 for relay logs too!
*/
bool no_auto_events;
/*
The max size before rotation (usable only if log_type == LOG_BIN: binary
logs and relay logs).
@ -116,6 +124,18 @@ class MYSQL_LOG
public:
MYSQL_LOG();
~MYSQL_LOG();
/*
These describe the log's format. This is used only for relay logs.
_for_exec is used by the SQL thread, _for_queue by the I/O thread. It's
necessary to have 2 distinct objects, because the I/O thread may be reading
events in a different format from what the SQL thread is reading (consider
the case of a master which has been upgraded from 5.0 to 5.1 without doing
RESET MASTER, or from 4.x to 5.0).
*/
Format_description_log_event *description_event_for_exec,
*description_event_for_queue;
void reset_bytes_written()
{
bytes_written = 0;
@ -144,7 +164,8 @@ public:
bool open(const char *log_name,enum_log_type log_type,
const char *new_name, const char *index_file_name_arg,
enum cache_type io_cache_type_arg,
bool no_auto_events_arg, ulong max_size);
bool no_auto_events_arg, ulong max_size,
bool null_created);
void new_file(bool need_lock= 1);
bool write(THD *thd, enum enum_server_command command,
const char *format,...);
@ -590,9 +611,10 @@ public:
the connection
priv_user - The user privilege we are using. May be '' for anonymous user.
db - currently selected database
catalog - currently selected catalog
ip - client IP
*/
char *host,*user,*priv_user,*db,*ip;
char *host,*user,*priv_user,*db,*catalog,*ip;
char priv_host[MAX_HOSTNAME];
/* remote (peer) port */
uint16 peer_port;

View File

@ -48,16 +48,34 @@ int check_binlog_magic(IO_CACHE* log, const char** errmsg)
return 0;
}
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
send to the slave (because the slave may not know it if it just asked for
MASTER_LOG_FILE='', MASTER_LOG_POS=4).
< 4.0.14, fake_rotate_event() was called only if the requested pos was
4. After this version we always call it, so that a 3.23.58 slave can rely on
it to detect if the master is 4.0 (and stop) (the _fake_ Rotate event has
zeros in the good positions which, by chance, make it possible for the 3.23
slave to detect that this event is unexpected) (this is luck which happens
because the master and slave disagree on the size of the header of
Log_event).
Relying on the event length of the Rotate event instead of these well-placed
zeros was not possible as Rotate events have a variable-length part.
*/
static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
ulonglong position, const char**errmsg)
ulonglong position, const char** errmsg)
{
DBUG_ENTER("fake_rotate_event");
char header[LOG_EVENT_HEADER_LEN], buf[ROTATE_HEADER_LEN];
memset(header, 0, 4); // when does not matter
memset(header, 0, 4); // 'when' (the timestamp) does not matter, is set to 0
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
char* p = log_file_name+dirname_length(log_file_name);
uint ident_len = (uint) strlen(p);
ulong event_len = ident_len + ROTATE_EVENT_OVERHEAD;
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0);
@ -72,9 +90,9 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
*errmsg = "failed on my_net_write()";
return -1;
DBUG_RETURN(-1);
}
return 0;
DBUG_RETURN(0);
}
static int send_file(THD *thd)
@ -310,6 +328,36 @@ int purge_master_logs_before_date(THD* thd, time_t purge_time)
return purge_error_message(thd ,res);
}
int test_for_non_eof_log_read_errors(int error, const char *errmsg)
{
if (error == LOG_READ_EOF)
return 0;
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
switch (error) {
case LOG_READ_BOGUS:
errmsg = "bogus data in log event";
break;
case LOG_READ_TOO_LARGE:
errmsg = "log event entry exceeded max_allowed_packet; \
Increase max_allowed_packet on master";
break;
case LOG_READ_IO:
errmsg = "I/O error reading log event";
break;
case LOG_READ_MEM:
errmsg = "memory allocation failed reading log event";
break;
case LOG_READ_TRUNC:
errmsg = "binlog truncated in the middle of event";
break;
default:
errmsg = "unknown error reading log event on the master";
break;
}
return error;
}
/*
TODO: Clean up loop to only have one call to send_file()
*/
@ -326,6 +374,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
int error;
const char *errmsg = "Unknown error";
NET* net = &thd->net;
pthread_mutex_t *log_lock;
#ifndef DBUG_OFF
int left_events = max_binlog_dump_events;
#endif
@ -385,18 +434,25 @@ impossible position";
goto err;
}
my_b_seek(&log, pos); // Seek will done on next read
/*
We need to start a packet with something other than 255
to distiquish it from error
to distinguish it from error
*/
packet->set("\0", 1, &my_charset_bin);
packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
/*
Tell the client about the log name with a fake Rotate event;
this is needed even if we also send a Format_description_log_event just
after, because that event does not contain the binlog's name.
Note that as this Rotate event is sent before Format_description_log_event,
the slave cannot have any info to understand this event's format, so the
header len of Rotate_log_event is FROZEN
(so in 5.0 it will have a header shorter than other events except
FORMAT_DESCRIPTION_EVENT).
Before 4.0.14 we called fake_rotate_event below only if
(pos == BIN_LOG_HEADER_SIZE), because if this is false then the slave
already knows the binlog's name.
Now we always call fake_rotate_event; if the slave already knew the log's
Since, we always call fake_rotate_event; if the slave already knew the log's
name (ex: CHANGE MASTER TO MASTER_LOG_FILE=...) this is useless but does
not harm much. It is nice for 3.23 (>=.58) slaves which test Rotate events
to see if the master is 4.0 (then they choose to stop because they can't
@ -413,15 +469,72 @@ impossible position";
*/
if (fake_rotate_event(net, packet, log_file_name, pos, &errmsg))
{
/*
This error code is not perfect, as fake_rotate_event() does not read
anything from the binlog; if it fails it's because of an error in
my_net_write(), fortunately it will say it in errmsg.
*/
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
packet->set("\0", 1, &my_charset_bin);
/*
We can set log_lock now, it does not move (it's a member of mysql_bin_log,
and it's already inited, and it will be destroyed only at shutdown).
*/
log_lock = mysql_bin_log.get_log_lock();
if (pos > BIN_LOG_HEADER_SIZE)
{
/* Try to find a Format_description_log_event at the beginning of the binlog */
if (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
/*
The packet has offsets equal to the normal offsets in a binlog event
+1 (the first character is \0).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
(*packet)[EVENT_TYPE_OFFSET+1]));
if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
{
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
int4store(packet->c_ptr() +LOG_POS_OFFSET+1,0);
/* send it */
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
goto err;
}
/*
No need to save this event. We are only doing simple reads (no real
parsing of the events) so we don't need it. And so we don't need the
artificial Format_description_log_event of 3.23&4.x.
*/
}
}
else
if (test_for_non_eof_log_read_errors(error, errmsg))
goto err;
/*
else: it's EOF, nothing to do, go on reading next events, the
Format_description_log_event will be found naturally if it is written.
*/
/* reset the packet as we wrote to it in any case */
packet->set("\0", 1, &my_charset_bin);
} /* end of if (pos > BIN_LOG_HEADER_SIZE); if false, the Format_description_log_event
event will be found naturally. */
/* seek to the requested position, to start the requested dump */
my_b_seek(&log, pos); // Seek will done on next read
while (!net->error && net->vio != 0 && !thd->killed)
{
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
#ifndef DBUG_OFF
@ -433,7 +546,7 @@ impossible position";
goto err;
}
#endif
if (my_net_write(net, (char*)packet->ptr(), packet->length()) )
if (my_net_write(net, (char*)packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
my_errno= ER_UNKNOWN_ERROR;
@ -454,34 +567,14 @@ impossible position";
}
/*
TODO: now that we are logging the offset, check to make sure
the recorded offset and the actual match
the recorded offset and the actual match.
Guilhem 2003-06: this is not true if this master is a slave <4.0.15
running with --log-slave-updates, because then log_pos may be the offset
in the-master-of-this-master's binlog.
*/
if (error != LOG_READ_EOF)
{
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
switch (error) {
case LOG_READ_BOGUS:
errmsg = "bogus data in log event";
break;
case LOG_READ_TOO_LARGE:
errmsg = "log event entry exceeded max_allowed_packet; \
Increase max_allowed_packet on master";
break;
case LOG_READ_IO:
errmsg = "I/O error reading log event";
break;
case LOG_READ_MEM:
errmsg = "memory allocation failed reading log event";
break;
case LOG_READ_TRUNC:
errmsg = "binlog truncated in the middle of event";
break;
default:
errmsg = "unknown error reading log event on the master";
break;
}
if (test_for_non_eof_log_read_errors(error, errmsg))
goto err;
}
if (!(flags & BINLOG_DUMP_NON_BLOCK) &&
mysql_bin_log.is_active(log_file_name))
@ -615,8 +708,13 @@ Increase max_allowed_packet on master";
(void) my_close(file, MYF(MY_WME));
/*
Even if the previous log contained a Rotate_log_event, we still fake
one.
Call fake_rotate_event() in case the previous log (the one which we have
just finished reading) did not contain a Rotate event (for example (I
don't know any other example) the previous log was the last one before
the master was shutdown & restarted).
This way we tell the slave about the new log's name and position.
If the binlog is 5.0, the next event we are going to read and send is
Format_description_log_event.
*/
if ((file=open_binlog(&log, log_file_name, &errmsg)) < 0 ||
fake_rotate_event(net, packet, log_file_name, BIN_LOG_HEADER_SIZE, &errmsg))
@ -1112,7 +1210,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
mi->rli.group_relay_log_name,
mi->rli.group_relay_log_pos,
0 /*no data lock*/,
&msg))
&msg, 0))
{
net_printf(thd,0,"Failed initializing relay log position: %s",msg);
unlock_slave_threads(mi);
@ -1197,6 +1295,8 @@ int show_binlog_events(THD* thd)
const char *errmsg = 0;
IO_CACHE log;
File file = -1;
Format_description_log_event *description_event= new
Format_description_log_event(3); /* MySQL 4.0 by default */
Log_event::init_show_field_list(&field_list);
if (protocol-> send_fields(&field_list, 1))
@ -1235,10 +1335,35 @@ int show_binlog_events(THD* thd)
goto err;
pthread_mutex_lock(log_lock);
/*
open_binlog() sought to position 4.
Read the first event in case it's a Format_description_log_event, to know the
format. If there's no such event, we are 3.23 or 4.x. This code, like
before, can't read 3.23 binlogs.
This code will fail on a mixed relay log (one which has Format_desc then
Rotate then Format_desc).
*/
ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event);
if (ev)
{
if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT)
description_event= (Format_description_log_event*) ev;
else
delete ev;
}
my_b_seek(&log, pos);
if (!description_event->is_valid())
{
errmsg="Invalid Format_description event; could be out of memory";
goto err;
}
for (event_count = 0;
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,0)); )
(ev = Log_event::read_log_event(&log,(pthread_mutex_t*)0,description_event)); )
{
if (event_count >= limit_start &&
ev->net_send(protocol, linfo.log_file_name, pos))
@ -1267,6 +1392,7 @@ int show_binlog_events(THD* thd)
}
err:
delete description_event;
if (file >= 0)
{
end_io_cache(&log);