diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 83385e9d005..ab94c415db7 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -476,6 +476,30 @@ static bool check_database(const char *log_dbname) } + +static int +write_event_header_and_base64(Log_event *ev, FILE *result_file, + PRINT_EVENT_INFO *print_event_info) +{ + DBUG_ENTER("write_event_header_and_base64"); + /* Write header and base64 output to cache */ + IO_CACHE result_cache; + if (init_io_cache(&result_cache, -1, 0, WRITE_CACHE, 0L, FALSE, + MYF(MY_WME | MY_NABP))) + { + return 1; + } + + ev->print_header(&result_cache, print_event_info, FALSE); + ev->print_base64(&result_cache, print_event_info, FALSE); + + /* Read data from cache and write to result file */ + my_b_copy_to_file(&result_cache, result_file); + end_io_cache(&result_cache); + DBUG_RETURN(0); +} + + /* Process an event @@ -538,18 +562,18 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, print_event_info->base64_output= opt_base64_output; + DBUG_PRINT("debug", ("event_type: %s", ev->get_type_str())); + switch (ev_type) { case QUERY_EVENT: if (check_database(((Query_log_event*)ev)->db)) goto end; if (opt_base64_output) - { - ev->print_header(result_file, print_event_info); - ev->print_base64(result_file, print_event_info); - } + write_event_header_and_base64(ev, result_file, print_event_info); else ev->print(result_file, print_event_info); break; + case CREATE_FILE_EVENT: { Create_file_log_event* ce= (Create_file_log_event*)ev; @@ -570,8 +594,7 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, */ if (opt_base64_output) { - ce->print_header(result_file, print_event_info); - ce->print_base64(result_file, print_event_info); + write_event_header_and_base64(ce, result_file, print_event_info); } else ce->print(result_file, print_event_info, TRUE); diff --git a/include/base64.h b/include/base64.h index 4653e824a9a..a6bffebfe07 100644 --- a/include/base64.h +++ b/include/base64.h @@ -39,7 +39,8 @@ int base64_encode(const void *src, size_t src_len, char *dst); /* Decode a base64 string into data */ -int base64_decode(const char *src, size_t src_len, void *dst); +int base64_decode(const char *src, size_t src_len, + void *dst, const char **end_ptr); #ifdef __cplusplus diff --git a/include/my_sys.h b/include/my_sys.h index 115183bf9c2..e26e6535c66 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -517,6 +517,7 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); (uint) (*(info)->current_pos - (info)->request_pos)) /* tell write offset in the SEQ_APPEND cache */ +int my_b_copy_to_file(IO_CACHE *cache, FILE *file); my_off_t my_b_append_tell(IO_CACHE* info); my_off_t my_b_safe_tell(IO_CACHE* info); /* picks the correct tell() */ diff --git a/mysql-test/r/binlog_row_mix_innodb_myisam.result b/mysql-test/r/binlog_row_mix_innodb_myisam.result index ae66f98739d..a192d243bb0 100644 --- a/mysql-test/r/binlog_row_mix_innodb_myisam.result +++ b/mysql-test/r/binlog_row_mix_innodb_myisam.result @@ -359,15 +359,6 @@ show binlog events from 102; Log_name Pos Event_type Server_id End_log_pos Info master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Query 1 # use `test`; BEGIN -master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` ( - `a` int(11) NOT NULL DEFAULT '0', - `b` int(11) DEFAULT NULL, - PRIMARY KEY (`a`) -) ENGINE=InnoDB -master-bin.000001 # Table_map 1 # table_id: # (test.t2) -master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2 master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F @@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2 master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Query 1 # use `test`; BEGIN -master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` ( - `a` int(11) NOT NULL DEFAULT '0', - `b` int(11) DEFAULT NULL, - PRIMARY KEY (`a`) -) ENGINE=InnoDB -master-bin.000001 # Table_map 1 # table_id: # (test.t2) -master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2 master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Table_map 1 # table_id: # (test.t1) diff --git a/mysql-test/t/view.test b/mysql-test/t/view.test index 934e0624fd6..0eadb33c3e1 100644 --- a/mysql-test/t/view.test +++ b/mysql-test/t/view.test @@ -347,13 +347,13 @@ create view v3 (x,y,z) as select b, a, b from t1; create view v4 (x,y,z) as select c+1, b, a from t1; create algorithm=temptable view v5 (x,y,z) as select c, b, a from t1; # try insert to VIEW with fields duplicate --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v3 values (-60,4,30); # try insert to VIEW with expression in SELECT list --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v4 values (-60,4,30); # try insert to VIEW using temporary table algorithm --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v5 values (-60,4,30); insert into v1 values (-60,4,30); insert into v1 (z,y,x) values (50,6,-100); @@ -375,13 +375,13 @@ create view v3 (x,y,z) as select b, a, b from t1; create view v4 (x,y,z) as select c+1, b, a from t1; create algorithm=temptable view v5 (x,y,z) as select c, b, a from t1; # try insert to VIEW with fields duplicate --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v3 select c, b, a from t2; # try insert to VIEW with expression in SELECT list --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v4 select c, b, a from t2; # try insert to VIEW using temporary table algorithm --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v5 select c, b, a from t2; insert into v1 select c, b, a from t2; insert into v1 (z,y,x) select a+20,b+2,-100 from t2; @@ -1249,14 +1249,14 @@ drop table t1; # create table t1 (s1 smallint); create view v1 as select * from t1 where 20 < (select (s1) from t1); --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v1 values (30); create view v2 as select * from t1; create view v3 as select * from t1 where 20 < (select (s1) from v2); --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v3 values (30); create view v4 as select * from v2 where 20 < (select (s1) from t1); --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v4 values (30); drop view v4, v3, v2, v1; drop table t1; @@ -2861,7 +2861,7 @@ DROP TABLE t1; # create table t1 (s1 int); create view v1 as select s1 as a, s1 as b from t1; ---error 1573 +--error ER_NON_INSERTABLE_TABLE insert into v1 values (1,1); update v1 set a = 5; drop view v1; diff --git a/mysys/base64.c b/mysys/base64.c index fb51bdb3a60..363aa2cc739 100644 --- a/mysys/base64.c +++ b/mysys/base64.c @@ -125,44 +125,69 @@ pos(unsigned char c) /* Decode a base64 string - Note: We require that dst is pre-allocated to correct size. - See base64_needed_decoded_length(). + SYNOPSIS + base64_decode() + src Pointer to base64-encoded string + len Length of string at 'src' + dst Pointer to location where decoded data will be stored + end_ptr Pointer to variable that will refer to the character + after the end of the encoded data that were decoded. Can + be NULL. - RETURN Number of bytes produced in dst or -1 in case of failure + DESCRIPTION + + The base64-encoded data in the range ['src','*end_ptr') will be + decoded and stored starting at 'dst'. The decoding will stop + after 'len' characters have been read from 'src', or when padding + occurs in the base64-encoded data. In either case: if 'end_ptr' is + non-null, '*end_ptr' will be set to point to the character after + the last read character, even in the presence of error. + + NOTE + We require that 'dst' is pre-allocated to correct size. + + SEE ALSO + base64_needed_decoded_length(). + + RETURN VALUE + Number of bytes written at 'dst' or -1 in case of failure */ int -base64_decode(const char *src, size_t size, void *dst) +base64_decode(const char *const src_base, size_t const len, + void *dst, const char **end_ptr) { char b[3]; size_t i= 0; char *dst_base= (char *)dst; + char const *src= src_base; char *d= dst_base; size_t j; - while (i < size) + while (i < len) { unsigned c= 0; size_t mark= 0; - SKIP_SPACE(src, i, size); + SKIP_SPACE(src, i, len); c += pos(*src++); c <<= 6; i++; - SKIP_SPACE(src, i, size); + SKIP_SPACE(src, i, len); c += pos(*src++); c <<= 6; i++; - SKIP_SPACE(src, i, size); + SKIP_SPACE(src, i, len); - if (* src != '=') + if (*src != '=') c += pos(*src++); else { - i= size; + src += 2; /* There should be two bytes padding */ + i= len; mark= 2; c <<= 6; goto end; @@ -170,13 +195,14 @@ base64_decode(const char *src, size_t size, void *dst) c <<= 6; i++; - SKIP_SPACE(src, i, size); + SKIP_SPACE(src, i, len); if (*src != '=') c += pos(*src++); else { - i= size; + src += 1; /* There should be one byte padding */ + i= len; mark= 1; goto end; } @@ -191,11 +217,14 @@ base64_decode(const char *src, size_t size, void *dst) *d++= b[j]; } - if (i != size) - { - return -1; - } - return d - dst_base; + if (end_ptr != NULL) + *end_ptr= src; + + /* + The variable 'i' is set to 'len' when padding has been read, so it + does not actually reflect the number of bytes read from 'src'. + */ + return i != len ? -1 : d - dst_base; } diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index f1ea21c2a47..895f03dc714 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -24,6 +24,54 @@ #include #include +/* + Copy contents of an IO_CACHE to a file. + + SYNOPSIS + my_b_copy_to_file() + cache IO_CACHE to copy from + file File to copy to + + DESCRIPTION + Copy the contents of the cache to the file. The cache will be + re-inited to a read cache and will read from the beginning of the + cache. + + If a failure to write fully occurs, the cache is only copied + partially. + + TODO + Make this function solid by handling partial reads from the cache + in a correct manner: it should be atomic. + + RETURN VALUE + 0 All OK + 1 An error occured +*/ +int +my_b_copy_to_file(IO_CACHE *cache, FILE *file) +{ + byte buf[IO_SIZE]; + uint bytes_in_cache; + DBUG_ENTER("my_b_copy_to_file"); + + /* Reinit the cache to read from the beginning of the cache */ + if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) + DBUG_RETURN(1); + bytes_in_cache= my_b_bytes_in_cache(cache); + while (bytes_in_cache > 0) { + uint const read_bytes= min(bytes_in_cache, sizeof(buf)); + DBUG_PRINT("debug", ("Remaining %u bytes - Reading %u bytes", + bytes_in_cache, read_bytes)); + if (my_b_read(cache, buf, read_bytes)) + DBUG_RETURN(1); + if (my_fwrite(file, buf, read_bytes, MYF(MY_WME | MY_NABP)) == (uint) -1) + DBUG_RETURN(1); + bytes_in_cache -= read_bytes; + } + DBUG_RETURN(0); +} + my_off_t my_b_append_tell(IO_CACHE* info) { /* diff --git a/sql/log.cc b/sql/log.cc index b63ec563baf..a1ed9bd6df3 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -32,10 +32,22 @@ #include +/* + Define placement versions of operator new and operator delete since + we cannot be sure that the include exists. + */ +inline void *operator new(size_t, void *ptr) { return ptr; } +inline void *operator new[](size_t, void *ptr) { return ptr; } +inline void operator delete(void*, void*) { /* Do nothing */ } +inline void operator delete[](void*, void*) { /* Do nothing */ } + /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 #define MAX_USER_HOST_SIZE 512 #define MAX_TIME_SIZE 32 +#define MY_OFF_T_UNDEF (~(my_off_t)0UL) + +#define FLAGSTR(V,F) ((V)&(F)?#F" ":"") LOGGER logger; @@ -70,23 +82,92 @@ char *make_default_log_name(char *buff,const char* log_ext) } /* - This is a POD. Please keep it that way! - - Don't add constructors, destructors, or virtual functions. + Helper class to store binary log transaction data. */ -struct binlog_trx_data { +class binlog_trx_data { +public: + binlog_trx_data() +#ifdef HAVE_ROW_BASED_REPLICATION + : m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF) +#endif + { + trans_log.end_of_file= max_binlog_cache_size; + } + + ~binlog_trx_data() + { +#ifdef HAVE_ROW_BASED_REPLICATION + DBUG_ASSERT(pending() == NULL); +#endif + close_cached_file(&trans_log); + } + + my_off_t position() const { + return my_b_tell(&trans_log); + } + bool empty() const { #ifdef HAVE_ROW_BASED_REPLICATION - return pending == NULL && my_b_tell(&trans_log) == 0; + return pending() == NULL && my_b_tell(&trans_log) == 0; #else return my_b_tell(&trans_log) == 0; #endif } - binlog_trx_data() {} - IO_CACHE trans_log; // The transaction cache + + /* + Truncate the transaction cache to a certain position. This + includes deleting the pending event. + */ + void truncate(my_off_t pos) + { #ifdef HAVE_ROW_BASED_REPLICATION - Rows_log_event *pending; // The pending binrows event + delete pending(); + set_pending(0); +#endif + reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); + } + + /* + Reset the entire contents of the transaction cache, emptying it + completely. + */ + void reset() { + if (!empty()) + truncate(0); +#ifdef HAVE_ROW_BASED_REPLICATION + before_stmt_pos= MY_OFF_T_UNDEF; +#endif + trans_log.end_of_file= max_binlog_cache_size; + } + +#ifdef HAVE_ROW_BASED_REPLICATION + Rows_log_event *pending() const + { + return m_pending; + } + + void set_pending(Rows_log_event *const pending) + { + m_pending= pending; + } +#endif + + IO_CACHE trans_log; // The transaction cache + +private: +#ifdef HAVE_ROW_BASED_REPLICATION + /* + Pending binrows event. This event is the event where the rows are + currently written. + */ + Rows_log_event *m_pending; + +public: + /* + Binlog position before the start of the current statement. + */ + my_off_t before_stmt_pos; #endif }; @@ -1149,6 +1230,69 @@ void Log_to_csv_event_handler:: } + /* + Save position of binary log transaction cache. + + SYNPOSIS + binlog_trans_log_savepos() + + thd The thread to take the binlog data from + pos Pointer to variable where the position will be stored + + DESCRIPTION + + Save the current position in the binary log transaction cache into + the variable pointed to by 'pos' + */ + +static void +binlog_trans_log_savepos(THD *thd, my_off_t *pos) +{ + DBUG_ENTER("binlog_trans_log_savepos"); + DBUG_ASSERT(pos != NULL); + if (thd->ha_data[binlog_hton->slot] == NULL) + thd->binlog_setup_trx_data(); + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; + DBUG_ASSERT(mysql_bin_log.is_open()); + *pos= trx_data->position(); + DBUG_PRINT("return", ("*pos=%u", *pos)); + DBUG_VOID_RETURN; +} + + +/* + Truncate the binary log transaction cache. + + SYNPOSIS + binlog_trans_log_truncate() + + thd The thread to take the binlog data from + pos Position to truncate to + + DESCRIPTION + + Truncate the binary log to the given position. Will not change + anything else. + + */ +static void +binlog_trans_log_truncate(THD *thd, my_off_t pos) +{ + DBUG_ENTER("binlog_trans_log_truncate"); + DBUG_PRINT("enter", ("pos=%u", pos)); + + DBUG_ASSERT(thd->ha_data[binlog_hton->slot] != NULL); + /* Only true if binlog_trans_log_savepos() wasn't called before */ + DBUG_ASSERT(pos != ~(my_off_t) 0); + + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; + trx_data->truncate(pos); + DBUG_VOID_RETURN; +} + + /* this function is mostly a placeholder. conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open) @@ -1175,27 +1319,62 @@ static int binlog_close_connection(handlerton *hton, THD *thd) { binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; - IO_CACHE *trans_log= &trx_data->trans_log; DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty()); - close_cached_file(trans_log); thd->ha_data[binlog_hton->slot]= 0; + trx_data->~binlog_trx_data(); my_free((gptr)trx_data, MYF(0)); return 0; } +/* + End a transaction. + + SYNOPSIS + binlog_end_trans() + + thd The thread whose transaction should be ended + trx_data Pointer to the transaction data to use + end_ev The end event to use, or NULL + all True if the entire transaction should be ended, false if + only the statement transaction should be ended. + + DESCRIPTION + + End the currently open transaction. The transaction can be either + a real transaction (if 'all' is true) or a statement transaction + (if 'all' is false). + + If 'end_ev' is NULL, the transaction is a rollback of only + transactional tables, so the transaction cache will be truncated + to either just before the last opened statement transaction (if + 'all' is false), or reset completely (if 'all' is true). + */ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, - Log_event *end_ev) +binlog_end_trans(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev, bool all) { DBUG_ENTER("binlog_end_trans"); int error=0; IO_CACHE *trans_log= &trx_data->trans_log; + DBUG_PRINT("enter", ("transaction: %s, end_ev=%p", + all ? "all" : "stmt", end_ev)); + DBUG_PRINT("info", ("thd->options={ %s%s}", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN))); - - /* NULL denotes ROLLBACK with nothing to replicate */ + /* + NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of + only transactional tables. If the transaction contain changes to + any non-transactiona tables, we need write the transaction and log + a ROLLBACK last. + */ if (end_ev != NULL) { /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + We can always end the statement when ending a transaction since transactions are not allowed inside stored functions. If they were, we would have to ensure that we're not ending a statement @@ -1204,38 +1383,55 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, #ifdef HAVE_ROW_BASED_REPLICATION thd->binlog_flush_pending_rows_event(TRUE); #endif - error= mysql_bin_log.write(thd, trans_log, end_ev); + /* + We write the transaction cache to the binary log if either we're + committing the entire transaction, or if we are doing an + autocommit outside a transaction. + */ + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + { + error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev); + trx_data->reset(); +#ifdef HAVE_ROW_BASED_REPLICATION + /* + We need to step the table map version after writing the + transaction cache to disk. + */ + mysql_bin_log.update_table_map_version(); +#endif + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } + } } #ifdef HAVE_ROW_BASED_REPLICATION else { -#ifdef HAVE_ROW_BASED_REPLICATION - thd->binlog_delete_pending_rows_event(); -#endif + /* + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. + + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + + */ + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + trx_data->reset(); + else + trx_data->truncate(trx_data->before_stmt_pos); // ...statement + + /* + We need to step the table map version on a rollback to ensure + that a new table map event is generated instead of the one that + was written to the thrown-away transaction cache. + */ + mysql_bin_log.update_table_map_version(); } - - /* - We need to step the table map version both after writing the - entire transaction to the log file and after rolling back the - transaction. - - We need to step the table map version after writing the - transaction cache to disk. In addition, we need to step the table - map version on a rollback to ensure that a new table map event is - generated instead of the one that was written to the thrown-away - transaction cache. - */ - mysql_bin_log.update_table_map_version(); #endif - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } - reinit_io_cache(trans_log, WRITE_CACHE, (my_off_t) 0, 0, 1); // cannot fail - trans_log->end_of_file= max_binlog_cache_size; DBUG_RETURN(error); } @@ -1252,26 +1448,31 @@ static int binlog_prepare(handlerton *hton, THD *thd, bool all) static int binlog_commit(handlerton *hton, THD *thd, bool all) { + int error= 0; DBUG_ENTER("binlog_commit"); binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_ASSERT(mysql_bin_log.is_open() && - (all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))); + DBUG_ASSERT(mysql_bin_log.is_open()); - if (trx_data->empty()) + if (all && trx_data->empty()) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log() + trx_data->reset(); DBUG_RETURN(0); } - if (all) + if (all) { Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE); qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) - DBUG_RETURN(binlog_end_trans(thd, trx_data, &qev)); + int error= binlog_end_trans(thd, trx_data, &qev, all); + DBUG_RETURN(error); } else - DBUG_RETURN(binlog_end_trans(thd, trx_data, &invisible_commit)); + { + int error= binlog_end_trans(thd, trx_data, &invisible_commit, all); + DBUG_RETURN(error); + } } static int binlog_rollback(handlerton *hton, THD *thd, bool all) @@ -1281,13 +1482,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; IO_CACHE *trans_log= &trx_data->trans_log; - /* - First assert is guaranteed - see trans_register_ha() call below. - The second must be true. If it is not, we're registering - unnecessary, doing extra work. The cause should be found and eliminated - */ - DBUG_ASSERT(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); - DBUG_ASSERT(mysql_bin_log.is_open() && !trx_data->empty()); + DBUG_ASSERT(mysql_bin_log.is_open()); + + if (trx_data->empty()) { + trx_data->reset(); + DBUG_RETURN(0); + } + /* Update the binary log with a BEGIN/ROLLBACK block if we have cached some queries and we updated some non-transactional @@ -1299,10 +1500,10 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) { Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE); qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) - error= binlog_end_trans(thd, trx_data, &qev); + error= binlog_end_trans(thd, trx_data, &qev, all); } else - error= binlog_end_trans(thd, trx_data, 0); + error= binlog_end_trans(thd, trx_data, 0, all); DBUG_RETURN(error); } @@ -1330,11 +1531,8 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all) static int binlog_savepoint_set(handlerton *hton, THD *thd, void *sv) { DBUG_ENTER("binlog_savepoint_set"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; - DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(&trx_data->trans_log)); - *(my_off_t *)sv= my_b_tell(&trx_data->trans_log); + binlog_trans_log_savepos(thd, (my_off_t*) sv); /* Write it to the binary log */ int const error= @@ -1349,7 +1547,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); + DBUG_ASSERT(mysql_bin_log.is_open()); /* Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some @@ -1364,7 +1562,7 @@ static int binlog_savepoint_rollback(handlerton *hton, THD *thd, void *sv) thd->query, thd->query_length, TRUE, FALSE); DBUG_RETURN(error); } - reinit_io_cache(trans_log, WRITE_CACHE, *(my_off_t *)sv, 0, 0); + binlog_trans_log_truncate(thd, *(my_off_t*)sv); DBUG_RETURN(0); } @@ -2495,7 +2693,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) thread. If the transaction involved MyISAM tables, it should go into binlog even on rollback. */ - (void) pthread_mutex_lock(&LOCK_thread_count); + VOID(pthread_mutex_lock(&LOCK_thread_count)); /* Save variables so that we can reopen the log */ save_name=name; @@ -2527,7 +2725,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) my_free((gptr) save_name, MYF(0)); err: - (void) pthread_mutex_unlock(&LOCK_thread_count); + VOID(pthread_mutex_unlock(&LOCK_thread_count)); pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); @@ -3093,18 +3291,76 @@ int THD::binlog_setup_trx_data() ha_data[binlog_hton->slot]= 0; DBUG_RETURN(1); // Didn't manage to set it up } - trx_data->trans_log.end_of_file= max_binlog_cache_size; + + trx_data= new (ha_data[binlog_hton->slot]) binlog_trx_data; + DBUG_RETURN(0); } +#ifdef HAVE_ROW_BASED_REPLICATION +/* + Function to start a statement and optionally a transaction for the + binary log. + + SYNOPSIS + binlog_start_trans_and_stmt() + + DESCRIPTION + + This function does three things: + - Start a transaction if not in autocommit mode or if a BEGIN + statement has been seen. + + - Start a statement transaction to allow us to truncate the binary + log. + + - Save the currrent binlog position so that we can roll back the + statement by truncating the transaction log. + + We only update the saved position if the old one was undefined, + the reason is that there are some cases (e.g., for CREATE-SELECT) + where the position is saved twice (e.g., both in + select_create::prepare() and THD::binlog_write_table_map()) , but + we should use the first. This means that calls to this function + can be used to start the statement before the first table map + event, to include some extra events. + */ + +void +THD::binlog_start_trans_and_stmt() +{ + DBUG_ENTER("binlog_start_trans_and_stmt"); + binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot]; + DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data)); + if (trx_data) + DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u", + trx_data->before_stmt_pos)); + if (trx_data == NULL || + trx_data->before_stmt_pos == MY_OFF_T_UNDEF) + { + /* + The call to binlog_trans_log_savepos() might create the trx_data + structure, if it didn't exist before, so we save the position + into an auto variable and then write it into the transaction + data for the binary log (i.e., trx_data). + */ + my_off_t pos= 0; + binlog_trans_log_savepos(this, &pos); + trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot]; + + trx_data->before_stmt_pos= pos; + + if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trans_register_ha(this, TRUE, binlog_hton); + trans_register_ha(this, FALSE, binlog_hton); + } + DBUG_VOID_RETURN; +} + /* Write a table map to the binary log. - - This function is called from ha_external_lock() after the storage - engine has registered for the transaction. */ -#ifdef HAVE_ROW_BASED_REPLICATION int THD::binlog_write_table_map(TABLE *table, bool is_trans) { int error; @@ -3123,10 +3379,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) Table_map_log_event the_event(this, table, table->s->table_map_id, is_trans, flags); - if (is_trans) - trans_register_ha(this, - (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0, - binlog_hton); + if (is_trans && binlog_table_maps == 0) + binlog_start_trans_and_stmt(); if ((error= mysql_bin_log.write(&the_event))) DBUG_RETURN(error); @@ -3147,7 +3401,7 @@ THD::binlog_get_pending_rows_event() const (since the trx_data is set up there). In that case, we just return NULL. */ - return trx_data ? trx_data->pending : NULL; + return trx_data ? trx_data->pending() : NULL; } void @@ -3160,7 +3414,7 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev) (binlog_trx_data*) ha_data[binlog_hton->slot]; DBUG_ASSERT(trx_data); - trx_data->pending= ev; + trx_data->set_pending(ev); } @@ -3169,8 +3423,9 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev) (either cached binlog if transaction, or disk binlog). Sets a new pending event. */ -int MYSQL_BIN_LOG:: - flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event) +int +MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, + Rows_log_event* event) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ASSERT(mysql_bin_log.is_open()); @@ -3183,9 +3438,9 @@ int MYSQL_BIN_LOG:: DBUG_ASSERT(trx_data); - DBUG_PRINT("info", ("trx_data->pending=%p", trx_data->pending)); + DBUG_PRINT("info", ("trx_data->pending()=%p", trx_data->pending())); - if (Rows_log_event* pending= trx_data->pending) + if (Rows_log_event* pending= trx_data->pending()) { IO_CACHE *file= &log_file; @@ -3335,15 +3590,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; IO_CACHE *trans_log= &trx_data->trans_log; - bool trans_log_in_use= my_b_tell(trans_log) != 0; - if (event_info->get_cache_stmt() && !trans_log_in_use) - trans_register_ha(thd, - (thd->options & - (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0, - binlog_hton); - if (event_info->get_cache_stmt() || trans_log_in_use) + my_off_t trans_log_pos= my_b_tell(trans_log); + if (event_info->get_cache_stmt() || trans_log_pos != 0) { - DBUG_PRINT("info", ("Using trans_log")); + DBUG_PRINT("info", ("Using trans_log: cache=%d, trans_log_pos=%u", + event_info->get_cache_stmt(), + trans_log_pos)); + if (trans_log_pos == 0) + thd->binlog_start_trans_and_stmt(); file= trans_log; } /* @@ -3547,61 +3801,69 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event) uint length; /* - Log "BEGIN" at the beginning of the transaction. - which may contain more than 1 SQL statement. - */ - if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + We only bother to write to the binary log if there is anything + to write. + */ + if (my_b_tell(cache) > 0) { - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE); /* - Imagine this is rollback due to net timeout, after all statements of - the transaction succeeded. Then we want a zero-error code in BEGIN. - In other words, if there was a really serious error code it's already - in the statement's events, there is no need to put it also in this - internally generated event, and as this event is generated late it - would lead to false alarms. - This is safer than thd->clear_error() against kills at shutdown. + Log "BEGIN" at the beginning of the transaction. + which may contain more than 1 SQL statement. */ - qinfo.error_code= 0; - /* - Now this Query_log_event has artificial log_pos 0. It must be adjusted - to reflect the real position in the log. Not doing it would confuse the - slave: it would prevent this one from knowing where he is in the - master's binlog, which would result in wrong positions being shown to - the user, MASTER_POS_WAIT undue waiting etc. - */ - if (qinfo.write(&log_file)) - goto err; - } - /* Read from the file used to cache the queries .*/ - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - goto err; - length=my_b_bytes_in_cache(cache); - DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;); - do - { - /* Write data to the binary log file */ - if (my_b_write(&log_file, cache->read_pos, length)) - goto err; - cache->read_pos=cache->read_end; // Mark buffer used up - DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;); - } while ((length=my_b_fill(cache))); + if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + { + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE); + /* + Imagine this is rollback due to net timeout, after all statements of + the transaction succeeded. Then we want a zero-error code in BEGIN. + In other words, if there was a really serious error code it's already + in the statement's events, there is no need to put it also in this + internally generated event, and as this event is generated late it + would lead to false alarms. + This is safer than thd->clear_error() against kills at shutdown. + */ + qinfo.error_code= 0; + /* + Now this Query_log_event has artificial log_pos 0. It must be adjusted + to reflect the real position in the log. Not doing it would confuse the + slave: it would prevent this one from knowing where he is in the + master's binlog, which would result in wrong positions being shown to + the user, MASTER_POS_WAIT undue waiting etc. + */ + if (qinfo.write(&log_file)) + goto err; + } + /* Read from the file used to cache the queries .*/ + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + goto err; + length=my_b_bytes_in_cache(cache); + DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;); + do + { + /* Write data to the binary log file */ + if (my_b_write(&log_file, cache->read_pos, length)) + goto err; + cache->read_pos=cache->read_end; // Mark buffer used up + DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;); + } while ((length=my_b_fill(cache))); - if (commit_event->write(&log_file)) - goto err; + if (commit_event && commit_event->write(&log_file)) + goto err; #ifndef DBUG_OFF -DBUG_skip_commit: + DBUG_skip_commit: #endif - if (flush_and_sync()) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); - if (cache->error) // Error on read - { - sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + if (flush_and_sync()) + goto err; + DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); + if (cache->error) // Error on read + { + sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); + write_error=1; // Don't give more errors + goto err; + } + signal_update(); } - signal_update(); + /* if commit_event is Xid_log_event, increase the number of prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated @@ -3610,7 +3872,7 @@ DBUG_skip_commit: If the commit_event is not Xid_log_event (then it's a Query_log_event) rotate binlog, if necessary. */ - if (commit_event->get_type_code() == XID_EVENT) + if (commit_event && commit_event->get_type_code() == XID_EVENT) { pthread_mutex_lock(&LOCK_prep_xids); prepared_xids++; @@ -4620,12 +4882,17 @@ int TC_LOG_BINLOG::log(THD *thd, my_xid xid) Xid_log_event xle(thd, xid); binlog_trx_data *trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle)); // invert return value + /* + We always commit the entire transaction when writing an XID. Also + note that the return value is inverted. + */ + DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) { pthread_mutex_lock(&LOCK_prep_xids); + DBUG_ASSERT(prepared_xids > 0); if (--prepared_xids == 0) pthread_cond_signal(&COND_prep_xids); pthread_mutex_unlock(&LOCK_prep_xids); diff --git a/sql/log_event.cc b/sql/log_event.cc index faff2cae48e..4a6346bf57c 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -32,32 +32,111 @@ #define log_cs &my_charset_latin1 +/* + Cache that will automatically be written to a dedicated file on + destruction. + + DESCRIPTION + + */ +class Write_on_release_cache +{ +public: + enum flag + { + FLUSH_F + }; + + typedef unsigned short flag_set; + + /* + Constructor. + + SYNOPSIS + Write_on_release_cache + cache Pointer to cache to use + file File to write cache to upon destruction + flags Flags for the cache + + DESCRIPTION + + Class used to guarantee copy of cache to file before exiting the + current block. On successful copy of the cache, the cache will + be reinited as a WRITE_CACHE. + + Currently, a pointer to the cache is provided in the + constructor, but it would be possible to create a subclass + holding the IO_CACHE itself. + */ + Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0) + : m_cache(cache), m_file(file), m_flags(flags) + { + reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE); + } + + ~Write_on_release_cache() + { + if (!my_b_copy_to_file(m_cache, m_file)) + reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE); + if (m_flags | FLUSH_F) + fflush(m_file); + } + + /* + Return a pointer to the internal IO_CACHE. + + SYNOPSIS + operator&() + + DESCRIPTION + Function to return a pointer to the internal, so that the object + can be treated as a IO_CACHE and used with the my_b_* IO_CACHE + functions + + RETURN VALUE + A pointer to the internal IO_CACHE. + */ + IO_CACHE *operator&() + { + return m_cache; + } + +private: + // Hidden, to prevent usage. + Write_on_release_cache(Write_on_release_cache const&); + + IO_CACHE *m_cache; + FILE *m_file; + flag_set m_flags; +}; + + /* pretty_print_str() */ #ifdef MYSQL_CLIENT -static void pretty_print_str(FILE* file, char* str, int len) +static void pretty_print_str(IO_CACHE* cache, char* str, int len) { char* end = str + len; - fputc('\'', file); + my_b_printf(cache, "\'"); while (str < end) { char c; switch ((c=*str++)) { - case '\n': fprintf(file, "\\n"); break; - case '\r': fprintf(file, "\\r"); break; - case '\\': fprintf(file, "\\\\"); break; - case '\b': fprintf(file, "\\b"); break; - case '\t': fprintf(file, "\\t"); break; - case '\'': fprintf(file, "\\'"); break; - case 0 : fprintf(file, "\\0"); break; + case '\n': my_b_printf(cache, "\\n"); break; + case '\r': my_b_printf(cache, "\\r"); break; + case '\\': my_b_printf(cache, "\\\\"); break; + case '\b': my_b_printf(cache, "\\b"); break; + case '\t': my_b_printf(cache, "\\t"); break; + case '\'': my_b_printf(cache, "\\'"); break; + case 0 : my_b_printf(cache, "\\0"); break; default: - fputc(c, file); + my_b_printf(cache, "%c", c); break; } } - fputc('\'', file); + my_b_printf(cache, "\'"); } #endif /* MYSQL_CLIENT */ @@ -294,14 +373,15 @@ append_query_string(CHARSET_INFO *csinfo, */ #ifdef MYSQL_CLIENT -static void print_set_option(FILE* file, uint32 bits_changed, uint32 option, - uint32 flags, const char* name, bool* need_comma) +static void print_set_option(IO_CACHE* file, uint32 bits_changed, + uint32 option, uint32 flags, const char* name, + bool* need_comma) { if (bits_changed & option) { if (*need_comma) - fprintf(file,", "); - fprintf(file,"%s=%d", name, test(flags & option)); + my_b_printf(file,", "); + my_b_printf(file,"%s=%d", name, test(flags & option)); *need_comma= 1; } } @@ -960,20 +1040,23 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, Log_event::print_header() */ -void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Log_event::print_header(IO_CACHE* file, + PRINT_EVENT_INFO* print_event_info, + bool is_more __attribute__((unused))) { char llbuff[22]; my_off_t hexdump_from= print_event_info->hexdump_from; + DBUG_ENTER("Log_event::print_header"); - fputc('#', file); + my_b_printf(file, "#"); print_timestamp(file); - fprintf(file, " server id %d end_log_pos %s ", server_id, - llstr(log_pos,llbuff)); + my_b_printf(file, " server id %d end_log_pos %s ", server_id, + llstr(log_pos,llbuff)); /* mysqlbinlog --hexdump */ if (print_event_info->hexdump_from) { - fprintf(file, "\n"); + my_b_printf(file, "\n"); uchar *ptr= (uchar*)temp_buf; my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET) - LOG_EVENT_MINIMAL_HEADER_LEN; @@ -986,15 +1069,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) /* Pretty-print event common header if header is exactly 19 bytes */ if (print_event_info->common_header_len == LOG_EVENT_MINIMAL_HEADER_LEN) { - fprintf(file, "# Position Timestamp Type Master ID " - "Size Master Pos Flags \n"); - fprintf(file, "# %8.8lx %02x %02x %02x %02x %02x " - "%02x %02x %02x %02x %02x %02x %02x %02x " - "%02x %02x %02x %02x %02x %02x\n", - (unsigned long) hexdump_from, - ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], - ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13], - ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); + char emit_buf[256]; // Enough for storing one line + my_b_printf(file, "# Position Timestamp Type Master ID " + "Size Master Pos Flags \n"); + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %02x %02x %02x %02x %02x " + "%02x %02x %02x %02x %02x %02x %02x %02x " + "%02x %02x %02x %02x %02x %02x\n", + (unsigned long) hexdump_from, + ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], + ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13], + ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); + my_b_write(file, (byte*) emit_buf, bytes_written); ptr += LOG_EVENT_MINIMAL_HEADER_LEN; hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN; } @@ -1011,9 +1100,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) if (i % 16 == 15) { - fprintf(file, "# %8.8lx %-48.48s |%16s|\n", - (unsigned long) (hexdump_from + (i & 0xfffffff0)), - hex_string, char_string); + /* + my_b_printf() does not support full printf() formats, so we + have to do it this way. + + TODO: Rewrite my_b_printf() to support full printf() syntax. + */ + char emit_buf[256]; + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %-48.48s |%16s|\n", + (unsigned long) (hexdump_from + (i & 0xfffffff0)), + hex_string, char_string); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); + my_b_write(file, (byte*) emit_buf, bytes_written); hex_string[0]= 0; char_string[0]= 0; c= char_string; @@ -1025,28 +1126,52 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) /* Non-full last line */ if (hex_string[0]) - fprintf(file, "# %8.8lx %-48.48s |%s|\n# ", - (unsigned long) (hexdump_from + (i & 0xfffffff0)), - hex_string, char_string); + { + char emit_buf[256]; + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %-48.48s |%s|\n# ", + (unsigned long) (hexdump_from + (i & 0xfffffff0)), + hex_string, char_string); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); + my_b_write(file, (byte*) emit_buf, bytes_written); + } } + DBUG_VOID_RETURN; } -void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Log_event::print_base64(IO_CACHE* file, + PRINT_EVENT_INFO* print_event_info, + bool more) { - uchar *ptr= (uchar*)temp_buf; + const uchar *ptr= (const uchar *)temp_buf; my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET); - char *tmp_str= - (char *) my_malloc(base64_needed_encoded_length(size), MYF(MY_WME)); + DBUG_ENTER("Log_event::print_base64"); + + size_t const tmp_str_sz= base64_needed_encoded_length(size); + char *const tmp_str= (char *) my_malloc(tmp_str_sz, MYF(MY_WME)); if (!tmp_str) { fprintf(stderr, "\nError: Out of memory. " "Could not print correct binlog event.\n"); - return; + DBUG_VOID_RETURN; } - int res= base64_encode(ptr, size, tmp_str); - fprintf(file, "\nBINLOG '\n%s\n';\n", tmp_str); + + int const res= base64_encode(ptr, size, tmp_str); + DBUG_ASSERT(res == 0); + + if (my_b_tell(file) == 0) + my_b_printf(file, "\nBINLOG '\n"); + + my_b_printf(file, "%s\n", tmp_str); + + if (!more) + my_b_printf(file, "';\n"); + my_free(tmp_str, MYF(0)); + DBUG_VOID_RETURN; } @@ -1054,9 +1179,10 @@ void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) Log_event::print_timestamp() */ -void Log_event::print_timestamp(FILE* file, time_t* ts) +void Log_event::print_timestamp(IO_CACHE* file, time_t* ts) { struct tm *res; + DBUG_ENTER("Log_event::print_timestamp"); if (!ts) ts = &when; #ifdef MYSQL_SERVER // This is always false @@ -1066,13 +1192,14 @@ void Log_event::print_timestamp(FILE* file, time_t* ts) res=localtime(ts); #endif - fprintf(file,"%02d%02d%02d %2d:%02d:%02d", - res->tm_year % 100, - res->tm_mon+1, - res->tm_mday, - res->tm_hour, - res->tm_min, - res->tm_sec); + my_b_printf(file,"%02d%02d%02d %2d:%02d:%02d", + res->tm_year % 100, + res->tm_mon+1, + res->tm_mday, + res->tm_hour, + res->tm_min, + res->tm_sec); + DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -1544,7 +1671,7 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, */ #ifdef MYSQL_CLIENT -void Query_log_event::print_query_header(FILE* file, +void Query_log_event::print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info) { // TODO: print the catalog ?? @@ -1554,9 +1681,10 @@ void Query_log_event::print_query_header(FILE* file, if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", - get_type_str(), (ulong) thread_id, (ulong) exec_time, error_code); + print_header(file, print_event_info, FALSE); + my_b_printf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", + get_type_str(), (ulong) thread_id, (ulong) exec_time, + error_code); } if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db) @@ -1564,15 +1692,15 @@ void Query_log_event::print_query_header(FILE* file, if (different_db= memcmp(print_event_info->db, db, db_len + 1)) memcpy(print_event_info->db, db, db_len + 1); if (db[0] && different_db) - fprintf(file, "use %s;\n", db); + my_b_printf(file, "use %s;\n", db); } end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); *end++=';'; *end++='\n'; - my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME)); + my_b_write(file, (byte*) buff, (uint) (end-buff)); if (flags & LOG_EVENT_THREAD_SPECIFIC_F) - fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); + my_b_printf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); /* If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to @@ -1594,14 +1722,14 @@ void Query_log_event::print_query_header(FILE* file, if (unlikely(tmp)) /* some bits have changed */ { bool need_comma= 0; - fprintf(file, "SET "); + my_b_printf(file, "SET "); print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2, "@@session.foreign_key_checks", &need_comma); print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2, "@@session.sql_auto_is_null", &need_comma); print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2, "@@session.unique_checks", &need_comma); - fprintf(file,";\n"); + my_b_printf(file,";\n"); print_event_info->flags2= flags2; } } @@ -1629,14 +1757,14 @@ void Query_log_event::print_query_header(FILE* file, } if (unlikely(print_event_info->sql_mode != sql_mode)) { - fprintf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); + my_b_printf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); print_event_info->sql_mode= sql_mode; } } if (print_event_info->auto_increment_increment != auto_increment_increment || print_event_info->auto_increment_offset != auto_increment_offset) { - fprintf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", + my_b_printf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", auto_increment_increment,auto_increment_offset); print_event_info->auto_increment_increment= auto_increment_increment; print_event_info->auto_increment_offset= auto_increment_offset; @@ -1656,16 +1784,17 @@ void Query_log_event::print_query_header(FILE* file, CHARSET_INFO *cs_info= get_charset(uint2korr(charset), MYF(MY_WME)); if (cs_info) { - fprintf(file, "/*!\\C %s */;\n", cs_info->csname); /* for mysql client */ + /* for mysql client */ + my_b_printf(file, "/*!\\C %s */;\n", cs_info->csname); } - fprintf(file,"SET " - "@@session.character_set_client=%d," - "@@session.collation_connection=%d," - "@@session.collation_server=%d" - ";\n", - uint2korr(charset), - uint2korr(charset+2), - uint2korr(charset+4)); + my_b_printf(file,"SET " + "@@session.character_set_client=%d," + "@@session.collation_connection=%d," + "@@session.collation_server=%d" + ";\n", + uint2korr(charset), + uint2korr(charset+2), + uint2korr(charset+4)); memcpy(print_event_info->charset, charset, 6); } } @@ -1673,7 +1802,7 @@ void Query_log_event::print_query_header(FILE* file, { if (bcmp(print_event_info->time_zone_str, time_zone_str, time_zone_len+1)) { - fprintf(file,"SET @@session.time_zone='%s';\n", time_zone_str); + my_b_printf(file,"SET @@session.time_zone='%s';\n", time_zone_str); memcpy(print_event_info->time_zone_str, time_zone_str, time_zone_len+1); } } @@ -1682,9 +1811,11 @@ void Query_log_event::print_query_header(FILE* file, void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { - print_query_header(file, print_event_info); - my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); - fputs(";\n", file); + Write_on_release_cache cache(&print_event_info->head_cache, file); + + print_query_header(&cache, print_event_info); + my_b_write(&cache, (byte*) query, q_len); + my_b_printf(&cache, ";\n"); } #endif /* MYSQL_CLIENT */ @@ -2014,18 +2145,23 @@ void Start_log_event_v3::pack_info(Protocol *protocol) #ifdef MYSQL_CLIENT void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + DBUG_ENTER("Start_log_event_v3::print"); + + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, - server_version); - print_timestamp(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tStart: binlog v %d, server v %s created ", + binlog_version, server_version); + print_timestamp(&cache); if (created) - fprintf(file," at startup"); - fputc('\n', file); + my_b_printf(&cache," at startup"); + my_b_printf(&cache, "\n"); if (flags & LOG_EVENT_BINLOG_IN_USE_F) - fprintf(file, "# Warning: this binlog was not closed properly. " - "Most probably mysqld crashed writing it.\n"); + my_b_printf(&cache, "# Warning: this binlog was not closed properly. " + "Most probably mysqld crashed writing it.\n"); } if (!artificial_event && created) { @@ -2036,12 +2172,12 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) and rollback unfinished transaction. Probably this can be done with RESET CONNECTION (syntax to be defined). */ - fprintf(file,"RESET CONNECTION;\n"); + my_b_printf(&cache,"RESET CONNECTION;\n"); #else - fprintf(file,"ROLLBACK;\n"); + my_b_printf(&cache,"ROLLBACK;\n"); #endif } - fflush(file); + DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -2381,19 +2517,19 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli) if (server_id == (uint32) ::server_id) { /* - Do not modify rli->group_master_log_pos, as this event did not exist on - the master. That is, just update the *relay log* coordinates; this is - done by passing log_pos=0 to inc_group_relay_log_pos, like we do in - Stop_log_event::exec_event(). - If in a transaction, don't touch group_* coordinates. - */ - if (thd->options & OPTION_BEGIN) - rli->inc_event_relay_log_pos(); - else - { - rli->inc_group_relay_log_pos(0); - flush_relay_log_info(rli); - } + We only increase the relay log position if we are skipping + events and do not touch any group_* variables, nor flush the + relay log info. If there is a crash, we will have to re-skip + the events again, but that is a minor issue. + + If we do not skip stepping the group log position (and the + server id was changed when restarting the server), it might well + be that we start executing at a position that is invalid, e.g., + at a Rows_log_event or a Query_log_event preceeded by a + Intvar_log_event instead of starting at a Table_map_log_event or + the Intvar_log_event respectively. + */ + rli->inc_event_relay_log_pos(); DBUG_RETURN(0); } @@ -2765,15 +2901,17 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) } -void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, +void Load_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info, bool commented) { + Write_on_release_cache cache(&print_event_info->head_cache, file_arg); + DBUG_ENTER("Load_log_event::print"); if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n", - thread_id, exec_time); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tQuery\tthread_id=%ld\texec_time=%ld\n", + thread_id, exec_time); } bool different_db= 1; @@ -2791,65 +2929,65 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, } if (db && db[0] && different_db) - fprintf(file, "%suse %s;\n", + my_b_printf(&cache, "%suse %s;\n", commented ? "# " : "", db); if (flags & LOG_EVENT_THREAD_SPECIFIC_F) - fprintf(file,"%sSET @@session.pseudo_thread_id=%lu;\n", + my_b_printf(&cache,"%sSET @@session.pseudo_thread_id=%lu;\n", commented ? "# " : "", (ulong)thread_id); - fprintf(file, "%sLOAD DATA ", + my_b_printf(&cache, "%sLOAD DATA ", commented ? "# " : ""); if (check_fname_outside_temp_buf()) - fprintf(file, "LOCAL "); - fprintf(file, "INFILE '%-*s' ", fname_len, fname); + my_b_printf(&cache, "LOCAL "); + my_b_printf(&cache, "INFILE '%-*s' ", fname_len, fname); if (sql_ex.opt_flags & REPLACE_FLAG) - fprintf(file," REPLACE "); + my_b_printf(&cache," REPLACE "); else if (sql_ex.opt_flags & IGNORE_FLAG) - fprintf(file," IGNORE "); + my_b_printf(&cache," IGNORE "); - fprintf(file, "INTO TABLE `%s`", table_name); - fprintf(file, " FIELDS TERMINATED BY "); - pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len); + my_b_printf(&cache, "INTO TABLE `%s`", table_name); + my_b_printf(&cache, " FIELDS TERMINATED BY "); + pretty_print_str(&cache, sql_ex.field_term, sql_ex.field_term_len); if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) - fprintf(file," OPTIONALLY "); - fprintf(file, " ENCLOSED BY "); - pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len); + my_b_printf(&cache," OPTIONALLY "); + my_b_printf(&cache, " ENCLOSED BY "); + pretty_print_str(&cache, sql_ex.enclosed, sql_ex.enclosed_len); - fprintf(file, " ESCAPED BY "); - pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len); + my_b_printf(&cache, " ESCAPED BY "); + pretty_print_str(&cache, sql_ex.escaped, sql_ex.escaped_len); - fprintf(file," LINES TERMINATED BY "); - pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len); + my_b_printf(&cache," LINES TERMINATED BY "); + pretty_print_str(&cache, sql_ex.line_term, sql_ex.line_term_len); if (sql_ex.line_start) { - fprintf(file," STARTING BY "); - pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); + my_b_printf(&cache," STARTING BY "); + pretty_print_str(&cache, sql_ex.line_start, sql_ex.line_start_len); } if ((long) skip_lines > 0) - fprintf(file, " IGNORE %ld LINES", (long) skip_lines); + my_b_printf(&cache, " IGNORE %ld LINES", (long) skip_lines); if (num_fields) { uint i; const char* field = fields; - fprintf(file, " ("); + my_b_printf(&cache, " ("); for (i = 0; i < num_fields; i++) { if (i) - fputc(',', file); - fprintf(file, field); + my_b_printf(&cache, ","); + my_b_printf(&cache, field); field += field_lens[i] + 1; } - fputc(')', file); + my_b_printf(&cache, ")"); } - fprintf(file, ";\n"); + my_b_printf(&cache, ";\n"); DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -3185,17 +3323,16 @@ void Rotate_log_event::pack_info(Protocol *protocol) void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { char buf[22]; + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); if (print_event_info->short_form) return; - print_header(file, print_event_info); - fprintf(file, "\tRotate to "); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tRotate to "); if (new_log_ident) - my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, - MYF(MY_NABP | MY_WME)); - fprintf(file, " pos: %s", llstr(pos, buf)); - fputc('\n', file); - fflush(file); + my_b_write(&cache, (byte*) new_log_ident, (uint)ident_len); + my_b_printf(&cache, " pos: %s\n", llstr(pos, buf)); } #endif /* MYSQL_CLIENT */ @@ -3407,14 +3544,16 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) char llbuff[22]; const char *msg; LINT_INIT(msg); + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tIntvar\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tIntvar\n"); } - fprintf(file, "SET "); + my_b_printf(&cache, "SET "); switch (type) { case LAST_INSERT_ID_EVENT: msg="LAST_INSERT_ID"; @@ -3427,8 +3566,7 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) msg="INVALID_INT"; break; } - fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff)); - fflush(file); + my_b_printf(&cache, "%s=%s;\n", msg, llstr(val,llbuff)); } #endif @@ -3497,15 +3635,17 @@ bool Rand_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char llbuff[22],llbuff2[22]; if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tRand\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tRand\n"); } - fprintf(file, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n", - llstr(seed1, llbuff),llstr(seed2, llbuff2)); - fflush(file); + my_b_printf(&cache, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n", + llstr(seed1, llbuff),llstr(seed2, llbuff2)); } #endif /* MYSQL_CLIENT */ @@ -3567,16 +3707,18 @@ bool Xid_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { char buf[64]; longlong10_to_str(xid, buf, 10); - print_header(file, print_event_info); - fprintf(file, "\tXid = %s\n", buf); - fflush(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tXid = %s\n", buf); } - fprintf(file, "COMMIT;\n"); + my_b_printf(&cache, "COMMIT;\n"); } #endif /* MYSQL_CLIENT */ @@ -3766,19 +3908,22 @@ bool User_var_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tUser_var\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tUser_var\n"); } - fprintf(file, "SET @`"); - my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME)); - fprintf(file, "`"); + my_b_printf(&cache, "SET @`"); + my_b_write(&cache, (byte*) name, (uint) (name_len)); + my_b_printf(&cache, "`"); if (is_null) { - fprintf(file, ":=NULL;\n"); + my_b_printf(&cache, ":=NULL;\n"); } else { @@ -3786,12 +3931,12 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) case REAL_RESULT: double real_val; float8get(real_val, val); - fprintf(file, ":=%.14g;\n", real_val); + my_b_printf(&cache, ":=%.14g;\n", real_val); break; case INT_RESULT: char int_buf[22]; longlong10_to_str(uint8korr(val), int_buf, -10); - fprintf(file, ":=%s;\n", int_buf); + my_b_printf(&cache, ":=%s;\n", int_buf); break; case DECIMAL_RESULT: { @@ -3807,7 +3952,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) bin2decimal(val+2, &dec, precision, scale); decimal2string(&dec, str_buf, &str_len, 0, 0, 0); str_buf[str_len]= 0; - fprintf(file, ":=%s;\n",str_buf); + my_b_printf(&cache, ":=%s;\n",str_buf); break; } case STRING_RESULT: @@ -3843,9 +3988,9 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) Generate an unusable command (=> syntax error) is probably the best thing we can do here. */ - fprintf(file, ":=???;\n"); + my_b_printf(&cache, ":=???;\n"); else - fprintf(file, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name); + my_b_printf(&cache, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name); my_afree(hex_str); } break; @@ -3855,7 +4000,6 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) return; } } - fflush(file); } #endif @@ -3939,13 +4083,14 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) #ifdef HAVE_REPLICATION #ifdef MYSQL_CLIENT -void Unknown_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file_arg); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "# %s", "Unknown event\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n# %s", "Unknown event\n"); } #endif @@ -4012,12 +4157,13 @@ Slave_log_event::~Slave_log_event() #ifdef MYSQL_CLIENT void Slave_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + char llbuff[22]; if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "\ + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n\ Slave: master_host: '%s' master_port: %d master_log: '%s' master_pos: %s\n", master_host, master_port, master_log, llstr(master_pos, llbuff)); } @@ -4097,12 +4243,14 @@ int Slave_log_event::exec_event(struct st_relay_log_info* rli) #ifdef MYSQL_CLIENT void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fprintf(file, "\tStop\n"); - fflush(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tStop\n"); } #endif /* MYSQL_CLIENT */ @@ -4277,6 +4425,8 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, bool enable_local) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) { if (enable_local && check_fname_outside_temp_buf()) @@ -4292,10 +4442,10 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info That one is for "file_id: etc" below: in mysqlbinlog we want the #, in SHOW BINLOG EVENTS we don't. */ - fprintf(file, "#"); + my_b_printf(&cache, "#"); } - fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len); + my_b_printf(&cache, " file_id: %d block_len: %d\n", file_id, block_len); } @@ -4465,12 +4615,13 @@ bool Append_block_log_event::write(IO_CACHE* file) void Append_block_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#%s: file_id: %d block_len: %d\n", - get_type_str(), file_id, block_len); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#%s: file_id: %d block_len: %d\n", + get_type_str(), file_id, block_len); } #endif /* MYSQL_CLIENT */ @@ -4608,11 +4759,12 @@ bool Delete_file_log_event::write(IO_CACHE* file) void Delete_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#Delete_file: file_id=%u\n", file_id); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#Delete_file: file_id=%u\n", file_id); } #endif /* MYSQL_CLIENT */ @@ -4703,12 +4855,13 @@ bool Execute_load_log_event::write(IO_CACHE* file) void Execute_load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#Exec_load: file_id=%d\n", - file_id); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#Exec_load: file_id=%d\n", + file_id); } #endif @@ -4925,29 +5078,30 @@ void Execute_load_query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, const char *local_fname) { - print_query_header(file, print_event_info); + Write_on_release_cache cache(&print_event_info->head_cache, file); + + print_query_header(&cache, print_event_info); if (local_fname) { - my_fwrite(file, (byte*) query, fn_pos_start, MYF(MY_NABP | MY_WME)); - fprintf(file, " LOCAL INFILE \'"); - fprintf(file, local_fname); - fprintf(file, "\'"); + my_b_write(&cache, (byte*) query, fn_pos_start); + my_b_printf(&cache, " LOCAL INFILE \'"); + my_b_printf(&cache, local_fname); + my_b_printf(&cache, "\'"); if (dup_handling == LOAD_DUP_REPLACE) - fprintf(file, " REPLACE"); - fprintf(file, " INTO"); - my_fwrite(file, (byte*) query + fn_pos_end, q_len-fn_pos_end, - MYF(MY_NABP | MY_WME)); - fprintf(file, ";\n"); + my_b_printf(&cache, " REPLACE"); + my_b_printf(&cache, " INTO"); + my_b_write(&cache, (byte*) query + fn_pos_end, q_len-fn_pos_end); + my_b_printf(&cache, ";\n"); } else { - my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); - fprintf(file, ";\n"); + my_b_write(&cache, (byte*) query, q_len); + my_b_printf(&cache, ";\n"); } if (!print_event_info->short_form) - fprintf(file, "# file_id: %d \n", file_id); + my_b_printf(&cache, "# file_id: %d \n", file_id); } #endif @@ -5797,6 +5951,31 @@ void Rows_log_event::pack_info(Protocol *protocol) } #endif +#ifdef MYSQL_CLIENT +void Rows_log_event::print_helper(FILE *file, + PRINT_EVENT_INFO *print_event_info, + char const *const name) +{ + IO_CACHE *const head= &print_event_info->head_cache; + IO_CACHE *const body= &print_event_info->body_cache; + if (!print_event_info->short_form) + { + bool const last_stmt_event= get_flags(STMT_END_F); + print_header(head, print_event_info, !last_stmt_event); + my_b_printf(head, "\t%s: table id %lu", name, m_table_id); + print_base64(body, print_event_info, !last_stmt_event); + } + + if (get_flags(STMT_END_F)) + { + my_b_copy_to_file(head, file); + my_b_copy_to_file(body, file); + reinit_io_cache(head, WRITE_CACHE, 0, FALSE, TRUE); + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); + } +} +#endif + /************************************************************************** Table_map_log_event member functions and support functions **************************************************************************/ @@ -6148,10 +6327,11 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) { if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tTable_map: `%s`.`%s` mapped to number %lu\n", - m_dbnam, m_tblnam, m_table_id); - print_base64(file, print_event_info); + print_header(&print_event_info->head_cache, print_event_info, TRUE); + my_b_printf(&print_event_info->head_cache, + "\tTable_map: `%s`.`%s` mapped to number %lu\n", + m_dbnam, m_tblnam, m_table_id); + print_base64(&print_event_info->body_cache, print_event_info, TRUE); } } #endif @@ -6513,12 +6693,7 @@ int Write_rows_log_event::do_exec_row(TABLE *table) #ifdef MYSQL_CLIENT void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tWrite_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Write_rows"); } #endif @@ -6846,12 +7021,7 @@ int Delete_rows_log_event::do_exec_row(TABLE *table) void Delete_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tDelete_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Delete_rows"); } #endif @@ -7017,12 +7187,7 @@ int Update_rows_log_event::do_exec_row(TABLE *table) void Update_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tUpdate_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Update_rows"); } #endif diff --git a/sql/log_event.h b/sql/log_event.h index 801a83dcc97..81ce2f18b4d 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -519,14 +519,30 @@ typedef struct st_print_event_info bzero(db, sizeof(db)); bzero(charset, sizeof(charset)); bzero(time_zone_str, sizeof(time_zone_str)); + uint const flags = MYF(MY_WME | MY_NABP); + init_io_cache(&head_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags); + init_io_cache(&body_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags); } + ~st_print_event_info() { + end_io_cache(&head_cache); + end_io_cache(&body_cache); + } + + /* Settings on how to print the events */ bool short_form; bool base64_output; my_off_t hexdump_from; uint8 common_header_len; + /* + These two caches are used by the row-based replication events to + collect the header information and the main body of the events + making up a statement. + */ + IO_CACHE head_cache; + IO_CACHE body_cache; } PRINT_EVENT_INFO; #endif @@ -637,9 +653,11 @@ public: const Format_description_log_event *description_event); /* print*() functions are used by mysqlbinlog */ virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0; - void print_timestamp(FILE* file, time_t *ts = 0); - void print_header(FILE* file, PRINT_EVENT_INFO* print_event_info); - void print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info); + void print_timestamp(IO_CACHE* file, time_t *ts = 0); + void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, + bool is_more); + void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, + bool is_more); #endif static void *operator new(size_t size) @@ -804,7 +822,7 @@ public: uint32 q_len_arg); #endif /* HAVE_REPLICATION */ #else - void print_query_header(FILE* file, PRINT_EVENT_INFO* print_event_info); + void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info); void print(FILE* file, PRINT_EVENT_INFO* print_event_info); #endif @@ -1864,6 +1882,10 @@ protected: Log_event_type event_type, const Format_description_log_event *description_event); +#ifdef MYSQL_CLIENT + void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name); +#endif + #ifndef MYSQL_CLIENT virtual int do_add_row_data(byte *data, my_size_t length); #endif diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index a34e8c152cf..0bcd15c2b82 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -5968,6 +5968,9 @@ ER_CANT_ACTIVATE_LOG ger "Kann Logdatei '%-.64s' nicht aktivieren" ER_RBR_NOT_AVAILABLE eng "The server was not built with row-based replication" +ER_BASE64_DECODE_ERROR + eng "Decoding of base64 string failed" + swe "Avkodning av base64 sträng misslyckades" ger "Der Server hat keine zeilenbasierte Replikation" ER_NO_TRIGGERS_ON_SYSTEM_SCHEMA eng "Triggers can not be created on system tables" diff --git a/sql/slave.cc b/sql/slave.cc index 5d871a64347..1852ff68070 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3101,17 +3101,22 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT)) { DBUG_PRINT("info", ("event skipped")); - if (thd->options & OPTION_BEGIN) - rli->inc_event_relay_log_pos(); - else - { - rli->inc_group_relay_log_pos((type_code == ROTATE_EVENT || - type_code == STOP_EVENT || - type_code == FORMAT_DESCRIPTION_EVENT) ? - LL(0) : ev->log_pos, - 1/* skip lock*/); - flush_relay_log_info(rli); - } + /* + We only skip the event here and do not increase the group log + position. In the event that we have to restart, this means + that we might have to skip the event again, but that is a + minor issue. + + If we were to increase the group log position when skipping an + event, it might be that we are restarting at the wrong + position and have events before that we should have executed, + so not increasing the group log position is a sure bet in this + case. + + In this way, we just step the group log position when we + *know* that we are at the end of a group. + */ + rli->inc_event_relay_log_pos(); /* Protect against common user error of setting the counter to 1 diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 0939ad66cd0..23ca5330053 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -31,6 +31,7 @@ void mysql_client_binlog_statement(THD* thd) { + DBUG_ENTER("mysql_client_binlog_statement"); DBUG_PRINT("info",("binlog base64: '%*s'", (thd->lex->comment.length < 2048 ? thd->lex->comment.length : 2048), @@ -43,8 +44,8 @@ void mysql_client_binlog_statement(THD* thd) my_bool nsok= thd->net.no_send_ok; thd->net.no_send_ok= TRUE; - const my_size_t coded_len= thd->lex->comment.length + 1; - const my_size_t event_len= base64_needed_decoded_length(coded_len); + my_size_t coded_len= thd->lex->comment.length + 1; + my_size_t decoded_len= base64_needed_decoded_length(coded_len); DBUG_ASSERT(coded_len > 0); /* @@ -57,9 +58,8 @@ void mysql_client_binlog_statement(THD* thd) new Format_description_log_event(4); const char *error= 0; - char *buf= (char *) my_malloc(event_len, MYF(MY_WME)); + char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; - int res; /* Out of memory check @@ -73,43 +73,97 @@ void mysql_client_binlog_statement(THD* thd) thd->rli_fake->sql_thd= thd; thd->rli_fake->no_storage= TRUE; - res= base64_decode(thd->lex->comment.str, coded_len, buf); - - DBUG_PRINT("info",("binlog base64 decoded_len=%d, event_len=%d\n", - res, uint4korr(buf + EVENT_LEN_OFFSET))); - /* - Note that 'res' is the correct event length, 'event_len' was - calculated based on the base64-string that possibly contained - extra spaces, so it can be longer than the real event. - */ - if (res < EVENT_LEN_OFFSET - || (uint) res != uint4korr(buf+EVENT_LEN_OFFSET)) + for (char const *strptr= thd->lex->comment.str ; + strptr < thd->lex->comment.str + thd->lex->comment.length ; ) { - my_error(ER_SYNTAX_ERROR, MYF(0)); - goto end; - } + char const *endptr= 0; + int bytes_decoded= base64_decode(strptr, coded_len, buf, &endptr); - ev= Log_event::read_log_event(buf, res, &error, desc); + DBUG_PRINT("info", + ("bytes_decoded=%d; strptr=0x%lu; endptr=0x%lu ('%c':%d)", + bytes_decoded, strptr, endptr, *endptr, *endptr)); + + if (bytes_decoded < 0) + { + my_error(ER_BASE64_DECODE_ERROR, MYF(0)); + goto end; + } + else if (bytes_decoded == 0) + break; // If no bytes where read, the string contained only whitespace + + DBUG_ASSERT(bytes_decoded > 0); + DBUG_ASSERT(endptr > strptr); + coded_len-= endptr - strptr; + strptr= endptr; - DBUG_PRINT("info",("binlog base64 err=%s", error)); - if (!ev) - { /* - This could actually be an out-of-memory, but it is more - likely causes by a bad statement + Now we have one or more events stored in the buffer. The size of + the buffer is computed based on how much base64-encoded data + there were, so there should be ample space for the data (maybe + even too much, since a statement can consist of a considerable + number of events). + + TODO: Switch to use a stream-based base64 encoder/decoder in + order to be able to read exactly what is necessary. */ - my_error(ER_SYNTAX_ERROR, MYF(0)); - goto end; - } - DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code())); - DBUG_PRINT("info",("buf+EVENT_TYPE_OFFSET=%d", buf+EVENT_TYPE_OFFSET)); + DBUG_PRINT("info",("binlog base64 decoded_len=%d, bytes_decoded=%d", + decoded_len, bytes_decoded)); - ev->thd= thd; - if (ev->exec_event(thd->rli_fake)) - { - my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement"); - goto end; + /* + Now we start to read events of the buffer, until there are no + more. + */ + for (char *bufptr= buf ; bytes_decoded > 0 ; ) + { + /* + Checking that the first event in the buffer is not truncated. + */ + ulong event_len= uint4korr(bufptr + EVENT_LEN_OFFSET); + DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d", + event_len, bytes_decoded)); + if (bytes_decoded < EVENT_LEN_OFFSET || (uint) bytes_decoded < event_len) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + ev= Log_event::read_log_event(bufptr, event_len, &error, desc); + + DBUG_PRINT("info",("binlog base64 err=%s", error)); + if (!ev) + { + /* + This could actually be an out-of-memory, but it is more likely + causes by a bad statement + */ + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + bytes_decoded -= event_len; + bufptr += event_len; + + DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code())); + DBUG_PRINT("info",("bufptr+EVENT_TYPE_OFFSET=0x%lx", + bufptr+EVENT_TYPE_OFFSET)); + DBUG_PRINT("info", ("bytes_decoded=%d; bufptr=0x%lx; buf[EVENT_LEN_OFFSET]=%u", + bytes_decoded, bufptr, uint4korr(bufptr+EVENT_LEN_OFFSET))); + ev->thd= thd; + if (int err= ev->exec_event(thd->rli_fake)) + { + DBUG_PRINT("info", ("exec_event() - error=%d", error)); + /* + TODO: Maybe a better error message since the BINLOG statement + now contains several events. + */ + my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement"); + goto end; + } + + delete ev; + ev= 0; + } } /* @@ -126,10 +180,7 @@ end: */ thd->net.no_send_ok= nsok; - if (ev) - delete ev; - if (desc) - delete desc; - if (buf) - my_free(buf, MYF(0)); + delete desc; + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_VOID_RETURN; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 6b46c9676f7..95283ec2fc8 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -930,6 +930,7 @@ public: /* Public interface to write RBR events to the binlog */ + void binlog_start_trans_and_stmt(); int binlog_write_table_map(TABLE *table, bool is_transactional); int binlog_write_row(TABLE* table, bool is_transactional, MY_BITMAP const* cols, my_size_t colcnt, diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index c765501f669..4dcbf9af4a0 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -1266,7 +1266,7 @@ err: if (thd->lex->current_select) thd->lex->current_select->no_error= 0; // Give error table->file->print_error(error,MYF(0)); - + before_trg_err: table->file->restore_auto_increment(prev_insert_id); if (key) @@ -2033,6 +2033,10 @@ err: rolled back. We only need to roll back a potential statement transaction, since real transactions are rolled back in close_thread_tables(). + + TODO: This is not true any more, table maps are generated on the + first call to ha_*_row() instead. Remove code that are used to + cover for the case outlined above. */ ha_rollback_stmt(thd); @@ -2402,6 +2406,7 @@ select_insert::prepare(List &values, SELECT_LEX_UNIT *u) DBUG_ENTER("select_insert::prepare"); unit= u; + /* Since table in which we are going to insert is added to the first select, LEX::current_select should point to the first select while @@ -2631,58 +2636,56 @@ void select_insert::send_error(uint errcode,const char *err) if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error) my_message(errcode, err, MYF(0)); - if (!table) + /* + If the creation of the table failed (due to a syntax error, for + example), no table will have been opened and therefore 'table' + will be NULL. In that case, we still need to execute the rollback + and the end of the function to truncate the binary log, but we can + skip all the intermediate steps. + */ + if (table) { /* - This can only happen when using CREATE ... SELECT and the table was not - created becasue of an syntax error + If we are not in prelocked mode, we end the bulk insert started + before. */ - DBUG_VOID_RETURN; - } - if (!thd->prelocked_mode) - table->file->ha_end_bulk_insert(); - /* - If at least one row has been inserted/modified and will stay in the table - (the table doesn't have transactions) we must write to the binlog (and - the error code will make the slave stop). + if (!thd->prelocked_mode) + table->file->ha_end_bulk_insert(); - For many errors (example: we got a duplicate key error while - inserting into a MyISAM table), no row will be added to the table, - so passing the error to the slave will not help since there will - be an error code mismatch (the inserts will succeed on the slave - with no error). + /* + If at least one row has been inserted/modified and will stay in + the table (the table doesn't have transactions) we must write to + the binlog (and the error code will make the slave stop). - If we are using row-based replication we have two cases where this - code is executed: replication of CREATE-SELECT and replication of - INSERT-SELECT. + For many errors (example: we got a duplicate key error while + inserting into a MyISAM table), no row will be added to the table, + so passing the error to the slave will not help since there will + be an error code mismatch (the inserts will succeed on the slave + with no error). - When replicating a CREATE-SELECT statement, we shall not write the - events to the binary log and should thus not set - OPTION_STATUS_NO_TRANS_UPDATE. - - When replicating INSERT-SELECT, we shall not write the events to - the binary log for transactional table, but shall write all events - if there is one or more writes to non-transactional tables. In - this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a - write to a non-transactional table, otherwise it is cleared. - */ - if (info.copied || info.deleted || info.updated) - { - if (!table->file->has_transactions()) + If table creation failed, the number of rows modified will also be + zero, so no check for that is made. + */ + if (info.copied || info.deleted || info.updated) { - if (mysql_bin_log.is_open()) + DBUG_ASSERT(table != NULL); + if (!table->file->has_transactions()) { - thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + if (mysql_bin_log.is_open()) + { + thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, + table->file->has_transactions(), FALSE); + } + if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table && + !can_rollback_data()) + thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; + query_cache_invalidate3(thd, table, 1); } - if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table && - !can_rollback_data()) - thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; - query_cache_invalidate3(thd, table, 1); } + table->file->ha_release_auto_increment(); } + ha_rollback_stmt(thd); - table->file->ha_release_auto_increment(); DBUG_VOID_RETURN; } @@ -2690,8 +2693,11 @@ void select_insert::send_error(uint errcode,const char *err) bool select_insert::send_eof() { int error,error2; + bool const trans_table= table->file->has_transactions(); ulonglong id; DBUG_ENTER("select_insert::send_eof"); + DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", + trans_table, table->file->table_type())); error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0; table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); @@ -2711,9 +2717,8 @@ bool select_insert::send_eof() are not logged in RBR) - We are using statement based replication */ - if (!table->file->has_transactions() && - (!table->s->tmp_table || - !thd->current_stmt_binlog_row_based)) + if (!trans_table && + (!table->s->tmp_table || !thd->current_stmt_binlog_row_based)) thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; } @@ -2729,11 +2734,22 @@ bool select_insert::send_eof() thd->clear_error(); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + trans_table, FALSE); + } + /* + We will call ha_autocommit_or_rollback() also for + non-transactional tables under row-based replication: there might + be events in the binary logs transaction, and we need to write + them to the binary log. + */ + if (trans_table || thd->current_stmt_binlog_row_based) + { + int const error2= ha_autocommit_or_rollback(thd, error); + if (error2 && !error) + error=error2; } - if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error) - error=error2; table->file->ha_release_auto_increment(); + if (error) { table->file->print_error(error,MYF(0)); @@ -2930,14 +2946,19 @@ select_create::prepare(List &values, SELECT_LEX_UNIT *u) class MY_HOOKS : public TABLEOP_HOOKS { public: MY_HOOKS(select_create *x) : ptr(x) { } - virtual void do_prelock(TABLE **tables, uint count) - { - if (ptr->get_thd()->current_stmt_binlog_row_based && - !(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE)) - ptr->binlog_show_create_table(tables, count); - } private: + virtual void do_prelock(TABLE **tables, uint count) + { + TABLE const *const table = *tables; + if (ptr->get_thd()->current_stmt_binlog_row_based && + table->s->tmp_table == NO_TMP_TABLE && + !ptr->get_create_info()->table_existed) + { + ptr->binlog_show_create_table(tables, count); + } + } + select_create *ptr; }; @@ -2946,6 +2967,20 @@ select_create::prepare(List &values, SELECT_LEX_UNIT *u) #endif unit= u; + +#ifdef HAVE_ROW_BASED_REPLICATION + /* + Start a statement transaction before the create if we are creating + a non-temporary table and are using row-based replication for the + statement. + */ + if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 && + thd->current_stmt_binlog_row_based) + { + thd->binlog_start_trans_and_stmt(); + } +#endif + if (!(table= create_table_from_items(thd, create_info, create_table, extra_fields, keys, &values, &thd->extra_lock, hook_ptr))) @@ -3093,8 +3128,17 @@ void select_create::abort() table->s->version= 0; hash_delete(&open_cache,(byte*) table); if (!create_info->table_existed) + { quick_rm_table(table_type, create_table->db, create_table->table_name, 0); + /* + We roll back the statement, including truncating the + transaction cache of the binary log, if the statement + failed. + */ + if (thd->current_stmt_binlog_row_based) + ha_rollback_stmt(thd); + } /* Tell threads waiting for refresh that something has happened */ if (version != refresh_version) broadcast_refresh(); diff --git a/storage/ndb/src/mgmapi/mgmapi.cpp b/storage/ndb/src/mgmapi/mgmapi.cpp index df71d71a8c1..7c5fafd2286 100644 --- a/storage/ndb/src/mgmapi/mgmapi.cpp +++ b/storage/ndb/src/mgmapi/mgmapi.cpp @@ -2013,7 +2013,7 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) { break; void *tmp_data = malloc(base64_needed_decoded_length((size_t) (len - 1))); - const int res = base64_decode(buf64, len-1, tmp_data); + const int res = base64_decode(buf64, len-1, tmp_data, NULL); delete[] buf64; UtilBuffer tmp; tmp.append((void *) tmp_data, res); diff --git a/unittest/mysys/base64-t.c b/unittest/mysys/base64-t.c index 6d85964b20d..ccec0c77086 100644 --- a/unittest/mysys/base64-t.c +++ b/unittest/mysys/base64-t.c @@ -54,7 +54,7 @@ main(void) /* Decode */ dst= (char *) malloc(base64_needed_decoded_length(strlen(str))); - dst_len= base64_decode(str, strlen(str), dst); + dst_len= base64_decode(str, strlen(str), dst, NULL); ok(dst_len == src_len, "Comparing lengths"); cmp= memcmp(src, dst, src_len);