MDEV-14638 - Replace trx_sys_t::rw_trx_set with LF_HASH

trx_sys_t::rw_trx_set is implemented as std::set, which does a few quite
expensive operations under trx_sys_t::mutex protection: e.g. malloc/free
when adding/removing elements. Traversing b-tree is not that cheap either.

This has negative scalability impact, which is especially visible when running
oltp_update_index.lua benchmark on a ramdisk.

To reduce trx_sys_t::mutex contention std::set is replaced with LF_HASH. None
of LF_HASH operations require trx_sys_t::mutex (nor any other global mutex)
protection.

Another interesting issue observed with std::set is a reproducible ~2% performance
decline after the benchmark has been running for ~60 seconds. With LF_HASH results are stable.

All in all this patch optimises away one of three trx_sys->mutex locks per
oltp_update_index.lua query. The other two critical sections became smaller.

Relevant clean-ups:

Replaced rw_trx_set iteration at startup with local set. The latter is needed
because values inserted to rw_trx_list must be ordered by trx->id.

Removed redundant conditions from trx_reference(): it is (and even was) never
called with transactions that have trx->state == TRX_STATE_COMMITTED_IN_MEMORY.
The do_ref_count argument doesn't (and probably never did) make any sense: trx_reference()
is now called only when a reference counter increment is actually requested.

Moved condition out of mutex in trx_erase_lists().

trx_rw_is_active(), trx_rw_is_active_low() and trx_get_rw_trx_by_id() were
greatly simplified and replaced by appropriate trx_rw_hash_t methods.

Compared to rw_trx_set, rw_trx_hash holds transactions only in PREPARED or
ACTIVE states. Transactions in COMMITTED state were required to be found
at InnoDB startup only. They are now looked up in the local set.

Removed unused trx_assert_recovered().

Removed unused innobase_get_trx() declaration.

Removed rather semantically incorrect trx_sys_rw_trx_add().

Moved information printout from trx_sys_init_at_db_start() to
trx_lists_init_at_db_start().
This commit is contained in:
Sergey Vojtovich 2017-12-13 15:40:41 +04:00
parent 1a62c8a396
commit 380069c235
20 changed files with 512 additions and 469 deletions

View File

@ -482,10 +482,10 @@ inconsistent:
/* In fact, because we only ever append fields to the 'default
value' record, it is also OK to perform READ UNCOMMITTED and
then ignore any extra fields, provided that
trx_rw_is_active(DB_TRX_ID). */
trx_sys->rw_trx_hash.find(DB_TRX_ID). */
if (rec_offs_n_fields(offsets) > index->n_fields
&& !trx_rw_is_active(row_get_rec_trx_id(rec, index, offsets),
NULL, false)) {
&& !trx_sys->rw_trx_hash.find(row_get_rec_trx_id(rec, index,
offsets))) {
goto inconsistent;
}

View File

@ -443,9 +443,6 @@ bool
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
MY_ATTRIBUTE((nonnull));
/* prototypes for new functions added to ha_innodb.cc */
trx_t* innobase_get_trx();
/********************************************************************//**
Gets the smallest oldest_modification lsn for any page in the pool. Returns
zero if all modified pages have been flushed to disk.

View File

@ -2819,13 +2819,17 @@ check_trx_exists(
return(trx);
}
/*************************************************************************
Gets current trx. */
trx_t*
innobase_get_trx()
/**
Gets current trx.
This function may be called during InnoDB initialisation, when
innodb_hton_ptr->slot is not yet set to meaningful value.
*/
trx_t *current_trx()
{
THD *thd=current_thd;
if (likely(thd != 0)) {
if (likely(thd != 0) && innodb_hton_ptr->slot != HA_SLOT_UNDEF) {
trx_t*& trx = thd_to_trx(thd);
return(trx);
} else {

View File

@ -41,6 +41,7 @@ class ReadView;
/** Determine if an active transaction has inserted or modified a secondary
index record.
@param[in,out] caller_trx trx of current thread
@param[in] rec secondary index record
@param[in] index secondary index
@param[in] offsets rec_get_offsets(rec, index)
@ -48,6 +49,7 @@ index record.
@retval NULL if the record was committed */
trx_t*
row_vers_impl_x_locked(
trx_t* caller_trx,
const rec_t* rec,
dict_index_t* index,
const ulint* offsets);
@ -126,6 +128,7 @@ which should be seen by a semi-consistent read. */
void
row_vers_build_for_semi_consistent_read(
/*====================================*/
trx_t* caller_trx,/*!<in/out: trx of current thread */
const rec_t* rec, /*!< in: record in a clustered index; the
caller must have a latch on the page; this
latch locks the top of the stack of versions

View File

@ -112,6 +112,7 @@ extern mysql_pfs_key_t sync_array_mutex_key;
extern mysql_pfs_key_t thread_mutex_key;
extern mysql_pfs_key_t zip_pad_mutex_key;
extern mysql_pfs_key_t row_drop_list_mutex_key;
extern mysql_pfs_key_t rw_trx_hash_element_mutex_key;
#endif /* UNIV_PFS_MUTEX */
#ifdef UNIV_PFS_RWLOCK

View File

@ -233,6 +233,7 @@ enum latch_level_t {
SYNC_REC_LOCK,
SYNC_THREADS,
SYNC_TRX,
SYNC_RW_TRX_HASH_ELEMENT,
SYNC_TRX_SYS,
SYNC_LOCK_SYS,
SYNC_LOCK_WAIT_SYS,
@ -383,6 +384,7 @@ enum latch_id_t {
LATCH_ID_FIL_CRYPT_STAT_MUTEX,
LATCH_ID_FIL_CRYPT_DATA_MUTEX,
LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
LATCH_ID_RW_TRX_HASH_ELEMENT,
LATCH_ID_TEST_MUTEX,
LATCH_ID_MAX = LATCH_ID_TEST_MUTEX
};

View File

@ -188,14 +188,6 @@ inline bool trx_id_check(const void* db_trx_id, trx_id_t trx_id)
}
#endif
/****************************************************************//**
Looks for the trx instance with the given id in the rw trx_list.
@return the trx handle or NULL if not found */
UNIV_INLINE
trx_t*
trx_get_rw_trx_by_id(
/*=================*/
trx_id_t trx_id);/*!< in: trx id to search for */
/****************************************************************//**
Returns the minimum trx id in rw trx list. This is the smallest id for which
the trx can possibly be active. (But, you must look at the trx->state to
@ -206,41 +198,6 @@ UNIV_INLINE
trx_id_t
trx_rw_min_trx_id(void);
/*===================*/
/****************************************************************//**
Checks if a rw transaction with the given id is active.
@return transaction instance if active, or NULL */
UNIV_INLINE
trx_t*
trx_rw_is_active_low(
/*=================*/
trx_id_t trx_id, /*!< in: trx id of the transaction */
ibool* corrupt); /*!< in: NULL or pointer to a flag
that will be set if corrupt */
/****************************************************************//**
Checks if a rw transaction with the given id is active. If the caller is
not holding trx_sys->mutex, the transaction may already have been
committed.
@return transaction instance if active, or NULL; */
UNIV_INLINE
trx_t*
trx_rw_is_active(
/*=============*/
trx_id_t trx_id, /*!< in: trx id of the transaction */
ibool* corrupt, /*!< in: NULL or pointer to a flag
that will be set if corrupt */
bool do_ref_count); /*!< in: if true then increment the
trx_t::n_ref_count */
#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
/***********************************************************//**
Assert that a transaction has been recovered.
@return TRUE */
UNIV_INLINE
ibool
trx_assert_recovered(
/*=================*/
trx_id_t trx_id) /*!< in: transaction identifier */
MY_ATTRIBUTE((warn_unused_result));
#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
/*****************************************************************//**
Updates the offset information about the end of the MySQL binlog entry
which corresponds to the transaction just being committed. In a MySQL
@ -302,13 +259,6 @@ ulint
trx_sys_any_active_transactions(void);
/*=================================*/
/**
Add the transaction to the RW transaction set
@param trx transaction instance to add */
UNIV_INLINE
void
trx_sys_rw_trx_add(trx_t* trx);
#ifdef UNIV_DEBUG
/*************************************************************//**
Validate the trx_sys_t::rw_trx_list.
@ -497,6 +447,281 @@ FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID. */
#define TRX_SYS_DOUBLEWRITE_BLOCK_SIZE FSP_EXTENT_SIZE
/* @} */
trx_t* current_trx();
/** One element of the lock-free read-write transaction hash.
Elements are allocated by the LF_HASH allocator and may be reused many
times before being freed; id and trx are (re)assigned by the hash
initializer callback on every reuse. */
struct rw_trx_hash_element_t
{
  rw_trx_hash_element_t(): trx(0)
  {
    mutex_create(LATCH_ID_RW_TRX_HASH_ELEMENT, &mutex);
  }

  ~rw_trx_hash_element_t()
  {
    mutex_free(&mutex);
  }

  /* id is deliberately not initialised by the constructor: it is set by
  the hash initializer callback just before the element becomes visible. */
  trx_id_t id; /* lf_hash_init() relies on this to be first in the struct */

  /* Pointer to the transaction, or 0 when the element is being erased. */
  trx_t *trx;

  /* Protects trx: find() reads it and erase() resets it under this mutex. */
  ib_mutex_t mutex;
};
/**
  Wrapper around LF_HASH to store set of in memory read-write transactions.
*/

class rw_trx_hash_t
{
  /* Underlying lock-free hash, keyed by trx_id_t (the first member of
  rw_trx_hash_element_t). */
  LF_HASH hash;


  /**
    Constructor callback for lock-free allocator.

    Object is just allocated and is not yet accessible via rw_trx_hash by
    concurrent threads. Object can be reused multiple times before it is freed.
    Every time object is being reused initializer() callback is called.
  */

  static void rw_trx_hash_constructor(uchar *arg)
  {
    new(arg + LF_HASH_OVERHEAD) rw_trx_hash_element_t();
  }


  /**
    Destructor callback for lock-free allocator.

    Object is about to be freed and is not accessible via rw_trx_hash by
    concurrent threads.
  */

  static void rw_trx_hash_destructor(uchar *arg)
  {
    reinterpret_cast<rw_trx_hash_element_t*>
      (arg + LF_HASH_OVERHEAD)->~rw_trx_hash_element_t();
  }


  /**
    Initializer callback for lock-free hash.

    Object is not yet accessible via rw_trx_hash by concurrent threads, but is
    about to become such. Object id can be changed only by this callback and
    remains the same until all pins to this object are released.

    Object trx can be changed to 0 by erase() under object mutex protection,
    which indicates it is about to be removed from lock-free hash and become
    not accessible by concurrent threads.
  */

  static void rw_trx_hash_initializer(LF_HASH *,
                                      rw_trx_hash_element_t *element,
                                      trx_t *trx)
  {
    element->trx= trx;
    element->id= trx->id;
    trx->rw_trx_hash_element= element;
  }


  /**
    Gets LF_HASH pins.

    Pins are used to protect object from being destroyed or reused. They are
    normally stored in trx object for quick access. If caller doesn't have trx
    available, we try to get it using current_trx(). If caller doesn't have trx
    at all, temporary pins are allocated.
  */

  LF_PINS *get_pins(trx_t *trx)
  {
    if (!trx->rw_trx_hash_pins)
    {
      trx->rw_trx_hash_pins= lf_hash_get_pins(&hash);
      ut_a(trx->rw_trx_hash_pins);
    }
    return trx->rw_trx_hash_pins;
  }


public:
  /** Initialises the lock-free hash and registers the allocator and
  initializer callbacks. Must be called before any other method. */
  void init()
  {
    lf_hash_init(&hash, sizeof(rw_trx_hash_element_t), LF_HASH_UNIQUE, 0,
                 sizeof(trx_id_t), 0, &my_charset_bin);
    hash.alloc.constructor= rw_trx_hash_constructor;
    hash.alloc.destructor= rw_trx_hash_destructor;
    hash.initializer=
      reinterpret_cast<lf_hash_initializer>(rw_trx_hash_initializer);
  }


  /** Frees the lock-free hash; no concurrent access may be in progress. */
  void destroy()
  {
    lf_hash_destroy(&hash);
  }


  /**
    Releases LF_HASH pins.

    Must be called by thread that owns trx_t object when the latter is being
    "detached" from thread (e.g. released to the pool by trx_free()). Can be
    called earlier if thread is expected not to use rw_trx_hash.

    Since pins are not allowed to be transferred to another thread,
    initialisation thread calls this for recovered transactions.
  */

  void put_pins(trx_t *trx)
  {
    if (trx->rw_trx_hash_pins)
    {
      lf_hash_put_pins(trx->rw_trx_hash_pins);
      trx->rw_trx_hash_pins= 0;
    }
  }


  /**
    Finds trx object in lock-free hash with given id.

    Only ACTIVE or PREPARED trx objects may participate in hash. Nevertheless
    the transaction may get committed before this method returns.

    With do_ref_count == false the caller may dereference returned trx pointer
    only if lock_sys->mutex was acquired before calling find().

    With do_ref_count == true caller may dereference trx even if it is not
    holding lock_sys->mutex. Caller is responsible for calling
    trx_release_reference() when it is done playing with trx.

    Ideally this method should get caller rw_trx_hash_pins along with trx
    object as a parameter, similar to insert() and erase(). However most
    callers lose trx early in their call chains and it is not that easy to pass
    them through.

    So we take more expensive approach: get trx through current_thd()->ha_data.
    Some threads don't have trx attached to THD, and at least server
    initialisation thread, fts_optimize_thread, srv_master_thread,
    dict_stats_thread, srv_monitor_thread, btr_defragment_thread don't even
    have THD at all. For such cases we allocate pins only for duration of
    search and free them immediately.

    This has negative performance impact and should be fixed eventually (by
    passing caller_trx as a parameter). Still the stream of DML is more or
    less OK.

    @return
      @retval 0 not found
      @retval pointer to trx
  */

  trx_t *find(trx_t *caller_trx, trx_id_t trx_id, bool do_ref_count= false)
  {
    /*
      In MariaDB 10.3, purge will reset DB_TRX_ID to 0
      when the history is lost. Read/write transactions will
      always have a nonzero trx_t::id; there the value 0 is
      reserved for transactions that did not write or lock
      anything yet.
    */
    if (!trx_id)
      return NULL;

    trx_t *trx= 0;
    LF_PINS *pins= caller_trx ? get_pins(caller_trx) : lf_hash_get_pins(&hash);
    ut_a(pins);

    rw_trx_hash_element_t *element= reinterpret_cast<rw_trx_hash_element_t*>
      (lf_hash_search(&hash, pins, reinterpret_cast<const void*>(&trx_id),
                      sizeof(trx_id_t)));
    if (element)
    {
      /* Element mutex keeps erase() from resetting element->trx while we
      read it; once it is taken the search pin can be released. */
      mutex_enter(&element->mutex);
      lf_hash_search_unpin(pins);
      if ((trx= element->trx))
      {
        if (do_ref_count)
          trx_reference(trx);
#ifdef UNIV_DEBUG
        mutex_enter(&trx->mutex);
        ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) ||
              trx_state_eq(trx, TRX_STATE_PREPARED));
        mutex_exit(&trx->mutex);
#endif
      }
      mutex_exit(&element->mutex);
    }
    if (!caller_trx)
      lf_hash_put_pins(pins);
    return trx;
  }


  /** Convenience overload: looks up caller's trx via current_trx(). */
  trx_t *find(trx_id_t trx_id, bool do_ref_count= false)
  {
    return find(current_trx(), trx_id, do_ref_count);
  }


  /**
    Inserts trx to lock-free hash.

    Object becomes accessible via rw_trx_hash.
  */

  void insert(trx_t *trx)
  {
    ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) ||
          trx_state_eq(trx, TRX_STATE_PREPARED));
    int res= lf_hash_insert(&hash, get_pins(trx),
                            reinterpret_cast<void*>(trx));
    ut_a(res == 0);
  }


  /**
    Removes trx from lock-free hash.

    Object becomes not accessible via rw_trx_hash. But it still can be pinned
    by concurrent find(), which is supposed to release it immediately after
    it sees object trx is 0.
  */

  void erase(trx_t *trx)
  {
    ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) ||
          trx_state_eq(trx, TRX_STATE_PREPARED));
    /* Reset element->trx first so that concurrent find() returns 0 even if
    it pinned the element before lf_hash_delete() unlinks it. */
    mutex_enter(&trx->rw_trx_hash_element->mutex);
    trx->rw_trx_hash_element->trx= 0;
    mutex_exit(&trx->rw_trx_hash_element->mutex);
    int res= lf_hash_delete(&hash, get_pins(trx),
                            reinterpret_cast<const void*>(&trx->id),
                            sizeof(trx_id_t));
    ut_a(res == 0);
  }


  /**
    Returns the number of elements in the hash.

    The number is exact only if hash is protected against concurrent
    modifications (e.g. single threaded startup or hash is protected
    by some mutex). Otherwise the number may be used as a hint only,
    because it may change even before this method returns.
  */

  int32_t size()
  {
    return my_atomic_load32_explicit(&hash.count, MY_MEMORY_ORDER_RELAXED);
  }
};
/** The transaction system central memory data structure. */
struct trx_sys_t {
@ -569,8 +794,16 @@ struct trx_sys_t {
transactions), protected by
rseg->mutex */
TrxIdSet rw_trx_set; /*!< Mapping from transaction id
to transaction instance */
const char rw_trx_hash_pre_pad[CACHE_LINE_SIZE];
/**
Lock-free hash of in memory read-write transactions.
Works faster when it is on its own cache line (tested).
*/
rw_trx_hash_t rw_trx_hash;
const char rw_trx_hash_post_pad[CACHE_LINE_SIZE];
ulint n_prepared_trx; /*!< Number of transactions currently
in the XA PREPARED state */

View File

@ -192,32 +192,6 @@ trx_write_trx_id(
mach_write_to_6(ptr, id);
}
/****************************************************************//**
Looks up the transaction with the given id in the read-write
transaction set. The caller must be holding trx_sys->mutex.
@return the trx handle or NULL if not found; the returned pointer may
only be dereferenced if lock_sys->mutex was acquired before this call
and is still being held */
UNIV_INLINE
trx_t*
trx_get_rw_trx_by_id(
/*=================*/
	trx_id_t	trx_id)	/*!< in: trx id to search for */
{
	ut_ad(trx_id > 0);
	ut_ad(trx_sys_mutex_own());

	trx_t*	trx = NULL;

	if (!trx_sys->rw_trx_set.empty()) {
		TrxIdSet::iterator	it
			= trx_sys->rw_trx_set.find(TrxTrack(trx_id));

		if (it != trx_sys->rw_trx_set.end()) {
			trx = it->m_trx;
		}
	}

	return(trx);
}
/****************************************************************//**
Returns the minimum trx id in trx list. This is the smallest id for which
the trx can possibly be active. (But, you must look at the trx->state
@ -245,29 +219,6 @@ trx_rw_min_trx_id_low(void)
return(id);
}
#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
/***********************************************************//**
Assert that the transaction identified by trx_id has been recovered.
Acquires and releases trx_sys->mutex internally.
@return TRUE (fails a fatal assertion otherwise) */
UNIV_INLINE
ibool
trx_assert_recovered(
/*=================*/
	trx_id_t	trx_id)	/*!< in: transaction identifier */
{
	trx_sys_mutex_enter();

	/* The lookup must succeed and the transaction must carry the
	is_recovered flag; ut_a() aborts the server otherwise. */
	const trx_t*	trx = trx_get_rw_trx_by_id(trx_id);
	ut_a(trx->is_recovered);

	trx_sys_mutex_exit();

	return(TRUE);
}
#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
/****************************************************************//**
Returns the minimum trx id in rw trx list. This is the smallest id for which
the rw trx can possibly be active. (But, you must look at the trx->state
@ -288,86 +239,6 @@ trx_rw_min_trx_id(void)
return(id);
}
/****************************************************************//**
Checks if a rw transaction with the given id is active. If the caller is
not holding lock_sys->mutex, the transaction may already have been
committed.
@return transaction instance if active, or NULL */
UNIV_INLINE
trx_t*
trx_rw_is_active_low(
/*=================*/
	trx_id_t	trx_id,	/*!< in: trx id of the transaction */
	ibool*		corrupt)	/*!< in: NULL or pointer to a flag
				that will be set if corrupt */
{
	ut_ad(trx_sys_mutex_own());

	if (trx_id < trx_rw_min_trx_id_low()) {
		/* Below the minimum active id: cannot be active. */
		return(NULL);
	}

	if (trx_id >= trx_sys->max_trx_id) {
		/* There must be corruption: we let the caller handle the
		diagnostic prints in this case. */
		if (corrupt != NULL) {
			*corrupt = TRUE;
		}

		return(NULL);
	}

	trx_t*	trx = trx_get_rw_trx_by_id(trx_id);

	if (trx != NULL
	    && trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)) {
		/* Already committed in memory: treat as inactive. */
		return(NULL);
	}

	return(trx);
}
/****************************************************************//**
Checks if a rw transaction with the given id is active. If the caller is
not holding lock_sys->mutex, the transaction may already have been
committed.
@return transaction instance if active, or NULL; */
UNIV_INLINE
trx_t*
trx_rw_is_active(
/*=============*/
	trx_id_t	trx_id,	/*!< in: trx id of the transaction */
	ibool*		corrupt,	/*!< in: NULL or pointer to a flag
				that will be set if corrupt */
	bool		do_ref_count)	/*!< in: if true then increment the
				trx_t::n_ref_count */
{
	/* In MariaDB 10.3, purge will reset DB_TRX_ID to 0 when the
	history is lost. Read/write transactions will always have a
	nonzero trx_t::id; the value 0 is reserved for transactions
	that did not write or lock anything yet. */
	if (trx_id == 0) {
		return(NULL);
	}

	trx_sys_mutex_enter();

	trx_t*	trx = trx_rw_is_active_low(trx_id, corrupt);

	if (trx != NULL) {
		/* May return NULL if the trx is already committed. */
		trx = trx_reference(trx, do_ref_count);
	}

	trx_sys_mutex_exit();

	return(trx);
}
/*****************************************************************//**
Allocates a new transaction id.
@return new, allocated trx id */
@ -441,16 +312,3 @@ trx_sys_get_n_rw_trx(void)
return(n_trx);
}
/**
Add the transaction to the RW transaction set.
Maps trx->id to the transaction instance in trx_sys->rw_trx_set so that
the transaction can later be looked up by id (e.g. by
trx_get_rw_trx_by_id()).
@param trx transaction instance to add; trx->id must be assigned (!= 0) */
UNIV_INLINE
void
trx_sys_rw_trx_add(trx_t* trx)
{
	ut_ad(trx->id != 0);

	trx_sys->rw_trx_set.insert(TrxTrack(trx->id, trx));
	/* Debug-only flag; presumably mirrors membership in
	trx_sys->rw_trx_list — confirm against its consumers. */
	ut_d(trx->in_rw_trx_list = true);
}

View File

@ -55,6 +55,8 @@ class ReadView;
// Forward declaration
class FlushObserver;
struct rw_trx_hash_element_t;
/** Dummy session used currently in MySQL interface */
extern sess_t* trx_dummy_sess;
@ -531,17 +533,12 @@ trx_set_rw_mode(
trx_t* trx);
/**
Increase the reference count. If the transaction is in state
TRX_STATE_COMMITTED_IN_MEMORY then the transaction is considered
committed and the reference count is not incremented.
@param trx Transaction that is being referenced
@param do_ref_count Increment the reference iff this is true
@return transaction instance if it is not committed */
Increase the reference count.
@param trx Transaction that is being referenced */
UNIV_INLINE
trx_t*
void
trx_reference(
trx_t* trx,
bool do_ref_count);
trx_t* trx);
/**
Release the transaction. Decrease the reference count.
@ -951,6 +948,9 @@ struct trx_t {
Recovered XA:
* NOT_STARTED -> PREPARED -> COMMITTED -> (freed)
Recovered XA followed by XA ROLLBACK:
* NOT_STARTED -> PREPARED -> ACTIVE -> COMMITTED -> (freed)
XA (2PC) (shutdown or disconnect before ROLLBACK or COMMIT):
* NOT_STARTED -> PREPARED -> (freed)
@ -1277,6 +1277,8 @@ struct trx_t {
os_event_t wsrep_event; /* event waited for in srv_conc_slot */
#endif /* WITH_WSREP */
rw_trx_hash_element_t *rw_trx_hash_element;
LF_PINS *rw_trx_hash_pins;
ulint magic_n;
/** @return whether any persistent undo log has been generated */

View File

@ -214,32 +214,14 @@ ok:
}
/**
Increase the reference count. If the transaction is in state
TRX_STATE_COMMITTED_IN_MEMORY then the transaction is considered
committed and the reference count is not incremented.
@param trx Transaction that is being referenced
@param do_ref_count Increment the reference iff this is true
@return transaction instance if it is not committed */
UNIV_INLINE
trx_t*
trx_reference(
trx_t* trx,
bool do_ref_count)
Increase the reference count.
@param trx Transaction that is being referenced */
UNIV_INLINE void trx_reference(trx_t *trx)
{
trx_mutex_enter(trx);
if (trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)) {
trx_mutex_exit(trx);
trx = NULL;
} else if (do_ref_count) {
ut_ad(trx->n_ref >= 0);
++trx->n_ref;
trx_mutex_exit(trx);
} else {
trx_mutex_exit(trx);
}
return(trx);
trx_mutex_enter(trx);
ut_ad(trx->n_ref >= 0);
++trx->n_ref;
trx_mutex_exit(trx);
}
/**

View File

@ -31,12 +31,9 @@ Created 3/26/1996 Heikki Tuuri
#include "ut0mutex.h"
#include "ut0new.h"
#include <set>
#include <queue>
#include <vector>
//#include <unordered_set>
/** printf(3) format used for printing DB_TRX_ID and other system fields */
#define TRX_ID_FMT IB_ID_FMT
@ -173,51 +170,4 @@ typedef ib_mutex_t PQMutex;
typedef ib_mutex_t TrxSysMutex;
typedef std::vector<trx_id_t, ut_allocator<trx_id_t> > trx_ids_t;
/** Mapping read-write transactions from id to transaction instance, for
creating read views and during trx id lookup for MVCC and locking. */
struct TrxTrack {
	/* trx defaults to NULL so that a TrxTrack can be built from an id
	alone for lookup purposes (see TrxTrackCmp, which compares by id). */
	explicit TrxTrack(trx_id_t id, trx_t* trx = NULL)
		:
		m_id(id),
		m_trx(trx)
	{
		// Do nothing
	}

	trx_id_t	m_id;	/* transaction id, the lookup key */
	trx_t*		m_trx;	/* transaction instance, or NULL */
};

/** Hash functor for TrxTrack: hashes by transaction id only. */
struct TrxTrackHash {
	size_t operator()(const TrxTrack& key) const
	{
		return(size_t(key.m_id));
	}
};

/**
Equality comparator for TrxTrack (pairs with TrxTrackHash); two entries
are equal iff their transaction ids are equal. */
struct TrxTrackHashCmp {
	bool operator() (const TrxTrack& lhs, const TrxTrack& rhs) const
	{
		return(lhs.m_id == rhs.m_id);
	}
};

/**
Strict-weak ordering for TrxTrack by transaction id, used by TrxIdSet. */
struct TrxTrackCmp {
	bool operator() (const TrxTrack& lhs, const TrxTrack& rhs) const
	{
		return(lhs.m_id < rhs.m_id);
	}
};

//typedef std::unordered_set<TrxTrack, TrxTrackHash, TrxTrackHashCmp> TrxIdSet;
typedef std::set<TrxTrack, TrxTrackCmp, ut_allocator<TrxTrack> >
	TrxIdSet;
#endif /* trx0types_h */

View File

@ -1494,6 +1494,7 @@ static
trx_t*
lock_sec_rec_some_has_impl(
/*=======================*/
trx_t* caller_trx,/*!<in/out: trx of current thread */
const rec_t* rec, /*!< in: user record */
dict_index_t* index, /*!< in: secondary index */
const ulint* offsets)/*!< in: rec_get_offsets(rec, index) */
@ -1530,7 +1531,7 @@ lock_sec_rec_some_has_impl(
x-lock. We have to look in the clustered index. */
} else {
trx = row_vers_impl_x_locked(rec, index, offsets);
trx = row_vers_impl_x_locked(caller_trx, rec, index, offsets);
}
return(trx);
@ -1540,6 +1541,11 @@ lock_sec_rec_some_has_impl(
/*********************************************************************//**
Checks if some transaction, other than given trx_id, has an explicit
lock on the given rec, in the given precise_mode.
FIXME: if the current transaction holds implicit lock from INSERT, a
subsequent locking read should not convert it to explicit. See also
MDEV-11215.
@return the transaction, whose id is not equal to trx_id, that has an
explicit lock on the given rec, in the given precise_mode or NULL.*/
static
@ -1558,31 +1564,28 @@ lock_rec_other_trx_holds_expl(
ut_ad(!page_rec_is_default_row(rec));
trx_t* holds = NULL;
ulint heap_no = page_rec_get_heap_no(rec);
lock_mutex_enter();
mutex_enter(&trx_sys->mutex);
if (trx_t* impl_trx = trx_rw_is_active(trx->id, NULL, false)) {
ulint heap_no = page_rec_get_heap_no(rec);
mutex_enter(&trx_sys->mutex);
for (trx_t* t = UT_LIST_GET_FIRST(trx_sys->rw_trx_list);
t != NULL;
t = UT_LIST_GET_NEXT(trx_list, t)) {
for (trx_t* t = UT_LIST_GET_FIRST(trx_sys->rw_trx_list);
t != NULL;
t = UT_LIST_GET_NEXT(trx_list, t)) {
lock_t* expl_lock = lock_rec_has_expl(
precise_mode, block, heap_no, t);
lock_t* expl_lock = lock_rec_has_expl(
precise_mode, block, heap_no, t);
if (expl_lock && expl_lock->trx != impl_trx) {
/* An explicit lock is held by trx other than
the trx holding the implicit lock. */
holds = expl_lock->trx;
break;
}
if (expl_lock && expl_lock->trx != trx) {
/* An explicit lock is held by trx other than
the trx holding the implicit lock. */
holds = expl_lock->trx;
break;
}
mutex_exit(&trx_sys->mutex);
}
mutex_exit(&trx_sys->mutex);
lock_mutex_exit();
return(holds);
@ -6223,7 +6226,6 @@ lock_rec_queue_validate(
const dict_index_t* index, /*!< in: index, or NULL if not known */
const ulint* offsets)/*!< in: rec_get_offsets(rec, index) */
{
const trx_t* impl_trx;
const lock_t* lock;
ulint heap_no;
@ -6269,13 +6271,11 @@ lock_rec_queue_validate(
/* Nothing we can do */
} else if (dict_index_is_clust(index)) {
trx_id_t trx_id;
/* Unlike the non-debug code, this invariant can only succeed
if the check and assertion are covered by the lock mutex. */
trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);
impl_trx = trx_rw_is_active_low(trx_id, NULL);
const trx_t *impl_trx = trx_sys->rw_trx_hash.find(
lock_clust_rec_some_has_impl(rec, index, offsets));
ut_ad(lock_mutex_own());
/* impl_trx cannot be committed until lock_mutex_exit()
@ -6849,6 +6849,7 @@ static
void
lock_rec_convert_impl_to_expl(
/*==========================*/
trx_t* caller_trx,/*!<in/out: trx of current thread */
const buf_block_t* block, /*!< in: buffer block of rec */
const rec_t* rec, /*!< in: user record on page */
dict_index_t* index, /*!< in: index of record */
@ -6868,11 +6869,12 @@ lock_rec_convert_impl_to_expl(
trx_id = lock_clust_rec_some_has_impl(rec, index, offsets);
trx = trx_rw_is_active(trx_id, NULL, true);
trx = trx_sys->rw_trx_hash.find(caller_trx, trx_id, true);
} else {
ut_ad(!dict_index_is_online_ddl(index));
trx = lock_sec_rec_some_has_impl(rec, index, offsets);
trx = lock_sec_rec_some_has_impl(caller_trx, rec, index,
offsets);
ut_ad(!trx || !lock_rec_other_trx_holds_expl(
LOCK_S | LOCK_REC_NOT_GAP, trx, rec, block));
@ -6934,7 +6936,8 @@ lock_clust_rec_modify_check_and_lock(
/* If a transaction has no explicit x-lock set on the record, set one
for it */
lock_rec_convert_impl_to_expl(block, rec, index, offsets);
lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec, index,
offsets);
lock_mutex_enter();
@ -7098,7 +7101,8 @@ lock_sec_rec_read_check_and_lock(
|| recv_recovery_is_on())
&& !page_rec_is_supremum(rec)) {
lock_rec_convert_impl_to_expl(block, rec, index, offsets);
lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec,
index, offsets);
}
lock_mutex_enter();
@ -7173,7 +7177,8 @@ lock_clust_rec_read_check_and_lock(
if (heap_no != PAGE_HEAP_NO_SUPREMUM) {
lock_rec_convert_impl_to_expl(block, rec, index, offsets);
lock_rec_convert_impl_to_expl(thr_get_trx(thr), block, rec,
index, offsets);
}
lock_mutex_enter();

View File

@ -371,6 +371,7 @@ Copy the transaction ids from the source vector */
void
ReadView::copy_trx_ids(const trx_ids_t& trx_ids)
{
ut_ad(mutex_own(&trx_sys->mutex));
ulint size = trx_ids.size();
if (m_creator_trx_id > 0) {
@ -424,14 +425,24 @@ ReadView::copy_trx_ids(const trx_ids_t& trx_ids)
}
#ifdef UNIV_DEBUG
/* Assert that all transaction ids in list are active. */
/* Original assertion was here to make sure that rw_trx_ids and
rw_trx_hash are in sync and they hold either ACTIVE or PREPARED
transaction.
Now rw_trx_hash.find() does
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) ||
trx_state_eq(trx, TRX_STATE_PREPARED)).
No need to repeat it here. In fact we can't repeat it here: it would be a
race condition, because we would need trx->element->mutex locked to perform
this check (see how it is done in find()).
Now rw_trx_ids and rw_trx_hash may get out of sync for a short while:
when transaction is registered it first gets added into rw_trx_ids
under trx_sys->mutex protection and then to rw_trx_hash without mutex
protection. Thus we need repeat this lookup. */
for (trx_ids_t::const_iterator it = trx_ids.begin();
it != trx_ids.end(); ++it) {
trx_t* trx = trx_get_rw_trx_by_id(*it);
ut_ad(trx != NULL);
ut_ad(trx->state == TRX_STATE_ACTIVE
|| trx->state == TRX_STATE_PREPARED);
while (!trx_sys->rw_trx_hash.find(*it));
}
#endif /* UNIV_DEBUG */
}

View File

@ -415,8 +415,8 @@ row_build_low(
times, and the cursor restore can happen multiple times for single
insert or update statement. */
ut_a(!rec_offs_any_null_extern(rec, offsets)
|| trx_rw_is_active(row_get_rec_trx_id(rec, index, offsets),
NULL, false));
|| trx_sys->rw_trx_hash.find(row_get_rec_trx_id(rec, index,
offsets)));
#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
if (type != ROW_COPY_POINTERS) {

View File

@ -799,7 +799,7 @@ row_sel_build_committed_vers_for_mysql(
rec_offs_size(*offsets));
}
row_vers_build_for_semi_consistent_read(
row_vers_build_for_semi_consistent_read(prebuilt->trx,
rec, mtr, clust_index, offsets, offset_heap,
prebuilt->old_vers_heap, old_vers, vrow);
}
@ -4972,17 +4972,17 @@ wrong_offs:
/* In delete-marked records, DB_TRX_ID must
always refer to an existing undo log record. */
ut_ad(trx_id);
if (!trx_rw_is_active(trx_id, NULL, false)) {
if (!trx_sys->rw_trx_hash.find(trx, trx_id)) {
/* The clustered index record
was delete-marked in a committed
transaction. Ignore the record. */
goto locks_ok_del_marked;
}
} else if (trx_t* trx = row_vers_impl_x_locked(
rec, index, offsets)) {
} else if (trx_t* t = row_vers_impl_x_locked(
trx, rec, index, offsets)) {
/* The record belongs to an active
transaction. We must acquire a lock. */
trx_release_reference(trx);
trx_release_reference(t);
} else {
/* The secondary index record does not
point to a delete-marked clustered index

View File

@ -47,6 +47,7 @@ Created 2/6/1997 Heikki Tuuri
/** Check whether all non-virtual columns in a virtual index match that of in
the cluster index
@param[in,out] caller_trx trx of current thread
@param[in] index the secondary index
@param[in] row the cluster index row in dtuple form
@param[in] ext externally stored column prefix or NULL
@ -65,6 +66,7 @@ row_vers_non_vc_match(
ulint* n_non_v_col);
/** Determine if an active transaction has inserted or modified a secondary
index record.
@param[in,out] caller_trx trx of current thread
@param[in] clust_rec clustered index record
@param[in] clust_index clustered index
@param[in] rec secondary index record
@ -76,6 +78,7 @@ index record.
UNIV_INLINE
trx_t*
row_vers_impl_x_locked_low(
trx_t* caller_trx,
const rec_t* clust_rec,
dict_index_t* clust_index,
const rec_t* rec,
@ -84,7 +87,6 @@ row_vers_impl_x_locked_low(
mtr_t* mtr)
{
trx_id_t trx_id;
ibool corrupt;
ulint comp;
ulint rec_del;
const rec_t* version;
@ -118,13 +120,15 @@ row_vers_impl_x_locked_low(
mem_heap_free(heap);
DBUG_RETURN(0);
}
corrupt = FALSE;
trx_t* trx = trx_rw_is_active(trx_id, &corrupt, true);
trx_t* trx = trx_sys->rw_trx_hash.find(caller_trx, trx_id, true);
if (trx == 0) {
/* The transaction that modified or inserted clust_rec is no
longer active, or it is corrupt: no implicit lock on rec */
trx_sys_mutex_enter();
bool corrupt = trx_id >= trx_sys->max_trx_id;
trx_sys_mutex_exit();
if (corrupt) {
lock_report_trx_id_insanity(
trx_id, clust_rec, clust_index, clust_offsets,
@ -189,7 +193,7 @@ row_vers_impl_x_locked_low(
inserting a delete-marked record. */
ut_ad(prev_version
|| !rec_get_deleted_flag(version, comp)
|| !trx_rw_is_active(trx_id, NULL, false));
|| !trx_sys->rw_trx_hash.find(caller_trx, trx_id));
/* Free version and clust_offsets. */
mem_heap_free(old_heap);
@ -342,6 +346,7 @@ result_check:
/** Determine if an active transaction has inserted or modified a secondary
index record.
@param[in,out] caller_trx trx of current thread
@param[in] rec secondary index record
@param[in] index secondary index
@param[in] offsets rec_get_offsets(rec, index)
@ -349,6 +354,7 @@ index record.
@retval NULL if the record was committed */
trx_t*
row_vers_impl_x_locked(
trx_t* caller_trx,
const rec_t* rec,
dict_index_t* index,
const ulint* offsets)
@ -389,7 +395,8 @@ row_vers_impl_x_locked(
trx = 0;
} else {
trx = row_vers_impl_x_locked_low(
clust_rec, clust_index, rec, index, offsets, &mtr);
caller_trx, clust_rec, clust_index, rec, index,
offsets, &mtr);
ut_ad(trx == 0 || trx_is_referenced(trx));
}
@ -1234,6 +1241,7 @@ which should be seen by a semi-consistent read. */
void
row_vers_build_for_semi_consistent_read(
/*====================================*/
trx_t* caller_trx,/*!<in/out: trx of current thread */
const rec_t* rec, /*!< in: record in a clustered index; the
caller must have a latch on the page; this
latch locks the top of the stack of versions
@ -1270,7 +1278,6 @@ row_vers_build_for_semi_consistent_read(
ut_ad(!vrow || !(*vrow));
for (;;) {
const trx_t* version_trx;
mem_heap_t* heap2;
rec_t* prev_version;
trx_id_t version_trx_id;
@ -1280,24 +1287,7 @@ row_vers_build_for_semi_consistent_read(
rec_trx_id = version_trx_id;
}
if (!version_trx_id) {
goto committed_version_trx;
}
trx_sys_mutex_enter();
version_trx = trx_get_rw_trx_by_id(version_trx_id);
/* Because version_trx is a read-write transaction,
its state cannot change from or to NOT_STARTED while
we are holding the trx_sys->mutex. It may change from
ACTIVE to PREPARED or COMMITTED. */
if (version_trx
&& trx_state_eq(version_trx,
TRX_STATE_COMMITTED_IN_MEMORY)) {
version_trx = NULL;
}
trx_sys_mutex_exit();
if (!version_trx) {
if (!trx_sys->rw_trx_hash.find(caller_trx, version_trx_id)) {
committed_version_trx:
/* We found a version that belongs to a
committed transaction: return it. */

View File

@ -480,6 +480,7 @@ LatchDebug::LatchDebug()
LEVEL_MAP_INSERT(SYNC_REC_LOCK);
LEVEL_MAP_INSERT(SYNC_THREADS);
LEVEL_MAP_INSERT(SYNC_TRX);
LEVEL_MAP_INSERT(SYNC_RW_TRX_HASH_ELEMENT);
LEVEL_MAP_INSERT(SYNC_TRX_SYS);
LEVEL_MAP_INSERT(SYNC_LOCK_SYS);
LEVEL_MAP_INSERT(SYNC_LOCK_WAIT_SYS);
@ -761,6 +762,7 @@ LatchDebug::check_order(
case SYNC_THREADS:
case SYNC_LOCK_SYS:
case SYNC_LOCK_WAIT_SYS:
case SYNC_RW_TRX_HASH_ELEMENT:
case SYNC_TRX_SYS:
case SYNC_IBUF_BITMAP_MUTEX:
case SYNC_REDO_RSEG:
@ -1521,6 +1523,8 @@ sync_latch_meta_init()
PFS_NOT_INSTRUMENTED);
LATCH_ADD_MUTEX(FIL_CRYPT_THREADS_MUTEX, SYNC_NO_ORDER_CHECK,
PFS_NOT_INSTRUMENTED);
LATCH_ADD_MUTEX(RW_TRX_HASH_ELEMENT, SYNC_RW_TRX_HASH_ELEMENT,
rw_trx_hash_element_mutex_key);
latch_id_t id = LATCH_ID_NONE;

View File

@ -96,6 +96,7 @@ mysql_pfs_key_t sync_array_mutex_key;
mysql_pfs_key_t thread_mutex_key;
mysql_pfs_key_t zip_pad_mutex_key;
mysql_pfs_key_t row_drop_list_mutex_key;
mysql_pfs_key_t rw_trx_hash_element_mutex_key;
#endif /* UNIV_PFS_MUTEX */
#ifdef UNIV_PFS_RWLOCK
mysql_pfs_key_t btr_search_latch_key;

View File

@ -429,8 +429,6 @@ void
trx_sys_init_at_db_start()
{
trx_sysf_t* sys_header;
ib_uint64_t rows_to_undo = 0;
const char* unit = "";
/* VERY important: after the database is started, max_trx_id value is
divisible by TRX_SYS_TRX_ID_WRITE_MARGIN, and the 'if' in
@ -455,43 +453,6 @@ trx_sys_init_at_db_start()
trx_dummy_sess = sess_open();
trx_lists_init_at_db_start();
/* This mutex is not strictly required, it is here only to satisfy
the debug code (assertions). We are still running in single threaded
bootstrap mode. */
trx_sys_mutex_enter();
if (UT_LIST_GET_LEN(trx_sys->rw_trx_list) > 0) {
const trx_t* trx;
for (trx = UT_LIST_GET_FIRST(trx_sys->rw_trx_list);
trx != NULL;
trx = UT_LIST_GET_NEXT(trx_list, trx)) {
ut_ad(trx->is_recovered);
assert_trx_in_rw_list(trx);
if (trx_state_eq(trx, TRX_STATE_ACTIVE)) {
rows_to_undo += trx->undo_no;
}
}
if (rows_to_undo > 1000000000) {
unit = "M";
rows_to_undo = rows_to_undo / 1000000;
}
ib::info() << UT_LIST_GET_LEN(trx_sys->rw_trx_list)
<< " transaction(s) which must be rolled back or"
" cleaned up in total " << rows_to_undo << unit
<< " row operations to undo";
ib::info() << "Trx id counter is " << trx_sys->max_trx_id;
}
trx_sys_mutex_exit();
trx_sys->mvcc->clone_oldest_view(&purge_sys->view);
}
@ -515,8 +476,7 @@ trx_sys_create(void)
new(&trx_sys->rw_trx_ids) trx_ids_t(ut_allocator<trx_id_t>(
mem_key_trx_sys_t_rw_trx_ids));
new(&trx_sys->rw_trx_set) TrxIdSet();
trx_sys->rw_trx_hash.init();
}
/*****************************************************************//**
@ -669,8 +629,7 @@ trx_sys_close(void)
trx_sys->rw_trx_ids.~trx_ids_t();
trx_sys->rw_trx_set.~TrxIdSet();
trx_sys->rw_trx_hash.destroy();
ut_free(trx_sys);
trx_sys = NULL;

View File

@ -236,6 +236,7 @@ struct TrxFactory {
new(&trx->hit_list) hit_list_t();
trx->rw_trx_hash_pins = 0;
trx_init(trx);
DBUG_LOG("trx", "Init: " << trx);
@ -446,6 +447,7 @@ trx_create_low()
/* We just got trx from pool, it should be non locking */
ut_ad(trx->will_lock == 0);
ut_ad(!trx->rw_trx_hash_pins);
/* Background trx should not be forced to rollback,
we will unset the flag for user trx. */
@ -483,6 +485,7 @@ trx_free(trx_t*& trx)
{
assert_trx_is_free(trx);
trx_sys->rw_trx_hash.put_pins(trx);
trx->mysql_thd = 0;
trx->mysql_log_file_name = 0;
@ -731,7 +734,9 @@ trx_resurrect_table_locks(
trx_undo_rec_t* undo_rec;
table_id_set tables;
if (trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) || undo->empty) {
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE) ||
trx_state_eq(trx, TRX_STATE_PREPARED));
if (undo->empty) {
return;
}
@ -960,10 +965,65 @@ trx_resurrect_update(
}
}
/** Pair of (transaction id, transaction object) used at startup to keep
resurrected read-write transactions ordered by id, for creating read views
and for trx id lookup during MVCC and locking. */
struct TrxTrack {
	trx_id_t	m_id;
	trx_t*		m_trx;

	explicit TrxTrack(trx_id_t id, trx_t* trx = NULL)
		:
		m_id(id),
		m_trx(trx)
	{
	}
};
/**
Comparator for TrxMap */
struct TrxTrackCmp {
bool operator() (const TrxTrack& lhs, const TrxTrack& rhs) const
{
return(lhs.m_id < rhs.m_id);
}
};
typedef std::set<TrxTrack, TrxTrackCmp, ut_allocator<TrxTrack> >
TrxIdSet;
/** Register a resurrected transaction during startup.
Always records the transaction in the local id-ordered set (the caller
later inserts into rw_trx_list in id order from that set).  Only
transactions still in ACTIVE or PREPARED state are published to the
global rw_trx_hash and rw_trx_ids; COMMITTED ones stay local-only.
@param[in,out]	trx		resurrected transaction
@param[in]	undo		undo log the transaction was resurrected from
@param[in,out]	rows_to_undo	running total of row operations to undo,
				incremented for ACTIVE transactions
@param[in,out]	set		local set of all resurrected transactions,
				ordered by trx->id */
static inline void trx_sys_add_trx_at_init(trx_t *trx, trx_undo_t *undo,
                                           uint64_t *rows_to_undo,
                                           TrxIdSet *set)
{
  ut_ad(trx->id != 0);
  ut_ad(trx->is_recovered);
  set->insert(TrxTrack(trx->id, trx));
  if (trx_state_eq(trx, TRX_STATE_ACTIVE) ||
      trx_state_eq(trx, TRX_STATE_PREPARED))
  {
    /* Make the transaction visible to concurrent id lookups. */
    trx_sys->rw_trx_hash.insert(trx);
    /* NOTE(review): presumably releases this thread's LF_HASH pins,
    which are not needed again during single-threaded startup —
    confirm against rw_trx_hash_t::put_pins(). */
    trx_sys->rw_trx_hash.put_pins(trx);
    trx_sys->rw_trx_ids.push_back(trx->id);
    trx_resurrect_table_locks(trx, undo);
    /* Only ACTIVE transactions contribute rows that must be rolled
    back; PREPARED ones await XA commit/rollback decision. */
    if (trx_state_eq(trx, TRX_STATE_ACTIVE))
      *rows_to_undo+= trx->undo_no;
  }
#ifdef UNIV_DEBUG
  trx->in_rw_trx_list= true;
  if (trx->id > trx_sys->rw_max_trx_id)
    trx_sys->rw_max_trx_id= trx->id;
#endif
}
/** Initialize (resurrect) transactions at startup. */
void
trx_lists_init_at_db_start()
{
TrxIdSet set;
uint64_t rows_to_undo = 0;
ut_a(srv_is_being_started);
ut_ad(!srv_was_started);
ut_ad(!purge_sys);
@ -993,15 +1053,10 @@ trx_lists_init_at_db_start()
for (undo = UT_LIST_GET_FIRST(rseg->old_insert_list);
undo != NULL;
undo = UT_LIST_GET_NEXT(undo_list, undo)) {
trx_t* trx;
trx = trx_resurrect_insert(undo, rseg);
trx_t* trx = trx_resurrect_insert(undo, rseg);
trx->start_time = start_time;
trx_sys_rw_trx_add(trx);
trx_resurrect_table_locks(trx, undo);
trx_sys_add_trx_at_init(trx, undo, &rows_to_undo,
&set);
}
/* Resurrect other transactions. */
@ -1009,12 +1064,10 @@ trx_lists_init_at_db_start()
undo != NULL;
undo = UT_LIST_GET_NEXT(undo_list, undo)) {
/* Check the trx_sys->rw_trx_set first. */
trx_sys_mutex_enter();
trx_t* trx = trx_get_rw_trx_by_id(undo->trx_id);
trx_sys_mutex_exit();
/* Check if trx_id was already registered first. */
TrxIdSet::iterator it =
set.find(TrxTrack(undo->trx_id));
trx_t *trx= it == set.end() ? 0 : it->m_trx;
if (trx == NULL) {
trx = trx_allocate_for_background();
@ -1025,34 +1078,30 @@ trx_lists_init_at_db_start()
}
trx_resurrect_update(trx, undo, rseg);
trx_sys_rw_trx_add(trx);
trx_resurrect_table_locks(trx, undo);
trx_sys_add_trx_at_init(trx, undo, &rows_to_undo,
&set);
}
}
TrxIdSet::iterator end = trx_sys->rw_trx_set.end();
if (set.size()) {
for (TrxIdSet::iterator it = trx_sys->rw_trx_set.begin();
ib::info() << set.size()
<< " transaction(s) which must be rolled back or"
" cleaned up in total " << rows_to_undo
<< " row operations to undo";
ib::info() << "Trx id counter is " << trx_sys->max_trx_id;
}
TrxIdSet::iterator end = set.end();
for (TrxIdSet::iterator it = set.begin();
it != end;
++it) {
ut_ad(it->m_trx->in_rw_trx_list);
#ifdef UNIV_DEBUG
if (it->m_trx->id > trx_sys->rw_max_trx_id) {
trx_sys->rw_max_trx_id = it->m_trx->id;
}
#endif /* UNIV_DEBUG */
if (it->m_trx->state == TRX_STATE_ACTIVE
|| it->m_trx->state == TRX_STATE_PREPARED) {
trx_sys->rw_trx_ids.push_back(it->m_id);
}
UT_LIST_ADD_FIRST(trx_sys->rw_trx_list, it->m_trx);
}
std::sort(trx_sys->rw_trx_ids.begin(), trx_sys->rw_trx_ids.end());
}
/** Assign a persistent rollback segment in a round-robin fashion,
@ -1171,8 +1220,8 @@ trx_t::assign_temp_rseg()
mutex_enter(&trx_sys->mutex);
id = trx_sys_get_new_trx_id();
trx_sys->rw_trx_ids.push_back(id);
trx_sys->rw_trx_set.insert(TrxTrack(id, this));
mutex_exit(&trx_sys->mutex);
trx_sys->rw_trx_hash.insert(this);
}
ut_ad(!rseg->is_persistent());
@ -1237,10 +1286,14 @@ trx_start_low(
ut_ad(!trx->in_rw_trx_list);
/* We tend to over assert and that complicates the code somewhat.
e.g., the transaction state can be set earlier but we are forced to
set it under the protection of the trx_sys_t::mutex because some
trx list assertions are triggered unnecessarily. */
/* No other thread can access this trx object through rw_trx_hash, thus
we don't need trx_sys->mutex protection for that purpose. Still this
trx can be found through trx_sys->mysql_trx_list, which means state
change must be protected by e.g. trx->mutex.
For now we update it without mutex protection, because original code
did it this way. It has to be reviewed and fixed properly. */
trx->state = TRX_STATE_ACTIVE;
/* By default all transactions are in the read-only list unless they
are non-locking auto-commit read only transactions or background
@ -1262,8 +1315,6 @@ trx_start_low(
trx_sys->rw_trx_ids.push_back(trx->id);
trx_sys_rw_trx_add(trx);
ut_ad(trx->rsegs.m_redo.rseg != 0
|| srv_read_only_mode
|| srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO);
@ -1277,11 +1328,10 @@ trx_start_low(
}
#endif /* UNIV_DEBUG */
trx->state = TRX_STATE_ACTIVE;
ut_ad(trx_sys_validate_trx_list());
trx_sys_mutex_exit();
trx_sys->rw_trx_hash.insert(trx);
} else {
trx->id = 0;
@ -1302,17 +1352,11 @@ trx_start_low(
trx_sys->rw_trx_ids.push_back(trx->id);
trx_sys->rw_trx_set.insert(
TrxTrack(trx->id, trx));
trx_sys_mutex_exit();
trx_sys->rw_trx_hash.insert(trx);
}
trx->state = TRX_STATE_ACTIVE;
} else {
ut_ad(!read_write);
trx->state = TRX_STATE_ACTIVE;
}
}
@ -1642,7 +1686,22 @@ trx_erase_lists(
bool serialised)
{
ut_ad(trx->id > 0);
trx_sys_mutex_enter();
if (trx->read_only || trx->rsegs.m_redo.rseg == NULL) {
trx_sys_mutex_enter();
ut_ad(!trx->in_rw_trx_list);
} else {
trx_sys_mutex_enter();
UT_LIST_REMOVE(trx_sys->rw_trx_list, trx);
ut_d(trx->in_rw_trx_list = false);
ut_ad(trx_sys_validate_trx_list());
if (trx->read_view != NULL) {
trx_sys->mvcc->view_close(trx->read_view, true);
}
}
if (serialised) {
UT_LIST_REMOVE(trx_sys->serialisation_list, trx);
@ -1654,24 +1713,8 @@ trx_erase_lists(
trx->id);
ut_ad(*it == trx->id);
trx_sys->rw_trx_ids.erase(it);
if (trx->read_only || trx->rsegs.m_redo.rseg == NULL) {
ut_ad(!trx->in_rw_trx_list);
} else {
UT_LIST_REMOVE(trx_sys->rw_trx_list, trx);
ut_d(trx->in_rw_trx_list = false);
ut_ad(trx_sys_validate_trx_list());
if (trx->read_view != NULL) {
trx_sys->mvcc->view_close(trx->read_view, true);
}
}
trx_sys->rw_trx_set.erase(TrxTrack(trx->id));
trx_sys_mutex_exit();
trx_sys->rw_trx_hash.erase(trx);
}
/****************************************************************//**
@ -3036,6 +3079,7 @@ trx_set_rw_mode(
ut_ad(!trx->in_rw_trx_list);
ut_ad(!trx_is_autocommit_non_locking(trx));
ut_ad(!trx->read_only);
ut_ad(trx->id == 0);
if (high_level_read_only) {
return;
@ -3053,14 +3097,10 @@ trx_set_rw_mode(
ut_ad(trx->rsegs.m_redo.rseg != 0);
mutex_enter(&trx_sys->mutex);
ut_ad(trx->id == 0);
trx->id = trx_sys_get_new_trx_id();
trx_sys->rw_trx_ids.push_back(trx->id);
trx_sys->rw_trx_set.insert(TrxTrack(trx->id, trx));
/* So that we can see our own changes. */
if (MVCC::is_view_active(trx->read_view)) {
MVCC::set_view_creator_trx_id(trx->read_view, trx->id);
@ -3077,6 +3117,7 @@ trx_set_rw_mode(
ut_d(trx->in_rw_trx_list = true);
mutex_exit(&trx_sys->mutex);
trx_sys->rw_trx_hash.insert(trx);
}
/**