QFuture: Don't use QFutureCallOutInterface for continuations

This patch replaces the QBasicFutureWatcher that was used for
continuations with context objects with a smaller QObject-based wrapper
that works directly from the actual continuation.
The idea stays the same: In order to run continuations in the thread of
a context object, we offload the continuation invocation to the
signal-slot mechanism.
Previously, we've hooked into QFuture with QFutureCallOutInterface to
emit a signal and trigger continuation invocation. However, it is much
easier and robust to emit that signal from the continuation itself.

This sidesteps the locking issues that QFutureCallOutInterface handling
presents. QFutureCallOutInterface basically requires any consumer to
only access the QFuture after all events have been posted and the
internal mutex unlocked, i.e. on the next cycle of the event loop.

Continuations do not impose this restriction; runContinuation()
explicitly unlocks the internal mutex before calling the continuation.

This fixes a deadlock when using QFuture::then(context, ...) where
the paren future is resolved from the same thread that the context
object lives in.

Fixes: QTBUG-119406
Fixes: QTBUG-119103
Fixes: QTBUG-117918
Fixes: QTBUG-119579
Fixes: QTBUG-119810
Pick-to: 6.6
Change-Id: I112b16024cde6b6ee0e4d8127392864b813df5bc
Reviewed-by: Qt CI Bot <qt_ci_bot@qt-project.org>
Reviewed-by: Ivan Solovev <ivan.solovev@qt.io>
(cherry picked from commit 59e21a536f7f81625216dc7a621e7be59919da33)
Reviewed-by: Qt Cherry-pick Bot <cherrypick_bot@qt-project.org>
This commit is contained in:
Arno Rehn 2023-11-23 23:59:34 +01:00 committed by Qt Cherry-pick Bot
parent 51cbcdab6e
commit f89d80c902
3 changed files with 76 additions and 92 deletions

View File

@ -44,73 +44,19 @@ const auto suspendingOrSuspended =
} // unnamed namespace
class QBasicFutureWatcher : public QObject, QFutureCallOutInterface
class QObjectContinuationWrapper : public QObject
{
Q_OBJECT
public:
explicit QBasicFutureWatcher(QObject *parent = nullptr);
~QBasicFutureWatcher() override;
explicit QObjectContinuationWrapper(QObject *parent = nullptr)
: QObject(parent)
{
}
void setFuture(QFutureInterfaceBase &fi);
bool event(QEvent *event) override;
Q_SIGNALS:
void finished();
private:
QFutureInterfaceBase future;
void postCallOutEvent(const QFutureCallOutEvent &event) override;
void callOutInterfaceDisconnected() override;
signals:
void run();
};
void QBasicFutureWatcher::postCallOutEvent(const QFutureCallOutEvent &event)
{
if (thread() == QThread::currentThread()) {
// If we are in the same thread, don't queue up anything.
std::unique_ptr<QFutureCallOutEvent> clonedEvent(event.clone());
QCoreApplication::sendEvent(this, clonedEvent.get());
} else {
QCoreApplication::postEvent(this, event.clone());
}
}
void QBasicFutureWatcher::callOutInterfaceDisconnected()
{
QCoreApplication::removePostedEvents(this, QEvent::FutureCallOut);
}
/*
* QBasicFutureWatcher is a more lightweight version of QFutureWatcher for internal use
*/
QBasicFutureWatcher::QBasicFutureWatcher(QObject *parent)
: QObject(parent)
{
}
QBasicFutureWatcher::~QBasicFutureWatcher()
{
future.d->disconnectOutputInterface(this);
}
void QBasicFutureWatcher::setFuture(QFutureInterfaceBase &fi)
{
future = fi;
future.d->connectOutputInterface(this);
}
bool QBasicFutureWatcher::event(QEvent *event)
{
if (event->type() == QEvent::FutureCallOut) {
QFutureCallOutEvent *callOutEvent = static_cast<QFutureCallOutEvent *>(event);
if (callOutEvent->callOutType == QFutureCallOutEvent::Finished)
emit finished();
return true;
}
return QObject::event(event);
}
void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj,
QFutureInterfaceBase &fi)
{
@ -119,22 +65,40 @@ void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *s
auto slot = SlotObjUniquePtr(slotObj);
auto *watcher = new QBasicFutureWatcher;
auto *watcher = new QObjectContinuationWrapper;
watcher->moveToThread(context->thread());
// We need to protect acccess to the watcher. The context object (and in turn, the watcher)
// could be destroyed while the continuation that emits the signal is running. We have to
// prevent that.
// The mutex has to be recursive, because the continuation itself could delete the context
// object (and thus the watcher), which will try to lock the mutex from the same thread twice.
auto watcherMutex = std::make_shared<QRecursiveMutex>();
const auto destroyWatcher = [watcherMutex, watcher]() mutable {
QMutexLocker lock(watcherMutex.get());
watcher->deleteLater();
};
// ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
QObject::connect(watcher, &QBasicFutureWatcher::finished,
QObject::connect(watcher, &QObjectContinuationWrapper::run,
// for the following, cf. QMetaObject::invokeMethodImpl():
// we know `slot` is a lambda returning `void`, so we can just
// `call()` with `obj` and `args[0]` set to `nullptr`:
watcher, [slot = std::move(slot)] {
context, [slot = std::move(slot)] {
void *args[] = { nullptr }; // for `void` return value
slot->call(nullptr, args);
});
QObject::connect(watcher, &QBasicFutureWatcher::finished,
watcher, &QObject::deleteLater);
QObject::connect(context, &QObject::destroyed,
watcher, &QObject::deleteLater);
watcher->setFuture(fi);
QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, &QObject::deleteLater);
QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
fi.setContinuation([watcherMutex, watcher = QPointer(watcher)]
(const QFutureInterfaceBase &parentData)
{
Q_UNUSED(parentData);
QMutexLocker lock(watcherMutex.get());
if (watcher)
emit watcher->run();
});
}
QFutureCallOutInterface::~QFutureCallOutInterface()

View File

@ -21,6 +21,7 @@ QT_BEGIN_NAMESPACE
template <typename T> class QFuture;
class QThreadPool;
class QFutureInterfaceBase;
class QFutureInterfaceBasePrivate;
class QFutureWatcherBase;
class QFutureWatcherBasePrivate;
@ -39,8 +40,10 @@ template<class Function, class ResultType>
class FailureHandler;
#endif
void Q_CORE_EXPORT watchContinuationImpl(const QObject *context,
QtPrivate::QSlotObjectBase *slotObj,
QFutureInterfaceBase &fi);
}
class QBasicFutureWatcher;
class Q_CORE_EXPORT QFutureInterfaceBase
{
@ -178,7 +181,8 @@ private:
friend class QtPrivate::FailureHandler;
#endif
friend class QBasicFutureWatcher;
friend Q_CORE_EXPORT void QtPrivate::watchContinuationImpl(
const QObject *context, QtPrivate::QSlotObjectBase *slotObj, QFutureInterfaceBase &fi);
template<class T>
friend class QPromise;

View File

@ -204,6 +204,7 @@ private slots:
#endif
void onCanceled();
void cancelContinuations();
void continuationsWithContext_data();
void continuationsWithContext();
void continuationsWithMoveOnlyLambda();
#if 0
@ -3223,15 +3224,35 @@ void tst_QFuture::cancelContinuations()
}
}
void tst_QFuture::continuationsWithContext_data()
{
QTest::addColumn<bool>("inOtherThread");
QTest::addRow("in-other-thread") << true;
QTest::addRow("in-main-thread-qtbug119406") << false;
}
void tst_QFuture::continuationsWithContext()
{
QThread thread;
thread.start();
auto context = new QObject();
context->moveToThread(&thread);
QFETCH(bool, inOtherThread);
auto tstThread = QThread::currentThread();
QThread *thread = inOtherThread ? new QThread
: tstThread;
auto context = new QObject();
const auto cleanupGuard = qScopeGuard([&] {
context->deleteLater();
if (thread != tstThread) {
thread->quit();
thread->wait();
delete thread;
}
});
if (inOtherThread) {
thread->start();
context->moveToThread(thread);
}
// .then()
{
@ -3244,12 +3265,12 @@ void tst_QFuture::continuationsWithContext()
})
.then(context,
[&](int val) {
if (QThread::currentThread() != &thread)
if (QThread::currentThread() != thread)
return 0;
return val + 1;
})
.then([&](int val) {
if (QThread::currentThread() != &thread)
if (QThread::currentThread() != thread)
return 0;
return val + 1;
});
@ -3265,12 +3286,12 @@ void tst_QFuture::continuationsWithContext()
auto future = promise.future()
.onCanceled(context,
[&] {
if (QThread::currentThread() != &thread)
if (QThread::currentThread() != thread)
return 0;
return 1;
})
.then([&](int val) {
if (QThread::currentThread() != &thread)
if (QThread::currentThread() != thread)
return 0;
return val + 1;
});
@ -3287,17 +3308,17 @@ void tst_QFuture::continuationsWithContext()
// like QPointers to the parent not being set to nullptr during child
// object destruction.
QPointer shortLivedContext = new FakeQWidget();
shortLivedContext->moveToThread(&thread);
shortLivedContext->moveToThread(thread);
QPromise<int> promise;
auto future = promise.future()
.then(shortLivedContext, [&](int val) {
if (QThread::currentThread() != &thread)
if (QThread::currentThread() != thread)
return 0;
return val + 1000;
})
.onCanceled([&, ptr=QPointer(shortLivedContext)] {
if (QThread::currentThread() != &thread)
if (QThread::currentThread() != thread)
return 0;
if (ptr)
return 1;
@ -3307,10 +3328,10 @@ void tst_QFuture::continuationsWithContext()
QMetaObject::invokeMethod(shortLivedContext, [&]() {
delete shortLivedContext;
}, Qt::BlockingQueuedConnection);
}, inOtherThread ? Qt::BlockingQueuedConnection
: Qt::DirectConnection);
promise.finish();
QCOMPARE(future.result(), 2);
}
@ -3326,12 +3347,12 @@ void tst_QFuture::continuationsWithContext()
})
.onFailed(context,
[&] {
if (QThread::currentThread() != &thread)
if (QThread::currentThread() != thread)
return 0;
return 1;
})
.then([&](int val) {
if (QThread::currentThread() != &thread)
if (QThread::currentThread() != thread)
return 0;
return val + 1;
});
@ -3340,11 +3361,6 @@ void tst_QFuture::continuationsWithContext()
QCOMPARE(future.result(), 2);
}
#endif // QT_NO_EXCEPTIONS
context->deleteLater();
thread.quit();
thread.wait();
}
void tst_QFuture::continuationsWithMoveOnlyLambda()