MDEV-13073. This part patch weeds out RUN_HOOK from the server as semisync

is defined statically. Consequently the observer interfaces are removed
as well.
This commit is contained in:
Andrei Elkin 2017-11-22 19:34:42 +02:00 committed by Monty
parent e972125f11
commit 74b35b6874
17 changed files with 137 additions and 942 deletions

View File

@ -90,7 +90,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/scheduler.cc ../sql/sql_audit.cc
../sql/sql_alter.cc ../sql/sql_partition_admin.cc
../sql/event_parse_data.cc
../sql/sql_signal.cc ../sql/rpl_handler.cc
../sql/sql_signal.cc
../sql/sys_vars.cc
${CMAKE_BINARY_DIR}/sql/sql_builtin.cc
../sql/mdl.cc ../sql/transaction.cc

View File

@ -4,7 +4,7 @@ where name like 'Wait/Synch/Mutex/sql/%'
and name not in ('wait/synch/mutex/sql/DEBUG_SYNC::mutex')
order by name limit 10;
NAME ENABLED TIMED
wait/synch/mutex/sql/Ack_receiver::m_mutex YES YES
wait/synch/mutex/sql/Ack_receiver::mutex YES YES
wait/synch/mutex/sql/Cversion_lock YES YES
wait/synch/mutex/sql/Delayed_insert::mutex YES YES
wait/synch/mutex/sql/Event_scheduler::LOCK_scheduler_state YES YES
@ -36,7 +36,7 @@ where name like 'Wait/Synch/Cond/sql/%'
'wait/synch/cond/sql/DEBUG_SYNC::cond')
order by name limit 10;
NAME ENABLED TIMED
wait/synch/cond/sql/Ack_receiver::m_cond YES YES
wait/synch/cond/sql/Ack_receiver::cond YES YES
wait/synch/cond/sql/COND_binlog_send YES YES
wait/synch/cond/sql/COND_flush_thread_cache YES YES
wait/synch/cond/sql/COND_group_commit_orderer YES YES

View File

@ -1,5 +1,5 @@
--- sysvars_server_notembedded.result 2017-11-17 17:00:22.470630255 +0100
+++ sysvars_server_notembedded,32bit.reject 2017-11-17 19:12:42.732453556 +0100
--- sysvars_server_notembedded.result 2017-12-15 20:57:40.174654761 +0200
+++ sysvars_server_notembedded,32bit.reject 2017-12-15 21:02:20.476044700 +0200
@@ -58,7 +58,7 @@
GLOBAL_VALUE_ORIGIN CONFIG
DEFAULT_VALUE 1
@ -1116,7 +1116,46 @@
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -4048,7 +4048,7 @@
@@ -4034,10 +4034,10 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 10000
VARIABLE_SCOPE GLOBAL
-VARIABLE_TYPE BIGINT UNSIGNED
+VARIABLE_TYPE INT UNSIGNED
VARIABLE_COMMENT The timeout value (in ms) for semi-synchronous replication in the master
NUMERIC_MIN_VALUE 0
-NUMERIC_MAX_VALUE 18446744073709551615
+NUMERIC_MAX_VALUE 4294967295
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -4048,10 +4048,10 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 32
VARIABLE_SCOPE GLOBAL
-VARIABLE_TYPE BIGINT UNSIGNED
+VARIABLE_TYPE INT UNSIGNED
VARIABLE_COMMENT The tracing level for semi-sync replication.
NUMERIC_MIN_VALUE 0
-NUMERIC_MAX_VALUE 18446744073709551615
+NUMERIC_MAX_VALUE 4294967295
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -4132,10 +4132,10 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 32
VARIABLE_SCOPE GLOBAL
-VARIABLE_TYPE BIGINT UNSIGNED
+VARIABLE_TYPE INT UNSIGNED
VARIABLE_COMMENT The tracing level for semi-sync replication.
NUMERIC_MIN_VALUE 0
-NUMERIC_MAX_VALUE 18446744073709551615
+NUMERIC_MAX_VALUE 4294967295
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -4174,7 +4174,7 @@
GLOBAL_VALUE_ORIGIN CONFIG
DEFAULT_VALUE 1
VARIABLE_SCOPE SESSION
@ -1125,7 +1164,7 @@
VARIABLE_COMMENT Uniquely identifies the server instance in the community of replication partners
NUMERIC_MIN_VALUE 1
NUMERIC_MAX_VALUE 4294967295
@@ -4230,7 +4230,7 @@
@@ -4356,7 +4356,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 0
VARIABLE_SCOPE GLOBAL
@ -1134,7 +1173,7 @@
VARIABLE_COMMENT Maximum number of parallel threads to use on slave for events in a single replication domain. When using multiple domains, this can be used to limit a single domain from grabbing all threads and thus stalling other domains. The default of 0 means to allow a domain to grab as many threads as it wants, up to the value of slave_parallel_threads.
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 16383
@@ -4272,7 +4272,7 @@
@@ -4398,7 +4398,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 1073741824
VARIABLE_SCOPE GLOBAL
@ -1143,7 +1182,7 @@
VARIABLE_COMMENT The maximum packet length to sent successfully from the master to slave.
NUMERIC_MIN_VALUE 1024
NUMERIC_MAX_VALUE 1073741824
@@ -4300,7 +4300,7 @@
@@ -4426,7 +4426,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 131072
VARIABLE_SCOPE GLOBAL
@ -1152,7 +1191,7 @@
VARIABLE_COMMENT Limit on how much memory SQL threads should use per parallel replication thread when reading ahead in the relay log looking for opportunities for parallel replication. Only used when --slave-parallel-threads > 0.
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 2147483647
@@ -4328,7 +4328,7 @@
@@ -4454,7 +4454,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 0
VARIABLE_SCOPE GLOBAL
@ -1161,7 +1200,7 @@
VARIABLE_COMMENT If non-zero, number of threads to spawn to apply in parallel events on the slave that were group-committed on the master or were logged with GTID in different replication domains. Note that these threads are in addition to the IO and SQL threads, which are always created by a replication slave
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 16383
@@ -4342,7 +4342,7 @@
@@ -4468,7 +4468,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 0
VARIABLE_SCOPE GLOBAL
@ -1170,7 +1209,7 @@
VARIABLE_COMMENT Alias for slave_parallel_threads
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 16383
@@ -4398,7 +4398,7 @@
@@ -4524,7 +4524,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 10
VARIABLE_SCOPE GLOBAL
@ -1179,7 +1218,7 @@
VARIABLE_COMMENT Number of times the slave SQL thread will retry a transaction in case it failed with a deadlock, elapsed lock wait timeout or listed in slave_transaction_retry_errors, before giving up and stopping
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 4294967295
@@ -4426,7 +4426,7 @@
@@ -4552,7 +4552,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 0
VARIABLE_SCOPE GLOBAL
@ -1188,7 +1227,7 @@
VARIABLE_COMMENT Interval of the slave SQL thread will retry a transaction in case it failed with a deadlock or elapsed lock wait timeout or listed in slave_transaction_retry_errors
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 3600
@@ -4257,7 +4257,7 @@
@@ -4580,7 +4580,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 2
VARIABLE_SCOPE GLOBAL
@ -1197,7 +1236,7 @@
VARIABLE_COMMENT If creating the thread takes longer than this value (in seconds), the Slow_launch_threads counter will be incremented
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 31536000
@@ -4316,7 +4316,7 @@
@@ -4639,7 +4639,7 @@
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Each thread that needs to do a sort allocates a buffer of this size
NUMERIC_MIN_VALUE 1024
@ -1206,7 +1245,7 @@
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -4621,7 +4621,7 @@
@@ -4944,7 +4944,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 256
VARIABLE_SCOPE GLOBAL
@ -1215,7 +1254,7 @@
VARIABLE_COMMENT The soft upper limit for number of cached stored routines for one connection.
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 524288
@@ -4719,7 +4719,7 @@
@@ -5042,7 +5042,7 @@
GLOBAL_VALUE_ORIGIN AUTO
DEFAULT_VALUE 400
VARIABLE_SCOPE GLOBAL
@ -1224,7 +1263,7 @@
VARIABLE_COMMENT The number of cached table definitions
NUMERIC_MIN_VALUE 400
NUMERIC_MAX_VALUE 524288
@@ -4733,7 +4733,7 @@
@@ -5056,7 +5056,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 2000
VARIABLE_SCOPE GLOBAL
@ -1233,7 +1272,7 @@
VARIABLE_COMMENT The number of cached open tables
NUMERIC_MIN_VALUE 1
NUMERIC_MAX_VALUE 1048576
@@ -4761,7 +4761,7 @@
@@ -5126,7 +5126,7 @@
GLOBAL_VALUE_ORIGIN AUTO
DEFAULT_VALUE 256
VARIABLE_SCOPE GLOBAL
@ -1242,7 +1281,7 @@
VARIABLE_COMMENT How many threads we should keep in a cache for reuse. These are freed after 5 minutes of idle time
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 16384
@@ -4775,7 +4775,7 @@
@@ -5140,7 +5140,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 10
VARIABLE_SCOPE GLOBAL
@ -1251,7 +1290,7 @@
VARIABLE_COMMENT Permits the application to give the threads system a hint for the desired number of threads that should be run at the same time.This variable has no effect, and is deprecated. It will be removed in a future release.
NUMERIC_MIN_VALUE 1
NUMERIC_MAX_VALUE 512
@@ -4980,15 +4980,15 @@
@@ -5345,15 +5345,15 @@
READ_ONLY YES
COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME TMP_DISK_TABLE_SIZE
@ -1271,7 +1310,7 @@
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -5002,7 +5002,7 @@
@@ -5367,7 +5367,7 @@
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT If an internal in-memory temporary table exceeds this size, MariaDB will automatically convert it to an on-disk MyISAM or Aria table. Same as tmp_table_size.
NUMERIC_MIN_VALUE 1024
@ -1280,7 +1319,7 @@
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -5016,7 +5016,7 @@
@@ -5381,7 +5381,7 @@
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Alias for tmp_memory_table_size. If an internal in-memory temporary table exceeds this size, MariaDB will automatically convert it to an on-disk MyISAM or Aria table.
NUMERIC_MIN_VALUE 1024
@ -1289,7 +1328,7 @@
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -5027,7 +5027,7 @@
@@ -5392,7 +5392,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 8192
VARIABLE_SCOPE SESSION
@ -1298,7 +1337,7 @@
VARIABLE_COMMENT Allocation block size for transactions to be stored in binary log
NUMERIC_MIN_VALUE 1024
NUMERIC_MAX_VALUE 134217728
@@ -5041,7 +5041,7 @@
@@ -5406,7 +5406,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 4096
VARIABLE_SCOPE SESSION
@ -1307,7 +1346,7 @@
VARIABLE_COMMENT Persistent buffer for transactions to be stored in binary log
NUMERIC_MIN_VALUE 1024
NUMERIC_MAX_VALUE 134217728
@@ -5139,7 +5139,7 @@
@@ -5504,7 +5504,7 @@
GLOBAL_VALUE_ORIGIN COMPILE-TIME
DEFAULT_VALUE 28800
VARIABLE_SCOPE SESSION
@ -1316,7 +1355,7 @@
VARIABLE_COMMENT The number of seconds the server waits for activity on a connection before closing it
NUMERIC_MIN_VALUE 1
NUMERIC_MAX_VALUE 31536000
@@ -5243,7 +5243,7 @@
@@ -5609,7 +5609,7 @@
COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME OPEN_FILES_LIMIT
VARIABLE_SCOPE GLOBAL
@ -1325,7 +1364,7 @@
VARIABLE_COMMENT If this is not 0, then mysqld will use this value to reserve file descriptors to use with setrlimit(). If this value is 0 or autoset then mysqld will reserve max_connections*5 or max_connections + table_cache*2 (whichever is larger) number of file descriptors
NUMERIC_MIN_VALUE 0
NUMERIC_MAX_VALUE 4294967295
@@ -5256,7 +5256,7 @@
@@ -5622,7 +5622,7 @@
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Sets the internal state of the RAND() generator for replication purposes
NUMERIC_MIN_VALUE 0
@ -1334,7 +1373,7 @@
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -5266,7 +5266,7 @@
@@ -5632,7 +5632,7 @@
VARIABLE_TYPE BIGINT UNSIGNED
VARIABLE_COMMENT Sets the internal state of the RAND() generator for replication purposes
NUMERIC_MIN_VALUE 0
@ -1343,7 +1382,7 @@
NUMERIC_BLOCK_SIZE 1
ENUM_VALUE_LIST NULL
READ_ONLY NO
@@ -5351,7 +5351,7 @@
@@ -5727,7 +5727,7 @@
VARIABLE_NAME LOG_TC_SIZE
GLOBAL_VALUE_ORIGIN AUTO
VARIABLE_SCOPE GLOBAL

View File

@ -122,7 +122,7 @@ SET (SQL_SOURCE
rpl_rli.cc rpl_mi.cc sql_servers.cc sql_audit.cc
sql_connect.cc scheduler.cc sql_partition_admin.cc
sql_profile.cc event_parse_data.cc sql_alter.cc
sql_signal.cc rpl_handler.cc mdl.cc sql_admin.cc
sql_signal.cc mdl.cc sql_admin.cc
transaction.cc sys_vars.cc sql_truncate.cc datadict.cc
sql_reload.cc item_inetfunc.cc

View File

@ -23,7 +23,7 @@
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
#include "rpl_handler.h"
#include "rpl_rli.h"
#include "sql_cache.h" // query_cache, query_cache_*
#include "sql_connect.h" // global_table_stats
#include "key.h" // key_copy, key_unpack, key_cmp_if_same, key_cmp
@ -1485,8 +1485,7 @@ done:
mysql_mutex_assert_not_owner(mysql_bin_log.get_log_lock());
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
#ifdef REPLICATION
#ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterCommit(thd, all);
DEBUG_SYNC(thd, "after_group_after_commit");
#endif
@ -1734,8 +1733,7 @@ int ha_rollback_trans(THD *thd, bool all)
push_warning(thd, Sql_condition::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER_THD(thd, ER_WARNING_NOT_COMPLETE_ROLLBACK));
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
#ifdef REPLICATION
#ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, all);
#endif
DBUG_RETURN(error);

View File

@ -49,7 +49,6 @@
#endif
#include "sql_plugin.h"
#include "rpl_handler.h"
#include "debug_sync.h"
#include "sql_show.h"
#include "my_pthread.h"
@ -6376,19 +6375,15 @@ err:
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
if ((error= RUN_HOOK(binlog_storage, after_flush,
(thd, log_file_name, file->pos_in_file,
synced, true, true)))
#ifdef REPLICATION
|| repl_semisync_master.reportBinlogUpdate(thd, log_file_name,
file->pos_in_file)
#endif
)
#ifdef HAVE_REPLICATION
if (repl_semisync_master.reportBinlogUpdate(thd, log_file_name,
file->pos_in_file))
{
sql_print_error("Failed to run 'after_flush' hooks");
error= 1;
}
else
#endif
{
/*
update binlog_end_pos so it can be read by dump thread
@ -6413,18 +6408,14 @@ err:
mysql_mutex_assert_not_owner(&LOCK_log);
mysql_mutex_assert_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
if (RUN_HOOK(binlog_storage, after_sync,
(thd, log_file_name, file->pos_in_file,
true, true))
#ifdef REPLICATION
|| repl_semisync_master.waitAfterSync(log_file_name,
file->pos_in_file)
#endif
)
#ifdef HAVE_REPLICATION
if (repl_semisync_master.waitAfterSync(log_file_name,
file->pos_in_file))
{
error=1;
/* error is already printed inside hook */
}
#endif
/*
Take mutex to protect against a reader seeing partial writes of 64-bit
@ -7850,33 +7841,23 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
mysql_mutex_assert_owner(&LOCK_log);
mysql_mutex_assert_not_owner(&LOCK_after_binlog_sync);
mysql_mutex_assert_not_owner(&LOCK_commit_ordered);
bool first __attribute__((unused))= true;
bool last __attribute__((unused));
for (current= queue; current != NULL; current= current->next)
{
last= current->next == NULL;
#ifdef HAVE_REPLICATION
if (!current->error &&
(RUN_HOOK(binlog_storage, after_flush,
(current->thd,
current->cache_mngr->last_commit_pos_file,
current->cache_mngr->last_commit_pos_offset, synced,
first, last))
#ifdef REPLICATION
|| (DBUG_EVALUATE_IF("failed_report_binlog_update", 1, 0) ||
repl_semisync_master.
reportBinlogUpdate(current->thd,
current->cache_mngr->last_commit_pos_file,
current->cache_mngr->
last_commit_pos_offset))
#endif
))
repl_semisync_master.
reportBinlogUpdate(current->thd,
current->cache_mngr->last_commit_pos_file,
current->cache_mngr->
last_commit_pos_offset))
{
current->error= ER_ERROR_ON_WRITE;
current->commit_errno= -1;
current->error_cache= NULL;
any_error= true;
}
first= false;
#endif
}
/*
@ -7951,24 +7932,14 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
for (current= queue; current != NULL; current= current->next)
{
last= current->next == NULL;
if (!current->error &&
(RUN_HOOK(binlog_storage, after_sync,
(current->thd, current->cache_mngr->last_commit_pos_file,
current->cache_mngr->last_commit_pos_offset,
first, last))
#ifdef REPLICATION
|| (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) ||
repl_semisync_master.waitAfterSync(current->cache_mngr->
last_commit_pos_file,
current->cache_mngr->
last_commit_pos_offset))
#ifdef HAVE_REPLICATION
if (!current->error)
current->error=
repl_semisync_master.waitAfterSync(current->cache_mngr->
last_commit_pos_file,
current->cache_mngr->
last_commit_pos_offset);
#endif
))
{
const char *hook_name= rpl_semi_sync_master_enabled ?
"'waitAfterSync'" : "binlog_storage 'after_sync'";
sql_print_error("Failed to call '%s'", hook_name);
}
first= false;
}
}

View File

@ -97,7 +97,6 @@
#include "set_var.h"
#include "rpl_injector.h"
#include "rpl_handler.h"
#include "semisync_master.h"
#include "semisync_slave.h"
@ -390,7 +389,7 @@ static longlong start_memory_used;
/* Global variables */
bool opt_bin_log, opt_bin_log_used=0, opt_ignore_builtin_innodb= 0;
bool opt_bin_log_compress, run_hooks_enabled;
bool opt_bin_log_compress;
uint opt_bin_log_compress_min_len;
my_bool opt_log, debug_assert_if_crashed_table= 0, opt_help= 0;
my_bool debug_assert_on_not_freed_memory= 0;
@ -2237,7 +2236,6 @@ void clean_up(bool print_message)
#ifdef HAVE_REPLICATION
semi_sync_master_deinit();
#endif
delegates_destroy();
xid_cache_free();
tdc_deinit();
mdl_destroy();
@ -5154,13 +5152,6 @@ static int init_server_components()
xid_cache_init();
/*
initialize delegates for extension observers, errors have already
been reported in the function
*/
if (delegates_init())
unireg_abort(1);
/* need to configure logging before initializing storage engines */
if (!opt_bin_log_used && !WSREP_ON)
{
@ -8960,7 +8951,6 @@ static int mysql_init_variables(void)
transactions_multi_engine= 0;
rpl_transactions_multi_engine= 0;
transactions_gtid_foreign_engine= 0;
run_hooks_enabled= 0; // don't run hooks, semisync does not need 'em
log_bin_basename= NULL;
log_bin_index= NULL;

View File

@ -109,7 +109,6 @@ extern CHARSET_INFO *character_set_filesystem;
extern MY_BITMAP temp_pool;
extern bool opt_large_files;
extern bool opt_update_log, opt_bin_log, opt_error_log, opt_bin_log_compress;
extern bool run_hooks_enabled;
extern uint opt_bin_log_compress_min_len;
extern my_bool opt_log, opt_bootstrap;
extern my_bool opt_backup_history_log;

View File

@ -1,555 +0,0 @@
/* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "mariadb.h"
#include "sql_priv.h"
#include "unireg.h"
#include "rpl_mi.h"
#include "sql_repl.h"
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
#include "rpl_handler.h"
Trans_delegate *transaction_delegate;
Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
Binlog_transmit_delegate *binlog_transmit_delegate;
Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
/*
structure to save transaction log filename and position
*/
typedef struct Trans_binlog_info {
my_off_t log_pos;
char log_file[FN_REFLEN];
} Trans_binlog_info;
int get_user_var_int(const char *name,
long long int *value, int *null_value)
{
bool null_val;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
*value= entry->val_int(&null_val);
if (null_value)
*null_value= null_val;
return 0;
}
int get_user_var_real(const char *name,
double *value, int *null_value)
{
bool null_val;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
*value= entry->val_real(&null_val);
if (null_value)
*null_value= null_val;
return 0;
}
int get_user_var_str(const char *name, char *value,
size_t len, unsigned int precision, int *null_value)
{
String str;
bool null_val;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
entry->val_str(&null_val, &str, precision);
strncpy(value, str.c_ptr(), len);
if (null_value)
*null_value= null_val;
return 0;
}
int delegates_init()
{
static my_aligned_storage<sizeof(Trans_delegate), MY_ALIGNOF(long)> trans_mem;
static my_aligned_storage<sizeof(Binlog_storage_delegate),
MY_ALIGNOF(long)> storage_mem;
#ifdef HAVE_REPLICATION
static my_aligned_storage<sizeof(Binlog_transmit_delegate),
MY_ALIGNOF(long)> transmit_mem;
static my_aligned_storage<sizeof(Binlog_relay_IO_delegate),
MY_ALIGNOF(long)> relay_io_mem;
#endif
void *place_trans_mem= trans_mem.data;
void *place_storage_mem= storage_mem.data;
transaction_delegate= new (place_trans_mem) Trans_delegate;
if (!transaction_delegate->is_inited())
{
sql_print_error("Initialization of transaction delegates failed. "
"Please report a bug.");
return 1;
}
binlog_storage_delegate= new (place_storage_mem) Binlog_storage_delegate;
if (!binlog_storage_delegate->is_inited())
{
sql_print_error("Initialization binlog storage delegates failed. "
"Please report a bug.");
return 1;
}
#ifdef HAVE_REPLICATION
void *place_transmit_mem= transmit_mem.data;
void *place_relay_io_mem= relay_io_mem.data;
binlog_transmit_delegate= new (place_transmit_mem) Binlog_transmit_delegate;
if (!binlog_transmit_delegate->is_inited())
{
sql_print_error("Initialization of binlog transmit delegates failed. "
"Please report a bug.");
return 1;
}
binlog_relay_io_delegate= new (place_relay_io_mem) Binlog_relay_IO_delegate;
if (!binlog_relay_io_delegate->is_inited())
{
sql_print_error("Initialization binlog relay IO delegates failed. "
"Please report a bug.");
return 1;
}
#endif
return 0;
}
void delegates_destroy()
{
if (transaction_delegate)
transaction_delegate->~Trans_delegate();
transaction_delegate= 0;
if (binlog_storage_delegate)
binlog_storage_delegate->~Binlog_storage_delegate();
binlog_storage_delegate= 0;
#ifdef HAVE_REPLICATION
if (binlog_transmit_delegate)
binlog_transmit_delegate->~Binlog_transmit_delegate();
binlog_transmit_delegate= 0;
if (binlog_relay_io_delegate)
binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
binlog_relay_io_delegate= 0;
#endif /* HAVE_REPLICATION */
}
/*
This macro is used by almost all the Delegate methods to iterate
over all the observers running given callback function of the
delegate.
*/
#define FOREACH_OBSERVER(r, f, do_lock, args) \
param.server_id= thd->variables.server_id; \
read_lock(); \
Observer_info_iterator iter= observer_info_iter(); \
Observer_info *info= iter++; \
for (; info; info= iter++) \
{ \
if (((Observer *)info->observer)->f \
&& ((Observer *)info->observer)->f args) \
{ \
r= 1; \
sql_print_error("Run function '" #f "' failed"); \
break; \
} \
} \
unlock();
int Trans_delegate::after_commit(THD *thd, bool all)
{
Trans_param param;
Trans_binlog_info *log_info;
bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
int ret= 0;
param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
log_info= thd->semisync_info;
param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
param.log_pos= log_info ? log_info->log_pos : 0;
FOREACH_OBSERVER(ret, after_commit, false, (&param));
/*
This is the end of a real transaction or autocommit statement, we
can mark the memory unused.
*/
if (is_real_trans && log_info)
{
log_info->log_file[0]= 0;
log_info->log_pos= 0;
}
return ret;
}
int Trans_delegate::after_rollback(THD *thd, bool all)
{
Trans_param param;
Trans_binlog_info *log_info;
bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
int ret= 0;
param.flags = is_real_trans ? TRANS_IS_REAL_TRANS : 0;
log_info= thd->semisync_info;
param.log_file= log_info && log_info->log_file[0] ? log_info->log_file : 0;
param.log_pos= log_info ? log_info->log_pos : 0;
FOREACH_OBSERVER(ret, after_rollback, false, (&param));
/*
This is the end of a real transaction or autocommit statement, we
can mark the memory unused.
*/
if (is_real_trans && log_info)
{
log_info->log_file[0]= 0;
log_info->log_pos= 0;
}
return ret;
}
int Binlog_storage_delegate::after_flush(THD *thd,
const char *log_file,
my_off_t log_pos,
bool synced,
bool first_in_group,
bool last_in_group)
{
Binlog_storage_param param;
Trans_binlog_info *log_info;
uint32 flags=0;
int ret= 0;
if (synced)
flags |= BINLOG_STORAGE_IS_SYNCED;
if (first_in_group)
flags|= BINLOG_GROUP_COMMIT_LEADER;
if (last_in_group)
flags|= BINLOG_GROUP_COMMIT_TRAILER;
if (!(log_info= thd->semisync_info))
{
if(!(log_info=
(Trans_binlog_info*) my_malloc(sizeof(Trans_binlog_info), MYF(0))))
return 1;
thd->semisync_info= log_info;
}
strmake_buf(log_info->log_file, log_file+dirname_length(log_file));
log_info->log_pos = log_pos;
FOREACH_OBSERVER(ret, after_flush, false,
(&param, log_info->log_file, log_info->log_pos, flags));
return ret;
}
int Binlog_storage_delegate::after_sync(THD *thd,
const char *log_file,
my_off_t log_pos,
bool first_in_group,
bool last_in_group)
{
Binlog_storage_param param;
uint32 flags=0;
if (first_in_group)
flags|= BINLOG_GROUP_COMMIT_LEADER;
if (last_in_group)
flags|= BINLOG_GROUP_COMMIT_TRAILER;
int ret= 0;
FOREACH_OBSERVER(ret, after_sync, false,
(&param, log_file+dirname_length(log_file), log_pos, flags));
return ret;
}
#ifdef HAVE_REPLICATION
int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
const char *log_file,
my_off_t log_pos)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, transmit_start, true, (&param, log_file, log_pos));
return ret;
}
int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, transmit_stop, false, (&param));
return ret;
}
int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
String *packet)
{
/* NOTE2ME: Maximum extra header size for each observer, I hope 32
bytes should be enough for each Observer to reserve their extra
header. If later found this is not enough, we can increase this
/HEZX
*/
#define RESERVE_HEADER_SIZE 32
unsigned char header[RESERVE_HEADER_SIZE];
ulong hlen;
Binlog_transmit_param param;
param.flags= flags;
param.server_id= thd->variables.server_id;
int ret= 0;
read_lock();
Observer_info_iterator iter= observer_info_iter();
Observer_info *info= iter++;
for (; info; info= iter++)
{
hlen= 0;
if (((Observer *)info->observer)->reserve_header
&& ((Observer *)info->observer)->reserve_header(&param,
header,
RESERVE_HEADER_SIZE,
&hlen))
{
ret= 1;
break;
}
if (hlen == 0)
continue;
if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
{
ret= 1;
break;
}
}
unlock();
return ret;
}
int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
String *packet,
const char *log_file,
my_off_t log_pos)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, before_send_event, false,
(&param, (uchar *)packet->c_ptr(),
packet->length(),
log_file+dirname_length(log_file), log_pos));
return ret;
}
int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
String *packet)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, after_send_event, false,
(&param, packet->c_ptr(), packet->length()));
return ret;
}
int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
{
Binlog_transmit_param param;
param.flags= flags;
int ret= 0;
FOREACH_OBSERVER(ret, after_reset_master, false, (&param));
return ret;
}
void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
Master_info *mi)
{
param->mysql= mi->mysql;
param->user= mi->user;
param->host= mi->host;
param->port= mi->port;
param->master_log_name= mi->master_log_name;
param->master_log_pos= mi->master_log_pos;
}
int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, thread_start, true, (&param));
return ret;
}
int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, thread_stop, false, (&param));
return ret;
}
int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
Master_info *mi,
ushort flags)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, before_request_transmit, false, (&param, (uint32)flags));
return ret;
}
int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
const char *packet, ulong len,
const char **event_buf,
ulong *event_len)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, after_read_event, false,
(&param, packet, len, event_buf, event_len));
return ret;
}
int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
const char *event_buf,
ulong event_len,
bool synced)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
uint32 flags=0;
if (synced)
flags |= BINLOG_STORAGE_IS_SYNCED;
int ret= 0;
FOREACH_OBSERVER(ret, after_queue_event, false,
(&param, event_buf, event_len, flags));
return ret;
}
int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
{
Binlog_relay_IO_param param;
init_param(&param, mi);
int ret= 0;
FOREACH_OBSERVER(ret, after_reset_slave, false, (&param));
return ret;
}
#endif /* HAVE_REPLICATION */
int register_trans_observer(Trans_observer *observer, void *p)
{
return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_trans_observer(Trans_observer *observer, void *p)
{
return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
}
int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
{
return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
}
#ifdef HAVE_REPLICATION
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
}
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
}
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
}
#else
int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return 0;
}
int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
{
return 0;
}
int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return 0;
}
int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
{
return 0;
}
#endif /* HAVE_REPLICATION */

View File

@ -1,222 +0,0 @@
/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef RPL_HANDLER_H
#define RPL_HANDLER_H
#include "sql_priv.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
#include "sql_plugin.h"
#include "replication.h"
class Observer_info {
public:
void *observer;
st_plugin_int *plugin_int;
Observer_info(void *ob, st_plugin_int *p)
:observer(ob), plugin_int(p)
{ }
};
class Delegate {
public:
typedef List<Observer_info> Observer_info_list;
typedef List_iterator<Observer_info> Observer_info_iterator;
int add_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (!info)
{
info= new Observer_info(observer, plugin);
if (!info || observer_info_list.push_back(info, &memroot))
ret= TRUE;
}
else
ret= TRUE;
unlock();
return ret;
}
int remove_observer(void *observer, st_plugin_int *plugin)
{
int ret= FALSE;
if (!inited)
return TRUE;
write_lock();
Observer_info_iterator iter(observer_info_list);
Observer_info *info= iter++;
while (info && info->observer != observer)
info= iter++;
if (info)
{
iter.remove();
delete info;
}
else
ret= TRUE;
unlock();
return ret;
}
inline Observer_info_iterator observer_info_iter()
{
return Observer_info_iterator(observer_info_list);
}
inline bool is_empty()
{
return observer_info_list.is_empty();
}
inline int read_lock()
{
if (!inited)
return TRUE;
return rw_rdlock(&lock);
}
inline int write_lock()
{
if (!inited)
return TRUE;
return rw_wrlock(&lock);
}
inline int unlock()
{
if (!inited)
return TRUE;
return rw_unlock(&lock);
}
inline bool is_inited()
{
return inited;
}
Delegate()
{
inited= FALSE;
if (my_rwlock_init(&lock, NULL))
return;
init_sql_alloc(&memroot, 1024, 0, MYF(0));
inited= TRUE;
}
~Delegate()
{
inited= FALSE;
rwlock_destroy(&lock);
free_root(&memroot, MYF(0));
}
private:
Observer_info_list observer_info_list;
rw_lock_t lock;
MEM_ROOT memroot;
bool inited;
};
class Trans_delegate
:public Delegate {
public:
typedef Trans_observer Observer;
int before_commit(THD *thd, bool all);
int before_rollback(THD *thd, bool all);
int after_commit(THD *thd, bool all);
int after_rollback(THD *thd, bool all);
};
class Binlog_storage_delegate
:public Delegate {
public:
typedef Binlog_storage_observer Observer;
int after_flush(THD *thd, const char *log_file,
my_off_t log_pos, bool synced,
bool first_in_group, bool last_in_group);
int after_sync(THD *thd, const char *log_file, my_off_t log_pos,
bool first_in_group, bool last_in_group);
};
#ifdef HAVE_REPLICATION
class Binlog_transmit_delegate
:public Delegate {
public:
typedef Binlog_transmit_observer Observer;
int transmit_start(THD *thd, ushort flags,
const char *log_file, my_off_t log_pos);
int transmit_stop(THD *thd, ushort flags);
int reserve_header(THD *thd, ushort flags, String *packet);
int before_send_event(THD *thd, ushort flags,
String *packet, const
char *log_file, my_off_t log_pos );
int after_send_event(THD *thd, ushort flags,
String *packet);
int after_reset_master(THD *thd, ushort flags);
};
class Binlog_relay_IO_delegate
:public Delegate {
public:
typedef Binlog_relay_IO_observer Observer;
int thread_start(THD *thd, Master_info *mi);
int thread_stop(THD *thd, Master_info *mi);
int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
int after_read_event(THD *thd, Master_info *mi,
const char *packet, ulong len,
const char **event_buf, ulong *event_len);
int after_queue_event(THD *thd, Master_info *mi,
const char *event_buf, ulong event_len,
bool synced);
int after_reset_slave(THD *thd, Master_info *mi);
private:
void init_param(Binlog_relay_IO_param *param, Master_info *mi);
};
#endif /* HAVE_REPLICATION */
int delegates_init();
void delegates_destroy();
extern Trans_delegate *transaction_delegate;
extern Binlog_storage_delegate *binlog_storage_delegate;
#ifdef HAVE_REPLICATION
extern Binlog_transmit_delegate *binlog_transmit_delegate;
extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
#endif /* HAVE_REPLICATION */
/*
if semisync replication is not enabled, we can return immediately.
*/
#ifdef HAVE_REPLICATION
/*
As semisync is unpluggined and its hooks are turned into static
invocations all other hooks are not run for optimization sake.
*/
#define RUN_HOOK(group, hook, args) \
(unlikely(run_hooks_enabled) ? group ##_delegate->hook args : 0)
#else
#define RUN_HOOK(group, hook, args) 0
#endif /* HAVE_REPLICATION */
#endif /* RPL_HANDLER_H */

View File

@ -738,19 +738,19 @@ int ReplSemiSyncMaster::reportBinlogUpdate(THD* thd, const char *log_file,
return 0;
}
void ReplSemiSyncMaster::dump_start(THD* thd,
int ReplSemiSyncMaster::dump_start(THD* thd,
const char *log_file,
my_off_t log_pos)
{
if (!thd->semi_sync_slave)
return;
return 0;
if (ack_receiver.add_slave(thd))
{
sql_print_error("Failed to register slave to semi-sync ACK receiver "
"thread. Turning off semisync");
thd->semi_sync_slave= 0;
return;
return 1;
}
add_slave();
@ -758,7 +758,7 @@ void ReplSemiSyncMaster::dump_start(THD* thd,
sql_print_information("Start semi-sync binlog_dump to slave (server_id: %d), pos(%s, %lu",
thd->variables.server_id, log_file, (unsigned long)log_pos);
return;
return 0;
}
void ReplSemiSyncMaster::dump_end(THD* thd)

View File

@ -553,7 +553,7 @@ class ReplSemiSyncMaster
* be acked by slave*/
int reportBinlogUpdate(THD *thd, const char *log_file,my_off_t log_pos);
void dump_start(THD* thd,
int dump_start(THD* thd,
const char *log_file,
my_off_t log_pos);

View File

@ -24,6 +24,8 @@
#include "rpl_mi.h"
#include "mysql.h"
class Master_info;
/**
The extension class for the slave of semi-synchronous replication
*/

View File

@ -43,7 +43,6 @@
#include <ssl_compat.h>
#include <mysqld_error.h>
#include <mysys_err.h>
#include "rpl_handler.h"
#include <signal.h>
#include <mysql.h>
#include <myisam.h>
@ -3587,10 +3586,6 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
if (opt_log_slave_updates && opt_replicate_annotate_row_events)
binlog_flags|= BINLOG_SEND_ANNOTATE_ROWS_EVENT;
if (RUN_HOOK(binlog_relay_io,
before_request_transmit,
(thd, mi, binlog_flags)))
DBUG_RETURN(1);
if (repl_semisync_slave.requestTransmit(mi))
DBUG_RETURN(1);
@ -4618,9 +4613,8 @@ pthread_handler_t handle_slave_io(void *arg)
}
if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)) ||
(DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
|| repl_semisync_slave.slaveStart(mi)))
if (DBUG_EVALUATE_IF("failed_slave_start", 1, 0)
|| repl_semisync_slave.slaveStart(mi))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@ -4811,10 +4805,7 @@ Stopping slave I/O thread due to out-of-memory error from master");
THD_STAGE_INFO(thd, stage_queueing_master_event_to_the_relay_log);
event_buf= (const char*)mysql->net.read_pos + 1;
mi->semi_ack= 0;
if (RUN_HOOK(binlog_relay_io, after_read_event,
(thd, mi,(const char*)mysql->net.read_pos + 1,
event_len, &event_buf, &event_len)) ||
repl_semisync_slave.
if (repl_semisync_slave.
slaveReadSyncHeader((const char*)mysql->net.read_pos + 1, event_len,
&(mi->semi_ack), &event_buf, &event_len))
{
@ -4865,9 +4856,6 @@ Stopping slave I/O thread due to out-of-memory error from master");
tokenamount -= network_read_len;
}
/* XXX: 'synced' should be updated by queue_event to indicate
whether event has been synced to disk */
bool synced= 0;
if (queue_event(mi, event_buf, event_len))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE, NULL,
@ -4876,11 +4864,8 @@ Stopping slave I/O thread due to out-of-memory error from master");
goto err;
}
if (RUN_HOOK(binlog_relay_io, after_queue_event,
(thd, mi, event_buf, event_len, synced)) ||
(rpl_semi_sync_slave_status &&
(mi->semi_ack & SEMI_SYNC_NEED_ACK) &&
repl_semisync_slave.slaveReply(mi)))
if (rpl_semi_sync_slave_status && (mi->semi_ack & SEMI_SYNC_NEED_ACK) &&
repl_semisync_slave.slaveReply(mi))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, NULL,
ER_THD(thd, ER_SLAVE_FATAL_ERROR),
@ -4952,7 +4937,6 @@ err:
IO_RPL_LOG_NAME, mi->master_log_pos,
tmp.c_ptr_safe());
}
(void) RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
repl_semisync_slave.slaveStop(mi);
thd->reset_query();
thd->reset_db(NULL, 0);

View File

@ -82,7 +82,6 @@
#include <m_ctype.h>
#include <myisam.h>
#include <my_dir.h>
#include "rpl_handler.h"
#include "rpl_mi.h"
#include "sql_digest.h"

View File

@ -28,7 +28,6 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
#include "rpl_handler.h"
#include "debug_sync.h"
#include "semisync_master.h"
#include "semisync_slave.h"
@ -315,24 +314,35 @@ static int reset_transmit_packet(binlog_send_info *info, ushort flags,
packet->length(0);
packet->set("\0", 1, &my_charset_bin);
if (RUN_HOOK(binlog_transmit, reserve_header, (info->thd, flags, packet)))
{
/* RUN_HOOK() must return zero when thd->semi_sync_slave */
DBUG_ASSERT(!info->thd->semi_sync_slave);
info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed to run hook 'reserve_header'";
ret= 1;
}
if (info->thd->semi_sync_slave)
{
repl_semisync_master.reserveSyncHeader(packet);
if (repl_semisync_master.reserveSyncHeader(packet))
{
info->error= ER_UNKNOWN_ERROR;
*errmsg= "Failed to run hook 'reserve_header'";
ret= 1;
}
}
*ev_offset= packet->length();
return ret;
}
int get_user_var_int(const char *name,
long long int *value, int *null_value)
{
bool null_val;
user_var_entry *entry=
(user_var_entry*) my_hash_search(&current_thd->user_vars,
(uchar*) name, strlen(name));
if (!entry)
return 1;
*value= entry->val_int(&null_val);
if (null_value)
*null_value= null_val;
return 0;
}
inline bool is_semi_sync_slave()
{
int null_value;
@ -1935,9 +1945,7 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
THD_STAGE_INFO(info->thd, stage_sending_binlog_event_to_slave);
pos= my_b_tell(log);
if (RUN_HOOK(binlog_transmit, before_send_event,
(info->thd, info->flags, packet, info->log_file_name, pos)) ||
repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(),
if (repl_semisync_master.updateSyncHeader(info->thd, (uchar *)packet->c_ptr(),
info->log_file_name + info->dirlen,
pos, &need_sync))
{
@ -1961,14 +1969,11 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
if (RUN_HOOK(binlog_transmit, after_send_event,
(info->thd, info->flags, packet)))
if (need_sync && repl_semisync_master.flushNet(info->thd, packet->c_ptr()))
{
info->error= ER_UNKNOWN_ERROR;
return "Failed to run hook 'after_send_event'";
}
if (need_sync)
repl_semisync_master.flushNet(info->thd, packet->c_ptr());
return NULL; /* Success */
}
@ -2740,21 +2745,16 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
if (init_binlog_sender(info, &linfo, log_ident, &pos))
goto err;
/*
run hook first when all check has been made that slave seems to
be requesting a reasonable position. i.e when transmit actually starts
*/
if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
has_transmit_started= true;
/* Check if the dump thread is created by a slave with semisync enabled. */
thd->semi_sync_slave = is_semi_sync_slave();
if (repl_semisync_master.dump_start(thd, log_ident, pos))
{
info->errmsg= "Failed to run hook 'transmit_start'";
info->error= ER_UNKNOWN_ERROR;
goto err;
}
has_transmit_started= true;
/* Check if the dump thread is created by a slave with semisync enabled. */
thd->semi_sync_slave = is_semi_sync_slave();
repl_semisync_master.dump_start(thd, log_ident, pos);
/*
heartbeat_period from @master_heartbeat_period user variable
@ -2871,7 +2871,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
err:
THD_STAGE_INFO(thd, stage_waiting_to_finalize_termination);
(void) RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
if (has_transmit_started)
{
repl_semisync_master.dump_end(thd);
@ -3341,7 +3340,6 @@ int reset_slave(THD *thd, Master_info* mi)
else if (global_system_variables.log_warnings > 1)
sql_print_information("Deleted Master_info file '%s'.", fname);
(void) RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
if (rpl_semi_sync_slave_enabled)
repl_semisync_slave.resetSlave(mi);
err:
@ -3850,7 +3848,6 @@ int reset_master(THD* thd, rpl_gtid *init_state, uint32 init_state_len,
repl_semisync_master.beforeResetMaster();
ret= mysql_bin_log.reset_logs(thd, 1, init_state, init_state_len,
next_log_number);
(void) RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
repl_semisync_master.afterResetMaster();
return ret;
}

View File

@ -21,7 +21,6 @@
#include "mariadb.h"
#include "sql_priv.h"
#include "transaction.h"
#include "rpl_handler.h"
#include "debug_sync.h" // DEBUG_SYNC
#include "sql_acl.h"
#include "semisync_master.h"
@ -319,14 +318,12 @@ bool trans_commit(THD *thd)
*/
if (res)
{
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
#ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, FALSE);
#endif
}
else
{
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
#ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterCommit(thd, FALSE);
#endif
@ -423,7 +420,6 @@ bool trans_rollback(THD *thd)
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= ha_rollback_trans(thd, TRUE);
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
#ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, FALSE);
#endif
@ -540,14 +536,12 @@ bool trans_commit_stmt(THD *thd)
*/
if (res)
{
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
#ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, FALSE);
#endif
}
else
{
(void) RUN_HOOK(transaction, after_commit, (thd, FALSE));
#ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterCommit(thd, FALSE);
#endif
@ -590,7 +584,6 @@ bool trans_rollback_stmt(THD *thd)
trans_reset_one_shot_chistics(thd);
}
(void) RUN_HOOK(transaction, after_rollback, (thd, FALSE));
#ifdef HAVE_REPLICATION
repl_semisync_master.waitAfterRollback(thd, FALSE);
#endif