From 77bd1beac8545a3cf3b9cc386d30b994a461932e Mon Sep 17 00:00:00 2001 From: Kristian Nielsen Date: Tue, 11 Jul 2023 23:30:04 +0200 Subject: [PATCH] MDEV-31273: Replace Log_event::writer with function parameter This is a preparatory patch for precomputing binlog checksums outside of holding LOCK_log, no functional changes. Replace Log_event::writer with just passing the writer object as a function parameter to Log_event::write(). This is mainly for code clarity. Having to set ev->writer before every call to ev->write() is error-prone (what if it's forgotten in some code place?), while passing it as parameter as usual makes it explicit how the dataflow is. As a minor point, it also improves the code, as the compiler now can save the function parameter in a register across nested calls (when it is a class member, compiler needs to reload across nested calls in case the object would be modified during the call). Reviewed-by: Monty Signed-off-by: Kristian Nielsen --- sql/log_event.h | 92 ++++++++--------- sql/log_event_server.cc | 220 ++++++++++++++++++++-------------------- 2 files changed, 155 insertions(+), 157 deletions(-) diff --git a/sql/log_event.h b/sql/log_event.h index 848ad0cead9..8eb543ca39d 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1304,8 +1304,6 @@ public: */ enum enum_binlog_checksum_alg checksum_alg; - Log_event_writer *writer; - #ifdef MYSQL_SERVER THD* thd; @@ -1464,24 +1462,26 @@ public: static void operator delete(void*, void*) { } #ifdef MYSQL_SERVER - bool write_header(size_t event_data_length); - bool write_data(const uchar *buf, size_t data_length) + bool write_header(Log_event_writer *writer, size_t event_data_length); + bool write_data(Log_event_writer *writer, const uchar *buf, size_t data_length) { return writer->write_data(buf, data_length); } - bool write_data(const char *buf, size_t data_length) - { return write_data((uchar*)buf, data_length); } - bool write_footer() + bool write_data(Log_event_writer *writer, const char *buf, size_t data_length) + { return write_data(writer, (uchar*)buf, data_length); } + bool write_footer(Log_event_writer *writer) { return writer->write_footer(); } my_bool need_checksum(); - virtual bool write() + virtual bool write(Log_event_writer *writer) { - return write_header(get_data_size()) || write_data_header() || - write_data_body() || write_footer(); + return write_header(writer, get_data_size()) || + write_data_header(writer) || + write_data_body(writer) || + write_footer(writer); } - virtual bool write_data_header() + virtual bool write_data_header(Log_event_writer *writer) { return 0; } - virtual bool write_data_body() + virtual bool write_data_body(Log_event_writer *writer) { return 0; } /* Return start of query time or current time */ @@ -2230,8 +2230,8 @@ public: static int begin_event(String *packet, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); #ifdef MYSQL_SERVER - bool write(); - virtual bool write_post_header_for_derived() { return FALSE; } + bool write(Log_event_writer *writer); + virtual bool write_post_header_for_derived(Log_event_writer *writer) { return FALSE; } #endif bool is_valid() const { return query != 0; } @@ -2311,7 +2311,7 @@ public: ulong query_length, bool using_trans, bool direct, bool suppress_use, int error); - virtual bool write(); + virtual bool write(Log_event_writer *writer); #endif }; @@ -2376,14 +2376,14 @@ public: memcpy(nonce, nonce_arg, BINLOG_NONCE_LENGTH); } - bool write_data_body() + bool write_data_body(Log_event_writer *writer) { uchar scheme_buf= crypto_scheme; uchar key_version_buf[BINLOG_KEY_VERSION_LENGTH]; int4store(key_version_buf, key_version); - return write_data(&scheme_buf, sizeof(scheme_buf)) || - write_data(key_version_buf, sizeof(key_version_buf)) || - write_data(nonce, BINLOG_NONCE_LENGTH); + return write_data(writer, &scheme_buf, sizeof(scheme_buf)) || + write_data(writer, key_version_buf, sizeof(key_version_buf)) || + write_data(writer, nonce, BINLOG_NONCE_LENGTH); } #else bool print(FILE* file, PRINT_EVENT_INFO* print_event_info); @@ -2536,7 +2536,7 @@ public: Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;} my_off_t get_header_len(my_off_t) { return LOG_EVENT_MINIMAL_HEADER_LEN; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #ifdef HAVE_REPLICATION void pack_info(Protocol* protocol); #endif /* HAVE_REPLICATION */ @@ -2652,7 +2652,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg, const char* get_var_type_name(); int get_data_size() { return 9; /* sizeof(type) + sizeof(val) */;} #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif bool is_valid() const { return 1; } bool is_part_of_group() { return 1; } @@ -2732,7 +2732,7 @@ class Rand_log_event: public Log_event Log_event_type get_type_code() { return RAND_EVENT;} int get_data_size() { return 16; /* sizeof(ulonglong) * 2*/ } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif bool is_valid() const { return 1; } bool is_part_of_group() { return 1; } @@ -2812,7 +2812,7 @@ public: Log_event_type get_type_code() { return XID_EVENT;} int get_data_size() { return sizeof(xid); } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif private: @@ -2962,7 +2962,7 @@ public: } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif private: @@ -3031,7 +3031,7 @@ public: ~User_var_log_event() = default; Log_event_type get_type_code() { return USER_VAR_EVENT;} #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); /* Getter and setter for deferred User-event. Returns true if the event is not applied directly @@ -3183,7 +3183,7 @@ public: int get_data_size() { return ident_len + ROTATE_HEADER_LEN;} bool is_valid() const { return new_log_ident != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); #endif private: @@ -3217,7 +3217,7 @@ public: int get_data_size() { return binlog_file_len + BINLOG_CHECKPOINT_HEADER_LEN;} bool is_valid() const { return binlog_file_name != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); enum_skip_reason do_shall_skip(rpl_group_info *rgi); #endif }; @@ -3382,7 +3382,7 @@ public: } bool is_valid() const { return seq_no != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); static int make_compatible_event(String *packet, bool *need_dummy_event, ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); static bool peek(const uchar *event_start, size_t event_len, @@ -3500,7 +3500,7 @@ public: bool is_valid() const { return list != NULL; } #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) bool to_packet(String *packet); - bool write(); + bool write(Log_event_writer *writer); virtual int do_apply_event(rpl_group_info *rgi); enum_skip_reason do_shall_skip(rpl_group_info *rgi); #endif @@ -3553,7 +3553,7 @@ public: int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;} bool is_valid() const { return block != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); const char* get_db() { return db; } #endif @@ -3594,7 +3594,7 @@ public: int get_data_size() { return DELETE_FILE_HEADER_LEN ;} bool is_valid() const { return file_id != 0; } #ifdef MYSQL_SERVER - bool write(); + bool write(Log_event_writer *writer); const char* get_db() { return db; } #endif @@ -3694,7 +3694,7 @@ public: ulong get_post_header_size_for_derived(); #ifdef MYSQL_SERVER - bool write_post_header_for_derived(); + bool write_post_header_for_derived(Log_event_writer *writer); #endif private: @@ -3762,8 +3762,8 @@ public: virtual bool is_part_of_group() { return 1; } #ifndef MYSQL_CLIENT - virtual bool write_data_header(); - virtual bool write_data_body(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); #endif #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) @@ -4430,8 +4430,8 @@ public: virtual int get_data_size() { return (uint) m_data_size; } #ifdef MYSQL_SERVER virtual int save_field_metadata(); - virtual bool write_data_header(); - virtual bool write_data_body(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); virtual const char *get_db() { return m_dbnam; } #endif @@ -4690,9 +4690,9 @@ public: #endif #ifdef MYSQL_SERVER - virtual bool write_data_header(); - virtual bool write_data_body(); - virtual bool write_compressed(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); + virtual bool write_compressed(Log_event_writer *writer); virtual const char *get_db() { return m_table->s->db.str; } #endif /* @@ -5008,7 +5008,7 @@ public: #if defined(MYSQL_SERVER) Write_rows_compressed_log_event(THD*, TABLE*, ulong table_id, bool is_transactional); - virtual bool write(); + virtual bool write(Log_event_writer *writer); #endif #ifdef HAVE_REPLICATION Write_rows_compressed_log_event(const uchar *buf, uint event_len, @@ -5097,7 +5097,7 @@ public: #if defined(MYSQL_SERVER) Update_rows_compressed_log_event(THD*, TABLE*, ulong table_id, bool is_transactional); - virtual bool write(); + virtual bool write(Log_event_writer *writer); #endif #ifdef HAVE_REPLICATION Update_rows_compressed_log_event(const uchar *buf, uint event_len, @@ -5187,7 +5187,7 @@ class Delete_rows_compressed_log_event : public Delete_rows_log_event public: #if defined(MYSQL_SERVER) Delete_rows_compressed_log_event(THD*, TABLE*, ulong, bool is_transactional); - virtual bool write(); + virtual bool write(Log_event_writer *writer); #endif #ifdef HAVE_REPLICATION Delete_rows_compressed_log_event(const uchar *buf, uint event_len, @@ -5277,8 +5277,8 @@ public: #ifdef MYSQL_SERVER void pack_info(Protocol*); - virtual bool write_data_header(); - virtual bool write_data_body(); + virtual bool write_data_header(Log_event_writer *writer); + virtual bool write_data_body(Log_event_writer *writer); #endif Incident_log_event(const uchar *buf, uint event_len, @@ -5415,9 +5415,7 @@ private: inline int Log_event_writer::write(Log_event *ev) { - ev->writer= this; - int res= ev->write(); - IF_DBUG(ev->writer= 0,); // writer must be set before every Log_event::write + int res= ev->write(this); add_status(ev->logged_status()); return res; } diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index b31c9f9b77b..9f1213ff303 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -903,7 +903,7 @@ int Log_event_writer::write_footer() Log_event::write_header() */ -bool Log_event::write_header(size_t event_data_length) +bool Log_event::write_header(Log_event_writer *writer, size_t event_data_length) { uchar header[LOG_EVENT_HEADER_LEN]; ulong now; @@ -1061,7 +1061,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src, will print! */ -bool Query_log_event::write() +bool Query_log_event::write(Log_event_writer *writer) { uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS]; uchar *start, *start_of_status; @@ -1310,16 +1310,16 @@ bool Query_log_event::write() event_length= ((uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len); - return write_header(event_length) || - write_data(buf, QUERY_HEADER_LEN) || - write_post_header_for_derived() || - write_data(start_of_status, (uint) status_vars_len) || - write_data(db, db_len + 1) || - write_data(query, q_len) || - write_footer(); + return write_header(writer, event_length) || + write_data(writer, buf, QUERY_HEADER_LEN) || + write_post_header_for_derived(writer) || + write_data(writer, start_of_status, (uint) status_vars_len) || + write_data(writer, db, db_len + 1) || + write_data(writer, query, q_len) || + write_footer(writer); } -bool Query_compressed_log_event::write() +bool Query_compressed_log_event::write(Log_event_writer *writer) { uchar *buffer; uint32 alloc_size, compressed_size; @@ -1338,7 +1338,7 @@ bool Query_compressed_log_event::write() uint32 q_len_tmp= q_len; query= (char*) buffer; q_len= compressed_size; - ret= Query_log_event::write(); + ret= Query_log_event::write(writer); query= query_tmp; q_len= q_len_tmp; } @@ -2452,7 +2452,7 @@ void Format_description_log_event::pack_info(Protocol *protocol) } #endif /* defined(HAVE_REPLICATION) */ -bool Format_description_log_event::write() +bool Format_description_log_event::write(Log_event_writer *writer) { bool ret; bool no_checksum; @@ -2500,11 +2500,11 @@ bool Format_description_log_event::write() { checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway } - ret= write_header(rec_size) || - write_data(buff, sizeof(buff)) || - write_data(post_header_len, number_of_event_types) || - write_data(&checksum_byte, sizeof(checksum_byte)) || - write_footer(); + ret= write_header(writer, rec_size) || + write_data(writer, buff, sizeof(buff)) || + write_data(writer, post_header_len, number_of_event_types) || + write_data(writer, &checksum_byte, sizeof(checksum_byte)) || + write_footer(writer); if (no_checksum) checksum_alg= BINLOG_CHECKSUM_ALG_OFF; return ret; @@ -2714,14 +2714,14 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg, } -bool Rotate_log_event::write() +bool Rotate_log_event::write(Log_event_writer *writer) { char buf[ROTATE_HEADER_LEN]; int8store(buf + R_POS_OFFSET, pos); - return (write_header(ROTATE_HEADER_LEN + ident_len) || - write_data(buf, ROTATE_HEADER_LEN) || - write_data(new_log_ident, (uint) ident_len) || - write_footer()); + return (write_header(writer, ROTATE_HEADER_LEN + ident_len) || + write_data(writer, buf, ROTATE_HEADER_LEN) || + write_data(writer, new_log_ident, (uint) ident_len) || + write_footer(writer)); } @@ -2871,14 +2871,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event( } -bool Binlog_checkpoint_log_event::write() +bool Binlog_checkpoint_log_event::write(Log_event_writer *writer) { uchar buf[BINLOG_CHECKPOINT_HEADER_LEN]; int4store(buf, binlog_file_len); - return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || - write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) || - write_data(binlog_file_name, binlog_file_len) || - write_footer(); + return write_header(writer, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) || + write_data(writer, buf, BINLOG_CHECKPOINT_HEADER_LEN) || + write_data(writer, binlog_file_name, binlog_file_len) || + write_footer(writer); } @@ -3004,7 +3004,7 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len, bool -Gtid_log_event::write() +Gtid_log_event::write(Log_event_writer *writer) { uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4]; size_t write_len= 13; @@ -3052,9 +3052,9 @@ Gtid_log_event::write() bzero(buf+write_len, GTID_HEADER_LEN-write_len); write_len= GTID_HEADER_LEN; } - return write_header(write_len) || - write_data(buf, write_len) || - write_footer(); + return write_header(writer, write_len) || + write_data(writer, buf, write_len) || + write_footer(writer); } @@ -3342,7 +3342,7 @@ Gtid_list_log_event::to_packet(String *packet) bool -Gtid_list_log_event::write() +Gtid_list_log_event::write(Log_event_writer *writer) { char buf[128]; String packet(buf, sizeof(buf), system_charset_info); @@ -3350,9 +3350,9 @@ Gtid_list_log_event::write() packet.length(0); if (to_packet(&packet)) return true; - return write_header(get_data_size()) || - write_data(packet.ptr(), packet.length()) || - write_footer(); + return write_header(writer, get_data_size()) || + write_data(writer, packet.ptr(), packet.length()) || + write_footer(writer); } @@ -3446,14 +3446,14 @@ void Intvar_log_event::pack_info(Protocol *protocol) #endif -bool Intvar_log_event::write() +bool Intvar_log_event::write(Log_event_writer *writer) { uchar buf[9]; buf[I_TYPE_OFFSET]= (uchar) type; int8store(buf + I_VAL_OFFSET, val); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -3525,14 +3525,14 @@ void Rand_log_event::pack_info(Protocol *protocol) #endif -bool Rand_log_event::write() +bool Rand_log_event::write(Log_event_writer *writer) { uchar buf[16]; int8store(buf + RAND_SEED1_OFFSET, seed1); int8store(buf + RAND_SEED2_OFFSET, seed2); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -3781,12 +3781,12 @@ int Xid_log_event::do_commit() #endif -bool Xid_log_event::write() +bool Xid_log_event::write(Log_event_writer *writer) { DBUG_EXECUTE_IF("do_not_write_xid", return 0;); - return write_header(sizeof(xid)) || - write_data((uchar*)&xid, sizeof(xid)) || - write_footer(); + return write_header(writer, sizeof(xid)) || + write_data(writer, (uchar*)&xid, sizeof(xid)) || + write_footer(writer); } /************************************************************************** @@ -3831,7 +3831,7 @@ int XA_prepare_log_event::do_commit() #endif // HAVE_REPLICATION -bool XA_prepare_log_event::write() +bool XA_prepare_log_event::write(Log_event_writer *writer) { uchar data[1 + 4 + 4 + 4]= {one_phase,}; uint8 one_phase_byte= one_phase; @@ -3842,14 +3842,14 @@ bool XA_prepare_log_event::write() DBUG_ASSERT(xid_subheader_no_data == sizeof(data) - 1); - return write_header(sizeof(one_phase_byte) + xid_subheader_no_data + + return write_header(writer, sizeof(one_phase_byte) + xid_subheader_no_data + static_cast(xid)->gtrid_length + static_cast(xid)->bqual_length) || - write_data(data, sizeof(data)) || - write_data((uchar*) static_cast(xid)->data, + write_data(writer, data, sizeof(data)) || + write_data(writer, (uchar*) static_cast(xid)->data, static_cast(xid)->gtrid_length + static_cast(xid)->bqual_length) || - write_footer(); + write_footer(writer); } @@ -3973,7 +3973,7 @@ void User_var_log_event::pack_info(Protocol* protocol) #endif // HAVE_REPLICATION -bool User_var_log_event::write() +bool User_var_log_event::write(Log_event_writer *writer) { char buf[UV_NAME_LEN_SIZE]; char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE + @@ -4028,13 +4028,13 @@ bool User_var_log_event::write() /* Length of the whole event */ event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len; - return write_header(event_length) || - write_data(buf, sizeof(buf)) || - write_data(name, name_len) || - write_data(buf1, buf1_length) || - write_data(pos, val_len) || - write_data(&flags, unsigned_len) || - write_footer(); + return write_header(writer, event_length) || + write_data(writer, buf, sizeof(buf)) || + write_data(writer, name, name_len) || + write_data(writer, buf1, buf1_length) || + write_data(writer, pos, val_len) || + write_data(writer, &flags, unsigned_len) || + write_footer(writer); } @@ -4239,14 +4239,14 @@ Append_block_log_event::Append_block_log_event(THD *thd_arg, } -bool Append_block_log_event::write() +bool Append_block_log_event::write(Log_event_writer *writer) { uchar buf[APPEND_BLOCK_HEADER_LEN]; int4store(buf + AB_FILE_ID_OFFSET, file_id); - return write_header(APPEND_BLOCK_HEADER_LEN + block_len) || - write_data(buf, APPEND_BLOCK_HEADER_LEN) || - write_data(block, block_len) || - write_footer(); + return write_header(writer, APPEND_BLOCK_HEADER_LEN + block_len) || + write_data(writer, buf, APPEND_BLOCK_HEADER_LEN) || + write_data(writer, block, block_len) || + write_footer(writer); } @@ -4349,13 +4349,13 @@ Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg, } -bool Delete_file_log_event::write() +bool Delete_file_log_event::write(Log_event_writer *writer) { uchar buf[DELETE_FILE_HEADER_LEN]; int4store(buf + DF_FILE_ID_OFFSET, file_id); - return write_header(sizeof(buf)) || - write_data(buf, sizeof(buf)) || - write_footer(); + return write_header(writer, sizeof(buf)) || + write_data(writer, buf, sizeof(buf)) || + write_footer(writer); } @@ -4438,14 +4438,14 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg, bool -Execute_load_query_log_event::write_post_header_for_derived() +Execute_load_query_log_event::write_post_header_for_derived(Log_event_writer *writer) { uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN]; int4store(buf, file_id); int4store(buf + 4, fn_pos_start); int4store(buf + 4 + 4, fn_pos_end); *(buf + 4 + 4 + 4)= (uchar) dup_handling; - return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); + return write_data(writer, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN); } @@ -5423,7 +5423,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi) #endif /* defined(HAVE_REPLICATION) */ -bool Rows_log_event::write_data_header() +bool Rows_log_event::write_data_header(Log_event_writer *writer) { uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer DBUG_ASSERT(m_table_id != ~0ULL); @@ -5431,14 +5431,14 @@ bool Rows_log_event::write_data_header() { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (write_data(buf, 6)); + return (write_data(writer, buf, 6)); }); int6store(buf + RW_MAPID_OFFSET, m_table_id); int2store(buf + RW_FLAGS_OFFSET, m_flags); - return write_data(buf, ROWS_HEADER_LEN_V1); + return write_data(writer, buf, ROWS_HEADER_LEN_V1); } -bool Rows_log_event::write_data_body() +bool Rows_log_event::write_data_body(Log_event_writer *writer) { /* Note that this should be the number of *bits*, not the number of @@ -5451,10 +5451,10 @@ bool Rows_log_event::write_data_body() DBUG_ASSERT(static_cast(sbuf_end - sbuf) <= sizeof(sbuf)); DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf)); - res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf)); + res= res || write_data(writer, sbuf, (size_t) (sbuf_end - sbuf)); DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols)); - res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols)); + res= res || write_data(writer, (uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols)); /* TODO[refactor write]: Remove the "down cast" here (and elsewhere). */ @@ -5462,17 +5462,17 @@ bool Rows_log_event::write_data_body() { DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai)); - res= res || write_data((uchar*)m_cols_ai.bitmap, + res= res || write_data(writer, (uchar*)m_cols_ai.bitmap, no_bytes_in_map(&m_cols_ai)); } DBUG_DUMP("rows", m_rows_buf, data_size); - res= res || write_data(m_rows_buf, (size_t) data_size); + res= res || write_data(writer, m_rows_buf, (size_t) data_size); return res; } -bool Rows_log_event::write_compressed() +bool Rows_log_event::write_compressed(Log_event_writer *writer) { uchar *m_rows_buf_tmp= m_rows_buf; uchar *m_rows_cur_tmp= m_rows_cur; @@ -5486,7 +5486,7 @@ bool Rows_log_event::write_compressed() (uint32)(m_rows_cur_tmp - m_rows_buf_tmp), &comlen)) { m_rows_cur= comlen + m_rows_buf; - ret= Log_event::write(); + ret= Log_event::write(writer); } my_safe_afree(m_rows_buf, alloc_size); m_rows_buf= m_rows_buf_tmp; @@ -5528,15 +5528,15 @@ Annotate_rows_log_event::Annotate_rows_log_event(THD *thd, } -bool Annotate_rows_log_event::write_data_header() +bool Annotate_rows_log_event::write_data_header(Log_event_writer *writer) { return 0; } -bool Annotate_rows_log_event::write_data_body() +bool Annotate_rows_log_event::write_data_body(Log_event_writer *writer) { - return write_data(m_query_txt, m_query_len); + return write_data(writer, m_query_txt, m_query_len); } @@ -5983,7 +5983,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi) #endif /* defined(HAVE_REPLICATION) */ -bool Table_map_log_event::write_data_header() +bool Table_map_log_event::write_data_header(Log_event_writer *writer) { DBUG_ASSERT(m_table_id != ~0ULL); uchar buf[TABLE_MAP_HEADER_LEN]; @@ -5991,14 +5991,14 @@ bool Table_map_log_event::write_data_header() { int4store(buf + 0, m_table_id); int2store(buf + 4, m_flags); - return (write_data(buf, 6)); + return (write_data(writer, buf, 6)); }); int6store(buf + TM_MAPID_OFFSET, m_table_id); int2store(buf + TM_FLAGS_OFFSET, m_flags); - return write_data(buf, TABLE_MAP_HEADER_LEN); + return write_data(writer, buf, TABLE_MAP_HEADER_LEN); } -bool Table_map_log_event::write_data_body() +bool Table_map_log_event::write_data_body(Log_event_writer *writer) { DBUG_ASSERT(m_dbnam != NULL); DBUG_ASSERT(m_tblnam != NULL); @@ -6019,17 +6019,17 @@ bool Table_map_log_event::write_data_body() uchar mbuf[MAX_INT_WIDTH]; uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size); - return write_data(dbuf, sizeof(dbuf)) || - write_data(m_dbnam, m_dblen+1) || - write_data(tbuf, sizeof(tbuf)) || - write_data(m_tblnam, m_tbllen+1) || - write_data(cbuf, (size_t) (cbuf_end - cbuf)) || - write_data(m_coltype, m_colcnt) || - write_data(mbuf, (size_t) (mbuf_end - mbuf)) || - write_data(m_field_metadata, m_field_metadata_size), - write_data(m_null_bits, (m_colcnt + 7) / 8) || - write_data((const uchar*) m_metadata_buf.ptr(), - m_metadata_buf.length()); + return write_data(writer, dbuf, sizeof(dbuf)) || + write_data(writer, m_dbnam, m_dblen+1) || + write_data(writer, tbuf, sizeof(tbuf)) || + write_data(writer, m_tblnam, m_tbllen+1) || + write_data(writer, cbuf, (size_t) (cbuf_end - cbuf)) || + write_data(writer, m_coltype, m_colcnt) || + write_data(writer, mbuf, (size_t) (mbuf_end - mbuf)) || + write_data(writer, m_field_metadata, m_field_metadata_size), + write_data(writer, m_null_bits, (m_colcnt + 7) / 8) || + write_data(writer, (const uchar*) m_metadata_buf.ptr(), + m_metadata_buf.length()); } /** @@ -6457,9 +6457,9 @@ Write_rows_compressed_log_event::Write_rows_compressed_log_event( m_type = WRITE_ROWS_COMPRESSED_EVENT_V1; } -bool Write_rows_compressed_log_event::write() +bool Write_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } @@ -7717,9 +7717,9 @@ Delete_rows_compressed_log_event::Delete_rows_compressed_log_event( m_type= DELETE_ROWS_COMPRESSED_EVENT_V1; } -bool Delete_rows_compressed_log_event::write() +bool Delete_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } @@ -7855,9 +7855,9 @@ Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg, m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1; } -bool Update_rows_compressed_log_event::write() +bool Update_rows_compressed_log_event::write(Log_event_writer *writer) { - return Rows_log_event::write_compressed(); + return Rows_log_event::write_compressed(writer); } void Update_rows_log_event::init(MY_BITMAP const *cols) @@ -8139,23 +8139,23 @@ int Incident_log_event::do_apply_event(rpl_group_info *rgi) bool -Incident_log_event::write_data_header() +Incident_log_event::write_data_header(Log_event_writer *writer) { DBUG_ENTER("Incident_log_event::write_data_header"); DBUG_PRINT("enter", ("m_incident: %d", m_incident)); uchar buf[sizeof(int16)]; int2store(buf, (int16) m_incident); - DBUG_RETURN(write_data(buf, sizeof(buf))); + DBUG_RETURN(write_data(writer, buf, sizeof(buf))); } bool -Incident_log_event::write_data_body() +Incident_log_event::write_data_body(Log_event_writer *writer) { uchar tmp[1]; DBUG_ENTER("Incident_log_event::write_data_body"); tmp[0]= (uchar) m_message.length; - DBUG_RETURN(write_data(tmp, sizeof(tmp)) || - write_data(m_message.str, m_message.length)); + DBUG_RETURN(write_data(writer, tmp, sizeof(tmp)) || + write_data(writer, m_message.str, m_message.length)); }