diff --git a/mysql-test/suite/rpl/r/rpl_incompatible_heartbeat.result b/mysql-test/suite/rpl/r/rpl_incompatible_heartbeat.result new file mode 100644 index 00000000000..51da761c50d --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_incompatible_heartbeat.result @@ -0,0 +1,17 @@ +include/master-slave.inc +[connection master] +connection master; +SET @saved_dbug = @@GLOBAL.debug_dbug; +SET @@global.debug_dbug= 'd,simulate_pos_4G'; +connection slave; +include/stop_slave.inc +CHANGE MASTER TO MASTER_HEARTBEAT_PERIOD=0.001; +include/start_slave.inc +connection master; +SET @@GLOBAL.debug_dbug = @saved_dbug; +connection slave; +connection master; +CREATE TABLE t (f INT) ENGINE=INNODB; +INSERT INTO t VALUES (10); +DROP TABLE t; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_incompatible_heartbeat.test b/mysql-test/suite/rpl/t/rpl_incompatible_heartbeat.test new file mode 100644 index 00000000000..104debe707f --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_incompatible_heartbeat.test @@ -0,0 +1,44 @@ +# ==== Purpose ==== +# +# Test verifies that slave IO thread can process heartbeat events with log_pos +# values higher than UINT32_MAX. +# +# ==== Implementation ==== +# +# Steps: +# 0 - Stop slave threads. Configure a small master_heartbeat_period. +# 1 - Using debug points, simulate a huge binlog offset higher than +# UINT32_MAX on master. +# 2 - Start the slave and observe that slave IO thread is able to process +# the offset received through heartbeat event. +# +# ==== References ==== +# +# MDEV-16146: MariaDB slave stops with incompatible heartbeat +# +--source include/have_debug.inc +--source include/have_innodb.inc +--source include/have_binlog_format_mixed.inc +# Test simulates binarylog offsets higher than UINT32_MAX +--source include/have_64bit.inc +--source include/master-slave.inc + +--connection master +SET @saved_dbug = @@GLOBAL.debug_dbug; +SET @@global.debug_dbug= 'd,simulate_pos_4G'; + +--connection slave +--source include/stop_slave.inc +CHANGE MASTER TO MASTER_HEARTBEAT_PERIOD=0.001; +--source include/start_slave.inc + +--connection master +sleep 1; +SET @@GLOBAL.debug_dbug = @saved_dbug; +--sync_slave_with_master + +--connection master +CREATE TABLE t (f INT) ENGINE=INNODB; +INSERT INTO t VALUES (10); +DROP TABLE t; +--source include/rpl_end.inc diff --git a/sql/log_event.cc b/sql/log_event.cc index c32f31db1f6..94b2af20354 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -14324,14 +14324,23 @@ st_print_event_info::st_print_event_info() #endif #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) -Heartbeat_log_event::Heartbeat_log_event(const char* buf, uint event_len, +Heartbeat_log_event::Heartbeat_log_event(const char* buf, ulong event_len, const Format_description_log_event* description_event) :Log_event(buf, description_event) { uint8 header_size= description_event->common_header_len; - ident_len = event_len - header_size; - set_if_smaller(ident_len,FN_REFLEN-1); - log_ident= buf + header_size; + if (log_pos == 0) + { + log_pos= uint8korr(buf + header_size); + log_ident= buf + header_size + HB_SUB_HEADER_LEN; + ident_len= event_len - (header_size + HB_SUB_HEADER_LEN); + } + else + { + log_ident= buf + header_size; + ident_len = event_len - header_size; + } + } #endif diff --git a/sql/log_event.h b/sql/log_event.h index 3fc44a9669f..bd40795a7fb 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -570,6 +570,14 @@ class String; #define MARIA_SLAVE_CAPABILITY_MINE MARIA_SLAVE_CAPABILITY_GTID +/* + When the size of 'log_pos' within Heartbeat_log_event exceeds UINT32_MAX it + cannot be accommodated in common_header, as 'log_pos' is of 4 bytes size. In + such cases, sub_header, of size 8 bytes will hold larger 'log_pos' value. +*/ +#define HB_SUB_HEADER_LEN 8 + + /** @enum Log_event_type @@ -5160,12 +5168,13 @@ static inline bool copy_event_cache_to_file_and_reinit(IO_CACHE *cache, class Heartbeat_log_event: public Log_event { public: - Heartbeat_log_event(const char* buf, uint event_len, + uint8 hb_flags; + Heartbeat_log_event(const char* buf, ulong event_len, const Format_description_log_event* description_event); Log_event_type get_type_code() { return HEARTBEAT_LOG_EVENT; } bool is_valid() const { - return (log_ident != NULL && + return (log_ident != NULL && ident_len <= FN_REFLEN-1 && log_pos >= BIN_LOG_HEADER_SIZE); } const char * get_log_ident() { return log_ident; } diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 59a3f686e45..7ff0e27b008 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -32,6 +32,7 @@ #include "debug_sync.h" #include "log.h" // get_gtid_list_event + enum enum_gtid_until_state { GTID_UNTIL_NOT_DONE, GTID_UNTIL_STOP_AFTER_STANDALONE, @@ -781,7 +782,7 @@ get_slave_until_gtid(THD *thd, String *out_str) @param event_coordinates binlog file name and position of the last real event master sent from binlog - @note + @note Among three essential pieces of heartbeat data Log_event::when is computed locally. The error to send is serious and should force terminating @@ -795,6 +796,8 @@ static int send_heartbeat_event(binlog_send_info *info, DBUG_ENTER("send_heartbeat_event"); ulong ev_offset; + char sub_header_buf[HB_SUB_HEADER_LEN]; + bool sub_header_in_use=false; if (reset_transmit_packet(info, info->flags, &ev_offset, &info->errmsg)) DBUG_RETURN(1); @@ -815,18 +818,38 @@ static int send_heartbeat_event(binlog_send_info *info, ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + (do_checksum ? BINLOG_CHECKSUM_LEN : 0); int4store(header + SERVER_ID_OFFSET, global_system_variables.server_id); + DBUG_EXECUTE_IF("simulate_pos_4G", + { + const_cast(coord)->pos= (UINT_MAX32 + (ulong)1); + DBUG_SET("-d, simulate_pos_4G"); + };); + if (coord->pos <= UINT_MAX32) + { + int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos + } + else + { + // Set common_header.log_pos=0 to indicate its overflow + int4store(header + LOG_POS_OFFSET, 0); + sub_header_in_use= true; + int8store(sub_header_buf, coord->pos); + event_len+= HB_SUB_HEADER_LEN; + } + int4store(header + EVENT_LEN_OFFSET, event_len); int2store(header + FLAGS_OFFSET, 0); - int4store(header + LOG_POS_OFFSET, coord->pos); // log_pos - packet->append(header, sizeof(header)); - packet->append(p, ident_len); // log_file_name + if (sub_header_in_use) + packet->append(sub_header_buf, sizeof(sub_header_buf)); + packet->append(p, ident_len); // log_file_name if (do_checksum) { char b[BINLOG_CHECKSUM_LEN]; ha_checksum crc= my_checksum(0, (uchar*) header, sizeof(header)); + if (sub_header_in_use) + crc= my_checksum(crc, (uchar*) sub_header_buf, sizeof(sub_header_buf)); crc= my_checksum(crc, (uchar*) p, ident_len); int4store(b, crc); packet->append(b, sizeof(b));