MDEV-31273: Replace Log_event::writer with function parameter

This is a preparatory patch for precomputing binlog checksums outside
of holding LOCK_log, no functional changes.

Replace Log_event::writer with just passing the writer object as a
function parameter to Log_event::write().

This is mainly for code clarity. Having to set ev->writer before every
call to ev->write() is error-prone (what if it's forgotten in some
code place?), while passing it as parameter as usual makes it explicit
how the dataflow is.

As a minor point, it also improves the code, as the compiler now can
save the function parameter in a register across nested calls (when it
is a class member, compiler needs to reload across nested calls in
case the object would be modified during the call).

Reviewed-by: Monty <monty@mariadb.org>
Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen 2023-07-11 23:30:04 +02:00
parent d8dda7c14f
commit 77bd1beac8
2 changed files with 155 additions and 157 deletions

View File

@ -1304,8 +1304,6 @@ public:
*/
enum enum_binlog_checksum_alg checksum_alg;
Log_event_writer *writer;
#ifdef MYSQL_SERVER
THD* thd;
@ -1464,24 +1462,26 @@ public:
static void operator delete(void*, void*) { }
#ifdef MYSQL_SERVER
bool write_header(size_t event_data_length);
bool write_data(const uchar *buf, size_t data_length)
bool write_header(Log_event_writer *writer, size_t event_data_length);
bool write_data(Log_event_writer *writer, const uchar *buf, size_t data_length)
{ return writer->write_data(buf, data_length); }
bool write_data(const char *buf, size_t data_length)
{ return write_data((uchar*)buf, data_length); }
bool write_footer()
bool write_data(Log_event_writer *writer, const char *buf, size_t data_length)
{ return write_data(writer, (uchar*)buf, data_length); }
bool write_footer(Log_event_writer *writer)
{ return writer->write_footer(); }
my_bool need_checksum();
virtual bool write()
virtual bool write(Log_event_writer *writer)
{
return write_header(get_data_size()) || write_data_header() ||
write_data_body() || write_footer();
return write_header(writer, get_data_size()) ||
write_data_header(writer) ||
write_data_body(writer) ||
write_footer(writer);
}
virtual bool write_data_header()
virtual bool write_data_header(Log_event_writer *writer)
{ return 0; }
virtual bool write_data_body()
virtual bool write_data_body(Log_event_writer *writer)
{ return 0; }
/* Return start of query time or current time */
@ -2230,8 +2230,8 @@ public:
static int begin_event(String *packet, ulong ev_offset,
enum enum_binlog_checksum_alg checksum_alg);
#ifdef MYSQL_SERVER
bool write();
virtual bool write_post_header_for_derived() { return FALSE; }
bool write(Log_event_writer *writer);
virtual bool write_post_header_for_derived(Log_event_writer *writer) { return FALSE; }
#endif
bool is_valid() const { return query != 0; }
@ -2311,7 +2311,7 @@ public:
ulong query_length,
bool using_trans, bool direct, bool suppress_use,
int error);
virtual bool write();
virtual bool write(Log_event_writer *writer);
#endif
};
@ -2376,14 +2376,14 @@ public:
memcpy(nonce, nonce_arg, BINLOG_NONCE_LENGTH);
}
bool write_data_body()
bool write_data_body(Log_event_writer *writer)
{
uchar scheme_buf= crypto_scheme;
uchar key_version_buf[BINLOG_KEY_VERSION_LENGTH];
int4store(key_version_buf, key_version);
return write_data(&scheme_buf, sizeof(scheme_buf)) ||
write_data(key_version_buf, sizeof(key_version_buf)) ||
write_data(nonce, BINLOG_NONCE_LENGTH);
return write_data(writer, &scheme_buf, sizeof(scheme_buf)) ||
write_data(writer, key_version_buf, sizeof(key_version_buf)) ||
write_data(writer, nonce, BINLOG_NONCE_LENGTH);
}
#else
bool print(FILE* file, PRINT_EVENT_INFO* print_event_info);
@ -2536,7 +2536,7 @@ public:
Log_event_type get_type_code() { return FORMAT_DESCRIPTION_EVENT;}
my_off_t get_header_len(my_off_t) { return LOG_EVENT_MINIMAL_HEADER_LEN; }
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
#endif /* HAVE_REPLICATION */
@ -2652,7 +2652,7 @@ Intvar_log_event(THD* thd_arg,uchar type_arg, ulonglong val_arg,
const char* get_var_type_name();
int get_data_size() { return 9; /* sizeof(type) + sizeof(val) */;}
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
#endif
bool is_valid() const { return 1; }
bool is_part_of_group() { return 1; }
@ -2732,7 +2732,7 @@ class Rand_log_event: public Log_event
Log_event_type get_type_code() { return RAND_EVENT;}
int get_data_size() { return 16; /* sizeof(ulonglong) * 2*/ }
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
#endif
bool is_valid() const { return 1; }
bool is_part_of_group() { return 1; }
@ -2812,7 +2812,7 @@ public:
Log_event_type get_type_code() { return XID_EVENT;}
int get_data_size() { return sizeof(xid); }
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
#endif
private:
@ -2962,7 +2962,7 @@ public:
}
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
#endif
private:
@ -3031,7 +3031,7 @@ public:
~User_var_log_event() = default;
Log_event_type get_type_code() { return USER_VAR_EVENT;}
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
/*
Getter and setter for deferred User-event.
Returns true if the event is not applied directly
@ -3183,7 +3183,7 @@ public:
int get_data_size() { return ident_len + ROTATE_HEADER_LEN;}
bool is_valid() const { return new_log_ident != 0; }
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
#endif
private:
@ -3217,7 +3217,7 @@ public:
int get_data_size() { return binlog_file_len + BINLOG_CHECKPOINT_HEADER_LEN;}
bool is_valid() const { return binlog_file_name != 0; }
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
};
@ -3382,7 +3382,7 @@ public:
}
bool is_valid() const { return seq_no != 0; }
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
static int make_compatible_event(String *packet, bool *need_dummy_event,
ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg);
static bool peek(const uchar *event_start, size_t event_len,
@ -3500,7 +3500,7 @@ public:
bool is_valid() const { return list != NULL; }
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
bool to_packet(String *packet);
bool write();
bool write(Log_event_writer *writer);
virtual int do_apply_event(rpl_group_info *rgi);
enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif
@ -3553,7 +3553,7 @@ public:
int get_data_size() { return block_len + APPEND_BLOCK_HEADER_LEN ;}
bool is_valid() const { return block != 0; }
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
const char* get_db() { return db; }
#endif
@ -3594,7 +3594,7 @@ public:
int get_data_size() { return DELETE_FILE_HEADER_LEN ;}
bool is_valid() const { return file_id != 0; }
#ifdef MYSQL_SERVER
bool write();
bool write(Log_event_writer *writer);
const char* get_db() { return db; }
#endif
@ -3694,7 +3694,7 @@ public:
ulong get_post_header_size_for_derived();
#ifdef MYSQL_SERVER
bool write_post_header_for_derived();
bool write_post_header_for_derived(Log_event_writer *writer);
#endif
private:
@ -3762,8 +3762,8 @@ public:
virtual bool is_part_of_group() { return 1; }
#ifndef MYSQL_CLIENT
virtual bool write_data_header();
virtual bool write_data_body();
virtual bool write_data_header(Log_event_writer *writer);
virtual bool write_data_body(Log_event_writer *writer);
#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
@ -4430,8 +4430,8 @@ public:
virtual int get_data_size() { return (uint) m_data_size; }
#ifdef MYSQL_SERVER
virtual int save_field_metadata();
virtual bool write_data_header();
virtual bool write_data_body();
virtual bool write_data_header(Log_event_writer *writer);
virtual bool write_data_body(Log_event_writer *writer);
virtual const char *get_db() { return m_dbnam; }
#endif
@ -4690,9 +4690,9 @@ public:
#endif
#ifdef MYSQL_SERVER
virtual bool write_data_header();
virtual bool write_data_body();
virtual bool write_compressed();
virtual bool write_data_header(Log_event_writer *writer);
virtual bool write_data_body(Log_event_writer *writer);
virtual bool write_compressed(Log_event_writer *writer);
virtual const char *get_db() { return m_table->s->db.str; }
#endif
/*
@ -5008,7 +5008,7 @@ public:
#if defined(MYSQL_SERVER)
Write_rows_compressed_log_event(THD*, TABLE*, ulong table_id,
bool is_transactional);
virtual bool write();
virtual bool write(Log_event_writer *writer);
#endif
#ifdef HAVE_REPLICATION
Write_rows_compressed_log_event(const uchar *buf, uint event_len,
@ -5097,7 +5097,7 @@ public:
#if defined(MYSQL_SERVER)
Update_rows_compressed_log_event(THD*, TABLE*, ulong table_id,
bool is_transactional);
virtual bool write();
virtual bool write(Log_event_writer *writer);
#endif
#ifdef HAVE_REPLICATION
Update_rows_compressed_log_event(const uchar *buf, uint event_len,
@ -5187,7 +5187,7 @@ class Delete_rows_compressed_log_event : public Delete_rows_log_event
public:
#if defined(MYSQL_SERVER)
Delete_rows_compressed_log_event(THD*, TABLE*, ulong, bool is_transactional);
virtual bool write();
virtual bool write(Log_event_writer *writer);
#endif
#ifdef HAVE_REPLICATION
Delete_rows_compressed_log_event(const uchar *buf, uint event_len,
@ -5277,8 +5277,8 @@ public:
#ifdef MYSQL_SERVER
void pack_info(Protocol*);
virtual bool write_data_header();
virtual bool write_data_body();
virtual bool write_data_header(Log_event_writer *writer);
virtual bool write_data_body(Log_event_writer *writer);
#endif
Incident_log_event(const uchar *buf, uint event_len,
@ -5415,9 +5415,7 @@ private:
inline int Log_event_writer::write(Log_event *ev)
{
ev->writer= this;
int res= ev->write();
IF_DBUG(ev->writer= 0,); // writer must be set before every Log_event::write
int res= ev->write(this);
add_status(ev->logged_status());
return res;
}

View File

@ -903,7 +903,7 @@ int Log_event_writer::write_footer()
Log_event::write_header()
*/
bool Log_event::write_header(size_t event_data_length)
bool Log_event::write_header(Log_event_writer *writer, size_t event_data_length)
{
uchar header[LOG_EVENT_HEADER_LEN];
ulong now;
@ -1061,7 +1061,7 @@ static void store_str_with_code_and_len(uchar **dst, const char *src,
will print!
*/
bool Query_log_event::write()
bool Query_log_event::write(Log_event_writer *writer)
{
uchar buf[QUERY_HEADER_LEN + MAX_SIZE_LOG_EVENT_STATUS];
uchar *start, *start_of_status;
@ -1310,16 +1310,16 @@ bool Query_log_event::write()
event_length= ((uint) (start-buf) + get_post_header_size_for_derived() +
db_len + 1 + q_len);
return write_header(event_length) ||
write_data(buf, QUERY_HEADER_LEN) ||
write_post_header_for_derived() ||
write_data(start_of_status, (uint) status_vars_len) ||
write_data(db, db_len + 1) ||
write_data(query, q_len) ||
write_footer();
return write_header(writer, event_length) ||
write_data(writer, buf, QUERY_HEADER_LEN) ||
write_post_header_for_derived(writer) ||
write_data(writer, start_of_status, (uint) status_vars_len) ||
write_data(writer, db, db_len + 1) ||
write_data(writer, query, q_len) ||
write_footer(writer);
}
bool Query_compressed_log_event::write()
bool Query_compressed_log_event::write(Log_event_writer *writer)
{
uchar *buffer;
uint32 alloc_size, compressed_size;
@ -1338,7 +1338,7 @@ bool Query_compressed_log_event::write()
uint32 q_len_tmp= q_len;
query= (char*) buffer;
q_len= compressed_size;
ret= Query_log_event::write();
ret= Query_log_event::write(writer);
query= query_tmp;
q_len= q_len_tmp;
}
@ -2452,7 +2452,7 @@ void Format_description_log_event::pack_info(Protocol *protocol)
}
#endif /* defined(HAVE_REPLICATION) */
bool Format_description_log_event::write()
bool Format_description_log_event::write(Log_event_writer *writer)
{
bool ret;
bool no_checksum;
@ -2500,11 +2500,11 @@ bool Format_description_log_event::write()
{
checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
}
ret= write_header(rec_size) ||
write_data(buff, sizeof(buff)) ||
write_data(post_header_len, number_of_event_types) ||
write_data(&checksum_byte, sizeof(checksum_byte)) ||
write_footer();
ret= write_header(writer, rec_size) ||
write_data(writer, buff, sizeof(buff)) ||
write_data(writer, post_header_len, number_of_event_types) ||
write_data(writer, &checksum_byte, sizeof(checksum_byte)) ||
write_footer(writer);
if (no_checksum)
checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
return ret;
@ -2714,14 +2714,14 @@ Rotate_log_event::Rotate_log_event(const char* new_log_ident_arg,
}
bool Rotate_log_event::write()
bool Rotate_log_event::write(Log_event_writer *writer)
{
char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos);
return (write_header(ROTATE_HEADER_LEN + ident_len) ||
write_data(buf, ROTATE_HEADER_LEN) ||
write_data(new_log_ident, (uint) ident_len) ||
write_footer());
return (write_header(writer, ROTATE_HEADER_LEN + ident_len) ||
write_data(writer, buf, ROTATE_HEADER_LEN) ||
write_data(writer, new_log_ident, (uint) ident_len) ||
write_footer(writer));
}
@ -2871,14 +2871,14 @@ Binlog_checkpoint_log_event::Binlog_checkpoint_log_event(
}
bool Binlog_checkpoint_log_event::write()
bool Binlog_checkpoint_log_event::write(Log_event_writer *writer)
{
uchar buf[BINLOG_CHECKPOINT_HEADER_LEN];
int4store(buf, binlog_file_len);
return write_header(BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
write_data(buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
write_data(binlog_file_name, binlog_file_len) ||
write_footer();
return write_header(writer, BINLOG_CHECKPOINT_HEADER_LEN + binlog_file_len) ||
write_data(writer, buf, BINLOG_CHECKPOINT_HEADER_LEN) ||
write_data(writer, binlog_file_name, binlog_file_len) ||
write_footer(writer);
}
@ -3004,7 +3004,7 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len,
bool
Gtid_log_event::write()
Gtid_log_event::write(Log_event_writer *writer)
{
uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4];
size_t write_len= 13;
@ -3052,9 +3052,9 @@ Gtid_log_event::write()
bzero(buf+write_len, GTID_HEADER_LEN-write_len);
write_len= GTID_HEADER_LEN;
}
return write_header(write_len) ||
write_data(buf, write_len) ||
write_footer();
return write_header(writer, write_len) ||
write_data(writer, buf, write_len) ||
write_footer(writer);
}
@ -3342,7 +3342,7 @@ Gtid_list_log_event::to_packet(String *packet)
bool
Gtid_list_log_event::write()
Gtid_list_log_event::write(Log_event_writer *writer)
{
char buf[128];
String packet(buf, sizeof(buf), system_charset_info);
@ -3350,9 +3350,9 @@ Gtid_list_log_event::write()
packet.length(0);
if (to_packet(&packet))
return true;
return write_header(get_data_size()) ||
write_data(packet.ptr(), packet.length()) ||
write_footer();
return write_header(writer, get_data_size()) ||
write_data(writer, packet.ptr(), packet.length()) ||
write_footer(writer);
}
@ -3446,14 +3446,14 @@ void Intvar_log_event::pack_info(Protocol *protocol)
#endif
bool Intvar_log_event::write()
bool Intvar_log_event::write(Log_event_writer *writer)
{
uchar buf[9];
buf[I_TYPE_OFFSET]= (uchar) type;
int8store(buf + I_VAL_OFFSET, val);
return write_header(sizeof(buf)) ||
write_data(buf, sizeof(buf)) ||
write_footer();
return write_header(writer, sizeof(buf)) ||
write_data(writer, buf, sizeof(buf)) ||
write_footer(writer);
}
@ -3525,14 +3525,14 @@ void Rand_log_event::pack_info(Protocol *protocol)
#endif
bool Rand_log_event::write()
bool Rand_log_event::write(Log_event_writer *writer)
{
uchar buf[16];
int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2);
return write_header(sizeof(buf)) ||
write_data(buf, sizeof(buf)) ||
write_footer();
return write_header(writer, sizeof(buf)) ||
write_data(writer, buf, sizeof(buf)) ||
write_footer(writer);
}
@ -3781,12 +3781,12 @@ int Xid_log_event::do_commit()
#endif
bool Xid_log_event::write()
bool Xid_log_event::write(Log_event_writer *writer)
{
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
return write_header(sizeof(xid)) ||
write_data((uchar*)&xid, sizeof(xid)) ||
write_footer();
return write_header(writer, sizeof(xid)) ||
write_data(writer, (uchar*)&xid, sizeof(xid)) ||
write_footer(writer);
}
/**************************************************************************
@ -3831,7 +3831,7 @@ int XA_prepare_log_event::do_commit()
#endif // HAVE_REPLICATION
bool XA_prepare_log_event::write()
bool XA_prepare_log_event::write(Log_event_writer *writer)
{
uchar data[1 + 4 + 4 + 4]= {one_phase,};
uint8 one_phase_byte= one_phase;
@ -3842,14 +3842,14 @@ bool XA_prepare_log_event::write()
DBUG_ASSERT(xid_subheader_no_data == sizeof(data) - 1);
return write_header(sizeof(one_phase_byte) + xid_subheader_no_data +
return write_header(writer, sizeof(one_phase_byte) + xid_subheader_no_data +
static_cast<XID*>(xid)->gtrid_length +
static_cast<XID*>(xid)->bqual_length) ||
write_data(data, sizeof(data)) ||
write_data((uchar*) static_cast<XID*>(xid)->data,
write_data(writer, data, sizeof(data)) ||
write_data(writer, (uchar*) static_cast<XID*>(xid)->data,
static_cast<XID*>(xid)->gtrid_length +
static_cast<XID*>(xid)->bqual_length) ||
write_footer();
write_footer(writer);
}
@ -3973,7 +3973,7 @@ void User_var_log_event::pack_info(Protocol* protocol)
#endif // HAVE_REPLICATION
bool User_var_log_event::write()
bool User_var_log_event::write(Log_event_writer *writer)
{
char buf[UV_NAME_LEN_SIZE];
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
@ -4028,13 +4028,13 @@ bool User_var_log_event::write()
/* Length of the whole event */
event_length= sizeof(buf)+ name_len + buf1_length + val_len + unsigned_len;
return write_header(event_length) ||
write_data(buf, sizeof(buf)) ||
write_data(name, name_len) ||
write_data(buf1, buf1_length) ||
write_data(pos, val_len) ||
write_data(&flags, unsigned_len) ||
write_footer();
return write_header(writer, event_length) ||
write_data(writer, buf, sizeof(buf)) ||
write_data(writer, name, name_len) ||
write_data(writer, buf1, buf1_length) ||
write_data(writer, pos, val_len) ||
write_data(writer, &flags, unsigned_len) ||
write_footer(writer);
}
@ -4239,14 +4239,14 @@ Append_block_log_event::Append_block_log_event(THD *thd_arg,
}
bool Append_block_log_event::write()
bool Append_block_log_event::write(Log_event_writer *writer)
{
uchar buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
return write_header(APPEND_BLOCK_HEADER_LEN + block_len) ||
write_data(buf, APPEND_BLOCK_HEADER_LEN) ||
write_data(block, block_len) ||
write_footer();
return write_header(writer, APPEND_BLOCK_HEADER_LEN + block_len) ||
write_data(writer, buf, APPEND_BLOCK_HEADER_LEN) ||
write_data(writer, block, block_len) ||
write_footer(writer);
}
@ -4349,13 +4349,13 @@ Delete_file_log_event::Delete_file_log_event(THD *thd_arg, const char* db_arg,
}
bool Delete_file_log_event::write()
bool Delete_file_log_event::write(Log_event_writer *writer)
{
uchar buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
return write_header(sizeof(buf)) ||
write_data(buf, sizeof(buf)) ||
write_footer();
return write_header(writer, sizeof(buf)) ||
write_data(writer, buf, sizeof(buf)) ||
write_footer(writer);
}
@ -4438,14 +4438,14 @@ Execute_load_query_log_event(THD *thd_arg, const char* query_arg,
bool
Execute_load_query_log_event::write_post_header_for_derived()
Execute_load_query_log_event::write_post_header_for_derived(Log_event_writer *writer)
{
uchar buf[EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN];
int4store(buf, file_id);
int4store(buf + 4, fn_pos_start);
int4store(buf + 4 + 4, fn_pos_end);
*(buf + 4 + 4 + 4)= (uchar) dup_handling;
return write_data(buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
return write_data(writer, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
}
@ -5423,7 +5423,7 @@ Rows_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* defined(HAVE_REPLICATION) */
bool Rows_log_event::write_data_header()
bool Rows_log_event::write_data_header(Log_event_writer *writer)
{
uchar buf[ROWS_HEADER_LEN_V2]; // No need to init the buffer
DBUG_ASSERT(m_table_id != ~0ULL);
@ -5431,14 +5431,14 @@ bool Rows_log_event::write_data_header()
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
return (write_data(buf, 6));
return (write_data(writer, buf, 6));
});
int6store(buf + RW_MAPID_OFFSET, m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
return write_data(buf, ROWS_HEADER_LEN_V1);
return write_data(writer, buf, ROWS_HEADER_LEN_V1);
}
bool Rows_log_event::write_data_body()
bool Rows_log_event::write_data_body(Log_event_writer *writer)
{
/*
Note that this should be the number of *bits*, not the number of
@ -5451,10 +5451,10 @@ bool Rows_log_event::write_data_body()
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
res= res || write_data(sbuf, (size_t) (sbuf_end - sbuf));
res= res || write_data(writer, sbuf, (size_t) (sbuf_end - sbuf));
DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
res= res || write_data((uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
res= res || write_data(writer, (uchar*)m_cols.bitmap, no_bytes_in_map(&m_cols));
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
*/
@ -5462,17 +5462,17 @@ bool Rows_log_event::write_data_body()
{
DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
res= res || write_data((uchar*)m_cols_ai.bitmap,
res= res || write_data(writer, (uchar*)m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
}
DBUG_DUMP("rows", m_rows_buf, data_size);
res= res || write_data(m_rows_buf, (size_t) data_size);
res= res || write_data(writer, m_rows_buf, (size_t) data_size);
return res;
}
bool Rows_log_event::write_compressed()
bool Rows_log_event::write_compressed(Log_event_writer *writer)
{
uchar *m_rows_buf_tmp= m_rows_buf;
uchar *m_rows_cur_tmp= m_rows_cur;
@ -5486,7 +5486,7 @@ bool Rows_log_event::write_compressed()
(uint32)(m_rows_cur_tmp - m_rows_buf_tmp), &comlen))
{
m_rows_cur= comlen + m_rows_buf;
ret= Log_event::write();
ret= Log_event::write(writer);
}
my_safe_afree(m_rows_buf, alloc_size);
m_rows_buf= m_rows_buf_tmp;
@ -5528,15 +5528,15 @@ Annotate_rows_log_event::Annotate_rows_log_event(THD *thd,
}
bool Annotate_rows_log_event::write_data_header()
bool Annotate_rows_log_event::write_data_header(Log_event_writer *writer)
{
return 0;
}
bool Annotate_rows_log_event::write_data_body()
bool Annotate_rows_log_event::write_data_body(Log_event_writer *writer)
{
return write_data(m_query_txt, m_query_len);
return write_data(writer, m_query_txt, m_query_len);
}
@ -5983,7 +5983,7 @@ int Table_map_log_event::do_update_pos(rpl_group_info *rgi)
#endif /* defined(HAVE_REPLICATION) */
bool Table_map_log_event::write_data_header()
bool Table_map_log_event::write_data_header(Log_event_writer *writer)
{
DBUG_ASSERT(m_table_id != ~0ULL);
uchar buf[TABLE_MAP_HEADER_LEN];
@ -5991,14 +5991,14 @@ bool Table_map_log_event::write_data_header()
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
return (write_data(buf, 6));
return (write_data(writer, buf, 6));
});
int6store(buf + TM_MAPID_OFFSET, m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags);
return write_data(buf, TABLE_MAP_HEADER_LEN);
return write_data(writer, buf, TABLE_MAP_HEADER_LEN);
}
bool Table_map_log_event::write_data_body()
bool Table_map_log_event::write_data_body(Log_event_writer *writer)
{
DBUG_ASSERT(m_dbnam != NULL);
DBUG_ASSERT(m_tblnam != NULL);
@ -6019,17 +6019,17 @@ bool Table_map_log_event::write_data_body()
uchar mbuf[MAX_INT_WIDTH];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
return write_data(dbuf, sizeof(dbuf)) ||
write_data(m_dbnam, m_dblen+1) ||
write_data(tbuf, sizeof(tbuf)) ||
write_data(m_tblnam, m_tbllen+1) ||
write_data(cbuf, (size_t) (cbuf_end - cbuf)) ||
write_data(m_coltype, m_colcnt) ||
write_data(mbuf, (size_t) (mbuf_end - mbuf)) ||
write_data(m_field_metadata, m_field_metadata_size),
write_data(m_null_bits, (m_colcnt + 7) / 8) ||
write_data((const uchar*) m_metadata_buf.ptr(),
m_metadata_buf.length());
return write_data(writer, dbuf, sizeof(dbuf)) ||
write_data(writer, m_dbnam, m_dblen+1) ||
write_data(writer, tbuf, sizeof(tbuf)) ||
write_data(writer, m_tblnam, m_tbllen+1) ||
write_data(writer, cbuf, (size_t) (cbuf_end - cbuf)) ||
write_data(writer, m_coltype, m_colcnt) ||
write_data(writer, mbuf, (size_t) (mbuf_end - mbuf)) ||
write_data(writer, m_field_metadata, m_field_metadata_size),
write_data(writer, m_null_bits, (m_colcnt + 7) / 8) ||
write_data(writer, (const uchar*) m_metadata_buf.ptr(),
m_metadata_buf.length());
}
/**
@ -6457,9 +6457,9 @@ Write_rows_compressed_log_event::Write_rows_compressed_log_event(
m_type = WRITE_ROWS_COMPRESSED_EVENT_V1;
}
bool Write_rows_compressed_log_event::write()
bool Write_rows_compressed_log_event::write(Log_event_writer *writer)
{
return Rows_log_event::write_compressed();
return Rows_log_event::write_compressed(writer);
}
@ -7717,9 +7717,9 @@ Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(
m_type= DELETE_ROWS_COMPRESSED_EVENT_V1;
}
bool Delete_rows_compressed_log_event::write()
bool Delete_rows_compressed_log_event::write(Log_event_writer *writer)
{
return Rows_log_event::write_compressed();
return Rows_log_event::write_compressed(writer);
}
@ -7855,9 +7855,9 @@ Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg,
m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1;
}
bool Update_rows_compressed_log_event::write()
bool Update_rows_compressed_log_event::write(Log_event_writer *writer)
{
return Rows_log_event::write_compressed();
return Rows_log_event::write_compressed(writer);
}
void Update_rows_log_event::init(MY_BITMAP const *cols)
@ -8139,23 +8139,23 @@ int Incident_log_event::do_apply_event(rpl_group_info *rgi)
bool
Incident_log_event::write_data_header()
Incident_log_event::write_data_header(Log_event_writer *writer)
{
DBUG_ENTER("Incident_log_event::write_data_header");
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
uchar buf[sizeof(int16)];
int2store(buf, (int16) m_incident);
DBUG_RETURN(write_data(buf, sizeof(buf)));
DBUG_RETURN(write_data(writer, buf, sizeof(buf)));
}
bool
Incident_log_event::write_data_body()
Incident_log_event::write_data_body(Log_event_writer *writer)
{
uchar tmp[1];
DBUG_ENTER("Incident_log_event::write_data_body");
tmp[0]= (uchar) m_message.length;
DBUG_RETURN(write_data(tmp, sizeof(tmp)) ||
write_data(m_message.str, m_message.length));
DBUG_RETURN(write_data(writer, tmp, sizeof(tmp)) ||
write_data(writer, m_message.str, m_message.length));
}