diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc index 4313840119e..f5a30d6f877 100644 --- a/sql/rpl_parallel.cc +++ b/sql/rpl_parallel.cc @@ -1775,7 +1775,7 @@ rpl_parallel_thread::inuse_relaylog_refcount_update() inuse_relaylog *ir= accumulated_ir_last; if (ir) { - my_atomic_add64(&ir->dequeued_count, accumulated_ir_count); + ir->dequeued_count+= accumulated_ir_count; accumulated_ir_count= 0; accumulated_ir_last= NULL; } diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 18ee6a98ed0..49b2e00882e 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -1435,31 +1435,27 @@ Relay_log_info::alloc_inuse_relaylog(const char *name) uint32 gtid_count; rpl_gtid *gtid_list; - if (!(ir= (inuse_relaylog *)my_malloc(sizeof(*ir), MYF(MY_WME|MY_ZEROFILL)))) - { - my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*ir)); - return 1; - } gtid_count= relay_log_state.count(); if (!(gtid_list= (rpl_gtid *)my_malloc(sizeof(*gtid_list)*gtid_count, MYF(MY_WME)))) { - my_free(ir); my_error(ER_OUTOFMEMORY, MYF(0), (int)sizeof(*gtid_list)*gtid_count); return 1; } + if (!(ir= new inuse_relaylog(this, gtid_list, gtid_count, name))) + { + my_free(gtid_list); + my_error(ER_OUTOFMEMORY, MYF(0), (int) sizeof(*ir)); + return 1; + } if (relay_log_state.get_gtid_list(gtid_list, gtid_count)) { my_free(gtid_list); - my_free(ir); + delete ir; DBUG_ASSERT(0 /* Should not be possible as we allocated correct length */); my_error(ER_OUT_OF_RESOURCES, MYF(0)); return 1; } - ir->rli= this; - strmake_buf(ir->name, name); - ir->relay_log_state= gtid_list; - ir->relay_log_state_count= gtid_count; if (!inuse_relaylog_list) inuse_relaylog_list= ir; @@ -1478,7 +1474,7 @@ void Relay_log_info::free_inuse_relaylog(inuse_relaylog *ir) { my_free(ir->relay_log_state); - my_free(ir); + delete ir; } diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 0e2e42fcb08..5d2d33c397f 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -608,10 +608,20 @@ struct inuse_relaylog { /* Number of events in this relay log queued for worker threads. */ int64 queued_count; /* Number of events completed by worker threads. */ - volatile int64 dequeued_count; + Atomic_counter dequeued_count; /* Set when all events have been read from a relaylog. */ bool completed; char name[FN_REFLEN]; + + inuse_relaylog(Relay_log_info *rli_arg, rpl_gtid *relay_log_state_arg, + uint32 relay_log_state_count_arg, + const char *name_arg): + next(0), rli(rli_arg), relay_log_state(relay_log_state_arg), + relay_log_state_count(relay_log_state_count_arg), queued_count(0), + dequeued_count(0), completed(false) + { + strmake_buf(name, name_arg); + } };