Introduce MqttSubscribeParser and moved inverter specific subscribes to MqttHandleInverterClass

This allows individual callback functions for each subscribed topic. Allows easier implementation of further mqtt functions
This commit is contained in:
Thomas Basler 2022-12-14 20:42:23 +01:00
parent a4316c89b4
commit 5dd4d5b452
6 changed files with 331 additions and 96 deletions

View File

@ -4,6 +4,7 @@
#include "Configuration.h"
#include <Arduino.h>
#include <Hoymiles.h>
#include <espMqttClient.h>
#include <memory>
class MqttHandleInverterClass {
@ -15,6 +16,7 @@ public:
private:
void publishField(std::shared_ptr<InverterAbstract> inv, uint8_t channel, uint8_t fieldId);
void onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
uint32_t _lastPublishStats[INV_MAX_COUNT];
uint32_t _lastPublish;

View File

@ -3,6 +3,7 @@
#include "NetworkSettings.h"
#include <Arduino.h>
#include <MqttSubscribeParser.h>
#include <Ticker.h>
#include <espMqttClient.h>
#include <memory>
@ -16,6 +17,9 @@ public:
void publish(const String& subtopic, const String& payload);
void publishHass(const String& subtopic, const String& payload);
void subscribe(const String& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb);
void unsubscribe(const String& topic);
String getPrefix();
private:
@ -34,6 +38,7 @@ private:
String clientId;
String willTopic;
Ticker mqttReconnectTimer;
MqttSubscribeParser _mqttSubscribeParser;
};
extern MqttSettingsClass MqttSettings;

View File

@ -0,0 +1,171 @@
// SPDX-License-Identifier: GPL-2.0-or-later
/*
* Copyright (C) 2022 Thomas Basler and others
*/
#include "MqttSubscribeParser.h"
void MqttSubscribeParser::register_callback(const std::string& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb)
{
cb_filter_t cbf;
cbf.topic = topic;
cbf.qos = qos;
cbf.cb = cb;
_callbacks.push_back(cbf);
}
void MqttSubscribeParser::unregister_callback(const std::string& topic)
{
for (auto it = _callbacks.begin(); it != _callbacks.end();) {
if ((*it).topic == topic) {
it = _callbacks.erase(it);
} else {
++it;
}
}
}
void MqttSubscribeParser::handle_message(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
{
bool result = false;
for (const auto& cb : _callbacks) {
if (mosquitto_topic_matches_sub(cb.topic.c_str(), topic, &result) == MOSQ_ERR_SUCCESS) {
if (result) {
cb.cb(properties, topic, payload, len, index, total);
}
}
}
}
std::vector<cb_filter_t> MqttSubscribeParser::get_callbacks()
{
return _callbacks;
}
/* Does a topic match a subscription? */
int MqttSubscribeParser::mosquitto_topic_matches_sub(const char* sub, const char* topic, bool* result)
{
size_t spos;
if (!result)
return MOSQ_ERR_INVAL;
*result = false;
if (!sub || !topic || sub[0] == 0 || topic[0] == 0) {
return MOSQ_ERR_INVAL;
}
if ((sub[0] == '$' && topic[0] != '$')
|| (topic[0] == '$' && sub[0] != '$')) {
return MOSQ_ERR_SUCCESS;
}
spos = 0;
while (sub[0] != 0) {
if (topic[0] == '+' || topic[0] == '#') {
return MOSQ_ERR_INVAL;
}
if (sub[0] != topic[0] || topic[0] == 0) { /* Check for wildcard matches */
if (sub[0] == '+') {
/* Check for bad "+foo" or "a/+foo" subscription */
if (spos > 0 && sub[-1] != '/') {
return MOSQ_ERR_INVAL;
}
/* Check for bad "foo+" or "foo+/a" subscription */
if (sub[1] != 0 && sub[1] != '/') {
return MOSQ_ERR_INVAL;
}
spos++;
sub++;
while (topic[0] != 0 && topic[0] != '/') {
if (topic[0] == '+' || topic[0] == '#') {
return MOSQ_ERR_INVAL;
}
topic++;
}
if (topic[0] == 0 && sub[0] == 0) {
*result = true;
return MOSQ_ERR_SUCCESS;
}
} else if (sub[0] == '#') {
/* Check for bad "foo#" subscription */
if (spos > 0 && sub[-1] != '/') {
return MOSQ_ERR_INVAL;
}
/* Check for # not the final character of the sub, e.g. "#foo" */
if (sub[1] != 0) {
return MOSQ_ERR_INVAL;
} else {
while (topic[0] != 0) {
if (topic[0] == '+' || topic[0] == '#') {
return MOSQ_ERR_INVAL;
}
topic++;
}
*result = true;
return MOSQ_ERR_SUCCESS;
}
} else {
/* Check for e.g. foo/bar matching foo/+/# */
if (topic[0] == 0
&& spos > 0
&& sub[-1] == '+'
&& sub[0] == '/'
&& sub[1] == '#') {
*result = true;
return MOSQ_ERR_SUCCESS;
}
/* There is no match at this point, but is the sub invalid? */
while (sub[0] != 0) {
if (sub[0] == '#' && sub[1] != 0) {
return MOSQ_ERR_INVAL;
}
spos++;
sub++;
}
/* Valid input, but no match */
return MOSQ_ERR_SUCCESS;
}
} else {
/* sub[spos] == topic[tpos] */
if (topic[1] == 0) {
/* Check for e.g. foo matching foo/# */
if (sub[1] == '/'
&& sub[2] == '#'
&& sub[3] == 0) {
*result = true;
return MOSQ_ERR_SUCCESS;
}
}
spos++;
sub++;
topic++;
if (sub[0] == 0 && topic[0] == 0) {
*result = true;
return MOSQ_ERR_SUCCESS;
} else if (topic[0] == 0 && sub[0] == '+' && sub[1] == 0) {
if (spos > 0 && sub[-1] != '/') {
return MOSQ_ERR_INVAL;
}
spos++;
sub++;
*result = true;
return MOSQ_ERR_SUCCESS;
}
}
}
if ((topic[0] != 0 || sub[0] != 0)) {
*result = false;
}
while (topic[0] != 0) {
if (topic[0] == '+' || topic[0] == '#') {
return MOSQ_ERR_INVAL;
}
topic++;
}
return MOSQ_ERR_SUCCESS;
}

View File

@ -0,0 +1,31 @@
// SPDX-License-Identifier: GPL-2.0-or-later
#pragma once
#include <cstdint>
#include <espMqttClient.h>
#include <string>
#include <vector>
struct cb_filter_t {
std::string topic;
uint8_t qos;
espMqttClientTypes::OnMessageCallback cb;
};
class MqttSubscribeParser {
public:
void register_callback(const std::string& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb);
void unregister_callback(const std::string& topic);
void handle_message(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
std::vector<cb_filter_t> get_callbacks();
private:
int mosquitto_topic_matches_sub(const char* sub, const char* topic, bool* result);
std::vector<cb_filter_t> _callbacks;
enum mosq_err_t {
MOSQ_ERR_SUCCESS = 0,
MOSQ_ERR_INVAL = 3,
};
};

View File

@ -6,10 +6,31 @@
#include "MqttSettings.h"
#include <ctime>
#define TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE "limit_persistent_relative"
#define TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE "limit_persistent_absolute"
#define TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE "limit_nonpersistent_relative"
#define TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE "limit_nonpersistent_absolute"
#define TOPIC_SUB_POWER "power"
#define TOPIC_SUB_RESTART "restart"
MqttHandleInverterClass MqttHandleInverter;
void MqttHandleInverterClass::init()
{
using std::placeholders::_1;
using std::placeholders::_2;
using std::placeholders::_3;
using std::placeholders::_4;
using std::placeholders::_5;
using std::placeholders::_6;
String topic = MqttSettings.getPrefix();
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_POWER).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
MqttSettings.subscribe(String(topic + "+/cmd/" + TOPIC_SUB_RESTART).c_str(), 0, std::bind(&MqttHandleInverterClass::onMqttMessage, this, _1, _2, _3, _4, _5, _6));
}
void MqttHandleInverterClass::loop()
@ -120,3 +141,88 @@ String MqttHandleInverterClass::getTopic(std::shared_ptr<InverterAbstract> inv,
return inv->serialString() + "/" + String(channel) + "/" + chanName;
}
void MqttHandleInverterClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
{
const CONFIG_T& config = Configuration.get();
char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics
strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char*
char* serial_str;
char* subtopic;
char* setting;
char* rest = &token_topic[strlen(config.Mqtt_Topic)];
serial_str = strtok_r(rest, "/", &rest);
subtopic = strtok_r(rest, "/", &rest);
setting = strtok_r(rest, "/", &rest);
if (serial_str == NULL || subtopic == NULL || setting == NULL) {
return;
}
uint64_t serial;
serial = strtoull(serial_str, 0, 16);
auto inv = Hoymiles.getInverterBySerial(serial);
if (inv == nullptr) {
Serial.println(F("Inverter not found"));
return;
}
// check if subtopic is unequal cmd
if (strcmp(subtopic, "cmd")) {
return;
}
char* strlimit = new char[len + 1];
memcpy(strlimit, payload, len);
strlimit[len] = '\0';
uint32_t payload_val = strtol(strlimit, NULL, 10);
delete[] strlimit;
if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE)) {
// Set inverter limit relative persistent
Serial.printf("Limit Persistent: %d %%\n", payload_val);
inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::RelativPersistent);
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE)) {
// Set inverter limit absolute persistent
Serial.printf("Limit Persistent: %d W\n", payload_val);
inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::AbsolutPersistent);
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE)) {
// Set inverter limit relative non persistent
Serial.printf("Limit Non-Persistent: %d %%\n", payload_val);
if (!properties.retain) {
inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::RelativNonPersistent);
} else {
Serial.println("Ignored because retained");
}
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE)) {
// Set inverter limit absolute non persistent
Serial.printf("Limit Non-Persistent: %d W\n", payload_val);
if (!properties.retain) {
inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::AbsolutNonPersistent);
} else {
Serial.println("Ignored because retained");
}
} else if (!strcmp(setting, TOPIC_SUB_POWER)) {
// Turn inverter on or off
Serial.printf("Set inverter power to: %d\n", payload_val);
inv->sendPowerControlRequest(Hoymiles.getRadio(), payload_val > 0);
} else if (!strcmp(setting, TOPIC_SUB_RESTART)) {
// Restart inverter
Serial.printf("Restart inverter\n");
if (!properties.retain && payload_val == 1) {
inv->sendRestartControlRequest(Hoymiles.getRadio());
} else {
Serial.println("Ignored because retained");
}
}
}

View File

@ -5,18 +5,10 @@
#include "MqttSettings.h"
#include "Configuration.h"
#include "NetworkSettings.h"
#include <Hoymiles.h>
#include <MqttClientSetup.h>
#include <Ticker.h>
#include <espMqttClient.h>
#define TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE "limit_persistent_relative"
#define TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE "limit_persistent_absolute"
#define TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE "limit_nonpersistent_relative"
#define TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE "limit_nonpersistent_absolute"
#define TOPIC_SUB_POWER "power"
#define TOPIC_SUB_RESTART "restart"
MqttSettingsClass::MqttSettingsClass()
{
}
@ -43,13 +35,21 @@ void MqttSettingsClass::onMqttConnect(bool sessionPresent)
const CONFIG_T& config = Configuration.get();
publish(config.Mqtt_LwtTopic, config.Mqtt_LwtValue_Online);
String topic = getPrefix();
mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE).c_str(), 0);
mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE).c_str(), 0);
mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE).c_str(), 0);
mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE).c_str(), 0);
mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_POWER).c_str(), 0);
mqttClient->subscribe(String(topic + "+/cmd/" + TOPIC_SUB_RESTART).c_str(), 0);
for (const auto& cb : _mqttSubscribeParser.get_callbacks()) {
mqttClient->subscribe(cb.topic.c_str(), cb.qos);
}
}
void MqttSettingsClass::subscribe(const String& topic, uint8_t qos, const espMqttClientTypes::OnMessageCallback& cb)
{
_mqttSubscribeParser.register_callback(topic.c_str(), qos, cb);
mqttClient->subscribe(topic.c_str(), qos);
}
void MqttSettingsClass::unsubscribe(const String& topic)
{
_mqttSubscribeParser.unregister_callback(topic.c_str());
mqttClient->unsubscribe(topic.c_str());
}
void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason reason)
@ -85,90 +85,10 @@ void MqttSettingsClass::onMqttDisconnect(espMqttClientTypes::DisconnectReason re
void MqttSettingsClass::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
{
const CONFIG_T& config = Configuration.get();
Serial.print(F("Received MQTT message on topic: "));
Serial.println(topic);
char token_topic[MQTT_MAX_TOPIC_STRLEN + 40]; // respect all subtopics
strncpy(token_topic, topic, MQTT_MAX_TOPIC_STRLEN + 40); // convert const char* to char*
char* serial_str;
char* subtopic;
char* setting;
char* rest = &token_topic[strlen(config.Mqtt_Topic)];
serial_str = strtok_r(rest, "/", &rest);
subtopic = strtok_r(rest, "/", &rest);
setting = strtok_r(rest, "/", &rest);
if (serial_str == NULL || subtopic == NULL || setting == NULL) {
return;
}
uint64_t serial;
serial = strtoull(serial_str, 0, 16);
auto inv = Hoymiles.getInverterBySerial(serial);
if (inv == nullptr) {
Serial.println(F("Inverter not found"));
return;
}
// check if subtopic is unequal cmd
if (strcmp(subtopic, "cmd")) {
return;
}
char* strlimit = new char[len + 1];
memcpy(strlimit, payload, len);
strlimit[len] = '\0';
uint32_t payload_val = strtol(strlimit, NULL, 10);
delete[] strlimit;
if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_RELATIVE)) {
// Set inverter limit relative persistent
Serial.printf("Limit Persistent: %d %%\n", payload_val);
inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::RelativPersistent);
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_PERSISTENT_ABSOLUTE)) {
// Set inverter limit absolute persistent
Serial.printf("Limit Persistent: %d W\n", payload_val);
inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::AbsolutPersistent);
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_RELATIVE)) {
// Set inverter limit relative non persistent
Serial.printf("Limit Non-Persistent: %d %%\n", payload_val);
if (!properties.retain) {
inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::RelativNonPersistent);
} else {
Serial.println("Ignored because retained");
}
} else if (!strcmp(setting, TOPIC_SUB_LIMIT_NONPERSISTENT_ABSOLUTE)) {
// Set inverter limit absolute non persistent
Serial.printf("Limit Non-Persistent: %d W\n", payload_val);
if (!properties.retain) {
inv->sendActivePowerControlRequest(Hoymiles.getRadio(), payload_val, PowerLimitControlType::AbsolutNonPersistent);
} else {
Serial.println("Ignored because retained");
}
} else if (!strcmp(setting, TOPIC_SUB_POWER)) {
// Turn inverter on or off
Serial.printf("Set inverter power to: %d\n", payload_val);
inv->sendPowerControlRequest(Hoymiles.getRadio(), payload_val > 0);
} else if (!strcmp(setting, TOPIC_SUB_RESTART)) {
// Restart inverter
Serial.printf("Restart inverter\n");
if (!properties.retain && payload_val == 1) {
inv->sendRestartControlRequest(Hoymiles.getRadio());
} else {
Serial.println("Ignored because retained");
}
}
_mqttSubscribeParser.handle_message(properties, topic, payload, len, index, total);
}
void MqttSettingsClass::performConnect()