MDEV-31273: Eliminate Log_event::checksum_alg

This is a preparatory commit for pre-computing checksums outside of
holding LOCK_log, no functional changes.

Which checksum algorithm is used (if any) when writing an event does not
belong in the event, it is a property of the log being written to.

Instead decide the checksum algorithm when constructing the
Log_event_writer object, and store it there.

Introduce a client-only Log_event::read_checksum_alg to be able to
print the checksum read, and a
Format_description_log_event::source_checksum_alg which is the
checksum algorithm (if any) to use when reading events from a log.

Also eliminate some redundant `enum` keywords on the enum_binlog_checksum_alg
type.

Reviewed-by: Monty <monty@mariadb.org>
Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
This commit is contained in:
Kristian Nielsen 2023-07-27 15:46:57 +02:00
parent 77bd1beac8
commit 8eee9806fb
14 changed files with 189 additions and 222 deletions

View File

@ -1,3 +1,4 @@
RESET MASTER;
# #
# MDEV-30698 Cover missing test cases for mariadb-binlog options # MDEV-30698 Cover missing test cases for mariadb-binlog options
# --raw [and] --flashback # --raw [and] --flashback

View File

@ -21,6 +21,8 @@
--source include/linux.inc --source include/linux.inc
--source include/have_log_bin.inc --source include/have_log_bin.inc
RESET MASTER;
--echo # --echo #
--echo # MDEV-30698 Cover missing test cases for mariadb-binlog options --echo # MDEV-30698 Cover missing test cases for mariadb-binlog options
--echo # --raw [and] --flashback --echo # --raw [and] --flashback

View File

@ -3863,23 +3863,21 @@ Event_log::write_description_event(enum_binlog_checksum_alg checksum_alg,
bool encrypt, bool dont_set_created, bool encrypt, bool dont_set_created,
bool is_relay_log) bool is_relay_log)
{ {
Format_description_log_event s(BINLOG_VERSION); Format_description_log_event s(BINLOG_VERSION, NULL, checksum_alg);
/* /*
don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache don't set LOG_EVENT_BINLOG_IN_USE_F for SEQ_READ_APPEND io_cache
as we won't be able to reset it later as we won't be able to reset it later
*/ */
if (io_cache_type == WRITE_CACHE) if (io_cache_type == WRITE_CACHE)
s.flags |= LOG_EVENT_BINLOG_IN_USE_F; s.flags |= LOG_EVENT_BINLOG_IN_USE_F;
s.checksum_alg= checksum_alg;
if (is_relay_log) if (is_relay_log)
s.set_relay_log_event(); s.set_relay_log_event();
crypto.scheme = 0; crypto.scheme = 0;
DBUG_ASSERT(s.checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if (!s.is_valid()) if (!s.is_valid())
return -1; return -1;
s.dont_set_created= dont_set_created; s.dont_set_created= dont_set_created;
if (write_event(&s, 0, &log_file)) if (write_event(&s, checksum_alg, 0, &log_file))
return -1; return -1;
if (encrypt) if (encrypt)
@ -3897,8 +3895,7 @@ Event_log::write_description_event(enum_binlog_checksum_alg checksum_alg,
return -1; return -1;
Start_encryption_log_event sele(1, key_version, crypto.nonce); Start_encryption_log_event sele(1, key_version, crypto.nonce);
sele.checksum_alg= s.checksum_alg; if (write_event(&sele, checksum_alg, 0, &log_file))
if (write_event(&sele, 0, &log_file))
return -1; return -1;
// Start_encryption_log_event is written, enable the encryption // Start_encryption_log_event is written, enable the encryption
@ -4172,7 +4169,8 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* Don't set log_pos in event header */ /* Don't set log_pos in event header */
description_event_for_queue->set_artificial_event(); description_event_for_queue->set_artificial_event();
if (write_event(description_event_for_queue)) if (write_event(description_event_for_queue,
description_event_for_queue->used_checksum_alg))
goto err; goto err;
bytes_written+= description_event_for_queue->data_written; bytes_written+= description_event_for_queue->data_written;
} }
@ -5616,17 +5614,19 @@ int MYSQL_BIN_LOG::new_file_impl()
*/ */
Rotate_log_event r(new_name + dirname_length(new_name), 0, LOG_EVENT_OFFSET, Rotate_log_event r(new_name + dirname_length(new_name), 0, LOG_EVENT_OFFSET,
is_relay_log ? Rotate_log_event::RELAY_LOG : 0); is_relay_log ? Rotate_log_event::RELAY_LOG : 0);
enum_binlog_checksum_alg checksum_alg = BINLOG_CHECKSUM_ALG_UNDEF;
/* /*
The current relay-log's closing Rotate event must have checksum The current relay-log's closing Rotate event must have checksum
value computed with an algorithm of the last relay-logged FD event. value computed with an algorithm of the last relay-logged FD event.
*/ */
if (is_relay_log) if (is_relay_log)
r.checksum_alg= relay_log_checksum_alg; checksum_alg= relay_log_checksum_alg;
DBUG_ASSERT(!is_relay_log || else
relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); checksum_alg= (enum_binlog_checksum_alg)binlog_checksum_options;
DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
if ((DBUG_IF("fault_injection_new_file_rotate_event") && if ((DBUG_IF("fault_injection_new_file_rotate_event") &&
(error= close_on_error= TRUE)) || (error= close_on_error= TRUE)) ||
(error= write_event(&r))) (error= write_event(&r, checksum_alg)))
{ {
DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno= 2;); DBUG_EXECUTE_IF("fault_injection_new_file_rotate_event", errno= 2;);
close_on_error= TRUE; close_on_error= TRUE;
@ -5743,10 +5743,22 @@ end2:
DBUG_RETURN(error); DBUG_RETURN(error);
} }
bool Event_log::write_event(Log_event *ev, binlog_cache_data *cache_data, bool Event_log::write_event(Log_event *ev, binlog_cache_data *data,
IO_CACHE *file) IO_CACHE *file)
{ {
Log_event_writer writer(file, 0, &crypto); return write_event(ev, ev->select_checksum_alg(), data, file);
}
bool MYSQL_BIN_LOG::write_event(Log_event *ev)
{
return write_event(ev, ev->select_checksum_alg(), 0, &log_file);
}
bool MYSQL_BIN_LOG::write_event(Log_event *ev,
enum_binlog_checksum_alg checksum_alg,
binlog_cache_data *cache_data, IO_CACHE *file)
{
Log_event_writer writer(file, 0, checksum_alg, &crypto);
if (crypto.scheme && file == &log_file) if (crypto.scheme && file == &log_file)
{ {
writer.ctx= alloca(crypto.ctx_size); writer.ctx= alloca(crypto.ctx_size);
@ -5757,17 +5769,19 @@ bool Event_log::write_event(Log_event *ev, binlog_cache_data *cache_data,
return writer.write(ev); return writer.write(ev);
} }
bool MYSQL_BIN_LOG::append(Log_event *ev) bool MYSQL_BIN_LOG::append(Log_event *ev,
enum_binlog_checksum_alg checksum_alg)
{ {
bool res; bool res;
mysql_mutex_lock(&LOCK_log); mysql_mutex_lock(&LOCK_log);
res= append_no_lock(ev); res= append_no_lock(ev, checksum_alg);
mysql_mutex_unlock(&LOCK_log); mysql_mutex_unlock(&LOCK_log);
return res; return res;
} }
bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev) bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev,
enum_binlog_checksum_alg checksum_alg)
{ {
bool error = 0; bool error = 0;
DBUG_ENTER("MYSQL_BIN_LOG::append"); DBUG_ENTER("MYSQL_BIN_LOG::append");
@ -5775,7 +5789,7 @@ bool MYSQL_BIN_LOG::append_no_lock(Log_event* ev)
mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_log);
DBUG_ASSERT(log_file.type == SEQ_READ_APPEND); DBUG_ASSERT(log_file.type == SEQ_READ_APPEND);
if (write_event(ev)) if (write_event(ev, checksum_alg))
{ {
error=1; error=1;
goto err; goto err;
@ -6175,7 +6189,8 @@ THD::binlog_start_trans_and_stmt()
uchar *buf= 0; uchar *buf= 0;
size_t len= 0; size_t len= 0;
IO_CACHE tmp_io_cache; IO_CACHE tmp_io_cache;
Log_event_writer writer(&tmp_io_cache, 0); // Replicated events in writeset doesn't have checksum
Log_event_writer writer(&tmp_io_cache, 0, BINLOG_CHECKSUM_ALG_OFF, NULL);
if(!open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, if(!open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
128, MYF(MY_WME))) 128, MYF(MY_WME)))
{ {
@ -6190,8 +6205,6 @@ THD::binlog_start_trans_and_stmt()
} }
Gtid_log_event gtid_event(this, seqno, domain_id, true, Gtid_log_event gtid_event(this, seqno, domain_id, true,
LOG_EVENT_SUPPRESS_USE_F, true, 0); LOG_EVENT_SUPPRESS_USE_F, true, 0);
// Replicated events in writeset doesn't have checksum
gtid_event.checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
gtid_event.server_id= server_id; gtid_event.server_id= server_id;
writer.write(&gtid_event); writer.write(&gtid_event);
wsrep_write_cache_buf(&tmp_io_cache, &buf, &len); wsrep_write_cache_buf(&tmp_io_cache, &buf, &len);
@ -6392,7 +6405,7 @@ bool MYSQL_BIN_LOG::write_table_map(THD *thd, TABLE *table, bool with_annotate)
binlog_cache_data *cache_data= (cache_mngr-> binlog_cache_data *cache_data= (cache_mngr->
get_binlog_cache_data(is_transactional)); get_binlog_cache_data(is_transactional));
IO_CACHE *file= &cache_data->cache_log; IO_CACHE *file= &cache_data->cache_log;
Log_event_writer writer(file, cache_data); Log_event_writer writer(file, cache_data, the_event.select_checksum_alg(), NULL);
if (with_annotate) if (with_annotate)
if (thd->binlog_write_annotated_row(&writer)) if (thd->binlog_write_annotated_row(&writer))
@ -6578,7 +6591,8 @@ Event_log::flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
if (Rows_log_event* pending= cache_data->pending()) if (Rows_log_event* pending= cache_data->pending())
{ {
Log_event_writer writer(&cache_data->cache_log, cache_data); Log_event_writer writer(&cache_data->cache_log, cache_data,
pending->select_checksum_alg(), NULL);
/* /*
Write pending event to the cache. Write pending event to the cache.
@ -7704,11 +7718,13 @@ class CacheWriter: public Log_event_writer
public: public:
size_t remains; size_t remains;
CacheWriter(THD *thd_arg, IO_CACHE *file_arg, bool do_checksum, CacheWriter(THD *thd_arg, IO_CACHE *file_arg,
enum_binlog_checksum_alg checksum_alg,
Binlog_crypt_data *cr) Binlog_crypt_data *cr)
: Log_event_writer(file_arg, 0, cr), remains(0), thd(thd_arg), : Log_event_writer(file_arg, 0, checksum_alg, cr), remains(0), thd(thd_arg),
first(true) first(true)
{ checksum_len= do_checksum ? BINLOG_CHECKSUM_LEN : 0; } {
}
~CacheWriter() ~CacheWriter()
{ status_var_add(thd->status_var.binlog_bytes_written, bytes_written); } { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); }
@ -7904,7 +7920,9 @@ int Event_log::write_cache(THD *thd, IO_CACHE *cache)
size_t val; size_t val;
size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN]; uchar header[LOG_EVENT_HEADER_LEN];
CacheWriter writer(thd, get_log_file(), binlog_checksum_options, &crypto); CacheWriter writer(thd, get_log_file(),
(enum_binlog_checksum_alg)binlog_checksum_options,
&crypto);
if (crypto.scheme) if (crypto.scheme)
{ {
@ -9447,11 +9465,11 @@ void MYSQL_BIN_LOG::close(uint exiting)
{ {
Stop_log_event s; Stop_log_event s;
// the checksumming rule for relay-log case is similar to Rotate // the checksumming rule for relay-log case is similar to Rotate
s.checksum_alg= is_relay_log ? relay_log_checksum_alg enum_binlog_checksum_alg checksum_alg= is_relay_log ?
: (enum_binlog_checksum_alg)binlog_checksum_options; relay_log_checksum_alg :
DBUG_ASSERT(!is_relay_log || (enum_binlog_checksum_alg)binlog_checksum_options;
relay_log_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF); DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
write_event(&s); write_event(&s, checksum_alg);
bytes_written+= s.data_written; bytes_written+= s.data_written;
flush_io_cache(&log_file); flush_io_cache(&log_file);
update_binlog_end_pos(); update_binlog_end_pos();
@ -11619,7 +11637,7 @@ bool Recovery_context::decide_or_assess(xid_recovery_member *member, int round,
if (truncate_gtid.seq_no == 0 /* was reset or never set */ || if (truncate_gtid.seq_no == 0 /* was reset or never set */ ||
(truncate_set_in_1st && round == 2 /* reevaluted at round turn */)) (truncate_set_in_1st && round == 2 /* reevaluted at round turn */))
{ {
if (set_truncate_coord(linfo, round, fdle->checksum_alg)) if (set_truncate_coord(linfo, round, fdle->used_checksum_alg))
return true; return true;
} }
else else

View File

@ -427,7 +427,8 @@ public:
bool encrypt, bool dont_set_created, bool encrypt, bool dont_set_created,
bool is_relay_log); bool is_relay_log);
bool write_event(Log_event *ev, binlog_cache_data *data, IO_CACHE *file); bool write_event(Log_event *ev, enum enum_binlog_checksum_alg checksum_alg,
binlog_cache_data *data, IO_CACHE *file);
}; };
/** /**
@ -993,11 +994,16 @@ public:
using Event_log::write_event; using Event_log::write_event;
bool write_event(Log_event *ev) { return write_event(ev, 0, &log_file); } bool write_event(Log_event *ev, binlog_cache_data *data, IO_CACHE *file);
bool write_event(Log_event *ev, enum enum_binlog_checksum_alg checksum_alg)
{
return write_event(ev, checksum_alg, 0, &log_file);
}
bool write_event(Log_event *ev);
bool write_event_buffer(uchar* buf,uint len); bool write_event_buffer(uchar* buf,uint len);
bool append(Log_event* ev); bool append(Log_event* ev, enum enum_binlog_checksum_alg checksum_alg);
bool append_no_lock(Log_event* ev); bool append_no_lock(Log_event* ev, enum enum_binlog_checksum_alg checksum_alg);
void mark_xids_active(ulong cookie, uint xid_count); void mark_xids_active(ulong cookie, uint xid_count);
void mark_xid_done(ulong cookie, bool write_checkpoint); void mark_xid_done(ulong cookie, bool write_checkpoint);

View File

@ -718,11 +718,10 @@ const char* Log_event::get_type_str()
Log_event::Log_event(const uchar *buf, Log_event::Log_event(const uchar *buf,
const Format_description_log_event* description_event) const Format_description_log_event* description_event)
:temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE), :temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE)
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
slave_exec_mode(SLAVE_EXEC_MODE_STRICT), , slave_exec_mode(SLAVE_EXEC_MODE_STRICT),
#endif #endif
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
{ {
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
thd= 0; thd= 0;
@ -764,7 +763,7 @@ Log_event::Log_event(const uchar *buf,
int Log_event::read_log_event(IO_CACHE* file, String* packet, int Log_event::read_log_event(IO_CACHE* file, String* packet,
const Format_description_log_event *fdle, const Format_description_log_event *fdle,
enum enum_binlog_checksum_alg checksum_alg_arg, enum_binlog_checksum_alg checksum_alg_arg,
size_t max_allowed_packet) size_t max_allowed_packet)
{ {
ulong data_len; ulong data_len;
@ -966,7 +965,7 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len,
my_bool crc_check) my_bool crc_check)
{ {
Log_event* ev; Log_event* ev;
enum enum_binlog_checksum_alg alg; enum_binlog_checksum_alg alg;
DBUG_ENTER("Log_event::read_log_event(char*,...)"); DBUG_ENTER("Log_event::read_log_event(char*,...)");
DBUG_ASSERT(fdle != 0); DBUG_ASSERT(fdle != 0);
DBUG_PRINT("info", ("binlog_version: %d", fdle->binlog_version)); DBUG_PRINT("info", ("binlog_version: %d", fdle->binlog_version));
@ -985,7 +984,8 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len,
uint event_type= buf[EVENT_TYPE_OFFSET]; uint event_type= buf[EVENT_TYPE_OFFSET];
// all following START events in the current file are without checksum // all following START events in the current file are without checksum
if (event_type == START_EVENT_V3) if (event_type == START_EVENT_V3)
(const_cast< Format_description_log_event *>(fdle))->checksum_alg= BINLOG_CHECKSUM_ALG_OFF; (const_cast< Format_description_log_event *>(fdle))->used_checksum_alg=
BINLOG_CHECKSUM_ALG_OFF;
/* /*
CRC verification by SQL and Show-Binlog-Events master side. CRC verification by SQL and Show-Binlog-Events master side.
The caller has to provide @fdle->checksum_alg to The caller has to provide @fdle->checksum_alg to
@ -1006,7 +1006,7 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len,
Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF. Notice, a pre-checksum FD version forces alg := BINLOG_CHECKSUM_ALG_UNDEF.
*/ */
alg= (event_type != FORMAT_DESCRIPTION_EVENT) ? alg= (event_type != FORMAT_DESCRIPTION_EVENT) ?
fdle->checksum_alg : get_checksum_alg(buf, event_len); fdle->used_checksum_alg : get_checksum_alg(buf, event_len);
// Emulate the corruption during reading an event // Emulate the corruption during reading an event
DBUG_EXECUTE_IF("corrupt_read_log_event_char", DBUG_EXECUTE_IF("corrupt_read_log_event_char",
if (event_type != FORMAT_DESCRIPTION_EVENT) if (event_type != FORMAT_DESCRIPTION_EVENT)
@ -1210,11 +1210,10 @@ exit:
if (ev) if (ev)
{ {
ev->checksum_alg= alg;
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
if (ev->checksum_alg != BINLOG_CHECKSUM_ALG_OFF && ev->read_checksum_alg= alg;
ev->checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) if (alg != BINLOG_CHECKSUM_ALG_OFF && alg != BINLOG_CHECKSUM_ALG_UNDEF)
ev->crc= uint4korr(buf + (event_len)); ev->read_checksum_value= uint4korr(buf + (event_len));
#endif #endif
} }
@ -1781,7 +1780,7 @@ Query_compressed_log_event::Query_compressed_log_event(const uchar *buf,
*/ */
int int
Query_log_event::dummy_event(String *packet, ulong ev_offset, Query_log_event::dummy_event(String *packet, ulong ev_offset,
enum enum_binlog_checksum_alg checksum_alg) enum_binlog_checksum_alg checksum_alg)
{ {
uchar *p= (uchar *)packet->ptr() + ev_offset; uchar *p= (uchar *)packet->ptr() + ev_offset;
size_t data_len= packet->length() - ev_offset; size_t data_len= packet->length() - ev_offset;
@ -1873,7 +1872,7 @@ Query_log_event::dummy_event(String *packet, ulong ev_offset,
*/ */
int int
Query_log_event::begin_event(String *packet, ulong ev_offset, Query_log_event::begin_event(String *packet, ulong ev_offset,
enum enum_binlog_checksum_alg checksum_alg) enum_binlog_checksum_alg checksum_alg)
{ {
uchar *p= (uchar *)packet->ptr() + ev_offset; uchar *p= (uchar *)packet->ptr() + ev_offset;
uchar *q= p + LOG_EVENT_HEADER_LEN; uchar *q= p + LOG_EVENT_HEADER_LEN;
@ -1955,9 +1954,11 @@ Query_log_event::begin_event(String *packet, ulong ev_offset,
*/ */
Format_description_log_event:: Format_description_log_event::
Format_description_log_event(uint8 binlog_ver, const char* server_ver) Format_description_log_event(uint8 binlog_ver, const char* server_ver,
enum_binlog_checksum_alg checksum_alg)
:Log_event(), created(0), binlog_version(binlog_ver), :Log_event(), created(0), binlog_version(binlog_ver),
dont_set_created(0), event_type_permutation(0) dont_set_created(0), event_type_permutation(0),
used_checksum_alg(checksum_alg)
{ {
switch (binlog_version) { switch (binlog_version) {
case 4: /* MySQL 5.0 */ case 4: /* MySQL 5.0 */
@ -2083,7 +2084,6 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
} }
calc_server_version_split(); calc_server_version_split();
deduct_options_written_to_bin_log(); deduct_options_written_to_bin_log();
checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
reset_crypto(); reset_crypto();
} }
@ -2114,6 +2114,7 @@ Format_description_log_event(const uchar *buf, uint event_len,
common_header_len(0), post_header_len(NULL), event_type_permutation(0) common_header_len(0), post_header_len(NULL), event_type_permutation(0)
{ {
DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)"); DBUG_ENTER("Format_description_log_event::Format_description_log_event(char*,...)");
used_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
if (event_len < LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET) if (event_len < LOG_EVENT_MINIMAL_HEADER_LEN + ST_COMMON_HEADER_LEN_OFFSET)
{ {
server_version[0]= 0; server_version[0]= 0;
@ -2147,11 +2148,11 @@ Format_description_log_event(const uchar *buf, uint event_len,
{ {
/* the last bytes are the checksum alg desc and value (or value's room) */ /* the last bytes are the checksum alg desc and value (or value's room) */
number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN; number_of_event_types -= BINLOG_CHECKSUM_ALG_DESC_LEN;
checksum_alg= (enum_binlog_checksum_alg)post_header_len[number_of_event_types]; used_checksum_alg= (enum_binlog_checksum_alg)post_header_len[number_of_event_types];
} }
else else
{ {
checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; used_checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
} }
deduct_options_written_to_bin_log(); deduct_options_written_to_bin_log();
reset_crypto(); reset_crypto();
@ -2274,9 +2275,9 @@ Format_description_log_event::is_version_before_checksum(const master_version_sp
checksum-unaware (effectively no checksum) and the actuall checksum-unaware (effectively no checksum) and the actuall
[1-254] range alg descriptor. [1-254] range alg descriptor.
*/ */
enum enum_binlog_checksum_alg get_checksum_alg(const uchar *buf, ulong len) enum_binlog_checksum_alg get_checksum_alg(const uchar *buf, ulong len)
{ {
enum enum_binlog_checksum_alg ret; enum_binlog_checksum_alg ret;
char version[ST_SERVER_VER_LEN]; char version[ST_SERVER_VER_LEN];
DBUG_ENTER("get_checksum_alg"); DBUG_ENTER("get_checksum_alg");
@ -2540,7 +2541,7 @@ Gtid_list_log_event::Gtid_list_log_event(const uchar *buf, uint event_len,
*/ */
bool bool
Gtid_list_log_event::peek(const char *event_start, size_t event_len, Gtid_list_log_event::peek(const char *event_start, size_t event_len,
enum enum_binlog_checksum_alg checksum_alg, enum_binlog_checksum_alg checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len, rpl_gtid **out_gtid_list, uint32 *out_list_len,
const Format_description_log_event *fdev) const Format_description_log_event *fdev)
{ {

View File

@ -991,6 +991,13 @@ class Log_event_writer
public: public:
ulonglong bytes_written; ulonglong bytes_written;
void *ctx; ///< Encryption context or 0 if no encryption is needed void *ctx; ///< Encryption context or 0 if no encryption is needed
/*
The length of a checksum written at the end of the event, if any.
Currently this is always either 0, when checksums are disabled, or
BINLOG_CHECKSUM_LEN when using BINLOG_CHECKSUM_ALG_CRC32.
(If we ever add another checksum algorithm, we will need to instead store
here the algorithm to use instead of just the length).
*/
uint checksum_len; uint checksum_len;
int write(Log_event *ev); int write(Log_event *ev);
int write_header(uchar *pos, size_t len); int write_header(uchar *pos, size_t len);
@ -1003,9 +1010,13 @@ public:
{ encrypt_or_write= &Log_event_writer::encrypt_and_write; } { encrypt_or_write= &Log_event_writer::encrypt_and_write; }
Log_event_writer(IO_CACHE *file_arg, binlog_cache_data *cache_data_arg, Log_event_writer(IO_CACHE *file_arg, binlog_cache_data *cache_data_arg,
Binlog_crypt_data *cr= 0) enum_binlog_checksum_alg checksum_alg,
Binlog_crypt_data *cr)
:encrypt_or_write(&Log_event_writer::write_internal), :encrypt_or_write(&Log_event_writer::write_internal),
bytes_written(0), ctx(0), bytes_written(0), ctx(0),
checksum_len(( checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) ?
BINLOG_CHECKSUM_LEN : 0),
file(file_arg), cache_data(cache_data_arg), crypto(cr) { } file(file_arg), cache_data(cache_data_arg), crypto(cr) { }
private: private:
@ -1294,16 +1305,6 @@ public:
*/ */
enum_slave_exec_mode slave_exec_mode; enum_slave_exec_mode slave_exec_mode;
/**
The value is set by caller of FD constructor and
Log_event::write_header() for the rest.
In the FD case it's propagated into the last byte
of post_header_len[] at FD::write().
On the slave side the value is assigned from post_header_len[last]
of the last seen FD event.
*/
enum enum_binlog_checksum_alg checksum_alg;
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
THD* thd; THD* thd;
@ -1333,7 +1334,9 @@ public:
} }
#else #else
Log_event() : temp_buf(0), when(0), flags(0) {} Log_event() : temp_buf(0), when(0), flags(0) {}
ha_checksum crc; /* The checksum algorithm used (if any) when the event was read. */
enum_binlog_checksum_alg read_checksum_alg;
ha_checksum read_checksum_value;
/* print*() functions are used by mysqlbinlog */ /* print*() functions are used by mysqlbinlog */
virtual bool print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0; virtual bool print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
bool print_timestamp(IO_CACHE* file, time_t *ts = 0); bool print_timestamp(IO_CACHE* file, time_t *ts = 0);
@ -1470,7 +1473,7 @@ public:
bool write_footer(Log_event_writer *writer) bool write_footer(Log_event_writer *writer)
{ return writer->write_footer(); } { return writer->write_footer(); }
my_bool need_checksum(); enum_binlog_checksum_alg select_checksum_alg();
virtual bool write(Log_event_writer *writer) virtual bool write(Log_event_writer *writer)
{ {
@ -2226,9 +2229,9 @@ public:
} }
Log_event_type get_type_code() { return QUERY_EVENT; } Log_event_type get_type_code() { return QUERY_EVENT; }
static int dummy_event(String *packet, ulong ev_offset, static int dummy_event(String *packet, ulong ev_offset,
enum enum_binlog_checksum_alg checksum_alg); enum_binlog_checksum_alg checksum_alg);
static int begin_event(String *packet, ulong ev_offset, static int begin_event(String *packet, ulong ev_offset,
enum enum_binlog_checksum_alg checksum_alg); enum_binlog_checksum_alg checksum_alg);
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(Log_event_writer *writer); bool write(Log_event_writer *writer);
virtual bool write_post_header_for_derived(Log_event_writer *writer) { return FALSE; } virtual bool write_post_header_for_derived(Log_event_writer *writer) { return FALSE; }
@ -2252,7 +2255,7 @@ public: /* !!! Public in this patch to allow old usage */
uint32 q_len_arg); uint32 q_len_arg);
static bool peek_is_commit_rollback(const uchar *event_start, static bool peek_is_commit_rollback(const uchar *event_start,
size_t event_len, size_t event_len,
enum enum_binlog_checksum_alg enum_binlog_checksum_alg
checksum_alg); checksum_alg);
int handle_split_alter_query_log_event(rpl_group_info *rgi, int handle_split_alter_query_log_event(rpl_group_info *rgi,
bool &skip_error_check); bool &skip_error_check);
@ -2524,8 +2527,15 @@ public:
master_version_split server_version_split; master_version_split server_version_split;
const uint8 *event_type_permutation; const uint8 *event_type_permutation;
uint32 options_written_to_bin_log; uint32 options_written_to_bin_log;
/*
The checksum algorithm used in the binlog or relaylog following this
Format_description_event. Or BINLOG_CHECKSUM_ALG_UNDEF for a
Format_description_event which is not part of a binlog or relaylog file.
*/
enum_binlog_checksum_alg used_checksum_alg;
Format_description_log_event(uint8 binlog_ver, const char* server_ver=0); Format_description_log_event(uint8 binlog_ver, const char* server_ver= 0,
enum_binlog_checksum_alg checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF);
Format_description_log_event(const uchar *buf, uint event_len, Format_description_log_event(const uchar *buf, uint event_len,
const Format_description_log_event const Format_description_log_event
*description_event); *description_event);
@ -3384,9 +3394,9 @@ public:
#ifdef MYSQL_SERVER #ifdef MYSQL_SERVER
bool write(Log_event_writer *writer); bool write(Log_event_writer *writer);
static int make_compatible_event(String *packet, bool *need_dummy_event, static int make_compatible_event(String *packet, bool *need_dummy_event,
ulong ev_offset, enum enum_binlog_checksum_alg checksum_alg); ulong ev_offset, enum_binlog_checksum_alg checksum_alg);
static bool peek(const uchar *event_start, size_t event_len, static bool peek(const uchar *event_start, size_t event_len,
enum enum_binlog_checksum_alg checksum_alg, enum_binlog_checksum_alg checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no, uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2, const Format_description_log_event *fdev); uchar *flags2, const Format_description_log_event *fdev);
#endif #endif
@ -3505,7 +3515,7 @@ public:
enum_skip_reason do_shall_skip(rpl_group_info *rgi); enum_skip_reason do_shall_skip(rpl_group_info *rgi);
#endif #endif
static bool peek(const char *event_start, size_t event_len, static bool peek(const char *event_start, size_t event_len,
enum enum_binlog_checksum_alg checksum_alg, enum_binlog_checksum_alg checksum_alg,
rpl_gtid **out_gtid_list, uint32 *out_list_len, rpl_gtid **out_gtid_list, uint32 *out_list_len,
const Format_description_log_event *fdev); const Format_description_log_event *fdev);
}; };
@ -5432,7 +5442,7 @@ bool slave_execute_deferred_events(THD *thd);
bool event_that_should_be_ignored(const uchar *buf); bool event_that_should_be_ignored(const uchar *buf);
bool event_checksum_test(uchar *buf, ulong event_len, bool event_checksum_test(uchar *buf, ulong event_len,
enum_binlog_checksum_alg alg); enum_binlog_checksum_alg alg);
enum enum_binlog_checksum_alg get_checksum_alg(const uchar *buf, ulong len); enum_binlog_checksum_alg get_checksum_alg(const uchar *buf, ulong len);
extern TYPELIB binlog_checksum_typelib; extern TYPELIB binlog_checksum_typelib;
#ifdef WITH_WSREP #ifdef WITH_WSREP
enum Log_event_type wsrep_peak_event(rpl_group_info *rgi, ulonglong* event_size); enum Log_event_type wsrep_peak_event(rpl_group_info *rgi, ulonglong* event_size);

View File

@ -346,14 +346,14 @@ bool Log_event::print_header(IO_CACHE* file,
/* print the checksum */ /* print the checksum */
if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF && if (read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
{ {
char checksum_buf[BINLOG_CHECKSUM_LEN * 2 + 4]; // to fit to "%p " char checksum_buf[BINLOG_CHECKSUM_LEN * 2 + 4]; // to fit to "%p "
size_t const bytes_written= size_t const bytes_written=
my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08x ", crc); my_snprintf(checksum_buf, sizeof(checksum_buf), "0x%08x ", read_checksum_value);
if (my_b_printf(file, "%s ", get_type(&binlog_checksum_typelib, if (my_b_printf(file, "%s ", get_type(&binlog_checksum_typelib,
checksum_alg)) || read_checksum_alg)) ||
my_b_printf(file, checksum_buf, bytes_written)) my_b_printf(file, checksum_buf, bytes_written))
goto err; goto err;
} }
@ -1604,8 +1604,8 @@ bool Log_event::print_base64(IO_CACHE* file,
uint tmp_size= size; uint tmp_size= size;
Rows_log_event *ev= NULL; Rows_log_event *ev= NULL;
Log_event_type ev_type = (enum Log_event_type) ptr[EVENT_TYPE_OFFSET]; Log_event_type ev_type = (enum Log_event_type) ptr[EVENT_TYPE_OFFSET];
if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && if (read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF &&
checksum_alg != BINLOG_CHECKSUM_ALG_OFF) read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
tmp_size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header tmp_size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header
switch (ev_type) { switch (ev_type) {
case WRITE_ROWS_EVENT: case WRITE_ROWS_EVENT:
@ -1672,8 +1672,8 @@ bool Log_event::print_base64(IO_CACHE* file,
Rows_log_event *ev= NULL; Rows_log_event *ev= NULL;
Log_event_type et= (Log_event_type) ptr[EVENT_TYPE_OFFSET]; Log_event_type et= (Log_event_type) ptr[EVENT_TYPE_OFFSET];
if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF && if (read_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF &&
checksum_alg != BINLOG_CHECKSUM_ALG_OFF) read_checksum_alg != BINLOG_CHECKSUM_ALG_OFF)
size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header size-= BINLOG_CHECKSUM_LEN; // checksum is displayed through the header
switch (et) switch (et)
@ -3514,7 +3514,7 @@ bool Write_rows_compressed_log_event::print(FILE *file,
ulong len; ulong len;
bool is_malloc = false; bool is_malloc = false;
if(!row_log_event_uncompress(glob_description_event, if(!row_log_event_uncompress(glob_description_event,
checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, UINT_MAX32, NULL, 0, &is_malloc, temp_buf, UINT_MAX32, NULL, 0, &is_malloc,
&new_buf, &len)) &new_buf, &len))
{ {
@ -3551,7 +3551,7 @@ bool Delete_rows_compressed_log_event::print(FILE *file,
ulong len; ulong len;
bool is_malloc = false; bool is_malloc = false;
if(!row_log_event_uncompress(glob_description_event, if(!row_log_event_uncompress(glob_description_event,
checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, UINT_MAX32, NULL, 0, &is_malloc, temp_buf, UINT_MAX32, NULL, 0, &is_malloc,
&new_buf, &len)) &new_buf, &len))
{ {
@ -3588,7 +3588,7 @@ Update_rows_compressed_log_event::print(FILE *file,
ulong len; ulong len;
bool is_malloc= false; bool is_malloc= false;
if(!row_log_event_uncompress(glob_description_event, if(!row_log_event_uncompress(glob_description_event,
checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, read_checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
temp_buf, UINT_MAX32, NULL, 0, &is_malloc, temp_buf, UINT_MAX32, NULL, 0, &is_malloc,
&new_buf, &len)) &new_buf, &len))
{ {

View File

@ -538,8 +538,7 @@ int append_query_string(CHARSET_INFO *csinfo, String *to,
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans) Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
:log_pos(0), temp_buf(0), exec_time(0), :log_pos(0), temp_buf(0), exec_time(0),
slave_exec_mode(SLAVE_EXEC_MODE_STRICT), slave_exec_mode(SLAVE_EXEC_MODE_STRICT), thd(thd_arg)
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), thd(thd_arg)
{ {
server_id= thd->variables.server_id; server_id= thd->variables.server_id;
when= thd->start_time; when= thd->start_time;
@ -563,8 +562,7 @@ Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
Log_event::Log_event() Log_event::Log_event()
:temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE), :temp_buf(0), exec_time(0), flags(0), cache_type(EVENT_INVALID_CACHE),
slave_exec_mode(SLAVE_EXEC_MODE_STRICT), slave_exec_mode(SLAVE_EXEC_MODE_STRICT), thd(0)
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), thd(0)
{ {
server_id= global_system_variables.server_id; server_id= global_system_variables.server_id;
/* /*
@ -690,82 +688,21 @@ void Log_event::init_show_field_list(THD *thd, List<Item>* field_list)
} }
/** /**
A decider of whether to trigger checksum computation or not. Select if and how to write checksum for an event written to the binlog.
To be invoked in Log_event::write() stack. It returns the actively configured binlog checksum option, unless the event
The decision is positive is being written to a cache (in which case the checksum, if any, is added
later when the cache is copied to the real binlog).
S,M) if it's been marked for checksumming with @c checksum_alg
M) otherwise, if @@global.binlog_checksum is not NONE and the event is
directly written to the binlog file.
The to-be-cached event decides at @c write_cache() time.
Otherwise the decision is negative.
@note A side effect of the method is altering Log_event::checksum_alg
it the latter was undefined at calling.
@return true Checksum should be used. Log_event::checksum_alg is set.
@return false No checksum
*/ */
enum_binlog_checksum_alg Log_event::select_checksum_alg()
my_bool Log_event::need_checksum()
{ {
my_bool ret; if (cache_type == Log_event::EVENT_NO_CACHE)
DBUG_ENTER("Log_event::need_checksum"); return (enum_binlog_checksum_alg)binlog_checksum_options;
/* DBUG_ASSERT(get_type_code() != ROTATE_EVENT &&
few callers of Log_event::write get_type_code() != STOP_EVENT &&
(incl FD::write, FD constructing code on the slave side, Rotate relay log get_type_code() != FORMAT_DESCRIPTION_EVENT);
and Stop event)
provides their checksum alg preference through Log_event::checksum_alg.
*/
if (checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)
ret= checksum_alg != BINLOG_CHECKSUM_ALG_OFF;
else
{
ret= binlog_checksum_options && cache_type == Log_event::EVENT_NO_CACHE;
checksum_alg= ret ? (enum_binlog_checksum_alg)binlog_checksum_options
: BINLOG_CHECKSUM_ALG_OFF;
}
/*
FD calls the methods before data_written has been calculated.
The following invariant claims if the current is not the first
call (and therefore data_written is not zero) then `ret' must be
TRUE. It may not be null because FD is always checksummed.
*/
DBUG_ASSERT(get_type_code() != FORMAT_DESCRIPTION_EVENT || ret ||
data_written == 0);
DBUG_ASSERT(!ret || return BINLOG_CHECKSUM_ALG_OFF;
((checksum_alg == binlog_checksum_options ||
/*
Stop event closes the relay-log and its checksum alg
preference is set by the caller can be different
from the server's binlog_checksum_options.
*/
get_type_code() == STOP_EVENT ||
/*
Rotate:s can be checksummed regardless of the server's
binlog_checksum_options. That applies to both
the local RL's Rotate and the master's Rotate
which IO thread instantiates via queue_binlog_ver_3_event.
*/
get_type_code() == ROTATE_EVENT ||
get_type_code() == START_ENCRYPTION_EVENT ||
/* FD is always checksummed */
get_type_code() == FORMAT_DESCRIPTION_EVENT) &&
checksum_alg != BINLOG_CHECKSUM_ALG_OFF));
DBUG_ASSERT(checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
DBUG_ASSERT(((get_type_code() != ROTATE_EVENT &&
get_type_code() != STOP_EVENT) ||
get_type_code() != FORMAT_DESCRIPTION_EVENT) ||
cache_type == Log_event::EVENT_NO_CACHE);
DBUG_RETURN(ret);
} }
int Log_event_writer::write_internal(const uchar *pos, size_t len) int Log_event_writer::write_internal(const uchar *pos, size_t len)
@ -912,8 +849,6 @@ bool Log_event::write_header(Log_event_writer *writer, size_t event_data_length)
(longlong) writer->pos(), event_data_length, (longlong) writer->pos(), event_data_length,
(int) get_type_code())); (int) get_type_code()));
writer->checksum_len= need_checksum() ? BINLOG_CHECKSUM_LEN : 0;
/* Store number of bytes that will be written by this event */ /* Store number of bytes that will be written by this event */
data_written= event_data_length + sizeof(header) + writer->checksum_len; data_written= event_data_length + sizeof(header) + writer->checksum_len;
@ -2417,7 +2352,7 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
bool bool
Query_log_event::peek_is_commit_rollback(const uchar *event_start, Query_log_event::peek_is_commit_rollback(const uchar *event_start,
size_t event_len, size_t event_len,
enum enum_binlog_checksum_alg enum_binlog_checksum_alg
checksum_alg) checksum_alg)
{ {
if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32) if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
@ -2455,7 +2390,6 @@ void Format_description_log_event::pack_info(Protocol *protocol)
bool Format_description_log_event::write(Log_event_writer *writer) bool Format_description_log_event::write(Log_event_writer *writer)
{ {
bool ret; bool ret;
bool no_checksum;
/* /*
We don't call Start_log_event_v::write() because this would make 2 We don't call Start_log_event_v::write() because this would make 2
my_b_safe_write(). my_b_safe_write().
@ -2478,11 +2412,9 @@ bool Format_description_log_event::write(Log_event_writer *writer)
FD_queue checksum_alg value. FD_queue checksum_alg value.
*/ */
compile_time_assert(BINLOG_CHECKSUM_ALG_DESC_LEN == 1); compile_time_assert(BINLOG_CHECKSUM_ALG_DESC_LEN == 1);
#ifdef DBUG_ASSERT_EXISTS uint8 checksum_byte= (uint8) (used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF ?
data_written= 0; // to prepare for need_checksum assert used_checksum_alg : BINLOG_CHECKSUM_ALG_OFF);
#endif DBUG_ASSERT(used_checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF);
uint8 checksum_byte= (uint8)
(need_checksum() ? checksum_alg : BINLOG_CHECKSUM_ALG_OFF);
/* /*
FD of checksum-aware server is always checksum-equipped, (V) is in, FD of checksum-aware server is always checksum-equipped, (V) is in,
regardless of @@global.binlog_checksum policy. regardless of @@global.binlog_checksum policy.
@ -2496,17 +2428,14 @@ bool Format_description_log_event::write(Log_event_writer *writer)
1 + 4 bytes bigger comparing to the former FD. 1 + 4 bytes bigger comparing to the former FD.
*/ */
if ((no_checksum= (checksum_alg == BINLOG_CHECKSUM_ALG_OFF))) uint orig_checksum_len= writer->checksum_len;
{ writer->checksum_len= BINLOG_CHECKSUM_LEN;
checksum_alg= BINLOG_CHECKSUM_ALG_CRC32; // Forcing (V) room to fill anyway
}
ret= write_header(writer, rec_size) || ret= write_header(writer, rec_size) ||
write_data(writer, buff, sizeof(buff)) || write_data(writer, buff, sizeof(buff)) ||
write_data(writer, post_header_len, number_of_event_types) || write_data(writer, post_header_len, number_of_event_types) ||
write_data(writer, &checksum_byte, sizeof(checksum_byte)) || write_data(writer, &checksum_byte, sizeof(checksum_byte)) ||
write_footer(writer); write_footer(writer);
if (no_checksum) writer->checksum_len= orig_checksum_len;
checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
return ret; return ret;
} }
@ -2973,7 +2902,7 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
*/ */
bool bool
Gtid_log_event::peek(const uchar *event_start, size_t event_len, Gtid_log_event::peek(const uchar *event_start, size_t event_len,
enum enum_binlog_checksum_alg checksum_alg, enum_binlog_checksum_alg checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no, uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2, const Format_description_log_event *fdev) uchar *flags2, const Format_description_log_event *fdev)
{ {
@ -3070,7 +2999,7 @@ Gtid_log_event::write(Log_event_writer *writer)
int int
Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event, Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
ulong ev_offset, ulong ev_offset,
enum enum_binlog_checksum_alg checksum_alg) enum_binlog_checksum_alg checksum_alg)
{ {
uchar flags2; uchar flags2;
if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN) if (packet->length() - ev_offset < LOG_EVENT_HEADER_LEN + GTID_HEADER_LEN)

View File

@ -240,7 +240,7 @@ class Master_info : public Slave_reporting_capability
Initialized to novalue, then set to the queried from master Initialized to novalue, then set to the queried from master
@@global.binlog_checksum and deactivated once FD has been received. @@global.binlog_checksum and deactivated once FD has been received.
*/ */
enum enum_binlog_checksum_alg checksum_alg_before_fd; enum_binlog_checksum_alg checksum_alg_before_fd;
uint connect_retry; uint connect_retry;
#ifndef DBUG_OFF #ifndef DBUG_OFF
int events_till_disconnect; int events_till_disconnect;

View File

@ -296,7 +296,7 @@ table_def::~table_def()
*/ */
bool event_checksum_test(uchar *event_buf, ulong event_len, bool event_checksum_test(uchar *event_buf, ulong event_len,
enum enum_binlog_checksum_alg alg) enum_binlog_checksum_alg alg)
{ {
bool res= FALSE; bool res= FALSE;
uint16 flags= 0; // to store in FD's buffer flags orig value uint16 flags= 0; // to store in FD's buffer flags orig value

View File

@ -1790,7 +1790,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
master is 3.23, 4.0, etc. master is 3.23, 4.0, etc.
*/ */
mi->rli.relay_log.description_event_for_queue= new mi->rli.relay_log.description_event_for_queue= new
Format_description_log_event(4, mysql->server_version); Format_description_log_event(4, mysql->server_version,
mi->rli.relay_log.relay_log_checksum_alg);
break; break;
} }
} }
@ -1849,10 +1850,8 @@ static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi)
until it has received a new FD_m. until it has received a new FD_m.
*/ */
mi->rli.relay_log.description_event_for_queue->checksum_alg=
mi->rli.relay_log.relay_log_checksum_alg;
DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg != DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->used_checksum_alg !=
BINLOG_CHECKSUM_ALG_UNDEF); BINLOG_CHECKSUM_ALG_UNDEF);
DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg != DBUG_ASSERT(mi->rli.relay_log.relay_log_checksum_alg !=
BINLOG_CHECKSUM_ALG_UNDEF); BINLOG_CHECKSUM_ALG_UNDEF);
@ -2741,7 +2740,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
{ {
DBUG_PRINT("info",("writing a Rotate event to track down ignored events")); DBUG_PRINT("info",("writing a Rotate event to track down ignored events"));
rev->server_id= 0; // don't be ignored by slave SQL thread rev->server_id= 0; // don't be ignored by slave SQL thread
if (unlikely(rli->relay_log.append(rev))) if (unlikely(rli->relay_log.append(rev, rli->relay_log.relay_log_checksum_alg)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Rotate event" "failed to write a Rotate event"
@ -2754,7 +2753,7 @@ static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events")); DBUG_PRINT("info",("writing a Gtid_list event to track down ignored events"));
glev->server_id= 0; // don't be ignored by slave SQL thread glev->server_id= 0; // don't be ignored by slave SQL thread
glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos glev->set_artificial_event(); // Don't mess up Exec_Master_Log_Pos
if (unlikely(rli->relay_log.append(glev))) if (unlikely(rli->relay_log.append(glev, rli->relay_log.relay_log_checksum_alg)))
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL, mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE), ER_THD(thd, ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"failed to write a Gtid_list event to the relay log, " "failed to write a Gtid_list event to the relay log, "
@ -5963,7 +5962,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
Show-up of FD:s affects checksum_alg at once because Show-up of FD:s affects checksum_alg at once because
that changes FD_queue. that changes FD_queue.
*/ */
enum enum_binlog_checksum_alg checksum_alg= enum_binlog_checksum_alg checksum_alg=
mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ? mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF ?
mi->checksum_alg_before_fd : mi->rli.relay_log.relay_log_checksum_alg; mi->checksum_alg_before_fd : mi->rli.relay_log.relay_log_checksum_alg;
@ -5987,7 +5986,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
{ {
// checksum behaviour is similar to the pre-checksum FD handling // checksum behaviour is similar to the pre-checksum FD handling
mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF; mi->checksum_alg_before_fd= BINLOG_CHECKSUM_ALG_UNDEF;
mi->rli.relay_log.description_event_for_queue->checksum_alg= mi->rli.relay_log.description_event_for_queue->used_checksum_alg=
mi->rli.relay_log.relay_log_checksum_alg= checksum_alg= mi->rli.relay_log.relay_log_checksum_alg= checksum_alg=
BINLOG_CHECKSUM_ALG_OFF; BINLOG_CHECKSUM_ALG_OFF;
} }
@ -6112,8 +6111,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
We detect this case by noticing a change of server_id and in this We detect this case by noticing a change of server_id and in this
case likewise rollback the partially received event group. case likewise rollback the partially received event group.
*/ */
Format_description_log_event fdle(4); Format_description_log_event fdle(4, NULL, checksum_alg);
fdle.checksum_alg= checksum_alg;
/* /*
Possible crash is flagged in being created FD' common header Possible crash is flagged in being created FD' common header
@ -6145,7 +6143,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
rev.new_log_ident); rev.new_log_ident);
} }
mysql_mutex_lock(log_lock); mysql_mutex_lock(log_lock);
if (likely(!rli->relay_log.write_event(&fdle) && if (likely(!rli->relay_log.write_event(&fdle, checksum_alg) &&
!rli->relay_log.flush_and_sync(NULL))) !rli->relay_log.flush_and_sync(NULL)))
{ {
rli->relay_log.harvest_bytes_written(&rli->log_space_total); rli->relay_log.harvest_bytes_written(&rli->log_space_total);
@ -6194,7 +6192,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
event_len - BINLOG_CHECKSUM_LEN); event_len - BINLOG_CHECKSUM_LEN);
int4store(&rot_buf[event_len - BINLOG_CHECKSUM_LEN], rot_crc); int4store(&rot_buf[event_len - BINLOG_CHECKSUM_LEN], rot_crc);
DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET])); DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET]));
DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->used_checksum_alg ==
mi->rli.relay_log.relay_log_checksum_alg); mi->rli.relay_log.relay_log_checksum_alg);
/* the first one */ /* the first one */
DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF);
@ -6214,7 +6212,7 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
int4store(&rot_buf[EVENT_LEN_OFFSET], int4store(&rot_buf[EVENT_LEN_OFFSET],
uint4korr(&rot_buf[EVENT_LEN_OFFSET]) - BINLOG_CHECKSUM_LEN); uint4korr(&rot_buf[EVENT_LEN_OFFSET]) - BINLOG_CHECKSUM_LEN);
DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET])); DBUG_ASSERT(event_len == uint4korr(&rot_buf[EVENT_LEN_OFFSET]));
DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->checksum_alg == DBUG_ASSERT(mi->rli.relay_log.description_event_for_queue->used_checksum_alg ==
mi->rli.relay_log.relay_log_checksum_alg); mi->rli.relay_log.relay_log_checksum_alg);
/* the first one */ /* the first one */
DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF); DBUG_ASSERT(mi->checksum_alg_before_fd != BINLOG_CHECKSUM_ALG_UNDEF);
@ -6254,11 +6252,11 @@ static int queue_event(Master_info* mi, const uchar *buf, ulong event_len)
tmp->copy_crypto_data(mi->rli.relay_log.description_event_for_queue); tmp->copy_crypto_data(mi->rli.relay_log.description_event_for_queue);
delete mi->rli.relay_log.description_event_for_queue; delete mi->rli.relay_log.description_event_for_queue;
mi->rli.relay_log.description_event_for_queue= tmp; mi->rli.relay_log.description_event_for_queue= tmp;
if (tmp->checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF) if (tmp->used_checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF)
tmp->checksum_alg= BINLOG_CHECKSUM_ALG_OFF; tmp->used_checksum_alg= BINLOG_CHECKSUM_ALG_OFF;
/* installing new value of checksum Alg for relay log */ /* installing new value of checksum Alg for relay log */
mi->rli.relay_log.relay_log_checksum_alg= tmp->checksum_alg; mi->rli.relay_log.relay_log_checksum_alg= tmp->used_checksum_alg;
/* /*
Do not queue any format description event that we receive after a Do not queue any format description event that we receive after a
@ -6823,7 +6821,8 @@ dbug_gtid_accept:
rli->relay_log.description_event_for_queue->created= 0; rli->relay_log.description_event_for_queue->created= 0;
rli->relay_log.description_event_for_queue->set_artificial_event(); rli->relay_log.description_event_for_queue->set_artificial_event();
if (rli->relay_log.append_no_lock if (rli->relay_log.append_no_lock
(rli->relay_log.description_event_for_queue)) (rli->relay_log.description_event_for_queue,
rli->relay_log.relay_log_checksum_alg))
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
else else
rli->relay_log.harvest_bytes_written(&rli->log_space_total); rli->relay_log.harvest_bytes_written(&rli->log_space_total);
@ -6836,7 +6835,8 @@ dbug_gtid_accept:
*/ */
Rotate_log_event fake_rev(mi->master_log_name, 0, mi->master_log_pos, 0); Rotate_log_event fake_rev(mi->master_log_name, 0, mi->master_log_pos, 0);
fake_rev.server_id= mi->master_id; fake_rev.server_id= mi->master_id;
if (rli->relay_log.append_no_lock(&fake_rev)) if (rli->relay_log.append_no_lock(&fake_rev,
rli->relay_log.relay_log_checksum_alg))
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE; error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
else else
rli->relay_log.harvest_bytes_written(&rli->log_space_total); rli->relay_log.harvest_bytes_written(&rli->log_space_total);

View File

@ -52,7 +52,7 @@ extern TYPELIB binlog_checksum_typelib;
static int static int
fake_event_header(String* packet, Log_event_type event_type, ulong extra_len, fake_event_header(String* packet, Log_event_type event_type, ulong extra_len,
my_bool *do_checksum, ha_checksum *crc, const char** errmsg, my_bool *do_checksum, ha_checksum *crc, const char** errmsg,
enum enum_binlog_checksum_alg checksum_alg_arg, uint32 end_pos) enum_binlog_checksum_alg checksum_alg_arg, uint32 end_pos)
{ {
char header[LOG_EVENT_HEADER_LEN]; char header[LOG_EVENT_HEADER_LEN];
ulong event_len; ulong event_len;
@ -133,7 +133,7 @@ struct binlog_send_info {
enum_gtid_skip_type gtid_skip_group; enum_gtid_skip_type gtid_skip_group;
enum_gtid_until_state gtid_until_group; enum_gtid_until_state gtid_until_group;
ushort flags; ushort flags;
enum enum_binlog_checksum_alg current_checksum_alg; enum_binlog_checksum_alg current_checksum_alg;
bool slave_gtid_strict_mode; bool slave_gtid_strict_mode;
bool send_fake_gtid_list; bool send_fake_gtid_list;
bool slave_gtid_ignore_duplicates; bool slave_gtid_ignore_duplicates;
@ -211,7 +211,7 @@ static int reset_transmit_packet(struct binlog_send_info *info, ushort flags,
*/ */
static int fake_rotate_event(binlog_send_info *info, ulonglong position, static int fake_rotate_event(binlog_send_info *info, ulonglong position,
const char** errmsg, enum enum_binlog_checksum_alg checksum_alg_arg) const char** errmsg, enum_binlog_checksum_alg checksum_alg_arg)
{ {
DBUG_ENTER("fake_rotate_event"); DBUG_ENTER("fake_rotate_event");
ulong ev_offset; ulong ev_offset;
@ -495,12 +495,12 @@ static bool is_slave_checksum_aware(THD * thd)
@param[in] thd THD to access a user variable @param[in] thd THD to access a user variable
@return value of @@binlog_checksum alg according to @return value of @@binlog_checksum alg according to
@c enum enum_binlog_checksum_alg @c enum_binlog_checksum_alg
*/ */
static enum enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * thd) static enum_binlog_checksum_alg get_binlog_checksum_value_at_connect(THD * thd)
{ {
enum enum_binlog_checksum_alg ret; enum_binlog_checksum_alg ret;
DBUG_ENTER("get_binlog_checksum_value_at_connect"); DBUG_ENTER("get_binlog_checksum_value_at_connect");
user_var_entry *entry= get_binlog_checksum_uservar(thd); user_var_entry *entry= get_binlog_checksum_uservar(thd);
@ -826,7 +826,7 @@ get_slave_until_gtid(THD *thd, String *out_str)
static int send_heartbeat_event(binlog_send_info *info, static int send_heartbeat_event(binlog_send_info *info,
NET* net, String* packet, NET* net, String* packet,
const struct event_coordinates *coord, const struct event_coordinates *coord,
enum enum_binlog_checksum_alg checksum_alg_arg) enum_binlog_checksum_alg checksum_alg_arg)
{ {
DBUG_ENTER("send_heartbeat_event"); DBUG_ENTER("send_heartbeat_event");
@ -1452,7 +1452,7 @@ gtid_state_from_pos(const char *name, uint32 offset,
bool found_gtid_list_event= false; bool found_gtid_list_event= false;
bool found_format_description_event= false; bool found_format_description_event= false;
bool valid_pos= false; bool valid_pos= false;
enum enum_binlog_checksum_alg current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF; enum_binlog_checksum_alg current_checksum_alg= BINLOG_CHECKSUM_ALG_UNDEF;
int err; int err;
String packet; String packet;
Format_description_log_event *fdev= NULL; Format_description_log_event *fdev= NULL;
@ -1722,7 +1722,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
String* const packet= info->packet; String* const packet= info->packet;
size_t len= packet->length(); size_t len= packet->length();
int mariadb_slave_capability= info->mariadb_slave_capability; int mariadb_slave_capability= info->mariadb_slave_capability;
enum enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg; enum_binlog_checksum_alg current_checksum_alg= info->current_checksum_alg;
slave_connection_state *gtid_state= &info->gtid_state; slave_connection_state *gtid_state= &info->gtid_state;
slave_connection_state *until_gtid_state= info->until_gtid_state; slave_connection_state *until_gtid_state= info->until_gtid_state;
bool need_sync= false; bool need_sync= false;
@ -4129,7 +4129,7 @@ bool mysql_show_binlog_events(THD* thd)
Master_info *mi= 0; Master_info *mi= 0;
LOG_INFO linfo; LOG_INFO linfo;
LEX_MASTER_INFO *lex_mi= &thd->lex->mi; LEX_MASTER_INFO *lex_mi= &thd->lex->mi;
enum enum_binlog_checksum_alg checksum_alg; enum_binlog_checksum_alg checksum_alg;
my_off_t binlog_size; my_off_t binlog_size;
MY_STAT s; MY_STAT s;
@ -4265,7 +4265,7 @@ bool mysql_show_binlog_events(THD* thd)
if (lex_mi->pos > BIN_LOG_HEADER_SIZE) if (lex_mi->pos > BIN_LOG_HEADER_SIZE)
{ {
checksum_alg= description_event->checksum_alg; checksum_alg= description_event->used_checksum_alg;
/* Validate user given position using checksum */ /* Validate user given position using checksum */
if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF && if (checksum_alg != BINLOG_CHECKSUM_ALG_OFF &&
checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF) checksum_alg != BINLOG_CHECKSUM_ALG_UNDEF)

View File

@ -236,7 +236,9 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
File file; File file;
IO_CACHE cache; IO_CACHE cache;
Log_event_writer writer(&cache, 0); enum_binlog_checksum_alg checksum_alg=
(enum_binlog_checksum_alg) binlog_checksum_options;
Log_event_writer writer(&cache, 0, checksum_alg, NULL);
Format_description_log_event *ev= 0; Format_description_log_event *ev= 0;
longlong thd_trx_seqno= (long long)wsrep_thd_trx_seqno(thd); longlong thd_trx_seqno= (long long)wsrep_thd_trx_seqno(thd);
@ -288,7 +290,7 @@ void wsrep_dump_rbr_buf_with_header(THD *thd, const void *rbr_buf,
to the dump file). to the dump file).
*/ */
ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) : ev= (thd->wsrep_applier) ? wsrep_get_apply_format(thd) :
(new Format_description_log_event(4)); (new Format_description_log_event(4, NULL, checksum_alg));
if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) || if (writer.write(ev) || my_b_write(&cache, (uchar*)rbr_buf, buf_len) ||
flush_io_cache(&cache)) flush_io_cache(&cache))

View File

@ -2155,16 +2155,16 @@ int wsrep_to_buf_helper(
THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len) THD* thd, const char *query, uint query_len, uchar** buf, size_t* buf_len)
{ {
IO_CACHE tmp_io_cache; IO_CACHE tmp_io_cache;
Log_event_writer writer(&tmp_io_cache, 0); enum_binlog_checksum_alg current_binlog_check_alg=
(enum_binlog_checksum_alg) binlog_checksum_options;
Log_event_writer writer(&tmp_io_cache, NULL, current_binlog_check_alg, NULL);
if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
65536, MYF(MY_WME))) 65536, MYF(MY_WME)))
return 1; return 1;
int ret(0); int ret(0);
enum enum_binlog_checksum_alg current_binlog_check_alg=
(enum_binlog_checksum_alg) binlog_checksum_options;
Format_description_log_event *tmp_fd= new Format_description_log_event(4); Format_description_log_event *tmp_fd=
tmp_fd->checksum_alg= current_binlog_check_alg; new Format_description_log_event(4, NULL, current_binlog_check_alg);
writer.write(tmp_fd); writer.write(tmp_fd);
delete tmp_fd; delete tmp_fd;
@ -2213,7 +2213,6 @@ int wsrep_to_buf_helper(
Query_log_event ev(thd, thd->wsrep_TOI_pre_query, Query_log_event ev(thd, thd->wsrep_TOI_pre_query,
thd->wsrep_TOI_pre_query_len, thd->wsrep_TOI_pre_query_len,
FALSE, FALSE, FALSE, 0); FALSE, FALSE, FALSE, 0);
ev.checksum_alg= current_binlog_check_alg;
if (writer.write(&ev)) ret= 1; if (writer.write(&ev)) ret= 1;
} }
@ -2222,7 +2221,6 @@ int wsrep_to_buf_helper(
/* WSREP GTID mode, we need to change server_id */ /* WSREP GTID mode, we need to change server_id */
if (wsrep_gtid_mode && !thd->variables.gtid_seq_no) if (wsrep_gtid_mode && !thd->variables.gtid_seq_no)
ev.server_id= wsrep_gtid_server.server_id; ev.server_id= wsrep_gtid_server.server_id;
ev.checksum_alg= current_binlog_check_alg;
if (!ret && writer.write(&ev)) ret= 1; if (!ret && writer.write(&ev)) ret= 1;
if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1; if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, buf_len)) ret= 1;
close_cached_file(&tmp_io_cache); close_cached_file(&tmp_io_cache);