Cleanup of qthreadpool

Don't bother overwaiting in waitForDone(), if it was done at one point
after it was called we can return true. And do not stop threads recently
awakened by a startThread call as they have tasks to do.

Make allowing at least one thread regardless of reservation more
standard instead of hacked in certain places.

Pick-to: 6.2
Change-Id: I304bcdc5822f440d5e72fc33ba2aa1678c9ba0d0
Reviewed-by: David Faure <david.faure@kdab.com>
This commit is contained in:
Allan Sandfeld Jensen 2021-04-22 14:33:17 +02:00
parent 2fea6bbe8e
commit 9a6c653eaf
3 changed files with 110 additions and 47 deletions

View File

@ -112,14 +112,13 @@ void QThreadPoolThread::run()
locker.relock();
}
// if too many threads are active, expire this thread
// if too many threads are active, stop working in this one
if (manager->tooManyThreadsActive())
break;
if (manager->queue.isEmpty()) {
r = nullptr;
// all work is done, time to wait for more
if (manager->queue.isEmpty())
break;
}
QueuePage *page = manager->queue.first();
r = page->pop();
@ -130,26 +129,32 @@ void QThreadPoolThread::run()
}
} while (true);
// if too many threads are active, expire this thread
bool expired = manager->tooManyThreadsActive();
if (!expired) {
manager->waitingThreads.enqueue(this);
// this thread is about to be deleted, do not wait or expire
if (!manager->allThreads.contains(this)) {
registerThreadInactive();
// wait for work, exiting after the expiry timeout is reached
runnableReady.wait(locker.mutex(), QDeadlineTimer(manager->expiryTimeout));
++manager->activeThreads;
if (manager->waitingThreads.removeOne(this))
expired = true;
if (!manager->allThreads.contains(this)) {
registerThreadInactive();
break;
}
return;
}
if (expired) {
// if too many threads are active, expire this thread
if (manager->tooManyThreadsActive()) {
manager->expiredThreads.enqueue(this);
registerThreadInactive();
break;
return;
}
manager->waitingThreads.enqueue(this);
registerThreadInactive();
// wait for work, exiting after the expiry timeout is reached
runnableReady.wait(locker.mutex(), QDeadlineTimer(manager->expiryTimeout));
// this thread is about to be deleted, do not work or expire
if (!manager->allThreads.contains(this)) {
Q_ASSERT(manager->queue.isEmpty());
return;
}
if (manager->waitingThreads.removeOne(this)) {
manager->expiredThreads.enqueue(this);
return;
}
++manager->activeThreads;
}
}
@ -176,10 +181,10 @@ bool QThreadPoolPrivate::tryStart(QRunnable *task)
}
// can't do anything if we're over the limit
if (activeThreadCount() >= maxThreadCount())
if (areAllThreadsActive())
return false;
if (waitingThreads.count() > 0) {
if (!waitingThreads.isEmpty()) {
// recycle an available thread
enqueueTask(task);
waitingThreads.takeFirst()->runnableReady.wakeOne();
@ -251,6 +256,12 @@ void QThreadPoolPrivate::tryToStartMoreThreads()
}
}
bool QThreadPoolPrivate::areAllThreadsActive() const
{
const int activeThreadCount = this->activeThreadCount();
return activeThreadCount >= maxThreadCount() && (activeThreadCount - reservedThreads) >= 1;
}
bool QThreadPoolPrivate::tooManyThreadsActive() const
{
const int activeThreadCount = this->activeThreadCount();
@ -283,18 +294,20 @@ void QThreadPoolPrivate::startThread(QRunnable *runnable)
\internal
Helper function only to be called from waitForDone(int)
Deletes all current threads.
*/
void QThreadPoolPrivate::reset()
{
// move the contents of the set out so that we can iterate without the lock
QSet<QThreadPoolThread *> allThreadsCopy;
allThreadsCopy.swap(allThreads);
auto allThreadsCopy = std::exchange(allThreads, {});
expiredThreads.clear();
waitingThreads.clear();
mutex.unlock();
for (QThreadPoolThread *thread : qAsConst(allThreadsCopy)) {
if (!thread->isFinished()) {
if (thread->isRunning()) {
thread->runnableReady.wakeAll();
thread->wait();
}
@ -321,15 +334,13 @@ bool QThreadPoolPrivate::waitForDone(int msecs)
{
QMutexLocker locker(&mutex);
QDeadlineTimer timer(msecs);
do {
if (!waitForDone(timer))
return false;
reset();
// More threads can be started during reset(), in that case continue
// waiting if we still have time left.
} while ((!queue.isEmpty() || activeThreads) && !timer.hasExpired());
return queue.isEmpty() && activeThreads == 0;
if (!waitForDone(timer))
return false;
reset();
// New jobs might have started during reset, but return anyway
// as the active thread and task count did reach 0 once, and
// race conditions are outside our scope.
return true;
}
void QThreadPoolPrivate::clear()
@ -473,7 +484,10 @@ QThreadPool::QThreadPool(QObject *parent)
*/
QThreadPool::~QThreadPool()
{
Q_D(QThreadPool);
waitForDone();
Q_ASSERT(d->queue.isEmpty());
Q_ASSERT(d->allThreads.isEmpty());
}
/*!
@ -513,12 +527,8 @@ void QThreadPool::start(QRunnable *runnable, int priority)
Q_D(QThreadPool);
QMutexLocker locker(&d->mutex);
if (!d->tryStart(runnable)) {
if (!d->tryStart(runnable))
d->enqueueTask(runnable, priority);
if (!d->waitingThreads.isEmpty())
d->waitingThreads.takeFirst()->runnableReady.wakeOne();
}
}
/*!
@ -582,7 +592,7 @@ bool QThreadPool::tryStart(std::function<void()> functionToRun)
Q_D(QThreadPool);
QMutexLocker locker(&d->mutex);
if (!d->allThreads.isEmpty() && d->activeThreadCount() >= d->maxThreadCount())
if (!d->allThreads.isEmpty() && d->areAllThreadsActive())
return false;
QRunnable *runnable = QRunnable::create(std::move(functionToRun));
@ -674,7 +684,10 @@ int QThreadPool::activeThreadCount() const
Once you are done with the thread, call releaseThread() to allow it to be
reused.
\note This function will always increase the number of active threads.
\note Even if reserving maxThreadCount() threads or more, the thread pool
will still allow a minimum of one thread.
\note This function will increase the reported number of active threads.
This means that by using this function, it is possible for
activeThreadCount() to return a value greater than maxThreadCount() .

View File

@ -157,6 +157,7 @@ public:
int activeThreadCount() const;
void tryToStartMoreThreads();
bool areAllThreadsActive() const;
bool tooManyThreadsActive() const;
int maxThreadCount() const

View File

@ -89,6 +89,7 @@ private slots:
void releaseThread_data();
void releaseThread();
void reserveAndStart();
void releaseAndBlock();
void start();
void tryStart();
void tryStartPeakThreadCount();
@ -713,21 +714,19 @@ void tst_QThreadPool::reserveAndStart() // QTBUG-21051
threadpool->reserveThread();
QCOMPARE(threadpool->activeThreadCount(), 1);
// start a task, to get a running thread
// start a task, to get a running thread, works since one thread is always allowed
WaitingTask task;
threadpool->start(&task);
QCOMPARE(threadpool->activeThreadCount(), 2);
// tryStart() will fail since activeThreadCount() >= maxThreadCount() and one thread is already running
QVERIFY(!threadpool->tryStart(&task));
QTRY_COMPARE(threadpool->activeThreadCount(), 2);
task.waitForStarted.acquire();
task.waitBeforeDone.release();
QTRY_COMPARE(task.count.loadRelaxed(), 1);
QTRY_COMPARE(threadpool->activeThreadCount(), 1);
// now the thread is waiting, but tryStart() will fail since activeThreadCount() >= maxThreadCount()
QVERIFY(!threadpool->tryStart(&task));
QTRY_COMPARE(threadpool->activeThreadCount(), 1);
// start() will therefore do a failing tryStart(), followed by enqueueTask()
// which will actually wake up the waiting thread.
// start() will wake up the waiting thread.
threadpool->start(&task);
QTRY_COMPARE(threadpool->activeThreadCount(), 2);
task.waitForStarted.acquire();
@ -741,6 +740,54 @@ void tst_QThreadPool::reserveAndStart() // QTBUG-21051
threadpool->setMaxThreadCount(savedLimit);
}
void tst_QThreadPool::releaseAndBlock()
{
class WaitingTask : public QRunnable
{
public:
QSemaphore waitBeforeDone;
WaitingTask() { setAutoDelete(false); }
void run() override
{
waitBeforeDone.acquire();
}
};
// Set up
QThreadPool *threadpool = QThreadPool::globalInstance();
const int savedLimit = threadpool->maxThreadCount();
threadpool->setMaxThreadCount(1);
QCOMPARE(threadpool->activeThreadCount(), 0);
// start a task, to get a running thread, works since one thread is always allowed
WaitingTask task1, task2;
threadpool->start(&task1);
QCOMPARE(threadpool->activeThreadCount(), 1);
// tryStart() will fail since activeThreadCount() >= maxThreadCount() and one thread is already running
QVERIFY(!threadpool->tryStart(&task2));
QCOMPARE(threadpool->activeThreadCount(), 1);
// Use release without reserve to account for the blocking thread.
threadpool->releaseThread();
QTRY_COMPARE(threadpool->activeThreadCount(), 0);
// Now we can start task2
QVERIFY(threadpool->tryStart(&task2));
QCOMPARE(threadpool->activeThreadCount(), 1);
task2.waitBeforeDone.release();
QTRY_COMPARE(threadpool->activeThreadCount(), 0);
threadpool->reserveThread();
QCOMPARE(threadpool->activeThreadCount(), 1);
task1.waitBeforeDone.release();
QTRY_COMPARE(threadpool->activeThreadCount(), 0);
threadpool->setMaxThreadCount(savedLimit);
}
static QAtomicInt count;
class CountingRunnable : public QRunnable
{
@ -917,6 +964,7 @@ void tst_QThreadPool::waitForDone()
{
QElapsedTimer total, pass;
total.start();
pass.start();
QThreadPool threadPool;
while (total.elapsed() < 10000) {
@ -1101,6 +1149,7 @@ void tst_QThreadPool::destroyingWaitsForTasksToFinish()
{
QElapsedTimer total, pass;
total.start();
pass.start();
while (total.elapsed() < 10000) {
int runs;