QLocalSocket/Win: implement duplex communication in blocking mode
[ChangeLog][QtNetwork][QLocalSocket] The waitFor*() functions on Windows now support duplex operation, as they already did on Unix. As a side effect, this restores the behavior that a single call to waitForReadyRead() won't emit both readyRead() and disconnected(), which also matches Unix behavior. The groundwork for that misbehavior was laid by incorrect refactoring in d1a671b69 already, but at this point it was harmless, as the pipe couldn't be newly closed after a successful read. That changed with f265c87e0, which made the queuing of signals async. Change-Id: I1eb80e8f147bb58825143e0fe1e4300c59ae0fbb Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@gmx.de>
This commit is contained in:
parent
3d71c4b740
commit
ca14ed494c
@ -143,7 +143,7 @@ void QWindowsPipeReader::cancelAsyncRead(State newState)
|
|||||||
// Wait for callback to complete.
|
// Wait for callback to complete.
|
||||||
do {
|
do {
|
||||||
locker.unlock();
|
locker.unlock();
|
||||||
waitForNotification(QDeadlineTimer(-1));
|
waitForNotification();
|
||||||
locker.relock();
|
locker.relock();
|
||||||
} while (readSequenceStarted);
|
} while (readSequenceStarted);
|
||||||
}
|
}
|
||||||
@ -462,67 +462,18 @@ DWORD QWindowsPipeReader::checkPipeState()
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool QWindowsPipeReader::waitForNotification(const QDeadlineTimer &deadline)
|
bool QWindowsPipeReader::waitForNotification()
|
||||||
{
|
{
|
||||||
|
DWORD waitRet;
|
||||||
do {
|
do {
|
||||||
DWORD waitRet = WaitForSingleObjectEx(syncHandle, deadline.remainingTime(), TRUE);
|
waitRet = WaitForSingleObjectEx(syncHandle, INFINITE, TRUE);
|
||||||
if (waitRet == WAIT_OBJECT_0)
|
if (waitRet == WAIT_OBJECT_0)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
if (waitRet != WAIT_IO_COMPLETION)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
// Some I/O completion routine was called. Wait some more.
|
// Some I/O completion routine was called. Wait some more.
|
||||||
} while (!deadline.hasExpired());
|
} while (waitRet == WAIT_IO_COMPLETION);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*!
|
|
||||||
Waits for the completion of the asynchronous read operation.
|
|
||||||
Returns \c true, if we've emitted the readyRead signal.
|
|
||||||
*/
|
|
||||||
bool QWindowsPipeReader::waitForReadyRead(int msecs)
|
|
||||||
{
|
|
||||||
QDeadlineTimer timer(msecs);
|
|
||||||
|
|
||||||
// Make sure that 'syncHandle' was triggered by the thread pool callback.
|
|
||||||
while (isReadOperationActive() && waitForNotification(timer)) {
|
|
||||||
if (consumePendingAndEmit(false))
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*!
|
|
||||||
Waits until the pipe is closed.
|
|
||||||
*/
|
|
||||||
bool QWindowsPipeReader::waitForPipeClosed(int msecs)
|
|
||||||
{
|
|
||||||
const int sleepTime = 10;
|
|
||||||
QDeadlineTimer timer(msecs);
|
|
||||||
|
|
||||||
while (waitForReadyRead(timer.remainingTime())) {}
|
|
||||||
if (pipeBroken)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
if (timer.hasExpired())
|
|
||||||
return false;
|
|
||||||
|
|
||||||
// When the read buffer is full, the read sequence is not running,
|
|
||||||
// so we need to peek the pipe to detect disconnection.
|
|
||||||
forever {
|
|
||||||
checkPipeState();
|
|
||||||
consumePendingAndEmit(false);
|
|
||||||
if (pipeBroken)
|
|
||||||
return true;
|
|
||||||
|
|
||||||
if (timer.hasExpired())
|
|
||||||
return false;
|
|
||||||
|
|
||||||
Sleep(sleepTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
QT_END_NAMESPACE
|
QT_END_NAMESPACE
|
||||||
|
@ -53,7 +53,6 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
#include <qobject.h>
|
#include <qobject.h>
|
||||||
#include <qdeadlinetimer.h>
|
|
||||||
#include <qmutex.h>
|
#include <qmutex.h>
|
||||||
#include <private/qringbuffer_p.h>
|
#include <private/qringbuffer_p.h>
|
||||||
|
|
||||||
@ -80,9 +79,8 @@ public:
|
|||||||
qint64 bytesAvailable() const;
|
qint64 bytesAvailable() const;
|
||||||
qint64 read(char *data, qint64 maxlen);
|
qint64 read(char *data, qint64 maxlen);
|
||||||
bool canReadLine() const;
|
bool canReadLine() const;
|
||||||
bool waitForReadyRead(int msecs);
|
DWORD checkPipeState();
|
||||||
bool checkForReadyRead() { return consumePendingAndEmit(false); }
|
bool checkForReadyRead() { return consumePendingAndEmit(false); }
|
||||||
bool waitForPipeClosed(int msecs);
|
|
||||||
|
|
||||||
bool isReadOperationActive() const;
|
bool isReadOperationActive() const;
|
||||||
HANDLE syncEvent() const { return syncHandle; }
|
HANDLE syncEvent() const { return syncHandle; }
|
||||||
@ -103,8 +101,7 @@ private:
|
|||||||
static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
|
static void CALLBACK waitCallback(PTP_CALLBACK_INSTANCE instance, PVOID context,
|
||||||
PTP_WAIT wait, TP_WAIT_RESULT waitResult);
|
PTP_WAIT wait, TP_WAIT_RESULT waitResult);
|
||||||
bool readCompleted(DWORD errorCode, DWORD numberOfBytesRead);
|
bool readCompleted(DWORD errorCode, DWORD numberOfBytesRead);
|
||||||
DWORD checkPipeState();
|
bool waitForNotification();
|
||||||
bool waitForNotification(const QDeadlineTimer &deadline);
|
|
||||||
bool consumePendingAndEmit(bool allowWinActPosting);
|
bool consumePendingAndEmit(bool allowWinActPosting);
|
||||||
bool consumePending();
|
bool consumePending();
|
||||||
|
|
||||||
|
@ -133,6 +133,7 @@ public:
|
|||||||
#elif defined(Q_OS_WIN)
|
#elif defined(Q_OS_WIN)
|
||||||
~QLocalSocketPrivate();
|
~QLocalSocketPrivate();
|
||||||
void destroyPipeHandles();
|
void destroyPipeHandles();
|
||||||
|
qint64 pipeWriterBytesToWrite() const;
|
||||||
void _q_canRead();
|
void _q_canRead();
|
||||||
void _q_bytesWritten(qint64 bytes);
|
void _q_bytesWritten(qint64 bytes);
|
||||||
void _q_canWrite();
|
void _q_canWrite();
|
||||||
|
@ -39,9 +39,61 @@
|
|||||||
|
|
||||||
#include "qlocalsocket_p.h"
|
#include "qlocalsocket_p.h"
|
||||||
#include <qscopedvaluerollback.h>
|
#include <qscopedvaluerollback.h>
|
||||||
|
#include <qdeadlinetimer.h>
|
||||||
|
|
||||||
QT_BEGIN_NAMESPACE
|
QT_BEGIN_NAMESPACE
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
struct QSocketPoller
|
||||||
|
{
|
||||||
|
QSocketPoller(const QLocalSocketPrivate &socket);
|
||||||
|
|
||||||
|
bool poll(const QDeadlineTimer &deadline);
|
||||||
|
|
||||||
|
enum { maxHandles = 2 };
|
||||||
|
HANDLE handles[maxHandles];
|
||||||
|
DWORD handleCount = 0;
|
||||||
|
bool waitForClose = false;
|
||||||
|
};
|
||||||
|
|
||||||
|
QSocketPoller::QSocketPoller(const QLocalSocketPrivate &socket)
|
||||||
|
{
|
||||||
|
if (socket.pipeWriter)
|
||||||
|
handles[handleCount++] = socket.pipeWriter->syncEvent();
|
||||||
|
if (socket.pipeReader->isReadOperationActive())
|
||||||
|
handles[handleCount++] = socket.pipeReader->syncEvent();
|
||||||
|
else
|
||||||
|
waitForClose = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*!
|
||||||
|
Waits until new data is available for reading or write operation
|
||||||
|
completes. Returns \c true, if we need to check pipe workers;
|
||||||
|
otherwise it returns \c false (if an error occurred or the operation
|
||||||
|
timed out).
|
||||||
|
|
||||||
|
\note If the read operation is inactive, it succeeds after
|
||||||
|
a short wait, allowing the caller to check the state of the socket.
|
||||||
|
*/
|
||||||
|
bool QSocketPoller::poll(const QDeadlineTimer &deadline)
|
||||||
|
{
|
||||||
|
const qint64 sleepTime = 10;
|
||||||
|
QDeadlineTimer timer(waitForClose ? qMin(deadline.remainingTime(), sleepTime)
|
||||||
|
: deadline.remainingTime());
|
||||||
|
DWORD waitRet;
|
||||||
|
|
||||||
|
do {
|
||||||
|
waitRet = WaitForMultipleObjectsEx(handleCount, handles, FALSE,
|
||||||
|
timer.remainingTime(), TRUE);
|
||||||
|
} while (waitRet == WAIT_IO_COMPLETION);
|
||||||
|
|
||||||
|
if (waitRet == WAIT_TIMEOUT)
|
||||||
|
return !deadline.hasExpired();
|
||||||
|
|
||||||
|
return waitRet - WAIT_OBJECT_0 < handleCount;
|
||||||
|
}
|
||||||
|
} // anonymous namespace
|
||||||
|
|
||||||
void QLocalSocketPrivate::init()
|
void QLocalSocketPrivate::init()
|
||||||
{
|
{
|
||||||
Q_Q(QLocalSocket);
|
Q_Q(QLocalSocket);
|
||||||
@ -288,7 +340,7 @@ qint64 QLocalSocket::bytesAvailable() const
|
|||||||
qint64 QLocalSocket::bytesToWrite() const
|
qint64 QLocalSocket::bytesToWrite() const
|
||||||
{
|
{
|
||||||
Q_D(const QLocalSocket);
|
Q_D(const QLocalSocket);
|
||||||
return d->writeBuffer.size() + (d->pipeWriter ? d->pipeWriter->bytesToWrite() : 0);
|
return d->writeBuffer.size() + d->pipeWriterBytesToWrite();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool QLocalSocket::canReadLine() const
|
bool QLocalSocket::canReadLine() const
|
||||||
@ -370,6 +422,11 @@ bool QLocalSocket::setSocketDescriptor(qintptr socketDescriptor,
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qint64 QLocalSocketPrivate::pipeWriterBytesToWrite() const
|
||||||
|
{
|
||||||
|
return pipeWriter ? pipeWriter->bytesToWrite() : qint64(0);
|
||||||
|
}
|
||||||
|
|
||||||
void QLocalSocketPrivate::_q_bytesWritten(qint64 bytes)
|
void QLocalSocketPrivate::_q_bytesWritten(qint64 bytes)
|
||||||
{
|
{
|
||||||
Q_Q(QLocalSocket);
|
Q_Q(QLocalSocket);
|
||||||
@ -384,7 +441,7 @@ void QLocalSocketPrivate::_q_canWrite()
|
|||||||
{
|
{
|
||||||
Q_Q(QLocalSocket);
|
Q_Q(QLocalSocket);
|
||||||
if (writeBuffer.isEmpty()) {
|
if (writeBuffer.isEmpty()) {
|
||||||
if (state == QLocalSocket::ClosingState)
|
if (state == QLocalSocket::ClosingState && pipeWriterBytesToWrite() == 0)
|
||||||
q->close();
|
q->close();
|
||||||
} else {
|
} else {
|
||||||
Q_ASSERT(pipeWriter);
|
Q_ASSERT(pipeWriter);
|
||||||
@ -428,11 +485,27 @@ bool QLocalSocket::waitForDisconnected(int msecs)
|
|||||||
qWarning("QLocalSocket::waitForDisconnected isn't supported for write only pipes.");
|
qWarning("QLocalSocket::waitForDisconnected isn't supported for write only pipes.");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (d->pipeReader->waitForPipeClosed(msecs)) {
|
|
||||||
d->_q_pipeClosed();
|
QDeadlineTimer deadline(msecs);
|
||||||
return true;
|
while (!d->pipeReader->isPipeClosed()) {
|
||||||
|
d->_q_canWrite();
|
||||||
|
|
||||||
|
QSocketPoller poller(*d);
|
||||||
|
if (!poller.poll(deadline))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (d->pipeWriter)
|
||||||
|
d->pipeWriter->checkForWrite();
|
||||||
|
|
||||||
|
// When the read buffer is full, the read sequence is not running,
|
||||||
|
// so we need to peek the pipe to detect disconnection.
|
||||||
|
if (poller.waitForClose)
|
||||||
|
d->pipeReader->checkPipeState();
|
||||||
|
|
||||||
|
d->pipeReader->checkForReadyRead();
|
||||||
}
|
}
|
||||||
return false;
|
d->_q_pipeClosed();
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool QLocalSocket::isValid() const
|
bool QLocalSocket::isValid() const
|
||||||
@ -448,32 +521,52 @@ bool QLocalSocket::waitForReadyRead(int msecs)
|
|||||||
if (d->state != QLocalSocket::ConnectedState)
|
if (d->state != QLocalSocket::ConnectedState)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
// We already know that the pipe is gone, but did not enter the event loop yet.
|
QDeadlineTimer deadline(msecs);
|
||||||
if (d->pipeReader->isPipeClosed()) {
|
while (!d->pipeReader->isPipeClosed()) {
|
||||||
d->_q_pipeClosed();
|
d->_q_canWrite();
|
||||||
return false;
|
|
||||||
|
QSocketPoller poller(*d);
|
||||||
|
if (poller.waitForClose || !poller.poll(deadline))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (d->pipeWriter)
|
||||||
|
d->pipeWriter->checkForWrite();
|
||||||
|
|
||||||
|
if (d->pipeReader->checkForReadyRead())
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
d->_q_pipeClosed();
|
||||||
bool result = d->pipeReader->waitForReadyRead(msecs);
|
return false;
|
||||||
|
|
||||||
// We just noticed that the pipe is gone.
|
|
||||||
if (d->pipeReader->isPipeClosed())
|
|
||||||
d->_q_pipeClosed();
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool QLocalSocket::waitForBytesWritten(int msecs)
|
bool QLocalSocket::waitForBytesWritten(int msecs)
|
||||||
{
|
{
|
||||||
Q_D(const QLocalSocket);
|
Q_D(QLocalSocket);
|
||||||
if (!d->pipeWriter)
|
|
||||||
|
if (d->state == QLocalSocket::UnconnectedState)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
// Wait for the pipe writer to acknowledge that it has
|
QDeadlineTimer deadline(msecs);
|
||||||
// written. This will succeed if either the pipe writer has
|
while (!d->pipeReader->isPipeClosed()) {
|
||||||
// already written the data, or if it manages to write data
|
if (bytesToWrite() == 0)
|
||||||
// within the given timeout.
|
return false;
|
||||||
return d->pipeWriter->waitForWrite(msecs);
|
d->_q_canWrite();
|
||||||
|
|
||||||
|
QSocketPoller poller(*d);
|
||||||
|
if (!poller.poll(deadline))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
Q_ASSERT(d->pipeWriter);
|
||||||
|
if (d->pipeWriter->checkForWrite())
|
||||||
|
return true;
|
||||||
|
|
||||||
|
if (poller.waitForClose)
|
||||||
|
d->pipeReader->checkPipeState();
|
||||||
|
|
||||||
|
d->pipeReader->checkForReadyRead();
|
||||||
|
}
|
||||||
|
d->_q_pipeClosed();
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
QT_END_NAMESPACE
|
QT_END_NAMESPACE
|
||||||
|
Loading…
x
Reference in New Issue
Block a user