perfschema table io instrumentation related changes

This commit is contained in:
Sergei Golubchik 2020-02-14 16:40:49 +01:00
parent cea187e349
commit 0d837e8153
7 changed files with 209 additions and 33 deletions

View File

@ -2702,6 +2702,30 @@ void handler::rebind_psi()
}
void handler::start_psi_batch_mode()
{
#ifdef HAVE_PSI_TABLE_INTERFACE
DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_NONE);
DBUG_ASSERT(m_psi_locker == NULL);
m_psi_batch_mode= PSI_BATCH_MODE_STARTING;
m_psi_numrows= 0;
#endif
}
void handler::end_psi_batch_mode()
{
#ifdef HAVE_PSI_TABLE_INTERFACE
DBUG_ASSERT(m_psi_batch_mode != PSI_BATCH_MODE_NONE);
if (m_psi_locker != NULL)
{
DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_STARTED);
PSI_TABLE_CALL(end_table_io_wait)(m_psi_locker, m_psi_numrows);
m_psi_locker= NULL;
}
m_psi_batch_mode= PSI_BATCH_MODE_NONE;
#endif
}
PSI_table_share *handler::ha_table_share_psi() const
{
return table_share->m_psi;
@ -2791,8 +2815,10 @@ int handler::ha_close(void)
*/
if (table->in_use)
status_var_add(table->in_use->status_var.rows_tmp_read, rows_tmp_read);
PSI_CALL_close_table(m_psi);
PSI_CALL_close_table(table_share, m_psi);
m_psi= NULL; /* instrumentation handle, invalid after close_table() */
DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_NONE);
DBUG_ASSERT(m_psi_locker == NULL);
/* Detach from ANALYZE tracker */
tracker= NULL;
@ -2813,7 +2839,7 @@ int handler::ha_rnd_next(uchar *buf)
do
{
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, MAX_KEY, result,
{ result= rnd_next(buf); })
if (result != HA_ERR_RECORD_DELETED)
break;
@ -2845,7 +2871,7 @@ int handler::ha_rnd_pos(uchar *buf, uchar *pos)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited == RND);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, MAX_KEY, result,
{ result= rnd_pos(buf, pos); })
increment_statistics(&SSV::ha_read_rnd_count);
if (result == HA_ERR_RECORD_DELETED)
@ -2870,7 +2896,7 @@ int handler::ha_index_read_map(uchar *buf, const uchar *key,
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result,
{ result= index_read_map(buf, key, keypart_map, find_flag); })
increment_statistics(&SSV::ha_read_key_count);
if (!result)
@ -2898,7 +2924,7 @@ int handler::ha_index_read_idx_map(uchar *buf, uint index, const uchar *key,
DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE ||
m_lock_type != F_UNLCK);
DBUG_ASSERT(end_range == NULL);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, index, result,
{ result= index_read_idx_map(buf, index, key, keypart_map, find_flag); })
increment_statistics(&SSV::ha_read_key_count);
if (!result)
@ -2920,7 +2946,7 @@ int handler::ha_index_next(uchar * buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result,
{ result= index_next(buf); })
increment_statistics(&SSV::ha_read_next_count);
if (!result)
@ -2941,7 +2967,7 @@ int handler::ha_index_prev(uchar * buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result,
{ result= index_prev(buf); })
increment_statistics(&SSV::ha_read_prev_count);
if (!result)
@ -2961,7 +2987,7 @@ int handler::ha_index_first(uchar * buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result,
{ result= index_first(buf); })
increment_statistics(&SSV::ha_read_first_count);
if (!result)
@ -2981,7 +3007,7 @@ int handler::ha_index_last(uchar * buf)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result,
{ result= index_last(buf); })
increment_statistics(&SSV::ha_read_last_count);
if (!result)
@ -3001,7 +3027,7 @@ int handler::ha_index_next_same(uchar *buf, const uchar *key, uint keylen)
m_lock_type != F_UNLCK);
DBUG_ASSERT(inited==INDEX);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result,
{ result= index_next_same(buf, key, keylen); })
increment_statistics(&SSV::ha_read_next_count);
if (!result)
@ -6390,7 +6416,7 @@ int handler::ha_external_lock(THD *thd, int lock_type)
We cache the table flags if the locking succeeded. Otherwise, we
keep them as they were when they were fetched in ha_open().
*/
MYSQL_TABLE_LOCK_WAIT(m_psi, PSI_TABLE_EXTERNAL_LOCK, lock_type,
MYSQL_TABLE_LOCK_WAIT(PSI_TABLE_EXTERNAL_LOCK, lock_type,
{ error= external_lock(thd, lock_type); })
DBUG_EXECUTE_IF("external_lock_failure", error= HA_ERR_GENERIC;);
@ -6649,13 +6675,13 @@ int handler::ha_write_row(const uchar *buf)
if (table->s->long_unique_table && this == table->file)
{
if (this->inited == RND)
if (inited == RND)
table->clone_handler_for_update();
handler *h= table->update_handler ? table->update_handler : table->file;
if ((error= check_duplicate_long_entries(table, h, buf)))
DBUG_RETURN(error);
}
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_WRITE_ROW, MAX_KEY, error,
{ error= write_row(buf); })
MYSQL_INSERT_ROW_DONE(error);
@ -6700,7 +6726,7 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data)
return error;
}
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error,
{ error= update_row(old_data, new_data);})
MYSQL_UPDATE_ROW_DONE(error);
@ -6763,7 +6789,7 @@ int handler::ha_delete_row(const uchar *buf)
mark_trx_read_write();
increment_statistics(&SSV::ha_delete_count);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_DELETE_ROW, active_index, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_DELETE_ROW, active_index, error,
{ error= delete_row(buf);})
MYSQL_DELETE_ROW_DONE(error);
if (likely(!error))

View File

@ -3115,8 +3115,59 @@ public:
*/
PSI_table *m_psi;
private:
/** Internal state of the batch instrumentation. */
enum batch_mode_t
{
/** Batch mode not used. */
PSI_BATCH_MODE_NONE,
/** Batch mode used, before first table io. */
PSI_BATCH_MODE_STARTING,
/** Batch mode used, after first table io. */
PSI_BATCH_MODE_STARTED
};
/**
Batch mode state.
@sa start_psi_batch_mode.
@sa end_psi_batch_mode.
*/
batch_mode_t m_psi_batch_mode;
/**
The number of rows in the batch.
@sa start_psi_batch_mode.
@sa end_psi_batch_mode.
*/
ulonglong m_psi_numrows;
/**
The current event in a batch.
@sa start_psi_batch_mode.
@sa end_psi_batch_mode.
*/
PSI_table_locker *m_psi_locker;
/**
Storage for the event in a batch.
@sa start_psi_batch_mode.
@sa end_psi_batch_mode.
*/
PSI_table_locker_state m_psi_locker_state;
public:
virtual void unbind_psi();
virtual void rebind_psi();
/**
Put the handler in 'batch' mode when collecting
table io instrumented events.
When operating in batch mode:
- a single start event is generated in the performance schema.
- all table io performed between @c start_psi_batch_mode
and @c end_psi_batch_mode is not instrumented:
the number of rows affected is counted instead in @c m_psi_numrows.
- a single end event is generated in the performance schema
when the batch mode ends with @c end_psi_batch_mode.
*/
void start_psi_batch_mode();
/** End a batch started with @c start_psi_batch_mode. */
void end_psi_batch_mode();
bool set_top_table_fields;
struct TABLE *top_table;
@ -3163,7 +3214,11 @@ public:
pushed_rowid_filter(NULL),
rowid_filter_is_active(0),
auto_inc_intervals_count(0),
m_psi(NULL), set_top_table_fields(FALSE), top_table(0),
m_psi(NULL),
m_psi_batch_mode(PSI_BATCH_MODE_NONE),
m_psi_numrows(0),
m_psi_locker(NULL),
set_top_table_fields(FALSE), top_table(0),
top_table_field(0), top_table_fields(0),
m_lock_type(F_UNLCK), ha_share(NULL), m_prev_insert_id(0)
{
@ -4036,11 +4091,10 @@ public:
virtual my_bool register_query_cache_table(THD *thd, const char *table_key,
uint key_length,
qc_engine_callback
*engine_callback,
qc_engine_callback *callback,
ulonglong *engine_data)
{
*engine_callback= 0;
*callback= 0;
return TRUE;
}
@ -5007,7 +5061,8 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal);
#endif
/* these are called by storage engines */
void trans_register_ha(THD *thd, bool all, handlerton *ht);
void trans_register_ha(THD *thd, bool all, handlerton *ht,
const ulonglong *trxid);
/*
Storage engine has to assume the transaction will end up with 2pc if
@ -5032,13 +5087,82 @@ int binlog_log_row(TABLE* table,
const uchar *after_record,
Log_func *log_func);
#define TABLE_IO_WAIT(TRACKER, PSI, OP, INDEX, FLAGS, PAYLOAD) \
/**
@def MYSQL_TABLE_IO_WAIT
Instrumentation helper for table io_waits.
Note that this helper is intended to be used from
within the handler class only, as it uses members
from @c handler
Performance schema events are instrumented as follows:
- in non batch mode, one event is generated per call
- in batch mode, the number of rows affected is saved
in @c m_psi_numrows, so that @c end_psi_batch_mode()
generates a single event for the batch.
@param OP the table operation to be performed
@param INDEX the table index used if any, or MAX_KEY.
@param PAYLOAD instrumented code to execute
@sa handler::end_psi_batch_mode.
*/
#ifdef HAVE_PSI_TABLE_INTERFACE
#define MYSQL_TABLE_IO_WAIT(OP, INDEX, RESULT, PAYLOAD) \
{ \
if (m_psi != NULL) \
{ \
switch (m_psi_batch_mode) \
{ \
case PSI_BATCH_MODE_NONE: \
{ \
PSI_table_locker *sub_locker= NULL; \
PSI_table_locker_state reentrant_safe_state; \
sub_locker= PSI_TABLE_CALL(start_table_io_wait) \
(& reentrant_safe_state, m_psi, OP, INDEX, \
__FILE__, __LINE__); \
PAYLOAD \
if (sub_locker != NULL) \
PSI_TABLE_CALL(end_table_io_wait) \
(sub_locker, 1); \
break; \
} \
case PSI_BATCH_MODE_STARTING: \
{ \
m_psi_locker= PSI_TABLE_CALL(start_table_io_wait) \
(& m_psi_locker_state, m_psi, OP, INDEX, \
__FILE__, __LINE__); \
PAYLOAD \
if (!RESULT) \
m_psi_numrows++; \
m_psi_batch_mode= PSI_BATCH_MODE_STARTED; \
break; \
} \
case PSI_BATCH_MODE_STARTED: \
default: \
{ \
DBUG_ASSERT(m_psi_batch_mode \
== PSI_BATCH_MODE_STARTED); \
PAYLOAD \
if (!RESULT) \
m_psi_numrows++; \
break; \
} \
} \
} \
else \
{ \
PAYLOAD \
} \
}
#else
#define MYSQL_TABLE_IO_WAIT(OP, INDEX, RESULT, PAYLOAD) \
PAYLOAD
#endif
#define TABLE_IO_WAIT(TRACKER, OP, INDEX, RESULT, PAYLOAD) \
{ \
Exec_time_tracker *this_tracker; \
if (unlikely((this_tracker= tracker))) \
tracker->start_tracking(table->in_use); \
\
MYSQL_TABLE_IO_WAIT(PSI, OP, INDEX, FLAGS, PAYLOAD); \
MYSQL_TABLE_IO_WAIT(OP, INDEX, RESULT, PAYLOAD); \
\
if (unlikely(this_tracker)) \
tracker->stop_tracking(table->in_use); \

View File

@ -6867,8 +6867,8 @@ inline int handler::ha_write_tmp_row(uchar *buf)
int error;
MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str);
increment_statistics(&SSV::ha_tmp_write_count);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0,
{ error= write_row(buf); })
TABLE_IO_WAIT(tracker, PSI_TABLE_WRITE_ROW, MAX_KEY, error,
{ error= write_row(buf); })
MYSQL_INSERT_ROW_DONE(error);
return error;
}
@ -6878,7 +6878,7 @@ inline int handler::ha_delete_tmp_row(uchar *buf)
int error;
MYSQL_DELETE_ROW_START(table_share->db.str, table_share->table_name.str);
increment_statistics(&SSV::ha_tmp_delete_count);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_DELETE_ROW, MAX_KEY, 0,
TABLE_IO_WAIT(tracker, PSI_TABLE_DELETE_ROW, MAX_KEY, error,
{ error= delete_row(buf); })
MYSQL_DELETE_ROW_DONE(error);
return error;
@ -6889,13 +6889,12 @@ inline int handler::ha_update_tmp_row(const uchar *old_data, uchar *new_data)
int error;
MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str);
increment_statistics(&SSV::ha_tmp_update_count);
TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0,
{ error= update_row(old_data, new_data);})
TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error,
{ error= update_row(old_data, new_data);})
MYSQL_UPDATE_ROW_DONE(error);
return error;
}
extern pthread_attr_t *get_connection_attrib(void);
/**

View File

@ -2085,8 +2085,13 @@ enum_nested_loop_state JOIN_CACHE::join_records(bool skip_last)
if (!join_tab->first_unmatched)
{
bool pfs_batch_update= join_tab->pfs_batch_update(join);
if (pfs_batch_update)
join_tab->table->file->start_psi_batch_mode();
/* Find all records from join_tab that match records from join buffer */
rc= join_matching_records(skip_last);
if (pfs_batch_update)
join_tab->table->file->end_psi_batch_mode();
if (rc != NESTED_LOOP_OK && rc != NESTED_LOOP_NO_MORE_ROWS)
goto finish;
if (outer_join_first_inner)

View File

@ -13520,6 +13520,21 @@ bool JOIN_TAB::preread_init()
}
bool JOIN_TAB::pfs_batch_update(JOIN *join)
{
/*
Use PFS batch mode if
1. tab is an inner-most table, or
2. will read more than one row (not eq_ref or const access type)
3. no subqueries
*/
return join->join_tab + join->table_count - 1 == this && // 1
type != JT_EQ_REF && type != JT_CONST && type != JT_SYSTEM && // 2
(!select_cond || !select_cond->with_subquery()); // 3
}
/**
Build a TABLE_REF structure for index lookup in the temporary table
@ -20503,6 +20518,10 @@ sub_select(JOIN *join,JOIN_TAB *join_tab,bool end_of_records)
if (join_tab->loosescan_match_tab)
join_tab->loosescan_match_tab->found_match= FALSE;
const bool pfs_batch_update= join_tab->pfs_batch_update(join);
if (pfs_batch_update)
join_tab->table->file->start_psi_batch_mode();
if (rc != NESTED_LOOP_NO_MORE_ROWS)
{
error= (*join_tab->read_first_record)(join_tab);
@ -20554,6 +20573,9 @@ sub_select(JOIN *join,JOIN_TAB *join_tab,bool end_of_records)
join_tab->last_inner && !join_tab->found)
rc= evaluate_null_complemented_join_record(join, join_tab);
if (pfs_batch_update)
join_tab->table->file->end_psi_batch_mode();
if (rc == NESTED_LOOP_NO_MORE_ROWS)
rc= NESTED_LOOP_OK;
DBUG_RETURN(rc);

View File

@ -635,6 +635,8 @@ typedef struct st_join_table {
ha_rows get_examined_rows();
bool preread_init();
bool pfs_batch_update(JOIN *join);
bool is_sjm_nest() { return MY_TEST(bush_children); }
/*

View File

@ -8345,11 +8345,9 @@ int ha_rocksdb::read_range_first(const key_range *const start_key,
#endif
increment_statistics(&SSV::ha_read_key_count);
MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, {
result =
index_read_map_impl(table->record[0], start_key->key,
start_key->keypart_map, start_key->flag, end_key);
})
result =
index_read_map_impl(table->record[0], start_key->key,
start_key->keypart_map, start_key->flag, end_key);
}
if (result) {
DBUG_RETURN((result == HA_ERR_KEY_NOT_FOUND) ? HA_ERR_END_OF_FILE : result);