diff --git a/mysql-test/r/rpl_insert.result b/mysql-test/r/rpl_insert.result new file mode 100644 index 00000000000..b0c44be017f --- /dev/null +++ b/mysql-test/r/rpl_insert.result @@ -0,0 +1,16 @@ +stop slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +reset master; +reset slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +start slave; +CREATE SCHEMA IF NOT EXISTS mysqlslap; +USE mysqlslap; +CREATE TABLE t1 (id INT, name VARCHAR(64)); +SELECT COUNT(*) FROM mysqlslap.t1; +COUNT(*) +20000 +SELECT COUNT(*) FROM mysqlslap.t1; +COUNT(*) +20000 +DROP SCHEMA IF EXISTS mysqlslap; diff --git a/mysql-test/r/rpl_row_delayed_ins.result b/mysql-test/r/rpl_row_delayed_ins.result index 16001b96ac2..31fffeb59cc 100644 --- a/mysql-test/r/rpl_row_delayed_ins.result +++ b/mysql-test/r/rpl_row_delayed_ins.result @@ -17,8 +17,10 @@ Log_name Pos Event_type Server_id End_log_pos Info master-bin.000001 4 Format_desc 1 102 Server ver: VERSION, Binlog ver: 4 master-bin.000001 102 Query 1 222 use `test`; create table t1(a int not null primary key) engine=myisam master-bin.000001 222 Table_map 1 261 table_id: # (test.t1) -master-bin.000001 261 Write_rows 1 305 table_id: # flags: STMT_END_F -master-bin.000001 305 Query 1 380 use `test`; flush tables +master-bin.000001 261 Write_rows 1 295 table_id: # flags: STMT_END_F +master-bin.000001 295 Table_map 1 334 table_id: # (test.t1) +master-bin.000001 334 Write_rows 1 373 table_id: # flags: STMT_END_F +master-bin.000001 373 Query 1 448 use `test`; flush tables SELECT * FROM t1 ORDER BY a; a 1 diff --git a/mysql-test/t/rpl_insert.test b/mysql-test/t/rpl_insert.test new file mode 100644 index 00000000000..2271fc9ad8a --- /dev/null +++ b/mysql-test/t/rpl_insert.test @@ -0,0 +1,27 @@ + +# +# Bug#20821: INSERT DELAYED fails to write some rows to binlog +# + +--source include/master-slave.inc +--source include/not_embedded.inc +--source include/not_windows.inc + +--disable_warnings +CREATE SCHEMA IF NOT EXISTS mysqlslap; +USE mysqlslap; +--enable_warnings + +CREATE TABLE t1 (id INT, name VARCHAR(64)); + +let $query = "INSERT INTO t1 VALUES (1, 'Dr. No'), (2, 'From Russia With Love'), (3, 'Goldfinger'), (4, 'Thunderball'), (5, 'You Only Live Twice')"; +--exec $MYSQL_SLAP --silent --concurrency=20 --iterations=200 --query=$query --delimiter=";" + +SELECT COUNT(*) FROM mysqlslap.t1; +sync_slave_with_master; +SELECT COUNT(*) FROM mysqlslap.t1; + +connection master; +DROP SCHEMA IF EXISTS mysqlslap; +sync_slave_with_master; + diff --git a/sql/sql_class.cc b/sql/sql_class.cc index c8c8ff16199..285a1e72f6f 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -2712,6 +2712,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, bool is_trans, bool suppress_use) { DBUG_ENTER("THD::binlog_query"); + DBUG_PRINT("enter", ("qtype=%d, query='%s'", qtype, query)); DBUG_ASSERT(query && mysql_bin_log.is_open()); switch (qtype) { diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 088bd3e59e5..2ec2555d6fc 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -26,8 +26,8 @@ static int check_null_fields(THD *thd,TABLE *entry); #ifndef EMBEDDED_LIBRARY static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list); -static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore, - char *query, uint query_length, bool log_on); +static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup, + LEX_STRING query, bool ignore, bool log_on); static void end_delayed_insert(THD *thd); pthread_handler_t handle_delayed_insert(void *arg); static void unlink_blobs(register TABLE *table); @@ -511,7 +511,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, #ifndef EMBEDDED_LIBRARY if (lock_type == TL_WRITE_DELAYED) { - error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on); + LEX_STRING const st_query = { query, thd->query_length }; + error=write_delayed(thd, table, duplic, st_query, ignore, log_on); query=0; } else @@ -1251,11 +1252,16 @@ public: bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query; ulonglong last_insert_id; timestamp_auto_set_type timestamp_field_type; + LEX_STRING query; - delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg) - :record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {} + delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg, + bool ignore_arg, bool log_query_arg) + : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg), + query(query_arg) + {} ~delayed_row() { + x_free(query.str); x_free(record); } }; @@ -1263,9 +1269,6 @@ public: class delayed_insert :public ilink { uint locks_in_memory; - char *query; - ulong query_length; - ulong query_allocated; public: THD thd; TABLE *table; @@ -1279,7 +1282,7 @@ public: TABLE_LIST table_list; // Argument delayed_insert() - :locks_in_memory(0), query(0), query_length(0), query_allocated(0), + :locks_in_memory(0), table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0), group_count(0) { @@ -1305,7 +1308,6 @@ public: } ~delayed_insert() { - my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR)); /* The following is not really needed, but just for safety */ delayed_row *row; while ((row=rows.get())) @@ -1325,25 +1327,6 @@ public: VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */ } - int set_query(char const *q, ulong qlen) { - if (q && qlen > 0) - { - if (query_allocated < qlen + 1) - { - ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR); - query= my_realloc(query, qlen + 1, MYF(flags)); - if (query == 0) - return HA_ERR_OUT_OF_MEM; - query_allocated= qlen; - } - query_length= qlen; - memcpy(query, q, qlen + 1); - } - else - query_length= 0; - return 0; - } - /* The following is for checking when we can delete ourselves */ inline void lock() { @@ -1630,13 +1613,14 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) /* Put a question in queue */ -static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, - bool ignore, char *query, uint query_length, - bool log_on) +static int +write_delayed(THD *thd,TABLE *table, enum_duplicates duplic, + LEX_STRING query, bool ignore, bool log_on) { - delayed_row *row=0; + delayed_row *row; delayed_insert *di=thd->di; DBUG_ENTER("write_delayed"); + DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length)); thd->proc_info="waiting for handler insert"; pthread_mutex_lock(&di->mutex); @@ -1644,13 +1628,28 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, pthread_cond_wait(&di->cond_client,&di->mutex); thd->proc_info="storing row into queue"; - if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on))) + if (thd->killed) goto err; + /* + Take a copy of the query string, if there is any. The string will + be free'ed when the row is destroyed. If there is no query string, + we don't do anything special. + */ + + if (query.str) + if (!(query.str= my_strndup(query.str, MYF(MY_WME), query.length))) + goto err; + row= new delayed_row(query, duplic, ignore, log_on); + if (row == NULL) + { + my_free(query.str, MYF(MY_WME)); + goto err; + } + if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME)))) goto err; memcpy(row->record, table->record[0], table->s->reclength); - di->set_query(query, query_length); row->start_time= thd->start_time; row->query_start_used= thd->query_start_used; row->last_insert_id_used= thd->last_insert_id_used; @@ -2009,7 +2008,7 @@ bool delayed_insert::handle_inserts(void) if (thd.killed || table->s->version != refresh_version) { thd.killed= THD::KILL_CONNECTION; - max_rows= ~(ulong)0; // Do as much as possible + max_rows= ULONG_MAX; // Do as much as possible } /* @@ -2056,11 +2055,18 @@ bool delayed_insert::handle_inserts(void) thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status); row->log_query = 0; } + if (using_ignore) { using_ignore=0; table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); } + + if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open()) + thd.binlog_query(THD::ROW_QUERY_TYPE, + row->query.str, row->query.length, + FALSE, FALSE); + if (table->s->blob_fields) free_delayed_insert_blobs(table); thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status); @@ -2107,13 +2113,25 @@ bool delayed_insert::handle_inserts(void) pthread_cond_broadcast(&cond_client); // If waiting clients } } - thd.proc_info=0; pthread_mutex_unlock(&mutex); - /* After releasing the mutex, to prevent deadlocks. */ - if (mysql_bin_log.is_open()) - thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE); +#ifdef HAVE_ROW_BASED_REPLICATION + /* + We need to flush the pending event when using row-based + replication since the flushing normally done in binlog_query() is + not done last in the statement: for delayed inserts, the insert + statement is logged *before* all rows are inserted. + + We can flush the pending event without checking the thd->lock + since the delayed insert *thread* is not inside a stored function + or trigger. + + TODO: Move the logging to last in the sequence of rows. + */ + if (thd.current_stmt_binlog_row_based) + thd.binlog_flush_pending_rows_event(TRUE); +#endif /* HAVE_ROW_BASED_REPLICATION */ if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) { // This shouldn't happen