diff --git a/sql/sql_class.h b/sql/sql_class.h index 98a76dce1fb..459ff9dc8b6 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -652,6 +652,7 @@ typedef struct system_variables #ifdef WITH_WSREP my_bool wsrep_on; my_bool wsrep_causal_reads; + uint wsrep_sync_wait; ulong wsrep_retry_autocommit; #endif double long_query_time_double; diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index aab655275b2..380573ca224 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -2766,7 +2766,7 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_STATUS: { #ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; + if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; #endif /* WITH_WSREP */ execute_show_status(thd, all_tables); break; @@ -2801,7 +2801,7 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_STATUS_PROC: case SQLCOM_SHOW_STATUS_FUNC: #ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; + if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; #endif /* WITH_WSREP */ case SQLCOM_SHOW_DATABASES: @@ -2825,7 +2825,7 @@ mysql_execute_command(THD *thd) case SQLCOM_SHOW_INDEX_STATS: case SQLCOM_SELECT: #ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; + if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; case SQLCOM_SHOW_VARIABLES: case SQLCOM_SHOW_CHARSETS: case SQLCOM_SHOW_COLLATIONS: @@ -3516,7 +3516,7 @@ end_with_restore_list: #else { #ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; + if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; #endif /* WITH_WSREP */ /* @@ -3582,7 +3582,7 @@ end_with_restore_list: { DBUG_ASSERT(first_table == all_tables && first_table != 0); #ifdef WITH_WSREP - if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) goto error; + if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) goto error; #endif /* WITH_WSREP */ if (check_table_access(thd, SELECT_ACL, all_tables, @@ -3593,6 +3593,10 @@ end_with_restore_list: break; } case SQLCOM_UPDATE: +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && + wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) goto error; +#endif /* WITH_WSREP */ { ha_rows found= 0, updated= 0; DBUG_ASSERT(first_table == all_tables && first_table != 0); @@ -3632,6 +3636,10 @@ end_with_restore_list: /* if we switched from normal update, rights are checked */ if (up_result != 2) { +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && + wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) goto error; +#endif /* WITH_WSREP */ if ((res= multi_update_precheck(thd, all_tables))) break; } @@ -3701,6 +3709,10 @@ end_with_restore_list: break; } case SQLCOM_REPLACE: +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && + wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE)) goto error; +#endif /* WITH_WSREP */ #ifndef DBUG_OFF if (mysql_bin_log.is_open()) { @@ -3736,6 +3748,10 @@ end_with_restore_list: } #endif case SQLCOM_INSERT: +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && + wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE)) goto error; +#endif /* WITH_WSREP */ { DBUG_ASSERT(first_table == all_tables && first_table != 0); @@ -3789,6 +3805,10 @@ end_with_restore_list: } case SQLCOM_REPLACE_SELECT: case SQLCOM_INSERT_SELECT: +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && + wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE)) goto error; +#endif /* WITH_WSREP */ { select_result *sel_result; bool explain= MY_TEST(lex->describe); @@ -3890,6 +3910,10 @@ end_with_restore_list: break; } case SQLCOM_DELETE: +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && + wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) goto error; +#endif /* WITH_WSREP */ { select_result *sel_result=lex->result; DBUG_ASSERT(first_table == all_tables && first_table != 0); @@ -3910,6 +3934,10 @@ end_with_restore_list: break; } case SQLCOM_DELETE_MULTI: +#ifdef WITH_WSREP + if (WSREP_CLIENT(thd) && + wsrep_sync_wait(thd, WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE)) goto error; +#endif /* WITH_WSREP */ { DBUG_ASSERT(first_table == all_tables && first_table != 0); TABLE_LIST *aux_tables= thd->lex->auxiliary_table_list.first; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 1ce485bc53f..895e4b2a002 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4664,9 +4664,21 @@ static Sys_var_mybool Sys_wsrep_certify_nonPK( CMD_LINE(OPT_ARG), DEFAULT(TRUE)); static Sys_var_mybool Sys_wsrep_causal_reads( - "wsrep_causal_reads", "Enable \"strictly synchronous\" semantics for read operations", + "wsrep_causal_reads", "(DEPRECATED) setting this variable is equivalent to setting wsrep_sync_wait READ flag", SESSION_VAR(wsrep_causal_reads), - CMD_LINE(OPT_ARG), DEFAULT(FALSE)); + CMD_LINE(OPT_ARG), DEFAULT(FALSE), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(wsrep_causal_reads_update)); + +static Sys_var_uint Sys_wsrep_sync_wait( + "wsrep_sync_wait", "Ensure \"synchronous\" read view before executing an operation of the type specified by bitmask: 1 - READ(includes SELECT, SHOW and BEGIN/START TRANSACTION); 2 - UPDATE and DELETE; 4 - INSERT and REPLACE", + SESSION_VAR(wsrep_sync_wait), + CMD_LINE(OPT_ARG), + VALID_RANGE(WSREP_SYNC_WAIT_NONE, WSREP_SYNC_WAIT_MAX), + DEFAULT(WSREP_SYNC_WAIT_NONE), + BLOCK_SIZE(1), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0), + ON_UPDATE(wsrep_sync_wait_update)); static const char *wsrep_OSU_method_names[]= { "TOI", "RSU", NullS }; static Sys_var_enum Sys_wsrep_OSU_method( diff --git a/sql/transaction.cc b/sql/transaction.cc index 72a8c47cdd8..8418ef304b0 100644 --- a/sql/transaction.cc +++ b/sql/transaction.cc @@ -192,7 +192,7 @@ bool trans_begin(THD *thd, uint flags) #ifdef WITH_WSREP thd->wsrep_PA_safe= true; - if (WSREP_CLIENT(thd) && wsrep_causal_wait(thd)) + if (WSREP_CLIENT(thd) && wsrep_sync_wait(thd)) DBUG_RETURN(TRUE); #endif /* WITH_WSREP */ diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 79103ae8e71..38b6f4dfe59 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -879,13 +879,15 @@ bool wsrep_start_replication() return true; } -bool -wsrep_causal_wait (THD* thd) +bool wsrep_sync_wait (THD* thd, uint mask) { - if (thd->variables.wsrep_causal_reads && thd->variables.wsrep_on && + if ((thd->variables.wsrep_sync_wait & mask) && + thd->variables.wsrep_on && !thd->in_active_multi_stmt_transaction() && thd->wsrep_conflict_state != REPLAYING) { + WSREP_DEBUG("wsrep_sync_wait: thd->variables.wsrep_sync_wait = %u, mask = %u", + thd->variables.wsrep_sync_wait, mask); // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 // TODO: modify to check if thd has locked any rows. wsrep_gtid_t gtid; @@ -909,7 +911,7 @@ wsrep_causal_wait (THD* thd) err= ER_NOT_SUPPORTED_YET; break; default: - msg= "Causal wait failed."; + msg= "Synchronous wait failed."; err= ER_LOCK_WAIT_TIMEOUT; // NOTE: the above msg won't be displayed // with ER_LOCK_WAIT_TIMEOUT } diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index 796f1aac0f1..2e226be0318 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -102,6 +102,14 @@ extern my_bool wsrep_slave_FK_checks; extern my_bool wsrep_slave_UK_checks; enum enum_wsrep_OSU_method { WSREP_OSU_TOI, WSREP_OSU_RSU }; +enum enum_wsrep_sync_wait { + WSREP_SYNC_WAIT_NONE = 0x0, + // show, select, begin + WSREP_SYNC_WAIT_BEFORE_READ = 0x1, + WSREP_SYNC_WAIT_BEFORE_UPDATE_DELETE = 0x2, + WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE = 0x4, + WSREP_SYNC_WAIT_MAX = 0x7 +}; // MySQL status variables extern my_bool wsrep_connected; @@ -175,9 +183,10 @@ extern void wsrep_kill_mysql(THD *thd); /* new defines */ extern void wsrep_stop_replication(THD *thd); extern bool wsrep_start_replication(); -extern bool wsrep_causal_wait(THD* thd); +extern bool wsrep_sync_wait (THD* thd, uint mask = WSREP_SYNC_WAIT_BEFORE_READ); extern int wsrep_check_opts (int argc, char* const* argv); extern void wsrep_prepend_PATH (const char* path); +/* some inline functions are defined in wsrep_mysqld_inl.h */ /* Other global variables */ extern wsrep_seqno_t wsrep_locked_seqno; diff --git a/sql/wsrep_var.cc b/sql/wsrep_var.cc index b965c26c184..2272945535d 100644 --- a/sql/wsrep_var.cc +++ b/sql/wsrep_var.cc @@ -60,11 +60,29 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type) return false; } -void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) +bool wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type) { - if (var_type == OPT_GLOBAL) { - thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; + // global setting should not affect session setting. + // if (var_type == OPT_GLOBAL) { + // thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; + // } + if (thd->variables.wsrep_causal_reads) { + thd->variables.wsrep_sync_wait |= WSREP_SYNC_WAIT_BEFORE_READ; + } else { + thd->variables.wsrep_sync_wait &= ~WSREP_SYNC_WAIT_BEFORE_READ; } + return false; +} + +bool wsrep_sync_wait_update (sys_var* self, THD* thd, enum_var_type var_type) +{ + // global setting should not affect session setting. + // if (var_type == OPT_GLOBAL) { + // thd->variables.wsrep_sync_wait = global_system_variables.wsrep_sync_wait; + // } + thd->variables.wsrep_causal_reads = thd->variables.wsrep_sync_wait & + WSREP_SYNC_WAIT_BEFORE_READ; + return false; } static int wsrep_start_position_verify (const char* start_str) diff --git a/sql/wsrep_var.h b/sql/wsrep_var.h index 2a5e94b6724..524dabfd9c0 100644 --- a/sql/wsrep_var.h +++ b/sql/wsrep_var.h @@ -35,7 +35,8 @@ int wsrep_init_vars(); #define INIT_ARGS (const char* opt) extern bool wsrep_on_update UPDATE_ARGS; -extern void wsrep_causal_reads_update UPDATE_ARGS; +extern bool wsrep_causal_reads_update UPDATE_ARGS; +extern bool wsrep_sync_wait_update UPDATE_ARGS; extern bool wsrep_start_position_check CHECK_ARGS; extern bool wsrep_start_position_update UPDATE_ARGS; extern void wsrep_start_position_init INIT_ARGS;