MDEV-22990 Threadpool : Optimize network/named pipe IO for Windows
This patch reduces the overhead of system calls prior to a query, for the threadpool. Previously, 3 system calls were made:

1. WSARecv() to get notification of input data from the client (the asynchronous equivalent of select() in one-thread-per-connection)
2. recv(4 bytes) - reading the packet header length
3. recv(packet payload)

Now there will usually be just the WSARecv(), which pre-reads user data into a buffer, so we spared 2 syscalls.

The profiler shows that the most expensive call, WSARecv (16% CPU), drops to 4% CPU after the patch, and benchmark results (network-heavy ones like point-select) improve by ~20%.

The buffer management was done rather carefully to keep the buffers together, as Windows keeps the pages pinned in memory for the duration of async calls. At most 1MB of memory is used for the buffers, and the per-connection overhead is only 256 bytes, which should cover most uses.

SSL does not yet use the optimization, since so far it does not properly use VIO for reads and writes. Neither does one-thread-per-connection get any benefit, but that should be fine; it is not even the default on Windows.
parent b3acad4a48 · commit d15c839c0d
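To make the buffered-read idea above concrete, here is a minimal, hedged sketch in C++ (not code from the patch; conn_t, pre_read_begin, pre_read_end and buffered_read are invented names for illustration, and an IO completion port is assumed to be set up elsewhere). One overlapped WSARecv() fills a small per-connection buffer, and later reads are served from that buffer instead of issuing the extra recv() calls for the packet header and payload:

    /* Illustrative sketch only - assumes Winsock2 and an IO completion port
       associated with the socket elsewhere; link with ws2_32.lib. */
    #include <winsock2.h>
    #include <windows.h>
    #include <algorithm>
    #include <cstring>

    struct conn_t
    {
      SOCKET     sock;
      OVERLAPPED ov{};
      char       buf[256];     /* same 256-byte size the patch uses */
      size_t     data_len= 0;  /* bytes placed into buf by the completion */
      size_t     off= 0;       /* current read offset inside buf */
    };

    /* Post one overlapped read; the completion arrives on the completion port. */
    static int pre_read_begin(conn_t *c)
    {
      WSABUF wbuf{(ULONG) sizeof(c->buf), c->buf};
      DWORD flags= 0;
      if (WSARecv(c->sock, &wbuf, 1, nullptr, &flags, &c->ov, nullptr) == 0)
        return 0;                                  /* completed synchronously */
      return (WSAGetLastError() == WSA_IO_PENDING) ? 0 : -1;
    }

    /* Called with the byte count reported by the completion. */
    static void pre_read_end(conn_t *c, DWORD nbytes)
    {
      c->data_len= nbytes;
      c->off= 0;
    }

    /* recv() replacement: drain the pre-read buffer before touching the socket. */
    static size_t buffered_read(conn_t *c, char *dst, size_t sz)
    {
      size_t avail= c->data_len - c->off;
      if (avail)
      {
        size_t n= std::min(avail, sz);
        memcpy(dst, c->buf + c->off, n);
        c->off+= n;
        return n;              /* no syscall: header and payload usually fit */
      }
      int got= recv(c->sock, dst, (int) sz, 0);    /* fall back to the socket */
      return got > 0 ? (size_t) got : 0;
    }

With a 256-byte buffer, a typical small query (4-byte header plus a short payload) is consumed entirely from the buffer, so the only syscall left on the hot path is the WSARecv() that was already needed for readiness notification.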
include/violite.h
@@ -278,6 +278,7 @@ struct st_vio
 #ifdef _WIN32
   HANDLE hPipe;
   OVERLAPPED overlapped;
+  void *tp_ctx; /* threadpool context */
 #endif
 };
 #endif /* vio_violite_h_ */

sql/CMakeLists.txt
@@ -171,7 +171,7 @@ IF ((CMAKE_SYSTEM_NAME MATCHES "Linux" OR
      AND (NOT DISABLE_THREADPOOL))
   ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS)
   IF(WIN32)
-    SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
+    SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc threadpool_winsockets.cc threadpool_winsockets.h)
   ENDIF()
   SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
   SET(SQL_SOURCE ${SQL_SOURCE} threadpool_common.cc)

sql/threadpool.h
@@ -37,6 +37,8 @@ extern uint threadpool_mode; /* Thread pool implementation , windows or generic */
 #define DEFAULT_THREADPOOL_STALL_LIMIT 500U

 struct TP_connection;
+struct st_vio;
+
 extern void tp_callback(TP_connection *c);
 extern void tp_timeout_handler(TP_connection *c);

@@ -115,7 +117,7 @@ struct TP_connection

   virtual void wait_begin(int type)= 0;
   virtual void wait_end() = 0;

+  IF_WIN(virtual,) void init_vio(st_vio *){};
 };

@@ -136,6 +138,7 @@ struct TP_pool
 };

 #ifdef _WIN32

 struct TP_pool_win:TP_pool
 {
   TP_pool_win();

sql/threadpool_common.cc
@@ -28,6 +28,10 @@
 #include "wsrep_trans_observer.h"
 #endif /* WITH_WSREP */

+#ifdef _WIN32
+#include "threadpool_winsockets.h"
+#endif
+
 /* Threadpool parameters */

 uint threadpool_min_threads;

@@ -48,7 +52,7 @@ TP_STATISTICS tp_stats;

 static void threadpool_remove_connection(THD *thd);
 static int threadpool_process_request(THD *thd);
-static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
+static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);

 extern bool do_command(THD*);

@@ -221,7 +225,7 @@ error:
 }


-static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
+static THD *threadpool_add_connection(CONNECT *connect, TP_connection *c)
 {
   THD *thd= NULL;

@@ -243,9 +247,10 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
     return NULL;
   }
   delete connect;
+
   server_threads.insert(thd);
   thd->set_mysys_var(mysys_var);
-  thd->event_scheduler.data= scheduler_data;
+  thd->event_scheduler.data = c;

   /* Login. */
   thread_attach(thd);

@@ -260,6 +265,8 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
   if (thd_prepare_connection(thd))
     goto end;

+  c->init_vio(thd->net.vio);
+
   /*
     Check if THD is ok, as prepare_new_connection_state()
     can fail, for example if init command failed.

@@ -397,6 +404,9 @@ static bool tp_init()
     pool= 0;
     return true;
   }
+#ifdef _WIN32
+  init_win_aio_buffers(max_connections);
+#endif
   return false;
 }

@@ -484,6 +494,9 @@ static void tp_wait_end(THD *thd)
 static void tp_end()
 {
   delete pool;
+#ifdef _WIN32
+  destroy_win_aio_buffers();
+#endif
 }

 static void tp_post_kill_notification(THD *thd)

sql/threadpool_generic.cc
@@ -29,8 +29,8 @@
 #include <sql_plist.h>
 #include <threadpool.h>
 #include <algorithm>

-#ifdef HAVE_IOCP
+#ifdef _WIN32
+#include "threadpool_winsockets.h"
 #define OPTIONAL_IO_POLL_READ_PARAM this
 #else
 #define OPTIONAL_IO_POLL_READ_PARAM 0

@@ -348,7 +348,7 @@ static void* native_event_get_userdata(native_event *event)
   return event->portev_user;
 }

-#elif defined(HAVE_IOCP)
+#elif defined(_WIN32)


 static TP_file_handle io_poll_create()

@@ -359,29 +359,8 @@ static TP_file_handle io_poll_create()

 int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
 {
-  static char c;
-  TP_connection_generic *con= (TP_connection_generic *)opt;
-  OVERLAPPED *overlapped= &con->overlapped;
-  if (con->vio_type == VIO_TYPE_NAMEDPIPE)
-  {
-    if (ReadFile(fd, &c, 0, NULL, overlapped))
-      return 0;
-  }
-  else
-  {
-    WSABUF buf;
-    buf.buf= &c;
-    buf.len= 0;
-    DWORD flags=0;
-
-    if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
-      return 0;
-  }
-
-  if (GetLastError() == ERROR_IO_PENDING)
-    return 0;
-
-  return 1;
+  auto c= (TP_connection_generic *) opt;
+  return (int) c->win_sock.begin_read();
 }

@@ -401,20 +380,33 @@ int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
 }


-int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
+static void *native_event_get_userdata(native_event *event)
+{
+  return (void *) event->lpCompletionKey;
+}
+
+int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents,
+                 int timeout_ms)
 {
   ULONG n;
-  BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
-                                        maxevents, &n, timeout_ms, FALSE);
-
-  return ok ? (int)n : -1;
+  if (!GetQueuedCompletionStatusEx(pollfd, events, maxevents, &n, timeout_ms, FALSE))
+    return -1;
+
+  /* Update win_sock with number of bytes read.*/
+  for (ULONG i= 0; i < n; i++)
+  {
+    auto ev= &events[i];
+    auto c= (TP_connection_generic *) native_event_get_userdata(ev);
+    /* null userdata zero means shutdown (see PostQueuedCompletionStatus() usage*/
+    if (c)
+    {
+      c->win_sock.end_read(ev->dwNumberOfBytesTransferred, 0);
+    }
+  }
+
+  return (int) n;
 }


-static void* native_event_get_userdata(native_event *event)
-{
-  return (void *)event->lpCompletionKey;
-}
 #endif

@@ -969,7 +961,7 @@ void thread_group_destroy(thread_group_t *thread_group)
     io_poll_close(thread_group->pollfd);
     thread_group->pollfd= INVALID_HANDLE_VALUE;
   }
-#ifndef HAVE_IOCP
+#ifndef _WIN32
   for(int i=0; i < 2; i++)
   {
     if(thread_group->shutdown_pipe[i] != -1)

@@ -1013,7 +1005,7 @@ static int wake_thread(thread_group_t *thread_group,bool due_to_stall)
 */
 static int wake_listener(thread_group_t *thread_group)
 {
-#ifndef HAVE_IOCP
+#ifndef _WIN32
   if (pipe(thread_group->shutdown_pipe))
   {
     return -1;

@@ -1354,9 +1346,6 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
   abs_wait_timeout(ULONGLONG_MAX),
   bound_to_poll_descriptor(false),
   waiting(false)
-#ifdef HAVE_IOCP
-, overlapped()
-#endif
 {
   DBUG_ASSERT(c->vio_type != VIO_CLOSED);

sql/threadpool_generic.h
@@ -23,6 +23,7 @@

 #ifdef _WIN32
 #include <windows.h>
+#include "threadpool_winsockets.h"
 /* AIX may define this, too ?*/
 #define HAVE_IOCP
 #endif

@@ -75,11 +76,11 @@ struct TP_connection_generic :public TP_connection
   TP_connection_generic(CONNECT* c);
   ~TP_connection_generic();

-  virtual int init() { return 0; };
-  virtual void set_io_timeout(int sec);
-  virtual int start_io();
-  virtual void wait_begin(int type);
-  virtual void wait_end();
+  int init() override { return 0; }
+  void set_io_timeout(int sec) override;
+  int start_io() override;
+  void wait_begin(int type) override;
+  void wait_end() override;

   thread_group_t* thread_group;
   TP_connection_generic* next_in_queue;

@@ -89,12 +90,13 @@ struct TP_connection_generic :public TP_connection
   TP_file_handle fd;
   bool bound_to_poll_descriptor;
   int waiting;
-#ifdef HAVE_IOCP
-  OVERLAPPED overlapped;
-#endif
+
 #ifdef _WIN32
   enum_vio_type vio_type;
+  win_aiosocket win_sock{};
+  void init_vio(st_vio *vio) override
+  { win_sock.init(vio);}
 #endif

 };

sql/threadpool_win.cc
@@ -31,6 +31,8 @@
 #include <threadpool.h>
 #include <windows.h>

+#include "threadpool_winsockets.h"
+
 /* Log a warning */
 static void tp_log_warning(const char *msg, const char *fct)
 {

@@ -43,8 +45,6 @@ static PTP_POOL pool;
 static TP_CALLBACK_ENVIRON callback_environ;
 static DWORD fls;

-static bool skip_completion_port_on_success = false;
-
 PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ()
 {
   return pool? &callback_environ: 0;

@@ -83,22 +83,21 @@ struct TP_connection_win:public TP_connection
 public:
   TP_connection_win(CONNECT*);
   ~TP_connection_win();
-  virtual int init();
-  virtual int start_io();
-  virtual void set_io_timeout(int sec);
-  virtual void wait_begin(int type);
-  virtual void wait_end();
-
-  ulonglong timeout;
-  enum_vio_type vio_type;
-  HANDLE handle;
-  OVERLAPPED overlapped;
-  PTP_CALLBACK_INSTANCE callback_instance;
-  PTP_IO io;
-  PTP_TIMER timer;
-  PTP_WORK work;
-  bool long_callback;
+  int init() override;
+  void init_vio(st_vio *vio) override;
+  int start_io() override;
+  void set_io_timeout(int sec) override;
+  void wait_begin(int type) override;
+  void wait_end() override;
+
+  ulonglong timeout=ULLONG_MAX;
+  OVERLAPPED overlapped{};
+  PTP_CALLBACK_INSTANCE callback_instance{};
+  PTP_IO io{};
+  PTP_TIMER timer{};
+  PTP_WORK work{};
+  bool long_callback{};
+  win_aiosocket sock;
 };

 struct TP_connection *new_TP_connection(CONNECT *connect)

@@ -125,120 +124,50 @@ void TP_pool_win::add(TP_connection *c)
   }
 }

+#define CHECK_ALLOC_ERROR(op) \
+  do \
+  { \
+    if (!(op)) \
+    { \
+      tp_log_warning("Allocation failed", #op); \
+    } \
+  } while (0)
+
 TP_connection_win::TP_connection_win(CONNECT *c) :
-  TP_connection(c),
-  timeout(ULONGLONG_MAX),
-  callback_instance(0),
-  io(0),
-  timer(0),
-  work(0)
+  TP_connection(c)
 {
-}
+  /* Assign io completion callback */
+  HANDLE h= c->vio_type == VIO_TYPE_NAMEDPIPE ? c->pipe
+                                              : (HANDLE)mysql_socket_getfd(c->sock);
+
-#define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; }
+  CHECK_ALLOC_ERROR(io=CreateThreadpoolIo(h, io_completion_callback, this, &callback_environ));
+  CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
+  CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
+}

 int TP_connection_win::init()
 {
-
-  memset(&overlapped, 0, sizeof(OVERLAPPED));
-  switch ((vio_type = connect->vio_type))
-  {
-  case VIO_TYPE_SSL:
-  case VIO_TYPE_TCPIP:
-    handle= (HANDLE) mysql_socket_getfd(connect->sock);
-    break;
-  case VIO_TYPE_NAMEDPIPE:
-    handle= connect->pipe;
-    break;
-  default:
-    abort();
-  }
-
-
-  /* Performance tweaks (s. MSDN documentation)*/
-  UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE;
-  if (skip_completion_port_on_success)
-  {
-    flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS;
-  }
-  (void)SetFileCompletionNotificationModes(handle, flags);
-  /* Assign io completion callback */
-  CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ));
-  CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ));
-  CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ));
-  return 0;
+  return !io || !timer || !work ;
 }

+void TP_connection_win::init_vio(st_vio* vio)
+{
+  sock.init(vio);
+}
+
 /*
   Start asynchronous read
 */
 int TP_connection_win::start_io()
 {
-  DWORD num_bytes = 0;
-  static char c;
-  WSABUF buf;
-  buf.buf= &c;
-  buf.len= 0;
-  DWORD flags=0;
-  DWORD last_error= 0;
-
-  int retval;
   StartThreadpoolIo(io);
-
-  if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL)
+  if (sock.begin_read())
   {
-    /* Start async io (sockets). */
-    if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags,
-                &overlapped, NULL) == 0)
-    {
-      retval= last_error= 0;
-    }
-    else
-    {
-      retval= -1;
-      last_error= WSAGetLastError();
-    }
+    /* Some error occurred */
+    CancelThreadpoolIo(io);
+    return -1;
   }
-  else
-  {
-    /* Start async io (named pipe) */
-    if (ReadFile(handle, &c, 0, &num_bytes,&overlapped))
-    {
-      retval= last_error= 0;
-    }
-    else
-    {
-      retval= -1;
-      last_error= GetLastError();
-    }
-  }
-
-  if (retval == 0 || last_error == ERROR_MORE_DATA)
-  {
-    /*
-      IO successfully finished (synchronously).
-      If skip_completion_port_on_success is set, we need to handle it right
-      here, because completion callback would not be executed by the pool.
-    */
-    if (skip_completion_port_on_success)
-    {
-      CancelThreadpoolIo(io);
-      io_completion_callback(callback_instance, this, &overlapped, last_error,
-                             num_bytes, io);
-    }
-    return 0;
-  }
-
-  if (last_error == ERROR_IO_PENDING)
-  {
-    return 0;
-  }
-
-  /* Some error occurred */
-  CancelThreadpoolIo(io);
-  return -1;
+  return 0;
 }

 /*

@@ -305,7 +234,7 @@ void tp_win_callback_prolog()
 {
   /* Running in new worker thread*/
   FlsSetValue(fls, (void *)1);
-  statistic_increment(thread_created, &LOCK_status);
+  thread_created++;
   tp_stats.num_worker_threads++;
   my_thread_init();
 }

@@ -350,6 +279,10 @@ static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
   PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io)
 {
   TP_connection_win *c= (TP_connection_win *)context;
+
+  /* How many bytes were preread into read buffer */
+  c->sock.end_read((ULONG)nbytes, io_result);
+
   /*
     Execute high priority connections immediately.
     'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback)

sql/threadpool_winsockets.cc (new file, 259 lines)
@@ -0,0 +1,259 @@
/* Copyright (C) 2012 Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*/

#include <winsock2.h>
#include <my_global.h>
#include <violite.h>
#include "threadpool_winsockets.h"
#include <algorithm>
#include <vector>
#include <mutex>

/*
  A cache for IO buffers for asynchronous socket (or named pipe) reads.

  Considerations on Windows: since Windows locks the AIO buffers in physical
  memory, it is important that these buffers are compactly allocated, so we
  try to prevent any kind of memory fragmentation.

  A relatively small region (at most 1MB) is allocated, for equally sized
  smallish (256 byte) buffers. The region is pagesize-aligned (via VirtualAlloc
  allocation).

  We use smallish IO buffers; 256 bytes is probably large enough for most of
  the queries. Larger buffers could have funny effects (thread hogging)
  on threadpool scheduling in case the client is using protocol pipelining.

  Also note that even in the unlikely situation where the cache runs out of
  buffers, this does not lead to errors; zero-sized reads will be used in
  WSARecv then.
*/

constexpr size_t READ_BUFSIZ= 256;

class AIO_buffer_cache
{
  const size_t ITEM_SIZE= READ_BUFSIZ;

  /** Limit the whole cache to 1MB*/
  const size_t MAX_SIZE= 1048576;

  /* Allocation base */
  char *m_base= 0;

  /* "Free list" with LIFO policy */
  std::vector<char *> m_cache;
  std::mutex m_mtx;
  size_t m_elements=0;

public:
  void set_size(size_t n_items);
  char *acquire_buffer();
  void release_buffer(char *v);
  void clear();
  ~AIO_buffer_cache();
};


void AIO_buffer_cache::set_size(size_t n_items)
{
  DBUG_ASSERT(!m_base);
  m_elements= std::min(n_items, MAX_SIZE / ITEM_SIZE);
  auto sz= m_elements * ITEM_SIZE;

  m_base=
      (char *) VirtualAlloc(0, sz, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
  if (!m_base)
  {
    m_elements= 0;
    return;
  }

  /* Try to help memory manager here, by prelocking region in memory*/
  (void) VirtualLock(m_base, sz);

  m_cache.reserve(m_elements);
  for (ssize_t i= m_elements - 1; i >= 0 ; i--)
    m_cache.push_back(m_base + i * ITEM_SIZE);
}

/*
  Returns a buffer, or NULL if no free buffers.

  LIFO policy is implemented, so we do not touch too many
  pages (no std::stack though)
*/
char *AIO_buffer_cache::acquire_buffer()
{
  std::unique_lock<std::mutex> lk(m_mtx);
  if (m_cache.empty())
    return nullptr;
  auto p= m_cache.back();
  m_cache.pop_back();
  return p;
}

void AIO_buffer_cache::release_buffer(char *v)
{
  std::unique_lock<std::mutex> lk(m_mtx);
  m_cache.push_back(v);
}

void AIO_buffer_cache::clear()
{
  if (!m_base)
    return;

  /* Check that all items are returned to the cache. */
  DBUG_ASSERT(m_cache.size() == m_elements);
  VirtualFree(m_base, 0, MEM_RELEASE);
  m_cache.clear();
  m_base= 0;
  m_elements= 0;
}

AIO_buffer_cache::~AIO_buffer_cache() { clear(); }

/* Global variable for the cache buffers.*/
AIO_buffer_cache read_buffers;

win_aiosocket::~win_aiosocket()
{
  if (m_buf_ptr)
    read_buffers.release_buffer(m_buf_ptr);
}


/** Return number of unread bytes.*/
size_t win_aiosocket::buffer_remaining()
{
  return m_buf_datalen - m_buf_off;
}

static my_bool my_vio_has_data(st_vio *vio)
{
  auto sock= (win_aiosocket *) vio->tp_ctx;
  return sock->buffer_remaining() || sock->m_orig_vio_has_data(vio);
}

/*
  (Half-)buffered read.

  The buffer is filled once, by completion of the async IO.

  We do not refill the buffer once it is read off,
  as that does not make sense.
*/
static size_t my_vio_read(st_vio *vio, uchar *dest, size_t sz)
{
  auto sock= (win_aiosocket *) vio->tp_ctx;
  DBUG_ASSERT(sock);

  auto nbytes= std::min(sock->buffer_remaining(), sz);

  if (nbytes > 0)
  {
    /* Copy to output, adjust the offset.*/
    memcpy(dest, sock->m_buf_ptr + sock->m_buf_off, nbytes);
    sock->m_buf_off += nbytes;
    return nbytes;
  }

  return sock->m_orig_vio_read(vio, dest, sz);
}

DWORD win_aiosocket::begin_read()
{
  DWORD err = ERROR_SUCCESS;
  static char c;
  WSABUF buf;

  DBUG_ASSERT(!buffer_remaining());

  /*
    If there is no internal buffer to store data,
    we do zero size read, but still need a valid
    pointer for the buffer parameter.
  */
  if (m_buf_ptr)
    buf= {(ULONG)READ_BUFSIZ, m_buf_ptr};
  else
    buf= {0, &c};


  if (!m_is_pipe)
  {
    /* Do async io (sockets). */
    DWORD flags= 0;
    if (WSARecv((SOCKET) m_handle, &buf, 1, 0, &flags, &m_overlapped, NULL))
      err= WSAGetLastError();
  }
  else
  {
    /* Do async read (named pipe) */
    if (ReadFile(m_handle, buf.buf, buf.len, 0, &m_overlapped))
      err= GetLastError();
  }

  if (!err || err == ERROR_IO_PENDING)
    return 0;
  return err;
}

void win_aiosocket::end_read(ULONG nbytes, DWORD err)
{
  DBUG_ASSERT(!buffer_remaining());
  DBUG_ASSERT(!nbytes || m_buf_ptr);
  m_buf_off= 0;
  m_buf_datalen= nbytes;
}

void win_aiosocket::init(Vio *vio)
{
  m_is_pipe= vio->type == VIO_TYPE_NAMEDPIPE;
  m_handle=
      m_is_pipe ? vio->hPipe : (HANDLE) mysql_socket_getfd(vio->mysql_socket);

  SetFileCompletionNotificationModes(m_handle, FILE_SKIP_SET_EVENT_ON_HANDLE);
  if (vio->type == VIO_TYPE_SSL)
  {
    /*
      TODO : This requires fixing viossl to call our manipulated VIO
    */
    return;
  }

  if (!(m_buf_ptr = read_buffers.acquire_buffer()))
  {
    /* Ran out of buffers, that's fine.*/
    return;
  }

  vio->tp_ctx= this;

  m_orig_vio_has_data= vio->has_data;
  vio->has_data= my_vio_has_data;

  m_orig_vio_read= vio->read;
  vio->read= my_vio_read;
}

void init_win_aio_buffers(unsigned int n_buffers)
{
  read_buffers.set_size(n_buffers);
}

extern void destroy_win_aio_buffers()
{
  read_buffers.clear();
}

sql/threadpool_winsockets.h (new file, 80 lines)
@@ -0,0 +1,80 @@
/* Copyright (C) 2020 Monty Program Ab

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*/
#pragma once

#include <WinSock2.h>
#include <windows.h>

struct st_vio;

struct win_aiosocket
{
  /** OVERLAPPED is needed by all Windows AIO*/
  OVERLAPPED m_overlapped{};
  /** Handle to pipe, or socket */
  HANDLE m_handle{};
  /** Whether the m_handle refers to pipe*/
  bool m_is_pipe{};

  /* Read buffer handling */

  /** Pointer to buffer of size READ_BUFSIZ. Can be NULL.*/
  char *m_buf_ptr{};
  /** Offset to current buffer position*/
  size_t m_buf_off{};
  /** Size of valid data in the buffer*/
  size_t m_buf_datalen{};

  /* Vio handling */
  /** Pointer to original vio->vio_read/vio->has_data function */
  size_t (*m_orig_vio_read)(st_vio *, unsigned char *, size_t){};
  char (*m_orig_vio_has_data)(st_vio *){};


  /**
    Begins asynchronous reading from socket/pipe.
    On IO completion, pre-read some bytes into internal buffer
  */
  DWORD begin_read();

  /**
    Update number of bytes returned, and IO error status

    Should be called right after IO is completed;
    GetQueuedCompletionStatus(), or the threadpool IO completion
    callback, would return nbytes and the error.

    Sets the valid data length in the read buffer.
  */
  void end_read(ULONG nbytes, DWORD err);

  /**
    Override VIO routines with ours, accounting for
    one-shot buffering.
  */
  void init(st_vio *vio);

  /** Return number of unread bytes.*/
  size_t buffer_remaining();

  /* Frees the read buffer.*/
  ~win_aiosocket();
};

/* Functions related to IO buffers caches.*/
extern void init_win_aio_buffers(unsigned int n_buffers);
extern void destroy_win_aio_buffers();