all: retry incomplete socket reads
Fixes greetd and hyprland ipc sockets reads being incomplete and breaking said integrations on slow machines.
This commit is contained in:
parent
cf1a2aeb2d
commit
bd62179277
13 changed files with 221 additions and 103 deletions
|
|
@ -52,6 +52,7 @@ set shell id.
|
|||
- Fixed crashes when default pipewire devices are lost.
|
||||
- Fixed ToplevelManager not clearing activeToplevel on deactivation.
|
||||
- Desktop action order is now preserved.
|
||||
- Fixed partial socket reads in greetd and hyprland on slow machines.
|
||||
|
||||
## Packaging Changes
|
||||
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ qt_add_library(quickshell-core STATIC
|
|||
scriptmodel.cpp
|
||||
colorquantizer.cpp
|
||||
toolsupport.cpp
|
||||
streamreader.cpp
|
||||
)
|
||||
|
||||
qt_add_qml_module(quickshell-core
|
||||
|
|
|
|||
98
src/core/streamreader.cpp
Normal file
98
src/core/streamreader.cpp
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
#include "streamreader.hpp"
|
||||
#include <cstring>
|
||||
|
||||
#include <qbytearray.h>
|
||||
#include <qiodevice.h>
|
||||
#include <qtypes.h>
|
||||
|
||||
void StreamReader::setDevice(QIODevice* device) {
|
||||
this->reset();
|
||||
this->device = device;
|
||||
}
|
||||
|
||||
void StreamReader::startTransaction() {
|
||||
this->cursor = 0;
|
||||
this->failed = false;
|
||||
}
|
||||
|
||||
bool StreamReader::fill() {
|
||||
auto available = this->device->bytesAvailable();
|
||||
if (available <= 0) return false;
|
||||
auto oldSize = this->buffer.size();
|
||||
this->buffer.resize(oldSize + available);
|
||||
auto bytesRead = this->device->read(this->buffer.data() + oldSize, available); // NOLINT
|
||||
|
||||
if (bytesRead <= 0) {
|
||||
this->buffer.resize(oldSize);
|
||||
return false;
|
||||
}
|
||||
|
||||
this->buffer.resize(oldSize + bytesRead);
|
||||
return true;
|
||||
}
|
||||
|
||||
QByteArray StreamReader::readBytes(qsizetype count) {
|
||||
if (this->failed) return {};
|
||||
|
||||
auto needed = this->cursor + count;
|
||||
|
||||
while (this->buffer.size() < needed) {
|
||||
if (!this->fill()) {
|
||||
this->failed = true;
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
auto result = this->buffer.mid(this->cursor, count);
|
||||
this->cursor += count;
|
||||
return result;
|
||||
}
|
||||
|
||||
QByteArray StreamReader::readUntil(char terminator) {
|
||||
if (this->failed) return {};
|
||||
|
||||
auto searchFrom = this->cursor;
|
||||
auto idx = this->buffer.indexOf(terminator, searchFrom);
|
||||
|
||||
while (idx == -1) {
|
||||
searchFrom = this->buffer.size();
|
||||
if (!this->fill()) {
|
||||
this->failed = true;
|
||||
return {};
|
||||
}
|
||||
|
||||
idx = this->buffer.indexOf(terminator, searchFrom);
|
||||
}
|
||||
|
||||
auto length = idx - this->cursor + 1;
|
||||
auto result = this->buffer.mid(this->cursor, length);
|
||||
this->cursor += length;
|
||||
return result;
|
||||
}
|
||||
|
||||
void StreamReader::readInto(char* ptr, qsizetype count) {
|
||||
auto data = this->readBytes(count);
|
||||
if (!data.isEmpty()) memcpy(ptr, data.data(), count);
|
||||
}
|
||||
|
||||
qint32 StreamReader::readI32() {
|
||||
qint32 value = 0;
|
||||
this->readInto(reinterpret_cast<char*>(&value), sizeof(qint32));
|
||||
return value;
|
||||
}
|
||||
|
||||
bool StreamReader::commitTransaction() {
|
||||
if (this->failed) {
|
||||
this->cursor = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
this->buffer.remove(0, this->cursor);
|
||||
this->cursor = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
void StreamReader::reset() {
|
||||
this->buffer.clear();
|
||||
this->cursor = 0;
|
||||
}
|
||||
26
src/core/streamreader.hpp
Normal file
26
src/core/streamreader.hpp
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
#pragma once
|
||||
|
||||
#include <qbytearray.h>
|
||||
#include <qiodevice.h>
|
||||
#include <qtypes.h>
|
||||
|
||||
class StreamReader {
|
||||
public:
|
||||
void setDevice(QIODevice* device);
|
||||
|
||||
void startTransaction();
|
||||
QByteArray readBytes(qsizetype count);
|
||||
QByteArray readUntil(char terminator);
|
||||
void readInto(char* ptr, qsizetype count);
|
||||
qint32 readI32();
|
||||
bool commitTransaction();
|
||||
void reset();
|
||||
|
||||
private:
|
||||
bool fill();
|
||||
|
||||
QIODevice* device = nullptr;
|
||||
QByteArray buffer;
|
||||
qsizetype cursor = 0;
|
||||
bool failed = false;
|
||||
};
|
||||
|
|
@ -12,7 +12,7 @@ qt_add_qml_module(quickshell-service-greetd
|
|||
install_qml_module(quickshell-service-greetd)
|
||||
|
||||
# can't be Qt::Qml because generation.hpp pulls in gui types
|
||||
target_link_libraries(quickshell-service-greetd PRIVATE Qt::Quick)
|
||||
target_link_libraries(quickshell-service-greetd PRIVATE Qt::Quick quickshell-core)
|
||||
|
||||
qs_module_pch(quickshell-service-greetd)
|
||||
|
||||
|
|
|
|||
|
|
@ -145,6 +145,7 @@ void GreetdConnection::setInactive() {
|
|||
QString GreetdConnection::user() const { return this->mUser; }
|
||||
|
||||
void GreetdConnection::onSocketConnected() {
|
||||
this->reader.setDevice(&this->socket);
|
||||
qCDebug(logGreetd) << "Connected to greetd socket.";
|
||||
|
||||
if (this->mTargetActive) {
|
||||
|
|
@ -160,82 +161,84 @@ void GreetdConnection::onSocketError(QLocalSocket::LocalSocketError error) {
|
|||
}
|
||||
|
||||
void GreetdConnection::onSocketReady() {
|
||||
qint32 length = 0;
|
||||
while (true) {
|
||||
this->reader.startTransaction();
|
||||
auto length = this->reader.readI32();
|
||||
auto text = this->reader.readBytes(length);
|
||||
if (!this->reader.commitTransaction()) return;
|
||||
|
||||
this->socket.read(reinterpret_cast<char*>(&length), sizeof(qint32));
|
||||
auto json = QJsonDocument::fromJson(text).object();
|
||||
auto type = json.value("type").toString();
|
||||
|
||||
auto text = this->socket.read(length);
|
||||
auto json = QJsonDocument::fromJson(text).object();
|
||||
auto type = json.value("type").toString();
|
||||
qCDebug(logGreetd).noquote() << "Received greetd response:" << text;
|
||||
|
||||
qCDebug(logGreetd).noquote() << "Received greetd response:" << text;
|
||||
if (type == "success") {
|
||||
switch (this->mState) {
|
||||
case GreetdState::Authenticating:
|
||||
qCDebug(logGreetd) << "Authentication complete.";
|
||||
this->mState = GreetdState::ReadyToLaunch;
|
||||
emit this->stateChanged();
|
||||
emit this->readyToLaunch();
|
||||
break;
|
||||
case GreetdState::Launching:
|
||||
qCDebug(logGreetd) << "Target session set successfully.";
|
||||
this->mState = GreetdState::Launched;
|
||||
emit this->stateChanged();
|
||||
emit this->launched();
|
||||
|
||||
if (type == "success") {
|
||||
switch (this->mState) {
|
||||
case GreetdState::Authenticating:
|
||||
qCDebug(logGreetd) << "Authentication complete.";
|
||||
this->mState = GreetdState::ReadyToLaunch;
|
||||
emit this->stateChanged();
|
||||
emit this->readyToLaunch();
|
||||
break;
|
||||
case GreetdState::Launching:
|
||||
qCDebug(logGreetd) << "Target session set successfully.";
|
||||
this->mState = GreetdState::Launched;
|
||||
emit this->stateChanged();
|
||||
emit this->launched();
|
||||
if (this->mExitAfterLaunch) {
|
||||
qCDebug(logGreetd) << "Quitting.";
|
||||
EngineGeneration::currentGeneration()->quit();
|
||||
}
|
||||
|
||||
if (this->mExitAfterLaunch) {
|
||||
qCDebug(logGreetd) << "Quitting.";
|
||||
EngineGeneration::currentGeneration()->quit();
|
||||
break;
|
||||
default: goto unexpected;
|
||||
}
|
||||
} else if (type == "error") {
|
||||
auto errorType = json.value("error_type").toString();
|
||||
auto desc = json.value("description").toString();
|
||||
|
||||
// Special case this error in case a session was already running.
|
||||
// This cancels and restarts the session.
|
||||
if (errorType == "error" && desc == "a session is already being configured") {
|
||||
qCDebug(
|
||||
logGreetd
|
||||
) << "A session was already in progress, cancelling it and starting a new one.";
|
||||
this->setActive(false);
|
||||
this->setActive(true);
|
||||
return;
|
||||
}
|
||||
|
||||
break;
|
||||
default: goto unexpected;
|
||||
}
|
||||
} else if (type == "error") {
|
||||
auto errorType = json.value("error_type").toString();
|
||||
auto desc = json.value("description").toString();
|
||||
if (errorType == "auth_error") {
|
||||
emit this->authFailure(desc);
|
||||
this->setActive(false);
|
||||
} else if (errorType == "error") {
|
||||
qCWarning(logGreetd) << "Greetd error occurred" << desc;
|
||||
emit this->error(desc);
|
||||
} else goto unexpected;
|
||||
|
||||
// Special case this error in case a session was already running.
|
||||
// This cancels and restarts the session.
|
||||
if (errorType == "error" && desc == "a session is already being configured") {
|
||||
qCDebug(
|
||||
logGreetd
|
||||
) << "A session was already in progress, cancelling it and starting a new one.";
|
||||
this->setActive(false);
|
||||
this->setActive(true);
|
||||
return;
|
||||
}
|
||||
// errors terminate the session
|
||||
this->setInactive();
|
||||
} else if (type == "auth_message") {
|
||||
auto message = json.value("auth_message").toString();
|
||||
auto type = json.value("auth_message_type").toString();
|
||||
auto error = type == "error";
|
||||
auto responseRequired = type == "visible" || type == "secret";
|
||||
auto echoResponse = type != "secret";
|
||||
|
||||
if (errorType == "auth_error") {
|
||||
emit this->authFailure(desc);
|
||||
this->setActive(false);
|
||||
} else if (errorType == "error") {
|
||||
qCWarning(logGreetd) << "Greetd error occurred" << desc;
|
||||
emit this->error(desc);
|
||||
this->mResponseRequired = responseRequired;
|
||||
emit this->authMessage(message, error, responseRequired, echoResponse);
|
||||
|
||||
if (!responseRequired) {
|
||||
this->sendRequest({{"type", "post_auth_message_response"}});
|
||||
}
|
||||
} else goto unexpected;
|
||||
|
||||
// errors terminate the session
|
||||
this->setInactive();
|
||||
} else if (type == "auth_message") {
|
||||
auto message = json.value("auth_message").toString();
|
||||
auto type = json.value("auth_message_type").toString();
|
||||
auto error = type == "error";
|
||||
auto responseRequired = type == "visible" || type == "secret";
|
||||
auto echoResponse = type != "secret";
|
||||
|
||||
this->mResponseRequired = responseRequired;
|
||||
emit this->authMessage(message, error, responseRequired, echoResponse);
|
||||
|
||||
if (!responseRequired) {
|
||||
this->sendRequest({{"type", "post_auth_message_response"}});
|
||||
}
|
||||
} else goto unexpected;
|
||||
|
||||
return;
|
||||
unexpected:
|
||||
qCCritical(logGreetd) << "Received unexpected greetd response" << text;
|
||||
this->setActive(false);
|
||||
continue;
|
||||
unexpected:
|
||||
qCCritical(logGreetd) << "Received unexpected greetd response" << text;
|
||||
this->setActive(false);
|
||||
}
|
||||
}
|
||||
|
||||
void GreetdConnection::sendRequest(const QJsonObject& json) {
|
||||
|
|
|
|||
|
|
@ -8,6 +8,8 @@
|
|||
#include <qtmetamacros.h>
|
||||
#include <qtypes.h>
|
||||
|
||||
#include "../../core/streamreader.hpp"
|
||||
|
||||
///! State of the Greetd connection.
|
||||
/// See @@Greetd.state.
|
||||
class GreetdState: public QObject {
|
||||
|
|
@ -74,4 +76,5 @@ private:
|
|||
bool mResponseRequired = false;
|
||||
QString mUser;
|
||||
QLocalSocket socket;
|
||||
StreamReader reader;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ qs_add_module_deps_light(quickshell-hyprland-ipc Quickshell)
|
|||
|
||||
install_qml_module(quickshell-hyprland-ipc)
|
||||
|
||||
target_link_libraries(quickshell-hyprland-ipc PRIVATE Qt::Quick)
|
||||
target_link_libraries(quickshell-hyprland-ipc PRIVATE Qt::Quick quickshell-core)
|
||||
|
||||
if (WAYLAND_TOPLEVEL_MANAGEMENT)
|
||||
target_sources(quickshell-hyprland-ipc PRIVATE
|
||||
|
|
|
|||
|
|
@ -93,6 +93,7 @@ void HyprlandIpc::eventSocketError(QLocalSocket::LocalSocketError error) const {
|
|||
|
||||
void HyprlandIpc::eventSocketStateChanged(QLocalSocket::LocalSocketState state) {
|
||||
if (state == QLocalSocket::ConnectedState) {
|
||||
this->eventReader.setDevice(&this->eventSocket);
|
||||
qCInfo(logHyprlandIpc) << "Hyprland event socket connected.";
|
||||
emit this->connected();
|
||||
} else if (state == QLocalSocket::UnconnectedState && this->valid) {
|
||||
|
|
@ -104,11 +105,11 @@ void HyprlandIpc::eventSocketStateChanged(QLocalSocket::LocalSocketState state)
|
|||
|
||||
void HyprlandIpc::eventSocketReady() {
|
||||
while (true) {
|
||||
auto rawEvent = this->eventSocket.readLine();
|
||||
if (rawEvent.isEmpty()) break;
|
||||
this->eventReader.startTransaction();
|
||||
auto rawEvent = this->eventReader.readUntil('\n');
|
||||
if (!this->eventReader.commitTransaction()) return;
|
||||
|
||||
// remove trailing \n
|
||||
rawEvent.truncate(rawEvent.length() - 1);
|
||||
rawEvent.chop(1); // remove trailing \n
|
||||
auto splitIdx = rawEvent.indexOf(">>");
|
||||
auto event = QByteArrayView(rawEvent.data(), splitIdx);
|
||||
auto data = QByteArrayView(
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
#include "../../../core/model.hpp"
|
||||
#include "../../../core/qmlscreen.hpp"
|
||||
#include "../../../core/streamreader.hpp"
|
||||
#include "../../../wayland/toplevel_management/handle.hpp"
|
||||
|
||||
namespace qs::hyprland::ipc {
|
||||
|
|
@ -139,6 +140,7 @@ private:
|
|||
static bool compareWorkspaces(HyprlandWorkspace* a, HyprlandWorkspace* b);
|
||||
|
||||
QLocalSocket eventSocket;
|
||||
StreamReader eventReader;
|
||||
QString mRequestSocketPath;
|
||||
QString mEventSocketPath;
|
||||
bool valid = false;
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ qs_add_module_deps_light(quickshell-i3-ipc Quickshell)
|
|||
|
||||
install_qml_module(quickshell-i3-ipc)
|
||||
|
||||
target_link_libraries(quickshell-i3-ipc PRIVATE Qt::Quick)
|
||||
target_link_libraries(quickshell-i3-ipc PRIVATE Qt::Quick quickshell-core)
|
||||
|
||||
qs_module_pch(quickshell-i3-ipc SET large)
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@
|
|||
#include <qbytearray.h>
|
||||
#include <qbytearrayview.h>
|
||||
#include <qcontainerfwd.h>
|
||||
#include <qdatastream.h>
|
||||
#include <qjsonarray.h>
|
||||
#include <qjsondocument.h>
|
||||
#include <qjsonobject.h>
|
||||
|
|
@ -15,9 +14,7 @@
|
|||
#include <qlocalsocket.h>
|
||||
#include <qlogging.h>
|
||||
#include <qloggingcategory.h>
|
||||
#include <qnamespace.h>
|
||||
#include <qobject.h>
|
||||
#include <qsysinfo.h>
|
||||
#include <qtenvironmentvariables.h>
|
||||
#include <qtmetamacros.h>
|
||||
#include <qtypes.h>
|
||||
|
|
@ -89,9 +86,6 @@ I3Ipc::I3Ipc(const QList<QString>& events): mEvents(events) {
|
|||
QObject::connect(&this->liveEventSocket, &QLocalSocket::readyRead, this, &I3Ipc::eventSocketReady);
|
||||
QObject::connect(&this->liveEventSocket, &QLocalSocket::connected, this, &I3Ipc::subscribe);
|
||||
// clang-format on
|
||||
|
||||
this->liveEventSocketDs.setDevice(&this->liveEventSocket);
|
||||
this->liveEventSocketDs.setByteOrder(static_cast<QDataStream::ByteOrder>(QSysInfo::ByteOrder));
|
||||
}
|
||||
|
||||
void I3Ipc::makeRequest(const QByteArray& request) {
|
||||
|
|
@ -145,34 +139,21 @@ void I3Ipc::reconnectIPC() {
|
|||
}
|
||||
|
||||
QVector<Event> I3Ipc::parseResponse() {
|
||||
QVector<std::tuple<EventCode, QJsonDocument>> events;
|
||||
const int magicLen = 6;
|
||||
QVector<Event> events;
|
||||
|
||||
while (!this->liveEventSocketDs.atEnd()) {
|
||||
this->liveEventSocketDs.startTransaction();
|
||||
this->liveEventSocketDs.startTransaction();
|
||||
while (true) {
|
||||
this->eventReader.startTransaction();
|
||||
auto magic = this->eventReader.readBytes(6);
|
||||
auto size = this->eventReader.readI32();
|
||||
auto type = this->eventReader.readI32();
|
||||
auto payload = this->eventReader.readBytes(size);
|
||||
if (!this->eventReader.commitTransaction()) return events;
|
||||
|
||||
std::array<char, 6> buffer = {};
|
||||
qint32 size = 0;
|
||||
qint32 type = EventCode::Unknown;
|
||||
|
||||
this->liveEventSocketDs.readRawData(buffer.data(), magicLen);
|
||||
this->liveEventSocketDs >> size;
|
||||
this->liveEventSocketDs >> type;
|
||||
|
||||
if (!this->liveEventSocketDs.commitTransaction()) break;
|
||||
|
||||
QByteArray payload(size, Qt::Uninitialized);
|
||||
|
||||
this->liveEventSocketDs.readRawData(payload.data(), size);
|
||||
|
||||
if (!this->liveEventSocketDs.commitTransaction()) break;
|
||||
|
||||
if (strncmp(buffer.data(), MAGIC.data(), 6) != 0) {
|
||||
if (magic.size() < 6 || strncmp(magic.data(), MAGIC.data(), 6) != 0) {
|
||||
qCWarning(logI3Ipc) << "No magic sequence found in string.";
|
||||
this->reconnectIPC();
|
||||
break;
|
||||
};
|
||||
}
|
||||
|
||||
if (I3IpcEvent::intToEvent(type) == EventCode::Unknown) {
|
||||
qCWarning(logI3Ipc) << "Received unknown event";
|
||||
|
|
@ -204,6 +185,7 @@ void I3Ipc::eventSocketError(QLocalSocket::LocalSocketError error) const {
|
|||
|
||||
void I3Ipc::eventSocketStateChanged(QLocalSocket::LocalSocketState state) {
|
||||
if (state == QLocalSocket::ConnectedState) {
|
||||
this->eventReader.setDevice(&this->liveEventSocket);
|
||||
qCInfo(logI3Ipc) << "I3 event socket connected.";
|
||||
emit this->connected();
|
||||
} else if (state == QLocalSocket::UnconnectedState && this->valid) {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
#pragma once
|
||||
|
||||
#include <qbytearrayview.h>
|
||||
#include <qdatastream.h>
|
||||
#include <qjsondocument.h>
|
||||
#include <qlocalsocket.h>
|
||||
#include <qobject.h>
|
||||
|
|
@ -9,6 +8,8 @@
|
|||
#include <qtmetamacros.h>
|
||||
#include <qtypes.h>
|
||||
|
||||
#include "../../../core/streamreader.hpp"
|
||||
|
||||
namespace qs::i3::ipc {
|
||||
|
||||
constexpr std::string MAGIC = "i3-ipc";
|
||||
|
|
@ -92,7 +93,7 @@ protected:
|
|||
QVector<std::tuple<EventCode, QJsonDocument>> parseResponse();
|
||||
|
||||
QLocalSocket liveEventSocket;
|
||||
QDataStream liveEventSocketDs;
|
||||
StreamReader eventReader;
|
||||
|
||||
QString mSocketPath;
|
||||
bool valid = false;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue