diff --git a/src/corelib/doc/snippets/code/src_corelib_thread_qfuture.cpp b/src/corelib/doc/snippets/code/src_corelib_thread_qfuture.cpp index 500d7cc7a57..747e29e3106 100644 --- a/src/corelib/doc/snippets/code/src_corelib_thread_qfuture.cpp +++ b/src/corelib/doc/snippets/code/src_corelib_thread_qfuture.cpp @@ -440,3 +440,27 @@ auto continuation = future.then(context, [context](Result result) { }); //! [37] + +//! [38] +auto f = QtConcurrent::run(...) + .then([]{ + // Then 1 + }) + .then([]{ + // Then 2 + }) + .onCanceled([]{ + // OnCanceled 1 + }) + .then([]{ + // Then 3 + }) + .then([]{ + // Then 4 + }) + .onCanceled([]{ + // OnCanceled 2 + }); +... +f.cancelChain(); +//! [38] diff --git a/src/corelib/thread/qfuture.h b/src/corelib/thread/qfuture.h index 4aacd97f233..3c75365d096 100644 --- a/src/corelib/thread/qfuture.h +++ b/src/corelib/thread/qfuture.h @@ -63,6 +63,7 @@ public: void cancel() { d.cancel(); } bool isCanceled() const { return d.isCanceled(); } + void cancelChain() { d.cancelChain(); } #if QT_DEPRECATED_SINCE(6, 0) QT_DEPRECATED_VERSION_X_6_0("Use setSuspended() instead.") diff --git a/src/corelib/thread/qfuture.qdoc b/src/corelib/thread/qfuture.qdoc index b278d398821..891b38cab9f 100644 --- a/src/corelib/thread/qfuture.qdoc +++ b/src/corelib/thread/qfuture.qdoc @@ -180,6 +180,8 @@ Be aware that not all running asynchronous computations can be canceled. For example, the future returned by QtConcurrent::run() cannot be canceled; but the future returned by QtConcurrent::mappedReduced() can. + + \sa cancelChain() */ /*! \fn template bool QFuture::isCanceled() const @@ -191,6 +193,30 @@ function returns \c true. See cancel() for more details. */ +/*! + \fn template void QFuture::cancelChain() + \since 6.10 + + Cancels the entire continuation chain. All already-finished futures are + unchanged and their results are still available. Every pending continuation + is canceled, and its \l {onCanceled()} handler is called, if it exists in + the continuation chain. + + \snippet code/src_corelib_thread_qfuture.cpp 38 + + In the example, if the chain is canceled before the \c {Then 2} continuation + is executed, both \c {OnCanceled 1} and \c {OnCanceled 2} cancellation + handlers will be invoked. + + If the chain is canceled after \c {Then 2}, but before \c {Then 4}, then + only \c {OnCanceled 2} will be invoked. + + \note When called on an already finished future, this method has no effect. + It's recommended to use it on the QFuture object that represents the entire + continuation chain, like it's shown in the example above. + + \sa cancel() +*/ #if QT_DEPRECATED_SINCE(6, 0) /*! \fn template void QFuture::setPaused(bool paused) diff --git a/src/corelib/thread/qfutureinterface.cpp b/src/corelib/thread/qfutureinterface.cpp index cc61752f28d..30d7814ab0a 100644 --- a/src/corelib/thread/qfutureinterface.cpp +++ b/src/corelib/thread/qfutureinterface.cpp @@ -95,46 +95,87 @@ static inline int switch_from_to(QAtomicInt &a, int from, int to) return value; } +void QFutureInterfaceBasePrivate::cancelImpl(QFutureInterfaceBase::CancelMode mode, + CancelOptions options) +{ + QMutexLocker locker(&m_mutex); + + const auto oldState = state.loadRelaxed(); + + switch (mode) { + case QFutureInterfaceBase::CancelMode::CancelAndFinish: + if ((oldState & QFutureInterfaceBase::Finished) + && (oldState & QFutureInterfaceBase::Canceled)) { + return; + } + switch_from_to(state, suspendingOrSuspended | QFutureInterfaceBase::Running, + QFutureInterfaceBase::Canceled | QFutureInterfaceBase::Finished); + break; + case QFutureInterfaceBase::CancelMode::CancelOnly: + if (oldState & QFutureInterfaceBase::Canceled) + return; + switch_from_to(state, suspendingOrSuspended, QFutureInterfaceBase::Canceled); + break; + } + + if (options & CancelOption::CancelContinuations) { + // Cancel the continuations chain + QMutexLocker continuationLocker(&continuationMutex); + QFutureInterfaceBasePrivate *next = continuationData; + while (next) { + QMutexLocker nextLocker(&next->continuationMutex); + if (next->continuationType == QFutureInterfaceBase::ContinuationType::Then) { + next->continuationState = QFutureInterfaceBasePrivate::Canceled; + next = next->continuationData; + } else { + break; + } + } + } + + waitCondition.wakeAll(); + pausedWaitCondition.wakeAll(); + + if (!(oldState & QFutureInterfaceBase::Canceled)) + sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); + if (mode == QFutureInterfaceBase::CancelMode::CancelAndFinish + && !(oldState & QFutureInterfaceBase::Finished)) { + sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished)); + } + + isValid = false; +} + void QFutureInterfaceBase::cancel() { cancel(CancelMode::CancelOnly); } +void QFutureInterfaceBase::cancelChain() +{ + cancelChain(CancelMode::CancelOnly); +} + void QFutureInterfaceBase::cancel(QFutureInterfaceBase::CancelMode mode) { - QMutexLocker locker(&d->m_mutex); + d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations); +} - const auto oldState = d->state.loadRelaxed(); - - switch (mode) { - case CancelMode::CancelAndFinish: - if ((oldState & Finished) && (oldState & Canceled)) - return; - switch_from_to(d->state, suspendingOrSuspended | Running, Canceled | Finished); - break; - case CancelMode::CancelOnly: - if (oldState & Canceled) - return; - switch_from_to(d->state, suspendingOrSuspended, Canceled); - break; +void QFutureInterfaceBase::cancelChain(QFutureInterfaceBase::CancelMode mode) +{ + // go up through the list of continuations, cancelling each of them + { + QMutexLocker locker(&d->continuationMutex); + QFutureInterfaceBasePrivate *prev = d->nonConcludedParent; + while (prev) { + // Do not cancel continuations, because we're going bottom-to-top + prev->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::None); + QMutexLocker prevLocker(&prev->continuationMutex); + prev = prev->nonConcludedParent; + } } - - // Cancel the continuations chain - QFutureInterfaceBasePrivate *next = d->continuationData; - while (next && next->continuationType == ContinuationType::Then) { - next->continuationState = QFutureInterfaceBasePrivate::Canceled; - next = next->continuationData; - } - - d->waitCondition.wakeAll(); - d->pausedWaitCondition.wakeAll(); - - if (!(oldState & Canceled)) - d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled)); - if (mode == CancelMode::CancelAndFinish && !(oldState & Finished)) - d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Finished)); - - d->isValid = false; + // finally, cancel self and all next continuations + d->cancelImpl(mode, QFutureInterfaceBasePrivate::CancelOption::CancelContinuations); } void QFutureInterfaceBase::setSuspended(bool suspend) @@ -847,10 +888,14 @@ void QFutureInterfaceBase::setContinuation(std::functioncontinuation) { qWarning("Adding a continuation to a future which already has a continuation. " "The existing continuation is overwritten."); + if (d->continuationData) + d->continuationData->nonConcludedParent = nullptr; } d->continuation = std::move(func); - if (futureData) + if (futureData) { futureData->continuationType = type; + futureData->nonConcludedParent = d; + } d->continuationData = futureData; Q_ASSERT_X(!futureData || futureData->continuationType != ContinuationType::Unknown, "setContinuation", "Make sure to provide a correct continuation type!"); @@ -949,6 +994,10 @@ void QFutureInterfaceBase::runContinuation() const { QMutexLocker lock(&d->continuationMutex); if (d->continuation && !d->continuationExecuted) { + // If we run the next continuation, then this future is concluded, so + // we wouldn't need to revisit it in the cancelChain() + if (d->continuationData) + d->continuationData->nonConcludedParent = nullptr; // Save the continuation in a local function, to avoid calling // a null std::function below, in case cleanContinuation() is // called from some other thread right after unlock() below. diff --git a/src/corelib/thread/qfutureinterface.h b/src/corelib/thread/qfutureinterface.h index e3d0332f6f0..3224288228c 100644 --- a/src/corelib/thread/qfutureinterface.h +++ b/src/corelib/thread/qfutureinterface.h @@ -127,6 +127,7 @@ public: void cancel(); void cancelAndFinish() { cancel(CancelMode::CancelAndFinish); } + void cancelChain(); void setSuspended(bool suspend); void toggleSuspended(); @@ -221,6 +222,7 @@ protected: enum class CancelMode { CancelOnly, CancelAndFinish }; void cancel(CancelMode mode); + void cancelChain(CancelMode mode); }; inline void swap(QFutureInterfaceBase &lhs, QFutureInterfaceBase &rhs) noexcept diff --git a/src/corelib/thread/qfutureinterface_p.h b/src/corelib/thread/qfutureinterface_p.h index fdf80779db8..c850036b39a 100644 --- a/src/corelib/thread/qfutureinterface_p.h +++ b/src/corelib/thread/qfutureinterface_p.h @@ -142,6 +142,9 @@ public: // Wrapper for continuation std::function continuation; QFutureInterfaceBasePrivate *continuationData = nullptr; + // will reset back to nullptr when the parent future is done + // (finished, canceled, failed etc) + QFutureInterfaceBasePrivate *nonConcludedParent = nullptr; RefCount refCount = 1; QAtomicInt state; // reads and writes can happen unprotected, both must be atomic @@ -183,6 +186,14 @@ public: void disconnectOutputInterface(QFutureCallOutInterface *iface); void setState(QFutureInterfaceBase::State state); + + enum class CancelOption : quint32 + { + None = 0x00, + CancelContinuations = 0x01, + }; + Q_DECLARE_FLAGS(CancelOptions, CancelOption) + void cancelImpl(QFutureInterfaceBase::CancelMode mode, CancelOptions options); }; QT_END_NAMESPACE diff --git a/tests/auto/corelib/thread/qfuture/tst_qfuture.cpp b/tests/auto/corelib/thread/qfuture/tst_qfuture.cpp index 66477e6d505..e75446f5a71 100644 --- a/tests/auto/corelib/thread/qfuture/tst_qfuture.cpp +++ b/tests/auto/corelib/thread/qfuture/tst_qfuture.cpp @@ -253,6 +253,11 @@ private slots: void unwrap(); + void cancelChain(); + void cancelChainWithContext_data(); + void cancelChainWithContext(); + void cancelChainOnAnOverwrittenFuture(); + private: using size_type = std::vector::size_type; @@ -5383,5 +5388,310 @@ void tst_QFuture::unwrap() } } + +void tst_QFuture::cancelChain() +{ + // cancel immediately + { + QPromise p; + p.start(); + + int thenCnt = 0; + int onCancelCnt = 0; + + auto f = p.future() + .then([&]() { + ++thenCnt; + }) + .then([&]() { + ++thenCnt; + }) + .then([&]{ + ++thenCnt; + }) + .onCanceled([&] { + ++onCancelCnt; + }) + .then([&]{ + ++thenCnt; + }); + + f.cancelChain(); + p.finish(); + + QCOMPARE_EQ(thenCnt, 0); + QCOMPARE_EQ(onCancelCnt, 1); + } + // cancel when part of the chain is already done + { + QPromise p1, p2; + p1.start(); + p2.start(); + + int thenCnt = 0; + int onCancelCnt = 0; + + auto f = QtFuture::makeReadyVoidFuture() + .then([&, f = p1.future()]() { + ++thenCnt; + return f; + }).unwrap() + .then([&, f = p2.future()]() { + ++thenCnt; + return f; + }).unwrap() + .then([&]{ + ++thenCnt; + }) + .onCanceled([&] { + ++onCancelCnt; + }) + .then([&]{ + ++thenCnt; + }); + + p1.finish(); + f.cancelChain(); + p2.finish(); + + QCOMPARE_EQ(thenCnt, 2); + QCOMPARE_EQ(onCancelCnt, 1); + } + // calling once everything is done has no effect + { + int thenCnt = 0; + int onCancelCnt = 0; + + auto f = QtFuture::makeReadyVoidFuture() + .then([&]() { + ++thenCnt; + }) + .then([&]() { + ++thenCnt; + }) + .then([&]{ + ++thenCnt; + }) + .onCanceled([&] { + ++onCancelCnt; + }) + .then([&]{ + ++thenCnt; + }); + + f.cancelChain(); + + QCOMPARE_EQ(thenCnt, 4); + QCOMPARE_EQ(onCancelCnt, 0); + } +} + +void tst_QFuture::cancelChainWithContext_data() +{ + QTest::addColumn("inOtherThread"); + QTest::addRow("in-other-thread") << true; + QTest::addRow("in-main-thread") << false; +} + +void tst_QFuture::cancelChainWithContext() +{ + QFETCH(const bool, inOtherThread); + + auto tstThread = QThread::currentThread(); + QThread *thread = inOtherThread ? new QThread + : tstThread; + auto context = new QObject(); + + const auto cleanupGuard = qScopeGuard([&] { + context->deleteLater(); + if (thread != tstThread) { + thread->quit(); + thread->wait(); + delete thread; + } + }); + + if (inOtherThread) { + thread->start(); + context->moveToThread(thread); + } + + // cancel immediately + { + QPromise p; + p.start(); + + int thenCnt = 0; + int onCancelCnt = 0; + bool unexpectedThread = false; + + auto f = p.future() + .then(context, [&]() { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }) + .then([&]() { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }) + .then([&]{ + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }) + .onCanceled(context, [&] { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++onCancelCnt; + }) + .then([&]{ + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }); + + f.cancelChain(); + p.finish(); + f.waitForFinished(); + + QVERIFY(!unexpectedThread); + QCOMPARE_EQ(thenCnt, 0); + QCOMPARE_EQ(onCancelCnt, 1); + } + // cancel when part of the chain is already done + { + QPromise p1, p2; + p1.start(); + p2.start(); + + int thenCnt = 0; + int onCancelCnt = 0; + bool unexpectedThread = false; + + auto f = p1.future() + .then(context, [&]() { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }) + .then([&, f = p2.future()]() { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + return f; + }).unwrap() + .then(context, [&]{ + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }) + .onCanceled([&] { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++onCancelCnt; + }) + .then(context, [&]{ + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }); + + p1.finish(); + f.cancelChain(); + p2.finish(); + f.waitForFinished(); + + QVERIFY(!unexpectedThread); + QCOMPARE_EQ(thenCnt, 2); + QCOMPARE_EQ(onCancelCnt, 1); + } + // calling once everything is done has no effect + { + QPromise p; + p.start(); + + int thenCnt = 0; + int onCancelCnt = 0; + bool unexpectedThread = false; + + auto f = p.future() + .then(context, [&]() { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }) + .then([&]() { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }) + .then([&]{ + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }) + .onCanceled(context, [&] { + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++onCancelCnt; + }) + .then([&]{ + if (QThread::currentThread() != thread) + unexpectedThread = true; + ++thenCnt; + }); + + p.finish(); + f.waitForFinished(); + f.cancelChain(); + + QVERIFY(!unexpectedThread); + QCOMPARE_EQ(thenCnt, 4); + QCOMPARE_EQ(onCancelCnt, 0); + } +} + +void tst_QFuture::cancelChainOnAnOverwrittenFuture() +{ + QPromise p; + p.start(); + + int thenCnt = 0; + int onCancelCnt = 0; + + auto firstF = p.future() + .then([&]() { + ++thenCnt; + }) + .then([&]() { + ++thenCnt; + }) + .then([&]{ + ++thenCnt; + }) + .onCanceled([&] { + ++onCancelCnt; + }); + auto overwrittenF = firstF + .then([&]{ + thenCnt = -100; // should not happen + }); + + suppressContinuationOverrideWarning(); + auto anotherF = firstF + .then([&]{ + ++thenCnt; + }); + + // cancelling overwrittenF should have no effect on the chain at this point! + overwrittenF.cancelChain(); + p.finish(); + + QCOMPARE_EQ(thenCnt, 4); + QCOMPARE_EQ(onCancelCnt, 0); +} + QTEST_MAIN(tst_QFuture) #include "tst_qfuture.moc"