QFuture: Don't use QFutureCallOutInterface for continuations

This patch replaces the QBasicFutureWatcher that was used for
continuations with context objects with a smaller QObject-based wrapper
that works directly from the actual continuation.
The idea stays the same: In order to run continuations in the thread of
a context object, we offload the continuation invocation to the
signal-slot mechanism.
Previously, we've hooked into QFuture with QFutureCallOutInterface to
emit a signal and trigger continuation invocation. However, it is much
easier and robust to emit that signal from the continuation itself.

This sidesteps the locking issues that QFutureCallOutInterface handling
presents. QFutureCallOutInterface basically requires any consumer to
only access the QFuture after all events have been posted and the
internal mutex unlocked, i.e. on the next cycle of the event loop.

Continuations do not impose this restriction; runContinuation()
explicitly unlocks the internal mutex before calling the continuation.

This fixes a deadlock when using QFuture::then(context, ...) where
the paren future is resolved from the same thread that the context
object lives in.

Fixes: QTBUG-119406
Fixes: QTBUG-119103
Fixes: QTBUG-117918
Fixes: QTBUG-119579
Fixes: QTBUG-119810
Change-Id: I112b16024cde6b6ee0e4d8127392864b813df5bc
Reviewed-by: Qt CI Bot <qt_ci_bot@qt-project.org>
Reviewed-by: Ivan Solovev <ivan.solovev@qt.io>
(cherry picked from commit 59e21a536f7f81625216dc7a621e7be59919da33)
Reviewed-by: Qt Cherry-pick Bot <cherrypick_bot@qt-project.org>
(cherry picked from commit f89d80c902957f22c8d8f18f234347407ed96b91)
This commit is contained in:
Arno Rehn 2023-11-23 23:59:34 +01:00 committed by Qt Cherry-pick Bot
parent e3e3ab5633
commit d5528cdeca
3 changed files with 76 additions and 92 deletions

View File

@ -45,73 +45,19 @@ const auto suspendingOrSuspended =
} // unnamed namespace } // unnamed namespace
class QBasicFutureWatcher : public QObject, QFutureCallOutInterface class QObjectContinuationWrapper : public QObject
{ {
Q_OBJECT Q_OBJECT
public: public:
explicit QBasicFutureWatcher(QObject *parent = nullptr); explicit QObjectContinuationWrapper(QObject *parent = nullptr)
~QBasicFutureWatcher() override; : QObject(parent)
{
}
void setFuture(QFutureInterfaceBase &fi); signals:
void run();
bool event(QEvent *event) override;
Q_SIGNALS:
void finished();
private:
QFutureInterfaceBase future;
void postCallOutEvent(const QFutureCallOutEvent &event) override;
void callOutInterfaceDisconnected() override;
}; };
void QBasicFutureWatcher::postCallOutEvent(const QFutureCallOutEvent &event)
{
if (thread() == QThread::currentThread()) {
// If we are in the same thread, don't queue up anything.
std::unique_ptr<QFutureCallOutEvent> clonedEvent(event.clone());
QCoreApplication::sendEvent(this, clonedEvent.get());
} else {
QCoreApplication::postEvent(this, event.clone());
}
}
void QBasicFutureWatcher::callOutInterfaceDisconnected()
{
QCoreApplication::removePostedEvents(this, QEvent::FutureCallOut);
}
/*
* QBasicFutureWatcher is a more lightweight version of QFutureWatcher for internal use
*/
QBasicFutureWatcher::QBasicFutureWatcher(QObject *parent)
: QObject(parent)
{
}
QBasicFutureWatcher::~QBasicFutureWatcher()
{
future.d->disconnectOutputInterface(this);
}
void QBasicFutureWatcher::setFuture(QFutureInterfaceBase &fi)
{
future = fi;
future.d->connectOutputInterface(this);
}
bool QBasicFutureWatcher::event(QEvent *event)
{
if (event->type() == QEvent::FutureCallOut) {
QFutureCallOutEvent *callOutEvent = static_cast<QFutureCallOutEvent *>(event);
if (callOutEvent->callOutType == QFutureCallOutEvent::Finished)
emit finished();
return true;
}
return QObject::event(event);
}
void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj, void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *slotObj,
QFutureInterfaceBase &fi) QFutureInterfaceBase &fi)
{ {
@ -120,22 +66,40 @@ void QtPrivate::watchContinuationImpl(const QObject *context, QSlotObjectBase *s
auto slot = SlotObjUniquePtr(slotObj); auto slot = SlotObjUniquePtr(slotObj);
auto *watcher = new QBasicFutureWatcher; auto *watcher = new QObjectContinuationWrapper;
watcher->moveToThread(context->thread()); watcher->moveToThread(context->thread());
// We need to protect acccess to the watcher. The context object (and in turn, the watcher)
// could be destroyed while the continuation that emits the signal is running. We have to
// prevent that.
// The mutex has to be recursive, because the continuation itself could delete the context
// object (and thus the watcher), which will try to lock the mutex from the same thread twice.
auto watcherMutex = std::make_shared<QRecursiveMutex>();
const auto destroyWatcher = [watcherMutex, watcher]() mutable {
QMutexLocker lock(watcherMutex.get());
watcher->deleteLater();
};
// ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`... // ### we're missing a convenient way to `QObject::connect()` to a `QSlotObjectBase`...
QObject::connect(watcher, &QBasicFutureWatcher::finished, QObject::connect(watcher, &QObjectContinuationWrapper::run,
// for the following, cf. QMetaObject::invokeMethodImpl(): // for the following, cf. QMetaObject::invokeMethodImpl():
// we know `slot` is a lambda returning `void`, so we can just // we know `slot` is a lambda returning `void`, so we can just
// `call()` with `obj` and `args[0]` set to `nullptr`: // `call()` with `obj` and `args[0]` set to `nullptr`:
watcher, [slot = std::move(slot)] { context, [slot = std::move(slot)] {
void *args[] = { nullptr }; // for `void` return value void *args[] = { nullptr }; // for `void` return value
slot->call(nullptr, args); slot->call(nullptr, args);
}); });
QObject::connect(watcher, &QBasicFutureWatcher::finished, QObject::connect(watcher, &QObjectContinuationWrapper::run, watcher, &QObject::deleteLater);
watcher, &QObject::deleteLater); QObject::connect(context, &QObject::destroyed, watcher, destroyWatcher);
QObject::connect(context, &QObject::destroyed,
watcher, &QObject::deleteLater); fi.setContinuation([watcherMutex, watcher = QPointer(watcher)]
watcher->setFuture(fi); (const QFutureInterfaceBase &parentData)
{
Q_UNUSED(parentData);
QMutexLocker lock(watcherMutex.get());
if (watcher)
emit watcher->run();
});
} }
QFutureCallOutInterface::~QFutureCallOutInterface() QFutureCallOutInterface::~QFutureCallOutInterface()

View File

@ -21,6 +21,7 @@ QT_BEGIN_NAMESPACE
template <typename T> class QFuture; template <typename T> class QFuture;
class QThreadPool; class QThreadPool;
class QFutureInterfaceBase;
class QFutureInterfaceBasePrivate; class QFutureInterfaceBasePrivate;
class QFutureWatcherBase; class QFutureWatcherBase;
class QFutureWatcherBasePrivate; class QFutureWatcherBasePrivate;
@ -39,8 +40,10 @@ template<class Function, class ResultType>
class FailureHandler; class FailureHandler;
#endif #endif
void Q_CORE_EXPORT watchContinuationImpl(const QObject *context,
QtPrivate::QSlotObjectBase *slotObj,
QFutureInterfaceBase &fi);
} }
class QBasicFutureWatcher;
class Q_CORE_EXPORT QFutureInterfaceBase class Q_CORE_EXPORT QFutureInterfaceBase
{ {
@ -178,7 +181,8 @@ private:
friend class QtPrivate::FailureHandler; friend class QtPrivate::FailureHandler;
#endif #endif
friend class QBasicFutureWatcher; friend Q_CORE_EXPORT void QtPrivate::watchContinuationImpl(
const QObject *context, QtPrivate::QSlotObjectBase *slotObj, QFutureInterfaceBase &fi);
template<class T> template<class T>
friend class QPromise; friend class QPromise;

View File

@ -201,6 +201,7 @@ private slots:
#endif #endif
void onCanceled(); void onCanceled();
void cancelContinuations(); void cancelContinuations();
void continuationsWithContext_data();
void continuationsWithContext(); void continuationsWithContext();
void continuationsWithMoveOnlyLambda(); void continuationsWithMoveOnlyLambda();
#if 0 #if 0
@ -3220,15 +3221,35 @@ void tst_QFuture::cancelContinuations()
} }
} }
void tst_QFuture::continuationsWithContext_data()
{
QTest::addColumn<bool>("inOtherThread");
QTest::addRow("in-other-thread") << true;
QTest::addRow("in-main-thread-qtbug119406") << false;
}
void tst_QFuture::continuationsWithContext() void tst_QFuture::continuationsWithContext()
{ {
QThread thread; QFETCH(bool, inOtherThread);
thread.start();
auto context = new QObject();
context->moveToThread(&thread);
auto tstThread = QThread::currentThread(); auto tstThread = QThread::currentThread();
QThread *thread = inOtherThread ? new QThread
: tstThread;
auto context = new QObject();
const auto cleanupGuard = qScopeGuard([&] {
context->deleteLater();
if (thread != tstThread) {
thread->quit();
thread->wait();
delete thread;
}
});
if (inOtherThread) {
thread->start();
context->moveToThread(thread);
}
// .then() // .then()
{ {
@ -3241,12 +3262,12 @@ void tst_QFuture::continuationsWithContext()
}) })
.then(context, .then(context,
[&](int val) { [&](int val) {
if (QThread::currentThread() != &thread) if (QThread::currentThread() != thread)
return 0; return 0;
return val + 1; return val + 1;
}) })
.then([&](int val) { .then([&](int val) {
if (QThread::currentThread() != &thread) if (QThread::currentThread() != thread)
return 0; return 0;
return val + 1; return val + 1;
}); });
@ -3262,12 +3283,12 @@ void tst_QFuture::continuationsWithContext()
auto future = promise.future() auto future = promise.future()
.onCanceled(context, .onCanceled(context,
[&] { [&] {
if (QThread::currentThread() != &thread) if (QThread::currentThread() != thread)
return 0; return 0;
return 1; return 1;
}) })
.then([&](int val) { .then([&](int val) {
if (QThread::currentThread() != &thread) if (QThread::currentThread() != thread)
return 0; return 0;
return val + 1; return val + 1;
}); });
@ -3284,17 +3305,17 @@ void tst_QFuture::continuationsWithContext()
// like QPointers to the parent not being set to nullptr during child // like QPointers to the parent not being set to nullptr during child
// object destruction. // object destruction.
QPointer shortLivedContext = new FakeQWidget(); QPointer shortLivedContext = new FakeQWidget();
shortLivedContext->moveToThread(&thread); shortLivedContext->moveToThread(thread);
QPromise<int> promise; QPromise<int> promise;
auto future = promise.future() auto future = promise.future()
.then(shortLivedContext, [&](int val) { .then(shortLivedContext, [&](int val) {
if (QThread::currentThread() != &thread) if (QThread::currentThread() != thread)
return 0; return 0;
return val + 1000; return val + 1000;
}) })
.onCanceled([&, ptr=QPointer(shortLivedContext)] { .onCanceled([&, ptr=QPointer(shortLivedContext)] {
if (QThread::currentThread() != &thread) if (QThread::currentThread() != thread)
return 0; return 0;
if (ptr) if (ptr)
return 1; return 1;
@ -3304,10 +3325,10 @@ void tst_QFuture::continuationsWithContext()
QMetaObject::invokeMethod(shortLivedContext, [&]() { QMetaObject::invokeMethod(shortLivedContext, [&]() {
delete shortLivedContext; delete shortLivedContext;
}, Qt::BlockingQueuedConnection); }, inOtherThread ? Qt::BlockingQueuedConnection
: Qt::DirectConnection);
promise.finish(); promise.finish();
QCOMPARE(future.result(), 2); QCOMPARE(future.result(), 2);
} }
@ -3323,12 +3344,12 @@ void tst_QFuture::continuationsWithContext()
}) })
.onFailed(context, .onFailed(context,
[&] { [&] {
if (QThread::currentThread() != &thread) if (QThread::currentThread() != thread)
return 0; return 0;
return 1; return 1;
}) })
.then([&](int val) { .then([&](int val) {
if (QThread::currentThread() != &thread) if (QThread::currentThread() != thread)
return 0; return 0;
return val + 1; return val + 1;
}); });
@ -3337,11 +3358,6 @@ void tst_QFuture::continuationsWithContext()
QCOMPARE(future.result(), 2); QCOMPARE(future.result(), 2);
} }
#endif // QT_NO_EXCEPTIONS #endif // QT_NO_EXCEPTIONS
context->deleteLater();
thread.quit();
thread.wait();
} }
void tst_QFuture::continuationsWithMoveOnlyLambda() void tst_QFuture::continuationsWithMoveOnlyLambda()