Add locks for sequence's to ensure that there is only one writer or many readers

This is needed for MyISAM and other storage engines which normally
relies on THR_LOCK's to ensure that one is not writing the same block
one could be reading from.
This commit is contained in:
Monty 2017-05-29 16:08:11 +03:00
parent d7e3120da8
commit 7e5bd1500f
6 changed files with 97 additions and 48 deletions

View File

@ -48,7 +48,7 @@ handlerton *sql_sequence_hton;
*/ */
ha_sequence::ha_sequence(handlerton *hton, TABLE_SHARE *share) ha_sequence::ha_sequence(handlerton *hton, TABLE_SHARE *share)
:handler(hton, share), sequence_locked(0) :handler(hton, share), write_locked(0)
{ {
sequence= share->sequence; sequence= share->sequence;
DBUG_ASSERT(share->sequence); DBUG_ASSERT(share->sequence);
@ -179,7 +179,7 @@ int ha_sequence::create(const char *name, TABLE *form,
@retval != 0 Failure @retval != 0 Failure
NOTES: NOTES:
sequence_locked is set if we are called from SEQUENCE::next_value write_locked is set if we are called from SEQUENCE::next_value
In this case the mutex is already locked and we should not update In this case the mutex is already locked and we should not update
the sequence with 'buf' as the sequence object is already up to date. the sequence with 'buf' as the sequence object is already up to date.
*/ */
@ -188,6 +188,7 @@ int ha_sequence::write_row(uchar *buf)
{ {
int error; int error;
sequence_definition tmp_seq; sequence_definition tmp_seq;
bool sequence_locked;
DBUG_ENTER("ha_sequence::write_row"); DBUG_ENTER("ha_sequence::write_row");
DBUG_ASSERT(table->record[0] == buf); DBUG_ASSERT(table->record[0] == buf);
@ -200,7 +201,8 @@ int ha_sequence::write_row(uchar *buf)
if (unlikely(sequence->initialized != SEQUENCE::SEQ_READY_TO_USE)) if (unlikely(sequence->initialized != SEQUENCE::SEQ_READY_TO_USE))
DBUG_RETURN(HA_ERR_WRONG_COMMAND); DBUG_RETURN(HA_ERR_WRONG_COMMAND);
if (!sequence_locked) // If not from next_value() sequence_locked= write_locked;
if (!write_locked) // If not from next_value()
{ {
/* /*
User tries to write a full row directly to the sequence table with User tries to write a full row directly to the sequence table with
@ -224,14 +226,13 @@ int ha_sequence::write_row(uchar *buf)
tmp_seq.read_fields(table); tmp_seq.read_fields(table);
if (tmp_seq.check_and_adjust(0)) if (tmp_seq.check_and_adjust(0))
DBUG_RETURN(HA_ERR_SEQUENCE_INVALID_DATA); DBUG_RETURN(HA_ERR_SEQUENCE_INVALID_DATA);
}
/* /*
Lock sequence to ensure that no one can come in between Lock sequence to ensure that no one can come in between
while sequence, table and binary log are updated. while sequence, table and binary log are updated.
*/ */
if (!sequence_locked) // If not from next_value() sequence->write_lock(table);
sequence->lock(); }
if (!(error= file->update_first_row(buf))) if (!(error= file->update_first_row(buf)))
{ {
@ -246,7 +247,7 @@ int ha_sequence::write_row(uchar *buf)
sequence->all_values_used= 0; sequence->all_values_used= 0;
if (!sequence_locked) if (!sequence_locked)
sequence->unlock(); sequence->write_unlock(table);
DBUG_RETURN(error); DBUG_RETURN(error);
} }

View File

@ -60,6 +60,9 @@ private:
SEQUENCE *sequence; /* From table_share->sequence */ SEQUENCE *sequence; /* From table_share->sequence */
public: public:
/* Set when handler is write locked */
bool write_locked;
ha_sequence(handlerton *hton, TABLE_SHARE *share); ha_sequence(handlerton *hton, TABLE_SHARE *share);
~ha_sequence(); ~ha_sequence();
@ -89,16 +92,36 @@ public:
/* For ALTER ONLINE TABLE */ /* For ALTER ONLINE TABLE */
bool check_if_incompatible_data(HA_CREATE_INFO *create_info, bool check_if_incompatible_data(HA_CREATE_INFO *create_info,
uint table_changes); uint table_changes);
void write_lock() { write_locked= 1;}
void unlock() { write_locked= 0; }
bool is_locked() { return write_locked; }
/* Functions that are directly mapped to the underlying handler */ /* Functions that are directly mapped to the underlying handler */
int rnd_init(bool scan) int rnd_init(bool scan)
{ return file->rnd_init(scan); } { return file->rnd_init(scan); }
/*
We need to have a lock here to protect engines like MyISAM from
simultaneous read and write. For sequence's this is not critical
as this function is used extremely seldom.
*/
int rnd_next(uchar *buf) int rnd_next(uchar *buf)
{ return file->rnd_next(buf); } {
int error;
table->s->sequence->read_lock(table);
error= file->rnd_next(buf);
table->s->sequence->read_unlock(table);
return error;
}
int rnd_end() int rnd_end()
{ return file->rnd_end(); } { return file->rnd_end(); }
int rnd_pos(uchar *buf, uchar *pos) int rnd_pos(uchar *buf, uchar *pos)
{ return file->rnd_pos(buf, pos); } {
int error;
table->s->sequence->read_lock(table);
error= file->rnd_pos(buf, pos);
table->s->sequence->read_unlock(table);
return error;
}
void position(const uchar *record) void position(const uchar *record)
{ return file->position(record); } { return file->position(record); }
const char *table_type() const const char *table_type() const
@ -136,8 +159,5 @@ public:
file= file_arg; file= file_arg;
init(); /* Update cached_table_flags */ init(); /* Update cached_table_flags */
} }
/* To inform handler that sequence is already locked by called */
bool sequence_locked;
}; };
#endif #endif

View File

@ -925,8 +925,7 @@ PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_PARTITION_LOCK_auto_inc; key_PARTITION_LOCK_auto_inc;
PSI_mutex_key key_RELAYLOG_LOCK_index; PSI_mutex_key key_RELAYLOG_LOCK_index;
PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry, key_LOCK_rpl_thread, key_LOCK_rpl_thread_pool, key_LOCK_parallel_entry;
key_LOCK_SEQUENCE;
PSI_mutex_key key_LOCK_stats, PSI_mutex_key key_LOCK_stats,
key_LOCK_global_user_client_stats, key_LOCK_global_table_stats, key_LOCK_global_user_client_stats, key_LOCK_global_table_stats,
@ -1011,7 +1010,6 @@ static PSI_mutex_info all_server_mutexes[]=
{ &key_LOCK_slave_state, "LOCK_slave_state", 0}, { &key_LOCK_slave_state, "LOCK_slave_state", 0},
{ &key_LOCK_start_thread, "LOCK_start_thread", PSI_FLAG_GLOBAL}, { &key_LOCK_start_thread, "LOCK_start_thread", PSI_FLAG_GLOBAL},
{ &key_LOCK_binlog_state, "LOCK_binlog_state", 0}, { &key_LOCK_binlog_state, "LOCK_binlog_state", 0},
{ &key_LOCK_SEQUENCE, "SQUENCE::LOCK_SEQUENCE", 0},
{ &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0}, { &key_LOCK_rpl_thread, "LOCK_rpl_thread", 0},
{ &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0}, { &key_LOCK_rpl_thread_pool, "LOCK_rpl_thread_pool", 0},
{ &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0} { &key_LOCK_parallel_entry, "LOCK_parallel_entry", 0}
@ -1019,7 +1017,9 @@ static PSI_mutex_info all_server_mutexes[]=
PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock; key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock,
key_LOCK_SEQUENCE;
static PSI_rwlock_info all_server_rwlocks[]= static PSI_rwlock_info all_server_rwlocks[]=
{ {
@ -1030,6 +1030,7 @@ static PSI_rwlock_info all_server_rwlocks[]=
{ &key_rwlock_LOCK_logger, "LOGGER::LOCK_logger", 0}, { &key_rwlock_LOCK_logger, "LOGGER::LOCK_logger", 0},
{ &key_rwlock_LOCK_sys_init_connect, "LOCK_sys_init_connect", PSI_FLAG_GLOBAL}, { &key_rwlock_LOCK_sys_init_connect, "LOCK_sys_init_connect", PSI_FLAG_GLOBAL},
{ &key_rwlock_LOCK_sys_init_slave, "LOCK_sys_init_slave", PSI_FLAG_GLOBAL}, { &key_rwlock_LOCK_sys_init_slave, "LOCK_sys_init_slave", PSI_FLAG_GLOBAL},
{ &key_LOCK_SEQUENCE, "LOCK_SEQUENCE", 0},
{ &key_rwlock_LOCK_system_variables_hash, "LOCK_system_variables_hash", PSI_FLAG_GLOBAL}, { &key_rwlock_LOCK_system_variables_hash, "LOCK_system_variables_hash", PSI_FLAG_GLOBAL},
{ &key_rwlock_query_cache_query_lock, "Query_cache_query::lock", 0} { &key_rwlock_query_cache_query_lock, "Query_cache_query::lock", 0}
}; };

View File

@ -301,7 +301,7 @@ extern PSI_mutex_key key_BINLOG_LOCK_index, key_BINLOG_LOCK_xid_list,
key_relay_log_info_log_space_lock, key_relay_log_info_run_lock, key_relay_log_info_log_space_lock, key_relay_log_info_run_lock,
key_rpl_group_info_sleep_lock, key_rpl_group_info_sleep_lock,
key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data, key_structure_guard_mutex, key_TABLE_SHARE_LOCK_ha_data,
key_LOCK_start_thread, key_LOCK_SEQUENCE, key_LOCK_start_thread,
key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc; key_LOCK_error_messages, key_LOCK_thread_count, key_PARTITION_LOCK_auto_inc;
extern PSI_mutex_key key_RELAYLOG_LOCK_index; extern PSI_mutex_key key_RELAYLOG_LOCK_index;
extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state, extern PSI_mutex_key key_LOCK_slave_state, key_LOCK_binlog_state,
@ -314,8 +314,8 @@ extern PSI_mutex_key key_LOCK_gtid_waiting;
extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger, extern PSI_rwlock_key key_rwlock_LOCK_grant, key_rwlock_LOCK_logger,
key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave, key_rwlock_LOCK_sys_init_connect, key_rwlock_LOCK_sys_init_slave,
key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock; key_rwlock_LOCK_system_variables_hash, key_rwlock_query_cache_query_lock,
key_LOCK_SEQUENCE;
#ifdef HAVE_MMAP #ifdef HAVE_MMAP
extern PSI_cond_key key_PAGE_cond, key_COND_active, key_COND_pool; extern PSI_cond_key key_PAGE_cond, key_COND_active, key_COND_pool;
#endif /* HAVE_MMAP */ #endif /* HAVE_MMAP */

View File

@ -342,14 +342,46 @@ bool sequence_insert(THD *thd, LEX *lex, TABLE_LIST *table_list)
SEQUENCE::SEQUENCE() :all_values_used(0), initialized(SEQ_UNINTIALIZED), table(0) SEQUENCE::SEQUENCE() :all_values_used(0), initialized(SEQ_UNINTIALIZED), table(0)
{ {
mysql_mutex_init(key_LOCK_SEQUENCE, &mutex, MY_MUTEX_INIT_SLOW); mysql_rwlock_init(key_LOCK_SEQUENCE, &mutex);
} }
SEQUENCE::~SEQUENCE() SEQUENCE::~SEQUENCE()
{ {
mysql_mutex_destroy(&mutex); mysql_rwlock_destroy(&mutex);
} }
/*
The following functions is to ensure that we when reserve new values
trough sequence object sequence we have only one writer at at time.
A sequence table can have many readers (trough normal SELECT's).
We mark that we have a write lock in the table object so that
ha_sequence::ha_write() can check if we have a lock. If already locked, then
ha_write() knows that we are running a sequence operation. If not, then
ha_write() knows that it's an INSERT.
*/
void SEQUENCE::write_lock(TABLE *table)
{
DBUG_ASSERT(((ha_sequence*) table->file)->is_locked() == 0);
mysql_rwlock_wrlock(&mutex);
((ha_sequence*) table->file)->write_lock();
}
void SEQUENCE::write_unlock(TABLE *table)
{
((ha_sequence*) table->file)->unlock();
mysql_rwlock_unlock(&mutex);
}
void SEQUENCE::read_lock(TABLE *table)
{
if (!((ha_sequence*) table->file)->is_locked())
mysql_rwlock_rdlock(&mutex);
}
void SEQUENCE::read_unlock(TABLE *table)
{
if (!((ha_sequence*) table->file)->is_locked())
mysql_rwlock_unlock(&mutex);
}
/** /**
Read values from the sequence tables to table_share->sequence. Read values from the sequence tables to table_share->sequence.
@ -366,7 +398,7 @@ int SEQUENCE::read_initial_values(TABLE *table_arg)
if (likely(initialized != SEQ_UNINTIALIZED)) if (likely(initialized != SEQ_UNINTIALIZED))
DBUG_RETURN(0); DBUG_RETURN(0);
table= table_arg; table= table_arg;
mysql_mutex_lock(&mutex); write_lock(table);
if (likely(initialized == SEQ_UNINTIALIZED)) if (likely(initialized == SEQ_UNINTIALIZED))
{ {
MYSQL_LOCK *lock; MYSQL_LOCK *lock;
@ -422,7 +454,7 @@ int SEQUENCE::read_initial_values(TABLE *table_arg)
if (!has_active_transaction && !thd->transaction.stmt.is_empty()) if (!has_active_transaction && !thd->transaction.stmt.is_empty())
trans_commit_stmt(thd); trans_commit_stmt(thd);
} }
mysql_mutex_unlock(&mutex); write_unlock(table);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
@ -436,7 +468,6 @@ int SEQUENCE::read_stored_values()
int error; int error;
my_bitmap_map *save_read_set; my_bitmap_map *save_read_set;
DBUG_ENTER("SEQUENCE::read_stored_values"); DBUG_ENTER("SEQUENCE::read_stored_values");
mysql_mutex_assert_owner(&mutex);
save_read_set= tmp_use_all_columns(table, table->read_set); save_read_set= tmp_use_all_columns(table, table->read_set);
error= table->file->ha_read_first_row(table->record[0], MAX_KEY); error= table->file->ha_read_first_row(table->record[0], MAX_KEY);
@ -546,6 +577,7 @@ int sequence_definition::write(TABLE *table, bool all_fields)
{ {
int error; int error;
MY_BITMAP *save_rpl_write_set, *save_write_set; MY_BITMAP *save_rpl_write_set, *save_write_set;
DBUG_ASSERT(((ha_sequence*) table->file)->is_locked());
save_rpl_write_set= table->rpl_write_set; save_rpl_write_set= table->rpl_write_set;
if (likely(!all_fields)) if (likely(!all_fields))
@ -563,11 +595,8 @@ int sequence_definition::write(TABLE *table, bool all_fields)
save_write_set= table->write_set; save_write_set= table->write_set;
table->write_set= &table->s->all_set; table->write_set= &table->s->all_set;
store_fields(table); store_fields(table);
/* Tell ha_sequence::write_row that we already hold the mutex */
((ha_sequence*) table->file)->sequence_locked= 1;
if ((error= table->file->ha_write_row(table->record[0]))) if ((error= table->file->ha_write_row(table->record[0])))
table->file->print_error(error, MYF(0)); table->file->print_error(error, MYF(0));
((ha_sequence*) table->file)->sequence_locked= 0;
table->rpl_write_set= save_rpl_write_set; table->rpl_write_set= save_rpl_write_set;
table->write_set= save_write_set; table->write_set= save_write_set;
return error; return error;
@ -610,7 +639,7 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
*error= 0; *error= 0;
if (!second_round) if (!second_round)
lock(); write_lock(table);
res_value= next_free_value; res_value= next_free_value;
next_free_value= increment_value(next_free_value); next_free_value= increment_value(next_free_value);
@ -618,7 +647,7 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
if ((real_increment > 0 && res_value < reserved_until) || if ((real_increment > 0 && res_value < reserved_until) ||
(real_increment < 0 && res_value > reserved_until)) (real_increment < 0 && res_value > reserved_until))
{ {
unlock(); write_unlock(table);
DBUG_RETURN(res_value); DBUG_RETURN(res_value);
} }
@ -677,11 +706,11 @@ longlong SEQUENCE::next_value(TABLE *table, bool second_round, int *error)
next_free_value= res_value; next_free_value= res_value;
} }
unlock(); write_unlock(table);
DBUG_RETURN(res_value); DBUG_RETURN(res_value);
err: err:
unlock(); write_unlock(table);
my_error(ER_SEQUENCE_RUN_OUT, MYF(0), table->s->db.str, my_error(ER_SEQUENCE_RUN_OUT, MYF(0), table->s->db.str,
table->s->table_name.str); table->s->table_name.str);
*error= ER_SEQUENCE_RUN_OUT; *error= ER_SEQUENCE_RUN_OUT;
@ -740,7 +769,7 @@ bool SEQUENCE::set_value(TABLE *table, longlong next_val, ulonglong next_round,
ulonglong org_round= round; ulonglong org_round= round;
DBUG_ENTER("SEQUENCE::set_value"); DBUG_ENTER("SEQUENCE::set_value");
lock(); write_lock(table);
if (is_used) if (is_used)
next_val= increment_value(next_val); next_val= increment_value(next_val);
@ -782,7 +811,7 @@ bool SEQUENCE::set_value(TABLE *table, longlong next_val, ulonglong next_round,
error= 0; error= 0;
end: end:
unlock(); write_unlock(table);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
@ -871,6 +900,7 @@ bool Sql_cmd_alter_sequence::execute(THD *thd)
goto end; goto end;
} }
table->s->sequence->write_lock(table);
if (!(error= new_seq->write(table, 1))) if (!(error= new_seq->write(table, 1)))
{ {
/* Store the sequence values in table share */ /* Store the sequence values in table share */
@ -878,6 +908,7 @@ bool Sql_cmd_alter_sequence::execute(THD *thd)
} }
else else
table->file->print_error(error, MYF(0)); table->file->print_error(error, MYF(0));
table->s->sequence->write_unlock(table);
if (trans_commit_stmt(thd)) if (trans_commit_stmt(thd))
error= 1; error= 1;
if (trans_commit_implicit(thd)) if (trans_commit_implicit(thd))

View File

@ -40,7 +40,7 @@ class sequence_definition :public Sql_alloc
public: public:
sequence_definition(): sequence_definition():
min_value(1), max_value(LONGLONG_MAX-1), start(1), increment(1), min_value(1), max_value(LONGLONG_MAX-1), start(1), increment(1),
cache(1000), round(0), cycle(0), used_fields(0) cache(1000), round(0), restart(0), cycle(0), used_fields(0)
{} {}
longlong reserved_until; longlong reserved_until;
longlong min_value; longlong min_value;
@ -49,9 +49,9 @@ public:
longlong increment; longlong increment;
longlong cache; longlong cache;
ulonglong round; ulonglong round;
longlong restart; // alter sequence restart value
bool cycle; bool cycle;
uint used_fields; // Which fields where used in CREATE uint used_fields; // Which fields where used in CREATE
longlong restart; // alter sequence restart value
bool check_and_adjust(bool set_reserved_until); bool check_and_adjust(bool set_reserved_until);
void store_fields(TABLE *table); void store_fields(TABLE *table);
@ -93,14 +93,10 @@ public:
~SEQUENCE(); ~SEQUENCE();
int read_initial_values(TABLE *table); int read_initial_values(TABLE *table);
int read_stored_values(); int read_stored_values();
void lock() void write_lock(TABLE *table);
{ void write_unlock(TABLE *table);
mysql_mutex_lock(&mutex); void read_lock(TABLE *table);
} void read_unlock(TABLE *table);
void unlock()
{
mysql_mutex_unlock(&mutex);
}
void copy(sequence_definition *seq) void copy(sequence_definition *seq)
{ {
sequence_definition::operator= (*seq); sequence_definition::operator= (*seq);
@ -135,7 +131,7 @@ public:
private: private:
TABLE *table; TABLE *table;
mysql_mutex_t mutex; mysql_rwlock_t mutex;
}; };