diff --git a/application.properties b/application.properties index a03b4d5..c35e625 100644 --- a/application.properties +++ b/application.properties @@ -8,4 +8,4 @@ spring.jpa.hibernate.ddl-auto=update #- spring.jackson.serialization.indent_output=true #- -de.ph87.knx.mqtt.uri=tcp://10.0.0.50:1883 +de.ph87.knx.mqtt.uri=tcp://10.255.0.1:1883 diff --git a/pom.xml b/pom.xml index 2e440a1..d3f6bf1 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 de.ph87 - DataMulti + Data2025 1.0-SNAPSHOT @@ -61,4 +61,14 @@ + + Data2025 + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/src/main/java/de/ph87/data/DemoConfig.java b/src/main/java/de/ph87/data/DemoConfig.java new file mode 100644 index 0000000..6b3f851 --- /dev/null +++ b/src/main/java/de/ph87/data/DemoConfig.java @@ -0,0 +1,14 @@ +package de.ph87.data; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "de.ph87.data.demo") +public class DemoConfig { + + private boolean enabled = false; + +} diff --git a/src/main/java/de/ph87/data/DemoService.java b/src/main/java/de/ph87/data/DemoService.java index 6a8f7d3..c32a415 100644 --- a/src/main/java/de/ph87/data/DemoService.java +++ b/src/main/java/de/ph87/data/DemoService.java @@ -39,9 +39,14 @@ public class DemoService { private final GraphRepository graphRepository; + private final DemoConfig demoConfig; + @Transactional @EventListener(ApplicationReadyEvent.class) public void init() { + if (!demoConfig.isEnabled()) { + return; + } topics(); } diff --git a/src/main/java/de/ph87/data/mqtt/MqttConfig.java b/src/main/java/de/ph87/data/mqtt/MqttConfig.java new file mode 100644 index 0000000..bd6d74c --- /dev/null +++ b/src/main/java/de/ph87/data/mqtt/MqttConfig.java @@ -0,0 +1,18 @@ +package de.ph87.data.mqtt; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties(prefix = "de.ph87.data.mqtt") +public class MqttConfig { + + private String clientId; + + private String uri; + + private String topic = "#"; + +} diff --git a/src/main/java/de/ph87/data/mqtt/MqttService.java b/src/main/java/de/ph87/data/mqtt/MqttService.java index 85193b5..c7ff5e7 100644 --- a/src/main/java/de/ph87/data/mqtt/MqttService.java +++ b/src/main/java/de/ph87/data/mqtt/MqttService.java @@ -6,11 +6,15 @@ import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; import org.springframework.stereotype.Service; +import java.util.UUID; + @Slf4j @Service @RequiredArgsConstructor @@ -20,6 +24,8 @@ public class MqttService { private final Object lock = new Object(); + private final MqttConfig config; + private boolean stop = false; @PostConstruct @@ -50,15 +56,18 @@ public class MqttService { private void connectOnce() throws InterruptedException { MqttClient client = null; try { - log.info("MQTT connecting..."); - client = new MqttClient("tcp://10.0.0.50:1883", "DataDynamic", new MemoryPersistence()); + final boolean cleanSession = config.getClientId() == null || config.getClientId().isEmpty(); + final String clientId = cleanSession ? "Data2025-TMP-" + UUID.randomUUID() : config.getClientId(); + final MqttClientPersistence persistence = cleanSession ? new MemoryPersistence() : new MqttDefaultFilePersistence(); + log.info("MQTT connecting {} as {}{}...", config.getUri(), clientId, cleanSession ? " (CLEAN SESSION)" : ""); + client = new MqttClient(config.getUri(), clientId, persistence); final MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(false); - options.setCleanSession(true); + options.setCleanSession(cleanSession); options.setConnectionTimeout(5); options.setKeepAliveInterval(2); client.connect(options); - client.subscribe("#", (topic, message) -> topicReceiver.receive(new MqttInbound(topic, new String(message.getPayload())))); + client.subscribe(config.getTopic(), 2, (topic, message) -> topicReceiver.receive(new MqttInbound(topic, new String(message.getPayload())))); log.info("MQTT connected."); synchronized (lock) { while (!stop && client.isConnected()) { @@ -69,10 +78,13 @@ public class MqttService { } } } catch (MqttException e) { - log.error("MQTT connection error: {}", e.getMessage()); + log.error("MQTT error.", e); synchronized (lock) { lock.wait(3000); } + } catch (Exception e) { + log.error("Unexpected error connecting MQTT.", e); + System.exit(1); } finally { if (client != null && client.isConnected()) { log.info("MQTT disconnecting...");