MINOR: task: add thread safe notification_new and notification_wake variants

notification_new and notification_wake were historically meant to be
called by a single thread doing both the init and the wakeup for other
tasks waiting on the signals.

In this patch, we extend the API so that notification_new and
notification_wake have thread-safe variants that can safely be used with
multiple threads registering on the same list of events and multiple
threads pushing updates on the list.
This commit is contained in:
Aurelien DARRAGON 2025-04-01 10:07:50 +02:00
parent f0f1816f1a
commit b77b1a2c3a
2 changed files with 56 additions and 15 deletions

View File

@ -107,8 +107,13 @@ enum {
struct notification {
struct list purge_me; /* Part of the list of signals to be purged in the
case of the LUA execution stack crash. */
struct list wake_me; /* Part of list of signals to be targeted if an
event occurs. */
union {
struct list wake_me; /* Part of list of signals to be targeted if an
event occurs. */
struct mt_list wake_me_mt; /* thread safe signal list */
} wake;
# define wake_me wake.wake_me
# define wake_me_mt wake.wake_me_mt
struct task *task; /* The task to be wake if an event occurs. */
__decl_thread(HA_SPINLOCK_T lock);
};

View File

@ -775,6 +775,17 @@ static inline const char *task_wakeup_type_str(uint t)
}
}
static inline struct notification *_notification_new(struct list *purge, struct task *wakeup)
{
struct notification *com = pool_alloc(pool_head_notification);
if (!com)
return NULL;
LIST_APPEND(purge, &com->purge_me);
HA_SPIN_INIT(&com->lock);
com->task = wakeup;
return com;
}
/* This function register a new signal. "lua" is the current lua
* execution context. It contains a pointer to the associated task.
* "link" is a list head attached to an other task that must be wake
@ -784,13 +795,20 @@ static inline const char *task_wakeup_type_str(uint t)
*/
static inline struct notification *notification_new(struct list *purge, struct list *event, struct task *wakeup)
{
struct notification *com = pool_alloc(pool_head_notification);
struct notification *com = _notification_new(purge, wakeup);
if (!com)
return NULL;
LIST_APPEND(purge, &com->purge_me);
LIST_APPEND(event, &com->wake_me);
HA_SPIN_INIT(&com->lock);
com->task = wakeup;
return com;
}
/* thread safe variant */
static inline struct notification *notification_new_mt(struct list *purge, struct mt_list *event, struct task *wakeup)
{
struct notification *com = _notification_new(purge, wakeup);
if (!com)
return NULL;
MT_LIST_APPEND(event, &com->wake_me_mt);
return com;
}
@ -839,6 +857,19 @@ static inline void notification_gc(struct list *purge)
}
}
static inline void _notification_wake(struct notification *com)
{
HA_SPIN_LOCK(NOTIF_LOCK, &com->lock);
if (!com->task) {
HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
pool_free(pool_head_notification, com);
return;
}
task_wakeup(com->task, TASK_WOKEN_MSG);
com->task = NULL;
HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
}
/* This function sends signals. It wakes all the tasks attached
* to a list head, and remove the signal, and free the used
* memory. The wake list is not locked because it is owned by
@ -851,16 +882,21 @@ static inline void notification_wake(struct list *wake)
/* Wake task and delete all pending communication signals. */
list_for_each_entry_safe(com, back, wake, wake_me) {
HA_SPIN_LOCK(NOTIF_LOCK, &com->lock);
LIST_DELETE(&com->wake_me);
if (!com->task) {
HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
pool_free(pool_head_notification, com);
continue;
}
task_wakeup(com->task, TASK_WOKEN_MSG);
com->task = NULL;
HA_SPIN_UNLOCK(NOTIF_LOCK, &com->lock);
_notification_wake(com);
}
}
/* thread safe variant */
static inline void notification_wake_mt(struct mt_list *wake)
{
struct notification *com;
struct mt_list back;
/* Wake task and delete all pending communication signals. */
MT_LIST_FOR_EACH_ENTRY_UNLOCKED(com, wake, wake_me_mt, back) {
_notification_wake(com);
com = NULL;
}
}