From 5dd4d5b452d94457ed6b83056b0e699b826e8f00 Mon Sep 17 00:00:00 2001 From: Thomas Basler Date: Wed, 14 Dec 2022 20:42:23 +0100 Subject: [PATCH] Introduce MqttSubscribeParser and moved inverter specific subscribes to MqttHandleInverterClass This allows individual callback functions for each subscribed topic. Allows easier implementation of further mqtt functions --- include/MqttHandleInverter.h | 2 + include/MqttSettings.h | 5 + .../MqttSubscribeParser.cpp | 171 ++++++++++++++++++ lib/MqttSubscribeParser/MqttSubscribeParser.h | 31 ++++ src/MqttHandleInverter.cpp | 106 +++++++++++ src/MqttSettings.cpp | 112 ++---------- 6 files changed, 331 insertions(+), 96 deletions(-) create mode 100644 lib/MqttSubscribeParser/MqttSubscribeParser.cpp create mode 100644 lib/MqttSubscribeParser/MqttSubscribeParser.h diff --git a/include/MqttHandleInverter.h b/include/MqttHandleInverter.h index 4667563..56c1ef8 100644 --- a/include/MqttHandleInverter.h +++ b/include/MqttHandleInverter.h @@ -4,6 +4,7 @@ #include "Configuration.h" #include #include +#include #include class MqttHandleInverterClass { @@ -15,6 +16,7 @@ public: private: void publishField(std::shared_ptr inv, uint8_t channel, uint8_t fieldId); + void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total); uint32_t _lastPublishStats[INV_MAX_COUNT]; uint32_t _lastPublish; diff --git a/include/MqttSettings.h b/include/MqttSettings.h index afa3283..a6b3bc1 100644 --- a/include/MqttSettings.h +++ b/include/MqttSettings.h @@ -3,6 +3,7 @@ #include "NetworkSettings.h" #include +#include #include #include #include @@ -16,6 +17,9 @@ public: void publish(const String& subtopic, const String& payload); void publishHass(const String& subtopic, const String& payload); + void subscribe(const String& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb); + void unsubscribe(const String& topic); + String getPrefix(); private: @@ -34,6 +38,7 @@ private: String clientId; String willTopic; Ticker mqttReconnectTimer; + MqttSubscribeParser _mqttSubscribeParser; }; extern MqttSettingsClass MqttSettings; \ No newline at end of file diff --git a/lib/MqttSubscribeParser/MqttSubscribeParser.cpp b/lib/MqttSubscribeParser/MqttSubscribeParser.cpp new file mode 100644 index 0000000..c0cc3bb --- /dev/null +++ b/lib/MqttSubscribeParser/MqttSubscribeParser.cpp @@ -0,0 +1,171 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +/* + * Copyright (C) 2022 Thomas Basler and others + */ +#include "MqttSubscribeParser.h" + +void MqttSubscribeParser::register_callback(const std::string& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb) +{ + cb_filter_t cbf; + cbf.topic = topic; + cbf.qos = qos; + cbf.cb = cb; + _callbacks.push_back(cbf); +} + +void MqttSubscribeParser::unregister_callback(const std::string& topic) +{ + for (auto it = _callbacks.begin(); it != _callbacks.end();) { + if ((*it).topic == topic) { + it = _callbacks.erase(it); + } else { + ++it; + } + } +} + +void MqttSubscribeParser::handle_message(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) +{ + bool result = false; + for (const auto& cb : _callbacks) { + if (mosquitto_topic_matches_sub(cb.topic.c_str(), topic, &result) == MOSQ_ERR_SUCCESS) { + if (result) { + cb.cb(properties, topic, payload, len, index, total); + } + } + } +} + +std::vector MqttSubscribeParser::get_callbacks() +{ + return _callbacks; +} + +/* Does a topic match a subscription? */ +int MqttSubscribeParser::mosquitto_topic_matches_sub(const char* sub, const char* topic, bool* result) +{ + size_t spos; + + if (!result) + return MOSQ_ERR_INVAL; + *result = false; + + if (!sub || !topic || sub[0] == 0 || topic[0] == 0) { + return MOSQ_ERR_INVAL; + } + + if ((sub[0] == '$' && topic[0] != '$') + || (topic[0] == '$' && sub[0] != '$')) { + + return MOSQ_ERR_SUCCESS; + } + + spos = 0; + + while (sub[0] != 0) { + if (topic[0] == '+' || topic[0] == '#') { + return MOSQ_ERR_INVAL; + } + if (sub[0] != topic[0] || topic[0] == 0) { /* Check for wildcard matches */ + if (sub[0] == '+') { + /* Check for bad "+foo" or "a/+foo" subscription */ + if (spos > 0 && sub[-1] != '/') { + return MOSQ_ERR_INVAL; + } + /* Check for bad "foo+" or "foo+/a" subscription */ + if (sub[1] != 0 && sub[1] != '/') { + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + while (topic[0] != 0 && topic[0] != '/') { + if (topic[0] == '+' || topic[0] == '#') { + return MOSQ_ERR_INVAL; + } + topic++; + } + if (topic[0] == 0 && sub[0] == 0) { + *result = true; + return MOSQ_ERR_SUCCESS; + } + } else if (sub[0] == '#') { + /* Check for bad "foo#" subscription */ + if (spos > 0 && sub[-1] != '/') { + return MOSQ_ERR_INVAL; + } + /* Check for # not the final character of the sub, e.g. "#foo" */ + if (sub[1] != 0) { + return MOSQ_ERR_INVAL; + } else { + while (topic[0] != 0) { + if (topic[0] == '+' || topic[0] == '#') { + return MOSQ_ERR_INVAL; + } + topic++; + } + *result = true; + return MOSQ_ERR_SUCCESS; + } + } else { + /* Check for e.g. foo/bar matching foo/+/# */ + if (topic[0] == 0 + && spos > 0 + && sub[-1] == '+' + && sub[0] == '/' + && sub[1] == '#') { + *result = true; + return MOSQ_ERR_SUCCESS; + } + + /* There is no match at this point, but is the sub invalid? */ + while (sub[0] != 0) { + if (sub[0] == '#' && sub[1] != 0) { + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + } + + /* Valid input, but no match */ + return MOSQ_ERR_SUCCESS; + } + } else { + /* sub[spos] == topic[tpos] */ + if (topic[1] == 0) { + /* Check for e.g. foo matching foo/# */ + if (sub[1] == '/' + && sub[2] == '#' + && sub[3] == 0) { + *result = true; + return MOSQ_ERR_SUCCESS; + } + } + spos++; + sub++; + topic++; + if (sub[0] == 0 && topic[0] == 0) { + *result = true; + return MOSQ_ERR_SUCCESS; + } else if (topic[0] == 0 && sub[0] == '+' && sub[1] == 0) { + if (spos > 0 && sub[-1] != '/') { + return MOSQ_ERR_INVAL; + } + spos++; + sub++; + *result = true; + return MOSQ_ERR_SUCCESS; + } + } + } + if ((topic[0] != 0 || sub[0] != 0)) { + *result = false; + } + while (topic[0] != 0) { + if (topic[0] == '+' || topic[0] == '#') { + return MOSQ_ERR_INVAL; + } + topic++; + } + + return MOSQ_ERR_SUCCESS; +} diff --git a/lib/MqttSubscribeParser/MqttSubscribeParser.h b/lib/MqttSubscribeParser/MqttSubscribeParser.h new file mode 100644 index 0000000..68589fd --- /dev/null +++ b/lib/MqttSubscribeParser/MqttSubscribeParser.h @@ -0,0 +1,31 @@ +// SPDX-License-Identifier: GPL-2.0-or-later +#pragma once + +#include +#include +#include +#include + +struct cb_filter_t { + std::string topic; + uint8_t qos; + espMqttClientTypes::OnMessageCallback cb; +}; + +class MqttSubscribeParser { +public: + void register_callback(const std::string& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb); + void unregister_callback(const std::string& topic); + void handle_message(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total); + std::vector get_callbacks(); + +private: + int mosquitto_topic_matches_sub(const char* sub, const char* topic, bool* result); + + std::vector _callbacks; + + enum mosq_err_t { + MOSQ_ERR_SUCCESS = 0, + MOSQ_ERR_INVAL = 3, + }; +}; \ No newline at end of file diff --git a/src/MqttHandleInverter.cpp b/src/MqttHandleInverter.cpp index c049cba..b2bbf87 100644 --- a/src/MqttHandleInverter.cpp +++ b/src/MqttHandleInverter.cpp @@ -6,10 +6,31 @@ #include "MqttSettings.h" #include +#define TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE "limit_persistent_relative" +#define TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE "limit_persistent_absolute" +#define TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE "limit_nonpersistent_relative" +#define TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE "limit_nonpersistent_absolute" +#define TOPIC_SUB_POWER "power" +#define TOPIC_SUB_RESTART "restart" + MqttHandleInverterClass MqttHandleInverter; void MqttHandleInverterClass::init() { + using std::placeholders::_1; + using std::placeholders::_2; + using std::placeholders::_3; + using std::placeholders::_4; + using std::placeholders::_5; + using std::placeholders::_6; + + String topic = MqttSettings.getPrefix(); + MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); + MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); + MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); + MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); + MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_POWER).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); + MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_RESTART).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6)); } void MqttHandleInverterClass::loop() @@ -119,4 +140,89 @@ String MqttHandleInverterClass::getTopic(std::shared_ptr inv, } return inv->serialString() + "/" + String(channel) + "/" + chanName; +} + +void MqttHandleInverterClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) +{ + const CONFIG_T& config = Configuration.get(); + + char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics + strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char* + + char* serial_str; + char* subtopic; + char* setting; + char* rest = &token_topic[strlen(config.Mqtt_Topic)]; + + serial_str = strtok_r(rest, "/", &rest); + subtopic = strtok_r(rest, "/", &rest); + setting = strtok_r(rest, "/", &rest); + + if (serial_str == NULL || subtopic == NULL || setting == NULL) { + return; + } + + uint64_t serial; + serial = strtoull(serial_str, 0, 16); + + auto inv = Hoymiles.getInverterBySerial(serial); + + if (inv == nullptr) { + Serial.println(F("Inverter not found")); + return; + } + + // check if subtopic is unequal cmd + if (strcmp(subtopic, "cmd")) { + return; + } + + char* strlimit = new char[len + 1]; + memcpy(strlimit, payload, len); + strlimit[len] = '\0'; + uint32_t payload_val = strtol(strlimit, NULL, 10); + delete[] strlimit; + + if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE)) { + // Set inverter limit relative persistent + Serial.printf("Limit Persistent: %d %%\n", payload_val); + inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::RelativPersistent); + + } else if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE)) { + // Set inverter limit absolute persistent + Serial.printf("Limit Persistent: %d W\n", payload_val); + inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::AbsolutPersistent); + + } else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE)) { + // Set inverter limit relative non persistent + Serial.printf("Limit Non-Persistent: %d %%\n", payload_val); + if (!properties.retain) { + inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::RelativNonPersistent); + } else { + Serial.println("Ignored because retained"); + } + + } else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE)) { + // Set inverter limit absolute non persistent + Serial.printf("Limit Non-Persistent: %d W\n", payload_val); + if (!properties.retain) { + inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::AbsolutNonPersistent); + } else { + Serial.println("Ignored because retained"); + } + + } else if (!strcmp(setting, TOPIC_SUB_POWER)) { + // Turn inverter on or off + Serial.printf("Set inverter power to: %d\n", payload_val); + inv->sendPowerControlRequest(Hoymiles.getRadio(), payload_val > 0); + + } else if (!strcmp(setting, TOPIC_SUB_RESTART)) { + // Restart inverter + Serial.printf("Restart inverter\n"); + if (!properties.retain && payload_val == 1) { + inv->sendRestartControlRequest(Hoymiles.getRadio()); + } else { + Serial.println("Ignored because retained"); + } + } } \ No newline at end of file diff --git a/src/MqttSettings.cpp b/src/MqttSettings.cpp index 516409b..06bc749 100644 --- a/src/MqttSettings.cpp +++ b/src/MqttSettings.cpp @@ -5,18 +5,10 @@ #include "MqttSettings.h" #include "Configuration.h" #include "NetworkSettings.h" -#include #include #include #include -#define TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE "limit_persistent_relative" -#define TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE "limit_persistent_absolute" -#define TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE "limit_nonpersistent_relative" -#define TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE "limit_nonpersistent_absolute" -#define TOPIC_SUB_POWER "power" -#define TOPIC_SUB_RESTART "restart" - MqttSettingsClass::MqttSettingsClass() { } @@ -43,13 +35,21 @@ void MqttSettingsClass::onMqttConnect(bool sessionPresent) const CONFIG_T& config = Configuration.get(); publish(config.Mqtt_LwtTopic, config.Mqtt_LwtValue_Online); - String topic = getPrefix(); - mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE).c_str(), 0); - mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE).c_str(), 0); - mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE).c_str(), 0); - mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE).c_str(), 0); - mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_POWER).c_str(), 0); - mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_RESTART).c_str(), 0); + for (const auto& cb : _mqttSubscribeParser.get_callbacks()) { + mqttClient->subscribe(cb.topic.c_str(), cb.qos); + } +} + +void MqttSettingsClass::subscribe(const String& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb) +{ + _mqttSubscribeParser.register_callback(topic.c_str(), qos, cb); + mqttClient->subscribe(topic.c_str(), qos); +} + +void MqttSettingsClass::unsubscribe(const String& topic) +{ + _mqttSubscribeParser.unregister_callback(topic.c_str()); + mqttClient->unsubscribe(topic.c_str()); } void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) @@ -85,90 +85,10 @@ void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason re void MqttSettingsClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total) { - const CONFIG_T& config = Configuration.get(); - Serial.print(F("Received MQTT message on topic: ")); Serial.println(topic); - char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics - strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char* - - char* serial_str; - char* subtopic; - char* setting; - char* rest = &token_topic[strlen(config.Mqtt_Topic)]; - - serial_str = strtok_r(rest, "/", &rest); - subtopic = strtok_r(rest, "/", &rest); - setting = strtok_r(rest, "/", &rest); - - if (serial_str == NULL || subtopic == NULL || setting == NULL) { - return; - } - - uint64_t serial; - serial = strtoull(serial_str, 0, 16); - - auto inv = Hoymiles.getInverterBySerial(serial); - - if (inv == nullptr) { - Serial.println(F("Inverter not found")); - return; - } - - // check if subtopic is unequal cmd - if (strcmp(subtopic, "cmd")) { - return; - } - - char* strlimit = new char[len + 1]; - memcpy(strlimit, payload, len); - strlimit[len] = '\0'; - uint32_t payload_val = strtol(strlimit, NULL, 10); - delete[] strlimit; - - if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE)) { - // Set inverter limit relative persistent - Serial.printf("Limit Persistent: %d %%\n", payload_val); - inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::RelativPersistent); - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE)) { - // Set inverter limit absolute persistent - Serial.printf("Limit Persistent: %d W\n", payload_val); - inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::AbsolutPersistent); - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE)) { - // Set inverter limit relative non persistent - Serial.printf("Limit Non-Persistent: %d %%\n", payload_val); - if (!properties.retain) { - inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::RelativNonPersistent); - } else { - Serial.println("Ignored because retained"); - } - - } else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE)) { - // Set inverter limit absolute non persistent - Serial.printf("Limit Non-Persistent: %d W\n", payload_val); - if (!properties.retain) { - inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::AbsolutNonPersistent); - } else { - Serial.println("Ignored because retained"); - } - - } else if (!strcmp(setting, TOPIC_SUB_POWER)) { - // Turn inverter on or off - Serial.printf("Set inverter power to: %d\n", payload_val); - inv->sendPowerControlRequest(Hoymiles.getRadio(), payload_val > 0); - - } else if (!strcmp(setting, TOPIC_SUB_RESTART)) { - // Restart inverter - Serial.printf("Restart inverter\n"); - if (!properties.retain && payload_val == 1) { - inv->sendRestartControlRequest(Hoymiles.getRadio()); - } else { - Serial.println("Ignored because retained"); - } - } + _mqttSubscribeParser.handle_message(properties, topic, payload, len, index, total); } void MqttSettingsClass::performConnect()