MDEV-36234 Cleanup, refactor Innodb's use of tpool AIO, move logic to tpool
This commit refactors InnoDB to remove its dependency on tpool internal logic and to significantly reduce platform-specific #ifdefs (HAVE_URING, _WIN32, __linux__). Key changes: - Encapsulated AIO support checks into tpool::supports_native_aio(). - Centralized Linux AIO implementation choice and fallback within tpool::create_linux_aio(), removing complex #ifdef mazes from os0file.cc. - Linux-specific AIO source files are now: - aio_libaio.cc: Exports create_libaio(). Previously, this logic was in the confusingly named aio_linux.cc (confusing since the introduction of uring). - aio_liburing.cc: Exports create_uring(), and no longer create_linux_aio(). - aio_linux.cc: Exports create_linux_aio(), handles Linux AIO implementation selection and fallbacks. - Simplified/modernized CMake build using target_sources() and target_compile_definitions(), all available since CMake 2.8.12. With this change, there is no need to include ${CMAKE_SOURCE_DIR}/tpool or add TPOOL_DEFINES flags anymore; target_link_libraries(lib tpool) does all that. - The LINUX_NATIVE_AIO preprocessor constant is renamed to HAVE_LIBAIO, analogous to the existing HAVE_URING. Ever since we got a second Linux native AIO implementation with uring, the old name became very confusing.
This commit is contained in:
parent
585531d6c0
commit
1c41e1424d
@ -2544,15 +2544,7 @@ static bool innodb_init_param()
|
||||
|
||||
ut_ad(DATA_MYSQL_BINARY_CHARSET_COLL == my_charset_bin.number);
|
||||
|
||||
#if defined(_WIN32) || defined(LINUX_NATIVE_AIO) || defined(HAVE_URING)
|
||||
srv_use_native_aio = TRUE;
|
||||
#else
|
||||
/* Currently native AIO is supported only on windows and linux
|
||||
and that also when the support is compiled in. In all other
|
||||
cases, we ignore the setting of innodb_use_native_aio. */
|
||||
srv_use_native_aio = FALSE;
|
||||
|
||||
#endif
|
||||
srv_use_native_aio= tpool::supports_native_aio();
|
||||
|
||||
/* Assign the default value to srv_undo_dir if it's not specified, as
|
||||
my_getopt does not support default values for string options. We also
|
||||
@ -2587,9 +2579,6 @@ static bool innodb_init_param()
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef _WIN32
|
||||
srv_use_native_aio = TRUE;
|
||||
#endif
|
||||
return false;
|
||||
|
||||
error:
|
||||
|
@ -21,7 +21,6 @@ INCLUDE_DIRECTORIES(
|
||||
${CMAKE_SOURCE_DIR}/include
|
||||
${CMAKE_SOURCE_DIR}/libmysqld
|
||||
${CMAKE_SOURCE_DIR}/sql
|
||||
${CMAKE_SOURCE_DIR}/tpool
|
||||
${CMAKE_BINARY_DIR}/sql
|
||||
${PCRE_INCLUDE_DIRS}
|
||||
${LIBFMT_INCLUDE_DIR}
|
||||
|
@ -59,7 +59,6 @@ ${PCRE_INCLUDE_DIRS}
|
||||
${ZLIB_INCLUDE_DIRS}
|
||||
${SSL_INCLUDE_DIRS}
|
||||
${CMAKE_BINARY_DIR}/sql
|
||||
${CMAKE_SOURCE_DIR}/tpool
|
||||
)
|
||||
|
||||
ADD_CUSTOM_COMMAND(
|
||||
|
@ -131,7 +131,6 @@ ENDIF()
|
||||
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/innobase/include
|
||||
${CMAKE_SOURCE_DIR}/storage/innobase/handler
|
||||
${CMAKE_SOURCE_DIR}/libbinlogevents/include)
|
||||
INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/tpool)
|
||||
|
||||
SET(INNOBASE_SOURCES
|
||||
btr/btr0btr.cc
|
||||
@ -451,8 +450,8 @@ IF(NOT TARGET innobase)
|
||||
RETURN()
|
||||
ENDIF()
|
||||
|
||||
ADD_DEFINITIONS(${SSL_DEFINES} ${TPOOL_DEFINES})
|
||||
|
||||
ADD_DEFINITIONS(${SSL_DEFINES})
|
||||
# A GCC bug causes crash when compiling these files on ARM64 with -O1+
|
||||
# Compile them with -O0 as a workaround.
|
||||
IF(CMAKE_COMPILER_IS_GNUCXX AND CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64"
|
||||
|
@ -4078,12 +4078,8 @@ static int innodb_init_params()
|
||||
log_sys.log_buffered= true;
|
||||
#endif
|
||||
|
||||
#if !defined LINUX_NATIVE_AIO && !defined HAVE_URING && !defined _WIN32
|
||||
/* Currently native AIO is supported only on windows and linux
|
||||
and that also when the support is compiled in. In all other
|
||||
cases, we ignore the setting of innodb_use_native_aio. */
|
||||
if (!tpool::supports_native_aio())
|
||||
srv_use_native_aio= FALSE;
|
||||
#endif
|
||||
|
||||
#ifdef _WIN32
|
||||
switch (srv_file_flush_method) {
|
||||
|
@ -3089,29 +3089,18 @@ int os_aio_init() noexcept
|
||||
int max_events= max_read_events + max_write_events;
|
||||
int ret= 1;
|
||||
|
||||
#if defined __linux__ && (defined HAVE_URING || defined LINUX_NATIVE_AIO)
|
||||
if (srv_use_native_aio)
|
||||
{
|
||||
switch (srv_linux_aio_method) {
|
||||
case SRV_LINUX_AIO_AUTO:
|
||||
case SRV_LINUX_AIO_IO_URING:
|
||||
# ifdef HAVE_URING
|
||||
tpool::aio_implementation aio_impl= tpool::OS_IO_DEFAULT;
|
||||
#ifdef __linux__
|
||||
static_assert(SRV_LINUX_AIO_IO_URING==(srv_linux_aio_t)tpool::OS_IO_URING);
|
||||
static_assert(SRV_LINUX_AIO_LIBAIO==(srv_linux_aio_t) tpool::OS_IO_LIBAIO);
|
||||
static_assert(SRV_LINUX_AIO_AUTO==(srv_linux_aio_t) tpool::OS_IO_DEFAULT);
|
||||
aio_impl=(tpool::aio_implementation) srv_linux_aio_method;
|
||||
#endif
|
||||
|
||||
ret= srv_thread_pool->configure_aio(srv_use_native_aio, max_events,
|
||||
tpool::OS_IO_URING);
|
||||
# endif
|
||||
# ifdef LINUX_NATIVE_AIO
|
||||
# ifdef HAVE_URING
|
||||
if (ret && srv_linux_aio_method == SRV_LINUX_AIO_AUTO)
|
||||
sql_print_warning("InnoDB: io_uring failed: falling back to libaio");
|
||||
else
|
||||
break;
|
||||
/* fallthough */
|
||||
# endif /* HAVE_URING */
|
||||
case SRV_LINUX_AIO_LIBAIO:
|
||||
ret= srv_thread_pool->configure_aio(srv_use_native_aio, max_events,
|
||||
tpool::OS_AIO);
|
||||
# endif
|
||||
}
|
||||
aio_impl);
|
||||
if (ret)
|
||||
{
|
||||
srv_use_native_aio= false;
|
||||
@ -3122,13 +3111,9 @@ int os_aio_init() noexcept
|
||||
sql_print_information("InnoDB: Using %s", srv_thread_pool
|
||||
->get_aio_implementation());
|
||||
}
|
||||
#endif /* linux */
|
||||
|
||||
if (ret)
|
||||
ret= srv_thread_pool->configure_aio(srv_use_native_aio,
|
||||
max_events,
|
||||
tpool::OS_DEFAULT);
|
||||
|
||||
ret= srv_thread_pool->configure_aio(false, max_events,
|
||||
tpool::OS_IO_DEFAULT);
|
||||
if (!ret)
|
||||
{
|
||||
read_slots= new io_slots(max_read_events, srv_n_read_io_threads);
|
||||
|
@ -1,7 +1,21 @@
|
||||
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/include)
|
||||
ADD_LIBRARY(tpool STATIC
|
||||
aio_simulated.cc
|
||||
tpool_structs.h
|
||||
CMakeLists.txt
|
||||
tpool.h
|
||||
tpool_generic.cc
|
||||
task_group.cc
|
||||
task.cc
|
||||
wait_notification.cc
|
||||
)
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(tpool PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}"
|
||||
PRIVATE ${PROJECT_SOURCE_DIR}/include)
|
||||
|
||||
IF(WIN32)
|
||||
SET(EXTRA_SOURCES tpool_win.cc aio_win.cc)
|
||||
TARGET_SOURCES(tpool PRIVATE tpool_win.cc aio_win.cc)
|
||||
ELSEIF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
|
||||
TARGET_SOURCES(tpool PRIVATE aio_linux.cc)
|
||||
OPTION(WITH_URING "Require that io_uring be used" OFF)
|
||||
OPTION(WITH_LIBAIO "Require that libaio is used" OFF)
|
||||
IF(WITH_URING)
|
||||
@ -10,15 +24,13 @@ ELSEIF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
|
||||
IF(WITH_LIBAIO)
|
||||
SET(LIBAIO_REQUIRED REQUIRED)
|
||||
ENDIF()
|
||||
SET(EXTRA_SOURCES)
|
||||
FIND_PACKAGE(URING QUIET ${URING_REQUIRED})
|
||||
IF(URING_FOUND)
|
||||
SET(URING_FOUND ${URING_FOUND} PARENT_SCOPE)
|
||||
SET(TPOOL_DEFINES "-DHAVE_URING")
|
||||
ADD_DEFINITIONS(-DHAVE_URING)
|
||||
LINK_LIBRARIES(${URING_LIBRARIES})
|
||||
INCLUDE_DIRECTORIES(${URING_INCLUDE_DIRS})
|
||||
SET(EXTRA_SOURCES aio_liburing.cc)
|
||||
TARGET_COMPILE_DEFINITIONS(tpool PUBLIC "-DHAVE_URING")
|
||||
TARGET_LINK_LIBRARIES(tpool PRIVATE ${URING_LIBRARIES})
|
||||
TARGET_INCLUDE_DIRECTORIES(tpool PUBLIC ${URING_INCLUDE_DIRS})
|
||||
TARGET_SOURCES(tpool PRIVATE aio_liburing.cc)
|
||||
SET(CMAKE_REQUIRED_INCLUDES_SAVE ${CMAKE_REQUIRED_INCLUDES})
|
||||
SET(CMAKE_REQUIRED_LIBRARIES_SAVE ${CMAKE_REQUIRED_LIBRARIES})
|
||||
SET(CMAKE_REQUIRED_INCLUDES ${URING_INCLUDE_DIRS})
|
||||
@ -30,29 +42,16 @@ ELSEIF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
|
||||
SET_SOURCE_FILES_PROPERTIES(aio_liburing.cc PROPERTIES COMPILE_FLAGS "-DHAVE_IO_URING_MLOCK_SIZE")
|
||||
ENDIF()
|
||||
ENDIF()
|
||||
|
||||
FIND_PACKAGE(LIBAIO QUIET ${LIBAIO_REQUIRED})
|
||||
IF(LIBAIO_FOUND)
|
||||
SET(TPOOL_DEFINES ${TPOOL_DEFINES} "-DLINUX_NATIVE_AIO")
|
||||
ADD_DEFINITIONS(-DLINUX_NATIVE_AIO)
|
||||
INCLUDE_DIRECTORIES(${LIBAIO_INCLUDE_DIRS})
|
||||
LINK_LIBRARIES(${LIBAIO_LIBRARIES})
|
||||
SET(EXTRA_SOURCES ${EXTRA_SOURCES} aio_linux.cc)
|
||||
TARGET_COMPILE_DEFINITIONS(tpool PUBLIC "-DHAVE_LIBAIO")
|
||||
TARGET_INCLUDE_DIRECTORIES(tpool PUBLIC ${LIBAIO_INCLUDE_DIRS})
|
||||
TARGET_LINK_LIBRARIES(tpool PRIVATE ${LIBAIO_LIBRARIES})
|
||||
TARGET_SOURCES(tpool PRIVATE aio_libaio.cc)
|
||||
ENDIF()
|
||||
SET(TPOOL_DEFINES ${TPOOL_DEFINES} PARENT_SCOPE)
|
||||
ENDIF()
|
||||
|
||||
ADD_LIBRARY(tpool STATIC
|
||||
aio_simulated.cc
|
||||
tpool_structs.h
|
||||
CMakeLists.txt
|
||||
tpool.h
|
||||
tpool_generic.cc
|
||||
task_group.cc
|
||||
task.cc
|
||||
wait_notification.cc
|
||||
${EXTRA_SOURCES}
|
||||
)
|
||||
|
||||
IF(URING_FOUND)
|
||||
ADD_DEPENDENCIES(tpool GenError)
|
||||
ENDIF()
|
||||
|
193
tpool/aio_libaio.cc
Normal file
193
tpool/aio_libaio.cc
Normal file
@ -0,0 +1,193 @@
|
||||
/* Copyright (C) 2019, 2020, MariaDB Corporation.
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; version 2 of the License.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
|
||||
|
||||
#include "tpool.h"
|
||||
#include <thread>
|
||||
#include <sys/syscall.h>
|
||||
#include <libaio.h>
|
||||
|
||||
/**
|
||||
Invoke the io_getevents() system call, without timeout parameter.
|
||||
|
||||
@param ctx context from io_setup()
|
||||
@param min_nr minimum number of completion events to wait for
|
||||
@param nr maximum number of completion events to collect
|
||||
@param ev the collected events
|
||||
|
||||
In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
|
||||
the io_getevents() implementation in libaio was "optimized" so that it
|
||||
would elide the system call when there are no outstanding requests
|
||||
and a timeout was specified.
|
||||
|
||||
The libaio code for dereferencing ctx would occasionally trigger
|
||||
SIGSEGV if io_destroy() was concurrently invoked from another thread.
|
||||
Hence, we have to use the raw system call.
|
||||
|
||||
WHY are we doing this at all?
|
||||
Because we want io_destroy() from another thread to interrupt io_getevents().
|
||||
|
||||
And, WHY do we want io_destroy() from another thread to interrupt
|
||||
io_getevents()?
|
||||
|
||||
Because there is no documented, libaio-friendly and race-condition-free way to
|
||||
interrupt io_getevents(). io_destroy() coupled with raw syscall seemed to work
|
||||
for us so far.
|
||||
|
||||
Historical note: in the past, we used io_getevents with timeouts. We'd wake
|
||||
up periodically, check for shutdown flag, return from the main routine.
|
||||
This was admittedly safer, yet it did cost periodic wakeups, which we are not
|
||||
willing to do anymore.
|
||||
|
||||
@note we also rely on the undocumented property, that io_destroy(ctx)
|
||||
will make this version of io_getevents return EINVAL.
|
||||
*/
|
||||
static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
  noexcept
{
  /* Failures are reported through the return value, so the caller's
  errno is put back once the error code has been captured. */
  const int errno_before= errno;
  const int n_events= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
                              min_nr, nr, ev, 0);
  if (n_events >= 0)
    return n_events;

  const int error= -errno;
  errno= errno_before;
  return error;
}
|
||||
|
||||
|
||||
/*
|
||||
Linux AIO implementation, based on native AIO.
|
||||
Needs libaio.h and -laio at build time.
|
||||
|
||||
io_submit() is used to submit async IO.
|
||||
|
||||
A single thread will collect the completion notification
|
||||
with io_getevents() and forward io completion callback to
|
||||
the worker threadpool.
|
||||
*/
|
||||
namespace
|
||||
{
|
||||
using namespace tpool;
|
||||
|
||||
class aio_libaio final : public aio
|
||||
{
|
||||
thread_pool *m_pool;
|
||||
io_context_t m_io_ctx;
|
||||
std::thread m_getevent_thread;
|
||||
static std::atomic<bool> shutdown_in_progress;
|
||||
|
||||
static void getevent_thread_routine(aio_libaio *aio)
|
||||
{
|
||||
/*
|
||||
We collect events in small batches to hopefully reduce the
|
||||
number of system calls.
|
||||
*/
|
||||
constexpr unsigned MAX_EVENTS= 256;
|
||||
|
||||
aio->m_pool->m_worker_init_callback();
|
||||
io_event events[MAX_EVENTS];
|
||||
for (;;)
|
||||
{
|
||||
switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
|
||||
case -EINTR:
|
||||
continue;
|
||||
case -EINVAL:
|
||||
if (shutdown_in_progress)
|
||||
goto end;
|
||||
/* fall through */
|
||||
default:
|
||||
if (ret < 0)
|
||||
{
|
||||
fprintf(stderr, "io_getevents returned %d\n", ret);
|
||||
abort();
|
||||
goto end;
|
||||
}
|
||||
for (int i= 0; i < ret; i++)
|
||||
{
|
||||
const io_event &event= events[i];
|
||||
aiocb *iocb= reinterpret_cast<aiocb*>(event.obj);
|
||||
if (static_cast<int>(event.res) < 0)
|
||||
{
|
||||
iocb->m_err= -event.res;
|
||||
iocb->m_ret_len= 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
iocb->m_ret_len= event.res;
|
||||
iocb->m_err= 0;
|
||||
finish_synchronous(iocb);
|
||||
}
|
||||
iocb->m_internal_task.m_func= iocb->m_callback;
|
||||
iocb->m_internal_task.m_arg= iocb;
|
||||
iocb->m_internal_task.m_group= iocb->m_group;
|
||||
aio->m_pool->submit_task(&iocb->m_internal_task);
|
||||
}
|
||||
}
|
||||
}
|
||||
end:
|
||||
aio->m_pool->m_worker_destroy_callback();
|
||||
}
|
||||
|
||||
public:
|
||||
aio_libaio(io_context_t ctx, thread_pool *pool)
|
||||
: m_pool(pool), m_io_ctx(ctx),
|
||||
m_getevent_thread(getevent_thread_routine, this)
|
||||
{
|
||||
}
|
||||
|
||||
~aio_libaio()
|
||||
{
|
||||
shutdown_in_progress= true;
|
||||
io_destroy(m_io_ctx);
|
||||
m_getevent_thread.join();
|
||||
shutdown_in_progress= false;
|
||||
}
|
||||
|
||||
int submit_io(aiocb *cb) override
|
||||
{
|
||||
io_prep_pread(&cb->m_iocb, cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
|
||||
if (cb->m_opcode != aio_opcode::AIO_PREAD)
|
||||
cb->m_iocb.aio_lio_opcode= IO_CMD_PWRITE;
|
||||
iocb *icb= &cb->m_iocb;
|
||||
int ret= io_submit(m_io_ctx, 1, &icb);
|
||||
if (ret == 1)
|
||||
return 0;
|
||||
errno= -ret;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int bind(native_file_handle&) override { return 0; }
|
||||
int unbind(const native_file_handle&) override { return 0; }
|
||||
const char *get_implementation() const override { return "Linux native AIO"; };
|
||||
};
|
||||
|
||||
std::atomic<bool> aio_libaio::shutdown_in_progress;
|
||||
}
|
||||
|
||||
namespace tpool
|
||||
{
|
||||
aio *create_libaio(thread_pool *pool, int max_io)
|
||||
{
|
||||
io_context_t ctx;
|
||||
memset(&ctx, 0, sizeof ctx);
|
||||
if (int ret= io_setup(max_io, &ctx))
|
||||
{
|
||||
fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
|
||||
return nullptr;
|
||||
}
|
||||
return new aio_libaio(ctx, pool);
|
||||
}
|
||||
}
|
@ -201,30 +201,13 @@ private:
|
||||
|
||||
namespace tpool
|
||||
{
|
||||
|
||||
#ifdef LINUX_NATIVE_AIO
|
||||
aio *create_libaio(thread_pool* tp, int max_io);
|
||||
#endif
|
||||
|
||||
aio *create_linux_aio(thread_pool *pool, int max_aio,
|
||||
aio_implementation implementation)
|
||||
aio *create_uring(thread_pool *pool, int max_aio)
|
||||
{
|
||||
switch (implementation) {
|
||||
case OS_DEFAULT:
|
||||
case OS_IO_URING:
|
||||
try {
|
||||
return new aio_uring(pool, max_aio);
|
||||
} catch (std::runtime_error&) {
|
||||
return nullptr;
|
||||
}
|
||||
break;
|
||||
#ifdef LINUX_NATIVE_AIO
|
||||
case OS_AIO:
|
||||
return create_libaio(pool, max_aio);
|
||||
#endif
|
||||
default:
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace tpool
|
||||
|
@ -1,4 +1,4 @@
|
||||
/* Copyright (C) 2019, 2020, MariaDB Corporation.
|
||||
/* Copyright (C) 2025 MariaDB Corporation.
|
||||
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
@ -13,186 +13,62 @@ You should have received a copy of the GNU General Public License
|
||||
along with this program; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */
|
||||
|
||||
#include "tpool.h"
|
||||
#include <thread>
|
||||
#include <sys/syscall.h>
|
||||
|
||||
/**
|
||||
Invoke the io_getevents() system call, without timeout parameter.
|
||||
|
||||
@param ctx context from io_setup()
|
||||
@param min_nr minimum number of completion events to wait for
|
||||
@param nr maximum number of completion events to collect
|
||||
@param ev the collected events
|
||||
|
||||
In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
|
||||
the io_getevents() implementation in libaio was "optimized" so that it
|
||||
would elide the system call when there are no outstanding requests
|
||||
and a timeout was specified.
|
||||
|
||||
The libaio code for dereferencing ctx would occasionally trigger
|
||||
SIGSEGV if io_destroy() was concurrently invoked from another thread.
|
||||
Hence, we have to use the raw system call.
|
||||
|
||||
WHY are we doing this at all?
|
||||
Because we want io_destroy() from another thread to interrupt io_getevents().
|
||||
|
||||
And, WHY do we want io_destroy() from another thread to interrupt
|
||||
io_getevents()?
|
||||
|
||||
Because there is no documented, libaio-friendly and race-condition-free way to
|
||||
interrupt io_getevents(). io_destroy() coupled with raw syscall seemed to work
|
||||
for us so far.
|
||||
|
||||
Historical note : in the past, we used io_getevents with timeouts. We'd wake
|
||||
up periodically, check for shutdown flag, return from the main routine.
|
||||
This was admittedly safer, yet it did cost periodic wakeups, which we are not
|
||||
willing to do anymore.
|
||||
|
||||
@note we also rely on the undocumented property, that io_destroy(ctx)
|
||||
will make this version of io_getevents return EINVAL.
|
||||
*/
|
||||
static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
|
||||
noexcept
|
||||
{
|
||||
int saved_errno= errno;
|
||||
int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
|
||||
min_nr, nr, ev, 0);
|
||||
if (ret < 0)
|
||||
{
|
||||
ret= -errno;
|
||||
errno= saved_errno;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Linux AIO implementation, based on native AIO.
|
||||
Needs libaio.h and -laio at the compile time.
|
||||
|
||||
io_submit() is used to submit async IO.
|
||||
|
||||
A single thread will collect the completion notification
|
||||
with io_getevents() and forward io completion callback to
|
||||
the worker threadpool.
|
||||
*/
|
||||
namespace
|
||||
{
|
||||
using namespace tpool;
|
||||
|
||||
class aio_linux final : public aio
|
||||
{
|
||||
thread_pool *m_pool;
|
||||
io_context_t m_io_ctx;
|
||||
std::thread m_getevent_thread;
|
||||
static std::atomic<bool> shutdown_in_progress;
|
||||
|
||||
static void getevent_thread_routine(aio_linux *aio)
|
||||
{
|
||||
/*
|
||||
We collect events in small batches to hopefully reduce the
|
||||
number of system calls.
|
||||
This file exports create_linux_aio() function which is used to create
|
||||
an asynchronous IO implementation for Linux (currently either libaio or
|
||||
uring).
|
||||
*/
|
||||
constexpr unsigned MAX_EVENTS= 256;
|
||||
|
||||
aio->m_pool->m_worker_init_callback();
|
||||
io_event events[MAX_EVENTS];
|
||||
for (;;)
|
||||
{
|
||||
switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
|
||||
case -EINTR:
|
||||
continue;
|
||||
case -EINVAL:
|
||||
if (shutdown_in_progress)
|
||||
goto end;
|
||||
/* fall through */
|
||||
default:
|
||||
if (ret < 0)
|
||||
{
|
||||
fprintf(stderr, "io_getevents returned %d\n", ret);
|
||||
abort();
|
||||
goto end;
|
||||
}
|
||||
for (int i= 0; i < ret; i++)
|
||||
{
|
||||
const io_event &event= events[i];
|
||||
aiocb *iocb= reinterpret_cast<aiocb*>(event.obj);
|
||||
if (static_cast<int>(event.res) < 0)
|
||||
{
|
||||
iocb->m_err= -event.res;
|
||||
iocb->m_ret_len= 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
iocb->m_ret_len= event.res;
|
||||
iocb->m_err= 0;
|
||||
finish_synchronous(iocb);
|
||||
}
|
||||
iocb->m_internal_task.m_func= iocb->m_callback;
|
||||
iocb->m_internal_task.m_arg= iocb;
|
||||
iocb->m_internal_task.m_group= iocb->m_group;
|
||||
aio->m_pool->submit_task(&iocb->m_internal_task);
|
||||
}
|
||||
}
|
||||
}
|
||||
end:
|
||||
aio->m_pool->m_worker_destroy_callback();
|
||||
}
|
||||
|
||||
public:
|
||||
aio_linux(io_context_t ctx, thread_pool *pool)
|
||||
: m_pool(pool), m_io_ctx(ctx),
|
||||
m_getevent_thread(getevent_thread_routine, this)
|
||||
{
|
||||
}
|
||||
|
||||
~aio_linux()
|
||||
{
|
||||
shutdown_in_progress= true;
|
||||
io_destroy(m_io_ctx);
|
||||
m_getevent_thread.join();
|
||||
shutdown_in_progress= false;
|
||||
}
|
||||
|
||||
int submit_io(aiocb *cb) override
|
||||
{
|
||||
io_prep_pread(&cb->m_iocb, cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
|
||||
if (cb->m_opcode != aio_opcode::AIO_PREAD)
|
||||
cb->m_iocb.aio_lio_opcode= IO_CMD_PWRITE;
|
||||
iocb *icb= &cb->m_iocb;
|
||||
int ret= io_submit(m_io_ctx, 1, &icb);
|
||||
if (ret == 1)
|
||||
return 0;
|
||||
errno= -ret;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int bind(native_file_handle&) override { return 0; }
|
||||
int unbind(const native_file_handle&) override { return 0; }
|
||||
const char *get_implementation() const override { return "Linux native AIO"; };
|
||||
};
|
||||
|
||||
std::atomic<bool> aio_linux::shutdown_in_progress;
|
||||
}
|
||||
|
||||
#include "tpool.h"
|
||||
#include <stdio.h>
|
||||
namespace tpool
|
||||
{
|
||||
|
||||
#ifdef HAVE_URING
|
||||
aio *create_libaio(thread_pool *pool, int max_io)
|
||||
#else
|
||||
aio *create_linux_aio(thread_pool *pool, int max_io, aio_implementation)
|
||||
// Forward declarations of aio implementations
|
||||
#ifdef HAVE_LIBAIO
|
||||
// defined in aio_libaio.cc
|
||||
aio *create_libaio(thread_pool *pool, int max_io);
|
||||
#endif
|
||||
#if defined HAVE_URING
|
||||
// defined in aio_liburing.cc
|
||||
aio *create_uring(thread_pool *pool, int max_io);
|
||||
#endif
|
||||
{
|
||||
io_context_t ctx;
|
||||
memset(&ctx, 0, sizeof ctx);
|
||||
if (int ret= io_setup(max_io, &ctx))
|
||||
{
|
||||
fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
return new aio_linux(ctx, pool);
|
||||
}
|
||||
/*
|
||||
@brief
|
||||
Choose native linux aio implementation based on availability and user
|
||||
preference.
|
||||
|
||||
@param pool - thread pool to use for aio operations
|
||||
@param max_io - maximum number of concurrent io operations
|
||||
@param impl - implementation to use, can be one of the following:
|
||||
|
||||
@returns
|
||||
A pointer to the aio implementation object, or nullptr if no suitable
|
||||
implementation is available.
|
||||
|
||||
If impl is OS_IO_DEFAULT, it will try uring first, fallback to libaio
|
||||
If impl is OS_IO_URING or OS_IO_LIBAIO, it won't fallback
|
||||
*/
|
||||
aio *create_linux_aio(thread_pool *pool, int max_io, aio_implementation impl)
|
||||
{
|
||||
#if defined HAVE_URING
|
||||
if (impl == OS_IO_URING || impl == OS_IO_DEFAULT)
|
||||
{
|
||||
aio *ret= create_uring(pool, max_io);
|
||||
if (ret)
|
||||
return ret;
|
||||
else if (impl == OS_IO_URING)
|
||||
return nullptr; // uring is not available
|
||||
else
|
||||
fprintf(stderr, "create_uring failed: falling back to libaio\n");
|
||||
}
|
||||
#endif
|
||||
#if defined HAVE_LIBAIO
|
||||
if (impl == OS_IO_LIBAIO || impl == OS_IO_DEFAULT)
|
||||
return create_libaio(pool, max_io);
|
||||
#endif
|
||||
return nullptr;
|
||||
}
|
||||
} // namespace tpool
|
||||
|
||||
|
@ -19,7 +19,7 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
|
||||
#include <mutex>
|
||||
#include <atomic>
|
||||
#include <tpool_structs.h>
|
||||
#ifdef LINUX_NATIVE_AIO
|
||||
#ifdef HAVE_LIBAIO
|
||||
#include <libaio.h>
|
||||
#endif
|
||||
#ifdef HAVE_URING
|
||||
@ -130,9 +130,9 @@ struct aiocb
|
||||
:OVERLAPPED
|
||||
#endif
|
||||
{
|
||||
#if defined LINUX_NATIVE_AIO || defined HAVE_URING
|
||||
#if defined HAVE_LIBAIO || defined HAVE_URING
|
||||
union {
|
||||
# ifdef LINUX_NATIVE_AIO
|
||||
# ifdef HAVE_LIBAIO
|
||||
/** The context between io_submit() and io_getevents();
|
||||
must be the first data member! */
|
||||
iocb m_iocb;
|
||||
@ -214,10 +214,10 @@ extern aio *create_simulated_aio(thread_pool *tp);
|
||||
|
||||
enum aio_implementation
|
||||
{
|
||||
OS_DEFAULT
|
||||
OS_IO_DEFAULT
|
||||
#ifdef __linux__
|
||||
, OS_IO_URING
|
||||
, OS_AIO
|
||||
, OS_IO_LIBAIO
|
||||
#endif
|
||||
};
|
||||
|
||||
@ -226,6 +226,7 @@ class thread_pool
|
||||
protected:
|
||||
/* AIO handler */
|
||||
std::unique_ptr<aio> m_aio{};
|
||||
aio_implementation m_aio_impl= OS_IO_DEFAULT;
|
||||
virtual aio *create_native_aio(int max_io, aio_implementation)= 0;
|
||||
|
||||
public:
|
||||
@ -249,7 +250,10 @@ public:
|
||||
int configure_aio(bool use_native_aio, int max_io, aio_implementation impl)
|
||||
{
|
||||
if (use_native_aio)
|
||||
{
|
||||
m_aio.reset(create_native_aio(max_io, impl));
|
||||
m_aio_impl= impl;
|
||||
}
|
||||
else
|
||||
m_aio.reset(create_simulated_aio(this));
|
||||
return !m_aio ? -1 : 0;
|
||||
@ -260,12 +264,7 @@ public:
|
||||
assert(m_aio);
|
||||
if (use_native_aio)
|
||||
{
|
||||
const aio_implementation impl=
|
||||
#ifdef LINUX_NATIVE_AIO
|
||||
!strcmp(get_aio_implementation(), "Linux native AIO") ? OS_AIO :
|
||||
#endif
|
||||
OS_DEFAULT;
|
||||
auto new_aio= create_native_aio(max_io, impl);
|
||||
auto new_aio= create_native_aio(max_io, m_aio_impl);
|
||||
if (!new_aio)
|
||||
return -1;
|
||||
m_aio.reset(new_aio);
|
||||
@ -306,6 +305,19 @@ public:
|
||||
virtual void wait_end() {};
|
||||
virtual ~thread_pool() {}
|
||||
};
|
||||
|
||||
/**
  Tell whether native AIO support was compiled in.

  @return true on Windows, and on Linux when HAVE_LIBAIO or HAVE_URING
  is defined; false in all other builds
*/
constexpr bool supports_native_aio()
{
#if defined(_WIN32) || \
    (defined(__linux__) && (defined(HAVE_LIBAIO) || defined(HAVE_URING)))
  return true;
#else
  return false;
#endif
}
|
||||
|
||||
const int DEFAULT_MIN_POOL_THREADS= 1;
|
||||
const int DEFAULT_MAX_POOL_THREADS= 500;
|
||||
extern thread_pool *
|
||||
|
@ -37,14 +37,8 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
|
||||
|
||||
namespace tpool
|
||||
{
|
||||
|
||||
#ifdef __linux__
|
||||
# if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO)
|
||||
aio *create_linux_aio(thread_pool* tp, int max_io, aio_implementation);
|
||||
# else
|
||||
static aio *create_linux_aio(thread_pool *, int, aio_implementation)
|
||||
{ return nullptr; }
|
||||
# endif
|
||||
#elif defined _WIN32
|
||||
aio *create_win_aio(thread_pool* tp, int max_io);
|
||||
#endif
|
||||
|
Loading…
x
Reference in New Issue
Block a user