REFACTOR: json, mqtt, websocket + CACHE
This commit is contained in:
parent
8bae83ea32
commit
7b5d0b30f4
@ -7,7 +7,7 @@
|
||||
#include "patrix/sensor/DHT22.h"
|
||||
#include "patrix/sensor/Max6675Sensor.h"
|
||||
|
||||
class NodeHeizung final : public Node {
|
||||
class NodeHeizung final : public Node<100, 14> {
|
||||
|
||||
const unsigned long MAX_AGE_SECONDS = 10;
|
||||
|
||||
|
||||
@ -1,12 +1,14 @@
|
||||
#ifndef NODE_TEST_H
|
||||
#define NODE_TEST_H
|
||||
|
||||
#include <AsyncWebSocket.h>
|
||||
|
||||
#include "patrix/Node.h"
|
||||
#include "patrix/sensor/Dallas.h"
|
||||
#include "patrix/sensor/DallasSensor.h"
|
||||
#include "patrix/sensor/DHT22.h"
|
||||
|
||||
class NodeTest final : public Node {
|
||||
class NodeTest final : public Node<100, 4> {
|
||||
|
||||
const unsigned long MAX_AGE_SECONDS = 10;
|
||||
|
||||
|
||||
82
src/patrix/Cache.h
Normal file
82
src/patrix/Cache.h
Normal file
@ -0,0 +1,82 @@
|
||||
#ifndef CACHE_H
|
||||
#define CACHE_H
|
||||
|
||||
#include "log.h"
|
||||
|
||||
template<size_t ENTRY_COUNT, size_t VALUE_COUNT>
|
||||
class Cache {
|
||||
|
||||
public:
|
||||
|
||||
struct Entry {
|
||||
|
||||
time_t timestamp;
|
||||
|
||||
double data[VALUE_COUNT];
|
||||
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
Entry entries[ENTRY_COUNT];
|
||||
|
||||
Entry *r = entries;
|
||||
|
||||
Entry *w = entries;
|
||||
|
||||
void clearWriteEntry() {
|
||||
for (double& value: r->data) {
|
||||
value = 0;
|
||||
}
|
||||
}
|
||||
|
||||
Entry *findEntry(time_t timestamp) {
|
||||
if (w->timestamp == timestamp) {
|
||||
return w;
|
||||
}
|
||||
auto wNext = (w + 1 - entries) % ENTRY_COUNT + entries;
|
||||
if (w->timestamp == 0) {
|
||||
w->timestamp = timestamp;
|
||||
clearWriteEntry();
|
||||
} else if (wNext != r) {
|
||||
w = wNext;
|
||||
w->timestamp = timestamp;
|
||||
clearWriteEntry();
|
||||
} else {
|
||||
return nullptr;
|
||||
}
|
||||
return w;
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
void put(const time_t timestamp, const int valueIndex, const double value) {
|
||||
if (valueIndex >= VALUE_COUNT) {
|
||||
error("Cache.put: valueIndex to big for valueCount: valueIndex=%d, valueCount=%d", valueIndex, VALUE_COUNT);
|
||||
return;
|
||||
}
|
||||
Entry *entry = findEntry(timestamp);
|
||||
if (entry == nullptr) {
|
||||
warn("Cache full!");
|
||||
return;
|
||||
}
|
||||
entry->data[valueIndex] = value;
|
||||
}
|
||||
|
||||
Entry *read() {
|
||||
if (w != r || r->timestamp != 0) {
|
||||
return r;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void skip() {
|
||||
if (w != r || w->timestamp != 0) {
|
||||
r->timestamp = 0;
|
||||
r = (r + 1 - entries) % ENTRY_COUNT + entries;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
#endif
|
||||
@ -1,33 +1,38 @@
|
||||
#ifndef NODE_H
|
||||
#define NODE_H
|
||||
|
||||
#include "Cache.h"
|
||||
#include "clock.h"
|
||||
#include "sensor/Sensor.h"
|
||||
|
||||
template<size_t ENTRY_COUNT, size_t VALUE_COUNT>
|
||||
class Node {
|
||||
|
||||
Cache<ENTRY_COUNT, VALUE_COUNT> cache;
|
||||
|
||||
public:
|
||||
|
||||
Node() : cache() {
|
||||
//
|
||||
}
|
||||
|
||||
virtual ~Node() = default;
|
||||
|
||||
void toJson(const JsonObject& json) {
|
||||
auto index = 0;
|
||||
Sensor *sensor;
|
||||
while ((sensor = getSensor(index++)) != nullptr) {
|
||||
sensor->toJson(json[sensor->getName()].to<JsonObject>());
|
||||
}
|
||||
}
|
||||
|
||||
void send(const bool mqtt, const bool websocket) {
|
||||
JsonDocument json;
|
||||
toJson(json.to<JsonObject>());
|
||||
mqttPublish(String(WiFiClass::getHostname()) + "/json", json, mqtt, websocket);
|
||||
}
|
||||
|
||||
void loop() {
|
||||
loopBeforeSensors();
|
||||
const auto changed = loopSensors();
|
||||
if (changed) {
|
||||
send(true, true);
|
||||
loopSensors();
|
||||
cacheSend();
|
||||
}
|
||||
|
||||
void websocketSendOne(AsyncWebSocketClient *client) {
|
||||
auto sensorIndex = 0;
|
||||
Sensor *sensor;
|
||||
while ((sensor = this->getSensor(sensorIndex++)) != nullptr) {
|
||||
auto valueIndex = 0;
|
||||
Value *value;
|
||||
while ((value = sensor->getValue(valueIndex++)) != nullptr) {
|
||||
client->text(value->toJson(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -39,16 +44,48 @@ public:
|
||||
|
||||
private:
|
||||
|
||||
bool loopSensors() {
|
||||
auto changed = false;
|
||||
auto index = 0;
|
||||
void loopSensors() {
|
||||
const auto timestamp = time(nullptr);
|
||||
auto cacheIndex = 0;
|
||||
auto sensorIndex = 0;
|
||||
Sensor *sensor;
|
||||
while ((sensor = getSensor(index++)) != nullptr) {
|
||||
if (sensor->loop()) {
|
||||
changed = true;
|
||||
while ((sensor = getSensor(sensorIndex++)) != nullptr) {
|
||||
auto valueIndex = 0;
|
||||
Value *value;
|
||||
while ((value = sensor->getValue(valueIndex++)) != nullptr) {
|
||||
if (value->loop()) {
|
||||
cache.put(timestamp, cacheIndex++, value->getCurrentValue());
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
}
|
||||
|
||||
void cacheSend() {
|
||||
const auto entry = cache.read();
|
||||
if (entry == nullptr) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto cacheIndex = 0;
|
||||
auto sensorIndex = 0;
|
||||
Sensor *sensor;
|
||||
while ((sensor = getSensor(sensorIndex++)) != nullptr) {
|
||||
auto valueIndex = 0;
|
||||
Value *value;
|
||||
while ((value = sensor->getValue(valueIndex++)) != nullptr) {
|
||||
const auto v = entry->data[cacheIndex++];
|
||||
const auto t = entry->timestamp;
|
||||
if (isCorrectTime(t) && !isnan(v)) {
|
||||
auto json = value->toJson(t, v, false);
|
||||
if (mqttPublish(value->name + "/persist", json, NO_RETAIN)) {
|
||||
entry->data[cacheIndex++] = NAN;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
cache.skip();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
@ -2,10 +2,10 @@
|
||||
|
||||
#include <ESPAsyncWebServer.h>
|
||||
|
||||
#include "INDEX_HTML.h"
|
||||
#include "log.h"
|
||||
#include "main.h"
|
||||
#include "system.h"
|
||||
#include "INDEX_HTML.h"
|
||||
|
||||
AsyncWebServer server(80);
|
||||
|
||||
@ -31,7 +31,7 @@ void httpSetup() {
|
||||
switch (type) {
|
||||
case WS_EVT_CONNECT:
|
||||
t = "CONNECT";
|
||||
node.send(false, true); // TODO this currently sends to ALL
|
||||
node.websocketSendOne(client);
|
||||
break;
|
||||
case WS_EVT_DISCONNECT:
|
||||
t = "DISCONNECT";
|
||||
@ -62,6 +62,6 @@ void httpLoop() {
|
||||
ws.cleanupClients();
|
||||
}
|
||||
|
||||
void httpPublish(char *payload) {
|
||||
void websocketSendAll(const String& payload) {
|
||||
ws.textAll(payload);
|
||||
}
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
#ifndef HTTP_H
|
||||
#define HTTP_H
|
||||
|
||||
#include <Arduino.h>
|
||||
|
||||
void httpSetup();
|
||||
|
||||
void httpLoop();
|
||||
|
||||
void httpPublish(char *payload);
|
||||
void websocketSendAll(const String& payload);
|
||||
|
||||
#endif
|
||||
|
||||
@ -7,9 +7,13 @@
|
||||
#include "mqtt.h"
|
||||
#include "wifi.h"
|
||||
|
||||
// ReSharper disable CppUnusedIncludeDirective
|
||||
#include "sensor/DallasSensor.h"
|
||||
#include "sensor/DHT22.h"
|
||||
#include "sensor/Max6675Sensor.h"
|
||||
// ReSharper restore CppUnusedIncludeDirective
|
||||
|
||||
// ReSharper disable CppUseAuto
|
||||
|
||||
#ifdef NODE_TEST
|
||||
NodeTest node = NodeTest();
|
||||
@ -19,6 +23,8 @@ NodeTest node = NodeTest();
|
||||
NodeHeizung node = NodeHeizung();
|
||||
#endif
|
||||
|
||||
// ReSharper restore CppUseAuto
|
||||
|
||||
void setup() {
|
||||
logSetup();
|
||||
bootDelay();
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
#ifndef MAIN_H
|
||||
#define MAIN_H
|
||||
|
||||
#include "NodeTest.h"
|
||||
// ReSharper disable CppUnusedIncludeDirective
|
||||
#include "NodeHeizung.h"
|
||||
#include "NodeTest.h"
|
||||
// ReSharper restore CppUnusedIncludeDirective
|
||||
|
||||
#ifdef NODE_TEST
|
||||
extern NodeTest node;
|
||||
|
||||
@ -3,7 +3,6 @@
|
||||
#include <PubSubClient.h>
|
||||
#include <WiFi.h>
|
||||
|
||||
#include "http.h"
|
||||
#include "log.h"
|
||||
#include "wifi.h"
|
||||
|
||||
@ -47,19 +46,6 @@ void mqttLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
bool mqttPublish(const String& topic, const double value, const bool retained) {
|
||||
return mqtt.publish(topic.c_str(), String(value).c_str(), retained);
|
||||
}
|
||||
|
||||
|
||||
bool mqttPublish(const String& topic, const JsonDocument& json, const bool mqttSend, const bool websocketSend) {
|
||||
char buffer[512];
|
||||
serializeJson(json, buffer);
|
||||
if (websocketSend) {
|
||||
httpPublish(buffer);
|
||||
}
|
||||
if (mqttSend) {
|
||||
return mqtt.publish(topic.c_str(), buffer);
|
||||
}
|
||||
return true;
|
||||
bool mqttPublish(const String& topic, const String& payload, const Retain retain) {
|
||||
return mqtt.publish(topic.c_str(), payload.c_str(), retain == RETAIN);
|
||||
}
|
||||
|
||||
@ -1,12 +1,14 @@
|
||||
#ifndef MQTT_H
|
||||
#define MQTT_H
|
||||
|
||||
#include <ArduinoJson.h>
|
||||
#include <AsyncWebSocket.h>
|
||||
|
||||
enum Retain {
|
||||
NO_RETAIN, RETAIN
|
||||
};
|
||||
|
||||
void mqttLoop();
|
||||
|
||||
bool mqttPublish(const String& topic, const double value, bool retained);
|
||||
|
||||
bool mqttPublish(const String& topic, const JsonDocument& json, bool mqtt, bool websocketSend);
|
||||
bool mqttPublish(const String& topic, const String& payload, Retain retain);
|
||||
|
||||
#endif
|
||||
|
||||
@ -19,7 +19,7 @@ class DHT22 final : public Sensor {
|
||||
public:
|
||||
|
||||
DHT22(const int pin,
|
||||
const char *name,
|
||||
const String& name,
|
||||
const double temperatureThreshold,
|
||||
const double humidityRelativeThreshold,
|
||||
const double humidityAbsoluteThreshold,
|
||||
@ -27,9 +27,9 @@ public:
|
||||
const unsigned long overdueSeconds
|
||||
) : Sensor(name),
|
||||
sensor(pin, DHT_TYPE_22),
|
||||
temperature(name, "temperature", temperatureThreshold, maxAgeSeconds, overdueSeconds),
|
||||
humidityRelative(name, "humidity/relative", humidityRelativeThreshold, maxAgeSeconds, overdueSeconds),
|
||||
humidityAbsolute(name, "humidity/absolute", humidityAbsoluteThreshold, maxAgeSeconds, overdueSeconds) {
|
||||
temperature(name + "/temperature", temperatureThreshold, maxAgeSeconds, overdueSeconds),
|
||||
humidityRelative(name + "/humidity/relative", humidityRelativeThreshold, maxAgeSeconds, overdueSeconds),
|
||||
humidityAbsolute(name + "/humidity/absolute", humidityAbsoluteThreshold, maxAgeSeconds, overdueSeconds) {
|
||||
//
|
||||
}
|
||||
|
||||
|
||||
@ -19,14 +19,14 @@ public:
|
||||
DallasSensor(
|
||||
Dallas& sensors,
|
||||
const uint64_t address,
|
||||
const char *name,
|
||||
const String& name,
|
||||
const double threshold,
|
||||
const unsigned long maxAgeSeconds,
|
||||
const unsigned long overdueSeconds
|
||||
) : Sensor(name),
|
||||
sensors(sensors),
|
||||
address(address),
|
||||
temperature(name, "temperature", threshold, maxAgeSeconds, overdueSeconds) {
|
||||
temperature(name + "/temperature", threshold, maxAgeSeconds, overdueSeconds) {
|
||||
//
|
||||
}
|
||||
|
||||
|
||||
@ -20,13 +20,13 @@ public:
|
||||
const int8_t pinMISO,
|
||||
const int8_t pinCS,
|
||||
const int8_t pinCLK,
|
||||
const char *name,
|
||||
const String& name,
|
||||
const double threshold,
|
||||
const unsigned long maxAgeSeconds,
|
||||
const unsigned long overdueSeconds
|
||||
) : Sensor(name),
|
||||
sensor(pinCLK, pinCS, pinMISO),
|
||||
temperature(name, "temperature", threshold, maxAgeSeconds, overdueSeconds) {
|
||||
temperature(name + "/temperature", threshold, maxAgeSeconds, overdueSeconds) {
|
||||
//
|
||||
}
|
||||
|
||||
|
||||
@ -6,63 +6,20 @@
|
||||
|
||||
class Sensor {
|
||||
|
||||
protected:
|
||||
|
||||
const char *name;
|
||||
|
||||
public:
|
||||
|
||||
explicit Sensor(const char *name): name(name) {
|
||||
const String& name;
|
||||
|
||||
explicit Sensor(const String& name): name(name) {
|
||||
//
|
||||
}
|
||||
|
||||
virtual ~Sensor() = default;
|
||||
|
||||
const char *getName() const {
|
||||
return name;
|
||||
}
|
||||
|
||||
bool loop() {
|
||||
loopBeforeValues();
|
||||
const auto changed = loopValues();
|
||||
if (changed) {
|
||||
send();
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
void send() {
|
||||
JsonDocument json;
|
||||
toJson(json.to<JsonObject>());
|
||||
mqttPublish(String(name) + "/json", json, true, false);
|
||||
}
|
||||
|
||||
void toJson(const JsonObject& json) {
|
||||
auto index = 0;
|
||||
Value *value;
|
||||
while ((value = getValue(index++)) != nullptr) {
|
||||
value->toJson(json[value->getName()].to<JsonObject>());
|
||||
}
|
||||
}
|
||||
|
||||
virtual void loopBeforeValues() {}
|
||||
|
||||
virtual Value *getValue(int index) = 0;
|
||||
|
||||
private:
|
||||
|
||||
bool loopValues() {
|
||||
auto changed = false;
|
||||
auto index = 0;
|
||||
Value *value;
|
||||
while ((value = getValue(index++)) != nullptr) {
|
||||
if (value->loop()) {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
@ -1,16 +1,13 @@
|
||||
#ifndef VALUE_H
|
||||
#define VALUE_H
|
||||
|
||||
#include <WiFi.h>
|
||||
#include <ArduinoJson.h>
|
||||
#include <patrix/http.h>
|
||||
#include <patrix/log.h>
|
||||
#include <patrix/mqtt.h>
|
||||
|
||||
class Value {
|
||||
|
||||
const char *parent;
|
||||
|
||||
const char *name;
|
||||
|
||||
double threshold;
|
||||
|
||||
unsigned long maxAgeMillis;
|
||||
@ -29,17 +26,17 @@ class Value {
|
||||
|
||||
public:
|
||||
|
||||
const String name;
|
||||
|
||||
Value(
|
||||
const char *parent,
|
||||
const char *name,
|
||||
const String& name,
|
||||
const double threshold,
|
||||
const unsigned long maxAgeSeconds,
|
||||
const unsigned long overdueSeconds
|
||||
) : parent(parent),
|
||||
name(name),
|
||||
threshold(threshold),
|
||||
) : threshold(threshold),
|
||||
maxAgeMillis(maxAgeSeconds * 1000),
|
||||
overdueSeconds(overdueSeconds) {
|
||||
overdueSeconds(overdueSeconds),
|
||||
name(name) {
|
||||
//
|
||||
}
|
||||
|
||||
@ -49,24 +46,9 @@ public:
|
||||
currentEpoch = time(nullptr);
|
||||
}
|
||||
|
||||
void toJson(const JsonObject& json) const {
|
||||
json["value"] = currentValue;
|
||||
json["time"] = currentEpoch;
|
||||
}
|
||||
|
||||
void send() {
|
||||
mqttPublish(String(parent) + "/" + name + "/plain", currentValue, true);
|
||||
|
||||
JsonDocument json;
|
||||
toJson(json.to<JsonObject>());
|
||||
mqttPublish(String(parent) + "/" + name + "/json", json, true, false);
|
||||
|
||||
markSent();
|
||||
}
|
||||
|
||||
bool loop() {
|
||||
if (!isnan(currentValue) && millis() - currentMillis > maxAgeMillis) {
|
||||
warn("Value too old: %s/%s", parent, name);
|
||||
warn("Value too old: %s", name.c_str());
|
||||
update(NAN);
|
||||
}
|
||||
|
||||
@ -75,28 +57,33 @@ public:
|
||||
const auto dueToTime = sentInterval != 0 && sentInterval != time(nullptr) / overdueSeconds;
|
||||
const auto changed = dueToNAN || dueToThreshold || dueToTime;
|
||||
if (changed) {
|
||||
send();
|
||||
mqttPublish(name + "/retain", String(currentValue), RETAIN);
|
||||
websocketSendAll(toJson(false));
|
||||
sentValue = currentValue;
|
||||
sentInterval = time(nullptr) / overdueSeconds;
|
||||
}
|
||||
return changed;
|
||||
}
|
||||
|
||||
const char *getName() const {
|
||||
return name;
|
||||
}
|
||||
|
||||
double getCurrentValue() const {
|
||||
return currentValue;
|
||||
}
|
||||
|
||||
time_t getCurrentEpoch() const {
|
||||
return currentEpoch;
|
||||
String toJson(const bool addNameField) const {
|
||||
return toJson(currentEpoch, currentValue, addNameField);
|
||||
}
|
||||
|
||||
private:
|
||||
String toJson(const time_t timestamp, const double value, const bool addNameField) const {
|
||||
JsonDocument json;
|
||||
if (addNameField) {
|
||||
json["name"] = name.c_str();
|
||||
}
|
||||
json["timestamp"] = timestamp;
|
||||
json["value"] = value;
|
||||
|
||||
void markSent() {
|
||||
sentValue = currentValue;
|
||||
sentInterval = time(nullptr) / overdueSeconds;
|
||||
char buffer[256];
|
||||
serializeJson(json, buffer, sizeof buffer);
|
||||
return String(buffer);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
Loading…
Reference in New Issue
Block a user