MDEV-35643 Add support for MySQL 8.0 binlog events

MDEV-29533 Crash when MariaDB is replica of MySQL 8.0

MySQL 8.0 has added the following new events in the MySQL binary log

PARTIAL_UPDATE_ROWS_EVENT
TRANSACTION_PAYLOAD_EVENT
HEARTBEAT_LOG_EVENT_V2

- PARTIAL_UPDATE_ROWS_EVENT is used by MySQL to generate update
  statements using JSON_SET, JSON_REPLACE and JSON_REMOVE to make
  update of JSON columns more efficient.  These events can be
  disabled by setting 'binlog-row-value-options=""'
- TRANSACTION_PAYLOAD_EVENT is used by MySQL to signal that a
  row event is compressed. It an be disably by setting
  'binlog_transaction_compression=0'.
- HEARTBEAT_LOG_EVENT_V2 is written to the binary log many times
  per seconds. It can be ignored by the server.

What this patch does:

- If PARTIAL_UPDATE_ROWS_EVENT or TRANSACTION_PAYLOAD_EVENT is found,
  the server will stop with an error message of how to disable the
  MySQL server to generate such events.
- HEARTBEAT_LOG_EVENT_V2 events are ignored.
- mariadb-binlog will write the name of the new events.
- mariadb-binlog will stop if PARTIAL_UPDATE_ROWS_EVENT or
  TRANSACTION_PAYLOAD_EVENT is found, unless --force is given.
- Fixes a crash in mariadb-binlog if a character set unknown to
  MariaDB is found. (MDEV-29533)

From Kristian Nielsen:
- Add test case for MySQL 8.0 to MariaDB replication and fixed a
  a small typo in post_header_len initialization.

Reviewer: knielsen@mariadb.org
This commit is contained in:
Monty 2024-12-13 15:41:59 +02:00
parent 47a5eed437
commit 87ee1e75bc
7 changed files with 213 additions and 12 deletions

Binary file not shown.

View File

@ -0,0 +1,52 @@
include/master-slave.inc
[connection master]
connection slave;
include/stop_slave.inc
connection master;
include/rpl_stop_server.inc [server_number=1]
include/rpl_start_server.inc [server_number=1]
connection slave;
CHANGE MASTER TO Master_log_file='master-bin.000001', Master_log_pos=4, Master_use_gtid=No;
START SLAVE IO_THREAD;
include/wait_for_slave_io_to_start.inc
START SLAVE UNTIL Master_log_file='master-bin.000001', Master_log_pos= 1178;
SELECT MASTER_POS_WAIT('master-bin.000001', 1178, 60);
MASTER_POS_WAIT('master-bin.000001', 1178, 60)
NULL
SELECT * FROM t1 ORDER BY a;
a b c
1 0
2 0 hulu
3 0 bulu
include/wait_for_slave_sql_to_stop.inc
START SLAVE;
include/wait_for_slave_sql_error.inc [errno=1594]
STOP SLAVE IO_THREAD;
include/wait_for_slave_io_to_stop.inc
CHANGE MASTER TO Master_log_file='master-bin.000001', Master_log_pos=2297;
START SLAVE IO_THREAD;
START SLAVE SQL_THREAD;
include/wait_for_slave_io_to_start.inc
include/wait_for_slave_sql_error.inc [errno=1594]
SELECT * FROM t1 ORDER BY a;
a b c
1 0
2 0 hulu
3 0 bulu
4 0 skip
5 0 after compressed
SELECT * FROM t2 ORDER BY a;
a b
1 {"a": "hulu", "b": "[zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy][zyzzy]", "c": "bulu"}
STOP SLAVE IO_THREAD;
include/wait_for_slave_io_to_stop.inc
CHANGE MASTER TO Master_log_file='master-bin.000002', Master_log_pos=4;
START SLAVE IO_THREAD;
START SLAVE SQL_THREAD;
include/wait_for_slave_io_to_start.inc
include/wait_for_slave_sql_to_start.inc
DROP TABLE t1, t2;
CALL mtr.add_suppression('TRANSACTION_PAYLOAD_EVENT event. You can avoid this event by specifying');
CALL mtr.add_suppression('PARTIAL_UPDATE_ROWS_EVENT event. You can avoid this event by specifying');
connection master;
include/rpl_end.inc

View File

@ -0,0 +1,121 @@
--source include/have_innodb.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc
# Test some replication events from MySQL 8.0 to MariaDB.
# Works by copying in a binlog generated by MySQL 8.0.
# The binlog was generated by the following test case. Note that after
# re-generating this, values for Master_log_pos below may need updating, check
# mysqlbinlog for the correct new values.
#
# The latin1 character set is needed since MariaDB currently does not support
# the default MySQL character set. The binlog_format=statement is needed due to
# missing support for the JSON type in row events. These can be removed once
# support is added in MariaDB.
#
# --source include/not_group_replication_plugin.inc
# --source include/have_binlog_format_row.inc
# --source include/master-slave.inc
#
# --connection master
# SET NAMES 'latin1';
# SET SESSION collation_server= 'latin1_swedish_ci';
# CREATE TABLE t1 (a INT PRIMARY KEY, b INT, c VARCHAR(1024)) ENGINE=InnoDB CHARACTER SET latin1;
# INSERT INTO t1 VALUES (1, 0, '');
# BEGIN;
# INSERT INTO t1 VALUES (2, 0, 'hulu');
# INSERT INTO t1 VALUES (3, 0, 'bulu');
# COMMIT;
# INSERT INTO t1 VALUES (4, 0, 'skip');
#
# SET SESSION binlog_transaction_compression= 1;
# BEGIN;
# --disable_query_log
# --let $i= 0
# while ($i < 100) {
# eval INSERT INTO t1 VALUES ($i+1000, $i, CONCAT("--", $i, "--", REPEAT("\\/", 100), "--"));
# inc $i;
# }
# --enable_query_log
# COMMIT;
# SET SESSION binlog_transaction_compression= default;
#
# INSERT INTO t1 VALUES (5, 0, 'after compressed');
# SET SESSION binlog_row_value_options= PARTIAL_JSON;
# CREATE TABLE t2 (a INT PRIMARY KEY, b JSON) ENGINE=InnoDB;
# SET SESSION binlog_format=statement;
# INSERT INTO t2 VALUES (1, CONCAT('{"a": "hulu", "b": "', REPEAT("[zyzzy]", 100), '", "c": "bulu"}'));
# SET SESSION binlog_format=row;
# UPDATE t2 SET b=JSON_REPLACE(b, '$.b', REPEAT("oOo", 50));
# SET SESSION binlog_row_value_options= DEFAULT;
# --sync_slave_with_master
#
# --connection master
# SET sql_log_bin= 0;
# FLUSH BINARY LOGS;
# SET sql_log_bin= 1;
# DROP TABLE t1, t2;
# --source include/rpl_end.inc
--connection slave
--source include/stop_slave.inc
--connection master
--let $datadir= `SELECT @@datadir`
--let $rpl_server_number= 1
--source include/rpl_stop_server.inc
# Copy in the MySQL 8.0 generated binlog.
--remove_file $datadir/master-bin.000001
--copy_file $MYSQL_TEST_DIR/std_data/mdev35643_mysql_80_binlog.000001 $datadir/master-bin.000001
--source include/rpl_start_server.inc
--save_master_pos
--connection slave
CHANGE MASTER TO Master_log_file='master-bin.000001', Master_log_pos=4, Master_use_gtid=No;
START SLAVE IO_THREAD;
--source include/wait_for_slave_io_to_start.inc
# The position 1178 is the start of: INSERT INTO t1 VALUES (4, 0, 'skip');
# After that comes unknown MySQL 8.0 events, which we test error for below.
START SLAVE UNTIL Master_log_file='master-bin.000001', Master_log_pos= 1178;
SELECT MASTER_POS_WAIT('master-bin.000001', 1178, 60);
SELECT * FROM t1 ORDER BY a;
--source include/wait_for_slave_sql_to_stop.inc
START SLAVE;
--let $slave_sql_errno= 1594
--source include/wait_for_slave_sql_error.inc
STOP SLAVE IO_THREAD;
--source include/wait_for_slave_io_to_stop.inc
# The position 2298 is the start of: INSERT INTO t1 VALUES (5, 0, 'after compressed');
CHANGE MASTER TO Master_log_file='master-bin.000001', Master_log_pos=2297;
START SLAVE IO_THREAD;
START SLAVE SQL_THREAD;
--source include/wait_for_slave_io_to_start.inc
--let $slave_sql_errno= 1594
--source include/wait_for_slave_sql_error.inc
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
STOP SLAVE IO_THREAD;
--source include/wait_for_slave_io_to_stop.inc
# Restart replication after the MySQL 8.0 file.
CHANGE MASTER TO Master_log_file='master-bin.000002', Master_log_pos=4;
START SLAVE IO_THREAD;
START SLAVE SQL_THREAD;
--source include/wait_for_slave_io_to_start.inc
--source include/wait_for_slave_sql_to_start.inc
--sync_with_master
DROP TABLE t1, t2;
CALL mtr.add_suppression('TRANSACTION_PAYLOAD_EVENT event. You can avoid this event by specifying');
CALL mtr.add_suppression('PARTIAL_UPDATE_ROWS_EVENT event. You can avoid this event by specifying');
--connection master
--source include/rpl_end.inc

View File

@ -694,6 +694,9 @@ const char* Log_event::get_type_str(Log_event_type type)
case TRANSACTION_CONTEXT_EVENT: return "Transaction_context"; case TRANSACTION_CONTEXT_EVENT: return "Transaction_context";
case VIEW_CHANGE_EVENT: return "View_change"; case VIEW_CHANGE_EVENT: return "View_change";
case XA_PREPARE_LOG_EVENT: return "XA_prepare"; case XA_PREPARE_LOG_EVENT: return "XA_prepare";
case PARTIAL_UPDATE_ROWS_EVENT: return "MySQL Update_rows_partial";
case TRANSACTION_PAYLOAD_EVENT: return "MySQL Transaction_payload";
case HEARTBEAT_LOG_EVENT_V2: return "MySQL Heartbeat";
case QUERY_COMPRESSED_EVENT: return "Query_compressed"; case QUERY_COMPRESSED_EVENT: return "Query_compressed";
case WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed"; case WRITE_ROWS_COMPRESSED_EVENT: return "Write_rows_compressed";
case UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed"; case UPDATE_ROWS_COMPRESSED_EVENT: return "Update_rows_compressed";
@ -1017,6 +1020,7 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len,
DBUG_PRINT("info", ("binlog_version: %d", fdle->binlog_version)); DBUG_PRINT("info", ("binlog_version: %d", fdle->binlog_version));
DBUG_DUMP_EVENT_BUF(buf, event_len); DBUG_DUMP_EVENT_BUF(buf, event_len);
*error= 0;
/* /*
Check the integrity; This is needed because handle_slave_io() doesn't Check the integrity; This is needed because handle_slave_io() doesn't
check if packet is of proper length. check if packet is of proper length.
@ -1237,6 +1241,7 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len,
case ANONYMOUS_GTID_LOG_EVENT: case ANONYMOUS_GTID_LOG_EVENT:
case PREVIOUS_GTIDS_LOG_EVENT: case PREVIOUS_GTIDS_LOG_EVENT:
case TRANSACTION_CONTEXT_EVENT: case TRANSACTION_CONTEXT_EVENT:
case HEARTBEAT_LOG_EVENT_V2: // MySQL 8.0
case VIEW_CHANGE_EVENT: case VIEW_CHANGE_EVENT:
ev= new Ignorable_log_event(buf, fdle, ev= new Ignorable_log_event(buf, fdle,
get_type_str((Log_event_type) event_type)); get_type_str((Log_event_type) event_type));
@ -1261,6 +1266,21 @@ Log_event* Log_event::read_log_event(const uchar *buf, uint event_len,
case START_ENCRYPTION_EVENT: case START_ENCRYPTION_EVENT:
ev= new Start_encryption_log_event(buf, event_len, fdle); ev= new Start_encryption_log_event(buf, event_len, fdle);
break; break;
case TRANSACTION_PAYLOAD_EVENT: // MySQL 8.0
*error=
"Found incompatible MySQL 8.0 TRANSACTION_PAYLOAD_EVENT event. "
"You can avoid this event by specifying "
"'binlog_transaction_compression=0' in the MySQL server";
ev= NULL;
break;
case PARTIAL_UPDATE_ROWS_EVENT: // MySQL 8.0
*error=
"Found incompatible MySQL 8.0 PARTIAL_UPDATE_ROWS_EVENT event. "
"You can avoid this event by specifying "
"'binlog-row-value-options=\"\"' in the MySQL server";
ev= NULL;
break;
default: default:
DBUG_PRINT("error",("Unknown event code: %d", DBUG_PRINT("error",("Unknown event code: %d",
(uchar) buf[EVENT_TYPE_OFFSET])); (uchar) buf[EVENT_TYPE_OFFSET]));
@ -1303,12 +1323,14 @@ exit:
#ifdef MYSQL_CLIENT #ifdef MYSQL_CLIENT
if (!force_opt) /* then mysqlbinlog dies */ if (!force_opt) /* then mysqlbinlog dies */
{ {
*error= "Found invalid event in binary log"; if (!*error)
*error= "Found invalid event in binary log";
DBUG_RETURN(0); DBUG_RETURN(0);
} }
ev= new Unknown_log_event(buf, fdle); ev= new Unknown_log_event(buf, fdle);
#else #else
*error= "Found invalid event in binary log"; if (!*error)
*error= "Found invalid event in binary log";
DBUG_RETURN(0); DBUG_RETURN(0);
#endif #endif
} }
@ -2126,6 +2148,9 @@ Format_description_log_event(uint8 binlog_ver, const char* server_ver)
post_header_len[TRANSACTION_CONTEXT_EVENT-1]= 0; post_header_len[TRANSACTION_CONTEXT_EVENT-1]= 0;
post_header_len[VIEW_CHANGE_EVENT-1]= 0; post_header_len[VIEW_CHANGE_EVENT-1]= 0;
post_header_len[XA_PREPARE_LOG_EVENT-1]= 0; post_header_len[XA_PREPARE_LOG_EVENT-1]= 0;
post_header_len[PARTIAL_UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
post_header_len[TRANSACTION_PAYLOAD_EVENT-1]= ROWS_HEADER_LEN_V2;
post_header_len[HEARTBEAT_LOG_EVENT_V2-1]= ROWS_HEADER_LEN_V2;
post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2; post_header_len[WRITE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2; post_header_len[UPDATE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;
post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2; post_header_len[DELETE_ROWS_EVENT-1]= ROWS_HEADER_LEN_V2;

View File

@ -678,6 +678,14 @@ enum Log_event_type
/* not ignored */ /* not ignored */
XA_PREPARE_LOG_EVENT= 38, XA_PREPARE_LOG_EVENT= 38,
/**
Extension of UPDATE_ROWS_EVENT, allowing partial values according
to binlog_row_value_options.
*/
PARTIAL_UPDATE_ROWS_EVENT = 39,
TRANSACTION_PAYLOAD_EVENT = 40,
HEARTBEAT_LOG_EVENT_V2 = 41,
/* /*
Add new events here - right above this comment! Add new events here - right above this comment!
Existing events (except ENUM_END_EVENT) should never change their numbers Existing events (except ENUM_END_EVENT) should never change their numbers

View File

@ -1970,7 +1970,7 @@ bool Query_log_event::print_query_header(IO_CACHE* file,
print_event_info->auto_increment_offset= auto_increment_offset; print_event_info->auto_increment_offset= auto_increment_offset;
} }
/* TODO: print the catalog when we feature SET CATALOG */ /* TODO: print the catalog when we feature USE CATALOG */
if (likely(charset_inited) && if (likely(charset_inited) &&
(unlikely(!print_event_info->charset_inited || (unlikely(!print_event_info->charset_inited ||
@ -1984,12 +1984,15 @@ bool Query_log_event::print_query_header(IO_CACHE* file,
cs_info->cs_name.str, print_event_info->delimiter)) cs_info->cs_name.str, print_event_info->delimiter))
goto err; goto err;
} }
else if (my_b_printf(file, "# Ignored (Unknown charset) "))
goto err;
if (my_b_printf(file,"SET " if (my_b_printf(file,"SET "
"@@session.character_set_client=%s," "@@session.character_set_client=%s,"
"@@session.collation_connection=%d," "@@session.collation_connection=%d,"
"@@session.collation_server=%d" "@@session.collation_server=%d"
"%s\n", "%s\n",
cs_info->cs_name.str, cs_info ? cs_info->cs_name.str : "Unknown",
uint2korr(charset+2), uint2korr(charset+2),
uint2korr(charset+4), uint2korr(charset+4),
print_event_info->delimiter)) print_event_info->delimiter))

View File

@ -434,7 +434,6 @@ static int send_file(THD *thd)
Internal to mysql_binlog_send() routine that recalculates checksum for Internal to mysql_binlog_send() routine that recalculates checksum for
1. FD event (asserted) that needs additional arrangement prior sending to slave. 1. FD event (asserted) that needs additional arrangement prior sending to slave.
2. Start_encryption_log_event whose Ignored flag is set 2. Start_encryption_log_event whose Ignored flag is set
TODO DBUG_ASSERT can be removed if this function is used for more general cases
*/ */
inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet, inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet,
@ -446,13 +445,6 @@ inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet,
/* recalculate the crc for this event */ /* recalculate the crc for this event */
uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET); uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
ha_checksum crc; ha_checksum crc;
DBUG_ASSERT((data_len ==
LOG_EVENT_MINIMAL_HEADER_LEN + FORMAT_DESCRIPTION_HEADER_LEN +
BINLOG_CHECKSUM_ALG_DESC_LEN + BINLOG_CHECKSUM_LEN) ||
(data_len ==
LOG_EVENT_MINIMAL_HEADER_LEN + BINLOG_CRYPTO_SCHEME_LENGTH +
BINLOG_KEY_VERSION_LENGTH + BINLOG_NONCE_LENGTH +
BINLOG_CHECKSUM_LEN));
crc= my_checksum(0, (uchar *)packet->ptr() + ev_offset, data_len - crc= my_checksum(0, (uchar *)packet->ptr() + ev_offset, data_len -
BINLOG_CHECKSUM_LEN); BINLOG_CHECKSUM_LEN);
int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc); int4store(packet->ptr() + ev_offset + data_len - BINLOG_CHECKSUM_LEN, crc);