fix: update mqtt subscriptions when topic changed (#1156)
* update mqtt subscriptions when topic was changed * DPL/Huawei: manage MQTT subscriptions in map --------- Co-authored-by: Bernhard Kirchen <schlimmchen@posteo.net>
This commit is contained in:
parent
e7d454ff0b
commit
65407dbdd6
@ -8,11 +8,18 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <frozen/map.h>
|
||||||
|
#include <frozen/string.h>
|
||||||
|
|
||||||
class MqttHandleHuaweiClass {
|
class MqttHandleHuaweiClass {
|
||||||
public:
|
public:
|
||||||
void init(Scheduler& scheduler);
|
void init(Scheduler& scheduler);
|
||||||
|
|
||||||
|
void forceUpdate();
|
||||||
|
|
||||||
|
void subscribeTopics();
|
||||||
|
void unsubscribeTopics();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void loop();
|
void loop();
|
||||||
|
|
||||||
@ -24,6 +31,15 @@ private:
|
|||||||
Mode
|
Mode
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static constexpr frozen::string _cmdtopic = "huawei/cmd/";
|
||||||
|
static constexpr frozen::map<frozen::string, Topic, 5> _subscriptions = {
|
||||||
|
{ "limit_online_voltage", Topic::LimitOnlineVoltage },
|
||||||
|
{ "limit_online_current", Topic::LimitOnlineCurrent },
|
||||||
|
{ "limit_offline_voltage", Topic::LimitOfflineVoltage },
|
||||||
|
{ "limit_offline_current", Topic::LimitOfflineCurrent },
|
||||||
|
{ "mode", Topic::Mode },
|
||||||
|
};
|
||||||
|
|
||||||
void onMqttMessage(Topic t,
|
void onMqttMessage(Topic t,
|
||||||
const espMqttClientTypes::MessageProperties& properties,
|
const espMqttClientTypes::MessageProperties& properties,
|
||||||
const char* topic, const uint8_t* payload, size_t len,
|
const char* topic, const uint8_t* payload, size_t len,
|
||||||
|
|||||||
@ -7,11 +7,18 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <frozen/map.h>
|
||||||
|
#include <frozen/string.h>
|
||||||
|
|
||||||
class MqttHandlePowerLimiterClass {
|
class MqttHandlePowerLimiterClass {
|
||||||
public:
|
public:
|
||||||
void init(Scheduler& scheduler);
|
void init(Scheduler& scheduler);
|
||||||
|
|
||||||
|
void forceUpdate();
|
||||||
|
|
||||||
|
void subscribeTopics();
|
||||||
|
void unsubscribeTopics();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void loop();
|
void loop();
|
||||||
|
|
||||||
@ -28,6 +35,20 @@ private:
|
|||||||
TargetPowerConsumption
|
TargetPowerConsumption
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static constexpr frozen::string _cmdtopic = "powerlimiter/cmd/";
|
||||||
|
static constexpr frozen::map<frozen::string, MqttPowerLimiterCommand, 10> _subscriptions = {
|
||||||
|
{ "threshold/soc/start", MqttPowerLimiterCommand::BatterySoCStartThreshold },
|
||||||
|
{ "threshold/soc/stop", MqttPowerLimiterCommand::BatterySoCStopThreshold },
|
||||||
|
{ "threshold/soc/full_solar_passthrough", MqttPowerLimiterCommand::FullSolarPassthroughSoC },
|
||||||
|
{ "threshold/voltage/start", MqttPowerLimiterCommand::VoltageStartThreshold },
|
||||||
|
{ "threshold/voltage/stop", MqttPowerLimiterCommand::VoltageStopThreshold },
|
||||||
|
{ "threshold/voltage/full_solar_passthrough_start", MqttPowerLimiterCommand::FullSolarPassThroughStartVoltage },
|
||||||
|
{ "threshold/voltage/full_solar_passthrough_stop", MqttPowerLimiterCommand::FullSolarPassThroughStopVoltage },
|
||||||
|
{ "mode", MqttPowerLimiterCommand::Mode },
|
||||||
|
{ "upper_power_limit", MqttPowerLimiterCommand::UpperPowerLimit },
|
||||||
|
{ "target_power_consumption", MqttPowerLimiterCommand::TargetPowerConsumption },
|
||||||
|
};
|
||||||
|
|
||||||
void onMqttCmd(MqttPowerLimiterCommand command, const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
|
void onMqttCmd(MqttPowerLimiterCommand command, const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
|
||||||
|
|
||||||
Task _loopTask;
|
Task _loopTask;
|
||||||
|
|||||||
@ -6,7 +6,6 @@
|
|||||||
#include "MessageOutput.h"
|
#include "MessageOutput.h"
|
||||||
#include "MqttSettings.h"
|
#include "MqttSettings.h"
|
||||||
#include "Huawei_can.h"
|
#include "Huawei_can.h"
|
||||||
// #include "Failsafe.h"
|
|
||||||
#include "WebApi_Huawei.h"
|
#include "WebApi_Huawei.h"
|
||||||
#include <ctime>
|
#include <ctime>
|
||||||
|
|
||||||
@ -19,10 +18,22 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)
|
|||||||
_loopTask.setIterations(TASK_FOREVER);
|
_loopTask.setIterations(TASK_FOREVER);
|
||||||
_loopTask.enable();
|
_loopTask.enable();
|
||||||
|
|
||||||
|
subscribeTopics();
|
||||||
|
|
||||||
|
_lastPublish = millis();
|
||||||
|
}
|
||||||
|
|
||||||
|
void MqttHandleHuaweiClass::forceUpdate()
|
||||||
|
{
|
||||||
|
_lastPublish = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void MqttHandleHuaweiClass::subscribeTopics()
|
||||||
|
{
|
||||||
String const& prefix = MqttSettings.getPrefix();
|
String const& prefix = MqttSettings.getPrefix();
|
||||||
|
|
||||||
auto subscribe = [&prefix, this](char const* subTopic, Topic t) {
|
auto subscribe = [&prefix, this](char const* subTopic, Topic t) {
|
||||||
String fullTopic(prefix + "huawei/cmd/" + subTopic);
|
String fullTopic(prefix + _cmdtopic.data() + subTopic);
|
||||||
MqttSettings.subscribe(fullTopic.c_str(), 0,
|
MqttSettings.subscribe(fullTopic.c_str(), 0,
|
||||||
std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, t,
|
std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, t,
|
||||||
std::placeholders::_1, std::placeholders::_2,
|
std::placeholders::_1, std::placeholders::_2,
|
||||||
@ -30,16 +41,18 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)
|
|||||||
std::placeholders::_5, std::placeholders::_6));
|
std::placeholders::_5, std::placeholders::_6));
|
||||||
};
|
};
|
||||||
|
|
||||||
subscribe("limit_online_voltage", Topic::LimitOnlineVoltage);
|
for (auto const& s : _subscriptions) {
|
||||||
subscribe("limit_online_current", Topic::LimitOnlineCurrent);
|
subscribe(s.first.data(), s.second);
|
||||||
subscribe("limit_offline_voltage", Topic::LimitOfflineVoltage);
|
}
|
||||||
subscribe("limit_offline_current", Topic::LimitOfflineCurrent);
|
|
||||||
subscribe("mode", Topic::Mode);
|
|
||||||
|
|
||||||
_lastPublish = millis();
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MqttHandleHuaweiClass::unsubscribeTopics()
|
||||||
|
{
|
||||||
|
String const prefix = MqttSettings.getPrefix() + _cmdtopic.data();
|
||||||
|
for (auto const& s : _subscriptions) {
|
||||||
|
MqttSettings.unsubscribe(prefix + s.first.data());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void MqttHandleHuaweiClass::loop()
|
void MqttHandleHuaweiClass::loop()
|
||||||
{
|
{
|
||||||
|
|||||||
@ -25,10 +25,22 @@ void MqttHandlePowerLimiterClass::init(Scheduler& scheduler)
|
|||||||
using std::placeholders::_5;
|
using std::placeholders::_5;
|
||||||
using std::placeholders::_6;
|
using std::placeholders::_6;
|
||||||
|
|
||||||
|
subscribeTopics();
|
||||||
|
|
||||||
|
_lastPublish = millis();
|
||||||
|
}
|
||||||
|
|
||||||
|
void MqttHandlePowerLimiterClass::forceUpdate()
|
||||||
|
{
|
||||||
|
_lastPublish = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void MqttHandlePowerLimiterClass::subscribeTopics()
|
||||||
|
{
|
||||||
String const& prefix = MqttSettings.getPrefix();
|
String const& prefix = MqttSettings.getPrefix();
|
||||||
|
|
||||||
auto subscribe = [&prefix, this](char const* subTopic, MqttPowerLimiterCommand command) {
|
auto subscribe = [&prefix, this](char const* subTopic, MqttPowerLimiterCommand command) {
|
||||||
String fullTopic(prefix + "powerlimiter/cmd/" + subTopic);
|
String fullTopic(prefix + _cmdtopic.data() + subTopic);
|
||||||
MqttSettings.subscribe(fullTopic.c_str(), 0,
|
MqttSettings.subscribe(fullTopic.c_str(), 0,
|
||||||
std::bind(&MqttHandlePowerLimiterClass::onMqttCmd, this, command,
|
std::bind(&MqttHandlePowerLimiterClass::onMqttCmd, this, command,
|
||||||
std::placeholders::_1, std::placeholders::_2,
|
std::placeholders::_1, std::placeholders::_2,
|
||||||
@ -36,20 +48,18 @@ void MqttHandlePowerLimiterClass::init(Scheduler& scheduler)
|
|||||||
std::placeholders::_5, std::placeholders::_6));
|
std::placeholders::_5, std::placeholders::_6));
|
||||||
};
|
};
|
||||||
|
|
||||||
subscribe("threshold/soc/start", MqttPowerLimiterCommand::BatterySoCStartThreshold);
|
for (auto const& s : _subscriptions) {
|
||||||
subscribe("threshold/soc/stop", MqttPowerLimiterCommand::BatterySoCStopThreshold);
|
subscribe(s.first.data(), s.second);
|
||||||
subscribe("threshold/soc/full_solar_passthrough", MqttPowerLimiterCommand::FullSolarPassthroughSoC);
|
}
|
||||||
subscribe("threshold/voltage/start", MqttPowerLimiterCommand::VoltageStartThreshold);
|
|
||||||
subscribe("threshold/voltage/stop", MqttPowerLimiterCommand::VoltageStopThreshold);
|
|
||||||
subscribe("threshold/voltage/full_solar_passthrough_start", MqttPowerLimiterCommand::FullSolarPassThroughStartVoltage);
|
|
||||||
subscribe("threshold/voltage/full_solar_passthrough_stop", MqttPowerLimiterCommand::FullSolarPassThroughStopVoltage);
|
|
||||||
subscribe("mode", MqttPowerLimiterCommand::Mode);
|
|
||||||
subscribe("upper_power_limit", MqttPowerLimiterCommand::UpperPowerLimit);
|
|
||||||
subscribe("target_power_consumption", MqttPowerLimiterCommand::TargetPowerConsumption);
|
|
||||||
|
|
||||||
_lastPublish = millis();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MqttHandlePowerLimiterClass::unsubscribeTopics()
|
||||||
|
{
|
||||||
|
String const prefix = MqttSettings.getPrefix() + _cmdtopic.data();
|
||||||
|
for (auto const& s : _subscriptions) {
|
||||||
|
MqttSettings.unsubscribe(prefix + s.first.data());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void MqttHandlePowerLimiterClass::loop()
|
void MqttHandlePowerLimiterClass::loop()
|
||||||
{
|
{
|
||||||
|
|||||||
@ -4,8 +4,12 @@
|
|||||||
*/
|
*/
|
||||||
#include "WebApi_mqtt.h"
|
#include "WebApi_mqtt.h"
|
||||||
#include "Configuration.h"
|
#include "Configuration.h"
|
||||||
|
#include "MqttHandleBatteryHass.h"
|
||||||
#include "MqttHandleHass.h"
|
#include "MqttHandleHass.h"
|
||||||
|
#include "MqttHandlePowerLimiterHass.h"
|
||||||
#include "MqttHandleInverter.h"
|
#include "MqttHandleInverter.h"
|
||||||
|
#include "MqttHandleHuawei.h"
|
||||||
|
#include "MqttHandlePowerLimiter.h"
|
||||||
#include "MqttHandleVedirectHass.h"
|
#include "MqttHandleVedirectHass.h"
|
||||||
#include "MqttHandleVedirect.h"
|
#include "MqttHandleVedirect.h"
|
||||||
#include "MqttSettings.h"
|
#include "MqttSettings.h"
|
||||||
@ -307,8 +311,13 @@ void WebApiMqttClass::onMqttAdminPost(AsyncWebServerRequest* request)
|
|||||||
// Check if base topic was changed
|
// Check if base topic was changed
|
||||||
if (strcmp(config.Mqtt.Topic, root["mqtt_topic"].as<String>().c_str())) {
|
if (strcmp(config.Mqtt.Topic, root["mqtt_topic"].as<String>().c_str())) {
|
||||||
MqttHandleInverter.unsubscribeTopics();
|
MqttHandleInverter.unsubscribeTopics();
|
||||||
|
MqttHandleHuawei.unsubscribeTopics();
|
||||||
|
MqttHandlePowerLimiter.unsubscribeTopics();
|
||||||
|
|
||||||
strlcpy(config.Mqtt.Topic, root["mqtt_topic"].as<String>().c_str(), sizeof(config.Mqtt.Topic));
|
strlcpy(config.Mqtt.Topic, root["mqtt_topic"].as<String>().c_str(), sizeof(config.Mqtt.Topic));
|
||||||
MqttHandleInverter.subscribeTopics();
|
MqttHandleInverter.subscribeTopics();
|
||||||
|
MqttHandleHuawei.subscribeTopics();
|
||||||
|
MqttHandlePowerLimiter.subscribeTopics();
|
||||||
}
|
}
|
||||||
|
|
||||||
WebApi.writeConfig(retMsg);
|
WebApi.writeConfig(retMsg);
|
||||||
@ -316,8 +325,14 @@ void WebApiMqttClass::onMqttAdminPost(AsyncWebServerRequest* request)
|
|||||||
WebApi.sendJsonResponse(request, response, __FUNCTION__, __LINE__);
|
WebApi.sendJsonResponse(request, response, __FUNCTION__, __LINE__);
|
||||||
|
|
||||||
MqttSettings.performReconnect();
|
MqttSettings.performReconnect();
|
||||||
|
|
||||||
|
MqttHandleBatteryHass.forceUpdate();
|
||||||
MqttHandleHass.forceUpdate();
|
MqttHandleHass.forceUpdate();
|
||||||
|
MqttHandlePowerLimiterHass.forceUpdate();
|
||||||
MqttHandleVedirectHass.forceUpdate();
|
MqttHandleVedirectHass.forceUpdate();
|
||||||
|
|
||||||
|
MqttHandleHuawei.forceUpdate();
|
||||||
|
MqttHandlePowerLimiter.forceUpdate();
|
||||||
MqttHandleVedirect.forceUpdate();
|
MqttHandleVedirect.forceUpdate();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user