Port QSemaphore to use futexes

This is interesting because QSemaphore now needs to allocate no memory:
it's just a simple 31-bit counter and the contention flag.

Change-Id: I6e9274c1e7444ad48c81fffd14dbc0ab42bc2e00
Reviewed-by: Lars Knoll <lars.knoll@qt.io>
This commit is contained in:
Thiago Macieira 2017-08-17 14:52:22 -07:00
parent 5b5153fd5b
commit 9378bce442
5 changed files with 144 additions and 15 deletions

View File

@ -81,6 +81,7 @@
* Feature Added in version Macro
* - inotify_init1 before 2.6.12-rc12
* - futex(2) before 2.6.12-rc12
* - FUTEX_WAKE_OP 2.6.14 FUTEX_OP
* - linkat(2) 2.6.17 O_TMPFILE
* - FUTEX_PRIVATE_FLAG 2.6.22
* - O_CLOEXEC 2.6.23

View File

@ -90,6 +90,7 @@ template <typename T>
class QBasicAtomicInteger
{
public:
typedef T Type;
typedef QAtomicOps<T> Ops;
// static check that this is a valid integer
Q_STATIC_ASSERT_X(QTypeInfo<T>::isIntegral, "template parameter is not an integral type");

View File

@ -73,14 +73,12 @@ QT_END_NAMESPACE
QT_BEGIN_NAMESPACE
namespace QtLinuxFutex {
constexpr inline bool futexAvailable() { return true; }
inline int _q_futex(int *addr, int op, int val, const struct timespec *timeout) Q_DECL_NOTHROW
inline int _q_futex(int *addr, int op, int val, quintptr val2 = 0,
int *addr2 = nullptr, int val3 = 0) Q_DECL_NOTHROW
{
int *addr2 = 0;
int val2 = 0;
// we use __NR_futex because some libcs (like Android's bionic) don't
// provide SYS_futex etc.
return syscall(__NR_futex, addr, op | FUTEX_PRIVATE_FLAG, val, timeout, addr2, val2);
return syscall(__NR_futex, addr, op | FUTEX_PRIVATE_FLAG, val, val2, addr2, val3);
}
template <typename T> int *addr(T *ptr)
{
@ -95,7 +93,7 @@ namespace QtLinuxFutex {
template <typename Atomic>
inline void futexWait(Atomic &futex, typename Atomic::Type expectedValue)
{
_q_futex(addr(&futex), FUTEX_WAIT, qintptr(expectedValue), nullptr);
_q_futex(addr(&futex), FUTEX_WAIT, qintptr(expectedValue));
}
template <typename Atomic>
inline bool futexWait(Atomic &futex, typename Atomic::Type expectedValue, qint64 nstimeout)
@ -103,16 +101,21 @@ namespace QtLinuxFutex {
struct timespec ts;
ts.tv_sec = nstimeout / 1000 / 1000 / 1000;
ts.tv_nsec = nstimeout % (1000 * 1000 * 1000);
int r = _q_futex(addr(&futex), FUTEX_WAIT, qintptr(expectedValue), &ts);
int r = _q_futex(addr(&futex), FUTEX_WAIT, qintptr(expectedValue), quintptr(&ts));
return r == 0 || errno != ETIMEDOUT;
}
template <typename Atomic> inline void futexWakeOne(Atomic &futex)
{
_q_futex(addr(&futex), FUTEX_WAKE, 1, nullptr);
_q_futex(addr(&futex), FUTEX_WAKE, 1);
}
template <typename Atomic> inline void futexWakeAll(Atomic &futex)
{
_q_futex(addr(&futex), FUTEX_WAKE, INT_MAX, nullptr);
_q_futex(addr(&futex), FUTEX_WAKE, INT_MAX);
}
template <typename Atomic> inline
void futexWakeOp(Atomic &futex1, int wake1, int wake2, Atomic &futex2, quint32 op)
{
_q_futex(addr(&futex1), FUTEX_WAKE_OP, wake1, wake2, addr(&futex2), op);
}
}
namespace QtFutex = QtLinuxFutex;

View File

@ -1,6 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2016 The Qt Company Ltd.
** Copyright (C) 2017 The Qt Company Ltd.
** Copyright (C) 2017 Intel Corporation.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -41,12 +42,15 @@
#ifndef QT_NO_THREAD
#include "qmutex.h"
#include "qfutex_p.h"
#include "qwaitcondition.h"
#include "qdeadlinetimer.h"
#include "qdatetime.h"
QT_BEGIN_NAMESPACE
using namespace QtFutex;
/*!
\class QSemaphore
\inmodule QtCore
@ -97,6 +101,68 @@ QT_BEGIN_NAMESPACE
\sa QSemaphoreReleaser, QMutex, QWaitCondition, QThread, {Semaphores Example}
*/
/*
QSemaphore futex operation
QSemaphore stores a 32-bit integer with the counter of currently available
tokens (value between 0 and INT_MAX). When a thread attempts to acquire n
tokens and the counter is larger than that, we perform a compare-and-swap
with the new count. If that succeeds, the acquisition worked; if not, we
loop again because the counter changed. If there were not enough tokens,
we'll perform a futex-wait.
Before we do, we set the high bit in the futex to indicate that semaphore
is contended: that is, there's a thread waiting for more tokens. On
release() for n tokens, we perform a fetch-and-add of n and then check if
that high bit was set. If it was, then we clear that bit and perform a
futex-wake on the semaphore to indicate the waiting threads can wake up and
acquire tokens. Which ones get woken up is unspecified.
*/
static const quint32 futexContendedBit = 1U << 31;
static int futexAvailCounter(quint32 v)
{
// the low 31 bits
return int(v) & (futexContendedBit - 1);
}
template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quint32> &u, int n, int timeout)
{
QDeadlineTimer timer(IsTimed ? QDeadlineTimer(timeout) : QDeadlineTimer());
quint32 curValue = u.loadAcquire();
qint64 remainingTime = timeout * Q_INT64_C(1000) * 1000;
forever {
int available = futexAvailCounter(curValue);
if (available >= n) {
// try to acquire
quint32 newValue = curValue - n;
if (u.testAndSetOrdered(curValue, newValue, curValue))
return true; // succeeded!
continue;
}
// not enough tokens available, put us to wait
if (remainingTime == 0)
return false;
// set the contended bit
u.fetchAndOrRelaxed(futexContendedBit);
curValue |= futexContendedBit;
if (IsTimed && remainingTime > 0) {
bool timedout = !futexWait(u, curValue, remainingTime);
if (timedout)
return false;
} else {
futexWait(u, curValue);
}
curValue = u.loadAcquire();
if (IsTimed)
remainingTime = timer.remainingTimeNSecs();
}
}
class QSemaphorePrivate {
public:
inline QSemaphorePrivate(int n) : avail(n) { }
@ -116,6 +182,9 @@ public:
QSemaphore::QSemaphore(int n)
{
Q_ASSERT_X(n >= 0, "QSemaphore", "parameter 'n' must be non-negative");
if (futexAvailable())
u.store(n);
else
d = new QSemaphorePrivate(n);
}
@ -126,7 +195,10 @@ QSemaphore::QSemaphore(int n)
undefined behavior.
*/
QSemaphore::~QSemaphore()
{ delete d; }
{
if (!futexAvailable())
delete d;
}
/*!
Tries to acquire \c n resources guarded by the semaphore. If \a n
@ -138,6 +210,12 @@ QSemaphore::~QSemaphore()
void QSemaphore::acquire(int n)
{
Q_ASSERT_X(n >= 0, "QSemaphore::acquire", "parameter 'n' must be non-negative");
if (futexAvailable()) {
futexSemaphoreTryAcquire<false>(u, n, -1);
return;
}
QMutexLocker locker(&d->mutex);
while (n > d->avail)
d->cond.wait(locker.mutex());
@ -160,6 +238,42 @@ void QSemaphore::acquire(int n)
void QSemaphore::release(int n)
{
Q_ASSERT_X(n >= 0, "QSemaphore::release", "parameter 'n' must be non-negative");
if (futexAvailable()) {
quint32 prevValue = u.fetchAndAddRelease(n);
if (prevValue & futexContendedBit) {
#ifdef FUTEX_OP
/*
We'll ask the kernel to wake up and clear the bit for us.
atomic {
int oldval = u;
u = oldval & ~(1 << 31);
futexWake(u, INT_MAX);
if (oldval == 0) // impossible condition
futexWake(u, INT_MAX);
}
*/
quint32 op = FUTEX_OP_ANDN | FUTEX_OP_OPARG_SHIFT;
quint32 oparg = 31;
quint32 cmp = FUTEX_OP_CMP_EQ;
quint32 cmparg = 0;
futexWakeOp(u, INT_MAX, INT_MAX, u, FUTEX_OP(op, oparg, cmp, cmparg));
#else
// Unset the bit and wake everyone. There are two possibibilies
// under which a thread can set the bit between the AND and the
// futexWake:
// 1) it did see the new counter value, but it wasn't enough for
// its acquisition anyway, so it has to wait;
// 2) it did not see the new counter value, in which case its
// futexWait will fail.
u.fetchAndAndRelease(futexContendedBit - 1);
futexWakeAll(u);
#endif
}
return;
}
QMutexLocker locker(&d->mutex);
d->avail += n;
d->cond.wakeAll();
@ -173,6 +287,9 @@ void QSemaphore::release(int n)
*/
int QSemaphore::available() const
{
if (futexAvailable())
return futexAvailCounter(u.load());
QMutexLocker locker(&d->mutex);
return d->avail;
}
@ -191,6 +308,10 @@ int QSemaphore::available() const
bool QSemaphore::tryAcquire(int n)
{
Q_ASSERT_X(n >= 0, "QSemaphore::tryAcquire", "parameter 'n' must be non-negative");
if (futexAvailable())
return futexSemaphoreTryAcquire<false>(u, n, 0);
QMutexLocker locker(&d->mutex);
if (n > d->avail)
return false;
@ -217,8 +338,8 @@ bool QSemaphore::tryAcquire(int n)
bool QSemaphore::tryAcquire(int n, int timeout)
{
Q_ASSERT_X(n >= 0, "QSemaphore::tryAcquire", "parameter 'n' must be non-negative");
if (timeout < 0)
return tryAcquire(n);
if (futexAvailable())
return futexSemaphoreTryAcquire<true>(u, n, timeout < 0 ? -1 : timeout);
QDeadlineTimer timer(timeout);
QMutexLocker locker(&d->mutex);

View File

@ -66,7 +66,10 @@ public:
private:
Q_DISABLE_COPY(QSemaphore)
union {
QSemaphorePrivate *d;
QBasicAtomicInteger<quint32> u;
};
};
class QSemaphoreReleaser