Merge remote-tracking branch 'tbnobody/OpenDTU/master' into development

This commit is contained in:
helgeerbe 2023-08-04 12:21:16 +02:00
commit 5335ec1bde
13 changed files with 159 additions and 45 deletions

View File

@ -18,4 +18,4 @@ jobs:
pip install cpplint pip install cpplint
- name: Linting - name: Linting
run: | run: |
cpplint --repository=. --recursive --filter=-runtime/references,-readability/braces,-whitespace,-legal,-build/include ./src ./include ./lib/Hoymiles ./lib/MqttSubscribeParser ./lib/TimeoutHelper ./lib/ResetReason cpplint --repository=. --recursive --filter=-build/c++11,-runtime/references,-readability/braces,-whitespace,-legal,-build/include ./src ./include ./lib/Hoymiles ./lib/MqttSubscribeParser ./lib/TimeoutHelper ./lib/ResetReason

View File

@ -4,14 +4,15 @@
#include <AsyncWebSocket.h> #include <AsyncWebSocket.h>
#include <HardwareSerial.h> #include <HardwareSerial.h>
#include <Stream.h> #include <Stream.h>
#include <mutex>
#define BUFFER_SIZE 500 #define BUFFER_SIZE 500
class MessageOutputClass : public Print { class MessageOutputClass : public Print {
public: public:
MessageOutputClass();
void loop(); void loop();
size_t write(uint8_t c); size_t write(uint8_t c) override;
size_t write(const uint8_t *buffer, size_t size) override;
void register_ws_output(AsyncWebSocket* output); void register_ws_output(AsyncWebSocket* output);
private: private:
@ -21,7 +22,7 @@ private:
uint32_t _lastSend = 0; uint32_t _lastSend = 0;
bool _forceSend = false; bool _forceSend = false;
SemaphoreHandle_t _lock; std::mutex _msgLock;
}; };
extern MessageOutputClass MessageOutput; extern MessageOutputClass MessageOutput;

View File

@ -5,6 +5,7 @@
#include <MqttSubscribeParser.h> #include <MqttSubscribeParser.h>
#include <Ticker.h> #include <Ticker.h>
#include <espMqttClient.h> #include <espMqttClient.h>
#include <mutex>
class MqttSettingsClass { class MqttSettingsClass {
public: public:
@ -38,6 +39,7 @@ private:
String willTopic; String willTopic;
Ticker mqttReconnectTimer; Ticker mqttReconnectTimer;
MqttSubscribeParser _mqttSubscribeParser; MqttSubscribeParser _mqttSubscribeParser;
std::mutex _clientLock;
bool _verboseLogging = true; bool _verboseLogging = true;
}; };

View File

@ -99,7 +99,7 @@ void HoymilesRadio::handleReceivedPackage()
} }
} else if (!_busyFlag) { } else if (!_busyFlag) {
// Currently in idle mode --> send packet if one is in the queue // Currently in idle mode --> send packet if one is in the queue
if (!_commandQueue.empty()) { if (!isQueueEmpty()) {
CommandAbstract* cmd = _commandQueue.front().get(); CommandAbstract* cmd = _commandQueue.front().get();
auto inv = Hoymiles.getInverterBySerial(cmd->getTargetAddress()); auto inv = Hoymiles.getInverterBySerial(cmd->getTargetAddress());

View File

@ -5,7 +5,7 @@
#include "commands/CommandAbstract.h" #include "commands/CommandAbstract.h"
#include "types.h" #include "types.h"
#include <memory> #include <memory>
#include <queue> #include <ThreadSafeQueue.h>
class HoymilesRadio { class HoymilesRadio {
public: public:
@ -17,10 +17,15 @@ public:
bool isInitialized(); bool isInitialized();
template <typename T> template <typename T>
T* enqueCommand() void enqueCommand(std::shared_ptr<T> cmd)
{ {
_commandQueue.push(std::make_shared<T>()); _commandQueue.push(cmd);
return static_cast<T*>(_commandQueue.back().get()); }
template <typename T>
std::shared_ptr<T> prepareCommand()
{
return std::make_shared<T>();
} }
protected: protected:
@ -34,7 +39,7 @@ protected:
void handleReceivedPackage(); void handleReceivedPackage();
serial_u _dtuSerial; serial_u _dtuSerial;
std::queue<std::shared_ptr<CommandAbstract>> _commandQueue; ThreadSafeQueue<std::shared_ptr<CommandAbstract>> _commandQueue;
bool _isInitialized = false; bool _isInitialized = false;
bool _busyFlag = false; bool _busyFlag = false;

View File

@ -18,9 +18,10 @@ bool HMS_Abstract::sendChangeChannelRequest()
return false; return false;
} }
ChannelChangeCommand* cmdChannel = _radio->enqueCommand<ChannelChangeCommand>(); auto cmdChannel = _radio->prepareCommand<ChannelChangeCommand>();
cmdChannel->setChannel(HoymilesRadio_CMT::getChannelFromFrequency(Hoymiles.getRadioCmt()->getInverterTargetFrequency())); cmdChannel->setChannel(HoymilesRadio_CMT::getChannelFromFrequency(Hoymiles.getRadioCmt()->getInverterTargetFrequency()));
cmdChannel->setTargetAddress(serial()); cmdChannel->setTargetAddress(serial());
_radio->enqueCommand(cmdChannel);
return true; return true;
}; };

View File

@ -20,9 +20,10 @@ bool HMT_Abstract::sendChangeChannelRequest()
return false; return false;
} }
ChannelChangeCommand* cmdChannel = _radio->enqueCommand<ChannelChangeCommand>(); auto cmdChannel = _radio->prepareCommand<ChannelChangeCommand>();
cmdChannel->setChannel(HoymilesRadio_CMT::getChannelFromFrequency(Hoymiles.getRadioCmt()->getInverterTargetFrequency())); cmdChannel->setChannel(HoymilesRadio_CMT::getChannelFromFrequency(Hoymiles.getRadioCmt()->getInverterTargetFrequency()));
cmdChannel->setTargetAddress(serial()); cmdChannel->setTargetAddress(serial());
_radio->enqueCommand(cmdChannel);
return true; return true;
}; };

View File

@ -29,9 +29,10 @@ bool HM_Abstract::sendStatsRequest()
time_t now; time_t now;
time(&now); time(&now);
RealTimeRunDataCommand* cmd = _radio->enqueCommand<RealTimeRunDataCommand>(); auto cmd = _radio->prepareCommand<RealTimeRunDataCommand>();
cmd->setTime(now); cmd->setTime(now);
cmd->setTargetAddress(serial()); cmd->setTargetAddress(serial());
_radio->enqueCommand(cmd);
return true; return true;
} }
@ -60,9 +61,10 @@ bool HM_Abstract::sendAlarmLogRequest(bool force)
time_t now; time_t now;
time(&now); time(&now);
AlarmDataCommand* cmd = _radio->enqueCommand<AlarmDataCommand>(); auto cmd = _radio->prepareCommand<AlarmDataCommand>();
cmd->setTime(now); cmd->setTime(now);
cmd->setTargetAddress(serial()); cmd->setTargetAddress(serial());
_radio->enqueCommand(cmd);
EventLog()->setLastAlarmRequestSuccess(CMD_PENDING); EventLog()->setLastAlarmRequestSuccess(CMD_PENDING);
return true; return true;
@ -82,13 +84,15 @@ bool HM_Abstract::sendDevInfoRequest()
time_t now; time_t now;
time(&now); time(&now);
DevInfoAllCommand* cmdAll = _radio->enqueCommand<DevInfoAllCommand>(); auto cmdAll = _radio->prepareCommand<DevInfoAllCommand>();
cmdAll->setTime(now); cmdAll->setTime(now);
cmdAll->setTargetAddress(serial()); cmdAll->setTargetAddress(serial());
_radio->enqueCommand(cmdAll);
DevInfoSimpleCommand* cmdSimple = _radio->enqueCommand<DevInfoSimpleCommand>(); auto cmdSimple = _radio->prepareCommand<DevInfoSimpleCommand>();
cmdSimple->setTime(now); cmdSimple->setTime(now);
cmdSimple->setTargetAddress(serial()); cmdSimple->setTargetAddress(serial());
_radio->enqueCommand(cmdSimple);
return true; return true;
} }
@ -107,9 +111,10 @@ bool HM_Abstract::sendSystemConfigParaRequest()
time_t now; time_t now;
time(&now); time(&now);
SystemConfigParaCommand* cmd = _radio->enqueCommand<SystemConfigParaCommand>(); auto cmd = _radio->prepareCommand<SystemConfigParaCommand>();
cmd->setTime(now); cmd->setTime(now);
cmd->setTargetAddress(serial()); cmd->setTargetAddress(serial());
_radio->enqueCommand(cmd);
SystemConfigPara()->setLastLimitRequestSuccess(CMD_PENDING); SystemConfigPara()->setLastLimitRequestSuccess(CMD_PENDING);
return true; return true;
@ -132,9 +137,10 @@ bool HM_Abstract::sendActivePowerControlRequest(float limit, PowerLimitControlTy
_activePowerControlLimit = limit; _activePowerControlLimit = limit;
_activePowerControlType = type; _activePowerControlType = type;
ActivePowerControlCommand* cmd = _radio->enqueCommand<ActivePowerControlCommand>(); auto cmd = _radio->prepareCommand<ActivePowerControlCommand>();
cmd->setActivePowerLimit(limit, type); cmd->setActivePowerLimit(limit, type);
cmd->setTargetAddress(serial()); cmd->setTargetAddress(serial());
_radio->enqueCommand(cmd);
SystemConfigPara()->setLastLimitCommandSuccess(CMD_PENDING); SystemConfigPara()->setLastLimitCommandSuccess(CMD_PENDING);
return true; return true;
@ -161,9 +167,10 @@ bool HM_Abstract::sendPowerControlRequest(bool turnOn)
_powerState = 0; _powerState = 0;
} }
PowerControlCommand* cmd = _radio->enqueCommand<PowerControlCommand>(); auto cmd = _radio->prepareCommand<PowerControlCommand>();
cmd->setPowerOn(turnOn); cmd->setPowerOn(turnOn);
cmd->setTargetAddress(serial()); cmd->setTargetAddress(serial());
_radio->enqueCommand(cmd);
PowerCommand()->setLastPowerCommandSuccess(CMD_PENDING); PowerCommand()->setLastPowerCommandSuccess(CMD_PENDING);
return true; return true;
@ -177,9 +184,10 @@ bool HM_Abstract::sendRestartControlRequest()
_powerState = 2; _powerState = 2;
PowerControlCommand* cmd = _radio->enqueCommand<PowerControlCommand>(); auto cmd = _radio->prepareCommand<PowerControlCommand>();
cmd->setRestart(); cmd->setRestart();
cmd->setTargetAddress(serial()); cmd->setTargetAddress(serial());
_radio->enqueCommand(cmd);
PowerCommand()->setLastPowerCommandSuccess(CMD_PENDING); PowerCommand()->setLastPowerCommandSuccess(CMD_PENDING);
return true; return true;

View File

@ -0,0 +1,62 @@
// SPDX-License-Identifier: GPL-2.0-or-later
#pragma once
#include <mutex>
#include <optional>
#include <queue>
template <typename T>
class ThreadSafeQueue {
public:
ThreadSafeQueue() = default;
ThreadSafeQueue(const ThreadSafeQueue<T>&) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueue<T>&) = delete;
ThreadSafeQueue(ThreadSafeQueue<T>&& other)
{
std::lock_guard<std::mutex> lock(_mutex);
_queue = std::move(other._queue);
}
virtual ~ThreadSafeQueue() { }
unsigned long size() const
{
std::lock_guard<std::mutex> lock(_mutex);
return _queue.size();
}
std::optional<T> pop()
{
std::lock_guard<std::mutex> lock(_mutex);
if (_queue.empty()) {
return {};
}
T tmp = _queue.front();
_queue.pop();
return tmp;
}
void push(const T& item)
{
std::lock_guard<std::mutex> lock(_mutex);
_queue.push(item);
}
T front()
{
std::lock_guard<std::mutex> lock(_mutex);
return _queue.front();
}
private:
// Moved out of public interface to prevent races between this
// and pop().
bool empty() const
{
return _queue.empty();
}
std::queue<T> _queue;
mutable std::mutex _mutex;
};

View File

@ -8,17 +8,6 @@
MessageOutputClass MessageOutput; MessageOutputClass MessageOutput;
#define MSG_LOCK() \
do { \
} while (xSemaphoreTake(_lock, portMAX_DELAY) != pdPASS)
#define MSG_UNLOCK() xSemaphoreGive(_lock)
MessageOutputClass::MessageOutputClass()
{
_lock = xSemaphoreCreateMutex();
MSG_UNLOCK();
}
void MessageOutputClass::register_ws_output(AsyncWebSocket* output) void MessageOutputClass::register_ws_output(AsyncWebSocket* output)
{ {
_ws = output; _ws = output;
@ -27,10 +16,9 @@ void MessageOutputClass::register_ws_output(AsyncWebSocket* output)
size_t MessageOutputClass::write(uint8_t c) size_t MessageOutputClass::write(uint8_t c)
{ {
if (_buff_pos < BUFFER_SIZE) { if (_buff_pos < BUFFER_SIZE) {
MSG_LOCK(); std::lock_guard<std::mutex> lock(_msgLock);
_buffer[_buff_pos] = c; _buffer[_buff_pos] = c;
_buff_pos++; _buff_pos++;
MSG_UNLOCK();
} else { } else {
_forceSend = true; _forceSend = true;
} }
@ -38,11 +26,23 @@ size_t MessageOutputClass::write(uint8_t c)
return Serial.write(c); return Serial.write(c);
} }
size_t MessageOutputClass::write(const uint8_t* buffer, size_t size)
{
std::lock_guard<std::mutex> lock(_msgLock);
if (_buff_pos + size < BUFFER_SIZE) {
memcpy(&_buffer[_buff_pos], buffer, size);
_buff_pos += size;
}
_forceSend = true;
return Serial.write(buffer, size);
}
void MessageOutputClass::loop() void MessageOutputClass::loop()
{ {
// Send data via websocket if either time is over or buffer is full // Send data via websocket if either time is over or buffer is full
if (_forceSend || (millis() - _lastSend > 1000)) { if (_forceSend || (millis() - _lastSend > 1000)) {
MSG_LOCK(); std::lock_guard<std::mutex> lock(_msgLock);
if (_ws && _buff_pos > 0) { if (_ws && _buff_pos > 0) {
_ws->textAll(_buffer, _buff_pos); _ws->textAll(_buffer, _buff_pos);
_buff_pos = 0; _buff_pos = 0;
@ -50,7 +50,6 @@ void MessageOutputClass::loop()
if (_forceSend) { if (_forceSend) {
_buff_pos = 0; _buff_pos = 0;
} }
MSG_UNLOCK();
_forceSend = false; _forceSend = false;
} }
} }

View File

@ -32,22 +32,31 @@ void MqttSettingsClass::onMqttConnect(bool sessionPresent)
const CONFIG_T& config = Configuration.get(); const CONFIG_T& config = Configuration.get();
publish(config.Mqtt_LwtTopic, config.Mqtt_LwtValue_Online); publish(config.Mqtt_LwtTopic, config.Mqtt_LwtValue_Online);
std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient != nullptr) {
for (const auto& cb : _mqttSubscribeParser.get_callbacks()) { for (const auto& cb : _mqttSubscribeParser.get_callbacks()) {
mqttClient->subscribe(cb.topic.c_str(), cb.qos); mqttClient->subscribe(cb.topic.c_str(), cb.qos);
} }
} }
}
void MqttSettingsClass::subscribe(const String& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb) void MqttSettingsClass::subscribe(const String& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb)
{ {
_mqttSubscribeParser.register_callback(topic.c_str(), qos, cb); _mqttSubscribeParser.register_callback(topic.c_str(), qos, cb);
std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient != nullptr) {
mqttClient->subscribe(topic.c_str(), qos); mqttClient->subscribe(topic.c_str(), qos);
} }
}
void MqttSettingsClass::unsubscribe(const String& topic) void MqttSettingsClass::unsubscribe(const String& topic)
{ {
_mqttSubscribeParser.unregister_callback(topic.c_str()); _mqttSubscribeParser.unregister_callback(topic.c_str());
std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient != nullptr) {
mqttClient->unsubscribe(topic.c_str()); mqttClient->unsubscribe(topic.c_str());
} }
}
void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason reason)
{ {
@ -99,6 +108,12 @@ void MqttSettingsClass::performConnect()
using std::placeholders::_4; using std::placeholders::_4;
using std::placeholders::_5; using std::placeholders::_5;
using std::placeholders::_6; using std::placeholders::_6;
std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient == nullptr) {
return;
}
MessageOutput.println("Connecting to MQTT..."); MessageOutput.println("Connecting to MQTT...");
const CONFIG_T& config = Configuration.get(); const CONFIG_T& config = Configuration.get();
_verboseLogging = config.Mqtt_VerboseLogging; _verboseLogging = config.Mqtt_VerboseLogging;
@ -135,6 +150,10 @@ void MqttSettingsClass::performDisconnect()
{ {
const CONFIG_T& config = Configuration.get(); const CONFIG_T& config = Configuration.get();
publish(config.Mqtt_LwtTopic, config.Mqtt_LwtValue_Offline); publish(config.Mqtt_LwtTopic, config.Mqtt_LwtValue_Offline);
std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient == nullptr) {
return;
}
mqttClient->disconnect(); mqttClient->disconnect();
} }
@ -150,6 +169,10 @@ void MqttSettingsClass::performReconnect()
bool MqttSettingsClass::getConnected() bool MqttSettingsClass::getConnected()
{ {
std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient == nullptr) {
return false;
}
return mqttClient->connected(); return mqttClient->connected();
} }
@ -160,6 +183,11 @@ String MqttSettingsClass::getPrefix()
void MqttSettingsClass::publish(const String& subtopic, const String& payload) void MqttSettingsClass::publish(const String& subtopic, const String& payload)
{ {
std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient == nullptr) {
return;
}
String topic = getPrefix(); String topic = getPrefix();
topic += subtopic; topic += subtopic;
@ -171,6 +199,10 @@ void MqttSettingsClass::publish(const String& subtopic, const String& payload)
void MqttSettingsClass::publishGeneric(const String& topic, const String& payload, bool retain, uint8_t qos) void MqttSettingsClass::publishGeneric(const String& topic, const String& payload, bool retain, uint8_t qos)
{ {
std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient == nullptr) {
return;
}
mqttClient->publish(topic.c_str(), qos, retain, payload.c_str()); mqttClient->publish(topic.c_str(), qos, retain, payload.c_str());
} }
@ -190,8 +222,11 @@ void MqttSettingsClass::loop()
void MqttSettingsClass::createMqttClientObject() void MqttSettingsClass::createMqttClientObject()
{ {
if (mqttClient != nullptr) std::lock_guard<std::mutex> lock(_clientLock);
if (mqttClient != nullptr) {
delete mqttClient; delete mqttClient;
mqttClient = nullptr;
}
const CONFIG_T& config = Configuration.get(); const CONFIG_T& config = Configuration.get();
if (config.Mqtt_Tls) { if (config.Mqtt_Tls) {
mqttClient = new espMqttClientSecure(espMqttClientTypes::UseInternalTask::NO); mqttClient = new espMqttClientSecure(espMqttClientTypes::UseInternalTask::NO);

View File

@ -26,7 +26,7 @@
"@rushstack/eslint-patch": "^1.3.2", "@rushstack/eslint-patch": "^1.3.2",
"@tsconfig/node18": "^18.2.0", "@tsconfig/node18": "^18.2.0",
"@types/bootstrap": "^5.2.6", "@types/bootstrap": "^5.2.6",
"@types/node": "^20.4.5", "@types/node": "^20.4.6",
"@types/sortablejs": "^1.15.1", "@types/sortablejs": "^1.15.1",
"@types/spark-md5": "^3.0.2", "@types/spark-md5": "^3.0.2",
"@vitejs/plugin-vue": "^4.2.3", "@vitejs/plugin-vue": "^4.2.3",

View File

@ -382,10 +382,10 @@
resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.11.tgz#d421b6c527a3037f7c84433fd2c4229e016863d3" resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.11.tgz#d421b6c527a3037f7c84433fd2c4229e016863d3"
integrity sha512-wOuvG1SN4Us4rez+tylwwwCV1psiNVOkJeM3AUWUNWg/jDQY2+HE/444y5gc+jBmRqASOm2Oeh5c1axHobwRKQ== integrity sha512-wOuvG1SN4Us4rez+tylwwwCV1psiNVOkJeM3AUWUNWg/jDQY2+HE/444y5gc+jBmRqASOm2Oeh5c1axHobwRKQ==
"@types/node@^20.4.5": "@types/node@^20.4.6":
version "20.4.5" version "20.4.6"
resolved "https://registry.yarnpkg.com/@types/node/-/node-20.4.5.tgz#9dc0a5cb1ccce4f7a731660935ab70b9c00a5d69" resolved "https://registry.yarnpkg.com/@types/node/-/node-20.4.6.tgz#b66b66c9bb5d49b199f03399e341c9d6036e9e88"
integrity sha512-rt40Nk13II9JwQBdeYqmbn2Q6IVTA5uPhvSO+JVqdXw/6/4glI6oR9ezty/A9Hg5u7JH4OmYmuQ+XvjKm0Datg== integrity sha512-q0RkvNgMweWWIvSMDiXhflGUKMdIxBo2M2tYM/0kEGDueQByFzK4KZAgu5YHGFNxziTlppNpTIBcqHQAxlfHdA==
"@types/semver@^7.3.12": "@types/semver@^7.3.12":
version "7.3.13" version "7.3.13"