BUG#19459 (BINLOG RBR command does not lock tables correctly causing
crash for, e.g., NDB): Before, mysqlbinlog printed table map events as a separate statement, so when executing the event, the opened table was subsequently closed when the statement ended. Instead, the row-based events that make up a statement are now printed as *one* BINLOG statement, which means that the table maps and the following *_rows_log_event events are executed fully before the statement ends. Changing implementation of BINLOG statement to be able to read the emitted format, which now consists of several chunks of BASE64-encoded data.
This commit is contained in:
parent
d1b9686245
commit
e762328b54
@ -475,6 +475,30 @@ static bool check_database(const char *log_dbname)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
write_event_header_and_base64(Log_event *ev, FILE *result_file,
|
||||||
|
PRINT_EVENT_INFO *print_event_info)
|
||||||
|
{
|
||||||
|
DBUG_ENTER("write_event_header_and_base64");
|
||||||
|
/* Write header and base64 output to cache */
|
||||||
|
IO_CACHE result_cache;
|
||||||
|
if (init_io_cache(&result_cache, -1, 0, WRITE_CACHE, 0L, FALSE,
|
||||||
|
MYF(MY_WME | MY_NABP)))
|
||||||
|
{
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ev->print_header(&result_cache, print_event_info, FALSE);
|
||||||
|
ev->print_base64(&result_cache, print_event_info, FALSE);
|
||||||
|
|
||||||
|
/* Read data from cache and write to result file */
|
||||||
|
my_b_copy_to_file(&result_cache, result_file);
|
||||||
|
end_io_cache(&result_cache);
|
||||||
|
DBUG_RETURN(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Process an event
|
Process an event
|
||||||
|
|
||||||
@ -537,18 +561,18 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
|
|||||||
|
|
||||||
print_event_info->base64_output= opt_base64_output;
|
print_event_info->base64_output= opt_base64_output;
|
||||||
|
|
||||||
|
DBUG_PRINT("debug", ("event_type: %s", ev->get_type_str()));
|
||||||
|
|
||||||
switch (ev_type) {
|
switch (ev_type) {
|
||||||
case QUERY_EVENT:
|
case QUERY_EVENT:
|
||||||
if (check_database(((Query_log_event*)ev)->db))
|
if (check_database(((Query_log_event*)ev)->db))
|
||||||
goto end;
|
goto end;
|
||||||
if (opt_base64_output)
|
if (opt_base64_output)
|
||||||
{
|
write_event_header_and_base64(ev, result_file, print_event_info);
|
||||||
ev->print_header(result_file, print_event_info);
|
|
||||||
ev->print_base64(result_file, print_event_info);
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
ev->print(result_file, print_event_info);
|
ev->print(result_file, print_event_info);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case CREATE_FILE_EVENT:
|
case CREATE_FILE_EVENT:
|
||||||
{
|
{
|
||||||
Create_file_log_event* ce= (Create_file_log_event*)ev;
|
Create_file_log_event* ce= (Create_file_log_event*)ev;
|
||||||
@ -569,8 +593,7 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
|
|||||||
*/
|
*/
|
||||||
if (opt_base64_output)
|
if (opt_base64_output)
|
||||||
{
|
{
|
||||||
ce->print_header(result_file, print_event_info);
|
write_event_header_and_base64(ce, result_file, print_event_info);
|
||||||
ce->print_base64(result_file, print_event_info);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
ce->print(result_file, print_event_info, TRUE);
|
ce->print(result_file, print_event_info, TRUE);
|
||||||
|
@ -28,7 +28,7 @@
|
|||||||
Copy contents of an IO_CACHE to a file.
|
Copy contents of an IO_CACHE to a file.
|
||||||
|
|
||||||
SYNOPSIS
|
SYNOPSIS
|
||||||
copy_io_cache_to_file()
|
my_b_copy_to_file()
|
||||||
cache IO_CACHE to copy from
|
cache IO_CACHE to copy from
|
||||||
file File to copy to
|
file File to copy to
|
||||||
|
|
||||||
|
575
sql/log_event.cc
575
sql/log_event.cc
File diff suppressed because it is too large
Load Diff
@ -519,14 +519,30 @@ typedef struct st_print_event_info
|
|||||||
bzero(db, sizeof(db));
|
bzero(db, sizeof(db));
|
||||||
bzero(charset, sizeof(charset));
|
bzero(charset, sizeof(charset));
|
||||||
bzero(time_zone_str, sizeof(time_zone_str));
|
bzero(time_zone_str, sizeof(time_zone_str));
|
||||||
|
uint const flags = MYF(MY_WME | MY_NABP);
|
||||||
|
init_io_cache(&head_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags);
|
||||||
|
init_io_cache(&body_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
~st_print_event_info() {
|
||||||
|
end_io_cache(&head_cache);
|
||||||
|
end_io_cache(&body_cache);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Settings on how to print the events */
|
/* Settings on how to print the events */
|
||||||
bool short_form;
|
bool short_form;
|
||||||
bool base64_output;
|
bool base64_output;
|
||||||
my_off_t hexdump_from;
|
my_off_t hexdump_from;
|
||||||
uint8 common_header_len;
|
uint8 common_header_len;
|
||||||
|
|
||||||
|
/*
|
||||||
|
These two caches are used by the row-based replication events to
|
||||||
|
collect the header information and the main body of the events
|
||||||
|
making up a statement.
|
||||||
|
*/
|
||||||
|
IO_CACHE head_cache;
|
||||||
|
IO_CACHE body_cache;
|
||||||
} PRINT_EVENT_INFO;
|
} PRINT_EVENT_INFO;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
@ -637,9 +653,11 @@ public:
|
|||||||
const Format_description_log_event *description_event);
|
const Format_description_log_event *description_event);
|
||||||
/* print*() functions are used by mysqlbinlog */
|
/* print*() functions are used by mysqlbinlog */
|
||||||
virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
|
virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
|
||||||
void print_timestamp(FILE* file, time_t *ts = 0);
|
void print_timestamp(IO_CACHE* file, time_t *ts = 0);
|
||||||
void print_header(FILE* file, PRINT_EVENT_INFO* print_event_info);
|
void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
|
||||||
void print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info);
|
bool is_more);
|
||||||
|
void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
|
||||||
|
bool is_more);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static void *operator new(size_t size)
|
static void *operator new(size_t size)
|
||||||
@ -804,7 +822,7 @@ public:
|
|||||||
uint32 q_len_arg);
|
uint32 q_len_arg);
|
||||||
#endif /* HAVE_REPLICATION */
|
#endif /* HAVE_REPLICATION */
|
||||||
#else
|
#else
|
||||||
void print_query_header(FILE* file, PRINT_EVENT_INFO* print_event_info);
|
void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info);
|
||||||
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
|
void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
@ -1843,6 +1861,10 @@ protected:
|
|||||||
Log_event_type event_type,
|
Log_event_type event_type,
|
||||||
const Format_description_log_event *description_event);
|
const Format_description_log_event *description_event);
|
||||||
|
|
||||||
|
#ifdef MYSQL_CLIENT
|
||||||
|
void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name);
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifndef MYSQL_CLIENT
|
#ifndef MYSQL_CLIENT
|
||||||
virtual int do_add_row_data(byte *data, my_size_t length);
|
virtual int do_add_row_data(byte *data, my_size_t length);
|
||||||
#endif
|
#endif
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
|
|
||||||
void mysql_client_binlog_statement(THD* thd)
|
void mysql_client_binlog_statement(THD* thd)
|
||||||
{
|
{
|
||||||
|
DBUG_ENTER("mysql_client_binlog_statement");
|
||||||
DBUG_PRINT("info",("binlog base64: '%*s'",
|
DBUG_PRINT("info",("binlog base64: '%*s'",
|
||||||
(thd->lex->comment.length < 2048 ?
|
(thd->lex->comment.length < 2048 ?
|
||||||
thd->lex->comment.length : 2048),
|
thd->lex->comment.length : 2048),
|
||||||
@ -43,8 +44,8 @@ void mysql_client_binlog_statement(THD* thd)
|
|||||||
my_bool nsok= thd->net.no_send_ok;
|
my_bool nsok= thd->net.no_send_ok;
|
||||||
thd->net.no_send_ok= TRUE;
|
thd->net.no_send_ok= TRUE;
|
||||||
|
|
||||||
const my_size_t coded_len= thd->lex->comment.length + 1;
|
my_size_t coded_len= thd->lex->comment.length + 1;
|
||||||
const my_size_t event_len= base64_needed_decoded_length(coded_len);
|
my_size_t decoded_len= base64_needed_decoded_length(coded_len);
|
||||||
DBUG_ASSERT(coded_len > 0);
|
DBUG_ASSERT(coded_len > 0);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -57,9 +58,8 @@ void mysql_client_binlog_statement(THD* thd)
|
|||||||
new Format_description_log_event(4);
|
new Format_description_log_event(4);
|
||||||
|
|
||||||
const char *error= 0;
|
const char *error= 0;
|
||||||
char *buf= (char *) my_malloc(event_len, MYF(MY_WME));
|
char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME));
|
||||||
Log_event *ev = 0;
|
Log_event *ev = 0;
|
||||||
int res;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Out of memory check
|
Out of memory check
|
||||||
@ -73,43 +73,97 @@ void mysql_client_binlog_statement(THD* thd)
|
|||||||
thd->rli_fake->sql_thd= thd;
|
thd->rli_fake->sql_thd= thd;
|
||||||
thd->rli_fake->no_storage= TRUE;
|
thd->rli_fake->no_storage= TRUE;
|
||||||
|
|
||||||
res= base64_decode(thd->lex->comment.str, coded_len, buf);
|
for (char const *strptr= thd->lex->comment.str ;
|
||||||
|
strptr < thd->lex->comment.str + thd->lex->comment.length ; )
|
||||||
DBUG_PRINT("info",("binlog base64 decoded_len=%d, event_len=%d\n",
|
|
||||||
res, uint4korr(buf + EVENT_LEN_OFFSET)));
|
|
||||||
/*
|
|
||||||
Note that 'res' is the correct event length, 'event_len' was
|
|
||||||
calculated based on the base64-string that possibly contained
|
|
||||||
extra spaces, so it can be longer than the real event.
|
|
||||||
*/
|
|
||||||
if (res < EVENT_LEN_OFFSET
|
|
||||||
|| (uint) res != uint4korr(buf+EVENT_LEN_OFFSET))
|
|
||||||
{
|
{
|
||||||
my_error(ER_SYNTAX_ERROR, MYF(0));
|
char const *endptr= 0;
|
||||||
goto end;
|
int bytes_decoded= base64_decode(strptr, coded_len, buf, &endptr);
|
||||||
}
|
|
||||||
|
|
||||||
ev= Log_event::read_log_event(buf, res, &error, desc);
|
DBUG_PRINT("info",
|
||||||
|
("bytes_decoded=%d; strptr=0x%lu; endptr=0x%lu ('%c':%d)",
|
||||||
|
bytes_decoded, strptr, endptr, *endptr, *endptr));
|
||||||
|
|
||||||
|
if (bytes_decoded < 0)
|
||||||
|
{
|
||||||
|
my_error(ER_BASE64_DECODE_ERROR, MYF(0));
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
else if (bytes_decoded == 0)
|
||||||
|
break; // If no bytes where read, the string contained only whitespace
|
||||||
|
|
||||||
|
DBUG_ASSERT(bytes_decoded > 0);
|
||||||
|
DBUG_ASSERT(endptr > strptr);
|
||||||
|
coded_len-= endptr - strptr;
|
||||||
|
strptr= endptr;
|
||||||
|
|
||||||
DBUG_PRINT("info",("binlog base64 err=%s", error));
|
|
||||||
if (!ev)
|
|
||||||
{
|
|
||||||
/*
|
/*
|
||||||
This could actually be an out-of-memory, but it is more
|
Now we have one or more events stored in the buffer. The size of
|
||||||
likely causes by a bad statement
|
the buffer is computed based on how much base64-encoded data
|
||||||
|
there were, so there should be ample space for the data (maybe
|
||||||
|
even too much, since a statement can consist of a considerable
|
||||||
|
number of events).
|
||||||
|
|
||||||
|
TODO: Switch to use a stream-based base64 encoder/decoder in
|
||||||
|
order to be able to read exactly what is necessary.
|
||||||
*/
|
*/
|
||||||
my_error(ER_SYNTAX_ERROR, MYF(0));
|
|
||||||
goto end;
|
|
||||||
}
|
|
||||||
|
|
||||||
DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
|
DBUG_PRINT("info",("binlog base64 decoded_len=%d, bytes_decoded=%d",
|
||||||
DBUG_PRINT("info",("buf+EVENT_TYPE_OFFSET=%d", buf+EVENT_TYPE_OFFSET));
|
decoded_len, bytes_decoded));
|
||||||
|
|
||||||
ev->thd= thd;
|
/*
|
||||||
if (ev->exec_event(thd->rli_fake))
|
Now we start to read events of the buffer, until there are no
|
||||||
{
|
more.
|
||||||
my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement");
|
*/
|
||||||
goto end;
|
for (char *bufptr= buf ; bytes_decoded > 0 ; )
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
Checking that the first event in the buffer is not truncated.
|
||||||
|
*/
|
||||||
|
ulong event_len= uint4korr(bufptr + EVENT_LEN_OFFSET);
|
||||||
|
DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
|
||||||
|
event_len, bytes_decoded));
|
||||||
|
if (bytes_decoded < EVENT_LEN_OFFSET || (uint) bytes_decoded < event_len)
|
||||||
|
{
|
||||||
|
my_error(ER_SYNTAX_ERROR, MYF(0));
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
ev= Log_event::read_log_event(bufptr, event_len, &error, desc);
|
||||||
|
|
||||||
|
DBUG_PRINT("info",("binlog base64 err=%s", error));
|
||||||
|
if (!ev)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
This could actually be an out-of-memory, but it is more likely
|
||||||
|
causes by a bad statement
|
||||||
|
*/
|
||||||
|
my_error(ER_SYNTAX_ERROR, MYF(0));
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes_decoded -= event_len;
|
||||||
|
bufptr += event_len;
|
||||||
|
|
||||||
|
DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
|
||||||
|
DBUG_PRINT("info",("bufptr+EVENT_TYPE_OFFSET=0x%lx",
|
||||||
|
bufptr+EVENT_TYPE_OFFSET));
|
||||||
|
DBUG_PRINT("info", ("bytes_decoded=%d; bufptr=0x%lx; buf[EVENT_LEN_OFFSET]=%u",
|
||||||
|
bytes_decoded, bufptr, uint4korr(bufptr+EVENT_LEN_OFFSET)));
|
||||||
|
ev->thd= thd;
|
||||||
|
if (int err= ev->exec_event(thd->rli_fake))
|
||||||
|
{
|
||||||
|
DBUG_PRINT("info", ("exec_event() - error=%d", error));
|
||||||
|
/*
|
||||||
|
TODO: Maybe a better error message since the BINLOG statement
|
||||||
|
now contains several events.
|
||||||
|
*/
|
||||||
|
my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement");
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete ev;
|
||||||
|
ev= 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -126,10 +180,7 @@ end:
|
|||||||
*/
|
*/
|
||||||
thd->net.no_send_ok= nsok;
|
thd->net.no_send_ok= nsok;
|
||||||
|
|
||||||
if (ev)
|
delete desc;
|
||||||
delete ev;
|
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
|
||||||
if (desc)
|
DBUG_VOID_RETURN;
|
||||||
delete desc;
|
|
||||||
if (buf)
|
|
||||||
my_free(buf, MYF(0));
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user