diff --git a/doc/lua-api/index.rst b/doc/lua-api/index.rst index 2cce5d340..b548d85e8 100644 --- a/doc/lua-api/index.rst +++ b/doc/lua-api/index.rst @@ -758,6 +758,9 @@ Core class It takes up to 4 optional arguments (provided when registering), and no output is expected. + See also :js:func:`core.queue` to dynamically pass data between main context + and tasks or even between tasks. + .. js:function:: core.register_cli([path], usage, func) **context**: body @@ -849,6 +852,14 @@ Core class :returns: A :ref:`concat_class` object. +.. js:function:: core.queue() + + **context**: body, init, task, event, action, sample-fetch, converter + + This function returns a new queue object. + + :returns: A :ref:`queue_class` object. + .. js:function:: core.done(data) **context**: body, init, task, action, sample-fetch, converter @@ -1758,6 +1769,83 @@ This class contains additional info related to **SERVER_ADMIN** event. Same as :js:attr:`ServerEventState.requeued` but when the requeue is due to the server administrative state change. +.. _queue_class: + +Queue class +=========== + +.. js:class:: Queue + + This class provides a generic FIFO storage mechanism that may be shared + between multiple lua contexts to easily pass data between them, as stock + Lua doesn't provide easy methods for passing data between multiple coroutines. + + inter-task example: + +.. code-block:: lua + + -- script wide shared queue + local queue = core.queue() + + -- master task + core.register_task(function() + -- send the date every second + while true do + queue:push(os.date("%c", core.now().sec)) + core.sleep(1) + end + end) + + -- worker task + core.register_task(function() + while true do + -- print the date sent by master + print(queue:pop_wait()) + end + end) +.. + + Of course, queue may also be used as a local storage mechanism. + + Use :js:func:`core.queue` to get a new Queue object. + +.. js:function:: Queue.size(queue) + + This function returns the number of items within the Queue. + + :param class_queue queue: A :ref:`queue_class` to the current queue + +.. js:function:: Queue.push(queue, item) + + This function pushes the item (may be of any type) to the queue. + Pushed item cannot be nil or invalid, or an error will be thrown. + + :param class_queue queue: A :ref:`queue_class` to the current queue + :returns: boolean true for success and false for error + +.. js:function:: Queue.pop(queue) + + This function immediately tries to pop an item from the queue. + It returns nil of no item is available at the time of the call. + + :param class_queue queue: A :ref:`queue_class` to the current queue + :returns: the item at the top of the stack (any type) or nil if no items + +.. js:function:: Queue.pop_wait(queue) + + **context**: task + + This is an alternative to pop() that may be used within task contexts. + + The call waits for data if no item is currently available. This may be + useful when used in a while loop to prevent cpu waste. + + Note that this requires yielding, thus it is only available within contexts + that support yielding (mainly task context). + + :param class_queue queue: A :ref:`queue_class` to the current queue + :returns: the item at the top of the stack (any type) or nil in case of error + .. _concat_class: Concat class diff --git a/src/hlua_fcn.c b/src/hlua_fcn.c index a357c9f7b..761cfae88 100644 --- a/src/hlua_fcn.c +++ b/src/hlua_fcn.c @@ -43,6 +43,7 @@ /* Contains the class reference of the concat object. */ static int class_concat_ref; +static int class_queue_ref; static int class_proxy_ref; static int class_server_ref; static int class_listener_ref; @@ -499,6 +500,241 @@ static void hlua_concat_init(lua_State *L) class_concat_ref = luaL_ref(L, LUA_REGISTRYINDEX); } +/* C backing storage for lua Queue class */ +struct hlua_queue { + uint32_t size; + struct mt_list list; + struct mt_list wait_tasks; +}; + +/* used to store lua objects in queue->list */ +struct hlua_queue_item { + int ref; /* lua object reference id */ + struct mt_list list; +}; + +/* used to store wait entries in queue->wait_tasks */ +struct hlua_queue_wait +{ + struct task *task; + struct mt_list entry; +}; + +/* This is the memory pool containing struct hlua_queue_item (queue items) + */ +DECLARE_STATIC_POOL(pool_head_hlua_queue, "hlua_queue", sizeof(struct hlua_queue_item)); + +/* This is the memory pool containing struct hlua_queue_wait + * (queue waiting tasks) + */ +DECLARE_STATIC_POOL(pool_head_hlua_queuew, "hlua_queuew", sizeof(struct hlua_queue_wait)); + +static struct hlua_queue *hlua_check_queue(lua_State *L, int ud) +{ + return hlua_checkudata(L, ud, class_queue_ref); +} + +/* queue:size(): returns an integer containing the current number of queued + * items. + */ +static int hlua_queue_size(lua_State *L) +{ + struct hlua_queue *queue = hlua_check_queue(L, 1); + + BUG_ON(!queue); + lua_pushinteger(L, queue->size); + + return 1; +} + +/* queue:push(): push an item (any type, except nil) at the end of the queue + * + * Returns boolean:true for success and boolean:false on error + */ +static int hlua_queue_push(lua_State *L) +{ + struct hlua_queue *queue = hlua_check_queue(L, 1); + struct hlua_queue_item *item; + struct mt_list *elt1, elt2; + struct hlua_queue_wait *waiter; + + if (lua_gettop(L) != 2 || lua_isnoneornil(L, 2)) { + luaL_error(L, "unexpected argument"); + /* not reached */ + return 0; + } + BUG_ON(!queue); + item = pool_alloc(pool_head_hlua_queue); + if (!item) { + lua_pushboolean(L, 0); + return 1; + } + item->ref = hlua_ref(L); + MT_LIST_INIT(&item->list); + HA_ATOMIC_INC(&queue->size); + MT_LIST_APPEND(&queue->list, &item->list); + + /* notify tasks waiting on queue:pop_wait() (if any) */ + mt_list_for_each_entry_safe(waiter, &queue->wait_tasks, entry, elt1, elt2) { + task_wakeup(waiter->task, TASK_WOKEN_MSG); + } + + lua_pushboolean(L, 1); + return 1; +} + +/* queue:pop(): returns the first item at the top of que queue or nil if + * the queue is empty. + */ +static int _hlua_queue_pop(lua_State *L, struct hlua_queue *queue) +{ + struct hlua_queue_item *item; + + item = MT_LIST_POP(&queue->list, typeof(item), list); + if (!item) { + lua_pushnil(L); + return 1; /* nothing in queue, return nil */ + } + + HA_ATOMIC_DEC(&queue->size); + /* push lua obj on the stack */ + hlua_pushref(L, item->ref); + + /* free the queue item */ + pool_free(pool_head_hlua_queue, item); + + return 1; +} +static int hlua_queue_pop(lua_State *L) +{ + struct hlua_queue *queue = hlua_check_queue(L, 1); + + BUG_ON(!queue); + return _hlua_queue_pop(L, queue); +} + +/* queue:pop_wait(): same as queue:pop() but doesn't return on empty queue. + * + * Aborts if used incorrectly and returns nil in case of memory error. + */ +static int _hlua_queue_pop_wait(lua_State *L, int status, lua_KContext ctx) +{ + struct hlua_queue *queue = hlua_check_queue(L, 1); + + /* new pop attempt */ + if (!_hlua_queue_pop(L, queue)) + hlua_yieldk(L, 0, 0, _hlua_queue_pop_wait, TICK_ETERNITY, 0); // wait retry + return 1; // success +} +static int hlua_queue_pop_wait(lua_State *L) +{ + struct hlua_queue *queue = hlua_check_queue(L, 1); + struct hlua_queue_wait *wait; + struct hlua *hlua; + + BUG_ON(!queue); + + /* Get hlua struct, or NULL if we execute from main lua state */ + hlua = hlua_gethlua(L); + + if (!hlua || HLUA_CANT_YIELD(hlua)) { + luaL_error(L, "pop_wait() may only be used within task context " + "(requires yielding)"); + return 0; /* not reached */ + } + + wait = pool_alloc(pool_head_hlua_queuew); + if (!wait) { + lua_pushnil(L); + return 1; /* memory error, return nil */ + } + + wait->task = hlua->task; + MT_LIST_INIT(&wait->entry); + + /* add task to queue's wait list */ + MT_LIST_TRY_APPEND(&queue->wait_tasks, &wait->entry); + + /* push queue on the top of the stack */ + lua_pushlightuserdata(L, queue); + + /* try to pop without waiting (there could be already pending items) */ + if (!_hlua_queue_pop(L, queue)) { + /* no item immediately available, go to waiting loop */ + hlua_yieldk(L, 0, 0, _hlua_queue_pop_wait, TICK_ETERNITY, 0); + } + + /* remove task from waiting list */ + MT_LIST_DELETE(&wait->entry); + pool_free(pool_head_hlua_queuew, wait); + + return 1; +} + +static int hlua_queue_new(lua_State *L) +{ + struct hlua_queue *q; + + lua_newtable(L); + + /* set class metatable */ + lua_rawgeti(L, LUA_REGISTRYINDEX, class_queue_ref); + lua_setmetatable(L, -2); + + /* index:0 is queue userdata (c data) */ + q = lua_newuserdata(L, sizeof(*q)); + MT_LIST_INIT(&q->list); + MT_LIST_INIT(&q->wait_tasks); + q->size = 0; + lua_rawseti(L, -2, 0); + + /* class methods */ + hlua_class_function(L, "size", hlua_queue_size); + hlua_class_function(L, "pop", hlua_queue_pop); + hlua_class_function(L, "pop_wait", hlua_queue_pop_wait); + hlua_class_function(L, "push", hlua_queue_push); + + return 1; +} + +static int hlua_queue_gc(struct lua_State *L) +{ + struct hlua_queue *queue = hlua_check_queue(L, 1); + struct hlua_queue_wait *wait; + struct hlua_queue_item *item; + + /* Purge waiting tasks (if any) + * + * It is normally not expected to have waiting tasks, except if such + * task has been aborted while in the middle of a queue:pop_wait() + * function call. + */ + while ((wait = MT_LIST_POP(&queue->wait_tasks, typeof(wait), entry))) { + /* free the wait entry */ + pool_free(pool_head_hlua_queuew, wait); + } + + /* purge remaining (unconsumed) items in the queue */ + while ((item = MT_LIST_POP(&queue->list, typeof(item), list))) { + /* free the queue item */ + pool_free(pool_head_hlua_queue, item); + } + + /* queue (userdata) will automatically be freed by lua gc */ + + return 0; +} + +static void hlua_queue_init(lua_State *L) +{ + /* Creates the queue object. */ + lua_newtable(L); + + hlua_class_function(L, "__gc", hlua_queue_gc); + + class_queue_ref = luaL_ref(L, LUA_REGISTRYINDEX); +} + int hlua_fcn_new_stktable(lua_State *L, struct stktable *tbl) { lua_newtable(L); @@ -2339,6 +2575,7 @@ static int hlua_regex_free(struct lua_State *L) void hlua_fcn_reg_core_fcn(lua_State *L) { hlua_concat_init(L); + hlua_queue_init(L); hlua_class_function(L, "now", hlua_now); hlua_class_function(L, "http_date", hlua_http_date); @@ -2346,6 +2583,7 @@ void hlua_fcn_reg_core_fcn(lua_State *L) hlua_class_function(L, "rfc850_date", hlua_rfc850_date); hlua_class_function(L, "asctime_date", hlua_asctime_date); hlua_class_function(L, "concat", hlua_concat_new); + hlua_class_function(L, "queue", hlua_queue_new); hlua_class_function(L, "get_info", hlua_get_info); hlua_class_function(L, "parse_addr", hlua_parse_addr); hlua_class_function(L, "match_addr", hlua_match_addr);