diff --git a/.github/workflows/cpplint.yml b/.github/workflows/cpplint.yml index 0514bb81..af5e8b79 100644 --- a/.github/workflows/cpplint.yml +++ b/.github/workflows/cpplint.yml @@ -18,4 +18,4 @@ jobs: pip install cpplint - name: Linting run: | - cpplint --repository=. --recursive --filter=-runtime/references,-readability/braces,-whitespace,-legal,-build/include ./src ./include ./lib/Hoymiles ./lib/MqttSubscribeParser ./lib/TimeoutHelper ./lib/ResetReason + cpplint --repository=. --recursive --filter=-build/c++11,-runtime/references,-readability/braces,-whitespace,-legal,-build/include ./src ./include ./lib/Hoymiles ./lib/MqttSubscribeParser ./lib/TimeoutHelper ./lib/ResetReason diff --git a/include/MessageOutput.h b/include/MessageOutput.h index a47b0278..7c56a6f4 100644 --- a/include/MessageOutput.h +++ b/include/MessageOutput.h @@ -4,14 +4,15 @@ #include #include #include +#include #define BUFFER_SIZE 500 class MessageOutputClass : public Print { public: - MessageOutputClass(); void loop(); - size_t write(uint8_t c); + size_t write(uint8_t c) override; + size_t write(const uint8_t *buffer, size_t size) override; void register_ws_output(AsyncWebSocket* output); private: @@ -21,7 +22,7 @@ private: uint32_t _lastSend = 0; bool _forceSend = false; - SemaphoreHandle_t _lock; + std::mutex _msgLock; }; extern MessageOutputClass MessageOutput; \ No newline at end of file diff --git a/include/MqttSettings.h b/include/MqttSettings.h index 6653d75a..af48ea5e 100644 --- a/include/MqttSettings.h +++ b/include/MqttSettings.h @@ -5,6 +5,7 @@ #include #include #include +#include class MqttSettingsClass { public: @@ -38,6 +39,7 @@ private: String willTopic; Ticker mqttReconnectTimer; MqttSubscribeParser _mqttSubscribeParser; + std::mutex _clientLock; bool _verboseLogging = true; }; diff --git a/lib/Hoymiles/src/HoymilesRadio.cpp b/lib/Hoymiles/src/HoymilesRadio.cpp index a8cc0f51..77fa609b 100644 --- a/lib/Hoymiles/src/HoymilesRadio.cpp +++ b/lib/Hoymiles/src/HoymilesRadio.cpp @@ -99,7 +99,7 @@ void HoymilesRadio::handleReceivedPackage() } } else if (!_busyFlag) { // Currently in idle mode --> send packet if one is in the queue - if (!_commandQueue.empty()) { + if (!isQueueEmpty()) { CommandAbstract* cmd = _commandQueue.front().get(); auto inv = Hoymiles.getInverterBySerial(cmd->getTargetAddress()); diff --git a/lib/Hoymiles/src/HoymilesRadio.h b/lib/Hoymiles/src/HoymilesRadio.h index 21477293..2322ab95 100644 --- a/lib/Hoymiles/src/HoymilesRadio.h +++ b/lib/Hoymiles/src/HoymilesRadio.h @@ -5,7 +5,7 @@ #include "commands/CommandAbstract.h" #include "types.h" #include -#include +#include class HoymilesRadio { public: @@ -17,10 +17,15 @@ public: bool isInitialized(); template - T* enqueCommand() + void enqueCommand(std::shared_ptr cmd) { - _commandQueue.push(std::make_shared()); - return static_cast(_commandQueue.back().get()); + _commandQueue.push(cmd); + } + + template + std::shared_ptr prepareCommand() + { + return std::make_shared(); } protected: @@ -34,7 +39,7 @@ protected: void handleReceivedPackage(); serial_u _dtuSerial; - std::queue> _commandQueue; + ThreadSafeQueue> _commandQueue; bool _isInitialized = false; bool _busyFlag = false; diff --git a/lib/Hoymiles/src/inverters/HMS_Abstract.cpp b/lib/Hoymiles/src/inverters/HMS_Abstract.cpp index 326ac839..f67ff11b 100644 --- a/lib/Hoymiles/src/inverters/HMS_Abstract.cpp +++ b/lib/Hoymiles/src/inverters/HMS_Abstract.cpp @@ -18,9 +18,10 @@ bool HMS_Abstract::sendChangeChannelRequest() return false; } - ChannelChangeCommand* cmdChannel = _radio->enqueCommand(); + auto cmdChannel = _radio->prepareCommand(); cmdChannel->setChannel(HoymilesRadio_CMT::getChannelFromFrequency(Hoymiles.getRadioCmt()->getInverterTargetFrequency())); cmdChannel->setTargetAddress(serial()); + _radio->enqueCommand(cmdChannel); return true; }; diff --git a/lib/Hoymiles/src/inverters/HMT_Abstract.cpp b/lib/Hoymiles/src/inverters/HMT_Abstract.cpp index b561ee7f..c345be97 100644 --- a/lib/Hoymiles/src/inverters/HMT_Abstract.cpp +++ b/lib/Hoymiles/src/inverters/HMT_Abstract.cpp @@ -20,9 +20,10 @@ bool HMT_Abstract::sendChangeChannelRequest() return false; } - ChannelChangeCommand* cmdChannel = _radio->enqueCommand(); + auto cmdChannel = _radio->prepareCommand(); cmdChannel->setChannel(HoymilesRadio_CMT::getChannelFromFrequency(Hoymiles.getRadioCmt()->getInverterTargetFrequency())); cmdChannel->setTargetAddress(serial()); + _radio->enqueCommand(cmdChannel); return true; }; \ No newline at end of file diff --git a/lib/Hoymiles/src/inverters/HM_Abstract.cpp b/lib/Hoymiles/src/inverters/HM_Abstract.cpp index 47acfbba..dd3af728 100644 --- a/lib/Hoymiles/src/inverters/HM_Abstract.cpp +++ b/lib/Hoymiles/src/inverters/HM_Abstract.cpp @@ -29,9 +29,10 @@ bool HM_Abstract::sendStatsRequest() time_t now; time(&now); - RealTimeRunDataCommand* cmd = _radio->enqueCommand(); + auto cmd = _radio->prepareCommand(); cmd->setTime(now); cmd->setTargetAddress(serial()); + _radio->enqueCommand(cmd); return true; } @@ -60,9 +61,10 @@ bool HM_Abstract::sendAlarmLogRequest(bool force) time_t now; time(&now); - AlarmDataCommand* cmd = _radio->enqueCommand(); + auto cmd = _radio->prepareCommand(); cmd->setTime(now); cmd->setTargetAddress(serial()); + _radio->enqueCommand(cmd); EventLog()->setLastAlarmRequestSuccess(CMD_PENDING); return true; @@ -82,13 +84,15 @@ bool HM_Abstract::sendDevInfoRequest() time_t now; time(&now); - DevInfoAllCommand* cmdAll = _radio->enqueCommand(); + auto cmdAll = _radio->prepareCommand(); cmdAll->setTime(now); cmdAll->setTargetAddress(serial()); + _radio->enqueCommand(cmdAll); - DevInfoSimpleCommand* cmdSimple = _radio->enqueCommand(); + auto cmdSimple = _radio->prepareCommand(); cmdSimple->setTime(now); cmdSimple->setTargetAddress(serial()); + _radio->enqueCommand(cmdSimple); return true; } @@ -107,9 +111,10 @@ bool HM_Abstract::sendSystemConfigParaRequest() time_t now; time(&now); - SystemConfigParaCommand* cmd = _radio->enqueCommand(); + auto cmd = _radio->prepareCommand(); cmd->setTime(now); cmd->setTargetAddress(serial()); + _radio->enqueCommand(cmd); SystemConfigPara()->setLastLimitRequestSuccess(CMD_PENDING); return true; @@ -132,9 +137,10 @@ bool HM_Abstract::sendActivePowerControlRequest(float limit, PowerLimitControlTy _activePowerControlLimit = limit; _activePowerControlType = type; - ActivePowerControlCommand* cmd = _radio->enqueCommand(); + auto cmd = _radio->prepareCommand(); cmd->setActivePowerLimit(limit, type); cmd->setTargetAddress(serial()); + _radio->enqueCommand(cmd); SystemConfigPara()->setLastLimitCommandSuccess(CMD_PENDING); return true; @@ -161,9 +167,10 @@ bool HM_Abstract::sendPowerControlRequest(bool turnOn) _powerState = 0; } - PowerControlCommand* cmd = _radio->enqueCommand(); + auto cmd = _radio->prepareCommand(); cmd->setPowerOn(turnOn); cmd->setTargetAddress(serial()); + _radio->enqueCommand(cmd); PowerCommand()->setLastPowerCommandSuccess(CMD_PENDING); return true; @@ -177,9 +184,10 @@ bool HM_Abstract::sendRestartControlRequest() _powerState = 2; - PowerControlCommand* cmd = _radio->enqueCommand(); + auto cmd = _radio->prepareCommand(); cmd->setRestart(); cmd->setTargetAddress(serial()); + _radio->enqueCommand(cmd); PowerCommand()->setLastPowerCommandSuccess(CMD_PENDING); return true; diff --git a/lib/ThreadSafeQueue/ThreadSafeQueue.h b/lib/ThreadSafeQueue/ThreadSafeQueue.h new file mode 100644 index 00000000..9a195c60 --- /dev/null +++ b/lib/ThreadSafeQueue/ThreadSafeQueue.h @@ -0,0 +1,62 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +#pragma once + +#include +#include +#include + +template +class ThreadSafeQueue { +public: + ThreadSafeQueue() = default; + ThreadSafeQueue(const ThreadSafeQueue&) = delete; + ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete; + + ThreadSafeQueue(ThreadSafeQueue&& other) + { + std::lock_guard lock(_mutex); + _queue = std::move(other._queue); + } + + virtual ~ThreadSafeQueue() { } + + unsigned long size() const + { + std::lock_guard lock(_mutex); + return _queue.size(); + } + + std::optional pop() + { + std::lock_guard lock(_mutex); + if (_queue.empty()) { + return {}; + } + T tmp = _queue.front(); + _queue.pop(); + return tmp; + } + + void push(const T& item) + { + std::lock_guard lock(_mutex); + _queue.push(item); + } + + T front() + { + std::lock_guard lock(_mutex); + return _queue.front(); + } + +private: + // Moved out of public interface to prevent races between this + // and pop(). + bool empty() const + { + return _queue.empty(); + } + + std::queue _queue; + mutable std::mutex _mutex; +}; diff --git a/src/MessageOutput.cpp b/src/MessageOutput.cpp index dc9c6715..23c644bc 100644 --- a/src/MessageOutput.cpp +++ b/src/MessageOutput.cpp @@ -8,17 +8,6 @@ MessageOutputClass MessageOutput; -#define MSG_LOCK() \ - do { \ - } while (xSemaphoreTake(_lock, portMAX_DELAY) != pdPASS) -#define MSG_UNLOCK() xSemaphoreGive(_lock) - -MessageOutputClass::MessageOutputClass() -{ - _lock = xSemaphoreCreateMutex(); - MSG_UNLOCK(); -} - void MessageOutputClass::register_ws_output(AsyncWebSocket* output) { _ws = output; @@ -27,10 +16,9 @@ void MessageOutputClass::register_ws_output(AsyncWebSocket* output) size_t MessageOutputClass::write(uint8_t c) { if (_buff_pos < BUFFER_SIZE) { - MSG_LOCK(); + std::lock_guard lock(_msgLock); _buffer[_buff_pos] = c; _buff_pos++; - MSG_UNLOCK(); } else { _forceSend = true; } @@ -38,11 +26,23 @@ size_t MessageOutputClass::write(uint8_t c) return Serial.write(c); } +size_t MessageOutputClass::write(const uint8_t* buffer, size_t size) +{ + std::lock_guard lock(_msgLock); + if (_buff_pos + size < BUFFER_SIZE) { + memcpy(&_buffer[_buff_pos], buffer, size); + _buff_pos += size; + } + _forceSend = true; + + return Serial.write(buffer, size); +} + void MessageOutputClass::loop() { // Send data via websocket if either time is over or buffer is full if (_forceSend || (millis() - _lastSend > 1000)) { - MSG_LOCK(); + std::lock_guard lock(_msgLock); if (_ws && _buff_pos > 0) { _ws->textAll(_buffer, _buff_pos); _buff_pos = 0; @@ -50,7 +50,6 @@ void MessageOutputClass::loop() if (_forceSend) { _buff_pos = 0; } - MSG_UNLOCK(); _forceSend = false; } } \ No newline at end of file diff --git a/src/MqttSettings.cpp b/src/MqttSettings.cpp index e432e061..befa930f 100644 --- a/src/MqttSettings.cpp +++ b/src/MqttSettings.cpp @@ -32,21 +32,30 @@ void MqttSettingsClass::onMqttConnect(bool sessionPresent) const CONFIG_T& config = Configuration.get(); publish(config.Mqtt_LwtTopic, config.Mqtt_LwtValue_Online); - for (const auto& cb : _mqttSubscribeParser.get_callbacks()) { - mqttClient->subscribe(cb.topic.c_str(), cb.qos); + std::lock_guard lock(_clientLock); + if (mqttClient != nullptr) { + for (const auto& cb : _mqttSubscribeParser.get_callbacks()) { + mqttClient->subscribe(cb.topic.c_str(), cb.qos); + } } } void MqttSettingsClass::subscribe(const String& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb) { _mqttSubscribeParser.register_callback(topic.c_str(), qos, cb); - mqttClient->subscribe(topic.c_str(), qos); + std::lock_guard lock(_clientLock); + if (mqttClient != nullptr) { + mqttClient->subscribe(topic.c_str(), qos); + } } void MqttSettingsClass::unsubscribe(const String& topic) { _mqttSubscribeParser.unregister_callback(topic.c_str()); - mqttClient->unsubscribe(topic.c_str()); + std::lock_guard lock(_clientLock); + if (mqttClient != nullptr) { + mqttClient->unsubscribe(topic.c_str()); + } } void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) @@ -99,6 +108,12 @@ void MqttSettingsClass::performConnect() using std::placeholders::_4; using std::placeholders::_5; using std::placeholders::_6; + + std::lock_guard lock(_clientLock); + if (mqttClient == nullptr) { + return; + } + MessageOutput.println("Connecting to MQTT..."); const CONFIG_T& config = Configuration.get(); _verboseLogging = config.Mqtt_VerboseLogging; @@ -135,6 +150,10 @@ void MqttSettingsClass::performDisconnect() { const CONFIG_T& config = Configuration.get(); publish(config.Mqtt_LwtTopic, config.Mqtt_LwtValue_Offline); + std::lock_guard lock(_clientLock); + if (mqttClient == nullptr) { + return; + } mqttClient->disconnect(); } @@ -150,6 +169,10 @@ void MqttSettingsClass::performReconnect() bool MqttSettingsClass::getConnected() { + std::lock_guard lock(_clientLock); + if (mqttClient == nullptr) { + return false; + } return mqttClient->connected(); } @@ -160,6 +183,11 @@ String MqttSettingsClass::getPrefix() void MqttSettingsClass::publish(const String& subtopic, const String& payload) { + std::lock_guard lock(_clientLock); + if (mqttClient == nullptr) { + return; + } + String topic = getPrefix(); topic += subtopic; @@ -171,6 +199,10 @@ void MqttSettingsClass::publish(const String& subtopic, const String& payload) void MqttSettingsClass::publishGeneric(const String& topic, const String& payload, bool retain, uint8_t qos) { + std::lock_guard lock(_clientLock); + if (mqttClient == nullptr) { + return; + } mqttClient->publish(topic.c_str(), qos, retain, payload.c_str()); } @@ -190,8 +222,11 @@ void MqttSettingsClass::loop() void MqttSettingsClass::createMqttClientObject() { - if (mqttClient != nullptr) + std::lock_guard lock(_clientLock); + if (mqttClient != nullptr) { delete mqttClient; + mqttClient = nullptr; + } const CONFIG_T& config = Configuration.get(); if (config.Mqtt_Tls) { mqttClient = new espMqttClientSecure(espMqttClientTypes::UseInternalTask::NO); diff --git a/webapp/package.json b/webapp/package.json index de16badd..e5c687b1 100644 --- a/webapp/package.json +++ b/webapp/package.json @@ -26,7 +26,7 @@ "@rushstack/eslint-patch": "^1.3.2", "@tsconfig/node18": "^18.2.0", "@types/bootstrap": "^5.2.6", - "@types/node": "^20.4.5", + "@types/node": "^20.4.6", "@types/sortablejs": "^1.15.1", "@types/spark-md5": "^3.0.2", "@vitejs/plugin-vue": "^4.2.3", diff --git a/webapp/yarn.lock b/webapp/yarn.lock index 97a9f0ff..c7562bc8 100644 --- a/webapp/yarn.lock +++ b/webapp/yarn.lock @@ -382,10 +382,10 @@ resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.11.tgz#d421b6c527a3037f7c84433fd2c4229e016863d3" integrity sha512-wOuvG1SN4Us4rez+tylwwwCV1psiNVOkJeM3AUWUNWg/jDQY2+HE/444y5gc+jBmRqASOm2Oeh5c1axHobwRKQ== -"@types/node@^20.4.5": - version "20.4.5" - resolved "https://registry.yarnpkg.com/@types/node/-/node-20.4.5.tgz#9dc0a5cb1ccce4f7a731660935ab70b9c00a5d69" - integrity sha512-rt40Nk13II9JwQBdeYqmbn2Q6IVTA5uPhvSO+JVqdXw/6/4glI6oR9ezty/A9Hg5u7JH4OmYmuQ+XvjKm0Datg== +"@types/node@^20.4.6": + version "20.4.6" + resolved "https://registry.yarnpkg.com/@types/node/-/node-20.4.6.tgz#b66b66c9bb5d49b199f03399e341c9d6036e9e88" + integrity sha512-q0RkvNgMweWWIvSMDiXhflGUKMdIxBo2M2tYM/0kEGDueQByFzK4KZAgu5YHGFNxziTlppNpTIBcqHQAxlfHdA== "@types/semver@^7.3.12": version "7.3.13"