Replication updates. This changeset seems to be working fine on test systems.
If no problems are discovered in the next week, this will make the replication code ready for the 4.0.2 release.
This commit is contained in:
parent
ab1fdbdeac
commit
be388422eb
@ -706,8 +706,6 @@ char ***_sframep_ __attribute__((unused)))
|
||||
if (!_no_db_)
|
||||
{
|
||||
int save_errno=errno;
|
||||
if (!init_done)
|
||||
_db_push_ (_DBUG_START_CONDITION_);
|
||||
/* Sasha: the test below is so we could call functions with DBUG_ENTER
|
||||
before my_thread_init(). I needed this because I suspected corruption
|
||||
of a block allocated by my_thread_init() itself, so I wanted to use
|
||||
@ -715,6 +713,8 @@ char ***_sframep_ __attribute__((unused)))
|
||||
*/
|
||||
if (!(state=code_state()))
|
||||
return;
|
||||
if (!init_done)
|
||||
_db_push_ (_DBUG_START_CONDITION_);
|
||||
|
||||
*_sfunc_ = state->func;
|
||||
*_sfile_ = state->file;
|
||||
@ -794,10 +794,10 @@ uint *_slevel_)
|
||||
if (!_no_db_)
|
||||
{
|
||||
int save_errno=errno;
|
||||
if (!(state=code_state()))
|
||||
return;
|
||||
if (!init_done)
|
||||
_db_push_ ("");
|
||||
if (!(state=code_state()))
|
||||
return; /* Only happens at end of program */
|
||||
if (stack->flags & (TRACE_ON | DEBUG_ON | PROFILE_ON))
|
||||
{
|
||||
if (!state->locked)
|
||||
|
@ -1911,45 +1911,6 @@ int main(int argc, char **argv)
|
||||
using_update_log=1;
|
||||
}
|
||||
|
||||
init_slave();
|
||||
|
||||
if (opt_bin_log && !server_id)
|
||||
{
|
||||
server_id= !master_host ? 1 : 2;
|
||||
switch (server_id) {
|
||||
#ifdef EXTRA_DEBUG
|
||||
case 1:
|
||||
sql_print_error("\
|
||||
Warning: You have enabled the binary log, but you haven't set server-id:\n\
|
||||
Updates will be logged to the binary log, but connections to slaves will\n\
|
||||
not be accepted.");
|
||||
break;
|
||||
#endif
|
||||
case 2:
|
||||
sql_print_error("\
|
||||
Warning: You should set server-id to a non-0 value if master_host is set.\n\
|
||||
The server will not act as a slave.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (opt_bin_log)
|
||||
{
|
||||
if (!opt_bin_logname)
|
||||
{
|
||||
char tmp[FN_REFLEN];
|
||||
/* TODO: The following should be using fn_format(); We just need to
|
||||
first change fn_format() to cut the file name if it's too long.
|
||||
*/
|
||||
strmake(tmp,glob_hostname,FN_REFLEN-5);
|
||||
strmov(strcend(tmp,'.'),"-bin");
|
||||
opt_bin_logname=my_strdup(tmp,MYF(MY_WME));
|
||||
}
|
||||
mysql_bin_log.set_index_file_name(opt_binlog_index_name);
|
||||
open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
|
||||
LOG_BIN);
|
||||
using_update_log=1;
|
||||
}
|
||||
|
||||
if (opt_slow_log)
|
||||
open_log(&mysql_slow_log, glob_hostname, opt_slow_logname, "-slow.log",
|
||||
LOG_NORMAL);
|
||||
@ -2020,6 +1981,46 @@ The server will not act as a slave.");
|
||||
if (!opt_noacl)
|
||||
udf_init();
|
||||
#endif
|
||||
/* init_slave() must be called after the thread keys are created */
|
||||
init_slave();
|
||||
|
||||
if (opt_bin_log && !server_id)
|
||||
{
|
||||
server_id= !master_host ? 1 : 2;
|
||||
switch (server_id) {
|
||||
#ifdef EXTRA_DEBUG
|
||||
case 1:
|
||||
sql_print_error("\
|
||||
Warning: You have enabled the binary log, but you haven't set server-id:\n\
|
||||
Updates will be logged to the binary log, but connections to slaves will\n\
|
||||
not be accepted.");
|
||||
break;
|
||||
#endif
|
||||
case 2:
|
||||
sql_print_error("\
|
||||
Warning: You should set server-id to a non-0 value if master_host is set.\n\
|
||||
The server will not act as a slave.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (opt_bin_log)
|
||||
{
|
||||
if (!opt_bin_logname)
|
||||
{
|
||||
char tmp[FN_REFLEN];
|
||||
/* TODO: The following should be using fn_format(); We just need to
|
||||
first change fn_format() to cut the file name if it's too long.
|
||||
*/
|
||||
strmake(tmp,glob_hostname,FN_REFLEN-5);
|
||||
strmov(strcend(tmp,'.'),"-bin");
|
||||
opt_bin_logname=my_strdup(tmp,MYF(MY_WME));
|
||||
}
|
||||
mysql_bin_log.set_index_file_name(opt_binlog_index_name);
|
||||
open_log(&mysql_bin_log, glob_hostname, opt_bin_logname, "-bin",
|
||||
LOG_BIN);
|
||||
using_update_log=1;
|
||||
}
|
||||
|
||||
|
||||
if (opt_bootstrap)
|
||||
{
|
||||
|
@ -108,7 +108,11 @@ net_printf(NET *net, uint errcode, ...)
|
||||
thd->query_error = 1; // if we are here, something is wrong :-)
|
||||
query_cache_abort(net); // Safety
|
||||
va_start(args,errcode);
|
||||
format=ER(errcode);
|
||||
// Sasha: this is needed to make net_printf() work with 0 argument for
|
||||
// errorcode and use the argument after that as the format string. This
|
||||
// is useful for rare errors that are not worth the hassle to put in
|
||||
// errmsg.sys, but at the same time, the message is not fixed text
|
||||
format=errcode ? ER(errcode) : va_arg(args,char*);
|
||||
offset= net->return_errno ? 2 : 0;
|
||||
text_pos=(char*) net->buff+head_length+offset+1;
|
||||
(void) vsprintf(my_const_cast(char*) (text_pos),format,args);
|
||||
|
61
sql/slave.cc
61
sql/slave.cc
@ -166,6 +166,7 @@ int init_relay_log_pos(RELAY_LOG_INFO* rli,const char* log,
|
||||
ulonglong pos, bool need_data_lock,
|
||||
const char** errmsg)
|
||||
{
|
||||
*errmsg=0;
|
||||
if (rli->log_pos_current)
|
||||
return 0;
|
||||
pthread_mutex_t *log_lock=rli->relay_log.get_log_lock();
|
||||
@ -348,6 +349,7 @@ int terminate_slave_thread(THD* thd, pthread_mutex_t* term_lock,
|
||||
/* it is critical to test if the slave is running. Otherwise, we might
|
||||
be referencing freed memory trying to kick it
|
||||
*/
|
||||
THD_CHECK_SENTRY(thd);
|
||||
if (*slave_running)
|
||||
{
|
||||
KICK_SLAVE(thd);
|
||||
@ -966,6 +968,8 @@ int init_relay_log_info(RELAY_LOG_INFO* rli, const char* info_fname)
|
||||
rli->cur_log_fd = -1;
|
||||
rli->slave_skip_counter=0;
|
||||
rli->log_pos_current=0;
|
||||
rli->abort_pos_wait=0;
|
||||
rli->skip_log_purge=0;
|
||||
// TODO: make this work with multi-master
|
||||
if (!opt_relay_logname)
|
||||
{
|
||||
@ -1296,9 +1300,16 @@ int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
|
||||
bool pos_reached = 0;
|
||||
int event_count = 0;
|
||||
pthread_mutex_lock(&data_lock);
|
||||
while (!thd->killed)
|
||||
abort_pos_wait=0; // abort only if master info changes during wait
|
||||
while (!thd->killed || !abort_pos_wait)
|
||||
{
|
||||
int cmp_result;
|
||||
if (abort_pos_wait)
|
||||
{
|
||||
abort_pos_wait=0;
|
||||
pthread_mutex_unlock(&data_lock);
|
||||
return -1;
|
||||
}
|
||||
DBUG_ASSERT(*master_log_name || master_log_pos == 0);
|
||||
if (*master_log_name)
|
||||
{
|
||||
@ -1350,10 +1361,7 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
|
||||
thd->thread_id = thread_id++;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
|
||||
if (init_thr_lock() ||
|
||||
my_pthread_setspecific_ptr(THR_THD, thd) ||
|
||||
my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
|
||||
my_pthread_setspecific_ptr(THR_NET, &thd->net))
|
||||
if (init_thr_lock() || thd->store_globals())
|
||||
{
|
||||
end_thread(thd,0);
|
||||
DBUG_RETURN(-1);
|
||||
@ -1367,7 +1375,6 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
|
||||
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
|
||||
#endif
|
||||
|
||||
thd->mem_root.free=thd->mem_root.used=0; // Probably not needed
|
||||
if (thd->max_join_size == (ulong) ~0L)
|
||||
thd->options |= OPTION_BIG_SELECTS;
|
||||
|
||||
@ -1381,7 +1388,6 @@ static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
|
||||
}
|
||||
thd->version=refresh_version;
|
||||
thd->set_time();
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
@ -1611,6 +1617,7 @@ slave_begin:
|
||||
my_thread_init();
|
||||
thd = new THD; // note that constructor of THD uses DBUG_ !
|
||||
DBUG_ENTER("handle_slave_io");
|
||||
THD_CHECK_SENTRY(thd);
|
||||
|
||||
pthread_detach_this_thread();
|
||||
if (init_slave_thread(thd, SLAVE_THD_IO))
|
||||
@ -1808,11 +1815,12 @@ err:
|
||||
DBUG_ASSERT(thd->net.buff != 0);
|
||||
net_end(&thd->net); // destructor will not free it, because net.vio is 0
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
THD_CHECK_SENTRY(thd);
|
||||
delete thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
my_thread_end(); // clean-up before broadcast
|
||||
pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
|
||||
pthread_mutex_unlock(&mi->run_lock);
|
||||
my_thread_end();
|
||||
#ifndef DBUG_OFF
|
||||
if(abort_slave_event_count && !events_till_abort)
|
||||
goto slave_begin;
|
||||
@ -1848,6 +1856,7 @@ slave_begin:
|
||||
my_thread_init();
|
||||
thd = new THD; // note that constructor of THD uses DBUG_ !
|
||||
DBUG_ENTER("handle_slave_sql");
|
||||
THD_CHECK_SENTRY(thd);
|
||||
|
||||
pthread_detach_this_thread();
|
||||
if (init_slave_thread(thd, SLAVE_THD_SQL))
|
||||
@ -1861,6 +1870,7 @@ slave_begin:
|
||||
sql_print_error("Failed during slave thread initialization");
|
||||
goto err;
|
||||
}
|
||||
THD_CHECK_SENTRY(thd);
|
||||
thd->thread_stack = (char*)&thd; // remember where our stack is
|
||||
thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
|
||||
threads.append(thd);
|
||||
@ -1891,6 +1901,7 @@ log '%s' at position %s,relay log: name='%s',pos='%s'", RPL_LOG_NAME,
|
||||
{
|
||||
thd->proc_info = "Processing master log event";
|
||||
DBUG_ASSERT(rli->sql_thd == thd);
|
||||
THD_CHECK_SENTRY(thd);
|
||||
if (exec_relay_log_event(thd,rli))
|
||||
{
|
||||
// do not scare the user if SQL thread was simply killed or stopped
|
||||
@ -1926,14 +1937,16 @@ the slave SQL thread with \"SLAVE START\". We stopped at log \
|
||||
DBUG_ASSERT(thd->net.buff != 0);
|
||||
net_end(&thd->net); // destructor will not free it, because we are weird
|
||||
DBUG_ASSERT(rli->sql_thd == thd);
|
||||
THD_CHECK_SENTRY(thd);
|
||||
rli->sql_thd = 0;
|
||||
pthread_mutex_lock(&LOCK_thread_count);
|
||||
THD_CHECK_SENTRY(thd);
|
||||
delete thd;
|
||||
pthread_mutex_unlock(&LOCK_thread_count);
|
||||
my_thread_end(); // clean-up before broadcasting termination
|
||||
pthread_cond_broadcast(&rli->stop_cond);
|
||||
// tell the world we are done
|
||||
pthread_mutex_unlock(&rli->run_lock);
|
||||
my_thread_end();
|
||||
#ifndef DBUG_OFF // TODO: reconsider the code below
|
||||
if (abort_slave_event_count && !rli->events_till_abort)
|
||||
goto slave_begin;
|
||||
@ -2431,11 +2444,33 @@ Log_event* next_event(RELAY_LOG_INFO* rli)
|
||||
my_close(rli->cur_log_fd, MYF(MY_WME));
|
||||
rli->cur_log_fd = -1;
|
||||
|
||||
// purge_first_log will properly set up relay log coordinates in rli
|
||||
if (rli->relay_log.purge_first_log(rli))
|
||||
// TODO: make skip_log_purge a start-up option. At this point this
|
||||
// is not critical priority
|
||||
if (!rli->skip_log_purge)
|
||||
{
|
||||
errmsg = "Error purging processed log";
|
||||
goto err;
|
||||
// purge_first_log will properly set up relay log coordinates in rli
|
||||
if (rli->relay_log.purge_first_log(rli))
|
||||
{
|
||||
errmsg = "Error purging processed log";
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO: verify that no lock is ok here. At this point, if we
|
||||
// get this wrong, this is actually no big deal - the only time
|
||||
// this code will ever be executed is if we are recovering from
|
||||
// a bug when a full reload of the slave is not feasible or
|
||||
// desirable.
|
||||
if (rli->relay_log.find_next_log(&rli->linfo,0/*no lock*/))
|
||||
{
|
||||
errmsg = "error switching to the next log";
|
||||
goto err;
|
||||
}
|
||||
rli->relay_log_pos = 4;
|
||||
strnmov(rli->relay_log_name,rli->linfo.log_file_name,
|
||||
sizeof(rli->relay_log_name));
|
||||
flush_relay_log_info(rli);
|
||||
}
|
||||
|
||||
// next log is hot
|
||||
|
@ -151,10 +151,13 @@ typedef struct st_relay_log_info
|
||||
char last_slave_error[MAX_SLAVE_ERRMSG];
|
||||
THD* sql_thd;
|
||||
bool log_pos_current;
|
||||
bool abort_pos_wait;
|
||||
bool skip_log_purge;
|
||||
|
||||
st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0),
|
||||
cur_log_init_count(0),
|
||||
log_pos_current(0)
|
||||
log_pos_current(0),abort_pos_wait(0),
|
||||
skip_log_purge(0)
|
||||
{
|
||||
relay_log_name[0] = master_log_name[0] = 0;
|
||||
bzero(&info_file,sizeof(info_file));
|
||||
|
@ -104,6 +104,9 @@ THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
|
||||
cond_count=0;
|
||||
convert_set=0;
|
||||
mysys_var=0;
|
||||
#ifndef DBUG_OFF
|
||||
dbug_sentry=THD_SENTRY_MAGIC;
|
||||
#endif
|
||||
net.vio=0;
|
||||
ull=0;
|
||||
system_thread=cleanup_done=0;
|
||||
@ -191,6 +194,7 @@ void THD::cleanup(void)
|
||||
|
||||
THD::~THD()
|
||||
{
|
||||
THD_CHECK_SENTRY(this);
|
||||
DBUG_ENTER("~THD()");
|
||||
/* Close connection */
|
||||
if (net.vio)
|
||||
@ -223,12 +227,16 @@ THD::~THD()
|
||||
mysys_var=0; // Safety (shouldn't be needed)
|
||||
#ifdef SIGNAL_WITH_VIO_CLOSE
|
||||
pthread_mutex_destroy(&active_vio_lock);
|
||||
#endif
|
||||
#ifndef DBUG_OFF
|
||||
dbug_sentry = THD_SENTRY_GONE;
|
||||
#endif
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
void THD::awake(bool prepare_to_die)
|
||||
{
|
||||
THD_CHECK_SENTRY(this);
|
||||
if (prepare_to_die)
|
||||
killed = 1;
|
||||
thr_alarm_kill(real_id);
|
||||
|
@ -251,6 +251,11 @@ public:
|
||||
|
||||
class delayed_insert;
|
||||
|
||||
#define THD_SENTRY_MAGIC 0xfeedd1ff
|
||||
#define THD_SENTRY_GONE 0xdeadbeef
|
||||
|
||||
/*
  Sanity check on a THD pointer: asserts (via DBUG_ASSERT) that its
  dbug_sentry field still holds THD_SENTRY_MAGIC, i.e. the object has not
  been destroyed or overwritten. The argument is parenthesized so that any
  pointer expression (e.g. `p + 1` or `cond ? a : b`) expands correctly.
*/
#define THD_CHECK_SENTRY(thd) DBUG_ASSERT((thd)->dbug_sentry == THD_SENTRY_MAGIC)
|
||||
|
||||
/* For each client connection we create a separate thread with THD serving as
|
||||
a thread/connection descriptor */
|
||||
|
||||
@ -312,6 +317,9 @@ public:
|
||||
// TODO: document the variables below
|
||||
MYSQL_LOCK *lock,*locked_tables;
|
||||
ULL *ull;
|
||||
#ifndef DBUG_OFF
|
||||
uint dbug_sentry; // watch out for memory corruption
|
||||
#endif
|
||||
struct st_my_thread_var *mysys_var;
|
||||
enum enum_server_command command;
|
||||
uint32 server_id;
|
||||
|
@ -714,7 +714,10 @@ int change_master(THD* thd, MASTER_INFO* mi)
|
||||
return 1;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&mi->data_lock);
|
||||
/* data lock not needed since we have already stopped the running threads,
|
||||
and we have the hold on the run locks which will keep all threads that
|
||||
could possibly modify the data structures from running
|
||||
*/
|
||||
if ((lex_mi->host || lex_mi->port) && !lex_mi->log_file_name && !lex_mi->pos)
|
||||
{
|
||||
// if we change host or port, we must reset the position
|
||||
@ -746,6 +749,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
|
||||
if (lex_mi->relay_log_name)
|
||||
{
|
||||
need_relay_log_purge = 0;
|
||||
mi->rli.skip_log_purge=1;
|
||||
strnmov(mi->rli.relay_log_name,lex_mi->relay_log_name,
|
||||
sizeof(mi->rli.relay_log_name));
|
||||
}
|
||||
@ -759,16 +763,14 @@ int change_master(THD* thd, MASTER_INFO* mi)
|
||||
flush_master_info(mi);
|
||||
if (need_relay_log_purge)
|
||||
{
|
||||
pthread_mutex_unlock(&mi->data_lock);
|
||||
mi->rli.skip_log_purge=0;
|
||||
thd->proc_info="purging old relay logs";
|
||||
if (purge_relay_logs(&mi->rli,0 /* not only reset, but also reinit*/,
|
||||
&errmsg))
|
||||
{
|
||||
send_error(&thd->net, 0, "Failed purging old relay logs");
|
||||
unlock_slave_threads(mi);
|
||||
net_printf(&thd->net, 0, "Failed purging old relay logs: %s",errmsg);
|
||||
return 1;
|
||||
}
|
||||
pthread_mutex_lock(&mi->rli.data_lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -778,6 +780,7 @@ int change_master(THD* thd, MASTER_INFO* mi)
|
||||
0 /*no data lock*/,
|
||||
&msg))
|
||||
{
|
||||
//Sasha: note that I had to change net_printf() to make this work
|
||||
net_printf(&thd->net,0,"Failed initializing relay log position: %s",msg);
|
||||
unlock_slave_threads(mi);
|
||||
return 1;
|
||||
@ -789,7 +792,10 @@ int change_master(THD* thd, MASTER_INFO* mi)
|
||||
sizeof(mi->rli.master_log_name));
|
||||
if (!mi->rli.master_log_name[0]) // uninitialized case
|
||||
mi->rli.master_log_pos=0;
|
||||
pthread_cond_broadcast(&mi->rli.data_cond);
|
||||
|
||||
pthread_mutex_lock(&mi->rli.data_lock);
|
||||
mi->rli.abort_pos_wait = 1;
|
||||
pthread_cond_broadcast(&mi->data_cond);
|
||||
pthread_mutex_unlock(&mi->rli.data_lock);
|
||||
|
||||
thd->proc_info = "starting slave";
|
||||
|
Loading…
x
Reference in New Issue
Block a user