From 1d35777647a809992d5305dc7f3082ee801fa28a Mon Sep 17 00:00:00 2001
From: unknown
Date: Fri, 25 Jan 2013 15:21:49 +0100
Subject: [PATCH] MDEV-26: Global transaction ID.

When starting slave, check binlog state in addition to mysql.rpl_slave.state.
This allows to switch a previous master to be a slave directly with
MASTER_GTID_POS=AUTO.
---
Review notes (commentary only; text between the "---" line and the first
"diff --git" line is not part of the commit message or of the patch):

 * Line breaks, indentation and blank context lines were restored from a
   whitespace-collapsed copy, using the hunk line counts as a guide; the
   "&gtid..." address-of expressions had been mangled to ">id...". Check
   whitespace against the upstream commit before applying.
 * seq_no_from_state() is declared to return uint32 but accumulates its
   result in a uint64 seq_no.
 * slave.cc ignores the return value of rpl_slave_state::tostring(), and
   the Gtid_list_log_event constructor ignores that of get_gtid_list().
 * log.h forward-declares "class rpl_gtid" while update() refers to
   "struct rpl_gtid".

 sql/log.cc       |  12 +-
 sql/log.h        |   2 +
 sql/log_event.cc | 303 ++++++++++++++++++++++++++++++++++++++---------
 sql/log_event.h  |  25 +++-
 sql/slave.cc     |  21 +++-
 5 files changed, 297 insertions(+), 66 deletions(-)

diff --git a/sql/log.cc b/sql/log.cc
index e82f25bdc50..33e10acba8f 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -120,7 +120,7 @@ static MYSQL_BIN_LOG::xid_count_per_binlog *
 
 static bool start_binlog_background_thread();
 
-rpl_binlog_state rpl_global_gtid_binlog_state;
+static rpl_binlog_state rpl_global_gtid_binlog_state;
 
 /**
   purge logs, master and slave sides both, related error code
@@ -5488,6 +5488,13 @@ end:
 }
 
 
+int
+MYSQL_BIN_LOG::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
+{
+  return rpl_global_gtid_binlog_state.get_most_recent_gtid_list(list, size);
+}
+
+
 /**
   Write an event to the binary log. If with_annotate != NULL and
   *with_annotate = TRUE write also Annotate_rows before the event
@@ -8176,8 +8183,7 @@ int TC_LOG_BINLOG::open(const char *opt_name)
     else
       error= read_state_from_file();
     /* Pick the next unused seq_no from the loaded/recovered binlog state. */
-    global_gtid_counter= rpl_global_gtid_binlog_state.seq_no_for_server_id
-      (global_system_variables.server_id);
+    global_gtid_counter= rpl_global_gtid_binlog_state.seq_no_from_state();
 
     delete ev;
     end_io_cache(&log);
diff --git a/sql/log.h b/sql/log.h
index 18edea88a7f..50374fb8992 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -396,6 +396,7 @@ private:
   ( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID )
 
 class binlog_cache_mngr;
+class rpl_gtid;
 class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG
 {
  private:
@@ -773,6 +774,7 @@ public:
   bool write_gtid_event(THD *thd, bool standalone, bool is_transactional);
   int read_state_from_file();
   int write_state_to_file();
+  int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
 };
 
 class Log_event_handler
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 3f008b30aa7..56fff03d411 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -6279,6 +6279,24 @@ rpl_slave_state::next_subid(uint32 domain_id)
 #endif
 
 
+static
+bool
+rpl_slave_state_tostring_helper(String *dest, rpl_gtid *gtid, bool *first)
+{
+  if (*first)
+    *first= false;
+  else
+    if (dest->append(",",1))
+      return true;
+  return
+    dest->append_ulonglong(gtid->domain_id) ||
+    dest->append("-",1) ||
+    dest->append_ulonglong(gtid->server_id) ||
+    dest->append("-",1) ||
+    dest->append_ulonglong(gtid->seq_no);
+}
+
+
 /*
   Prepare the current slave state as a string, suitable for sending to the
   master to request to receive binlog events starting from that GTID state.
@@ -6288,10 +6306,20 @@ rpl_slave_state::next_subid(uint32 domain_id)
 */
 
 int
-rpl_slave_state::tostring(String *dest)
+rpl_slave_state::tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra)
 {
   bool first= true;
   uint32 i;
+  HASH gtid_hash;
+  uchar *rec;
+  rpl_gtid *gtid;
+  int res= 1;
+
+  my_hash_init(&gtid_hash, &my_charset_bin, 32, offsetof(rpl_gtid, domain_id),
+               sizeof(uint32), NULL, NULL, HASH_UNIQUE);
+  for (i= 0; i < num_extra; ++i)
+    if (my_hash_insert(&gtid_hash, (uchar *)(&extra_gtids[i])))
+      goto err;
 
   lock();
 
@@ -6319,19 +6347,43 @@ rpl_slave_state::tostring(String *dest)
       }
     }
 
-    if (first)
-      first= false;
-    else
-      dest->append("-",1);
-    dest->append_ulonglong(best_gtid.domain_id);
-    dest->append("-",1);
-    dest->append_ulonglong(best_gtid.server_id);
-    dest->append("-",1);
-    dest->append_ulonglong(best_gtid.seq_no);
-  }
+    /* Check if we have something newer in the extra list. */
+    rec= my_hash_search(&gtid_hash, (const uchar *)&best_gtid.domain_id, 0);
+    if (rec)
+    {
+      gtid= (rpl_gtid *)rec;
+      if (gtid->seq_no > best_gtid.seq_no)
+        memcpy(&best_gtid, gtid, sizeof(best_gtid));
+      if (my_hash_delete(&gtid_hash, rec))
+      {
+        unlock();
+        goto err;
+      }
+    }
+
+    if (rpl_slave_state_tostring_helper(dest, &best_gtid, &first))
+    {
+      unlock();
+      goto err;
+    }
+  }
 
   unlock();
-  return 0;
+
+  /* Also add any remaining extra domain_ids. */
+  for (i= 0; i < gtid_hash.records; ++i)
+  {
+    gtid= (rpl_gtid *)my_hash_element(&gtid_hash, i);
+    if (rpl_slave_state_tostring_helper(dest, gtid, &first))
+      goto err;
+  }
+
+  res= 0;
+
+err:
+  my_hash_free(&gtid_hash);
+
+  return res;
 }
 
 
@@ -6359,18 +6411,28 @@ rpl_slave_state::is_empty()
 
 rpl_binlog_state::rpl_binlog_state()
 {
-  my_hash_init(&hash, &my_charset_bin, 32,
-               offsetof(rpl_gtid, domain_id), 2*sizeof(uint32), NULL, my_free,
-               HASH_UNIQUE);
+  my_hash_init(&hash, &my_charset_bin, 32, offsetof(element, domain_id),
+               sizeof(uint32), NULL, my_free, HASH_UNIQUE);
   mysql_mutex_init(key_LOCK_binlog_state, &LOCK_binlog_state,
                    MY_MUTEX_INIT_SLOW);
 }
 
 
+void
+rpl_binlog_state::reset()
+{
+  uint32 i;
+
+  for (i= 0; i < hash.records; ++i)
+    my_hash_free(&((element *)my_hash_element(&hash, i))->hash);
+  my_hash_reset(&hash);
+}
+
 rpl_binlog_state::~rpl_binlog_state()
 {
-  mysql_mutex_destroy(&LOCK_binlog_state);
+  reset();
   my_hash_free(&hash);
+  mysql_mutex_destroy(&LOCK_binlog_state);
 }
 
 
@@ -6385,67 +6447,129 @@ rpl_binlog_state::~rpl_binlog_state()
 int
 rpl_binlog_state::update(const struct rpl_gtid *gtid)
 {
-  uchar *rec;
+  rpl_gtid *lookup_gtid;
+  element *elem;
 
-  rec= my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
-  if (rec)
+  elem= (element *)my_hash_search(&hash, (const uchar *)(&gtid->domain_id), 0);
+  if (elem)
   {
-    const rpl_gtid *old_gtid= (const rpl_gtid *)rec;
-    if (old_gtid->seq_no > gtid->seq_no)
-      sql_print_warning("Out-of-order GTIDs detected for "
-                        "domain_id=%u, server_id=%u. "
-                        "Please ensure that independent replication streams "
-                        "use different replication domain_id to avoid "
-                        "inconsistencies.", gtid->domain_id, gtid->server_id);
-    else
-      memcpy(rec, gtid, sizeof(*gtid));
+    /*
+      By far the most common case is that successive events within same
+      replication domain have the same server id (it changes only when
+      switching to a new master). So save a hash lookup in this case.
+    */
+    if (likely(elem->last_gtid->server_id == gtid->server_id))
+    {
+      elem->last_gtid->seq_no= gtid->seq_no;
+      return 0;
+    }
+
+    lookup_gtid= (rpl_gtid *)
+      my_hash_search(&elem->hash, (const uchar *)&gtid->server_id, 0);
+    if (lookup_gtid)
+    {
+      lookup_gtid->seq_no= gtid->seq_no;
+      elem->last_gtid= lookup_gtid;
+      return 0;
+    }
+
+    /* Allocate a new GTID and insert it. */
+    lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+    if (!lookup_gtid)
+      return 1;
+    memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+    if (my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
+    {
+      my_free(lookup_gtid);
+      return 1;
+    }
+    elem->last_gtid= lookup_gtid;
     return 0;
   }
 
-  if (!(rec= (uchar *)my_malloc(sizeof(*gtid), MYF(MY_WME))))
-    return 1;
-  memcpy(rec, gtid, sizeof(*gtid));
-  return my_hash_insert(&hash, rec);
-}
+  /* First time we see this domain_id; allocate a new element. */
+  elem= (element *)my_malloc(sizeof(*elem), MYF(MY_WME));
+  lookup_gtid= (rpl_gtid *)my_malloc(sizeof(*lookup_gtid), MYF(MY_WME));
+  if (elem && lookup_gtid)
+  {
+    elem->domain_id= gtid->domain_id;
+    my_hash_init(&elem->hash, &my_charset_bin, 32,
+                 offsetof(rpl_gtid, server_id), sizeof(uint32), NULL, my_free,
+                 HASH_UNIQUE);
+    elem->last_gtid= lookup_gtid;
+    memcpy(lookup_gtid, gtid, sizeof(*lookup_gtid));
+    if (0 == my_hash_insert(&elem->hash, (const uchar *)lookup_gtid))
+    {
+      lookup_gtid= NULL;  /* Do not free. */
+      if (0 == my_hash_insert(&hash, (const uchar *)elem))
+        return 0;
+    }
+    my_hash_free(&elem->hash);
+  }
 
-
-void
-rpl_binlog_state::reset()
-{
-  my_hash_reset(&hash);
+  /* An error. */
+  if (elem)
+    my_free(elem);
+  if (lookup_gtid)
+    my_free(lookup_gtid);
+  return 1;
 }
 
 
 uint32
-rpl_binlog_state::seq_no_for_server_id(uint32 server_id)
+rpl_binlog_state::seq_no_from_state()
 {
-  ulong i;
+  ulong i, j;
   uint64 seq_no= 0;
 
   for (i= 0; i < hash.records; ++i)
   {
-    const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
-    if (gtid->server_id == server_id && gtid->seq_no > seq_no)
-      seq_no= gtid->seq_no;
+    element *e= (element *)my_hash_element(&hash, i);
+    for (j= 0; j < e->hash.records; ++j)
+    {
+      const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
+      if (gtid->seq_no > seq_no)
+        seq_no= gtid->seq_no;
+    }
   }
   return seq_no;
 }
 
 
+/*
+  Write binlog state to text file, so we can read it in again without having
+  to scan last binlog file (normal shutdown/startup, not crash recovery).
+
+  The most recent GTID within each domain_id is written after any other GTID
+  within this domain.
+*/
 int
 rpl_binlog_state::write_to_iocache(IO_CACHE *dest)
 {
-  ulong i;
+  ulong i, j;
   char buf[21];
 
-  for (i= 0; i < count(); ++i)
+  for (i= 0; i < hash.records; ++i)
   {
     size_t res;
-    const rpl_gtid *gtid= (const rpl_gtid *)my_hash_element(&hash, i);
-    longlong10_to_str(gtid->seq_no, buf, 10);
-    res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
-    if (res == (size_t) -1)
-      return 1;
+    element *e= (element *)my_hash_element(&hash, i);
+    for (j= 0; j <= e->hash.records; ++j)
+    {
+      const rpl_gtid *gtid;
+      if (j < e->hash.records)
+      {
+        gtid= (const rpl_gtid *)my_hash_element(&e->hash, j);
+        if (gtid == e->last_gtid)
+          continue;
+      }
+      else
+        gtid= e->last_gtid;
+
+      longlong10_to_str(gtid->seq_no, buf, 10);
+      res= my_b_printf(dest, "%u-%u-%s\n", gtid->domain_id, gtid->server_id, buf);
+      if (res == (size_t) -1)
+        return 1;
+    }
   }
 
   return 0;
@@ -6861,6 +6985,78 @@ Gtid_list_log_event::Gtid_list_log_event(const char *buf, uint event_len,
 }
 
 
+uint32
+rpl_binlog_state::count()
+{
+  uint32 c= 0;
+  uint32 i;
+
+  for (i= 0; i < hash.records; ++i)
+    c+= ((element *)my_hash_element(&hash, i))->hash.records;
+
+  return c;
+}
+
+
+int
+rpl_binlog_state::get_gtid_list(rpl_gtid *gtid_list, uint32 list_size)
+{
+  uint32 i, j, pos;
+
+  pos= 0;
+  for (i= 0; i < hash.records; ++i)
+  {
+    element *e= (element *)my_hash_element(&hash, i);
+    for (j= 0; j <= e->hash.records; ++j)
+    {
+      const rpl_gtid *gtid;
+      if (j < e->hash.records)
+      {
+        gtid= (rpl_gtid *)my_hash_element(&e->hash, j);
+        if (gtid == e->last_gtid)
+          continue;
+      }
+      else
+        gtid= e->last_gtid;
+
+      if (pos >= list_size)
+        return 1;
+      memcpy(&gtid_list[pos++], gtid, sizeof(*gtid));
+    }
+  }
+
+  return 0;
+}
+
+
+/*
+  Get a list of the most recently binlogged GTID, for each domain_id.
+
+  This can be used when switching from being a master to being a slave,
+  to know where to start replicating from the new master.
+
+  The returned list must be de-allocated with my_free().
+
+  Returns 0 for ok, non-zero for out-of-memory.
+*/
+int
+rpl_binlog_state::get_most_recent_gtid_list(rpl_gtid **list, uint32 *size)
+{
+  uint32 i;
+
+  *size= hash.records;
+  if (!(*list= (rpl_gtid *)my_malloc(*size * sizeof(rpl_gtid), MYF(MY_WME))))
+    return 1;
+  for (i= 0; i < *size; ++i)
+  {
+    element *e= (element *)my_hash_element(&hash, i);
+    memcpy(&((*list)[i]), e->last_gtid, sizeof(rpl_gtid));
+  }
+
+  return 0;
+}
+
+
 #ifdef MYSQL_SERVER
 
 Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
@@ -6871,12 +7067,7 @@ Gtid_list_log_event::Gtid_list_log_event(rpl_binlog_state *gtid_set)
   /* Failure to allocate memory will be caught by is_valid() returning false. */
   if (count != 0 && count < (1<<28) &&
       (list = (rpl_gtid *)my_malloc(count * sizeof(*list), MYF(MY_WME))))
-  {
-    uint32 i;
-
-    for (i= 0; i < count; ++i)
-      list[i]= *(rpl_gtid *)my_hash_element(&gtid_set->hash, i);
-  }
+    gtid_set->get_gtid_list(list, count);
 }
 
 bool
diff --git a/sql/log_event.h b/sql/log_event.h
index 6d9b4172858..ff7460e7ef6 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -2981,7 +2981,7 @@ struct rpl_slave_state
     uint32 domain_id;
 
     list_element *grab_list() { list_element *l= list; list= NULL; return l; }
-    void add (list_element *l)
+    void add(list_element *l)
     {
       l->next= list;
       list= l;
@@ -3008,7 +3008,7 @@ struct rpl_slave_state
   int record_gtid(THD *thd, const rpl_gtid *gtid, uint64 sub_id,
                   bool in_transaction);
   uint64 next_subid(uint32 domain_id);
-  int tostring(String *dest);
+  int tostring(String *dest, rpl_gtid *extra_gtids, uint32 num_extra);
   bool is_empty();
 
   void lock() { DBUG_ASSERT(inited); mysql_mutex_lock(&LOCK_slave_state); }
@@ -3027,10 +3027,21 @@ struct rpl_slave_state
   containing a gigen GTID, by simply scanning backwards from the newest
   one until a lower seq_no is found in the Gtid_list_log_event at the
   start of a binlog for the given domain_id and server_id.
+
+  We also remember the last logged GTID for every domain_id. This is used
+  to know where to start when a master is changed to a slave. As a side
+  effect, it also allows to skip a hash lookup in the very common case of
+  logging a new GTID with same server id as last GTID.
 */
 struct rpl_binlog_state
 {
-  /* Mapping from (domain_id,server_id) to its GTID. */
+  struct element {
+    uint32 domain_id;
+    HASH hash;  /* Containing all server_id for one domain_id */
+    /* The most recent entry in the hash. */
+    rpl_gtid *last_gtid;
+  };
+  /* Mapping from domain_id to collection of elements. */
   HASH hash;
   /* Mutex protecting access to the state. */
   mysql_mutex_t LOCK_binlog_state;
@@ -3038,12 +3049,14 @@ struct rpl_binlog_state
   rpl_binlog_state();
   ~rpl_binlog_state();
 
-  ulong count() const { return hash.records; }
-  int update(const struct rpl_gtid *gtid);
   void reset();
-  uint32 seq_no_for_server_id(uint32 server_id);
+  int update(const struct rpl_gtid *gtid);
+  uint32 seq_no_from_state();
   int write_to_iocache(IO_CACHE *dest);
   int read_from_iocache(IO_CACHE *src);
+  uint32 count();
+  int get_gtid_list(rpl_gtid *gtid_list, uint32 list_size);
+  int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
 };
 
 
diff --git a/sql/slave.cc b/sql/slave.cc
index a16c56053f4..f39a1f18d5c 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -1792,10 +1792,29 @@ after_set_capability:
     char str_buf[256];
     String connect_state(str_buf, sizeof(str_buf), system_charset_info);
     connect_state.length(0);
+    rpl_gtid *binlog_gtid_list= NULL;
+    uint32 num_binlog_gtids= 0;
+
+    if (opt_bin_log)
+    {
+      int err= mysql_bin_log.get_most_recent_gtid_list(&binlog_gtid_list,
+                                                       &num_binlog_gtids);
+      if (err)
+      {
+        err_code= ER_OUTOFMEMORY;
+        errmsg= "The slave I/O thread stops because a fatal out-of-memory "
+          "error is encountered when it tries to compute @slave_connect_state.";
+        sprintf(err_buff, "%s Error: Out of memory", errmsg);
+        goto err;
+      }
+    }
 
     connect_state.append(STRING_WITH_LEN("SET @slave_connect_state='"),
                          system_charset_info);
-    rpl_global_gtid_slave_state.tostring(&connect_state);
+    rpl_global_gtid_slave_state.tostring(&connect_state, binlog_gtid_list,
+                                         num_binlog_gtids);
+    if (binlog_gtid_list)
+      my_free(binlog_gtid_list);
     connect_state.append(STRING_WITH_LEN("'"), system_charset_info);
     rc= mysql_real_query(mysql, connect_state.ptr(), connect_state.length());
     if (rc)