Feature: New handling of command queue

Goal of this change is to  prevent a overflow in the command queue by flooding it with MQTT commands and therefor also prevent  the reading of the inverter data.

To achieve this it is now possible to specify a insert type for each  queue element.
This commit is contained in:
Thomas Basler 2024-12-14 19:02:31 +01:00
parent 0061d5e159
commit 8acae28c59
11 changed files with 169 additions and 13 deletions

View File

@ -57,7 +57,7 @@ void HoymilesClass::loop()
}
}
if (iv != nullptr && iv->getRadio()->isInitialized() && iv->getRadio()->isQueueEmpty()) {
if (iv != nullptr && iv->getRadio()->isInitialized()) {
if (iv->getZeroValuesIfUnreachable() && !iv->isReachable()) {
iv->Statistics()->zeroRuntimeData();
@ -119,6 +119,7 @@ void HoymilesClass::loop()
iv->sendGridOnProFileParaRequest();
}
_messageOutput->printf("Queue size - NRF: %ld CMT: %ld\r\n", _radioNrf->getQueueSize(), _radioCmt->getQueueSize());
_lastPoll = millis();
}
@ -229,6 +230,7 @@ void HoymilesClass::removeInverterBySerial(const uint64_t serial)
for (uint8_t i = 0; i < _inverters.size(); i++) {
if (_inverters[i]->serial() == serial) {
std::lock_guard<std::mutex> lock(_mutex);
_inverters[i]->getRadio()->removeCommands(_inverters[i].get());
_inverters.erase(_inverters.begin() + i);
return;
}

View File

@ -156,6 +156,11 @@ bool HoymilesRadio::isInitialized() const
return _isInitialized;
}
void HoymilesRadio::removeCommands(InverterAbstract* inv)
{
_commandQueue.removeAllEntriesForInverter(inv);
}
bool HoymilesRadio::isIdle() const
{
return !_busyFlag;
@ -165,3 +170,8 @@ bool HoymilesRadio::isQueueEmpty() const
{
return _commandQueue.size() == 0;
}
unsigned long HoymilesRadio::getQueueSize() const
{
return _commandQueue.size();
}

View File

@ -1,11 +1,17 @@
// SPDX-License-Identifier: GPL-2.0-or-later
#pragma once
#include "Arduino.h"
#include "commands/CommandAbstract.h"
#include "queue/CommandQueue.h"
#include "types.h"
#include <ThreadSafeQueue.h>
#include <TimeoutHelper.h>
#include <memory>
#ifdef HOY_DEBUG_QUEUE
#define DEBUG_PRINT(fmt, args...) Serial.printf(fmt, ##args)
#else
#define DEBUG_PRINT(fmt, args...) /* Don't do anything in release builds */
#endif
class HoymilesRadio {
public:
@ -14,11 +20,47 @@ public:
bool isIdle() const;
bool isQueueEmpty() const;
unsigned long getQueueSize() const;
bool isInitialized() const;
void removeCommands(InverterAbstract* inv);
void enqueCommand(std::shared_ptr<CommandAbstract> cmd)
{
DEBUG_PRINT("Queue size before: %ld\r\n", _commandQueue.size());
DEBUG_PRINT("Handling command %s with type %d\r\n", cmd.get()->getCommandName().c_str(), static_cast<uint8_t>(cmd.get()->getQueueInsertType()));
switch (cmd.get()->getQueueInsertType()) {
case QueueInsertType::RemoveOldest:
_commandQueue.removeDuplicatedEntries(cmd);
break;
case QueueInsertType::ReplaceExistent:
// Checks if the queue already contains a command like the new one
// and replaces the existing one with the new one.
// (The new one will not be pushed at the end of the queue)
if (_commandQueue.countSimilarCommands(cmd) > 0) {
DEBUG_PRINT(" ... existing entry will be replaced\r\n");
_commandQueue.replaceEntries(cmd);
return;
}
break;
case QueueInsertType::RemoveNewest:
// Checks if the queue already contains a command like the new one
// and drops the new one. The new one will not be inserted.
if (_commandQueue.countSimilarCommands(cmd) > 0) {
DEBUG_PRINT(" ... new entry will be dropped\r\n");
return;
}
break;
case QueueInsertType::AllowMultiple:
// Dont do anything, just fall through and insert the command.
break;
}
// Push the command into the queue if we reach this position of the code
DEBUG_PRINT(" ... new entry will be appended\r\n");
_commandQueue.push(cmd);
DEBUG_PRINT("Queue size after: %ld\r\n", _commandQueue.size());
}
template <typename T>
@ -38,7 +80,7 @@ protected:
void handleReceivedPackage();
serial_u _dtuSerial;
ThreadSafeQueue<std::shared_ptr<CommandAbstract>> _commandQueue;
CommandQueue _commandQueue;
bool _isInitialized = false;
bool _busyFlag = false;

View File

@ -44,7 +44,15 @@ ActivePowerControlCommand::ActivePowerControlCommand(InverterAbstract* inv, cons
String ActivePowerControlCommand::getCommandName() const
{
return "ActivePowerControl";
char buffer[30];
snprintf(buffer, sizeof(buffer), "ActivePowerControl (%02X)", getType());
return buffer;
}
bool ActivePowerControlCommand::areSameParameter(CommandAbstract* other)
{
return CommandAbstract::areSameParameter(other)
&& this->getType() == static_cast<ActivePowerControlCommand*>(other)->getType();
}
void ActivePowerControlCommand::setActivePowerLimit(const float limit, const PowerLimitControlType type)
@ -89,7 +97,7 @@ float ActivePowerControlCommand::getLimit() const
return l / 10;
}
PowerLimitControlType ActivePowerControlCommand::getType()
PowerLimitControlType ActivePowerControlCommand::getType() const
{
return (PowerLimitControlType)((static_cast<uint16_t>(_payload[14]) << 8) | _payload[15]);
}

View File

@ -15,11 +15,13 @@ public:
explicit ActivePowerControlCommand(InverterAbstract* inv, const uint64_t router_address = 0);
virtual String getCommandName() const;
virtual QueueInsertType getQueueInsertType() const { return QueueInsertType::RemoveOldest; }
virtual bool areSameParameter(CommandAbstract* other);
virtual bool handleResponse(const fragment_t fragment[], const uint8_t max_fragment_id);
virtual void gotTimeout();
void setActivePowerLimit(const float limit, const PowerLimitControlType type = RelativNonPersistent);
float getLimit() const;
PowerLimitControlType getType();
PowerLimitControlType getType() const;
};

View File

@ -138,3 +138,9 @@ uint8_t CommandAbstract::getMaxRetransmitCount() const
{
return MAX_RETRANSMIT_COUNT;
}
bool CommandAbstract::areSameParameter(CommandAbstract* other)
{
return this->getCommandName() == other->getCommandName()
&& this->_targetAddress == other->getTargetAddress();
}

View File

@ -11,6 +11,18 @@
class InverterAbstract;
enum class QueueInsertType {
AllowMultiple,
// Remove from beginning of the queue
RemoveOldest,
// Don't insert command if it already exist
RemoveNewest,
// Replace the existing entry in the queue by the one to be added
ReplaceExistent,
};
class CommandAbstract {
public:
explicit CommandAbstract(InverterAbstract* inv, const uint64_t router_address = 0);
@ -46,6 +58,10 @@ public:
// Sets the amount how often a missing fragment is re-requested if it was not available
virtual uint8_t getMaxRetransmitCount() const;
// Returns whether multiple instances of this command are allowed in the command queue.
virtual QueueInsertType getQueueInsertType() const { return QueueInsertType::RemoveNewest; }
virtual bool areSameParameter(CommandAbstract* other);
protected:
uint8_t _payload[RF_LEN];
uint8_t _payload_size;

View File

@ -8,6 +8,7 @@ public:
explicit PowerControlCommand(InverterAbstract* inv, const uint64_t router_address = 0);
virtual String getCommandName() const;
virtual QueueInsertType getQueueInsertType() const { return QueueInsertType::AllowMultiple; }
virtual bool handleResponse(const fragment_t fragment[], const uint8_t max_fragment_id);
virtual void gotTimeout();

View File

@ -0,0 +1,51 @@
// SPDX-License-Identifier: GPL-2.0-or-later
/*
* Copyright (C) 2024 Thomas Basler and others
*/
#include "CommandQueue.h"
#include "../inverters/InverterAbstract.h"
#include <algorithm>
void CommandQueue::removeAllEntriesForInverter(InverterAbstract* inv)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = std::remove_if(_queue.begin(), _queue.end(),
[&inv](std::shared_ptr<CommandAbstract> v) -> bool { return v.get()->getTargetAddress() == inv->serial(); });
_queue.erase(it, _queue.end());
}
void CommandQueue::removeDuplicatedEntries(std::shared_ptr<CommandAbstract> cmd)
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = std::remove_if(_queue.begin() + 1, _queue.end(),
[&cmd](std::shared_ptr<CommandAbstract> v) -> bool {
return cmd->areSameParameter(v.get())
&& cmd.get()->getQueueInsertType() == QueueInsertType::RemoveOldest;
});
_queue.erase(it, _queue.end());
}
void CommandQueue::replaceEntries(std::shared_ptr<CommandAbstract> cmd)
{
std::lock_guard<std::mutex> lock(_mutex);
std::replace_if(_queue.begin() + 1, _queue.end(),
[&cmd](std::shared_ptr<CommandAbstract> v)-> bool {
return cmd.get()->getQueueInsertType() == QueueInsertType::ReplaceExistent
&& cmd->areSameParameter(v.get());
},
cmd
);
}
uint8_t CommandQueue::countSimilarCommands(std::shared_ptr<CommandAbstract> cmd)
{
std::lock_guard<std::mutex> lock(_mutex);
return std::count_if(_queue.begin(), _queue.end(),
[&cmd](std::shared_ptr<CommandAbstract> v) -> bool {
return cmd->areSameParameter(v.get());
});
}

View File

@ -0,0 +1,17 @@
// SPDX-License-Identifier: GPL-2.0-or-later
#pragma once
#include "../commands/CommandAbstract.h"
#include <ThreadSafeQueue.h>
#include <memory>
class InverterAbstract;
class CommandQueue : public ThreadSafeQueue<std::shared_ptr<CommandAbstract>> {
public:
void removeAllEntriesForInverter(InverterAbstract* inv);
void removeDuplicatedEntries(std::shared_ptr<CommandAbstract> cmd);
void replaceEntries(std::shared_ptr<CommandAbstract> cmd);
uint8_t countSimilarCommands(std::shared_ptr<CommandAbstract> cmd);
};

View File

@ -3,7 +3,7 @@
#include <mutex>
#include <optional>
#include <queue>
#include <deque>
template <typename T>
class ThreadSafeQueue {
@ -33,14 +33,14 @@ public:
return {};
}
T tmp = _queue.front();
_queue.pop();
_queue.pop_front();
return tmp;
}
void push(const T& item)
{
std::lock_guard<std::mutex> lock(_mutex);
_queue.push(item);
_queue.push_back(item);
}
T front()
@ -49,6 +49,10 @@ public:
return _queue.front();
}
protected:
std::deque<T> _queue;
mutable std::mutex _mutex;
private:
// Moved out of public interface to prevent races between this
// and pop().
@ -56,7 +60,4 @@ private:
{
return _queue.empty();
}
std::queue<T> _queue;
mutable std::mutex _mutex;
};