import os.path
from typing import Callable

from qgis._core import QgsVectorLayer, QgsProject, QgsLayerTreeLayer, QgsPalLayerSettings, QgsVectorLayerSimpleLabeling

from .pegelonline_dockwidget import PegelonlineDockWidget
from .pomodules.poqgscurrentw import PoQgsCurrentW
from .pomodules.poqgsstations import PoQgsStations

# Separator placed between the quoted field names inside the concat(...)
# label expression; renders as " - " between the field values.
INFIX = ", ' - ', "


# noinspection PyMethodMayBeStatic
class PoRunner(object):
    """Wire the dock widget's checkboxes to the layers this plugin manages.

    Four layers are tracked, each created lazily the first time its
    checkbox is ticked: two basemap layers (lines and areas, read from
    ``basemap/waters.gpkg`` next to this file) and two in-memory point
    layers (stations and current water levels) filled from reader objects.
    Each layer's ``willBeDeleted`` signal resets the matching attribute to
    ``None`` and unticks the matching checkbox.
    """

    def __init__(self, ui: PegelonlineDockWidget, iface):
        """Store the UI and QGIS interface and connect all checkbox signals.

        :param ui: dock widget that owns the checkboxes.
        :param iface: QGIS interface object; used for the layer tree bridge
            and the map canvas.
        """
        self.ui: PegelonlineDockWidget = ui
        self.iface = iface
        self.local_dir = os.path.dirname(os.path.realpath(__file__))

        # Layer references; None means "not created yet or already deleted".
        self.stations = None
        self.waterlevels = None
        self.lines = None
        self.areas = None

        self.connect_basemap_signals()
        self.connect_stations_signals()
        self.connect_waterlevels_signals()

    # basemap -----------------------------------------------------------------

    def _basemapCreate(self, path, name, disconnect: Callable[[], None]) -> None | QgsVectorLayer:
        """Load a basemap layer via OGR and append it to the layer tree.

        :param path: data source path relative to the ``basemap`` directory.
        :param name: display name of the new layer.
        :param disconnect: slot connected to the layer's ``willBeDeleted``.
        :return: the new layer, or ``None`` if the layer is not valid.
        """
        print("_basemapCreate: %s" % (name,))

        path = os.path.join(self.local_dir, "basemap", path)
        basemap = QgsVectorLayer(path, name, "ogr")
        if not basemap.isValid():
            print("_basemapCreate: QgsVectorLayer nicht gültig: path=%s, name=%s" % (path, name))
            return None

        # Let the owner reset its state when the layer goes away.
        # noinspection PyUnresolvedReferences
        basemap.willBeDeleted.connect(disconnect)

        # Register with the project without auto-adding to the legend ...
        QgsProject.instance().addMapLayer(basemap, False)

        # ... and insert the node into the layer tree ourselves (index -1).
        layer_tree = self.iface.layerTreeCanvasBridge().rootGroup()
        layer_tree.insertChildNode(-1, QgsLayerTreeLayer(basemap))

        return basemap

    # basemap signals ---------------------------------------------------------

    def connect_basemap_signals(self):
        """Connect the two basemap checkboxes to their toggle handlers."""
        print("connect_basemap_signals")
        self.ui.cbBasemapLines.toggled.connect(self.cbBasemapLinesToggled)
        self.ui.cbBasemapAreas.toggled.connect(self.cbBasemapAreasToggled)

    def cbBasemapLinesToggled(self):
        """Create the lines basemap on first tick, then sync its visibility."""
        checked = self.ui.cbBasemapLines.isChecked()
        print("cbBasemapLinesToggled: %s" % (checked,))

        if self.lines is None and checked:
            self.lines = self._basemapCreate("waters.gpkg|layername=water_l", "Flüsse", self.disconnectBasemapLines)

        if self.lines is not None:
            self._layerSetVisible(self.lines, checked)
            self._layerRefresh(self.lines)

    def cbBasemapAreasToggled(self):
        """Create the areas basemap on first tick, then sync its visibility."""
        checked = self.ui.cbBasemapAreas.isChecked()
        print("cbBasemapAreasToggled: %s" % (checked,))

        if self.areas is None and checked:
            self.areas = self._basemapCreate("waters.gpkg|layername=water_f", "Flächen", self.disconnectBasemapAreas)

        if self.areas is not None:
            self._layerSetVisible(self.areas, checked)
            self._layerRefresh(self.areas)

    def disconnectBasemapLines(self):
        """Forget the lines layer and untick its checkbox."""
        print("disconnectBasemapLines")
        self.lines = None
        self.ui.cbBasemapLines.setChecked(False)

    def disconnectBasemapAreas(self):
        """Forget the areas layer and untick its checkbox."""
        print("disconnectBasemapAreas")
        self.areas = None
        self.ui.cbBasemapAreas.setChecked(False)

    # stations ----------------------------------------------------------------

    def _stationsUpdateLabeling(self):
        """Rebuild the stations label from the ticked label checkboxes."""
        print("_stationsUpdateLabeling")
        if self.stations is None:
            return

        # (checkbox, attribute name) pairs, in label order.
        choices = (
            (self.ui.cbStationsName, "shortname"),
            (self.ui.cbStationsNumber, "number"),
            (self.ui.cbStationsAgency, "agency"),
            (self.ui.cbStationsKm, "km"),
            (self.ui.cbStationsWater, "water"),
        )
        fields = [field for checkbox, field in choices if checkbox.isChecked()]

        self._layerUpdateLabeling(self.stations, fields)

    def disconnectStations(self):
        """Forget the stations layer and untick its visibility checkbox."""
        print("disconnectStations")
        self.stations = None
        self.ui.cbStationsVisible.setChecked(False)

    # stations signals --------------------------------------------------------

    def connect_stations_signals(self):
        """Connect all stations checkboxes to their toggle handlers."""
        print("connect_stations_signals")
        # noinspection DuplicatedCode
        self.ui.cbStationsVisible.toggled.connect(self.cbStationsVisibleToggled)
        self.ui.cbStationsName.toggled.connect(self.cbStationsNameToggled)
        self.ui.cbStationsNumber.toggled.connect(self.cbStationsNumberToggled)
        self.ui.cbStationsAgency.toggled.connect(self.cbStationsAgencyToggled)
        self.ui.cbStationsKm.toggled.connect(self.cbStationsKmToggled)
        self.ui.cbStationsWater.toggled.connect(self.cbStationsWaterToggled)

    def cbStationsVisibleToggled(self):
        """Create the stations layer on first tick, then sync visibility and labels."""
        visible = self.ui.cbStationsVisible.isChecked()
        print("cbStationsVisibleToggled: %s" % (visible,))

        if self.stations is None and visible:
            reader = PoQgsStations()
            features = reader.getStationsFeatures()
            # May yield None (no features / invalid layer); _layerAdd copes.
            self.stations = self._layerFromReader(reader.fields, reader.crs, features, "Stationen")
            self._layerAdd(self.stations, "styles/label_stations.qml", self.disconnectStations)

        if self.stations is not None:
            self._layerSetVisible(self.stations, visible)
            self._stationsUpdateLabeling()

    def cbStationsNameToggled(self):
        """Refresh the stations label after the name checkbox changed."""
        checked = self.ui.cbStationsName.isChecked()
        print("cbStationsNameToggled: %s" % (checked,))
        self._stationsUpdateLabeling()

    def cbStationsNumberToggled(self):
        """Refresh the stations label after the number checkbox changed."""
        checked = self.ui.cbStationsNumber.isChecked()
        print("cbStationsNumberToggled: %s" % (checked,))
        self._stationsUpdateLabeling()

    def cbStationsAgencyToggled(self):
        """Refresh the stations label after the agency checkbox changed."""
        checked = self.ui.cbStationsAgency.isChecked()
        print("cbStationsAgencyToggled: %s" % (checked,))
        self._stationsUpdateLabeling()

    def cbStationsKmToggled(self):
        """Refresh the stations label after the km checkbox changed."""
        checked = self.ui.cbStationsKm.isChecked()
        print("cbStationsKmToggled: %s" % (checked,))
        self._stationsUpdateLabeling()

    def cbStationsWaterToggled(self):
        """Refresh the stations label after the water checkbox changed."""
        checked = self.ui.cbStationsWater.isChecked()
        print("cbStationsWaterToggled: %s" % (checked,))
        self._stationsUpdateLabeling()

    # waterlevels -------------------------------------------------------------

    def _waterlevelsUpdateLabeling(self):
        """Rebuild the water level label from the ticked label checkboxes."""
        if self.waterlevels is None:
            return

        # (checkbox, attribute name) pairs, in label order.
        choices = (
            (self.ui.cbWaterlevelsName, "shortname"),
            (self.ui.cbWaterlevelsNumber, "number"),
            (self.ui.cbWaterlevelsAgency, "agency"),
            (self.ui.cbWaterlevelsTimestamp, "timestamp"),
            (self.ui.cbWaterlevelsValue, "value"),
            (self.ui.cbWaterlevelsMean, "stateMnwMhw"),
            (self.ui.cbWaterlevelsAbsolute, "stateNswHsw"),
        )
        fields = [field for checkbox, field in choices if checkbox.isChecked()]

        self._layerUpdateLabeling(self.waterlevels, fields)

    # waterlevels signals -----------------------------------------------------

    def connect_waterlevels_signals(self):
        """Connect all water level checkboxes to their toggle handlers."""
        print("connect_waterlevels_signals")
        # noinspection DuplicatedCode
        self.ui.cbWaterlevelsVisible.toggled.connect(self.cbWaterlevelsVisibleToggled)
        self.ui.cbWaterlevelsName.toggled.connect(self.cbWaterlevelsNameToggled)
        self.ui.cbWaterlevelsNumber.toggled.connect(self.cbWaterlevelsNumberToggled)
        self.ui.cbWaterlevelsAgency.toggled.connect(self.cbWaterlevelsAgencyToggled)
        self.ui.cbWaterlevelsTimestamp.toggled.connect(self.cbWaterlevelsTimestampToggled)
        self.ui.cbWaterlevelsValue.toggled.connect(self.cbWaterlevelsValueToggled)
        self.ui.cbWaterlevelsMean.toggled.connect(self.cbWaterlevelsMeanToggled)
        self.ui.cbWaterlevelsAbsolute.toggled.connect(self.cbWaterlevelsAbsoluteToggled)

    def cbWaterlevelsVisibleToggled(self):
        """Create the water level layer on first tick, then sync visibility and labels."""
        visible = self.ui.cbWaterlevelsVisible.isChecked()
        print("cbWaterlevelsVisibleToggled: %s" % (visible,))

        if self.waterlevels is None and visible:
            reader = PoQgsCurrentW()
            features = reader.getCurrentWFeatures()
            # May yield None (no features / invalid layer); _layerAdd copes.
            self.waterlevels = self._layerFromReader(reader.fields, reader.crs, features, "Wasserstandinformationen")
            self._layerAdd(self.waterlevels, "styles/label_currentw.qml", self.disconnectWaterlevels)

        if self.waterlevels is not None:
            self._layerSetVisible(self.waterlevels, visible)
            self._waterlevelsUpdateLabeling()

    def cbWaterlevelsNameToggled(self):
        """Refresh the water level label after the name checkbox changed."""
        checked = self.ui.cbWaterlevelsName.isChecked()
        print("cbWaterlevelsNameToggled: %s" % (checked,))
        self._waterlevelsUpdateLabeling()

    def cbWaterlevelsNumberToggled(self):
        """Refresh the water level label after the number checkbox changed."""
        checked = self.ui.cbWaterlevelsNumber.isChecked()
        print("cbWaterlevelsNumberToggled: %s" % (checked,))
        self._waterlevelsUpdateLabeling()

    def cbWaterlevelsAgencyToggled(self):
        """Refresh the water level label after the agency checkbox changed."""
        checked = self.ui.cbWaterlevelsAgency.isChecked()
        print("cbWaterlevelsAgencyToggled: %s" % (checked,))
        self._waterlevelsUpdateLabeling()

    def cbWaterlevelsTimestampToggled(self):
        """Refresh the water level label after the timestamp checkbox changed."""
        checked = self.ui.cbWaterlevelsTimestamp.isChecked()
        print("cbWaterlevelsTimestampToggled: %s" % (checked,))
        self._waterlevelsUpdateLabeling()

    def cbWaterlevelsValueToggled(self):
        """Refresh the water level label after the value checkbox changed."""
        checked = self.ui.cbWaterlevelsValue.isChecked()
        print("cbWaterlevelsValueToggled: %s" % (checked,))
        self._waterlevelsUpdateLabeling()

    def cbWaterlevelsMeanToggled(self):
        """Refresh the water level label after the mean-state checkbox changed."""
        checked = self.ui.cbWaterlevelsMean.isChecked()
        print("cbWaterlevelsMeanToggled: %s" % (checked,))
        self._waterlevelsUpdateLabeling()

    def cbWaterlevelsAbsoluteToggled(self):
        """Refresh the water level label after the absolute-state checkbox changed."""
        checked = self.ui.cbWaterlevelsAbsolute.isChecked()
        print("cbWaterlevelsAbsoluteToggled: %s" % (checked,))
        self._waterlevelsUpdateLabeling()

    def disconnectWaterlevels(self):
        """Forget the water level layer and untick its visibility checkbox."""
        print("disconnectWaterlevels")
        self.waterlevels = None
        self.ui.cbWaterlevelsVisible.setChecked(False)

    # layers ------------------------------------------------------------------

    def _layerSetVisible(self, basemap: QgsVectorLayer, visible):
        """Set the visibility checkbox of a layer's node in the project layer tree.

        :param basemap: layer whose tree node is looked up by layer id.
        :param visible: new visibility state.
        """
        # name is a method; call it so the log shows the layer name.
        print("_layerSetVisible: %s => %s" % (basemap.name(), visible))

        node = QgsProject.instance().layerTreeRoot().findLayer(basemap.id())
        if node is None:
            # Layer is not in the tree; nothing to toggle.
            print("_layerSetVisible: Layer nicht im LayerTree gefunden: %s" % (basemap.id(),))
            return
        node.setItemVisibilityChecked(visible)

    def _layerFromReader(self, fields, crs, features, title) -> None | QgsVectorLayer:
        """Build an in-memory point layer from reader output.

        :param fields: attribute definitions passed to the data provider.
        :param crs: object providing ``authid()`` for the layer URI.
        :param features: features to add; ``None`` aborts layer creation.
        :param title: display name of the new layer.
        :return: the populated layer, or ``None`` if ``features`` is ``None``
            or the resulting layer is not valid.
        """
        print("_layerFromReader")
        if features is None:
            return None

        layer_path = "Point?crs=%s" % (crs.authid(),)
        print("layer_path: " + layer_path)
        layer = QgsVectorLayer(layer_path, title, "memory")

        provider = layer.dataProvider()
        provider.addAttributes(fields)
        # Make the layer pick up the attributes just added to the provider.
        layer.updateFields()

        provider.addFeatures(features)
        for error in provider.errors():
            print("Fehler beim Hinzufügen von Features: " + error)

        layer.updateExtents()
        # NOTE(review): the layer is never put into edit mode in this method,
        # so this call is presumably a no-op -- confirm before removing.
        layer.commitChanges()

        return layer if layer.isValid() else None

    def _layerAdd(self, layer: None | QgsVectorLayer, styles_path: str, disconnect: Callable[[], None]):
        """Style a layer, register it with the project and put it on top of the tree.

        :param layer: layer to add; ``None`` is logged and ignored.
        :param styles_path: style file path relative to this file's directory.
        :param disconnect: slot connected to the layer's ``willBeDeleted``.
        """
        print("_layerAdd")

        if layer is None:
            print("_layerAdd: Übergebener Layer ist None")
            return

        # Load the style file.
        layer.loadNamedStyle(os.path.join(self.local_dir, styles_path))

        # Let the owner reset its state when the layer goes away.
        layer.willBeDeleted.connect(disconnect)

        # Register with the project without auto-adding to the legend ...
        QgsProject.instance().addMapLayer(layer, False)

        # ... and insert the node at the top of the layer tree ourselves.
        layer_tree = self.iface.layerTreeCanvasBridge().rootGroup()
        layer_tree.insertChildNode(0, QgsLayerTreeLayer(layer))

    def _layerUpdateLabeling(self, layer, fields):
        """Label ``layer`` with the given attributes joined by " - ".

        :param layer: layer whose labeling is replaced.
        :param fields: attribute names in display order; when empty, labels
            are switched off instead of installing an empty expression.
        """
        if not fields:
            # Nothing selected: avoid building "concat()" and just hide labels.
            layer.setLabelsEnabled(False)
            self._layerRefresh(layer)
            return

        # Double-quote the names so the expression treats them as fields.
        quoted = ['"%s"' % (field,) for field in fields]

        settings = QgsPalLayerSettings()
        settings.fieldName = "concat(" + INFIX.join(quoted) + ")"
        settings.isExpression = True

        layer.setLabeling(QgsVectorLayerSimpleLabeling(settings))
        layer.setLabelsEnabled(True)

        self._layerRefresh(layer)

    def _layerRefresh(self, layer):
        """Redraw after a change: repaint the layer if canvas caching is on, else refresh the canvas."""
        print("_layerRefresh")
        canvas = self.iface.mapCanvas()
        if canvas.isCachingEnabled():
            layer.triggerRepaint()
        else:
            canvas.refresh()