From 8acae28c5926260f962766d29ab0dbb10fe2d7d6 Mon Sep 17 00:00:00 2001 From: Thomas Basler Date: Sat, 14 Dec 2024 19:02:31 +0100 Subject: [PATCH] Feature: New handling of command queue Goal of this change is to prevent a overflow in the command queue by flooding it with MQTT commands and therefor also prevent the reading of the inverter data. To achieve this it is now possible to specify a insert type for each queue element. --- lib/Hoymiles/src/Hoymiles.cpp | 4 +- lib/Hoymiles/src/HoymilesRadio.cpp | 10 ++++ lib/Hoymiles/src/HoymilesRadio.h | 48 +++++++++++++++-- .../commands/ActivePowerControlCommand.cpp | 12 ++++- .../src/commands/ActivePowerControlCommand.h | 4 +- lib/Hoymiles/src/commands/CommandAbstract.cpp | 6 +++ lib/Hoymiles/src/commands/CommandAbstract.h | 16 ++++++ .../src/commands/PowerControlCommand.h | 1 + lib/Hoymiles/src/queue/CommandQueue.cpp | 51 +++++++++++++++++++ lib/Hoymiles/src/queue/CommandQueue.h | 17 +++++++ lib/ThreadSafeQueue/src/ThreadSafeQueue.h | 13 ++--- 11 files changed, 169 insertions(+), 13 deletions(-) create mode 100644 lib/Hoymiles/src/queue/CommandQueue.cpp create mode 100644 lib/Hoymiles/src/queue/CommandQueue.h diff --git a/lib/Hoymiles/src/Hoymiles.cpp b/lib/Hoymiles/src/Hoymiles.cpp index 7f1a3c1f..fc40b8be 100644 --- a/lib/Hoymiles/src/Hoymiles.cpp +++ b/lib/Hoymiles/src/Hoymiles.cpp @@ -57,7 +57,7 @@ void HoymilesClass::loop() } } - if (iv != nullptr && iv->getRadio()->isInitialized() && iv->getRadio()->isQueueEmpty()) { + if (iv != nullptr && iv->getRadio()->isInitialized()) { if (iv->getZeroValuesIfUnreachable() && !iv->isReachable()) { iv->Statistics()->zeroRuntimeData(); @@ -119,6 +119,7 @@ void HoymilesClass::loop() iv->sendGridOnProFileParaRequest(); } + _messageOutput->printf("Queue size - NRF: %ld CMT: %ld\r\n", _radioNrf->getQueueSize(), _radioCmt->getQueueSize()); _lastPoll = millis(); } @@ -229,6 +230,7 @@ void HoymilesClass::removeInverterBySerial(const uint64_t serial) for (uint8_t i = 0; i < _inverters.size(); i++) { if (_inverters[i]->serial() == serial) { std::lock_guard lock(_mutex); + _inverters[i]->getRadio()->removeCommands(_inverters[i].get()); _inverters.erase(_inverters.begin() + i); return; } diff --git a/lib/Hoymiles/src/HoymilesRadio.cpp b/lib/Hoymiles/src/HoymilesRadio.cpp index 55281ab0..8111fb5d 100644 --- a/lib/Hoymiles/src/HoymilesRadio.cpp +++ b/lib/Hoymiles/src/HoymilesRadio.cpp @@ -156,6 +156,11 @@ bool HoymilesRadio::isInitialized() const return _isInitialized; } +void HoymilesRadio::removeCommands(InverterAbstract* inv) +{ + _commandQueue.removeAllEntriesForInverter(inv); +} + bool HoymilesRadio::isIdle() const { return !_busyFlag; @@ -165,3 +170,8 @@ bool HoymilesRadio::isQueueEmpty() const { return _commandQueue.size() == 0; } + +unsigned long HoymilesRadio::getQueueSize() const +{ + return _commandQueue.size(); +} diff --git a/lib/Hoymiles/src/HoymilesRadio.h b/lib/Hoymiles/src/HoymilesRadio.h index 296b479b..bd37f112 100644 --- a/lib/Hoymiles/src/HoymilesRadio.h +++ b/lib/Hoymiles/src/HoymilesRadio.h @@ -1,11 +1,17 @@ // SPDX-License-Identifier: GPL-2.0-or-later #pragma once +#include "Arduino.h" #include "commands/CommandAbstract.h" +#include "queue/CommandQueue.h" #include "types.h" -#include #include -#include + +#ifdef HOY_DEBUG_QUEUE +#define DEBUG_PRINT(fmt, args...) Serial.printf(fmt, ##args) +#else +#define DEBUG_PRINT(fmt, args...) /* Don't do anything in release builds */ +#endif class HoymilesRadio { public: @@ -14,11 +20,47 @@ public: bool isIdle() const; bool isQueueEmpty() const; + unsigned long getQueueSize() const; bool isInitialized() const; + void removeCommands(InverterAbstract* inv); + void enqueCommand(std::shared_ptr cmd) { + DEBUG_PRINT("Queue size before: %ld\r\n", _commandQueue.size()); + DEBUG_PRINT("Handling command %s with type %d\r\n", cmd.get()->getCommandName().c_str(), static_cast(cmd.get()->getQueueInsertType())); + switch (cmd.get()->getQueueInsertType()) { + case QueueInsertType::RemoveOldest: + _commandQueue.removeDuplicatedEntries(cmd); + break; + case QueueInsertType::ReplaceExistent: + // Checks if the queue already contains a command like the new one + // and replaces the existing one with the new one. + // (The new one will not be pushed at the end of the queue) + if (_commandQueue.countSimilarCommands(cmd) > 0) { + DEBUG_PRINT(" ... existing entry will be replaced\r\n"); + _commandQueue.replaceEntries(cmd); + return; + } + break; + case QueueInsertType::RemoveNewest: + // Checks if the queue already contains a command like the new one + // and drops the new one. The new one will not be inserted. + if (_commandQueue.countSimilarCommands(cmd) > 0) { + DEBUG_PRINT(" ... new entry will be dropped\r\n"); + return; + } + break; + case QueueInsertType::AllowMultiple: + // Dont do anything, just fall through and insert the command. + break; + } + + // Push the command into the queue if we reach this position of the code + DEBUG_PRINT(" ... new entry will be appended\r\n"); _commandQueue.push(cmd); + + DEBUG_PRINT("Queue size after: %ld\r\n", _commandQueue.size()); } template @@ -38,7 +80,7 @@ protected: void handleReceivedPackage(); serial_u _dtuSerial; - ThreadSafeQueue> _commandQueue; + CommandQueue _commandQueue; bool _isInitialized = false; bool _busyFlag = false; diff --git a/lib/Hoymiles/src/commands/ActivePowerControlCommand.cpp b/lib/Hoymiles/src/commands/ActivePowerControlCommand.cpp index 4ce3c6e5..20ed9d61 100644 --- a/lib/Hoymiles/src/commands/ActivePowerControlCommand.cpp +++ b/lib/Hoymiles/src/commands/ActivePowerControlCommand.cpp @@ -44,7 +44,15 @@ ActivePowerControlCommand::ActivePowerControlCommand(InverterAbstract* inv, cons String ActivePowerControlCommand::getCommandName() const { - return "ActivePowerControl"; + char buffer[30]; + snprintf(buffer, sizeof(buffer), "ActivePowerControl (%02X)", getType()); + return buffer; +} + +bool ActivePowerControlCommand::areSameParameter(CommandAbstract* other) +{ + return CommandAbstract::areSameParameter(other) + && this->getType() == static_cast(other)->getType(); } void ActivePowerControlCommand::setActivePowerLimit(const float limit, const PowerLimitControlType type) @@ -89,7 +97,7 @@ float ActivePowerControlCommand::getLimit() const return l / 10; } -PowerLimitControlType ActivePowerControlCommand::getType() +PowerLimitControlType ActivePowerControlCommand::getType() const { return (PowerLimitControlType)((static_cast(_payload[14]) << 8) | _payload[15]); } diff --git a/lib/Hoymiles/src/commands/ActivePowerControlCommand.h b/lib/Hoymiles/src/commands/ActivePowerControlCommand.h index 375b278b..8425d248 100644 --- a/lib/Hoymiles/src/commands/ActivePowerControlCommand.h +++ b/lib/Hoymiles/src/commands/ActivePowerControlCommand.h @@ -15,11 +15,13 @@ public: explicit ActivePowerControlCommand(InverterAbstract* inv, const uint64_t router_address = 0); virtual String getCommandName() const; + virtual QueueInsertType getQueueInsertType() const { return QueueInsertType::RemoveOldest; } + virtual bool areSameParameter(CommandAbstract* other); virtual bool handleResponse(const fragment_t fragment[], const uint8_t max_fragment_id); virtual void gotTimeout(); void setActivePowerLimit(const float limit, const PowerLimitControlType type = RelativNonPersistent); float getLimit() const; - PowerLimitControlType getType(); + PowerLimitControlType getType() const; }; diff --git a/lib/Hoymiles/src/commands/CommandAbstract.cpp b/lib/Hoymiles/src/commands/CommandAbstract.cpp index 16a7857e..6196a96f 100644 --- a/lib/Hoymiles/src/commands/CommandAbstract.cpp +++ b/lib/Hoymiles/src/commands/CommandAbstract.cpp @@ -138,3 +138,9 @@ uint8_t CommandAbstract::getMaxRetransmitCount() const { return MAX_RETRANSMIT_COUNT; } + +bool CommandAbstract::areSameParameter(CommandAbstract* other) +{ + return this->getCommandName() == other->getCommandName() + && this->_targetAddress == other->getTargetAddress(); +} diff --git a/lib/Hoymiles/src/commands/CommandAbstract.h b/lib/Hoymiles/src/commands/CommandAbstract.h index c93cb341..64c0bcd4 100644 --- a/lib/Hoymiles/src/commands/CommandAbstract.h +++ b/lib/Hoymiles/src/commands/CommandAbstract.h @@ -11,6 +11,18 @@ class InverterAbstract; +enum class QueueInsertType { + AllowMultiple, + // Remove from beginning of the queue + RemoveOldest, + + // Don't insert command if it already exist + RemoveNewest, + + // Replace the existing entry in the queue by the one to be added + ReplaceExistent, +}; + class CommandAbstract { public: explicit CommandAbstract(InverterAbstract* inv, const uint64_t router_address = 0); @@ -46,6 +58,10 @@ public: // Sets the amount how often a missing fragment is re-requested if it was not available virtual uint8_t getMaxRetransmitCount() const; + // Returns whether multiple instances of this command are allowed in the command queue. + virtual QueueInsertType getQueueInsertType() const { return QueueInsertType::RemoveNewest; } + virtual bool areSameParameter(CommandAbstract* other); + protected: uint8_t _payload[RF_LEN]; uint8_t _payload_size; diff --git a/lib/Hoymiles/src/commands/PowerControlCommand.h b/lib/Hoymiles/src/commands/PowerControlCommand.h index d40c356d..a86b5678 100644 --- a/lib/Hoymiles/src/commands/PowerControlCommand.h +++ b/lib/Hoymiles/src/commands/PowerControlCommand.h @@ -8,6 +8,7 @@ public: explicit PowerControlCommand(InverterAbstract* inv, const uint64_t router_address = 0); virtual String getCommandName() const; + virtual QueueInsertType getQueueInsertType() const { return QueueInsertType::AllowMultiple; } virtual bool handleResponse(const fragment_t fragment[], const uint8_t max_fragment_id); virtual void gotTimeout(); diff --git a/lib/Hoymiles/src/queue/CommandQueue.cpp b/lib/Hoymiles/src/queue/CommandQueue.cpp new file mode 100644 index 00000000..e4b6069a --- /dev/null +++ b/lib/Hoymiles/src/queue/CommandQueue.cpp @@ -0,0 +1,51 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * Copyright (C) 2024 Thomas Basler and others + */ +#include "CommandQueue.h" +#include "../inverters/InverterAbstract.h" +#include + +void CommandQueue::removeAllEntriesForInverter(InverterAbstract* inv) +{ + std::lock_guard lock(_mutex); + + auto it = std::remove_if(_queue.begin(), _queue.end(), + [&inv](std::shared_ptr v) -> bool { return v.get()->getTargetAddress() == inv->serial(); }); + _queue.erase(it, _queue.end()); +} + +void CommandQueue::removeDuplicatedEntries(std::shared_ptr cmd) +{ + std::lock_guard lock(_mutex); + + auto it = std::remove_if(_queue.begin() + 1, _queue.end(), + [&cmd](std::shared_ptr v) -> bool { + return cmd->areSameParameter(v.get()) + && cmd.get()->getQueueInsertType() == QueueInsertType::RemoveOldest; + }); + _queue.erase(it, _queue.end()); +} + +void CommandQueue::replaceEntries(std::shared_ptr cmd) +{ + std::lock_guard lock(_mutex); + + std::replace_if(_queue.begin() + 1, _queue.end(), + [&cmd](std::shared_ptr v)-> bool { + return cmd.get()->getQueueInsertType() == QueueInsertType::ReplaceExistent + && cmd->areSameParameter(v.get()); + }, + cmd + ); +} + +uint8_t CommandQueue::countSimilarCommands(std::shared_ptr cmd) +{ + std::lock_guard lock(_mutex); + + return std::count_if(_queue.begin(), _queue.end(), + [&cmd](std::shared_ptr v) -> bool { + return cmd->areSameParameter(v.get()); + }); +} diff --git a/lib/Hoymiles/src/queue/CommandQueue.h b/lib/Hoymiles/src/queue/CommandQueue.h new file mode 100644 index 00000000..f27aeaa3 --- /dev/null +++ b/lib/Hoymiles/src/queue/CommandQueue.h @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +#pragma once + +#include "../commands/CommandAbstract.h" +#include +#include + +class InverterAbstract; + +class CommandQueue : public ThreadSafeQueue> { +public: + void removeAllEntriesForInverter(InverterAbstract* inv); + void removeDuplicatedEntries(std::shared_ptr cmd); + void replaceEntries(std::shared_ptr cmd); + + uint8_t countSimilarCommands(std::shared_ptr cmd); +}; diff --git a/lib/ThreadSafeQueue/src/ThreadSafeQueue.h b/lib/ThreadSafeQueue/src/ThreadSafeQueue.h index 9a195c60..2569e630 100644 --- a/lib/ThreadSafeQueue/src/ThreadSafeQueue.h +++ b/lib/ThreadSafeQueue/src/ThreadSafeQueue.h @@ -3,7 +3,7 @@ #include #include -#include +#include template class ThreadSafeQueue { @@ -33,14 +33,14 @@ public: return {}; } T tmp = _queue.front(); - _queue.pop(); + _queue.pop_front(); return tmp; } void push(const T& item) { std::lock_guard lock(_mutex); - _queue.push(item); + _queue.push_back(item); } T front() @@ -49,6 +49,10 @@ public: return _queue.front(); } +protected: + std::deque _queue; + mutable std::mutex _mutex; + private: // Moved out of public interface to prevent races between this // and pop(). @@ -56,7 +60,4 @@ private: { return _queue.empty(); } - - std::queue _queue; - mutable std::mutex _mutex; };