Merges from lp:codership-mysql/5.5 up to rev #3893, this changes to wsrep API #24

This commit is contained in:
Seppo Jaakola 2013-11-26 16:48:30 +02:00
parent 2b4183f10b
commit a2594e96f7
28 changed files with 1099 additions and 701 deletions

View File

@ -50,6 +50,8 @@ IF(WITH_WSREP)
wsrep_sst.cc wsrep_sst.cc
wsrep_utils.cc wsrep_utils.cc
wsrep_var.cc wsrep_var.cc
wsrep_binlog.cc
wsrep_applier.cc
wsrep_thd.cc wsrep_thd.cc
) )
SET(WSREP_LIB wsrep) SET(WSREP_LIB wsrep)

View File

@ -1169,7 +1169,7 @@ end:
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len) int wsrep_create_event_query(THD *thd, uchar** buf, int* buf_len)
{ {
String log_query; String log_query;

View File

@ -596,7 +596,7 @@ void thd_binlog_rollback_stmt(THD * thd)
with the exception that here we write in buffer instead of log file. with the exception that here we write in buffer instead of log file.
*/ */
int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len) int wsrep_write_cache(IO_CACHE *cache, uchar **buf, int *buf_len)
{ {
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
@ -6017,21 +6017,27 @@ err:
if (WSREP(thd) && wsrep_incremental_data_collection && if (WSREP(thd) && wsrep_incremental_data_collection &&
(wsrep_emulate_bin_log || mysql_bin_log.is_open())) (wsrep_emulate_bin_log || mysql_bin_log.is_open()))
{ {
DBUG_ASSERT(thd->wsrep_trx_handle.trx_id != (unsigned long)-1); DBUG_ASSERT(thd->wsrep_ws_handle.trx_id != (unsigned long)-1);
if (!error) if (!error)
{ {
IO_CACHE* cache= get_trans_log(thd); IO_CACHE* cache= get_trans_log(thd);
uchar* buf= NULL; uchar* buf= NULL;
uint buf_len= 0; int buf_len= 0;
if (wsrep_emulate_bin_log) if (wsrep_emulate_bin_log)
thd->binlog_flush_pending_rows_event(false); thd->binlog_flush_pending_rows_event(false);
error= wsrep_write_cache(cache, &buf, &buf_len); error= wsrep_write_cache(cache, &buf, &buf_len);
if (!error && buf_len > 0) if (!error && buf_len > 0)
{ {
const struct wsrep_buf buff = { buf, buf_len };
const bool nocopy(false);
const bool unordered(false);
wsrep_status_t rc= wsrep->append_data(wsrep, wsrep_status_t rc= wsrep->append_data(wsrep,
&thd->wsrep_trx_handle, &thd->wsrep_ws_handle,
buf, buf_len); &buff, 1, WSREP_DATA_ORDERED,
true);
if (rc != WSREP_OK) if (rc != WSREP_OK)
{ {
sql_print_warning("WSREP: append_data() returned %d", rc); sql_print_warning("WSREP: append_data() returned %d", rc);

View File

@ -287,12 +287,6 @@ enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED };
(mmap+fsync is two times faster than write+fsync) (mmap+fsync is two times faster than write+fsync)
*/ */
#ifdef WITH_WSREP
extern my_bool wsrep_emulate_bin_log;
Log_event* wsrep_read_log_event(
char **arg_buf, size_t *arg_buf_len,
const Format_description_log_event *description_event);
#endif
class MYSQL_LOG class MYSQL_LOG
{ {
public: public:
@ -974,7 +968,6 @@ bool wsrep_trans_cache_is_empty(THD *thd);
void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end); void thd_binlog_flush_pending_rows_event(THD *thd, bool stmt_end);
void thd_binlog_trx_reset(THD * thd); void thd_binlog_trx_reset(THD * thd);
void thd_binlog_rollback_stmt(THD * thd); void thd_binlog_rollback_stmt(THD * thd);
int wsrep_write_cache(IO_CACHE *cache, uchar **buf, uint *buf_len);
#define WSREP_FORMAT(my_format) \ #define WSREP_FORMAT(my_format) \
((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \ ((wsrep_forced_binlog_format != BINLOG_FORMAT_UNSPEC) ? \

View File

@ -9211,7 +9211,7 @@ int Rows_log_event::do_apply_event(Relay_log_info const *rli)
thd->is_fatal_error, thd->is_fatal_error,
thd->wsrep_exec_mode, thd->wsrep_exec_mode,
thd->wsrep_conflict_state, thd->wsrep_conflict_state,
(long long)thd->wsrep_trx_seqno); (long long)wsrep_thd_trx_seqno(thd));
} }
#endif #endif
if (thd->is_slave_error || thd->is_fatal_error) if (thd->is_slave_error || thd->is_fatal_error)
@ -10976,7 +10976,7 @@ Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64]; char info[64];
info[sizeof(info) - 1] = '\0'; info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)", snprintf(info, sizeof(info) - 1, "Write_rows_log_event::write_row(%lld)",
(long long) thd->wsrep_trx_seqno); (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else #else
const char* tmp = (WSREP(thd)) ? const char* tmp = (WSREP(thd)) ?
@ -11659,7 +11659,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64]; char info[64];
info[sizeof(info) - 1] = '\0'; info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)", snprintf(info, sizeof(info) - 1, "Delete_rows_log_event::find_row(%lld)",
(long long) thd->wsrep_trx_seqno); (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else #else
const char* tmp = (WSREP(thd)) ? const char* tmp = (WSREP(thd)) ?
@ -11675,7 +11675,7 @@ int Delete_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1, snprintf(info, sizeof(info) - 1,
"Delete_rows_log_event::ha_delete_row(%lld)", "Delete_rows_log_event::ha_delete_row(%lld)",
(long long) thd->wsrep_trx_seqno); (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info); if (WSREP(thd)) thd_proc_info(thd, info);
#else #else
if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()"); if (WSREP(thd)) thd_proc_info(thd,"Delete_rows_log_event::ha_delete_row()");
@ -11809,7 +11809,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
char info[64]; char info[64];
info[sizeof(info) - 1] = '\0'; info[sizeof(info) - 1] = '\0';
snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)", snprintf(info, sizeof(info) - 1, "Update_rows_log_event::find_row(%lld)",
(long long) thd->wsrep_trx_seqno); (long long) wsrep_thd_trx_seqno(thd));
const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL; const char* tmp = (WSREP(thd)) ? thd_proc_info(thd, info) : NULL;
#else #else
const char* tmp = (WSREP(thd)) ? const char* tmp = (WSREP(thd)) ?
@ -11846,7 +11846,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1, snprintf(info, sizeof(info) - 1,
"Update_rows_log_event::unpack_current_row(%lld)", "Update_rows_log_event::unpack_current_row(%lld)",
(long long) thd->wsrep_trx_seqno); (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info); if (WSREP(thd)) thd_proc_info(thd, info);
#else #else
if (WSREP(thd)) if (WSREP(thd))
@ -11875,7 +11875,7 @@ Update_rows_log_event::do_exec_row(const Relay_log_info *const rli)
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
snprintf(info, sizeof(info) - 1, snprintf(info, sizeof(info) - 1,
"Update_rows_log_event::ha_update_row(%lld)", "Update_rows_log_event::ha_update_row(%lld)",
(long long) thd->wsrep_trx_seqno); (long long) wsrep_thd_trx_seqno(thd));
if (WSREP(thd)) thd_proc_info(thd, info); if (WSREP(thd)) thd_proc_info(thd, info);
#else #else
if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()"); if (WSREP(thd)) thd_proc_info(thd,"Update_rows_log_event::ha_update_row()");

View File

@ -359,7 +359,11 @@ static char *default_character_set_name;
static char *character_set_filesystem_name; static char *character_set_filesystem_name;
static char *lc_messages; static char *lc_messages;
static char *lc_time_names_name; static char *lc_time_names_name;
#ifndef WITH_WSREP
static char *my_bind_addr_str; static char *my_bind_addr_str;
#else
char *my_bind_addr_str;
#endif /* WITH_WSREP */
static char *default_collation_name; static char *default_collation_name;
char *default_storage_engine; char *default_storage_engine;
static char compiled_default_collation_name[]= MYSQL_DEFAULT_COLLATION_NAME; static char compiled_default_collation_name[]= MYSQL_DEFAULT_COLLATION_NAME;

View File

@ -2255,7 +2255,7 @@ sp_load_for_information_schema(THD *thd, TABLE *proc_table, String *db,
return sp; return sp;
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len) int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len)
{ {
String log_query; String log_query;
sp_head *sp = thd->lex->sphead; sp_head *sp = thd->lex->sphead;

View File

@ -869,9 +869,9 @@ extern "C" const char *wsrep_thd_conflict_state_str(THD *thd)
(thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void"; (thd->wsrep_conflict_state == CERT_FAILURE) ? "cert failure" : "void";
} }
extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd) extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd)
{ {
return &thd->wsrep_trx_handle; return &thd->wsrep_ws_handle;
} }
extern "C" void wsrep_thd_LOCK(THD *thd) extern "C" void wsrep_thd_LOCK(THD *thd)
@ -896,7 +896,7 @@ extern "C" my_thread_id wsrep_thd_thread_id(THD *thd)
} }
extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd) extern "C" wsrep_seqno_t wsrep_thd_trx_seqno(THD *thd)
{ {
return (thd) ? thd->wsrep_trx_seqno : -1; return (thd) ? thd->wsrep_trx_meta.gtid.seqno : -1;
} }
extern "C" query_id_t wsrep_thd_query_id(THD *thd) extern "C" query_id_t wsrep_thd_query_id(THD *thd)
{ {
@ -918,7 +918,6 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal)
{ {
if (signal) if (signal)
{ {
thd->wsrep_bf_thd = bf_thd;
mysql_mutex_lock(&thd->LOCK_thd_data); mysql_mutex_lock(&thd->LOCK_thd_data);
thd->awake(KILL_QUERY); thd->awake(KILL_QUERY);
mysql_mutex_unlock(&thd->LOCK_thd_data); mysql_mutex_unlock(&thd->LOCK_thd_data);
@ -934,15 +933,15 @@ extern "C" void wsrep_thd_awake(THD* bf_thd, THD *thd, my_bool signal)
extern int extern int
wsrep_trx_order_before(void *thd1, void *thd2) wsrep_trx_order_before(void *thd1, void *thd2)
{ {
if (((THD*)thd1)->wsrep_trx_seqno < ((THD*)thd2)->wsrep_trx_seqno) { if (wsrep_thd_trx_seqno((THD*)thd1) < wsrep_thd_trx_seqno((THD*)thd2)) {
WSREP_DEBUG("BF conflict, order: %lld %lld\n", WSREP_DEBUG("BF conflict, order: %lld %lld\n",
(long long)((THD*)thd1)->wsrep_trx_seqno, (long long)wsrep_thd_trx_seqno((THD*)thd1),
(long long)((THD*)thd2)->wsrep_trx_seqno); (long long)wsrep_thd_trx_seqno((THD*)thd2));
return 1; return 1;
} }
WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n", WSREP_DEBUG("waiting for BF, trx order: %lld %lld\n",
(long long)((THD*)thd1)->wsrep_trx_seqno, (long long)wsrep_thd_trx_seqno((THD*)thd1),
(long long)((THD*)thd2)->wsrep_trx_seqno); (long long)wsrep_thd_trx_seqno((THD*)thd2));
return 0; return 0;
} }
extern "C" int extern "C" int
@ -1142,9 +1141,8 @@ THD::THD()
#ifdef WITH_WSREP #ifdef WITH_WSREP
mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_LOCK_wsrep_thd, &LOCK_wsrep_thd, MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL); mysql_cond_init(key_COND_wsrep_thd, &COND_wsrep_thd, NULL);
wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID; wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
wsrep_trx_handle.opaque = NULL; wsrep_ws_handle.opaque = NULL;
//wsrep_retry_autocommit= ::wsrep_retry_autocommit;
wsrep_retry_counter = 0; wsrep_retry_counter = 0;
wsrep_PA_safe = true; wsrep_PA_safe = true;
wsrep_seqno_changed = false; wsrep_seqno_changed = false;
@ -1154,7 +1152,6 @@ THD::THD()
wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_consistency_check = NO_CONSISTENCY_CHECK;
wsrep_status_vars = 0; wsrep_status_vars = 0;
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_bf_thd = NULL;
wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0; wsrep_TOI_pre_query_len = 0;
#endif #endif
@ -1549,7 +1546,8 @@ void THD::init(void)
wsrep_conflict_state= NO_CONFLICT; wsrep_conflict_state= NO_CONFLICT;
wsrep_query_state= QUERY_IDLE; wsrep_query_state= QUERY_IDLE;
wsrep_last_query_id= 0; wsrep_last_query_id= 0;
wsrep_trx_seqno= 0; wsrep_trx_meta.gtid= WSREP_GTID_UNDEFINED;
wsrep_trx_meta.depends_on= WSREP_SEQNO_UNDEFINED;
wsrep_converted_lock_session= false; wsrep_converted_lock_session= false;
wsrep_retry_counter= 0; wsrep_retry_counter= 0;
wsrep_rli= NULL; wsrep_rli= NULL;
@ -1557,7 +1555,7 @@ void THD::init(void)
wsrep_seqno_changed= false; wsrep_seqno_changed= false;
wsrep_consistency_check = NO_CONSISTENCY_CHECK; wsrep_consistency_check = NO_CONSISTENCY_CHECK;
wsrep_mysql_replicated = 0; wsrep_mysql_replicated = 0;
wsrep_bf_thd = NULL;
wsrep_TOI_pre_query = NULL; wsrep_TOI_pre_query = NULL;
wsrep_TOI_pre_query_len = 0; wsrep_TOI_pre_query_len = 0;
#endif #endif
@ -1941,7 +1939,7 @@ void THD::awake(killed_state state_to_set)
/* Interrupt target waiting inside a storage engine. */ /* Interrupt target waiting inside a storage engine. */
if (state_to_set != NOT_KILLED) if (state_to_set != NOT_KILLED)
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (!wsrep_bf_thd || wsrep_bf_thd->wsrep_exec_mode == LOCAL_STATE) /* TODO: prevent applier close here */
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
ha_kill_query(this, thd_kill_level(this)); ha_kill_query(this, thd_kill_level(this));

View File

@ -2553,11 +2553,13 @@ public:
enum wsrep_conflict_state wsrep_conflict_state; enum wsrep_conflict_state wsrep_conflict_state;
mysql_mutex_t LOCK_wsrep_thd; mysql_mutex_t LOCK_wsrep_thd;
mysql_cond_t COND_wsrep_thd; mysql_cond_t COND_wsrep_thd;
wsrep_seqno_t wsrep_trx_seqno; // changed from wsrep_seqno_t to wsrep_trx_meta_t in wsrep API rev 75
// wsrep_seqno_t wsrep_trx_seqno;
wsrep_trx_meta_t wsrep_trx_meta;
uint32 wsrep_rand; uint32 wsrep_rand;
Relay_log_info* wsrep_rli; Relay_log_info* wsrep_rli;
bool wsrep_converted_lock_session; bool wsrep_converted_lock_session;
wsrep_trx_handle_t wsrep_trx_handle; wsrep_ws_handle_t wsrep_ws_handle;
bool wsrep_seqno_changed; bool wsrep_seqno_changed;
#ifdef WSREP_PROC_INFO #ifdef WSREP_PROC_INFO
char wsrep_info[128]; /* string for dynamic proc info */ char wsrep_info[128]; /* string for dynamic proc info */
@ -2571,7 +2573,6 @@ public:
wsrep_consistency_check; wsrep_consistency_check;
wsrep_stats_var* wsrep_status_vars; wsrep_stats_var* wsrep_status_vars;
int wsrep_mysql_replicated; int wsrep_mysql_replicated;
THD* wsrep_bf_thd;
const char* wsrep_TOI_pre_query; /* a query to apply before const char* wsrep_TOI_pre_query; /* a query to apply before
the actual TOI query */ the actual TOI query */
size_t wsrep_TOI_pre_query_len; size_t wsrep_TOI_pre_query_len;

View File

@ -953,7 +953,6 @@ bool do_command(THD *thd)
else if (thd->wsrep_conflict_state == ABORTED) else if (thd->wsrep_conflict_state == ABORTED)
{ {
thd->store_globals(); thd->store_globals();
thd->wsrep_bf_thd = NULL;
} }
thd->wsrep_query_state= QUERY_EXEC; thd->wsrep_query_state= QUERY_EXEC;
@ -1241,7 +1240,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
thd->mysys_var->abort = 0; thd->mysys_var->abort = 0;
thd->wsrep_conflict_state = NO_CONFLICT; thd->wsrep_conflict_state = NO_CONFLICT;
thd->wsrep_retry_counter = 0; thd->wsrep_retry_counter = 0;
thd->wsrep_bf_thd = NULL;
/* /*
Increment threads running to compensate dec_thread_running() called Increment threads running to compensate dec_thread_running() called
after dispatch_end label. after dispatch_end label.

View File

@ -2484,7 +2484,7 @@ bool load_table_name_for_trigger(THD *thd,
DBUG_RETURN(FALSE); DBUG_RETURN(FALSE);
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len) int wsrep_create_trigger_query(THD *thd, uchar** buf, int* buf_len)
{ {
LEX *lex= thd->lex; LEX *lex= thd->lex;
String stmt_query; String stmt_query;

View File

@ -4065,6 +4065,7 @@ static Sys_var_tz Sys_time_zone(
#ifdef WITH_WSREP #ifdef WITH_WSREP
#include "wsrep_var.h" #include "wsrep_var.h"
#include "wsrep_sst.h" #include "wsrep_sst.h"
#include "wsrep_binlog.h"
static Sys_var_charptr Sys_wsrep_provider( static Sys_var_charptr Sys_wsrep_provider(
"wsrep_provider", "Path to replication provider library", "wsrep_provider", "Path to replication provider library",
@ -4222,10 +4223,11 @@ static Sys_var_charptr Sys_wsrep_start_position (
ON_CHECK(wsrep_start_position_check), ON_CHECK(wsrep_start_position_check),
ON_UPDATE(wsrep_start_position_update)); ON_UPDATE(wsrep_start_position_update));
static Sys_var_ulonglong Sys_wsrep_max_ws_size ( static Sys_var_ulong Sys_wsrep_max_ws_size (
"wsrep_max_ws_size", "Max write set size (bytes)", "wsrep_max_ws_size", "Max write set size (bytes)",
GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG), GLOBAL_VAR(wsrep_max_ws_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1024, 4294967296ULL), DEFAULT(1073741824ULL), BLOCK_SIZE(1)); /* Upper limit is 65K short of 4G to avoid overlows on 32-bit systems */
VALID_RANGE(1024, WSREP_MAX_WS_SIZE), DEFAULT(1073741824UL), BLOCK_SIZE(1));
static Sys_var_ulong Sys_wsrep_max_ws_rows ( static Sys_var_ulong Sys_wsrep_max_ws_rows (
"wsrep_max_ws_rows", "Max number of rows in write set", "wsrep_max_ws_rows", "Max number of rows in write set",

49
sql/wsrep_binlog.h Normal file
View File

@ -0,0 +1,49 @@
/* Copyright (C) 2013 Codership Oy <info@codership.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
#ifndef WSREP_BINLOG_H
#define WSREP_BINLOG_H
#include "sql_class.h" // THD, IO_CACHE
#define HEAP_PAGE_SIZE 65536 /* 64K */
#define WSREP_MAX_WS_SIZE (0xFFFFFFFFUL - HEAP_PAGE_SIZE)
/*
Write the contents of a cache to a memory buffer.
This function quite the same as MYSQL_BIN_LOG::write_cache(),
with the exception that here we write in buffer instead of log file.
*/
int wsrep_write_cache_buf(IO_CACHE *cache, uchar **buf, size_t *buf_len);
/*
Write the contents of a cache to wsrep provider.
This function quite the same as MYSQL_BIN_LOG::write_cache(),
with the exception that here we write in buffer instead of log file.
@param len total amount of data written
@return wsrep error status
*/
int wsrep_write_cache (wsrep_t* wsrep,
THD* thd,
IO_CACHE* cache,
size_t* len);
/* Dump replication buffer to disk */
void wsrep_dump_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
#endif /* WSREP_BINLOG_H */

View File

@ -18,7 +18,7 @@
#include "rpl_filter.h" #include "rpl_filter.h"
#include <sql_class.h> #include <sql_class.h>
#include "wsrep_mysqld.h" #include "wsrep_mysqld.h"
#include "wsrep_priv.h" #include "wsrep_binlog.h"
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
@ -29,7 +29,8 @@ extern ulonglong thd_to_trx_id(THD *thd);
extern "C" int thd_binlog_format(const MYSQL_THD thd); extern "C" int thd_binlog_format(const MYSQL_THD thd);
// todo: share interface with ha_innodb.c // todo: share interface with ha_innodb.c
enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all); enum wsrep_trx_status wsrep_run_wsrep_commit(THD *thd, handlerton *hton,
bool all);
/* /*
a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct. a post-commit cleanup on behalf of wsrep. Can't be a part of hton struct.
@ -45,7 +46,7 @@ void wsrep_cleanup_transaction(THD *thd)
{ {
if (thd->wsrep_seqno_changed) if (thd->wsrep_seqno_changed)
{ {
if (wsrep->post_commit(wsrep, &thd->wsrep_trx_handle)) if (wsrep->post_commit(wsrep, &thd->wsrep_ws_handle))
{ {
DBUG_PRINT("wsrep", ("set committed fail")); DBUG_PRINT("wsrep", ("set committed fail"));
WSREP_WARN("set committed fail: %llu %d", WSREP_WARN("set committed fail: %llu %d",
@ -59,7 +60,7 @@ void wsrep_cleanup_transaction(THD *thd)
} }
thd->wsrep_exec_mode= LOCAL_STATE; thd->wsrep_exec_mode= LOCAL_STATE;
} }
thd->wsrep_trx_handle.trx_id = WSREP_UNDEFINED_TRX_ID; thd->wsrep_ws_handle.trx_id = WSREP_UNDEFINED_TRX_ID;
} }
/* /*
@ -156,7 +157,7 @@ static int wsrep_rollback(handlerton *hton, THD *thd, bool all)
if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) && if ((all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) &&
(thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY)) (thd->variables.wsrep_on && thd->wsrep_conflict_state != MUST_REPLAY))
{ {
if (wsrep->post_rollback(wsrep, &thd->wsrep_trx_handle)) if (wsrep->post_rollback(wsrep, &thd->wsrep_ws_handle))
{ {
DBUG_PRINT("wsrep", ("setting rollback fail")); DBUG_PRINT("wsrep", ("setting rollback fail"));
WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s", WSREP_ERROR("settting rollback fail: thd: %llu SQL: %s",
@ -183,14 +184,12 @@ int wsrep_commit(handlerton *hton, THD *thd, bool all)
extern Rpl_filter* binlog_filter; extern Rpl_filter* binlog_filter;
extern my_bool opt_log_slave_updates; extern my_bool opt_log_slave_updates;
extern void wsrep_write_rbr_buf(THD *thd, const void* rbr_buf, size_t buf_len);
enum wsrep_trx_status enum wsrep_trx_status
wsrep_run_wsrep_commit( wsrep_run_wsrep_commit(THD *thd, handlerton *hton, bool all)
THD *thd, handlerton *hton, bool all)
{ {
int rcode= -1; int rcode= -1;
uint data_len = 0; int data_len = 0;
uchar *rbr_data = NULL;
IO_CACHE *cache; IO_CACHE *cache;
int replay_round= 0; int replay_round= 0;
@ -200,9 +199,9 @@ wsrep_run_wsrep_commit(
} }
DBUG_ENTER("wsrep_run_wsrep_commit"); DBUG_ENTER("wsrep_run_wsrep_commit");
if (thd->slave_thread && !opt_log_slave_updates) {
DBUG_RETURN(WSREP_TRX_OK); if (thd->slave_thread && !opt_log_slave_updates) DBUG_RETURN(WSREP_TRX_OK);
}
if (thd->wsrep_exec_mode == REPL_RECV) { if (thd->wsrep_exec_mode == REPL_RECV) {
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
@ -220,9 +219,9 @@ wsrep_run_wsrep_commit(
} }
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
} }
if (thd->wsrep_exec_mode != LOCAL_STATE) {
DBUG_RETURN(WSREP_TRX_OK); if (thd->wsrep_exec_mode != LOCAL_STATE) DBUG_RETURN(WSREP_TRX_OK);
}
if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) { if (thd->wsrep_consistency_check == CONSISTENCY_CHECK_RUNNING) {
WSREP_DEBUG("commit for consistency check: %s", thd->query()); WSREP_DEBUG("commit for consistency check: %s", thd->query());
DBUG_RETURN(WSREP_TRX_OK); DBUG_RETURN(WSREP_TRX_OK);
@ -246,7 +245,6 @@ wsrep_run_wsrep_commit(
while (wsrep_replaying > 0 && while (wsrep_replaying > 0 &&
thd->wsrep_conflict_state == NO_CONFLICT && thd->wsrep_conflict_state == NO_CONFLICT &&
thd->killed == NOT_KILLED &&
!shutdown_in_progress) !shutdown_in_progress)
{ {
@ -265,9 +263,12 @@ wsrep_run_wsrep_commit(
struct timespec wtime = {0, 1000000}; struct timespec wtime = {0, 1000000};
mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying, mysql_cond_timedwait(&COND_wsrep_replaying, &LOCK_wsrep_replaying,
&wtime); &wtime);
if (replay_round++ % 100000 == 0) if (replay_round++ % 100000 == 0)
WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) conflict: %d (round: %d)", WSREP_DEBUG("commit waiting for replaying: replayers %d, thd: (%lu) "
wsrep_replaying, thd->thread_id, thd->wsrep_conflict_state, replay_round); "conflict: %d (round: %d)",
wsrep_replaying, thd->thread_id,
thd->wsrep_conflict_state, replay_round);
mysql_mutex_unlock(&LOCK_wsrep_replaying); mysql_mutex_unlock(&LOCK_wsrep_replaying);
@ -289,6 +290,7 @@ wsrep_run_wsrep_commit(
(thd->query()) ? thd->query() : "void"); (thd->query()) ? thd->query() : "void");
DBUG_RETURN(WSREP_TRX_ROLLBACK); DBUG_RETURN(WSREP_TRX_ROLLBACK);
} }
thd->wsrep_query_state = QUERY_COMMITTING; thd->wsrep_query_state = QUERY_COMMITTING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
@ -296,13 +298,13 @@ wsrep_run_wsrep_commit(
rcode = 0; rcode = 0;
if (cache) { if (cache) {
thd->binlog_flush_pending_rows_event(true); thd->binlog_flush_pending_rows_event(true);
rcode = wsrep_write_cache(cache, &rbr_data, &data_len); rcode = wsrep_write_cache(wsrep, thd, cache, (size_t*)&data_len);
if (rcode) { if (WSREP_OK != rcode) {
WSREP_ERROR("rbr write fail, data_len: %d, %d", data_len, rcode); WSREP_ERROR("rbr write fail, data_len: %zu, %d", data_len, rcode);
if (data_len) my_free(rbr_data);
DBUG_RETURN(WSREP_TRX_ROLLBACK); DBUG_RETURN(WSREP_TRX_ROLLBACK);
} }
} }
if (data_len == 0) if (data_len == 0)
{ {
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
@ -329,33 +331,31 @@ wsrep_run_wsrep_commit(
thd->wsrep_query_state= QUERY_EXEC; thd->wsrep_query_state= QUERY_EXEC;
DBUG_RETURN(WSREP_TRX_OK); DBUG_RETURN(WSREP_TRX_OK);
} }
if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_trx_handle.trx_id)
if (WSREP_UNDEFINED_TRX_ID == thd->wsrep_ws_handle.trx_id)
{ {
WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %d\n" WSREP_WARN("SQL statement was ineffective, THD: %lu, buf: %zu\n"
"QUERY: %s\n" "QUERY: %s\n"
" => Skipping replication", " => Skipping replication",
thd->thread_id, data_len, thd->query()); thd->thread_id, data_len, thd->query());
if (wsrep_debug)
{
wsrep_write_rbr_buf(thd, rbr_data, data_len);
}
rcode = WSREP_TRX_FAIL; rcode = WSREP_TRX_FAIL;
} }
else if (!rcode) else if (!rcode)
{ {
rcode = wsrep->pre_commit( if (WSREP_OK == rcode)
wsrep, rcode = wsrep->pre_commit(wsrep,
(wsrep_conn_id_t)thd->thread_id, (wsrep_conn_id_t)thd->thread_id,
&thd->wsrep_trx_handle, &thd->wsrep_ws_handle,
rbr_data, WSREP_FLAG_COMMIT |
data_len, ((thd->wsrep_PA_safe) ?
(thd->wsrep_PA_safe) ? WSREP_FLAG_PA_SAFE : 0ULL, 0ULL : WSREP_FLAG_PA_UNSAFE),
&thd->wsrep_trx_seqno); &thd->wsrep_trx_meta);
switch (rcode) {
switch (rcode)
{
case WSREP_TRX_MISSING: case WSREP_TRX_MISSING:
WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s", WSREP_WARN("Transaction missing in provider, thd: %ld, SQL: %s",
thd->thread_id, thd->query()); thd->thread_id, thd->query());
wsrep_write_rbr_buf(thd, rbr_data, data_len);
rcode = WSREP_OK; rcode = WSREP_OK;
break; break;
case WSREP_BF_ABORT: case WSREP_BF_ABORT:
@ -375,15 +375,10 @@ wsrep_run_wsrep_commit(
} else { } else {
WSREP_ERROR("I/O error reading from thd's binlog iocache: " WSREP_ERROR("I/O error reading from thd's binlog iocache: "
"errno=%d, io cache code=%d", my_errno, cache->error); "errno=%d, io cache code=%d", my_errno, cache->error);
if (data_len) my_free(rbr_data);
DBUG_ASSERT(0); // failure like this can not normally happen DBUG_ASSERT(0); // failure like this can not normally happen
DBUG_RETURN(WSREP_TRX_ERROR); DBUG_RETURN(WSREP_TRX_ERROR);
} }
if (data_len) {
my_free(rbr_data);
}
mysql_mutex_lock(&thd->LOCK_wsrep_thd); mysql_mutex_lock(&thd->LOCK_wsrep_thd);
switch(rcode) { switch(rcode) {
case 0: case 0:
@ -392,8 +387,8 @@ wsrep_run_wsrep_commit(
if (thd->transaction.xid_state.xid.get_my_xid()) if (thd->transaction.xid_state.xid.get_my_xid())
{ {
wsrep_xid_init(&thd->transaction.xid_state.xid, wsrep_xid_init(&thd->transaction.xid_state.xid,
wsrep_cluster_uuid(), &thd->wsrep_trx_meta.gtid.uuid,
thd->wsrep_trx_seqno); thd->wsrep_trx_meta.gtid.seqno);
} }
DBUG_PRINT("wsrep", ("replicating commit success")); DBUG_PRINT("wsrep", ("replicating commit success"));
@ -465,7 +460,8 @@ mysql_declare_plugin(wsrep)
&wsrep_storage_engine, &wsrep_storage_engine,
"wsrep", "wsrep",
"Codership Oy", "Codership Oy",
"A pseudo storage engine to represent transactions in multi-master synchornous replication", "A pseudo storage engine to represent transactions in multi-master "
"synchornous replication",
PLUGIN_LICENSE_GPL, PLUGIN_LICENSE_GPL,
wsrep_hton_init, /* Plugin Init */ wsrep_hton_init, /* Plugin Init */
NULL, /* Plugin Deinit */ NULL, /* Plugin Deinit */

View File

@ -21,6 +21,7 @@
#include "wsrep_sst.h" #include "wsrep_sst.h"
#include "wsrep_utils.h" #include "wsrep_utils.h"
#include "wsrep_var.h" #include "wsrep_var.h"
#include "wsrep_binlog.h"
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
#include "log_event.h" #include "log_event.h"
@ -37,34 +38,32 @@ const char* wsrep_data_home_dir = NULL;
const char* wsrep_dbug_option = ""; const char* wsrep_dbug_option = "";
long wsrep_slave_threads = 1; // # of slave action appliers wanted long wsrep_slave_threads = 1; // # of slave action appliers wanted
int wsrep_slave_count_change = 0; // # of appliers to stop or start
my_bool wsrep_debug = 0; // enable debug level logging my_bool wsrep_debug = 0; // enable debug level logging
my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx my_bool wsrep_convert_LOCK_to_trx = 1; // convert locking sessions to trx
ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx ulong wsrep_retry_autocommit = 5; // retry aborted autocommit trx
my_bool wsrep_auto_increment_control = 1; // control auto increment variables my_bool wsrep_auto_increment_control = 1; // control auto increment variables
my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey my_bool wsrep_drupal_282555_workaround = 1; // retry autoinc insert after dupkey
my_bool wsrep_incremental_data_collection = 0; // incremental data collection my_bool wsrep_incremental_data_collection = 0; // incremental data collection
long long wsrep_max_ws_size = 1073741824LL; //max ws (RBR buffer) size ulong wsrep_max_ws_size = 1073741824UL;//max ws (RBR buffer) size
long wsrep_max_ws_rows = 65536; // max number of rows in ws ulong wsrep_max_ws_rows = 65536; // max number of rows in ws
int wsrep_to_isolation = 0; // # of active TO isolation threads int wsrep_to_isolation = 0; // # of active TO isolation threads
my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key my_bool wsrep_certify_nonPK = 1; // certify, even when no primary key
long wsrep_max_protocol_version = 2; // maximum protocol version to use long wsrep_max_protocol_version = 2; // maximum protocol version to use
ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC; ulong wsrep_forced_binlog_format = BINLOG_FORMAT_UNSPEC;
my_bool wsrep_recovery = 0; // recovery my_bool wsrep_recovery = 0; // recovery
my_bool wsrep_replicate_myisam = 0; // enable myisam replication my_bool wsrep_replicate_myisam = 0; // enable myisam replication
my_bool wsrep_log_conflicts = 0; // my_bool wsrep_log_conflicts = 0;
ulong wsrep_mysql_replication_bundle = 0; ulong wsrep_mysql_replication_bundle = 0;
my_bool wsrep_desync = 0; // desynchronize the node from the
// cluster
my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals my_bool wsrep_load_data_splitting = 1; // commit load data every 10K intervals
my_bool wsrep_desync = 0; // desynchronize the node from the cluster
/* /*
* End configuration options * End configuration options
*/ */
static const wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED; static const wsrep_uuid_t cluster_uuid = WSREP_UUID_UNDEFINED;
const wsrep_uuid_t* wsrep_cluster_uuid()
{
return &cluster_uuid;
}
static char cluster_uuid_str[40]= { 0, }; static char cluster_uuid_str[40]= { 0, };
static const char* cluster_status_str[WSREP_VIEW_MAX] = static const char* cluster_status_str[WSREP_VIEW_MAX] =
{ {
@ -102,7 +101,18 @@ long wsrep_protocol_version = 2;
// if there was no state gap on receiving first view event. // if there was no state gap on receiving first view event.
static my_bool wsrep_startup = TRUE; static my_bool wsrep_startup = TRUE;
/* wsrep callbacks */ extern wsrep_cb_status_t wsrep_apply_cb(void *ctx,
const void* buf, size_t buf_len,
const wsrep_trx_meta_t* meta);
extern wsrep_cb_status_t wsrep_commit_cb(void *ctx,
const wsrep_trx_meta_t* meta,
wsrep_bool_t *exit,
wsrep_bool_t commit);
extern wsrep_cb_status_t wsrep_unordered_cb(void* ctx,
const void* data,
size_t size);
static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) { static void wsrep_log_cb(wsrep_log_level_t level, const char *msg) {
switch (level) { switch (level) {
@ -186,19 +196,22 @@ void wsrep_get_SE_checkpoint(XID* xid)
plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid); plugin_foreach(NULL, get_SE_checkpoint, MYSQL_STORAGE_ENGINE_PLUGIN, xid);
} }
static void wsrep_view_handler_cb (void* app_ctx, static wsrep_cb_status_t
wsrep_view_handler_cb (void* app_ctx,
void* recv_ctx, void* recv_ctx,
const wsrep_view_info_t* view, const wsrep_view_info_t* view,
const char* state, const char* state,
size_t state_len, size_t state_len,
void** sst_req, void** sst_req,
ssize_t* sst_req_len) size_t* sst_req_len)
{ {
wsrep_member_status_t new_status= local_status.get(); wsrep_member_status_t new_status= local_status.get();
if (memcmp(&cluster_uuid, &view->uuid, sizeof(wsrep_uuid_t))) if (memcmp(&cluster_uuid, &view->state_id.uuid, sizeof(wsrep_uuid_t)))
{ {
memcpy((wsrep_uuid_t*)&cluster_uuid, &view->uuid, sizeof(cluster_uuid)); memcpy((wsrep_uuid_t*)&cluster_uuid, &view->state_id.uuid,
sizeof(cluster_uuid));
wsrep_uuid_print (&cluster_uuid, cluster_uuid_str, wsrep_uuid_print (&cluster_uuid, cluster_uuid_str,
sizeof(cluster_uuid_str)); sizeof(cluster_uuid_str));
} }
@ -210,7 +223,7 @@ static void wsrep_view_handler_cb (void* app_ctx,
WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, " WSREP_INFO("New cluster view: global state: %s:%lld, view# %lld: %s, "
"number of nodes: %ld, my index: %ld, protocol version %d", "number of nodes: %ld, my index: %ld, protocol version %d",
wsrep_cluster_state_uuid, (long long)view->seqno, wsrep_cluster_state_uuid, (long long)view->state_id.seqno,
(long long)wsrep_cluster_conf_id, wsrep_cluster_status, (long long)wsrep_cluster_conf_id, wsrep_cluster_status,
wsrep_cluster_size, wsrep_local_index, view->proto_ver); wsrep_cluster_size, wsrep_local_index, view->proto_ver);
@ -290,14 +303,14 @@ static void wsrep_view_handler_cb (void* app_ctx,
{ {
wsrep_SE_init_grab(); wsrep_SE_init_grab();
// Signal mysqld init thread to continue // Signal mysqld init thread to continue
wsrep_sst_complete (&cluster_uuid, view->seqno, false); wsrep_sst_complete (&cluster_uuid, view->state_id.seqno, false);
// and wait for SE initialization // and wait for SE initialization
wsrep_SE_init_wait(); wsrep_SE_init_wait();
} }
else else
{ {
local_uuid= cluster_uuid; local_uuid= cluster_uuid;
local_seqno= view->seqno; local_seqno= view->state_id.seqno;
} }
/* Init storage engine XIDs from first view */ /* Init storage engine XIDs from first view */
XID xid; XID xid;
@ -310,7 +323,7 @@ static void wsrep_view_handler_cb (void* app_ctx,
if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t))) if (memcmp (&local_uuid, &cluster_uuid, sizeof (wsrep_uuid_t)))
{ {
WSREP_ERROR("Undetected state gap. Can't continue."); WSREP_ERROR("Undetected state gap. Can't continue.");
wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->seqno, wsrep_log_states(WSREP_LOG_FATAL, &cluster_uuid, view->state_id.seqno,
&local_uuid, -1); &local_uuid, -1);
unireg_abort(1); unireg_abort(1);
} }
@ -322,9 +335,23 @@ static void wsrep_view_handler_cb (void* app_ctx,
global_system_variables.auto_increment_increment= view->memb_num; global_system_variables.auto_increment_increment= view->memb_num;
} }
{ /* capabilities may be updated on new configuration */
uint64_t const caps(wsrep->capabilities (wsrep));
my_bool const idc((caps & WSREP_CAP_INCREMENTAL_WRITESET) != 0);
if (TRUE == wsrep_incremental_data_collection && FALSE == idc)
{
WSREP_WARN("Unsupported protocol downgrade: "
"incremental data collection disabled. Expect abort.");
}
wsrep_incremental_data_collection = idc;
}
out: out:
wsrep_startup= FALSE; wsrep_startup= FALSE;
local_status.set(new_status, view); local_status.set(new_status, view);
return WSREP_CB_SUCCESS;
} }
void wsrep_ready_set (my_bool x) void wsrep_ready_set (my_bool x)
@ -407,6 +434,8 @@ static void wsrep_init_position()
} }
} }
extern const char* my_bind_addr_str;
int wsrep_init() int wsrep_init()
{ {
int rcode= -1; int rcode= -1;
@ -460,7 +489,7 @@ int wsrep_init()
size_t const node_addr_max= sizeof(node_addr) - 1; size_t const node_addr_max= sizeof(node_addr) - 1;
if (!wsrep_node_address || !strcmp(wsrep_node_address, "")) if (!wsrep_node_address || !strcmp(wsrep_node_address, ""))
{ {
size_t const ret= guess_ip(node_addr, node_addr_max); size_t const ret= wsrep_guess_ip(node_addr, node_addr_max);
if (!(ret > 0 && ret < node_addr_max)) if (!(ret > 0 && ret < node_addr_max))
{ {
WSREP_WARN("Failed to guess base node address. Set it explicitly via " WSREP_WARN("Failed to guess base node address. Set it explicitly via "
@ -477,6 +506,23 @@ int wsrep_init()
size_t const inc_addr_max= sizeof (inc_addr); size_t const inc_addr_max= sizeof (inc_addr);
if ((!wsrep_node_incoming_address || if ((!wsrep_node_incoming_address ||
!strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO))) !strcmp (wsrep_node_incoming_address, WSREP_NODE_INCOMING_AUTO)))
{
unsigned int my_bind_ip= INADDR_ANY; // default if not set
if (my_bind_addr_str && strlen(my_bind_addr_str))
{
my_bind_ip= wsrep_check_ip(my_bind_addr_str);
}
if (INADDR_ANY != my_bind_ip)
{
if (INADDR_NONE != my_bind_ip && INADDR_LOOPBACK != my_bind_ip)
{
snprintf(inc_addr, inc_addr_max, "%s:%u",
my_bind_addr_str, (int)mysqld_port);
} // else leave inc_addr an empty string - mysqld is not listening for
// client connections on network interfaces.
}
else // mysqld binds to 0.0.0.0, take IP from wsrep_node_address if possible
{ {
size_t const node_addr_len= strlen(node_addr); size_t const node_addr_len= strlen(node_addr);
if (node_addr_len > 0) if (node_addr_len > 0)
@ -488,7 +534,8 @@ int wsrep_init()
if (ip_len + 7 /* :55555\0 */ < inc_addr_max) if (ip_len + 7 /* :55555\0 */ < inc_addr_max)
{ {
memcpy (inc_addr, node_addr, ip_len); memcpy (inc_addr, node_addr, ip_len);
snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",mysqld_port); snprintf(inc_addr + ip_len, inc_addr_max - ip_len, ":%u",
(int)mysqld_port);
} }
else else
{ {
@ -511,6 +558,7 @@ int wsrep_init()
"Try setting wsrep_node_incoming_address explicitly."); "Try setting wsrep_node_incoming_address explicitly.");
} }
} }
}
else if (!strchr(wsrep_node_incoming_address, ':')) // no port included else if (!strchr(wsrep_node_incoming_address, ':')) // no port included
{ {
if ((int)inc_addr_max <= if ((int)inc_addr_max <=
@ -536,6 +584,8 @@ int wsrep_init()
struct wsrep_init_args wsrep_args; struct wsrep_init_args wsrep_args;
struct wsrep_gtid const state_id = { local_uuid, local_seqno };
wsrep_args.data_dir = wsrep_data_home_dir; wsrep_args.data_dir = wsrep_data_home_dir;
wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : ""; wsrep_args.node_name = (wsrep_node_name) ? wsrep_node_name : "";
wsrep_args.node_address = node_addr; wsrep_args.node_address = node_addr;
@ -544,13 +594,13 @@ int wsrep_init()
wsrep_provider_options : ""; wsrep_provider_options : "";
wsrep_args.proto_ver = wsrep_max_protocol_version; wsrep_args.proto_ver = wsrep_max_protocol_version;
wsrep_args.state_uuid = &local_uuid; wsrep_args.state_id = &state_id;
wsrep_args.state_seqno = local_seqno;
wsrep_args.logger_cb = wsrep_log_cb; wsrep_args.logger_cb = wsrep_log_cb;
wsrep_args.view_handler_cb = wsrep_view_handler_cb; wsrep_args.view_handler_cb = wsrep_view_handler_cb;
wsrep_args.apply_cb = wsrep_apply_cb; wsrep_args.apply_cb = wsrep_apply_cb;
wsrep_args.commit_cb = wsrep_commit_cb; wsrep_args.commit_cb = wsrep_commit_cb;
wsrep_args.unordered_cb = wsrep_unordered_cb;
wsrep_args.sst_donate_cb = wsrep_sst_donate_cb; wsrep_args.sst_donate_cb = wsrep_sst_donate_cb;
wsrep_args.synced_cb = wsrep_synced_cb; wsrep_args.synced_cb = wsrep_synced_cb;
@ -679,6 +729,7 @@ bool wsrep_start_replication()
*/ */
const char* cluster_address = const char* cluster_address =
wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address; wsrep_new_cluster ? "bootstrap" : wsrep_cluster_address;
bool const bootstrap(TRUE == wsrep_new_cluster);
wsrep_new_cluster= FALSE; wsrep_new_cluster= FALSE;
WSREP_INFO("Start replication"); WSREP_INFO("Start replication");
@ -686,7 +737,8 @@ bool wsrep_start_replication()
if ((rcode = wsrep->connect(wsrep, if ((rcode = wsrep->connect(wsrep,
wsrep_cluster_name, wsrep_cluster_name,
cluster_address, cluster_address,
wsrep_sst_donor))) wsrep_sst_donor,
bootstrap)))
{ {
if (-ESOCKTNOSUPPORT == rcode) if (-ESOCKTNOSUPPORT == rcode)
{ {
@ -707,11 +759,6 @@ bool wsrep_start_replication()
{ {
wsrep_connected= TRUE; wsrep_connected= TRUE;
uint64_t caps = wsrep->capabilities (wsrep);
wsrep_incremental_data_collection =
!!(caps & WSREP_CAP_WRITE_SET_INCREMENTS);
char* opts= wsrep->options_get(wsrep); char* opts= wsrep->options_get(wsrep);
if (opts) if (opts)
{ {
@ -736,8 +783,8 @@ wsrep_causal_wait (THD* thd)
{ {
// This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0 // This allows autocommit SELECTs and a first SELECT after SET AUTOCOMMIT=0
// TODO: modify to check if thd has locked any rows. // TODO: modify to check if thd has locked any rows.
wsrep_seqno_t seqno; wsrep_gtid_t gtid;
wsrep_status_t ret= wsrep->causal_read (wsrep, &seqno); wsrep_status_t ret= wsrep->causal_read (wsrep, &gtid);
if (unlikely(WSREP_OK != ret)) if (unlikely(WSREP_OK != ret))
{ {
@ -785,7 +832,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
{ {
for (size_t i= 0; i < key_arr->keys_len; ++i) for (size_t i= 0; i < key_arr->keys_len; ++i)
{ {
my_free((wsrep_key_part_t*)key_arr->keys[i].key_parts); my_free((void*)key_arr->keys[i].key_parts);
} }
my_free(key_arr->keys); my_free(key_arr->keys);
key_arr->keys= 0; key_arr->keys= 0;
@ -805,7 +852,7 @@ static void wsrep_keys_free(wsrep_key_arr_t* key_arr)
static bool wsrep_prepare_key_for_isolation(const char* db, static bool wsrep_prepare_key_for_isolation(const char* db,
const char* table, const char* table,
wsrep_key_part_t* key, wsrep_buf_t* key,
size_t* key_len) size_t* key_len)
{ {
if (*key_len < 2) return false; if (*key_len < 2) return false;
@ -824,13 +871,13 @@ static bool wsrep_prepare_key_for_isolation(const char* db,
// sql_print_information("%s.%s", db, table); // sql_print_information("%s.%s", db, table);
if (db) if (db)
{ {
key[*key_len].buf= db; key[*key_len].ptr= db;
key[*key_len].buf_len= strlen(db); key[*key_len].len= strlen(db);
++(*key_len); ++(*key_len);
if (table) if (table)
{ {
key[*key_len].buf= table; key[*key_len].ptr= table;
key[*key_len].buf_len= strlen(table); key[*key_len].len= strlen(table);
++(*key_len); ++(*key_len);
} }
} }
@ -863,23 +910,23 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
{ {
if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0)))) if (!(ka->keys= (wsrep_key_t*)my_malloc(sizeof(wsrep_key_t), MYF(0))))
{ {
sql_print_error("Can't allocate memory for key_array"); WSREP_ERROR("Can't allocate memory for key_array");
goto err; goto err;
} }
ka->keys_len= 1; ka->keys_len= 1;
if (!(ka->keys[0].key_parts= (wsrep_key_part_t*) if (!(ka->keys[0].key_parts= (wsrep_buf_t*)
my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{ {
sql_print_error("Can't allocate memory for key_parts"); WSREP_ERROR("Can't allocate memory for key_parts");
goto err; goto err;
} }
ka->keys[0].key_parts_len= 2; ka->keys[0].key_parts_num= 2;
if (!wsrep_prepare_key_for_isolation( if (!wsrep_prepare_key_for_isolation(
db, table, db, table,
(wsrep_key_part_t*)ka->keys[0].key_parts, (wsrep_buf_t*)ka->keys[0].key_parts,
&ka->keys[0].key_parts_len)) &ka->keys[0].key_parts_num))
{ {
sql_print_error("Preparing keys for isolation failed"); WSREP_ERROR("Preparing keys for isolation failed");
goto err; goto err;
} }
} }
@ -895,24 +942,24 @@ static bool wsrep_prepare_keys_for_isolation(THD* thd,
MYF(MY_ALLOW_ZERO_PTR)); MYF(MY_ALLOW_ZERO_PTR));
if (!tmp) if (!tmp)
{ {
sql_print_error("Can't allocate memory for key_array"); WSREP_ERROR("Can't allocate memory for key_array");
goto err; goto err;
} }
ka->keys= tmp; ka->keys= tmp;
if (!(ka->keys[ka->keys_len].key_parts= (wsrep_key_part_t*) if (!(ka->keys[ka->keys_len].key_parts= (wsrep_buf_t*)
my_malloc(sizeof(wsrep_key_part_t)*2, MYF(0)))) my_malloc(sizeof(wsrep_buf_t)*2, MYF(0))))
{ {
sql_print_error("Can't allocate memory for key_parts"); WSREP_ERROR("Can't allocate memory for key_parts");
goto err; goto err;
} }
ka->keys[ka->keys_len].key_parts_len= 2; ka->keys[ka->keys_len].key_parts_num= 2;
++ka->keys_len; ++ka->keys_len;
if (!wsrep_prepare_key_for_isolation( if (!wsrep_prepare_key_for_isolation(
table->db, table->table_name, table->db, table->table_name,
(wsrep_key_part_t*)ka->keys[ka->keys_len - 1].key_parts, (wsrep_buf_t*)ka->keys[ka->keys_len - 1].key_parts,
&ka->keys[ka->keys_len - 1].key_parts_len)) &ka->keys[ka->keys_len - 1].key_parts_num))
{ {
sql_print_error("Preparing keys for isolation failed"); WSREP_ERROR("Preparing keys for isolation failed");
goto err; goto err;
} }
} }
@ -929,7 +976,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
size_t cache_key_len, size_t cache_key_len,
const uchar* row_id, const uchar* row_id,
size_t row_id_len, size_t row_id_len,
wsrep_key_part_t* key, wsrep_buf_t* key,
size_t* key_len) size_t* key_len)
{ {
if (*key_len < 3) return false; if (*key_len < 3) return false;
@ -939,28 +986,30 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
{ {
case 0: case 0:
{ {
key[*key_len].buf = cache_key; key[0].ptr = cache_key;
key[*key_len].buf_len = cache_key_len; key[0].len = cache_key_len;
++(*key_len);
*key_len = 1;
break; break;
} }
case 1: case 1:
case 2: case 2:
{ {
key[*key_len].buf = cache_key; key[0].ptr = cache_key;
key[*key_len].buf_len = strlen( (char*)cache_key ); key[0].len = strlen( (char*)cache_key );
++(*key_len);
key[*key_len].buf = cache_key + strlen( (char*)cache_key ) + 1; key[1].ptr = cache_key + strlen( (char*)cache_key ) + 1;
key[*key_len].buf_len = strlen( (char*)(key[*key_len].buf) ); key[1].len = strlen( (char*)(key[1].ptr) );
++(*key_len);
*key_len = 2;
break; break;
} }
default: default:
return false; return false;
} }
key[*key_len].buf = row_id; key[*key_len].ptr = row_id;
key[*key_len].buf_len = row_id_len; key[*key_len].len = row_id_len;
++(*key_len); ++(*key_len);
return true; return true;
@ -973,7 +1022,7 @@ bool wsrep_prepare_key_for_innodb(const uchar* cache_key,
* Return 0 in case of success, 1 in case of error. * Return 0 in case of success, 1 in case of error.
*/ */
int wsrep_to_buf_helper( int wsrep_to_buf_helper(
THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len) THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len)
{ {
IO_CACHE tmp_io_cache; IO_CACHE tmp_io_cache;
if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX, if (open_cached_file(&tmp_io_cache, mysql_tmpdir, TEMP_PREFIX,
@ -994,7 +1043,7 @@ int wsrep_to_buf_helper(
Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0); Query_log_event ev(thd, query, query_len, FALSE, FALSE, FALSE, 0);
if (ev.write(&tmp_io_cache)) ret= 1; if (ev.write(&tmp_io_cache)) ret= 1;
if (!ret && wsrep_write_cache(&tmp_io_cache, buf, buf_len)) ret= 1; if (!ret && wsrep_write_cache_buf(&tmp_io_cache, buf, (size_t*)buf_len)) ret= 1;
close_cached_file(&tmp_io_cache); close_cached_file(&tmp_io_cache);
return ret; return ret;
@ -1002,7 +1051,7 @@ int wsrep_to_buf_helper(
#include "sql_show.h" #include "sql_show.h"
static int static int
create_view_query(THD *thd, uchar** buf, uint* buf_len) create_view_query(THD *thd, uchar** buf, int* buf_len)
{ {
LEX *lex= thd->lex; LEX *lex= thd->lex;
SELECT_LEX *select_lex= &lex->select_lex; SELECT_LEX *select_lex= &lex->select_lex;
@ -1079,10 +1128,10 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
{ {
wsrep_status_t ret(WSREP_WARNING); wsrep_status_t ret(WSREP_WARNING);
uchar* buf(0); uchar* buf(0);
uint buf_len(0); int buf_len(0);
int buf_err; int buf_err;
WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, WSREP_DEBUG("TO BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() ); thd->wsrep_exec_mode, thd->query() );
switch (thd->lex->sql_command) switch (thd->lex->sql_command)
{ {
@ -1106,18 +1155,19 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
} }
wsrep_key_arr_t key_arr= {0, 0}; wsrep_key_arr_t key_arr= {0, 0};
struct wsrep_buf buff = { buf, buf_len };
if (!buf_err && if (!buf_err &&
wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&& wsrep_prepare_keys_for_isolation(thd, db_, table_, table_list, &key_arr)&&
WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id, WSREP_OK == (ret = wsrep->to_execute_start(wsrep, thd->thread_id,
key_arr.keys, key_arr.keys_len, key_arr.keys, key_arr.keys_len,
buf, buf_len, &buff, 1,
&thd->wsrep_trx_seqno))) &thd->wsrep_trx_meta)))
{ {
thd->wsrep_exec_mode= TOTAL_ORDER; thd->wsrep_exec_mode= TOTAL_ORDER;
wsrep_to_isolation++; wsrep_to_isolation++;
if (buf) my_free(buf); if (buf) my_free(buf);
wsrep_keys_free(&key_arr); wsrep_keys_free(&key_arr);
WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)thd->wsrep_trx_seqno, WSREP_DEBUG("TO BEGIN: %lld, %d",(long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode); thd->wsrep_exec_mode);
} }
else { else {
@ -1137,10 +1187,10 @@ static int wsrep_TOI_begin(THD *thd, char *db_, char *table_,
static void wsrep_TOI_end(THD *thd) { static void wsrep_TOI_end(THD *thd) {
wsrep_status_t ret; wsrep_status_t ret;
wsrep_to_isolation--; wsrep_to_isolation--;
WSREP_DEBUG("TO END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, WSREP_DEBUG("TO END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void") thd->wsrep_exec_mode, (thd->query()) ? thd->query() : "void")
if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) { if (WSREP_OK == (ret = wsrep->to_execute_end(wsrep, thd->thread_id))) {
WSREP_DEBUG("TO END: %lld", (long long)thd->wsrep_trx_seqno); WSREP_DEBUG("TO END: %lld", (long long)wsrep_thd_trx_seqno(thd));
} }
else { else {
WSREP_WARN("TO isolation end failed for: %d, sql: %s", WSREP_WARN("TO isolation end failed for: %d, sql: %s",
@ -1151,7 +1201,7 @@ static void wsrep_TOI_end(THD *thd) {
static int wsrep_RSU_begin(THD *thd, char *db_, char *table_) static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
{ {
wsrep_status_t ret(WSREP_WARNING); wsrep_status_t ret(WSREP_WARNING);
WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, WSREP_DEBUG("RSU BEGIN: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() ); thd->wsrep_exec_mode, thd->query() );
ret = wsrep->desync(wsrep); ret = wsrep->desync(wsrep);
@ -1196,7 +1246,7 @@ static int wsrep_RSU_begin(THD *thd, char *db_, char *table_)
static void wsrep_RSU_end(THD *thd) static void wsrep_RSU_end(THD *thd)
{ {
wsrep_status_t ret(WSREP_WARNING); wsrep_status_t ret(WSREP_WARNING);
WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)thd->wsrep_trx_seqno, WSREP_DEBUG("RSU END: %lld, %d : %s", (long long)wsrep_thd_trx_seqno(thd),
thd->wsrep_exec_mode, thd->query() ); thd->wsrep_exec_mode, thd->query() );
@ -1276,10 +1326,10 @@ void wsrep_to_isolation_end(THD *thd) {
"request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \ "request: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)\n" \
"granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \ "granted: (%lu \tseqno %lld \twsrep (%d, %d, %d) cmd %d %d \t%s)", \
msg, \ msg, \
req->thread_id, (long long)req->wsrep_trx_seqno, \ req->thread_id, (long long)wsrep_thd_trx_seqno(req), \
req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \ req->wsrep_exec_mode, req->wsrep_query_state, req->wsrep_conflict_state, \
req->get_command(), req->lex->sql_command, req->query(), \ req->get_command(), req->lex->sql_command, req->query(), \
gra->thread_id, (long long)gra->wsrep_trx_seqno, \ gra->thread_id, (long long)wsrep_thd_trx_seqno(gra), \
gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \ gra->wsrep_exec_mode, gra->wsrep_query_state, gra->wsrep_conflict_state, \
gra->get_command(), gra->lex->sql_command, gra->query()); gra->get_command(), gra->lex->sql_command, gra->query());

View File

@ -28,7 +28,6 @@ class THD;
#ifdef WITH_WSREP #ifdef WITH_WSREP
#include "../wsrep/wsrep_api.h" #include "../wsrep/wsrep_api.h"
//#include "wsrep_mysqld.h"
enum wsrep_exec_mode { enum wsrep_exec_mode {
LOCAL_STATE, LOCAL_STATE,
REPL_RECV, REPL_RECV,
@ -73,15 +72,16 @@ extern const char* wsrep_node_incoming_address;
extern const char* wsrep_data_home_dir; extern const char* wsrep_data_home_dir;
extern const char* wsrep_dbug_option; extern const char* wsrep_dbug_option;
extern long wsrep_slave_threads; extern long wsrep_slave_threads;
extern my_bool wsrep_debug; extern int wsrep_slave_count_change;
extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug;
extern my_bool wsrep_convert_LOCK_to_trx; extern my_bool wsrep_convert_LOCK_to_trx;
extern ulong wsrep_retry_autocommit; extern ulong wsrep_retry_autocommit;
extern my_bool wsrep_auto_increment_control; extern my_bool wsrep_auto_increment_control;
extern my_bool wsrep_drupal_282555_workaround; extern my_bool wsrep_drupal_282555_workaround;
extern my_bool wsrep_incremental_data_collection; extern my_bool wsrep_incremental_data_collection;
extern const char* wsrep_start_position; extern const char* wsrep_start_position;
extern long long wsrep_max_ws_size; extern ulong wsrep_max_ws_size;
extern long wsrep_max_ws_rows; extern ulong wsrep_max_ws_rows;
extern const char* wsrep_notify_cmd; extern const char* wsrep_notify_cmd;
extern my_bool wsrep_certify_nonPK; extern my_bool wsrep_certify_nonPK;
extern long wsrep_max_protocol_version; extern long wsrep_max_protocol_version;
@ -136,7 +136,7 @@ extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); extern "C" const char * wsrep_thd_exec_mode_str(THD *thd);
extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); extern "C" const char * wsrep_thd_conflict_state_str(THD *thd);
extern "C" const char * wsrep_thd_query_state_str(THD *thd); extern "C" const char * wsrep_thd_query_state_str(THD *thd);
extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd);
extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
extern "C" void wsrep_thd_set_query_state( extern "C" void wsrep_thd_set_query_state(
@ -260,19 +260,11 @@ extern mysql_cond_t COND_wsrep_rollback;
extern int wsrep_replaying; extern int wsrep_replaying;
extern mysql_mutex_t LOCK_wsrep_replaying; extern mysql_mutex_t LOCK_wsrep_replaying;
extern mysql_cond_t COND_wsrep_replaying; extern mysql_cond_t COND_wsrep_replaying;
extern wsrep_aborting_thd_t wsrep_aborting_thd;
extern MYSQL_PLUGIN_IMPORT my_bool wsrep_debug;
extern my_bool wsrep_convert_LOCK_to_trx;
extern ulong wsrep_retry_autocommit;
extern my_bool wsrep_emulate_bin_log;
extern my_bool wsrep_auto_increment_control;
extern my_bool wsrep_drupal_282555_workaround;
extern long long wsrep_max_ws_size;
extern long wsrep_max_ws_rows;
extern int wsrep_to_isolation;
extern my_bool wsrep_certify_nonPK;
extern mysql_mutex_t LOCK_wsrep_slave_threads; extern mysql_mutex_t LOCK_wsrep_slave_threads;
extern mysql_mutex_t LOCK_wsrep_desync; extern mysql_mutex_t LOCK_wsrep_desync;
extern wsrep_aborting_thd_t wsrep_aborting_thd;
extern my_bool wsrep_emulate_bin_log;
extern int wsrep_to_isolation;
extern PSI_mutex_key key_LOCK_wsrep_ready; extern PSI_mutex_key key_LOCK_wsrep_ready;
extern PSI_mutex_key key_COND_wsrep_ready; extern PSI_mutex_key key_COND_wsrep_ready;
@ -295,12 +287,11 @@ int wsrep_to_isolation_begin(THD *thd, char *db_, char *table_,
void wsrep_to_isolation_end(THD *thd); void wsrep_to_isolation_end(THD *thd);
int wsrep_to_buf_helper( int wsrep_to_buf_helper(
THD* thd, const char *query, uint query_len, uchar** buf, uint* buf_len); THD* thd, const char *query, uint query_len, uchar** buf, int* buf_len);
int wsrep_create_sp(THD *thd, uchar** buf, uint* buf_len); int wsrep_create_sp(THD *thd, uchar** buf, int* buf_len);
int wsrep_create_trigger_query(THD *thd, uchar** buf, uint* buf_len); int wsrep_create_trigger_query(THD *thd, uchar** buf, int* buf_len);
int wsrep_create_event_query(THD *thd, uchar** buf, uint* buf_len); int wsrep_create_event_query(THD *thd, uchar** buf, int* buf_len);
const wsrep_uuid_t* wsrep_cluster_uuid();
struct xid_t; struct xid_t;
void wsrep_set_SE_checkpoint(xid_t*); void wsrep_set_SE_checkpoint(xid_t*);

View File

@ -32,24 +32,22 @@ ssize_t wsrep_sst_prepare (void** msg);
wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx,
void* recv_ctx, void* recv_ctx,
const void* msg, size_t msg_len, const void* msg, size_t msg_len,
const wsrep_gtid_t* state_id, const wsrep_gtid_t* current_id,
const char* state, size_t state_len, const char* state, size_t state_len,
bool bypass); bool bypass);
extern unsigned int wsrep_check_ip (const char* addr);
extern size_t wsrep_guess_ip (char* buf, size_t buf_len);
extern size_t wsrep_guess_address(char* buf, size_t buf_len);
extern wsrep_uuid_t local_uuid; extern wsrep_uuid_t local_uuid;
extern wsrep_seqno_t local_seqno; extern wsrep_seqno_t local_seqno;
// a helper function // a helper function
void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t, extern void wsrep_sst_received(wsrep_t*, const wsrep_uuid_t*, wsrep_seqno_t,
const void*, size_t); const void*, size_t);
/*! SST thread signals init thread about sst completion */ /*! SST thread signals init thread about sst completion */
void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool); extern void wsrep_sst_complete(const wsrep_uuid_t*, wsrep_seqno_t, bool);
extern void wsrep_notify_status (wsrep_member_status_t new_status, void wsrep_notify_status (wsrep_member_status_t new_status,
const wsrep_view_info_t* view = 0); const wsrep_view_info_t* view = 0);
/* binlog-related stuff */
int wsrep_write_cache(IO_CACHE *cache, uchar **buf, size_t *buf_len);
#endif /* WSREP_PRIV_H */ #endif /* WSREP_PRIV_H */

View File

@ -232,13 +232,26 @@ void wsrep_sst_complete (const wsrep_uuid_t* sst_uuid,
mysql_mutex_unlock (&LOCK_wsrep_sst); mysql_mutex_unlock (&LOCK_wsrep_sst);
} }
void wsrep_sst_received (wsrep_t* const wsrep,
const wsrep_uuid_t* const uuid,
wsrep_seqno_t const seqno,
const void* const state,
size_t const state_len)
{
int const rcode(seqno < 0 ? seqno : 0);
wsrep_gtid_t const state_id = {
*uuid, (rcode ? WSREP_SEQNO_UNDEFINED : seqno)
};
wsrep->sst_received(wsrep, &state_id, state, state_len, rcode);
}
// Let applier threads to continue // Let applier threads to continue
void wsrep_sst_continue () void wsrep_sst_continue ()
{ {
if (sst_needed) if (sst_needed)
{ {
WSREP_INFO("Signalling provider to continue."); WSREP_INFO("Signalling provider to continue.");
wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
} }
} }
@ -434,7 +447,6 @@ static ssize_t sst_prepare_other (const char* method,
return ret; return ret;
} }
//extern ulong my_bind_addr;
extern uint mysqld_port; extern uint mysqld_port;
/*! Just tells donor where to send mysqldump */ /*! Just tells donor where to send mysqldump */
@ -518,7 +530,7 @@ ssize_t wsrep_sst_prepare (void** msg)
} }
else else
{ {
ssize_t ret= guess_ip (ip_buf, ip_max); ssize_t ret= wsrep_guess_ip (ip_buf, ip_max);
if (ret && ret < ip_max) if (ret && ret < ip_max)
{ {
@ -706,7 +718,9 @@ static int sst_donate_mysqldump (const char* addr,
ret= sst_run_shell (cmd_str, 3); ret= sst_run_shell (cmd_str, 3);
} }
wsrep->sst_sent (wsrep, uuid, ret ? ret : seqno); wsrep_gtid_t const state_id = { *uuid, (ret ? WSREP_SEQNO_UNDEFINED : seqno)};
wsrep->sst_sent (wsrep, &state_id, ret);
return ret; return ret;
} }
@ -896,7 +910,10 @@ wait_signal:
} }
// signal to donor that SST is over // signal to donor that SST is over
wsrep->sst_sent (wsrep, &ret_uuid, err ? -err : ret_seqno); struct wsrep_gtid const state_id = {
ret_uuid, err ? WSREP_SEQNO_UNDEFINED : ret_seqno
};
wsrep->sst_sent (wsrep, &state_id, -err);
proc.wait(); proc.wait();
return NULL; return NULL;
@ -950,10 +967,9 @@ static int sst_donate_other (const char* method,
return arg.err; return arg.err;
} }
int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx, wsrep_cb_status wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
const void* msg, size_t msg_len, const void* msg, size_t msg_len,
const wsrep_uuid_t* current_uuid, const wsrep_gtid_t* current_gtid,
wsrep_seqno_t current_seqno,
const char* state, size_t state_len, const char* state, size_t state_len,
bool bypass) bool bypass)
{ {
@ -967,20 +983,19 @@ int wsrep_sst_donate_cb (void* app_ctx, void* recv_ctx,
const char* data = method + method_len + 1; const char* data = method + method_len + 1;
char uuid_str[37]; char uuid_str[37];
wsrep_uuid_print (current_uuid, uuid_str, sizeof(uuid_str)); wsrep_uuid_print (&current_gtid->uuid, uuid_str, sizeof(uuid_str));
int ret; int ret;
if (!strcmp (WSREP_SST_MYSQLDUMP, method)) if (!strcmp (WSREP_SST_MYSQLDUMP, method))
{ {
ret = sst_donate_mysqldump (data, current_uuid, uuid_str, current_seqno, ret = sst_donate_mysqldump(data, &current_gtid->uuid, uuid_str,
bypass); current_gtid->seqno, bypass);
} }
else else
{ {
ret = sst_donate_other (method, data, uuid_str, current_seqno, bypass); ret = sst_donate_other(method, data, uuid_str, current_gtid->seqno,bypass);
} }
return (ret > 0 ? WSREP_CB_SUCCESS : WSREP_CB_FAILURE);
return (ret > 0 ? 0 : ret);
} }
void wsrep_SE_init_grab() void wsrep_SE_init_grab()

View File

@ -113,17 +113,17 @@ void wsrep_replay_transaction(THD *thd)
/* checking if BF trx must be replayed */ /* checking if BF trx must be replayed */
if (thd->wsrep_conflict_state== MUST_REPLAY) { if (thd->wsrep_conflict_state== MUST_REPLAY) {
if (thd->wsrep_exec_mode!= REPL_RECV) { if (thd->wsrep_exec_mode!= REPL_RECV) {
if (thd->stmt_da->is_sent) if (thd->get_stmt_da()->is_sent())
{ {
WSREP_ERROR("replay issue, thd has reported status already"); WSREP_ERROR("replay issue, thd has reported status already");
} }
thd->stmt_da->reset_diagnostics_area(); thd->get_stmt_da()->reset_diagnostics_area();
thd->wsrep_conflict_state= REPLAYING; thd->wsrep_conflict_state= REPLAYING;
mysql_mutex_unlock(&thd->LOCK_wsrep_thd); mysql_mutex_unlock(&thd->LOCK_wsrep_thd);
mysql_reset_thd_for_next_command(thd); mysql_reset_thd_for_next_command(thd, opt_userstat_running);
thd->killed= THD::NOT_KILLED; thd->killed= NOT_KILLED;
close_thread_tables(thd); close_thread_tables(thd);
if (thd->locked_tables_mode && thd->lock) if (thd->locked_tables_mode && thd->lock)
{ {
@ -157,7 +157,7 @@ void wsrep_replay_transaction(THD *thd)
wsrep->post_commit(wsrep, &thd->wsrep_ws_handle); wsrep->post_commit(wsrep, &thd->wsrep_ws_handle);
WSREP_DEBUG("trx_replay successful for: %ld %llu", WSREP_DEBUG("trx_replay successful for: %ld %llu",
thd->thread_id, (long long)thd->real_id); thd->thread_id, (long long)thd->real_id);
if (thd->stmt_da->is_sent) if (thd->get_stmt_da()->is_sent())
{ {
WSREP_WARN("replay ok, thd has reported status"); WSREP_WARN("replay ok, thd has reported status");
} }
@ -167,7 +167,7 @@ void wsrep_replay_transaction(THD *thd)
} }
break; break;
case WSREP_TRX_FAIL: case WSREP_TRX_FAIL:
if (thd->stmt_da->is_sent) if (thd->get_stmt_da()->is_sent())
{ {
WSREP_ERROR("replay failed, thd has reported status"); WSREP_ERROR("replay failed, thd has reported status");
} }
@ -237,7 +237,7 @@ static void wsrep_replication_process(THD *thd)
* avoid mysql shutdown. This is because the killer will then handle * avoid mysql shutdown. This is because the killer will then handle
* shutdown processing (or replication restarting) * shutdown processing (or replication restarting)
*/ */
if (thd->killed != THD::KILL_CONNECTION) if (thd->killed != KILL_CONNECTION)
{ {
wsrep_kill_mysql(thd); wsrep_kill_mysql(thd);
} }
@ -289,7 +289,7 @@ static void wsrep_rollback_process(THD *thd)
mysql_mutex_lock(&LOCK_wsrep_rollback); mysql_mutex_lock(&LOCK_wsrep_rollback);
wsrep_aborting_thd= NULL; wsrep_aborting_thd= NULL;
while (thd->killed == THD::NOT_KILLED) { while (thd->killed == NOT_KILLED) {
thd_proc_info(thd, "wsrep aborter idle"); thd_proc_info(thd, "wsrep aborter idle");
thd->mysys_var->current_mutex= &LOCK_wsrep_rollback; thd->mysys_var->current_mutex= &LOCK_wsrep_rollback;
thd->mysys_var->current_cond= &COND_wsrep_rollback; thd->mysys_var->current_cond= &COND_wsrep_rollback;

View File

@ -22,16 +22,17 @@
#include "wsrep_utils.h" #include "wsrep_utils.h"
#include "wsrep_mysqld.h" #include "wsrep_mysqld.h"
//#include "wsrep_api.h"
//#include "wsrep_priv.h" #include <sql_class.h>
#include <spawn.h> // posix_spawn() #include <spawn.h> // posix_spawn()
#include <unistd.h> // pipe() #include <unistd.h> // pipe()
#include <errno.h> // errno #include <errno.h> // errno
#include <string.h> // strerror() #include <string.h> // strerror()
#include <sys/wait.h> // waitpid() #include <sys/wait.h> // waitpid()
#include <sys/types.h>
#include <sql_class.h> #include <sys/socket.h>
#include "wsrep_priv.h" #include <netdb.h> // getaddrinfo()
extern char** environ; // environment variables extern char** environ; // environment variables
@ -317,23 +318,69 @@ thd::~thd ()
} // namespace wsp } // namespace wsp
extern ulong my_bind_addr; /* Returns INADDR_NONE, INADDR_ANY, INADDR_LOOPBACK or something else */
unsigned int wsrep_check_ip (const char* const addr)
{
unsigned int ret = INADDR_NONE;
struct addrinfo *res, hints;
memset (&hints, 0, sizeof(hints));
hints.ai_flags= AI_PASSIVE/*|AI_ADDRCONFIG*/;
hints.ai_socktype= SOCK_STREAM;
hints.ai_family= AF_UNSPEC;
int gai_ret = getaddrinfo(addr, NULL, &hints, &res);
if (0 == gai_ret)
{
if (AF_INET == res->ai_family) /* IPv4 */
{
struct sockaddr_in* a= (struct sockaddr_in*)res->ai_addr;
ret= htonl(a->sin_addr.s_addr);
}
else /* IPv6 */
{
struct sockaddr_in6* a= (struct sockaddr_in6*)res->ai_addr;
if (IN6_IS_ADDR_UNSPECIFIED(&a->sin6_addr))
ret= INADDR_ANY;
else if (IN6_IS_ADDR_LOOPBACK(&a->sin6_addr))
ret= INADDR_LOOPBACK;
else
ret= 0xdeadbeef;
}
freeaddrinfo (res);
}
else {
WSREP_ERROR ("getaddrinfo() failed on '%s': %d (%s)",
addr, gai_ret, gai_strerror(gai_ret));
}
// uint8_t* b= (uint8_t*)&ret;
// fprintf (stderr, "########## wsrep_check_ip returning: %hhu.%hhu.%hhu.%hhu\n",
// b[0], b[1], b[2], b[3]);
return ret;
}
extern const char* my_bind_addr_str;
extern uint mysqld_port; extern uint mysqld_port;
size_t guess_ip (char* buf, size_t buf_len) size_t wsrep_guess_ip (char* buf, size_t buf_len)
{ {
size_t ip_len = 0; size_t ip_len = 0;
if (htonl(INADDR_NONE) == my_bind_addr) { if (my_bind_addr_str && strlen(my_bind_addr_str))
{
unsigned int const ip_type= wsrep_check_ip(my_bind_addr_str);
if (INADDR_NONE == ip_type) {
WSREP_ERROR("Networking not configured, cannot receive state transfer."); WSREP_ERROR("Networking not configured, cannot receive state transfer.");
return 0; return 0;
} }
if (htonl(INADDR_ANY) != my_bind_addr) { if (INADDR_ANY != ip_type) {;
uint8_t* b = (uint8_t*)&my_bind_addr; strncpy (buf, my_bind_addr_str, buf_len);
ip_len = snprintf (buf, buf_len, return strlen(buf);
"%hhu.%hhu.%hhu.%hhu", b[0],b[1],b[2],b[3]); }
return ip_len;
} }
// mysqld binds to all interfaces - try IP from wsrep_node_address // mysqld binds to all interfaces - try IP from wsrep_node_address
@ -404,9 +451,9 @@ size_t guess_ip (char* buf, size_t buf_len)
return ip_len; return ip_len;
} }
size_t guess_address(char* buf, size_t buf_len) size_t wsrep_guess_address(char* buf, size_t buf_len)
{ {
size_t addr_len = guess_ip (buf, buf_len); size_t addr_len = wsrep_guess_ip (buf, buf_len);
if (addr_len && addr_len < buf_len) { if (addr_len && addr_len < buf_len) {
addr_len += snprintf (buf + addr_len, buf_len - addr_len, addr_len += snprintf (buf + addr_len, buf_len - addr_len,

View File

@ -38,7 +38,6 @@ const char* wsrep_node_address = 0;
const char* wsrep_node_incoming_address = 0; const char* wsrep_node_incoming_address = 0;
const char* wsrep_start_position = 0; const char* wsrep_start_position = 0;
ulong wsrep_OSU_method_options; ulong wsrep_OSU_method_options;
static int wsrep_thread_change = 0;
int wsrep_init_vars() int wsrep_init_vars()
{ {
@ -61,15 +60,6 @@ bool wsrep_on_update (sys_var *self, THD* thd, enum_var_type var_type)
// FIXME: this variable probably should be changed only per session // FIXME: this variable probably should be changed only per session
thd->variables.wsrep_on = global_system_variables.wsrep_on; thd->variables.wsrep_on = global_system_variables.wsrep_on;
} }
else {
}
#ifdef REMOVED
if (thd->variables.wsrep_on)
thd->variables.option_bits |= (OPTION_BIN_LOG);
else
thd->variables.option_bits &= ~(OPTION_BIN_LOG);
#endif
return false; return false;
} }
@ -78,8 +68,6 @@ void wsrep_causal_reads_update (sys_var *self, THD* thd, enum_var_type var_type)
if (var_type == OPT_GLOBAL) { if (var_type == OPT_GLOBAL) {
thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads; thd->variables.wsrep_causal_reads = global_system_variables.wsrep_causal_reads;
} }
else {
}
} }
static int wsrep_start_position_verify (const char* start_str) static int wsrep_start_position_verify (const char* start_str)
@ -149,7 +137,7 @@ bool wsrep_start_position_update (sys_var *self, THD* thd, enum_var_type type)
wsrep_set_local_position (wsrep_start_position); wsrep_set_local_position (wsrep_start_position);
if (wsrep) { if (wsrep) {
wsrep->sst_received (wsrep, &local_uuid, local_seqno, NULL, 0); wsrep_sst_received (wsrep, &local_uuid, local_seqno, NULL, 0);
} }
return 0; return 0;
@ -455,7 +443,7 @@ void wsrep_node_address_init (const char* value)
bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var) bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var)
{ {
mysql_mutex_lock(&LOCK_wsrep_slave_threads); mysql_mutex_lock(&LOCK_wsrep_slave_threads);
wsrep_thread_change = var->value->val_int() - wsrep_slave_threads; wsrep_slave_count_change = var->value->val_int() - wsrep_slave_threads;
mysql_mutex_unlock(&LOCK_wsrep_slave_threads); mysql_mutex_unlock(&LOCK_wsrep_slave_threads);
return 0; return 0;
@ -463,13 +451,10 @@ bool wsrep_slave_threads_check (sys_var *self, THD* thd, set_var* var)
bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type) bool wsrep_slave_threads_update (sys_var *self, THD* thd, enum_var_type type)
{ {
if (wsrep_thread_change > 0) if (wsrep_slave_count_change > 0)
{ {
wsrep_create_appliers(wsrep_thread_change); wsrep_create_appliers(wsrep_slave_count_change);
} wsrep_slave_count_change = 0;
else if (wsrep_thread_change < 0)
{
wsrep_close_applier_threads(-wsrep_thread_change);
} }
return false; return false;
} }

View File

@ -127,9 +127,9 @@ extern MYSQL_PLUGIN_IMPORT mysql_mutex_t LOCK_wsrep_rollback;
extern MYSQL_PLUGIN_IMPORT mysql_cond_t COND_wsrep_rollback; extern MYSQL_PLUGIN_IMPORT mysql_cond_t COND_wsrep_rollback;
extern MYSQL_PLUGIN_IMPORT wsrep_aborting_thd_t wsrep_aborting_thd; extern MYSQL_PLUGIN_IMPORT wsrep_aborting_thd_t wsrep_aborting_thd;
static inline wsrep_trx_handle_t* static inline wsrep_ws_handle_t*
wsrep_trx_handle(THD* thd, const trx_t* trx) { wsrep_ws_handle(THD* thd, const trx_t* trx) {
return wsrep_trx_handle_for_id(wsrep_thd_trx_handle(thd), return wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd),
(wsrep_trx_id_t)trx->id); (wsrep_trx_id_t)trx->id);
} }
@ -137,7 +137,7 @@ extern bool wsrep_prepare_key_for_innodb(const uchar *cache_key,
size_t cache_key_len, size_t cache_key_len,
const uchar* row_id, const uchar* row_id,
size_t row_id_len, size_t row_id_len,
wsrep_key_part_t* key, wsrep_buf_t* key,
size_t* key_len); size_t* key_len);
extern handlerton * wsrep_hton; extern handlerton * wsrep_hton;
@ -9190,6 +9190,7 @@ wsrep_append_foreign_key(
ulint rcode = DB_SUCCESS; ulint rcode = DB_SUCCESS;
char cache_key[513] = {'\0'}; char cache_key[513] = {'\0'};
int cache_key_len; int cache_key_len;
bool const copy = true;
if (!wsrep_on(trx->mysql_thd) || if (!wsrep_on(trx->mysql_thd) ||
wsrep_thd_exec_mode(thd) != LOCAL_STATE) wsrep_thd_exec_mode(thd) != LOCAL_STATE)
@ -9316,14 +9317,14 @@ wsrep_append_foreign_key(
foreign->foreign_table->name); foreign->foreign_table->name);
} }
wsrep_key_part_t wkey_part[3]; wsrep_buf_t wkey_part[3];
wsrep_key_t wkey = {wkey_part, 3}; wsrep_key_t wkey = {wkey_part, 3};
if (!wsrep_prepare_key_for_innodb( if (!wsrep_prepare_key_for_innodb(
(const uchar*)cache_key, (const uchar*)cache_key,
cache_key_len + 1, cache_key_len + 1,
(const uchar*)key, len+1, (const uchar*)key, len+1,
wkey_part, wkey_part,
&wkey.key_parts_len)) { (size_t*)&wkey.key_parts_num)) {
WSREP_WARN("key prepare failed for cascaded FK: %s", WSREP_WARN("key prepare failed for cascaded FK: %s",
(wsrep_thd_query(thd)) ? (wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void"); wsrep_thd_query(thd) : "void");
@ -9331,10 +9332,11 @@ wsrep_append_foreign_key(
} }
rcode = (int)wsrep->append_key( rcode = (int)wsrep->append_key(
wsrep, wsrep,
wsrep_trx_handle(thd, trx), wsrep_ws_handle(thd, trx),
&wkey, &wkey,
1, 1,
shared); shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
copy);
if (rcode) { if (rcode) {
DBUG_PRINT("wsrep", ("row key failed: %lu", rcode)); DBUG_PRINT("wsrep", ("row key failed: %lu", rcode));
WSREP_ERROR("Appending cascaded fk row key failed: %s, %lu", WSREP_ERROR("Appending cascaded fk row key failed: %s, %lu",
@ -9359,6 +9361,7 @@ wsrep_append_key(
) )
{ {
DBUG_ENTER("wsrep_append_key"); DBUG_ENTER("wsrep_append_key");
bool const copy = true;
#ifdef WSREP_DEBUG_PRINT #ifdef WSREP_DEBUG_PRINT
fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ", fprintf(stderr, "%s conn %ld, trx %llu, keylen %d, table %s ",
(shared) ? "Shared" : "Exclusive", (shared) ? "Shared" : "Exclusive",
@ -9369,14 +9372,14 @@ wsrep_append_key(
} }
fprintf(stderr, "\n"); fprintf(stderr, "\n");
#endif #endif
wsrep_key_part_t wkey_part[3]; wsrep_buf_t wkey_part[3];
wsrep_key_t wkey = {wkey_part, 3}; wsrep_key_t wkey = {wkey_part, 3};
if (!wsrep_prepare_key_for_innodb( if (!wsrep_prepare_key_for_innodb(
(const uchar*)table_share->table_cache_key.str, (const uchar*)table_share->table_cache_key.str,
table_share->table_cache_key.length, table_share->table_cache_key.length,
(const uchar*)key, key_len, (const uchar*)key, key_len,
wkey_part, wkey_part,
&wkey.key_parts_len)) { (size_t*)&wkey.key_parts_num)) {
WSREP_WARN("key prepare failed for: %s", WSREP_WARN("key prepare failed for: %s",
(wsrep_thd_query(thd)) ? (wsrep_thd_query(thd)) ?
wsrep_thd_query(thd) : "void"); wsrep_thd_query(thd) : "void");
@ -9385,10 +9388,11 @@ wsrep_append_key(
int rcode = (int)wsrep->append_key( int rcode = (int)wsrep->append_key(
wsrep, wsrep,
wsrep_trx_handle(thd, trx), wsrep_ws_handle(thd, trx),
&wkey, &wkey,
1, 1,
shared); shared ? WSREP_KEY_SHARED : WSREP_KEY_EXCLUSIVE,
copy);
if (rcode) { if (rcode) {
DBUG_PRINT("wsrep", ("row key failed: %d", rcode)); DBUG_PRINT("wsrep", ("row key failed: %d", rcode));
WSREP_WARN("Appending row key failed: %s, %d", WSREP_WARN("Appending row key failed: %s, %d",
@ -16915,7 +16919,7 @@ wsrep_fake_trx_id(
trx_id_t trx_id = trx_sys_get_new_trx_id(); trx_id_t trx_id = trx_sys_get_new_trx_id();
mutex_exit(&trx_sys->mutex); mutex_exit(&trx_sys->mutex);
(void *)wsrep_trx_handle_for_id(wsrep_thd_trx_handle(thd), trx_id); (void *)wsrep_ws_handle_for_trx(wsrep_thd_ws_handle(thd), trx_id);
} }
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */

View File

@ -470,7 +470,7 @@ extern "C" enum wsrep_query_state wsrep_thd_query_state(THD *thd);
extern "C" const char * wsrep_thd_exec_mode_str(THD *thd); extern "C" const char * wsrep_thd_exec_mode_str(THD *thd);
extern "C" const char * wsrep_thd_conflict_state_str(THD *thd); extern "C" const char * wsrep_thd_conflict_state_str(THD *thd);
extern "C" const char * wsrep_thd_query_state_str(THD *thd); extern "C" const char * wsrep_thd_query_state_str(THD *thd);
extern "C" wsrep_trx_handle_t* wsrep_thd_trx_handle(THD *thd); extern "C" wsrep_ws_handle_t* wsrep_thd_ws_handle(THD *thd);
extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode); extern "C" void wsrep_thd_set_exec_mode(THD *thd, enum wsrep_exec_mode mode);
extern "C" void wsrep_thd_set_query_state( extern "C" void wsrep_thd_set_query_state(

View File

@ -290,7 +290,7 @@ innobase_casedn_str(
UNIV_INTERN UNIV_INTERN
int int
wsrep_innobase_kill_one_trx(const trx_t *bf_trx, trx_t *victim_trx, ibool signal); wsrep_innobase_kill_one_trx(const trx_t *bf_trx, trx_t *victim_trx, ibool signal);
int wsrep_thd_is_brute_force(void *thd_ptr); extern "C" int wsrep_thd_is_brute_force(void *thd_ptr);
int wsrep_trx_order_before(void *thd1, void *thd2); int wsrep_trx_order_before(void *thd1, void *thd2);
void wsrep_innobase_mysql_sort(int mysql_type, uint charset_number, void wsrep_innobase_mysql_sort(int mysql_type, uint charset_number,
unsigned char* str, unsigned int str_length); unsigned char* str, unsigned int str_length);

File diff suppressed because it is too large Load Diff

View File

@ -16,10 +16,11 @@
/*! @file Dummy wsrep API implementation. */ /*! @file Dummy wsrep API implementation. */
#include <errno.h>
#include "wsrep_api.h" #include "wsrep_api.h"
#include <errno.h>
#include <stdbool.h>
/*! Dummy backend context. */ /*! Dummy backend context. */
typedef struct wsrep_dummy typedef struct wsrep_dummy
{ {
@ -76,7 +77,8 @@ static wsrep_status_t dummy_connect(
wsrep_t* w, wsrep_t* w,
const char* name __attribute__((unused)), const char* name __attribute__((unused)),
const char* url __attribute__((unused)), const char* url __attribute__((unused)),
const char* donor __attribute__((unused))) const char* donor __attribute__((unused)),
wsrep_bool_t bootstrap __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -98,11 +100,11 @@ static wsrep_status_t dummy_recv(wsrep_t* w,
static wsrep_status_t dummy_pre_commit( static wsrep_status_t dummy_pre_commit(
wsrep_t* w, wsrep_t* w,
const wsrep_conn_id_t conn_id __attribute__((unused)), const wsrep_conn_id_t conn_id __attribute__((unused)),
wsrep_trx_handle_t* trx_handle __attribute__((unused)), wsrep_ws_handle_t* ws_handle __attribute__((unused)),
const void* query __attribute__((unused)), // const struct wsrep_buf* data __attribute__((unused)),
const size_t query_len __attribute__((unused)), // const long count __attribute__((unused)),
uint64_t flags __attribute__((unused)), uint64_t flags __attribute__((unused)),
wsrep_seqno_t* seqno __attribute__((unused))) wsrep_trx_meta_t* meta __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -110,7 +112,7 @@ static wsrep_status_t dummy_pre_commit(
static wsrep_status_t dummy_post_commit( static wsrep_status_t dummy_post_commit(
wsrep_t* w, wsrep_t* w,
wsrep_trx_handle_t* trx_handle __attribute__((unused))) wsrep_ws_handle_t* ws_handle __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -118,7 +120,7 @@ static wsrep_status_t dummy_post_commit(
static wsrep_status_t dummy_post_rollback( static wsrep_status_t dummy_post_rollback(
wsrep_t* w, wsrep_t* w,
wsrep_trx_handle_t* trx_handle __attribute__((unused))) wsrep_ws_handle_t* ws_handle __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -126,7 +128,7 @@ static wsrep_status_t dummy_post_rollback(
static wsrep_status_t dummy_replay_trx( static wsrep_status_t dummy_replay_trx(
wsrep_t* w, wsrep_t* w,
wsrep_trx_handle_t* trx_handle __attribute__((unused)), wsrep_ws_handle_t* ws_handle __attribute__((unused)),
void* trx_ctx __attribute__((unused))) void* trx_ctx __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
@ -142,23 +144,13 @@ static wsrep_status_t dummy_abort_pre_commit(
return WSREP_OK; return WSREP_OK;
} }
static wsrep_status_t dummy_append_query( static wsrep_status_t dummy_append_key(
wsrep_t* w, wsrep_t* w,
wsrep_trx_handle_t* trx_handle __attribute__((unused)), wsrep_ws_handle_t* ws_handle __attribute__((unused)),
const char* query __attribute__((unused)),
const time_t timeval __attribute__((unused)),
const uint32_t randseed __attribute__((unused)))
{
WSREP_DBUG_ENTER(w);
return WSREP_OK;
}
static wsrep_status_t dummy_append_row_key(
wsrep_t* w,
wsrep_trx_handle_t* trx_handle __attribute__((unused)),
const wsrep_key_t* key __attribute__((unused)), const wsrep_key_t* key __attribute__((unused)),
const size_t key_len __attribute__((unused)), const int key_num __attribute__((unused)),
const bool shared __attribute__((unused))) const wsrep_key_type_t key_type __attribute__((unused)),
const bool copy __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -166,9 +158,11 @@ static wsrep_status_t dummy_append_row_key(
static wsrep_status_t dummy_append_data( static wsrep_status_t dummy_append_data(
wsrep_t* w, wsrep_t* w,
wsrep_trx_handle_t* trx_handle __attribute__((unused)), wsrep_ws_handle_t* ws_handle __attribute__((unused)),
const void* data __attribute__((unused)), const struct wsrep_buf* data __attribute__((unused)),
size_t data_len __attribute__((unused))) const int count __attribute__((unused)),
const wsrep_data_type_t type __attribute__((unused)),
const bool copy __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -176,7 +170,7 @@ static wsrep_status_t dummy_append_data(
static wsrep_status_t dummy_causal_read( static wsrep_status_t dummy_causal_read(
wsrep_t* w, wsrep_t* w,
wsrep_seqno_t* seqno __attribute__((unused))) wsrep_gtid_t* gtid __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -194,10 +188,10 @@ static wsrep_status_t dummy_to_execute_start(
wsrep_t* w, wsrep_t* w,
const wsrep_conn_id_t conn_id __attribute__((unused)), const wsrep_conn_id_t conn_id __attribute__((unused)),
const wsrep_key_t* key __attribute__((unused)), const wsrep_key_t* key __attribute__((unused)),
const size_t key_len __attribute__((unused)), const int key_num __attribute__((unused)),
const void* query __attribute__((unused)), const struct wsrep_buf* data __attribute__((unused)),
const size_t query_len __attribute__((unused)), const int count __attribute__((unused)),
wsrep_seqno_t* seqno __attribute__((unused))) wsrep_trx_meta_t* meta __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -211,6 +205,19 @@ static wsrep_status_t dummy_to_execute_end(
return WSREP_OK; return WSREP_OK;
} }
static wsrep_status_t dummy_preordered(
wsrep_t* w,
const wsrep_uuid_t* source_id __attribute__((unused)),
int pa_range __attribute__((unused)),
const struct wsrep_buf* data __attribute__((unused)),
int count __attribute__((unused)),
uint64_t flags __attribute__((unused)),
wsrep_bool_t copy __attribute__((unused)))
{
WSREP_DBUG_ENTER(w);
return WSREP_OK;
}
static wsrep_status_t dummy_sst_sent( static wsrep_status_t dummy_sst_sent(
wsrep_t* w, wsrep_t* w,
const wsrep_uuid_t* uuid __attribute__((unused)), const wsrep_uuid_t* uuid __attribute__((unused)),
@ -234,7 +241,7 @@ static wsrep_status_t dummy_sst_received(
static wsrep_status_t dummy_snapshot( static wsrep_status_t dummy_snapshot(
wsrep_t* w, wsrep_t* w,
const void* msg __attribute__((unused)), const void* msg __attribute__((unused)),
const size_t msg_len __attribute__((unused)), const int msg_len __attribute__((unused)),
const char* donor_spec __attribute__((unused))) const char* donor_spec __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
@ -258,6 +265,11 @@ static void dummy_stats_free (
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
} }
static void dummy_stats_reset (wsrep_t* w)
{
WSREP_DBUG_ENTER(w);
}
static wsrep_seqno_t dummy_pause (wsrep_t* w) static wsrep_seqno_t dummy_pause (wsrep_t* w)
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
@ -284,7 +296,8 @@ static wsrep_status_t dummy_resync (wsrep_t* w)
static wsrep_status_t dummy_lock (wsrep_t* w, static wsrep_status_t dummy_lock (wsrep_t* w,
const char* s __attribute__((unused)), const char* s __attribute__((unused)),
int64_t o __attribute__((unused)), bool r __attribute__((unused)),
uint64_t o __attribute__((unused)),
int64_t t __attribute__((unused))) int64_t t __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
@ -293,7 +306,7 @@ static wsrep_status_t dummy_lock (wsrep_t* w,
static wsrep_status_t dummy_unlock (wsrep_t* w, static wsrep_status_t dummy_unlock (wsrep_t* w,
const char* s __attribute__((unused)), const char* s __attribute__((unused)),
int64_t o __attribute__((unused))) uint64_t o __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
return WSREP_OK; return WSREP_OK;
@ -301,7 +314,7 @@ static wsrep_status_t dummy_unlock (wsrep_t* w,
static bool dummy_is_locked (wsrep_t* w, static bool dummy_is_locked (wsrep_t* w,
const char* s __attribute__((unused)), const char* s __attribute__((unused)),
int64_t* o __attribute__((unused)), uint64_t* o __attribute__((unused)),
wsrep_uuid_t* t __attribute__((unused))) wsrep_uuid_t* t __attribute__((unused)))
{ {
WSREP_DBUG_ENTER(w); WSREP_DBUG_ENTER(w);
@ -322,18 +335,19 @@ static wsrep_t dummy_iface = {
&dummy_post_rollback, &dummy_post_rollback,
&dummy_replay_trx, &dummy_replay_trx,
&dummy_abort_pre_commit, &dummy_abort_pre_commit,
&dummy_append_query, &dummy_append_key,
&dummy_append_row_key,
&dummy_append_data, &dummy_append_data,
&dummy_causal_read, &dummy_causal_read,
&dummy_free_connection, &dummy_free_connection,
&dummy_to_execute_start, &dummy_to_execute_start,
&dummy_to_execute_end, &dummy_to_execute_end,
&dummy_preordered,
&dummy_sst_sent, &dummy_sst_sent,
&dummy_sst_received, &dummy_sst_received,
&dummy_snapshot, &dummy_snapshot,
&dummy_stats_get, &dummy_stats_get,
&dummy_stats_free, &dummy_stats_free,
&dummy_stats_reset,
&dummy_pause, &dummy_pause,
&dummy_resume, &dummy_resume,
&dummy_desync, &dummy_desync,
@ -344,6 +358,7 @@ static wsrep_t dummy_iface = {
WSREP_NONE, WSREP_NONE,
WSREP_INTERFACE_VERSION, WSREP_INTERFACE_VERSION,
"Codership Oy <info@codership.com>", "Codership Oy <info@codership.com>",
0xdeadbeef,
&dummy_free, &dummy_free,
NULL, NULL,
NULL NULL

View File

@ -70,15 +70,18 @@ static int verify(const wsrep_t *wh, const char *iface_ver)
VERIFY(wh->post_rollback); VERIFY(wh->post_rollback);
VERIFY(wh->replay_trx); VERIFY(wh->replay_trx);
VERIFY(wh->abort_pre_commit); VERIFY(wh->abort_pre_commit);
VERIFY(wh->append_query);
VERIFY(wh->append_key); VERIFY(wh->append_key);
VERIFY(wh->append_data);
VERIFY(wh->free_connection); VERIFY(wh->free_connection);
VERIFY(wh->to_execute_start); VERIFY(wh->to_execute_start);
VERIFY(wh->to_execute_end); VERIFY(wh->to_execute_end);
VERIFY(wh->preordered_collect);
VERIFY(wh->preordered_commit);
VERIFY(wh->sst_sent); VERIFY(wh->sst_sent);
VERIFY(wh->sst_received); VERIFY(wh->sst_received);
VERIFY(wh->stats_get); VERIFY(wh->stats_get);
VERIFY(wh->stats_free); VERIFY(wh->stats_free);
VERIFY(wh->stats_reset);
VERIFY(wh->pause); VERIFY(wh->pause);
VERIFY(wh->resume); VERIFY(wh->resume);
VERIFY(wh->desync); VERIFY(wh->desync);
@ -93,6 +96,7 @@ static int verify(const wsrep_t *wh, const char *iface_ver)
return 0; return 0;
} }
typedef int (*wsrep_loader_fun)(wsrep_t*);
static wsrep_loader_fun wsrep_dlf(void *dlh, const char *sym) static wsrep_loader_fun wsrep_dlf(void *dlh, const char *sym)
{ {
@ -175,7 +179,7 @@ out:
*hptr = NULL; *hptr = NULL;
} else { } else {
snprintf (msg, msg_len, snprintf (msg, msg_len,
"wsrep_load(): %s %s by %s loaded succesfully.", "wsrep_load(): %s %s by %s loaded successfully.",
(*hptr)->provider_name, (*hptr)->provider_version, (*hptr)->provider_name, (*hptr)->provider_version,
(*hptr)->provider_vendor); (*hptr)->provider_vendor);
logger (WSREP_LOG_INFO, msg); logger (WSREP_LOG_INFO, msg);

View File

@ -26,25 +26,31 @@
* Read UUID from string * Read UUID from string
* @return length of UUID string representation or -EINVAL in case of error * @return length of UUID string representation or -EINVAL in case of error
*/ */
ssize_t int
wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid) wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid)
{ {
size_t uuid_len = 0; unsigned int uuid_len = 0;
size_t uuid_offt = 0; unsigned int uuid_offt = 0;
while (uuid_len + 1 < str_len) { while (uuid_len + 1 < str_len) {
if ((4 == uuid_offt || 6 == uuid_offt || 8 == uuid_offt || /* We are skipping potential '-' after uuid_offt == 4, 6, 8, 10
10 == uuid_offt) && str[uuid_len] == '-') { * which means
* (uuid_offt >> 1) == 2, 3, 4, 5,
* which in turn means
* (uuid_offt >> 1) - 2 <= 3
* since it is always >= 0, because uuid_offt is unsigned */
if (((uuid_offt >> 1) - 2) <= 3 && str[uuid_len] == '-') {
// skip dashes after 4th, 6th, 8th and 10th positions // skip dashes after 4th, 6th, 8th and 10th positions
uuid_len += 1; uuid_len += 1;
continue; continue;
} }
if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) { if (isxdigit(str[uuid_len]) && isxdigit(str[uuid_len + 1])) {
// got hex digit // got hex digit, scan another byte to uuid, increment uuid_offt
sscanf (str + uuid_len, "%2hhx", uuid->uuid + uuid_offt); sscanf (str + uuid_len, "%2hhx", uuid->data + uuid_offt);
uuid_len += 2; uuid_len += 2;
uuid_offt += 1; uuid_offt += 1;
if (sizeof (uuid->uuid) == uuid_offt) if (sizeof (uuid->data) == uuid_offt)
return uuid_len; return uuid_len;
} }
else { else {
@ -61,11 +67,11 @@ wsrep_uuid_scan (const char* str, size_t str_len, wsrep_uuid_t* uuid)
* @return length of UUID string representation or -EMSGSIZE if string is too * @return length of UUID string representation or -EMSGSIZE if string is too
* short * short
*/ */
ssize_t int
wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len) wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len)
{ {
if (str_len > 36) { if (str_len > 36) {
const unsigned char* u = uuid->uuid; const unsigned char* u = uuid->data;
return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-" return snprintf(str, str_len, "%02x%02x%02x%02x-%02x%02x-%02x%02x-"
"%02x%02x-%02x%02x%02x%02x%02x%02x", "%02x%02x-%02x%02x%02x%02x%02x%02x",
u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7], u[ 0], u[ 1], u[ 2], u[ 3], u[ 4], u[ 5], u[ 6], u[ 7],
@ -75,4 +81,3 @@ wsrep_uuid_print (const wsrep_uuid_t* uuid, char* str, size_t str_len)
return -EMSGSIZE; return -EMSGSIZE;
} }
} }