diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index f0df7742f9a..750613326d2 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -105,13 +105,9 @@ #ifdef WITH_WSREP #include "wsrep_mysqld.h" -#include "rpl_rli.h" static void wsrep_client_rollback(THD *thd); - -extern Format_description_log_event *wsrep_format_desc; - static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, - Parser_state *parser_state); + Parser_state *parser_state); #endif /* WITH_WSREP */ /** @defgroup Runtime_Environment Runtime Environment @@ -606,13 +602,6 @@ bool is_log_table_write_query(enum enum_sql_command command) return (sql_command_flags[command] & CF_WRITE_LOGS_COMMAND) != 0; } -#ifdef WITH_WSREP -bool is_show_query(enum enum_sql_command command) -{ - DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); - return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; -} -#endif void execute_init_command(THD *thd, LEX_STRING *init_command, mysql_rwlock_t *var_lock) { @@ -813,7 +802,7 @@ void do_handle_bootstrap(THD *thd) close_connection(thd, ER_OUT_OF_RESOURCES, 1); #else close_connection(thd, ER_OUT_OF_RESOURCES); -#endif +#endif /* WITH_WSREP */ #endif thd->fatal_error(); goto end; @@ -898,7 +887,7 @@ bool do_command(THD *thd) } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } -#endif +#endif /* WITH_WSREP */ /* indicator of uninitialized lex => normal flow of errors handling (see my_message_sql) @@ -955,12 +944,12 @@ bool do_command(THD *thd) if (thd->wsrep_conflict_state == ABORTING) { while (thd->wsrep_conflict_state == ABORTING) { - mysql_mutex_unlock(&thd->LOCK_wsrep_thd); - my_sleep(1000); - mysql_mutex_lock(&thd->LOCK_wsrep_thd); + mysql_mutex_unlock(&thd->LOCK_wsrep_thd); + my_sleep(1000); + mysql_mutex_lock(&thd->LOCK_wsrep_thd); } thd->store_globals(); - } + } else if (thd->wsrep_conflict_state == ABORTED) { thd->store_globals(); @@ -986,12 +975,12 @@ bool do_command(THD *thd) mysql_mutex_lock(&thd->LOCK_wsrep_thd); if (thd->wsrep_conflict_state == MUST_ABORT) { - DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id)); - wsrep_client_rollback(thd); + DBUG_PRINT("wsrep",("aborted for wsrep rollback: %lu", thd->real_id)); + wsrep_client_rollback(thd); } mysql_mutex_unlock(&thd->LOCK_wsrep_thd); } -#endif +#endif /* WITH_WSREP */ /* Instrument this broken statement as "statement/com/error" */ thd->m_statement_psi= MYSQL_REFINE_STATEMENT(thd->m_statement_psi, com_statement_info[COM_END]. @@ -1048,11 +1037,11 @@ bool do_command(THD *thd) #ifdef WITH_WSREP if (WSREP(thd)) { - /* + /* * bail out if DB snapshot has not been installed. We however, * allow queries "SET" and "SHOW", they are trapped later in execute_command */ - if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && + if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && command != COM_QUERY && command != COM_PING && command != COM_QUIT && @@ -1065,14 +1054,14 @@ bool do_command(THD *thd) command != COM_TIME && command != COM_END ) { - my_error(ER_UNKNOWN_COM_ERROR, MYF(0), + my_error(ER_UNKNOWN_COM_ERROR, MYF(0), "WSREP has not yet prepared node for application use"); thd->protocol->end_statement(); return_value= FALSE; goto out; } } -#endif +#endif /* WITH_WSREP */ /* Restore read timeout value */ my_net_set_read_timeout(net, thd->variables.net_read_timeout); @@ -1083,7 +1072,7 @@ bool do_command(THD *thd) while (thd->wsrep_conflict_state== RETRY_AUTOCOMMIT) { WSREP_DEBUG("Retry autocommit for: %s\n", thd->wsrep_retry_query); - return_value= dispatch_command(command, thd, thd->wsrep_retry_query, + return_value= dispatch_command(command, thd, thd->wsrep_retry_query, thd->wsrep_retry_query_len); } } @@ -1094,7 +1083,7 @@ bool do_command(THD *thd) thd->wsrep_retry_query_len = 0; thd->wsrep_retry_command = COM_CONNECT; } -#endif +#endif /* WITH_WSREP */ out: /* The statement instrumentation must be closed in all cases. */ DBUG_ASSERT(thd->m_statement_psi == NULL); @@ -1243,7 +1232,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, { wsrep_client_rollback(thd); } - if (thd->wsrep_conflict_state== ABORTED) + if (thd->wsrep_conflict_state== ABORTED) { my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); WSREP_DEBUG("Deadlock error for: %s", thd->query()); @@ -1455,7 +1444,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, wsrep_mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); #else mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); -#endif +#endif /* WITH_WSREP */ while (!thd->killed && (parser_state.m_lip.found_semicolon != NULL) && ! thd->is_error()) @@ -1533,14 +1522,14 @@ bool dispatch_command(enum enum_server_command command, THD *thd, thd->set_time(); /* Reset the query start time. */ #else thd->set_time(); /* Reset the query start time. */ -#endif +#endif /* WITH_WSREP */ parser_state.reset(beginning_of_next_stmt, length); /* TODO: set thd->lex->sql_command to SQLCOM_END here */ #ifdef WITH_WSREP wsrep_mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); #else mysql_parse(thd, beginning_of_next_stmt, length, &parser_state); -#endif +#endif /* WITH_WSREP */ } DBUG_PRINT("info",("query ready")); @@ -2345,6 +2334,13 @@ err: return TRUE; } +#ifdef WITH_WSREP +static bool wsrep_is_show_query(enum enum_sql_command command) +{ + DBUG_ASSERT(command >= 0 && command <= SQLCOM_END); + return (sql_command_flags[command] & CF_STATUS_COMMAND) != 0; +} +#endif /* WITH_WSREP */ /** Execute command saved in thd and lex->sql_command. @@ -2575,7 +2571,7 @@ mysql_execute_command(THD *thd) } } } - if (lex->sql_command== SQLCOM_UNLOCK_TABLES && + if (lex->sql_command== SQLCOM_UNLOCK_TABLES && thd->wsrep_converted_lock_session) { thd->wsrep_converted_lock_session= false; @@ -2583,13 +2579,13 @@ mysql_execute_command(THD *thd) lex->tx_release= TVL_NO; } - /* + /* * bail out if DB snapshot has not been installed. We however, * allow SET and SHOW queries */ if (thd->variables.wsrep_on && !thd->wsrep_applier && !wsrep_ready && lex->sql_command != SQLCOM_SET_OPTION && - !is_show_query(lex->sql_command)) + !wsrep_is_show_query(lex->sql_command)) { #if DIRTY_HACK /* Dirty hack for lp:1002714 - trying to recognize mysqldump connection @@ -2604,7 +2600,7 @@ mysql_execute_command(THD *thd) strncmp(thd->query(), mysqldump_magic_str, mysqldump_magic_str_len)) { #endif /* DIRTY_HACK */ - my_error(ER_UNKNOWN_COM_ERROR, MYF(0), + my_error(ER_UNKNOWN_COM_ERROR, MYF(0), "WSREP has not yet prepared node for application use"); goto error; #if DIRTY_HACK @@ -3893,7 +3889,7 @@ end_with_restore_list: #ifdef WITH_WSREP for (TABLE_LIST *table= all_tables; table; table= table->next_global) { - if (!lex->drop_temporary && + if (!lex->drop_temporary && (!thd->is_current_stmt_binlog_format_row() || !find_temporary_table(thd, table))) { @@ -4586,7 +4582,7 @@ end_with_restore_list: lex->insert_list, lex->ha_rkey_mode, select_lex->where, unit->select_limit_cnt, unit->offset_limit_cnt); #ifdef WITH_WSREP - if (WSREP(thd)) thd_proc_info(thd, tmp_info); + if (WSREP(thd)) thd_proc_info(thd, tmp_info); } #endif /* WITH_WSREP */ break; @@ -6575,6 +6571,8 @@ void mysql_init_multi_delete(LEX *lex) } #ifdef WITH_WSREP +static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*); +static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*); void wsrep_replay_transaction(THD *thd) { /* checking if BF trx must be replayed */ @@ -6584,7 +6582,7 @@ void wsrep_replay_transaction(THD *thd) { if (thd->get_stmt_da()->is_sent()) { - WSREP_ERROR("replay issue, thd has reported status already"); + WSREP_ERROR("replay issue, thd has reported status already"); } thd->get_stmt_da()->reset_diagnostics_area(); @@ -6596,62 +6594,61 @@ void wsrep_replay_transaction(THD *thd) close_thread_tables(thd); if (thd->locked_tables_mode && thd->lock) { - WSREP_DEBUG("releasing table lock for replaying (%ld)", - thd->thread_id); - thd->locked_tables_list.unlock_locked_tables(thd); - thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); + WSREP_DEBUG("releasing table lock for replaying (%ld)", + thd->thread_id); + thd->locked_tables_list.unlock_locked_tables(thd); + thd->variables.option_bits&= ~(OPTION_TABLE_LOCK); } thd->mdl_context.release_transactional_locks(); thd_proc_info(thd, "wsrep replaying trx"); WSREP_DEBUG("replay trx: %s %lld", - thd->query() ? thd->query() : "void", - (long long)thd->wsrep_trx_seqno); + thd->query() ? thd->query() : "void", + (long long)wsrep_thd_trx_seqno(thd)); struct wsrep_thd_shadow shadow; wsrep_prepare_bf_thd(thd, &shadow); int rcode = wsrep->replay_trx(wsrep, - &thd->wsrep_trx_handle, - (void *)thd); + &thd->wsrep_ws_handle, + (void *)thd); wsrep_return_from_bf_mode(thd, &shadow); if (thd->wsrep_conflict_state!= REPLAYING) - WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); + WSREP_WARN("lost replaying mode: %d", thd->wsrep_conflict_state ); mysql_mutex_lock(&thd->LOCK_wsrep_thd); switch (rcode) { case WSREP_OK: - thd->wsrep_conflict_state= NO_CONFLICT; - wsrep->post_commit(wsrep, &thd->wsrep_trx_handle); - WSREP_DEBUG("trx_replay successful for: %ld %llu", - thd->thread_id, (long long)thd->real_id); - break; + thd->wsrep_conflict_state= NO_CONFLICT; + wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); + WSREP_DEBUG("trx_replay successful for: %ld %llu", + thd->thread_id, (long long)thd->real_id); + break; case WSREP_TRX_FAIL: - if (thd->get_stmt_da()->is_sent()) - { - WSREP_ERROR("replay failed, thd has reported status"); - } - else - { - WSREP_DEBUG("replay failed, rolling back"); - my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); - } - thd->wsrep_conflict_state= ABORTED; - thd->wsrep_bf_thd = NULL; - wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle); - break; + if (thd->stmt_da->is_sent) + { + WSREP_ERROR("replay failed, thd has reported status"); + } + else + { + WSREP_DEBUG("replay failed, rolling back"); + my_error(ER_LOCK_DEADLOCK, MYF(0), "wsrep aborted transaction"); + } + thd->wsrep_conflict_state= ABORTED; + wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle); + break; default: - WSREP_ERROR("trx_replay failed for: %d, query: %s", - rcode, thd->query() ? thd->query() : "void"); - /* we're now in inconsistent state, must abort */ - unireg_abort(1); - break; + WSREP_ERROR("trx_replay failed for: %d, query: %s", + rcode, thd->query() ? thd->query() : "void"); + /* we're now in inconsistent state, must abort */ + unireg_abort(1); + break; } mysql_mutex_lock(&LOCK_wsrep_replaying); wsrep_replaying--; WSREP_DEBUG("replaying decreased: %d, thd: %lu", - wsrep_replaying, thd->thread_id); + wsrep_replaying, thd->thread_id); mysql_cond_broadcast(&COND_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying); } @@ -6691,7 +6688,7 @@ static void wsrep_mysql_parse(THD *thd, char *rawbuf, uint length, if (thd->wsrep_conflict_state== MUST_REPLAY) { - wsrep_replay_transaction(thd); + wsrep_replay_transaction(thd); } /* setting error code for BF aborted trxs */ @@ -7772,7 +7769,7 @@ uint kill_one_thread(THD *thd, ulong id, killed_state kill_signal) #ifdef WITH_WSREP if (((thd->security_ctx->master_access & SUPER_ACL) || thd->security_ctx->user_matches(tmp->security_ctx)) && - !wsrep_thd_is_brute_force((void *)tmp)) + !wsrep_thd_is_brute_force((void *)tmp)) #else if ((thd->security_ctx->master_access & SUPER_ACL) || thd->security_ctx->user_matches(tmp->security_ctx)) @@ -8985,7 +8982,7 @@ Relay_log_info* wsrep_relay_log_init(const char* log_fname) return rli; } -void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) +static void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) { shadow->options = thd->variables.option_bits; shadow->wsrep_exec_mode = thd->wsrep_exec_mode; @@ -9009,7 +9006,7 @@ void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow* shadow) thd->tx_isolation = ISO_READ_COMMITTED; } -void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) +static void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow* shadow) { thd->variables.option_bits = shadow->options; thd->wsrep_exec_mode = shadow->wsrep_exec_mode; @@ -9121,7 +9118,7 @@ void wsrep_rollback_process(THD *thd) mysql_mutex_lock(&aborting->LOCK_wsrep_thd); if (aborting->wsrep_conflict_state== ABORTED) { - WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", + WSREP_DEBUG("WSREP, thd already aborted: %llu state: %d", (long long)aborting->real_id, aborting->wsrep_conflict_state); @@ -9157,7 +9154,7 @@ int wsrep_thd_is_brute_force(void *thd_ptr) { if (thd_ptr) { switch (((THD *)thd_ptr)->wsrep_exec_mode) { - case LOCAL_STATE: + case LOCAL_STATE: { if (((THD *)thd_ptr)->wsrep_conflict_state== REPLAYING) { @@ -9180,7 +9177,7 @@ int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal) DBUG_ENTER("wsrep_abort_thd"); if ( (WSREP(bf_thd) || - ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) && + ( (WSREP_ON || wsrep_OSU_method_options == WSREP_OSU_RSU) && bf_thd->wsrep_exec_mode == TOTAL_ORDER) ) && victim_thd) { diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index df33aa178e5..0df368c562d 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -322,8 +322,6 @@ extern int wsrep_thd_is_brute_force(void *thd_ptr); extern "C" int wsrep_abort_thd(void *bf_thd_ptr, void *victim_thd_ptr, my_bool signal); extern "C" int wsrep_thd_in_locking_session(void *thd_ptr); -void *wsrep_prepare_bf_thd(THD *thd); -void wsrep_return_from_bf_mode(void *shadow, THD *thd); /* this is visible for client build so that innodb plugin gets this */ typedef struct wsrep_aborting_thd { @@ -376,8 +374,6 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_, const TABLE_LIST* table_list); void wsrep_to_isolation_end(THD *thd); -void wsrep_prepare_bf_thd(THD *thd, struct wsrep_thd_shadow*); -void wsrep_return_from_bf_mode(THD *thd, struct wsrep_thd_shadow*); int wsrep_to_buf_helper( THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len); int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len);