wasm: add support for blocking sockets

Add support for blocking sockets on secondary threads and on the main
thread with asyncify. This extends the support for websockify tunneled
TCP sockets, which was previously limited to async sockets on the main
thread.

Blocking sockets support is implemented by emulating select() on top
of emscripten's socket notification support. This is requires synchronization
between the blockee threads and the main thread, since we get socket
notification callbacks on the main thread. The synchronized state is held
in g_socketState where the main thread registers socket readiness state and
blocking threads register themselves.

Blocking using asyncify on the main thread is similar to blocking on
a secondary thread, with the exception that the main thread suspends
with qt_asyncify_suspend() instead of waiting on a wait condition.

Change-Id: Idb5a493644e1e6634057dc2f64f2e99e82e3c01e
Reviewed-by: Lorn Potter <lorn.potter@gmail.com>
This commit is contained in:
Morten Johan Sørvig 2022-01-27 16:15:57 +01:00 committed by Morten Sørvig
parent 6376b1c5a7
commit 18f0793f9c
3 changed files with 205 additions and 54 deletions

View File

@ -96,6 +96,7 @@ Q_CONSTINIT std::mutex QEventDispatcherWasm::g_staticDataMutex;
#endif
// ### dynamic initialization:
std::multimap<int, QSocketNotifier *> QEventDispatcherWasm::g_socketNotifiers;
std::map<int, QEventDispatcherWasm::SocketReadyState> QEventDispatcherWasm::g_socketState;
QEventDispatcherWasm::QEventDispatcherWasm()
: QAbstractEventDispatcher()
@ -149,6 +150,13 @@ QEventDispatcherWasm::~QEventDispatcherWasm()
g_socketNotifiers.clear();
}
g_mainThreadEventDispatcher = nullptr;
if (!g_socketNotifiers.empty()) {
qWarning("QEventDispatcherWasm: main thread event dispatcher deleted with active socket notifiers");
clearEmscriptenSocketCallbacks();
g_socketNotifiers.clear();
}
g_socketState.clear();
}
}
@ -221,23 +229,17 @@ void QEventDispatcherWasm::processWindowSystemEvents(QEventLoop::ProcessEventsFl
void QEventDispatcherWasm::registerSocketNotifier(QSocketNotifier *notifier)
{
if (!emscripten_is_main_runtime_thread()) {
qWarning("QEventDispatcherWasm::registerSocketNotifier: socket notifiers on secondary threads are not supported");
return;
}
if (g_socketNotifiers.empty())
setEmscriptenSocketCallbacks();
LOCK_GUARD(g_staticDataMutex);
bool wasEmpty = g_socketNotifiers.empty();
g_socketNotifiers.insert({notifier->socket(), notifier});
if (wasEmpty)
runOnMainThread([]{ setEmscriptenSocketCallbacks(); });
}
void QEventDispatcherWasm::unregisterSocketNotifier(QSocketNotifier *notifier)
{
if (!emscripten_is_main_runtime_thread()) {
qWarning("QEventDispatcherWasm::registerSocketNotifier: socket notifiers on secondary threads are not supported");
return;
}
LOCK_GUARD(g_staticDataMutex);
auto notifiers = g_socketNotifiers.equal_range(notifier->socket());
for (auto it = notifiers.first; it != notifiers.second; ++it) {
@ -248,7 +250,7 @@ void QEventDispatcherWasm::unregisterSocketNotifier(QSocketNotifier *notifier)
}
if (g_socketNotifiers.empty())
clearEmscriptenSocketCallbacks();
runOnMainThread([]{ clearEmscriptenSocketCallbacks(); });
}
void QEventDispatcherWasm::registerTimer(int timerId, qint64 interval, Qt::TimerType timerType, QObject *object)
@ -425,8 +427,11 @@ bool QEventDispatcherWasm::wait(int timeout)
qWarning() << "QEventDispatcherWasm asyncify wait with timeout is not supported; timeout will be ignored"; // FIXME
bool didSuspend = qt_asyncify_suspend();
if (!didSuspend)
if (!didSuspend) {
qWarning("QEventDispatcherWasm: current thread is already suspended; could not asyncify wait for events");
return false;
}
return true;
#else
qWarning("QEventLoop::WaitForMoreEvents is not supported on the main thread without asyncify");
Q_UNUSED(timeout);
@ -605,80 +610,185 @@ void QEventDispatcherWasm::socketError(int socket, int err, const char* msg, voi
Q_UNUSED(msg);
Q_UNUSED(context);
qCDebug(lcEventDispatcher) << "QEventDispatcherWasm::socketError" << socket;
auto notifiersRange = g_socketNotifiers.equal_range(socket);
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
for (auto [_, notifier]: notifiers) {
QEvent event(QEvent::SockAct);
QCoreApplication::sendEvent(notifier, &event);
}
// Emscripten makes socket callbacks while the main thread is busy-waiting for a mutex,
// which can cause deadlocks if the callback code also tries to lock the same mutex.
// This is most easily reproducible by adding print statements, where each print requires
// taking a mutex lock. Work around this by runnign the callback asynchronously, i.e. by using
// a native zero-timer, to make sure the main thread stack is completely unwond before calling
// the Qt handler.
// It is currently unclear if this problem is caused by code in Qt or in Emscripten, or
// if this completely fixes the problem.
runAsync([socket](){
auto notifiersRange = g_socketNotifiers.equal_range(socket);
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
for (auto [_, notifier]: notifiers) {
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
}
setSocketState(socket, true, true);
});
}
void QEventDispatcherWasm::socketOpen(int socket, void *context)
{
Q_UNUSED(context);
qCDebug(lcEventDispatcher) << "QEventDispatcherWasm::socketOpen" << socket;
auto notifiersRange = g_socketNotifiers.equal_range(socket);
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
for (auto [_, notifier]: notifiers) {
if (notifier->type() == QSocketNotifier::Write) {
QEvent event(QEvent::SockAct);
QCoreApplication::sendEvent(notifier, &event);
runAsync([socket](){
auto notifiersRange = g_socketNotifiers.equal_range(socket);
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
for (auto [_, notifier]: notifiers) {
if (notifier->type() == QSocketNotifier::Write) {
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
}
}
}
setSocketState(socket, false, true);
});
}
void QEventDispatcherWasm::socketListen(int socket, void *context)
{
Q_UNUSED(socket);
Q_UNUSED(context);
qCDebug(lcEventDispatcher) << "QEventDispatcherWasm::socketListen" << socket;
}
void QEventDispatcherWasm::socketConnection(int socket, void *context)
{
Q_UNUSED(context);
Q_UNUSED(socket);
qCDebug(lcEventDispatcher) << "QEventDispatcherWasm::socketConnection" << socket;
Q_UNUSED(context);
}
void QEventDispatcherWasm::socketMessage(int socket, void *context)
{
Q_UNUSED(context);
qCDebug(lcEventDispatcher) << "QEventDispatcherWasm::socketMessage" << socket;
auto notifiersRange = g_socketNotifiers.equal_range(socket);
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
for (auto [_, notifier]: notifiers) {
if (notifier->type() == QSocketNotifier::Read) {
QEvent event(QEvent::SockAct);
QCoreApplication::sendEvent(notifier, &event);
runAsync([socket](){
auto notifiersRange = g_socketNotifiers.equal_range(socket);
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
for (auto [_, notifier]: notifiers) {
if (notifier->type() == QSocketNotifier::Read) {
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
}
}
}
setSocketState(socket, true, false);
});
}
void QEventDispatcherWasm::socketClose(int socket, void *context)
{
Q_UNUSED(context);
qCDebug(lcEventDispatcher) << "QEventDispatcherWasm::socketClose" << socket;
// Emscripten makes emscripten_set_socket_close_callback() calls to socket 0,
// which is not a valid socket. see https://github.com/emscripten-core/emscripten/issues/6596
if (socket == 0)
return;
auto notifiersRange = g_socketNotifiers.equal_range(socket);
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
for (auto [_, notifier]: notifiers) {
if (notifier->type() == QSocketNotifier::Write) {
QEvent event(QEvent::SockAct);
QCoreApplication::sendEvent(notifier, &event);
runAsync([socket](){
auto notifiersRange = g_socketNotifiers.equal_range(socket);
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
for (auto [_, notifier]: notifiers)
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockClose));
setSocketState(socket, true, true);
clearSocketState(socket);
});
}
void QEventDispatcherWasm::setSocketState(int socket, bool setReadyRead, bool setReadyWrite)
{
LOCK_GUARD(g_staticDataMutex);
SocketReadyState &state = g_socketState[socket];
// Additively update socket ready state, e.g. if it
// was already ready read then it stays ready read.
state.readyRead |= setReadyRead;
state.readyWrite |= setReadyWrite;
// Wake any waiters for the given readyness. The waiter consumes
// the ready state, returning the socket to not-ready.
if (QEventDispatcherWasm *waiter = state.waiter)
if ((state.readyRead && state.waitForReadyRead) || (state.readyWrite && state.waitForReadyWrite))
waiter->wakeEventDispatcherThread();
}
void QEventDispatcherWasm::clearSocketState(int socket)
{
LOCK_GUARD(g_staticDataMutex);
g_socketState.erase(socket);
}
void QEventDispatcherWasm::waitForSocketState(int timeout, int socket, bool checkRead, bool checkWrite,
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect)
{
// Loop until the socket becomes readyRead or readyWrite. Wait for
// socket activity if it currently is neither.
while (true) {
*selectForRead = false;
*selectForWrite = false;
{
LOCK_GUARD(g_staticDataMutex);
// Access or create socket state: we want to register that a thread is waitng
// even if we have not received any socket callbacks yet.
SocketReadyState &state = g_socketState[socket];
if (state.waiter) {
qWarning() << "QEventDispatcherWasm::waitForSocketState: a thread is already waiting";
break;
}
bool shouldWait = true;
if (checkRead && state.readyRead) {
shouldWait = false;
state.readyRead = false;
*selectForRead = true;
}
if (checkWrite && state.readyWrite) {
shouldWait = false;
state.readyWrite = false;
*selectForRead = true;
}
if (!shouldWait)
break;
state.waiter = this;
state.waitForReadyRead = checkRead;
state.waitForReadyWrite = checkWrite;
}
bool didTimeOut = !wait(timeout);
{
LOCK_GUARD(g_staticDataMutex);
// Missing socket state after a wakeup means that the socket has been closed.
auto it = g_socketState.find(socket);
if (it == g_socketState.end()) {
*socketDisconnect = true;
break;
}
it->second.waiter = nullptr;
it->second.waitForReadyRead = false;
it->second.waitForReadyWrite = false;
}
if (didTimeOut)
break;
}
}
void QEventDispatcherWasm::socketSelect(int timeout, int socket, bool waitForRead, bool waitForWrite,
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect)
{
QEventDispatcherWasm *eventDispatcher = static_cast<QEventDispatcherWasm *>(
QAbstractEventDispatcher::instance(QThread::currentThread()));
if (!eventDispatcher) {
qWarning("QEventDispatcherWasm::socketSelect called without eventdispatcher instance");
return;
}
eventDispatcher->waitForSocketState(timeout, socket, waitForRead, waitForWrite,
selectForRead, selectForWrite, socketDisconnect);
}
namespace {
void trampoline(void *context) {

View File

@ -50,7 +50,9 @@ public:
void interrupt() override;
void wakeUp() override;
protected:
static void socketSelect(int timeout, int socket, bool waitForRead, bool waitForWrite,
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect);
protected:
virtual void processWindowSystemEvents(QEventLoop::ProcessEventsFlags flags);
private:
@ -69,8 +71,8 @@ private:
void updateNativeTimer();
static void callProcessTimers(void *eventDispatcher);
void setEmscriptenSocketCallbacks();
void clearEmscriptenSocketCallbacks();
static void setEmscriptenSocketCallbacks();
static void clearEmscriptenSocketCallbacks();
static void socketError(int fd, int err, const char* msg, void *context);
static void socketOpen(int fd, void *context);
static void socketListen(int fd, void *context);
@ -78,6 +80,11 @@ private:
static void socketMessage(int fd, void *context);
static void socketClose(int fd, void *context);
static void setSocketState(int socket, bool setReadyRead, bool setReadyWrite);
static void clearSocketState(int socket);
void waitForSocketState(int timeout, int socket, bool checkRead, bool checkWrite,
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect);
static void run(std::function<void(void)> fn);
static void runAsync(std::function<void(void)> fn);
static void runOnMainThread(std::function<void(void)> fn);
@ -107,6 +114,15 @@ private:
#endif
static std::multimap<int, QSocketNotifier *> g_socketNotifiers;
struct SocketReadyState {
QEventDispatcherWasm *waiter = nullptr;
bool waitForReadyRead = false;
bool waitForReadyWrite = false;
bool readyRead = false;
bool readyWrite = false;
};
static std::map<int, SocketReadyState> g_socketState;
};
#endif // QEVENTDISPATCHER_WASM_P_H

View File

@ -11,6 +11,9 @@
#include "qvarlengtharray.h"
#include "qnetworkinterface.h"
#include "qendian.h"
#ifdef Q_OS_WASM
#include <private/qeventdispatcher_wasm_p.h>
#endif
#include <time.h>
#include <errno.h>
#include <fcntl.h>
@ -1353,6 +1356,8 @@ int QNativeSocketEnginePrivate::nativeSelect(int timeout, bool selectForRead) co
return nativeSelect(timeout, selectForRead, !selectForRead, &dummy, &dummy);
}
#ifndef Q_OS_WASM
int QNativeSocketEnginePrivate::nativeSelect(int timeout, bool checkRead, bool checkWrite,
bool *selectForRead, bool *selectForWrite) const
{
@ -1383,4 +1388,24 @@ int QNativeSocketEnginePrivate::nativeSelect(int timeout, bool checkRead, bool c
return ret;
}
#else
int QNativeSocketEnginePrivate::nativeSelect(int timeout, bool checkRead, bool checkWrite,
bool *selectForRead, bool *selectForWrite) const
{
*selectForRead = checkRead;
*selectForWrite = checkWrite;
bool socketDisconnect = false;
QEventDispatcherWasm::socketSelect(timeout, socketDescriptor, checkRead, checkWrite,selectForRead, selectForWrite, &socketDisconnect);
// The disconnect/close handling code in QAbstractsScket::canReadNotification()
// does not detect remote disconnect properly; do that here as a workardound.
if (socketDisconnect)
receiver->closeNotification();
return 1;
}
#endif // Q_OS_WASM
QT_END_NAMESPACE