diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt index 7c41d78d796..8e42d29c4ba 100644 --- a/client/CMakeLists.txt +++ b/client/CMakeLists.txt @@ -53,7 +53,7 @@ SET_TARGET_PROPERTIES(mariadb-test PROPERTIES ENABLE_EXPORTS TRUE) MYSQL_ADD_EXECUTABLE(mariadb-check mysqlcheck.c) TARGET_LINK_LIBRARIES(mariadb-check ${CLIENT_LIB}) -MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c) +MYSQL_ADD_EXECUTABLE(mariadb-dump mysqldump.cc ../sql-common/my_user.c connection_pool.cc) TARGET_LINK_LIBRARIES(mariadb-dump ${CLIENT_LIB}) MYSQL_ADD_EXECUTABLE(mariadb-import mysqlimport.c) diff --git a/client/mysqldump.cc b/client/mysqldump.cc index b786757720c..4f5a93f00d6 100644 --- a/client/mysqldump.cc +++ b/client/mysqldump.cc @@ -61,7 +61,7 @@ #include "mysqld_error.h" #include /* ORACLE_WELCOME_COPYRIGHT_NOTICE */ - +#include "connection_pool.h" /* Exit codes */ #define EX_USAGE 1 @@ -194,7 +194,7 @@ FILE *stderror_file=0; static uint opt_protocol= 0; static char *opt_plugin_dir= 0, *opt_default_auth= 0; - +static uint opt_parallel= 0; /* Dynamic_string wrapper functions. In this file use these wrappers, they will terminate the process if there is @@ -246,6 +246,8 @@ static HASH ignore_table, ignore_data; static HASH ignore_database; +static async_pool::connection_pool connection_pool; + static struct my_option my_long_options[] = { {"all-databases", 'A', @@ -526,6 +528,8 @@ static struct my_option my_long_options[] = {"password", 'p', "Password to use when connecting to server. If password is not given it's solicited on the tty.", 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, + {"parallel", 'j', "Number of dump table jobs executed in parallel (only with --tab option)", + &opt_parallel, &opt_parallel, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, #ifdef _WIN32 {"pipe", 'W', "Use named pipes to connect to server.", 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, @@ -1920,6 +1924,13 @@ static void free_resources() else fflush(md_result_file); } + if (first_error && mysql) + { + connection_pool.for_each_connection( + [](MYSQL *c) { mysql_kill(mysql,c->thread_id);}); + } + connection_pool.close(); + if (get_table_name_result) mysql_free_result(get_table_name_result); if (routine_res) @@ -4061,6 +4072,21 @@ static void vers_append_system_time(DYNAMIC_STRING* query_string) } } +/** + Completion handler for async queries in the pool. + Dies in case query produced an error. + + @param mysql The connection that executed the query. + @param query The query that was executed. + @param success Whether the query was successful. +*/ +static void send_query_completion_func(MYSQL* mysql, const char* query, + bool success, void*) +{ + if (!success) + maybe_die(EX_MYSQLERR, "Couldn't execute async query '%s' (%s)", query, + mysql_error(mysql)); +} /* @@ -4216,6 +4242,10 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key, dynstr_append_checked(&query_string, select_field_names.str); } dynstr_append_checked(&query_string, " FROM "); + char quoted_db_buf[NAME_LEN * 2 + 3]; + char *qdatabase= quote_name(db, quoted_db_buf, opt_quoted); + dynstr_append_checked(&query_string, qdatabase); + dynstr_append_checked(&query_string, "."); dynstr_append_checked(&query_string, result_table); if (versioned) @@ -4239,8 +4269,16 @@ static void dump_table(const char *table, const char *db, const uchar *hash_key, my_free(order_by); order_by= 0; } - - if (mysql_real_query(mysql, query_string.str, (ulong)query_string.length)) + if (opt_parallel) + { + if (connection_pool.execute_async(query_string.str,send_query_completion_func,nullptr,true)) + { + dynstr_free(&query_string); + DB_error(mysql, "when executing send_query 'SELECT INTO OUTFILE'"); + DBUG_VOID_RETURN; + } + } + else if (mysql_real_query(mysql, query_string.str, (ulong)query_string.length)) { dynstr_free(&query_string); DB_error(mysql, "when executing 'SELECT INTO OUTFILE'"); @@ -7082,6 +7120,27 @@ static void dynstr_realloc_checked(DYNAMIC_STRING *str, ulong additional_size) die(EX_MYSQLERR, DYNAMIC_STR_ERROR_MSG); } +#define MAX_POOL_CONNECTIONS 256 +static void init_connection_pool(uint n_connections) +{ + MYSQL *conn[MAX_POOL_CONNECTIONS]; + + if (n_connections > array_elements(conn)) + die(EX_USAGE, "Too many connections"); + + for (uint i= 0; i < n_connections; i++) + { + MYSQL *c= connect_to_db(current_host, current_user, opt_password); + if (!c) + { + for (uint j= 0; j < i; j++) + mysql_close(conn[j]); + die(EX_MYSQLERR, "Error during connection to DB"); + } + conn[i]= c; + } + connection_pool.init(conn, n_connections); +} int main(int argc, char **argv) { @@ -7118,15 +7177,24 @@ int main(int argc, char **argv) } } - if (connect_to_db(current_host, current_user, opt_password)) + mysql= connect_to_db(current_host, current_user, opt_password); + if (!mysql) { free_resources(); exit(EX_MYSQLERR); } if (!path) + { write_header(md_result_file, *argv); - - + if (opt_parallel) + { + verbose_msg("-- Warning: ignoring --parallel setting, it currently only " + "works together with --tab\n"); + opt_parallel= 0; + } + } + else if (opt_parallel) + init_connection_pool(opt_parallel); /* Check if the server support multi source */ if (mysql_get_server_version(mysql) >= 100000) @@ -7179,8 +7247,15 @@ int main(int argc, char **argv) goto err; } - if (opt_single_transaction && start_transaction(mysql)) - goto err; + if (opt_single_transaction) + { + if (start_transaction(mysql)) + goto err; + connection_pool.for_each_connection([](MYSQL *c) { + if (start_transaction(c)) + maybe_die(EX_MYSQLERR, "Failed to start transaction on connection ID %u", mysql->thread_id); + }); + } /* Add 'STOP SLAVE to beginning of dump */ if (opt_slave_apply && add_stop_slave()) @@ -7273,6 +7348,9 @@ int main(int argc, char **argv) if (opt_delete_master_logs && purge_bin_logs_to(mysql, bin_log_name)) goto err; + /* wait for outstanding asynchronous queries */ + connection_pool.wait_all(); + /* No reason to explicitly COMMIT the transaction, neither to explicitly UNLOCK TABLES: these will be automatically be done by the server when we