MQTT powermeter: avoid iterating subscriptions

instead of iterating a map with subscriptions, we now bind the target
variable to the callback, which is executed once a message is arrived.
this way, the target variable is already linked to the respective topic
when the callback is executed.

lock the mutex when writing the variable, as the MQTT callback is
executed in a different context (MQTT task) than the main loop task,
which otherwise accesses the variables.
This commit is contained in:
Bernhard Kirchen 2024-05-08 13:12:43 +02:00
parent 5cd6334880
commit d4c07836d9
2 changed files with 31 additions and 29 deletions

View File

@ -3,7 +3,7 @@
#include "PowerMeterProvider.h"
#include <espMqttClient.h>
#include <map>
#include <vector>
#include <mutex>
class PowerMeterMqtt : public PowerMeterProvider {
@ -15,14 +15,16 @@ public:
void doMqttPublish() const final;
private:
void onMqttMessage(const espMqttClientTypes::MessageProperties& properties,
const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total);
using MsgProperties = espMqttClientTypes::MessageProperties;
void onMessage(MsgProperties const& properties, char const* topic,
uint8_t const* payload, size_t len, size_t index,
size_t total, float* targetVariable);
float _powerValueOne = 0;
float _powerValueTwo = 0;
float _powerValueThree = 0;
std::map<String, float*> _mqttSubscriptions;
std::vector<String> _mqttSubscriptions;
mutable std::mutex _mutex;
};

View File

@ -6,15 +6,16 @@
bool PowerMeterMqtt::init()
{
auto subscribe = [this](char const* topic, float* target) {
auto subscribe = [this](char const* topic, float* targetVariable) {
if (strlen(topic) == 0) { return; }
MqttSettings.subscribe(topic, 0,
std::bind(&PowerMeterMqtt::onMqttMessage,
std::bind(&PowerMeterMqtt::onMessage,
this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4,
std::placeholders::_5, std::placeholders::_6)
std::placeholders::_5, std::placeholders::_6,
targetVariable)
);
_mqttSubscriptions.try_emplace(topic, target);
_mqttSubscriptions.push_back(topic);
};
auto const& config = Configuration.get();
@ -27,32 +28,31 @@ bool PowerMeterMqtt::init()
void PowerMeterMqtt::deinit()
{
for (auto const& s: _mqttSubscriptions) { MqttSettings.unsubscribe(s.first); }
for (auto const& t: _mqttSubscriptions) { MqttSettings.unsubscribe(t); }
_mqttSubscriptions.clear();
}
void PowerMeterMqtt::onMqttMessage(const espMqttClientTypes::MessageProperties& properties, const char* topic, const uint8_t* payload, size_t len, size_t index, size_t total)
void PowerMeterMqtt::onMessage(PowerMeterMqtt::MsgProperties const& properties,
char const* topic, uint8_t const* payload, size_t len, size_t index,
size_t total, float* targetVariable)
{
for (auto const& subscription: _mqttSubscriptions) {
if (subscription.first != topic) { continue; }
std::string value(reinterpret_cast<const char*>(payload), len);
try {
*subscription.second = std::stof(value);
}
catch(std::invalid_argument const& e) {
MessageOutput.printf("[PowerMeterMqtt] cannot parse payload of topic '%s' as float: %s\r\n",
topic, value.c_str());
return;
}
if (_verboseLogging) {
MessageOutput.printf("[PowerMeterMqtt] Updated from '%s', TotalPower: %5.2f\r\n",
topic, getPowerTotal());
}
gotUpdate();
std::string value(reinterpret_cast<char const*>(payload), len);
try {
std::lock_guard<std::mutex> l(_mutex);
*targetVariable = std::stof(value);
}
catch (std::invalid_argument const& e) {
MessageOutput.printf("[PowerMeterMqtt] cannot parse payload of topic "
"'%s' as float: %s\r\n", topic, value.c_str());
return;
}
if (_verboseLogging) {
MessageOutput.printf("[PowerMeterMqtt] Updated from '%s', TotalPower: %5.2f\r\n",
topic, getPowerTotal());
}
gotUpdate();
}
float PowerMeterMqtt::getPowerTotal() const