#include #include #include "mqtt.h" #include "PubSubClient.h" #include "wifi.h" #define MQTT_CONNECT_TIMEOUT_MILLISECONDS 5000 #define MQTT_MAX_MESSAGE_AGE_MILLIS 11000 #define PHOTOVOLTAIC_POWER_W "openDTU/pv/ac/power" #define PHOTOVOLTAIC_ENERGY_KWH "openDTU/pv/ac/yieldtotal" #define GRID_POWER_W "electricity/grid/power/signed/w" #define GRID_IMPORT_WH "electricity/grid/energy/import/wh" #define GRID_EXPORT_WH "electricity/grid/energy/export/wh" WiFiClient espClient; PubSubClient mqtt(espClient); bool mqttConnected = false; unsigned long mqttLastConnectTry = 0; double photovoltaicPowerW = NAN; unsigned long photovoltaicPowerWLast = 0; double photovoltaicEnergyKWh = NAN; unsigned long photovoltaicEnergyKWhLast = 0; double gridPowerW = NAN; unsigned long gridPowerWLast = 0; double gridImportKWh = NAN; unsigned long gridImportKWhLast = 0; double gridExportKWh = NAN; unsigned long gridExportKWhLast = 0; void mqttDisconnect(); void mqttCallback(char *topic, uint8_t *payload, unsigned int length) { char message[128]; if (length > sizeof message - 1) { Serial.printf("MQTT: received too long message: topic=%s, length=%d", topic, length); return; } memcpy(message, payload, length); message[length] = 0; if (strcmp(PHOTOVOLTAIC_POWER_W, topic) == 0) { photovoltaicPowerW = strtod(message, nullptr); photovoltaicPowerWLast = millis(); } else if (strcmp(PHOTOVOLTAIC_ENERGY_KWH, topic) == 0) { photovoltaicEnergyKWh = strtod(message, nullptr); photovoltaicEnergyKWhLast = millis(); } else if (strcmp(GRID_POWER_W, topic) == 0) { gridPowerW = strtod(message, nullptr); gridPowerWLast = millis(); } else if (strcmp(GRID_IMPORT_WH, topic) == 0) { gridImportKWh = strtod(message, nullptr) / 1000; gridImportKWhLast = millis(); } else if (strcmp(GRID_EXPORT_WH, topic) == 0) { gridExportKWh = strtod(message, nullptr) / 1000; gridExportKWhLast = millis(); } } void mqtt_loop() { if (!wifiIsConnected()) { mqttDisconnect(); return; } if (!mqtt.loop() && (mqttLastConnectTry == 0 || millis() - mqttLastConnectTry > MQTT_CONNECT_TIMEOUT_MILLISECONDS)) { mqttLastConnectTry = millis(); mqtt.setServer("10.0.0.50", 1883); mqttConnected = mqtt.connect(WiFiClass::getHostname()); if (mqttConnected) { Serial.printf("Successfully connected mqtt broker at %s:%d\n", "10.0.0.50", 1883); mqtt.setCallback(mqttCallback); mqtt.subscribe(PHOTOVOLTAIC_POWER_W); mqtt.subscribe(PHOTOVOLTAIC_ENERGY_KWH); mqtt.subscribe(GRID_POWER_W); mqtt.subscribe(GRID_IMPORT_WH); mqtt.subscribe(GRID_EXPORT_WH); } else { Serial.printf("ERROR: Failed to connect MQTT broker at %s:%d\n", "10.0.0.50", 1883); mqttDisconnect(); } } } void mqttDisconnect() { if (mqttConnected) { mqtt.disconnect(); mqttConnected = false; photovoltaicPowerW = NAN; photovoltaicPowerWLast = 0; gridPowerW = NAN; gridPowerWLast = 0; } } double getPhotovoltaicPowerW() { if (millis() - photovoltaicPowerWLast > MQTT_MAX_MESSAGE_AGE_MILLIS) { return NAN; } return photovoltaicPowerW; } double getPhotovoltaicEnergyKWh() { if (millis() - photovoltaicEnergyKWhLast > MQTT_MAX_MESSAGE_AGE_MILLIS) { return NAN; } return photovoltaicEnergyKWh; } double getGridPowerW() { if (millis() - gridPowerWLast > MQTT_MAX_MESSAGE_AGE_MILLIS) { return NAN; } return gridPowerW; } double getGridImportKWh() { if (millis() - gridImportKWhLast > MQTT_MAX_MESSAGE_AGE_MILLIS) { return NAN; } return gridImportKWh; } double getGridExportKWh() { if (millis() - gridExportKWhLast > MQTT_MAX_MESSAGE_AGE_MILLIS) { return NAN; } return gridExportKWh; }