This commit is contained in:
Patrick Haßel 2025-02-15 16:40:03 +01:00
parent 49759832e1
commit 147bff7927
3 changed files with 144 additions and 0 deletions

View File

@ -6,3 +6,6 @@ spring.datasource.username=sa
spring.datasource.password=password spring.datasource.password=password
#- #-
#spring.jpa.hibernate.ddl-auto=create #spring.jpa.hibernate.ddl-auto=create
#-
de.ph87.data.message.receive.mqtt.host=10.0.0.50
de.ph87.data.message.receive.mqtt.topic=#

View File

@ -0,0 +1,26 @@
package de.ph87.data.message.receive.mqtt;
import lombok.*;
import org.springframework.boot.context.properties.*;
import org.springframework.stereotype.*;
@Data
@Component
@ConfigurationProperties(prefix = "de.ph87.data.message.receive.mqtt")
public class MqttConfig {
private String host;
private int port = 1883;
private String topic;
private String clientId;
private int connectTimeoutSec = 3;
private long retrySec = 3;
private int connectKeepAliveSec = 3;
}

View File

@ -0,0 +1,115 @@
package de.ph87.data.message.receive.mqtt;
import jakarta.annotation.*;
import lombok.*;
import lombok.extern.slf4j.*;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.*;
import org.springframework.boot.context.event.*;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.*;
import java.nio.charset.*;
import java.util.*;
@Slf4j
@Service
@RequiredArgsConstructor
public class MqttReceiver {
private final MqttConfig config;
@Nullable
private MqttClient client;
private final Thread thread = new Thread(this::run, "MQTT-WATCH");
private boolean stop = false;
@EventListener(ApplicationStartedEvent.class)
public void startup() {
thread.start();
}
@PreDestroy
public void preDestroy() {
synchronized (thread) {
stop = true;
thread.notifyAll();
log.debug("stopping...");
}
}
private void run() {
try {
log.debug("started");
while (!stop) {
synchronized (thread) {
if (client == null || !client.isConnected()) {
_connect();
} else {
thread.wait();
}
}
}
} catch (InterruptedException e) {
log.error("Interrupted");
} finally {
log.debug("terminating...");
_disconnect();
log.debug("terminated");
}
}
private void _connect() throws InterruptedException {
final String clientId;
final boolean cleanSession;
final MqttClientPersistence persistence;
final String uri = "tcp://%s:%d".formatted(config.getHost(), config.getPort());
if (config.getClientId() != null && !config.getClientId().isEmpty()) {
clientId = config.getClientId();
cleanSession = false;
persistence = new MqttDefaultFilePersistence("./data");
} else {
clientId = "DATA_TMP_" + UUID.randomUUID();
cleanSession = true;
persistence = new MemoryPersistence();
}
try {
log.info("Connecting mqtt broker: {} as \"{}\"{}", uri, clientId, cleanSession ? " [CLEAN SESSION]" : "");
client = new MqttClient(uri, clientId, persistence);
final MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(cleanSession);
options.setConnectionTimeout(config.getConnectTimeoutSec());
client.connect(options);
log.info("MQTT broker connected.");
client.subscribe(config.getTopic(), this::_receive);
} catch (MqttException e) {
log.error("Failed to connect MQTT broker: {}", e.getMessage());
client = null;
synchronized (thread) {
log.debug("Retrying in {} seconds...", config.getRetrySec());
thread.wait(config.getRetrySec() * 1000);
}
}
}
private void _disconnect() {
if (client != null && client.isConnected()) {
try {
client.disconnect();
} catch (MqttException e) {
log.error(e.toString());
}
}
}
private void _receive(final String topic, final MqttMessage mqttMessage) {
Thread.currentThread().setName("MQTT-RECEIVE");
final String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
final String payloadLoggable = payload.replace("\n", "\\n").replace("\r", "\\r");
log.debug("received: topic={}, message={}", topic, payloadLoggable);
}
}