Add support for attaching continuations to QFuture

Added QFuture::then() methods to allow chaining multiple asynchronous
computations.

Continuations can use the following execution policies:

* QtFuture::Launch::Sync - the continuation will be launched in the same
thread in which the parent has been executing.

* QtFuture::Launch::Async - the continuation will be launched in a new
thread.

* QtFuture::Launch::Inherit - the continuation will inherit the launch
policy of the parent, or its thread pool (if it was using a custom one).

* Additionally then() also accepts a custom QThreadPool* instance.

Note, that if the parent future gets canceled, its continuation(s) will
be also canceled.

If the parent throws an exception, it will be propagated to the
continuation's future, unless it is caught inside the continuation
(if it has a QFuture arg).

Some example usages:

 QFuture<int> future = ...;
 future.then([](int res1){ ... }).then([](int res2){ ... })...

 QFuture<int> future = ...;
 future.then([](QFuture<int> fut1){ /* do something with fut1 */ })...

In the examples above all continuations will run in the same thread as
future.

 QFuture<int> future = ...;
 future.then(QtFuture::Launch::Async, [](int res1){ ... })
       .then([](int res2){ ... })..

In this example the continuations will run in a new thread (but on the
same one).

 QThreadPool pool;
 QFuture<int> future = ...;
 future.then(&pool, [](int res1){ ... })
       .then([](int res2){ ... })..

In this example the continuations will run in the given thread pool.

[ChangeLog][QtCore] Added support for attaching continuations to QFuture.

Task-number: QTBUG-81587
Change-Id: I5b2e176694f7ae8ce00404aca725e9a170818955
Reviewed-by: Leena Miettinen <riitta-leena.miettinen@qt.io>
Reviewed-by: Timur Pocheptsov <timur.pocheptsov@qt.io>
Reviewed-by: Mårten Nordheim <marten.nordheim@qt.io>
This commit is contained in:
Sona Kurazyan 2020-02-27 17:30:07 +01:00
parent 1057d568bb
commit dfaca09e85
10 changed files with 1151 additions and 7 deletions

View File

@ -574,6 +574,7 @@ qt_extend_target(Core CONDITION QT_FEATURE_future
SOURCES
thread/qexception.cpp thread/qexception.h
thread/qfuture.h
thread/qfuture_impl.h
thread/qfutureinterface.cpp thread/qfutureinterface.h thread/qfutureinterface_p.h
thread/qfuturesynchronizer.h
thread/qfuturewatcher.cpp thread/qfuturewatcher.h thread/qfuturewatcher_p.h

View File

@ -677,6 +677,7 @@ qt_extend_target(Core CONDITION QT_FEATURE_future
SOURCES
thread/qexception.cpp thread/qexception.h
thread/qfuture.h
thread/qfuture_impl.h
thread/qfutureinterface.cpp thread/qfutureinterface.h thread/qfutureinterface_p.h
thread/qfuturesynchronizer.h
thread/qfuturewatcher.cpp thread/qfuturewatcher.h thread/qfuturewatcher_p.h

View File

@ -45,11 +45,12 @@
#include <QtCore/qfutureinterface.h>
#include <QtCore/qstring.h>
#include <QtCore/qfuture_impl.h>
QT_REQUIRE_CONFIG(future);
QT_BEGIN_NAMESPACE
template <typename T>
class QFutureWatcher;
template <>
@ -101,6 +102,18 @@ public:
operator T() const { return result(); }
QList<T> results() const { return d.results(); }
template<class Function>
using ResultType = typename QtPrivate::ResultTypeHelper<Function, T>::ResultType;
template<class Function>
QFuture<ResultType<Function>> then(Function &&function);
template<class Function>
QFuture<ResultType<Function>> then(QtFuture::Launch policy, Function &&function);
template<class Function>
QFuture<ResultType<Function>> then(QThreadPool *pool, Function &&function);
class const_iterator
{
public:
@ -199,6 +212,7 @@ private:
friend class QFutureWatcher<T>;
public: // Warning: the d pointer is not documented and is considered private.
// TODO: make this private
mutable QFutureInterface<T> d;
};
@ -222,6 +236,35 @@ inline QFuture<T> QFutureInterface<T>::future()
return QFuture<T>(this);
}
template<class T>
template<class Function>
QFuture<typename QFuture<T>::template ResultType<Function>> QFuture<T>::then(Function &&function)
{
return then(QtFuture::Launch::Sync, std::forward<Function>(function));
}
template<class T>
template<class Function>
QFuture<typename QFuture<T>::template ResultType<Function>>
QFuture<T>::then(QtFuture::Launch policy, Function &&function)
{
QFutureInterface<ResultType<Function>> promise(QFutureInterfaceBase::State::Pending);
QtPrivate::Continuation<Function, ResultType<Function>, T>::create(
std::forward<Function>(function), this, promise, policy);
return promise.future();
}
template<class T>
template<class Function>
QFuture<typename QFuture<T>::template ResultType<Function>> QFuture<T>::then(QThreadPool *pool,
Function &&function)
{
QFutureInterface<ResultType<Function>> promise(QFutureInterfaceBase::State::Pending);
QtPrivate::Continuation<Function, ResultType<Function>, T>::create(
std::forward<Function>(function), this, promise, pool);
return promise.future();
}
Q_DECLARE_SEQUENTIAL_ITERATOR(Future)
template <>
@ -272,6 +315,18 @@ public:
QString progressText() const { return d.progressText(); }
void waitForFinished() { d.waitForFinished(); }
template<class Function>
using ResultType = typename QtPrivate::ResultTypeHelper<Function, void>::ResultType;
template<class Function>
QFuture<ResultType<Function>> then(Function &&function);
template<class Function>
QFuture<ResultType<Function>> then(QtFuture::Launch policy, Function &&function);
template<class Function>
QFuture<ResultType<Function>> then(QThreadPool *pool, Function &&function);
private:
friend class QFutureWatcher<void>;
@ -279,6 +334,9 @@ private:
public:
#endif
mutable QFutureInterfaceBase d;
template<typename Function, typename ResultType, typename ParentResultType>
friend class QtPrivate::Continuation;
};
inline QFuture<void> QFutureInterface<void>::future()
@ -292,6 +350,32 @@ QFuture<void> qToVoidFuture(const QFuture<T> &future)
return QFuture<void>(future.d);
}
template<class Function>
QFuture<QFuture<void>::ResultType<Function>> QFuture<void>::then(Function &&function)
{
return then(QtFuture::Launch::Sync, std::forward<Function>(function));
}
template<class Function>
QFuture<QFuture<void>::ResultType<Function>> QFuture<void>::then(QtFuture::Launch policy,
Function &&function)
{
QFutureInterface<ResultType<Function>> promise(QFutureInterfaceBase::State::Pending);
QtPrivate::Continuation<Function, ResultType<Function>, void>::create(
std::forward<Function>(function), this, promise, policy);
return promise.future();
}
template<class Function>
QFuture<QFuture<void>::ResultType<Function>> QFuture<void>::then(QThreadPool *pool,
Function &&function)
{
QFutureInterface<ResultType<Function>> promise(QFutureInterfaceBase::State::Pending);
QtPrivate::Continuation<Function, ResultType<Function>, void>::create(
std::forward<Function>(function), this, promise, pool);
return promise.future();
}
QT_END_NAMESPACE
#endif // QFUTURE_H

View File

@ -682,3 +682,144 @@
\sa findNext()
*/
/*!
\namespace QtFuture
\inmodule QtCore
\brief Contains miscellaneous identifiers used by the QFuture class.
*/
/*!
\enum QtFuture::Launch
\since 6.0
Represents execution policies for running a QFuture continuation.
\value Sync The continuation will be launched in the same thread in
which the parent has been executing.
\value Async The continuation will be launched in in a separate thread taken from
the global QThreadPool.
\value Inherit The continuation will inherit the launch policy of the parent or its
thread pool, if it was using a custom one.
\sa QFuture::then(), QThreadPool::globalInstance()
*/
/*! \fn template<class T> template<class Function> QFuture<typename QFuture<T>::ResultType<Function>> QFuture<T>::then(Function &&function)
\since 6.0
\overload
Attaches a continuation to this future, allowing to chain multiple asynchronous
computations if desired. When the asynchronous computation represented by this
future finishes, \a function will be invoked in the same thread in which this
future has been running. A new QFuture representing the result of the continuation
is returned.
\note Use other overloads of this method if you need to launch the continuation in
a separate thread.
If this future has a result (is not a QFuture<void>), \a function takes the result
of this future as its argument.
You can chain multiple operations like this:
\code
QFuture<int> future = ...;
future.then([](int res1){ ... }).then([](int res2){ ... })...
\endcode
Or:
\code
QFuture<void> future = ...;
future.then([](){ ... }).then([](){ ... })...
\endcode
The continuation can also take a QFuture argument (instead of its value), representing
the previous future. This can be useful if, for example, QFuture has multiple results,
and the user wants to access them inside the continuation. Or the user needs to handle
the exception of the previous future inside the continuation, to not interrupt the chain
of multiple continuations. For example:
\code
QFuture<int> future = ...;
future.then([](QFuture<int> f) {
try {
...
auto result = f.result();
...
} catch (QException &e) {
// handle the exception
}
}).then(...);
\endcode
If the previous future throws an exception and it is not handled inside the
continuation, the exception will be propagated to the continuation future, to
allow the caller to handle it:
\code
QFuture<int> parentFuture = ...;
auto continuation = parentFuture.then([](int res1){ ... }).then([](int res2){ ... })...
...
// parentFuture throws an exception
try {
auto result = continuation.result();
} catch (QException &e) {
// handle the exception
}
\endcode
In this case the whole chain of continuations will be interrupted.
\note If the parent future gets canceled, its continuations will
also be canceled.
*/
/*! \fn template<class T> template<class Function> QFuture<typename QFuture<T>::ResultType<Function>> QFuture<T>::then(QtFuture::Launch policy, Function &&function)
\since 6.0
\overload
Attaches a continuation to this future, allowing to chain multiple asynchronous
computations. When the asynchronous computation represented by this future
finishes, \a function will be invoked according to the given launch \a policy.
A new QFuture representing the result of the continuation is returned.
Depending on the \a policy, continuation will run in the same thread as the parent,
run in a new thread, or inherit the launch policy and thread pool of the parent.
In the following example both continuations will run in a new thread (but in
the same one).
\code
QFuture<int> future = ...;
future.then(QtFuture::Launch::Async, [](int res){ ... }).then([](int res2){ ... });
\endcode
In the following example both continuations will run in new threads using the same
thread pool.
\code
QFuture<int> future = ...;
future.then(QtFuture::Launch::Async, [](int res){ ... })
.then(QtFuture::Launch::Inherit, [](int res2){ ... });
\endcode
*/
/*! \fn template<class T> template<class Function> QFuture<typename QFuture<T>::ResultType<Function>> QFuture<T>::then(QThreadPool *pool, Function &&function)
\since 6.0
\overload
Attaches a continuation to this future, allowing to chain multiple asynchronous
computations if desired. When the asynchronous computation represented by this
future finishes, \a function will be invoked in a separate thread taken from the
QThreadPool \a pool.
*/

View File

@ -0,0 +1,339 @@
/****************************************************************************
**
** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
**
** $QT_BEGIN_LICENSE:LGPL$
** Commercial License Usage
** Licensees holding valid commercial Qt licenses may use this file in
** accordance with the commercial license agreement provided with the
** Software or, alternatively, in accordance with the terms contained in
** a written agreement between you and The Qt Company. For licensing terms
** and conditions see https://www.qt.io/terms-conditions. For further
** information use the contact form at https://www.qt.io/contact-us.
**
** GNU Lesser General Public License Usage
** Alternatively, this file may be used under the terms of the GNU Lesser
** General Public License version 3 as published by the Free Software
** Foundation and appearing in the file LICENSE.LGPL3 included in the
** packaging of this file. Please review the following information to
** ensure the GNU Lesser General Public License version 3 requirements
** will be met: https://www.gnu.org/licenses/lgpl-3.0.html.
**
** GNU General Public License Usage
** Alternatively, this file may be used under the terms of the GNU
** General Public License version 2.0 or (at your option) the GNU General
** Public license version 3 or any later version approved by the KDE Free
** Qt Foundation. The licenses are as published by the Free Software
** Foundation and appearing in the file LICENSE.GPL2 and LICENSE.GPL3
** included in the packaging of this file. Please review the following
** information to ensure the GNU General Public License requirements will
** be met: https://www.gnu.org/licenses/gpl-2.0.html and
** https://www.gnu.org/licenses/gpl-3.0.html.
**
** $QT_END_LICENSE$
**
****************************************************************************/
#ifndef QFUTURE_H
#error Do not include qfuture_impl.h directly
#endif
#if 0
#pragma qt_sync_skip_header_check
#pragma qt_sync_stop_processing
#endif
#include <QtCore/qglobal.h>
#include <QtCore/qfutureinterface.h>
#include <QtCore/qthreadpool.h>
QT_BEGIN_NAMESPACE
//
// forward declarations
//
template<class T>
class QFuture;
template<class T>
class QFutureInterface;
namespace QtFuture {
enum class Launch { Sync, Async, Inherit };
}
namespace QtPrivate {
template<typename F, typename Arg, typename Enable = void>
struct ResultTypeHelper
{
};
// The callable takes an argument of type Arg
template<typename F, typename Arg>
struct ResultTypeHelper<
F, Arg, typename std::enable_if_t<!std::is_invocable_v<std::decay_t<F>, QFuture<Arg>>>>
{
using ResultType = std::invoke_result_t<std::decay_t<F>, std::decay_t<Arg>>;
};
// The callable takes an argument of type QFuture<Arg>
template<class F, class Arg>
struct ResultTypeHelper<
F, Arg, typename std::enable_if_t<std::is_invocable_v<std::decay_t<F>, QFuture<Arg>>>>
{
using ResultType = std::invoke_result_t<std::decay_t<F>, QFuture<Arg>>;
};
// The callable takes an argument of type QFuture<void>
template<class F>
struct ResultTypeHelper<
F, void, typename std::enable_if_t<std::is_invocable_v<std::decay_t<F>, QFuture<void>>>>
{
using ResultType = std::invoke_result_t<std::decay_t<F>, QFuture<void>>;
};
// The callable doesn't take argument
template<class F>
struct ResultTypeHelper<
F, void, typename std::enable_if_t<!std::is_invocable_v<std::decay_t<F>, QFuture<void>>>>
{
using ResultType = std::invoke_result_t<std::decay_t<F>>;
};
template<typename Function, typename ResultType, typename ParentResultType>
class Continuation
{
public:
Continuation(Function &&func, const QFuture<ParentResultType> &f,
const QFutureInterface<ResultType> &p)
: promise(p), parentFuture(f), function(std::forward<Function>(func))
{
}
virtual ~Continuation() = default;
bool execute();
static void create(Function &&func, QFuture<ParentResultType> *f,
QFutureInterface<ResultType> &p, QtFuture::Launch policy);
static void create(Function &&func, QFuture<ParentResultType> *f,
QFutureInterface<ResultType> &p, QThreadPool *pool);
protected:
virtual void runImpl() = 0;
void runFunction();
protected:
QFutureInterface<ResultType> promise;
const QFuture<ParentResultType> parentFuture;
Function function;
};
template<typename Function, typename ResultType, typename ParentResultType>
class SyncContinuation final : public Continuation<Function, ResultType, ParentResultType>
{
public:
SyncContinuation(Function &&func, const QFuture<ParentResultType> &f,
const QFutureInterface<ResultType> &p)
: Continuation<Function, ResultType, ParentResultType>(std::forward<Function>(func), f, p)
{
}
~SyncContinuation() override = default;
private:
void runImpl() override { this->runFunction(); }
};
template<typename Function, typename ResultType, typename ParentResultType>
class AsyncContinuation final : public QRunnable,
public Continuation<Function, ResultType, ParentResultType>
{
public:
AsyncContinuation(Function &&func, const QFuture<ParentResultType> &f,
const QFutureInterface<ResultType> &p, QThreadPool *pool = nullptr)
: Continuation<Function, ResultType, ParentResultType>(std::forward<Function>(func), f, p),
threadPool(pool)
{
this->promise.setRunnable(this);
}
~AsyncContinuation() override = default;
private:
void runImpl() override // from Continuation
{
QThreadPool *pool = threadPool ? threadPool : QThreadPool::globalInstance();
pool->start(this);
}
void run() override // from QRunnable
{
this->runFunction();
}
private:
QThreadPool *threadPool;
};
template<typename Function, typename ResultType, typename ParentResultType>
void Continuation<Function, ResultType, ParentResultType>::runFunction()
{
promise.reportStarted();
Q_ASSERT(parentFuture.isFinished());
#ifndef QT_NO_EXCEPTIONS
try {
#endif
if constexpr (!std::is_void_v<ResultType>) {
if constexpr (std::is_invocable_v<std::decay_t<Function>, QFuture<ParentResultType>>) {
promise.reportResult(function(parentFuture));
} else if constexpr (std::is_void_v<ParentResultType>) {
promise.reportResult(function());
} else {
// This assert normally should never fail, this is to make sure
// that nothing unexpected happend.
static_assert(
std::is_invocable_v<std::decay_t<Function>, std::decay_t<ParentResultType>>,
"The continuation is not invocable with the provided arguments");
promise.reportResult(function(parentFuture.result()));
}
} else {
if constexpr (std::is_invocable_v<std::decay_t<Function>, QFuture<ParentResultType>>) {
function(parentFuture);
} else if constexpr (std::is_void_v<ParentResultType>) {
function();
} else {
// This assert normally should never fail, this is to make sure
// that nothing unexpected happend.
static_assert(
std::is_invocable_v<std::decay_t<Function>, std::decay_t<ParentResultType>>,
"The continuation is not invocable with the provided arguments");
function(parentFuture.result());
}
}
#ifndef QT_NO_EXCEPTIONS
} catch (QException &e) {
promise.reportException(e);
} catch (...) {
promise.reportException(QUnhandledException());
}
#endif
promise.reportFinished();
}
template<typename Function, typename ResultType, typename ParentResultType>
bool Continuation<Function, ResultType, ParentResultType>::execute()
{
Q_ASSERT(parentFuture.isFinished());
if (parentFuture.isCanceled()) {
#ifndef QT_NO_EXCEPTIONS
if (parentFuture.d.exceptionStore().hasException()) {
// If the continuation doesn't take a QFuture argument, propagate the exception
// to the caller, by reporting it. If the continuation takes a QFuture argument,
// the user may want to catch the exception inside the continuation, to not
// interrupt the continuation chain, so don't report anything yet.
if constexpr (!std::is_invocable_v<std::decay_t<Function>, QFuture<ParentResultType>>) {
promise.reportStarted();
const QException *e = parentFuture.d.exceptionStore().exception().exception();
promise.reportException(*e);
promise.reportFinished();
return false;
}
} else
#endif
{
promise.reportStarted();
promise.reportCanceled();
promise.reportFinished();
return false;
}
}
runImpl();
return true;
}
template<typename Function, typename ResultType, typename ParentResultType>
void Continuation<Function, ResultType, ParentResultType>::create(Function &&func,
QFuture<ParentResultType> *f,
QFutureInterface<ResultType> &p,
QtFuture::Launch policy)
{
Q_ASSERT(f);
QThreadPool *pool = nullptr;
bool launchAsync = (policy == QtFuture::Launch::Async);
if (policy == QtFuture::Launch::Inherit) {
launchAsync = f->d.launchAsync();
// If the parent future was using a custom thread pool, inherit it as well.
if (launchAsync && f->d.threadPool()) {
pool = f->d.threadPool();
p.setThreadPool(pool);
}
}
Continuation<Function, ResultType, ParentResultType> *continuationJob = nullptr;
if (launchAsync) {
continuationJob = new AsyncContinuation<Function, ResultType, ParentResultType>(
std::forward<Function>(func), *f, p, pool);
} else {
continuationJob = new SyncContinuation<Function, ResultType, ParentResultType>(
std::forward<Function>(func), *f, p);
}
p.setLaunchAsync(launchAsync);
auto continuation = [continuationJob, policy, launchAsync]() mutable {
bool isLaunched = continuationJob->execute();
// If continuation is successfully launched, AsyncContinuation will be deleted
// by the QThreadPool which has started it. Synchronous continuation will be
// executed immediately, so it's safe to always delete it here.
if (!(launchAsync && isLaunched)) {
delete continuationJob;
continuationJob = nullptr;
}
};
f->d.setContinuation(std::move(continuation));
}
template<typename Function, typename ResultType, typename ParentResultType>
void Continuation<Function, ResultType, ParentResultType>::create(Function &&func,
QFuture<ParentResultType> *f,
QFutureInterface<ResultType> &p,
QThreadPool *pool)
{
Q_ASSERT(f);
auto continuationJob = new AsyncContinuation<Function, ResultType, ParentResultType>(
std::forward<Function>(func), *f, p, pool);
p.setLaunchAsync(true);
p.setThreadPool(pool);
auto continuation = [continuationJob]() mutable {
bool isLaunched = continuationJob->execute();
// If continuation is successfully launched, AsyncContinuation will be deleted
// by the QThreadPool which has started it.
if (!isLaunched) {
delete continuationJob;
continuationJob = nullptr;
}
};
f->d.setContinuation(continuation);
}
} // namespace QtPrivate
QT_END_NAMESPACE

View File

@ -191,6 +191,11 @@ bool QFutureInterfaceBase::isResultReadyAt(int index) const
return d->internal_isResultReadyAt(index);
}
bool QFutureInterfaceBase::isRunningOrPending() const
{
return queryState(static_cast<State>(Running | Pending));
}
bool QFutureInterfaceBase::waitForNextResult()
{
QMutexLocker lock(&d->m_mutex);
@ -315,7 +320,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex)
d->m_exceptionStore.throwPossibleException();
QMutexLocker lock(&d->m_mutex);
if (!isRunning())
if (!isRunningOrPending())
return;
lock.unlock();
@ -326,7 +331,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex)
lock.relock();
const int waitIndex = (resultIndex == -1) ? INT_MAX : resultIndex;
while (isRunning() && !d->internal_isResultReadyAt(waitIndex))
while (isRunningOrPending() && !d->internal_isResultReadyAt(waitIndex))
d->waitCondition.wait(&d->m_mutex);
d->m_exceptionStore.throwPossibleException();
@ -335,7 +340,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex)
void QFutureInterfaceBase::waitForFinished()
{
QMutexLocker lock(&d->m_mutex);
const bool alreadyFinished = !isRunning();
const bool alreadyFinished = !isRunningOrPending();
lock.unlock();
if (!alreadyFinished) {
@ -343,7 +348,7 @@ void QFutureInterfaceBase::waitForFinished()
lock.relock();
while (isRunning())
while (isRunningOrPending())
d->waitCondition.wait(&d->m_mutex);
}
@ -386,6 +391,11 @@ void QFutureInterfaceBase::setThreadPool(QThreadPool *pool)
d->m_pool = pool;
}
QThreadPool *QFutureInterfaceBase::threadPool() const
{
return d->m_pool;
}
void QFutureInterfaceBase::setFilterMode(bool enable)
{
QMutexLocker locker(&d->m_mutex);
@ -604,4 +614,36 @@ void QFutureInterfaceBasePrivate::setState(QFutureInterfaceBase::State newState)
state.storeRelaxed(newState);
}
void QFutureInterfaceBase::setContinuation(std::function<void()> func)
{
QMutexLocker lock(&d->continuationMutex);
// If the state is ready, run continuation immediately,
// otherwise save it for later.
if (isFinished()) {
lock.unlock();
func();
} else {
d->continuation = std::move(func);
}
}
void QFutureInterfaceBase::runContinuation() const
{
QMutexLocker lock(&d->continuationMutex);
if (d->continuation) {
lock.unlock();
d->continuation();
}
}
void QFutureInterfaceBase::setLaunchAsync(bool value)
{
d->launchAsync = value;
}
bool QFutureInterfaceBase::launchAsync() const
{
return d->launchAsync;
}
QT_END_NAMESPACE

View File

@ -58,6 +58,11 @@ class QFutureInterfaceBasePrivate;
class QFutureWatcherBase;
class QFutureWatcherBasePrivate;
namespace QtPrivate {
template<typename Function, typename ResultType, typename ParentResultType>
class Continuation;
}
class Q_CORE_EXPORT QFutureInterfaceBase
{
public:
@ -68,7 +73,9 @@ public:
Finished = 0x04,
Canceled = 0x08,
Paused = 0x10,
Throttled = 0x20
Throttled = 0x20,
// Pending means that the future depends on another one, which is not finished yet
Pending = 0x40
};
QFutureInterfaceBase(State initialState = NoState);
@ -86,6 +93,7 @@ public:
void setRunnable(QRunnable *runnable);
void setThreadPool(QThreadPool *pool);
QThreadPool *threadPool() const;
void setFilterMode(bool enable);
void setProgressRange(int minimum, int maximum);
int progressMinimum() const;
@ -141,6 +149,18 @@ private:
private:
friend class QFutureWatcherBase;
friend class QFutureWatcherBasePrivate;
template<typename Function, typename ResultType, typename ParentResultType>
friend class QtPrivate::Continuation;
protected:
void setContinuation(std::function<void()> func);
void runContinuation() const;
void setLaunchAsync(bool value);
bool launchAsync() const;
bool isRunningOrPending() const;
};
template <typename T>
@ -239,6 +259,7 @@ inline void QFutureInterface<T>::reportFinished(const T *result)
if (result)
reportResult(result);
QFutureInterfaceBase::reportFinished();
QFutureInterfaceBase::runContinuation();
}
template <typename T>
@ -292,7 +313,11 @@ public:
void reportResult(const void *, int) { }
void reportResults(const QVector<void> &, int) { }
void reportFinished(const void * = nullptr) { QFutureInterfaceBase::reportFinished(); }
void reportFinished(const void * = nullptr)
{
QFutureInterfaceBase::reportFinished();
QFutureInterfaceBase::runContinuation();
}
};
QT_END_NAMESPACE

View File

@ -191,6 +191,12 @@ public:
void disconnectOutputInterface(QFutureCallOutInterface *iface);
void setState(QFutureInterfaceBase::State state);
// Wrapper for continuation
std::function<void()> continuation;
QBasicMutex continuationMutex;
bool launchAsync = false;
};
QT_END_NAMESPACE

View File

@ -66,6 +66,7 @@ qtConfig(future) {
HEADERS += \
thread/qexception.h \
thread/qfuture.h \
thread/qfuture_impl.h \
thread/qfutureinterface.h \
thread/qfutureinterface_p.h \
thread/qfuturesynchronizer.h \

View File

@ -94,6 +94,13 @@ private slots:
void nestedExceptions();
#endif
void nonGlobalThreadPool();
void then();
void thenOnCanceledFuture();
#ifndef QT_NO_EXCEPTIONS
void thenOnExceptionFuture();
void thenThrows();
#endif
};
void tst_QFuture::resultStore()
@ -1557,5 +1564,502 @@ void tst_QFuture::nonGlobalThreadPool()
}
}
void tst_QFuture::then()
{
{
struct Add
{
static int addTwo(int arg) { return arg + 2; }
int operator()(int arg) const { return arg + 3; }
};
QFutureInterface<int> promise;
QFuture<int> then = promise.future()
.then([](int res) { return res + 1; }) // lambda
.then(Add::addTwo) // function
.then(Add()); // functor
promise.reportStarted();
QVERIFY(!then.isStarted());
QVERIFY(!then.isFinished());
const int result = 0;
promise.reportResult(result);
promise.reportFinished();
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QCOMPARE(then.result(), result + 6);
}
// then() on a ready future
{
QFutureInterface<int> promise;
promise.reportStarted();
const int result = 0;
promise.reportResult(result);
promise.reportFinished();
QFuture<int> then = promise.future()
.then([](int res1) { return res1 + 1; })
.then([](int res2) { return res2 + 2; })
.then([](int res3) { return res3 + 3; });
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QCOMPARE(then.result(), result + 6);
}
// Continuation of QFuture<void>
{
int result = 0;
QFutureInterface<void> promise;
QFuture<void> then = promise.future()
.then([&]() { result += 1; })
.then([&]() { result += 2; })
.then([&]() { result += 3; });
promise.reportStarted();
QVERIFY(!then.isStarted());
QVERIFY(!then.isFinished());
promise.reportFinished();
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QCOMPARE(result, 6);
}
// Continuation returns QFuture<void>
{
QFutureInterface<int> promise;
int value;
QFuture<void> then =
promise.future().then([](int res) { return res * 2; }).then([&](int prevResult) {
value = prevResult;
});
promise.reportStarted();
QVERIFY(!then.isStarted());
QVERIFY(!then.isFinished());
const int result = 5;
promise.reportResult(result);
promise.reportFinished();
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QCOMPARE(value, result * 2);
}
// Continuations taking a QFuture argument.
{
int value = 0;
QFutureInterface<int> promise;
QFuture<void> then = promise.future()
.then([](QFuture<int> f1) { return f1.result() + 1; })
.then([&](QFuture<int> f2) { value = f2.result() + 2; })
.then([&](QFuture<void> f3) {
QVERIFY(f3.isFinished());
value += 3;
});
promise.reportStarted();
QVERIFY(!then.isStarted());
QVERIFY(!then.isFinished());
const int result = 0;
promise.reportResult(result);
promise.reportFinished();
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QCOMPARE(value, 6);
}
// Continuations use a new thread
{
Qt::HANDLE threadId1 = nullptr;
Qt::HANDLE threadId2 = nullptr;
QFutureInterface<void> promise;
QFuture<void> then = promise.future()
.then(QtFuture::Launch::Async,
[&]() { threadId1 = QThread::currentThreadId(); })
.then([&]() { threadId2 = QThread::currentThreadId(); });
promise.reportStarted();
QVERIFY(!then.isStarted());
QVERIFY(!then.isFinished());
promise.reportFinished();
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QVERIFY(threadId1 != QThread::currentThreadId());
QVERIFY(threadId2 != QThread::currentThreadId());
QVERIFY(threadId1 == threadId2);
}
// Continuation inherits the launch policy of its parent (QtFuture::Launch::Sync)
{
Qt::HANDLE threadId1 = nullptr;
Qt::HANDLE threadId2 = nullptr;
QFutureInterface<void> promise;
QFuture<void> then = promise.future()
.then(QtFuture::Launch::Sync,
[&]() { threadId1 = QThread::currentThreadId(); })
.then(QtFuture::Launch::Inherit,
[&]() { threadId2 = QThread::currentThreadId(); });
promise.reportStarted();
QVERIFY(!then.isStarted());
QVERIFY(!then.isFinished());
promise.reportFinished();
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QVERIFY(threadId1 == QThread::currentThreadId());
QVERIFY(threadId2 == QThread::currentThreadId());
QVERIFY(threadId1 == threadId2);
}
// Continuation inherits the launch policy of its parent (QtFuture::Launch::Async)
{
Qt::HANDLE threadId1 = nullptr;
Qt::HANDLE threadId2 = nullptr;
QFutureInterface<void> promise;
QFuture<void> then = promise.future()
.then(QtFuture::Launch::Async,
[&]() { threadId1 = QThread::currentThreadId(); })
.then(QtFuture::Launch::Inherit,
[&]() { threadId2 = QThread::currentThreadId(); });
promise.reportStarted();
QVERIFY(!then.isStarted());
QVERIFY(!then.isFinished());
promise.reportFinished();
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QVERIFY(threadId1 != QThread::currentThreadId());
QVERIFY(threadId2 != QThread::currentThreadId());
}
// Continuations use a custom thread pool
{
QFutureInterface<void> promise;
QThreadPool pool;
QVERIFY(pool.waitForDone(0)); // pool is not busy yet
QSemaphore semaphore;
QFuture<void> then = promise.future().then(&pool, [&]() { semaphore.acquire(); });
promise.reportStarted();
promise.reportFinished();
// Make sure the custom thread pool is busy on running the continuation
QVERIFY(!pool.waitForDone(0));
semaphore.release();
then.waitForFinished();
QVERIFY(then.isStarted());
QVERIFY(then.isFinished());
QCOMPARE(then.d.threadPool(), &pool);
}
// Continuation inherits parent's thread pool
{
Qt::HANDLE threadId1 = nullptr;
Qt::HANDLE threadId2 = nullptr;
QFutureInterface<void> promise;
QThreadPool pool;
QFuture<void> then1 = promise.future().then(&pool, [&]() {
threadId1 = QThread::currentThreadId();
});
promise.reportStarted();
promise.reportFinished();
then1.waitForFinished();
QVERIFY(pool.waitForDone()); // The pool is not busy after the first continuation is done
QSemaphore semaphore;
QFuture<void> then2 = then1.then(QtFuture::Launch::Inherit, [&]() {
semaphore.acquire();
threadId2 = QThread::currentThreadId();
});
QVERIFY(!pool.waitForDone(0)); // The pool is busy running the 2nd continuation
semaphore.release();
then2.waitForFinished();
QVERIFY(then2.isStarted());
QVERIFY(then2.isFinished());
QCOMPARE(then1.d.threadPool(), then2.d.threadPool());
QCOMPARE(then2.d.threadPool(), &pool);
QVERIFY(threadId1 != QThread::currentThreadId());
QVERIFY(threadId2 != QThread::currentThreadId());
}
}
void tst_QFuture::thenOnCanceledFuture()
{
// Continuations on a canceled future
{
QFutureInterface<void> promise;
promise.reportStarted();
promise.reportCanceled();
promise.reportFinished();
int thenResult = 0;
QFuture<void> then =
promise.future().then([&]() { ++thenResult; }).then([&]() { ++thenResult; });
QVERIFY(then.isCanceled());
QCOMPARE(thenResult, 0);
}
// QFuture gets canceled after continuations are set
{
QFutureInterface<void> promise;
int thenResult = 0;
QFuture<void> then =
promise.future().then([&]() { ++thenResult; }).then([&]() { ++thenResult; });
promise.reportStarted();
promise.reportCanceled();
promise.reportFinished();
QVERIFY(then.isCanceled());
QCOMPARE(thenResult, 0);
}
// Same with QtFuture::Launch::Async
// Continuations on a canceled future
{
QFutureInterface<void> promise;
promise.reportStarted();
promise.reportCanceled();
promise.reportFinished();
int thenResult = 0;
QFuture<void> then =
promise.future().then(QtFuture::Launch::Async, [&]() { ++thenResult; }).then([&]() {
++thenResult;
});
QVERIFY(then.isCanceled());
QCOMPARE(thenResult, 0);
}
// QFuture gets canceled after continuations are set
{
QFutureInterface<void> promise;
int thenResult = 0;
QFuture<void> then =
promise.future().then(QtFuture::Launch::Async, [&]() { ++thenResult; }).then([&]() {
++thenResult;
});
promise.reportStarted();
promise.reportCanceled();
promise.reportFinished();
QVERIFY(then.isCanceled());
QCOMPARE(thenResult, 0);
}
}
#ifndef QT_NO_EXCEPTIONS
void tst_QFuture::thenOnExceptionFuture()
{
{
QFutureInterface<int> promise;
int thenResult = 0;
QFuture<void> then = promise.future().then([&](int res) { thenResult = res; });
promise.reportStarted();
QException e;
promise.reportException(e);
promise.reportFinished();
bool caught = false;
try {
then.waitForFinished();
} catch (QException &) {
caught = true;
}
QVERIFY(caught);
QCOMPARE(thenResult, 0);
}
// Exception handled inside the continuation
{
QFutureInterface<int> promise;
bool caught = false;
bool caughtByContinuation = false;
bool success = false;
int thenResult = 0;
QFuture<void> then = promise.future()
.then([&](QFuture<int> res) {
try {
thenResult = res.result();
} catch (QException &) {
caughtByContinuation = true;
}
})
.then([&]() { success = true; });
promise.reportStarted();
QException e;
promise.reportException(e);
promise.reportFinished();
try {
then.waitForFinished();
} catch (QException &) {
caught = true;
}
QCOMPARE(thenResult, 0);
QVERIFY(!caught);
QVERIFY(caughtByContinuation);
QVERIFY(success);
}
// Exception future
{
QFutureInterface<int> promise;
promise.reportStarted();
QException e;
promise.reportException(e);
promise.reportFinished();
int thenResult = 0;
QFuture<void> then = promise.future().then([&](int res) { thenResult = res; });
bool caught = false;
try {
then.waitForFinished();
} catch (QException &) {
caught = true;
}
QVERIFY(caught);
QCOMPARE(thenResult, 0);
}
// Same with QtFuture::Launch::Async
{
QFutureInterface<int> promise;
int thenResult = 0;
QFuture<void> then =
promise.future().then(QtFuture::Launch::Async, [&](int res) { thenResult = res; });
promise.reportStarted();
QException e;
promise.reportException(e);
promise.reportFinished();
bool caught = false;
try {
then.waitForFinished();
} catch (QException &) {
caught = true;
}
QVERIFY(caught);
QCOMPARE(thenResult, 0);
}
// Exception future
{
QFutureInterface<int> promise;
promise.reportStarted();
QException e;
promise.reportException(e);
promise.reportFinished();
int thenResult = 0;
QFuture<void> then =
promise.future().then(QtFuture::Launch::Async, [&](int res) { thenResult = res; });
bool caught = false;
try {
then.waitForFinished();
} catch (QException &) {
caught = true;
}
QVERIFY(caught);
QCOMPARE(thenResult, 0);
}
}
void tst_QFuture::thenThrows()
{
// Continuation throws an exception
{
QFutureInterface<void> promise;
QFuture<void> then = promise.future().then([]() { throw QException(); });
promise.reportStarted();
promise.reportFinished();
bool caught = false;
try {
then.waitForFinished();
} catch (QException &) {
caught = true;
}
QVERIFY(caught);
}
// Same with QtFuture::Launch::Async
{
QFutureInterface<void> promise;
QFuture<void> then =
promise.future().then(QtFuture::Launch::Async, []() { throw QException(); });
promise.reportStarted();
promise.reportFinished();
bool caught = false;
try {
then.waitForFinished();
} catch (QException &) {
caught = true;
}
QVERIFY(caught);
}
}
#endif
QTEST_MAIN(tst_QFuture)
#include "tst_qfuture.moc"