Rework QWindowsPipeReader

The use of QWinOverlappedIoNotifier in QWindowsPipeReader restricts us
in the following ways:
  - The handle that gets assigned to QWinOverlappedIoNotifier is forever
    tied to an I/O completion port.
  - Other notification mechanisms like I/O completion routines of
    WriteFileEx do not work with such a handle.
  - No other QWinOverlappedIoNotifier can be registered for this handle.

To achieve the ultimate goal of making QWindowsPipeWriter thread-free
(to fix QTBUG-23378 and QTBUG-38185) we remove the usage of
QWinOverlappedIoNotifier from QWindowsPipeReader and use the
ReadFileEx API instead.
This has the additional advantage of removing the need for any thread
synchronization, as the I/O completion routine runs in the thread that
ReadFileEx was called on, leading to simpler and faster code.

Change-Id: I05c983e1f1e49d7dd27e3b77a47f87cae9c3f4c6
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com>
This commit is contained in:
Joerg Bornemann 2016-03-15 10:59:59 +01:00
parent 461ebedb98
commit 5c89e2eeee
2 changed files with 117 additions and 67 deletions

View File

@ -32,25 +32,37 @@
****************************************************************************/ ****************************************************************************/
#include "qwindowspipereader_p.h" #include "qwindowspipereader_p.h"
#include "qwinoverlappedionotifier_p.h" #include "qiodevice_p.h"
#include <qdebug.h>
#include <qelapsedtimer.h> #include <qelapsedtimer.h>
#include <qeventloop.h>
QT_BEGIN_NAMESPACE QT_BEGIN_NAMESPACE
QWindowsPipeReader::Overlapped::Overlapped(QWindowsPipeReader *reader)
: pipeReader(reader)
{
}
void QWindowsPipeReader::Overlapped::clear()
{
ZeroMemory(this, sizeof(OVERLAPPED));
}
QWindowsPipeReader::QWindowsPipeReader(QObject *parent) QWindowsPipeReader::QWindowsPipeReader(QObject *parent)
: QObject(parent), : QObject(parent),
handle(INVALID_HANDLE_VALUE), handle(INVALID_HANDLE_VALUE),
overlapped(this),
readBufferMaxSize(0), readBufferMaxSize(0),
actualReadBufferSize(0), actualReadBufferSize(0),
stopped(true), stopped(true),
readSequenceStarted(false), readSequenceStarted(false),
notifiedCalled(false),
pipeBroken(false), pipeBroken(false),
readyReadEmitted(false) readyReadPending(false),
inReadyRead(false)
{ {
dataReadNotifier = new QWinOverlappedIoNotifier(this); connect(this, &QWindowsPipeReader::_q_queueReadyRead,
connect(dataReadNotifier, &QWinOverlappedIoNotifier::notified, this, &QWindowsPipeReader::notified); this, &QWindowsPipeReader::emitPendingReadyRead, Qt::QueuedConnection);
} }
static bool qt_cancelIo(HANDLE handle, OVERLAPPED *overlapped) static bool qt_cancelIo(HANDLE handle, OVERLAPPED *overlapped)
@ -82,12 +94,6 @@ void QWindowsPipeReader::setHandle(HANDLE hPipeReadEnd)
actualReadBufferSize = 0; actualReadBufferSize = 0;
handle = hPipeReadEnd; handle = hPipeReadEnd;
pipeBroken = false; pipeBroken = false;
readyReadEmitted = false;
stopped = false;
if (hPipeReadEnd != INVALID_HANDLE_VALUE) {
dataReadNotifier->setHandle(hPipeReadEnd);
dataReadNotifier->setEnabled(true);
}
} }
/*! /*!
@ -98,19 +104,15 @@ void QWindowsPipeReader::stop()
{ {
stopped = true; stopped = true;
if (readSequenceStarted) { if (readSequenceStarted) {
if (qt_cancelIo(handle, &overlapped)) { if (!qt_cancelIo(handle, &overlapped)) {
dataReadNotifier->waitForNotified(-1, &overlapped);
} else {
const DWORD dwError = GetLastError(); const DWORD dwError = GetLastError();
if (dwError != ERROR_NOT_FOUND) { if (dwError != ERROR_NOT_FOUND) {
qErrnoWarning(dwError, "QWindowsPipeReader: qt_cancelIo on handle %x failed.", qErrnoWarning(dwError, "QWindowsPipeReader: qt_cancelIo on handle %x failed.",
handle); handle);
} }
} }
waitForNotification(-1);
} }
readSequenceStarted = false;
dataReadNotifier->setEnabled(false);
handle = INVALID_HANDLE_VALUE;
} }
/*! /*!
@ -168,11 +170,10 @@ bool QWindowsPipeReader::canReadLine() const
\internal \internal
Will be called whenever the read operation completes. Will be called whenever the read operation completes.
*/ */
void QWindowsPipeReader::notified(quint32 numberOfBytesRead, quint32 errorCode, void QWindowsPipeReader::notified(DWORD errorCode, DWORD numberOfBytesRead)
OVERLAPPED *notifiedOverlapped)
{ {
if (&overlapped != notifiedOverlapped) notifiedCalled = true;
return; readSequenceStarted = false;
switch (errorCode) { switch (errorCode) {
case ERROR_SUCCESS: case ERROR_SUCCESS:
@ -196,8 +197,6 @@ void QWindowsPipeReader::notified(quint32 numberOfBytesRead, quint32 errorCode,
break; break;
} }
readSequenceStarted = false;
// After the reader was stopped, the only reason why this function can be called is the // After the reader was stopped, the only reason why this function can be called is the
// completion of a cancellation. No signals should be emitted, and no new read sequence should // completion of a cancellation. No signals should be emitted, and no new read sequence should
// be started in this case. // be started in this case.
@ -212,13 +211,15 @@ void QWindowsPipeReader::notified(quint32 numberOfBytesRead, quint32 errorCode,
actualReadBufferSize += numberOfBytesRead; actualReadBufferSize += numberOfBytesRead;
readBuffer.truncate(actualReadBufferSize); readBuffer.truncate(actualReadBufferSize);
startAsyncRead(); startAsyncRead();
readyReadEmitted = true; if (!readyReadPending) {
emit readyRead(); readyReadPending = true;
emit _q_queueReadyRead(QWindowsPipeReader::QPrivateSignal());
}
} }
/*! /*!
\internal \internal
Reads data from the socket into the readbuffer Reads data from the pipe into the readbuffer.
*/ */
void QWindowsPipeReader::startAsyncRead() void QWindowsPipeReader::startAsyncRead()
{ {
@ -238,41 +239,39 @@ void QWindowsPipeReader::startAsyncRead()
char *ptr = readBuffer.reserve(bytesToRead); char *ptr = readBuffer.reserve(bytesToRead);
stopped = false;
readSequenceStarted = true; readSequenceStarted = true;
ZeroMemory(&overlapped, sizeof(overlapped)); overlapped.clear();
if (ReadFile(handle, ptr, bytesToRead, NULL, &overlapped)) { if (!ReadFileEx(handle, ptr, bytesToRead, &overlapped, &readFileCompleted)) {
// We get notified by the QWinOverlappedIoNotifier - even in the synchronous case. readSequenceStarted = false;
return;
} else {
const DWORD dwError = GetLastError(); const DWORD dwError = GetLastError();
switch (dwError) { switch (dwError) {
case ERROR_IO_PENDING:
// This is not an error. We're getting notified, when data arrives.
return;
case ERROR_MORE_DATA:
// This is not an error. The synchronous read succeeded.
// We're connected to a message mode pipe and the message
// didn't fit into the pipe's system buffer.
// We're getting notified by the QWinOverlappedIoNotifier.
break;
case ERROR_BROKEN_PIPE: case ERROR_BROKEN_PIPE:
case ERROR_PIPE_NOT_CONNECTED: case ERROR_PIPE_NOT_CONNECTED:
{ // It may happen, that the other side closes the connection directly
// It may happen, that the other side closes the connection directly // after writing data. Then we must set the appropriate socket state.
// after writing data. Then we must set the appropriate socket state. pipeBroken = true;
readSequenceStarted = false; emit pipeClosed();
pipeBroken = true; break;
emit pipeClosed();
return;
}
default: default:
readSequenceStarted = false;
emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead")); emit winError(dwError, QLatin1String("QWindowsPipeReader::startAsyncRead"));
return; break;
} }
} }
} }
/*!
\internal
Called when ReadFileEx finished the read operation.
*/
void QWindowsPipeReader::readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase)
{
Overlapped *overlapped = static_cast<Overlapped *>(overlappedBase);
overlapped->pipeReader->notified(errorCode, numberOfBytesTransfered);
}
/*! /*!
\internal \internal
Returns the number of available bytes in the pipe. Returns the number of available bytes in the pipe.
@ -292,17 +291,60 @@ DWORD QWindowsPipeReader::checkPipeState()
return 0; return 0;
} }
bool QWindowsPipeReader::waitForNotification(int timeout)
{
QElapsedTimer t;
t.start();
notifiedCalled = false;
int msecs = timeout;
while (SleepEx(msecs == -1 ? INFINITE : msecs, TRUE) == WAIT_IO_COMPLETION) {
if (notifiedCalled)
return true;
// Some other I/O completion routine was called. Wait some more.
msecs = qt_subtract_from_timeout(timeout, t.elapsed());
if (!msecs)
break;
}
return notifiedCalled;
}
void QWindowsPipeReader::emitPendingReadyRead()
{
if (readyReadPending) {
readyReadPending = false;
inReadyRead = true;
emit readyRead();
inReadyRead = false;
}
}
/*! /*!
Waits for the completion of the asynchronous read operation. Waits for the completion of the asynchronous read operation.
Returns \c true, if we've emitted the readyRead signal. Returns \c true, if we've emitted the readyRead signal (non-recursive case)
or readyRead will be emitted by the event loop (recursive case).
*/ */
bool QWindowsPipeReader::waitForReadyRead(int msecs) bool QWindowsPipeReader::waitForReadyRead(int msecs)
{ {
if (!readSequenceStarted) if (!readSequenceStarted)
return false; return false;
readyReadEmitted = false;
dataReadNotifier->waitForNotified(msecs, &overlapped); if (readyReadPending) {
return readyReadEmitted; if (!inReadyRead)
emitPendingReadyRead();
return true;
}
if (!waitForNotification(msecs))
return false;
if (readyReadPending) {
if (!inReadyRead)
emitPendingReadyRead();
return true;
}
return false;
} }
/*! /*!

View File

@ -45,7 +45,6 @@
// We mean it. // We mean it.
// //
#include <qbytearray.h>
#include <qobject.h> #include <qobject.h>
#include <private/qringbuffer_p.h> #include <private/qringbuffer_p.h>
@ -53,9 +52,6 @@
QT_BEGIN_NAMESPACE QT_BEGIN_NAMESPACE
class QWinOverlappedIoNotifier;
class Q_CORE_EXPORT QWindowsPipeReader : public QObject class Q_CORE_EXPORT QWindowsPipeReader : public QObject
{ {
Q_OBJECT Q_OBJECT
@ -64,6 +60,7 @@ public:
~QWindowsPipeReader(); ~QWindowsPipeReader();
void setHandle(HANDLE hPipeReadEnd); void setHandle(HANDLE hPipeReadEnd);
void startAsyncRead();
void stop(); void stop();
void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; } void setMaxReadBufferSize(qint64 size) { readBufferMaxSize = size; }
@ -76,31 +73,42 @@ public:
bool waitForReadyRead(int msecs); bool waitForReadyRead(int msecs);
bool waitForPipeClosed(int msecs); bool waitForPipeClosed(int msecs);
void startAsyncRead();
bool isReadOperationActive() const { return readSequenceStarted; } bool isReadOperationActive() const { return readSequenceStarted; }
Q_SIGNALS: Q_SIGNALS:
void winError(ulong, const QString &); void winError(ulong, const QString &);
void readyRead(); void readyRead();
void pipeClosed(); void pipeClosed();
void _q_queueReadyRead(QPrivateSignal);
private Q_SLOTS:
void notified(quint32 numberOfBytesRead, quint32 errorCode, OVERLAPPED *notifiedOverlapped);
private: private:
static void CALLBACK readFileCompleted(DWORD errorCode, DWORD numberOfBytesTransfered,
OVERLAPPED *overlappedBase);
void notified(DWORD errorCode, DWORD numberOfBytesRead);
DWORD checkPipeState(); DWORD checkPipeState();
bool waitForNotification(int timeout);
void emitPendingReadyRead();
class Overlapped : public OVERLAPPED
{
Q_DISABLE_COPY(Overlapped)
public:
explicit Overlapped(QWindowsPipeReader *reader);
void clear();
QWindowsPipeReader *pipeReader;
};
private:
HANDLE handle; HANDLE handle;
OVERLAPPED overlapped; Overlapped overlapped;
QWinOverlappedIoNotifier *dataReadNotifier;
qint64 readBufferMaxSize; qint64 readBufferMaxSize;
QRingBuffer readBuffer; QRingBuffer readBuffer;
qint64 actualReadBufferSize; qint64 actualReadBufferSize;
bool stopped; bool stopped;
bool readSequenceStarted; bool readSequenceStarted;
bool notifiedCalled;
bool pipeBroken; bool pipeBroken;
bool readyReadEmitted; bool readyReadPending;
bool inReadyRead;
}; };
QT_END_NAMESPACE QT_END_NAMESPACE