diff --git a/sql/handler.cc b/sql/handler.cc index efcccc9f0c6..35efde7a78c 100644 --- a/sql/handler.cc +++ b/sql/handler.cc @@ -2702,6 +2702,30 @@ void handler::rebind_psi() } +void handler::start_psi_batch_mode() +{ +#ifdef HAVE_PSI_TABLE_INTERFACE + DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_NONE); + DBUG_ASSERT(m_psi_locker == NULL); + m_psi_batch_mode= PSI_BATCH_MODE_STARTING; + m_psi_numrows= 0; +#endif +} + +void handler::end_psi_batch_mode() +{ +#ifdef HAVE_PSI_TABLE_INTERFACE + DBUG_ASSERT(m_psi_batch_mode != PSI_BATCH_MODE_NONE); + if (m_psi_locker != NULL) + { + DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_STARTED); + PSI_TABLE_CALL(end_table_io_wait)(m_psi_locker, m_psi_numrows); + m_psi_locker= NULL; + } + m_psi_batch_mode= PSI_BATCH_MODE_NONE; +#endif +} + PSI_table_share *handler::ha_table_share_psi() const { return table_share->m_psi; @@ -2791,8 +2815,10 @@ int handler::ha_close(void) */ if (table->in_use) status_var_add(table->in_use->status_var.rows_tmp_read, rows_tmp_read); - PSI_CALL_close_table(m_psi); + PSI_CALL_close_table(table_share, m_psi); m_psi= NULL; /* instrumentation handle, invalid after close_table() */ + DBUG_ASSERT(m_psi_batch_mode == PSI_BATCH_MODE_NONE); + DBUG_ASSERT(m_psi_locker == NULL); /* Detach from ANALYZE tracker */ tracker= NULL; @@ -2813,7 +2839,7 @@ int handler::ha_rnd_next(uchar *buf) do { - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, MAX_KEY, result, { result= rnd_next(buf); }) if (result != HA_ERR_RECORD_DELETED) break; @@ -2845,7 +2871,7 @@ int handler::ha_rnd_pos(uchar *buf, uchar *pos) m_lock_type != F_UNLCK); DBUG_ASSERT(inited == RND); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, MAX_KEY, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, MAX_KEY, result, { result= rnd_pos(buf, pos); }) increment_statistics(&SSV::ha_read_rnd_count); if (result == HA_ERR_RECORD_DELETED) @@ -2870,7 +2896,7 @@ int handler::ha_index_read_map(uchar *buf, 
const uchar *key, m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_read_map(buf, key, keypart_map, find_flag); }) increment_statistics(&SSV::ha_read_key_count); if (!result) @@ -2898,7 +2924,7 @@ int handler::ha_index_read_idx_map(uchar *buf, uint index, const uchar *key, DBUG_ASSERT(table_share->tmp_table != NO_TMP_TABLE || m_lock_type != F_UNLCK); DBUG_ASSERT(end_range == NULL); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, index, result, { result= index_read_idx_map(buf, index, key, keypart_map, find_flag); }) increment_statistics(&SSV::ha_read_key_count); if (!result) @@ -2920,7 +2946,7 @@ int handler::ha_index_next(uchar * buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_next(buf); }) increment_statistics(&SSV::ha_read_next_count); if (!result) @@ -2941,7 +2967,7 @@ int handler::ha_index_prev(uchar * buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_prev(buf); }) increment_statistics(&SSV::ha_read_prev_count); if (!result) @@ -2961,7 +2987,7 @@ int handler::ha_index_first(uchar * buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_first(buf); }) increment_statistics(&SSV::ha_read_first_count); if (!result) @@ -2981,7 +3007,7 @@ int handler::ha_index_last(uchar * buf) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, 
PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_last(buf); }) increment_statistics(&SSV::ha_read_last_count); if (!result) @@ -3001,7 +3027,7 @@ int handler::ha_index_next_same(uchar *buf, const uchar *key, uint keylen) m_lock_type != F_UNLCK); DBUG_ASSERT(inited==INDEX); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_FETCH_ROW, active_index, result, { result= index_next_same(buf, key, keylen); }) increment_statistics(&SSV::ha_read_next_count); if (!result) @@ -6390,7 +6416,7 @@ int handler::ha_external_lock(THD *thd, int lock_type) We cache the table flags if the locking succeeded. Otherwise, we keep them as they were when they were fetched in ha_open(). */ - MYSQL_TABLE_LOCK_WAIT(m_psi, PSI_TABLE_EXTERNAL_LOCK, lock_type, + MYSQL_TABLE_LOCK_WAIT(PSI_TABLE_EXTERNAL_LOCK, lock_type, { error= external_lock(thd, lock_type); }) DBUG_EXECUTE_IF("external_lock_failure", error= HA_ERR_GENERIC;); @@ -6649,13 +6675,13 @@ int handler::ha_write_row(const uchar *buf) if (table->s->long_unique_table && this == table->file) { - if (this->inited == RND) + if (inited == RND) table->clone_handler_for_update(); handler *h= table->update_handler ? 
table->update_handler : table->file; if ((error= check_duplicate_long_entries(table, h, buf))) DBUG_RETURN(error); } - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_WRITE_ROW, MAX_KEY, error, { error= write_row(buf); }) MYSQL_INSERT_ROW_DONE(error); @@ -6700,7 +6726,7 @@ int handler::ha_update_row(const uchar *old_data, const uchar *new_data) return error; } - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error, { error= update_row(old_data, new_data);}) MYSQL_UPDATE_ROW_DONE(error); @@ -6763,7 +6789,7 @@ int handler::ha_delete_row(const uchar *buf) mark_trx_read_write(); increment_statistics(&SSV::ha_delete_count); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_DELETE_ROW, active_index, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_DELETE_ROW, active_index, error, { error= delete_row(buf);}) MYSQL_DELETE_ROW_DONE(error); if (likely(!error)) diff --git a/sql/handler.h b/sql/handler.h index 0a561ec8b3f..ab1e1590e5b 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -3115,8 +3115,59 @@ public: */ PSI_table *m_psi; +private: + /** Internal state of the batch instrumentation. */ + enum batch_mode_t + { + /** Batch mode not used. */ + PSI_BATCH_MODE_NONE, + /** Batch mode used, before first table io. */ + PSI_BATCH_MODE_STARTING, + /** Batch mode used, after first table io. */ + PSI_BATCH_MODE_STARTED + }; + /** + Batch mode state. + @sa start_psi_batch_mode. + @sa end_psi_batch_mode. + */ + batch_mode_t m_psi_batch_mode; + /** + The number of rows in the batch. + @sa start_psi_batch_mode. + @sa end_psi_batch_mode. + */ + ulonglong m_psi_numrows; + /** + The current event in a batch. + @sa start_psi_batch_mode. + @sa end_psi_batch_mode. + */ + PSI_table_locker *m_psi_locker; + /** + Storage for the event in a batch. + @sa start_psi_batch_mode. + @sa end_psi_batch_mode. 
+ */ + PSI_table_locker_state m_psi_locker_state; + +public: virtual void unbind_psi(); virtual void rebind_psi(); + /** + Put the handler in 'batch' mode when collecting + table io instrumented events. + When operating in batch mode: + - a single start event is generated in the performance schema. + - all table io performed between @c start_psi_batch_mode + and @c end_psi_batch_mode is not instrumented: + the number of rows affected is counted instead in @c m_psi_numrows. + - a single end event is generated in the performance schema + when the batch mode ends with @c end_psi_batch_mode. + */ + void start_psi_batch_mode(); + /** End a batch started with @c start_psi_batch_mode. */ + void end_psi_batch_mode(); bool set_top_table_fields; struct TABLE *top_table; @@ -3163,7 +3214,11 @@ public: pushed_rowid_filter(NULL), rowid_filter_is_active(0), auto_inc_intervals_count(0), - m_psi(NULL), set_top_table_fields(FALSE), top_table(0), + m_psi(NULL), + m_psi_batch_mode(PSI_BATCH_MODE_NONE), + m_psi_numrows(0), + m_psi_locker(NULL), + set_top_table_fields(FALSE), top_table(0), top_table_field(0), top_table_fields(0), m_lock_type(F_UNLCK), ha_share(NULL), m_prev_insert_id(0) { @@ -4036,11 +4091,10 @@ public: virtual my_bool register_query_cache_table(THD *thd, const char *table_key, uint key_length, - qc_engine_callback - *engine_callback, + qc_engine_callback *callback, ulonglong *engine_data) { - *engine_callback= 0; + *callback= 0; return TRUE; } @@ -5007,7 +5061,8 @@ int ha_abort_transaction(THD *bf_thd, THD *victim_thd, my_bool signal); #endif /* these are called by storage engines */ -void trans_register_ha(THD *thd, bool all, handlerton *ht); +void trans_register_ha(THD *thd, bool all, handlerton *ht, + const ulonglong *trxid); /* Storage engine has to assume the transaction will end up with 2pc if @@ -5032,13 +5087,82 @@ int binlog_log_row(TABLE* table, const uchar *after_record, Log_func *log_func); -#define TABLE_IO_WAIT(TRACKER, PSI, OP, INDEX, FLAGS, PAYLOAD) \ 
+/** + @def MYSQL_TABLE_IO_WAIT + Instrumentation helper for table io_waits. + Note that this helper is intended to be used from within + the handler class only, as it uses members from @c handler. + Performance schema events are instrumented as follows: + - in non batch mode, one event is generated per call + - in batch mode, the number of rows affected is saved + in @c m_psi_numrows, so that @c end_psi_batch_mode() + generates a single event for the batch. + @param OP the table operation to be performed + @param INDEX the table index used if any, or MAX_KEY. + @param RESULT status expression tested after PAYLOAD; in batch mode a row is counted only when it is zero (not used in non batch mode) + @param PAYLOAD instrumented code to execute + @sa handler::end_psi_batch_mode. +*/ +#ifdef HAVE_PSI_TABLE_INTERFACE + #define MYSQL_TABLE_IO_WAIT(OP, INDEX, RESULT, PAYLOAD) \ + { \ + if (m_psi != NULL) \ + { \ + switch (m_psi_batch_mode) \ + { \ + case PSI_BATCH_MODE_NONE: \ + { \ + PSI_table_locker *sub_locker= NULL; \ + PSI_table_locker_state reentrant_safe_state; \ + sub_locker= PSI_TABLE_CALL(start_table_io_wait) \ + (& reentrant_safe_state, m_psi, OP, INDEX, \ + __FILE__, __LINE__); \ + PAYLOAD \ + if (sub_locker != NULL) \ + PSI_TABLE_CALL(end_table_io_wait) \ + (sub_locker, 1); \ + break; \ + } \ + case PSI_BATCH_MODE_STARTING: \ + { \ + m_psi_locker= PSI_TABLE_CALL(start_table_io_wait) \ + (& m_psi_locker_state, m_psi, OP, INDEX, \ + __FILE__, __LINE__); \ + PAYLOAD \ + if (!RESULT) \ + m_psi_numrows++; \ + m_psi_batch_mode= PSI_BATCH_MODE_STARTED; \ + break; \ + } \ + case PSI_BATCH_MODE_STARTED: \ + default: \ + { \ + DBUG_ASSERT(m_psi_batch_mode \ + == PSI_BATCH_MODE_STARTED); \ + PAYLOAD \ + if (!RESULT) \ + m_psi_numrows++; \ + break; \ + } \ + } \ + } \ + else \ + { \ + PAYLOAD \ + } \ + } +#else + #define MYSQL_TABLE_IO_WAIT(OP, INDEX, RESULT, PAYLOAD) \ + PAYLOAD +#endif + +#define TABLE_IO_WAIT(TRACKER, OP, INDEX, RESULT, PAYLOAD) \ { \ Exec_time_tracker *this_tracker; \ if (unlikely((this_tracker= tracker))) \ tracker->start_tracking(table->in_use); \ \ - MYSQL_TABLE_IO_WAIT(PSI, OP, 
INDEX, FLAGS, PAYLOAD); \ + MYSQL_TABLE_IO_WAIT(OP, INDEX, RESULT, PAYLOAD); \ \ if (unlikely(this_tracker)) \ tracker->stop_tracking(table->in_use); \ diff --git a/sql/sql_class.h b/sql/sql_class.h index d0facea8bdf..f74f5fb3b20 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -6867,8 +6867,8 @@ inline int handler::ha_write_tmp_row(uchar *buf) int error; MYSQL_INSERT_ROW_START(table_share->db.str, table_share->table_name.str); increment_statistics(&SSV::ha_tmp_write_count); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_WRITE_ROW, MAX_KEY, 0, - { error= write_row(buf); }) + TABLE_IO_WAIT(tracker, PSI_TABLE_WRITE_ROW, MAX_KEY, error, + { error= write_row(buf); }) MYSQL_INSERT_ROW_DONE(error); return error; } @@ -6878,7 +6878,7 @@ inline int handler::ha_delete_tmp_row(uchar *buf) int error; MYSQL_DELETE_ROW_START(table_share->db.str, table_share->table_name.str); increment_statistics(&SSV::ha_tmp_delete_count); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_DELETE_ROW, MAX_KEY, 0, + TABLE_IO_WAIT(tracker, PSI_TABLE_DELETE_ROW, MAX_KEY, error, { error= delete_row(buf); }) MYSQL_DELETE_ROW_DONE(error); return error; @@ -6889,13 +6889,12 @@ inline int handler::ha_update_tmp_row(const uchar *old_data, uchar *new_data) int error; MYSQL_UPDATE_ROW_START(table_share->db.str, table_share->table_name.str); increment_statistics(&SSV::ha_tmp_update_count); - TABLE_IO_WAIT(tracker, m_psi, PSI_TABLE_UPDATE_ROW, active_index, 0, - { error= update_row(old_data, new_data);}) + TABLE_IO_WAIT(tracker, PSI_TABLE_UPDATE_ROW, active_index, error, + { error= update_row(old_data, new_data);}) MYSQL_UPDATE_ROW_DONE(error); return error; } - extern pthread_attr_t *get_connection_attrib(void); /** diff --git a/sql/sql_join_cache.cc b/sql/sql_join_cache.cc index 9c681061118..b3d0d985582 100644 --- a/sql/sql_join_cache.cc +++ b/sql/sql_join_cache.cc @@ -2085,8 +2085,13 @@ enum_nested_loop_state JOIN_CACHE::join_records(bool skip_last) if (!join_tab->first_unmatched) { + bool pfs_batch_update= 
join_tab->pfs_batch_update(join); + if (pfs_batch_update) + join_tab->table->file->start_psi_batch_mode(); /* Find all records from join_tab that match records from join buffer */ rc= join_matching_records(skip_last); + if (pfs_batch_update) + join_tab->table->file->end_psi_batch_mode(); if (rc != NESTED_LOOP_OK && rc != NESTED_LOOP_NO_MORE_ROWS) goto finish; if (outer_join_first_inner) diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 9d96de96ff0..c9f40d33b90 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -13520,6 +13520,21 @@ bool JOIN_TAB::preread_init() } +bool JOIN_TAB::pfs_batch_update(JOIN *join) +{ + /* + Use PFS batch mode only if all of the following hold: + 1. tab is the inner-most table (last entry of join->join_tab), and + 2. will read more than one row (not eq_ref, const or system access type), and + 3. its select condition, if any, contains no subqueries + */ + + return join->join_tab + join->table_count - 1 == this && // 1 + type != JT_EQ_REF && type != JT_CONST && type != JT_SYSTEM && // 2 + (!select_cond || !select_cond->with_subquery()); // 3 +} + + /** Build a TABLE_REF structure for index lookup in the temporary table @@ -20503,6 +20518,10 @@ sub_select(JOIN *join,JOIN_TAB *join_tab,bool end_of_records) if (join_tab->loosescan_match_tab) join_tab->loosescan_match_tab->found_match= FALSE; + const bool pfs_batch_update= join_tab->pfs_batch_update(join); + if (pfs_batch_update) + join_tab->table->file->start_psi_batch_mode(); + if (rc != NESTED_LOOP_NO_MORE_ROWS) { error= (*join_tab->read_first_record)(join_tab); @@ -20554,6 +20573,9 @@ sub_select(JOIN *join,JOIN_TAB *join_tab,bool end_of_records) join_tab->last_inner && !join_tab->found) rc= evaluate_null_complemented_join_record(join, join_tab); + if (pfs_batch_update) + join_tab->table->file->end_psi_batch_mode(); + if (rc == NESTED_LOOP_NO_MORE_ROWS) rc= NESTED_LOOP_OK; DBUG_RETURN(rc); diff --git a/sql/sql_select.h b/sql/sql_select.h index 3788abce423..8b2df74702b 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -635,6 +635,8 @@ typedef struct st_join_table { ha_rows 
get_examined_rows(); bool preread_init(); + bool pfs_batch_update(JOIN *join); + bool is_sjm_nest() { return MY_TEST(bush_children); } /* diff --git a/storage/rocksdb/ha_rocksdb.cc b/storage/rocksdb/ha_rocksdb.cc index 625624d228e..71ec5eb4634 100644 --- a/storage/rocksdb/ha_rocksdb.cc +++ b/storage/rocksdb/ha_rocksdb.cc @@ -8345,11 +8345,9 @@ int ha_rocksdb::read_range_first(const key_range *const start_key, #endif increment_statistics(&SSV::ha_read_key_count); - MYSQL_TABLE_IO_WAIT(m_psi, PSI_TABLE_FETCH_ROW, active_index, 0, { - result = - index_read_map_impl(table->record[0], start_key->key, - start_key->keypart_map, start_key->flag, end_key); - }) + result = + index_read_map_impl(table->record[0], start_key->key, + start_key->keypart_map, start_key->flag, end_key); } if (result) { DBUG_RETURN((result == HA_ERR_KEY_NOT_FOUND) ? HA_ERR_END_OF_FILE : result);