QSemaphore: fix deadlock when the woken up thread wakes up another

When the thread that got woken up by release() is supposed to release()
to wake up another thread, we were deadlocking. This happened because we
cleared the bit indicating that there was contention when the first
release(). Instead of storing a single bit, we now store the number of
threads waiting.

Task-number: QTBUG-66875
Change-Id: I72f5230ad59948f784eafffd15193873502ecba4
Reviewed-by: Lars Knoll <lars.knoll@qt.io>
This commit is contained in:
Thiago Macieira 2018-03-05 19:53:37 -08:00 committed by Aapo Keskimolo
parent 9fa2626ef8
commit 081c001deb
2 changed files with 157 additions and 43 deletions

View File

@ -1,7 +1,7 @@
/****************************************************************************
**
** Copyright (C) 2017 The Qt Company Ltd.
** Copyright (C) 2017 Intel Corporation.
** Copyright (C) 2018 Intel Corporation.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -119,28 +119,37 @@ using namespace QtFutex;
acquire tokens. Which ones get woken up is unspecified.
If the system has the ability to wake up a precise number of threads, has
Linux's FUTEX_WAKE_OP functionality, and is 64-bit, we'll use the high word
as a copy of the low word, but the sign bit indicating the presence of a
thread waiting for multiple tokens. So when releasing n tokens on those
systems, we tell the kernel to wake up n single-token threads and all of
the multi-token ones, then clear that wait bit. Which threads get woken up
is unspecified, but it's likely single-token threads will get woken up
first.
Linux's FUTEX_WAKE_OP functionality, and is 64-bit, instead of using a
single bit indicating a contended semaphore, we'll store the total number
of waiters in the high word. Additionally, all multi-token waiters will be
waiting on that high word. So when releasing n tokens on those systems, we
tell the kernel to wake up n single-token threads and all of the
multi-token ones. Which threads get woken up is unspecified, but it's
likely single-token threads will get woken up first.
*/
static const quint32 futexContendedBit = 1U << 31;
#if defined(FUTEX_OP) && QT_POINTER_SIZE > 4
static Q_CONSTEXPR bool futexHasWaiterCount = true;
#else
static Q_CONSTEXPR bool futexHasWaiterCount = false;
#endif
static const quintptr futexNeedsWakeAllBit =
Q_UINT64_C(1) << (sizeof(quintptr) * CHAR_BIT - 1);
static int futexAvailCounter(quintptr v)
{
// the low 31 bits
return int(v & (futexContendedBit - 1));
return int(v & 0x7fffffffU);
}
static quintptr futexCounterParcel(int n)
static bool futexNeedsWake(quintptr v)
{
// replicate the 31 bits if we're on 64-bit
quint64 nn = quint32(n);
nn |= (nn << 32);
return quintptr(nn);
// If we're counting waiters, the number of waiters is stored in the low 31
// bits of the high word (that is, bits 32-62). If we're not, then we use
// bit 31 to indicate anyone is waiting. Either way, if any bit 31 or above
// is set, there are waiters.
return v >> 31;
}
static QBasicAtomicInteger<quint32> *futexLow32(QBasicAtomicInteger<quintptr> *ptr)
@ -152,9 +161,6 @@ static QBasicAtomicInteger<quint32> *futexLow32(QBasicAtomicInteger<quintptr> *p
return result;
}
#ifdef FUTEX_OP
// quintptr might be 32bit, in which case we want this to be 0, without implicitly casting.
static const quintptr futexMultiWaiterBit = static_cast<quintptr>(Q_UINT64_C(1) << 63);
static QBasicAtomicInteger<quint32> *futexHigh32(QBasicAtomicInteger<quintptr> *ptr)
{
auto result = reinterpret_cast<QBasicAtomicInteger<quint32> *>(ptr);
@ -163,18 +169,21 @@ static QBasicAtomicInteger<quint32> *futexHigh32(QBasicAtomicInteger<quintptr> *
#endif
return result;
}
#endif
template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintptr> &u, int n, int timeout)
template <bool IsTimed> bool
futexSemaphoreTryAcquire_loop(QBasicAtomicInteger<quintptr> &u, quintptr curValue, quintptr nn, int timeout)
{
QDeadlineTimer timer(IsTimed ? QDeadlineTimer(timeout) : QDeadlineTimer());
quintptr curValue = u.loadAcquire();
qint64 remainingTime = timeout * Q_INT64_C(1000) * 1000;
int n = int(unsigned(nn));
// we're called after one testAndSet, so start by waiting first
goto start_wait;
forever {
int available = futexAvailCounter(curValue);
if (available >= n) {
if (futexAvailCounter(curValue) >= n) {
// try to acquire
quintptr newValue = curValue - futexCounterParcel(n);
quintptr newValue = curValue - nn;
if (u.testAndSetOrdered(curValue, newValue, curValue))
return true; // succeeded!
continue;
@ -184,19 +193,18 @@ template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintp
if (remainingTime == 0)
return false;
// set the contended and multi-wait bits
quintptr bitsToSet = futexContendedBit;
// indicate we're waiting
start_wait:
auto ptr = futexLow32(&u);
#ifdef FUTEX_OP
if (n > 1 && sizeof(curValue) >= sizeof(int)) {
bitsToSet |= futexMultiWaiterBit;
ptr = futexHigh32(&u);
if (n > 1 || !futexHasWaiterCount) {
u.fetchAndOrRelaxed(futexNeedsWakeAllBit);
curValue |= futexNeedsWakeAllBit;
if (n > 1 && futexHasWaiterCount) {
ptr = futexHigh32(&u);
//curValue >>= 32; // but this is UB in 32-bit, so roundabout:
curValue = quint64(curValue) >> 32;
}
}
#endif
// the value is the same for either branch
u.fetchAndOrRelaxed(bitsToSet);
curValue |= bitsToSet;
if (IsTimed && remainingTime > 0) {
bool timedout = !futexWait(*ptr, curValue, remainingTime);
@ -212,6 +220,47 @@ template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintp
}
}
template <bool IsTimed> bool futexSemaphoreTryAcquire(QBasicAtomicInteger<quintptr> &u, int n, int timeout)
{
// Try to acquire without waiting (we still loop because the testAndSet
// call can fail).
quintptr curValue = u.loadAcquire();
while (futexAvailCounter(curValue) >= n) {
// try to acquire
quintptr newValue = curValue - n;
if (u.testAndSetOrdered(curValue, newValue, curValue))
return true; // succeeded!
}
if (timeout == 0)
return false;
// we need to wait
quintptr valueToSubtract = unsigned(n);
quintptr oneWaiter = quintptr(Q_UINT64_C(1) << 32); // zero on 32-bit
if (futexHasWaiterCount) {
// increase the waiter count
u.fetchAndAddRelaxed(oneWaiter);
// We don't use the fetched value from above so futexWait() fails if
// it changed after the testAndSetOrdered above.
curValue += oneWaiter;
// Also adjust valueToSubtract to subtract oneWaiter when we succeed in
// acquiring.
valueToSubtract += oneWaiter;
}
if (futexSemaphoreTryAcquire_loop<IsTimed>(u, curValue, valueToSubtract, timeout))
return true;
if (futexHasWaiterCount) {
// decrement the number of threads waiting
Q_ASSERT(futexHigh32(&u)->load() & 0x7fffffffU);
u.fetchAndSubRelaxed(oneWaiter);
}
return false;
}
class QSemaphorePrivate {
public:
inline QSemaphorePrivate(int n) : avail(n) { }
@ -289,10 +338,10 @@ void QSemaphore::release(int n)
Q_ASSERT_X(n >= 0, "QSemaphore::release", "parameter 'n' must be non-negative");
if (futexAvailable()) {
quintptr prevValue = u.fetchAndAddRelease(futexCounterParcel(n));
if (prevValue & futexContendedBit) {
quintptr prevValue = u.fetchAndAddRelease(n);
if (futexNeedsWake(prevValue)) {
#ifdef FUTEX_OP
if (sizeof(u) == sizeof(int)) {
if (!futexHasWaiterCount) {
/*
On 32-bit systems, all waiters are waiting on the same address,
so we'll wake them all and ask the kernel to clear the high bit.
@ -317,9 +366,6 @@ void QSemaphore::release(int n)
the kernel to wake up n single-token waiters and all multi-token
waiters (if any), then clear the multi-token wait bit.
That means we must clear the contention bit ourselves. See
below for handling the race.
atomic {
int oldval = *upper;
*upper = oldval & ~(1 << 31);
@ -332,7 +378,6 @@ void QSemaphore::release(int n)
quint32 oparg = 31;
quint32 cmp = FUTEX_OP_CMP_LT;
quint32 cmparg = 0;
futexLow32(&u)->fetchAndAndRelease(futexContendedBit - 1);
futexWakeOp(*futexLow32(&u), n, INT_MAX, *futexHigh32(&u), FUTEX_OP(op, oparg, cmp, cmparg));
}
#else
@ -343,7 +388,7 @@ void QSemaphore::release(int n)
// its acquisition anyway, so it has to wait;
// 2) it did not see the new counter value, in which case its
// futexWait will fail.
u.fetchAndAndRelease(futexContendedBit - 1);
u.fetchAndAndRelease(futexNeedsWakeAllBit - 1);
futexWakeAll(u);
#endif
}

View File

@ -37,6 +37,8 @@ class tst_QSemaphore : public QObject
Q_OBJECT
private slots:
void acquire();
void multiRelease();
void multiAcquireRelease();
void tryAcquire();
void tryAcquireWithTimeout_data();
void tryAcquireWithTimeout();
@ -149,6 +151,73 @@ void tst_QSemaphore::acquire()
QCOMPARE(semaphore.available(), 0);
}
void tst_QSemaphore::multiRelease()
{
class Thread : public QThread
{
public:
QSemaphore &sem;
Thread(QSemaphore &sem) : sem(sem) {}
void run() override
{
sem.acquire();
}
};
QSemaphore sem;
QVector<Thread *> threads;
threads.resize(4);
for (Thread *&t : threads)
t = new Thread(sem);
for (Thread *&t : threads)
t->start();
// wait for all threads to reach the sem.acquire() and then
// release them all
QTest::qSleep(1);
sem.release(threads.size());
for (Thread *&t : threads)
t->wait();
qDeleteAll(threads);
}
void tst_QSemaphore::multiAcquireRelease()
{
class Thread : public QThread
{
public:
QSemaphore &sem;
Thread(QSemaphore &sem) : sem(sem) {}
void run() override
{
sem.acquire();
sem.release();
}
};
QSemaphore sem;
QVector<Thread *> threads;
threads.resize(4);
for (Thread *&t : threads)
t = new Thread(sem);
for (Thread *&t : threads)
t->start();
// wait for all threads to reach the sem.acquire() and then
// release them all
QTest::qSleep(1);
sem.release();
for (Thread *&t : threads)
t->wait();
qDeleteAll(threads);
}
void tst_QSemaphore::tryAcquire()
{
QSemaphore semaphore;