Merge mysql.com:/home/bkroot/mysql-5.1-new-rpl
into mysql.com:/home/bk/b20821-mysql-5.1-new-rpl sql/sql_class.cc: Auto merged sql/sql_insert.cc: Auto merged
This commit is contained in:
commit
77e82450b3
16
mysql-test/r/rpl_insert.result
Normal file
16
mysql-test/r/rpl_insert.result
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
stop slave;
|
||||||
|
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
|
||||||
|
reset master;
|
||||||
|
reset slave;
|
||||||
|
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
|
||||||
|
start slave;
|
||||||
|
CREATE SCHEMA IF NOT EXISTS mysqlslap;
|
||||||
|
USE mysqlslap;
|
||||||
|
CREATE TABLE t1 (id INT, name VARCHAR(64));
|
||||||
|
SELECT COUNT(*) FROM mysqlslap.t1;
|
||||||
|
COUNT(*)
|
||||||
|
20000
|
||||||
|
SELECT COUNT(*) FROM mysqlslap.t1;
|
||||||
|
COUNT(*)
|
||||||
|
20000
|
||||||
|
DROP SCHEMA IF EXISTS mysqlslap;
|
@ -17,8 +17,10 @@ Log_name Pos Event_type Server_id End_log_pos Info
|
|||||||
master-bin.000001 4 Format_desc 1 102 Server ver: VERSION, Binlog ver: 4
|
master-bin.000001 4 Format_desc 1 102 Server ver: VERSION, Binlog ver: 4
|
||||||
master-bin.000001 102 Query 1 222 use `test`; create table t1(a int not null primary key) engine=myisam
|
master-bin.000001 102 Query 1 222 use `test`; create table t1(a int not null primary key) engine=myisam
|
||||||
master-bin.000001 222 Table_map 1 261 table_id: # (test.t1)
|
master-bin.000001 222 Table_map 1 261 table_id: # (test.t1)
|
||||||
master-bin.000001 261 Write_rows 1 305 table_id: # flags: STMT_END_F
|
master-bin.000001 261 Write_rows 1 295 table_id: # flags: STMT_END_F
|
||||||
master-bin.000001 305 Query 1 380 use `test`; flush tables
|
master-bin.000001 295 Table_map 1 334 table_id: # (test.t1)
|
||||||
|
master-bin.000001 334 Write_rows 1 373 table_id: # flags: STMT_END_F
|
||||||
|
master-bin.000001 373 Query 1 448 use `test`; flush tables
|
||||||
SELECT * FROM t1 ORDER BY a;
|
SELECT * FROM t1 ORDER BY a;
|
||||||
a
|
a
|
||||||
1
|
1
|
||||||
|
27
mysql-test/t/rpl_insert.test
Normal file
27
mysql-test/t/rpl_insert.test
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
|
||||||
|
#
|
||||||
|
# Bug#20821: INSERT DELAYED fails to write some rows to binlog
|
||||||
|
#
|
||||||
|
|
||||||
|
--source include/master-slave.inc
|
||||||
|
--source include/not_embedded.inc
|
||||||
|
--source include/not_windows.inc
|
||||||
|
|
||||||
|
--disable_warnings
|
||||||
|
CREATE SCHEMA IF NOT EXISTS mysqlslap;
|
||||||
|
USE mysqlslap;
|
||||||
|
--enable_warnings
|
||||||
|
|
||||||
|
CREATE TABLE t1 (id INT, name VARCHAR(64));
|
||||||
|
|
||||||
|
let $query = "INSERT INTO t1 VALUES (1, 'Dr. No'), (2, 'From Russia With Love'), (3, 'Goldfinger'), (4, 'Thunderball'), (5, 'You Only Live Twice')";
|
||||||
|
--exec $MYSQL_SLAP --silent --concurrency=20 --iterations=200 --query=$query --delimiter=";"
|
||||||
|
|
||||||
|
SELECT COUNT(*) FROM mysqlslap.t1;
|
||||||
|
sync_slave_with_master;
|
||||||
|
SELECT COUNT(*) FROM mysqlslap.t1;
|
||||||
|
|
||||||
|
connection master;
|
||||||
|
DROP SCHEMA IF EXISTS mysqlslap;
|
||||||
|
sync_slave_with_master;
|
||||||
|
|
@ -2712,6 +2712,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype,
|
|||||||
bool is_trans, bool suppress_use)
|
bool is_trans, bool suppress_use)
|
||||||
{
|
{
|
||||||
DBUG_ENTER("THD::binlog_query");
|
DBUG_ENTER("THD::binlog_query");
|
||||||
|
DBUG_PRINT("enter", ("qtype=%d, query='%s'", qtype, query));
|
||||||
DBUG_ASSERT(query && mysql_bin_log.is_open());
|
DBUG_ASSERT(query && mysql_bin_log.is_open());
|
||||||
|
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
|
@ -26,8 +26,8 @@
|
|||||||
static int check_null_fields(THD *thd,TABLE *entry);
|
static int check_null_fields(THD *thd,TABLE *entry);
|
||||||
#ifndef EMBEDDED_LIBRARY
|
#ifndef EMBEDDED_LIBRARY
|
||||||
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
|
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
|
||||||
static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore,
|
static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
|
||||||
char *query, uint query_length, bool log_on);
|
LEX_STRING query, bool ignore, bool log_on);
|
||||||
static void end_delayed_insert(THD *thd);
|
static void end_delayed_insert(THD *thd);
|
||||||
pthread_handler_t handle_delayed_insert(void *arg);
|
pthread_handler_t handle_delayed_insert(void *arg);
|
||||||
static void unlink_blobs(register TABLE *table);
|
static void unlink_blobs(register TABLE *table);
|
||||||
@ -511,7 +511,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
|
|||||||
#ifndef EMBEDDED_LIBRARY
|
#ifndef EMBEDDED_LIBRARY
|
||||||
if (lock_type == TL_WRITE_DELAYED)
|
if (lock_type == TL_WRITE_DELAYED)
|
||||||
{
|
{
|
||||||
error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on);
|
LEX_STRING const st_query = { query, thd->query_length };
|
||||||
|
error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
|
||||||
query=0;
|
query=0;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -1251,11 +1252,16 @@ public:
|
|||||||
bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
|
bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
|
||||||
ulonglong last_insert_id;
|
ulonglong last_insert_id;
|
||||||
timestamp_auto_set_type timestamp_field_type;
|
timestamp_auto_set_type timestamp_field_type;
|
||||||
|
LEX_STRING query;
|
||||||
|
|
||||||
delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg)
|
delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
|
||||||
:record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
|
bool ignore_arg, bool log_query_arg)
|
||||||
|
: record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
|
||||||
|
query(query_arg)
|
||||||
|
{}
|
||||||
~delayed_row()
|
~delayed_row()
|
||||||
{
|
{
|
||||||
|
x_free(query.str);
|
||||||
x_free(record);
|
x_free(record);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -1263,9 +1269,6 @@ public:
|
|||||||
|
|
||||||
class delayed_insert :public ilink {
|
class delayed_insert :public ilink {
|
||||||
uint locks_in_memory;
|
uint locks_in_memory;
|
||||||
char *query;
|
|
||||||
ulong query_length;
|
|
||||||
ulong query_allocated;
|
|
||||||
public:
|
public:
|
||||||
THD thd;
|
THD thd;
|
||||||
TABLE *table;
|
TABLE *table;
|
||||||
@ -1279,7 +1282,7 @@ public:
|
|||||||
TABLE_LIST table_list; // Argument
|
TABLE_LIST table_list; // Argument
|
||||||
|
|
||||||
delayed_insert()
|
delayed_insert()
|
||||||
:locks_in_memory(0), query(0), query_length(0), query_allocated(0),
|
:locks_in_memory(0),
|
||||||
table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
|
table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
|
||||||
group_count(0)
|
group_count(0)
|
||||||
{
|
{
|
||||||
@ -1305,7 +1308,6 @@ public:
|
|||||||
}
|
}
|
||||||
~delayed_insert()
|
~delayed_insert()
|
||||||
{
|
{
|
||||||
my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
|
|
||||||
/* The following is not really needed, but just for safety */
|
/* The following is not really needed, but just for safety */
|
||||||
delayed_row *row;
|
delayed_row *row;
|
||||||
while ((row=rows.get()))
|
while ((row=rows.get()))
|
||||||
@ -1325,25 +1327,6 @@ public:
|
|||||||
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
|
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
|
||||||
}
|
}
|
||||||
|
|
||||||
int set_query(char const *q, ulong qlen) {
|
|
||||||
if (q && qlen > 0)
|
|
||||||
{
|
|
||||||
if (query_allocated < qlen + 1)
|
|
||||||
{
|
|
||||||
ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR);
|
|
||||||
query= my_realloc(query, qlen + 1, MYF(flags));
|
|
||||||
if (query == 0)
|
|
||||||
return HA_ERR_OUT_OF_MEM;
|
|
||||||
query_allocated= qlen;
|
|
||||||
}
|
|
||||||
query_length= qlen;
|
|
||||||
memcpy(query, q, qlen + 1);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
query_length= 0;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* The following is for checking when we can delete ourselves */
|
/* The following is for checking when we can delete ourselves */
|
||||||
inline void lock()
|
inline void lock()
|
||||||
{
|
{
|
||||||
@ -1630,13 +1613,14 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
|
|||||||
|
|
||||||
/* Put a question in queue */
|
/* Put a question in queue */
|
||||||
|
|
||||||
static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
|
static int
|
||||||
bool ignore, char *query, uint query_length,
|
write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
|
||||||
bool log_on)
|
LEX_STRING query, bool ignore, bool log_on)
|
||||||
{
|
{
|
||||||
delayed_row *row=0;
|
delayed_row *row;
|
||||||
delayed_insert *di=thd->di;
|
delayed_insert *di=thd->di;
|
||||||
DBUG_ENTER("write_delayed");
|
DBUG_ENTER("write_delayed");
|
||||||
|
DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));
|
||||||
|
|
||||||
thd->proc_info="waiting for handler insert";
|
thd->proc_info="waiting for handler insert";
|
||||||
pthread_mutex_lock(&di->mutex);
|
pthread_mutex_lock(&di->mutex);
|
||||||
@ -1644,13 +1628,28 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
|
|||||||
pthread_cond_wait(&di->cond_client,&di->mutex);
|
pthread_cond_wait(&di->cond_client,&di->mutex);
|
||||||
thd->proc_info="storing row into queue";
|
thd->proc_info="storing row into queue";
|
||||||
|
|
||||||
if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on)))
|
if (thd->killed)
|
||||||
goto err;
|
goto err;
|
||||||
|
|
||||||
|
/*
|
||||||
|
Take a copy of the query string, if there is any. The string will
|
||||||
|
be free'ed when the row is destroyed. If there is no query string,
|
||||||
|
we don't do anything special.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (query.str)
|
||||||
|
if (!(query.str= my_strndup(query.str, MYF(MY_WME), query.length)))
|
||||||
|
goto err;
|
||||||
|
row= new delayed_row(query, duplic, ignore, log_on);
|
||||||
|
if (row == NULL)
|
||||||
|
{
|
||||||
|
my_free(query.str, MYF(MY_WME));
|
||||||
|
goto err;
|
||||||
|
}
|
||||||
|
|
||||||
if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
|
if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
|
||||||
goto err;
|
goto err;
|
||||||
memcpy(row->record, table->record[0], table->s->reclength);
|
memcpy(row->record, table->record[0], table->s->reclength);
|
||||||
di->set_query(query, query_length);
|
|
||||||
row->start_time= thd->start_time;
|
row->start_time= thd->start_time;
|
||||||
row->query_start_used= thd->query_start_used;
|
row->query_start_used= thd->query_start_used;
|
||||||
row->last_insert_id_used= thd->last_insert_id_used;
|
row->last_insert_id_used= thd->last_insert_id_used;
|
||||||
@ -2009,7 +2008,7 @@ bool delayed_insert::handle_inserts(void)
|
|||||||
if (thd.killed || table->s->version != refresh_version)
|
if (thd.killed || table->s->version != refresh_version)
|
||||||
{
|
{
|
||||||
thd.killed= THD::KILL_CONNECTION;
|
thd.killed= THD::KILL_CONNECTION;
|
||||||
max_rows= ~(ulong)0; // Do as much as possible
|
max_rows= ULONG_MAX; // Do as much as possible
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -2056,11 +2055,18 @@ bool delayed_insert::handle_inserts(void)
|
|||||||
thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
|
thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
|
||||||
row->log_query = 0;
|
row->log_query = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (using_ignore)
|
if (using_ignore)
|
||||||
{
|
{
|
||||||
using_ignore=0;
|
using_ignore=0;
|
||||||
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
|
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open())
|
||||||
|
thd.binlog_query(THD::ROW_QUERY_TYPE,
|
||||||
|
row->query.str, row->query.length,
|
||||||
|
FALSE, FALSE);
|
||||||
|
|
||||||
if (table->s->blob_fields)
|
if (table->s->blob_fields)
|
||||||
free_delayed_insert_blobs(table);
|
free_delayed_insert_blobs(table);
|
||||||
thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
|
thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
|
||||||
@ -2107,13 +2113,25 @@ bool delayed_insert::handle_inserts(void)
|
|||||||
pthread_cond_broadcast(&cond_client); // If waiting clients
|
pthread_cond_broadcast(&cond_client); // If waiting clients
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
thd.proc_info=0;
|
thd.proc_info=0;
|
||||||
pthread_mutex_unlock(&mutex);
|
pthread_mutex_unlock(&mutex);
|
||||||
|
|
||||||
/* After releasing the mutex, to prevent deadlocks. */
|
#ifdef HAVE_ROW_BASED_REPLICATION
|
||||||
if (mysql_bin_log.is_open())
|
/*
|
||||||
thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE);
|
We need to flush the pending event when using row-based
|
||||||
|
replication since the flushing normally done in binlog_query() is
|
||||||
|
not done last in the statement: for delayed inserts, the insert
|
||||||
|
statement is logged *before* all rows are inserted.
|
||||||
|
|
||||||
|
We can flush the pending event without checking the thd->lock
|
||||||
|
since the delayed insert *thread* is not inside a stored function
|
||||||
|
or trigger.
|
||||||
|
|
||||||
|
TODO: Move the logging to last in the sequence of rows.
|
||||||
|
*/
|
||||||
|
if (thd.current_stmt_binlog_row_based)
|
||||||
|
thd.binlog_flush_pending_rows_event(TRUE);
|
||||||
|
#endif /* HAVE_ROW_BASED_REPLICATION */
|
||||||
|
|
||||||
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
|
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
|
||||||
{ // This shouldn't happen
|
{ // This shouldn't happen
|
||||||
|
Loading…
x
Reference in New Issue
Block a user