Improve performance in QThreadPool

When many runnables are executed, this improves the
performance by not resizing the queue for each runnable,
which was the case in the previous version, because of
many calls to QVector::takeFirst().

Also add a test that makes sure tryTake() is safe to
call and does not leave the queue in a bad state that
tries to use nullptr entries.

Change-Id: I608134ecfa9cfc03db4878dcbd6f9c1107e13e90
Reviewed-by: Lars Knoll <lars.knoll@qt.io>
This commit is contained in:
Svenn-Arne Dragly 2017-09-05 15:02:20 +02:00
parent d7c57fa68e
commit ba423261cd
3 changed files with 204 additions and 31 deletions

View File

@ -73,7 +73,7 @@ public:
\internal \internal
*/ */
QThreadPoolThread::QThreadPoolThread(QThreadPoolPrivate *manager) QThreadPoolThread::QThreadPoolThread(QThreadPoolPrivate *manager)
:manager(manager), runnable(0) :manager(manager), runnable(nullptr)
{ } { }
/* /*
@ -84,7 +84,7 @@ void QThreadPoolThread::run()
QMutexLocker locker(&manager->mutex); QMutexLocker locker(&manager->mutex);
for(;;) { for(;;) {
QRunnable *r = runnable; QRunnable *r = runnable;
runnable = 0; runnable = nullptr;
do { do {
if (r) { if (r) {
@ -116,8 +116,19 @@ void QThreadPoolThread::run()
if (manager->tooManyThreadsActive()) if (manager->tooManyThreadsActive())
break; break;
r = !manager->queue.isEmpty() ? manager->queue.takeFirst().first : 0; if (manager->queue.isEmpty()) {
} while (r != 0); r = nullptr;
break;
}
QueuePage *page = manager->queue.first();
r = page->pop();
if (page->isFinished()) {
manager->queue.removeFirst();
delete page;
}
} while (true);
if (manager->isExiting) { if (manager->isExiting) {
registerThreadInactive(); registerThreadInactive();
@ -163,6 +174,7 @@ QThreadPoolPrivate:: QThreadPoolPrivate()
bool QThreadPoolPrivate::tryStart(QRunnable *task) bool QThreadPoolPrivate::tryStart(QRunnable *task)
{ {
Q_ASSERT(task != nullptr);
if (allThreads.isEmpty()) { if (allThreads.isEmpty()) {
// always create at least one thread // always create at least one thread
startThread(task); startThread(task);
@ -183,7 +195,7 @@ bool QThreadPoolPrivate::tryStart(QRunnable *task)
if (!expiredThreads.isEmpty()) { if (!expiredThreads.isEmpty()) {
// restart an expired thread // restart an expired thread
QThreadPoolThread *thread = expiredThreads.dequeue(); QThreadPoolThread *thread = expiredThreads.dequeue();
Q_ASSERT(thread->runnable == 0); Q_ASSERT(thread->runnable == nullptr);
++activeThreads; ++activeThreads;
@ -199,22 +211,25 @@ bool QThreadPoolPrivate::tryStart(QRunnable *task)
return true; return true;
} }
inline bool operator<(int priority, const QPair<QRunnable *, int> &p) inline bool comparePriority(int priority, const QueuePage *p)
{ return p.second < priority; } {
inline bool operator<(const QPair<QRunnable *, int> &p, int priority) return p->priority() < priority;
{ return priority < p.second; } }
void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority) void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority)
{ {
Q_ASSERT(runnable != nullptr);
if (runnable->autoDelete()) if (runnable->autoDelete())
++runnable->ref; ++runnable->ref;
// put it on the queue for (QueuePage *page : qAsConst(queue)) {
QVector<QPair<QRunnable *, int> >::const_iterator begin = queue.constBegin(); if (page->priority() == priority && !page->isFull()) {
QVector<QPair<QRunnable *, int> >::const_iterator it = queue.constEnd(); page->push(runnable);
if (it != begin && priority > (*(it - 1)).second) return;
it = std::upper_bound(begin, --it, priority); }
queue.insert(it - begin, qMakePair(runnable, priority)); }
auto it = std::upper_bound(queue.constBegin(), queue.constEnd(), priority, comparePriority);
queue.insert(std::distance(queue.constBegin(), it), new QueuePage(runnable, priority));
} }
int QThreadPoolPrivate::activeThreadCount() const int QThreadPoolPrivate::activeThreadCount() const
@ -228,8 +243,18 @@ int QThreadPoolPrivate::activeThreadCount() const
void QThreadPoolPrivate::tryToStartMoreThreads() void QThreadPoolPrivate::tryToStartMoreThreads()
{ {
// try to push tasks on the queue to any available threads // try to push tasks on the queue to any available threads
while (!queue.isEmpty() && tryStart(queue.constFirst().first)) while (!queue.isEmpty()) {
QueuePage *page = queue.first();
if (!tryStart(page->first()))
break;
page->pop();
if (page->isFinished()) {
queue.removeFirst(); queue.removeFirst();
delete page;
}
}
} }
bool QThreadPoolPrivate::tooManyThreadsActive() const bool QThreadPoolPrivate::tooManyThreadsActive() const
@ -243,6 +268,7 @@ bool QThreadPoolPrivate::tooManyThreadsActive() const
*/ */
void QThreadPoolPrivate::startThread(QRunnable *runnable) void QThreadPoolPrivate::startThread(QRunnable *runnable)
{ {
Q_ASSERT(runnable != nullptr);
QScopedPointer <QThreadPoolThread> thread(new QThreadPoolThread(this)); QScopedPointer <QThreadPoolThread> thread(new QThreadPoolThread(this));
thread->setObjectName(QLatin1String("Thread (pooled)")); thread->setObjectName(QLatin1String("Thread (pooled)"));
Q_ASSERT(!allThreads.contains(thread.data())); // if this assert hits, we have an ABA problem (deleted threads don't get removed here) Q_ASSERT(!allThreads.contains(thread.data())); // if this assert hits, we have an ABA problem (deleted threads don't get removed here)
@ -306,12 +332,14 @@ bool QThreadPoolPrivate::waitForDone(int msecs)
void QThreadPoolPrivate::clear() void QThreadPoolPrivate::clear()
{ {
QMutexLocker locker(&mutex); QMutexLocker locker(&mutex);
for (QVector<QPair<QRunnable *, int> >::const_iterator it = queue.constBegin(); for (QueuePage *page : qAsConst(queue)) {
it != queue.constEnd(); ++it) { while (!page->isFinished()) {
QRunnable* r = it->first; QRunnable *r = page->pop();
if (r->autoDelete() && !--r->ref) if (r && r->autoDelete() && !--r->ref)
delete r; delete r;
} }
}
qDeleteAll(queue);
queue.clear(); queue.clear();
} }
@ -336,22 +364,21 @@ bool QThreadPool::tryTake(QRunnable *runnable)
{ {
Q_D(QThreadPool); Q_D(QThreadPool);
if (runnable == 0) if (runnable == nullptr)
return false; return false;
{ {
QMutexLocker locker(&d->mutex); QMutexLocker locker(&d->mutex);
auto it = d->queue.begin(); for (QueuePage *page : qAsConst(d->queue)) {
auto end = d->queue.end(); if (page->tryTake(runnable)) {
if (page->isFinished()) {
while (it != end) { d->queue.removeOne(page);
if (it->first == runnable) { delete page;
d->queue.erase(it); }
if (runnable->autoDelete()) if (runnable->autoDelete())
--runnable->ref; // undo ++ref in start() --runnable->ref; // undo ++ref in start()
return true; return true;
} }
++it;
} }
} }

View File

@ -62,6 +62,87 @@
QT_BEGIN_NAMESPACE QT_BEGIN_NAMESPACE
class QueuePage {
public:
enum {
MaxPageSize = 256
};
QueuePage(QRunnable *runnable, int pri)
: m_priority(pri)
{
push(runnable);
}
bool isFull() {
return m_lastIndex >= MaxPageSize - 1;
}
bool isFinished() {
return m_firstIndex > m_lastIndex;
}
void push(QRunnable *runnable) {
Q_ASSERT(runnable != nullptr);
Q_ASSERT(!isFull());
m_lastIndex += 1;
m_entries[m_lastIndex] = runnable;
}
void skipToNextOrEnd() {
while (!isFinished() && m_entries[m_firstIndex] == nullptr) {
m_firstIndex += 1;
}
}
QRunnable *first() {
Q_ASSERT(!isFinished());
QRunnable *runnable = m_entries[m_firstIndex];
Q_ASSERT(runnable);
return runnable;
}
QRunnable *pop() {
Q_ASSERT(!isFinished());
QRunnable *runnable = first();
Q_ASSERT(runnable);
// clear the entry although this should not be necessary
m_entries[m_firstIndex] = nullptr;
m_firstIndex += 1;
// make sure the next runnable returned by first() is not a nullptr
skipToNextOrEnd();
return runnable;
}
bool tryTake(QRunnable *runnable) {
Q_ASSERT(!isFinished());
for (int i = m_firstIndex; i <= m_lastIndex; i++) {
if (m_entries[i] == runnable) {
m_entries[i] = nullptr;
if (i == m_firstIndex) {
// make sure first() does not return a nullptr
skipToNextOrEnd();
}
return true;
}
}
return false;
}
int priority() const {
return m_priority;
}
private:
int m_priority = 0;
int m_firstIndex = 0;
int m_lastIndex = -1;
QRunnable *m_entries[MaxPageSize];
};
class QThreadPoolThread; class QThreadPoolThread;
class Q_CORE_EXPORT QThreadPoolPrivate : public QObjectPrivate class Q_CORE_EXPORT QThreadPoolPrivate : public QObjectPrivate
{ {
@ -83,12 +164,13 @@ public:
bool waitForDone(int msecs); bool waitForDone(int msecs);
void clear(); void clear();
void stealAndRunRunnable(QRunnable *runnable); void stealAndRunRunnable(QRunnable *runnable);
void deletePageIfFinished(QueuePage *page);
mutable QMutex mutex; mutable QMutex mutex;
QList<QThreadPoolThread *> allThreads; QList<QThreadPoolThread *> allThreads;
QQueue<QThreadPoolThread *> waitingThreads; QQueue<QThreadPoolThread *> waitingThreads;
QQueue<QThreadPoolThread *> expiredThreads; QQueue<QThreadPoolThread *> expiredThreads;
QVector<QPair<QRunnable *, int> > queue; QVector<QueuePage*> queue;
QWaitCondition noActiveThreads; QWaitCondition noActiveThreads;
bool isExiting; bool isExiting;

View File

@ -93,6 +93,7 @@ private slots:
void waitForDoneTimeout(); void waitForDoneTimeout();
void destroyingWaitsForTasksToFinish(); void destroyingWaitsForTasksToFinish();
void stressTest(); void stressTest();
void takeAllAndIncreaseMaxThreadCount();
private: private:
QMutex m_functionTestMutex; QMutex m_functionTestMutex;
@ -1199,5 +1200,68 @@ void tst_QThreadPool::stressTest()
} }
} }
void tst_QThreadPool::takeAllAndIncreaseMaxThreadCount() {
class Task : public QRunnable
{
public:
Task(QSemaphore *mainBarrier, QSemaphore *threadBarrier)
: m_mainBarrier(mainBarrier)
, m_threadBarrier(threadBarrier)
{
setAutoDelete(false);
}
void run() {
m_mainBarrier->release();
m_threadBarrier->acquire();
}
private:
QSemaphore *m_mainBarrier;
QSemaphore *m_threadBarrier;
};
QSemaphore mainBarrier;
QSemaphore taskBarrier;
QThreadPool threadPool;
threadPool.setMaxThreadCount(1);
Task *task1 = new Task(&mainBarrier, &taskBarrier);
Task *task2 = new Task(&mainBarrier, &taskBarrier);
Task *task3 = new Task(&mainBarrier, &taskBarrier);
threadPool.start(task1);
threadPool.start(task2);
threadPool.start(task3);
mainBarrier.acquire(1);
QCOMPARE(threadPool.activeThreadCount(), 1);
QVERIFY(!threadPool.tryTake(task1));
QVERIFY(threadPool.tryTake(task2));
QVERIFY(threadPool.tryTake(task3));
// A bad queue implementation can segfault here because two consecutive items in the queue
// have been taken
threadPool.setMaxThreadCount(4);
// Even though we increase the max thread count, there should only be one job to run
QCOMPARE(threadPool.activeThreadCount(), 1);
// Make sure jobs 2 and 3 never started
QCOMPARE(mainBarrier.available(), 0);
taskBarrier.release(1);
threadPool.waitForDone();
QCOMPARE(threadPool.activeThreadCount(), 0);
delete task1;
delete task2;
delete task3;
}
QTEST_MAIN(tst_QThreadPool); QTEST_MAIN(tst_QThreadPool);
#include "tst_qthreadpool.moc" #include "tst_qthreadpool.moc"