Fix: switch context when processing DPL MQTT requests (#572)

MQTT message callbacks are executed in the MQTT thread context. when
processing topics that control the DPL, we must avoid executing methods
that are not thread-safe. this change binds the methods to be called to
the respective parameters and executes them in the TaskScheduler
context, such that they no longer need to be thread-safe.
This commit is contained in:
Bernhard Kirchen 2023-12-31 14:49:39 +01:00 committed by GitHub
parent ef1aec3b26
commit 8f5c4878c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 12 deletions

View File

@ -4,6 +4,9 @@
#include "Configuration.h" #include "Configuration.h"
#include <espMqttClient.h> #include <espMqttClient.h>
#include <TaskSchedulerDeclarations.h> #include <TaskSchedulerDeclarations.h>
#include <mutex>
#include <deque>
#include <functional>
class MqttHandlePowerLimiterClass { class MqttHandlePowerLimiterClass {
public: public:
@ -18,6 +21,11 @@ private:
uint32_t _lastPublishStats; uint32_t _lastPublishStats;
uint32_t _lastPublish; uint32_t _lastPublish;
// MQTT callbacks to process updates on subscribed topics are executed in
// the MQTT thread's context. we use this queue to switch processing the
// user requests into the main loop's context (TaskScheduler context).
mutable std::mutex _mqttMutex;
std::deque<std::function<void()>> _mqttCallbacks;
}; };
extern MqttHandlePowerLimiterClass MqttHandlePowerLimiter; extern MqttHandlePowerLimiterClass MqttHandlePowerLimiter;

View File

@ -34,11 +34,21 @@ void MqttHandlePowerLimiterClass::init(Scheduler& scheduler)
void MqttHandlePowerLimiterClass::loop() void MqttHandlePowerLimiterClass::loop()
{ {
if (!MqttSettings.getConnected() ) { std::unique_lock<std::mutex> mqttLock(_mqttMutex);
const CONFIG_T& config = Configuration.get();
if (!config.PowerLimiter.Enabled) {
_mqttCallbacks.clear();
return; return;
} }
const CONFIG_T& config = Configuration.get(); for (auto& callback : _mqttCallbacks) { callback(); }
_mqttCallbacks.clear();
mqttLock.unlock();
if (!MqttSettings.getConnected() ) { return; }
if ((millis() - _lastPublish) > (config.Mqtt.PublishInterval * 1000) ) { if ((millis() - _lastPublish) > (config.Mqtt.PublishInterval * 1000) ) {
auto val = static_cast<unsigned>(PowerLimiter.getMode()); auto val = static_cast<unsigned>(PowerLimiter.getMode());
@ -53,13 +63,6 @@ void MqttHandlePowerLimiterClass::loop()
void MqttHandlePowerLimiterClass::onCmdMode(const espMqttClientTypes::MessageProperties& properties, void MqttHandlePowerLimiterClass::onCmdMode(const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
{ {
const CONFIG_T& config = Configuration.get();
// ignore messages if PowerLimiter is disabled
if (!config.PowerLimiter.Enabled) {
return;
}
std::string strValue(reinterpret_cast<const char*>(payload), len); std::string strValue(reinterpret_cast<const char*>(payload), len);
int intValue = -1; int intValue = -1;
try { try {
@ -71,19 +74,24 @@ void MqttHandlePowerLimiterClass::onCmdMode(const espMqttClientTypes::MessagePro
return; return;
} }
std::lock_guard<std::mutex> mqttLock(_mqttMutex);
using Mode = PowerLimiterClass::Mode; using Mode = PowerLimiterClass::Mode;
switch (static_cast<Mode>(intValue)) { switch (static_cast<Mode>(intValue)) {
case Mode::UnconditionalFullSolarPassthrough: case Mode::UnconditionalFullSolarPassthrough:
MessageOutput.println("Power limiter unconditional full solar PT"); MessageOutput.println("Power limiter unconditional full solar PT");
PowerLimiter.setMode(Mode::UnconditionalFullSolarPassthrough); _mqttCallbacks.push_back(std::bind(&PowerLimiterClass::setMode,
&PowerLimiter, Mode::UnconditionalFullSolarPassthrough));
break; break;
case Mode::Disabled: case Mode::Disabled:
MessageOutput.println("Power limiter disabled (override)"); MessageOutput.println("Power limiter disabled (override)");
PowerLimiter.setMode(Mode::Disabled); _mqttCallbacks.push_back(std::bind(&PowerLimiterClass::setMode,
&PowerLimiter, Mode::Disabled));
break; break;
case Mode::Normal: case Mode::Normal:
MessageOutput.println("Power limiter normal operation"); MessageOutput.println("Power limiter normal operation");
PowerLimiter.setMode(Mode::Normal); _mqttCallbacks.push_back(std::bind(&PowerLimiterClass::setMode,
&PowerLimiter, Mode::Normal));
break; break;
default: default:
MessageOutput.printf("PowerLimiter - unknown mode %d\r\n", intValue); MessageOutput.printf("PowerLimiter - unknown mode %d\r\n", intValue);