FIX: MqttReceiver auto reconnect

This commit is contained in:
Patrick Haßel 2025-02-26 11:08:33 +01:00
parent 0e9b99e7e4
commit 483331f6df

View File

@ -81,11 +81,16 @@ public class MqttReceiver {
try { try {
log.info("Connecting mqtt broker: {} as \"{}\"{}", uri, clientId, cleanSession ? " [CLEAN SESSION]" : ""); log.info("Connecting mqtt broker: {} as \"{}\"{}", uri, clientId, cleanSession ? " [CLEAN SESSION]" : "");
client = new MqttClient(uri, clientId, persistence); client = new MqttClient(uri, clientId, persistence);
client.setCallback(new MyMqttCallback());
final MqttConnectOptions options = new MqttConnectOptions(); final MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true); options.setAutomaticReconnect(true);
options.setCleanSession(cleanSession); options.setCleanSession(cleanSession);
options.setConnectionTimeout(config.getConnectTimeoutSec()); options.setConnectionTimeout(config.getConnectTimeoutSec());
options.setKeepAliveInterval(config.getConnectKeepAliveSec());
assert client != null;
client.connect(options); client.connect(options);
log.info("MQTT broker connected."); log.info("MQTT broker connected.");
client.subscribe(config.getTopic(), this::_receive); client.subscribe(config.getTopic(), this::_receive);
} catch (MqttException e) { } catch (MqttException e) {
@ -116,4 +121,26 @@ public class MqttReceiver {
messageService.handle(message); messageService.handle(message);
} }
private class MyMqttCallback implements MqttCallback {
@Override
public void connectionLost(final Throwable throwable) {
synchronized (thread) {
log.warn("MQTT broker disconnected.");
thread.notifyAll();
}
}
@Override
public void messageArrived(final String s, final MqttMessage mqttMessage) {
}
@Override
public void deliveryComplete(final IMqttDeliveryToken iMqttDeliveryToken) {
}
}
} }