From 64923bb653f908a6ea422a2d252011f93056f41c Mon Sep 17 00:00:00 2001 From: unknown Date: Fri, 25 Apr 2014 12:58:31 +0200 Subject: [PATCH] MDEV-6156: Parallel replication incorrectly caches charset between worker threads The previous patch for this bug was unfortunately completely wrong. The purpose of cached_charset is to remember which character set we have installed currently in the THD, so that in the common case where charset does not change between queries, we do not need to update it in the THD. Thus, it is important that the cached_charset field is tightly coupled to the THD for which it handles caching. Thus the right place to put cached_charset seems to be in the THD. This patch introduces a field THD:system_thread_info where such info in general can be placed without further inflating the THD with unused data for other threads (THD is already far too big as it is). It then moves the cached_charset into this slot for the SQL driver thread and for the parallel replication worker threads. The THD::rpl_filter field is also moved inside system_thread_info, to keep the size of THD unchanged. Moving further fields in to reduce the size of THD is a separate task, filed as MDEV-6164. --- sql/log_event.cc | 3 ++- sql/rpl_mi.h | 10 ++++++++ sql/rpl_parallel.cc | 4 ++- sql/rpl_rli.cc | 53 +++++++++++++++++++++------------------ sql/rpl_rli.h | 36 ++++++++++++++++++-------- sql/slave.cc | 9 ++++--- sql/sql_acl.cc | 26 +++++++++++-------- sql/sql_class.h | 8 ++++-- sql/sql_parse.cc | 61 +++++++++++++++++++++++++++------------------ 9 files changed, 134 insertions(+), 76 deletions(-) diff --git a/sql/log_event.cc b/sql/log_event.cc index 0a24d45113b..dda7f360f55 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4153,7 +4153,8 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, (sql_mode & ~(ulong) MODE_NO_DIR_IN_CREATE)); if (charset_inited) { - if (rgi->cached_charset_compare(charset)) + rpl_sql_thread_info *sql_info= thd->system_thread_info.rpl_sql_info; + if (sql_info->cached_charset_compare(charset)) { /* Verify that we support the charsets found in the event. */ if (!(thd->variables.character_set_client= diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h index a136af38356..af739b1dad4 100644 --- a/sql/rpl_mi.h +++ b/sql/rpl_mi.h @@ -216,6 +216,16 @@ public: bool stop_all_slaves(THD *thd); }; + +/* + The class rpl_io_thread_info is the THD::system_thread_info for the IO thread. +*/ +class rpl_io_thread_info +{ +public: +}; + + bool check_master_connection_name(LEX_STRING *name); void create_logfile_name_with_suffix(char *res_file_name, size_t length, const char *info_file, diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 5c902249914..e72d3470a7f 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -33,7 +33,7 @@ rpt_handle_event(rpl_parallel_thread::queued_event *qev, THD *thd= rgi->thd; thd->rgi_slave= rgi; - thd->rpl_filter = rli->mi->rpl_filter; + thd->system_thread_info.rpl_sql_info->rpl_filter = rli->mi->rpl_filter; /* ToDo: Access to thd, and what about rli, split out a parallel part? */ mysql_mutex_lock(&rli->data_lock); @@ -212,6 +212,7 @@ handle_rpl_parallel_thread(void *arg) rpl_parallel_thread::queued_event *qevs_to_free; rpl_group_info *rgis_to_free; group_commit_orderer *gcos_to_free; + rpl_sql_thread_info sql_info(NULL); size_t total_event_size; int err; @@ -242,6 +243,7 @@ handle_rpl_parallel_thread(void *arg) thd_proc_info(thd, "Waiting for work from main SQL threads"); thd->set_time(); thd->variables.lock_wait_timeout= LONG_TIMEOUT; + thd->system_thread_info.rpl_sql_info= &sql_info; /* For now, we need to run the replication parallel worker threads in READ COMMITTED. This is needed because gap locks are not symmetric. diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 8104d0ff553..a162d1d79f8 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1479,7 +1479,6 @@ rpl_group_info::rpl_group_info(Relay_log_info *rli) deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false) { reinit(rli); - cached_charset_invalidate(); bzero(¤t_gtid, sizeof(current_gtid)); mysql_mutex_init(key_rpl_group_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST); @@ -1562,29 +1561,6 @@ delete_or_keep_event_post_apply(rpl_group_info *rgi, } -void rpl_group_info::cached_charset_invalidate() -{ - DBUG_ENTER("rpl_group_info::cached_charset_invalidate"); - - /* Full of zeroes means uninitialized. */ - bzero(cached_charset, sizeof(cached_charset)); - DBUG_VOID_RETURN; -} - - -bool rpl_group_info::cached_charset_compare(char *charset) const -{ - DBUG_ENTER("rpl_group_info::cached_charset_compare"); - - if (memcmp(cached_charset, charset, sizeof(cached_charset))) - { - memcpy(const_cast(cached_charset), charset, sizeof(cached_charset)); - DBUG_RETURN(1); - } - DBUG_RETURN(0); -} - - void rpl_group_info::cleanup_context(THD *thd, bool error) { DBUG_ENTER("Relay_log_info::cleanup_context"); @@ -1769,4 +1745,33 @@ rpl_group_info::mark_start_commit() } +rpl_sql_thread_info::rpl_sql_thread_info(Rpl_filter *filter) + : rpl_filter(filter) +{ + cached_charset_invalidate(); +} + + +void rpl_sql_thread_info::cached_charset_invalidate() +{ + DBUG_ENTER("rpl_group_info::cached_charset_invalidate"); + + /* Full of zeroes means uninitialized. */ + bzero(cached_charset, sizeof(cached_charset)); + DBUG_VOID_RETURN; +} + + +bool rpl_sql_thread_info::cached_charset_compare(char *charset) const +{ + DBUG_ENTER("rpl_group_info::cached_charset_compare"); + + if (memcmp(cached_charset, charset, sizeof(cached_charset))) + { + memcpy(const_cast(cached_charset), charset, sizeof(cached_charset)); + DBUG_RETURN(1); + } + DBUG_RETURN(0); +} + #endif diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index bcb237b6ced..137571ab820 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -26,6 +26,7 @@ struct RPL_TABLE_LIST; class Master_info; +class Rpl_filter; /**************************************************************************** @@ -536,8 +537,6 @@ struct rpl_group_info mysql_mutex_t sleep_lock; mysql_cond_t sleep_cond; - char cached_charset[6]; - /* trans_retries varies between 0 to slave_transaction_retries and counts how many times the slave has retried the present transaction; gets reset to 0 @@ -671,15 +670,6 @@ struct rpl_group_info return false; } - /* - Last charset (6 bytes) seen by slave SQL thread is cached here; it helps - the thread save 3 get_charset() per Query_log_event if the charset is not - changing from event to event (common situation). - When the 6 bytes are equal to 0 is used to mean "cache is invalidated". - */ - void cached_charset_invalidate(); - bool cached_charset_compare(char *charset) const; - void clear_tables_to_lock(); void cleanup_context(THD *, bool); void slave_close_thread_tables(THD *); @@ -727,6 +717,30 @@ struct rpl_group_info }; +/* + The class rpl_sql_thread_info is the THD::system_thread_info for an SQL + thread; this is either the driver SQL thread or a worker thread for parallel + replication. +*/ +class rpl_sql_thread_info +{ +public: + char cached_charset[6]; + Rpl_filter* rpl_filter; + + rpl_sql_thread_info(Rpl_filter *filter); + + /* + Last charset (6 bytes) seen by slave SQL thread is cached here; it helps + the thread save 3 get_charset() per Query_log_event if the charset is not + changing from event to event (common situation). + When the 6 bytes are equal to 0 is used to mean "cache is invalidated". + */ + void cached_charset_invalidate(); + bool cached_charset_compare(char *charset) const; +}; + + // Defined in rpl_rli.cc int init_relay_log_info(Relay_log_info* rli, const char* info_fname); diff --git a/sql/slave.cc b/sql/slave.cc index b5d8758a405..a40e5735b24 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -2891,7 +2891,7 @@ void set_slave_thread_default_charset(THD* thd, rpl_group_info *rgi) global_system_variables.collation_server; thd->update_charset(); - rgi->cached_charset_invalidate(); + thd->system_thread_info.rpl_sql_info->cached_charset_invalidate(); DBUG_VOID_RETURN; } @@ -3768,6 +3768,7 @@ pthread_handler_t handle_slave_io(void *arg) uint retry_count; bool suppress_warnings; int ret; + rpl_io_thread_info io_info; #ifndef DBUG_OFF uint retry_count_reg= 0, retry_count_dump= 0, retry_count_event= 0; #endif @@ -3801,6 +3802,7 @@ pthread_handler_t handle_slave_io(void *arg) sql_print_error("Failed during slave I/O thread initialization"); goto err_during_init; } + thd->system_thread_info.rpl_io_info= &io_info; mysql_mutex_lock(&LOCK_thread_count); threads.append(thd); mysql_mutex_unlock(&LOCK_thread_count); @@ -4367,6 +4369,7 @@ pthread_handler_t handle_slave_sql(void *arg) Relay_log_info* rli = &mi->rli; const char *errmsg; rpl_group_info *serial_rgi; + rpl_sql_thread_info sql_info(mi->rpl_filter); // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff my_thread_init(); @@ -4378,7 +4381,7 @@ pthread_handler_t handle_slave_sql(void *arg) serial_rgi= new rpl_group_info(rli); thd = new THD; // note that contructor of THD uses DBUG_ ! thd->thread_stack = (char*)&thd; // remember where our stack is - thd->rpl_filter = mi->rpl_filter; + thd->system_thread_info.rpl_sql_info= &sql_info; DBUG_ASSERT(rli->inited); DBUG_ASSERT(rli->mi == mi); @@ -4676,7 +4679,7 @@ err_during_init: mysql_cond_broadcast(&rli->data_cond); rli->ignore_log_space_limit= 0; /* don't need any lock */ /* we die so won't remember charset - re-update them on next thread start */ - serial_rgi->cached_charset_invalidate(); + thd->system_thread_info.rpl_sql_info->cached_charset_invalidate(); /* TODO: see if we can do this conditionally in next_event() instead diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index 130113c17f1..c4c06ba0e0b 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -38,6 +38,7 @@ #include "records.h" // READ_RECORD, read_record_info, // init_read_record, end_read_record #include "rpl_filter.h" // rpl_filter +#include "rpl_rli.h" #include #include #include "sp_head.h" @@ -2558,7 +2559,7 @@ bool change_password(THD *thd, const char *host, const char *user, { TABLE_LIST tables; TABLE *table; - Rpl_filter *rpl_filter= thd->rpl_filter; + Rpl_filter *rpl_filter; /* Buffer should be extended when password length is extended. */ char buff[512]; ulong query_length; @@ -2580,7 +2581,8 @@ bool change_password(THD *thd, const char *host, const char *user, GRANT and REVOKE are applied the slave in/exclusion rules as they are some kind of updates to the mysql.% tables. */ - if (thd->slave_thread && rpl_filter->is_on()) + if (thd->slave_thread && + (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on()) { /* The tables must be marked "updating" so that tables_ok() takes them into @@ -5393,7 +5395,7 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list, TABLE_LIST tables[3]; bool create_new_users=0; char *db_name, *table_name; - Rpl_filter *rpl_filter= thd->rpl_filter; + Rpl_filter *rpl_filter; DBUG_ENTER("mysql_table_grant"); if (!initialized) @@ -5483,7 +5485,8 @@ int mysql_table_grant(THD *thd, TABLE_LIST *table_list, GRANT and REVOKE are applied the slave in/exclusion rules as they are some kind of updates to the mysql.% tables. */ - if (thd->slave_thread && rpl_filter->is_on()) + if (thd->slave_thread && + (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on()) { /* The tables must be marked "updating" so that tables_ok() takes them into @@ -5670,7 +5673,7 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc, TABLE_LIST tables[2]; bool create_new_users=0, result=0; char *db_name, *table_name; - Rpl_filter *rpl_filter= thd->rpl_filter; + Rpl_filter *rpl_filter; DBUG_ENTER("mysql_routine_grant"); if (!initialized) @@ -5705,7 +5708,8 @@ bool mysql_routine_grant(THD *thd, TABLE_LIST *table_list, bool is_proc, GRANT and REVOKE are applied the slave in/exclusion rules as they are some kind of updates to the mysql.% tables. */ - if (thd->slave_thread && rpl_filter->is_on()) + if (thd->slave_thread && + (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on()) { /* The tables must be marked "updating" so that tables_ok() takes them into @@ -6141,7 +6145,7 @@ bool mysql_grant(THD *thd, const char *db, List &list, char tmp_db[SAFE_NAME_LEN+1]; bool create_new_users=0; TABLE_LIST tables[2]; - Rpl_filter *rpl_filter= thd->rpl_filter; + Rpl_filter *rpl_filter; DBUG_ENTER("mysql_grant"); if (!initialized) @@ -6190,7 +6194,8 @@ bool mysql_grant(THD *thd, const char *db, List &list, GRANT and REVOKE are applied the slave in/exclusion rules as they are some kind of updates to the mysql.% tables. */ - if (thd->slave_thread && rpl_filter->is_on()) + if (thd->slave_thread && + (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on()) { /* The tables must be marked "updating" so that tables_ok() takes them into @@ -8223,7 +8228,7 @@ void get_mqh(const char *user, const char *host, USER_CONN *uc) #define GRANT_TABLES 7 static int open_grant_tables(THD *thd, TABLE_LIST *tables) { - Rpl_filter *rpl_filter= thd->rpl_filter; + Rpl_filter *rpl_filter; DBUG_ENTER("open_grant_tables"); if (!initialized) @@ -8267,7 +8272,8 @@ static int open_grant_tables(THD *thd, TABLE_LIST *tables) GRANT and REVOKE are applied the slave in/exclusion rules as they are some kind of updates to the mysql.% tables. */ - if (thd->slave_thread && rpl_filter->is_on()) + if (thd->slave_thread && + (rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter)->is_on()) { /* The tables must be marked "updating" so that tables_ok() takes them into diff --git a/sql/sql_class.h b/sql/sql_class.h index d050a7b3704..19e641079bd 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -72,6 +72,8 @@ class Parser_state; class Rows_log_event; class Sroutine_hash_entry; class user_var_entry; +class rpl_io_thread_info; +class rpl_sql_thread_info; enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME }; enum enum_duplicates { DUP_ERROR, DUP_REPLACE, DUP_UPDATE }; @@ -1810,8 +1812,10 @@ public: /* Slave applier execution context */ rpl_group_info* rgi_slave; - /* Used to SLAVE SQL thread */ - Rpl_filter* rpl_filter; + union { + rpl_io_thread_info *rpl_io_info; + rpl_sql_thread_info *rpl_sql_info; + } system_thread_info; void reset_for_next_command(); /* diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 055317f0445..f7e2a9ce3e2 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -171,8 +171,9 @@ const char *xa_state_names[]={ */ inline bool all_tables_not_ok(THD *thd, TABLE_LIST *tables) { - return thd->rpl_filter->is_on() && tables && !thd->spcont && - !thd->rpl_filter->tables_ok(thd->db, tables); + Rpl_filter *rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter; + return rpl_filter->is_on() && tables && !thd->spcont && + !rpl_filter->tables_ok(thd->db, tables); } #endif @@ -2233,7 +2234,7 @@ mysql_execute_command(THD *thd) /* have table map for update for multi-update statement (BUG#37051) */ bool have_table_map_for_update= FALSE; /* */ - Rpl_filter *rpl_filter= thd->rpl_filter; + Rpl_filter *rpl_filter; #endif DBUG_ENTER("mysql_execute_command"); @@ -3885,12 +3886,15 @@ end_with_restore_list: above was not called. So we have to check rules again here. */ #ifdef HAVE_REPLICATION - if (thd->slave_thread && - (!rpl_filter->db_ok(lex->name.str) || - !rpl_filter->db_ok_with_wild_table(lex->name.str))) + if (thd->slave_thread) { - my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0)); - break; + rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter; + if (!rpl_filter->db_ok(lex->name.str) || + !rpl_filter->db_ok_with_wild_table(lex->name.str)) + { + my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0)); + break; + } } #endif if (check_access(thd, CREATE_ACL, lex->name.str, NULL, NULL, 1, 0)) @@ -3913,12 +3917,15 @@ end_with_restore_list: above was not called. So we have to check rules again here. */ #ifdef HAVE_REPLICATION - if (thd->slave_thread && - (!rpl_filter->db_ok(lex->name.str) || - !rpl_filter->db_ok_with_wild_table(lex->name.str))) + if (thd->slave_thread) { - my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0)); - break; + rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter; + if (!rpl_filter->db_ok(lex->name.str) || + !rpl_filter->db_ok_with_wild_table(lex->name.str)) + { + my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0)); + break; + } } #endif if (check_access(thd, DROP_ACL, lex->name.str, NULL, NULL, 1, 0)) @@ -3930,13 +3937,16 @@ end_with_restore_list: { LEX_STRING *db= & lex->name; #ifdef HAVE_REPLICATION - if (thd->slave_thread && - (!rpl_filter->db_ok(db->str) || - !rpl_filter->db_ok_with_wild_table(db->str))) + if (thd->slave_thread) { - res= 1; - my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0)); - break; + rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter; + if (!rpl_filter->db_ok(db->str) || + !rpl_filter->db_ok_with_wild_table(db->str)) + { + res= 1; + my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0)); + break; + } } #endif if (check_db_name(db)) @@ -3973,12 +3983,15 @@ end_with_restore_list: above was not called. So we have to check rules again here. */ #ifdef HAVE_REPLICATION - if (thd->slave_thread && - (!rpl_filter->db_ok(db->str) || - !rpl_filter->db_ok_with_wild_table(db->str))) + if (thd->slave_thread) { - my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0)); - break; + rpl_filter= thd->system_thread_info.rpl_sql_info->rpl_filter; + if (!rpl_filter->db_ok(db->str) || + !rpl_filter->db_ok_with_wild_table(db->str)) + { + my_message(ER_SLAVE_IGNORED_TABLE, ER(ER_SLAVE_IGNORED_TABLE), MYF(0)); + break; + } } #endif if (check_access(thd, ALTER_ACL, db->str, NULL, NULL, 1, 0))