AggregationReceiver, Heating, Measure, Counter

This commit is contained in:
Patrick Haßel 2024-10-15 16:29:37 +02:00
parent c85fbb12c3
commit e090e1530d
12 changed files with 389 additions and 0 deletions

View File

@ -0,0 +1,75 @@
package de.ph87.data.aggregation;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import java.time.ZonedDateTime;
import static de.ph87.data.common.DateTimeHelpers.ZDT;
@Getter
@ToString
public class Aggregation {
@NonNull
public final String name;
@NonNull
public final ZonedDateTime date;
public final int count;
public final int errors;
@NonNull
public final ZonedDateTime firstTime;
@NonNull
public final ZonedDateTime lastTime;
public final double firstValue;
public final double lastValue;
public final double min;
public final double max;
public final double sum;
public final double squares;
public final boolean terminal;
public Aggregation(
final String name,
final long date,
final int count,
final int errors,
final long firstTime,
final long lastTime,
final double firstValue,
final double lastValue,
final double min,
final double max,
final double sum,
final double squares,
final boolean terminal
) {
this.name = name;
this.date = ZDT(date);
this.count = count;
this.errors = errors;
this.firstTime = ZDT(firstTime);
this.lastTime = ZDT(lastTime);
this.firstValue = firstValue;
this.lastValue = lastValue;
this.min = min;
this.max = max;
this.sum = sum;
this.squares = squares;
this.terminal = terminal;
}
}

View File

@ -0,0 +1,21 @@
package de.ph87.data.aggregation;
import java.util.Map;
import static de.ph87.data.heating.HeatingConstants.*;
public class AggregationConstants {
public static final Map<String, String> HEATING_TOPIC_MAP = Map.of(
"aggregation/heizung/abgas/temperatur", HEATING_EXHAUST_TEMPERATURE_SERIES_NAME,
"aggregation/heizung/puffer/vorlauf/temperatur", HEATING_BUFFER_SUPPLY_TEMPERATURE_SERIES_NAME,
"aggregation/heizung/puffer/ruecklauf/temperatur", HEATING_BUFFER_RETURN_TEMPERATURE_SERIES_NAME,
"aggregation/heizung/puffer/eingang/temperatur", HEATING_BUFFER_COLD_TEMPERATURE_SERIES_NAME,
"aggregation/heizung/puffer/speicher/temperatur", HEATING_BUFFER_INNER_TEMPERATURE_SERIES_NAME,
"aggregation/heizung/puffer/ausgang/temperatur", HEATING_BUFFER_HOT_TEMPERATURE_SERIES_NAME,
"aggregation/heizung/puffer/zirkulation/temperatur", HEATING_BUFFER_CIRCULATION_TEMPERATURE_SERIES_NAME,
"aggregation/heizung/heizkreis/vorlauf/temperatur", HEATING_LOOP_SUPPLY_TEMPERATURE_SERIES_NAME,
"aggregation/heizung/heizkreis/ruecklauf/temperatur", HEATING_LOOP_RETURN_TEMPERATURE_SERIES_NAME
);
}

View File

@ -0,0 +1,43 @@
package de.ph87.data.aggregation;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.ph87.data.mqtt.MqttEvent;
import de.ph87.data.series.measure.MeasureEvent;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import static de.ph87.data.aggregation.AggregationConstants.HEATING_TOPIC_MAP;
@Slf4j
@Service
@RequiredArgsConstructor
public class AggregationReceiver {
private final ApplicationEventPublisher applicationEventPublisher;
private final ObjectMapper objectMapper;
@EventListener(MqttEvent.class)
public void onEvent(@NonNull final MqttEvent event) {
final String seriesName = HEATING_TOPIC_MAP.get(event.topic);
if (seriesName == null) {
return;
}
final Aggregation aggregation;
try {
aggregation = objectMapper.readValue(event.payload, Aggregation.class);
} catch (JsonProcessingException e) {
log.error("Failed to parse Aggregation: error={}, event={}", e, event);
return;
}
applicationEventPublisher.publishEvent(new MeasureEvent(seriesName, aggregation.lastTime, aggregation.lastValue));
}
}

View File

@ -0,0 +1,23 @@
package de.ph87.data.heating;
public class HeatingConstants {
public static final String HEATING_EXHAUST_TEMPERATURE_SERIES_NAME = "heating.exhaust.temperature";
public static final String HEATING_BUFFER_SUPPLY_TEMPERATURE_SERIES_NAME = "heating.buffer.supply.temperature";
public static final String HEATING_BUFFER_RETURN_TEMPERATURE_SERIES_NAME = "heating.buffer.return.temperature";
public static final String HEATING_BUFFER_COLD_TEMPERATURE_SERIES_NAME = "heating.buffer.cold.temperature";
public static final String HEATING_BUFFER_INNER_TEMPERATURE_SERIES_NAME = "heating.buffer.inner.temperature";
public static final String HEATING_BUFFER_HOT_TEMPERATURE_SERIES_NAME = "heating.buffer.hot.temperature";
public static final String HEATING_BUFFER_CIRCULATION_TEMPERATURE_SERIES_NAME = "heating.buffer.circulation.temperature";
public static final String HEATING_LOOP_SUPPLY_TEMPERATURE_SERIES_NAME = "heating.loop.supply.temperature";
public static final String HEATING_LOOP_RETURN_TEMPERATURE_SERIES_NAME = "heating.loop.return.temperature";
}

View File

@ -5,6 +5,8 @@ import lombok.NonNull;
import java.util.function.BiFunction;
public enum SeriesMode {
MEASURE((first, second) -> second - first),
COUNTER((first, second) -> second - first),
INCREASING((first, second) -> second - first),
DECREASING((first, second) -> first - second),
;

View File

@ -0,0 +1,26 @@
package de.ph87.data.series.counter;
import de.ph87.data.series.SeriesIntervalKey;
import jakarta.persistence.EmbeddedId;
import jakarta.persistence.Entity;
import lombok.*;
@Entity
@Getter
@ToString
@NoArgsConstructor
public class Counter {
@NonNull
@EmbeddedId
private SeriesIntervalKey id;
@Setter
private int count;
public Counter(@NonNull final SeriesIntervalKey id, final int count) {
this.id = id;
this.count = count;
}
}

View File

@ -0,0 +1,23 @@
package de.ph87.data.series.counter;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import java.time.ZonedDateTime;
@Getter
@ToString
@RequiredArgsConstructor
public class CounterEvent {
@NonNull
private final String name;
@NonNull
private final ZonedDateTime date;
private final int count;
}

View File

@ -0,0 +1,8 @@
package de.ph87.data.series.counter;
import de.ph87.data.series.SeriesIntervalKey;
import org.springframework.data.repository.ListCrudRepository;
public interface CounterRepository extends ListCrudRepository<Counter, SeriesIntervalKey> {
}

View File

@ -0,0 +1,38 @@
package de.ph87.data.series.counter;
import de.ph87.data.series.Series;
import de.ph87.data.series.SeriesIntervalKey;
import de.ph87.data.series.SeriesMode;
import de.ph87.data.series.SeriesService;
import de.ph87.data.series.consumption.unit.Unit;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class CounterService {
private final CounterRepository counterRepository;
private final SeriesService seriesService;
@EventListener(CounterEvent.class)
public void onCounterEvent(@NonNull final CounterEvent event) {
final Series series = seriesService.getOrCreateByName(event.getName(), SeriesMode.COUNTER);
for (final Unit unit : Unit.values()) {
final SeriesIntervalKey id = new SeriesIntervalKey(series, unit, event.getDate());
counterRepository.findById(id)
.stream()
.peek(existing -> existing.setCount(existing.getCount() + event.getCount()))
.findFirst()
.orElseGet(() -> counterRepository.save(new Counter(id, event.getCount())));
}
}
}

View File

@ -0,0 +1,84 @@
package de.ph87.data.series.measure;
import de.ph87.data.series.SeriesIntervalKey;
import jakarta.persistence.Column;
import jakarta.persistence.EmbeddedId;
import jakarta.persistence.Entity;
import lombok.*;
import java.time.ZonedDateTime;
@Entity
@Getter
@ToString
@NoArgsConstructor
public class Measure {
@NonNull
@EmbeddedId
private SeriesIntervalKey id;
@NonNull
@Column(nullable = false)
private ZonedDateTime firstDate;
@Column(nullable = false)
private double firstValue;
@Setter
@NonNull
@Column(nullable = false)
private ZonedDateTime lastDate;
@Setter
@Column(nullable = false)
private double lastValue;
@Setter
@Column(nullable = false)
private int count;
@Setter
@Column(nullable = false)
private double min;
@Setter
@Column(nullable = false)
private double max;
@Setter
@Column(nullable = false)
private double mean;
@Setter
@Column(nullable = false)
private double variance;
public Measure(@NonNull final SeriesIntervalKey id, @NonNull final ZonedDateTime date, final double value) {
this.id = id;
this.firstDate = date;
this.firstValue = value;
this.lastDate = date;
this.lastValue = value;
this.count = 1;
this.min = value;
this.max = value;
this.mean = value;
this.variance = 0;
}
public void update(@NonNull final MeasureEvent event) {
final double x = event.getValue();
this.count++;
this.lastDate = event.getDate();
this.lastValue = x;
this.min = Math.min(this.min, x);
this.max = Math.max(this.max, x);
final double oldMean = this.mean;
this.mean = oldMean + (x - oldMean) / this.count;
this.variance = this.variance + (x - oldMean) * (x - this.mean) / this.count;
}
}

View File

@ -0,0 +1,8 @@
package de.ph87.data.series.measure;
import de.ph87.data.series.SeriesIntervalKey;
import org.springframework.data.repository.ListCrudRepository;
public interface MeasureRepository extends ListCrudRepository<Measure, SeriesIntervalKey> {
}

View File

@ -0,0 +1,38 @@
package de.ph87.data.series.measure;
import de.ph87.data.series.Series;
import de.ph87.data.series.SeriesIntervalKey;
import de.ph87.data.series.SeriesMode;
import de.ph87.data.series.SeriesService;
import de.ph87.data.series.consumption.unit.Unit;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class MeasureService {
private final MeasureRepository measureRepository;
private final SeriesService seriesService;
@EventListener(MeasureEvent.class)
public void onMeasureEvent(@NonNull final MeasureEvent event) {
final Series series = seriesService.getOrCreateByName(event.getName(), SeriesMode.MEASURE);
for (final Unit unit : Unit.values()) {
final SeriesIntervalKey id = new SeriesIntervalKey(series, unit, event.getDate());
measureRepository.findById(id)
.stream()
.peek(existing -> existing.update(event))
.findFirst()
.orElseGet(() -> measureRepository.save(new Measure(id, event.getDate(), event.getValue())));
}
}
}