MDEV-500: Session variable for server_id

MDEV-26: Global transaction id, partial commit

Change server_id to be a session variable.
User with SUPER can set it to binlog with different server_id.
Implement backward-compatible ::server_id mirror for plugins.
This commit is contained in:
unknown 2012-10-23 12:46:29 +02:00
parent be86e44280
commit ab8e8f4b27
18 changed files with 85 additions and 61 deletions

View File

@ -574,8 +574,8 @@ void Event_parse_data::check_originator_id(THD *thd)
status= Event_parse_data::SLAVESIDE_DISABLED; status= Event_parse_data::SLAVESIDE_DISABLED;
status_changed= true; status_changed= true;
} }
originator = thd->server_id; originator = thd->variables.server_id;
} }
else else
originator = server_id; originator = global_system_variables.server_id;
} }

View File

@ -6857,7 +6857,7 @@ ulonglong uuid_value;
void uuid_short_init() void uuid_short_init()
{ {
uuid_value= ((((ulonglong) server_id) << 56) + uuid_value= ((((ulonglong) global_system_variables.server_id) << 56) +
(((ulonglong) server_start_time) << 24)); (((ulonglong) server_start_time) << 24));
} }

View File

@ -686,7 +686,8 @@ bool Log_to_csv_event_handler::
/* do a write */ /* do a write */
if (table->field[1]->store(user_host, user_host_len, client_cs) || if (table->field[1]->store(user_host, user_host_len, client_cs) ||
table->field[2]->store((longlong) thread_id, TRUE) || table->field[2]->store((longlong) thread_id, TRUE) ||
table->field[3]->store((longlong) server_id, TRUE) || table->field[3]->store((longlong) global_system_variables.server_id,
TRUE) ||
table->field[4]->store(command_type, command_type_len, client_cs)) table->field[4]->store(command_type, command_type_len, client_cs))
goto err; goto err;
@ -883,7 +884,7 @@ bool Log_to_csv_event_handler::
table->field[8]->set_notnull(); table->field[8]->set_notnull();
} }
if (table->field[9]->store((longlong) server_id, TRUE)) if (table->field[9]->store((longlong)global_system_variables.server_id, TRUE))
goto err; goto err;
table->field[9]->set_notnull(); table->field[9]->set_notnull();

View File

@ -537,7 +537,7 @@ static char *slave_load_file_stem(char *buf, uint file_id,
to_unix_path(buf); to_unix_path(buf);
buf = strend(buf); buf = strend(buf);
buf = int10_to_str(::server_id, buf, 10); buf = int10_to_str(global_system_variables.server_id, buf, 10);
*buf++ = '-'; *buf++ = '-';
buf = int10_to_str(event_server_id, buf, 10); buf = int10_to_str(event_server_id, buf, 10);
*buf++ = '-'; *buf++ = '-';
@ -573,7 +573,7 @@ static void cleanup_load_tmpdir()
LOAD DATA. LOAD DATA.
*/ */
p= strmake(prefbuf, STRING_WITH_LEN(PREFIX_SQL_LOAD)); p= strmake(prefbuf, STRING_WITH_LEN(PREFIX_SQL_LOAD));
p= int10_to_str(::server_id, p, 10); p= int10_to_str(global_system_variables.server_id, p, 10);
*(p++)= '-'; *(p++)= '-';
*p= 0; *p= 0;
@ -771,7 +771,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
crc(0), thd(thd_arg), crc(0), thd(thd_arg),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{ {
server_id= thd->server_id; server_id= thd->variables.server_id;
when= thd->start_time; when= thd->start_time;
when_sec_part=thd->start_time_sec_part; when_sec_part=thd->start_time_sec_part;
@ -796,7 +796,7 @@ Log_event::Log_event()
cache_type(Log_event::EVENT_INVALID_CACHE), crc(0), cache_type(Log_event::EVENT_INVALID_CACHE), crc(0),
thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{ {
server_id= ::server_id; server_id= global_system_variables.server_id;
/* /*
We can't call my_time() here as this would cause a call before We can't call my_time() here as this would cause a call before
my_init() is called my_init() is called
@ -928,10 +928,11 @@ Log_event::do_shall_skip(Relay_log_info *rli)
DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu," DBUG_PRINT("info", ("ev->server_id: %lu, ::server_id: %lu,"
" rli->replicate_same_server_id: %d," " rli->replicate_same_server_id: %d,"
" rli->slave_skip_counter: %lu", " rli->slave_skip_counter: %lu",
(ulong) server_id, (ulong) ::server_id, (ulong) server_id, (ulong) global_system_variables.server_id,
rli->replicate_same_server_id, rli->replicate_same_server_id,
rli->slave_skip_counter)); rli->slave_skip_counter));
if ((server_id == ::server_id && !rli->replicate_same_server_id) || if ((server_id == global_system_variables.server_id &&
!rli->replicate_same_server_id) ||
(rli->slave_skip_counter == 1 && rli->is_in_group()) || (rli->slave_skip_counter == 1 && rli->is_in_group()) ||
(flags & LOG_EVENT_SKIP_REPLICATION_F && (flags & LOG_EVENT_SKIP_REPLICATION_F &&
opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE)) opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE))
@ -4818,7 +4819,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
perform, we don't call Start_log_event_v3::do_apply_event() perform, we don't call Start_log_event_v3::do_apply_event()
(this was just to update the log's description event). (this was just to update the log's description event).
*/ */
if (server_id != (uint32) ::server_id) if (server_id != (uint32) global_system_variables.server_id)
{ {
/* /*
If the event was not requested by the slave i.e. the master sent If the event was not requested by the slave i.e. the master sent
@ -4844,7 +4845,7 @@ int Format_description_log_event::do_apply_event(Relay_log_info const *rli)
int Format_description_log_event::do_update_pos(Relay_log_info *rli) int Format_description_log_event::do_update_pos(Relay_log_info *rli)
{ {
if (server_id == (uint32) ::server_id) if (server_id == (uint32) global_system_variables.server_id)
{ {
/* /*
We only increase the relay log position if we are skipping We only increase the relay log position if we are skipping
@ -5889,7 +5890,7 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
#endif #endif
DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu", DBUG_PRINT("info", ("server_id=%lu; ::server_id=%lu",
(ulong) this->server_id, (ulong) ::server_id)); (ulong) this->server_id, (ulong) global_system_variables.server_id));
DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident)); DBUG_PRINT("info", ("new_log_ident: %s", this->new_log_ident));
DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf))); DBUG_PRINT("info", ("pos: %s", llstr(this->pos, buf)));
@ -5909,7 +5910,8 @@ int Rotate_log_event::do_update_pos(Relay_log_info *rli)
5.0.0, there also are some rotates from the slave itself, in the 5.0.0, there also are some rotates from the slave itself, in the
relay log, which shall not change the group positions. relay log, which shall not change the group positions.
*/ */
if ((server_id != ::server_id || rli->replicate_same_server_id) && if ((server_id != global_system_variables.server_id ||
rli->replicate_same_server_id) &&
!is_relay_log_event() && !is_relay_log_event() &&
!rli->is_in_group()) !rli->is_in_group())
{ {

View File

@ -2426,7 +2426,7 @@ protected:
Events from ourself should be skipped, but they should not Events from ourself should be skipped, but they should not
decrease the slave skip counter. decrease the slave skip counter.
*/ */
if (this->server_id == ::server_id) if (this->server_id == global_system_variables.server_id)
return Log_event::EVENT_SKIP_IGNORE; return Log_event::EVENT_SKIP_IGNORE;
else else
return Log_event::EVENT_SKIP_NOT; return Log_event::EVENT_SKIP_NOT;
@ -2817,7 +2817,7 @@ private:
Events from ourself should be skipped, but they should not Events from ourself should be skipped, but they should not
decrease the slave skip counter. decrease the slave skip counter.
*/ */
if (this->server_id == ::server_id) if (this->server_id == global_system_variables.server_id)
return Log_event::EVENT_SKIP_IGNORE; return Log_event::EVENT_SKIP_IGNORE;
else else
return Log_event::EVENT_SKIP_NOT; return Log_event::EVENT_SKIP_NOT;

View File

@ -4937,9 +4937,9 @@ int mysqld_main(int argc, char **argv)
set_user(mysqld_user, user_info); set_user(mysqld_user, user_info);
} }
if (opt_bin_log && !server_id) if (opt_bin_log && !global_system_variables.server_id)
{ {
server_id= 1; global_system_variables.server_id= ::server_id= 1;
#ifdef EXTRA_DEBUG #ifdef EXTRA_DEBUG
sql_print_warning("You have enabled the binary log, but you haven't set " sql_print_warning("You have enabled the binary log, but you haven't set "
"server-id to a non-zero value: we force server id to 1; " "server-id to a non-zero value: we force server id to 1; "
@ -7819,6 +7819,7 @@ mysqld_get_one_option(int optid,
break; break;
case OPT_SERVER_ID: case OPT_SERVER_ID:
server_id_supplied = 1; server_id_supplied = 1;
::server_id= global_system_variables.server_id;
break; break;
case OPT_ONE_THREAD: case OPT_ONE_THREAD:
thread_handling= SCHEDULER_NO_THREADS; thread_handling= SCHEDULER_NO_THREADS;

View File

@ -89,14 +89,15 @@ void change_rpl_status(ulong from_status, ulong to_status)
void unregister_slave(THD* thd, bool only_mine, bool need_mutex) void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{ {
if (thd->server_id) uint32 thd_server_id= thd->variables.server_id;
if (thd_server_id)
{ {
if (need_mutex) if (need_mutex)
mysql_mutex_lock(&LOCK_slave_list); mysql_mutex_lock(&LOCK_slave_list);
SLAVE_INFO* old_si; SLAVE_INFO* old_si;
if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list, if ((old_si = (SLAVE_INFO*)my_hash_search(&slave_list,
(uchar*)&thd->server_id, 4)) && (uchar*)&thd_server_id, 4)) &&
(!only_mine || old_si->thd == thd)) (!only_mine || old_si->thd == thd))
my_hash_delete(&slave_list, (uchar*)old_si); my_hash_delete(&slave_list, (uchar*)old_si);
@ -127,7 +128,7 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err2; goto err2;
thd->server_id= si->server_id= uint4korr(p); thd->variables.server_id= si->server_id= uint4korr(p);
p+= 4; p+= 4;
get_object(p,si->host, "Failed to register slave: too long 'report-host'"); get_object(p,si->host, "Failed to register slave: too long 'report-host'");
get_object(p,si->user, "Failed to register slave: too long 'report-user'"); get_object(p,si->user, "Failed to register slave: too long 'report-user'");
@ -145,7 +146,7 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
// si->rpl_recovery_rank= uint4korr(p); // si->rpl_recovery_rank= uint4korr(p);
p += 4; p += 4;
if (!(si->master_id= uint4korr(p))) if (!(si->master_id= uint4korr(p)))
si->master_id= server_id; si->master_id= global_system_variables.server_id;
si->thd= thd; si->thd= thd;
mysql_mutex_lock(&LOCK_slave_list); mysql_mutex_lock(&LOCK_slave_list);

View File

@ -176,7 +176,7 @@ void delegates_destroy()
plugins add to thd->lex will be automatically unlocked. plugins add to thd->lex will be automatically unlocked.
*/ */
#define FOREACH_OBSERVER(r, f, thd, args) \ #define FOREACH_OBSERVER(r, f, thd, args) \
param.server_id= thd->server_id; \ param.server_id= thd->variables.server_id; \
/* /*
Use a struct to make sure that they are allocated adjacent, check Use a struct to make sure that they are allocated adjacent, check
delete_dynamic(). delete_dynamic().
@ -348,7 +348,7 @@ int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
ulong hlen; ulong hlen;
Binlog_transmit_param param; Binlog_transmit_param param;
param.flags= flags; param.flags= flags;
param.server_id= thd->server_id; param.server_id= thd->variables.server_id;
int ret= 0; int ret= 0;
read_lock(); read_lock();

View File

@ -108,7 +108,7 @@ int injector::transaction::use_table(server_id_type sid, table tbl)
if ((error= check_state(TABLE_STATE))) if ((error= check_state(TABLE_STATE)))
DBUG_RETURN(error); DBUG_RETURN(error);
server_id_type save_id= m_thd->server_id; server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid); m_thd->set_server_id(sid);
error= m_thd->binlog_write_table_map(tbl.get_table(), error= m_thd->binlog_write_table_map(tbl.get_table(),
tbl.is_transactional()); tbl.is_transactional());
@ -127,7 +127,7 @@ int injector::transaction::write_row (server_id_type sid, table tbl,
if (error) if (error)
DBUG_RETURN(error); DBUG_RETURN(error);
server_id_type save_id= m_thd->server_id; server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid); m_thd->set_server_id(sid);
error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(), error= m_thd->binlog_write_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, record); cols, colcnt, record);
@ -146,7 +146,7 @@ int injector::transaction::delete_row(server_id_type sid, table tbl,
if (error) if (error)
DBUG_RETURN(error); DBUG_RETURN(error);
server_id_type save_id= m_thd->server_id; server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid); m_thd->set_server_id(sid);
error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(), error= m_thd->binlog_delete_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, record); cols, colcnt, record);
@ -165,7 +165,7 @@ int injector::transaction::update_row(server_id_type sid, table tbl,
if (error) if (error)
DBUG_RETURN(error); DBUG_RETURN(error);
server_id_type save_id= m_thd->server_id; server_id_type save_id= m_thd->variables.server_id;
m_thd->set_server_id(sid); m_thd->set_server_id(sid);
error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(), error= m_thd->binlog_update_row(tbl.get_table(), tbl.is_transactional(),
cols, colcnt, before, after); cols, colcnt, before, after);

View File

@ -61,7 +61,9 @@ Master_info::Master_info(LEX_STRING *connection_name_arg,
my_casedn_str(system_charset_info, cmp_connection_name.str); my_casedn_str(system_charset_info, cmp_connection_name.str);
} }
my_init_dynamic_array(&ignore_server_ids, sizeof(::server_id), 16, 16, MYF(0)); my_init_dynamic_array(&ignore_server_ids,
sizeof(global_system_variables.server_id), 16, 16,
MYF(0));
bzero((char*) &file, sizeof(file)); bzero((char*) &file, sizeof(file));
mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST);
@ -510,7 +512,7 @@ int flush_master_info(Master_info* mi,
char* ignore_server_ids_buf; char* ignore_server_ids_buf;
{ {
ignore_server_ids_buf= ignore_server_ids_buf=
(char *) my_malloc((sizeof(::server_id) * 3 + 1) * (char *) my_malloc((sizeof(global_system_variables.server_id) * 3 + 1) *
(1 + mi->ignore_server_ids.elements), MYF(MY_WME)); (1 + mi->ignore_server_ids.elements), MYF(MY_WME));
if (!ignore_server_ids_buf) if (!ignore_server_ids_buf)
DBUG_RETURN(1); DBUG_RETURN(1);

View File

@ -1091,7 +1091,8 @@ bool Relay_log_info::is_until_satisfied(THD *thd, Log_event *ev)
if (until_condition == UNTIL_MASTER_POS) if (until_condition == UNTIL_MASTER_POS)
{ {
if (ev && ev->server_id == (uint32) ::server_id && !replicate_same_server_id) if (ev && ev->server_id == (uint32) global_system_variables.server_id &&
!replicate_same_server_id)
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
log_name= group_master_log_name; log_name= group_master_log_name;
log_pos= (!ev)? group_master_log_pos : log_pos= (!ev)? group_master_log_pos :

View File

@ -706,7 +706,7 @@ int start_slave_thread(
if (start_lock) if (start_lock)
mysql_mutex_lock(start_lock); mysql_mutex_lock(start_lock);
if (!server_id) if (!global_system_variables.server_id)
{ {
if (start_cond) if (start_cond)
mysql_cond_broadcast(start_cond); mysql_cond_broadcast(start_cond);
@ -1403,7 +1403,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
(master_res= mysql_store_result(mysql)) && (master_res= mysql_store_result(mysql)) &&
(master_row= mysql_fetch_row(master_res))) (master_row= mysql_fetch_row(master_res)))
{ {
if ((::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) && if ((global_system_variables.server_id ==
(mi->master_id= strtoul(master_row[1], 0, 10))) &&
!mi->rli.replicate_same_server_id) !mi->rli.replicate_same_server_id)
{ {
errmsg= "The slave I/O thread stops because master and slave have equal \ errmsg= "The slave I/O thread stops because master and slave have equal \
@ -1976,7 +1977,7 @@ int register_slave_on_master(MYSQL* mysql, Master_info *mi,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int4store(pos, server_id); pos+= 4; int4store(pos, global_system_variables.server_id); pos+= 4;
pos= net_store_data(pos, (uchar*) report_host, report_host_len); pos= net_store_data(pos, (uchar*) report_host, report_host_len);
pos= net_store_data(pos, (uchar*) report_user, report_user_len); pos= net_store_data(pos, (uchar*) report_user, report_user_len);
pos= net_store_data(pos, (uchar*) report_password, report_password_len); pos= net_store_data(pos, (uchar*) report_password, report_password_len);
@ -2529,7 +2530,7 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
// TODO if big log files: Change next to int8store() // TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos); int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags); int2store(buf + 4, binlog_flags);
int4store(buf + 6, server_id); int4store(buf + 6, global_system_variables.server_id);
len = (uint) strlen(logname); len = (uint) strlen(logname);
memcpy(buf + 10, logname,len); memcpy(buf + 10, logname,len);
if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1)) if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
@ -2738,7 +2739,8 @@ int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
has a Rotate etc). has a Rotate etc).
*/ */
thd->server_id = ev->server_id; // use the original server id for logging /* Use the original server id for logging. */
thd->variables.server_id = ev->server_id;
thd->set_time(); // time the query thd->set_time(); // time the query
thd->lex->current_select= 0; thd->lex->current_select= 0;
if (!ev->when) if (!ev->when)
@ -3947,7 +3949,7 @@ static int process_io_create_file(Master_info* mi, Create_file_log_event* cev)
} }
DBUG_ASSERT(cev->inited_from_old); DBUG_ASSERT(cev->inited_from_old);
thd->file_id = cev->file_id = mi->file_id++; thd->file_id = cev->file_id = mi->file_id++;
thd->server_id = cev->server_id; thd->variables.server_id = cev->server_id;
cev_not_written = 1; cev_not_written = 1;
if (unlikely(net_request_file(net,cev->fname))) if (unlikely(net_request_file(net,cev->fname)))
@ -4620,7 +4622,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET); s_id= uint4korr(buf + SERVER_ID_OFFSET);
if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) || if ((s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
/* /*
the following conjunction deals with IGNORE_SERVER_IDS, if set the following conjunction deals with IGNORE_SERVER_IDS, if set
If the master is on the ignore list, execution of If the master is on the ignore list, execution of
@ -4651,7 +4654,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
IGNORE_SERVER_IDS it increments mi->master_log_pos IGNORE_SERVER_IDS it increments mi->master_log_pos
as well as rli->group_relay_log_pos. as well as rli->group_relay_log_pos.
*/ */
if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) || if (!(s_id == global_system_variables.server_id &&
!mi->rli.replicate_same_server_id) ||
(buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT && (buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT && buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
buf[EVENT_TYPE_OFFSET] != STOP_EVENT)) buf[EVENT_TYPE_OFFSET] != STOP_EVENT))

View File

@ -335,7 +335,7 @@ uint create_table_def_key(THD *thd, char *key,
uint key_length= (uint) (table_end-key); uint key_length= (uint) (table_end-key);
if (tmp_table) if (tmp_table)
{ {
int4store(key + key_length, thd->server_id); int4store(key + key_length, thd->variables.server_id);
int4store(key + key_length + 4, thd->variables.pseudo_thread_id); int4store(key + key_length + 4, thd->variables.pseudo_thread_id);
key_length+= TMP_TABLE_KEY_EXTRA; key_length+= TMP_TABLE_KEY_EXTRA;
} }
@ -2782,7 +2782,7 @@ bool open_table(THD *thd, TABLE_LIST *table_list, MEM_ROOT *mem_root,
{ {
DBUG_PRINT("error", DBUG_PRINT("error",
("query_id: %lu server_id: %u pseudo_thread_id: %lu", ("query_id: %lu server_id: %u pseudo_thread_id: %lu",
(ulong) table->query_id, (uint) thd->server_id, (ulong) table->query_id, (uint) thd->variables.server_id,
(ulong) thd->variables.pseudo_thread_id)); (ulong) thd->variables.pseudo_thread_id));
my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr()); my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias.c_ptr());
DBUG_RETURN(TRUE); DBUG_RETURN(TRUE);
@ -6100,7 +6100,8 @@ TABLE *open_table_uncached(THD *thd, const char *path, const char *db,
("table: '%s'.'%s' path: '%s' server_id: %u " ("table: '%s'.'%s' path: '%s' server_id: %u "
"pseudo_thread_id: %lu", "pseudo_thread_id: %lu",
db, table_name, path, db, table_name, path,
(uint) thd->server_id, (ulong) thd->variables.pseudo_thread_id)); (uint) thd->variables.server_id,
(ulong) thd->variables.pseudo_thread_id));
table_list.db= (char*) db; table_list.db= (char*) db;
table_list.table_name= (char*) table_name; table_list.table_name= (char*) table_name;

View File

@ -914,7 +914,7 @@ THD::THD()
/* Variables with default values */ /* Variables with default values */
proc_info="login"; proc_info="login";
where= THD::DEFAULT_WHERE; where= THD::DEFAULT_WHERE;
server_id = ::server_id; variables.server_id = global_system_variables.server_id;
slave_net = 0; slave_net = 0;
command=COM_CONNECT; command=COM_CONNECT;
*scramble= '\0'; *scramble= '\0';
@ -5157,7 +5157,7 @@ int THD::binlog_write_row(TABLE* table, bool is_trans,
size_t const len= pack_row(table, cols, row_data, record); size_t const len= pack_row(table, cols, row_data, record);
Rows_log_event* const ev= Rows_log_event* const ev=
binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
len, is_trans, len, is_trans,
static_cast<Write_rows_log_event*>(0)); static_cast<Write_rows_log_event*>(0));
@ -5201,7 +5201,7 @@ int THD::binlog_update_row(TABLE* table, bool is_trans,
#endif #endif
Rows_log_event* const ev= Rows_log_event* const ev=
binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
before_size + after_size, is_trans, before_size + after_size, is_trans,
static_cast<Update_rows_log_event*>(0)); static_cast<Update_rows_log_event*>(0));
@ -5232,7 +5232,7 @@ int THD::binlog_delete_row(TABLE* table, bool is_trans,
size_t const len= pack_row(table, cols, row_data, record); size_t const len= pack_row(table, cols, row_data, record);
Rows_log_event* const ev= Rows_log_event* const ev=
binlog_prepare_pending_rows_event(table, server_id, cols, colcnt, binlog_prepare_pending_rows_event(table, variables.server_id, cols, colcnt,
len, is_trans, len, is_trans,
static_cast<Delete_rows_log_event*>(0)); static_cast<Delete_rows_log_event*>(0));

View File

@ -529,6 +529,7 @@ typedef struct system_variables
ulong tx_isolation; ulong tx_isolation;
ulong updatable_views_with_limit; ulong updatable_views_with_limit;
int max_user_connections; int max_user_connections;
ulong server_id;
/** /**
In slave thread we need to know in behalf of which In slave thread we need to know in behalf of which
thread the query is being run to replicate temp tables properly thread the query is being run to replicate temp tables properly
@ -1699,7 +1700,6 @@ public:
first byte of the packet in do_command() first byte of the packet in do_command()
*/ */
enum enum_server_command command; enum enum_server_command command;
uint32 server_id;
uint32 file_id; // for LOAD DATA INFILE uint32 file_id; // for LOAD DATA INFILE
/* remote (peer) port */ /* remote (peer) port */
uint16 peer_port; uint16 peer_port;
@ -1776,7 +1776,7 @@ public:
MY_BITMAP const* cols, size_t colcnt, MY_BITMAP const* cols, size_t colcnt,
const uchar *old_data, const uchar *new_data); const uchar *old_data, const uchar *new_data);
void set_server_id(uint32 sid) { server_id = sid; } void set_server_id(uint32 sid) { variables.server_id = sid; }
/* /*
Member functions to handle pending event for row-level logging. Member functions to handle pending event for row-level logging.

View File

@ -1257,10 +1257,10 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
/* TODO: The following has to be changed to an 8 byte integer */ /* TODO: The following has to be changed to an 8 byte integer */
pos = uint4korr(packet); pos = uint4korr(packet);
flags = uint2korr(packet + 4); flags = uint2korr(packet + 4);
thd->server_id=0; /* avoid suicide */ thd->variables.server_id=0; /* avoid suicide */
if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0 if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0
kill_zombie_dump_threads(slave_server_id); kill_zombie_dump_threads(slave_server_id);
thd->server_id = slave_server_id; thd->variables.server_id = slave_server_id;
general_log_print(thd, command, "Log: '%s' Pos: %ld", packet+10, general_log_print(thd, command, "Log: '%s' Pos: %ld", packet+10,
(long) pos); (long) pos);

View File

@ -81,7 +81,7 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
uint ident_len = (uint) strlen(p); uint ident_len = (uint) strlen(p);
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN +
(do_checksum ? BINLOG_CHECKSUM_LEN : 0); (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
int4store(header + EVENT_LEN_OFFSET, event_len); int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F); int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
@ -539,7 +539,7 @@ static int send_heartbeat_event(NET* net, String* packet,
uint ident_len = strlen(p); uint ident_len = strlen(p);
ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ulong event_len = ident_len + LOG_EVENT_HEADER_LEN +
(do_checksum ? BINLOG_CHECKSUM_LEN : 0); (do_checksum ? BINLOG_CHECKSUM_LEN : 0);
int4store(header + SERVER_ID_OFFSET, server_id); int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id);
int4store(header + EVENT_LEN_OFFSET, event_len); int4store(header + EVENT_LEN_OFFSET, event_len);
int2store(header + FLAGS_OFFSET, 0); int2store(header + FLAGS_OFFSET, 0);
@ -748,7 +748,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
mariadb_slave_capability= get_mariadb_slave_capability(thd); mariadb_slave_capability= get_mariadb_slave_capability(thd);
if (global_system_variables.log_warnings > 1) if (global_system_variables.log_warnings > 1)
sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
thd->server_id, log_ident, (ulong)pos); thd->variables.server_id, log_ident, (ulong)pos);
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
{ {
errmsg= "Failed to run hook 'transmit_start'"; errmsg= "Failed to run hook 'transmit_start'";
@ -1127,7 +1127,8 @@ impossible position";
int ret; int ret;
ulong signal_cnt; ulong signal_cnt;
DBUG_PRINT("wait",("waiting for data in binary log")); DBUG_PRINT("wait",("waiting for data in binary log"));
if (thd->server_id==0) // for mysqlbinlog (mysqlbinlog.server_id==0) /* For mysqlbinlog (mysqlbinlog.server_id==0). */
if (thd->variables.server_id==0)
{ {
mysql_mutex_unlock(log_lock); mysql_mutex_unlock(log_lock);
goto end; goto end;
@ -1647,7 +1648,7 @@ void kill_zombie_dump_threads(uint32 slave_server_id)
while ((tmp=it++)) while ((tmp=it++))
{ {
if (tmp->command == COM_BINLOG_DUMP && if (tmp->command == COM_BINLOG_DUMP &&
tmp->server_id == slave_server_id) tmp->variables.server_id == slave_server_id)
{ {
mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
break; break;
@ -1856,7 +1857,7 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
{ {
ulong s_id; ulong s_id;
get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i); get_dynamic(&lex_mi->repl_ignore_server_ids, (uchar*) &s_id, i);
if (s_id == ::server_id && replicate_same_server_id) if (s_id == global_system_variables.server_id && replicate_same_server_id)
{ {
my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id)); my_error(ER_SLAVE_IGNORE_SERVER_IDS, MYF(0), static_cast<int>(s_id));
ret= TRUE; ret= TRUE;

View File

@ -1988,18 +1988,28 @@ static Sys_var_charptr Sys_secure_file_priv(
CMD_LINE(REQUIRED_ARG), IN_FS_CHARSET, DEFAULT(0)); CMD_LINE(REQUIRED_ARG), IN_FS_CHARSET, DEFAULT(0));
static bool fix_server_id(sys_var *self, THD *thd, enum_var_type type) static bool fix_server_id(sys_var *self, THD *thd, enum_var_type type)
{
if (type == OPT_GLOBAL)
{ {
server_id_supplied = 1; server_id_supplied = 1;
thd->server_id= server_id; thd->variables.server_id= global_system_variables.server_id;
/*
Historically, server_id was a global variable that is exported to
plugins. Now it is a session variable, and lives in the
global_system_variables struct, but we still need to export the
value for reading to plugins for backwards compatibility reasons.
*/
::server_id= global_system_variables.server_id;
}
return false; return false;
} }
static Sys_var_ulong Sys_server_id( static Sys_var_ulong Sys_server_id(
"server_id", "server_id",
"Uniquely identifies the server instance in the community of " "Uniquely identifies the server instance in the community of "
"replication partners", "replication partners",
GLOBAL_VAR(server_id), CMD_LINE(REQUIRED_ARG, OPT_SERVER_ID), SESSION_VAR(server_id), CMD_LINE(REQUIRED_ARG, OPT_SERVER_ID),
VALID_RANGE(0, UINT_MAX32), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, VALID_RANGE(0, UINT_MAX32), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD,
NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(fix_server_id)); NOT_IN_BINLOG, ON_CHECK(check_has_super), ON_UPDATE(fix_server_id));
static Sys_var_mybool Sys_slave_compressed_protocol( static Sys_var_mybool Sys_slave_compressed_protocol(
"slave_compressed_protocol", "slave_compressed_protocol",