Add QFuture::cancelChain()
This method should be used on a future returned from the long continuation chain, if this chain needs to be canceled altogether. To achieve it, this commit introduces a pointer to the parent data. That pointer is set when setContinuation() is called, and cleaned when the parent is completed and calls its runContinuation() method. This allows the code in cancelChain() to simply move up the chain as long as the parent pointer is valid, and cancel each of the continuations. The commit also moves the QFIB::cancel() implementation into QFIBP::cancelImpl(), so that it can be reused in the new method, and introduces an enum for cancellation options, to address the differences in the behavior between these two methods. Note that the approach consistently works only on the last QFuture of the chain, because the already-executed continuations might clear their continuation data, which makes navigating down the chain inconsistent. [ChangeLog][QtCore][QFuture] Added QFuture::cancelChain(). Fixes: QTBUG-130662 Change-Id: Ic0949ed30a2f54f99597e95b3951fda6bfb9a0be Reviewed-by: Mårten Nordheim <marten.nordheim@qt.io>
This commit is contained in:
parent
3164746eb0
commit
cdc71532ec
@ -440,3 +440,27 @@ auto continuation = future.then(context, [context](Result result) {
|
||||
});
|
||||
|
||||
//! [37]
|
||||
|
||||
//! [38]
|
||||
auto f = QtConcurrent::run(...)
|
||||
.then([]{
|
||||
// Then 1
|
||||
})
|
||||
.then([]{
|
||||
// Then 2
|
||||
})
|
||||
.onCanceled([]{
|
||||
// OnCanceled 1
|
||||
})
|
||||
.then([]{
|
||||
// Then 3
|
||||
})
|
||||
.then([]{
|
||||
// Then 4
|
||||
})
|
||||
.onCanceled([]{
|
||||
// OnCanceled 2
|
||||
});
|
||||
...
|
||||
f.cancelChain();
|
||||
//! [38]
|
||||
|
@ -63,6 +63,7 @@ public:
|
||||
|
||||
void cancel() { d.cancel(); }
|
||||
bool isCanceled() const { return d.isCanceled(); }
|
||||
void cancelChain() { d.cancelChain(); }
|
||||
|
||||
#if QT_DEPRECATED_SINCE(6, 0)
|
||||
QT_DEPRECATED_VERSION_X_6_0("Use setSuspended() instead.")
|
||||
|
@ -180,6 +180,8 @@
|
||||
Be aware that not all running asynchronous computations can be canceled.
|
||||
For example, the future returned by QtConcurrent::run() cannot be canceled;
|
||||
but the future returned by QtConcurrent::mappedReduced() can.
|
||||
|
||||
\sa cancelChain()
|
||||
*/
|
||||
|
||||
/*! \fn template <typename T> bool QFuture<T>::isCanceled() const
|
||||
@ -191,6 +193,30 @@
|
||||
function returns \c true. See cancel() for more details.
|
||||
*/
|
||||
|
||||
/*!
|
||||
\fn template <typename T> void QFuture<T>::cancelChain()
|
||||
\since 6.10
|
||||
|
||||
Cancels the entire continuation chain. All already-finished futures are
|
||||
unchanged and their results are still available. Every pending continuation
|
||||
is canceled, and its \l {onCanceled()} handler is called, if it exists in
|
||||
the continuation chain.
|
||||
|
||||
\snippet code/src_corelib_thread_qfuture.cpp 38
|
||||
|
||||
In the example, if the chain is canceled before the \c {Then 2} continuation
|
||||
is executed, both \c {OnCanceled 1} and \c {OnCanceled 2} cancellation
|
||||
handlers will be invoked.
|
||||
|
||||
If the chain is canceled after \c {Then 2}, but before \c {Then 4}, then
|
||||
only \c {OnCanceled 2} will be invoked.
|
||||
|
||||
\note When called on an already finished future, this method has no effect.
|
||||
It's recommended to use it on the QFuture object that represents the entire
|
||||
continuation chain, like it's shown in the example above.
|
||||
|
||||
\sa cancel()
|
||||
*/
|
||||
|
||||
#if QT_DEPRECATED_SINCE(6, 0)
|
||||
/*! \fn template <typename T> void QFuture<T>::setPaused(bool paused)
|
||||
|
@ -95,46 +95,87 @@ static inline int switch_from_to(QAtomicInt &a, int from, int to)
|
||||
return value;
|
||||
}
|
||||
|
||||
void QFutureInterfaceBasePrivate::cancelImpl(QFutureInterfaceBase::CancelMode mode,
|
||||
CancelOptions options)
|
||||
{
|
||||
QMutexLocker locker(&m_mutex);
|
||||
|
||||
const auto oldState = state.loadRelaxed();
|
||||
|
||||
switch (mode) {
|
||||
case QFutureInterfaceBase::CancelMode::CancelAndFinish:
|
||||
if ((oldState & QFutureInterfaceBase::Finished)
|
||||
&& (oldState & QFutureInterfaceBase::Canceled)) {
|
||||
return;
|
||||
}
|
||||
switch_from_to(state, suspendingOrSuspended | QFutureInterfaceBase::Running,
|
||||
QFutureInterfaceBase::Canceled | QFutureInterfaceBase::Finished);
|
||||
break;
|
||||
case QFutureInterfaceBase::CancelMode::CancelOnly:
|
||||
if (oldState & QFutureInterfaceBase::Canceled)
|
||||
return;
|
||||
switch_from_to(state, suspendingOrSuspended, QFutureInterfaceBase::Canceled);
|
||||
break;
|
||||
}
|
||||
|
||||
if (options & CancelOption::CancelContinuations) {
|
||||
// Cancel the continuations chain
|
||||
QMutexLocker continuationLocker(&continuationMutex);
|
||||
QFutureInterfaceBasePrivate *next = continuationData;
|
||||
while (next) {
|
||||
QMutexLocker nextLocker(&next->continuationMutex);
|
||||
if (next->continuationType == QFutureInterfaceBase::ContinuationType::Then) {
|
||||
next->continuationState = QFutureInterfaceBasePrivate::Canceled;
|
||||
next = next->continuationData;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
waitCondition.wakeAll();
|
||||
pausedWaitCondition.wakeAll();
|
||||
|
||||
if (!(oldState & QFutureInterfaceBase::Canceled))
|
||||
sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
|
||||
if (mode == QFutureInterfaceBase::CancelMode::CancelAndFinish
|
||||
&& !(oldState & QFutureInterfaceBase::Finished)) {
|
||||
sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
|
||||
}
|
||||
|
||||
isValid = false;
|
||||
}
|
||||
|
||||
void QFutureInterfaceBase::cancel()
|
||||
{
|
||||
cancel(CancelMode::CancelOnly);
|
||||
}
|
||||
|
||||
void QFutureInterfaceBase::cancelChain()
|
||||
{
|
||||
cancelChain(CancelMode::CancelOnly);
|
||||
}
|
||||
|
||||
void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode)
|
||||
{
|
||||
QMutexLocker locker(&d->m_mutex);
|
||||
d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
|
||||
}
|
||||
|
||||
const auto oldState = d->state.loadRelaxed();
|
||||
|
||||
switch (mode) {
|
||||
case CancelMode::CancelAndFinish:
|
||||
if ((oldState & Finished) && (oldState & Canceled))
|
||||
return;
|
||||
switch_from_to(d->state, suspendingOrSuspended | Running, Canceled | Finished);
|
||||
break;
|
||||
case CancelMode::CancelOnly:
|
||||
if (oldState & Canceled)
|
||||
return;
|
||||
switch_from_to(d->state, suspendingOrSuspended, Canceled);
|
||||
break;
|
||||
void QFutureInterfaceBase::cancelChain(QFutureInterfaceBase::CancelMode mode)
|
||||
{
|
||||
// go up through the list of continuations, cancelling each of them
|
||||
{
|
||||
QMutexLocker locker(&d->continuationMutex);
|
||||
QFutureInterfaceBasePrivate *prev = d->nonConcludedParent;
|
||||
while (prev) {
|
||||
// Do not cancel continuations, because we're going bottom-to-top
|
||||
prev->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::None);
|
||||
QMutexLocker prevLocker(&prev->continuationMutex);
|
||||
prev = prev->nonConcludedParent;
|
||||
}
|
||||
|
||||
// Cancel the continuations chain
|
||||
QFutureInterfaceBasePrivate *next = d->continuationData;
|
||||
while (next && next->continuationType == ContinuationType::Then) {
|
||||
next->continuationState = QFutureInterfaceBasePrivate::Canceled;
|
||||
next = next->continuationData;
|
||||
}
|
||||
|
||||
d->waitCondition.wakeAll();
|
||||
d->pausedWaitCondition.wakeAll();
|
||||
|
||||
if (!(oldState & Canceled))
|
||||
d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
|
||||
if (mode == CancelMode::CancelAndFinish && !(oldState & Finished))
|
||||
d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished));
|
||||
|
||||
d->isValid = false;
|
||||
// finally, cancel self and all next continuations
|
||||
d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations);
|
||||
}
|
||||
|
||||
void QFutureInterfaceBase::setSuspended(bool suspend)
|
||||
@ -847,10 +888,14 @@ void QFutureInterfaceBase::setContinuation(std::function<void (const QFutureInte
|
||||
if (d->continuation) {
|
||||
qWarning("Adding a continuation to a future which already has a continuation. "
|
||||
"The existing continuation is overwritten.");
|
||||
if (d->continuationData)
|
||||
d->continuationData->nonConcludedParent = nullptr;
|
||||
}
|
||||
d->continuation = std::move(func);
|
||||
if (futureData)
|
||||
if (futureData) {
|
||||
futureData->continuationType = type;
|
||||
futureData->nonConcludedParent = d;
|
||||
}
|
||||
d->continuationData = futureData;
|
||||
Q_ASSERT_X(!futureData || futureData->continuationType != ContinuationType::Unknown,
|
||||
"setContinuation", "Make sure to provide a correct continuation type!");
|
||||
@ -949,6 +994,10 @@ void QFutureInterfaceBase::runContinuation() const
|
||||
{
|
||||
QMutexLocker lock(&d->continuationMutex);
|
||||
if (d->continuation && !d->continuationExecuted) {
|
||||
// If we run the next continuation, then this future is concluded, so
|
||||
// we wouldn't need to revisit it in the cancelChain()
|
||||
if (d->continuationData)
|
||||
d->continuationData->nonConcludedParent = nullptr;
|
||||
// Save the continuation in a local function, to avoid calling
|
||||
// a null std::function below, in case cleanContinuation() is
|
||||
// called from some other thread right after unlock() below.
|
||||
|
@ -127,6 +127,7 @@ public:
|
||||
|
||||
void cancel();
|
||||
void cancelAndFinish() { cancel(CancelMode::CancelAndFinish); }
|
||||
void cancelChain();
|
||||
|
||||
void setSuspended(bool suspend);
|
||||
void toggleSuspended();
|
||||
@ -221,6 +222,7 @@ protected:
|
||||
|
||||
enum class CancelMode { CancelOnly, CancelAndFinish };
|
||||
void cancel(CancelMode mode);
|
||||
void cancelChain(CancelMode mode);
|
||||
};
|
||||
|
||||
inline void swap(QFutureInterfaceBase &lhs, QFutureInterfaceBase &rhs) noexcept
|
||||
|
@ -142,6 +142,9 @@ public:
|
||||
// Wrapper for continuation
|
||||
std::function<void(const QFutureInterfaceBase &)> continuation;
|
||||
QFutureInterfaceBasePrivate *continuationData = nullptr;
|
||||
// will reset back to nullptr when the parent future is done
|
||||
// (finished, canceled, failed etc)
|
||||
QFutureInterfaceBasePrivate *nonConcludedParent = nullptr;
|
||||
|
||||
RefCount refCount = 1;
|
||||
QAtomicInt state; // reads and writes can happen unprotected, both must be atomic
|
||||
@ -183,6 +186,14 @@ public:
|
||||
void disconnectOutputInterface(QFutureCallOutInterface *iface);
|
||||
|
||||
void setState(QFutureInterfaceBase::State state);
|
||||
|
||||
enum class CancelOption : quint32
|
||||
{
|
||||
None = 0x00,
|
||||
CancelContinuations = 0x01,
|
||||
};
|
||||
Q_DECLARE_FLAGS(CancelOptions, CancelOption)
|
||||
void cancelImpl(QFutureInterfaceBase::CancelMode mode, CancelOptions options);
|
||||
};
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
@ -253,6 +253,11 @@ private slots:
|
||||
|
||||
void unwrap();
|
||||
|
||||
void cancelChain();
|
||||
void cancelChainWithContext_data();
|
||||
void cancelChainWithContext();
|
||||
void cancelChainOnAnOverwrittenFuture();
|
||||
|
||||
private:
|
||||
using size_type = std::vector<int>::size_type;
|
||||
|
||||
@ -5383,5 +5388,310 @@ void tst_QFuture::unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void tst_QFuture::cancelChain()
|
||||
{
|
||||
// cancel immediately
|
||||
{
|
||||
QPromise<void> p;
|
||||
p.start();
|
||||
|
||||
int thenCnt = 0;
|
||||
int onCancelCnt = 0;
|
||||
|
||||
auto f = p.future()
|
||||
.then([&]() {
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]() {
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]{
|
||||
++thenCnt;
|
||||
})
|
||||
.onCanceled([&] {
|
||||
++onCancelCnt;
|
||||
})
|
||||
.then([&]{
|
||||
++thenCnt;
|
||||
});
|
||||
|
||||
f.cancelChain();
|
||||
p.finish();
|
||||
|
||||
QCOMPARE_EQ(thenCnt, 0);
|
||||
QCOMPARE_EQ(onCancelCnt, 1);
|
||||
}
|
||||
// cancel when part of the chain is already done
|
||||
{
|
||||
QPromise<void> p1, p2;
|
||||
p1.start();
|
||||
p2.start();
|
||||
|
||||
int thenCnt = 0;
|
||||
int onCancelCnt = 0;
|
||||
|
||||
auto f = QtFuture::makeReadyVoidFuture()
|
||||
.then([&, f = p1.future()]() {
|
||||
++thenCnt;
|
||||
return f;
|
||||
}).unwrap()
|
||||
.then([&, f = p2.future()]() {
|
||||
++thenCnt;
|
||||
return f;
|
||||
}).unwrap()
|
||||
.then([&]{
|
||||
++thenCnt;
|
||||
})
|
||||
.onCanceled([&] {
|
||||
++onCancelCnt;
|
||||
})
|
||||
.then([&]{
|
||||
++thenCnt;
|
||||
});
|
||||
|
||||
p1.finish();
|
||||
f.cancelChain();
|
||||
p2.finish();
|
||||
|
||||
QCOMPARE_EQ(thenCnt, 2);
|
||||
QCOMPARE_EQ(onCancelCnt, 1);
|
||||
}
|
||||
// calling once everything is done has no effect
|
||||
{
|
||||
int thenCnt = 0;
|
||||
int onCancelCnt = 0;
|
||||
|
||||
auto f = QtFuture::makeReadyVoidFuture()
|
||||
.then([&]() {
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]() {
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]{
|
||||
++thenCnt;
|
||||
})
|
||||
.onCanceled([&] {
|
||||
++onCancelCnt;
|
||||
})
|
||||
.then([&]{
|
||||
++thenCnt;
|
||||
});
|
||||
|
||||
f.cancelChain();
|
||||
|
||||
QCOMPARE_EQ(thenCnt, 4);
|
||||
QCOMPARE_EQ(onCancelCnt, 0);
|
||||
}
|
||||
}
|
||||
|
||||
void tst_QFuture::cancelChainWithContext_data()
|
||||
{
|
||||
QTest::addColumn<bool>("inOtherThread");
|
||||
QTest::addRow("in-other-thread") << true;
|
||||
QTest::addRow("in-main-thread") << false;
|
||||
}
|
||||
|
||||
void tst_QFuture::cancelChainWithContext()
|
||||
{
|
||||
QFETCH(const bool, inOtherThread);
|
||||
|
||||
auto tstThread = QThread::currentThread();
|
||||
QThread *thread = inOtherThread ? new QThread
|
||||
: tstThread;
|
||||
auto context = new QObject();
|
||||
|
||||
const auto cleanupGuard = qScopeGuard([&] {
|
||||
context->deleteLater();
|
||||
if (thread != tstThread) {
|
||||
thread->quit();
|
||||
thread->wait();
|
||||
delete thread;
|
||||
}
|
||||
});
|
||||
|
||||
if (inOtherThread) {
|
||||
thread->start();
|
||||
context->moveToThread(thread);
|
||||
}
|
||||
|
||||
// cancel immediately
|
||||
{
|
||||
QPromise<void> p;
|
||||
p.start();
|
||||
|
||||
int thenCnt = 0;
|
||||
int onCancelCnt = 0;
|
||||
bool unexpectedThread = false;
|
||||
|
||||
auto f = p.future()
|
||||
.then(context, [&]() {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]() {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]{
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
})
|
||||
.onCanceled(context, [&] {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++onCancelCnt;
|
||||
})
|
||||
.then([&]{
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
});
|
||||
|
||||
f.cancelChain();
|
||||
p.finish();
|
||||
f.waitForFinished();
|
||||
|
||||
QVERIFY(!unexpectedThread);
|
||||
QCOMPARE_EQ(thenCnt, 0);
|
||||
QCOMPARE_EQ(onCancelCnt, 1);
|
||||
}
|
||||
// cancel when part of the chain is already done
|
||||
{
|
||||
QPromise<void> p1, p2;
|
||||
p1.start();
|
||||
p2.start();
|
||||
|
||||
int thenCnt = 0;
|
||||
int onCancelCnt = 0;
|
||||
bool unexpectedThread = false;
|
||||
|
||||
auto f = p1.future()
|
||||
.then(context, [&]() {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&, f = p2.future()]() {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
return f;
|
||||
}).unwrap()
|
||||
.then(context, [&]{
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
})
|
||||
.onCanceled([&] {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++onCancelCnt;
|
||||
})
|
||||
.then(context, [&]{
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
});
|
||||
|
||||
p1.finish();
|
||||
f.cancelChain();
|
||||
p2.finish();
|
||||
f.waitForFinished();
|
||||
|
||||
QVERIFY(!unexpectedThread);
|
||||
QCOMPARE_EQ(thenCnt, 2);
|
||||
QCOMPARE_EQ(onCancelCnt, 1);
|
||||
}
|
||||
// calling once everything is done has no effect
|
||||
{
|
||||
QPromise<void> p;
|
||||
p.start();
|
||||
|
||||
int thenCnt = 0;
|
||||
int onCancelCnt = 0;
|
||||
bool unexpectedThread = false;
|
||||
|
||||
auto f = p.future()
|
||||
.then(context, [&]() {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]() {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]{
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
})
|
||||
.onCanceled(context, [&] {
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++onCancelCnt;
|
||||
})
|
||||
.then([&]{
|
||||
if (QThread::currentThread() != thread)
|
||||
unexpectedThread = true;
|
||||
++thenCnt;
|
||||
});
|
||||
|
||||
p.finish();
|
||||
f.waitForFinished();
|
||||
f.cancelChain();
|
||||
|
||||
QVERIFY(!unexpectedThread);
|
||||
QCOMPARE_EQ(thenCnt, 4);
|
||||
QCOMPARE_EQ(onCancelCnt, 0);
|
||||
}
|
||||
}
|
||||
|
||||
void tst_QFuture::cancelChainOnAnOverwrittenFuture()
|
||||
{
|
||||
QPromise<void> p;
|
||||
p.start();
|
||||
|
||||
int thenCnt = 0;
|
||||
int onCancelCnt = 0;
|
||||
|
||||
auto firstF = p.future()
|
||||
.then([&]() {
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]() {
|
||||
++thenCnt;
|
||||
})
|
||||
.then([&]{
|
||||
++thenCnt;
|
||||
})
|
||||
.onCanceled([&] {
|
||||
++onCancelCnt;
|
||||
});
|
||||
auto overwrittenF = firstF
|
||||
.then([&]{
|
||||
thenCnt = -100; // should not happen
|
||||
});
|
||||
|
||||
suppressContinuationOverrideWarning();
|
||||
auto anotherF = firstF
|
||||
.then([&]{
|
||||
++thenCnt;
|
||||
});
|
||||
|
||||
// cancelling overwrittenF should have no effect on the chain at this point!
|
||||
overwrittenF.cancelChain();
|
||||
p.finish();
|
||||
|
||||
QCOMPARE_EQ(thenCnt, 4);
|
||||
QCOMPARE_EQ(onCancelCnt, 0);
|
||||
}
|
||||
|
||||
QTEST_MAIN(tst_QFuture)
|
||||
#include "tst_qfuture.moc"
|
||||
|
Loading…
x
Reference in New Issue
Block a user