From 7b85fb5c13f4a2a56d1bbc27f47afb5b70fa5d0d Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Mon, 2 Oct 2006 15:05:05 +0200 Subject: [PATCH 01/10] BUG#19459 (BINLOG RBR command does not lock tables correctly causing crash for, e.g., NDB): Submitting patch to base64_decode() adding extra parameter. --- include/base64.h | 3 +- mysys/base64.c | 63 ++++++++++++++++++++++--------- sql/share/errmsg.txt | 3 ++ storage/ndb/src/mgmapi/mgmapi.cpp | 2 +- unittest/mysys/base64-t.c | 2 +- 5 files changed, 53 insertions(+), 20 deletions(-) diff --git a/include/base64.h b/include/base64.h index 4653e824a9a..a6bffebfe07 100644 --- a/include/base64.h +++ b/include/base64.h @@ -39,7 +39,8 @@ int base64_encode(const void *src, size_t src_len, char *dst); /* Decode a base64 string into data */ -int base64_decode(const char *src, size_t src_len, void *dst); +int base64_decode(const char *src, size_t src_len, + void *dst, const char **end_ptr); #ifdef __cplusplus diff --git a/mysys/base64.c b/mysys/base64.c index 610797dd2ce..6c338b83868 100644 --- a/mysys/base64.c +++ b/mysys/base64.c @@ -125,44 +125,69 @@ pos(unsigned char c) /* Decode a base64 string - Note: We require that dst is pre-allocated to correct size. - See base64_needed_decoded_length(). + SYNOPSIS + base64_decode() + src Pointer to base64-encoded string + len Length of string at 'src' + dst Pointer to location where decoded data will be stored + end_ptr Pointer to variable that will refer to the character + after the end of the encoded data that were decoded. Can + be NULL. - RETURN Number of bytes produced in dst or -1 in case of failure + DESCRIPTION + + The base64-encoded data in the range ['src','*end_ptr') will be + decoded and stored starting at 'dst'. The decoding will stop + after 'len' characters have been read from 'src', or when padding + occurs in the base64-encoded data. In either case: if 'end_ptr' is + non-null, '*end_ptr' will be set to point to the character after + the last read character, even in the presence of error. + + NOTE + We require that 'dst' is pre-allocated to correct size. + + SEE ALSO + base64_needed_decoded_length(). + + RETURN VALUE + Number of bytes written at 'dst' or -1 in case of failure */ int -base64_decode(const char *src, size_t size, void *dst) +base64_decode(const char *const src_base, size_t const len, + void *dst, const char **end_ptr) { char b[3]; size_t i= 0; char *dst_base= (char *)dst; + char const *src= src_base; char *d= dst_base; size_t j; - while (i < size) + while (i < len) { unsigned c= 0; size_t mark= 0; - SKIP_SPACE(src, i, size); + SKIP_SPACE(src, i, len); c += pos(*src++); c <<= 6; i++; - SKIP_SPACE(src, i, size); + SKIP_SPACE(src, i, len); c += pos(*src++); c <<= 6; i++; - SKIP_SPACE(src, i, size); + SKIP_SPACE(src, i, len); - if (* src != '=') + if (*src != '=') c += pos(*src++); else { - i= size; + src += 2; /* There should be two bytes padding */ + i= len; mark= 2; c <<= 6; goto end; @@ -170,13 +195,14 @@ base64_decode(const char *src, size_t size, void *dst) c <<= 6; i++; - SKIP_SPACE(src, i, size); + SKIP_SPACE(src, i, len); if (*src != '=') c += pos(*src++); else { - i= size; + src += 1; /* There should be one byte padding */ + i= len; mark= 1; goto end; } @@ -191,11 +217,14 @@ base64_decode(const char *src, size_t size, void *dst) *d++= b[j]; } - if (i != size) - { - return -1; - } - return d - dst_base; + if (end_ptr != NULL) + *end_ptr= src; + + /* + The variable 'i' is set to 'len' when padding has been read, so it + does not actually reflect the number of bytes read from 'src'. + */ + return i != len ? -1 : d - dst_base; } diff --git a/sql/share/errmsg.txt b/sql/share/errmsg.txt index 01fcfd1adf9..b37459f5cbf 100644 --- a/sql/share/errmsg.txt +++ b/sql/share/errmsg.txt @@ -5837,3 +5837,6 @@ ER_PARTITION_MERGE_ERROR swe "%s kan inte användas i en partitionerad tabell" ER_RBR_NOT_AVAILABLE eng "The server was not built with row-based replication" +ER_BASE64_DECODE_ERROR + eng "Decoding of base64 string failed" + swe "Avkodning av base64 sträng misslyckades" diff --git a/storage/ndb/src/mgmapi/mgmapi.cpp b/storage/ndb/src/mgmapi/mgmapi.cpp index 6dfb48667aa..79ec99a1a10 100644 --- a/storage/ndb/src/mgmapi/mgmapi.cpp +++ b/storage/ndb/src/mgmapi/mgmapi.cpp @@ -1771,7 +1771,7 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) { break; void *tmp_data = malloc(base64_needed_decoded_length((size_t) (len - 1))); - const int res = base64_decode(buf64, len-1, tmp_data); + const int res = base64_decode(buf64, len-1, tmp_data, NULL); delete[] buf64; UtilBuffer tmp; tmp.append((void *) tmp_data, res); diff --git a/unittest/mysys/base64-t.c b/unittest/mysys/base64-t.c index 6d85964b20d..ccec0c77086 100644 --- a/unittest/mysys/base64-t.c +++ b/unittest/mysys/base64-t.c @@ -54,7 +54,7 @@ main(void) /* Decode */ dst= (char *) malloc(base64_needed_decoded_length(strlen(str))); - dst_len= base64_decode(str, strlen(str), dst); + dst_len= base64_decode(str, strlen(str), dst, NULL); ok(dst_len == src_len, "Comparing lengths"); cmp= memcmp(src, dst, src_len); From d1b9686245d9952ec386f0e35f4693a973dc5581 Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Mon, 2 Oct 2006 15:08:40 +0200 Subject: [PATCH 02/10] BUG#19459 (BINLOG RBR command does not lock tables correctly causing crash for, e.g., NDB): Adding new function my_b_copy_to_file() to copy an IO_CACHE to a file. --- include/my_sys.h | 1 + mysys/mf_iocache2.c | 47 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/include/my_sys.h b/include/my_sys.h index 4ea7cecf0a1..b2d7cf1fd37 100644 --- a/include/my_sys.h +++ b/include/my_sys.h @@ -517,6 +517,7 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); (uint) (*(info)->current_pos - (info)->request_pos)) /* tell write offset in the SEQ_APPEND cache */ +int my_b_copy_to_file(IO_CACHE *cache, FILE *file); my_off_t my_b_append_tell(IO_CACHE* info); my_off_t my_b_safe_tell(IO_CACHE* info); /* picks the correct tell() */ diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index f1ea21c2a47..d76b895aeb0 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -24,6 +24,53 @@ #include #include +/* + Copy contents of an IO_CACHE to a file. + + SYNOPSIS + copy_io_cache_to_file() + cache IO_CACHE to copy from + file File to copy to + + DESCRIPTION + Copy the contents of the cache to the file. The cache will be + re-inited to a read cache and will read from the beginning of the + cache. + + If a failure to write fully occurs, the cache is only copied + partially. + + TODO + Make this function solid by handling partial reads from the cache + in a correct manner: it should be atomic. + + RETURN VALUE + 0 All OK + 1 An error occured +*/ +int +my_b_copy_to_file(IO_CACHE *cache, FILE *file) +{ + byte buf[IO_SIZE]; + DBUG_ENTER("my_b_copy_to_file"); + + /* Reinit the cache to read from the beginning of the cache */ + if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) + DBUG_RETURN(1); + uint bytes_in_cache= my_b_bytes_in_cache(cache); + while (bytes_in_cache > 0) { + uint const read_bytes= min(bytes_in_cache, sizeof(buf)); + DBUG_PRINT("debug", ("Remaining %u bytes - Reading %u bytes", + bytes_in_cache, read_bytes)); + if (my_b_read(cache, buf, read_bytes)) + DBUG_RETURN(1); + if (my_fwrite(file, buf, read_bytes, MYF(MY_WME | MY_NABP)) == (uint) -1) + DBUG_RETURN(1); + bytes_in_cache -= read_bytes; + } + DBUG_RETURN(0); +} + my_off_t my_b_append_tell(IO_CACHE* info) { /* From a4e0b4e61f3711cc2d8ed1539e76c4ba62e886fb Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Thu, 5 Oct 2006 10:46:14 +0200 Subject: [PATCH 03/10] I had forgotten to delete an already disabled line of C++ code. --- BUG#20265 (Replication of CREATE-SELECT does not work correctly): Fixing bug by making binary log handle statement transactions. The binary log transaction cache can now be truncated to remove events inserted during this statement or transaction. Also, the binary log participate in XA transaction handling, although not as a full 2pc resource. --- .../r/binlog_row_mix_innodb_myisam.result | 18 - sql/log.cc | 539 +++++++++++++----- sql/sql_class.h | 1 + sql/sql_insert.cc | 150 +++-- 4 files changed, 502 insertions(+), 206 deletions(-) diff --git a/mysql-test/r/binlog_row_mix_innodb_myisam.result b/mysql-test/r/binlog_row_mix_innodb_myisam.result index ae66f98739d..a192d243bb0 100644 --- a/mysql-test/r/binlog_row_mix_innodb_myisam.result +++ b/mysql-test/r/binlog_row_mix_innodb_myisam.result @@ -359,15 +359,6 @@ show binlog events from 102; Log_name Pos Event_type Server_id End_log_pos Info master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Query 1 # use `test`; BEGIN -master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` ( - `a` int(11) NOT NULL DEFAULT '0', - `b` int(11) DEFAULT NULL, - PRIMARY KEY (`a`) -) ENGINE=InnoDB -master-bin.000001 # Table_map 1 # table_id: # (test.t2) -master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Query 1 # use `test`; DROP TABLE if exists t2 master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F @@ -375,15 +366,6 @@ master-bin.000001 # Query 1 # use `test`; DROP TABLE IF EXISTS t2 master-bin.000001 # Query 1 # use `test`; CREATE TABLE t2 (a int, b int, primary key (a)) engine=innodb master-bin.000001 # Table_map 1 # table_id: # (test.t1) master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Query 1 # use `test`; BEGIN -master-bin.000001 # Query 1 # use `test`; CREATE TABLE `t2` ( - `a` int(11) NOT NULL DEFAULT '0', - `b` int(11) DEFAULT NULL, - PRIMARY KEY (`a`) -) ENGINE=InnoDB -master-bin.000001 # Table_map 1 # table_id: # (test.t2) -master-bin.000001 # Write_rows 1 # table_id: # flags: STMT_END_F -master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Query 1 # use `test`; TRUNCATE table t2 master-bin.000001 # Xid 1 # COMMIT /* xid= */ master-bin.000001 # Table_map 1 # table_id: # (test.t1) diff --git a/sql/log.cc b/sql/log.cc index dba4b65efd9..cec60ba100b 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -32,11 +32,22 @@ #include +/* + Define placement versions of operator new and operator delete since + we cannot be sure that the include exists. + */ +inline void *operator new(size_t, void *ptr) { return ptr; } +inline void *operator new[](size_t, void *ptr) { return ptr; } +inline void operator delete(void*, void*) { /* Do nothing */ } +inline void operator delete[](void*, void*) { /* Do nothing */ } + /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 #define MAX_USER_HOST_SIZE 512 #define MAX_TIME_SIZE 32 +#define FLAGSTR(V,F) ((V)&(F)?#F" ":"") + LOGGER logger; MYSQL_BIN_LOG mysql_bin_log; @@ -70,23 +81,96 @@ char *make_default_log_name(char *buff,const char* log_ext) } /* - This is a POD. Please keep it that way! - - Don't add constructors, destructors, or virtual functions. + Helper class to store binary log transaction data. */ -struct binlog_trx_data { +class binlog_trx_data { +public: + enum { + UNDEF_POS = ~ (my_off_t) 0 + }; + + binlog_trx_data() +#ifdef HAVE_ROW_BASED_REPLICATION + : m_pending(0), before_stmt_pos(UNDEF_POS) +#endif + { + trans_log.end_of_file= max_binlog_cache_size; + } + + ~binlog_trx_data() + { +#ifdef HAVE_ROW_BASED_REPLICATION + DBUG_ASSERT(pending() == NULL); +#endif + close_cached_file(&trans_log); + } + + my_off_t position() const { + return my_b_tell(&trans_log); + } + bool empty() const { #ifdef HAVE_ROW_BASED_REPLICATION - return pending == NULL && my_b_tell(&trans_log) == 0; + return pending() == NULL && my_b_tell(&trans_log) == 0; #else return my_b_tell(&trans_log) == 0; #endif } - binlog_trx_data() {} - IO_CACHE trans_log; // The transaction cache + + /* + Truncate the transaction cache to a certain position. This + includes deleting the pending event. + */ + void truncate(my_off_t pos) + { #ifdef HAVE_ROW_BASED_REPLICATION - Rows_log_event *pending; // The pending binrows event + delete pending(); + set_pending(0); +#endif + reinit_io_cache(&trans_log, WRITE_CACHE, pos, 0, 0); + } + + /* + Reset the entire contents of the transaction cache, emptying it + completely. + */ + void reset() { + if (!empty()) + truncate(0); +#ifdef HAVE_ROW_BASED_REPLICATION + before_stmt_pos= UNDEF_POS; +#endif + trans_log.end_of_file= max_binlog_cache_size; + } + +#ifdef HAVE_ROW_BASED_REPLICATION + Rows_log_event *pending() const + { + return m_pending; + } + + void set_pending(Rows_log_event *const pending) + { + m_pending= pending; + } +#endif + + IO_CACHE trans_log; // The transaction cache + +private: +#ifdef HAVE_ROW_BASED_REPLICATION + /* + Pending binrows event. This event is the event where the rows are + currently written. + */ + Rows_log_event *m_pending; + +public: + /* + Binlog position before the start of the current statement. + */ + my_off_t before_stmt_pos; #endif }; @@ -1148,6 +1232,69 @@ void Log_to_csv_event_handler:: } + /* + Save position of binary log transaction cache. + + SYNPOSIS + binlog_trans_log_savepos() + + thd The thread to take the binlog data from + pos Pointer to variable where the position will be stored + + DESCRIPTION + + Save the current position in the binary log transaction cache into + the variable pointed to by 'pos' + */ + +static void +binlog_trans_log_savepos(THD *thd, my_off_t *pos) +{ + DBUG_ENTER("binlog_trans_log_savepos"); + DBUG_ASSERT(pos != NULL); + if (thd->ha_data[binlog_hton.slot] == NULL) + thd->binlog_setup_trx_data(); + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + DBUG_ASSERT(mysql_bin_log.is_open()); + *pos= trx_data->position(); + DBUG_PRINT("return", ("*pos=%u", *pos)); + DBUG_VOID_RETURN; +} + + +/* + Truncate the binary log transaction cache. + + SYNPOSIS + binlog_trans_log_truncate() + + thd The thread to take the binlog data from + pos Position to truncate to + + DESCRIPTION + + Truncate the binary log to the given position. Will not change + anything else. + + */ +static void +binlog_trans_log_truncate(THD *thd, my_off_t pos) +{ + DBUG_ENTER("binlog_trans_log_truncate"); + DBUG_PRINT("enter", ("pos=%u", pos)); + + DBUG_ASSERT(thd->ha_data[binlog_hton.slot] != NULL); + /* Only true if binlog_trans_log_savepos() wasn't called before */ + DBUG_ASSERT(pos != ~(my_off_t) 0); + + binlog_trx_data *const trx_data= + (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + trx_data->truncate(pos); + DBUG_VOID_RETURN; +} + + /* this function is mostly a placeholder. conceptually, binlog initialization (now mostly done in MYSQL_BIN_LOG::open) @@ -1174,26 +1321,62 @@ static int binlog_close_connection(THD *thd) { binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; - IO_CACHE *trans_log= &trx_data->trans_log; DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty()); - close_cached_file(trans_log); thd->ha_data[binlog_hton.slot]= 0; + trx_data->~binlog_trx_data(); my_free((gptr)trx_data, MYF(0)); return 0; } +/* + End a transaction. + + SYNOPSIS + binlog_end_trans() + + thd The thread whose transaction should be ended + trx_data Pointer to the transaction data to use + end_ev The end event to use, or NULL + all True if the entire transaction should be ended, false if + only the statement transaction should be ended. + + DESCRIPTION + + End the currently open transaction. The transaction can be either + a real transaction (if 'all' is true) or a statement transaction + (if 'all' is false). + + If 'end_ev' is NULL, the transaction is a rollback of only + transactional tables, so the transaction cache will be truncated + to either just before the last opened statement transaction (if + 'all' is false), or reset completely (if 'all' is true). + */ static int -binlog_end_trans(THD *thd, binlog_trx_data *trx_data, Log_event *end_ev) +binlog_end_trans(THD *thd, binlog_trx_data *trx_data, + Log_event *end_ev, bool all) { DBUG_ENTER("binlog_end_trans"); int error=0; IO_CACHE *trans_log= &trx_data->trans_log; + DBUG_PRINT("enter", ("transaction: %s, end_ev=%p", + all ? "all" : "stmt", end_ev)); + DBUG_PRINT("info", ("thd->options={ %s%s}", + FLAGSTR(thd->options, OPTION_NOT_AUTOCOMMIT), + FLAGSTR(thd->options, OPTION_BEGIN))); - - /* NULL denotes ROLLBACK with nothing to replicate */ + /* + NULL denotes ROLLBACK with nothing to replicate: i.e., rollback of + only transactional tables. If the transaction contain changes to + any non-transactiona tables, we need write the transaction and log + a ROLLBACK last. + */ if (end_ev != NULL) { /* + Doing a commit or a rollback including non-transactional tables, + i.e., ending a transaction where we might write the transaction + cache to the binary log. + We can always end the statement when ending a transaction since transactions are not allowed inside stored functions. If they were, we would have to ensure that we're not ending a statement @@ -1202,38 +1385,55 @@ binlog_end_trans(THD *thd, binlog_trx_data *trx_data, Log_event *end_ev) #ifdef HAVE_ROW_BASED_REPLICATION thd->binlog_flush_pending_rows_event(TRUE); #endif - error= mysql_bin_log.write(thd, trans_log, end_ev); + /* + We write the transaction cache to the binary log if either we're + committing the entire transaction, or if we are doing an + autocommit outside a transaction. + */ + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + { + error= mysql_bin_log.write(thd, &trx_data->trans_log, end_ev); + trx_data->reset(); +#ifdef HAVE_ROW_BASED_REPLICATION + /* + We need to step the table map version after writing the + transaction cache to disk. + */ + mysql_bin_log.update_table_map_version(); +#endif + statistic_increment(binlog_cache_use, &LOCK_status); + if (trans_log->disk_writes != 0) + { + statistic_increment(binlog_cache_disk_use, &LOCK_status); + trans_log->disk_writes= 0; + } + } } #ifdef HAVE_ROW_BASED_REPLICATION else { -#ifdef HAVE_ROW_BASED_REPLICATION - thd->binlog_delete_pending_rows_event(); -#endif + /* + If rolling back an entire transaction or a single statement not + inside a transaction, we reset the transaction cache. + + If rolling back a statement in a transaction, we truncate the + transaction cache to remove the statement. + + */ + if (all || !(thd->options & (OPTION_BEGIN | OPTION_NOT_AUTOCOMMIT))) + trx_data->reset(); + else + trx_data->truncate(trx_data->before_stmt_pos); // ...statement + + /* + We need to step the table map version on a rollback to ensure + that a new table map event is generated instead of the one that + was written to the thrown-away transaction cache. + */ + mysql_bin_log.update_table_map_version(); } - - /* - We need to step the table map version both after writing the - entire transaction to the log file and after rolling back the - transaction. - - We need to step the table map version after writing the - transaction cache to disk. In addition, we need to step the table - map version on a rollback to ensure that a new table map event is - generated instead of the one that was written to the thrown-away - transaction cache. - */ - mysql_bin_log.update_table_map_version(); #endif - statistic_increment(binlog_cache_use, &LOCK_status); - if (trans_log->disk_writes != 0) - { - statistic_increment(binlog_cache_disk_use, &LOCK_status); - trans_log->disk_writes= 0; - } - reinit_io_cache(trans_log, WRITE_CACHE, (my_off_t) 0, 0, 1); // cannot fail - trans_log->end_of_file= max_binlog_cache_size; DBUG_RETURN(error); } @@ -1250,26 +1450,31 @@ static int binlog_prepare(THD *thd, bool all) static int binlog_commit(THD *thd, bool all) { + int error= 0; DBUG_ENTER("binlog_commit"); binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_ASSERT(mysql_bin_log.is_open() && - (all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))); + DBUG_ASSERT(mysql_bin_log.is_open()); - if (trx_data->empty()) + if (all && trx_data->empty()) { // we're here because trans_log was flushed in MYSQL_BIN_LOG::log() + trx_data->reset(); DBUG_RETURN(0); } - if (all) + if (all) { Query_log_event qev(thd, STRING_WITH_LEN("COMMIT"), TRUE, FALSE); qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) - DBUG_RETURN(binlog_end_trans(thd, trx_data, &qev)); + int error= binlog_end_trans(thd, trx_data, &qev, all); + DBUG_RETURN(error); } else - DBUG_RETURN(binlog_end_trans(thd, trx_data, &invisible_commit)); + { + int error= binlog_end_trans(thd, trx_data, &invisible_commit, all); + DBUG_RETURN(error); + } } static int binlog_rollback(THD *thd, bool all) @@ -1279,13 +1484,13 @@ static int binlog_rollback(THD *thd, bool all) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; IO_CACHE *trans_log= &trx_data->trans_log; - /* - First assert is guaranteed - see trans_register_ha() call below. - The second must be true. If it is not, we're registering - unnecessary, doing extra work. The cause should be found and eliminated - */ - DBUG_ASSERT(all || !(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))); - DBUG_ASSERT(mysql_bin_log.is_open() && !trx_data->empty()); + DBUG_ASSERT(mysql_bin_log.is_open()); + + if (trx_data->empty()) { + trx_data->reset(); + DBUG_RETURN(0); + } + /* Update the binary log with a BEGIN/ROLLBACK block if we have cached some queries and we updated some non-transactional @@ -1297,10 +1502,10 @@ static int binlog_rollback(THD *thd, bool all) { Query_log_event qev(thd, STRING_WITH_LEN("ROLLBACK"), TRUE, FALSE); qev.error_code= 0; // see comment in MYSQL_LOG::write(THD, IO_CACHE) - error= binlog_end_trans(thd, trx_data, &qev); + error= binlog_end_trans(thd, trx_data, &qev, all); } else - error= binlog_end_trans(thd, trx_data, 0); + error= binlog_end_trans(thd, trx_data, 0, all); DBUG_RETURN(error); } @@ -1328,11 +1533,8 @@ static int binlog_rollback(THD *thd, bool all) static int binlog_savepoint_set(THD *thd, void *sv) { DBUG_ENTER("binlog_savepoint_set"); - binlog_trx_data *const trx_data= - (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; - DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(&trx_data->trans_log)); - *(my_off_t *)sv= my_b_tell(&trx_data->trans_log); + binlog_trans_log_savepos(thd, (my_off_t*) sv); /* Write it to the binary log */ int const error= @@ -1347,7 +1549,7 @@ static int binlog_savepoint_rollback(THD *thd, void *sv) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; IO_CACHE *trans_log= &trx_data->trans_log; - DBUG_ASSERT(mysql_bin_log.is_open() && my_b_tell(trans_log)); + DBUG_ASSERT(mysql_bin_log.is_open()); /* Write ROLLBACK TO SAVEPOINT to the binlog cache if we have updated some @@ -1362,7 +1564,7 @@ static int binlog_savepoint_rollback(THD *thd, void *sv) thd->query, thd->query_length, TRUE, FALSE); DBUG_RETURN(error); } - reinit_io_cache(trans_log, WRITE_CACHE, *(my_off_t *)sv, 0, 0); + binlog_trans_log_truncate(thd, *(my_off_t*)sv); DBUG_RETURN(0); } @@ -2487,7 +2689,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) thread. If the transaction involved MyISAM tables, it should go into binlog even on rollback. */ - (void) pthread_mutex_lock(&LOCK_thread_count); + VOID(pthread_mutex_lock(&LOCK_thread_count)); /* Save variables so that we can reopen the log */ save_name=name; @@ -2519,7 +2721,7 @@ bool MYSQL_BIN_LOG::reset_logs(THD* thd) my_free((gptr) save_name, MYF(0)); err: - (void) pthread_mutex_unlock(&LOCK_thread_count); + VOID(pthread_mutex_unlock(&LOCK_thread_count)); pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_log); DBUG_RETURN(error); @@ -3085,18 +3287,76 @@ int THD::binlog_setup_trx_data() ha_data[binlog_hton.slot]= 0; DBUG_RETURN(1); // Didn't manage to set it up } - trx_data->trans_log.end_of_file= max_binlog_cache_size; + + trx_data= new (ha_data[binlog_hton.slot]) binlog_trx_data; + DBUG_RETURN(0); } +#ifdef HAVE_ROW_BASED_REPLICATION +/* + Function to start a statement and optionally a transaction for the + binary log. + + SYNOPSIS + binlog_start_trans_and_stmt() + + DESCRIPTION + + This function does three things: + - Start a transaction if not in autocommit mode or if a BEGIN + statement has been seen. + + - Start a statement transaction to allow us to truncate the binary + log. + + - Save the currrent binlog position so that we can roll back the + statement by truncating the transaction log. + + We only update the saved position if the old one was undefined, + the reason is that there are some cases (e.g., for CREATE-SELECT) + where the position is saved twice (e.g., both in + select_create::prepare() and THD::binlog_write_table_map()) , but + we should use the first. This means that calls to this function + can be used to start the statement before the first table map + event, to include some extra events. + */ + +void +THD::binlog_start_trans_and_stmt() +{ + DBUG_ENTER("binlog_start_trans_and_stmt"); + binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton.slot]; + DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data)); + if (trx_data) + DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u", + trx_data->before_stmt_pos)); + if (trx_data == NULL || + trx_data->before_stmt_pos == binlog_trx_data::UNDEF_POS) + { + /* + The call to binlog_trans_log_savepos() might create the trx_data + structure, if it didn't exist before, so we save the position + into an auto variable and then write it into the transaction + data for the binary log (i.e., trx_data). + */ + my_off_t pos= 0; + binlog_trans_log_savepos(this, &pos); + trx_data= (binlog_trx_data*) ha_data[binlog_hton.slot]; + + trx_data->before_stmt_pos= pos; + + if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + trans_register_ha(this, TRUE, &binlog_hton); + trans_register_ha(this, FALSE, &binlog_hton); + } + DBUG_VOID_RETURN; +} + /* Write a table map to the binary log. - - This function is called from ha_external_lock() after the storage - engine has registered for the transaction. */ -#ifdef HAVE_ROW_BASED_REPLICATION int THD::binlog_write_table_map(TABLE *table, bool is_trans) { int error; @@ -3115,10 +3375,8 @@ int THD::binlog_write_table_map(TABLE *table, bool is_trans) Table_map_log_event the_event(this, table, table->s->table_map_id, is_trans, flags); - if (is_trans) - trans_register_ha(this, - (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0, - &binlog_hton); + if (is_trans && binlog_table_maps == 0) + binlog_start_trans_and_stmt(); if ((error= mysql_bin_log.write(&the_event))) DBUG_RETURN(error); @@ -3139,7 +3397,7 @@ THD::binlog_get_pending_rows_event() const (since the trx_data is set up there). In that case, we just return NULL. */ - return trx_data ? trx_data->pending : NULL; + return trx_data ? trx_data->pending() : NULL; } void @@ -3152,7 +3410,7 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev) (binlog_trx_data*) ha_data[binlog_hton.slot]; DBUG_ASSERT(trx_data); - trx_data->pending= ev; + trx_data->set_pending(ev); } @@ -3161,8 +3419,9 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev) (either cached binlog if transaction, or disk binlog). Sets a new pending event. */ -int MYSQL_BIN_LOG:: - flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event) +int +MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd, + Rows_log_event* event) { DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)"); DBUG_ASSERT(thd->current_stmt_binlog_row_based && mysql_bin_log.is_open()); @@ -3175,9 +3434,9 @@ int MYSQL_BIN_LOG:: DBUG_ASSERT(trx_data); - DBUG_PRINT("info", ("trx_data->pending=%p", trx_data->pending)); + DBUG_PRINT("info", ("trx_data->pending()=%p", trx_data->pending())); - if (Rows_log_event* pending= trx_data->pending) + if (Rows_log_event* pending= trx_data->pending()) { IO_CACHE *file= &log_file; @@ -3327,15 +3586,14 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; IO_CACHE *trans_log= &trx_data->trans_log; - bool trans_log_in_use= my_b_tell(trans_log) != 0; - if (event_info->get_cache_stmt() && !trans_log_in_use) - trans_register_ha(thd, - (thd->options & - (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0, - &binlog_hton); - if (event_info->get_cache_stmt() || trans_log_in_use) + my_off_t trans_log_pos= my_b_tell(trans_log); + if (event_info->get_cache_stmt() || trans_log_pos != 0) { - DBUG_PRINT("info", ("Using trans_log")); + DBUG_PRINT("info", ("Using trans_log: cache=%d, trans_log_pos=%u", + event_info->get_cache_stmt(), + trans_log_pos)); + if (trans_log_pos == 0) + thd->binlog_start_trans_and_stmt(); file= trans_log; } /* @@ -3542,61 +3800,69 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_CACHE *cache, Log_event *commit_event) uint length; /* - Log "BEGIN" at the beginning of the transaction. - which may contain more than 1 SQL statement. - */ - if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + We only bother to write to the binary log if there is anything + to write. + */ + if (my_b_tell(cache) > 0) { - Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE); /* - Imagine this is rollback due to net timeout, after all statements of - the transaction succeeded. Then we want a zero-error code in BEGIN. - In other words, if there was a really serious error code it's already - in the statement's events, there is no need to put it also in this - internally generated event, and as this event is generated late it - would lead to false alarms. - This is safer than thd->clear_error() against kills at shutdown. + Log "BEGIN" at the beginning of the transaction. + which may contain more than 1 SQL statement. */ - qinfo.error_code= 0; - /* - Now this Query_log_event has artificial log_pos 0. It must be adjusted - to reflect the real position in the log. Not doing it would confuse the - slave: it would prevent this one from knowing where he is in the - master's binlog, which would result in wrong positions being shown to - the user, MASTER_POS_WAIT undue waiting etc. - */ - if (qinfo.write(&log_file)) - goto err; - } - /* Read from the file used to cache the queries .*/ - if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) - goto err; - length=my_b_bytes_in_cache(cache); - DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;); - do - { - /* Write data to the binary log file */ - if (my_b_write(&log_file, cache->read_pos, length)) - goto err; - cache->read_pos=cache->read_end; // Mark buffer used up - DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;); - } while ((length=my_b_fill(cache))); + if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) + { + Query_log_event qinfo(thd, STRING_WITH_LEN("BEGIN"), TRUE, FALSE); + /* + Imagine this is rollback due to net timeout, after all statements of + the transaction succeeded. Then we want a zero-error code in BEGIN. + In other words, if there was a really serious error code it's already + in the statement's events, there is no need to put it also in this + internally generated event, and as this event is generated late it + would lead to false alarms. + This is safer than thd->clear_error() against kills at shutdown. + */ + qinfo.error_code= 0; + /* + Now this Query_log_event has artificial log_pos 0. It must be adjusted + to reflect the real position in the log. Not doing it would confuse the + slave: it would prevent this one from knowing where he is in the + master's binlog, which would result in wrong positions being shown to + the user, MASTER_POS_WAIT undue waiting etc. + */ + if (qinfo.write(&log_file)) + goto err; + } + /* Read from the file used to cache the queries .*/ + if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) + goto err; + length=my_b_bytes_in_cache(cache); + DBUG_EXECUTE_IF("half_binlogged_transaction", length-=100;); + do + { + /* Write data to the binary log file */ + if (my_b_write(&log_file, cache->read_pos, length)) + goto err; + cache->read_pos=cache->read_end; // Mark buffer used up + DBUG_EXECUTE_IF("half_binlogged_transaction", goto DBUG_skip_commit;); + } while ((length=my_b_fill(cache))); - if (commit_event->write(&log_file)) - goto err; + if (commit_event && commit_event->write(&log_file)) + goto err; #ifndef DBUG_OFF -DBUG_skip_commit: + DBUG_skip_commit: #endif - if (flush_and_sync()) - goto err; - DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); - if (cache->error) // Error on read - { - sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); - write_error=1; // Don't give more errors - goto err; + if (flush_and_sync()) + goto err; + DBUG_EXECUTE_IF("half_binlogged_transaction", abort();); + if (cache->error) // Error on read + { + sql_print_error(ER(ER_ERROR_ON_READ), cache->file_name, errno); + write_error=1; // Don't give more errors + goto err; + } + signal_update(); } - signal_update(); + /* if commit_event is Xid_log_event, increase the number of prepared_xids (it's decreasd in ::unlog()). Binlog cannot be rotated @@ -3605,7 +3871,7 @@ DBUG_skip_commit: If the commit_event is not Xid_log_event (then it's a Query_log_event) rotate binlog, if necessary. */ - if (commit_event->get_type_code() == XID_EVENT) + if (commit_event && commit_event->get_type_code() == XID_EVENT) { pthread_mutex_lock(&LOCK_prep_xids); prepared_xids++; @@ -4605,12 +4871,17 @@ int TC_LOG_BINLOG::log(THD *thd, my_xid xid) Xid_log_event xle(thd, xid); binlog_trx_data *trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; - DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle)); // invert return value + /* + We always commit the entire transaction when writing an XID. Also + note that the return value is inverted. + */ + DBUG_RETURN(!binlog_end_trans(thd, trx_data, &xle, TRUE)); } void TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) { pthread_mutex_lock(&LOCK_prep_xids); + DBUG_ASSERT(prepared_xids > 0); if (--prepared_xids == 0) pthread_cond_signal(&COND_prep_xids); pthread_mutex_unlock(&LOCK_prep_xids); diff --git a/sql/sql_class.h b/sql/sql_class.h index 35de14835a4..bb6e391b267 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -929,6 +929,7 @@ public: /* Public interface to write RBR events to the binlog */ + void binlog_start_trans_and_stmt(); int binlog_write_table_map(TABLE *table, bool is_transactional); int binlog_write_row(TABLE* table, bool is_transactional, MY_BITMAP const* cols, my_size_t colcnt, diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 25ed03c4051..96a1bce548d 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -1234,7 +1234,7 @@ err: if (thd->lex->current_select) thd->lex->current_select->no_error= 0; // Give error table->file->print_error(error,MYF(0)); - + before_trg_err: table->file->restore_auto_increment(prev_insert_id); if (key) @@ -1982,6 +1982,10 @@ err: rolled back. We only need to roll back a potential statement transaction, since real transactions are rolled back in close_thread_tables(). + + TODO: This is not true any more, table maps are generated on the + first call to ha_*_row() instead. Remove code that are used to + cover for the case outlined above. */ ha_rollback_stmt(thd); @@ -2086,8 +2090,6 @@ bool delayed_insert::handle_inserts(void) thd.start_time=row->start_time; thd.query_start_used=row->query_start_used; - /* for the binlog, forget auto_increment ids generated by previous rows */ -// thd.auto_inc_intervals_in_cur_stmt_for_binlog.empty(); thd.first_successful_insert_id_in_prev_stmt= row->first_successful_insert_id_in_prev_stmt; thd.stmt_depends_on_first_successful_insert_id_in_prev_stmt= @@ -2317,6 +2319,7 @@ select_insert::prepare(List &values, SELECT_LEX_UNIT *u) DBUG_ENTER("select_insert::prepare"); unit= u; + /* Since table in which we are going to insert is added to the first select, LEX::current_select should point to the first select while @@ -2547,56 +2550,54 @@ void select_insert::send_error(uint errcode,const char *err) if (errcode != ER_UNKNOWN_ERROR && !thd->net.report_error) my_message(errcode, err, MYF(0)); - if (!table) + /* + If the creation of the table failed (due to a syntax error, for + example), no table will have been opened and therefore 'table' + will be NULL. In that case, we still need to execute the rollback + and the end of the function to truncate the binary log, but we can + skip all the intermediate steps. + */ + if (table) { /* - This can only happen when using CREATE ... SELECT and the table was not - created becasue of an syntax error + If we are not in prelocked mode, we end the bulk insert started + before. */ - DBUG_VOID_RETURN; - } - if (!thd->prelocked_mode) - table->file->ha_end_bulk_insert(); - /* - If at least one row has been inserted/modified and will stay in the table - (the table doesn't have transactions) we must write to the binlog (and - the error code will make the slave stop). + if (!thd->prelocked_mode) + table->file->ha_end_bulk_insert(); - For many errors (example: we got a duplicate key error while - inserting into a MyISAM table), no row will be added to the table, - so passing the error to the slave will not help since there will - be an error code mismatch (the inserts will succeed on the slave - with no error). + /* + If at least one row has been inserted/modified and will stay in + the table (the table doesn't have transactions) we must write to + the binlog (and the error code will make the slave stop). - If we are using row-based replication we have two cases where this - code is executed: replication of CREATE-SELECT and replication of - INSERT-SELECT. + For many errors (example: we got a duplicate key error while + inserting into a MyISAM table), no row will be added to the table, + so passing the error to the slave will not help since there will + be an error code mismatch (the inserts will succeed on the slave + with no error). - When replicating a CREATE-SELECT statement, we shall not write the - events to the binary log and should thus not set - OPTION_STATUS_NO_TRANS_UPDATE. - - When replicating INSERT-SELECT, we shall not write the events to - the binary log for transactional table, but shall write all events - if there is one or more writes to non-transactional tables. In - this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a - write to a non-transactional table, otherwise it is cleared. - */ - if (info.copied || info.deleted || info.updated) - { - if (!table->file->has_transactions()) + If table creation failed, the number of rows modified will also be + zero, so no check for that is made. + */ + if (info.copied || info.deleted || info.updated) { - if (mysql_bin_log.is_open()) + DBUG_ASSERT(table != NULL); + if (!table->file->has_transactions()) { - thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + if (mysql_bin_log.is_open()) + { + thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, + table->file->has_transactions(), FALSE); + } + if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table && + !can_rollback_data()) + thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; + query_cache_invalidate3(thd, table, 1); } - if (!thd->current_stmt_binlog_row_based && !table->s->tmp_table && - !can_rollback_data()) - thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; - query_cache_invalidate3(thd, table, 1); } } + ha_rollback_stmt(thd); DBUG_VOID_RETURN; } @@ -2605,8 +2606,11 @@ void select_insert::send_error(uint errcode,const char *err) bool select_insert::send_eof() { int error,error2; + bool const trans_table= table->file->has_transactions(); ulonglong id; DBUG_ENTER("select_insert::send_eof"); + DBUG_PRINT("enter", ("trans_table=%d, table_type='%s'", + trans_table, table->file->table_type())); error= (!thd->prelocked_mode) ? table->file->ha_end_bulk_insert():0; table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); @@ -2626,9 +2630,8 @@ bool select_insert::send_eof() are not logged in RBR) - We are using statement based replication */ - if (!table->file->has_transactions() && - (!table->s->tmp_table || - !thd->current_stmt_binlog_row_based)) + if (!trans_table && + (!table->s->tmp_table || !thd->current_stmt_binlog_row_based)) thd->options|= OPTION_STATUS_NO_TRANS_UPDATE; } @@ -2644,10 +2647,21 @@ bool select_insert::send_eof() thd->clear_error(); thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); + trans_table, FALSE); } - if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error) - error=error2; + /* + We will call ha_autocommit_or_rollback() also for + non-transactional tables under row-based replication: there might + be events in the binary logs transaction, and we need to write + them to the binary log. + */ + if (trans_table || thd->current_stmt_binlog_row_based) + { + int const error2= ha_autocommit_or_rollback(thd, error); + if (error2 && !error) + error=error2; + } + if (error) { table->file->print_error(error,MYF(0)); @@ -2843,14 +2857,19 @@ select_create::prepare(List &values, SELECT_LEX_UNIT *u) class MY_HOOKS : public TABLEOP_HOOKS { public: MY_HOOKS(select_create *x) : ptr(x) { } - virtual void do_prelock(TABLE **tables, uint count) - { - if (ptr->get_thd()->current_stmt_binlog_row_based && - !(ptr->get_create_info()->options & HA_LEX_CREATE_TMP_TABLE)) - ptr->binlog_show_create_table(tables, count); - } private: + virtual void do_prelock(TABLE **tables, uint count) + { + TABLE const *const table = *tables; + if (ptr->get_thd()->current_stmt_binlog_row_based && + table->s->tmp_table == NO_TMP_TABLE && + !ptr->get_create_info()->table_existed) + { + ptr->binlog_show_create_table(tables, count); + } + } + select_create *ptr; }; @@ -2859,6 +2878,20 @@ select_create::prepare(List &values, SELECT_LEX_UNIT *u) #endif unit= u; + +#ifdef HAVE_ROW_BASED_REPLICATION + /* + Start a statement transaction before the create if we are creating + a non-temporary table and are using row-based replication for the + statement. + */ + if ((thd->lex->create_info.options & HA_LEX_CREATE_TMP_TABLE) == 0 && + thd->current_stmt_binlog_row_based) + { + thd->binlog_start_trans_and_stmt(); + } +#endif + if (!(table= create_table_from_items(thd, create_info, create_table, extra_fields, keys, &values, &thd->extra_lock, hook_ptr))) @@ -3006,7 +3039,16 @@ void select_create::abort() table->s->version= 0; hash_delete(&open_cache,(byte*) table); if (!create_info->table_existed) + { quick_rm_table(table_type, create_table->db, create_table->table_name); + /* + We roll back the statement, including truncating the + transaction cache of the binary log, if the statement + failed. + */ + if (thd->current_stmt_binlog_row_based) + ha_rollback_stmt(thd); + } /* Tell threads waiting for refresh that something has happened */ if (version != refresh_version) broadcast_refresh(); From 93ea3261eb64f46585c51740b505a4384977e1f5 Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Thu, 5 Oct 2006 22:16:19 +0200 Subject: [PATCH 04/10] Post-merge fixes. --- sql/log.cc | 20 ++++++++++---------- sql/sql_insert.cc | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/log.cc b/sql/log.cc index 0b1b94ac576..a798604a6ae 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -1253,10 +1253,10 @@ binlog_trans_log_savepos(THD *thd, my_off_t *pos) { DBUG_ENTER("binlog_trans_log_savepos"); DBUG_ASSERT(pos != NULL); - if (thd->ha_data[binlog_hton.slot] == NULL) + if (thd->ha_data[binlog_hton->slot] == NULL) thd->binlog_setup_trx_data(); binlog_trx_data *const trx_data= - (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; DBUG_ASSERT(mysql_bin_log.is_open()); *pos= trx_data->position(); DBUG_PRINT("return", ("*pos=%u", *pos)); @@ -1285,12 +1285,12 @@ binlog_trans_log_truncate(THD *thd, my_off_t pos) DBUG_ENTER("binlog_trans_log_truncate"); DBUG_PRINT("enter", ("pos=%u", pos)); - DBUG_ASSERT(thd->ha_data[binlog_hton.slot] != NULL); + DBUG_ASSERT(thd->ha_data[binlog_hton->slot] != NULL); /* Only true if binlog_trans_log_savepos() wasn't called before */ DBUG_ASSERT(pos != ~(my_off_t) 0); binlog_trx_data *const trx_data= - (binlog_trx_data*) thd->ha_data[binlog_hton.slot]; + (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; trx_data->truncate(pos); DBUG_VOID_RETURN; } @@ -1323,7 +1323,7 @@ static int binlog_close_connection(handlerton *hton, THD *thd) binlog_trx_data *const trx_data= (binlog_trx_data*) thd->ha_data[binlog_hton->slot]; DBUG_ASSERT(mysql_bin_log.is_open() && trx_data->empty()); - thd->ha_data[binlog_hton.slot]= 0; + thd->ha_data[binlog_hton->slot]= 0; trx_data->~binlog_trx_data(); my_free((gptr)trx_data, MYF(0)); return 0; @@ -3294,7 +3294,7 @@ int THD::binlog_setup_trx_data() DBUG_RETURN(1); // Didn't manage to set it up } - trx_data= new (ha_data[binlog_hton.slot]) binlog_trx_data; + trx_data= new (ha_data[binlog_hton->slot]) binlog_trx_data; DBUG_RETURN(0); } @@ -3332,7 +3332,7 @@ void THD::binlog_start_trans_and_stmt() { DBUG_ENTER("binlog_start_trans_and_stmt"); - binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton.slot]; + binlog_trx_data *trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot]; DBUG_PRINT("enter", ("trx_data=0x%lu", trx_data)); if (trx_data) DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u", @@ -3348,13 +3348,13 @@ THD::binlog_start_trans_and_stmt() */ my_off_t pos= 0; binlog_trans_log_savepos(this, &pos); - trx_data= (binlog_trx_data*) ha_data[binlog_hton.slot]; + trx_data= (binlog_trx_data*) ha_data[binlog_hton->slot]; trx_data->before_stmt_pos= pos; if (options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) - trans_register_ha(this, TRUE, &binlog_hton); - trans_register_ha(this, FALSE, &binlog_hton); + trans_register_ha(this, TRUE, binlog_hton); + trans_register_ha(this, FALSE, binlog_hton); } DBUG_VOID_RETURN; } diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index c5a2e8674e6..b8037372084 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -2637,10 +2637,10 @@ void select_insert::send_error(uint errcode,const char *err) query_cache_invalidate3(thd, table, 1); } } + table->file->ha_release_auto_increment(); } ha_rollback_stmt(thd); - table->file->ha_release_auto_increment(); DBUG_VOID_RETURN; } From cb1fdf51650f65de26c1d433aabeec3d9b85c823 Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Fri, 6 Oct 2006 08:41:34 +0200 Subject: [PATCH 05/10] Fix to build on 64-bit systems where sizeof(unsigned long long) > sizeof(unsigned long). --- sql/log.cc | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/log.cc b/sql/log.cc index a798604a6ae..60561d68d03 100644 --- a/sql/log.cc +++ b/sql/log.cc @@ -45,6 +45,7 @@ inline void operator delete[](void*, void*) { /* Do nothing */ } #define MAX_LOG_BUFFER_SIZE 1024 #define MAX_USER_HOST_SIZE 512 #define MAX_TIME_SIZE 32 +#define MY_OFF_T_UNDEF (~(my_off_t)0UL) #define FLAGSTR(V,F) ((V)&(F)?#F" ":"") @@ -85,13 +86,9 @@ char *make_default_log_name(char *buff,const char* log_ext) */ class binlog_trx_data { public: - enum { - UNDEF_POS = ~ (my_off_t) 0 - }; - binlog_trx_data() #ifdef HAVE_ROW_BASED_REPLICATION - : m_pending(0), before_stmt_pos(UNDEF_POS) + : m_pending(0), before_stmt_pos(MY_OFF_T_UNDEF) #endif { trans_log.end_of_file= max_binlog_cache_size; @@ -139,7 +136,7 @@ public: if (!empty()) truncate(0); #ifdef HAVE_ROW_BASED_REPLICATION - before_stmt_pos= UNDEF_POS; + before_stmt_pos= MY_OFF_T_UNDEF; #endif trans_log.end_of_file= max_binlog_cache_size; } @@ -3338,7 +3335,7 @@ THD::binlog_start_trans_and_stmt() DBUG_PRINT("enter", ("trx_data->before_stmt_pos=%u", trx_data->before_stmt_pos)); if (trx_data == NULL || - trx_data->before_stmt_pos == binlog_trx_data::UNDEF_POS) + trx_data->before_stmt_pos == MY_OFF_T_UNDEF) { /* The call to binlog_trans_log_savepos() might create the trx_data From e762328b544d1ff67b9ae472d62df13c2987d2b9 Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Fri, 6 Oct 2006 10:17:02 +0200 Subject: [PATCH 06/10] BUG#19459 (BINLOG RBR command does not lock tables correctly causing crash for, e.g., NDB): Before, mysqlbinlog printed table map events as a separate statement, so when executing the event, the opened table was subsequently closed when the statement ended. Instead, the row-based events that make up a statement are now printed as *one* BINLOG statement, which means that the table maps and the following *_rows_log_event events are executed fully before the statement ends. Changing implementation of BINLOG statement to be able to read the emitted format, which now consists of several chunks of BASE64-encoded data. --- client/mysqlbinlog.cc | 35 ++- mysys/mf_iocache2.c | 2 +- sql/log_event.cc | 575 +++++++++++++++++++++++++++--------------- sql/log_event.h | 30 ++- sql/sql_binlog.cc | 131 +++++++--- 5 files changed, 517 insertions(+), 256 deletions(-) diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index 81ad74466eb..f6d48fdef72 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -475,6 +475,30 @@ static bool check_database(const char *log_dbname) } + +static int +write_event_header_and_base64(Log_event *ev, FILE *result_file, + PRINT_EVENT_INFO *print_event_info) +{ + DBUG_ENTER("write_event_header_and_base64"); + /* Write header and base64 output to cache */ + IO_CACHE result_cache; + if (init_io_cache(&result_cache, -1, 0, WRITE_CACHE, 0L, FALSE, + MYF(MY_WME | MY_NABP))) + { + return 1; + } + + ev->print_header(&result_cache, print_event_info, FALSE); + ev->print_base64(&result_cache, print_event_info, FALSE); + + /* Read data from cache and write to result file */ + my_b_copy_to_file(&result_cache, result_file); + end_io_cache(&result_cache); + DBUG_RETURN(0); +} + + /* Process an event @@ -537,18 +561,18 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, print_event_info->base64_output= opt_base64_output; + DBUG_PRINT("debug", ("event_type: %s", ev->get_type_str())); + switch (ev_type) { case QUERY_EVENT: if (check_database(((Query_log_event*)ev)->db)) goto end; if (opt_base64_output) - { - ev->print_header(result_file, print_event_info); - ev->print_base64(result_file, print_event_info); - } + write_event_header_and_base64(ev, result_file, print_event_info); else ev->print(result_file, print_event_info); break; + case CREATE_FILE_EVENT: { Create_file_log_event* ce= (Create_file_log_event*)ev; @@ -569,8 +593,7 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev, */ if (opt_base64_output) { - ce->print_header(result_file, print_event_info); - ce->print_base64(result_file, print_event_info); + write_event_header_and_base64(ce, result_file, print_event_info); } else ce->print(result_file, print_event_info, TRUE); diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index d76b895aeb0..57a06e263a6 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -28,7 +28,7 @@ Copy contents of an IO_CACHE to a file. SYNOPSIS - copy_io_cache_to_file() + my_b_copy_to_file() cache IO_CACHE to copy from file File to copy to diff --git a/sql/log_event.cc b/sql/log_event.cc index bf876366879..9cccce8f679 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -31,32 +31,111 @@ #define log_cs &my_charset_latin1 +/* + Cache that will automatically be written to a dedicated file on + destruction. + + DESCRIPTION + + */ +class Write_on_release_cache +{ +public: + enum flag + { + FLUSH_F + }; + + typedef unsigned short flag_set; + + /* + Constructor. + + SYNOPSIS + Write_on_release_cache + cache Pointer to cache to use + file File to write cache to upon destruction + flags Flags for the cache + + DESCRIPTION + + Class used to guarantee copy of cache to file before exiting the + current block. On successful copy of the cache, the cache will + be reinited as a WRITE_CACHE. + + Currently, a pointer to the cache is provided in the + constructor, but it would be possible to create a subclass + holding the IO_CACHE itself. + */ + Write_on_release_cache(IO_CACHE *cache, FILE *file, flag_set flags = 0) + : m_cache(cache), m_file(file), m_flags(flags) + { + reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE); + } + + ~Write_on_release_cache() + { + if (!my_b_copy_to_file(m_cache, m_file)) + reinit_io_cache(m_cache, WRITE_CACHE, 0L, FALSE, TRUE); + if (m_flags | FLUSH_F) + fflush(m_file); + } + + /* + Return a pointer to the internal IO_CACHE. + + SYNOPSIS + operator&() + + DESCRIPTION + Function to return a pointer to the internal, so that the object + can be treated as a IO_CACHE and used with the my_b_* IO_CACHE + functions + + RETURN VALUE + A pointer to the internal IO_CACHE. + */ + IO_CACHE *operator&() + { + return m_cache; + } + +private: + // Hidden, to prevent usage. + Write_on_release_cache(Write_on_release_cache const&); + + IO_CACHE *m_cache; + FILE *m_file; + flag_set m_flags; +}; + + /* pretty_print_str() */ #ifdef MYSQL_CLIENT -static void pretty_print_str(FILE* file, char* str, int len) +static void pretty_print_str(IO_CACHE* cache, char* str, int len) { char* end = str + len; - fputc('\'', file); + my_b_printf(cache, "\'"); while (str < end) { char c; switch ((c=*str++)) { - case '\n': fprintf(file, "\\n"); break; - case '\r': fprintf(file, "\\r"); break; - case '\\': fprintf(file, "\\\\"); break; - case '\b': fprintf(file, "\\b"); break; - case '\t': fprintf(file, "\\t"); break; - case '\'': fprintf(file, "\\'"); break; - case 0 : fprintf(file, "\\0"); break; + case '\n': my_b_printf(cache, "\\n"); break; + case '\r': my_b_printf(cache, "\\r"); break; + case '\\': my_b_printf(cache, "\\\\"); break; + case '\b': my_b_printf(cache, "\\b"); break; + case '\t': my_b_printf(cache, "\\t"); break; + case '\'': my_b_printf(cache, "\\'"); break; + case 0 : my_b_printf(cache, "\\0"); break; default: - fputc(c, file); + my_b_printf(cache, "%c", c); break; } } - fputc('\'', file); + my_b_printf(cache, "\'"); } #endif /* MYSQL_CLIENT */ @@ -293,14 +372,15 @@ append_query_string(CHARSET_INFO *csinfo, */ #ifdef MYSQL_CLIENT -static void print_set_option(FILE* file, uint32 bits_changed, uint32 option, - uint32 flags, const char* name, bool* need_comma) +static void print_set_option(IO_CACHE* file, uint32 bits_changed, + uint32 option, uint32 flags, const char* name, + bool* need_comma) { if (bits_changed & option) { if (*need_comma) - fprintf(file,", "); - fprintf(file,"%s=%d", name, test(flags & option)); + my_b_printf(file,", "); + my_b_printf(file,"%s=%d", name, test(flags & option)); *need_comma= 1; } } @@ -959,20 +1039,23 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len, Log_event::print_header() */ -void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Log_event::print_header(IO_CACHE* file, + PRINT_EVENT_INFO* print_event_info, + bool is_more __attribute__((unused))) { char llbuff[22]; my_off_t hexdump_from= print_event_info->hexdump_from; + DBUG_ENTER("Log_event::print_header"); - fputc('#', file); + my_b_printf(file, "#"); print_timestamp(file); - fprintf(file, " server id %d end_log_pos %s ", server_id, - llstr(log_pos,llbuff)); + my_b_printf(file, " server id %d end_log_pos %s ", server_id, + llstr(log_pos,llbuff)); /* mysqlbinlog --hexdump */ if (print_event_info->hexdump_from) { - fprintf(file, "\n"); + my_b_printf(file, "\n"); uchar *ptr= (uchar*)temp_buf; my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET) - LOG_EVENT_MINIMAL_HEADER_LEN; @@ -985,15 +1068,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) /* Pretty-print event common header if header is exactly 19 bytes */ if (print_event_info->common_header_len == LOG_EVENT_MINIMAL_HEADER_LEN) { - fprintf(file, "# Position Timestamp Type Master ID " - "Size Master Pos Flags \n"); - fprintf(file, "# %8.8lx %02x %02x %02x %02x %02x " - "%02x %02x %02x %02x %02x %02x %02x %02x " - "%02x %02x %02x %02x %02x %02x\n", - (unsigned long) hexdump_from, - ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], - ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13], - ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); + char emit_buf[256]; // Enough for storing one line + my_b_printf(file, "# Position Timestamp Type Master ID " + "Size Master Pos Flags \n"); + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %02x %02x %02x %02x %02x " + "%02x %02x %02x %02x %02x %02x %02x %02x " + "%02x %02x %02x %02x %02x %02x\n", + (unsigned long) hexdump_from, + ptr[0], ptr[1], ptr[2], ptr[3], ptr[4], ptr[5], ptr[6], + ptr[7], ptr[8], ptr[9], ptr[10], ptr[11], ptr[12], ptr[13], + ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); + my_b_write(file, emit_buf, bytes_written); ptr += LOG_EVENT_MINIMAL_HEADER_LEN; hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN; } @@ -1010,9 +1099,21 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) if (i % 16 == 15) { - fprintf(file, "# %8.8lx %-48.48s |%16s|\n", - (unsigned long) (hexdump_from + (i & 0xfffffff0)), - hex_string, char_string); + /* + my_b_printf() does not support full printf() formats, so we + have to do it this way. + + TODO: Rewrite my_b_printf() to support full printf() syntax. + */ + char emit_buf[256]; + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %-48.48s |%16s|\n", + (unsigned long) (hexdump_from + (i & 0xfffffff0)), + hex_string, char_string); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); + my_b_write(file, emit_buf, bytes_written); hex_string[0]= 0; char_string[0]= 0; c= char_string; @@ -1024,28 +1125,52 @@ void Log_event::print_header(FILE* file, PRINT_EVENT_INFO* print_event_info) /* Non-full last line */ if (hex_string[0]) - fprintf(file, "# %8.8lx %-48.48s |%s|\n# ", - (unsigned long) (hexdump_from + (i & 0xfffffff0)), - hex_string, char_string); + { + char emit_buf[256]; + int const bytes_written= + my_snprintf(emit_buf, sizeof(emit_buf), + "# %8.8lx %-48.48s |%s|\n# ", + (unsigned long) (hexdump_from + (i & 0xfffffff0)), + hex_string, char_string); + DBUG_ASSERT(bytes_written >= 0); + DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); + my_b_write(file, emit_buf, bytes_written); + } } + DBUG_VOID_RETURN; } -void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Log_event::print_base64(IO_CACHE* file, + PRINT_EVENT_INFO* print_event_info, + bool more) { - uchar *ptr= (uchar*)temp_buf; + const uchar *ptr= (const uchar *)temp_buf; my_off_t size= uint4korr(ptr + EVENT_LEN_OFFSET); - char *tmp_str= - (char *) my_malloc(base64_needed_encoded_length(size), MYF(MY_WME)); + DBUG_ENTER("Log_event::print_base64"); + + size_t const tmp_str_sz= base64_needed_encoded_length(size); + char *const tmp_str= (char *) my_malloc(tmp_str_sz, MYF(MY_WME)); if (!tmp_str) { fprintf(stderr, "\nError: Out of memory. " "Could not print correct binlog event.\n"); - return; + DBUG_VOID_RETURN; } - int res= base64_encode(ptr, size, tmp_str); - fprintf(file, "\nBINLOG '\n%s\n';\n", tmp_str); + + int const res= base64_encode(ptr, size, tmp_str); + DBUG_ASSERT(res == 0); + + if (my_b_tell(file) == 0) + my_b_printf(file, "\nBINLOG '\n"); + + my_b_printf(file, "%s\n", tmp_str); + + if (!more) + my_b_printf(file, "';\n"); + my_free(tmp_str, MYF(0)); + DBUG_VOID_RETURN; } @@ -1053,9 +1178,10 @@ void Log_event::print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info) Log_event::print_timestamp() */ -void Log_event::print_timestamp(FILE* file, time_t* ts) +void Log_event::print_timestamp(IO_CACHE* file, time_t* ts) { struct tm *res; + DBUG_ENTER("Log_event::print_timestamp"); if (!ts) ts = &when; #ifdef MYSQL_SERVER // This is always false @@ -1065,13 +1191,14 @@ void Log_event::print_timestamp(FILE* file, time_t* ts) res=localtime(ts); #endif - fprintf(file,"%02d%02d%02d %2d:%02d:%02d", - res->tm_year % 100, - res->tm_mon+1, - res->tm_mday, - res->tm_hour, - res->tm_min, - res->tm_sec); + my_b_printf(file,"%02d%02d%02d %2d:%02d:%02d", + res->tm_year % 100, + res->tm_mon+1, + res->tm_mday, + res->tm_hour, + res->tm_min, + res->tm_sec); + DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -1531,7 +1658,7 @@ Query_log_event::Query_log_event(const char* buf, uint event_len, */ #ifdef MYSQL_CLIENT -void Query_log_event::print_query_header(FILE* file, +void Query_log_event::print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info) { // TODO: print the catalog ?? @@ -1541,9 +1668,10 @@ void Query_log_event::print_query_header(FILE* file, if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", - get_type_str(), (ulong) thread_id, (ulong) exec_time, error_code); + print_header(file, print_event_info, FALSE); + my_b_printf(file, "\t%s\tthread_id=%lu\texec_time=%lu\terror_code=%d\n", + get_type_str(), (ulong) thread_id, (ulong) exec_time, + error_code); } if (!(flags & LOG_EVENT_SUPPRESS_USE_F) && db) @@ -1551,15 +1679,15 @@ void Query_log_event::print_query_header(FILE* file, if (different_db= memcmp(print_event_info->db, db, db_len + 1)) memcpy(print_event_info->db, db, db_len + 1); if (db[0] && different_db) - fprintf(file, "use %s;\n", db); + my_b_printf(file, "use %s;\n", db); } end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10); *end++=';'; *end++='\n'; - my_fwrite(file, (byte*) buff, (uint) (end-buff),MYF(MY_NABP | MY_WME)); + my_b_write(file, (byte*) buff, (uint) (end-buff)); if (flags & LOG_EVENT_THREAD_SPECIFIC_F) - fprintf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); + my_b_printf(file,"SET @@session.pseudo_thread_id=%lu;\n",(ulong)thread_id); /* If flags2_inited==0, this is an event from 3.23 or 4.0; nothing to @@ -1581,14 +1709,14 @@ void Query_log_event::print_query_header(FILE* file, if (unlikely(tmp)) /* some bits have changed */ { bool need_comma= 0; - fprintf(file, "SET "); + my_b_printf(file, "SET "); print_set_option(file, tmp, OPTION_NO_FOREIGN_KEY_CHECKS, ~flags2, "@@session.foreign_key_checks", &need_comma); print_set_option(file, tmp, OPTION_AUTO_IS_NULL, flags2, "@@session.sql_auto_is_null", &need_comma); print_set_option(file, tmp, OPTION_RELAXED_UNIQUE_CHECKS, ~flags2, "@@session.unique_checks", &need_comma); - fprintf(file,";\n"); + my_b_printf(file,";\n"); print_event_info->flags2= flags2; } } @@ -1616,14 +1744,14 @@ void Query_log_event::print_query_header(FILE* file, } if (unlikely(print_event_info->sql_mode != sql_mode)) { - fprintf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); + my_b_printf(file,"SET @@session.sql_mode=%lu;\n",(ulong)sql_mode); print_event_info->sql_mode= sql_mode; } } if (print_event_info->auto_increment_increment != auto_increment_increment || print_event_info->auto_increment_offset != auto_increment_offset) { - fprintf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", + my_b_printf(file,"SET @@session.auto_increment_increment=%lu, @@session.auto_increment_offset=%lu;\n", auto_increment_increment,auto_increment_offset); print_event_info->auto_increment_increment= auto_increment_increment; print_event_info->auto_increment_offset= auto_increment_offset; @@ -1643,16 +1771,17 @@ void Query_log_event::print_query_header(FILE* file, CHARSET_INFO *cs_info= get_charset(uint2korr(charset), MYF(MY_WME)); if (cs_info) { - fprintf(file, "/*!\\C %s */;\n", cs_info->csname); /* for mysql client */ + /* for mysql client */ + my_b_printf(file, "/*!\\C %s */;\n", cs_info->csname); } - fprintf(file,"SET " - "@@session.character_set_client=%d," - "@@session.collation_connection=%d," - "@@session.collation_server=%d" - ";\n", - uint2korr(charset), - uint2korr(charset+2), - uint2korr(charset+4)); + my_b_printf(file,"SET " + "@@session.character_set_client=%d," + "@@session.collation_connection=%d," + "@@session.collation_server=%d" + ";\n", + uint2korr(charset), + uint2korr(charset+2), + uint2korr(charset+4)); memcpy(print_event_info->charset, charset, 6); } } @@ -1660,7 +1789,7 @@ void Query_log_event::print_query_header(FILE* file, { if (bcmp(print_event_info->time_zone_str, time_zone_str, time_zone_len+1)) { - fprintf(file,"SET @@session.time_zone='%s';\n", time_zone_str); + my_b_printf(file,"SET @@session.time_zone='%s';\n", time_zone_str); memcpy(print_event_info->time_zone_str, time_zone_str, time_zone_len+1); } } @@ -1669,9 +1798,11 @@ void Query_log_event::print_query_header(FILE* file, void Query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { - print_query_header(file, print_event_info); - my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); - fputs(";\n", file); + Write_on_release_cache cache(&print_event_info->head_cache, file); + + print_query_header(&cache, print_event_info); + my_b_write(&cache, (byte*) query, q_len); + my_b_printf(&cache, ";\n"); } #endif /* MYSQL_CLIENT */ @@ -1971,18 +2102,23 @@ void Start_log_event_v3::pack_info(Protocol *protocol) #ifdef MYSQL_CLIENT void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + DBUG_ENTER("Start_log_event_v3::print"); + + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tStart: binlog v %d, server v %s created ", binlog_version, - server_version); - print_timestamp(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tStart: binlog v %d, server v %s created ", + binlog_version, server_version); + print_timestamp(&cache); if (created) - fprintf(file," at startup"); - fputc('\n', file); + my_b_printf(&cache," at startup"); + my_b_printf(&cache, "\n"); if (flags & LOG_EVENT_BINLOG_IN_USE_F) - fprintf(file, "# Warning: this binlog was not closed properly. " - "Most probably mysqld crashed writing it.\n"); + my_b_printf(&cache, "# Warning: this binlog was not closed properly. " + "Most probably mysqld crashed writing it.\n"); } if (!artificial_event && created) { @@ -1993,12 +2129,12 @@ void Start_log_event_v3::print(FILE* file, PRINT_EVENT_INFO* print_event_info) and rollback unfinished transaction. Probably this can be done with RESET CONNECTION (syntax to be defined). */ - fprintf(file,"RESET CONNECTION;\n"); + my_b_printf(&cache,"RESET CONNECTION;\n"); #else - fprintf(file,"ROLLBACK;\n"); + my_b_printf(&cache,"ROLLBACK;\n"); #endif } - fflush(file); + DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -2722,15 +2858,17 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) } -void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, +void Load_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info, bool commented) { + Write_on_release_cache cache(&print_event_info->head_cache, file_arg); + DBUG_ENTER("Load_log_event::print"); if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n", - thread_id, exec_time); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tQuery\tthread_id=%ld\texec_time=%ld\n", + thread_id, exec_time); } bool different_db= 1; @@ -2748,65 +2886,65 @@ void Load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, } if (db && db[0] && different_db) - fprintf(file, "%suse %s;\n", + my_b_printf(&cache, "%suse %s;\n", commented ? "# " : "", db); if (flags & LOG_EVENT_THREAD_SPECIFIC_F) - fprintf(file,"%sSET @@session.pseudo_thread_id=%lu;\n", + my_b_printf(&cache,"%sSET @@session.pseudo_thread_id=%lu;\n", commented ? "# " : "", (ulong)thread_id); - fprintf(file, "%sLOAD DATA ", + my_b_printf(&cache, "%sLOAD DATA ", commented ? "# " : ""); if (check_fname_outside_temp_buf()) - fprintf(file, "LOCAL "); - fprintf(file, "INFILE '%-*s' ", fname_len, fname); + my_b_printf(&cache, "LOCAL "); + my_b_printf(&cache, "INFILE '%-*s' ", fname_len, fname); if (sql_ex.opt_flags & REPLACE_FLAG) - fprintf(file," REPLACE "); + my_b_printf(&cache," REPLACE "); else if (sql_ex.opt_flags & IGNORE_FLAG) - fprintf(file," IGNORE "); + my_b_printf(&cache," IGNORE "); - fprintf(file, "INTO TABLE `%s`", table_name); - fprintf(file, " FIELDS TERMINATED BY "); - pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len); + my_b_printf(&cache, "INTO TABLE `%s`", table_name); + my_b_printf(&cache, " FIELDS TERMINATED BY "); + pretty_print_str(&cache, sql_ex.field_term, sql_ex.field_term_len); if (sql_ex.opt_flags & OPT_ENCLOSED_FLAG) - fprintf(file," OPTIONALLY "); - fprintf(file, " ENCLOSED BY "); - pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len); + my_b_printf(&cache," OPTIONALLY "); + my_b_printf(&cache, " ENCLOSED BY "); + pretty_print_str(&cache, sql_ex.enclosed, sql_ex.enclosed_len); - fprintf(file, " ESCAPED BY "); - pretty_print_str(file, sql_ex.escaped, sql_ex.escaped_len); + my_b_printf(&cache, " ESCAPED BY "); + pretty_print_str(&cache, sql_ex.escaped, sql_ex.escaped_len); - fprintf(file," LINES TERMINATED BY "); - pretty_print_str(file, sql_ex.line_term, sql_ex.line_term_len); + my_b_printf(&cache," LINES TERMINATED BY "); + pretty_print_str(&cache, sql_ex.line_term, sql_ex.line_term_len); if (sql_ex.line_start) { - fprintf(file," STARTING BY "); - pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); + my_b_printf(&cache," STARTING BY "); + pretty_print_str(&cache, sql_ex.line_start, sql_ex.line_start_len); } if ((long) skip_lines > 0) - fprintf(file, " IGNORE %ld LINES", (long) skip_lines); + my_b_printf(&cache, " IGNORE %ld LINES", (long) skip_lines); if (num_fields) { uint i; const char* field = fields; - fprintf(file, " ("); + my_b_printf(&cache, " ("); for (i = 0; i < num_fields; i++) { if (i) - fputc(',', file); - fprintf(file, field); + my_b_printf(&cache, ","); + my_b_printf(&cache, field); field += field_lens[i] + 1; } - fputc(')', file); + my_b_printf(&cache, ")"); } - fprintf(file, ";\n"); + my_b_printf(&cache, ";\n"); DBUG_VOID_RETURN; } #endif /* MYSQL_CLIENT */ @@ -3139,17 +3277,16 @@ void Rotate_log_event::pack_info(Protocol *protocol) void Rotate_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { char buf[22]; + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); if (print_event_info->short_form) return; - print_header(file, print_event_info); - fprintf(file, "\tRotate to "); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tRotate to "); if (new_log_ident) - my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, - MYF(MY_NABP | MY_WME)); - fprintf(file, " pos: %s", llstr(pos, buf)); - fputc('\n', file); - fflush(file); + my_b_write(&cache, (byte*) new_log_ident, (uint)ident_len); + my_b_printf(&cache, " pos: %s\n", llstr(pos, buf)); } #endif /* MYSQL_CLIENT */ @@ -3364,14 +3501,16 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) char llbuff[22]; const char *msg; LINT_INIT(msg); + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tIntvar\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tIntvar\n"); } - fprintf(file, "SET "); + my_b_printf(&cache, "SET "); switch (type) { case LAST_INSERT_ID_EVENT: msg="LAST_INSERT_ID"; @@ -3384,8 +3523,7 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) msg="INVALID_INT"; break; } - fprintf(file, "%s=%s;\n", msg, llstr(val,llbuff)); - fflush(file); + my_b_printf(&cache, "%s=%s;\n", msg, llstr(val,llbuff)); } #endif @@ -3454,15 +3592,17 @@ bool Rand_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + char llbuff[22],llbuff2[22]; if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tRand\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tRand\n"); } - fprintf(file, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n", - llstr(seed1, llbuff),llstr(seed2, llbuff2)); - fflush(file); + my_b_printf(&cache, "SET @@RAND_SEED1=%s, @@RAND_SEED2=%s;\n", + llstr(seed1, llbuff),llstr(seed2, llbuff2)); } #endif /* MYSQL_CLIENT */ @@ -3524,16 +3664,18 @@ bool Xid_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void Xid_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { char buf[64]; longlong10_to_str(xid, buf, 10); - print_header(file, print_event_info); - fprintf(file, "\tXid = %s\n", buf); - fflush(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tXid = %s\n", buf); } - fprintf(file, "COMMIT;\n"); + my_b_printf(&cache, "COMMIT;\n"); } #endif /* MYSQL_CLIENT */ @@ -3723,19 +3865,22 @@ bool User_var_log_event::write(IO_CACHE* file) #ifdef MYSQL_CLIENT void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tUser_var\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tUser_var\n"); } - fprintf(file, "SET @`"); - my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME)); - fprintf(file, "`"); + my_b_printf(&cache, "SET @`"); + my_b_write(&cache, (byte*) name, (uint) (name_len)); + my_b_printf(&cache, "`"); if (is_null) { - fprintf(file, ":=NULL;\n"); + my_b_printf(&cache, ":=NULL;\n"); } else { @@ -3743,12 +3888,12 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) case REAL_RESULT: double real_val; float8get(real_val, val); - fprintf(file, ":=%.14g;\n", real_val); + my_b_printf(&cache, ":=%.14g;\n", real_val); break; case INT_RESULT: char int_buf[22]; longlong10_to_str(uint8korr(val), int_buf, -10); - fprintf(file, ":=%s;\n", int_buf); + my_b_printf(&cache, ":=%s;\n", int_buf); break; case DECIMAL_RESULT: { @@ -3764,7 +3909,7 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) bin2decimal(val+2, &dec, precision, scale); decimal2string(&dec, str_buf, &str_len, 0, 0, 0); str_buf[str_len]= 0; - fprintf(file, ":=%s;\n",str_buf); + my_b_printf(&cache, ":=%s;\n",str_buf); break; } case STRING_RESULT: @@ -3800,9 +3945,9 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) Generate an unusable command (=> syntax error) is probably the best thing we can do here. */ - fprintf(file, ":=???;\n"); + my_b_printf(&cache, ":=???;\n"); else - fprintf(file, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name); + my_b_printf(&cache, ":=_%s %s COLLATE `%s`;\n", cs->csname, hex_str, cs->name); my_afree(hex_str); } break; @@ -3812,7 +3957,6 @@ void User_var_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) return; } } - fflush(file); } #endif @@ -3896,13 +4040,14 @@ int User_var_log_event::exec_event(struct st_relay_log_info* rli) #ifdef HAVE_REPLICATION #ifdef MYSQL_CLIENT -void Unknown_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) +void Unknown_log_event::print(FILE* file_arg, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file_arg); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "# %s", "Unknown event\n"); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n# %s", "Unknown event\n"); } #endif @@ -3969,12 +4114,13 @@ Slave_log_event::~Slave_log_event() #ifdef MYSQL_CLIENT void Slave_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + char llbuff[22]; if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "\ + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n\ Slave: master_host: '%s' master_port: %d master_log: '%s' master_pos: %s\n", master_host, master_port, master_log, llstr(master_pos, llbuff)); } @@ -4054,12 +4200,14 @@ int Slave_log_event::exec_event(struct st_relay_log_info* rli) #ifdef MYSQL_CLIENT void Stop_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file, + Write_on_release_cache::FLUSH_F); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fprintf(file, "\tStop\n"); - fflush(file); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\tStop\n"); } #endif /* MYSQL_CLIENT */ @@ -4234,6 +4382,8 @@ Create_file_log_event::Create_file_log_event(const char* buf, uint len, void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, bool enable_local) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) { if (enable_local && check_fname_outside_temp_buf()) @@ -4249,10 +4399,10 @@ void Create_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info That one is for "file_id: etc" below: in mysqlbinlog we want the #, in SHOW BINLOG EVENTS we don't. */ - fprintf(file, "#"); + my_b_printf(&cache, "#"); } - fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len); + my_b_printf(&cache, " file_id: %d block_len: %d\n", file_id, block_len); } @@ -4422,12 +4572,13 @@ bool Append_block_log_event::write(IO_CACHE* file) void Append_block_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#%s: file_id: %d block_len: %d\n", - get_type_str(), file_id, block_len); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#%s: file_id: %d block_len: %d\n", + get_type_str(), file_id, block_len); } #endif /* MYSQL_CLIENT */ @@ -4565,11 +4716,12 @@ bool Delete_file_log_event::write(IO_CACHE* file) void Delete_file_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#Delete_file: file_id=%u\n", file_id); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#Delete_file: file_id=%u\n", file_id); } #endif /* MYSQL_CLIENT */ @@ -4660,12 +4812,13 @@ bool Execute_load_log_event::write(IO_CACHE* file) void Execute_load_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info) { + Write_on_release_cache cache(&print_event_info->head_cache, file); + if (print_event_info->short_form) return; - print_header(file, print_event_info); - fputc('\n', file); - fprintf(file, "#Exec_load: file_id=%d\n", - file_id); + print_header(&cache, print_event_info, FALSE); + my_b_printf(&cache, "\n#Exec_load: file_id=%d\n", + file_id); } #endif @@ -4882,29 +5035,30 @@ void Execute_load_query_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info, const char *local_fname) { - print_query_header(file, print_event_info); + Write_on_release_cache cache(&print_event_info->head_cache, file); + + print_query_header(&cache, print_event_info); if (local_fname) { - my_fwrite(file, (byte*) query, fn_pos_start, MYF(MY_NABP | MY_WME)); - fprintf(file, " LOCAL INFILE \'"); - fprintf(file, local_fname); - fprintf(file, "\'"); + my_b_write(&cache, (byte*) query, fn_pos_start); + my_b_printf(&cache, " LOCAL INFILE \'"); + my_b_printf(&cache, local_fname); + my_b_printf(&cache, "\'"); if (dup_handling == LOAD_DUP_REPLACE) - fprintf(file, " REPLACE"); - fprintf(file, " INTO"); - my_fwrite(file, (byte*) query + fn_pos_end, q_len-fn_pos_end, - MYF(MY_NABP | MY_WME)); - fprintf(file, ";\n"); + my_b_printf(&cache, " REPLACE"); + my_b_printf(&cache, " INTO"); + my_b_write(&cache, (byte*) query + fn_pos_end, q_len-fn_pos_end); + my_b_printf(&cache, ";\n"); } else { - my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)); - fprintf(file, ";\n"); + my_b_write(&cache, (byte*) query, q_len); + my_b_printf(&cache, ";\n"); } if (!print_event_info->short_form) - fprintf(file, "# file_id: %d \n", file_id); + my_b_printf(&cache, "# file_id: %d \n", file_id); } #endif @@ -5639,6 +5793,31 @@ void Rows_log_event::pack_info(Protocol *protocol) } #endif +#ifdef MYSQL_CLIENT +void Rows_log_event::print_helper(FILE *file, + PRINT_EVENT_INFO *print_event_info, + char const *const name) +{ + IO_CACHE *const head= &print_event_info->head_cache; + IO_CACHE *const body= &print_event_info->body_cache; + if (!print_event_info->short_form) + { + bool const last_stmt_event= get_flags(STMT_END_F); + print_header(head, print_event_info, !last_stmt_event); + my_b_printf(head, "\t%s: table id %lu", name, m_table_id); + print_base64(body, print_event_info, !last_stmt_event); + } + + if (get_flags(STMT_END_F)) + { + my_b_copy_to_file(head, file); + my_b_copy_to_file(body, file); + reinit_io_cache(head, WRITE_CACHE, 0, FALSE, TRUE); + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); + } +} +#endif + /************************************************************************** Table_map_log_event member functions **************************************************************************/ @@ -6049,10 +6228,11 @@ void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info) { if (!print_event_info->short_form) { - print_header(file, print_event_info); - fprintf(file, "\tTable_map: `%s`.`%s` mapped to number %lu\n", - m_dbnam, m_tblnam, m_table_id); - print_base64(file, print_event_info); + print_header(&print_event_info->head_cache, print_event_info, TRUE); + my_b_printf(&print_event_info->head_cache, + "\tTable_map: `%s`.`%s` mapped to number %lu\n", + m_dbnam, m_tblnam, m_table_id); + print_base64(&print_event_info->body_cache, print_event_info, TRUE); } } #endif @@ -6315,12 +6495,7 @@ int Write_rows_log_event::do_exec_row(TABLE *table) #ifdef MYSQL_CLIENT void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tWrite_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Write_rows"); } #endif @@ -6644,12 +6819,7 @@ int Delete_rows_log_event::do_exec_row(TABLE *table) void Delete_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tDelete_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Delete_rows"); } #endif @@ -6800,12 +6970,7 @@ int Update_rows_log_event::do_exec_row(TABLE *table) void Update_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info) { - if (!print_event_info->short_form) - { - print_header(file, print_event_info); - fprintf(file, "\tUpdate_rows: table id %lu", m_table_id); - print_base64(file, print_event_info); - } + Rows_log_event::print_helper(file, print_event_info, "Update_rows"); } #endif diff --git a/sql/log_event.h b/sql/log_event.h index 36933f4a7dd..68e165e97ef 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -519,14 +519,30 @@ typedef struct st_print_event_info bzero(db, sizeof(db)); bzero(charset, sizeof(charset)); bzero(time_zone_str, sizeof(time_zone_str)); + uint const flags = MYF(MY_WME | MY_NABP); + init_io_cache(&head_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags); + init_io_cache(&body_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags); } + ~st_print_event_info() { + end_io_cache(&head_cache); + end_io_cache(&body_cache); + } + + /* Settings on how to print the events */ bool short_form; bool base64_output; my_off_t hexdump_from; uint8 common_header_len; + /* + These two caches are used by the row-based replication events to + collect the header information and the main body of the events + making up a statement. + */ + IO_CACHE head_cache; + IO_CACHE body_cache; } PRINT_EVENT_INFO; #endif @@ -637,9 +653,11 @@ public: const Format_description_log_event *description_event); /* print*() functions are used by mysqlbinlog */ virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0; - void print_timestamp(FILE* file, time_t *ts = 0); - void print_header(FILE* file, PRINT_EVENT_INFO* print_event_info); - void print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info); + void print_timestamp(IO_CACHE* file, time_t *ts = 0); + void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, + bool is_more); + void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info, + bool is_more); #endif static void *operator new(size_t size) @@ -804,7 +822,7 @@ public: uint32 q_len_arg); #endif /* HAVE_REPLICATION */ #else - void print_query_header(FILE* file, PRINT_EVENT_INFO* print_event_info); + void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info); void print(FILE* file, PRINT_EVENT_INFO* print_event_info); #endif @@ -1843,6 +1861,10 @@ protected: Log_event_type event_type, const Format_description_log_event *description_event); +#ifdef MYSQL_CLIENT + void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name); +#endif + #ifndef MYSQL_CLIENT virtual int do_add_row_data(byte *data, my_size_t length); #endif diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 0939ad66cd0..23ca5330053 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -31,6 +31,7 @@ void mysql_client_binlog_statement(THD* thd) { + DBUG_ENTER("mysql_client_binlog_statement"); DBUG_PRINT("info",("binlog base64: '%*s'", (thd->lex->comment.length < 2048 ? thd->lex->comment.length : 2048), @@ -43,8 +44,8 @@ void mysql_client_binlog_statement(THD* thd) my_bool nsok= thd->net.no_send_ok; thd->net.no_send_ok= TRUE; - const my_size_t coded_len= thd->lex->comment.length + 1; - const my_size_t event_len= base64_needed_decoded_length(coded_len); + my_size_t coded_len= thd->lex->comment.length + 1; + my_size_t decoded_len= base64_needed_decoded_length(coded_len); DBUG_ASSERT(coded_len > 0); /* @@ -57,9 +58,8 @@ void mysql_client_binlog_statement(THD* thd) new Format_description_log_event(4); const char *error= 0; - char *buf= (char *) my_malloc(event_len, MYF(MY_WME)); + char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME)); Log_event *ev = 0; - int res; /* Out of memory check @@ -73,43 +73,97 @@ void mysql_client_binlog_statement(THD* thd) thd->rli_fake->sql_thd= thd; thd->rli_fake->no_storage= TRUE; - res= base64_decode(thd->lex->comment.str, coded_len, buf); - - DBUG_PRINT("info",("binlog base64 decoded_len=%d, event_len=%d\n", - res, uint4korr(buf + EVENT_LEN_OFFSET))); - /* - Note that 'res' is the correct event length, 'event_len' was - calculated based on the base64-string that possibly contained - extra spaces, so it can be longer than the real event. - */ - if (res < EVENT_LEN_OFFSET - || (uint) res != uint4korr(buf+EVENT_LEN_OFFSET)) + for (char const *strptr= thd->lex->comment.str ; + strptr < thd->lex->comment.str + thd->lex->comment.length ; ) { - my_error(ER_SYNTAX_ERROR, MYF(0)); - goto end; - } + char const *endptr= 0; + int bytes_decoded= base64_decode(strptr, coded_len, buf, &endptr); - ev= Log_event::read_log_event(buf, res, &error, desc); + DBUG_PRINT("info", + ("bytes_decoded=%d; strptr=0x%lu; endptr=0x%lu ('%c':%d)", + bytes_decoded, strptr, endptr, *endptr, *endptr)); + + if (bytes_decoded < 0) + { + my_error(ER_BASE64_DECODE_ERROR, MYF(0)); + goto end; + } + else if (bytes_decoded == 0) + break; // If no bytes where read, the string contained only whitespace + + DBUG_ASSERT(bytes_decoded > 0); + DBUG_ASSERT(endptr > strptr); + coded_len-= endptr - strptr; + strptr= endptr; - DBUG_PRINT("info",("binlog base64 err=%s", error)); - if (!ev) - { /* - This could actually be an out-of-memory, but it is more - likely causes by a bad statement + Now we have one or more events stored in the buffer. The size of + the buffer is computed based on how much base64-encoded data + there were, so there should be ample space for the data (maybe + even too much, since a statement can consist of a considerable + number of events). + + TODO: Switch to use a stream-based base64 encoder/decoder in + order to be able to read exactly what is necessary. */ - my_error(ER_SYNTAX_ERROR, MYF(0)); - goto end; - } - DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code())); - DBUG_PRINT("info",("buf+EVENT_TYPE_OFFSET=%d", buf+EVENT_TYPE_OFFSET)); + DBUG_PRINT("info",("binlog base64 decoded_len=%d, bytes_decoded=%d", + decoded_len, bytes_decoded)); - ev->thd= thd; - if (ev->exec_event(thd->rli_fake)) - { - my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement"); - goto end; + /* + Now we start to read events of the buffer, until there are no + more. + */ + for (char *bufptr= buf ; bytes_decoded > 0 ; ) + { + /* + Checking that the first event in the buffer is not truncated. + */ + ulong event_len= uint4korr(bufptr + EVENT_LEN_OFFSET); + DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d", + event_len, bytes_decoded)); + if (bytes_decoded < EVENT_LEN_OFFSET || (uint) bytes_decoded < event_len) + { + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + ev= Log_event::read_log_event(bufptr, event_len, &error, desc); + + DBUG_PRINT("info",("binlog base64 err=%s", error)); + if (!ev) + { + /* + This could actually be an out-of-memory, but it is more likely + causes by a bad statement + */ + my_error(ER_SYNTAX_ERROR, MYF(0)); + goto end; + } + + bytes_decoded -= event_len; + bufptr += event_len; + + DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code())); + DBUG_PRINT("info",("bufptr+EVENT_TYPE_OFFSET=0x%lx", + bufptr+EVENT_TYPE_OFFSET)); + DBUG_PRINT("info", ("bytes_decoded=%d; bufptr=0x%lx; buf[EVENT_LEN_OFFSET]=%u", + bytes_decoded, bufptr, uint4korr(bufptr+EVENT_LEN_OFFSET))); + ev->thd= thd; + if (int err= ev->exec_event(thd->rli_fake)) + { + DBUG_PRINT("info", ("exec_event() - error=%d", error)); + /* + TODO: Maybe a better error message since the BINLOG statement + now contains several events. + */ + my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement"); + goto end; + } + + delete ev; + ev= 0; + } } /* @@ -126,10 +180,7 @@ end: */ thd->net.no_send_ok= nsok; - if (ev) - delete ev; - if (desc) - delete desc; - if (buf) - my_free(buf, MYF(0)); + delete desc; + my_free(buf, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_VOID_RETURN; } From b8941bbc8f853aca954429117c378f9df2864e8c Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Mon, 9 Oct 2006 13:47:06 +0200 Subject: [PATCH 07/10] Post-merge fixes. --- mysql-test/t/view.test | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/mysql-test/t/view.test b/mysql-test/t/view.test index 934e0624fd6..0eadb33c3e1 100644 --- a/mysql-test/t/view.test +++ b/mysql-test/t/view.test @@ -347,13 +347,13 @@ create view v3 (x,y,z) as select b, a, b from t1; create view v4 (x,y,z) as select c+1, b, a from t1; create algorithm=temptable view v5 (x,y,z) as select c, b, a from t1; # try insert to VIEW with fields duplicate --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v3 values (-60,4,30); # try insert to VIEW with expression in SELECT list --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v4 values (-60,4,30); # try insert to VIEW using temporary table algorithm --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v5 values (-60,4,30); insert into v1 values (-60,4,30); insert into v1 (z,y,x) values (50,6,-100); @@ -375,13 +375,13 @@ create view v3 (x,y,z) as select b, a, b from t1; create view v4 (x,y,z) as select c+1, b, a from t1; create algorithm=temptable view v5 (x,y,z) as select c, b, a from t1; # try insert to VIEW with fields duplicate --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v3 select c, b, a from t2; # try insert to VIEW with expression in SELECT list --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v4 select c, b, a from t2; # try insert to VIEW using temporary table algorithm --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v5 select c, b, a from t2; insert into v1 select c, b, a from t2; insert into v1 (z,y,x) select a+20,b+2,-100 from t2; @@ -1249,14 +1249,14 @@ drop table t1; # create table t1 (s1 smallint); create view v1 as select * from t1 where 20 < (select (s1) from t1); --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v1 values (30); create view v2 as select * from t1; create view v3 as select * from t1 where 20 < (select (s1) from v2); --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v3 values (30); create view v4 as select * from v2 where 20 < (select (s1) from t1); --- error 1573 +-- error ER_NON_INSERTABLE_TABLE insert into v4 values (30); drop view v4, v3, v2, v1; drop table t1; @@ -2861,7 +2861,7 @@ DROP TABLE t1; # create table t1 (s1 int); create view v1 as select s1 as a, s1 as b from t1; ---error 1573 +--error ER_NON_INSERTABLE_TABLE insert into v1 values (1,1); update v1 set a = 5; drop view v1; From 0b6ff112b241d73a34dff5f873af8840190efdb4 Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Tue, 10 Oct 2006 12:29:24 +0200 Subject: [PATCH 08/10] Fix to make it build on all platforms. Replacing C++ code with C code in a C file. --- mysys/mf_iocache2.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c index 57a06e263a6..895f03dc714 100644 --- a/mysys/mf_iocache2.c +++ b/mysys/mf_iocache2.c @@ -52,12 +52,13 @@ int my_b_copy_to_file(IO_CACHE *cache, FILE *file) { byte buf[IO_SIZE]; + uint bytes_in_cache; DBUG_ENTER("my_b_copy_to_file"); /* Reinit the cache to read from the beginning of the cache */ if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) DBUG_RETURN(1); - uint bytes_in_cache= my_b_bytes_in_cache(cache); + bytes_in_cache= my_b_bytes_in_cache(cache); while (bytes_in_cache > 0) { uint const read_bytes= min(bytes_in_cache, sizeof(buf)); DBUG_PRINT("debug", ("Remaining %u bytes - Reading %u bytes", From 799585c5ac03929991d54f701e8991f45fd6073c Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Tue, 10 Oct 2006 14:58:40 +0200 Subject: [PATCH 09/10] Fixes to make replication team tree build on Windows. --- sql/log_event.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/log_event.cc b/sql/log_event.cc index 097caf56da2..5eca92fabe4 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -1083,7 +1083,7 @@ void Log_event::print_header(IO_CACHE* file, ptr[14], ptr[15], ptr[16], ptr[17], ptr[18]); DBUG_ASSERT(bytes_written >= 0); DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); - my_b_write(file, emit_buf, bytes_written); + my_b_write(file, (byte*) emit_buf, bytes_written); ptr += LOG_EVENT_MINIMAL_HEADER_LEN; hexdump_from += LOG_EVENT_MINIMAL_HEADER_LEN; } @@ -1114,7 +1114,7 @@ void Log_event::print_header(IO_CACHE* file, hex_string, char_string); DBUG_ASSERT(bytes_written >= 0); DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); - my_b_write(file, emit_buf, bytes_written); + my_b_write(file, (byte*) emit_buf, bytes_written); hex_string[0]= 0; char_string[0]= 0; c= char_string; @@ -1135,7 +1135,7 @@ void Log_event::print_header(IO_CACHE* file, hex_string, char_string); DBUG_ASSERT(bytes_written >= 0); DBUG_ASSERT(static_cast(bytes_written) < sizeof(emit_buf)); - my_b_write(file, emit_buf, bytes_written); + my_b_write(file, (byte*) emit_buf, bytes_written); } } DBUG_VOID_RETURN; From 6c3b1f6eb0a070b3e600f00638c46094415c90b2 Mon Sep 17 00:00:00 2001 From: "mats@romeo.(none)" <> Date: Tue, 10 Oct 2006 15:18:21 +0200 Subject: [PATCH 10/10] BUG#21474 (There is a rotation before the last table map): Removing code to step the group log position and just stepping the event log position. If the group log position were stepped one time too many, it might be that the group starts at a position that is not possible, e.g., at a Rows_log_event, or between an Intvar_log_event and the following associated Query_log_event. --- sql/log_event.cc | 26 +++++++++++++------------- sql/slave.cc | 27 ++++++++++++++++----------- 2 files changed, 29 insertions(+), 24 deletions(-) diff --git a/sql/log_event.cc b/sql/log_event.cc index faff2cae48e..accf606998e 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2381,19 +2381,19 @@ int Format_description_log_event::exec_event(struct st_relay_log_info* rli) if (server_id == (uint32) ::server_id) { /* - Do not modify rli->group_master_log_pos, as this event did not exist on - the master. That is, just update the *relay log* coordinates; this is - done by passing log_pos=0 to inc_group_relay_log_pos, like we do in - Stop_log_event::exec_event(). - If in a transaction, don't touch group_* coordinates. - */ - if (thd->options & OPTION_BEGIN) - rli->inc_event_relay_log_pos(); - else - { - rli->inc_group_relay_log_pos(0); - flush_relay_log_info(rli); - } + We only increase the relay log position if we are skipping + events and do not touch any group_* variables, nor flush the + relay log info. If there is a crash, we will have to re-skip + the events again, but that is a minor issue. + + If we do not skip stepping the group log position (and the + server id was changed when restarting the server), it might well + be that we start executing at a position that is invalid, e.g., + at a Rows_log_event or a Query_log_event preceeded by a + Intvar_log_event instead of starting at a Table_map_log_event or + the Intvar_log_event respectively. + */ + rli->inc_event_relay_log_pos(); DBUG_RETURN(0); } diff --git a/sql/slave.cc b/sql/slave.cc index f9645fc83e3..f9a847b9c26 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -3101,17 +3101,22 @@ static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli) type_code != START_EVENT_V3 && type_code!= FORMAT_DESCRIPTION_EVENT)) { DBUG_PRINT("info", ("event skipped")); - if (thd->options & OPTION_BEGIN) - rli->inc_event_relay_log_pos(); - else - { - rli->inc_group_relay_log_pos((type_code == ROTATE_EVENT || - type_code == STOP_EVENT || - type_code == FORMAT_DESCRIPTION_EVENT) ? - LL(0) : ev->log_pos, - 1/* skip lock*/); - flush_relay_log_info(rli); - } + /* + We only skip the event here and do not increase the group log + position. In the event that we have to restart, this means + that we might have to skip the event again, but that is a + minor issue. + + If we were to increase the group log position when skipping an + event, it might be that we are restarting at the wrong + position and have events before that we should have executed, + so not increasing the group log position is a sure bet in this + case. + + In this way, we just step the group log position when we + *know* that we are at the end of a group. + */ + rli->inc_event_relay_log_pos(); /* Protect against common user error of setting the counter to 1