wasm: move socket notifier impl to separate file
This is not core event dispatcher functionality, and can live in separate implementation files. Change-Id: I56202d59f57e8dd221f39b3b2ed34d57aacb89aa Reviewed-by: Morten Johan Sørvig <morten.sorvig@qt.io>
This commit is contained in:
parent
9a2370e434
commit
2e334c1395
@ -1427,6 +1427,7 @@ endif()
|
||||
qt_internal_extend_target(Core CONDITION WASM
|
||||
SOURCES
|
||||
platform/wasm/qstdweb.cpp platform/wasm/qstdweb_p.h
|
||||
platform/wasm/qwasmsocket.cpp platform/wasm/qwasmsocket_p.h
|
||||
kernel/qeventdispatcher_wasm.cpp kernel/qeventdispatcher_wasm_p.h
|
||||
)
|
||||
|
||||
|
@ -6,9 +6,8 @@
|
||||
#include <QtCore/private/qabstracteventdispatcher_p.h> // for qGlobalPostedEventsCount()
|
||||
#include <QtCore/qcoreapplication.h>
|
||||
#include <QtCore/qthread.h>
|
||||
#include <QtCore/qsocketnotifier.h>
|
||||
#include <QtCore/private/qstdweb_p.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <QtCore/private/qwasmsocket_p.h>
|
||||
|
||||
#include "emscripten.h"
|
||||
#include <emscripten/html5.h>
|
||||
@ -124,9 +123,6 @@ Q_CONSTINIT std::mutex QEventDispatcherWasm::g_staticDataMutex;
|
||||
emscripten::ProxyingQueue QEventDispatcherWasm::g_proxyingQueue;
|
||||
pthread_t QEventDispatcherWasm::g_mainThread;
|
||||
#endif
|
||||
// ### dynamic initialization:
|
||||
std::multimap<int, QSocketNotifier *> QEventDispatcherWasm::g_socketNotifiers;
|
||||
std::map<int, QEventDispatcherWasm::SocketReadyState> QEventDispatcherWasm::g_socketState;
|
||||
|
||||
QEventDispatcherWasm::QEventDispatcherWasm()
|
||||
{
|
||||
@ -182,19 +178,8 @@ QEventDispatcherWasm::~QEventDispatcherWasm()
|
||||
{
|
||||
if (m_timerId > 0)
|
||||
emscripten_clear_timeout(m_timerId);
|
||||
if (!g_socketNotifiers.empty()) {
|
||||
qWarning("QEventDispatcherWasm: main thread event dispatcher deleted with active socket notifiers");
|
||||
clearEmscriptenSocketCallbacks();
|
||||
g_socketNotifiers.clear();
|
||||
}
|
||||
QWasmSocket::clearSocketNotifiers();
|
||||
g_mainThreadEventDispatcher = nullptr;
|
||||
if (!g_socketNotifiers.empty()) {
|
||||
qWarning("QEventDispatcherWasm: main thread event dispatcher deleted with active socket notifiers");
|
||||
clearEmscriptenSocketCallbacks();
|
||||
g_socketNotifiers.clear();
|
||||
}
|
||||
|
||||
g_socketState.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@ -264,40 +249,6 @@ bool QEventDispatcherWasm::processEvents(QEventLoop::ProcessEventsFlags flags)
|
||||
return false;
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::registerSocketNotifier(QSocketNotifier *notifier)
|
||||
{
|
||||
LOCK_GUARD(g_staticDataMutex);
|
||||
|
||||
bool wasEmpty = g_socketNotifiers.empty();
|
||||
g_socketNotifiers.insert({notifier->socket(), notifier});
|
||||
if (wasEmpty)
|
||||
runOnMainThread([] { setEmscriptenSocketCallbacks(); });
|
||||
|
||||
int count;
|
||||
ioctl(notifier->socket(), FIONREAD, &count);
|
||||
|
||||
// message may have arrived already
|
||||
if (count > 0 && notifier->type() == QSocketNotifier::Read) {
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
|
||||
}
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::unregisterSocketNotifier(QSocketNotifier *notifier)
|
||||
{
|
||||
LOCK_GUARD(g_staticDataMutex);
|
||||
|
||||
auto notifiers = g_socketNotifiers.equal_range(notifier->socket());
|
||||
for (auto it = notifiers.first; it != notifiers.second; ++it) {
|
||||
if (it->second == notifier) {
|
||||
g_socketNotifiers.erase(it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (g_socketNotifiers.empty())
|
||||
runOnMainThread([] { clearEmscriptenSocketCallbacks(); });
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::registerTimer(Qt::TimerId timerId, Duration interval, Qt::TimerType timerType, QObject *object)
|
||||
{
|
||||
#ifndef QT_NO_DEBUG
|
||||
@ -419,6 +370,32 @@ void QEventDispatcherWasm::handleApplicationExec()
|
||||
}, 0, simulateInfiniteLoop);
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::registerSocketNotifier(QSocketNotifier *notifier)
|
||||
{
|
||||
QWasmSocket::registerSocketNotifier(notifier);
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::unregisterSocketNotifier(QSocketNotifier *notifier)
|
||||
{
|
||||
QWasmSocket::unregisterSocketNotifier(notifier);
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::socketSelect(int timeout, int socket, bool waitForRead, bool waitForWrite,
|
||||
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect)
|
||||
{
|
||||
QEventDispatcherWasm *eventDispatcher = static_cast<QEventDispatcherWasm *>(
|
||||
QAbstractEventDispatcher::instance(QThread::currentThread()));
|
||||
|
||||
if (!eventDispatcher) {
|
||||
qWarning("QEventDispatcherWasm::socketSelect called without eventdispatcher instance");
|
||||
return;
|
||||
}
|
||||
|
||||
QWasmSocket::waitForSocketState(eventDispatcher, timeout, socket, waitForRead, waitForWrite,
|
||||
selectForRead, selectForWrite, socketDisconnect);
|
||||
}
|
||||
|
||||
|
||||
void QEventDispatcherWasm::handleDialogExec()
|
||||
{
|
||||
if (!useAsyncify()) {
|
||||
@ -617,215 +594,6 @@ void QEventDispatcherWasm::callProcessTimers(void *context)
|
||||
#endif
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::setEmscriptenSocketCallbacks()
|
||||
{
|
||||
qCDebug(lcEventDispatcher) << "setEmscriptenSocketCallbacks";
|
||||
|
||||
emscripten_set_socket_error_callback(nullptr, QEventDispatcherWasm::socketError);
|
||||
emscripten_set_socket_open_callback(nullptr, QEventDispatcherWasm::socketOpen);
|
||||
emscripten_set_socket_listen_callback(nullptr, QEventDispatcherWasm::socketListen);
|
||||
emscripten_set_socket_connection_callback(nullptr, QEventDispatcherWasm::socketConnection);
|
||||
emscripten_set_socket_message_callback(nullptr, QEventDispatcherWasm::socketMessage);
|
||||
emscripten_set_socket_close_callback(nullptr, QEventDispatcherWasm::socketClose);
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::clearEmscriptenSocketCallbacks()
|
||||
{
|
||||
qCDebug(lcEventDispatcher) << "clearEmscriptenSocketCallbacks";
|
||||
|
||||
emscripten_set_socket_error_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_open_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_listen_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_connection_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_message_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_close_callback(nullptr, nullptr);
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::socketError(int socket, int err, const char* msg, void *context)
|
||||
{
|
||||
Q_UNUSED(err);
|
||||
Q_UNUSED(msg);
|
||||
Q_UNUSED(context);
|
||||
|
||||
// Emscripten makes socket callbacks while the main thread is busy-waiting for a mutex,
|
||||
// which can cause deadlocks if the callback code also tries to lock the same mutex.
|
||||
// This is most easily reproducible by adding print statements, where each print requires
|
||||
// taking a mutex lock. Work around this by running the callback asynchronously, i.e. by using
|
||||
// a native zero-timer, to make sure the main thread stack is completely unwond before calling
|
||||
// the Qt handler.
|
||||
// It is currently unclear if this problem is caused by code in Qt or in Emscripten, or
|
||||
// if this completely fixes the problem.
|
||||
runAsync([socket](){
|
||||
auto notifiersRange = g_socketNotifiers.equal_range(socket);
|
||||
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
|
||||
for (auto [_, notifier]: notifiers) {
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
|
||||
}
|
||||
setSocketState(socket, true, true);
|
||||
});
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::socketOpen(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(context);
|
||||
|
||||
runAsync([socket](){
|
||||
auto notifiersRange = g_socketNotifiers.equal_range(socket);
|
||||
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
|
||||
for (auto [_, notifier]: notifiers) {
|
||||
if (notifier->type() == QSocketNotifier::Write) {
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
|
||||
}
|
||||
}
|
||||
setSocketState(socket, false, true);
|
||||
});
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::socketListen(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(socket);
|
||||
Q_UNUSED(context);
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::socketConnection(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(socket);
|
||||
Q_UNUSED(context);
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::socketMessage(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(context);
|
||||
|
||||
runAsync([socket](){
|
||||
auto notifiersRange = g_socketNotifiers.equal_range(socket);
|
||||
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
|
||||
for (auto [_, notifier]: notifiers) {
|
||||
if (notifier->type() == QSocketNotifier::Read) {
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
|
||||
}
|
||||
}
|
||||
setSocketState(socket, true, false);
|
||||
});
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::socketClose(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(context);
|
||||
|
||||
// Emscripten makes emscripten_set_socket_close_callback() calls to socket 0,
|
||||
// which is not a valid socket. see https://github.com/emscripten-core/emscripten/issues/6596
|
||||
if (socket == 0)
|
||||
return;
|
||||
|
||||
runAsync([socket](){
|
||||
auto notifiersRange = g_socketNotifiers.equal_range(socket);
|
||||
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
|
||||
for (auto [_, notifier]: notifiers)
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockClose));
|
||||
|
||||
setSocketState(socket, true, true);
|
||||
clearSocketState(socket);
|
||||
});
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::setSocketState(int socket, bool setReadyRead, bool setReadyWrite)
|
||||
{
|
||||
LOCK_GUARD(g_staticDataMutex);
|
||||
SocketReadyState &state = g_socketState[socket];
|
||||
|
||||
// Additively update socket ready state, e.g. if it
|
||||
// was already ready read then it stays ready read.
|
||||
state.readyRead |= setReadyRead;
|
||||
state.readyWrite |= setReadyWrite;
|
||||
|
||||
// Wake any waiters for the given readiness. The waiter consumes
|
||||
// the ready state, returning the socket to not-ready.
|
||||
if (QEventDispatcherWasm *waiter = state.waiter)
|
||||
if ((state.readyRead && state.waitForReadyRead) || (state.readyWrite && state.waitForReadyWrite))
|
||||
waiter->wakeEventDispatcherThread();
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::clearSocketState(int socket)
|
||||
{
|
||||
LOCK_GUARD(g_staticDataMutex);
|
||||
g_socketState.erase(socket);
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::waitForSocketState(int timeout, int socket, bool checkRead, bool checkWrite,
|
||||
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect)
|
||||
{
|
||||
// Loop until the socket becomes readyRead or readyWrite. Wait for
|
||||
// socket activity if it currently is neither.
|
||||
while (true) {
|
||||
*selectForRead = false;
|
||||
*selectForWrite = false;
|
||||
|
||||
{
|
||||
LOCK_GUARD(g_staticDataMutex);
|
||||
|
||||
// Access or create socket state: we want to register that a thread is waitng
|
||||
// even if we have not received any socket callbacks yet.
|
||||
SocketReadyState &state = g_socketState[socket];
|
||||
if (state.waiter) {
|
||||
qWarning() << "QEventDispatcherWasm::waitForSocketState: a thread is already waiting";
|
||||
break;
|
||||
}
|
||||
|
||||
bool shouldWait = true;
|
||||
if (checkRead && state.readyRead) {
|
||||
shouldWait = false;
|
||||
state.readyRead = false;
|
||||
*selectForRead = true;
|
||||
}
|
||||
if (checkWrite && state.readyWrite) {
|
||||
shouldWait = false;
|
||||
state.readyWrite = false;
|
||||
*selectForRead = true;
|
||||
}
|
||||
if (!shouldWait)
|
||||
break;
|
||||
|
||||
state.waiter = this;
|
||||
state.waitForReadyRead = checkRead;
|
||||
state.waitForReadyWrite = checkWrite;
|
||||
}
|
||||
|
||||
bool didTimeOut = !wait(timeout);
|
||||
{
|
||||
LOCK_GUARD(g_staticDataMutex);
|
||||
|
||||
// Missing socket state after a wakeup means that the socket has been closed.
|
||||
auto it = g_socketState.find(socket);
|
||||
if (it == g_socketState.end()) {
|
||||
*socketDisconnect = true;
|
||||
break;
|
||||
}
|
||||
it->second.waiter = nullptr;
|
||||
it->second.waitForReadyRead = false;
|
||||
it->second.waitForReadyWrite = false;
|
||||
}
|
||||
|
||||
if (didTimeOut)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void QEventDispatcherWasm::socketSelect(int timeout, int socket, bool waitForRead, bool waitForWrite,
|
||||
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect)
|
||||
{
|
||||
QEventDispatcherWasm *eventDispatcher = static_cast<QEventDispatcherWasm *>(
|
||||
QAbstractEventDispatcher::instance(QThread::currentThread()));
|
||||
|
||||
if (!eventDispatcher) {
|
||||
qWarning("QEventDispatcherWasm::socketSelect called without eventdispatcher instance");
|
||||
return;
|
||||
}
|
||||
|
||||
eventDispatcher->waitForSocketState(timeout, socket, waitForRead, waitForWrite,
|
||||
selectForRead, selectForWrite, socketDisconnect);
|
||||
}
|
||||
|
||||
namespace {
|
||||
int g_startupTasks = 0;
|
||||
}
|
||||
|
@ -41,9 +41,6 @@ public:
|
||||
|
||||
bool processEvents(QEventLoop::ProcessEventsFlags flags) override;
|
||||
|
||||
void registerSocketNotifier(QSocketNotifier *notifier) override;
|
||||
void unregisterSocketNotifier(QSocketNotifier *notifier) override;
|
||||
|
||||
void registerTimer(Qt::TimerId timerId, Duration interval, Qt::TimerType timerType,
|
||||
QObject *object) override final;
|
||||
bool unregisterTimer(Qt::TimerId timerId) override final;
|
||||
@ -54,10 +51,13 @@ public:
|
||||
void interrupt() override;
|
||||
void wakeUp() override;
|
||||
|
||||
static void runOnMainThread(std::function<void(void)> fn);
|
||||
void registerSocketNotifier(QSocketNotifier *notifier) override;
|
||||
void unregisterSocketNotifier(QSocketNotifier *notifier) override;
|
||||
static void socketSelect(int timeout, int socket, bool waitForRead, bool waitForWrite,
|
||||
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect);
|
||||
|
||||
static void runOnMainThread(std::function<void(void)> fn);
|
||||
|
||||
static void registerStartupTask();
|
||||
static void completeStarupTask();
|
||||
static void callOnLoadedIfRequired();
|
||||
@ -81,20 +81,6 @@ private:
|
||||
void updateNativeTimer();
|
||||
static void callProcessTimers(void *eventDispatcher);
|
||||
|
||||
static void setEmscriptenSocketCallbacks();
|
||||
static void clearEmscriptenSocketCallbacks();
|
||||
static void socketError(int fd, int err, const char* msg, void *context);
|
||||
static void socketOpen(int fd, void *context);
|
||||
static void socketListen(int fd, void *context);
|
||||
static void socketConnection(int fd, void *context);
|
||||
static void socketMessage(int fd, void *context);
|
||||
static void socketClose(int fd, void *context);
|
||||
|
||||
static void setSocketState(int socket, bool setReadyRead, bool setReadyWrite);
|
||||
static void clearSocketState(int socket);
|
||||
void waitForSocketState(int timeout, int socket, bool checkRead, bool checkWrite,
|
||||
bool *selectForRead, bool *selectForWrite, bool *socketDisconnect);
|
||||
|
||||
static void run(std::function<void(void)> fn);
|
||||
static void runAsync(std::function<void(void)> fn);
|
||||
static void runOnMainThreadAsync(std::function<void(void)> fn);
|
||||
@ -124,16 +110,7 @@ private:
|
||||
// that eventdispatcher thread. The locking order is g_staticDataMutex first, then m_mutex.
|
||||
#endif
|
||||
|
||||
static std::multimap<int, QSocketNotifier *> g_socketNotifiers;
|
||||
|
||||
struct SocketReadyState {
|
||||
QEventDispatcherWasm *waiter = nullptr;
|
||||
bool waitForReadyRead = false;
|
||||
bool waitForReadyWrite = false;
|
||||
bool readyRead = false;
|
||||
bool readyWrite = false;
|
||||
};
|
||||
static std::map<int, SocketReadyState> g_socketState;
|
||||
friend class QWasmSocket;
|
||||
};
|
||||
|
||||
#endif // QEVENTDISPATCHER_WASM_P_H
|
||||
|
261
src/corelib/platform/wasm/qwasmsocket.cpp
Normal file
261
src/corelib/platform/wasm/qwasmsocket.cpp
Normal file
@ -0,0 +1,261 @@
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#include "qwasmsocket_p.h"
|
||||
#include <QtCore/qsocketnotifier.h>
|
||||
|
||||
#include "emscripten.h"
|
||||
#include <sys/ioctl.h>
|
||||
|
||||
#if QT_CONFIG(thread)
|
||||
#define LOCK_GUARD(M) std::lock_guard<std::mutex> lock(M)
|
||||
#else
|
||||
#define LOCK_GUARD(M)
|
||||
#endif
|
||||
|
||||
#if QT_CONFIG(thread)
|
||||
Q_CONSTINIT std::mutex QWasmSocket::g_socketDataMutex;
|
||||
#endif
|
||||
|
||||
// ### dynamic initialization:
|
||||
std::multimap<int, QSocketNotifier *> QWasmSocket::g_socketNotifiers;
|
||||
std::map<int, QWasmSocket::SocketReadyState> QWasmSocket::g_socketState;
|
||||
|
||||
void QWasmSocket::registerSocketNotifier(QSocketNotifier *notifier)
|
||||
{
|
||||
LOCK_GUARD(g_socketDataMutex);
|
||||
|
||||
bool wasEmpty = g_socketNotifiers.empty();
|
||||
g_socketNotifiers.insert({notifier->socket(), notifier});
|
||||
if (wasEmpty)
|
||||
QEventDispatcherWasm::runOnMainThread([] { setEmscriptenSocketCallbacks(); });
|
||||
|
||||
int count;
|
||||
ioctl(notifier->socket(), FIONREAD, &count);
|
||||
|
||||
// message may have arrived already
|
||||
if (count > 0 && notifier->type() == QSocketNotifier::Read) {
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
|
||||
}
|
||||
}
|
||||
|
||||
void QWasmSocket::unregisterSocketNotifier(QSocketNotifier *notifier)
|
||||
{
|
||||
LOCK_GUARD(g_socketDataMutex);
|
||||
|
||||
auto notifiers = g_socketNotifiers.equal_range(notifier->socket());
|
||||
for (auto it = notifiers.first; it != notifiers.second; ++it) {
|
||||
if (it->second == notifier) {
|
||||
g_socketNotifiers.erase(it);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (g_socketNotifiers.empty())
|
||||
QEventDispatcherWasm::runOnMainThread([] { clearEmscriptenSocketCallbacks(); });
|
||||
}
|
||||
|
||||
void QWasmSocket::clearSocketNotifiers()
|
||||
{
|
||||
LOCK_GUARD(g_socketDataMutex);
|
||||
if (!g_socketNotifiers.empty()) {
|
||||
qWarning("QWasmSocket: Sockets cleared with active socket notifiers");
|
||||
clearEmscriptenSocketCallbacks();
|
||||
g_socketNotifiers.clear();
|
||||
}
|
||||
g_socketState.clear();
|
||||
}
|
||||
|
||||
void QWasmSocket::setEmscriptenSocketCallbacks()
|
||||
{
|
||||
qCDebug(lcEventDispatcher) << "setEmscriptenSocketCallbacks";
|
||||
|
||||
emscripten_set_socket_error_callback(nullptr, QWasmSocket::socketError);
|
||||
emscripten_set_socket_open_callback(nullptr, QWasmSocket::socketOpen);
|
||||
emscripten_set_socket_listen_callback(nullptr, QWasmSocket::socketListen);
|
||||
emscripten_set_socket_connection_callback(nullptr, QWasmSocket::socketConnection);
|
||||
emscripten_set_socket_message_callback(nullptr, QWasmSocket::socketMessage);
|
||||
emscripten_set_socket_close_callback(nullptr, QWasmSocket::socketClose);
|
||||
}
|
||||
|
||||
void QWasmSocket::clearEmscriptenSocketCallbacks()
|
||||
{
|
||||
qCDebug(lcEventDispatcher) << "clearEmscriptenSocketCallbacks";
|
||||
|
||||
emscripten_set_socket_error_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_open_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_listen_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_connection_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_message_callback(nullptr, nullptr);
|
||||
emscripten_set_socket_close_callback(nullptr, nullptr);
|
||||
}
|
||||
|
||||
void QWasmSocket::socketError(int socket, int err, const char* msg, void *context)
|
||||
{
|
||||
Q_UNUSED(err);
|
||||
Q_UNUSED(msg);
|
||||
Q_UNUSED(context);
|
||||
|
||||
// Emscripten makes socket callbacks while the main thread is busy-waiting for a mutex,
|
||||
// which can cause deadlocks if the callback code also tries to lock the same mutex.
|
||||
// This is most easily reproducible by adding print statements, where each print requires
|
||||
// taking a mutex lock. Work around this by running the callback asynchronously, i.e. by using
|
||||
// a native zero-timer, to make sure the main thread stack is completely unwond before calling
|
||||
// the Qt handler.
|
||||
// It is currently unclear if this problem is caused by code in Qt or in Emscripten, or
|
||||
// if this completely fixes the problem.
|
||||
QEventDispatcherWasm::runAsync([socket](){
|
||||
auto notifiersRange = g_socketNotifiers.equal_range(socket);
|
||||
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
|
||||
for (auto [_, notifier]: notifiers) {
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
|
||||
}
|
||||
setSocketState(socket, true, true);
|
||||
});
|
||||
}
|
||||
|
||||
void QWasmSocket::socketOpen(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(context);
|
||||
|
||||
QEventDispatcherWasm::runAsync([socket](){
|
||||
auto notifiersRange = g_socketNotifiers.equal_range(socket);
|
||||
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
|
||||
for (auto [_, notifier]: notifiers) {
|
||||
if (notifier->type() == QSocketNotifier::Write) {
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
|
||||
}
|
||||
}
|
||||
setSocketState(socket, false, true);
|
||||
});
|
||||
}
|
||||
|
||||
void QWasmSocket::socketListen(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(socket);
|
||||
Q_UNUSED(context);
|
||||
}
|
||||
|
||||
void QWasmSocket::socketConnection(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(socket);
|
||||
Q_UNUSED(context);
|
||||
}
|
||||
|
||||
void QWasmSocket::socketMessage(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(context);
|
||||
|
||||
QEventDispatcherWasm::runAsync([socket](){
|
||||
auto notifiersRange = g_socketNotifiers.equal_range(socket);
|
||||
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
|
||||
for (auto [_, notifier]: notifiers) {
|
||||
if (notifier->type() == QSocketNotifier::Read) {
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockAct));
|
||||
}
|
||||
}
|
||||
setSocketState(socket, true, false);
|
||||
});
|
||||
}
|
||||
|
||||
void QWasmSocket::socketClose(int socket, void *context)
|
||||
{
|
||||
Q_UNUSED(context);
|
||||
|
||||
// Emscripten makes emscripten_set_socket_close_callback() calls to socket 0,
|
||||
// which is not a valid socket. see https://github.com/emscripten-core/emscripten/issues/6596
|
||||
if (socket == 0)
|
||||
return;
|
||||
|
||||
QEventDispatcherWasm::runAsync([socket](){
|
||||
auto notifiersRange = g_socketNotifiers.equal_range(socket);
|
||||
std::vector<std::pair<int, QSocketNotifier *>> notifiers(notifiersRange.first, notifiersRange.second);
|
||||
for (auto [_, notifier]: notifiers)
|
||||
QCoreApplication::postEvent(notifier, new QEvent(QEvent::SockClose));
|
||||
|
||||
setSocketState(socket, true, true);
|
||||
clearSocketState(socket);
|
||||
});
|
||||
}
|
||||
|
||||
void QWasmSocket::setSocketState(int socket, bool setReadyRead, bool setReadyWrite)
|
||||
{
|
||||
LOCK_GUARD(g_socketDataMutex);
|
||||
SocketReadyState &state = g_socketState[socket];
|
||||
|
||||
// Additively update socket ready state, e.g. if it
|
||||
// was already ready read then it stays ready read.
|
||||
state.readyRead |= setReadyRead;
|
||||
state.readyWrite |= setReadyWrite;
|
||||
|
||||
// Wake any waiters for the given readiness. The waiter consumes
|
||||
// the ready state, returning the socket to not-ready.
|
||||
if (QEventDispatcherWasm *waiter = state.waiter)
|
||||
if ((state.readyRead && state.waitForReadyRead) || (state.readyWrite && state.waitForReadyWrite))
|
||||
waiter->wakeUp();
|
||||
}
|
||||
|
||||
void QWasmSocket::clearSocketState(int socket)
|
||||
{
|
||||
LOCK_GUARD(g_socketDataMutex);
|
||||
g_socketState.erase(socket);
|
||||
}
|
||||
|
||||
void QWasmSocket::waitForSocketState(QEventDispatcherWasm *eventDispatcher, int timeout, int socket, bool checkRead,
|
||||
bool checkWrite, bool *selectForRead, bool *selectForWrite, bool *socketDisconnect)
|
||||
{
|
||||
// Loop until the socket becomes readyRead or readyWrite. Wait for
|
||||
// socket activity if it currently is neither.
|
||||
while (true) {
|
||||
*selectForRead = false;
|
||||
*selectForWrite = false;
|
||||
|
||||
{
|
||||
LOCK_GUARD(g_socketDataMutex);
|
||||
|
||||
// Access or create socket state: we want to register that a thread is waitng
|
||||
// even if we have not received any socket callbacks yet.
|
||||
SocketReadyState &state = g_socketState[socket];
|
||||
if (state.waiter) {
|
||||
qWarning() << "QEventDispatcherWasm::waitForSocketState: a thread is already waiting";
|
||||
break;
|
||||
}
|
||||
|
||||
bool shouldWait = true;
|
||||
if (checkRead && state.readyRead) {
|
||||
shouldWait = false;
|
||||
state.readyRead = false;
|
||||
*selectForRead = true;
|
||||
}
|
||||
if (checkWrite && state.readyWrite) {
|
||||
shouldWait = false;
|
||||
state.readyWrite = false;
|
||||
*selectForRead = true;
|
||||
}
|
||||
if (!shouldWait)
|
||||
break;
|
||||
|
||||
state.waiter = eventDispatcher;
|
||||
state.waitForReadyRead = checkRead;
|
||||
state.waitForReadyWrite = checkWrite;
|
||||
}
|
||||
|
||||
bool didTimeOut = !eventDispatcher->wait(timeout);
|
||||
{
|
||||
LOCK_GUARD(g_socketDataMutex);
|
||||
|
||||
// Missing socket state after a wakeup means that the socket has been closed.
|
||||
auto it = g_socketState.find(socket);
|
||||
if (it == g_socketState.end()) {
|
||||
*socketDisconnect = true;
|
||||
break;
|
||||
}
|
||||
it->second.waiter = nullptr;
|
||||
it->second.waitForReadyRead = false;
|
||||
it->second.waitForReadyWrite = false;
|
||||
}
|
||||
|
||||
if (didTimeOut)
|
||||
break;
|
||||
}
|
||||
}
|
56
src/corelib/platform/wasm/qwasmsocket_p.h
Normal file
56
src/corelib/platform/wasm/qwasmsocket_p.h
Normal file
@ -0,0 +1,56 @@
|
||||
// Copyright (C) 2024 The Qt Company Ltd.
|
||||
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
|
||||
|
||||
#ifndef QWASMSOCKET_P_H
|
||||
#define QWASMSOCKET_P_H
|
||||
|
||||
//
|
||||
// W A R N I N G
|
||||
// -------------
|
||||
//
|
||||
// This file is not part of the Qt API. It exists purely as an
|
||||
// implementation detail. This header file may change from version to
|
||||
// version without notice, or even be removed.
|
||||
//
|
||||
// We mean it.
|
||||
//
|
||||
|
||||
#include "QtCore/private/qeventdispatcher_wasm_p.h"
|
||||
|
||||
class QWasmSocket
|
||||
{
|
||||
public:
|
||||
static void registerSocketNotifier(QSocketNotifier *notifier);
|
||||
static void unregisterSocketNotifier(QSocketNotifier *notifier);
|
||||
static void clearSocketNotifiers();
|
||||
|
||||
static void setEmscriptenSocketCallbacks();
|
||||
static void clearEmscriptenSocketCallbacks();
|
||||
static void socketError(int fd, int err, const char* msg, void *context);
|
||||
static void socketOpen(int fd, void *context);
|
||||
static void socketListen(int fd, void *context);
|
||||
static void socketConnection(int fd, void *context);
|
||||
static void socketMessage(int fd, void *context);
|
||||
static void socketClose(int fd, void *context);
|
||||
|
||||
static void setSocketState(int socket, bool setReadyRead, bool setReadyWrite);
|
||||
static void clearSocketState(int socket);
|
||||
static void waitForSocketState(QEventDispatcherWasm *eventDispatcher, int timeout, int socket,
|
||||
bool checkRead, bool checkWrite, bool *selectForRead, bool *selectForWrite, bool *socketDisconnect);
|
||||
private:
|
||||
|
||||
#if QT_CONFIG(thread)
|
||||
Q_CONSTINIT static std::mutex g_socketDataMutex;
|
||||
#endif
|
||||
static std::multimap<int, QSocketNotifier *> g_socketNotifiers;
|
||||
struct SocketReadyState {
|
||||
QEventDispatcherWasm *waiter = nullptr;
|
||||
bool waitForReadyRead = false;
|
||||
bool waitForReadyWrite = false;
|
||||
bool readyRead = false;
|
||||
bool readyWrite = false;
|
||||
};
|
||||
static std::map<int, SocketReadyState> g_socketState;
|
||||
};
|
||||
|
||||
#endif // QWASMSOCKET_P_H
|
Loading…
x
Reference in New Issue
Block a user