diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index c26469acb78..1ad2fa782ad 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -290,6 +290,7 @@ retry_event_group(rpl_group_info *rgi, rpl_parallel_thread *rpt, THD *thd= rgi->thd; rpl_parallel_entry *entry= rgi->parallel_entry; ulong retries= 0; + Format_description_log_event *description_event= NULL; do_retry: event_count= 0; @@ -355,6 +356,14 @@ do_retry: goto err; } cur_offset= rgi->retry_start_offset; + delete description_event; + description_event= + read_relay_log_description_event(&rlog, cur_offset, &errmsg); + if (!description_event) + { + err= 1; + goto err; + } my_b_seek(&rlog, cur_offset); do @@ -367,8 +376,7 @@ do_retry: for (;;) { old_offset= cur_offset; - ev= Log_event::read_log_event(&rlog, 0, - rli->relay_log.description_event_for_exec /* ToDo: this needs fixing */, + ev= Log_event::read_log_event(&rlog, 0, description_event, opt_slave_sql_verify_checksum); cur_offset= my_b_tell(&rlog); @@ -416,7 +424,12 @@ do_retry: } event_type= ev->get_type_code(); - if (!Log_event::is_group_event(event_type)) + if (event_type == FORMAT_DESCRIPTION_EVENT) + { + delete description_event; + description_event= (Format_description_log_event *)ev; + continue; + } else if (!Log_event::is_group_event(event_type)) { delete ev; continue; @@ -472,6 +485,8 @@ do_retry: err: + if (description_event) + delete description_event; if (fd >= 0) { end_io_cache(&rlog); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 34ec0f0393f..bb8528c5a98 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -518,6 +518,90 @@ void Relay_log_info::clear_until_condition() } +/* + Read the correct format description event for starting to replicate from + a given position in a relay log file. +*/ +Format_description_log_event * +read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, + const char **errmsg) +{ + Log_event *ev; + Format_description_log_event *fdev; + bool found= false; + + /* + By default the relay log is in binlog format 3 (4.0). + Even if format is 4, this will work enough to read the first event + (Format_desc) (remember that format 4 is just lenghtened compared to format + 3; format 3 is a prefix of format 4). + */ + fdev= new Format_description_log_event(3); + + while (!found) + { + Log_event_type typ; + + /* + Read the possible Format_description_log_event; if position + was 4, no need, it will be read naturally. + */ + DBUG_PRINT("info",("looking for a Format_description_log_event")); + + if (my_b_tell(cur_log) >= start_pos) + break; + + if (!(ev= Log_event::read_log_event(cur_log, 0, fdev, + opt_slave_sql_verify_checksum))) + { + DBUG_PRINT("info",("could not read event, cur_log->error=%d", + cur_log->error)); + if (cur_log->error) /* not EOF */ + { + *errmsg= "I/O error reading event at position 4"; + delete fdev; + return NULL; + } + break; + } + typ= ev->get_type_code(); + if (typ == FORMAT_DESCRIPTION_EVENT) + { + DBUG_PRINT("info",("found Format_description_log_event")); + delete fdev; + fdev= (Format_description_log_event*) ev; + /* + As ev was returned by read_log_event, it has passed is_valid(), so + my_malloc() in ctor worked, no need to check again. + */ + /* + Ok, we found a Format_description event. But it is not sure that this + describes the whole relay log; indeed, one can have this sequence + (starting from position 4): + Format_desc (of slave) + Rotate (of master) + Format_desc (of master) + So the Format_desc which really describes the rest of the relay log + is the 3rd event (it can't be further than that, because we rotate + the relay log when we queue a Rotate event from the master). + But what describes the Rotate is the first Format_desc. + So what we do is: + go on searching for Format_description events, until you exceed the + position (argument 'pos') or until you find another event than Rotate + or Format_desc. + */ + } + else + { + DBUG_PRINT("info",("found event of another type=%d", typ)); + found= (typ != ROTATE_EVENT); + delete ev; + } + } + return fdev; +} + + /* Open the given relay log @@ -641,68 +725,13 @@ int init_relay_log_pos(Relay_log_info* rli,const char* log, */ if (pos > BIN_LOG_HEADER_SIZE) /* If pos<=4, we stay at 4 */ { - Log_event* ev; - while (look_for_description_event) + if (look_for_description_event) { - /* - Read the possible Format_description_log_event; if position - was 4, no need, it will be read naturally. - */ - DBUG_PRINT("info",("looking for a Format_description_log_event")); - - if (my_b_tell(rli->cur_log) >= pos) - break; - - /* - Because of we have rli->data_lock and log_lock, we can safely read an - event - */ - if (!(ev= Log_event::read_log_event(rli->cur_log, 0, - rli->relay_log.description_event_for_exec, - opt_slave_sql_verify_checksum))) - { - DBUG_PRINT("info",("could not read event, rli->cur_log->error=%d", - rli->cur_log->error)); - if (rli->cur_log->error) /* not EOF */ - { - *errmsg= "I/O error reading event at position 4"; - goto err; - } - break; - } - else if (ev->get_type_code() == FORMAT_DESCRIPTION_EVENT) - { - DBUG_PRINT("info",("found Format_description_log_event")); - delete rli->relay_log.description_event_for_exec; - rli->relay_log.description_event_for_exec= (Format_description_log_event*) ev; - /* - As ev was returned by read_log_event, it has passed is_valid(), so - my_malloc() in ctor worked, no need to check again. - */ - /* - Ok, we found a Format_description event. But it is not sure that this - describes the whole relay log; indeed, one can have this sequence - (starting from position 4): - Format_desc (of slave) - Rotate (of master) - Format_desc (of master) - So the Format_desc which really describes the rest of the relay log - is the 3rd event (it can't be further than that, because we rotate - the relay log when we queue a Rotate event from the master). - But what describes the Rotate is the first Format_desc. - So what we do is: - go on searching for Format_description events, until you exceed the - position (argument 'pos') or until you find another event than Rotate - or Format_desc. - */ - } - else - { - DBUG_PRINT("info",("found event of another type=%d", - ev->get_type_code())); - look_for_description_event= (ev->get_type_code() == ROTATE_EVENT); - delete ev; - } + Format_description_log_event *fdev; + if (!(fdev= read_relay_log_description_event(rli->cur_log, pos, errmsg))) + goto err; + delete rli->relay_log.description_event_for_exec; + rli->relay_log.description_event_for_exec= fdev; } my_b_seek(rli->cur_log,(off_t)pos); #ifndef DBUG_OFF diff --git a/sql/slave.h b/sql/slave.h index e65b4a589a1..e16f801b577 100644 --- a/sql/slave.h +++ b/sql/slave.h @@ -220,6 +220,10 @@ void end_relay_log_info(Relay_log_info* rli); void lock_slave_threads(Master_info* mi); void unlock_slave_threads(Master_info* mi); void init_thread_mask(int* mask,Master_info* mi,bool inverse); +Format_description_log_event * +read_relay_log_description_event(IO_CACHE *cur_log, ulonglong start_pos, + const char **errmsg); + int init_relay_log_pos(Relay_log_info* rli,const char* log,ulonglong pos, bool need_data_lock, const char** errmsg, bool look_for_description_event);