From e090e1530d4d35a452999d0d4315b123fc52d153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patrick=20Ha=C3=9Fel?= Date: Tue, 15 Oct 2024 16:29:37 +0200 Subject: [PATCH] AggregationReceiver, Heating, Measure, Counter --- .../de/ph87/data/aggregation/Aggregation.java | 75 +++++++++++++++++ .../aggregation/AggregationConstants.java | 21 +++++ .../data/aggregation/AggregationReceiver.java | 43 ++++++++++ .../ph87/data/heating/HeatingConstants.java | 23 +++++ .../java/de/ph87/data/series/SeriesMode.java | 2 + .../de/ph87/data/series/counter/Counter.java | 26 ++++++ .../data/series/counter/CounterEvent.java | 23 +++++ .../series/counter/CounterRepository.java | 8 ++ .../data/series/counter/CounterService.java | 38 +++++++++ .../de/ph87/data/series/measure/Measure.java | 84 +++++++++++++++++++ .../series/measure/MeasureRepository.java | 8 ++ .../data/series/measure/MeasureService.java | 38 +++++++++ 12 files changed, 389 insertions(+) create mode 100644 src/main/java/de/ph87/data/aggregation/Aggregation.java create mode 100644 src/main/java/de/ph87/data/aggregation/AggregationConstants.java create mode 100644 src/main/java/de/ph87/data/aggregation/AggregationReceiver.java create mode 100644 src/main/java/de/ph87/data/heating/HeatingConstants.java create mode 100644 src/main/java/de/ph87/data/series/counter/Counter.java create mode 100644 src/main/java/de/ph87/data/series/counter/CounterEvent.java create mode 100644 src/main/java/de/ph87/data/series/counter/CounterRepository.java create mode 100644 src/main/java/de/ph87/data/series/counter/CounterService.java create mode 100644 src/main/java/de/ph87/data/series/measure/Measure.java create mode 100644 src/main/java/de/ph87/data/series/measure/MeasureRepository.java create mode 100644 src/main/java/de/ph87/data/series/measure/MeasureService.java diff --git a/src/main/java/de/ph87/data/aggregation/Aggregation.java b/src/main/java/de/ph87/data/aggregation/Aggregation.java new file mode 100644 index 0000000..4bd3ad5 --- /dev/null +++ b/src/main/java/de/ph87/data/aggregation/Aggregation.java @@ -0,0 +1,75 @@ +package de.ph87.data.aggregation; + +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +import java.time.ZonedDateTime; + +import static de.ph87.data.common.DateTimeHelpers.ZDT; + +@Getter +@ToString +public class Aggregation { + + @NonNull + public final String name; + + @NonNull + public final ZonedDateTime date; + + public final int count; + + public final int errors; + + @NonNull + public final ZonedDateTime firstTime; + + @NonNull + public final ZonedDateTime lastTime; + + public final double firstValue; + + public final double lastValue; + + public final double min; + + public final double max; + + public final double sum; + + public final double squares; + + public final boolean terminal; + + public Aggregation( + final String name, + final long date, + final int count, + final int errors, + final long firstTime, + final long lastTime, + final double firstValue, + final double lastValue, + final double min, + final double max, + final double sum, + final double squares, + final boolean terminal + ) { + this.name = name; + this.date = ZDT(date); + this.count = count; + this.errors = errors; + this.firstTime = ZDT(firstTime); + this.lastTime = ZDT(lastTime); + this.firstValue = firstValue; + this.lastValue = lastValue; + this.min = min; + this.max = max; + this.sum = sum; + this.squares = squares; + this.terminal = terminal; + } + +} diff --git a/src/main/java/de/ph87/data/aggregation/AggregationConstants.java b/src/main/java/de/ph87/data/aggregation/AggregationConstants.java new file mode 100644 index 0000000..7a5824e --- /dev/null +++ b/src/main/java/de/ph87/data/aggregation/AggregationConstants.java @@ -0,0 +1,21 @@ +package de.ph87.data.aggregation; + +import java.util.Map; + +import static de.ph87.data.heating.HeatingConstants.*; + +public class AggregationConstants { + + public static final Map HEATING_TOPIC_MAP = Map.of( + "aggregation/heizung/abgas/temperatur", HEATING_EXHAUST_TEMPERATURE_SERIES_NAME, + "aggregation/heizung/puffer/vorlauf/temperatur", HEATING_BUFFER_SUPPLY_TEMPERATURE_SERIES_NAME, + "aggregation/heizung/puffer/ruecklauf/temperatur", HEATING_BUFFER_RETURN_TEMPERATURE_SERIES_NAME, + "aggregation/heizung/puffer/eingang/temperatur", HEATING_BUFFER_COLD_TEMPERATURE_SERIES_NAME, + "aggregation/heizung/puffer/speicher/temperatur", HEATING_BUFFER_INNER_TEMPERATURE_SERIES_NAME, + "aggregation/heizung/puffer/ausgang/temperatur", HEATING_BUFFER_HOT_TEMPERATURE_SERIES_NAME, + "aggregation/heizung/puffer/zirkulation/temperatur", HEATING_BUFFER_CIRCULATION_TEMPERATURE_SERIES_NAME, + "aggregation/heizung/heizkreis/vorlauf/temperatur", HEATING_LOOP_SUPPLY_TEMPERATURE_SERIES_NAME, + "aggregation/heizung/heizkreis/ruecklauf/temperatur", HEATING_LOOP_RETURN_TEMPERATURE_SERIES_NAME + ); + +} diff --git a/src/main/java/de/ph87/data/aggregation/AggregationReceiver.java b/src/main/java/de/ph87/data/aggregation/AggregationReceiver.java new file mode 100644 index 0000000..237b7c7 --- /dev/null +++ b/src/main/java/de/ph87/data/aggregation/AggregationReceiver.java @@ -0,0 +1,43 @@ +package de.ph87.data.aggregation; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.ph87.data.mqtt.MqttEvent; +import de.ph87.data.series.measure.MeasureEvent; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +import static de.ph87.data.aggregation.AggregationConstants.HEATING_TOPIC_MAP; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AggregationReceiver { + + private final ApplicationEventPublisher applicationEventPublisher; + + private final ObjectMapper objectMapper; + + @EventListener(MqttEvent.class) + public void onEvent(@NonNull final MqttEvent event) { + final String seriesName = HEATING_TOPIC_MAP.get(event.topic); + if (seriesName == null) { + return; + } + + final Aggregation aggregation; + try { + aggregation = objectMapper.readValue(event.payload, Aggregation.class); + } catch (JsonProcessingException e) { + log.error("Failed to parse Aggregation: error={}, event={}", e, event); + return; + } + + applicationEventPublisher.publishEvent(new MeasureEvent(seriesName, aggregation.lastTime, aggregation.lastValue)); + } + +} diff --git a/src/main/java/de/ph87/data/heating/HeatingConstants.java b/src/main/java/de/ph87/data/heating/HeatingConstants.java new file mode 100644 index 0000000..a75ffa0 --- /dev/null +++ b/src/main/java/de/ph87/data/heating/HeatingConstants.java @@ -0,0 +1,23 @@ +package de.ph87.data.heating; + +public class HeatingConstants { + + public static final String HEATING_EXHAUST_TEMPERATURE_SERIES_NAME = "heating.exhaust.temperature"; + + public static final String HEATING_BUFFER_SUPPLY_TEMPERATURE_SERIES_NAME = "heating.buffer.supply.temperature"; + + public static final String HEATING_BUFFER_RETURN_TEMPERATURE_SERIES_NAME = "heating.buffer.return.temperature"; + + public static final String HEATING_BUFFER_COLD_TEMPERATURE_SERIES_NAME = "heating.buffer.cold.temperature"; + + public static final String HEATING_BUFFER_INNER_TEMPERATURE_SERIES_NAME = "heating.buffer.inner.temperature"; + + public static final String HEATING_BUFFER_HOT_TEMPERATURE_SERIES_NAME = "heating.buffer.hot.temperature"; + + public static final String HEATING_BUFFER_CIRCULATION_TEMPERATURE_SERIES_NAME = "heating.buffer.circulation.temperature"; + + public static final String HEATING_LOOP_SUPPLY_TEMPERATURE_SERIES_NAME = "heating.loop.supply.temperature"; + + public static final String HEATING_LOOP_RETURN_TEMPERATURE_SERIES_NAME = "heating.loop.return.temperature"; + +} diff --git a/src/main/java/de/ph87/data/series/SeriesMode.java b/src/main/java/de/ph87/data/series/SeriesMode.java index 11f955a..e30801a 100644 --- a/src/main/java/de/ph87/data/series/SeriesMode.java +++ b/src/main/java/de/ph87/data/series/SeriesMode.java @@ -5,6 +5,8 @@ import lombok.NonNull; import java.util.function.BiFunction; public enum SeriesMode { + MEASURE((first, second) -> second - first), + COUNTER((first, second) -> second - first), INCREASING((first, second) -> second - first), DECREASING((first, second) -> first - second), ; diff --git a/src/main/java/de/ph87/data/series/counter/Counter.java b/src/main/java/de/ph87/data/series/counter/Counter.java new file mode 100644 index 0000000..f28a08c --- /dev/null +++ b/src/main/java/de/ph87/data/series/counter/Counter.java @@ -0,0 +1,26 @@ +package de.ph87.data.series.counter; + +import de.ph87.data.series.SeriesIntervalKey; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import lombok.*; + +@Entity +@Getter +@ToString +@NoArgsConstructor +public class Counter { + + @NonNull + @EmbeddedId + private SeriesIntervalKey id; + + @Setter + private int count; + + public Counter(@NonNull final SeriesIntervalKey id, final int count) { + this.id = id; + this.count = count; + } + +} diff --git a/src/main/java/de/ph87/data/series/counter/CounterEvent.java b/src/main/java/de/ph87/data/series/counter/CounterEvent.java new file mode 100644 index 0000000..620e8c7 --- /dev/null +++ b/src/main/java/de/ph87/data/series/counter/CounterEvent.java @@ -0,0 +1,23 @@ +package de.ph87.data.series.counter; + +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +import java.time.ZonedDateTime; + +@Getter +@ToString +@RequiredArgsConstructor +public class CounterEvent { + + @NonNull + private final String name; + + @NonNull + private final ZonedDateTime date; + + private final int count; + +} diff --git a/src/main/java/de/ph87/data/series/counter/CounterRepository.java b/src/main/java/de/ph87/data/series/counter/CounterRepository.java new file mode 100644 index 0000000..ab2dfd4 --- /dev/null +++ b/src/main/java/de/ph87/data/series/counter/CounterRepository.java @@ -0,0 +1,8 @@ +package de.ph87.data.series.counter; + +import de.ph87.data.series.SeriesIntervalKey; +import org.springframework.data.repository.ListCrudRepository; + +public interface CounterRepository extends ListCrudRepository { + +} diff --git a/src/main/java/de/ph87/data/series/counter/CounterService.java b/src/main/java/de/ph87/data/series/counter/CounterService.java new file mode 100644 index 0000000..60c673f --- /dev/null +++ b/src/main/java/de/ph87/data/series/counter/CounterService.java @@ -0,0 +1,38 @@ +package de.ph87.data.series.counter; + +import de.ph87.data.series.Series; +import de.ph87.data.series.SeriesIntervalKey; +import de.ph87.data.series.SeriesMode; +import de.ph87.data.series.SeriesService; +import de.ph87.data.series.consumption.unit.Unit; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Service +@Transactional +@RequiredArgsConstructor +public class CounterService { + + private final CounterRepository counterRepository; + + private final SeriesService seriesService; + + @EventListener(CounterEvent.class) + public void onCounterEvent(@NonNull final CounterEvent event) { + final Series series = seriesService.getOrCreateByName(event.getName(), SeriesMode.COUNTER); + for (final Unit unit : Unit.values()) { + final SeriesIntervalKey id = new SeriesIntervalKey(series, unit, event.getDate()); + counterRepository.findById(id) + .stream() + .peek(existing -> existing.setCount(existing.getCount() + event.getCount())) + .findFirst() + .orElseGet(() -> counterRepository.save(new Counter(id, event.getCount()))); + } + } + +} diff --git a/src/main/java/de/ph87/data/series/measure/Measure.java b/src/main/java/de/ph87/data/series/measure/Measure.java new file mode 100644 index 0000000..96fbb48 --- /dev/null +++ b/src/main/java/de/ph87/data/series/measure/Measure.java @@ -0,0 +1,84 @@ +package de.ph87.data.series.measure; + +import de.ph87.data.series.SeriesIntervalKey; +import jakarta.persistence.Column; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import lombok.*; + +import java.time.ZonedDateTime; + +@Entity +@Getter +@ToString +@NoArgsConstructor +public class Measure { + + @NonNull + @EmbeddedId + private SeriesIntervalKey id; + + @NonNull + @Column(nullable = false) + private ZonedDateTime firstDate; + + @Column(nullable = false) + private double firstValue; + + @Setter + @NonNull + @Column(nullable = false) + private ZonedDateTime lastDate; + + @Setter + @Column(nullable = false) + private double lastValue; + + @Setter + @Column(nullable = false) + private int count; + + @Setter + @Column(nullable = false) + private double min; + + @Setter + @Column(nullable = false) + private double max; + + @Setter + @Column(nullable = false) + private double mean; + + @Setter + @Column(nullable = false) + private double variance; + + public Measure(@NonNull final SeriesIntervalKey id, @NonNull final ZonedDateTime date, final double value) { + this.id = id; + this.firstDate = date; + this.firstValue = value; + this.lastDate = date; + this.lastValue = value; + this.count = 1; + this.min = value; + this.max = value; + this.mean = value; + this.variance = 0; + } + + public void update(@NonNull final MeasureEvent event) { + final double x = event.getValue(); + + this.count++; + this.lastDate = event.getDate(); + this.lastValue = x; + this.min = Math.min(this.min, x); + this.max = Math.max(this.max, x); + + final double oldMean = this.mean; + this.mean = oldMean + (x - oldMean) / this.count; + this.variance = this.variance + (x - oldMean) * (x - this.mean) / this.count; + } + +} diff --git a/src/main/java/de/ph87/data/series/measure/MeasureRepository.java b/src/main/java/de/ph87/data/series/measure/MeasureRepository.java new file mode 100644 index 0000000..ccf73e5 --- /dev/null +++ b/src/main/java/de/ph87/data/series/measure/MeasureRepository.java @@ -0,0 +1,8 @@ +package de.ph87.data.series.measure; + +import de.ph87.data.series.SeriesIntervalKey; +import org.springframework.data.repository.ListCrudRepository; + +public interface MeasureRepository extends ListCrudRepository { + +} diff --git a/src/main/java/de/ph87/data/series/measure/MeasureService.java b/src/main/java/de/ph87/data/series/measure/MeasureService.java new file mode 100644 index 0000000..09c1bac --- /dev/null +++ b/src/main/java/de/ph87/data/series/measure/MeasureService.java @@ -0,0 +1,38 @@ +package de.ph87.data.series.measure; + +import de.ph87.data.series.Series; +import de.ph87.data.series.SeriesIntervalKey; +import de.ph87.data.series.SeriesMode; +import de.ph87.data.series.SeriesService; +import de.ph87.data.series.consumption.unit.Unit; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Service +@Transactional +@RequiredArgsConstructor +public class MeasureService { + + private final MeasureRepository measureRepository; + + private final SeriesService seriesService; + + @EventListener(MeasureEvent.class) + public void onMeasureEvent(@NonNull final MeasureEvent event) { + final Series series = seriesService.getOrCreateByName(event.getName(), SeriesMode.MEASURE); + for (final Unit unit : Unit.values()) { + final SeriesIntervalKey id = new SeriesIntervalKey(series, unit, event.getDate()); + measureRepository.findById(id) + .stream() + .peek(existing -> existing.update(event)) + .findFirst() + .orElseGet(() -> measureRepository.save(new Measure(id, event.getDate(), event.getValue()))); + } + } + +}