QThreadPool: fix race at time of thread expiry.
The current synchronization mechanism was racy: decrementing waitingThreads and then hoping that the wakeOne will wake a thread before its expiry timeout happens. In other words, on timeout, a just-assigned task would never run. And then no other task would run, if maxThreadCount is reached. Fixed by using a queue of waiting threads (rather than just a count), and by moving the wait condition into the thread itself, so we know precisely which one we're waking up, and we can remove it from the set of waiting threads before waking it up, and therefore it can determine on wakeup whether it has work to do (caller removed it from the queue) or it expired (it's still in the queue). This is reliable, whereas the return value from QWaitCondition::wait isn't reliable, when the main thread has already decided that this thread has work to do. Task-number: QTBUG-3786 Change-Id: I1eac5d6c309daed7f483ac7a8074297bfda6ee32 Reviewed-by: Thiago Macieira <thiago.macieira@intel.com>
This commit is contained in:
parent
94fd108ea4
commit
a9b6a78e54
@ -61,6 +61,7 @@ public:
|
||||
void run();
|
||||
void registerThreadInactive();
|
||||
|
||||
QWaitCondition runnableReady;
|
||||
QThreadPoolPrivate *manager;
|
||||
QRunnable *runnable;
|
||||
};
|
||||
@ -128,14 +129,13 @@ void QThreadPoolThread::run()
|
||||
// if too many threads are active, expire this thread
|
||||
bool expired = manager->tooManyThreadsActive();
|
||||
if (!expired) {
|
||||
++manager->waitingThreads;
|
||||
manager->waitingThreads.enqueue(this);
|
||||
registerThreadInactive();
|
||||
// wait for work, exiting after the expiry timeout is reached
|
||||
expired = !manager->runnableReady.wait(locker.mutex(), manager->expiryTimeout);
|
||||
runnableReady.wait(locker.mutex(), manager->expiryTimeout);
|
||||
++manager->activeThreads;
|
||||
|
||||
if (expired)
|
||||
--manager->waitingThreads;
|
||||
if (manager->waitingThreads.removeOne(this))
|
||||
expired = true;
|
||||
}
|
||||
if (expired) {
|
||||
manager->expiredThreads.enqueue(this);
|
||||
@ -160,7 +160,6 @@ QThreadPoolPrivate:: QThreadPoolPrivate()
|
||||
expiryTimeout(30000),
|
||||
maxThreadCount(qAbs(QThread::idealThreadCount())),
|
||||
reservedThreads(0),
|
||||
waitingThreads(0),
|
||||
activeThreads(0)
|
||||
{ }
|
||||
|
||||
@ -176,11 +175,10 @@ bool QThreadPoolPrivate::tryStart(QRunnable *task)
|
||||
if (activeThreadCount() >= maxThreadCount)
|
||||
return false;
|
||||
|
||||
if (waitingThreads > 0) {
|
||||
if (waitingThreads.count() > 0) {
|
||||
// recycle an available thread
|
||||
--waitingThreads;
|
||||
enqueueTask(task);
|
||||
runnableReady.wakeOne();
|
||||
waitingThreads.takeFirst()->runnableReady.wakeOne();
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -225,7 +223,7 @@ int QThreadPoolPrivate::activeThreadCount() const
|
||||
{
|
||||
return (allThreads.count()
|
||||
- expiredThreads.count()
|
||||
- waitingThreads
|
||||
- waitingThreads.count()
|
||||
+ reservedThreads);
|
||||
}
|
||||
|
||||
@ -266,7 +264,6 @@ void QThreadPoolPrivate::reset()
|
||||
{
|
||||
QMutexLocker locker(&mutex);
|
||||
isExiting = true;
|
||||
runnableReady.wakeAll();
|
||||
|
||||
while (!allThreads.empty()) {
|
||||
// move the contents of the set out so that we can iterate without the lock
|
||||
@ -275,6 +272,7 @@ void QThreadPoolPrivate::reset()
|
||||
locker.unlock();
|
||||
|
||||
foreach (QThreadPoolThread *thread, allThreadsCopy) {
|
||||
thread->runnableReady.wakeAll();
|
||||
thread->wait();
|
||||
delete thread;
|
||||
}
|
||||
@ -283,7 +281,7 @@ void QThreadPoolPrivate::reset()
|
||||
// repeat until all newly arrived threads have also completed
|
||||
}
|
||||
|
||||
waitingThreads = 0;
|
||||
waitingThreads.clear();
|
||||
expiredThreads.clear();
|
||||
|
||||
isExiting = false;
|
||||
@ -459,10 +457,8 @@ void QThreadPool::start(QRunnable *runnable, int priority)
|
||||
if (!d->tryStart(runnable)) {
|
||||
d->enqueueTask(runnable, priority);
|
||||
|
||||
if (d->waitingThreads > 0) {
|
||||
--d->waitingThreads;
|
||||
d->runnableReady.wakeOne();
|
||||
}
|
||||
if (!d->waitingThreads.isEmpty())
|
||||
d->waitingThreads.takeFirst()->runnableReady.wakeOne();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,8 +87,8 @@ public:
|
||||
void stealRunnable(QRunnable *);
|
||||
|
||||
mutable QMutex mutex;
|
||||
QWaitCondition runnableReady;
|
||||
QSet<QThreadPoolThread *> allThreads;
|
||||
QQueue<QThreadPoolThread *> waitingThreads;
|
||||
QQueue<QThreadPoolThread *> expiredThreads;
|
||||
QList<QPair<QRunnable *, int> > queue;
|
||||
QWaitCondition noActiveThreads;
|
||||
@ -97,7 +97,6 @@ public:
|
||||
int expiryTimeout;
|
||||
int maxThreadCount;
|
||||
int reservedThreads;
|
||||
int waitingThreads;
|
||||
int activeThreads;
|
||||
};
|
||||
|
||||
|
@ -80,6 +80,7 @@ private slots:
|
||||
void destruction();
|
||||
void threadRecycling();
|
||||
void expiryTimeout();
|
||||
void expiryTimeoutRace();
|
||||
#ifndef QT_NO_EXCEPTIONS
|
||||
void exceptions();
|
||||
#endif
|
||||
@ -315,7 +316,7 @@ class ExpiryTimeoutTask : public QRunnable
|
||||
{
|
||||
public:
|
||||
QThread *thread;
|
||||
int runCount;
|
||||
QAtomicInt runCount;
|
||||
QSemaphore semaphore;
|
||||
|
||||
ExpiryTimeoutTask()
|
||||
@ -327,7 +328,7 @@ public:
|
||||
void run()
|
||||
{
|
||||
thread = QThread::currentThread();
|
||||
++runCount;
|
||||
runCount.ref();
|
||||
semaphore.release();
|
||||
}
|
||||
};
|
||||
@ -346,7 +347,7 @@ void tst_QThreadPool::expiryTimeout()
|
||||
// run the task
|
||||
threadPool.start(&task);
|
||||
QVERIFY(task.semaphore.tryAcquire(1, 10000));
|
||||
QCOMPARE(task.runCount, 1);
|
||||
QCOMPARE(task.runCount.load(), 1);
|
||||
QVERIFY(!task.thread->wait(100));
|
||||
// thread should expire
|
||||
QThread *firstThread = task.thread;
|
||||
@ -355,7 +356,7 @@ void tst_QThreadPool::expiryTimeout()
|
||||
// run task again, thread should be restarted
|
||||
threadPool.start(&task);
|
||||
QVERIFY(task.semaphore.tryAcquire(1, 10000));
|
||||
QCOMPARE(task.runCount, 2);
|
||||
QCOMPARE(task.runCount.load(), 2);
|
||||
QVERIFY(!task.thread->wait(100));
|
||||
// thread should expire again
|
||||
QVERIFY(task.thread->wait(10000));
|
||||
@ -368,6 +369,22 @@ void tst_QThreadPool::expiryTimeout()
|
||||
QCOMPARE(threadPool.expiryTimeout(), expiryTimeout);
|
||||
}
|
||||
|
||||
void tst_QThreadPool::expiryTimeoutRace() // QTBUG-3786
|
||||
{
|
||||
ExpiryTimeoutTask task;
|
||||
|
||||
QThreadPool threadPool;
|
||||
threadPool.setMaxThreadCount(1);
|
||||
threadPool.setExpiryTimeout(50);
|
||||
const int numTasks = 20;
|
||||
for (int i = 0; i < numTasks; ++i) {
|
||||
threadPool.start(&task);
|
||||
QThread::msleep(50); // exactly the same as the expiry timeout
|
||||
}
|
||||
QCOMPARE(task.runCount.load(), numTasks);
|
||||
QVERIFY(threadPool.waitForDone(2000));
|
||||
}
|
||||
|
||||
#ifndef QT_NO_EXCEPTIONS
|
||||
class ExceptionTask : public QRunnable
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user