Revert "Allow QWindowsPipe{Reader,Writer} to work with foreign event loops"

This reverts commit ee122077b09430da54ca09750589b37326a22d85.

Reason for revert: This causes QProcess::readAll() to sometimes
return nothing after the process has ended.

Fixes: QTBUG-88624
Change-Id: I34fa27ae7fb38cc7c3a1e8eb2fdae2a5775584c2
Reviewed-by: Lars Knoll <lars.knoll@qt.io>
Reviewed-by: Paul Wicking <paul.wicking@qt.io>
(cherry picked from commit 23100ee61e33680d20f934dcbc96b57e8da29bf9)
Reviewed-by: Qt Cherry-pick Bot <cherrypick_bot@qt-project.org>
This commit is contained in:
Kai Koehne 2020-11-19 16:06:05 +01:00 committed by Qt Cherry-pick Bot
parent 915be6606e
commit 5a233f7556
11 changed files with 337 additions and 878 deletions

View File

@ -1,6 +1,6 @@
/****************************************************************************
**
** Copyright (C) 2020 The Qt Company Ltd.
** Copyright (C) 2016 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -41,75 +41,61 @@
#include "qiodevice_p.h"
#include <qelapsedtimer.h>
#include <qscopedvaluerollback.h>
#include <qcoreapplication.h>
#include <QMutexLocker>
QT_BEGIN_NAMESPACE
QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
: pipeReader(reader)
{
}
void QWindowsPipeReader::Overlapped::clear()
{
ZeroMemory(this, sizeof(OVERLAPPED));
}
QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent),
handle(INVALID_HANDLE_VALUE),
eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
waitObject(NULL),
overlapped(this),
readBufferMaxSize(0),
actualReadBufferSize(0),
pendingReadBytes(0),
lastError(ERROR_SUCCESS),
stopped(true),
readSequenceStarted(false),
notifiedCalled(false),
pipeBroken(false),
readyReadPending(false),
winEventActPosted(false),
inReadyRead(false)
{
ZeroMemory(&overlapped, sizeof(OVERLAPPED));
overlapped.hEvent = eventHandle;
waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
if (waitObject == NULL)
qErrnoWarning("QWindowsPipeReader: CreateThreadpollWait failed.");
connect(this, &QWindowsPipeReader::_q_queueReadyRead,
this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
}
QWindowsPipeReader::~QWindowsPipeReader()
{
stop();
CloseThreadpoolWait(waitObject);
CloseHandle(eventHandle);
CloseHandle(syncHandle);
}
/*!
\internal
Sets the handle to read from. The handle must be valid.
Do not call this function if the pipe is running.
*/
void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
{
readBuffer.clear();
actualReadBufferSize = 0;
readyReadPending = false;
pendingReadBytes = 0;
handle = hPipeReadEnd;
pipeBroken = false;
lastError = ERROR_SUCCESS;
}
/*!
\internal
Stops the asynchronous read sequence.
If the read sequence is running then the I/O operation is canceled.
*/
void QWindowsPipeReader::stop()
{
if (stopped)
return;
mutex.lock();
stopped = true;
if (readSequenceStarted) {
// Trying to disable callback before canceling the operation.
// Callback invocation is unnecessary here.
SetThreadpoolWait(waitObject, NULL, NULL);
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@ -117,33 +103,8 @@ void QWindowsPipeReader::stop()
handle);
}
}
readSequenceStarted = false;
waitForNotification(-1);
}
mutex.unlock();
WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
}
/*!
\internal
Sets the size of internal read buffer.
*/
void QWindowsPipeReader::setMaxReadBufferSize(qint64 size)
{
QMutexLocker locker(&mutex);
readBufferMaxSize = size;
}
/*!
\internal
Returns \c true if async operation is in progress, there is
pending data to read, or a read error is pending.
*/
bool QWindowsPipeReader::isReadOperationActive() const
{
QMutexLocker locker(&mutex);
return readSequenceStarted || readyReadPending
|| (lastError != ERROR_SUCCESS && !pipeBroken);
}
/*!
@ -162,7 +123,6 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
if (pipeBroken && actualReadBufferSize == 0)
return 0; // signal EOF
mutex.lock();
qint64 readSoFar;
// If startAsyncRead() has read data, copy it to its destination.
if (maxlen == 1 && actualReadBufferSize > 0) {
@ -173,10 +133,9 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
readSoFar = readBuffer.read(data, qMin(actualReadBufferSize, maxlen));
actualReadBufferSize -= readSoFar;
}
mutex.unlock();
if (!pipeBroken) {
if (!stopped)
if (!readSequenceStarted && !stopped)
startAsyncRead();
if (readSoFar == 0)
return -2; // signal EWOULDBLOCK
@ -185,220 +144,131 @@ qint64 QWindowsPipeReader::read(char *data, qint64 maxlen)
return readSoFar;
}
/*!
\internal
Returns \c true if a complete line of data can be read from the buffer.
*/
bool QWindowsPipeReader::canReadLine() const
{
QMutexLocker locker(&mutex);
return readBuffer.indexOf('\n', actualReadBufferSize) >= 0;
}
/*!
\internal
Starts an asynchronous read sequence on the pipe.
*/
void QWindowsPipeReader::startAsyncRead()
{
QMutexLocker locker(&mutex);
if (readSequenceStarted || lastError != ERROR_SUCCESS)
return;
stopped = false;
startAsyncReadLocked();
// Do not post the event, if the read operation will be completed asynchronously.
if (!readyReadPending && lastError == ERROR_SUCCESS)
return;
if (!winEventActPosted) {
winEventActPosted = true;
locker.unlock();
QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
}
}
/*!
\internal
Starts a new read sequence. Thread-safety should be ensured by the caller.
*/
void QWindowsPipeReader::startAsyncReadLocked()
{
const DWORD minReadBufferSize = 4096;
forever {
qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
if (lastError != ERROR_SUCCESS)
return;
if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
bytesToRead = readBufferMaxSize - readBuffer.size();
if (bytesToRead <= 0) {
// Buffer is full. User must read data from the buffer
// before we can read more from the pipe.
return;
}
}
char *ptr = readBuffer.reserve(bytesToRead);
// ReadFile() returns true, if the read operation completes synchronously.
// We don't need to call GetOverlappedResult() additionally, because
// 'numberOfBytesRead' is valid in this case.
DWORD numberOfBytesRead;
if (!ReadFile(handle, ptr, bytesToRead, &numberOfBytesRead, &overlapped))
break;
readCompleted(ERROR_SUCCESS, numberOfBytesRead);
}
const DWORD dwError = GetLastError();
if (dwError == ERROR_IO_PENDING) {
// Operation has been queued and will complete in the future.
readSequenceStarted = true;
SetThreadpoolWait(waitObject, eventHandle, NULL);
} else {
// Any other errors are treated as EOF.
readCompleted(dwError, 0);
}
}
/*!
\internal
Thread pool callback procedure.
*/
void QWindowsPipeReader::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
Q_UNUSED(instance);
Q_UNUSED(wait);
Q_UNUSED(waitResult);
QWindowsPipeReader *pipeReader = reinterpret_cast<QWindowsPipeReader *>(context);
// Get the result of the asynchronous operation.
DWORD numberOfBytesTransfered = 0;
DWORD errorCode = ERROR_SUCCESS;
if (!GetOverlappedResult(pipeReader->handle, &pipeReader->overlapped,
&numberOfBytesTransfered, FALSE))
errorCode = GetLastError();
QMutexLocker locker(&pipeReader->mutex);
// After the reader was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new read sequence should
// be started in this case.
if (pipeReader->stopped)
return;
pipeReader->readSequenceStarted = false;
// Do not overwrite error code, if error has been detected by
// checkPipeState() in waitForPipeClosed().
if (pipeReader->lastError != ERROR_SUCCESS)
return;
pipeReader->readCompleted(errorCode, numberOfBytesTransfered);
if (pipeReader->lastError == ERROR_SUCCESS)
pipeReader->startAsyncReadLocked();
if (!pipeReader->winEventActPosted) {
pipeReader->winEventActPosted = true;
locker.unlock();
QCoreApplication::postEvent(pipeReader, new QEvent(QEvent::WinEventAct));
} else {
locker.unlock();
}
SetEvent(pipeReader->syncHandle);
}
/*!
\internal
Will be called whenever the read operation completes.
*/
void QWindowsPipeReader::readCompleted(DWORD errorCode, DWORD numberOfBytesRead)
void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
{
// ERROR_MORE_DATA is not an error. We're connected to a message mode
// pipe and the message didn't fit into the pipe's system
// buffer. We will read the remaining data in the next call.
if (errorCode == ERROR_SUCCESS || errorCode == ERROR_MORE_DATA) {
readyReadPending = true;
pendingReadBytes += numberOfBytesRead;
readBuffer.truncate(actualReadBufferSize + pendingReadBytes);
} else {
lastError = errorCode;
}
}
notifiedCalled = true;
readSequenceStarted = false;
/*!
\internal
Receives notification that the read operation has completed.
*/
bool QWindowsPipeReader::event(QEvent *e)
{
if (e->type() == QEvent::WinEventAct) {
emitPendingSignals(true);
return true;
}
return QObject::event(e);
}
/*!
\internal
Emits pending signals in the main thread. Returns \c true,
if readyRead() was emitted.
*/
bool QWindowsPipeReader::emitPendingSignals(bool allowWinActPosting)
{
mutex.lock();
// Enable QEvent::WinEventAct posting.
if (allowWinActPosting)
winEventActPosted = false;
bool emitReadyRead = false;
if (readyReadPending) {
readyReadPending = false;
actualReadBufferSize += pendingReadBytes;
pendingReadBytes = 0;
emitReadyRead = true;
}
const DWORD dwError = lastError;
mutex.unlock();
// Disable any further processing, if the pipe was stopped.
if (stopped)
return false;
if (emitReadyRead && !inReadyRead) {
QScopedValueRollback<bool> guard(inReadyRead, true);
emit readyRead();
}
// Trigger 'pipeBroken' only once.
if (dwError != ERROR_SUCCESS && !pipeBroken) {
switch (errorCode) {
case ERROR_SUCCESS:
break;
case ERROR_MORE_DATA:
// This is not an error. We're connected to a message mode
// pipe and the message didn't fit into the pipe's system
// buffer. We will read the remaining data in the next call.
break;
case ERROR_BROKEN_PIPE:
case ERROR_PIPE_NOT_CONNECTED:
pipeBroken = true;
if (dwError != ERROR_BROKEN_PIPE && dwError != ERROR_PIPE_NOT_CONNECTED)
emit winError(dwError, QLatin1String("QWindowsPipeReader::emitPendingSignals"));
emit pipeClosed();
break;
case ERROR_OPERATION_ABORTED:
if (stopped)
break;
Q_FALLTHROUGH();
default:
emit winError(errorCode, QLatin1String("QWindowsPipeReader::notified"));
pipeBroken = true;
break;
}
return emitReadyRead;
// After the reader was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new read sequence should
// be started in this case.
if (stopped)
return;
if (pipeBroken) {
emit pipeClosed();
return;
}
actualReadBufferSize += numberOfBytesRead;
readBuffer.truncate(actualReadBufferSize);
startAsyncRead();
if (!readyReadPending) {
readyReadPending = true;
emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
}
}
/*!
\internal
Reads data from the pipe into the readbuffer.
*/
void QWindowsPipeReader::startAsyncRead()
{
const DWORD minReadBufferSize = 4096;
qint64 bytesToRead = qMax(checkPipeState(), minReadBufferSize);
if (pipeBroken)
return;
if (readBufferMaxSize && bytesToRead > (readBufferMaxSize - readBuffer.size())) {
bytesToRead = readBufferMaxSize - readBuffer.size();
if (bytesToRead <= 0) {
// Buffer is full. User must read data from the buffer
// before we can read more from the pipe.
return;
}
}
char *ptr = readBuffer.reserve(bytesToRead);
stopped = false;
readSequenceStarted = true;
overlapped.clear();
if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
readSequenceStarted = false;
const DWORD dwError = GetLastError();
switch (dwError) {
case ERROR_BROKEN_PIPE:
case ERROR_PIPE_NOT_CONNECTED:
// It may happen, that the other side closes the connection directly
// after writing data. Then we must set the appropriate socket state.
pipeBroken = true;
emit pipeClosed();
break;
default:
emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
break;
}
}
}
/*!
\internal
Called when ReadFileEx finished the read operation.
*/
void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase)
{
Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
}
/*!
\internal
Returns the number of available bytes in the pipe.
Sets QWindowsPipeReader::lastError if the connection is broken.
Sets QWindowsPipeReader::pipeBroken to true if the connection is broken.
*/
DWORD QWindowsPipeReader::checkPipeState()
{
DWORD bytes;
if (PeekNamedPipe(handle, nullptr, 0, nullptr, &bytes, nullptr))
return bytes;
readCompleted(GetLastError(), 0);
if (!pipeBroken) {
pipeBroken = true;
emit pipeClosed();
}
return 0;
}
@ -406,21 +276,27 @@ bool QWindowsPipeReader::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
notifiedCalled = false;
int msecs = timeout;
do {
DWORD waitRet = WaitForSingleObjectEx(syncHandle,
msecs == -1 ? INFINITE : msecs, TRUE);
if (waitRet == WAIT_OBJECT_0)
while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
if (notifiedCalled)
return true;
if (waitRet != WAIT_IO_COMPLETION)
return false;
// Some I/O completion routine was called. Wait some more.
// Some other I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
} while (msecs != 0);
if (!msecs)
break;
}
return notifiedCalled;
}
return false;
void QWindowsPipeReader::emitPendingReadyRead()
{
if (readyReadPending) {
readyReadPending = false;
QScopedValueRollback<bool> guard(inReadyRead, true);
emit readyRead();
}
}
/*!
@ -430,21 +306,25 @@ bool QWindowsPipeReader::waitForNotification(int timeout)
*/
bool QWindowsPipeReader::waitForReadyRead(int msecs)
{
if (readBufferMaxSize && actualReadBufferSize >= readBufferMaxSize)
return false;
// Prepare handle for waiting.
ResetEvent(syncHandle);
// It is necessary to check if there is already data in the queue.
if (emitPendingSignals(false))
if (readyReadPending) {
if (!inReadyRead)
emitPendingReadyRead();
return true;
}
// Make sure that 'syncHandle' was triggered by the thread pool callback.
if (pipeBroken || !waitForNotification(msecs))
if (!readSequenceStarted)
return false;
return emitPendingSignals(false);
if (!waitForNotification(msecs))
return false;
if (readyReadPending) {
if (!inReadyRead)
emitPendingReadyRead();
return true;
}
return false;
}
/*!
@ -457,18 +337,9 @@ bool QWindowsPipeReader::waitForPipeClosed(int msecs)
stopWatch.start();
forever {
waitForReadyRead(0);
if (pipeBroken)
return true;
// When the read buffer is full, the read sequence is not running.
// So, we should peek the pipe to detect disconnect.
mutex.lock();
checkPipeState();
mutex.unlock();
emitPendingSignals(false);
if (pipeBroken)
return true;
if (stopWatch.hasExpired(msecs - sleepTime))
return false;
Sleep(sleepTime);

View File

@ -1,6 +1,6 @@
/****************************************************************************
**
** Copyright (C) 2020 The Qt Company Ltd.
** Copyright (C) 2016 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -52,7 +52,6 @@
//
#include <qobject.h>
#include <qmutex.h>
#include <private/qringbuffer_p.h>
#include <qt_windows.h>
@ -70,7 +69,7 @@ public:
void startAsyncRead();
void stop();
void setMaxReadBufferSize(qint64 size);
void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; }
qint64 maxReadBufferSize() const { return readBufferMaxSize; }
bool isPipeClosed() const { return pipeBroken; }
@ -80,41 +79,41 @@ public:
bool waitForReadyRead(int msecs);
bool waitForPipeClosed(int msecs);
bool isReadOperationActive() const;
bool isReadOperationActive() const { return readSequenceStarted; }
Q_SIGNALS:
void winError(ulong, const QString &);
void readyRead();
void pipeClosed();
protected:
bool event(QEvent *e) override;
void _q_queueReadyRead(QPrivateSignal);
private:
void startAsyncReadLocked();
static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT waitResult);
void readCompleted(DWORD errorCode, DWORD numberOfBytesRead);
static void CALLBACK readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase);
void notified(DWORD errorCode, DWORD numberOfBytesRead);
DWORD checkPipeState();
bool waitForNotification(int timeout);
bool emitPendingSignals(bool allowWinActPosting);
void emitPendingReadyRead();
class Overlapped : public OVERLAPPED
{
Q_DISABLE_COPY_MOVE(Overlapped)
public:
explicit Overlapped(QWindowsPipeReader *reader);
void clear();
QWindowsPipeReader *pipeReader;
};
HANDLE handle;
HANDLE eventHandle;
HANDLE syncHandle;
PTP_WAIT waitObject;
OVERLAPPED overlapped;
Overlapped overlapped;
qint64 readBufferMaxSize;
QRingBuffer readBuffer;
qint64 actualReadBufferSize;
qint64 pendingReadBytes;
mutable QMutex mutex;
DWORD lastError;
bool stopped;
bool readSequenceStarted;
bool notifiedCalled;
bool pipeBroken;
bool readyReadPending;
bool winEventActPosted;
bool inReadyRead;
};

View File

@ -1,6 +1,6 @@
/****************************************************************************
**
** Copyright (C) 2020 The Qt Company Ltd.
** Copyright (C) 2016 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -40,55 +40,178 @@
#include "qwindowspipewriter_p.h"
#include "qiodevice_p.h"
#include <qscopedvaluerollback.h>
#include <qcoreapplication.h>
QT_BEGIN_NAMESPACE
QWindowsPipeWriter::Overlapped::Overlapped(QWindowsPipeWriter *pipeWriter)
: pipeWriter(pipeWriter)
{
}
void QWindowsPipeWriter::Overlapped::clear()
{
ZeroMemory(this, sizeof(OVERLAPPED));
}
QWindowsPipeWriter::QWindowsPipeWriter(HANDLE pipeWriteEnd, QObject *parent)
: QObject(parent),
handle(pipeWriteEnd),
eventHandle(CreateEvent(NULL, FALSE, FALSE, NULL)),
syncHandle(CreateEvent(NULL, TRUE, FALSE, NULL)),
waitObject(NULL),
overlapped(this),
pendingBytesWrittenValue(0),
lastError(ERROR_SUCCESS),
stopped(true),
writeSequenceStarted(false),
notifiedCalled(false),
bytesWrittenPending(false),
winEventActPosted(false),
inBytesWritten(false)
{
ZeroMemory(&overlapped, sizeof(OVERLAPPED));
overlapped.hEvent = eventHandle;
waitObject = CreateThreadpoolWait(waitCallback, this, NULL);
if (waitObject == NULL)
qErrnoWarning("QWindowsPipeWriter: CreateThreadpollWait failed.");
connect(this, &QWindowsPipeWriter::_q_queueBytesWritten,
this, &QWindowsPipeWriter::emitPendingBytesWrittenValue, Qt::QueuedConnection);
}
QWindowsPipeWriter::~QWindowsPipeWriter()
{
stop();
CloseThreadpoolWait(waitObject);
CloseHandle(eventHandle);
CloseHandle(syncHandle);
}
bool QWindowsPipeWriter::waitForWrite(int msecs)
{
if (bytesWrittenPending) {
emitPendingBytesWrittenValue();
return true;
}
if (!writeSequenceStarted)
return false;
if (!waitForNotification(msecs))
return false;
if (bytesWrittenPending) {
emitPendingBytesWrittenValue();
return true;
}
return false;
}
qint64 QWindowsPipeWriter::bytesToWrite() const
{
return buffer.size() + pendingBytesWrittenValue;
}
void QWindowsPipeWriter::emitPendingBytesWrittenValue()
{
if (bytesWrittenPending) {
// Reset the state even if we don't emit bytesWritten().
// It's a defined behavior to not re-emit this signal recursively.
bytesWrittenPending = false;
const qint64 bytes = pendingBytesWrittenValue;
pendingBytesWrittenValue = 0;
emit canWrite();
if (!inBytesWritten) {
QScopedValueRollback<bool> guard(inBytesWritten, true);
emit bytesWritten(bytes);
}
}
}
void QWindowsPipeWriter::writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase)
{
Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
overlapped->pipeWriter->notified(errorCode, numberOfBytesTransfered);
}
/*!
\internal
Stops the asynchronous write sequence.
If the write sequence is running then the I/O operation is canceled.
Will be called whenever the write operation completes.
*/
void QWindowsPipeWriter::stop()
void QWindowsPipeWriter::notified(DWORD errorCode, DWORD numberOfBytesWritten)
{
notifiedCalled = true;
writeSequenceStarted = false;
Q_ASSERT(errorCode != ERROR_SUCCESS || numberOfBytesWritten == DWORD(buffer.size()));
buffer.clear();
switch (errorCode) {
case ERROR_SUCCESS:
break;
case ERROR_OPERATION_ABORTED:
if (stopped)
break;
Q_FALLTHROUGH();
default:
qErrnoWarning(errorCode, "QWindowsPipeWriter: asynchronous write failed.");
break;
}
// After the writer was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new write sequence should
// be started in this case.
if (stopped)
return;
mutex.lock();
pendingBytesWrittenValue += qint64(numberOfBytesWritten);
if (!bytesWrittenPending) {
bytesWrittenPending = true;
emit _q_queueBytesWritten(QWindowsPipeWriter::QPrivateSignal());
}
}
bool QWindowsPipeWriter::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
notifiedCalled = false;
int msecs = timeout;
while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
if (notifiedCalled)
return true;
// Some other I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
if (!msecs)
break;
}
return notifiedCalled;
}
bool QWindowsPipeWriter::write(const QByteArray &ba)
{
if (writeSequenceStarted)
return false;
overlapped.clear();
buffer = ba;
stopped = false;
writeSequenceStarted = true;
if (!WriteFileEx(handle, buffer.constData(), buffer.size(),
&overlapped, &writeFileCompleted)) {
writeSequenceStarted = false;
buffer.clear();
const DWORD errorCode = GetLastError();
switch (errorCode) {
case ERROR_NO_DATA: // "The pipe is being closed."
// The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
break;
default:
qErrnoWarning(errorCode, "QWindowsPipeWriter::write failed.");
}
return false;
}
return true;
}
void QWindowsPipeWriter::stop()
{
stopped = true;
bytesWrittenPending = false;
pendingBytesWrittenValue = 0;
if (writeSequenceStarted) {
// Trying to disable callback before canceling the operation.
// Callback invocation is unnecessary here.
SetThreadpoolWait(waitObject, NULL, NULL);
if (!CancelIoEx(handle, &overlapped)) {
const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) {
@ -96,250 +219,8 @@ void QWindowsPipeWriter::stop()
handle);
}
}
writeSequenceStarted = false;
waitForNotification(-1);
}
mutex.unlock();
WaitForThreadpoolWaitCallbacks(waitObject, TRUE);
}
/*!
\internal
Returns \c true if async operation is in progress or a bytesWritten
signal is pending.
*/
bool QWindowsPipeWriter::isWriteOperationActive() const
{
QMutexLocker locker(&mutex);
return writeSequenceStarted || bytesWrittenPending;
}
/*!
\internal
Returns the number of bytes that are waiting to be written.
*/
qint64 QWindowsPipeWriter::bytesToWrite() const
{
QMutexLocker locker(&mutex);
return writeBuffer.size() + pendingBytesWrittenValue;
}
/*!
\internal
Writes data to the pipe.
*/
bool QWindowsPipeWriter::write(const QByteArray &ba)
{
QMutexLocker locker(&mutex);
if (lastError != ERROR_SUCCESS)
return false;
writeBuffer.append(ba);
if (writeSequenceStarted)
return true;
stopped = false;
startAsyncWriteLocked();
// Do not post the event, if the write operation will be completed asynchronously.
if (bytesWrittenPending && !winEventActPosted) {
winEventActPosted = true;
locker.unlock();
QCoreApplication::postEvent(this, new QEvent(QEvent::WinEventAct));
}
return true;
}
/*!
\internal
Starts a new write sequence. Thread-safety should be ensured by the caller.
*/
void QWindowsPipeWriter::startAsyncWriteLocked()
{
forever {
if (writeBuffer.isEmpty())
return;
// WriteFile() returns true, if the write operation completes synchronously.
// We don't need to call GetOverlappedResult() additionally, because
// 'numberOfBytesWritten' is valid in this case.
DWORD numberOfBytesWritten;
if (!WriteFile(handle, writeBuffer.readPointer(), writeBuffer.nextDataBlockSize(),
&numberOfBytesWritten, &overlapped)) {
break;
}
writeCompleted(ERROR_SUCCESS, numberOfBytesWritten);
}
const DWORD dwError = GetLastError();
if (dwError == ERROR_IO_PENDING) {
// Operation has been queued and will complete in the future.
writeSequenceStarted = true;
SetThreadpoolWait(waitObject, eventHandle, NULL);
} else {
// Other return values are actual errors.
writeCompleted(dwError, 0);
}
}
/*!
\internal
Thread pool callback procedure.
*/
void QWindowsPipeWriter::waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT waitResult)
{
Q_UNUSED(instance);
Q_UNUSED(wait);
Q_UNUSED(waitResult);
QWindowsPipeWriter *pipeWriter = reinterpret_cast<QWindowsPipeWriter *>(context);
// Get the result of the asynchronous operation.
DWORD numberOfBytesTransfered = 0;
DWORD errorCode = ERROR_SUCCESS;
if (!GetOverlappedResult(pipeWriter->handle, &pipeWriter->overlapped,
&numberOfBytesTransfered, FALSE))
errorCode = GetLastError();
QMutexLocker locker(&pipeWriter->mutex);
// After the writer was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new write sequence
// should be started in this case.
if (pipeWriter->stopped)
return;
pipeWriter->writeSequenceStarted = false;
pipeWriter->writeCompleted(errorCode, numberOfBytesTransfered);
if (pipeWriter->lastError != ERROR_SUCCESS)
return;
pipeWriter->startAsyncWriteLocked();
if (!pipeWriter->winEventActPosted) {
pipeWriter->winEventActPosted = true;
locker.unlock();
QCoreApplication::postEvent(pipeWriter, new QEvent(QEvent::WinEventAct));
} else {
locker.unlock();
}
SetEvent(pipeWriter->syncHandle);
}
/*!
\internal
Will be called whenever the write operation completes.
*/
void QWindowsPipeWriter::writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten)
{
if (errorCode == ERROR_SUCCESS) {
Q_ASSERT(numberOfBytesWritten == DWORD(writeBuffer.nextDataBlockSize()));
bytesWrittenPending = true;
pendingBytesWrittenValue += numberOfBytesWritten;
writeBuffer.free(numberOfBytesWritten);
} else {
lastError = errorCode;
writeBuffer.clear();
// The other end has closed the pipe. This can happen in QLocalSocket. Do not warn.
if (errorCode != ERROR_OPERATION_ABORTED && errorCode != ERROR_NO_DATA)
qErrnoWarning(errorCode, "QWindowsPipeWriter: write failed.");
}
}
/*!
\internal
Receives notification that the write operation has completed.
*/
bool QWindowsPipeWriter::event(QEvent *e)
{
if (e->type() == QEvent::WinEventAct) {
emitPendingSignals(true);
return true;
}
return QObject::event(e);
}
/*!
\internal
Emits pending signals in the main thread. Returns \c true,
if bytesWritten() was emitted.
*/
bool QWindowsPipeWriter::emitPendingSignals(bool allowWinActPosting)
{
QMutexLocker locker(&mutex);
// Enable QEvent::WinEventAct posting.
if (allowWinActPosting)
winEventActPosted = false;
if (!bytesWrittenPending)
return false;
// Reset the state even if we don't emit bytesWritten().
// It's a defined behavior to not re-emit this signal recursively.
bytesWrittenPending = false;
qint64 numberOfBytesWritten = pendingBytesWrittenValue;
pendingBytesWrittenValue = 0;
locker.unlock();
// Disable any further processing, if the pipe was stopped.
if (stopped)
return false;
emit canWrite();
if (!inBytesWritten) {
QScopedValueRollback<bool> guard(inBytesWritten, true);
emit bytesWritten(numberOfBytesWritten);
}
return true;
}
bool QWindowsPipeWriter::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
int msecs = timeout;
do {
DWORD waitRet = WaitForSingleObjectEx(syncHandle,
msecs == -1 ? INFINITE : msecs, TRUE);
if (waitRet == WAIT_OBJECT_0)
return true;
if (waitRet != WAIT_IO_COMPLETION)
return false;
// Some I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
} while (msecs != 0);
return false;
}
/*!
\internal
Waits for the completion of the asynchronous write operation.
Returns \c true, if we've emitted the bytesWritten signal (non-recursive case)
or bytesWritten will be emitted by the event loop (recursive case).
*/
bool QWindowsPipeWriter::waitForWrite(int msecs)
{
// Prepare handle for waiting.
ResetEvent(syncHandle);
// It is necessary to check if there is already pending signal.
if (emitPendingSignals(false))
return true;
// Make sure that 'syncHandle' was triggered by the thread pool callback.
if (!isWriteOperationActive() || !waitForNotification(msecs))
return false;
return emitPendingSignals(false);
}
QT_END_NAMESPACE

View File

@ -1,6 +1,6 @@
/****************************************************************************
**
** Copyright (C) 2020 The Qt Company Ltd.
** Copyright (C) 2016 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the QtCore module of the Qt Toolkit.
@ -54,9 +54,7 @@
#include <QtCore/private/qglobal_p.h>
#include <qelapsedtimer.h>
#include <qobject.h>
#include <qmutex.h>
#include <private/qringbuffer_p.h>
#include <qbytearray.h>
#include <qt_windows.h>
QT_BEGIN_NAMESPACE
@ -119,37 +117,39 @@ public:
bool write(const QByteArray &ba);
void stop();
bool waitForWrite(int msecs);
bool isWriteOperationActive() const;
bool isWriteOperationActive() const { return writeSequenceStarted; }
qint64 bytesToWrite() const;
Q_SIGNALS:
void canWrite();
void bytesWritten(qint64 bytes);
protected:
bool event(QEvent *e) override;
void _q_queueBytesWritten(QPrivateSignal);
private:
void startAsyncWriteLocked();
static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
PTP_WAIT wait, TP_WAIT_RESULT waitResult);
void writeCompleted(DWORD errorCode, DWORD numberOfBytesWritten);
static void CALLBACK writeFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase);
void notified(DWORD errorCode, DWORD numberOfBytesWritten);
bool waitForNotification(int timeout);
bool emitPendingSignals(bool allowWinActPosting);
void emitPendingBytesWrittenValue();
class Overlapped : public OVERLAPPED
{
Q_DISABLE_COPY_MOVE(Overlapped)
public:
explicit Overlapped(QWindowsPipeWriter *pipeWriter);
void clear();
QWindowsPipeWriter *pipeWriter;
};
HANDLE handle;
HANDLE eventHandle;
HANDLE syncHandle;
PTP_WAIT waitObject;
OVERLAPPED overlapped;
QRingBuffer writeBuffer;
Overlapped overlapped;
QByteArray buffer;
qint64 pendingBytesWrittenValue;
mutable QMutex mutex;
DWORD lastError;
bool stopped;
bool writeSequenceStarted;
bool notifiedCalled;
bool bytesWrittenPending;
bool winEventActPosted;
bool inBytesWritten;
};

View File

@ -35,8 +35,6 @@
#include <QtGui/qrasterwindow.h>
#include <QtNetwork/qtcpserver.h>
#include <QtNetwork/qtcpsocket.h>
#include <QtNetwork/qlocalserver.h>
#include <QtNetwork/qlocalsocket.h>
#include <QtCore/qelapsedtimer.h>
#include <QtCore/qtimer.h>
#include <QtCore/qwineventnotifier.h>
@ -53,7 +51,6 @@ class tst_NoQtEventLoop : public QObject
private slots:
void consumeMouseEvents();
void consumeSocketEvents();
void consumeLocalSocketEvents();
void consumeWinEvents_data();
void consumeWinEvents();
void deliverEventsInLivelock();
@ -321,44 +318,6 @@ void tst_NoQtEventLoop::consumeSocketEvents()
QVERIFY(server.hasPendingConnections());
}
void tst_NoQtEventLoop::consumeLocalSocketEvents()
{
int argc = 1;
char *argv[] = { const_cast<char *>("test"), 0 };
QGuiApplication app(argc, argv);
QLocalServer server;
QLocalSocket client;
QSignalSpy readyReadSpy(&client, &QIODevice::readyRead);
QVERIFY(server.listen("consumeLocalSocketEvents"));
client.connectToServer("consumeLocalSocketEvents");
QVERIFY(client.waitForConnected(200));
QVERIFY(server.waitForNewConnection(200));
QLocalSocket *clientSocket = server.nextPendingConnection();
QVERIFY(clientSocket);
QSignalSpy bytesWrittenSpy(clientSocket, &QIODevice::bytesWritten);
server.close();
bool timeExpired = false;
QTimer::singleShot(3000, Qt::CoarseTimer, [&timeExpired]() {
timeExpired = true;
});
QVERIFY(clientSocket->putChar(0));
// Exec own message loop
MSG msg;
while (::GetMessage(&msg, NULL, 0, 0)) {
::TranslateMessage(&msg);
::DispatchMessage(&msg);
if (timeExpired || readyReadSpy.count() != 0)
break;
}
QVERIFY(!timeExpired);
QCOMPARE(bytesWrittenSpy.count(), 1);
QCOMPARE(readyReadSpy.count(), 1);
}
void tst_NoQtEventLoop::consumeWinEvents_data()
{
QTest::addColumn<bool>("peeking");

View File

@ -634,12 +634,11 @@ void tst_QLocalSocket::readBufferOverflow()
QCOMPARE(client.bytesAvailable(), 0);
#ifdef Q_OS_WIN
// ensure the previous write operation is finished
QVERIFY(serverSocket->waitForBytesWritten());
serverSocket->write(buffer, readBufferSize);
QVERIFY(serverSocket->waitForBytesWritten());
// ensure the read completion routine is called
SleepEx(100, true);
QVERIFY(client.waitForReadyRead());
QCOMPARE(client.read(buffer, readBufferSize), qint64(readBufferSize));

View File

@ -1,5 +1,4 @@
# Generated from socket.pro.
add_subdirectory(qlocalsocket)
add_subdirectory(qtcpserver)
add_subdirectory(qudpsocket)

View File

@ -1,16 +0,0 @@
# Generated from qlocalsocket.pro.
#####################################################################
## tst_bench_qlocalsocket Binary:
#####################################################################
qt_internal_add_benchmark(tst_bench_qlocalsocket
SOURCES
tst_qlocalsocket.cpp
PUBLIC_LIBRARIES
Qt::Network
Qt::Test
)
#### Keys ignored in scope 1:.:.:qlocalsocket.pro:<TRUE>:
# TEMPLATE = "app"

View File

@ -1,8 +0,0 @@
TEMPLATE = app
TARGET = tst_bench_qlocalsocket
QT = network testlib
CONFIG += release
SOURCES += tst_qlocalsocket.cpp

View File

@ -1,224 +0,0 @@
/****************************************************************************
**
** Copyright (C) 2020 The Qt Company Ltd.
** Contact: https://www.qt.io/licensing/
**
** This file is part of the test suite of the Qt Toolkit.
**
** $QT_BEGIN_LICENSE:GPL-EXCEPT$
** Commercial License Usage
** Licensees holding valid commercial Qt licenses may use this file in
** accordance with the commercial license agreement provided with the
** Software or, alternatively, in accordance with the terms contained in
** a written agreement between you and The Qt Company. For licensing terms
** and conditions see https://www.qt.io/terms-conditions. For further
** information use the contact form at https://www.qt.io/contact-us.
**
** GNU General Public License Usage
** Alternatively, this file may be used under the terms of the GNU
** General Public License version 3 as published by the Free Software
** Foundation with exceptions as appearing in the file LICENSE.GPL3-EXCEPT
** included in the packaging of this file. Please review the following
** information to ensure the GNU General Public License requirements will
** be met: https://www.gnu.org/licenses/gpl-3.0.html.
**
** $QT_END_LICENSE$
**
****************************************************************************/
#include <QtTest/QtTest>
#include <QtCore/qglobal.h>
#include <QtCore/qthread.h>
#include <QtCore/qsemaphore.h>
#include <QtCore/qbytearray.h>
#include <QtCore/qeventloop.h>
#include <QtCore/qvector.h>
#include <QtCore/qelapsedtimer.h>
#include <QtNetwork/qlocalsocket.h>
#include <QtNetwork/qlocalserver.h>
class tst_QLocalSocket : public QObject
{
Q_OBJECT
private slots:
void pingPong_data();
void pingPong();
void dataExchange_data();
void dataExchange();
};
class ServerThread : public QThread
{
public:
QSemaphore running;
explicit ServerThread(int chunkSize)
{
buffer.resize(chunkSize);
}
void run() override
{
QLocalServer server;
connect(&server, &QLocalServer::newConnection, [this, &server]() {
auto socket = server.nextPendingConnection();
connect(socket, &QLocalSocket::readyRead, [this, socket]() {
const qint64 bytesAvailable = socket->bytesAvailable();
Q_ASSERT(bytesAvailable <= this->buffer.size());
QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
});
});
QVERIFY(server.listen("foo"));
running.release();
exec();
}
protected:
QByteArray buffer;
};
class SocketFactory : public QObject
{
Q_OBJECT
public:
bool stopped = false;
explicit SocketFactory(int chunkSize, int connections)
{
buffer.resize(chunkSize);
for (int i = 0; i < connections; ++i) {
QLocalSocket *socket = new QLocalSocket(this);
Q_CHECK_PTR(socket);
connect(this, &SocketFactory::start, [this, socket]() {
QCOMPARE(socket->write(this->buffer), this->buffer.size());
});
connect(socket, &QLocalSocket::readyRead, [i, this, socket]() {
const qint64 bytesAvailable = socket->bytesAvailable();
Q_ASSERT(bytesAvailable <= this->buffer.size());
QCOMPARE(socket->read(this->buffer.data(), bytesAvailable), bytesAvailable);
emit this->bytesReceived(i, bytesAvailable);
if (!this->stopped)
QCOMPARE(socket->write(this->buffer.data(), bytesAvailable), bytesAvailable);
});
socket->connectToServer("foo");
QCOMPARE(socket->state(), QLocalSocket::ConnectedState);
}
}
signals:
void start();
void bytesReceived(int channel, qint64 bytes);
protected:
QByteArray buffer;
};
void tst_QLocalSocket::pingPong_data()
{
QTest::addColumn<int>("connections");
for (int value : {10, 50, 100, 1000, 5000})
QTest::addRow("connections: %d", value) << value;
}
void tst_QLocalSocket::pingPong()
{
QFETCH(int, connections);
const int iterations = 100000;
Q_ASSERT(iterations >= connections && connections > 0);
ServerThread serverThread(1);
serverThread.start();
// Wait for server to start.
QVERIFY(serverThread.running.tryAcquire(1, 3000));
SocketFactory factory(1, connections);
QEventLoop eventLoop;
QVector<qint64> bytesToRead;
QElapsedTimer timer;
bytesToRead.fill(iterations / connections, connections);
connect(&factory, &SocketFactory::bytesReceived,
[&bytesToRead, &connections, &factory, &eventLoop](int channel, qint64 bytes) {
Q_UNUSED(bytes);
if (--bytesToRead[channel] == 0 && --connections == 0) {
factory.stopped = true;
eventLoop.quit();
}
});
timer.start();
emit factory.start();
eventLoop.exec();
qDebug("Elapsed time: %.1f s", timer.elapsed() / 1000.0);
serverThread.quit();
serverThread.wait();
}
void tst_QLocalSocket::dataExchange_data()
{
QTest::addColumn<int>("connections");
QTest::addColumn<int>("chunkSize");
for (int connections : {1, 5, 10}) {
for (int chunkSize : {100, 1000, 10000, 100000}) {
QTest::addRow("connections: %d, chunk size: %d",
connections, chunkSize) << connections << chunkSize;
}
}
}
void tst_QLocalSocket::dataExchange()
{
QFETCH(int, connections);
QFETCH(int, chunkSize);
Q_ASSERT(chunkSize > 0 && connections > 0);
const qint64 timeToTest = 5000;
ServerThread serverThread(chunkSize);
serverThread.start();
// Wait for server to start.
QVERIFY(serverThread.running.tryAcquire(1, 3000));
SocketFactory factory(chunkSize, connections);
QEventLoop eventLoop;
qint64 totalReceived = 0;
QElapsedTimer timer;
connect(&factory, &SocketFactory::bytesReceived,
[&totalReceived, &timer, timeToTest, &factory, &eventLoop](int channel, qint64 bytes) {
Q_UNUSED(channel);
totalReceived += bytes;
if (timer.elapsed() >= timeToTest) {
factory.stopped = true;
eventLoop.quit();
}
});
timer.start();
emit factory.start();
eventLoop.exec();
qDebug("Transfer rate: %.1f MB/s", totalReceived / 1048.576 / timer.elapsed());
serverThread.quit();
serverThread.wait();
}
QTEST_MAIN(tst_QLocalSocket)
#include "tst_qlocalsocket.moc"

View File

@ -1,5 +1,4 @@
TEMPLATE = subdirs
SUBDIRS = \
qlocalsocket \
qtcpserver \
qudpsocket