diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index fe9005df3ef..1c34a57b1e3 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -161,6 +161,8 @@ class Master_info : public Slave_reporting_capability events_queued_since_last_gtid is non-zero. */ rpl_gtid last_queued_gtid; + /* Whether last_queued_gtid had the FL_STANDALONE flag set. */ + bool last_queued_gtid_standalone; /* When slave IO thread needs to reconnect, gtid_reconnect_event_skip_count counts number of events to skip from the first GTID-prefixed event group, diff --git a/sql/slave.cc b/sql/slave.cc index 0a2fcb38b3d..0b99d2bd6d7 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -5176,11 +5176,11 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) case GTID_EVENT: { - uchar dummy_flag; + uchar gtid_flag; if (Gtid_log_event::peek(buf, event_len, checksum_alg, &event_gtid.domain_id, &event_gtid.server_id, - &event_gtid.seq_no, &dummy_flag, + &event_gtid.seq_no, >id_flag, rli->relay_log.description_event_for_queue)) { error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; @@ -5233,6 +5233,9 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) /* We have successfully queued to relay log everything before this GTID, so in case of reconnect we can start from after any previous GTID. + (Normally we would have updated gtid_current_pos earlier at the end of + the previous event group, but better leave an extra check here for + safety). */ if (mi->events_queued_since_last_gtid) { @@ -5240,6 +5243,8 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) mi->events_queued_since_last_gtid= 0; } mi->last_queued_gtid= event_gtid; + mi->last_queued_gtid_standalone= + (gtid_flag & Gtid_log_event::FL_STANDALONE) != 0; ++mi->events_queued_since_last_gtid; inc_pos= event_len; } @@ -5321,6 +5326,19 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) else rli->relay_log.harvest_bytes_written(&rli->log_space_total); } + else if (mi->gtid_reconnect_event_skip_count == 0) + { + /* + Add a fake rotate event so that SQL thread can see the old-style + position where we re-connected in the middle of a GTID event group. + */ + Rotate_log_event fake_rev(mi->master_log_name, 0, mi->master_log_pos, 0); + fake_rev.server_id= mi->master_id; + if (rli->relay_log.append_no_lock(&fake_rev)) + error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; + else + rli->relay_log.harvest_bytes_written(&rli->log_space_total); + } } else if ((s_id == global_system_variables.server_id && @@ -5392,6 +5410,26 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len) } mysql_mutex_unlock(log_lock); + if (!error && + mi->using_gtid != Master_info::USE_GTID_NO && + mi->events_queued_since_last_gtid > 0 && + ( (mi->last_queued_gtid_standalone && + !Log_event::is_part_of_group((Log_event_type)(uchar) + buf[EVENT_TYPE_OFFSET])) || + (!mi->last_queued_gtid_standalone && + ((uchar)buf[EVENT_TYPE_OFFSET] == XID_EVENT || + ((uchar)buf[EVENT_TYPE_OFFSET] == QUERY_EVENT && + Query_log_event::peek_is_commit_rollback(buf, event_len, + checksum_alg)))))) + { + /* + The whole of the current event group is queued. So in case of + reconnect we can start from after the current GTID. + */ + mi->gtid_current_pos.update(&mi->last_queued_gtid); + mi->events_queued_since_last_gtid= 0; + } + skip_relay_logging: err: