Merge work:/home/bk/mysql-4.0
into mysql.sashanet.com:/reiser-data/mysql-4.0
This commit is contained in:
commit
b66cc56a0d
@ -15,7 +15,8 @@
|
||||
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
|
||||
|
||||
/* mysqltest test tool
|
||||
* See man page for more information.
|
||||
* See the manual for more information
|
||||
* TODO: document better how mysqltest works
|
||||
*
|
||||
* Written by:
|
||||
* Sasha Pachev <sasha@mysql.com>
|
||||
@ -26,9 +27,6 @@
|
||||
/**********************************************************************
|
||||
TODO:
|
||||
|
||||
- Print also the queries that returns a result to the log file; This makes
|
||||
it much easier to find out what's wrong.
|
||||
|
||||
- Do comparison line by line, instead of doing a full comparison of
|
||||
the text file. This will save space as we don't need to keep many
|
||||
results in memory. It will also make it possible to do simple
|
||||
@ -43,7 +41,7 @@
|
||||
|
||||
**********************************************************************/
|
||||
|
||||
#define MTEST_VERSION "1.13"
|
||||
#define MTEST_VERSION "1.14"
|
||||
|
||||
#include <my_global.h>
|
||||
#include <mysql_embed.h>
|
||||
@ -88,6 +86,12 @@
|
||||
#define CON_RETRY_SLEEP 2
|
||||
#define MAX_CON_TRIES 5
|
||||
|
||||
#ifndef OS2
|
||||
#define SLAVE_POLL_INTERVAL 300000 /* 0.3 of a sec */
|
||||
#else
|
||||
#defile SLAVE_POLL_INTERVAL 0.3
|
||||
#endif
|
||||
|
||||
enum {OPT_MANAGER_USER=256,OPT_MANAGER_HOST,OPT_MANAGER_PASSWD,
|
||||
OPT_MANAGER_PORT,OPT_MANAGER_WAIT_TIMEOUT};
|
||||
|
||||
@ -187,6 +191,7 @@ Q_DISABLE_RPL_PARSE, Q_EVAL_RESULT,
|
||||
Q_ENABLE_QUERY_LOG, Q_DISABLE_QUERY_LOG,
|
||||
Q_ENABLE_RESULT_LOG, Q_DISABLE_RESULT_LOG,
|
||||
Q_SERVER_START, Q_SERVER_STOP,Q_REQUIRE_MANAGER,
|
||||
Q_WAIT_FOR_SLAVE_TO_STOP,
|
||||
Q_UNKNOWN, /* Unknown command. */
|
||||
Q_COMMENT, /* Comments, ignored. */
|
||||
Q_COMMENT_WITH_COMMAND
|
||||
@ -222,7 +227,7 @@ const char *command_names[] = {
|
||||
"enable_query_log", "disable_query_log",
|
||||
"enable_result_log", "disable_result_log",
|
||||
"server_start", "server_stop",
|
||||
"require_manager",
|
||||
"require_manager", "wait_for_slave_to_stop",
|
||||
0
|
||||
};
|
||||
|
||||
@ -653,6 +658,45 @@ int open_file(const char* name)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* ugly long name, but we are following the convention */
|
||||
int do_wait_for_slave_to_stop(struct st_query* __attribute__((unused)) q)
|
||||
{
|
||||
MYSQL* mysql = &cur_con->mysql;
|
||||
#ifndef OS2
|
||||
struct timeval t;
|
||||
#endif
|
||||
for (;;)
|
||||
{
|
||||
MYSQL_RES* res;
|
||||
MYSQL_ROW row;
|
||||
int done;
|
||||
LINT_INIT(res);
|
||||
|
||||
if (mysql_query(mysql,"show status like 'Slave_running'")
|
||||
|| !(res=mysql_store_result(mysql)))
|
||||
die("Query failed while probing slave for stop: %s",
|
||||
mysql_error(mysql));
|
||||
if (!(row=mysql_fetch_row(res)) || !row[1])
|
||||
{
|
||||
mysql_free_result(res);
|
||||
die("Strange result from query while probing slave for stop");
|
||||
}
|
||||
done = !strcmp(row[1],"OFF");
|
||||
mysql_free_result(res);
|
||||
if (done)
|
||||
break;
|
||||
#ifndef OS2
|
||||
t.tv_sec=0;
|
||||
t.tv_usec=SLAVE_POLL_INTERVAL;
|
||||
select(0,0,0,0,&t); /* sleep */
|
||||
#else
|
||||
DosSleep(OS2_SLAVE_POLL_INTERVAL);
|
||||
#endif
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int do_require_manager(struct st_query* __attribute__((unused)) q)
|
||||
{
|
||||
if (!manager)
|
||||
@ -2335,6 +2379,7 @@ int main(int argc, char** argv)
|
||||
case Q_DISABLE_RESULT_LOG: disable_result_log=1; break;
|
||||
case Q_SOURCE: do_source(q); break;
|
||||
case Q_SLEEP: do_sleep(q); break;
|
||||
case Q_WAIT_FOR_SLAVE_TO_STOP: do_wait_for_slave_to_stop(q); break;
|
||||
case Q_REQUIRE_MANAGER: do_require_manager(q); break;
|
||||
#ifndef EMBEDDED_LIBRARY
|
||||
case Q_SERVER_START: do_server_start(q); break;
|
||||
|
@ -643,7 +643,10 @@ extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count);
|
||||
extern int my_b_append(IO_CACHE *info,const byte *Buffer,uint Count);
|
||||
extern int my_block_write(IO_CACHE *info, const byte *Buffer,
|
||||
uint Count, my_off_t pos);
|
||||
extern int flush_io_cache(IO_CACHE *info);
|
||||
extern int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock);
|
||||
|
||||
#define flush_io_cache(info) _flush_io_cache((info),1)
|
||||
|
||||
extern int end_io_cache(IO_CACHE *info);
|
||||
extern uint my_b_fill(IO_CACHE *info);
|
||||
extern void my_b_seek(IO_CACHE *info,my_off_t pos);
|
||||
|
@ -33,7 +33,6 @@ master-bin.003
|
||||
insert into t2 values(1234);
|
||||
set insert_id=1234;
|
||||
insert into t2 values(NULL);
|
||||
slave stop;
|
||||
set sql_slave_skip_counter=1;
|
||||
slave start;
|
||||
purge master logs to 'master-bin.003';
|
||||
@ -66,7 +65,7 @@ slave stop;
|
||||
slave start;
|
||||
show slave status;
|
||||
Master_Host Master_User Master_Port Connect_retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_do_db Replicate_ignore_db Last_errno Last_error Skip_counter Exec_master_log_pos
|
||||
127.0.0.1 root MASTER_PORT 60 master-bin.006 445 mysql-relay-bin.004 1312 master-bin.006 Yes Yes 0 0 445
|
||||
127.0.0.1 root MASTER_PORT 60 master-bin.006 445 mysql-relay-bin.004 1376 master-bin.006 Yes Yes 0 0 445
|
||||
lock tables t3 read;
|
||||
select count(*) from t3 where n >= 4;
|
||||
count(*)
|
||||
|
@ -51,9 +51,7 @@ insert into t2 values(NULL);
|
||||
connection slave;
|
||||
sync_with_master;
|
||||
|
||||
#the slave may have already stopped, so we ignore the error
|
||||
--error 0,1199
|
||||
!slave stop;
|
||||
wait_for_slave_to_stop;
|
||||
|
||||
#restart slave skipping one event
|
||||
set sql_slave_skip_counter=1;
|
||||
|
@ -808,13 +808,19 @@ int my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count)
|
||||
Buffer+=rest_length;
|
||||
Count-=rest_length;
|
||||
info->write_pos+=rest_length;
|
||||
if (flush_io_cache(info))
|
||||
if (_flush_io_cache(info,0))
|
||||
{
|
||||
unlock_append_buffer(info);
|
||||
return 1;
|
||||
}
|
||||
if (Count >= IO_SIZE)
|
||||
{ /* Fill first intern buffer */
|
||||
length=Count & (uint) ~(IO_SIZE-1);
|
||||
if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
|
||||
{
|
||||
unlock_append_buffer(info);
|
||||
return info->error= -1;
|
||||
}
|
||||
Count-=length;
|
||||
Buffer+=length;
|
||||
}
|
||||
@ -883,14 +889,16 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
|
||||
|
||||
/* Flush write cache */
|
||||
|
||||
int flush_io_cache(IO_CACHE *info)
|
||||
int _flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
|
||||
{
|
||||
uint length;
|
||||
my_bool append_cache;
|
||||
my_off_t pos_in_file;
|
||||
DBUG_ENTER("flush_io_cache");
|
||||
|
||||
append_cache = (info->type == SEQ_READ_APPEND);
|
||||
if (!(append_cache = (info->type == SEQ_READ_APPEND)))
|
||||
need_append_buffer_lock=0;
|
||||
|
||||
if (info->type == WRITE_CACHE || append_cache)
|
||||
{
|
||||
if (info->file == -1)
|
||||
@ -898,6 +906,8 @@ int flush_io_cache(IO_CACHE *info)
|
||||
if (real_open_cached_file(info))
|
||||
DBUG_RETURN((info->error= -1));
|
||||
}
|
||||
if (need_append_buffer_lock)
|
||||
lock_append_buffer(info);
|
||||
if ((length=(uint) (info->write_pos - info->write_buffer)))
|
||||
{
|
||||
pos_in_file=info->pos_in_file;
|
||||
@ -909,6 +919,8 @@ int flush_io_cache(IO_CACHE *info)
|
||||
if (my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)) ==
|
||||
MY_FILEPOS_ERROR)
|
||||
{
|
||||
if (need_append_buffer_lock)
|
||||
unlock_append_buffer(info);
|
||||
DBUG_RETURN((info->error= -1));
|
||||
}
|
||||
if (!append_cache)
|
||||
@ -932,6 +944,8 @@ int flush_io_cache(IO_CACHE *info)
|
||||
info->end_of_file+=(info->write_pos-info->append_read_pos);
|
||||
|
||||
info->append_read_pos=info->write_pos=info->write_buffer;
|
||||
if (need_append_buffer_lock)
|
||||
unlock_append_buffer(info);
|
||||
DBUG_RETURN(info->error);
|
||||
}
|
||||
}
|
||||
@ -942,6 +956,8 @@ int flush_io_cache(IO_CACHE *info)
|
||||
info->inited=0;
|
||||
}
|
||||
#endif
|
||||
if (need_append_buffer_lock)
|
||||
unlock_append_buffer(info);
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
25
sql/log.cc
25
sql/log.cc
@ -703,12 +703,37 @@ void MYSQL_LOG::new_file(bool inside_mutex)
|
||||
}
|
||||
}
|
||||
|
||||
bool MYSQL_LOG::append(Log_event* ev)
|
||||
{
|
||||
bool error = 0;
|
||||
pthread_mutex_lock(&LOCK_log);
|
||||
|
||||
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
|
||||
// Log_event::write() is smart enough to use my_b_write() or
|
||||
// my_b_append() depending on the kind of cache we have
|
||||
if (ev->write(&log_file))
|
||||
{
|
||||
error=1;
|
||||
goto err;
|
||||
}
|
||||
if ((uint)my_b_append_tell(&log_file) > max_binlog_size)
|
||||
{
|
||||
new_file(1);
|
||||
}
|
||||
signal_update();
|
||||
err:
|
||||
pthread_mutex_unlock(&LOCK_log);
|
||||
return error;
|
||||
}
|
||||
|
||||
bool MYSQL_LOG::appendv(const char* buf, uint len,...)
|
||||
{
|
||||
bool error = 0;
|
||||
va_list(args);
|
||||
va_start(args,len);
|
||||
|
||||
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
|
||||
|
||||
pthread_mutex_lock(&LOCK_log);
|
||||
do
|
||||
{
|
||||
|
@ -26,6 +26,18 @@
|
||||
|
||||
#include <assert.h>
|
||||
|
||||
inline int my_b_safe_write(IO_CACHE* file, const char* buf,
|
||||
int len)
|
||||
{
|
||||
// Sasha: We are not writing this with the ? operator to avoid hitting
|
||||
// a possible compiler bug. At least gcc 2.95 cannot deal with
|
||||
// several layers of ternary operators that evaluated comma(,) operator
|
||||
// expressions inside - I do have a test case if somebody wants it
|
||||
if (file->type == SEQ_READ_APPEND)
|
||||
return my_b_append(file,buf,len);
|
||||
return my_b_write(file,buf,len);
|
||||
}
|
||||
|
||||
#ifdef MYSQL_CLIENT
|
||||
static void pretty_print_str(FILE* file, char* str, int len)
|
||||
{
|
||||
@ -403,7 +415,7 @@ int Log_event::write_header(IO_CACHE* file)
|
||||
pos += 4;
|
||||
int2store(pos, flags);
|
||||
pos += 2;
|
||||
return (my_b_write(file, (byte*) buf, (uint) (pos - buf)));
|
||||
return (my_b_safe_write(file, (byte*) buf, (uint) (pos - buf)));
|
||||
}
|
||||
|
||||
#ifndef MYSQL_CLIENT
|
||||
@ -677,7 +689,7 @@ int Start_log_event::write_data(IO_CACHE* file)
|
||||
int2store(buff + ST_BINLOG_VER_OFFSET,binlog_version);
|
||||
memcpy(buff + ST_SERVER_VER_OFFSET,server_version,ST_SERVER_VER_LEN);
|
||||
int4store(buff + ST_CREATED_OFFSET,created);
|
||||
return (my_b_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
|
||||
return (my_b_safe_write(file, (byte*) buff, sizeof(buff)) ? -1 : 0);
|
||||
}
|
||||
|
||||
Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
|
||||
@ -714,8 +726,8 @@ int Rotate_log_event::write_data(IO_CACHE* file)
|
||||
{
|
||||
char buf[ROTATE_HEADER_LEN];
|
||||
int8store(buf, pos + R_POS_OFFSET);
|
||||
return my_b_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
|
||||
my_b_write(file, (byte*)new_log_ident, (uint) ident_len);
|
||||
return my_b_safe_write(file, (byte*)buf, ROTATE_HEADER_LEN) ||
|
||||
my_b_safe_write(file, (byte*)new_log_ident, (uint) ident_len);
|
||||
}
|
||||
|
||||
#ifndef MYSQL_CLIENT
|
||||
@ -812,9 +824,9 @@ int Query_log_event::write_data(IO_CACHE* file)
|
||||
buf[Q_DB_LEN_OFFSET] = (char)db_len;
|
||||
int2store(buf + Q_ERR_CODE_OFFSET, error_code);
|
||||
|
||||
return (my_b_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
|
||||
my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
|
||||
my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
|
||||
return (my_b_safe_write(file, (byte*) buf, QUERY_HEADER_LEN) ||
|
||||
my_b_safe_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
|
||||
my_b_safe_write(file, (byte*) query, q_len)) ? -1 : 0;
|
||||
}
|
||||
|
||||
Intvar_log_event::Intvar_log_event(const char* buf, bool old_format):
|
||||
@ -840,7 +852,7 @@ int Intvar_log_event::write_data(IO_CACHE* file)
|
||||
char buf[9];
|
||||
buf[I_TYPE_OFFSET] = type;
|
||||
int8store(buf + I_VAL_OFFSET, val);
|
||||
return my_b_write(file, (byte*) buf, sizeof(buf));
|
||||
return my_b_safe_write(file, (byte*) buf, sizeof(buf));
|
||||
}
|
||||
|
||||
#ifdef MYSQL_CLIENT
|
||||
@ -878,7 +890,7 @@ int Load_log_event::write_data_header(IO_CACHE* file)
|
||||
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
|
||||
buf[L_DB_LEN_OFFSET] = (char)db_len;
|
||||
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
|
||||
return my_b_write(file, (byte*)buf, LOAD_HEADER_LEN);
|
||||
return my_b_safe_write(file, (byte*)buf, LOAD_HEADER_LEN);
|
||||
}
|
||||
|
||||
int Load_log_event::write_data_body(IO_CACHE* file)
|
||||
@ -886,20 +898,20 @@ int Load_log_event::write_data_body(IO_CACHE* file)
|
||||
if (sql_ex.write_data(file)) return 1;
|
||||
if (num_fields && fields && field_lens)
|
||||
{
|
||||
if (my_b_write(file, (byte*)field_lens, num_fields) ||
|
||||
my_b_write(file, (byte*)fields, field_block_len))
|
||||
if (my_b_safe_write(file, (byte*)field_lens, num_fields) ||
|
||||
my_b_safe_write(file, (byte*)fields, field_block_len))
|
||||
return 1;
|
||||
}
|
||||
return (my_b_write(file, (byte*)table_name, table_name_len + 1) ||
|
||||
my_b_write(file, (byte*)db, db_len + 1) ||
|
||||
my_b_write(file, (byte*)fname, fname_len));
|
||||
return (my_b_safe_write(file, (byte*)table_name, table_name_len + 1) ||
|
||||
my_b_safe_write(file, (byte*)db, db_len + 1) ||
|
||||
my_b_safe_write(file, (byte*)fname, fname_len));
|
||||
}
|
||||
|
||||
|
||||
static bool write_str(IO_CACHE *file, char *str, byte length)
|
||||
{
|
||||
return (my_b_write(file, &length, 1) ||
|
||||
my_b_write(file, (byte*) str, (int) length));
|
||||
return (my_b_safe_write(file, &length, 1) ||
|
||||
my_b_safe_write(file, (byte*) str, (int) length));
|
||||
}
|
||||
|
||||
int sql_ex_info::write_data(IO_CACHE* file)
|
||||
@ -911,7 +923,7 @@ int sql_ex_info::write_data(IO_CACHE* file)
|
||||
write_str(file, line_term, line_term_len) ||
|
||||
write_str(file, line_start, line_start_len) ||
|
||||
write_str(file, escaped, escaped_len) ||
|
||||
my_b_write(file,(byte*) &opt_flags,1));
|
||||
my_b_safe_write(file,(byte*) &opt_flags,1));
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -923,7 +935,7 @@ int sql_ex_info::write_data(IO_CACHE* file)
|
||||
old_ex.escaped= *escaped;
|
||||
old_ex.opt_flags= opt_flags;
|
||||
old_ex.empty_flags=empty_flags;
|
||||
return my_b_write(file, (byte*) &old_ex, sizeof(old_ex));
|
||||
return my_b_safe_write(file, (byte*) &old_ex, sizeof(old_ex));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1280,7 +1292,7 @@ int Slave_log_event::write_data(IO_CACHE* file)
|
||||
int8store(mem_pool + SL_MASTER_POS_OFFSET, master_pos);
|
||||
int2store(mem_pool + SL_MASTER_PORT_OFFSET, master_port);
|
||||
// log and host are already there
|
||||
return my_b_write(file, (byte*)mem_pool, get_data_size());
|
||||
return my_b_safe_write(file, (byte*)mem_pool, get_data_size());
|
||||
}
|
||||
|
||||
void Slave_log_event::init_from_mem_pool(int data_size)
|
||||
@ -1330,8 +1342,8 @@ int Create_file_log_event::write_data_body(IO_CACHE* file)
|
||||
int res;
|
||||
if ((res = Load_log_event::write_data_body(file)) || fake_base)
|
||||
return res;
|
||||
return (my_b_write(file, (byte*) "", 1) ||
|
||||
my_b_write(file, (byte*) block, block_len));
|
||||
return (my_b_safe_write(file, (byte*) "", 1) ||
|
||||
my_b_safe_write(file, (byte*) block, block_len));
|
||||
}
|
||||
|
||||
int Create_file_log_event::write_data_header(IO_CACHE* file)
|
||||
@ -1341,7 +1353,7 @@ int Create_file_log_event::write_data_header(IO_CACHE* file)
|
||||
return res;
|
||||
byte buf[CREATE_FILE_HEADER_LEN];
|
||||
int4store(buf + CF_FILE_ID_OFFSET, file_id);
|
||||
return my_b_write(file, buf, CREATE_FILE_HEADER_LEN);
|
||||
return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN);
|
||||
}
|
||||
|
||||
int Create_file_log_event::write_base(IO_CACHE* file)
|
||||
@ -1423,8 +1435,8 @@ int Append_block_log_event::write_data(IO_CACHE* file)
|
||||
{
|
||||
byte buf[APPEND_BLOCK_HEADER_LEN];
|
||||
int4store(buf + AB_FILE_ID_OFFSET, file_id);
|
||||
return (my_b_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
|
||||
my_b_write(file, (byte*) block, block_len));
|
||||
return (my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
|
||||
my_b_safe_write(file, (byte*) block, block_len));
|
||||
}
|
||||
|
||||
#ifdef MYSQL_CLIENT
|
||||
@ -1473,7 +1485,7 @@ int Delete_file_log_event::write_data(IO_CACHE* file)
|
||||
{
|
||||
byte buf[DELETE_FILE_HEADER_LEN];
|
||||
int4store(buf + DF_FILE_ID_OFFSET, file_id);
|
||||
return my_b_write(file, buf, DELETE_FILE_HEADER_LEN);
|
||||
return my_b_safe_write(file, buf, DELETE_FILE_HEADER_LEN);
|
||||
}
|
||||
|
||||
#ifdef MYSQL_CLIENT
|
||||
@ -1520,7 +1532,7 @@ int Execute_load_log_event::write_data(IO_CACHE* file)
|
||||
{
|
||||
byte buf[EXEC_LOAD_HEADER_LEN];
|
||||
int4store(buf + EL_FILE_ID_OFFSET, file_id);
|
||||
return my_b_write(file, buf, EXEC_LOAD_HEADER_LEN);
|
||||
return my_b_safe_write(file, buf, EXEC_LOAD_HEADER_LEN);
|
||||
}
|
||||
|
||||
#ifdef MYSQL_CLIENT
|
||||
|
91
sql/slave.cc
91
sql/slave.cc
@ -54,6 +54,9 @@ static int stuck_count = 0;
|
||||
typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL} SLAVE_THD_TYPE;
|
||||
|
||||
void skip_load_data_infile(NET* net);
|
||||
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev);
|
||||
static int queue_old_event(MASTER_INFO* mi, const char* buf,
|
||||
uint event_len);
|
||||
static inline bool slave_killed(THD* thd,MASTER_INFO* mi);
|
||||
static inline bool slave_killed(THD* thd,RELAY_LOG_INFO* rli);
|
||||
static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type);
|
||||
@ -1918,26 +1921,15 @@ the slave SQL thread with \"mysqladmin start-slave\". We stopped at log \
|
||||
DBUG_RETURN(0); // Can't return anything here
|
||||
}
|
||||
|
||||
int queue_event(MASTER_INFO* mi,const char* buf,uint event_len)
|
||||
static int process_io_rotate(MASTER_INFO* mi, Rotate_log_event* rev)
|
||||
{
|
||||
int error;
|
||||
bool inc_pos = 1;
|
||||
if (mi->old_format)
|
||||
return 1; // TODO: deal with old format
|
||||
|
||||
switch (buf[EVENT_TYPE_OFFSET])
|
||||
{
|
||||
case ROTATE_EVENT:
|
||||
{
|
||||
Rotate_log_event rev(buf,event_len,0);
|
||||
if (!rev.is_valid())
|
||||
if (!rev->is_valid())
|
||||
return 1;
|
||||
DBUG_ASSERT(rev.ident_len<sizeof(mi->master_log_name));
|
||||
memcpy(mi->master_log_name,rev.new_log_ident,
|
||||
rev.ident_len);
|
||||
mi->master_log_name[rev.ident_len] = 0;
|
||||
mi->master_log_pos = rev.pos;
|
||||
inc_pos = 0;
|
||||
DBUG_ASSERT(rev->ident_len<sizeof(mi->master_log_name));
|
||||
memcpy(mi->master_log_name,rev->new_log_ident,
|
||||
rev->ident_len);
|
||||
mi->master_log_name[rev->ident_len] = 0;
|
||||
mi->master_log_pos = rev->pos;
|
||||
#ifndef DBUG_OFF
|
||||
/* if we do not do this, we will be getting the first
|
||||
rotate event forever, so
|
||||
@ -1946,6 +1938,69 @@ int queue_event(MASTER_INFO* mi,const char* buf,uint event_len)
|
||||
if (disconnect_slave_event_count)
|
||||
events_till_disconnect++;
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int queue_old_event(MASTER_INFO* mi, const char* buf,
|
||||
uint event_len)
|
||||
{
|
||||
const char* errmsg = 0;
|
||||
bool inc_pos = 1;
|
||||
Log_event* ev = Log_event::read_log_event(buf,event_len, &errmsg,
|
||||
1/*old format*/);
|
||||
if (!ev)
|
||||
{
|
||||
sql_print_error("Read invalid event from master: '%s',\
|
||||
master could be corrupt but a more likely cause of this is a bug",
|
||||
errmsg);
|
||||
return 1;
|
||||
}
|
||||
ev->log_pos = mi->master_log_pos;
|
||||
switch (ev->get_type_code())
|
||||
{
|
||||
case ROTATE_EVENT:
|
||||
if (process_io_rotate(mi,(Rotate_log_event*)ev))
|
||||
{
|
||||
delete ev;
|
||||
return 1;
|
||||
}
|
||||
inc_pos = 0;
|
||||
break;
|
||||
case LOAD_EVENT:
|
||||
// TODO: actually process it
|
||||
mi->master_log_pos += event_len;
|
||||
return 0;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
if (mi->rli.relay_log.append(ev))
|
||||
{
|
||||
delete ev;
|
||||
return 1;
|
||||
}
|
||||
delete ev;
|
||||
if (inc_pos)
|
||||
mi->master_log_pos += event_len;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int queue_event(MASTER_INFO* mi,const char* buf,uint event_len)
|
||||
{
|
||||
int error;
|
||||
bool inc_pos = 1;
|
||||
if (mi->old_format)
|
||||
return queue_old_event(mi,buf,event_len);
|
||||
// TODO: figure out if other events in addition to Rotate
|
||||
// require special processing
|
||||
switch (buf[EVENT_TYPE_OFFSET])
|
||||
{
|
||||
case ROTATE_EVENT:
|
||||
{
|
||||
Rotate_log_event rev(buf,event_len,0);
|
||||
if (process_io_rotate(mi,&rev))
|
||||
return 1;
|
||||
inc_pos=0;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
|
@ -108,6 +108,7 @@ public:
|
||||
//v stands for vector
|
||||
//invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
|
||||
bool appendv(const char* buf,uint len,...);
|
||||
bool append(Log_event* ev);
|
||||
|
||||
int generate_new_name(char *new_name,const char *old_name);
|
||||
void make_log_name(char* buf, const char* log_ident);
|
||||
|
Loading…
x
Reference in New Issue
Block a user