Merge pull request #571 from schlimmchen/switch-context-on-huawei-mqtt-message

Fix: switch context when handling AC charger MQTT messages
This commit is contained in:
helgeerbe 2023-12-31 14:49:00 +01:00 committed by GitHub
commit ef1aec3b26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 114 additions and 86 deletions

View File

@ -5,6 +5,9 @@
#include <Huawei_can.h> #include <Huawei_can.h>
#include <espMqttClient.h> #include <espMqttClient.h>
#include <TaskSchedulerDeclarations.h> #include <TaskSchedulerDeclarations.h>
#include <mutex>
#include <deque>
#include <functional>
class MqttHandleHuaweiClass { class MqttHandleHuaweiClass {
public: public:
@ -12,13 +15,30 @@ public:
private: private:
void loop(); void loop();
void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
enum class Topic : unsigned {
LimitOnlineVoltage,
LimitOnlineCurrent,
LimitOfflineVoltage,
LimitOfflineCurrent,
Mode
};
void onMqttMessage(Topic t,
const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len,
size_t index, size_t total);
Task _loopTask; Task _loopTask;
uint32_t _lastPublishStats; uint32_t _lastPublishStats;
uint32_t _lastPublish; uint32_t _lastPublish;
// MQTT callbacks to process updates on subscribed topics are executed in
// the MQTT thread's context. we use this queue to switch processing the
// user requests into the main loop's context (TaskScheduler context).
mutable std::mutex _mqttMutex;
std::deque<std::function<void()>> _mqttCallbacks;
}; };
extern MqttHandleHuaweiClass MqttHandleHuawei; extern MqttHandleHuaweiClass MqttHandleHuawei;

View File

@ -10,12 +10,6 @@
#include "WebApi_Huawei.h" #include "WebApi_Huawei.h"
#include <ctime> #include <ctime>
#define TOPIC_SUB_LIMIT_ONLINE_VOLTAGE "limit_online_voltage"
#define TOPIC_SUB_LIMIT_ONLINE_CURRENT "limit_online_current"
#define TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE "limit_offline_voltage"
#define TOPIC_SUB_LIMIT_OFFLINE_CURRENT "limit_offline_current"
#define TOPIC_SUB_MODE "mode"
MqttHandleHuaweiClass MqttHandleHuawei; MqttHandleHuaweiClass MqttHandleHuawei;
void MqttHandleHuaweiClass::init(Scheduler& scheduler) void MqttHandleHuaweiClass::init(Scheduler& scheduler)
@ -25,19 +19,22 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)
_loopTask.setIterations(TASK_FOREVER); _loopTask.setIterations(TASK_FOREVER);
_loopTask.enable(); _loopTask.enable();
using std::placeholders::_1; String const& prefix = MqttSettings.getPrefix();
using std::placeholders::_2;
using std::placeholders::_3;
using std::placeholders::_4;
using std::placeholders::_5;
using std::placeholders::_6;
String topic = MqttSettings.getPrefix(); auto subscribe = [&prefix, this](char const* subTopic, Topic t) {
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); String fullTopic(prefix + "huawei/cmd/" + subTopic);
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_ONLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); MqttSettings.subscribe(fullTopic.c_str(), 0,
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, t,
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_LIMIT_OFFLINE_CURRENT).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); std::placeholders::_1, std::placeholders::_2,
MqttSettings.subscribe(String(topic + "huawei/cmd/" + TOPIC_SUB_MODE).c_str(), 0, std::bind(&MqttHandleHuaweiClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5, std::placeholders::_6));
};
subscribe("limit_online_voltage", Topic::LimitOnlineVoltage);
subscribe("limit_online_current", Topic::LimitOnlineCurrent);
subscribe("limit_offline_voltage", Topic::LimitOfflineVoltage);
subscribe("limit_offline_current", Topic::LimitOfflineCurrent);
subscribe("mode", Topic::Mode);
_lastPublish = millis(); _lastPublish = millis();
@ -46,13 +43,21 @@ void MqttHandleHuaweiClass::init(Scheduler& scheduler)
void MqttHandleHuaweiClass::loop() void MqttHandleHuaweiClass::loop()
{ {
if (!MqttSettings.getConnected() ) { const CONFIG_T& config = Configuration.get();
std::unique_lock<std::mutex> mqttLock(_mqttMutex);
if (!config.Huawei.Enabled) {
_mqttCallbacks.clear();
return; return;
} }
const CONFIG_T& config = Configuration.get(); for (auto& callback : _mqttCallbacks) { callback(); }
_mqttCallbacks.clear();
if (!config.Huawei.Enabled) { mqttLock.unlock();
if (!MqttSettings.getConnected() ) {
return; return;
} }
@ -78,76 +83,79 @@ void MqttHandleHuaweiClass::loop()
} }
void MqttHandleHuaweiClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) void MqttHandleHuaweiClass::onMqttMessage(Topic t,
const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len,
size_t index, size_t total)
{ {
const CONFIG_T& config = Configuration.get(); std::string strValue(reinterpret_cast<const char*>(payload), len);
float payload_val = -1;
// ignore messages if Huawei is disabled try {
if (!config.Huawei.Enabled) { payload_val = std::stof(strValue);
}
catch (std::invalid_argument const& e) {
MessageOutput.printf("Huawei MQTT handler: cannot parse payload of topic '%s' as float: %s\r\n",
topic, strValue.c_str());
return; return;
} }
char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics std::lock_guard<std::mutex> mqttLock(_mqttMutex);
strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char*
char* setting; switch (t) {
char* rest = &token_topic[strlen(config.Mqtt.Topic)]; case Topic::LimitOnlineVoltage:
MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_ONLINE_VOLTAGE));
break;
strtok_r(rest, "/", &rest); // Remove "huawei" case Topic::LimitOfflineVoltage:
strtok_r(rest, "/", &rest); // Remove "cmd" MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_OFFLINE_VOLTAGE));
break;
setting = strtok_r(rest, "/", &rest); case Topic::LimitOnlineCurrent:
MessageOutput.printf("Limit Current: %f A\r\n", payload_val);
_mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_ONLINE_CURRENT));
break;
if (setting == NULL) { case Topic::LimitOfflineCurrent:
return; MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val);
} _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setValue,
&HuaweiCan, payload_val, HUAWEI_OFFLINE_CURRENT));
break;
char* strlimit = new char[len + 1]; case Topic::Mode:
memcpy(strlimit, payload, len); switch (static_cast<int>(payload_val)) {
strlimit[len] = '\0'; case 3:
float payload_val = strtof(strlimit, NULL); MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control");
delete[] strlimit; _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
&HuaweiCan, HUAWEI_MODE_AUTO_INT));
break;
if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_VOLTAGE)) { case 2:
// Set voltage limit MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit");
MessageOutput.printf("Limit Voltage: %f V\r\n", payload_val); _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_VOLTAGE); &HuaweiCan, HUAWEI_MODE_AUTO_EXT));
break;
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_VOLTAGE)) { case 1:
// Set current limit MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON");
MessageOutput.printf("Offline Limit Voltage: %f V\r\n", payload_val); _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_VOLTAGE); &HuaweiCan, HUAWEI_MODE_ON));
break;
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_ONLINE_CURRENT)) { case 0:
// Set current limit MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF");
MessageOutput.printf("Limit Current: %f A\r\n", payload_val); _mqttCallbacks.push_back(std::bind(&HuaweiCanClass::setMode,
HuaweiCan.setValue(payload_val, HUAWEI_ONLINE_CURRENT); &HuaweiCan, HUAWEI_MODE_OFF));
break;
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_OFFLINE_CURRENT)) { default:
// Set current limit MessageOutput.printf("[Huawei MQTT::] Invalid mode %.0f\r\n", payload_val);
MessageOutput.printf("Offline Limit Current: %f A\r\n", payload_val); break;
HuaweiCan.setValue(payload_val, HUAWEI_OFFLINE_CURRENT); }
break;
} else if (!strcmp(setting, TOPIC_SUB_MODE)) {
// Control power on/off
if(payload_val == 3) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Full internal control");
HuaweiCan.setMode(HUAWEI_MODE_AUTO_INT);
}
if(payload_val == 2) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Internal on/off control, external power limit");
HuaweiCan.setMode(HUAWEI_MODE_AUTO_EXT);
}
if(payload_val == 1) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned ON");
HuaweiCan.setMode(HUAWEI_MODE_ON);
}
if(payload_val == 0) {
MessageOutput.println("[Huawei MQTT::] Received MQTT msg. New mode: Turned OFF");
HuaweiCan.setMode(HUAWEI_MODE_OFF);
}
} }
} }