AssetDownloader: Import TaskTree solution
An import from Creator 14.0 branch. Task-number: QTBUG-122550 Fixes: QTBUG-126022 Change-Id: I7a1ce1c05cb388104c2e79a0a270adf3e4c57cd2 Reviewed-by: Kai Köhne <kai.koehne@qt.io> (cherry picked from commit 78de81e286d89a5396b08b67ca98ccedda8a836f) Reviewed-by: Qt Cherry-pick Bot <cherrypick_bot@qt-project.org>
This commit is contained in:
parent
4d6fd8302e
commit
355f68bc4a
@ -8,6 +8,13 @@ qt_internal_add_module(ExamplesAssetDownloaderPrivate
|
||||
INTERNAL_MODULE
|
||||
SOURCES
|
||||
assetdownloader.cpp assetdownloader.h
|
||||
tasking/barrier.cpp tasking/barrier.h
|
||||
tasking/concurrentcall.h
|
||||
tasking/networkquery.cpp tasking/networkquery.h
|
||||
tasking/qprocesstask.cpp tasking/qprocesstask.h
|
||||
tasking/tasking_global.h
|
||||
tasking/tasktree.cpp tasking/tasktree.h
|
||||
tasking/tasktreerunner.cpp tasking/tasktreerunner.h
|
||||
LIBRARIES
|
||||
Qt6::Concurrent
|
||||
Qt6::CorePrivate
|
||||
|
54
src/assets/downloader/tasking/barrier.cpp
Normal file
54
src/assets/downloader/tasking/barrier.cpp
Normal file
@ -0,0 +1,54 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#include "barrier.h"
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
// That's cut down qtcassert.{c,h} to avoid the dependency.
|
||||
#define QT_STRING(cond) qDebug("SOFT ASSERT: \"%s\" in %s: %s", cond, __FILE__, QT_STRINGIFY(__LINE__))
|
||||
#define QT_ASSERT(cond, action) if (Q_LIKELY(cond)) {} else { QT_STRING(#cond); action; } do {} while (0)
|
||||
|
||||
void Barrier::setLimit(int value)
|
||||
{
|
||||
QT_ASSERT(!isRunning(), return);
|
||||
QT_ASSERT(value > 0, return);
|
||||
|
||||
m_limit = value;
|
||||
}
|
||||
|
||||
void Barrier::start()
|
||||
{
|
||||
QT_ASSERT(!isRunning(), return);
|
||||
m_current = 0;
|
||||
m_result.reset();
|
||||
}
|
||||
|
||||
void Barrier::advance()
|
||||
{
|
||||
// Calling advance on finished is OK
|
||||
QT_ASSERT(isRunning() || m_result, return);
|
||||
if (!isRunning()) // no-op
|
||||
return;
|
||||
++m_current;
|
||||
if (m_current == m_limit)
|
||||
stopWithResult(DoneResult::Success);
|
||||
}
|
||||
|
||||
void Barrier::stopWithResult(DoneResult result)
|
||||
{
|
||||
// Calling stopWithResult on finished is OK when the same success is passed
|
||||
QT_ASSERT(isRunning() || (m_result && *m_result == result), return);
|
||||
if (!isRunning()) // no-op
|
||||
return;
|
||||
m_current = -1;
|
||||
m_result = result;
|
||||
emit done(result);
|
||||
}
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
QT_END_NAMESPACE
|
112
src/assets/downloader/tasking/barrier.h
Normal file
112
src/assets/downloader/tasking/barrier.h
Normal file
@ -0,0 +1,112 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#ifndef TASKING_BARRIER_H
|
||||
#define TASKING_BARRIER_H
|
||||
|
||||
//
|
||||
// W A R N I N G
|
||||
// -------------
|
||||
//
|
||||
// This file is not part of the Qt API. It exists purely as an
|
||||
// implementation detail. This header file may change from version to
|
||||
// version without notice, or even be removed.
|
||||
//
|
||||
// We mean it.
|
||||
//
|
||||
|
||||
#include "tasking_global.h"
|
||||
|
||||
#include "tasktree.h"
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
class TASKING_EXPORT Barrier final : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
void setLimit(int value);
|
||||
int limit() const { return m_limit; }
|
||||
|
||||
void start();
|
||||
void advance(); // If limit reached, stops with true
|
||||
void stopWithResult(DoneResult result); // Ignores limit
|
||||
|
||||
bool isRunning() const { return m_current >= 0; }
|
||||
int current() const { return m_current; }
|
||||
std::optional<DoneResult> result() const { return m_result; }
|
||||
|
||||
Q_SIGNALS:
|
||||
void done(DoneResult success);
|
||||
|
||||
private:
|
||||
std::optional<DoneResult> m_result = {};
|
||||
int m_limit = 1;
|
||||
int m_current = -1;
|
||||
};
|
||||
|
||||
class TASKING_EXPORT BarrierTaskAdapter : public TaskAdapter<Barrier>
|
||||
{
|
||||
public:
|
||||
BarrierTaskAdapter() { connect(task(), &Barrier::done, this, &TaskInterface::done); }
|
||||
void start() final { task()->start(); }
|
||||
};
|
||||
|
||||
using BarrierTask = CustomTask<BarrierTaskAdapter>;
|
||||
|
||||
template <int Limit = 1>
|
||||
class SharedBarrier
|
||||
{
|
||||
public:
|
||||
static_assert(Limit > 0, "SharedBarrier's limit should be 1 or more.");
|
||||
SharedBarrier() : m_barrier(new Barrier) {
|
||||
m_barrier->setLimit(Limit);
|
||||
m_barrier->start();
|
||||
}
|
||||
Barrier *barrier() const { return m_barrier.get(); }
|
||||
|
||||
private:
|
||||
std::shared_ptr<Barrier> m_barrier;
|
||||
};
|
||||
|
||||
template <int Limit = 1>
|
||||
using MultiBarrier = Storage<SharedBarrier<Limit>>;
|
||||
|
||||
// Can't write: "MultiBarrier barrier;". Only "MultiBarrier<> barrier;" would work.
|
||||
// Can't have one alias with default type in C++17, getting the following error:
|
||||
// alias template deduction only available with C++20.
|
||||
using SingleBarrier = MultiBarrier<1>;
|
||||
|
||||
template <int Limit>
|
||||
GroupItem waitForBarrierTask(const MultiBarrier<Limit> &sharedBarrier)
|
||||
{
|
||||
return BarrierTask([sharedBarrier](Barrier &barrier) {
|
||||
SharedBarrier<Limit> *activeBarrier = sharedBarrier.activeStorage();
|
||||
if (!activeBarrier) {
|
||||
qWarning("The barrier referenced from WaitForBarrier element "
|
||||
"is not reachable in the running tree. "
|
||||
"It is possible that no barrier was added to the tree, "
|
||||
"or the storage is not reachable from where it is referenced. "
|
||||
"The WaitForBarrier task finishes with an error. ");
|
||||
return SetupResult::StopWithError;
|
||||
}
|
||||
Barrier *activeSharedBarrier = activeBarrier->barrier();
|
||||
const std::optional<DoneResult> result = activeSharedBarrier->result();
|
||||
if (result.has_value()) {
|
||||
return result.value() == DoneResult::Success ? SetupResult::StopWithSuccess
|
||||
: SetupResult::StopWithError;
|
||||
}
|
||||
QObject::connect(activeSharedBarrier, &Barrier::done, &barrier, &Barrier::stopWithResult);
|
||||
return SetupResult::Continue;
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#endif // TASKING_BARRIER_H
|
119
src/assets/downloader/tasking/concurrentcall.h
Normal file
119
src/assets/downloader/tasking/concurrentcall.h
Normal file
@ -0,0 +1,119 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#ifndef TASKING_CONCURRENTCALL_H
|
||||
#define TASKING_CONCURRENTCALL_H
|
||||
|
||||
//
|
||||
// W A R N I N G
|
||||
// -------------
|
||||
//
|
||||
// This file is not part of the Qt API. It exists purely as an
|
||||
// implementation detail. This header file may change from version to
|
||||
// version without notice, or even be removed.
|
||||
//
|
||||
// We mean it.
|
||||
//
|
||||
|
||||
#include "tasktree.h"
|
||||
|
||||
#include <QtConcurrent/QtConcurrent>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
// This class introduces the dependency to Qt::Concurrent, otherwise Tasking namespace
|
||||
// is independent on Qt::Concurrent.
|
||||
// Possibly, it could be placed inside Qt::Concurrent library, as a wrapper around
|
||||
// QtConcurrent::run() call.
|
||||
|
||||
template <typename ResultType>
|
||||
class ConcurrentCall
|
||||
{
|
||||
Q_DISABLE_COPY_MOVE(ConcurrentCall)
|
||||
|
||||
public:
|
||||
ConcurrentCall() = default;
|
||||
template <typename Function, typename ...Args>
|
||||
void setConcurrentCallData(Function &&function, Args &&...args)
|
||||
{
|
||||
return wrapConcurrent(std::forward<Function>(function), std::forward<Args>(args)...);
|
||||
}
|
||||
void setThreadPool(QThreadPool *pool) { m_threadPool = pool; }
|
||||
ResultType result() const
|
||||
{
|
||||
return m_future.resultCount() ? m_future.result() : ResultType();
|
||||
}
|
||||
QList<ResultType> results() const
|
||||
{
|
||||
return m_future.results();
|
||||
}
|
||||
QFuture<ResultType> future() const { return m_future; }
|
||||
|
||||
private:
|
||||
template <typename Function, typename ...Args>
|
||||
void wrapConcurrent(Function &&function, Args &&...args)
|
||||
{
|
||||
m_startHandler = [this, function = std::forward<Function>(function), args...] {
|
||||
QThreadPool *threadPool = m_threadPool ? m_threadPool : QThreadPool::globalInstance();
|
||||
return QtConcurrent::run(threadPool, function, args...);
|
||||
};
|
||||
}
|
||||
|
||||
template <typename Function, typename ...Args>
|
||||
void wrapConcurrent(std::reference_wrapper<const Function> &&wrapper, Args &&...args)
|
||||
{
|
||||
m_startHandler = [this, wrapper = std::forward<std::reference_wrapper<const Function>>(wrapper), args...] {
|
||||
QThreadPool *threadPool = m_threadPool ? m_threadPool : QThreadPool::globalInstance();
|
||||
return QtConcurrent::run(threadPool, std::forward<const Function>(wrapper.get()),
|
||||
args...);
|
||||
};
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
friend class ConcurrentCallTaskAdapter;
|
||||
|
||||
std::function<QFuture<ResultType>()> m_startHandler;
|
||||
QThreadPool *m_threadPool = nullptr;
|
||||
QFuture<ResultType> m_future;
|
||||
};
|
||||
|
||||
template <typename ResultType>
|
||||
class ConcurrentCallTaskAdapter : public TaskAdapter<ConcurrentCall<ResultType>>
|
||||
{
|
||||
public:
|
||||
~ConcurrentCallTaskAdapter() {
|
||||
if (m_watcher) {
|
||||
m_watcher->cancel();
|
||||
m_watcher->waitForFinished();
|
||||
}
|
||||
}
|
||||
|
||||
void start() final {
|
||||
if (!this->task()->m_startHandler) {
|
||||
emit this->done(DoneResult::Error); // TODO: Add runtime assert
|
||||
return;
|
||||
}
|
||||
m_watcher.reset(new QFutureWatcher<ResultType>);
|
||||
this->connect(m_watcher.get(), &QFutureWatcherBase::finished, this, [this] {
|
||||
emit this->done(toDoneResult(!m_watcher->isCanceled()));
|
||||
m_watcher.release()->deleteLater();
|
||||
});
|
||||
this->task()->m_future = this->task()->m_startHandler();
|
||||
m_watcher->setFuture(this->task()->m_future);
|
||||
}
|
||||
|
||||
private:
|
||||
std::unique_ptr<QFutureWatcher<ResultType>> m_watcher;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
using ConcurrentCallTask = CustomTask<ConcurrentCallTaskAdapter<T>>;
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#endif // TASKING_CONCURRENTCALL_H
|
58
src/assets/downloader/tasking/networkquery.cpp
Normal file
58
src/assets/downloader/tasking/networkquery.cpp
Normal file
@ -0,0 +1,58 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#include "networkquery.h"
|
||||
|
||||
#include <QtNetwork/QNetworkAccessManager>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
void NetworkQuery::start()
|
||||
{
|
||||
if (m_reply) {
|
||||
qWarning("The NetworkQuery is already running. Ignoring the call to start().");
|
||||
return;
|
||||
}
|
||||
if (!m_manager) {
|
||||
qWarning("Can't start the NetworkQuery without the QNetworkAccessManager. "
|
||||
"Stopping with an error.");
|
||||
emit done(DoneResult::Error);
|
||||
return;
|
||||
}
|
||||
switch (m_operation) {
|
||||
case NetworkOperation::Get:
|
||||
m_reply.reset(m_manager->get(m_request));
|
||||
break;
|
||||
case NetworkOperation::Put:
|
||||
m_reply.reset(m_manager->put(m_request, m_writeData));
|
||||
break;
|
||||
case NetworkOperation::Post:
|
||||
m_reply.reset(m_manager->post(m_request, m_writeData));
|
||||
break;
|
||||
case NetworkOperation::Delete:
|
||||
m_reply.reset(m_manager->deleteResource(m_request));
|
||||
break;
|
||||
}
|
||||
connect(m_reply.get(), &QNetworkReply::finished, this, [this] {
|
||||
disconnect(m_reply.get(), &QNetworkReply::finished, this, nullptr);
|
||||
emit done(toDoneResult(m_reply->error() == QNetworkReply::NoError));
|
||||
m_reply.release()->deleteLater();
|
||||
});
|
||||
if (m_reply->isRunning())
|
||||
emit started();
|
||||
}
|
||||
|
||||
NetworkQuery::~NetworkQuery()
|
||||
{
|
||||
if (m_reply) {
|
||||
disconnect(m_reply.get(), &QNetworkReply::finished, this, nullptr);
|
||||
m_reply->abort();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
QT_END_NAMESPACE
|
77
src/assets/downloader/tasking/networkquery.h
Normal file
77
src/assets/downloader/tasking/networkquery.h
Normal file
@ -0,0 +1,77 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#ifndef TASKING_NETWORKQUERY_H
|
||||
#define TASKING_NETWORKQUERY_H
|
||||
|
||||
//
|
||||
// W A R N I N G
|
||||
// -------------
|
||||
//
|
||||
// This file is not part of the Qt API. It exists purely as an
|
||||
// implementation detail. This header file may change from version to
|
||||
// version without notice, or even be removed.
|
||||
//
|
||||
// We mean it.
|
||||
//
|
||||
|
||||
#include "tasking_global.h"
|
||||
|
||||
#include "tasktree.h"
|
||||
|
||||
#include <QtNetwork/QNetworkReply>
|
||||
#include <QtNetwork/QNetworkRequest>
|
||||
|
||||
#include <memory>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
class QNetworkAccessManager;
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
// This class introduces the dependency to Qt::Network, otherwise Tasking namespace
|
||||
// is independent on Qt::Network.
|
||||
// Possibly, it could be placed inside Qt::Network library, as a wrapper around QNetworkReply.
|
||||
|
||||
enum class NetworkOperation { Get, Put, Post, Delete };
|
||||
|
||||
class TASKING_EXPORT NetworkQuery final : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
~NetworkQuery();
|
||||
void setRequest(const QNetworkRequest &request) { m_request = request; }
|
||||
void setOperation(NetworkOperation operation) { m_operation = operation; }
|
||||
void setWriteData(const QByteArray &data) { m_writeData = data; }
|
||||
void setNetworkAccessManager(QNetworkAccessManager *manager) { m_manager = manager; }
|
||||
QNetworkReply *reply() const { return m_reply.get(); }
|
||||
void start();
|
||||
|
||||
Q_SIGNALS:
|
||||
void started();
|
||||
void done(DoneResult result);
|
||||
|
||||
private:
|
||||
QNetworkRequest m_request;
|
||||
NetworkOperation m_operation = NetworkOperation::Get;
|
||||
QByteArray m_writeData; // Used by Put and Post
|
||||
QNetworkAccessManager *m_manager = nullptr;
|
||||
std::unique_ptr<QNetworkReply> m_reply;
|
||||
};
|
||||
|
||||
class TASKING_EXPORT NetworkQueryTaskAdapter : public TaskAdapter<NetworkQuery>
|
||||
{
|
||||
public:
|
||||
NetworkQueryTaskAdapter() { connect(task(), &NetworkQuery::done, this, &TaskInterface::done); }
|
||||
void start() final { task()->start(); }
|
||||
};
|
||||
|
||||
using NetworkQueryTask = CustomTask<NetworkQueryTaskAdapter>;
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#endif // TASKING_NETWORKQUERY_H
|
279
src/assets/downloader/tasking/qprocesstask.cpp
Normal file
279
src/assets/downloader/tasking/qprocesstask.cpp
Normal file
@ -0,0 +1,279 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#include "qprocesstask.h"
|
||||
|
||||
#include <QtCore/QCoreApplication>
|
||||
#include <QtCore/QDebug>
|
||||
#include <QtCore/QMutex>
|
||||
#include <QtCore/QThread>
|
||||
#include <QtCore/QTimer>
|
||||
#include <QtCore/QWaitCondition>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
#if QT_CONFIG(process)
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
class ProcessReaperPrivate;
|
||||
|
||||
class ProcessReaper final
|
||||
{
|
||||
public:
|
||||
static void reap(QProcess *process, int timeoutMs = 500);
|
||||
ProcessReaper();
|
||||
~ProcessReaper();
|
||||
|
||||
private:
|
||||
static ProcessReaper *instance();
|
||||
|
||||
QThread m_thread;
|
||||
ProcessReaperPrivate *m_private;
|
||||
};
|
||||
|
||||
static const int s_timeoutThreshold = 10000; // 10 seconds
|
||||
|
||||
static QString execWithArguments(QProcess *process)
|
||||
{
|
||||
QStringList commandLine;
|
||||
commandLine.append(process->program());
|
||||
commandLine.append(process->arguments());
|
||||
return commandLine.join(QChar::Space);
|
||||
}
|
||||
|
||||
struct ReaperSetup
|
||||
{
|
||||
QProcess *m_process = nullptr;
|
||||
int m_timeoutMs;
|
||||
};
|
||||
|
||||
class Reaper : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
Reaper(const ReaperSetup &reaperSetup) : m_reaperSetup(reaperSetup) {}
|
||||
|
||||
void reap()
|
||||
{
|
||||
m_timer.start();
|
||||
connect(m_reaperSetup.m_process, &QProcess::finished, this, &Reaper::handleFinished);
|
||||
if (emitFinished())
|
||||
return;
|
||||
terminate();
|
||||
}
|
||||
|
||||
Q_SIGNALS:
|
||||
void finished();
|
||||
|
||||
private:
|
||||
void terminate()
|
||||
{
|
||||
m_reaperSetup.m_process->terminate();
|
||||
QTimer::singleShot(m_reaperSetup.m_timeoutMs, this, &Reaper::handleTerminateTimeout);
|
||||
}
|
||||
|
||||
void kill() { m_reaperSetup.m_process->kill(); }
|
||||
|
||||
bool emitFinished()
|
||||
{
|
||||
if (m_reaperSetup.m_process->state() != QProcess::NotRunning)
|
||||
return false;
|
||||
|
||||
if (!m_finished) {
|
||||
const int timeout = m_timer.elapsed();
|
||||
if (timeout > s_timeoutThreshold) {
|
||||
qWarning() << "Finished parallel reaping of" << execWithArguments(m_reaperSetup.m_process)
|
||||
<< "in" << (timeout / 1000.0) << "seconds.";
|
||||
}
|
||||
|
||||
m_finished = true;
|
||||
emit finished();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void handleFinished()
|
||||
{
|
||||
if (emitFinished())
|
||||
return;
|
||||
qWarning() << "Finished process still running...";
|
||||
// In case the process is still running - wait until it has finished
|
||||
QTimer::singleShot(m_reaperSetup.m_timeoutMs, this, &Reaper::handleFinished);
|
||||
}
|
||||
|
||||
void handleTerminateTimeout()
|
||||
{
|
||||
if (emitFinished())
|
||||
return;
|
||||
kill();
|
||||
}
|
||||
|
||||
bool m_finished = false;
|
||||
QElapsedTimer m_timer;
|
||||
const ReaperSetup m_reaperSetup;
|
||||
};
|
||||
|
||||
class ProcessReaperPrivate : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
// Called from non-reaper's thread
|
||||
void scheduleReap(const ReaperSetup &reaperSetup)
|
||||
{
|
||||
if (QThread::currentThread() == thread())
|
||||
qWarning() << "Can't schedule reap from the reaper internal thread.";
|
||||
|
||||
QMutexLocker locker(&m_mutex);
|
||||
m_reaperSetupList.append(reaperSetup);
|
||||
QMetaObject::invokeMethod(this, &ProcessReaperPrivate::flush);
|
||||
}
|
||||
|
||||
// Called from non-reaper's thread
|
||||
void waitForFinished()
|
||||
{
|
||||
if (QThread::currentThread() == thread())
|
||||
qWarning() << "Can't wait for finished from the reaper internal thread.";
|
||||
|
||||
QMetaObject::invokeMethod(this, &ProcessReaperPrivate::flush,
|
||||
Qt::BlockingQueuedConnection);
|
||||
QMutexLocker locker(&m_mutex);
|
||||
if (m_reaperList.isEmpty())
|
||||
return;
|
||||
|
||||
m_waitCondition.wait(&m_mutex);
|
||||
}
|
||||
|
||||
private:
|
||||
// All the private methods are called from the reaper's thread
|
||||
QList<ReaperSetup> takeReaperSetupList()
|
||||
{
|
||||
QMutexLocker locker(&m_mutex);
|
||||
return std::exchange(m_reaperSetupList, {});
|
||||
}
|
||||
|
||||
void flush()
|
||||
{
|
||||
while (true) {
|
||||
const QList<ReaperSetup> reaperSetupList = takeReaperSetupList();
|
||||
if (reaperSetupList.isEmpty())
|
||||
return;
|
||||
for (const ReaperSetup &reaperSetup : reaperSetupList)
|
||||
reap(reaperSetup);
|
||||
}
|
||||
}
|
||||
|
||||
void reap(const ReaperSetup &reaperSetup)
|
||||
{
|
||||
Reaper *reaper = new Reaper(reaperSetup);
|
||||
connect(reaper, &Reaper::finished, this, [this, reaper, process = reaperSetup.m_process] {
|
||||
QMutexLocker locker(&m_mutex);
|
||||
const bool isRemoved = m_reaperList.removeOne(reaper);
|
||||
if (!isRemoved)
|
||||
qWarning() << "Reaper list doesn't contain the finished process.";
|
||||
|
||||
delete reaper;
|
||||
delete process;
|
||||
if (m_reaperList.isEmpty())
|
||||
m_waitCondition.wakeOne();
|
||||
}, Qt::QueuedConnection);
|
||||
|
||||
{
|
||||
QMutexLocker locker(&m_mutex);
|
||||
m_reaperList.append(reaper);
|
||||
}
|
||||
|
||||
reaper->reap();
|
||||
}
|
||||
|
||||
QMutex m_mutex;
|
||||
QWaitCondition m_waitCondition;
|
||||
QList<ReaperSetup> m_reaperSetupList;
|
||||
QList<Reaper *> m_reaperList;
|
||||
};
|
||||
|
||||
static ProcessReaper *s_instance = nullptr;
|
||||
static QMutex s_instanceMutex;
|
||||
|
||||
// Call me with s_instanceMutex locked.
|
||||
ProcessReaper *ProcessReaper::instance()
|
||||
{
|
||||
if (!s_instance)
|
||||
s_instance = new ProcessReaper;
|
||||
return s_instance;
|
||||
}
|
||||
|
||||
ProcessReaper::ProcessReaper()
|
||||
: m_private(new ProcessReaperPrivate)
|
||||
{
|
||||
m_private->moveToThread(&m_thread);
|
||||
QObject::connect(&m_thread, &QThread::finished, m_private, &QObject::deleteLater);
|
||||
m_thread.start();
|
||||
m_thread.moveToThread(qApp->thread());
|
||||
}
|
||||
|
||||
ProcessReaper::~ProcessReaper()
|
||||
{
|
||||
if (QThread::currentThread() != qApp->thread())
|
||||
qWarning() << "Destructing process reaper from non-main thread.";
|
||||
|
||||
instance()->m_private->waitForFinished();
|
||||
m_thread.quit();
|
||||
m_thread.wait();
|
||||
}
|
||||
|
||||
void ProcessReaper::reap(QProcess *process, int timeoutMs)
|
||||
{
|
||||
if (!process)
|
||||
return;
|
||||
|
||||
if (QThread::currentThread() != process->thread()) {
|
||||
qWarning() << "Can't reap process from non-process's thread.";
|
||||
return;
|
||||
}
|
||||
|
||||
process->disconnect();
|
||||
if (process->state() == QProcess::NotRunning) {
|
||||
delete process;
|
||||
return;
|
||||
}
|
||||
|
||||
// Neither can move object with a parent into a different thread
|
||||
// nor reaping the process with a parent makes any sense.
|
||||
process->setParent(nullptr);
|
||||
|
||||
QMutexLocker locker(&s_instanceMutex);
|
||||
ProcessReaperPrivate *priv = instance()->m_private;
|
||||
|
||||
process->moveToThread(priv->thread());
|
||||
ReaperSetup reaperSetup {process, timeoutMs};
|
||||
priv->scheduleReap(reaperSetup);
|
||||
}
|
||||
|
||||
void QProcessDeleter::deleteAll()
|
||||
{
|
||||
QMutexLocker locker(&s_instanceMutex);
|
||||
delete s_instance;
|
||||
s_instance = nullptr;
|
||||
}
|
||||
|
||||
void QProcessDeleter::operator()(QProcess *process)
|
||||
{
|
||||
ProcessReaper::reap(process);
|
||||
}
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
#endif // QT_CONFIG(process)
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#if QT_CONFIG(process)
|
||||
|
||||
#include "qprocesstask.moc"
|
||||
|
||||
#endif // QT_CONFIG(process)
|
||||
|
89
src/assets/downloader/tasking/qprocesstask.h
Normal file
89
src/assets/downloader/tasking/qprocesstask.h
Normal file
@ -0,0 +1,89 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#ifndef TASKING_QPROCESSTASK_H
|
||||
#define TASKING_QPROCESSTASK_H
|
||||
|
||||
//
|
||||
// W A R N I N G
|
||||
// -------------
|
||||
//
|
||||
// This file is not part of the Qt API. It exists purely as an
|
||||
// implementation detail. This header file may change from version to
|
||||
// version without notice, or even be removed.
|
||||
//
|
||||
// We mean it.
|
||||
//
|
||||
|
||||
#include "tasking_global.h"
|
||||
|
||||
#include "tasktree.h"
|
||||
|
||||
#include <QtCore/QProcess>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
#if QT_CONFIG(process)
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
// Deleting a running QProcess may block the caller thread up to 30 seconds and issue warnings.
|
||||
// To avoid these issues we move the running QProcess into a separate thread
|
||||
// managed by the internal ProcessReaper, instead of deleting it immediately.
|
||||
// Inside the ProcessReaper's thread we try to finish the process in a most gentle way:
|
||||
// we call QProcess::terminate() with 500 ms timeout, and if the process is still running
|
||||
// after this timeout passed, we call QProcess::kill() and wait for the process to finish.
|
||||
// All these handlings are done is a separate thread, so the main thread doesn't block at all
|
||||
// when the QProcessTask is destructed.
|
||||
// Finally, on application quit, QProcessDeleter::deleteAll() should be called in order
|
||||
// to synchronize all the processes being still potentially reaped in a separate thread.
|
||||
// The call to QProcessDeleter::deleteAll() is blocking in case some processes
|
||||
// are still being reaped.
|
||||
// This strategy seems most sensible, since when passing the running QProcess into the
|
||||
// ProcessReaper we don't block immediately, but postpone the possible (not certain) block
|
||||
// until the end of an application.
|
||||
// In this way we terminate the running processes in the most safe way and keep the main thread
|
||||
// responsive. That's a common case when the running application wants to terminate the QProcess
|
||||
// immediately (e.g. on Cancel button pressed), without keeping and managing the handle
|
||||
// to the still running QProcess.
|
||||
|
||||
// The implementation of the internal reaper is inspired by the Utils::ProcessReaper taken
|
||||
// from the QtCreator codebase.
|
||||
|
||||
class TASKING_EXPORT QProcessDeleter
|
||||
{
|
||||
public:
|
||||
// Blocking, should be called after all QProcessAdapter instances are deleted.
|
||||
static void deleteAll();
|
||||
void operator()(QProcess *process);
|
||||
};
|
||||
|
||||
class TASKING_EXPORT QProcessAdapter : public TaskAdapter<QProcess, QProcessDeleter>
|
||||
{
|
||||
private:
|
||||
void start() final {
|
||||
connect(task(), &QProcess::finished, this, [this] {
|
||||
const bool success = task()->exitStatus() == QProcess::NormalExit
|
||||
&& task()->error() == QProcess::UnknownError
|
||||
&& task()->exitCode() == 0;
|
||||
Q_EMIT done(toDoneResult(success));
|
||||
});
|
||||
connect(task(), &QProcess::errorOccurred, this, [this](QProcess::ProcessError error) {
|
||||
if (error != QProcess::FailedToStart)
|
||||
return;
|
||||
Q_EMIT done(DoneResult::Error);
|
||||
});
|
||||
task()->start();
|
||||
}
|
||||
};
|
||||
|
||||
using QProcessTask = CustomTask<QProcessAdapter>;
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
#endif // QT_CONFIG(process)
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#endif // TASKING_QPROCESSTASK_H
|
25
src/assets/downloader/tasking/tasking_global.h
Normal file
25
src/assets/downloader/tasking/tasking_global.h
Normal file
@ -0,0 +1,25 @@
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#ifndef TASKING_GLOBAL_H
|
||||
#define TASKING_GLOBAL_H
|
||||
|
||||
#include <QtCore/qglobal.h>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
// #if defined(QT_SHARED) || !defined(QT_STATIC)
|
||||
// # if defined(TASKING_LIBRARY)
|
||||
// # define TASKING_EXPORT Q_DECL_EXPORT
|
||||
// # else
|
||||
// # define TASKING_EXPORT Q_DECL_IMPORT
|
||||
// # endif
|
||||
// #else
|
||||
// # define TASKING_EXPORT
|
||||
// #endif
|
||||
|
||||
#define TASKING_EXPORT
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#endif // TASKING_GLOBAL_H
|
3431
src/assets/downloader/tasking/tasktree.cpp
Normal file
3431
src/assets/downloader/tasking/tasktree.cpp
Normal file
File diff suppressed because it is too large
Load Diff
642
src/assets/downloader/tasking/tasktree.h
Normal file
642
src/assets/downloader/tasking/tasktree.h
Normal file
@ -0,0 +1,642 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#ifndef TASKING_TASKTREE_H
|
||||
#define TASKING_TASKTREE_H
|
||||
|
||||
//
|
||||
// W A R N I N G
|
||||
// -------------
|
||||
//
|
||||
// This file is not part of the Qt API. It exists purely as an
|
||||
// implementation detail. This header file may change from version to
|
||||
// version without notice, or even be removed.
|
||||
//
|
||||
// We mean it.
|
||||
//
|
||||
|
||||
#include "tasking_global.h"
|
||||
|
||||
#include <QtCore/QList>
|
||||
#include <QtCore/QObject>
|
||||
|
||||
#include <memory>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
template <class T>
|
||||
class QFuture;
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
Q_NAMESPACE
|
||||
|
||||
// WorkflowPolicy:
|
||||
// 1. When all children finished with success -> report success, otherwise:
|
||||
// a) Report error on first error and stop executing other children (including their subtree).
|
||||
// b) On first error - continue executing all children and report error afterwards.
|
||||
// 2. When all children finished with error -> report error, otherwise:
|
||||
// a) Report success on first success and stop executing other children (including their subtree).
|
||||
// b) On first success - continue executing all children and report success afterwards.
|
||||
// 3. Stops on first finished child. In sequential mode it will never run other children then the first one.
|
||||
// Useful only in parallel mode.
|
||||
// 4. Always run all children, let them finish, ignore their results and report success afterwards.
|
||||
// 5. Always run all children, let them finish, ignore their results and report error afterwards.
|
||||
|
||||
enum class WorkflowPolicy
|
||||
{
|
||||
StopOnError, // 1a - Reports error on first child error, otherwise success (if all children were success).
|
||||
ContinueOnError, // 1b - The same, but children execution continues. Reports success when no children.
|
||||
StopOnSuccess, // 2a - Reports success on first child success, otherwise error (if all children were error).
|
||||
ContinueOnSuccess, // 2b - The same, but children execution continues. Reports error when no children.
|
||||
StopOnSuccessOrError, // 3 - Stops on first finished child and report its result.
|
||||
FinishAllAndSuccess, // 4 - Reports success after all children finished.
|
||||
FinishAllAndError // 5 - Reports error after all children finished.
|
||||
};
|
||||
Q_ENUM_NS(WorkflowPolicy)
|
||||
|
||||
enum class SetupResult
|
||||
{
|
||||
Continue,
|
||||
StopWithSuccess,
|
||||
StopWithError
|
||||
};
|
||||
Q_ENUM_NS(SetupResult)
|
||||
|
||||
enum class DoneResult
|
||||
{
|
||||
Success,
|
||||
Error
|
||||
};
|
||||
Q_ENUM_NS(DoneResult)
|
||||
|
||||
enum class DoneWith
|
||||
{
|
||||
Success,
|
||||
Error,
|
||||
Cancel
|
||||
};
|
||||
Q_ENUM_NS(DoneWith)
|
||||
|
||||
enum class CallDoneIf
|
||||
{
|
||||
SuccessOrError,
|
||||
Success,
|
||||
Error
|
||||
};
|
||||
Q_ENUM_NS(CallDoneIf)
|
||||
|
||||
TASKING_EXPORT DoneResult toDoneResult(bool success);
|
||||
|
||||
class LoopData;
|
||||
class StorageData;
|
||||
class TaskTreePrivate;
|
||||
|
||||
class TASKING_EXPORT TaskInterface : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
Q_SIGNALS:
|
||||
void done(DoneResult result);
|
||||
|
||||
private:
|
||||
template <typename Task, typename Deleter> friend class TaskAdapter;
|
||||
friend class TaskTreePrivate;
|
||||
TaskInterface() = default;
|
||||
#ifdef Q_QDOC
|
||||
protected:
|
||||
#endif
|
||||
virtual void start() = 0;
|
||||
};
|
||||
|
||||
class TASKING_EXPORT Loop
|
||||
{
|
||||
public:
|
||||
using Condition = std::function<bool(int)>; // Takes iteration, called prior to each iteration.
|
||||
using ValueGetter = std::function<const void *(int)>; // Takes iteration, returns ptr to ref.
|
||||
|
||||
int iteration() const;
|
||||
|
||||
protected:
|
||||
Loop(); // LoopForever
|
||||
Loop(int count, const ValueGetter &valueGetter = {}); // LoopRepeat, LoopList
|
||||
Loop(const Condition &condition); // LoopUntil
|
||||
|
||||
const void *valuePtr() const;
|
||||
|
||||
private:
|
||||
friend class ExecutionContextActivator;
|
||||
friend class TaskTreePrivate;
|
||||
std::shared_ptr<LoopData> m_loopData;
|
||||
};
|
||||
|
||||
class TASKING_EXPORT LoopForever final : public Loop
|
||||
{
|
||||
public:
|
||||
LoopForever() : Loop() {}
|
||||
};
|
||||
|
||||
class TASKING_EXPORT LoopRepeat final : public Loop
|
||||
{
|
||||
public:
|
||||
LoopRepeat(int count) : Loop(count) {}
|
||||
};
|
||||
|
||||
class TASKING_EXPORT LoopUntil final : public Loop
|
||||
{
|
||||
public:
|
||||
LoopUntil(const Condition &condition) : Loop(condition) {}
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
class LoopList final : public Loop
|
||||
{
|
||||
public:
|
||||
LoopList(const QList<T> &list) : Loop(list.size(), [list](int i) { return &list.at(i); }) {}
|
||||
const T *operator->() const { return static_cast<const T *>(valuePtr()); }
|
||||
const T &operator*() const { return *static_cast<const T *>(valuePtr()); }
|
||||
};
|
||||
|
||||
class TASKING_EXPORT StorageBase
|
||||
{
|
||||
private:
|
||||
using StorageConstructor = std::function<void *(void)>;
|
||||
using StorageDestructor = std::function<void(void *)>;
|
||||
using StorageHandler = std::function<void(void *)>;
|
||||
|
||||
StorageBase(const StorageConstructor &ctor, const StorageDestructor &dtor);
|
||||
|
||||
void *activeStorageVoid() const;
|
||||
|
||||
friend bool operator==(const StorageBase &first, const StorageBase &second)
|
||||
{ return first.m_storageData == second.m_storageData; }
|
||||
|
||||
friend bool operator!=(const StorageBase &first, const StorageBase &second)
|
||||
{ return first.m_storageData != second.m_storageData; }
|
||||
|
||||
friend size_t qHash(const StorageBase &storage, uint seed = 0)
|
||||
{ return size_t(storage.m_storageData.get()) ^ seed; }
|
||||
|
||||
std::shared_ptr<StorageData> m_storageData;
|
||||
|
||||
template <typename StorageStruct> friend class Storage;
|
||||
friend class ExecutionContextActivator;
|
||||
friend class StorageData;
|
||||
friend class RuntimeContainer;
|
||||
friend class TaskTree;
|
||||
friend class TaskTreePrivate;
|
||||
};
|
||||
|
||||
template <typename StorageStruct>
|
||||
class Storage final : public StorageBase
|
||||
{
|
||||
public:
|
||||
Storage() : StorageBase(Storage::ctor(), Storage::dtor()) {}
|
||||
StorageStruct &operator*() const noexcept { return *activeStorage(); }
|
||||
StorageStruct *operator->() const noexcept { return activeStorage(); }
|
||||
StorageStruct *activeStorage() const {
|
||||
return static_cast<StorageStruct *>(activeStorageVoid());
|
||||
}
|
||||
|
||||
private:
|
||||
static StorageConstructor ctor() { return [] { return new StorageStruct(); }; }
|
||||
static StorageDestructor dtor() {
|
||||
return [](void *storage) { delete static_cast<StorageStruct *>(storage); };
|
||||
}
|
||||
};
|
||||
|
||||
class TASKING_EXPORT GroupItem
|
||||
{
|
||||
public:
|
||||
// Called when group entered, after group's storages are created
|
||||
using GroupSetupHandler = std::function<SetupResult()>;
|
||||
// Called when group done, before group's storages are deleted
|
||||
using GroupDoneHandler = std::function<DoneResult(DoneWith)>;
|
||||
|
||||
template <typename StorageStruct>
|
||||
GroupItem(const Storage<StorageStruct> &storage)
|
||||
: m_type(Type::Storage)
|
||||
, m_storageList{storage} {}
|
||||
|
||||
GroupItem(const Loop &loop) : GroupItem(GroupData{{}, {}, {}, loop}) {}
|
||||
|
||||
// TODO: Add tests.
|
||||
GroupItem(const QList<GroupItem> &children) : m_type(Type::List) { addChildren(children); }
|
||||
GroupItem(std::initializer_list<GroupItem> children) : m_type(Type::List) { addChildren(children); }
|
||||
|
||||
protected:
|
||||
// Internal, provided by CustomTask
|
||||
using InterfaceCreateHandler = std::function<TaskInterface *(void)>;
|
||||
// Called prior to task start, just after createHandler
|
||||
using InterfaceSetupHandler = std::function<SetupResult(TaskInterface &)>;
|
||||
// Called on task done, just before deleteLater
|
||||
using InterfaceDoneHandler = std::function<DoneResult(const TaskInterface &, DoneWith)>;
|
||||
|
||||
struct TaskHandler {
|
||||
InterfaceCreateHandler m_createHandler;
|
||||
InterfaceSetupHandler m_setupHandler = {};
|
||||
InterfaceDoneHandler m_doneHandler = {};
|
||||
CallDoneIf m_callDoneIf = CallDoneIf::SuccessOrError;
|
||||
};
|
||||
|
||||
struct GroupHandler {
|
||||
GroupSetupHandler m_setupHandler;
|
||||
GroupDoneHandler m_doneHandler = {};
|
||||
CallDoneIf m_callDoneIf = CallDoneIf::SuccessOrError;
|
||||
};
|
||||
|
||||
struct GroupData {
|
||||
GroupHandler m_groupHandler = {};
|
||||
std::optional<int> m_parallelLimit = {};
|
||||
std::optional<WorkflowPolicy> m_workflowPolicy = {};
|
||||
std::optional<Loop> m_loop = {};
|
||||
};
|
||||
|
||||
enum class Type {
|
||||
List,
|
||||
Group,
|
||||
GroupData,
|
||||
Storage,
|
||||
TaskHandler
|
||||
};
|
||||
|
||||
GroupItem() = default;
|
||||
GroupItem(Type type) : m_type(type) { }
|
||||
GroupItem(const GroupData &data)
|
||||
: m_type(Type::GroupData)
|
||||
, m_groupData(data) {}
|
||||
GroupItem(const TaskHandler &handler)
|
||||
: m_type(Type::TaskHandler)
|
||||
, m_taskHandler(handler) {}
|
||||
void addChildren(const QList<GroupItem> &children);
|
||||
|
||||
static GroupItem groupHandler(const GroupHandler &handler) { return GroupItem({handler}); }
|
||||
static GroupItem parallelLimit(int limit) { return GroupItem({{}, limit}); }
|
||||
static GroupItem workflowPolicy(WorkflowPolicy policy) { return GroupItem({{}, {}, policy}); }
|
||||
|
||||
// Checks if Function may be invoked with Args and if Function's return type is Result.
|
||||
template <typename Result, typename Function, typename ...Args,
|
||||
typename DecayedFunction = std::decay_t<Function>>
|
||||
static constexpr bool isInvocable()
|
||||
{
|
||||
// Note, that std::is_invocable_r_v doesn't check Result type properly.
|
||||
if constexpr (std::is_invocable_r_v<Result, DecayedFunction, Args...>)
|
||||
return std::is_same_v<Result, std::invoke_result_t<DecayedFunction, Args...>>;
|
||||
return false;
|
||||
}
|
||||
|
||||
private:
|
||||
friend class ContainerNode;
|
||||
friend class TaskNode;
|
||||
friend class TaskTreePrivate;
|
||||
Type m_type = Type::Group;
|
||||
QList<GroupItem> m_children;
|
||||
GroupData m_groupData;
|
||||
QList<StorageBase> m_storageList;
|
||||
TaskHandler m_taskHandler;
|
||||
};
|
||||
|
||||
class TASKING_EXPORT ExecutableItem : public GroupItem
|
||||
{
|
||||
public:
|
||||
ExecutableItem withTimeout(std::chrono::milliseconds timeout,
|
||||
const std::function<void()> &handler = {}) const;
|
||||
ExecutableItem withLog(const QString &logName) const;
|
||||
template <typename SenderSignalPairGetter>
|
||||
ExecutableItem withCancel(SenderSignalPairGetter &&getter) const
|
||||
{
|
||||
const auto connectWrapper = [getter](QObject *guard, const std::function<void()> &trigger) {
|
||||
const auto senderSignalPair = getter();
|
||||
QObject::connect(senderSignalPair.first, senderSignalPair.second, guard, [trigger] {
|
||||
trigger();
|
||||
}, static_cast<Qt::ConnectionType>(Qt::QueuedConnection | Qt::SingleShotConnection));
|
||||
};
|
||||
return withCancelImpl(connectWrapper);
|
||||
}
|
||||
|
||||
protected:
|
||||
ExecutableItem() = default;
|
||||
ExecutableItem(const TaskHandler &handler) : GroupItem(handler) {}
|
||||
|
||||
private:
|
||||
ExecutableItem withCancelImpl(
|
||||
const std::function<void(QObject *, const std::function<void()> &)> &connectWrapper) const;
|
||||
};
|
||||
|
||||
class TASKING_EXPORT Group : public ExecutableItem
|
||||
{
|
||||
public:
|
||||
Group(const QList<GroupItem> &children) { addChildren(children); }
|
||||
Group(std::initializer_list<GroupItem> children) { addChildren(children); }
|
||||
|
||||
// GroupData related:
|
||||
template <typename Handler>
|
||||
static GroupItem onGroupSetup(Handler &&handler) {
|
||||
return groupHandler({wrapGroupSetup(std::forward<Handler>(handler))});
|
||||
}
|
||||
template <typename Handler>
|
||||
static GroupItem onGroupDone(Handler &&handler, CallDoneIf callDoneIf = CallDoneIf::SuccessOrError) {
|
||||
return groupHandler({{}, wrapGroupDone(std::forward<Handler>(handler)), callDoneIf});
|
||||
}
|
||||
using GroupItem::parallelLimit; // Default: 1 (sequential). 0 means unlimited (parallel).
|
||||
using GroupItem::workflowPolicy; // Default: WorkflowPolicy::StopOnError.
|
||||
|
||||
private:
|
||||
template <typename Handler>
|
||||
static GroupSetupHandler wrapGroupSetup(Handler &&handler)
|
||||
{
|
||||
// R, V stands for: Setup[R]esult, [V]oid
|
||||
static constexpr bool isR = isInvocable<SetupResult, Handler>();
|
||||
static constexpr bool isV = isInvocable<void, Handler>();
|
||||
static_assert(isR || isV,
|
||||
"Group setup handler needs to take no arguments and has to return void or SetupResult. "
|
||||
"The passed handler doesn't fulfill these requirements.");
|
||||
return [handler] {
|
||||
if constexpr (isR)
|
||||
return std::invoke(handler);
|
||||
std::invoke(handler);
|
||||
return SetupResult::Continue;
|
||||
};
|
||||
}
|
||||
template <typename Handler>
|
||||
static GroupDoneHandler wrapGroupDone(Handler &&handler)
|
||||
{
|
||||
// R, V, D stands for: Done[R]esult, [V]oid, [D]oneWith
|
||||
static constexpr bool isRD = isInvocable<DoneResult, Handler, DoneWith>();
|
||||
static constexpr bool isR = isInvocable<DoneResult, Handler>();
|
||||
static constexpr bool isVD = isInvocable<void, Handler, DoneWith>();
|
||||
static constexpr bool isV = isInvocable<void, Handler>();
|
||||
static_assert(isRD || isR || isVD || isV,
|
||||
"Group done handler needs to take (DoneWith) or (void) as an argument and has to "
|
||||
"return void or DoneResult. The passed handler doesn't fulfill these requirements.");
|
||||
return [handler](DoneWith result) {
|
||||
if constexpr (isRD)
|
||||
return std::invoke(handler, result);
|
||||
if constexpr (isR)
|
||||
return std::invoke(handler);
|
||||
if constexpr (isVD)
|
||||
std::invoke(handler, result);
|
||||
else if constexpr (isV)
|
||||
std::invoke(handler);
|
||||
return result == DoneWith::Success ? DoneResult::Success : DoneResult::Error;
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Handler>
|
||||
static GroupItem onGroupSetup(Handler &&handler)
|
||||
{
|
||||
return Group::onGroupSetup(std::forward<Handler>(handler));
|
||||
}
|
||||
|
||||
template <typename Handler>
|
||||
static GroupItem onGroupDone(Handler &&handler, CallDoneIf callDoneIf = CallDoneIf::SuccessOrError)
|
||||
{
|
||||
return Group::onGroupDone(std::forward<Handler>(handler), callDoneIf);
|
||||
}
|
||||
|
||||
TASKING_EXPORT GroupItem parallelLimit(int limit);
|
||||
TASKING_EXPORT GroupItem workflowPolicy(WorkflowPolicy policy);
|
||||
|
||||
TASKING_EXPORT extern const GroupItem nullItem;
|
||||
|
||||
TASKING_EXPORT extern const GroupItem sequential;
|
||||
TASKING_EXPORT extern const GroupItem parallel;
|
||||
TASKING_EXPORT extern const GroupItem parallelIdealThreadCountLimit;
|
||||
|
||||
TASKING_EXPORT extern const GroupItem stopOnError;
|
||||
TASKING_EXPORT extern const GroupItem continueOnError;
|
||||
TASKING_EXPORT extern const GroupItem stopOnSuccess;
|
||||
TASKING_EXPORT extern const GroupItem continueOnSuccess;
|
||||
TASKING_EXPORT extern const GroupItem stopOnSuccessOrError;
|
||||
TASKING_EXPORT extern const GroupItem finishAllAndSuccess;
|
||||
TASKING_EXPORT extern const GroupItem finishAllAndError;
|
||||
|
||||
class TASKING_EXPORT Forever final : public Group
|
||||
{
|
||||
public:
|
||||
Forever(const QList<GroupItem> &children) : Group({LoopForever(), children}) {}
|
||||
Forever(std::initializer_list<GroupItem> children) : Group({LoopForever(), children}) {}
|
||||
};
|
||||
|
||||
// Synchronous invocation. Similarly to Group - isn't counted as a task inside taskCount()
|
||||
class TASKING_EXPORT Sync final : public ExecutableItem
|
||||
{
|
||||
public:
|
||||
template <typename Handler>
|
||||
Sync(Handler &&handler) {
|
||||
addChildren({ onGroupSetup(wrapHandler(std::forward<Handler>(handler))) });
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename Handler>
|
||||
static GroupSetupHandler wrapHandler(Handler &&handler) {
|
||||
// R, V stands for: Done[R]esult, [V]oid
|
||||
static constexpr bool isR = isInvocable<DoneResult, Handler>();
|
||||
static constexpr bool isV = isInvocable<void, Handler>();
|
||||
static_assert(isR || isV,
|
||||
"Sync handler needs to take no arguments and has to return void or DoneResult. "
|
||||
"The passed handler doesn't fulfill these requirements.");
|
||||
return [handler] {
|
||||
if constexpr (isR) {
|
||||
return std::invoke(handler) == DoneResult::Success ? SetupResult::StopWithSuccess
|
||||
: SetupResult::StopWithError;
|
||||
}
|
||||
std::invoke(handler);
|
||||
return SetupResult::StopWithSuccess;
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Task, typename Deleter = std::default_delete<Task>>
|
||||
class TaskAdapter : public TaskInterface
|
||||
{
|
||||
protected:
|
||||
TaskAdapter() : m_task(new Task) {}
|
||||
Task *task() { return m_task.get(); }
|
||||
const Task *task() const { return m_task.get(); }
|
||||
|
||||
private:
|
||||
using TaskType = Task;
|
||||
using DeleterType = Deleter;
|
||||
template <typename Adapter> friend class CustomTask;
|
||||
std::unique_ptr<Task, Deleter> m_task;
|
||||
};
|
||||
|
||||
template <typename Adapter>
|
||||
class CustomTask final : public ExecutableItem
|
||||
{
|
||||
public:
|
||||
using Task = typename Adapter::TaskType;
|
||||
using Deleter = typename Adapter::DeleterType;
|
||||
static_assert(std::is_base_of_v<TaskAdapter<Task, Deleter>, Adapter>,
|
||||
"The Adapter type for the CustomTask<Adapter> needs to be derived from "
|
||||
"TaskAdapter<Task>.");
|
||||
using TaskSetupHandler = std::function<SetupResult(Task &)>;
|
||||
using TaskDoneHandler = std::function<DoneResult(const Task &, DoneWith)>;
|
||||
|
||||
template <typename SetupHandler = TaskSetupHandler, typename DoneHandler = TaskDoneHandler>
|
||||
CustomTask(SetupHandler &&setup = TaskSetupHandler(), DoneHandler &&done = TaskDoneHandler(),
|
||||
CallDoneIf callDoneIf = CallDoneIf::SuccessOrError)
|
||||
: ExecutableItem({&createAdapter, wrapSetup(std::forward<SetupHandler>(setup)),
|
||||
wrapDone(std::forward<DoneHandler>(done)), callDoneIf})
|
||||
{}
|
||||
|
||||
private:
|
||||
static Adapter *createAdapter() { return new Adapter; }
|
||||
|
||||
template <typename Handler>
|
||||
static InterfaceSetupHandler wrapSetup(Handler &&handler) {
|
||||
if constexpr (std::is_same_v<Handler, TaskSetupHandler>)
|
||||
return {}; // When user passed {} for the setup handler.
|
||||
// R, V stands for: Setup[R]esult, [V]oid
|
||||
static constexpr bool isR = isInvocable<SetupResult, Handler, Task &>();
|
||||
static constexpr bool isV = isInvocable<void, Handler, Task &>();
|
||||
static_assert(isR || isV,
|
||||
"Task setup handler needs to take (Task &) as an argument and has to return void or "
|
||||
"SetupResult. The passed handler doesn't fulfill these requirements.");
|
||||
return [handler](TaskInterface &taskInterface) {
|
||||
Adapter &adapter = static_cast<Adapter &>(taskInterface);
|
||||
if constexpr (isR)
|
||||
return std::invoke(handler, *adapter.task());
|
||||
std::invoke(handler, *adapter.task());
|
||||
return SetupResult::Continue;
|
||||
};
|
||||
}
|
||||
|
||||
template <typename Handler>
|
||||
static InterfaceDoneHandler wrapDone(Handler &&handler) {
|
||||
if constexpr (std::is_same_v<Handler, TaskDoneHandler>)
|
||||
return {}; // When user passed {} for the done handler.
|
||||
// R, V, T, D stands for: Done[R]esult, [V]oid, [T]ask, [D]oneWith
|
||||
static constexpr bool isRTD = isInvocable<DoneResult, Handler, const Task &, DoneWith>();
|
||||
static constexpr bool isRT = isInvocable<DoneResult, Handler, const Task &>();
|
||||
static constexpr bool isRD = isInvocable<DoneResult, Handler, DoneWith>();
|
||||
static constexpr bool isR = isInvocable<DoneResult, Handler>();
|
||||
static constexpr bool isVTD = isInvocable<void, Handler, const Task &, DoneWith>();
|
||||
static constexpr bool isVT = isInvocable<void, Handler, const Task &>();
|
||||
static constexpr bool isVD = isInvocable<void, Handler, DoneWith>();
|
||||
static constexpr bool isV = isInvocable<void, Handler>();
|
||||
static_assert(isRTD || isRT || isRD || isR || isVTD || isVT || isVD || isV,
|
||||
"Task done handler needs to take (const Task &, DoneWith), (const Task &), "
|
||||
"(DoneWith) or (void) as arguments and has to return void or DoneResult. "
|
||||
"The passed handler doesn't fulfill these requirements.");
|
||||
return [handler](const TaskInterface &taskInterface, DoneWith result) {
|
||||
const Adapter &adapter = static_cast<const Adapter &>(taskInterface);
|
||||
if constexpr (isRTD)
|
||||
return std::invoke(handler, *adapter.task(), result);
|
||||
if constexpr (isRT)
|
||||
return std::invoke(handler, *adapter.task());
|
||||
if constexpr (isRD)
|
||||
return std::invoke(handler, result);
|
||||
if constexpr (isR)
|
||||
return std::invoke(handler);
|
||||
if constexpr (isVTD)
|
||||
std::invoke(handler, *adapter.task(), result);
|
||||
else if constexpr (isVT)
|
||||
std::invoke(handler, *adapter.task());
|
||||
else if constexpr (isVD)
|
||||
std::invoke(handler, result);
|
||||
else if constexpr (isV)
|
||||
std::invoke(handler);
|
||||
return result == DoneWith::Success ? DoneResult::Success : DoneResult::Error;
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
class TASKING_EXPORT TaskTree final : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
TaskTree();
|
||||
TaskTree(const Group &recipe);
|
||||
~TaskTree();
|
||||
|
||||
void setRecipe(const Group &recipe);
|
||||
|
||||
void start();
|
||||
void cancel();
|
||||
bool isRunning() const;
|
||||
|
||||
// Helper methods. They execute a local event loop with ExcludeUserInputEvents.
|
||||
// The passed future is used for listening to the cancel event.
|
||||
// Don't use it in main thread. To be used in non-main threads or in auto tests.
|
||||
DoneWith runBlocking();
|
||||
DoneWith runBlocking(const QFuture<void> &future);
|
||||
static DoneWith runBlocking(const Group &recipe,
|
||||
std::chrono::milliseconds timeout = std::chrono::milliseconds::max());
|
||||
static DoneWith runBlocking(const Group &recipe, const QFuture<void> &future,
|
||||
std::chrono::milliseconds timeout = std::chrono::milliseconds::max());
|
||||
|
||||
int asyncCount() const;
|
||||
int taskCount() const;
|
||||
int progressMaximum() const { return taskCount(); }
|
||||
int progressValue() const; // all finished / skipped / stopped tasks, groups itself excluded
|
||||
|
||||
template <typename StorageStruct, typename Handler>
|
||||
void onStorageSetup(const Storage<StorageStruct> &storage, Handler &&handler) {
|
||||
static_assert(std::is_invocable_v<std::decay_t<Handler>, StorageStruct &>,
|
||||
"Storage setup handler needs to take (Storage &) as an argument. "
|
||||
"The passed handler doesn't fulfill this requirement.");
|
||||
setupStorageHandler(storage,
|
||||
wrapHandler<StorageStruct>(std::forward<Handler>(handler)), {});
|
||||
}
|
||||
template <typename StorageStruct, typename Handler>
|
||||
void onStorageDone(const Storage<StorageStruct> &storage, Handler &&handler) {
|
||||
static_assert(std::is_invocable_v<std::decay_t<Handler>, const StorageStruct &>,
|
||||
"Storage done handler needs to take (const Storage &) as an argument. "
|
||||
"The passed handler doesn't fulfill this requirement.");
|
||||
setupStorageHandler(storage, {},
|
||||
wrapHandler<const StorageStruct>(std::forward<Handler>(handler)));
|
||||
}
|
||||
|
||||
Q_SIGNALS:
|
||||
void started();
|
||||
void done(DoneWith result);
|
||||
void asyncCountChanged(int count);
|
||||
void progressValueChanged(int value); // updated whenever task finished / skipped / stopped
|
||||
|
||||
private:
|
||||
void setupStorageHandler(const StorageBase &storage,
|
||||
StorageBase::StorageHandler setupHandler,
|
||||
StorageBase::StorageHandler doneHandler);
|
||||
template <typename StorageStruct, typename Handler>
|
||||
StorageBase::StorageHandler wrapHandler(Handler &&handler) {
|
||||
return [handler](void *voidStruct) {
|
||||
auto *storageStruct = static_cast<StorageStruct *>(voidStruct);
|
||||
std::invoke(handler, *storageStruct);
|
||||
};
|
||||
}
|
||||
|
||||
TaskTreePrivate *d;
|
||||
};
|
||||
|
||||
class TASKING_EXPORT TaskTreeTaskAdapter : public TaskAdapter<TaskTree>
|
||||
{
|
||||
public:
|
||||
TaskTreeTaskAdapter();
|
||||
|
||||
private:
|
||||
void start() final;
|
||||
};
|
||||
|
||||
class TASKING_EXPORT TimeoutTaskAdapter : public TaskAdapter<std::chrono::milliseconds>
|
||||
{
|
||||
public:
|
||||
TimeoutTaskAdapter();
|
||||
~TimeoutTaskAdapter();
|
||||
|
||||
private:
|
||||
void start() final;
|
||||
std::optional<int> m_timerId;
|
||||
};
|
||||
|
||||
using TaskTreeTask = CustomTask<TaskTreeTaskAdapter>;
|
||||
using TimeoutTask = CustomTask<TimeoutTaskAdapter>;
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#endif // TASKING_TASKTREE_H
|
45
src/assets/downloader/tasking/tasktreerunner.cpp
Normal file
45
src/assets/downloader/tasking/tasktreerunner.cpp
Normal file
@ -0,0 +1,45 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#include "tasktreerunner.h"
|
||||
|
||||
#include "tasktree.h"
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
TaskTreeRunner::~TaskTreeRunner() = default;
|
||||
|
||||
void TaskTreeRunner::start(const Group &recipe,
|
||||
const SetupHandler &setupHandler,
|
||||
const DoneHandler &doneHandler)
|
||||
{
|
||||
m_taskTree.reset(new TaskTree(recipe));
|
||||
connect(m_taskTree.get(), &TaskTree::done, this, [this, doneHandler](DoneWith result) {
|
||||
m_taskTree.release()->deleteLater();
|
||||
if (doneHandler)
|
||||
doneHandler(result);
|
||||
emit done(result);
|
||||
});
|
||||
if (setupHandler)
|
||||
setupHandler(m_taskTree.get());
|
||||
emit aboutToStart(m_taskTree.get());
|
||||
m_taskTree->start();
|
||||
}
|
||||
|
||||
void TaskTreeRunner::cancel()
|
||||
{
|
||||
if (m_taskTree)
|
||||
m_taskTree->cancel();
|
||||
}
|
||||
|
||||
void TaskTreeRunner::reset()
|
||||
{
|
||||
m_taskTree.reset();
|
||||
}
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
QT_END_NAMESPACE
|
63
src/assets/downloader/tasking/tasktreerunner.h
Normal file
63
src/assets/downloader/tasking/tasktreerunner.h
Normal file
@ -0,0 +1,63 @@
|
||||
// Copyright (C) 2024 Jarek Kobus
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#ifndef TASKING_TASKTREERUNNER_H
|
||||
#define TASKING_TASKTREERUNNER_H
|
||||
|
||||
//
|
||||
// W A R N I N G
|
||||
// -------------
|
||||
//
|
||||
// This file is not part of the Qt API. It exists purely as an
|
||||
// implementation detail. This header file may change from version to
|
||||
// version without notice, or even be removed.
|
||||
//
|
||||
// We mean it.
|
||||
//
|
||||
|
||||
#include "tasking_global.h"
|
||||
#include "tasktree.h"
|
||||
|
||||
#include <QtCore/QObject>
|
||||
|
||||
QT_BEGIN_NAMESPACE
|
||||
|
||||
namespace Tasking {
|
||||
|
||||
class TASKING_EXPORT TaskTreeRunner : public QObject
|
||||
{
|
||||
Q_OBJECT
|
||||
|
||||
public:
|
||||
using SetupHandler = std::function<void(TaskTree *)>;
|
||||
using DoneHandler = std::function<void(DoneWith)>;
|
||||
|
||||
~TaskTreeRunner();
|
||||
|
||||
bool isRunning() const { return bool(m_taskTree); }
|
||||
|
||||
// When task tree is running it resets the old task tree.
|
||||
void start(const Group &recipe,
|
||||
const SetupHandler &setupHandler = {},
|
||||
const DoneHandler &doneHandler = {});
|
||||
|
||||
// When task tree is running it emits done(DoneWith::Cancel) synchronously.
|
||||
void cancel();
|
||||
|
||||
// No done() signal is emitted.
|
||||
void reset();
|
||||
|
||||
Q_SIGNALS:
|
||||
void aboutToStart(TaskTree *taskTree);
|
||||
void done(DoneWith result);
|
||||
|
||||
private:
|
||||
std::unique_ptr<TaskTree> m_taskTree;
|
||||
};
|
||||
|
||||
} // namespace Tasking
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#endif // TASKING_TASKTREERUNNER_H
|
Loading…
x
Reference in New Issue
Block a user