qtbase/src/concurrent/qtconcurrentreducekernel.h
Ivan Solovev e632241386 Fix QThreadPool::maxThreadCount() usage
The docs claim that QThreadPool always creates at least one thread.
However, the user can (usually by mistake) request zero or a negative
number of threads.
The maxThreadCount() function simply returns the value that was
requested by the user.
Since it's a public API, it is used in several places in QtConcurrent,
where it is assumed that the value is always positive. This can lead
to a crash if the user sets zero as a maxThreadCount.

Update all such places with std::max(maxThreadCount(), 1).
Prefer this approach over changing the return value of
maxThreadCount(), because its behavior is documented and tested.

Amends 885eff053797d56f2e295558d0a71b030fbb1a69.

Fixes: QTBUG-120335
Pick-to: 6.5 6.2
Change-Id: Id3b2087cec7fbc7a2d42febca6586f2dacffe444
Reviewed-by: Thiago Macieira <thiago.macieira@intel.com>
(cherry picked from commit 936e72d18075b79c8d29353618dfbd052ae59dae)
Reviewed-by: Qt Cherry-pick Bot <cherrypick_bot@qt-project.org>
(cherry picked from commit 41a605e0f2251db78cb852bb161dd0020c63c935)
2024-01-12 14:31:43 +00:00

238 lines
6.8 KiB
C++

// Copyright (C) 2016 The Qt Company Ltd.
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
#ifndef QTCONCURRENT_REDUCEKERNEL_H
#define QTCONCURRENT_REDUCEKERNEL_H
#include <QtConcurrent/qtconcurrent_global.h>
#if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
#include <QtCore/qatomic.h>
#include <QtCore/qlist.h>
#include <QtCore/qmap.h>
#include <QtCore/qmutex.h>
#include <QtCore/qthread.h>
#include <QtCore/qthreadpool.h>
#include <mutex>
QT_BEGIN_NAMESPACE
namespace QtPrivate {
// Stores a sequence by value and exposes it through the public
// `sequence` member. The sequence is either copied from an lvalue or
// moved from an rvalue, depending on which constructor is selected.
template<typename Sequence>
struct SequenceHolder
{
    Sequence sequence;

    // Keeps a copy of source.
    SequenceHolder(const Sequence &source)
        : sequence(source)
    {
    }

    // Takes over the contents of source by moving from it.
    SequenceHolder(Sequence &&source)
        : sequence(std::move(source))
    {
    }
};
}
namespace QtConcurrent {
/*
    ReduceQueueStartLimit and ReduceQueueThrottleLimit bound the size of
    the reduce queue used by MapReduce. Once the number of queued reduce
    blocks exceeds ReduceQueueStartLimit, MapReduce won't start any new
    threads; once it exceeds ReduceQueueThrottleLimit, running threads
    will be stopped. ReduceKernel multiplies both limits by its thread
    count before comparing them with the queue size.

    The enum is only given a name (ReduceQueueLimits) when Q_QDOC is
    defined; otherwise it is anonymous. The enumerators are the same in
    both cases.
*/
#ifdef Q_QDOC
enum ReduceQueueLimits {
#else
enum {
#endif
    ReduceQueueStartLimit = 20,
    ReduceQueueThrottleLimit = 30
};
// One block of intermediate results coming out of a map or filter
// functor. The begin/end offsets indicate the origin and range of the
// block; vector carries the results themselves.
template <typename T>
class IntermediateResults
{
public:
    int begin;
    int end;
    QList<T> vector;
};
// Bit flags selecting how ReduceKernel reduces blocks of results.
enum ReduceOption {
    // A block is reduced whenever no other reduction is in progress,
    // regardless of its begin offset.
    UnorderedReduce = 1 << 0,
    // A block is reduced only once the reduction progress has reached
    // its begin offset.
    OrderedReduce = 1 << 1,
    // Not tested by ReduceKernel::canReduce(); presumably interpreted
    // elsewhere -- TODO confirm.
    SequentialReduce = 1 << 2
    // ParallelReduce = 1 << 3
};
// Declares ReduceOptions, the flags type for combinations of ReduceOption
// values (expected to be QFlags<ReduceOption> per the Qt macro docs).
Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
#ifndef Q_QDOC
// The flag operators are only declared when Q_QDOC is not defined.
Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
#endif
// supports both ordered and out-of-order reduction
template <typename ReduceFunctor, typename ReduceResultType, typename T>
class ReduceKernel
{
typedef QMap<int, IntermediateResults<T> > ResultsMap;
const ReduceOptions reduceOptions;
QMutex mutex;
int progress, resultsMapSize;
const int threadCount;
ResultsMap resultsMap;
bool canReduce(int begin) const
{
return (((reduceOptions & UnorderedReduce)
&& progress == 0)
|| ((reduceOptions & OrderedReduce)
&& progress == begin));
}
void reduceResult(ReduceFunctor &reduce,
ReduceResultType &r,
const IntermediateResults<T> &result)
{
for (int i = 0; i < result.vector.size(); ++i) {
std::invoke(reduce, r, result.vector.at(i));
}
}
void reduceResults(ReduceFunctor &reduce,
ReduceResultType &r,
ResultsMap &map)
{
typename ResultsMap::iterator it = map.begin();
while (it != map.end()) {
reduceResult(reduce, r, it.value());
++it;
}
}
public:
ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions)
: reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
threadCount(std::max(pool->maxThreadCount(), 1))
{ }
void runReduce(ReduceFunctor &reduce,
ReduceResultType &r,
const IntermediateResults<T> &result)
{
std::unique_lock<QMutex> locker(mutex);
if (!canReduce(result.begin)) {
++resultsMapSize;
resultsMap.insert(result.begin, result);
return;
}
if (reduceOptions & UnorderedReduce) {
// UnorderedReduce
progress = -1;
// reduce this result
locker.unlock();
reduceResult(reduce, r, result);
locker.lock();
// reduce all stored results as well
while (!resultsMap.isEmpty()) {
ResultsMap resultsMapCopy = resultsMap;
resultsMap.clear();
locker.unlock();
reduceResults(reduce, r, resultsMapCopy);
locker.lock();
resultsMapSize -= resultsMapCopy.size();
}
progress = 0;
} else {
// reduce this result
locker.unlock();
reduceResult(reduce, r, result);
locker.lock();
// OrderedReduce
progress += result.end - result.begin;
// reduce as many other results as possible
typename ResultsMap::iterator it = resultsMap.begin();
while (it != resultsMap.end()) {
if (it.value().begin != progress)
break;
locker.unlock();
reduceResult(reduce, r, it.value());
locker.lock();
--resultsMapSize;
progress += it.value().end - it.value().begin;
it = resultsMap.erase(it);
}
}
}
// final reduction
void finish(ReduceFunctor &reduce, ReduceResultType &r)
{
reduceResults(reduce, r, resultsMap);
}
inline bool shouldThrottle()
{
std::lock_guard<QMutex> locker(mutex);
return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
}
inline bool shouldStartThread()
{
std::lock_guard<QMutex> locker(mutex);
return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
}
};
// Bundles a stored sequence with a Base object that works on iterators
// into it. The private SequenceHolder base is listed first, so the
// sequence is constructed before Base and its cbegin()/cend() can be
// passed to the Base constructor.
template <typename Sequence, typename Base, typename Functor1, typename Functor2>
struct SequenceHolder2 : private QtPrivate::SequenceHolder<Sequence>, public Base
{
    // Stores the sequence, then constructs Base over it with the two
    // functors and the reduce options.
    template<typename S = Sequence, typename F1 = Functor1, typename F2 = Functor2>
    SequenceHolder2(QThreadPool *pool,
                    S &&source,
                    F1 &&f1,
                    F2 &&f2,
                    ReduceOptions options)
        : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(source)),
          Base(pool,
               this->sequence.cbegin(),
               this->sequence.cend(),
               std::forward<F1>(f1),
               std::forward<F2>(f2),
               options)
    {
    }

    // Same as above, additionally forwarding an initial value to Base.
    template<typename InitialValueType, typename S = Sequence,
             typename F1 = Functor1, typename F2 = Functor2>
    SequenceHolder2(QThreadPool *pool,
                    S &&source,
                    F1 &&f1,
                    F2 &&f2,
                    InitialValueType &&initial,
                    ReduceOptions options)
        : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(source)),
          Base(pool,
               this->sequence.cbegin(),
               this->sequence.cend(),
               std::forward<F1>(f1),
               std::forward<F2>(f2),
               std::forward<InitialValueType>(initial),
               options)
    {
    }

    // Runs Base::finish() and then replaces the stored sequence with a
    // default-constructed one.
    void finish() override
    {
        Base::finish();
        // Resetting the sequence here ensures that all temporaries are
        // destroyed before finished is signaled.
        this->sequence = Sequence();
    }
};
} // namespace QtConcurrent
QT_END_NAMESPACE
#endif // QT_NO_CONCURRENT
#endif