diff --git a/application.properties b/application.properties index b0f4e3d..857de87 100644 --- a/application.properties +++ b/application.properties @@ -6,3 +6,6 @@ spring.datasource.username=sa spring.datasource.password=password #- #spring.jpa.hibernate.ddl-auto=create +#- +de.ph87.data.message.receive.mqtt.host=10.0.0.50 +de.ph87.data.message.receive.mqtt.topic=# diff --git a/src/main/java/de/ph87/data/message/receive/mqtt/MqttConfig.java b/src/main/java/de/ph87/data/message/receive/mqtt/MqttConfig.java new file mode 100644 index 0000000..fb55d66 --- /dev/null +++ b/src/main/java/de/ph87/data/message/receive/mqtt/MqttConfig.java @@ -0,0 +1,26 @@ +package de.ph87.data.message.receive.mqtt; + +import lombok.*; +import org.springframework.boot.context.properties.*; +import org.springframework.stereotype.*; + +@Data +@Component +@ConfigurationProperties(prefix = "de.ph87.data.message.receive.mqtt") +public class MqttConfig { + + private String host; + + private int port = 1883; + + private String topic; + + private String clientId; + + private int connectTimeoutSec = 3; + + private long retrySec = 3; + + private int connectKeepAliveSec = 3; + +} diff --git a/src/main/java/de/ph87/data/message/receive/mqtt/MqttReceiver.java b/src/main/java/de/ph87/data/message/receive/mqtt/MqttReceiver.java new file mode 100644 index 0000000..0a64539 --- /dev/null +++ b/src/main/java/de/ph87/data/message/receive/mqtt/MqttReceiver.java @@ -0,0 +1,115 @@ +package de.ph87.data.message.receive.mqtt; + +import jakarta.annotation.*; +import lombok.*; +import lombok.extern.slf4j.*; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.*; +import org.springframework.boot.context.event.*; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.*; + +import java.nio.charset.*; +import java.util.*; + +@Slf4j +@Service +@RequiredArgsConstructor +public class MqttReceiver { + + private final MqttConfig config; + + @Nullable + private MqttClient client; + + private final Thread thread = new Thread(this::run, "MQTT-WATCH"); + + private boolean stop = false; + + @EventListener(ApplicationStartedEvent.class) + public void startup() { + thread.start(); + } + + @PreDestroy + public void preDestroy() { + synchronized (thread) { + stop = true; + thread.notifyAll(); + log.debug("stopping..."); + } + } + + private void run() { + try { + log.debug("started"); + while (!stop) { + synchronized (thread) { + if (client == null || !client.isConnected()) { + _connect(); + } else { + thread.wait(); + } + } + } + } catch (InterruptedException e) { + log.error("Interrupted"); + } finally { + log.debug("terminating..."); + _disconnect(); + log.debug("terminated"); + } + } + + private void _connect() throws InterruptedException { + final String clientId; + final boolean cleanSession; + final MqttClientPersistence persistence; + final String uri = "tcp://%s:%d".formatted(config.getHost(), config.getPort()); + if (config.getClientId() != null && !config.getClientId().isEmpty()) { + clientId = config.getClientId(); + cleanSession = false; + persistence = new MqttDefaultFilePersistence("./data"); + } else { + clientId = "DATA_TMP_" + UUID.randomUUID(); + cleanSession = true; + persistence = new MemoryPersistence(); + } + try { + log.info("Connecting mqtt broker: {} as \"{}\"{}", uri, clientId, cleanSession ? " [CLEAN SESSION]" : ""); + client = new MqttClient(uri, clientId, persistence); + final MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(cleanSession); + options.setConnectionTimeout(config.getConnectTimeoutSec()); + client.connect(options); + log.info("MQTT broker connected."); + client.subscribe(config.getTopic(), this::_receive); + } catch (MqttException e) { + log.error("Failed to connect MQTT broker: {}", e.getMessage()); + client = null; + synchronized (thread) { + log.debug("Retrying in {} seconds...", config.getRetrySec()); + thread.wait(config.getRetrySec() * 1000); + } + } + } + + private void _disconnect() { + if (client != null && client.isConnected()) { + try { + client.disconnect(); + } catch (MqttException e) { + log.error(e.toString()); + } + } + } + + private void _receive(final String topic, final MqttMessage mqttMessage) { + Thread.currentThread().setName("MQTT-RECEIVE"); + final String payload = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8); + final String payloadLoggable = payload.replace("\n", "\\n").replace("\r", "\\r"); + log.debug("received: topic={}, message={}", topic, payloadLoggable); + } + +}