diff --git a/src/main/java/de/ph87/data/message/receive/mqtt/MqttReceiver.java b/src/main/java/de/ph87/data/message/receive/mqtt/MqttReceiver.java index 5d8ac56..31ecb67 100644 --- a/src/main/java/de/ph87/data/message/receive/mqtt/MqttReceiver.java +++ b/src/main/java/de/ph87/data/message/receive/mqtt/MqttReceiver.java @@ -81,11 +81,16 @@ public class MqttReceiver { try { log.info("Connecting mqtt broker: {} as \"{}\"{}", uri, clientId, cleanSession ? " [CLEAN SESSION]" : ""); client = new MqttClient(uri, clientId, persistence); + client.setCallback(new MyMqttCallback()); + final MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(cleanSession); options.setConnectionTimeout(config.getConnectTimeoutSec()); + options.setKeepAliveInterval(config.getConnectKeepAliveSec()); + assert client != null; client.connect(options); + log.info("MQTT broker connected."); client.subscribe(config.getTopic(), this::_receive); } catch (MqttException e) { @@ -116,4 +121,26 @@ public class MqttReceiver { messageService.handle(message); } + private class MyMqttCallback implements MqttCallback { + + @Override + public void connectionLost(final Throwable throwable) { + synchronized (thread) { + log.warn("MQTT broker disconnected."); + thread.notifyAll(); + } + } + + @Override + public void messageArrived(final String s, final MqttMessage mqttMessage) { + + } + + @Override + public void deliveryComplete(final IMqttDeliveryToken iMqttDeliveryToken) { + + } + + } + }