From 98355a07891b92802b278c8ed2fb26c1c5b47b64 Mon Sep 17 00:00:00 2001 From: Alexey Yurchenko Date: Thu, 16 Dec 2021 13:46:37 +0200 Subject: [PATCH] MDEV-26971: Support for progress reporting from SST scripts. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New feedback events: - "total N": signals new SST stage and reports estimated total work - "complete N": reports completed work in the current stage Reviewed-by: Jan Lindström --- sql/wsrep_sst.cc | 134 ++++++++++++++++++++++++++++++++++++++++----- sql/wsrep_status.h | 17 +++--- sql/wsrep_utils.cc | 2 +- 3 files changed, 129 insertions(+), 24 deletions(-) diff --git a/sql/wsrep_sst.cc b/sql/wsrep_sst.cc index 786d8b9bbf5..68a4dd2bb57 100644 --- a/sql/wsrep_sst.cc +++ b/sql/wsrep_sst.cc @@ -518,6 +518,57 @@ static int generate_binlog_index_opt_val(char** ret) return 0; } +// report progress event +static void sst_report_progress(int const from, + long long const total_prev, + long long const total, + long long const complete) +{ + static char buf[128] = { '\0', }; + static size_t const buf_len= sizeof(buf) - 1; + snprintf(buf, buf_len, + "{ \"from\": %d, \"to\": %d, \"total\": %lld, \"done\": %lld, " + "\"indefinite\": -1 }", + from, WSREP_MEMBER_JOINED, total_prev + total, total_prev +complete); + WSREP_DEBUG("REPORTING SST PROGRESS: '%s'", buf); +} + +// process "complete" event from SST script feedback +static void sst_handle_complete(const char* const input, + long long const total_prev, + long long* total, + long long* complete, + int const from) +{ + long long x; + int n= sscanf(input, " %lld", &x); + if (n > 0 && x > *complete) + { + *complete= x; + if (*complete > *total) *total= *complete; + sst_report_progress(from, total_prev, *total, *complete); + } +} + +// process "total" event from SST script feedback +static void sst_handle_total(const char* const input, + long long* total_prev, + long long* total, + long long* complete, + int const from) +{ + long long x; + int n= sscanf(input, " %lld", &x); + if (n > 0) + { + // new stage starts, update total_prev + *total_prev+= *total; + *total= x; + *complete= 0; + sst_report_progress(from, *total_prev, *total, *complete); + } +} + static void* sst_joiner_thread (void* a) { sst_thread_arg* arg= (sst_thread_arg*) a; @@ -525,8 +576,8 @@ static void* sst_joiner_thread (void* a) { THD* thd; - const char magic[]= "ready"; - const size_t magic_len= sizeof(magic) - 1; + static const char magic[]= "ready"; + static const size_t magic_len= sizeof(magic) - 1; const size_t out_len= 512; char out[out_len]; @@ -579,22 +630,52 @@ static void* sst_joiner_thread (void* a) wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED; wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED; + // current state progress + long long total= 0; + long long complete= 0; + // previous stages cumulative progress + long long total_prev= 0; + // in case of successfull receiver start, wait for SST // completion/end - char* tmp= my_fgets (out, out_len, proc.pipe()); - - proc.wait(); - + const char* tmp= NULL; err= EINVAL; - if (!tmp) + wait_signal: + tmp= my_fgets (out, out_len, proc.pipe()); + + if (tmp) { - WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from " - "joiner script."); - if (proc.error()) err= proc.error(); + static const char magic_total[]= "total"; + static const size_t total_len=strlen(magic_total); + static const char magic_complete[]= "complete"; + static const size_t complete_len=strlen(magic_complete); + static const int from= WSREP_MEMBER_JOINER; + + if (!strncasecmp (tmp, magic_complete, complete_len)) + { + sst_handle_complete(tmp + complete_len, total_prev, &total, &complete, + from); + goto wait_signal; + } + else if (!strncasecmp (tmp, magic_total, total_len)) + { + sst_handle_total(tmp + total_len, &total_prev, &total, &complete, from); + goto wait_signal; + } } else { + WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from " + "joiner script."); + proc.wait(); + if (proc.error()) err= proc.error(); + } + + // this should be the final script output with GTID + if (tmp) + { + proc.wait(); // Read state ID (UUID:SEQNO) followed by wsrep_gtid_domain_id (if any). const char *pos= strchr(out, ' '); @@ -1680,16 +1761,38 @@ static void* sst_donor_thread (void* a) if (proc.pipe() && !err) { + long long total= 0; + long long complete= 0; + // total form previous stages + long long total_prev= 0; + wait_signal: out= my_fgets (out_buf, out_len, proc.pipe()); if (out) { - const char magic_flush[]= "flush tables"; - const char magic_cont[]= "continue"; - const char magic_done[]= "done"; + static const char magic_flush[]= "flush tables"; + static const char magic_cont[]= "continue"; + static const char magic_done[]= "done"; + static const size_t done_len=strlen(magic_done); + static const char magic_total[]= "total"; + static const size_t total_len=strlen(magic_total); + static const char magic_complete[]= "complete"; + static const size_t complete_len=strlen(magic_complete); + static const int from= WSREP_MEMBER_DONOR; - if (!strcasecmp (out, magic_flush)) + if (!strncasecmp (out, magic_complete, complete_len)) + { + sst_handle_complete(out + complete_len, total_prev, &total, &complete, + from); + goto wait_signal; + } + else if (!strncasecmp (out, magic_total, total_len)) + { + sst_handle_total(out + total_len, &total_prev, &total, &complete, from); + goto wait_signal; + } + else if (!strcasecmp (out, magic_flush)) { err= sst_flush_tables (thd.ptr); if (!err) @@ -1725,7 +1828,7 @@ wait_signal: err= 0; goto wait_signal; } - else if (!strncasecmp (out, magic_done, strlen(magic_done))) + else if (!strncasecmp (out, magic_done, done_len)) { err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1, &ret_uuid, &ret_seqno); @@ -1733,6 +1836,7 @@ wait_signal: else { WSREP_WARN("Received unknown signal: '%s'", out); + err = -EINVAL; proc.wait(); } } diff --git a/sql/wsrep_status.h b/sql/wsrep_status.h index 1c99a4cfe88..dd83dda2857 100644 --- a/sql/wsrep_status.h +++ b/sql/wsrep_status.h @@ -28,23 +28,24 @@ public: static void init_once(const std::string& file_name); static void destroy(); - static void report_state(enum wsrep::server_state::state const state, - float const progress = wsrep::reporter::indefinite) + static void report_state(enum wsrep::server_state::state const state) { if (!Wsrep_status::m_instance) return; - Wsrep_status::m_instance->report_state(state, progress); + Wsrep_status::m_instance->report_state(state); + } + + static void report_progress(const std::string& progress) + { + if (!Wsrep_status::m_instance) return; + + Wsrep_status::m_instance->report_progress(progress); } static void report_log_msg(wsrep::reporter::log_level level, const char* tag, size_t tag_len, const char* buf, size_t buf_len, double const tstamp = wsrep::reporter::undefined); -// { -// if (!Wsrep_status::m_instance) return; -// -// Wsrep_status::m_instance->report_log_msg(level, msg, tstamp); -// } static bool is_instance_initialized() { diff --git a/sql/wsrep_utils.cc b/sql/wsrep_utils.cc index a85533e52bb..a1bbe2f61f8 100644 --- a/sql/wsrep_utils.cc +++ b/sql/wsrep_utils.cc @@ -102,7 +102,7 @@ node_status::set(enum wsrep::server_state::state new_status, { wsrep_notify_status(new_status, view); status= new_status; - Wsrep_status::report_state(status, 0); + Wsrep_status::report_state(status); } }