MDEV-26971: Support for progress reporting from SST scripts.

New feedback events:
 - "total N": signals new SST stage and reports estimated total work
 - "complete N": reports completed work in the current stage

Reviewed-by: Jan Lindström <jan.lindstrom@mariadb.com>
This commit is contained in:
Alexey Yurchenko 2021-12-16 13:46:37 +02:00 committed by Sergei Golubchik
parent 9d7e596ba6
commit 98355a0789
3 changed files with 129 additions and 24 deletions

View File

@ -518,6 +518,57 @@ static int generate_binlog_index_opt_val(char** ret)
return 0;
}
// report progress event
static void sst_report_progress(int const from,
long long const total_prev,
long long const total,
long long const complete)
{
static char buf[128] = { '\0', };
static size_t const buf_len= sizeof(buf) - 1;
snprintf(buf, buf_len,
"{ \"from\": %d, \"to\": %d, \"total\": %lld, \"done\": %lld, "
"\"indefinite\": -1 }",
from, WSREP_MEMBER_JOINED, total_prev + total, total_prev +complete);
WSREP_DEBUG("REPORTING SST PROGRESS: '%s'", buf);
}
// process "complete" event from SST script feedback
static void sst_handle_complete(const char* const input,
long long const total_prev,
long long* total,
long long* complete,
int const from)
{
long long x;
int n= sscanf(input, " %lld", &x);
if (n > 0 && x > *complete)
{
*complete= x;
if (*complete > *total) *total= *complete;
sst_report_progress(from, total_prev, *total, *complete);
}
}
// process "total" event from SST script feedback
static void sst_handle_total(const char* const input,
long long* total_prev,
long long* total,
long long* complete,
int const from)
{
long long x;
int n= sscanf(input, " %lld", &x);
if (n > 0)
{
// new stage starts, update total_prev
*total_prev+= *total;
*total= x;
*complete= 0;
sst_report_progress(from, *total_prev, *total, *complete);
}
}
static void* sst_joiner_thread (void* a)
{
sst_thread_arg* arg= (sst_thread_arg*) a;
@ -525,8 +576,8 @@ static void* sst_joiner_thread (void* a)
{
THD* thd;
const char magic[]= "ready";
const size_t magic_len= sizeof(magic) - 1;
static const char magic[]= "ready";
static const size_t magic_len= sizeof(magic) - 1;
const size_t out_len= 512;
char out[out_len];
@ -579,22 +630,52 @@ static void* sst_joiner_thread (void* a)
wsrep_uuid_t ret_uuid = WSREP_UUID_UNDEFINED;
wsrep_seqno_t ret_seqno= WSREP_SEQNO_UNDEFINED;
// current state progress
long long total= 0;
long long complete= 0;
// previous stages cumulative progress
long long total_prev= 0;
// in case of successfull receiver start, wait for SST
// completion/end
char* tmp= my_fgets (out, out_len, proc.pipe());
proc.wait();
const char* tmp= NULL;
err= EINVAL;
if (!tmp)
wait_signal:
tmp= my_fgets (out, out_len, proc.pipe());
if (tmp)
{
WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from "
"joiner script.");
if (proc.error()) err= proc.error();
static const char magic_total[]= "total";
static const size_t total_len=strlen(magic_total);
static const char magic_complete[]= "complete";
static const size_t complete_len=strlen(magic_complete);
static const int from= WSREP_MEMBER_JOINER;
if (!strncasecmp (tmp, magic_complete, complete_len))
{
sst_handle_complete(tmp + complete_len, total_prev, &total, &complete,
from);
goto wait_signal;
}
else if (!strncasecmp (tmp, magic_total, total_len))
{
sst_handle_total(tmp + total_len, &total_prev, &total, &complete, from);
goto wait_signal;
}
}
else
{
WSREP_ERROR("Failed to read uuid:seqno and wsrep_gtid_domain_id from "
"joiner script.");
proc.wait();
if (proc.error()) err= proc.error();
}
// this should be the final script output with GTID
if (tmp)
{
proc.wait();
// Read state ID (UUID:SEQNO) followed by wsrep_gtid_domain_id (if any).
const char *pos= strchr(out, ' ');
@ -1680,16 +1761,38 @@ static void* sst_donor_thread (void* a)
if (proc.pipe() && !err)
{
long long total= 0;
long long complete= 0;
// total form previous stages
long long total_prev= 0;
wait_signal:
out= my_fgets (out_buf, out_len, proc.pipe());
if (out)
{
const char magic_flush[]= "flush tables";
const char magic_cont[]= "continue";
const char magic_done[]= "done";
static const char magic_flush[]= "flush tables";
static const char magic_cont[]= "continue";
static const char magic_done[]= "done";
static const size_t done_len=strlen(magic_done);
static const char magic_total[]= "total";
static const size_t total_len=strlen(magic_total);
static const char magic_complete[]= "complete";
static const size_t complete_len=strlen(magic_complete);
static const int from= WSREP_MEMBER_DONOR;
if (!strcasecmp (out, magic_flush))
if (!strncasecmp (out, magic_complete, complete_len))
{
sst_handle_complete(out + complete_len, total_prev, &total, &complete,
from);
goto wait_signal;
}
else if (!strncasecmp (out, magic_total, total_len))
{
sst_handle_total(out + total_len, &total_prev, &total, &complete, from);
goto wait_signal;
}
else if (!strcasecmp (out, magic_flush))
{
err= sst_flush_tables (thd.ptr);
if (!err)
@ -1725,7 +1828,7 @@ wait_signal:
err= 0;
goto wait_signal;
}
else if (!strncasecmp (out, magic_done, strlen(magic_done)))
else if (!strncasecmp (out, magic_done, done_len))
{
err= sst_scan_uuid_seqno (out + strlen(magic_done) + 1,
&ret_uuid, &ret_seqno);
@ -1733,6 +1836,7 @@ wait_signal:
else
{
WSREP_WARN("Received unknown signal: '%s'", out);
err = -EINVAL;
proc.wait();
}
}

View File

@ -28,23 +28,24 @@ public:
static void init_once(const std::string& file_name);
static void destroy();
static void report_state(enum wsrep::server_state::state const state,
float const progress = wsrep::reporter::indefinite)
static void report_state(enum wsrep::server_state::state const state)
{
if (!Wsrep_status::m_instance) return;
Wsrep_status::m_instance->report_state(state, progress);
Wsrep_status::m_instance->report_state(state);
}
static void report_progress(const std::string& progress)
{
if (!Wsrep_status::m_instance) return;
Wsrep_status::m_instance->report_progress(progress);
}
static void report_log_msg(wsrep::reporter::log_level level,
const char* tag, size_t tag_len,
const char* buf, size_t buf_len,
double const tstamp = wsrep::reporter::undefined);
// {
// if (!Wsrep_status::m_instance) return;
//
// Wsrep_status::m_instance->report_log_msg(level, msg, tstamp);
// }
static bool is_instance_initialized()
{

View File

@ -102,7 +102,7 @@ node_status::set(enum wsrep::server_state::state new_status,
{
wsrep_notify_status(new_status, view);
status= new_status;
Wsrep_status::report_state(status, 0);
Wsrep_status::report_state(status);
}
}