QThreadPool::cancel() to remove individual jobs from the job queue.
[ChangeLog][QtCore][QThreadPool] Added QThreadPool::cancel() which allows removing from the job queue a job that hasn't been started yet. Change-Id: Ib8f1c1f32a34f5eec8338c641d820b928e470164 Reviewed-by: Nick Shaforostoff <shafff@ukr.net> Reviewed-by: Olivier Goffart <ogoffart@woboq.com>
This commit is contained in:
parent
6475462c6f
commit
5b11e43e9f
@ -292,7 +292,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex)
|
|||||||
|
|
||||||
// To avoid deadlocks and reduce the number of threads used, try to
|
// To avoid deadlocks and reduce the number of threads used, try to
|
||||||
// run the runnable in the current thread.
|
// run the runnable in the current thread.
|
||||||
d->pool()->d_func()->stealRunnable(d->runnable);
|
d->pool()->d_func()->stealAndRunRunnable(d->runnable);
|
||||||
|
|
||||||
lock.relock();
|
lock.relock();
|
||||||
|
|
||||||
@ -313,7 +313,7 @@ void QFutureInterfaceBase::waitForFinished()
|
|||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
||||||
if (!alreadyFinished) {
|
if (!alreadyFinished) {
|
||||||
d->pool()->d_func()->stealRunnable(d->runnable);
|
d->pool()->d_func()->stealAndRunRunnable(d->runnable);
|
||||||
|
|
||||||
lock.relock();
|
lock.relock();
|
||||||
|
|
||||||
|
@ -311,14 +311,12 @@ void QThreadPoolPrivate::clear()
|
|||||||
/*!
|
/*!
|
||||||
\internal
|
\internal
|
||||||
Searches for \a runnable in the queue, removes it from the queue and
|
Searches for \a runnable in the queue, removes it from the queue and
|
||||||
runs it if found. This function does not return until the runnable
|
returns \c true if it was found in the queue
|
||||||
has completed.
|
|
||||||
*/
|
*/
|
||||||
void QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
|
bool QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
|
||||||
{
|
{
|
||||||
if (runnable == 0)
|
if (runnable == 0)
|
||||||
return;
|
return false;
|
||||||
bool found = false;
|
|
||||||
{
|
{
|
||||||
QMutexLocker locker(&mutex);
|
QMutexLocker locker(&mutex);
|
||||||
QList<QPair<QRunnable *, int> >::iterator it = queue.begin();
|
QList<QPair<QRunnable *, int> >::iterator it = queue.begin();
|
||||||
@ -326,17 +324,26 @@ void QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
|
|||||||
|
|
||||||
while (it != end) {
|
while (it != end) {
|
||||||
if (it->first == runnable) {
|
if (it->first == runnable) {
|
||||||
found = true;
|
|
||||||
queue.erase(it);
|
queue.erase(it);
|
||||||
break;
|
return true;
|
||||||
}
|
}
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found)
|
return false;
|
||||||
return;
|
}
|
||||||
|
|
||||||
|
/*!
|
||||||
|
\internal
|
||||||
|
Searches for \a runnable in the queue, removes it from the queue and
|
||||||
|
runs it if found. This function does not return until the runnable
|
||||||
|
has completed.
|
||||||
|
*/
|
||||||
|
void QThreadPoolPrivate::stealAndRunRunnable(QRunnable *runnable)
|
||||||
|
{
|
||||||
|
if (!stealRunnable(runnable))
|
||||||
|
return;
|
||||||
const bool autoDelete = runnable->autoDelete();
|
const bool autoDelete = runnable->autoDelete();
|
||||||
bool del = autoDelete && !--runnable->ref;
|
bool del = autoDelete && !--runnable->ref;
|
||||||
|
|
||||||
@ -628,6 +635,25 @@ void QThreadPool::clear()
|
|||||||
d->clear();
|
d->clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*!
|
||||||
|
\since 5.5
|
||||||
|
|
||||||
|
Removes the specified \a runnable from the queue if it is not yet started.
|
||||||
|
The runnables for which \l{QRunnable::autoDelete()}{runnable->autoDelete()}
|
||||||
|
returns \c true are deleted.
|
||||||
|
|
||||||
|
\sa start()
|
||||||
|
*/
|
||||||
|
void QThreadPool::cancel(QRunnable *runnable)
|
||||||
|
{
|
||||||
|
Q_D(QThreadPool);
|
||||||
|
if (!d->stealRunnable(runnable))
|
||||||
|
return;
|
||||||
|
if (runnable->autoDelete() && !--runnable->ref) {
|
||||||
|
delete runnable;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
QT_END_NAMESPACE
|
QT_END_NAMESPACE
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -77,6 +77,7 @@ public:
|
|||||||
bool waitForDone(int msecs = -1);
|
bool waitForDone(int msecs = -1);
|
||||||
|
|
||||||
void clear();
|
void clear();
|
||||||
|
void cancel(QRunnable *runnable);
|
||||||
};
|
};
|
||||||
|
|
||||||
QT_END_NAMESPACE
|
QT_END_NAMESPACE
|
||||||
|
@ -76,7 +76,8 @@ public:
|
|||||||
void reset();
|
void reset();
|
||||||
bool waitForDone(int msecs);
|
bool waitForDone(int msecs);
|
||||||
void clear();
|
void clear();
|
||||||
void stealRunnable(QRunnable *);
|
bool stealRunnable(QRunnable *runnable);
|
||||||
|
void stealAndRunRunnable(QRunnable *runnable);
|
||||||
|
|
||||||
mutable QMutex mutex;
|
mutable QMutex mutex;
|
||||||
QSet<QThreadPoolThread *> allThreads;
|
QSet<QThreadPoolThread *> allThreads;
|
||||||
|
@ -92,6 +92,7 @@ private slots:
|
|||||||
void priorityStart();
|
void priorityStart();
|
||||||
void waitForDone();
|
void waitForDone();
|
||||||
void clear();
|
void clear();
|
||||||
|
void cancel();
|
||||||
void waitForDoneTimeout();
|
void waitForDoneTimeout();
|
||||||
void destroyingWaitsForTasksToFinish();
|
void destroyingWaitsForTasksToFinish();
|
||||||
void stressTest();
|
void stressTest();
|
||||||
@ -958,6 +959,56 @@ void tst_QThreadPool::clear()
|
|||||||
QCOMPARE(count.load(), threadPool.maxThreadCount());
|
QCOMPARE(count.load(), threadPool.maxThreadCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tst_QThreadPool::cancel()
|
||||||
|
{
|
||||||
|
QSemaphore sem(0);
|
||||||
|
class BlockingRunnable : public QRunnable
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
QSemaphore & sem;
|
||||||
|
int & dtorCounter;
|
||||||
|
int & runCounter;
|
||||||
|
int dummy;
|
||||||
|
BlockingRunnable(QSemaphore & s, int & c, int & r) : sem(s), dtorCounter(c), runCounter(r){}
|
||||||
|
~BlockingRunnable(){dtorCounter++;}
|
||||||
|
void run()
|
||||||
|
{
|
||||||
|
runCounter++;
|
||||||
|
sem.acquire();
|
||||||
|
count.ref();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
typedef BlockingRunnable* BlockingRunnablePtr;
|
||||||
|
|
||||||
|
QThreadPool threadPool;
|
||||||
|
threadPool.setMaxThreadCount(3);
|
||||||
|
int runs = 2 * threadPool.maxThreadCount();
|
||||||
|
BlockingRunnablePtr* runnables = new BlockingRunnablePtr[runs];
|
||||||
|
count.store(0);
|
||||||
|
int dtorCounter = 0;
|
||||||
|
int runCounter = 0;
|
||||||
|
for (int i = 0; i < runs; i++) {
|
||||||
|
runnables[i] = new BlockingRunnable(sem, dtorCounter, runCounter);
|
||||||
|
runnables[i]->setAutoDelete(i != 0 && i != (runs-1)); //one which will run and one which will not
|
||||||
|
threadPool.cancel(runnables[i]); //verify NOOP for jobs not in the queue
|
||||||
|
threadPool.start(runnables[i]);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < runs; i++) {
|
||||||
|
threadPool.cancel(runnables[i]);
|
||||||
|
}
|
||||||
|
runnables[0]->dummy = 0; //valgrind will catch this if cancel() is crazy enough to delete currently running jobs
|
||||||
|
runnables[runs-1]->dummy = 0;
|
||||||
|
QCOMPARE(dtorCounter, runs-threadPool.maxThreadCount()-1);
|
||||||
|
sem.release(threadPool.maxThreadCount());
|
||||||
|
threadPool.waitForDone();
|
||||||
|
QCOMPARE(runCounter, threadPool.maxThreadCount());
|
||||||
|
QCOMPARE(count.load(), threadPool.maxThreadCount());
|
||||||
|
QCOMPARE(dtorCounter, runs-2);
|
||||||
|
delete runnables[0]; //if the pool deletes them then we'll get double-free crash
|
||||||
|
delete runnables[runs-1];
|
||||||
|
delete[] runnables;
|
||||||
|
}
|
||||||
|
|
||||||
void tst_QThreadPool::destroyingWaitsForTasksToFinish()
|
void tst_QThreadPool::destroyingWaitsForTasksToFinish()
|
||||||
{
|
{
|
||||||
QTime total, pass;
|
QTime total, pass;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user