MDEV-16329 [3/5] use binlog_cache_data directly in most places
* Eliminate most usages of THD::use_trans_table. Only 3 left, and they are at quite high levels, and really essential. * Eliminate is_transactional argument when possible. Lots of places are left though, because of some WSREP error handling in MYSQL_BIN_LOG::set_write_error. * Remove junk binlog functions from THD * binlog_prepare_pending_rows_event is moved to log.cc inside MYSQL_BIN_LOG and is not anymore template. Instead it accepls event factory with a type code, and a callback to a constructing function in it.
This commit is contained in:
parent
429f635f30
commit
6427e343cf
@ -7269,7 +7269,9 @@ int handler::binlog_log_row(TABLE *table,
|
||||
if (thd->variables.option_bits & OPTION_GTID_BEGIN)
|
||||
is_trans= 1;
|
||||
|
||||
bool error= (*log_func)(thd, table, &mysql_bin_log, cache_mngr,
|
||||
auto *cache= binlog_get_cache_data(cache_mngr, use_trans_cache(thd, is_trans));
|
||||
|
||||
bool error= (*log_func)(thd, table, &mysql_bin_log, cache,
|
||||
is_trans, before_record, after_record);
|
||||
DBUG_RETURN(error ? HA_ERR_RBR_LOGGING_FAILED : 0);
|
||||
}
|
||||
|
@ -656,8 +656,8 @@ given at all. */
|
||||
typedef ulonglong alter_table_operations;
|
||||
|
||||
class MYSQL_BIN_LOG;
|
||||
class binlog_cache_mngr;
|
||||
typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_mngr *, bool,
|
||||
class binlog_cache_data;
|
||||
typedef bool Log_func(THD*, TABLE*, MYSQL_BIN_LOG *, binlog_cache_data *, bool,
|
||||
const uchar*, const uchar*);
|
||||
|
||||
/*
|
||||
|
173
sql/log.cc
173
sql/log.cc
@ -2055,29 +2055,31 @@ static int
|
||||
binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
|
||||
{
|
||||
DBUG_ENTER("binlog_truncate_trx_cache");
|
||||
|
||||
if(!WSREP_EMULATE_BINLOG_NNULL(thd) && !mysql_bin_log.is_open())
|
||||
DBUG_RETURN(0);
|
||||
|
||||
int error=0;
|
||||
/*
|
||||
This function handles transactional changes and as such this flag
|
||||
equals to true.
|
||||
*/
|
||||
bool const is_transactional= TRUE;
|
||||
|
||||
DBUG_PRINT("info", ("thd->options={ %s %s}, transaction: %s",
|
||||
FLAGSTR(thd->variables.option_bits, OPTION_NOT_AUTOCOMMIT),
|
||||
FLAGSTR(thd->variables.option_bits, OPTION_BEGIN),
|
||||
all ? "all" : "stmt"));
|
||||
|
||||
thd->binlog_remove_pending_rows_event(TRUE, is_transactional);
|
||||
auto &trx_cache= cache_mngr->trx_cache;
|
||||
MYSQL_BIN_LOG::remove_pending_rows_event(thd, &trx_cache);
|
||||
thd->reset_binlog_for_next_statement();
|
||||
|
||||
/*
|
||||
If rolling back an entire transaction or a single statement not
|
||||
inside a transaction, we reset the transaction cache.
|
||||
*/
|
||||
if (ending_trans(thd, all))
|
||||
{
|
||||
if (cache_mngr->trx_cache.has_incident())
|
||||
if (trx_cache.has_incident())
|
||||
error= mysql_bin_log.write_incident(thd);
|
||||
|
||||
thd->reset_binlog_for_next_statement();
|
||||
DBUG_ASSERT(thd->binlog_table_maps == 0);
|
||||
|
||||
cache_mngr->reset(false, true);
|
||||
}
|
||||
@ -2086,9 +2088,9 @@ binlog_truncate_trx_cache(THD *thd, binlog_cache_mngr *cache_mngr, bool all)
|
||||
transaction cache to remove the statement.
|
||||
*/
|
||||
else
|
||||
cache_mngr->trx_cache.restore_prev_position();
|
||||
trx_cache.restore_prev_position();
|
||||
|
||||
DBUG_ASSERT(cache_mngr->trx_cache.pending() == NULL);
|
||||
DBUG_ASSERT(trx_cache.pending() == NULL);
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
@ -2406,7 +2408,7 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
||||
thd->reset_binlog_for_next_statement();
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
if (!wsrep_emulate_bin_log && mysql_bin_log.check_write_error(thd))
|
||||
if (!wsrep_emulate_bin_log && MYSQL_BIN_LOG::check_write_error(thd))
|
||||
{
|
||||
/*
|
||||
"all == true" means that a "rollback statement" triggered the error and
|
||||
@ -2458,12 +2460,13 @@ static int binlog_rollback(handlerton *hton, THD *thd, bool all)
|
||||
|
||||
void binlog_reset_cache(THD *thd)
|
||||
{
|
||||
binlog_cache_mngr *const cache_mngr= opt_bin_log ?
|
||||
binlog_cache_mngr *const cache_mngr= opt_bin_log ?
|
||||
thd->binlog_get_cache_mngr() : 0;
|
||||
DBUG_ENTER("binlog_reset_cache");
|
||||
if (cache_mngr)
|
||||
{
|
||||
thd->binlog_remove_pending_rows_event(TRUE, TRUE);
|
||||
MYSQL_BIN_LOG::remove_pending_rows_event(thd, &cache_mngr->trx_cache);
|
||||
thd->reset_binlog_for_next_statement();
|
||||
cache_mngr->reset(true, true);
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
@ -6318,30 +6321,31 @@ binlog_cache_mngr *THD::binlog_get_cache_mngr() const
|
||||
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
|
||||
bool use_trans_cache)
|
||||
{
|
||||
DBUG_ASSERT(cache_mngr);
|
||||
Rows_log_event* rows= NULL;
|
||||
|
||||
/*
|
||||
This is less than ideal, but here's the story: If there is no cache_mngr,
|
||||
prepare_pending_rows_event() has never been called (since the cache_mngr
|
||||
is set up there). In that case, we just return NULL.
|
||||
*/
|
||||
if (cache_mngr)
|
||||
rows= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
|
||||
return rows;
|
||||
}
|
||||
|
||||
binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
|
||||
bool use_trans_cache)
|
||||
{
|
||||
return cache_mngr->get_binlog_cache_data(use_trans_cache);
|
||||
}
|
||||
|
||||
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
|
||||
bool is_transactional,
|
||||
MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr,
|
||||
bool use_trans_cache)
|
||||
binlog_cache_data *cache_data)
|
||||
{
|
||||
/*
|
||||
Mark the event as the last event of a statement if the stmt_end
|
||||
flag is set.
|
||||
*/
|
||||
int error= 0;
|
||||
auto *pending= cache_mngr->get_binlog_cache_data(use_trans_cache)->pending();
|
||||
auto *pending= cache_data->pending();
|
||||
if (pending)
|
||||
{
|
||||
if (stmt_end)
|
||||
@ -6350,36 +6354,12 @@ int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
|
||||
thd->reset_binlog_for_next_statement();
|
||||
}
|
||||
|
||||
error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_mngr,
|
||||
error= bin_log->flush_and_set_pending_rows_event(thd, 0, cache_data,
|
||||
is_transactional);
|
||||
}
|
||||
return error;
|
||||
}
|
||||
|
||||
/**
|
||||
This function stores a pending row event into a cache which is specified
|
||||
through the parameter @c is_transactional. Respectively, when it is @c
|
||||
true, the pending event is stored into the transactional cache. Otherwise
|
||||
into the non-transactional cache.
|
||||
|
||||
@param evt a pointer to the row event.
|
||||
@param use_trans_cache @c true indicates a transactional cache,
|
||||
otherwise @c false a non-transactional.
|
||||
*/
|
||||
void
|
||||
THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
|
||||
{
|
||||
binlog_cache_mngr *const cache_mngr= binlog_setup_trx_data();
|
||||
|
||||
DBUG_ASSERT(cache_mngr);
|
||||
|
||||
binlog_cache_data *cache_data=
|
||||
cache_mngr->get_binlog_cache_data(use_trans_cache);
|
||||
|
||||
cache_data->set_pending(ev);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
This function removes the pending rows event, discarding any outstanding
|
||||
rows. If there is no pending rows event available, this is effectively a
|
||||
@ -6390,17 +6370,10 @@ THD::binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache)
|
||||
otherwise @c false a non-transactional.
|
||||
*/
|
||||
int
|
||||
MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
|
||||
MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data)
|
||||
{
|
||||
DBUG_ENTER("MYSQL_BIN_LOG::remove_pending_rows_event");
|
||||
|
||||
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
|
||||
|
||||
DBUG_ASSERT(cache_mngr);
|
||||
|
||||
binlog_cache_data *cache_data=
|
||||
cache_mngr->get_binlog_cache_data(use_trans_cache(thd, is_transactional));
|
||||
|
||||
if (Rows_log_event* pending= cache_data->pending())
|
||||
{
|
||||
delete pending;
|
||||
@ -6414,6 +6387,7 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
|
||||
Moves the last bunch of rows from the pending Rows event to a cache (either
|
||||
transactional cache if is_transaction is @c true, or the non-transactional
|
||||
cache otherwise. Sets a new pending event.
|
||||
In case of error during flushing, sets write_error=1 to itself.
|
||||
|
||||
@param thd a pointer to the user thread.
|
||||
@param evt a pointer to the row event.
|
||||
@ -6423,19 +6397,13 @@ MYSQL_BIN_LOG::remove_pending_rows_event(THD *thd, bool is_transactional)
|
||||
int
|
||||
MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
|
||||
Rows_log_event* event,
|
||||
binlog_cache_mngr *cache_mngr,
|
||||
binlog_cache_data *cache_data,
|
||||
bool is_transactional)
|
||||
{
|
||||
DBUG_ENTER("MYSQL_BIN_LOG::flush_and_set_pending_rows_event(event)");
|
||||
DBUG_ASSERT(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open());
|
||||
DBUG_PRINT("enter", ("event: %p", event));
|
||||
|
||||
DBUG_ASSERT(cache_mngr);
|
||||
|
||||
bool should_use_trans_cache= use_trans_cache(thd, is_transactional);
|
||||
binlog_cache_data *cache_data=
|
||||
cache_mngr->get_binlog_cache_data(should_use_trans_cache);
|
||||
|
||||
DBUG_PRINT("info", ("cache_mngr->pending(): %p", cache_data->pending()));
|
||||
|
||||
if (Rows_log_event* pending= cache_data->pending())
|
||||
@ -6463,11 +6431,89 @@ MYSQL_BIN_LOG::flush_and_set_pending_rows_event(THD *thd,
|
||||
delete pending;
|
||||
}
|
||||
|
||||
thd->binlog_set_pending_rows_event(event, should_use_trans_cache);
|
||||
cache_data->set_pending(event);
|
||||
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
/*
|
||||
Member function for ensuring that there is an rows log
|
||||
event of the apropriate type before proceeding.
|
||||
|
||||
POST CONDITION:
|
||||
If a non-NULL pointer is returned, the pending event for thread 'thd' will
|
||||
be an event created by callback hold by event_factory, and
|
||||
will be either empty or have enough space to hold 'needed' bytes.
|
||||
In addition, the columns bitmap will be correct for the row, meaning that
|
||||
the pending event will be flushed if the columns in the event differ from
|
||||
the columns suppled to the function.
|
||||
|
||||
RETURNS
|
||||
If no error, a non-NULL pending event (either one which already existed or
|
||||
the newly created one).
|
||||
If error, NULL.
|
||||
*/
|
||||
|
||||
Rows_log_event*
|
||||
MYSQL_BIN_LOG::prepare_pending_rows_event(THD *thd, TABLE* table,
|
||||
binlog_cache_data *cache_data,
|
||||
uint32 serv_id, size_t needed,
|
||||
bool is_transactional,
|
||||
Rows_event_factory event_factory)
|
||||
{
|
||||
DBUG_ENTER("MYSQL_BIN_LOG::prepare_pending_rows_event");
|
||||
/* Pre-conditions */
|
||||
DBUG_ASSERT(table->s->table_map_id != ~0UL);
|
||||
|
||||
/*
|
||||
There is no good place to set up the transactional data, so we
|
||||
have to do it here.
|
||||
*/
|
||||
Rows_log_event* pending= cache_data->pending();
|
||||
|
||||
if (unlikely(pending && !pending->is_valid()))
|
||||
DBUG_RETURN(NULL);
|
||||
|
||||
/*
|
||||
Check if the current event is non-NULL and a write-rows
|
||||
event. Also check if the table provided is mapped: if it is not,
|
||||
then we have switched to writing to a new table.
|
||||
If there is no pending event, we need to create one. If there is a pending
|
||||
event, but it's not about the same table id, or not of the same type
|
||||
(between Write, Update and Delete), or not the same affected columns, or
|
||||
going to be too big, flush this event to disk and create a new pending
|
||||
event.
|
||||
*/
|
||||
if (!pending ||
|
||||
pending->server_id != serv_id ||
|
||||
pending->get_table_id() != table->s->table_map_id ||
|
||||
pending->get_general_type_code() != event_factory.type_code ||
|
||||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
|
||||
pending->read_write_bitmaps_cmp(table) == FALSE)
|
||||
{
|
||||
/* Create a new RowsEventT... */
|
||||
Rows_log_event* const
|
||||
ev= event_factory.create(thd, table, table->s->table_map_id,
|
||||
is_transactional);
|
||||
if (unlikely(!ev))
|
||||
DBUG_RETURN(NULL);
|
||||
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
|
||||
/*
|
||||
flush the pending event and replace it with the newly created
|
||||
event...
|
||||
*/
|
||||
if (unlikely(flush_and_set_pending_rows_event(thd, ev, cache_data,
|
||||
is_transactional)))
|
||||
{
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
DBUG_RETURN(ev); /* This is the new pending event */
|
||||
}
|
||||
DBUG_RETURN(pending); /* This is the current pending event */
|
||||
}
|
||||
|
||||
|
||||
/* Generate a new global transaction ID, and write it to the binlog */
|
||||
|
||||
@ -12108,7 +12154,8 @@ void wsrep_thd_binlog_stmt_rollback(THD * thd)
|
||||
binlog_cache_mngr *const cache_mngr= thd->binlog_get_cache_mngr();
|
||||
if (cache_mngr)
|
||||
{
|
||||
thd->binlog_remove_pending_rows_event(TRUE, TRUE);
|
||||
MYSQL_BIN_LOG::remove_pending_rows_event(thd, &cache_mngr->trx_cache);
|
||||
thd->reset_binlog_for_next_statement();
|
||||
cache_mngr->stmt_cache.reset();
|
||||
}
|
||||
DBUG_VOID_RETURN;
|
||||
|
44
sql/log.h
44
sql/log.h
@ -350,6 +350,31 @@ public:
|
||||
enum cache_type io_cache_type_arg);
|
||||
};
|
||||
|
||||
/**
|
||||
@struct Rows_event_factory
|
||||
|
||||
Holds an event type code and a callback function to create it.
|
||||
Should be created by Rows_event_factory::get.
|
||||
*/
|
||||
struct Rows_event_factory
|
||||
{
|
||||
int type_code;
|
||||
|
||||
Rows_log_event *(*create)(THD*, TABLE*, ulong, bool is_transactional);
|
||||
|
||||
template<class RowsEventT>
|
||||
static Rows_event_factory get()
|
||||
{
|
||||
return { RowsEventT::TYPE_CODE,
|
||||
[](THD* thd, TABLE* table, ulong flags, bool is_transactional)
|
||||
-> Rows_log_event*
|
||||
{
|
||||
return new RowsEventT(thd, table, flags, is_transactional);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
/* Tell the io thread if we can delay the master info sync. */
|
||||
#define SEMI_SYNC_SLAVE_DELAY_SYNC 1
|
||||
/* Tell the io thread if the current event needs a ack. */
|
||||
@ -419,6 +444,7 @@ private:
|
||||
#define BINLOG_COOKIE_IS_DUMMY(c) \
|
||||
( ((ulong)(c)>>1) == BINLOG_COOKIE_DUMMY_ID )
|
||||
|
||||
|
||||
class binlog_cache_mngr;
|
||||
class binlog_cache_data;
|
||||
struct rpl_gtid;
|
||||
@ -723,11 +749,18 @@ public:
|
||||
Format_description_log_event *fdle, bool do_xa);
|
||||
int do_binlog_recovery(const char *opt_name, bool do_xa_recovery);
|
||||
#if !defined(MYSQL_CLIENT)
|
||||
Rows_log_event*
|
||||
prepare_pending_rows_event(THD *thd, TABLE* table,
|
||||
binlog_cache_data *cache_data,
|
||||
uint32 serv_id, size_t needed,
|
||||
bool is_transactional,
|
||||
Rows_event_factory event_factory);
|
||||
|
||||
int flush_and_set_pending_rows_event(THD *thd, Rows_log_event* event,
|
||||
binlog_cache_mngr *cache_mngr,
|
||||
binlog_cache_data *cache_data,
|
||||
bool is_transactional);
|
||||
int remove_pending_rows_event(THD *thd, bool is_transactional);
|
||||
|
||||
static int remove_pending_rows_event(THD *thd, binlog_cache_data *cache_data);
|
||||
|
||||
#endif /* !defined(MYSQL_CLIENT) */
|
||||
void reset_bytes_written()
|
||||
@ -824,7 +857,7 @@ public:
|
||||
void write_binlog_checkpoint_event_already_locked(const char *name, uint len);
|
||||
int write_cache(THD *thd, IO_CACHE *cache);
|
||||
void set_write_error(THD *thd, bool is_transactional);
|
||||
bool check_write_error(THD *thd);
|
||||
static bool check_write_error(THD *thd);
|
||||
|
||||
void start_union_events(THD *thd, query_id_t query_id_param);
|
||||
void stop_union_events(THD *thd);
|
||||
@ -1179,10 +1212,11 @@ bool write_annotated_row(THD *thd);
|
||||
int binlog_flush_pending_rows_event(THD *thd, bool stmt_end,
|
||||
bool is_transactional,
|
||||
MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr,
|
||||
bool use_trans_cache);
|
||||
binlog_cache_data *cache_data);
|
||||
Rows_log_event* binlog_get_pending_rows_event(binlog_cache_mngr *cache_mngr,
|
||||
bool use_trans_cache);
|
||||
binlog_cache_data* binlog_get_cache_data(binlog_cache_mngr *cache_mngr,
|
||||
bool use_trans_cache);
|
||||
|
||||
extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log;
|
||||
extern handlerton *binlog_hton;
|
||||
|
@ -4927,14 +4927,14 @@ public:
|
||||
#if defined(MYSQL_SERVER)
|
||||
static bool binlog_row_logging_function(THD *thd, TABLE *table,
|
||||
MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr,
|
||||
binlog_cache_data *cache_data,
|
||||
bool is_transactional,
|
||||
const uchar *before_record
|
||||
__attribute__((unused)),
|
||||
const uchar *after_record)
|
||||
{
|
||||
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
|
||||
return thd->binlog_write_row(table, bin_log, cache_mngr, is_transactional,
|
||||
return thd->binlog_write_row(table, bin_log, cache_data, is_transactional,
|
||||
after_record);
|
||||
}
|
||||
#endif
|
||||
@ -5013,13 +5013,13 @@ public:
|
||||
#ifdef MYSQL_SERVER
|
||||
static bool binlog_row_logging_function(THD *thd, TABLE *table,
|
||||
MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr,
|
||||
binlog_cache_data *cache_data,
|
||||
bool is_transactional,
|
||||
const uchar *before_record,
|
||||
const uchar *after_record)
|
||||
{
|
||||
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
|
||||
return thd->binlog_update_row(table, bin_log, cache_mngr, is_transactional,
|
||||
return thd->binlog_update_row(table, bin_log, cache_data, is_transactional,
|
||||
before_record, after_record);
|
||||
}
|
||||
#endif
|
||||
@ -5104,14 +5104,14 @@ public:
|
||||
#ifdef MYSQL_SERVER
|
||||
static bool binlog_row_logging_function(THD *thd, TABLE *table,
|
||||
MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr,
|
||||
binlog_cache_data *cache_data,
|
||||
bool is_transactional,
|
||||
const uchar *before_record,
|
||||
const uchar *after_record
|
||||
__attribute__((unused)))
|
||||
{
|
||||
DBUG_ASSERT(!table->versioned(VERS_TRX_ID));
|
||||
return thd->binlog_delete_row(table, bin_log, cache_mngr, is_transactional,
|
||||
return thd->binlog_delete_row(table, bin_log, cache_data, is_transactional,
|
||||
before_record);
|
||||
}
|
||||
#endif
|
||||
|
178
sql/sql_class.cc
178
sql/sql_class.cc
@ -6899,116 +6899,6 @@ bool THD::binlog_table_should_be_logged(const LEX_CSTRING *db)
|
||||
binlog_filter->db_ok(db->str)));
|
||||
}
|
||||
|
||||
struct RowsEventFactory
|
||||
{
|
||||
int type_code;
|
||||
|
||||
Rows_log_event *(*create)(THD*, TABLE*, ulong, bool is_transactional);
|
||||
};
|
||||
|
||||
/**
|
||||
Creates RowsEventFactory, responsible for creating Rows_log_event descendant.
|
||||
@tparam RowsEventT is a type which will be constructed by
|
||||
RowsEventFactory::create.
|
||||
@return a RowsEventFactory object with type_code equal to RowsEventT::TYPE_CODE
|
||||
and create containing pointer to a RowsEventT constructor callback.
|
||||
*/
|
||||
template<class RowsEventT>
|
||||
static RowsEventFactory binlog_get_rows_event_creator()
|
||||
{
|
||||
return { RowsEventT::TYPE_CODE,
|
||||
[](THD* thd, TABLE* table, ulong flags, bool is_transactional)
|
||||
-> Rows_log_event*
|
||||
{
|
||||
return new RowsEventT(thd, table, flags, is_transactional);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/*
|
||||
Template member function for ensuring that there is an rows log
|
||||
event of the apropriate type before proceeding.
|
||||
|
||||
PRE CONDITION:
|
||||
- Events of type 'RowEventT' have the type code 'type_code'.
|
||||
|
||||
POST CONDITION:
|
||||
If a non-NULL pointer is returned, the pending event for thread 'thd' will
|
||||
be an event of type 'RowEventT' (which have the type code 'type_code')
|
||||
will either empty or have enough space to hold 'needed' bytes. In
|
||||
addition, the columns bitmap will be correct for the row, meaning that
|
||||
the pending event will be flushed if the columns in the event differ from
|
||||
the columns suppled to the function.
|
||||
|
||||
RETURNS
|
||||
If no error, a non-NULL pending event (either one which already existed or
|
||||
the newly created one).
|
||||
If error, NULL.
|
||||
*/
|
||||
|
||||
Rows_log_event*
|
||||
binlog_prepare_pending_rows_event(THD *thd, TABLE* table,
|
||||
MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr,
|
||||
uint32 serv_id, size_t needed,
|
||||
bool is_transactional,
|
||||
RowsEventFactory event_factory)
|
||||
{
|
||||
DBUG_ENTER("binlog_prepare_pending_rows_event");
|
||||
/* Pre-conditions */
|
||||
DBUG_ASSERT(table->s->table_map_id != ~0UL);
|
||||
|
||||
/*
|
||||
There is no good place to set up the transactional data, so we
|
||||
have to do it here.
|
||||
*/
|
||||
Rows_log_event* pending= binlog_get_pending_rows_event(cache_mngr,
|
||||
use_trans_cache(thd,
|
||||
is_transactional));
|
||||
|
||||
if (unlikely(pending && !pending->is_valid()))
|
||||
DBUG_RETURN(NULL);
|
||||
|
||||
/*
|
||||
Check if the current event is non-NULL and a write-rows
|
||||
event. Also check if the table provided is mapped: if it is not,
|
||||
then we have switched to writing to a new table.
|
||||
If there is no pending event, we need to create one. If there is a pending
|
||||
event, but it's not about the same table id, or not of the same type
|
||||
(between Write, Update and Delete), or not the same affected columns, or
|
||||
going to be too big, flush this event to disk and create a new pending
|
||||
event.
|
||||
*/
|
||||
if (!pending ||
|
||||
pending->server_id != serv_id ||
|
||||
pending->get_table_id() != table->s->table_map_id ||
|
||||
pending->get_general_type_code() != event_factory.type_code ||
|
||||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
|
||||
pending->read_write_bitmaps_cmp(table) == FALSE)
|
||||
{
|
||||
/* Create a new RowsEventT... */
|
||||
Rows_log_event* const
|
||||
ev= event_factory.create(thd, table, table->s->table_map_id,
|
||||
is_transactional);
|
||||
if (unlikely(!ev))
|
||||
DBUG_RETURN(NULL);
|
||||
ev->server_id= serv_id; // I don't like this, it's too easy to forget.
|
||||
/*
|
||||
flush the pending event and replace it with the newly created
|
||||
event...
|
||||
*/
|
||||
if (unlikely(bin_log->flush_and_set_pending_rows_event(thd, ev, cache_mngr,
|
||||
is_transactional)))
|
||||
{
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
DBUG_RETURN(ev); /* This is the new pending event */
|
||||
}
|
||||
DBUG_RETURN(pending); /* This is the current pending event */
|
||||
}
|
||||
|
||||
/* Declare in unnamed namespace. */
|
||||
CPP_UNNAMED_NS_START
|
||||
/**
|
||||
@ -7134,7 +7024,7 @@ CPP_UNNAMED_NS_START
|
||||
CPP_UNNAMED_NS_END
|
||||
|
||||
int THD::binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr, bool is_trans,
|
||||
binlog_cache_data *cache_data, bool is_trans,
|
||||
uchar const *record)
|
||||
{
|
||||
/*
|
||||
@ -7151,13 +7041,12 @@ int THD::binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
size_t const len= pack_row(table, table->rpl_write_set, row_data, record);
|
||||
|
||||
auto creator= binlog_should_compress(len) ?
|
||||
binlog_get_rows_event_creator<Write_rows_compressed_log_event>() :
|
||||
binlog_get_rows_event_creator<Write_rows_log_event>();
|
||||
Rows_event_factory::get<Write_rows_compressed_log_event>() :
|
||||
Rows_event_factory::get<Write_rows_log_event>();
|
||||
|
||||
auto *ev= binlog_prepare_pending_rows_event(this, table,
|
||||
&mysql_bin_log, cache_mngr,
|
||||
variables.server_id,
|
||||
len, is_trans, creator);
|
||||
auto *ev= bin_log->prepare_pending_rows_event(this, table, cache_data,
|
||||
variables.server_id,
|
||||
len, is_trans, creator);
|
||||
|
||||
if (unlikely(ev == 0))
|
||||
return HA_ERR_OUT_OF_MEM;
|
||||
@ -7166,7 +7055,7 @@ int THD::binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
}
|
||||
|
||||
int THD::binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr, bool is_trans,
|
||||
binlog_cache_data *cache_data, bool is_trans,
|
||||
const uchar *before_record,
|
||||
const uchar *after_record)
|
||||
{
|
||||
@ -7214,13 +7103,12 @@ int THD::binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
#endif
|
||||
|
||||
auto creator= binlog_should_compress(before_size + after_size) ?
|
||||
binlog_get_rows_event_creator<Update_rows_compressed_log_event>() :
|
||||
binlog_get_rows_event_creator<Update_rows_log_event>();
|
||||
auto *ev= binlog_prepare_pending_rows_event(this, table,
|
||||
&mysql_bin_log, cache_mngr,
|
||||
variables.server_id,
|
||||
before_size + after_size,
|
||||
is_trans, creator);
|
||||
Rows_event_factory::get<Update_rows_compressed_log_event>() :
|
||||
Rows_event_factory::get<Update_rows_log_event>();
|
||||
auto *ev= bin_log->prepare_pending_rows_event(this, table, cache_data,
|
||||
variables.server_id,
|
||||
before_size + after_size,
|
||||
is_trans, creator);
|
||||
|
||||
if (unlikely(ev == 0))
|
||||
return HA_ERR_OUT_OF_MEM;
|
||||
@ -7236,7 +7124,7 @@ int THD::binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
}
|
||||
|
||||
int THD::binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr, bool is_trans,
|
||||
binlog_cache_data *cache_data, bool is_trans,
|
||||
uchar const *record)
|
||||
{
|
||||
/**
|
||||
@ -7270,12 +7158,11 @@ int THD::binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
size_t const len= pack_row(table, table->read_set, row_data, record);
|
||||
|
||||
auto creator= binlog_should_compress(len) ?
|
||||
binlog_get_rows_event_creator<Delete_rows_compressed_log_event>() :
|
||||
binlog_get_rows_event_creator<Delete_rows_log_event>();
|
||||
auto *ev= binlog_prepare_pending_rows_event(this, table,
|
||||
&mysql_bin_log, cache_mngr,
|
||||
variables.server_id,
|
||||
len, is_trans, creator);
|
||||
Rows_event_factory::get<Delete_rows_compressed_log_event>() :
|
||||
Rows_event_factory::get<Delete_rows_log_event>();
|
||||
auto *ev= mysql_bin_log.prepare_pending_rows_event(this, table, cache_data,
|
||||
variables.server_id,
|
||||
len, is_trans, creator);
|
||||
|
||||
if (unlikely(ev == 0))
|
||||
return HA_ERR_OUT_OF_MEM;
|
||||
@ -7355,28 +7242,6 @@ void THD::binlog_prepare_row_images(TABLE *table)
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int THD::binlog_remove_pending_rows_event(bool reset_stmt,
|
||||
bool is_transactional)
|
||||
{
|
||||
DBUG_ENTER("THD::binlog_remove_pending_rows_event");
|
||||
|
||||
if(!WSREP_EMULATE_BINLOG_NNULL(this) && !mysql_bin_log.is_open())
|
||||
DBUG_RETURN(0);
|
||||
|
||||
/* Ensure that all events in a GTID group are in the same cache */
|
||||
if (variables.option_bits & OPTION_GTID_BEGIN)
|
||||
is_transactional= 1;
|
||||
|
||||
mysql_bin_log.remove_pending_rows_event(this, is_transactional);
|
||||
|
||||
if (reset_stmt)
|
||||
reset_binlog_for_next_statement();
|
||||
DBUG_RETURN(0);
|
||||
}
|
||||
|
||||
|
||||
int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
|
||||
{
|
||||
DBUG_ENTER("THD::binlog_flush_pending_rows_event");
|
||||
@ -7395,11 +7260,12 @@ int THD::binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional)
|
||||
auto *cache_mngr= binlog_get_cache_mngr();
|
||||
if (!cache_mngr)
|
||||
DBUG_RETURN(0);
|
||||
auto *cache= binlog_get_cache_data(cache_mngr,
|
||||
use_trans_cache(this, is_transactional));
|
||||
|
||||
int error=
|
||||
::binlog_flush_pending_rows_event(this, stmt_end, is_transactional,
|
||||
&mysql_bin_log, cache_mngr,
|
||||
use_trans_cache(this, is_transactional));
|
||||
&mysql_bin_log, cache);
|
||||
DBUG_RETURN(error);
|
||||
}
|
||||
|
||||
|
@ -2947,13 +2947,13 @@ public:
|
||||
void binlog_start_trans_and_stmt();
|
||||
void binlog_set_stmt_begin();
|
||||
int binlog_write_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr, bool is_transactional,
|
||||
binlog_cache_data *cache_data, bool is_transactional,
|
||||
const uchar *buf);
|
||||
int binlog_delete_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr, bool is_transactional,
|
||||
binlog_cache_data *cache_data, bool is_transactional,
|
||||
const uchar *buf);
|
||||
int binlog_update_row(TABLE* table, MYSQL_BIN_LOG *bin_log,
|
||||
binlog_cache_mngr *cache_mngr, bool is_transactional,
|
||||
binlog_cache_data *cache_data, bool is_transactional,
|
||||
const uchar *old_data, const uchar *new_data);
|
||||
bool prepare_handlers_for_update(uint flag);
|
||||
bool binlog_write_annotated_row(Log_event_writer *writer);
|
||||
@ -2968,14 +2968,12 @@ public:
|
||||
Member functions to handle pending event for row-level logging.
|
||||
*/
|
||||
binlog_cache_mngr *binlog_get_cache_mngr() const;
|
||||
void binlog_set_pending_rows_event(Rows_log_event* ev, bool use_trans_cache);
|
||||
inline int binlog_flush_pending_rows_event(bool stmt_end)
|
||||
{
|
||||
return (binlog_flush_pending_rows_event(stmt_end, FALSE) ||
|
||||
binlog_flush_pending_rows_event(stmt_end, TRUE));
|
||||
}
|
||||
int binlog_flush_pending_rows_event(bool stmt_end, bool is_transactional);
|
||||
int binlog_remove_pending_rows_event(bool clear_maps, bool is_transactional);
|
||||
|
||||
bool binlog_need_stmt_format(bool is_transactional) const
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user