MDEV-26: Global transaction id. Partial commit.
This commit is contained in:
parent
f5c3c2855d
commit
be86e44280
79
sql/log.cc
79
sql/log.cc
@ -5278,6 +5278,67 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
|
|||||||
DBUG_RETURN(error);
|
DBUG_RETURN(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Generate a new global transaction ID, and write it to the binlog */
|
||||||
|
bool
|
||||||
|
MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
|
||||||
|
bool is_transactional)
|
||||||
|
{
|
||||||
|
rpl_gtid gtid;
|
||||||
|
uint64 seq_no;
|
||||||
|
|
||||||
|
seq_no= thd->variables.gtid_seq_no;
|
||||||
|
/*
|
||||||
|
Reset the session variable gtid_seq_no, to reduce the risk of accidentally
|
||||||
|
producing a duplicate GTID.
|
||||||
|
*/
|
||||||
|
thd->variables.gtid_seq_no= 0;
|
||||||
|
if (seq_no != 0)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
If we see a higher sequence number, use that one as the basis of any
|
||||||
|
later generated sequence numbers.
|
||||||
|
|
||||||
|
This way, in simple tree replication topologies with just one master
|
||||||
|
generating events at any point in time, sequence number will always be
|
||||||
|
monotonic irrespectively of server_id. Only if events are produced in
|
||||||
|
parallel on multiple master servers will sequence id be non-monotonic
|
||||||
|
and server id needed to distinguish.
|
||||||
|
|
||||||
|
We will not rely on this in the server code, but it makes things
|
||||||
|
conceptually easier to understand for the DBA.
|
||||||
|
*/
|
||||||
|
mysql_mutex_lock(&LOCK_gtid_counter);
|
||||||
|
if (global_gtid_counter < seq_no)
|
||||||
|
global_gtid_counter= seq_no;
|
||||||
|
mysql_mutex_unlock(&LOCK_gtid_counter);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
mysql_mutex_lock(&LOCK_gtid_counter);
|
||||||
|
seq_no= ++global_gtid_counter;
|
||||||
|
mysql_mutex_unlock(&LOCK_gtid_counter);
|
||||||
|
}
|
||||||
|
gtid.seq_no= seq_no;
|
||||||
|
gtid.domain_id= thd->variables.gtid_domain_id;
|
||||||
|
|
||||||
|
Gtid_log_event gtid_event(thd, gtid.seq_no, gtid.domain_id, standalone,
|
||||||
|
LOG_EVENT_SUPPRESS_USE_F, is_transactional);
|
||||||
|
gtid.server_id= gtid_event.server_id;
|
||||||
|
|
||||||
|
/* Write the event to the binary log. */
|
||||||
|
if (gtid_event.write(&mysql_bin_log.log_file))
|
||||||
|
return true;
|
||||||
|
status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
|
||||||
|
|
||||||
|
/* Update the replication state (last GTID in each replication domain). */
|
||||||
|
mysql_mutex_lock(&LOCK_rpl_gtid_state);
|
||||||
|
global_rpl_gtid_state.update(>id);
|
||||||
|
mysql_mutex_unlock(&LOCK_rpl_gtid_state);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Write an event to the binary log. If with_annotate != NULL and
|
Write an event to the binary log. If with_annotate != NULL and
|
||||||
*with_annotate = TRUE write also Annotate_rows before the event
|
*with_annotate = TRUE write also Annotate_rows before the event
|
||||||
@ -5347,6 +5408,7 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
|
|||||||
my_org_b_tell= my_b_tell(file);
|
my_org_b_tell= my_b_tell(file);
|
||||||
mysql_mutex_lock(&LOCK_log);
|
mysql_mutex_lock(&LOCK_log);
|
||||||
prev_binlog_id= current_binlog_id;
|
prev_binlog_id= current_binlog_id;
|
||||||
|
write_gtid_event(thd, true, using_trans);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -6219,19 +6281,6 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
Log "BEGIN" at the beginning of every transaction. Here, a transaction is
|
|
||||||
either a BEGIN..COMMIT block or a single statement in autocommit mode.
|
|
||||||
|
|
||||||
Create the necessary events here, where we have the correct THD (and
|
|
||||||
thread context).
|
|
||||||
|
|
||||||
Due to group commit the actual writing to binlog may happen in a different
|
|
||||||
thread.
|
|
||||||
*/
|
|
||||||
Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), using_trx_cache, TRUE,
|
|
||||||
TRUE, 0);
|
|
||||||
entry.begin_event= &qinfo;
|
|
||||||
entry.end_event= end_ev;
|
entry.end_event= end_ev;
|
||||||
if (cache_mngr->stmt_cache.has_incident() ||
|
if (cache_mngr->stmt_cache.has_incident() ||
|
||||||
cache_mngr->trx_cache.has_incident())
|
cache_mngr->trx_cache.has_incident())
|
||||||
@ -6607,10 +6656,8 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry)
|
|||||||
{
|
{
|
||||||
binlog_cache_mngr *mngr= entry->cache_mngr;
|
binlog_cache_mngr *mngr= entry->cache_mngr;
|
||||||
|
|
||||||
if (entry->begin_event->write(&log_file))
|
if (write_gtid_event(entry->thd, false, entry->using_trx_cache))
|
||||||
return ER_ERROR_ON_WRITE;
|
return ER_ERROR_ON_WRITE;
|
||||||
status_var_add(entry->thd->status_var.binlog_bytes_written,
|
|
||||||
entry->begin_event->data_written);
|
|
||||||
|
|
||||||
if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
|
if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
|
||||||
write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
|
write_cache(entry->thd, mngr->get_binlog_cache_log(FALSE)))
|
||||||
|
@ -420,11 +420,10 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
|
|||||||
bool using_stmt_cache;
|
bool using_stmt_cache;
|
||||||
bool using_trx_cache;
|
bool using_trx_cache;
|
||||||
/*
|
/*
|
||||||
Extra events (BEGIN, COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
|
Extra events (COMMIT/ROLLBACK/XID, and possibly INCIDENT) to be
|
||||||
written during group commit. The incident_event is only valid if
|
written during group commit. The incident_event is only valid if
|
||||||
trx_data->has_incident() is true.
|
trx_data->has_incident() is true.
|
||||||
*/
|
*/
|
||||||
Log_event *begin_event;
|
|
||||||
Log_event *end_event;
|
Log_event *end_event;
|
||||||
Log_event *incident_event;
|
Log_event *incident_event;
|
||||||
/* Set during group commit to record any per-thread error. */
|
/* Set during group commit to record any per-thread error. */
|
||||||
@ -771,6 +770,7 @@ public:
|
|||||||
inline IO_CACHE *get_index_file() { return &index_file;}
|
inline IO_CACHE *get_index_file() { return &index_file;}
|
||||||
inline uint32 get_open_count() { return open_count; }
|
inline uint32 get_open_count() { return open_count; }
|
||||||
void set_status_variables(THD *thd);
|
void set_status_variables(THD *thd);
|
||||||
|
bool write_gtid_event(THD *thd, bool standalone, bool is_transactional);
|
||||||
};
|
};
|
||||||
|
|
||||||
class Log_event_handler
|
class Log_event_handler
|
||||||
|
461
sql/log_event.cc
461
sql/log_event.cc
@ -749,6 +749,8 @@ const char* Log_event::get_type_str(Log_event_type type)
|
|||||||
case INCIDENT_EVENT: return "Incident";
|
case INCIDENT_EVENT: return "Incident";
|
||||||
case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
|
case ANNOTATE_ROWS_EVENT: return "Annotate_rows";
|
||||||
case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
|
case BINLOG_CHECKPOINT_EVENT: return "Binlog_checkpoint";
|
||||||
|
case GTID_EVENT: return "Gtid";
|
||||||
|
case GTID_LIST_EVENT: return "Gtid_list";
|
||||||
default: return "Unknown"; /* impossible */
|
default: return "Unknown"; /* impossible */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1560,6 +1562,12 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
|
|||||||
case BINLOG_CHECKPOINT_EVENT:
|
case BINLOG_CHECKPOINT_EVENT:
|
||||||
ev = new Binlog_checkpoint_log_event(buf, event_len, description_event);
|
ev = new Binlog_checkpoint_log_event(buf, event_len, description_event);
|
||||||
break;
|
break;
|
||||||
|
case GTID_EVENT:
|
||||||
|
ev = new Gtid_log_event(buf, event_len, description_event);
|
||||||
|
break;
|
||||||
|
case GTID_LIST_EVENT:
|
||||||
|
ev = new Gtid_list_log_event(buf, event_len, description_event);
|
||||||
|
break;
|
||||||
#ifdef HAVE_REPLICATION
|
#ifdef HAVE_REPLICATION
|
||||||
case SLAVE_EVENT: /* can never happen (unused event) */
|
case SLAVE_EVENT: /* can never happen (unused event) */
|
||||||
ev = new Slave_log_event(buf, event_len, description_event);
|
ev = new Slave_log_event(buf, event_len, description_event);
|
||||||
@ -3432,6 +3440,53 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Replace an event (GTID event) with a BEGIN query event, to be compatible
|
||||||
|
with an old slave.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
Query_log_event::begin_event(String *packet, ulong ev_offset,
|
||||||
|
uint8 checksum_alg)
|
||||||
|
{
|
||||||
|
uchar *p= (uchar *)packet->ptr() + ev_offset;
|
||||||
|
uchar *q= p + LOG_EVENT_HEADER_LEN;
|
||||||
|
size_t data_len= packet->length() - ev_offset;
|
||||||
|
uint16 flags;
|
||||||
|
|
||||||
|
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
|
||||||
|
data_len-= BINLOG_CHECKSUM_LEN;
|
||||||
|
else
|
||||||
|
DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
|
||||||
|
checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
|
||||||
|
|
||||||
|
/* Currently we only need to replace GTID event. */
|
||||||
|
DBUG_ASSERT(data_len == LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN);
|
||||||
|
if (data_len != LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
flags= uint2korr(p + FLAGS_OFFSET);
|
||||||
|
flags&= ~LOG_EVENT_THREAD_SPECIFIC_F;
|
||||||
|
flags|= LOG_EVENT_SUPPRESS_USE_F;
|
||||||
|
int2store(p + FLAGS_OFFSET, flags);
|
||||||
|
|
||||||
|
p[EVENT_TYPE_OFFSET]= QUERY_EVENT;
|
||||||
|
int4store(q + Q_THREAD_ID_OFFSET, 0);
|
||||||
|
int4store(q + Q_EXEC_TIME_OFFSET, 0);
|
||||||
|
q[Q_DB_LEN_OFFSET]= 0;
|
||||||
|
int2store(q + Q_ERR_CODE_OFFSET, 0);
|
||||||
|
int2store(q + Q_STATUS_VARS_LEN_OFFSET, 0);
|
||||||
|
q[Q_DATA_OFFSET]= 0; /* Zero terminator for empty db */
|
||||||
|
q+= Q_DATA_OFFSET + 1;
|
||||||
|
memcpy(q, "BEGIN", 5);
|
||||||
|
|
||||||
|
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
|
||||||
|
{
|
||||||
|
ha_checksum crc= my_checksum(0L, p, data_len);
|
||||||
|
int4store(p + data_len, crc);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#ifdef MYSQL_CLIENT
|
#ifdef MYSQL_CLIENT
|
||||||
/**
|
/**
|
||||||
@ -4454,6 +4509,8 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
|
|||||||
post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
|
post_header_len[ANNOTATE_ROWS_EVENT-1]= ANNOTATE_ROWS_HEADER_LEN;
|
||||||
post_header_len[BINLOG_CHECKPOINT_EVENT-1]=
|
post_header_len[BINLOG_CHECKPOINT_EVENT-1]=
|
||||||
BINLOG_CHECKPOINT_HEADER_LEN;
|
BINLOG_CHECKPOINT_HEADER_LEN;
|
||||||
|
post_header_len[GTID_EVENT-1]= GTID_HEADER_LEN;
|
||||||
|
post_header_len[GTID_LIST_EVENT-1]= GTID_LIST_HEADER_LEN;
|
||||||
|
|
||||||
// Sanity-check that all post header lengths are initialized.
|
// Sanity-check that all post header lengths are initialized.
|
||||||
int i;
|
int i;
|
||||||
@ -5992,6 +6049,406 @@ bool Binlog_checkpoint_log_event::write(IO_CACHE *file)
|
|||||||
#endif /* MYSQL_CLIENT */
|
#endif /* MYSQL_CLIENT */
|
||||||
|
|
||||||
|
|
||||||
|
/**************************************************************************
|
||||||
|
Global transaction ID stuff
|
||||||
|
**************************************************************************/
|
||||||
|
|
||||||
|
/**
|
||||||
|
Current replication state (hash of last GTID executed, per replication
|
||||||
|
domain).
|
||||||
|
*/
|
||||||
|
rpl_state global_rpl_gtid_state;
|
||||||
|
|
||||||
|
|
||||||
|
rpl_state::rpl_state()
|
||||||
|
{
|
||||||
|
my_hash_init(&hash, &my_charset_bin, 32,
|
||||||
|
offsetof(rpl_gtid, domain_id), sizeof(uint32),
|
||||||
|
NULL, my_free, HASH_UNIQUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
rpl_state::~rpl_state()
|
||||||
|
{
|
||||||
|
my_hash_free(&hash);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef MYSQL_SERVER
|
||||||
|
/*
|
||||||
|
Update replication state with a new GTID.
|
||||||
|
|
||||||
|
If the replication domain id already exists, then the new GTID replaces the
|
||||||
|
old one for that domain id. Else a new entry is inserted.
|
||||||
|
|
||||||
|
Returns 0 for ok, 1 for error.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
rpl_state::update(const struct rpl_gtid *gtid)
|
||||||
|
{
|
||||||
|
uchar *rec;
|
||||||
|
|
||||||
|
rec= my_hash_search(&hash, (const uchar *)gtid, 0);
|
||||||
|
if (rec)
|
||||||
|
{
|
||||||
|
const rpl_gtid *old_gtid= (const rpl_gtid *)rec;
|
||||||
|
if (old_gtid->server_id == gtid->server_id &&
|
||||||
|
old_gtid->seq_no > gtid->seq_no)
|
||||||
|
sql_print_warning("Out-of-order GTIDs detected for server_id=%u. "
|
||||||
|
"Please ensure that independent replication streams "
|
||||||
|
"use different replication domain_id to avoid "
|
||||||
|
"inconsistencies.", gtid->server_id);
|
||||||
|
else
|
||||||
|
memcpy(rec, gtid, sizeof(*gtid));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
|
||||||
|
return 1;
|
||||||
|
memcpy(rec, gtid, sizeof(*gtid));
|
||||||
|
return my_hash_insert(&hash, rec);
|
||||||
|
}
|
||||||
|
#endif /* MYSQL_SERVER */
|
||||||
|
|
||||||
|
|
||||||
|
Gtid_log_event::Gtid_log_event(const char *buf, uint event_len,
|
||||||
|
const Format_description_log_event *description_event)
|
||||||
|
: Log_event(buf, description_event), seq_no(0)
|
||||||
|
{
|
||||||
|
uint8 header_size= description_event->common_header_len;
|
||||||
|
uint8 post_header_len= description_event->post_header_len[GTID_EVENT-1];
|
||||||
|
if (event_len < header_size + post_header_len ||
|
||||||
|
post_header_len < GTID_HEADER_LEN)
|
||||||
|
return;
|
||||||
|
|
||||||
|
buf+= header_size;
|
||||||
|
seq_no= uint8korr(buf);
|
||||||
|
buf+= 8;
|
||||||
|
domain_id= uint4korr(buf);
|
||||||
|
buf+= 4;
|
||||||
|
flags2= *buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef MYSQL_SERVER
|
||||||
|
|
||||||
|
Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
|
||||||
|
uint32 domain_id_arg, bool standalone,
|
||||||
|
uint16 flags_arg, bool is_transactional)
|
||||||
|
: Log_event(thd_arg, flags_arg, is_transactional),
|
||||||
|
seq_no(seq_no_arg), domain_id(domain_id_arg),
|
||||||
|
flags2(standalone ? FL_STANDALONE : 0)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
Gtid_log_event::write(IO_CACHE *file)
|
||||||
|
{
|
||||||
|
uchar buf[GTID_HEADER_LEN];
|
||||||
|
int8store(buf, seq_no);
|
||||||
|
int4store(buf+8, domain_id);
|
||||||
|
buf[12]= flags2;
|
||||||
|
bzero(buf+13, GTID_HEADER_LEN-13);
|
||||||
|
return write_header(file, GTID_HEADER_LEN) ||
|
||||||
|
wrapper_my_b_safe_write(file, buf, GTID_HEADER_LEN) ||
|
||||||
|
write_footer(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Replace a GTID event with either a BEGIN event, dummy event, or nothing, as
|
||||||
|
appropriate to work with old slave that does not know global transaction id.
|
||||||
|
|
||||||
|
The need_dummy_event argument is an IN/OUT argument. It is passed as TRUE
|
||||||
|
if slave has capability lower than MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES.
|
||||||
|
It is returned TRUE if we return a BEGIN (or dummy) event to be sent to the
|
||||||
|
slave, FALSE if event should be skipped completely.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
|
||||||
|
ulong ev_offset, uint8 checksum_alg)
|
||||||
|
{
|
||||||
|
uchar flags2;
|
||||||
|
if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)
|
||||||
|
return 1;
|
||||||
|
flags2= (*packet)[ev_offset + LOG_EVENT_HEADER_LEN + 12];
|
||||||
|
if (flags2 & FL_STANDALONE)
|
||||||
|
{
|
||||||
|
if (need_dummy_event)
|
||||||
|
return Query_log_event::dummy_event(packet, ev_offset, checksum_alg);
|
||||||
|
else
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
*need_dummy_event= true;
|
||||||
|
return Query_log_event::begin_event(packet, ev_offset, checksum_alg);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef HAVE_REPLICATION
|
||||||
|
void
|
||||||
|
Gtid_log_event::pack_info(THD *thd, Protocol *protocol)
|
||||||
|
{
|
||||||
|
char buf[6+5+10+1+10+1+20+1];
|
||||||
|
char *p;
|
||||||
|
p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " : "BEGIN GTID "));
|
||||||
|
if (domain_id)
|
||||||
|
{
|
||||||
|
p= longlong10_to_str(domain_id, p, 10);
|
||||||
|
*p++= '-';
|
||||||
|
}
|
||||||
|
p= longlong10_to_str(server_id, p, 10);
|
||||||
|
*p++= '-';
|
||||||
|
p= longlong10_to_str(seq_no, p, 10);
|
||||||
|
|
||||||
|
protocol->store(buf, p-buf, &my_charset_bin);
|
||||||
|
}
|
||||||
|
|
||||||
|
static char gtid_begin_string[5] = {'B','E','G','I','N'};
|
||||||
|
|
||||||
|
int
|
||||||
|
Gtid_log_event::do_apply_event(Relay_log_info const *rli)
|
||||||
|
{
|
||||||
|
const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
|
||||||
|
|
||||||
|
/* ToDo: record the new GTID. */
|
||||||
|
|
||||||
|
if (flags2 & FL_STANDALONE)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
/* Execute this like a BEGIN query event. */
|
||||||
|
thd->set_query_and_id(gtid_begin_string, sizeof(gtid_begin_string),
|
||||||
|
&my_charset_bin, next_query_id());
|
||||||
|
Parser_state parser_state;
|
||||||
|
if (!parser_state.init(thd, thd->query(), thd->query_length()))
|
||||||
|
{
|
||||||
|
mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
|
||||||
|
/* Finalize server status flags after executing a statement. */
|
||||||
|
thd->update_server_status();
|
||||||
|
log_slow_statement(thd);
|
||||||
|
general_log_write(thd, COM_QUERY, thd->query(), thd->query_length());
|
||||||
|
}
|
||||||
|
|
||||||
|
thd->reset_query();
|
||||||
|
free_root(thd->mem_root,MYF(MY_KEEP_PREALLOC));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int
|
||||||
|
Gtid_log_event::do_update_pos(Relay_log_info *rli)
|
||||||
|
{
|
||||||
|
rli->inc_event_relay_log_pos();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Log_event::enum_skip_reason
|
||||||
|
Gtid_log_event::do_shall_skip(Relay_log_info *rli)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
An event skipped due to @@skip_replication must not be counted towards the
|
||||||
|
number of events to be skipped due to @@sql_slave_skip_counter.
|
||||||
|
*/
|
||||||
|
if (flags & LOG_EVENT_SKIP_REPLICATION_F &&
|
||||||
|
opt_replicate_events_marked_for_skip != RPL_SKIP_REPLICATE)
|
||||||
|
return Log_event::EVENT_SKIP_IGNORE;
|
||||||
|
|
||||||
|
if (rli->slave_skip_counter > 0)
|
||||||
|
{
|
||||||
|
if (!(flags2 & FL_STANDALONE))
|
||||||
|
thd->variables.option_bits|= OPTION_BEGIN;
|
||||||
|
return Log_event::continue_group(rli);
|
||||||
|
}
|
||||||
|
return Log_event::do_shall_skip(rli);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#endif /* HAVE_REPLICATION */
|
||||||
|
|
||||||
|
#else /* !MYSQL_SERVER */
|
||||||
|
|
||||||
|
void
|
||||||
|
Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
|
||||||
|
{
|
||||||
|
Write_on_release_cache cache(&print_event_info->head_cache, file,
|
||||||
|
Write_on_release_cache::FLUSH_F);
|
||||||
|
char buf[21];
|
||||||
|
|
||||||
|
print_header(&cache, print_event_info, FALSE);
|
||||||
|
longlong10_to_str(seq_no, buf, 10);
|
||||||
|
if (!print_event_info->short_form)
|
||||||
|
{
|
||||||
|
my_b_printf(&cache, "\tGTID ");
|
||||||
|
if (domain_id)
|
||||||
|
my_b_printf(&cache, "%u-", domain_id);
|
||||||
|
my_b_printf(&cache, "%u-%s", server_id, buf);
|
||||||
|
}
|
||||||
|
my_b_printf(&cache, "\n");
|
||||||
|
|
||||||
|
if (!print_event_info->domain_id_printed ||
|
||||||
|
print_event_info->domain_id != domain_id)
|
||||||
|
{
|
||||||
|
my_b_printf(&cache, "/*!100001 SET @@session.gtid_domain_id=%u*/%s\n",
|
||||||
|
domain_id, print_event_info->delimiter);
|
||||||
|
print_event_info->domain_id= domain_id;
|
||||||
|
print_event_info->domain_id_printed= true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!print_event_info->server_id_printed ||
|
||||||
|
print_event_info->server_id != server_id)
|
||||||
|
{
|
||||||
|
my_b_printf(&cache, "/*!100001 SET @@session.server_id=%u*/%s\n",
|
||||||
|
server_id, print_event_info->delimiter);
|
||||||
|
print_event_info->server_id= server_id;
|
||||||
|
print_event_info->server_id_printed= true;
|
||||||
|
}
|
||||||
|
|
||||||
|
my_b_printf(&cache, "/*!100001 SET @@session.gtid_seq_no=%s*/%s\n",
|
||||||
|
buf, print_event_info->delimiter);
|
||||||
|
if (!(flags2 & FL_STANDALONE))
|
||||||
|
my_b_printf(&cache, "BEGIN%s\n", print_event_info->delimiter);
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* MYSQL_SERVER */
|
||||||
|
|
||||||
|
|
||||||
|
/* GTID list. */
|
||||||
|
|
||||||
|
Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
|
||||||
|
const Format_description_log_event *description_event)
|
||||||
|
: Log_event(buf, description_event), count(0), list(0)
|
||||||
|
{
|
||||||
|
uint32 i;
|
||||||
|
uint8 header_size= description_event->common_header_len;
|
||||||
|
uint8 post_header_len= description_event->post_header_len[GTID_LIST_EVENT-1];
|
||||||
|
if (event_len < header_size + post_header_len ||
|
||||||
|
post_header_len < GTID_LIST_HEADER_LEN)
|
||||||
|
return;
|
||||||
|
|
||||||
|
buf+= header_size;
|
||||||
|
count= uint4korr(buf) & ((1<<28)-1);
|
||||||
|
buf+= 4;
|
||||||
|
if (count == 0 ||
|
||||||
|
event_len - (header_size + post_header_len) < count*element_size ||
|
||||||
|
(!(list= (rpl_gtid *)my_malloc(count*sizeof(*list), MYF(MY_WME)))))
|
||||||
|
return;
|
||||||
|
|
||||||
|
for (i= 0; i < count; ++i)
|
||||||
|
{
|
||||||
|
list[i].domain_id= uint4korr(buf);
|
||||||
|
buf+= 4;
|
||||||
|
list[i].server_id= uint4korr(buf);
|
||||||
|
buf+= 4;
|
||||||
|
list[i].seq_no= uint8korr(buf);
|
||||||
|
buf+= 8;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef MYSQL_SERVER
|
||||||
|
|
||||||
|
Gtid_list_log_event::Gtid_list_log_event(rpl_state *gtid_set)
|
||||||
|
: count(gtid_set->count()), list(0)
|
||||||
|
{
|
||||||
|
DBUG_ASSERT(count != 0);
|
||||||
|
|
||||||
|
/* Failure to allocate memory will be caught by is_valid() returning false. */
|
||||||
|
if (count != 0 && count < (1<<28) &&
|
||||||
|
(list = (rpl_gtid *)my_malloc(count * sizeof(*list), MYF(MY_WME))))
|
||||||
|
{
|
||||||
|
uint32 i;
|
||||||
|
|
||||||
|
for (i= 0; i < count; ++i)
|
||||||
|
list[i]= *(rpl_gtid *)my_hash_element(>id_set->hash, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
Gtid_list_log_event::write(IO_CACHE *file)
|
||||||
|
{
|
||||||
|
uint32 i;
|
||||||
|
uchar buf[element_size];
|
||||||
|
|
||||||
|
DBUG_ASSERT(count < 1<<28);
|
||||||
|
|
||||||
|
if (write_header(file, get_data_size()))
|
||||||
|
return 1;
|
||||||
|
int4store(buf, count & ((1<<28)-1));
|
||||||
|
if (wrapper_my_b_safe_write(file, buf, GTID_LIST_HEADER_LEN))
|
||||||
|
return 1;
|
||||||
|
for (i= 0; i < count; ++i)
|
||||||
|
{
|
||||||
|
int4store(buf, list[i].domain_id);
|
||||||
|
int4store(buf+4, list[i].server_id);
|
||||||
|
int8store(buf+8, list[i].seq_no);
|
||||||
|
if (wrapper_my_b_safe_write(file, buf, element_size))
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return write_footer(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef HAVE_REPLICATION
|
||||||
|
void
|
||||||
|
Gtid_list_log_event::pack_info(THD *thd, Protocol *protocol)
|
||||||
|
{
|
||||||
|
char buf_mem[1024];
|
||||||
|
String buf(buf_mem, sizeof(buf_mem), system_charset_info);
|
||||||
|
uint32 i;
|
||||||
|
|
||||||
|
buf.length(0);
|
||||||
|
for (i= 0; i < count; ++i)
|
||||||
|
{
|
||||||
|
if (i)
|
||||||
|
buf.append(STRING_WITH_LEN(", "));
|
||||||
|
else
|
||||||
|
buf.append(STRING_WITH_LEN("["));
|
||||||
|
if (list[i].domain_id)
|
||||||
|
{
|
||||||
|
buf.append_ulonglong((ulonglong)list[i].domain_id);
|
||||||
|
buf.append(STRING_WITH_LEN("-"));
|
||||||
|
}
|
||||||
|
buf.append_ulonglong((ulonglong)list[i].server_id);
|
||||||
|
buf.append(STRING_WITH_LEN("-"));
|
||||||
|
buf.append_ulonglong(list[i].seq_no);
|
||||||
|
}
|
||||||
|
buf.append(STRING_WITH_LEN("]"));
|
||||||
|
|
||||||
|
protocol->store(&buf);
|
||||||
|
}
|
||||||
|
#endif /* HAVE_REPLICATION */
|
||||||
|
|
||||||
|
#else /* !MYSQL_SERVER */
|
||||||
|
|
||||||
|
void
|
||||||
|
Gtid_list_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
|
||||||
|
{
|
||||||
|
if (!print_event_info->short_form)
|
||||||
|
{
|
||||||
|
Write_on_release_cache cache(&print_event_info->head_cache, file,
|
||||||
|
Write_on_release_cache::FLUSH_F);
|
||||||
|
char buf[21];
|
||||||
|
uint32 i;
|
||||||
|
|
||||||
|
print_header(&cache, print_event_info, FALSE);
|
||||||
|
for (i= 0; i < count; ++i)
|
||||||
|
{
|
||||||
|
if (list[i].domain_id)
|
||||||
|
my_b_printf(&cache, "%u-", list[i].domain_id);
|
||||||
|
longlong10_to_str(list[i].seq_no, buf, 10);
|
||||||
|
my_b_printf(&cache, "%u-%s", list[i].server_id, buf);
|
||||||
|
if (i < count-1)
|
||||||
|
my_b_printf(&cache, "\n# ");
|
||||||
|
else
|
||||||
|
my_b_printf(&cache, "\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* MYSQL_SERVER */
|
||||||
|
|
||||||
|
|
||||||
/**************************************************************************
|
/**************************************************************************
|
||||||
Intvar_log_event methods
|
Intvar_log_event methods
|
||||||
**************************************************************************/
|
**************************************************************************/
|
||||||
@ -11236,7 +11693,9 @@ st_print_event_info::st_print_event_info()
|
|||||||
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
|
auto_increment_increment(0),auto_increment_offset(0), charset_inited(0),
|
||||||
lc_time_names_number(~0),
|
lc_time_names_number(~0),
|
||||||
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
|
charset_database_number(ILLEGAL_CHARSET_INFO_NUMBER),
|
||||||
thread_id(0), thread_id_printed(false), skip_replication(0),
|
thread_id(0), thread_id_printed(false), server_id(0),
|
||||||
|
server_id_printed(false), domain_id(0), domain_id_printed(false),
|
||||||
|
skip_replication(0),
|
||||||
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
|
base64_output_mode(BASE64_OUTPUT_UNSPEC), printed_fd_event(FALSE)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
|
235
sql/log_event.h
235
sql/log_event.h
@ -260,6 +260,8 @@ struct sql_ex_info
|
|||||||
#define HEARTBEAT_HEADER_LEN 0
|
#define HEARTBEAT_HEADER_LEN 0
|
||||||
#define ANNOTATE_ROWS_HEADER_LEN 0
|
#define ANNOTATE_ROWS_HEADER_LEN 0
|
||||||
#define BINLOG_CHECKPOINT_HEADER_LEN 4
|
#define BINLOG_CHECKPOINT_HEADER_LEN 4
|
||||||
|
#define GTID_HEADER_LEN 19
|
||||||
|
#define GTID_LIST_HEADER_LEN 4
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Max number of possible extra bytes in a replication event compared to a
|
Max number of possible extra bytes in a replication event compared to a
|
||||||
@ -599,16 +601,13 @@ enum enum_binlog_checksum_alg {
|
|||||||
because they mis-compute the offsets into the master's binlog).
|
because they mis-compute the offsets into the master's binlog).
|
||||||
*/
|
*/
|
||||||
#define MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES 2
|
#define MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES 2
|
||||||
/* MariaDB > 5.5, which knows about binlog_checkpoint_log_event. */
|
/* MariaDB >= 10.0, which knows about binlog_checkpoint_log_event. */
|
||||||
#define MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT 3
|
#define MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT 3
|
||||||
/*
|
/* MariaDB >= 10.0.1, which knows about global transaction id events. */
|
||||||
MariaDB server which understands MySQL 5.6 ignorable events. This server
|
#define MARIA_SLAVE_CAPABILITY_GTID 4
|
||||||
can tolerate receiving any event with the LOG_EVENT_IGNORABLE_F flag set.
|
|
||||||
*/
|
|
||||||
#define MARIA_SLAVE_CAPABILITY_IGNORABLE 4
|
|
||||||
|
|
||||||
/* Our capability. */
|
/* Our capability. */
|
||||||
#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT
|
#define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_GTID
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -694,6 +693,18 @@ enum Log_event_type
|
|||||||
that are prepared in storage engines but not yet committed.
|
that are prepared in storage engines but not yet committed.
|
||||||
*/
|
*/
|
||||||
BINLOG_CHECKPOINT_EVENT= 161,
|
BINLOG_CHECKPOINT_EVENT= 161,
|
||||||
|
/*
|
||||||
|
Gtid event. For global transaction ID, used to start a new event group,
|
||||||
|
instead of the old BEGIN query event, and also to mark stand-alone
|
||||||
|
events.
|
||||||
|
*/
|
||||||
|
GTID_EVENT= 162,
|
||||||
|
/*
|
||||||
|
Gtid list event. Logged at the start of every binlog, to record the
|
||||||
|
current replication state. This consists of the last GTID seen for
|
||||||
|
each replication domain.
|
||||||
|
*/
|
||||||
|
GTID_LIST_EVENT= 163,
|
||||||
|
|
||||||
/* Add new MariaDB events here - right above this comment! */
|
/* Add new MariaDB events here - right above this comment! */
|
||||||
|
|
||||||
@ -766,6 +777,11 @@ typedef struct st_print_event_info
|
|||||||
uint charset_database_number;
|
uint charset_database_number;
|
||||||
uint thread_id;
|
uint thread_id;
|
||||||
bool thread_id_printed;
|
bool thread_id_printed;
|
||||||
|
uint32 server_id;
|
||||||
|
bool server_id_printed;
|
||||||
|
uint32 domain_id;
|
||||||
|
bool domain_id_printed;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Track when @@skip_replication changes so we need to output a SET
|
Track when @@skip_replication changes so we need to output a SET
|
||||||
statement for it.
|
statement for it.
|
||||||
@ -1874,6 +1890,7 @@ public:
|
|||||||
}
|
}
|
||||||
Log_event_type get_type_code() { return QUERY_EVENT; }
|
Log_event_type get_type_code() { return QUERY_EVENT; }
|
||||||
static int dummy_event(String *packet, ulong ev_offset, uint8 checksum_alg);
|
static int dummy_event(String *packet, ulong ev_offset, uint8 checksum_alg);
|
||||||
|
static int begin_event(String *packet, ulong ev_offset, uint8 checksum_alg);
|
||||||
#ifdef MYSQL_SERVER
|
#ifdef MYSQL_SERVER
|
||||||
bool write(IO_CACHE* file);
|
bool write(IO_CACHE* file);
|
||||||
virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; }
|
virtual bool write_post_header_for_derived(IO_CACHE* file) { return FALSE; }
|
||||||
@ -2927,6 +2944,210 @@ public:
|
|||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
struct rpl_gtid
|
||||||
|
{
|
||||||
|
uint32 domain_id;
|
||||||
|
uint32 server_id;
|
||||||
|
uint64 seq_no;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
struct rpl_state
|
||||||
|
{
|
||||||
|
HASH hash;
|
||||||
|
|
||||||
|
rpl_state();
|
||||||
|
~rpl_state();
|
||||||
|
|
||||||
|
ulong count() const { return hash.records; }
|
||||||
|
int update(const struct rpl_gtid *gtid);
|
||||||
|
};
|
||||||
|
|
||||||
|
extern rpl_state global_rpl_gtid_state;
|
||||||
|
|
||||||
|
/**
|
||||||
|
@class Gtid_log_event
|
||||||
|
|
||||||
|
This event is logged as part of every event group to give the global
|
||||||
|
transaction id (GTID) of that group.
|
||||||
|
|
||||||
|
It replaces the BEGIN query event used in earlier versions to begin most
|
||||||
|
event groups, but is also used for events that used to be stand-alone.
|
||||||
|
|
||||||
|
@section Gtid_log_event_binary_format Binary Format
|
||||||
|
|
||||||
|
The binary format for Gtid_log_event has 6 extra reserved bytes to make the
|
||||||
|
length a total of 19 byte (+ 19 bytes of header in common with all events).
|
||||||
|
This is just the minimal size for a BEGIN query event, which makes it easy
|
||||||
|
to replace this event with such BEGIN event to remain compatible with old
|
||||||
|
slave servers.
|
||||||
|
|
||||||
|
<table>
|
||||||
|
<caption>Post-Header</caption>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<th>Name</th>
|
||||||
|
<th>Format</th>
|
||||||
|
<th>Description</th>
|
||||||
|
</tr>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<td>seq_no</td>
|
||||||
|
<td>8 byte unsigned integer</td>
|
||||||
|
<td>increasing id within one server_id. Starts at 1, holes in the sequence
|
||||||
|
may occur</td>
|
||||||
|
</tr>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<td>domain_id</td>
|
||||||
|
<td>4 byte unsigned integer</td>
|
||||||
|
<td>Replication domain id, identifying independent replication streams></td>
|
||||||
|
</tr>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<td>flags</td>
|
||||||
|
<td>1 byte bitfield</td>
|
||||||
|
<td>Bit 0 set indicates stand-alone event (no terminating COMMIT)</td>
|
||||||
|
</tr>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<td>Reserved</td>
|
||||||
|
<td>6 bytes</td>
|
||||||
|
<td>Reserved bytes, set to 0. Maybe be used for future expansion.</td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
|
||||||
|
The Body of Gtid_log_event is empty. The total event size is 19 bytes +
|
||||||
|
the normal 19 bytes common-header.
|
||||||
|
*/
|
||||||
|
|
||||||
|
class Gtid_log_event: public Log_event
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
uint64 seq_no;
|
||||||
|
uint32 domain_id;
|
||||||
|
uchar flags2;
|
||||||
|
|
||||||
|
/* Flags2. */
|
||||||
|
|
||||||
|
/* FL_STANDALONE is set when there is no terminating COMMIT event. */
|
||||||
|
static const uchar FL_STANDALONE= 1;
|
||||||
|
|
||||||
|
#ifdef MYSQL_SERVER
|
||||||
|
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
|
||||||
|
uint16 flags, bool is_transactional);
|
||||||
|
#ifdef HAVE_REPLICATION
|
||||||
|
void pack_info(THD *thd, Protocol *protocol);
|
||||||
|
virtual int do_apply_event(Relay_log_info const *rli);
|
||||||
|
virtual int do_update_pos(Relay_log_info *rli);
|
||||||
|
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
|
||||||
|
#endif
|
||||||
|
#else
|
||||||
|
void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
|
||||||
|
#endif
|
||||||
|
Gtid_log_event(const char *buf, uint event_len,
|
||||||
|
const Format_description_log_event *description_event);
|
||||||
|
~Gtid_log_event() { }
|
||||||
|
Log_event_type get_type_code() { return GTID_EVENT; }
|
||||||
|
int get_data_size() { return GTID_HEADER_LEN; }
|
||||||
|
bool is_valid() const { return seq_no != 0; }
|
||||||
|
#ifdef MYSQL_SERVER
|
||||||
|
bool write(IO_CACHE *file);
|
||||||
|
static int make_compatible_event(String *packet, bool *need_dummy_event,
|
||||||
|
ulong ev_offset, uint8 checksum_alg);
|
||||||
|
#endif
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
@class Gtid_list_log_event
|
||||||
|
|
||||||
|
This event is logged at the start of every binlog file to record the
|
||||||
|
current replication state: the last global transaction id (GTID) applied
|
||||||
|
on the server within each replication domain.
|
||||||
|
|
||||||
|
It consists of a list of GTIDs, one for each replication domain ever seen
|
||||||
|
on the server.
|
||||||
|
|
||||||
|
@section Gtid_list_log_event_binary_format Binary Format
|
||||||
|
|
||||||
|
<table>
|
||||||
|
<caption>Post-Header</caption>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<th>Name</th>
|
||||||
|
<th>Format</th>
|
||||||
|
<th>Description</th>
|
||||||
|
</tr>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<td>count</td>
|
||||||
|
<td>4 byte unsigned integer</td>
|
||||||
|
<td>The lower 28 bits are the number of GTIDs. The upper 4 bits are
|
||||||
|
reserved for flags bits for future expansion</td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
|
||||||
|
<table>
|
||||||
|
<caption>Body</caption>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<th>Name</th>
|
||||||
|
<th>Format</th>
|
||||||
|
<th>Description</th>
|
||||||
|
</tr>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<td>domain_id</td>
|
||||||
|
<td>4 byte unsigned integer</td>
|
||||||
|
<td>Replication domain id of one GTID</td>
|
||||||
|
</tr>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<td>server_id</td>
|
||||||
|
<td>4 byte unsigned integer</td>
|
||||||
|
<td>Server id of one GTID</td>
|
||||||
|
</tr>
|
||||||
|
|
||||||
|
<tr>
|
||||||
|
<td>seq_no</td>
|
||||||
|
<td>8 byte unsigned integer</td>
|
||||||
|
<td>sequence number of one GTID</td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
|
||||||
|
The three elements in the body repeat COUNT times to form the GTID list.
|
||||||
|
*/
|
||||||
|
|
||||||
|
class Gtid_list_log_event: public Log_event
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
uint32 count;
|
||||||
|
struct rpl_gtid *list;
|
||||||
|
|
||||||
|
static const uint element_size= 4+4+8;
|
||||||
|
|
||||||
|
#ifdef MYSQL_SERVER
|
||||||
|
Gtid_list_log_event(rpl_state *gtid_set);
|
||||||
|
#ifdef HAVE_REPLICATION
|
||||||
|
void pack_info(THD *thd, Protocol *protocol);
|
||||||
|
#endif
|
||||||
|
#else
|
||||||
|
void print(FILE *file, PRINT_EVENT_INFO *print_event_info);
|
||||||
|
#endif
|
||||||
|
Gtid_list_log_event(const char *buf, uint event_len,
|
||||||
|
const Format_description_log_event *description_event);
|
||||||
|
~Gtid_list_log_event() { my_free(list); }
|
||||||
|
Log_event_type get_type_code() { return GTID_LIST_EVENT; }
|
||||||
|
int get_data_size() { return GTID_LIST_HEADER_LEN + count*element_size; }
|
||||||
|
bool is_valid() const { return list != NULL; }
|
||||||
|
#ifdef MYSQL_SERVER
|
||||||
|
bool write(IO_CACHE *file);
|
||||||
|
#endif
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/* the classes below are for the new LOAD DATA INFILE logging */
|
/* the classes below are for the new LOAD DATA INFILE logging */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -675,6 +675,8 @@ mysql_mutex_t
|
|||||||
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
|
mysql_mutex_t LOCK_stats, LOCK_global_user_client_stats,
|
||||||
LOCK_global_table_stats, LOCK_global_index_stats;
|
LOCK_global_table_stats, LOCK_global_index_stats;
|
||||||
|
|
||||||
|
mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
The below lock protects access to two global server variables:
|
The below lock protects access to two global server variables:
|
||||||
max_prepared_stmt_count and prepared_stmt_count. These variables
|
max_prepared_stmt_count and prepared_stmt_count. These variables
|
||||||
@ -770,6 +772,8 @@ PSI_mutex_key key_LOCK_stats,
|
|||||||
key_LOCK_global_index_stats,
|
key_LOCK_global_index_stats,
|
||||||
key_LOCK_wakeup_ready;
|
key_LOCK_wakeup_ready;
|
||||||
|
|
||||||
|
PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state;
|
||||||
|
|
||||||
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
|
PSI_mutex_key key_LOCK_prepare_ordered, key_LOCK_commit_ordered;
|
||||||
|
|
||||||
static PSI_mutex_info all_server_mutexes[]=
|
static PSI_mutex_info all_server_mutexes[]=
|
||||||
@ -813,6 +817,8 @@ static PSI_mutex_info all_server_mutexes[]=
|
|||||||
{ &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL},
|
{ &key_LOCK_global_table_stats, "LOCK_global_table_stats", PSI_FLAG_GLOBAL},
|
||||||
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
|
{ &key_LOCK_global_index_stats, "LOCK_global_index_stats", PSI_FLAG_GLOBAL},
|
||||||
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
|
{ &key_LOCK_wakeup_ready, "THD::LOCK_wakeup_ready", 0},
|
||||||
|
{ &key_LOCK_gtid_counter, "LOCK_gtid_counter", PSI_FLAG_GLOBAL},
|
||||||
|
{ &key_LOCK_rpl_gtid_state, "LOCK_rpl_gtid_state", PSI_FLAG_GLOBAL},
|
||||||
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
|
{ &key_LOCK_thd_data, "THD::LOCK_thd_data", 0},
|
||||||
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
|
{ &key_LOCK_user_conn, "LOCK_user_conn", PSI_FLAG_GLOBAL},
|
||||||
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
|
{ &key_LOCK_uuid_short_generator, "LOCK_uuid_short_generator", PSI_FLAG_GLOBAL},
|
||||||
@ -1279,6 +1285,12 @@ struct st_VioSSLFd *ssl_acceptor_fd;
|
|||||||
*/
|
*/
|
||||||
uint connection_count= 0, extra_connection_count= 0;
|
uint connection_count= 0, extra_connection_count= 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
Running counter for generating new GTIDs locally.
|
||||||
|
*/
|
||||||
|
uint64 global_gtid_counter= 0;
|
||||||
|
|
||||||
|
|
||||||
/* Function declarations */
|
/* Function declarations */
|
||||||
|
|
||||||
pthread_handler_t signal_hand(void *arg);
|
pthread_handler_t signal_hand(void *arg);
|
||||||
@ -1940,6 +1952,8 @@ static void clean_up_mutexes()
|
|||||||
mysql_mutex_destroy(&LOCK_global_user_client_stats);
|
mysql_mutex_destroy(&LOCK_global_user_client_stats);
|
||||||
mysql_mutex_destroy(&LOCK_global_table_stats);
|
mysql_mutex_destroy(&LOCK_global_table_stats);
|
||||||
mysql_mutex_destroy(&LOCK_global_index_stats);
|
mysql_mutex_destroy(&LOCK_global_index_stats);
|
||||||
|
mysql_mutex_destroy(&LOCK_gtid_counter);
|
||||||
|
mysql_mutex_destroy(&LOCK_rpl_gtid_state);
|
||||||
#ifdef HAVE_OPENSSL
|
#ifdef HAVE_OPENSSL
|
||||||
mysql_mutex_destroy(&LOCK_des_key_file);
|
mysql_mutex_destroy(&LOCK_des_key_file);
|
||||||
#ifndef HAVE_YASSL
|
#ifndef HAVE_YASSL
|
||||||
@ -4002,6 +4016,10 @@ static int init_thread_environment()
|
|||||||
&LOCK_global_table_stats, MY_MUTEX_INIT_FAST);
|
&LOCK_global_table_stats, MY_MUTEX_INIT_FAST);
|
||||||
mysql_mutex_init(key_LOCK_global_index_stats,
|
mysql_mutex_init(key_LOCK_global_index_stats,
|
||||||
&LOCK_global_index_stats, MY_MUTEX_INIT_FAST);
|
&LOCK_global_index_stats, MY_MUTEX_INIT_FAST);
|
||||||
|
mysql_mutex_init(key_LOCK_gtid_counter,
|
||||||
|
&LOCK_gtid_counter, MY_MUTEX_INIT_FAST);
|
||||||
|
mysql_mutex_init(key_LOCK_rpl_gtid_state,
|
||||||
|
&LOCK_rpl_gtid_state, MY_MUTEX_INIT_SLOW);
|
||||||
mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered,
|
mysql_mutex_init(key_LOCK_prepare_ordered, &LOCK_prepare_ordered,
|
||||||
MY_MUTEX_INIT_SLOW);
|
MY_MUTEX_INIT_SLOW);
|
||||||
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
|
mysql_mutex_init(key_LOCK_commit_ordered, &LOCK_commit_ordered,
|
||||||
|
@ -252,6 +252,8 @@ extern PSI_mutex_key key_LOCK_stats,
|
|||||||
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
|
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
|
||||||
key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
|
key_LOCK_global_index_stats, key_LOCK_wakeup_ready;
|
||||||
|
|
||||||
|
extern PSI_mutex_key key_LOCK_gtid_counter, key_LOCK_rpl_gtid_state;
|
||||||
|
|
||||||
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
|
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
|
||||||
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
|
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
|
||||||
key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock;
|
key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock;
|
||||||
@ -341,6 +343,7 @@ extern mysql_mutex_t
|
|||||||
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
|
LOCK_slave_list, LOCK_active_mi, LOCK_manager,
|
||||||
LOCK_global_system_variables, LOCK_user_conn,
|
LOCK_global_system_variables, LOCK_user_conn,
|
||||||
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
|
LOCK_prepared_stmt_count, LOCK_error_messages, LOCK_connection_count;
|
||||||
|
extern mysql_mutex_t LOCK_gtid_counter, LOCK_rpl_gtid_state;
|
||||||
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
|
extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_thread_count;
|
||||||
#ifdef HAVE_OPENSSL
|
#ifdef HAVE_OPENSSL
|
||||||
extern mysql_mutex_t LOCK_des_key_file;
|
extern mysql_mutex_t LOCK_des_key_file;
|
||||||
@ -546,6 +549,7 @@ inline int set_current_thd(THD *thd)
|
|||||||
extern handlerton *maria_hton;
|
extern handlerton *maria_hton;
|
||||||
|
|
||||||
extern uint extra_connection_count;
|
extern uint extra_connection_count;
|
||||||
|
extern uint64 global_gtid_counter;
|
||||||
extern my_bool opt_userstat_running, debug_assert_if_crashed_table;
|
extern my_bool opt_userstat_running, debug_assert_if_crashed_table;
|
||||||
extern uint mysqld_extra_port;
|
extern uint mysqld_extra_port;
|
||||||
extern ulong opt_progress_report_time;
|
extern ulong opt_progress_report_time;
|
||||||
|
@ -639,7 +639,7 @@ bool check_master_connection_name(LEX_STRING *name)
|
|||||||
file names without a prefix.
|
file names without a prefix.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void create_logfile_name_with_suffix(char *res_file_name, uint length,
|
void create_logfile_name_with_suffix(char *res_file_name, size_t length,
|
||||||
const char *info_file, bool append,
|
const char *info_file, bool append,
|
||||||
LEX_STRING *suffix)
|
LEX_STRING *suffix)
|
||||||
{
|
{
|
||||||
|
@ -170,7 +170,7 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
bool check_master_connection_name(LEX_STRING *name);
|
bool check_master_connection_name(LEX_STRING *name);
|
||||||
void create_logfile_name_with_suffix(char *res_file_name, uint length,
|
void create_logfile_name_with_suffix(char *res_file_name, size_t length,
|
||||||
const char *info_file,
|
const char *info_file,
|
||||||
bool append,
|
bool append,
|
||||||
LEX_STRING *suffix);
|
LEX_STRING *suffix);
|
||||||
|
@ -534,6 +534,12 @@ typedef struct system_variables
|
|||||||
thread the query is being run to replicate temp tables properly
|
thread the query is being run to replicate temp tables properly
|
||||||
*/
|
*/
|
||||||
my_thread_id pseudo_thread_id;
|
my_thread_id pseudo_thread_id;
|
||||||
|
/**
|
||||||
|
When replicating an event group with GTID, keep these values around so
|
||||||
|
slave binlog can receive the same GTID as the original.
|
||||||
|
*/
|
||||||
|
uint32 gtid_domain_id;
|
||||||
|
uint64 gtid_seq_no;
|
||||||
/**
|
/**
|
||||||
Place holders to store Multi-source variables in sys_var.cc during
|
Place holders to store Multi-source variables in sys_var.cc during
|
||||||
update and show of variables.
|
update and show of variables.
|
||||||
|
@ -616,10 +616,34 @@ send_event_to_slave(THD *thd, NET *net, String* const packet, ushort flags,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Do not send binlog checkpoint events to a slave that does not understand it.
|
Replace GTID events with old-style BEGIN events for slaves that do not
|
||||||
|
understand global transaction IDs. For stand-alone events, where there is
|
||||||
|
no terminating COMMIT query event, omit the GTID event or replace it with
|
||||||
|
a dummy event, as appropriate.
|
||||||
*/
|
*/
|
||||||
if (unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
|
if (event_type == GTID_EVENT &&
|
||||||
mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT)
|
mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID)
|
||||||
|
{
|
||||||
|
bool need_dummy=
|
||||||
|
mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES;
|
||||||
|
bool err= Gtid_log_event::make_compatible_event(packet, &need_dummy,
|
||||||
|
ev_offset,
|
||||||
|
current_checksum_alg);
|
||||||
|
if (err)
|
||||||
|
return "Failed to replace GTID event with backwards-compatible event: "
|
||||||
|
"currupt event.";
|
||||||
|
if (!need_dummy)
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Do not send binlog checkpoint or gtid list events to a slave that does not
|
||||||
|
understand it.
|
||||||
|
*/
|
||||||
|
if ((unlikely(event_type == BINLOG_CHECKPOINT_EVENT) &&
|
||||||
|
mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_BINLOG_CHECKPOINT) ||
|
||||||
|
(unlikely(event_type == GTID_LIST_EVENT) &&
|
||||||
|
mariadb_slave_capability < MARIA_SLAVE_CAPABILITY_GTID))
|
||||||
{
|
{
|
||||||
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
|
if (mariadb_slave_capability >= MARIA_SLAVE_CAPABILITY_TOLERATE_HOLES)
|
||||||
{
|
{
|
||||||
|
@ -1201,6 +1201,29 @@ static Sys_var_ulong Sys_pseudo_thread_id(
|
|||||||
BLOCK_SIZE(1), NO_MUTEX_GUARD, IN_BINLOG,
|
BLOCK_SIZE(1), NO_MUTEX_GUARD, IN_BINLOG,
|
||||||
ON_CHECK(check_has_super));
|
ON_CHECK(check_has_super));
|
||||||
|
|
||||||
|
static Sys_var_uint Sys_gtid_domain_id(
|
||||||
|
"gtid_domain_id",
|
||||||
|
"Used with global transaction ID to identify logically independent "
|
||||||
|
"replication streams. When events can propagate through multiple "
|
||||||
|
"parallel paths (for example multiple masters), each independent "
|
||||||
|
"source server must use a distinct domain_id. For simple tree-shaped "
|
||||||
|
"replication topologies, it can be left at its default, 0.",
|
||||||
|
SESSION_VAR(gtid_domain_id),
|
||||||
|
CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, UINT_MAX32), DEFAULT(0),
|
||||||
|
BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG,
|
||||||
|
ON_CHECK(check_has_super));
|
||||||
|
|
||||||
|
static Sys_var_ulonglong Sys_gtid_seq_no(
|
||||||
|
"gtid_seq_no",
|
||||||
|
"Internal server usage, for replication with global transaction id. "
|
||||||
|
"When set, next event group logged to the binary log will use this "
|
||||||
|
"sequence number, not generate a new one, thus allowing to preserve "
|
||||||
|
"master's GTID in slave's binlog.",
|
||||||
|
SESSION_ONLY(gtid_seq_no),
|
||||||
|
NO_CMD_LINE, VALID_RANGE(0, ULONGLONG_MAX), DEFAULT(0),
|
||||||
|
BLOCK_SIZE(1), NO_MUTEX_GUARD, NOT_IN_BINLOG,
|
||||||
|
ON_CHECK(check_has_super));
|
||||||
|
|
||||||
static bool fix_max_join_size(sys_var *self, THD *thd, enum_var_type type)
|
static bool fix_max_join_size(sys_var *self, THD *thd, enum_var_type type)
|
||||||
{
|
{
|
||||||
SV *sv= type == OPT_GLOBAL ? &global_system_variables : &thd->variables;
|
SV *sv= type == OPT_GLOBAL ? &global_system_variables : &thd->variables;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user