This commit is contained in:
commit
5d16e868c1
@ -2291,201 +2291,6 @@ void kill_delayed_threads(void)
|
||||
}
|
||||
|
||||
|
||||
static void handle_delayed_insert_impl(THD *thd, Delayed_insert *di)
|
||||
{
|
||||
DBUG_ENTER("handle_delayed_insert_impl");
|
||||
thd->thread_stack= (char*) &thd;
|
||||
if (init_thr_lock() || thd->store_globals())
|
||||
{
|
||||
/* Can't use my_error since store_globals has perhaps failed */
|
||||
thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
|
||||
ER(ER_OUT_OF_RESOURCES), NULL);
|
||||
thd->fatal_error();
|
||||
goto err;
|
||||
}
|
||||
|
||||
/*
|
||||
Open table requires an initialized lex in case the table is
|
||||
partitioned. The .frm file contains a partial SQL string which is
|
||||
parsed using a lex, that depends on initialized thd->lex.
|
||||
*/
|
||||
lex_start(thd);
|
||||
thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
|
||||
/*
|
||||
Statement-based replication of INSERT DELAYED has problems with RAND()
|
||||
and user vars, so in mixed mode we go to row-based.
|
||||
*/
|
||||
thd->lex->set_stmt_unsafe();
|
||||
thd->set_current_stmt_binlog_row_based_if_mixed();
|
||||
|
||||
/* Open table */
|
||||
if (!(di->table= open_n_lock_single_table(thd, &di->table_list,
|
||||
TL_WRITE_DELAYED)))
|
||||
{
|
||||
thd->fatal_error(); // Abort waiting inserts
|
||||
goto err;
|
||||
}
|
||||
if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
|
||||
{
|
||||
my_error(ER_DELAYED_NOT_SUPPORTED, MYF(ME_FATALERROR),
|
||||
di->table_list.table_name);
|
||||
goto err;
|
||||
}
|
||||
if (di->table->triggers)
|
||||
{
|
||||
/*
|
||||
Table has triggers. This is not an error, but we do
|
||||
not support triggers with delayed insert. Terminate the delayed
|
||||
thread without an error and thus request lock upgrade.
|
||||
*/
|
||||
goto err;
|
||||
}
|
||||
di->table->copy_blobs=1;
|
||||
|
||||
/* Tell client that the thread is initialized */
|
||||
pthread_cond_signal(&di->cond_client);
|
||||
|
||||
/* Now wait until we get an insert or lock to handle */
|
||||
/* We will not abort as long as a client thread uses this thread */
|
||||
|
||||
for (;;)
|
||||
{
|
||||
if (thd->killed == THD::KILL_CONNECTION)
|
||||
{
|
||||
uint lock_count;
|
||||
/*
|
||||
Remove this from delay insert list so that no one can request a
|
||||
table from this
|
||||
*/
|
||||
pthread_mutex_unlock(&di->mutex);
|
||||
pthread_mutex_lock(&LOCK_delayed_insert);
|
||||
di->unlink();
|
||||
lock_count=di->lock_count();
|
||||
pthread_mutex_unlock(&LOCK_delayed_insert);
|
||||
pthread_mutex_lock(&di->mutex);
|
||||
if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
|
||||
break; // Time to die
|
||||
}
|
||||
|
||||
if (!di->status && !di->stacked_inserts)
|
||||
{
|
||||
struct timespec abstime;
|
||||
set_timespec(abstime, delayed_insert_timeout);
|
||||
|
||||
/* Information for pthread_kill */
|
||||
di->thd.mysys_var->current_mutex= &di->mutex;
|
||||
di->thd.mysys_var->current_cond= &di->cond;
|
||||
thd_proc_info(&(di->thd), "Waiting for INSERT");
|
||||
|
||||
DBUG_PRINT("info",("Waiting for someone to insert rows"));
|
||||
while (!thd->killed)
|
||||
{
|
||||
int error;
|
||||
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
|
||||
error=pthread_cond_wait(&di->cond,&di->mutex);
|
||||
#else
|
||||
error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
|
||||
#ifdef EXTRA_DEBUG
|
||||
if (error && error != EINTR && error != ETIMEDOUT)
|
||||
{
|
||||
fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
|
||||
DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
|
||||
error));
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
if (thd->killed || di->status)
|
||||
break;
|
||||
if (error == ETIMEDOUT || error == ETIME)
|
||||
{
|
||||
thd->killed= THD::KILL_CONNECTION;
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* We can't lock di->mutex and mysys_var->mutex at the same time */
|
||||
pthread_mutex_unlock(&di->mutex);
|
||||
pthread_mutex_lock(&di->thd.mysys_var->mutex);
|
||||
di->thd.mysys_var->current_mutex= 0;
|
||||
di->thd.mysys_var->current_cond= 0;
|
||||
pthread_mutex_unlock(&di->thd.mysys_var->mutex);
|
||||
pthread_mutex_lock(&di->mutex);
|
||||
}
|
||||
thd_proc_info(&(di->thd), 0);
|
||||
|
||||
if (di->tables_in_use && ! thd->lock)
|
||||
{
|
||||
bool not_used;
|
||||
/*
|
||||
Request for new delayed insert.
|
||||
Lock the table, but avoid to be blocked by a global read lock.
|
||||
If we got here while a global read lock exists, then one or more
|
||||
inserts started before the lock was requested. These are allowed
|
||||
to complete their work before the server returns control to the
|
||||
client which requested the global read lock. The delayed insert
|
||||
handler will close the table and finish when the outstanding
|
||||
inserts are done.
|
||||
*/
|
||||
if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
|
||||
MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
|
||||
¬_used)))
|
||||
{
|
||||
/* Fatal error */
|
||||
di->dead= 1;
|
||||
thd->killed= THD::KILL_CONNECTION;
|
||||
}
|
||||
pthread_cond_broadcast(&di->cond_client);
|
||||
}
|
||||
if (di->stacked_inserts)
|
||||
{
|
||||
if (di->handle_inserts())
|
||||
{
|
||||
/* Some fatal error */
|
||||
di->dead= 1;
|
||||
thd->killed= THD::KILL_CONNECTION;
|
||||
}
|
||||
}
|
||||
di->status=0;
|
||||
if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
|
||||
{
|
||||
/*
|
||||
No one is doing a insert delayed
|
||||
Unlock table so that other threads can use it
|
||||
*/
|
||||
MYSQL_LOCK *lock=thd->lock;
|
||||
thd->lock=0;
|
||||
pthread_mutex_unlock(&di->mutex);
|
||||
/*
|
||||
We need to release next_insert_id before unlocking. This is
|
||||
enforced by handler::ha_external_lock().
|
||||
*/
|
||||
di->table->file->ha_release_auto_increment();
|
||||
mysql_unlock_tables(thd, lock);
|
||||
ha_autocommit_or_rollback(thd, 0);
|
||||
di->group_count=0;
|
||||
pthread_mutex_lock(&di->mutex);
|
||||
}
|
||||
if (di->tables_in_use)
|
||||
pthread_cond_broadcast(&di->cond_client); // If waiting clients
|
||||
}
|
||||
|
||||
err:
|
||||
/*
|
||||
mysql_lock_tables() can potentially start a transaction and write
|
||||
a table map. In the event of an error, that transaction has to be
|
||||
rolled back. We only need to roll back a potential statement
|
||||
transaction, since real transactions are rolled back in
|
||||
close_thread_tables().
|
||||
|
||||
TODO: This is not true any more, table maps are generated on the
|
||||
first call to ha_*_row() instead. Remove code that are used to
|
||||
cover for the case outlined above.
|
||||
*/
|
||||
ha_autocommit_or_rollback(thd, 1);
|
||||
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Create a new delayed insert thread
|
||||
*/
|
||||
@ -2518,11 +2323,201 @@ pthread_handler_t handle_delayed_insert(void *arg)
|
||||
/* Can't use my_error since store_globals has not yet been called */
|
||||
thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
|
||||
ER(ER_OUT_OF_RESOURCES), NULL);
|
||||
goto end;
|
||||
}
|
||||
handle_delayed_insert_impl(thd, di);
|
||||
else
|
||||
{
|
||||
DBUG_ENTER("handle_delayed_insert");
|
||||
thd->thread_stack= (char*) &thd;
|
||||
if (init_thr_lock() || thd->store_globals())
|
||||
{
|
||||
/* Can't use my_error since store_globals has perhaps failed */
|
||||
thd->stmt_da->set_error_status(thd, ER_OUT_OF_RESOURCES,
|
||||
ER(ER_OUT_OF_RESOURCES), NULL);
|
||||
thd->fatal_error();
|
||||
goto err;
|
||||
}
|
||||
|
||||
/*
|
||||
Open table requires an initialized lex in case the table is
|
||||
partitioned. The .frm file contains a partial SQL string which is
|
||||
parsed using a lex, that depends on initialized thd->lex.
|
||||
*/
|
||||
lex_start(thd);
|
||||
thd->lex->sql_command= SQLCOM_INSERT; // For innodb::store_lock()
|
||||
/*
|
||||
Statement-based replication of INSERT DELAYED has problems with RAND()
|
||||
and user vars, so in mixed mode we go to row-based.
|
||||
*/
|
||||
thd->lex->set_stmt_unsafe();
|
||||
thd->set_current_stmt_binlog_row_based_if_mixed();
|
||||
|
||||
/* Open table */
|
||||
if (!(di->table= open_n_lock_single_table(thd, &di->table_list,
|
||||
TL_WRITE_DELAYED)))
|
||||
{
|
||||
thd->fatal_error(); // Abort waiting inserts
|
||||
goto err;
|
||||
}
|
||||
if (!(di->table->file->ha_table_flags() & HA_CAN_INSERT_DELAYED))
|
||||
{
|
||||
my_error(ER_DELAYED_NOT_SUPPORTED, MYF(ME_FATALERROR),
|
||||
di->table_list.table_name);
|
||||
goto err;
|
||||
}
|
||||
if (di->table->triggers)
|
||||
{
|
||||
/*
|
||||
Table has triggers. This is not an error, but we do
|
||||
not support triggers with delayed insert. Terminate the delayed
|
||||
thread without an error and thus request lock upgrade.
|
||||
*/
|
||||
goto err;
|
||||
}
|
||||
di->table->copy_blobs=1;
|
||||
|
||||
/* Tell client that the thread is initialized */
|
||||
pthread_cond_signal(&di->cond_client);
|
||||
|
||||
/* Now wait until we get an insert or lock to handle */
|
||||
/* We will not abort as long as a client thread uses this thread */
|
||||
|
||||
for (;;)
|
||||
{
|
||||
if (thd->killed == THD::KILL_CONNECTION)
|
||||
{
|
||||
uint lock_count;
|
||||
/*
|
||||
Remove this from delay insert list so that no one can request a
|
||||
table from this
|
||||
*/
|
||||
pthread_mutex_unlock(&di->mutex);
|
||||
pthread_mutex_lock(&LOCK_delayed_insert);
|
||||
di->unlink();
|
||||
lock_count=di->lock_count();
|
||||
pthread_mutex_unlock(&LOCK_delayed_insert);
|
||||
pthread_mutex_lock(&di->mutex);
|
||||
if (!lock_count && !di->tables_in_use && !di->stacked_inserts)
|
||||
break; // Time to die
|
||||
}
|
||||
|
||||
if (!di->status && !di->stacked_inserts)
|
||||
{
|
||||
struct timespec abstime;
|
||||
set_timespec(abstime, delayed_insert_timeout);
|
||||
|
||||
/* Information for pthread_kill */
|
||||
di->thd.mysys_var->current_mutex= &di->mutex;
|
||||
di->thd.mysys_var->current_cond= &di->cond;
|
||||
thd_proc_info(&(di->thd), "Waiting for INSERT");
|
||||
|
||||
DBUG_PRINT("info",("Waiting for someone to insert rows"));
|
||||
while (!thd->killed)
|
||||
{
|
||||
int error;
|
||||
#if defined(HAVE_BROKEN_COND_TIMEDWAIT)
|
||||
error=pthread_cond_wait(&di->cond,&di->mutex);
|
||||
#else
|
||||
error=pthread_cond_timedwait(&di->cond,&di->mutex,&abstime);
|
||||
#ifdef EXTRA_DEBUG
|
||||
if (error && error != EINTR && error != ETIMEDOUT)
|
||||
{
|
||||
fprintf(stderr, "Got error %d from pthread_cond_timedwait\n",error);
|
||||
DBUG_PRINT("error",("Got error %d from pthread_cond_timedwait",
|
||||
error));
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
if (thd->killed || di->status)
|
||||
break;
|
||||
if (error == ETIMEDOUT || error == ETIME)
|
||||
{
|
||||
thd->killed= THD::KILL_CONNECTION;
|
||||
break;
|
||||
}
|
||||
}
|
||||
/* We can't lock di->mutex and mysys_var->mutex at the same time */
|
||||
pthread_mutex_unlock(&di->mutex);
|
||||
pthread_mutex_lock(&di->thd.mysys_var->mutex);
|
||||
di->thd.mysys_var->current_mutex= 0;
|
||||
di->thd.mysys_var->current_cond= 0;
|
||||
pthread_mutex_unlock(&di->thd.mysys_var->mutex);
|
||||
pthread_mutex_lock(&di->mutex);
|
||||
}
|
||||
thd_proc_info(&(di->thd), 0);
|
||||
|
||||
if (di->tables_in_use && ! thd->lock)
|
||||
{
|
||||
bool not_used;
|
||||
/*
|
||||
Request for new delayed insert.
|
||||
Lock the table, but avoid to be blocked by a global read lock.
|
||||
If we got here while a global read lock exists, then one or more
|
||||
inserts started before the lock was requested. These are allowed
|
||||
to complete their work before the server returns control to the
|
||||
client which requested the global read lock. The delayed insert
|
||||
handler will close the table and finish when the outstanding
|
||||
inserts are done.
|
||||
*/
|
||||
if (! (thd->lock= mysql_lock_tables(thd, &di->table, 1,
|
||||
MYSQL_LOCK_IGNORE_GLOBAL_READ_LOCK,
|
||||
¬_used)))
|
||||
{
|
||||
/* Fatal error */
|
||||
di->dead= 1;
|
||||
thd->killed= THD::KILL_CONNECTION;
|
||||
}
|
||||
pthread_cond_broadcast(&di->cond_client);
|
||||
}
|
||||
if (di->stacked_inserts)
|
||||
{
|
||||
if (di->handle_inserts())
|
||||
{
|
||||
/* Some fatal error */
|
||||
di->dead= 1;
|
||||
thd->killed= THD::KILL_CONNECTION;
|
||||
}
|
||||
}
|
||||
di->status=0;
|
||||
if (!di->stacked_inserts && !di->tables_in_use && thd->lock)
|
||||
{
|
||||
/*
|
||||
No one is doing a insert delayed
|
||||
Unlock table so that other threads can use it
|
||||
*/
|
||||
MYSQL_LOCK *lock=thd->lock;
|
||||
thd->lock=0;
|
||||
pthread_mutex_unlock(&di->mutex);
|
||||
/*
|
||||
We need to release next_insert_id before unlocking. This is
|
||||
enforced by handler::ha_external_lock().
|
||||
*/
|
||||
di->table->file->ha_release_auto_increment();
|
||||
mysql_unlock_tables(thd, lock);
|
||||
ha_autocommit_or_rollback(thd, 0);
|
||||
di->group_count=0;
|
||||
pthread_mutex_lock(&di->mutex);
|
||||
}
|
||||
if (di->tables_in_use)
|
||||
pthread_cond_broadcast(&di->cond_client); // If waiting clients
|
||||
}
|
||||
|
||||
err:
|
||||
/*
|
||||
mysql_lock_tables() can potentially start a transaction and write
|
||||
a table map. In the event of an error, that transaction has to be
|
||||
rolled back. We only need to roll back a potential statement
|
||||
transaction, since real transactions are rolled back in
|
||||
close_thread_tables().
|
||||
|
||||
TODO: This is not true any more, table maps are generated on the
|
||||
first call to ha_*_row() instead. Remove code that are used to
|
||||
cover for the case outlined above.
|
||||
*/
|
||||
ha_autocommit_or_rollback(thd, 1);
|
||||
|
||||
DBUG_LEAVE;
|
||||
}
|
||||
|
||||
end:
|
||||
/*
|
||||
di should be unlinked from the thread handler list and have no active
|
||||
clients
|
||||
|
Loading…
x
Reference in New Issue
Block a user