MDEV-24341 Innodb - do not block in foreground thread in log_write_up_to(
This commit is contained in:
parent
a1542f8a57
commit
4df0249b9a
@ -640,8 +640,20 @@ net_real_write(NET *net,const uchar *packet, size_t len)
|
||||
my_bool net_blocking = vio_is_blocking(net->vio);
|
||||
DBUG_ENTER("net_real_write");
|
||||
|
||||
#if defined(MYSQL_SERVER) && defined(USE_QUERY_CACHE)
|
||||
query_cache_insert(net->thd, (char*) packet, len, net->pkt_nr);
|
||||
#if defined(MYSQL_SERVER)
|
||||
THD *thd= (THD *)net->thd;
|
||||
#if defined(USE_QUERY_CACHE)
|
||||
query_cache_insert(thd, (char*) packet, len, net->pkt_nr);
|
||||
#endif
|
||||
if (likely(thd))
|
||||
{
|
||||
/*
|
||||
Wait until pending operations (currently it is engine
|
||||
asynchronous group commit) are finished before replying
|
||||
to the client, to keep durability promise.
|
||||
*/
|
||||
thd->async_state.wait_for_pending_ops();
|
||||
}
|
||||
#endif
|
||||
|
||||
if (unlikely(net->error == 2))
|
||||
|
@ -40,6 +40,8 @@ struct scheduler_functions
|
||||
void (*thd_wait_end)(THD *thd);
|
||||
void (*post_kill_notification)(THD *thd);
|
||||
void (*end)(void);
|
||||
/** resume previous unfinished command (threadpool only)*/
|
||||
void (*thd_resume)(THD* thd);
|
||||
};
|
||||
|
||||
|
||||
|
@ -682,7 +682,8 @@ THD::THD(my_thread_id id, bool is_wsrep_applier)
|
||||
m_stmt_da(&main_da),
|
||||
tdc_hash_pins(0),
|
||||
xid_hash_pins(0),
|
||||
m_tmp_tables_locked(false)
|
||||
m_tmp_tables_locked(false),
|
||||
async_state()
|
||||
#ifdef HAVE_REPLICATION
|
||||
,
|
||||
current_linfo(0),
|
||||
@ -4947,6 +4948,56 @@ void reset_thd(MYSQL_THD thd)
|
||||
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
|
||||
}
|
||||
|
||||
/**
|
||||
This function can be used by storage engine
|
||||
to indicate a start of an async operation.
|
||||
|
||||
This asynchronous is such operation needs to be
|
||||
finished before we write response to the client
|
||||
.
|
||||
An example of this operation is Innodb's asynchronous
|
||||
group commit. Server needs to wait for the end of it
|
||||
before writing response to client, to provide durability
|
||||
guarantees, in other words, server can't send OK packet
|
||||
before modified data is durable in redo log.
|
||||
*/
|
||||
extern "C" MYSQL_THD thd_increment_pending_ops(void)
|
||||
{
|
||||
THD *thd = current_thd;
|
||||
if (!thd)
|
||||
return NULL;
|
||||
thd->async_state.inc_pending_ops();
|
||||
return thd;
|
||||
}
|
||||
|
||||
/**
|
||||
This function can be used by plugin/engine to indicate
|
||||
end of async operation (such as end of group commit
|
||||
write flush)
|
||||
|
||||
@param thd THD
|
||||
*/
|
||||
extern "C" void thd_decrement_pending_ops(MYSQL_THD thd)
|
||||
{
|
||||
DBUG_ASSERT(thd);
|
||||
thd_async_state::enum_async_state state;
|
||||
if (thd->async_state.dec_pending_ops(&state) == 0)
|
||||
{
|
||||
switch(state)
|
||||
{
|
||||
case thd_async_state::enum_async_state::SUSPENDED:
|
||||
DBUG_ASSERT(thd->scheduler->thd_resume);
|
||||
thd->scheduler->thd_resume(thd);
|
||||
break;
|
||||
case thd_async_state::enum_async_state::NONE:
|
||||
break;
|
||||
default:
|
||||
DBUG_ASSERT(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
unsigned long long thd_get_query_id(const MYSQL_THD thd)
|
||||
{
|
||||
return((unsigned long long)thd->query_id);
|
||||
|
159
sql/sql_class.h
159
sql/sql_class.h
@ -2309,6 +2309,164 @@ struct THD_count
|
||||
~THD_count() { thread_count--; }
|
||||
};
|
||||
|
||||
/**
|
||||
Support structure for asynchronous group commit, or more generally
|
||||
any asynchronous operation that needs to finish before server writes
|
||||
response to client.
|
||||
|
||||
An engine, or any other server component, can signal that there is
|
||||
a pending operation by incrementing a counter, i.e inc_pending_ops()
|
||||
and that pending operation is finished by decrementing that counter
|
||||
dec_pending_ops().
|
||||
|
||||
NOTE: Currently, pending operations can not fail, i.e there is no
|
||||
way to pass a return code in dec_pending_ops()
|
||||
|
||||
The server does not write response to the client before the counter
|
||||
becomes 0. In case of group commit it ensures that data is persistent
|
||||
before success reported to client, i.e durability in ACID.
|
||||
*/
|
||||
struct thd_async_state
|
||||
{
|
||||
enum class enum_async_state
|
||||
{
|
||||
NONE,
|
||||
SUSPENDED, /* do_command() did not finish, and needs to be resumed */
|
||||
RESUMED /* do_command() is resumed*/
|
||||
};
|
||||
enum_async_state m_state{enum_async_state::NONE};
|
||||
|
||||
/* Stuff we need to resume do_command where we finished last time*/
|
||||
enum enum_server_command m_command{COM_SLEEP};
|
||||
LEX_STRING m_packet{};
|
||||
|
||||
mysql_mutex_t m_mtx;
|
||||
mysql_cond_t m_cond;
|
||||
|
||||
/** Pending counter*/
|
||||
Atomic_counter<int> m_pending_ops=0;
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
/* Checks */
|
||||
pthread_t m_dbg_thread;
|
||||
#endif
|
||||
|
||||
thd_async_state()
|
||||
{
|
||||
mysql_mutex_init(PSI_NOT_INSTRUMENTED, &m_mtx, 0);
|
||||
mysql_cond_init(PSI_INSTRUMENT_ME, &m_cond, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
Currently only used with threadpool, one can "suspend" and "resume" a THD.
|
||||
Suspend only means leaving do_command earlier, after saving some state.
|
||||
Resume is continuing suspended THD's do_command(), from where it finished last time.
|
||||
*/
|
||||
bool try_suspend()
|
||||
{
|
||||
bool ret;
|
||||
mysql_mutex_lock(&m_mtx);
|
||||
DBUG_ASSERT(m_state == enum_async_state::NONE);
|
||||
DBUG_ASSERT(m_pending_ops >= 0);
|
||||
|
||||
if(m_pending_ops)
|
||||
{
|
||||
ret=true;
|
||||
m_state= enum_async_state::SUSPENDED;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
If there is no pending operations, can't suspend, since
|
||||
nobody can resume it.
|
||||
*/
|
||||
ret=false;
|
||||
}
|
||||
mysql_mutex_unlock(&m_mtx);
|
||||
return ret;
|
||||
}
|
||||
|
||||
~thd_async_state()
|
||||
{
|
||||
wait_for_pending_ops();
|
||||
mysql_mutex_destroy(&m_mtx);
|
||||
mysql_cond_destroy(&m_cond);
|
||||
}
|
||||
|
||||
/*
|
||||
Increment pending asynchronous operations.
|
||||
The client response may not be written if
|
||||
this count > 0.
|
||||
So, without threadpool query needs to wait for
|
||||
the operations to finish.
|
||||
With threadpool, THD can be suspended and resumed
|
||||
when this counter goes to 0.
|
||||
*/
|
||||
void inc_pending_ops()
|
||||
{
|
||||
mysql_mutex_lock(&m_mtx);
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
/*
|
||||
Check that increments are always done by the same thread.
|
||||
*/
|
||||
if (!m_pending_ops)
|
||||
m_dbg_thread= pthread_self();
|
||||
else
|
||||
DBUG_ASSERT(pthread_equal(pthread_self(),m_dbg_thread));
|
||||
#endif
|
||||
|
||||
m_pending_ops++;
|
||||
mysql_mutex_unlock(&m_mtx);
|
||||
}
|
||||
|
||||
int dec_pending_ops(enum_async_state* state)
|
||||
{
|
||||
int ret;
|
||||
mysql_mutex_lock(&m_mtx);
|
||||
ret= --m_pending_ops;
|
||||
if (!ret)
|
||||
mysql_cond_signal(&m_cond);
|
||||
*state = m_state;
|
||||
mysql_mutex_unlock(&m_mtx);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
This is used for "dirty" reading pending ops,
|
||||
when dirty read is OK.
|
||||
*/
|
||||
int pending_ops()
|
||||
{
|
||||
return m_pending_ops;
|
||||
}
|
||||
|
||||
/* Wait for pending operations to finish.*/
|
||||
void wait_for_pending_ops()
|
||||
{
|
||||
/*
|
||||
It is fine to read m_pending_ops and compare it with 0,
|
||||
without mutex protection.
|
||||
|
||||
The value is only incremented by the current thread, and will
|
||||
be decremented by another one, thus "dirty" may show positive number
|
||||
when it is really 0, but this is not a problem, and the only
|
||||
bad thing from that will be rechecking under mutex.
|
||||
*/
|
||||
if (!pending_ops())
|
||||
return;
|
||||
|
||||
mysql_mutex_lock(&m_mtx);
|
||||
DBUG_ASSERT(m_pending_ops >= 0);
|
||||
while (m_pending_ops)
|
||||
mysql_cond_wait(&m_cond, &m_mtx);
|
||||
mysql_mutex_unlock(&m_mtx);
|
||||
}
|
||||
};
|
||||
|
||||
extern "C" MYSQL_THD thd_increment_pending_ops(void);
|
||||
extern "C" void thd_decrement_pending_ops(MYSQL_THD);
|
||||
|
||||
|
||||
/**
|
||||
@class THD
|
||||
@ -5025,6 +5183,7 @@ private:
|
||||
}
|
||||
|
||||
public:
|
||||
thd_async_state async_state;
|
||||
#ifdef HAVE_REPLICATION
|
||||
/*
|
||||
If we do a purge of binary logs, log index info of the threads
|
||||
|
@ -1168,25 +1168,55 @@ static enum enum_server_command fetch_command(THD *thd, char *packet)
|
||||
|
||||
/**
|
||||
Read one command from connection and execute it (query or simple command).
|
||||
This function is called in loop from thread function.
|
||||
This function is to be used by different schedulers (one-thread-per-connection,
|
||||
pool-of-threads)
|
||||
|
||||
For profiling to work, it must never be called recursively.
|
||||
|
||||
@param thd - client connection context
|
||||
|
||||
@param blocking - wait for command to finish.
|
||||
if false (nonblocking), then the function might
|
||||
return when command is "half-finished", with
|
||||
DISPATCH_COMMAND_WOULDBLOCK.
|
||||
Currenly, this can *only* happen when using
|
||||
threadpool. The command will resume, after all outstanding
|
||||
async operations (i.e group commit) finish.
|
||||
Threadpool scheduler takes care of "resume".
|
||||
|
||||
@retval
|
||||
0 success
|
||||
DISPATCH_COMMAND_SUCCESS - success
|
||||
@retval
|
||||
1 request of thread shutdown (see dispatch_command() description)
|
||||
DISPATCH_COMMAND_CLOSE_CONNECTION request of THD shutdown
|
||||
(s. dispatch_command() description)
|
||||
@retval
|
||||
DISPATCH_COMMAND_WOULDBLOCK - need to wait for asyncronous operations
|
||||
to finish. Only returned if parameter
|
||||
'blocking' is false.
|
||||
*/
|
||||
|
||||
bool do_command(THD *thd)
|
||||
dispatch_command_return do_command(THD *thd, bool blocking)
|
||||
{
|
||||
bool return_value;
|
||||
dispatch_command_return return_value;
|
||||
char *packet= 0;
|
||||
ulong packet_length;
|
||||
NET *net= &thd->net;
|
||||
enum enum_server_command command;
|
||||
DBUG_ENTER("do_command");
|
||||
|
||||
DBUG_ASSERT(!thd->async_state.pending_ops());
|
||||
if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
|
||||
{
|
||||
/*
|
||||
Resuming previously suspended command.
|
||||
Restore the state
|
||||
*/
|
||||
command = thd->async_state.m_command;
|
||||
packet = thd->async_state.m_packet.str;
|
||||
packet_length = (ulong)thd->async_state.m_packet.length;
|
||||
goto resume;
|
||||
}
|
||||
|
||||
/*
|
||||
indicator of uninitialized lex => normal flow of errors handling
|
||||
(see my_message_sql)
|
||||
@ -1253,12 +1283,12 @@ bool do_command(THD *thd)
|
||||
|
||||
if (net->error != 3)
|
||||
{
|
||||
return_value= TRUE; // We have to close it.
|
||||
return_value= DISPATCH_COMMAND_CLOSE_CONNECTION; // We have to close it.
|
||||
goto out;
|
||||
}
|
||||
|
||||
net->error= 0;
|
||||
return_value= FALSE;
|
||||
return_value= DISPATCH_COMMAND_SUCCESS;
|
||||
goto out;
|
||||
}
|
||||
|
||||
@ -1325,7 +1355,7 @@ bool do_command(THD *thd)
|
||||
MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
|
||||
thd->m_statement_psi= NULL;
|
||||
thd->m_digest= NULL;
|
||||
return_value= FALSE;
|
||||
return_value= DISPATCH_COMMAND_SUCCESS;
|
||||
|
||||
wsrep_after_command_before_result(thd);
|
||||
goto out;
|
||||
@ -1351,7 +1381,7 @@ bool do_command(THD *thd)
|
||||
thd->m_statement_psi= NULL;
|
||||
thd->m_digest= NULL;
|
||||
|
||||
return_value= FALSE;
|
||||
return_value= DISPATCH_COMMAND_SUCCESS;
|
||||
wsrep_after_command_before_result(thd);
|
||||
goto out;
|
||||
}
|
||||
@ -1362,8 +1392,18 @@ bool do_command(THD *thd)
|
||||
|
||||
DBUG_ASSERT(packet_length);
|
||||
DBUG_ASSERT(!thd->apc_target.is_enabled());
|
||||
|
||||
resume:
|
||||
return_value= dispatch_command(command, thd, packet+1,
|
||||
(uint) (packet_length-1));
|
||||
(uint) (packet_length-1), blocking);
|
||||
if (return_value == DISPATCH_COMMAND_WOULDBLOCK)
|
||||
{
|
||||
/* Save current state, and resume later.*/
|
||||
thd->async_state.m_command= command;
|
||||
thd->async_state.m_packet={packet,packet_length};
|
||||
DBUG_RETURN(return_value);
|
||||
}
|
||||
|
||||
DBUG_ASSERT(!thd->apc_target.is_enabled());
|
||||
|
||||
out:
|
||||
@ -1510,6 +1550,13 @@ public:
|
||||
@param packet_length length of packet + 1 (to show that data is
|
||||
null-terminated) except for COM_SLEEP, where it
|
||||
can be zero.
|
||||
@param blocking if false (nonblocking), then the function might
|
||||
return when command is "half-finished", with
|
||||
DISPATCH_COMMAND_WOULDBLOCK.
|
||||
Currenly, this can *only* happen when using threadpool.
|
||||
The current command will resume, after all outstanding
|
||||
async operations (i.e group commit) finish.
|
||||
Threadpool scheduler takes care of "resume".
|
||||
|
||||
@todo
|
||||
set thd->lex->sql_command to SQLCOM_END here.
|
||||
@ -1522,8 +1569,8 @@ public:
|
||||
1 request of thread shutdown, i. e. if command is
|
||||
COM_QUIT/COM_SHUTDOWN
|
||||
*/
|
||||
bool dispatch_command(enum enum_server_command command, THD *thd,
|
||||
char* packet, uint packet_length)
|
||||
dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
|
||||
char* packet, uint packet_length, bool blocking)
|
||||
{
|
||||
NET *net= &thd->net;
|
||||
bool error= 0;
|
||||
@ -1535,6 +1582,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
|
||||
"<?>")));
|
||||
bool drop_more_results= 0;
|
||||
|
||||
if (thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
|
||||
{
|
||||
thd->async_state.m_state = thd_async_state::enum_async_state::NONE;
|
||||
goto resume;
|
||||
}
|
||||
|
||||
/* keep it withing 1 byte */
|
||||
compile_time_assert(COM_END == 255);
|
||||
|
||||
@ -2265,6 +2318,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
|
||||
general_log_print(thd, command, NullS);
|
||||
my_eof(thd);
|
||||
break;
|
||||
|
||||
case COM_SLEEP:
|
||||
case COM_CONNECT: // Impossible here
|
||||
case COM_TIME: // Impossible from client
|
||||
@ -2278,7 +2332,18 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
|
||||
}
|
||||
|
||||
dispatch_end:
|
||||
do_end_of_statement= true;
|
||||
/*
|
||||
For the threadpool i.e if non-blocking call, if not all async operations
|
||||
are finished, return without cleanup. The cleanup will be done on
|
||||
later, when command execution is resumed.
|
||||
*/
|
||||
if (!blocking && !error && thd->async_state.pending_ops())
|
||||
{
|
||||
DBUG_RETURN(DISPATCH_COMMAND_WOULDBLOCK);
|
||||
}
|
||||
|
||||
resume:
|
||||
|
||||
#ifdef WITH_WSREP
|
||||
/*
|
||||
Next test should really be WSREP(thd), but that causes a failure when doing
|
||||
@ -2382,7 +2447,7 @@ dispatch_end:
|
||||
/* Check that some variables are reset properly */
|
||||
DBUG_ASSERT(thd->abort_on_warning == 0);
|
||||
thd->lex->restore_set_statement_var();
|
||||
DBUG_RETURN(error);
|
||||
DBUG_RETURN(error?DISPATCH_COMMAND_CLOSE_CONNECTION: DISPATCH_COMMAND_SUCCESS);
|
||||
}
|
||||
|
||||
static bool slow_filter_masked(THD *thd, ulonglong mask)
|
||||
|
@ -100,9 +100,16 @@ bool multi_delete_set_locks_and_link_aux_tables(LEX *lex);
|
||||
void create_table_set_open_action_and_adjust_tables(LEX *lex);
|
||||
int bootstrap(MYSQL_FILE *file);
|
||||
int mysql_execute_command(THD *thd);
|
||||
bool do_command(THD *thd);
|
||||
bool dispatch_command(enum enum_server_command command, THD *thd,
|
||||
char* packet, uint packet_length);
|
||||
enum dispatch_command_return
|
||||
{
|
||||
DISPATCH_COMMAND_SUCCESS=0,
|
||||
DISPATCH_COMMAND_CLOSE_CONNECTION= 1,
|
||||
DISPATCH_COMMAND_WOULDBLOCK= 2
|
||||
};
|
||||
|
||||
dispatch_command_return do_command(THD *thd, bool blocking = true);
|
||||
dispatch_command_return dispatch_command(enum enum_server_command command, THD *thd,
|
||||
char* packet, uint packet_length, bool blocking = true);
|
||||
void log_slow_statement(THD *thd);
|
||||
bool append_file_to_dir(THD *thd, const char **filename_ptr,
|
||||
const LEX_CSTRING *table_name);
|
||||
|
@ -133,6 +133,7 @@ struct TP_pool
|
||||
virtual int set_stall_limit(uint){ return 0; }
|
||||
virtual int get_thread_count() { return tp_stats.num_worker_threads; }
|
||||
virtual int get_idle_thread_count(){ return 0; }
|
||||
virtual void resume(TP_connection* c)=0;
|
||||
};
|
||||
|
||||
#ifdef _WIN32
|
||||
@ -146,6 +147,7 @@ struct TP_pool_win:TP_pool
|
||||
virtual void add(TP_connection *);
|
||||
virtual int set_max_threads(uint);
|
||||
virtual int set_min_threads(uint);
|
||||
void resume(TP_connection *c);
|
||||
};
|
||||
#endif
|
||||
|
||||
@ -159,6 +161,7 @@ struct TP_pool_generic :TP_pool
|
||||
virtual int set_pool_size(uint);
|
||||
virtual int set_stall_limit(uint);
|
||||
virtual int get_idle_thread_count();
|
||||
void resume(TP_connection* c);
|
||||
};
|
||||
|
||||
#endif /* HAVE_POOL_OF_THREADS */
|
||||
|
@ -23,6 +23,8 @@
|
||||
#include <sql_audit.h>
|
||||
#include <debug_sync.h>
|
||||
#include <threadpool.h>
|
||||
#include <sql_class.h>
|
||||
#include <sql_parse.h>
|
||||
|
||||
#ifdef WITH_WSREP
|
||||
#include "wsrep_trans_observer.h"
|
||||
@ -51,7 +53,7 @@ TP_STATISTICS tp_stats;
|
||||
|
||||
|
||||
static void threadpool_remove_connection(THD *thd);
|
||||
static int threadpool_process_request(THD *thd);
|
||||
static dispatch_command_return threadpool_process_request(THD *thd);
|
||||
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
|
||||
|
||||
extern bool do_command(THD*);
|
||||
@ -195,10 +197,30 @@ void tp_callback(TP_connection *c)
|
||||
}
|
||||
c->connect= 0;
|
||||
}
|
||||
else if (threadpool_process_request(thd))
|
||||
else
|
||||
{
|
||||
retry:
|
||||
switch(threadpool_process_request(thd))
|
||||
{
|
||||
case DISPATCH_COMMAND_WOULDBLOCK:
|
||||
if (!thd->async_state.try_suspend())
|
||||
{
|
||||
/*
|
||||
All async operations finished meanwhile, thus nobody is will wake up
|
||||
this THD. Therefore, we'll resume "manually" here.
|
||||
*/
|
||||
thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
|
||||
goto retry;
|
||||
}
|
||||
worker_context.restore();
|
||||
return;
|
||||
case DISPATCH_COMMAND_CLOSE_CONNECTION:
|
||||
/* QUIT or an error occurred. */
|
||||
goto error;
|
||||
case DISPATCH_COMMAND_SUCCESS:
|
||||
break;
|
||||
}
|
||||
thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
|
||||
}
|
||||
|
||||
/* Set priority */
|
||||
@ -331,10 +353,13 @@ static bool has_unread_data(THD* thd)
|
||||
/**
|
||||
Process a single client request or a single batch.
|
||||
*/
|
||||
static int threadpool_process_request(THD *thd)
|
||||
static dispatch_command_return threadpool_process_request(THD *thd)
|
||||
{
|
||||
int retval= 0;
|
||||
dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
|
||||
|
||||
thread_attach(thd);
|
||||
if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
|
||||
goto resume;
|
||||
|
||||
if (thd->killed >= KILL_CONNECTION)
|
||||
{
|
||||
@ -342,7 +367,7 @@ static int threadpool_process_request(THD *thd)
|
||||
killed flag was set by timeout handler
|
||||
or KILL command. Return error.
|
||||
*/
|
||||
retval= 1;
|
||||
retval= DISPATCH_COMMAND_CLOSE_CONNECTION;
|
||||
if(thd->killed == KILL_WAIT_TIMEOUT)
|
||||
handle_wait_timeout(thd);
|
||||
goto end;
|
||||
@ -365,12 +390,20 @@ static int threadpool_process_request(THD *thd)
|
||||
if (mysql_audit_release_required(thd))
|
||||
mysql_audit_release(thd);
|
||||
|
||||
if ((retval= do_command(thd)) != 0)
|
||||
resume:
|
||||
retval= do_command(thd, false);
|
||||
switch(retval)
|
||||
{
|
||||
case DISPATCH_COMMAND_WOULDBLOCK:
|
||||
case DISPATCH_COMMAND_CLOSE_CONNECTION:
|
||||
goto end;
|
||||
case DISPATCH_COMMAND_SUCCESS:
|
||||
break;
|
||||
}
|
||||
|
||||
if (!thd_is_connection_alive(thd))
|
||||
{
|
||||
retval= 1;
|
||||
retval=DISPATCH_COMMAND_CLOSE_CONNECTION;
|
||||
goto end;
|
||||
}
|
||||
|
||||
@ -527,6 +560,15 @@ static void tp_post_kill_notification(THD *thd)
|
||||
post_kill_notification(thd);
|
||||
}
|
||||
|
||||
/* Resume previously suspended THD */
|
||||
static void tp_resume(THD* thd)
|
||||
{
|
||||
DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
|
||||
thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
|
||||
TP_connection* c = get_TP_connection(thd);
|
||||
pool->resume(c);
|
||||
}
|
||||
|
||||
static scheduler_functions tp_scheduler_functions=
|
||||
{
|
||||
0, // max_threads
|
||||
@ -537,7 +579,8 @@ static scheduler_functions tp_scheduler_functions=
|
||||
tp_wait_begin, // thd_wait_begin
|
||||
tp_wait_end, // thd_wait_end
|
||||
tp_post_kill_notification, // post kill notification
|
||||
tp_end // end
|
||||
tp_end, // end
|
||||
tp_resume
|
||||
};
|
||||
|
||||
void pool_of_threads_scheduler(struct scheduler_functions *func,
|
||||
|
@ -1327,7 +1327,10 @@ void TP_pool_generic::add(TP_connection *c)
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
|
||||
void TP_pool_generic::resume(TP_connection* c)
|
||||
{
|
||||
add(c);
|
||||
}
|
||||
|
||||
/**
|
||||
MySQL scheduler callback: wait begin
|
||||
|
@ -125,6 +125,12 @@ void TP_pool_win::add(TP_connection *c)
|
||||
}
|
||||
}
|
||||
|
||||
void TP_pool_win::resume(TP_connection* c)
|
||||
{
|
||||
DBUG_ASSERT(c->state == TP_STATE_RUNNING);
|
||||
SubmitThreadpoolWork(((TP_connection_win*)c)->work);
|
||||
}
|
||||
|
||||
#define CHECK_ALLOC_ERROR(op) \
|
||||
do \
|
||||
{ \
|
||||
@ -438,3 +444,4 @@ TP_connection *TP_pool_win::new_connection(CONNECT *connect)
|
||||
}
|
||||
return c;
|
||||
}
|
||||
|
||||
|
@ -103,15 +103,21 @@ bool
|
||||
log_set_capacity(ulonglong file_size)
|
||||
MY_ATTRIBUTE((warn_unused_result));
|
||||
|
||||
/** Ensure that the log has been written to the log file up to a given
|
||||
/**
|
||||
Ensure that the log has been written to the log file up to a given
|
||||
log entry (such as that of a transaction commit). Start a new write, or
|
||||
wait and check if an already running write is covering the request.
|
||||
@param[in] lsn log sequence number that should be
|
||||
included in the redo log file write
|
||||
@param[in] flush_to_disk whether the written log should also
|
||||
be flushed to the file system
|
||||
@param[in] rotate_key whether to rotate the encryption key */
|
||||
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false);
|
||||
@param[in] rotate_key whether to rotate the encryption key
|
||||
@param[in] cb completion callback. If not NULL, the callback will be called
|
||||
whenever lsn is written or flushed.
|
||||
*/
|
||||
struct completion_callback;
|
||||
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key = false,
|
||||
const completion_callback* cb=nullptr);
|
||||
|
||||
/** write to the log file up to the last log entry.
|
||||
@param[in] sync whether we want the written log
|
||||
|
@ -771,6 +771,7 @@ bool log_write_lock_own()
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
/** Ensure that the log has been written to the log file up to a given
|
||||
log entry (such as that of a transaction commit). Start a new write, or
|
||||
wait and check if an already running write is covering the request.
|
||||
@ -779,7 +780,8 @@ included in the redo log file write
|
||||
@param[in] flush_to_disk whether the written log should also
|
||||
be flushed to the file system
|
||||
@param[in] rotate_key whether to rotate the encryption key */
|
||||
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
|
||||
void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key,
|
||||
const completion_callback *callback)
|
||||
{
|
||||
ut_ad(!srv_read_only_mode);
|
||||
ut_ad(!rotate_key || flush_to_disk);
|
||||
@ -788,16 +790,18 @@ void log_write_up_to(lsn_t lsn, bool flush_to_disk, bool rotate_key)
|
||||
{
|
||||
/* Recovery is running and no operations on the log files are
|
||||
allowed yet (the variable name .._no_ibuf_.. is misleading) */
|
||||
ut_a(!callback);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flush_to_disk &&
|
||||
flush_lock.acquire(lsn) != group_commit_lock::ACQUIRED)
|
||||
flush_lock.acquire(lsn, callback) != group_commit_lock::ACQUIRED)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (write_lock.acquire(lsn) == group_commit_lock::ACQUIRED)
|
||||
if (write_lock.acquire(lsn, flush_to_disk?0:callback) ==
|
||||
group_commit_lock::ACQUIRED)
|
||||
{
|
||||
mysql_mutex_lock(&log_sys.mutex);
|
||||
lsn_t write_lsn= log_sys.get_lsn();
|
||||
|
@ -77,6 +77,7 @@ Note that if write operation is very fast, a) or b) can be fine as alternative.
|
||||
#include <log0types.h>
|
||||
#include "log0sync.h"
|
||||
#include <mysql/service_thd_wait.h>
|
||||
#include <sql_class.h>
|
||||
/**
|
||||
Helper class , used in group commit lock.
|
||||
|
||||
@ -158,10 +159,10 @@ void binary_semaphore::wake()
|
||||
/* A thread helper structure, used in group commit lock below*/
|
||||
struct group_commit_waiter_t
|
||||
{
|
||||
lsn_t m_value;
|
||||
binary_semaphore m_sema;
|
||||
group_commit_waiter_t* m_next;
|
||||
group_commit_waiter_t() :m_value(), m_sema(), m_next() {}
|
||||
lsn_t m_value=0;
|
||||
binary_semaphore m_sema{};
|
||||
group_commit_waiter_t* m_next= nullptr;
|
||||
bool m_group_commit_leader=false;
|
||||
};
|
||||
|
||||
group_commit_lock::group_commit_lock() :
|
||||
@ -188,7 +189,13 @@ void group_commit_lock::set_pending(group_commit_lock::value_type num)
|
||||
const unsigned int MAX_SPINS = 1; /** max spins in acquire */
|
||||
thread_local group_commit_waiter_t thread_local_waiter;
|
||||
|
||||
group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
|
||||
static inline void do_completion_callback(const completion_callback* cb)
|
||||
{
|
||||
if (cb)
|
||||
cb->m_callback(cb->m_param);
|
||||
}
|
||||
|
||||
group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num, const completion_callback *callback)
|
||||
{
|
||||
unsigned int spins = MAX_SPINS;
|
||||
|
||||
@ -197,6 +204,7 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
|
||||
if (num <= value())
|
||||
{
|
||||
/* No need to wait.*/
|
||||
do_completion_callback(callback);
|
||||
return lock_return_code::EXPIRED;
|
||||
}
|
||||
|
||||
@ -212,17 +220,23 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
|
||||
}
|
||||
|
||||
thread_local_waiter.m_value = num;
|
||||
thread_local_waiter.m_group_commit_leader= false;
|
||||
std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
|
||||
while (num > value())
|
||||
{
|
||||
lk.lock();
|
||||
|
||||
/* Re-read current value after acquiring the lock*/
|
||||
if (num <= value())
|
||||
if (num <= value() &&
|
||||
(!thread_local_waiter.m_group_commit_leader || m_lock))
|
||||
{
|
||||
lk.unlock();
|
||||
do_completion_callback(callback);
|
||||
thread_local_waiter.m_group_commit_leader=false;
|
||||
return lock_return_code::EXPIRED;
|
||||
}
|
||||
|
||||
thread_local_waiter.m_group_commit_leader= false;
|
||||
if (!m_lock)
|
||||
{
|
||||
/* Take the lock, become group commit leader.*/
|
||||
@ -230,9 +244,21 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
|
||||
#ifndef DBUG_OFF
|
||||
m_owner_id = std::this_thread::get_id();
|
||||
#endif
|
||||
if (callback)
|
||||
m_pending_callbacks.push_back({num,*callback});
|
||||
return lock_return_code::ACQUIRED;
|
||||
}
|
||||
|
||||
if (callback && m_waiters_list)
|
||||
{
|
||||
/*
|
||||
We need to have at least one waiter,
|
||||
so it can become the new group commit leader.
|
||||
*/
|
||||
m_pending_callbacks.push_back({num, *callback});
|
||||
return lock_return_code::CALLBACK_QUEUED;
|
||||
}
|
||||
|
||||
/* Add yourself to waiters list.*/
|
||||
thread_local_waiter.m_next = m_waiters_list;
|
||||
m_waiters_list = &thread_local_waiter;
|
||||
@ -244,11 +270,15 @@ group_commit_lock::lock_return_code group_commit_lock::acquire(value_type num)
|
||||
thd_wait_end(0);
|
||||
|
||||
}
|
||||
do_completion_callback(callback);
|
||||
return lock_return_code::EXPIRED;
|
||||
}
|
||||
|
||||
void group_commit_lock::release(value_type num)
|
||||
{
|
||||
completion_callback callbacks[1000];
|
||||
size_t callback_count = 0;
|
||||
|
||||
std::unique_lock<std::mutex> lk(m_mtx);
|
||||
m_lock = false;
|
||||
|
||||
@ -262,12 +292,21 @@ void group_commit_lock::release(value_type num)
|
||||
*/
|
||||
group_commit_waiter_t* cur, * prev, * next;
|
||||
group_commit_waiter_t* wakeup_list = nullptr;
|
||||
int extra_wake = 0;
|
||||
for (auto& c : m_pending_callbacks)
|
||||
{
|
||||
if (c.first <= num)
|
||||
{
|
||||
if (callback_count < array_elements(callbacks))
|
||||
callbacks[callback_count++] = c.second;
|
||||
else
|
||||
c.second.m_callback(c.second.m_param);
|
||||
}
|
||||
}
|
||||
|
||||
for (prev= nullptr, cur= m_waiters_list; cur; cur= next)
|
||||
{
|
||||
next= cur->m_next;
|
||||
if (cur->m_value <= num || extra_wake++ == 0)
|
||||
if (cur->m_value <= num)
|
||||
{
|
||||
/* Move current waiter to wakeup_list*/
|
||||
|
||||
@ -291,8 +330,43 @@ void group_commit_lock::release(value_type num)
|
||||
prev= cur;
|
||||
}
|
||||
}
|
||||
|
||||
auto it= std::remove_if(
|
||||
m_pending_callbacks.begin(), m_pending_callbacks.end(),
|
||||
[num](const pending_cb &c) { return c.first <= num; });
|
||||
|
||||
m_pending_callbacks.erase(it, m_pending_callbacks.end());
|
||||
|
||||
if (m_pending_callbacks.size() || m_waiters_list)
|
||||
{
|
||||
/*
|
||||
Ensure that after this thread released the lock,
|
||||
there is a new group commit leader
|
||||
We take this from waiters list or wakeup list. It
|
||||
might look like a spurious wake, but in fact we just
|
||||
ensure the waiter do not wait for eternity.
|
||||
*/
|
||||
if (m_waiters_list)
|
||||
{
|
||||
/* Move one waiter to wakeup list */
|
||||
auto e= m_waiters_list;
|
||||
m_waiters_list= m_waiters_list->m_next;
|
||||
e->m_next= wakeup_list;
|
||||
e->m_group_commit_leader= true;
|
||||
wakeup_list = e;
|
||||
}
|
||||
else
|
||||
{
|
||||
ut_a(wakeup_list);
|
||||
wakeup_list->m_group_commit_leader=true;
|
||||
}
|
||||
}
|
||||
|
||||
lk.unlock();
|
||||
|
||||
for (size_t i = 0; i < callback_count; i++)
|
||||
callbacks[i].m_callback(callbacks[i].m_param);
|
||||
|
||||
for (cur= wakeup_list; cur; cur= next)
|
||||
{
|
||||
next= cur->m_next;
|
||||
|
@ -18,8 +18,14 @@ this program; if not, write to the Free Software Foundation, Inc.,
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <log0types.h>
|
||||
#include <vector>
|
||||
|
||||
struct group_commit_waiter_t;
|
||||
struct completion_callback
|
||||
{
|
||||
void (*m_callback)(void*);
|
||||
void* m_param;
|
||||
};
|
||||
|
||||
/**
|
||||
Special synchronization primitive, which is helpful for
|
||||
@ -63,14 +69,19 @@ class group_commit_lock
|
||||
std::atomic<value_type> m_pending_value;
|
||||
bool m_lock;
|
||||
group_commit_waiter_t* m_waiters_list;
|
||||
|
||||
typedef std::pair<value_type, completion_callback> pending_cb;
|
||||
std::vector<pending_cb> m_pending_callbacks;
|
||||
|
||||
public:
|
||||
group_commit_lock();
|
||||
enum lock_return_code
|
||||
{
|
||||
ACQUIRED,
|
||||
EXPIRED
|
||||
EXPIRED,
|
||||
CALLBACK_QUEUED
|
||||
};
|
||||
lock_return_code acquire(value_type num);
|
||||
lock_return_code acquire(value_type num, const completion_callback *cb);
|
||||
void release(value_type num);
|
||||
value_type value() const;
|
||||
value_type pending() const;
|
||||
|
@ -1162,35 +1162,49 @@ trx_finalize_for_fts(
|
||||
trx->fts_trx = NULL;
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
If required, flushes the log to disk based on the value of
|
||||
innodb_flush_log_at_trx_commit. */
|
||||
static
|
||||
void
|
||||
trx_flush_log_if_needed_low(
|
||||
/*========================*/
|
||||
lsn_t lsn) /*!< in: lsn up to which logs are to be
|
||||
flushed. */
|
||||
{
|
||||
bool flush = srv_file_flush_method != SRV_NOSYNC;
|
||||
extern "C" MYSQL_THD thd_increment_pending_ops();
|
||||
extern "C" void thd_decrement_pending_ops(MYSQL_THD);
|
||||
|
||||
switch (srv_flush_log_at_trx_commit) {
|
||||
case 3:
|
||||
case 2:
|
||||
/* Write the log but do not flush it to disk */
|
||||
flush = false;
|
||||
/* fall through */
|
||||
case 1:
|
||||
/* Write the log and optionally flush it to disk */
|
||||
log_write_up_to(lsn, flush);
|
||||
srv_inc_activity_count();
|
||||
|
||||
#include "../log/log0sync.h"
|
||||
|
||||
/*
|
||||
If required, initiates write and optionally flush of the log to
|
||||
disk
|
||||
@param[in] lsn - lsn up to which logs are to be flushed.
|
||||
@param[in] trx_state - if trx_state is PREPARED, the function will
|
||||
also wait for the flush to complete.
|
||||
*/
|
||||
static void trx_flush_log_if_needed_low(lsn_t lsn, trx_state_t trx_state)
|
||||
{
|
||||
if (!srv_flush_log_at_trx_commit)
|
||||
return;
|
||||
case 0:
|
||||
/* Do nothing */
|
||||
|
||||
if (log_sys.get_flushed_lsn() > lsn)
|
||||
return;
|
||||
|
||||
bool flush= srv_file_flush_method != SRV_NOSYNC &&
|
||||
srv_flush_log_at_trx_commit == 1;
|
||||
|
||||
if (trx_state == TRX_STATE_PREPARED)
|
||||
{
|
||||
/* XA, which is used with binlog as well.
|
||||
Be conservative, use synchronous wait.*/
|
||||
log_write_up_to(lsn, flush);
|
||||
return;
|
||||
}
|
||||
|
||||
ut_error;
|
||||
completion_callback cb;
|
||||
if ((cb.m_param = thd_increment_pending_ops()))
|
||||
{
|
||||
cb.m_callback = (void (*)(void *)) thd_decrement_pending_ops;
|
||||
log_write_up_to(lsn, flush, false, &cb);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* No THD, synchronous write */
|
||||
log_write_up_to(lsn, flush);
|
||||
}
|
||||
}
|
||||
|
||||
/**********************************************************************//**
|
||||
@ -1205,7 +1219,7 @@ trx_flush_log_if_needed(
|
||||
trx_t* trx) /*!< in/out: transaction */
|
||||
{
|
||||
trx->op_info = "flushing log";
|
||||
trx_flush_log_if_needed_low(lsn);
|
||||
trx_flush_log_if_needed_low(lsn,trx->state);
|
||||
trx->op_info = "";
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user