# -*- coding: utf-8 -*-
"""
Config tool application.

Connects to a running Trixy instance via the TRXI protocol and provides
a TUI for remote administration.
"""

from __future__ import annotations

import asyncio
import struct
from pathlib import Path
from typing import Any

from trixy_core.network.protocol import (
    TrixyProtocol,
    ProtocolFlags,
    ProtocolMessage,
    MAGIC,
    MAGIC_LENGTH,
    COMMAND_HELLO,
    COMMAND_PING,
    COMMAND_PONG,
)
from trixy_core.network.encryption import TrixyEncryption
from trixy_core.network.cmd.config_cmd import (
    ConfigConnect,
    ConfigConnectAccepted,
    ConfigConnectDenied,
    ConfigStatusRequest,
    ConfigStatusResponse,
    ConfigReadRequest,
    ConfigReadResponse,
    ConfigWriteRequest,
    ConfigWriteResponse,
    ConfigFieldOptionsRequest,
    ConfigFieldOptionsResponse,
    ConfigSatelliteListRequest,
    ConfigSatelliteListResponse,
    ConfigSatelliteActionRequest,
    ConfigSatelliteActionResponse,
    ConfigSatelliteDetailRequest,
    ConfigSatelliteDetailResponse,
    ConfigSatelliteConfigReadRequest,
    ConfigSatelliteConfigReadResponse,
    ConfigSatelliteConfigWriteRequest,
    ConfigSatelliteConfigWriteResponse,
    ConfigSatelliteAudioRequest,
    ConfigSatelliteAudioResponse,
    ConfigMicTestStartRequest,
    ConfigMicTestStartResponse,
    ConfigMicTestStopRequest,
    ConfigMicTestStopResponse,
    ConfigMicTestAudioData,
    ConfigPluginListRequest,
    ConfigPluginListResponse,
    ConfigPluginActionRequest,
    ConfigPluginActionResponse,
    ConfigPluginDetailRequest,
    ConfigPluginDetailResponse,
    ConfigPluginConfigWriteRequest,
    ConfigPluginConfigWriteResponse,
    ConfigScheduleListRequest,
    ConfigScheduleListResponse,
    ConfigScheduleActionRequest,
    ConfigScheduleActionResponse,
    ConfigScheduleDetailRequest,
    ConfigScheduleDetailResponse,
    ConfigScheduleSaveRequest,
    ConfigScheduleSaveResponse,
    ConfigScheduleFormMetaRequest,
    ConfigScheduleFormMetaResponse,
    ConfigTrainerListRequest,
    ConfigTrainerListResponse,
    ConfigTrainerDetailRequest,
    ConfigTrainerDetailResponse,
    ConfigTrainerSettingsWriteRequest,
    ConfigTrainerSettingsWriteResponse,
    ConfigTrainerValidateRequest,
    ConfigTrainerValidateResponse,
    ConfigTrainerStartRequest,
    ConfigTrainerStartResponse,
    ConfigTrainerPauseRequest,
    ConfigTrainerPauseResponse,
    ConfigTrainerResumeRequest,
    ConfigTrainerResumeResponse,
    ConfigTrainerStopRequest,
    ConfigTrainerStopResponse,
    ConfigTrainerProgressRequest,
    ConfigTrainerProgressResponse,
    ConfigTrainerActionRequest,
    ConfigTrainerActionResponse,
)
from trixy_core.utils.debug import pinfo, pdebug, perror
from trixy_core.utils.version import VERSION_STRING


def _load_refresh_interval(config_path: str | None = None) -> float | None:
    """Load the refresh interval for the config tool TUI.

    Priority:
        1. Environment variable TRIXY_CONFIG_REFRESH_INTERVAL
        2. ``refresh_interval_seconds`` from the first existing config file
           (``config_path`` if given, then the built-in default locations)
        3. None, in which case the caller falls back to its own default

    Values below 0.5 seconds (and unparsable values) are ignored.

    Args:
        config_path: Optional explicit config file to consult before the
            default locations.

    Returns:
        The interval in seconds, or None if no valid value was found.
    """
    import json as _json
    import os as _os

    env_val = _os.environ.get("TRIXY_CONFIG_REFRESH_INTERVAL")
    if env_val:
        try:
            val = float(env_val)
            if val >= 0.5:
                return val
        except ValueError:
            pass

    # Try several locations: which one exists depends on the directory the
    # config tool was started from.
    candidates: list[Path] = []
    if config_path:
        candidates.append(Path(config_path))
    candidates.append(Path("config/config_tool_config.json"))
    candidates.append(
        Path(__file__).parent.parent / "config" / "config_tool_config.json"
    )

    try:
        for path in candidates:
            if path.exists():
                with open(path, "r", encoding="utf-8") as f:
                    cfg = _json.load(f)
                val = float(cfg.get("refresh_interval_seconds", 0))
                if val >= 0.5:
                    return val
                # The first existing file is authoritative, even if it does
                # not contain a usable value.
                break
    except (OSError, ValueError, TypeError, AttributeError):
        # Best effort: an unreadable or malformed config file simply means
        # "no override".
        pass
    return None


class ConfigConnection:
    """
    Network connection to the ConfigListener of a running instance.

    Handles the handshake and request/response correlation, and offers a
    high-level API for status and configuration queries.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 2105,
        encryption_key_path: str = "certs/encryption.key",
        timeout: float = 10.0,
    ) -> None:
        """
        Store the connection parameters; no I/O happens here.

        Args:
            host: Host name or address of the instance.
            port: TCP port of the ConfigListener.
            encryption_key_path: Path of the key file loaded in connect().
            timeout: Timeout in seconds for connecting and for the handshake.
        """
        self._host = host
        self._port = port
        self._encryption_key_path = encryption_key_path
        self._timeout = timeout

        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._protocol = TrixyProtocol()
        self._connected = False
        self._receive_task: asyncio.Task | None = None

        # Request/response correlation: message_id -> future awaiting the reply
        self._pending: dict[str, asyncio.Future] = {}
        self._send_lock = asyncio.Lock()

        # Information about the remote instance (filled in by connect())
        self.instance_type: str = ""
        self.instance_version: str = ""
        self.hostname: str = ""

        # Callback receiving audio data while a microphone test is running
        self._mic_test_callback: Any = None

    @property
    def is_connected(self) -> bool:
        """Whether the connection is currently active."""
        return self._connected

    @property
    def connection_info(self) -> str:
        """Connection information for display purposes."""
        return f"{self.instance_type}@{self._host}:{self._port}"

    async def connect(self) -> None:
        """
        Establish the connection to the ConfigListener and run the handshake.

        Raises:
            ConnectionError: If the TCP connection or the handshake fails.
        """
        # Load encryption (optional: without a usable key the protocol is
        # left without encryption configured).
        key_path = Path(self._encryption_key_path)
        if key_path.exists():
            try:
                encryption = TrixyEncryption.load_key(key_path)
                self._protocol.set_encryption(encryption)
            except Exception as e:
                pdebug(f"Verschluesselung nicht verfuegbar: {e}")

        # Open the TCP connection
        try:
            self._reader, self._writer = await asyncio.wait_for(
                asyncio.open_connection(self._host, self._port),
                timeout=self._timeout,
            )
        except (OSError, asyncio.TimeoutError) as e:
            raise ConnectionError(
                f"Verbindung zu {self._host}:{self._port} fehlgeschlagen: {e}"
            ) from e

        # Send HELLO; close the socket again if that already fails so the
        # connection is not leaked.
        try:
            self._writer.write(COMMAND_HELLO)
            await self._writer.drain()
        except OSError as e:
            await self._close()
            raise ConnectionError(
                f"Handshake mit {self._host}:{self._port} fehlgeschlagen: {e}"
            ) from e

        # Send ConfigConnect. _send_message reports failures via its return
        # value, so check it instead of waiting for the handshake timeout.
        connect_msg = ConfigConnect(tool_version=VERSION_STRING)
        if not await self._send_message(connect_msg):
            await self._close()
            raise ConnectionError("Handshake konnte nicht gesendet werden")

        # Read the reply
        try:
            response = await asyncio.wait_for(
                self._read_message(),
                timeout=self._timeout,
            )
        except asyncio.TimeoutError:
            await self._close()
            raise ConnectionError("Timeout beim Handshake")

        if response is None:
            await self._close()
            raise ConnectionError("Keine Antwort beim Handshake")

        if response.class_name == "ConfigConnectDenied":
            reason = ""
            if hasattr(response.data, "reason"):
                reason = response.data.reason
            elif isinstance(response.data, dict):
                reason = response.data.get("reason", "")
            await self._close()
            raise ConnectionError(f"Verbindung abgelehnt: {reason}")

        if response.class_name == "ConfigConnectAccepted":
            data = response.data
            if hasattr(data, "instance_type"):
                self.instance_type = data.instance_type
                self.instance_version = data.instance_version
                self.hostname = data.hostname
            elif isinstance(data, dict):
                self.instance_type = data.get("instance_type", "")
                self.instance_version = data.get("instance_version", "")
                self.hostname = data.get("hostname", "")

        # NOTE(review): any reply other than ConfigConnectDenied is treated
        # as a successful handshake (instance info then stays empty) --
        # confirm this leniency is intended.
        self._connected = True

        # Start the receive loop
        self._receive_task = asyncio.create_task(self._receive_loop())

        pinfo(f"Verbunden mit {self.connection_info}")

    async def disconnect(self) -> None:
        """Close the connection and stop the receive loop."""
        self._connected = False
        if self._receive_task:
            self._receive_task.cancel()
            self._receive_task = None
        # Wake up callers still waiting for a reply that can no longer arrive.
        self._fail_pending()
        await self._close()

    async def request_status(
        self, include_network: bool = True, include_disk: bool = True
    ) -> ConfigStatusResponse | None:
        """
        Request system metrics.

        Returns:
            ConfigStatusResponse, or None on failure.
        """
        request = ConfigStatusRequest(
            include_network=include_network,
            include_disk=include_disk,
        )
        return await self._request_typed(request, ConfigStatusResponse)

    async def request_config(
        self, config_name: str = "", section: str = ""
    ) -> dict | None:
        """
        Read the remote configuration.

        Returns:
            Configuration data as a dictionary, or None.
        """
        request = ConfigReadRequest(
            config_name=config_name,
            section=section,
        )
        response = await self._request(request)
        if not response:
            return None

        data = response.data
        if isinstance(data, ConfigReadResponse):
            return data.data
        if isinstance(data, dict):
            return data.get("data", data)
        if hasattr(data, "data"):
            return data.data
        return None

    async def write_config(
        self, config_name: str, key_path: str, value: Any
    ) -> ConfigWriteResponse | None:
        """
        Write a configuration value.

        Returns:
            ConfigWriteResponse, or None on failure.
        """
        request = ConfigWriteRequest(
            config_name=config_name,
            key_path=key_path,
            value=value,
        )
        return await self._request_typed(request, ConfigWriteResponse)

    async def request_satellites(self) -> ConfigSatelliteListResponse | None:
        """
        Request the satellite list from the server.

        Returns:
            ConfigSatelliteListResponse, or None on failure.
        """
        return await self._request_typed(
            ConfigSatelliteListRequest(), ConfigSatelliteListResponse
        )

    async def satellite_action(
        self,
        action: str,
        satellite_id: str = "",
        mac_address: str = "",
        timeout: int = 60,
    ) -> ConfigSatelliteActionResponse | None:
        """
        Execute a satellite action.

        Args:
            action: Action type.
            satellite_id: Affected satellite.
            mac_address: MAC address (alternative to satellite_id).
            timeout: Registration timeout forwarded to the server in the
                request. It is not the local wait time, which stays at the
                default of _request().

        Returns:
            ConfigSatelliteActionResponse, or None.
        """
        request = ConfigSatelliteActionRequest(
            action=action,
            satellite_id=satellite_id,
            mac_address=mac_address,
            timeout=timeout,
        )
        return await self._request_typed(request, ConfigSatelliteActionResponse)

    async def request_satellite_detail(
        self, satellite_id: str
    ) -> ConfigSatelliteDetailResponse | None:
        """
        Request detailed information about a satellite.

        Args:
            satellite_id: ID of the satellite.

        Returns:
            ConfigSatelliteDetailResponse, or None on failure.
        """
        request = ConfigSatelliteDetailRequest(satellite_id=satellite_id)
        return await self._request_typed(request, ConfigSatelliteDetailResponse)

    async def request_satellite_config(
        self, satellite_id: str, config_name: str = "", section: str = ""
    ) -> dict | None:
        """
        Read the configuration of a satellite (via server proxy).

        Returns:
            Configuration data as a dictionary, or None (also when the
            response carries an error).
        """
        request = ConfigSatelliteConfigReadRequest(
            satellite_id=satellite_id,
            config_name=config_name,
            section=section,
        )
        response = await self._request(request)
        if not response:
            return None

        extracted = self._extract_response(
            response, ConfigSatelliteConfigReadResponse
        )
        if isinstance(extracted, ConfigSatelliteConfigReadResponse):
            if extracted.error:
                return None
            return extracted.data
        if isinstance(extracted, dict):
            return extracted.get("data", extracted)
        return None

    async def write_satellite_config(
        self, satellite_id: str, key_path: str, value: Any, config_name: str = ""
    ) -> ConfigSatelliteConfigWriteResponse | None:
        """
        Write a configuration value on a satellite (via server proxy).

        Returns:
            ConfigSatelliteConfigWriteResponse, or None.
        """
        request = ConfigSatelliteConfigWriteRequest(
            satellite_id=satellite_id,
            config_name=config_name,
            key_path=key_path,
            value=value,
        )
        return await self._request_typed(
            request, ConfigSatelliteConfigWriteResponse
        )

    async def request_satellite_audio(
        self,
        satellite_id: str,
        action: str = "query",
        target: str = "output",
        volume: int = 0,
        muted: bool = False,
        speakers: list | None = None,
        microphones: list | None = None,
    ) -> ConfigSatelliteAudioResponse | None:
        """
        Send an audio request to a satellite (via server proxy).

        Args:
            satellite_id: ID of the satellite.
            action: "query" | "set_volume" | "set_mute" | "list_devices"
            target: "output" | "input"
            volume: Volume (0-100) for action="set_volume".
            muted: Mute state for action="set_mute".
            speakers: Speaker list forwarded in the request (empty if None).
            microphones: Microphone list forwarded in the request (empty if
                None).

        Returns:
            ConfigSatelliteAudioResponse, or None.
        """
        request = ConfigSatelliteAudioRequest(
            satellite_id=satellite_id,
            action=action,
            target=target,
            volume=int(volume),
            muted=bool(muted),
            speakers=list(speakers or []),
            microphones=list(microphones or []),
        )
        return await self._request_typed(request, ConfigSatelliteAudioResponse)

    async def request_field_options(
        self, field_sources: list[str]
    ) -> dict[str, list[tuple[str, str]]] | None:
        """
        Request dynamic field options from the server.

        Args:
            field_sources: List of source identifiers
                (e.g. ["wakeword_models"]).

        Returns:
            Dictionary of source name -> list of (value, label) tuples.
        """
        request = ConfigFieldOptionsRequest(field_sources=field_sources)
        response = await self._request(request)
        if not response:
            return None

        extracted = self._extract_response(response, ConfigFieldOptionsResponse)
        if isinstance(extracted, ConfigFieldOptionsResponse):
            return extracted.options
        if isinstance(extracted, dict):
            return extracted.get("options", {})
        return None

    async def start_mic_test(
        self, satellite_id: str, audio_callback: Any = None
    ) -> ConfigMicTestStartResponse | None:
        """
        Start a microphone test for a satellite.

        Args:
            satellite_id: ID of the satellite.
            audio_callback: Callback(audio_data: bytes) for received audio.

        Returns:
            ConfigMicTestStartResponse, or None.
        """
        # Register the callback before sending so no audio packet is missed.
        self._mic_test_callback = audio_callback

        request = ConfigMicTestStartRequest(satellite_id=satellite_id)
        response = await self._request(request, timeout=15.0)
        if not response:
            self._mic_test_callback = None
            return None

        extracted = self._extract_response(response, ConfigMicTestStartResponse)
        if (
            isinstance(extracted, ConfigMicTestStartResponse)
            and not extracted.success
        ):
            self._mic_test_callback = None
        return extracted

    async def stop_mic_test(
        self, satellite_id: str
    ) -> ConfigMicTestStopResponse | None:
        """
        Stop a running microphone test.

        Args:
            satellite_id: ID of the satellite.

        Returns:
            ConfigMicTestStopResponse, or None.
        """
        self._mic_test_callback = None

        request = ConfigMicTestStopRequest(satellite_id=satellite_id)
        return await self._request_typed(request, ConfigMicTestStopResponse)

    # -- Plugin API --

    async def request_plugins(self) -> ConfigPluginListResponse | None:
        """
        Request the plugin list from the server.

        Returns:
            ConfigPluginListResponse, or None on failure.
        """
        return await self._request_typed(
            ConfigPluginListRequest(), ConfigPluginListResponse
        )

    async def plugin_action(
        self,
        action: str,
        plugin_name: str,
        params: dict | None = None,
    ) -> ConfigPluginActionResponse | None:
        """
        Execute a plugin action.

        Standard actions: enable, disable, reload.
        Plugin-specific actions are delegated to
        plugin.handle_config_action().

        Returns:
            ConfigPluginActionResponse, or None.
        """
        request = ConfigPluginActionRequest(
            action=action,
            plugin_name=plugin_name,
            params=params or {},
        )
        return await self._request_typed(request, ConfigPluginActionResponse)

    async def request_plugin_detail(
        self, plugin_name: str
    ) -> ConfigPluginDetailResponse | None:
        """
        Request detailed plugin information.

        Returns:
            ConfigPluginDetailResponse, or None.
        """
        request = ConfigPluginDetailRequest(plugin_name=plugin_name)
        return await self._request_typed(request, ConfigPluginDetailResponse)

    async def write_plugin_config(
        self, plugin_name: str, key_path: str, value: Any
    ) -> ConfigPluginConfigWriteResponse | None:
        """
        Write a configuration value for a plugin.

        Returns:
            ConfigPluginConfigWriteResponse, or None.
        """
        request = ConfigPluginConfigWriteRequest(
            plugin_name=plugin_name,
            key_path=key_path,
            value=value,
        )
        return await self._request_typed(request, ConfigPluginConfigWriteResponse)

    # -- Schedule API --

    async def request_schedule_jobs(self) -> ConfigScheduleListResponse | None:
        """Request the list of schedule jobs."""
        return await self._request_typed(
            ConfigScheduleListRequest(), ConfigScheduleListResponse
        )

    async def schedule_action(
        self, action: str, job_id: str
    ) -> ConfigScheduleActionResponse | None:
        """Execute a schedule job action (enable/disable/delete/run_now)."""
        request = ConfigScheduleActionRequest(action=action, job_id=job_id)
        return await self._request_typed(request, ConfigScheduleActionResponse)

    async def request_schedule_detail(
        self, job_id: str
    ) -> ConfigScheduleDetailResponse | None:
        """Request detailed information about a job."""
        request = ConfigScheduleDetailRequest(job_id=job_id)
        return await self._request_typed(request, ConfigScheduleDetailResponse)

    async def save_schedule_job(
        self,
        job_id: str,
        name: str,
        description: str,
        trigger: dict,
        conditions: list[dict],
        actions: list[dict],
        config: dict,
    ) -> ConfigScheduleSaveResponse | None:
        """Create or update a schedule job."""
        request = ConfigScheduleSaveRequest(
            job_id=job_id,
            name=name,
            description=description,
            trigger=trigger,
            conditions=conditions,
            actions=actions,
            config=config,
        )
        return await self._request_typed(request, ConfigScheduleSaveResponse)

    async def request_schedule_form_meta(
        self,
    ) -> ConfigScheduleFormMetaResponse | None:
        """Request form metadata for scheduler components."""
        return await self._request_typed(
            ConfigScheduleFormMetaRequest(), ConfigScheduleFormMetaResponse
        )

    # -- Trainer API --

    async def request_trainers(self) -> ConfigTrainerListResponse | None:
        """Request the trainer list."""
        return await self._request_typed(
            ConfigTrainerListRequest(), ConfigTrainerListResponse
        )

    async def request_trainer_detail(
        self, trainer_id: str
    ) -> ConfigTrainerDetailResponse | None:
        """Request detailed trainer information."""
        request = ConfigTrainerDetailRequest(trainer_id=trainer_id)
        return await self._request_typed(request, ConfigTrainerDetailResponse)

    async def write_trainer_settings(
        self, trainer_id: str, settings: dict
    ) -> ConfigTrainerSettingsWriteResponse | None:
        """Write trainer settings."""
        request = ConfigTrainerSettingsWriteRequest(
            trainer_id=trainer_id,
            settings=settings,
        )
        return await self._request_typed(
            request, ConfigTrainerSettingsWriteResponse
        )

    async def validate_trainer(
        self, trainer_id: str
    ) -> ConfigTrainerValidateResponse | None:
        """Validate a trainer."""
        request = ConfigTrainerValidateRequest(trainer_id=trainer_id)
        return await self._request_typed(request, ConfigTrainerValidateResponse)

    async def start_training(
        self, trainer_id: str
    ) -> ConfigTrainerStartResponse | None:
        """Start a training run (waits up to 30 seconds for the reply)."""
        request = ConfigTrainerStartRequest(trainer_id=trainer_id)
        return await self._request_typed(
            request, ConfigTrainerStartResponse, timeout=30.0
        )

    async def pause_training(self) -> ConfigTrainerPauseResponse | None:
        """Pause the running training."""
        return await self._request_typed(
            ConfigTrainerPauseRequest(), ConfigTrainerPauseResponse
        )

    async def resume_training(self) -> ConfigTrainerResumeResponse | None:
        """Resume a paused training."""
        return await self._request_typed(
            ConfigTrainerResumeRequest(), ConfigTrainerResumeResponse
        )

    async def stop_training(self) -> ConfigTrainerStopResponse | None:
        """Stop the running training."""
        return await self._request_typed(
            ConfigTrainerStopRequest(), ConfigTrainerStopResponse
        )

    async def request_training_progress(
        self,
    ) -> ConfigTrainerProgressResponse | None:
        """Request the current training progress."""
        return await self._request_typed(
            ConfigTrainerProgressRequest(), ConfigTrainerProgressResponse
        )

    async def execute_trainer_action(
        self, trainer_id: str, action: str, params: dict | None = None
    ) -> ConfigTrainerActionResponse | None:
        """Execute a trainer action (waits up to 120 seconds for the reply)."""
        request = ConfigTrainerActionRequest(
            trainer_id=trainer_id,
            action=action,
            params=params or {},
        )
        return await self._request_typed(
            request, ConfigTrainerActionResponse, timeout=120.0
        )

    async def _request_typed(
        self, request: object, response_cls: type, timeout: float = 10.0
    ) -> Any:
        """
        Send a request and convert the reply into ``response_cls``.

        Shared tail of the high-level API methods.

        Args:
            request: Request message to send.
            response_cls: Expected response class.
            timeout: Seconds to wait for the reply.

        Returns:
            None if no reply was received; otherwise the result of
            _extract_response() (an instance of response_cls when the payload
            could be converted, else the raw payload).
        """
        response = await self._request(request, timeout=timeout)
        if not response:
            return None
        return self._extract_response(response, response_cls)

    def _extract_response(self, message: ProtocolMessage, cls: type) -> Any:
        """
        Extract a typed response from a ProtocolMessage.

        If the payload is not already an instance of ``cls``, try to build
        one from the payload's dict (or its ``__dict__``), using only keys
        that are dataclass fields of ``cls``.

        Returns:
            An instance of ``cls`` if possible, otherwise the raw payload.
        """
        data = message.data
        if isinstance(data, cls):
            return data

        sources = []
        if isinstance(data, dict):
            sources.append(data)
        if hasattr(data, "__dict__"):
            sources.append(data.__dict__)

        for source in sources:
            try:
                fields = cls.__dataclass_fields__
                return cls(**{k: v for k, v in source.items() if k in fields})
            except Exception:
                # Conversion is best effort; fall through to the next source
                # and finally to the raw payload.
                continue

        return data

    async def _request(
        self, message: object, timeout: float = 10.0
    ) -> ProtocolMessage | None:
        """
        Send a request and wait for the reply.

        Uses the message's ``message_id`` for request/response correlation.

        Args:
            message: Request message to send.
            timeout: Seconds to wait for the reply.

        Returns:
            The reply, or None if not connected, if sending failed, on
            timeout, or if the connection was lost while waiting.
        """
        if not self._connected:
            return None

        message_id = getattr(message, "message_id", "")

        # Register the future before sending so a fast reply cannot be missed.
        future: asyncio.Future[ProtocolMessage | None] = (
            asyncio.get_running_loop().create_future()
        )
        self._pending[message_id] = future

        try:
            if not await self._send_message(message):
                # Nothing was sent, so no reply can arrive; do not wait for
                # the timeout.
                return None
            return await asyncio.wait_for(future, timeout=timeout)
        except asyncio.TimeoutError:
            return None
        finally:
            # Only remove our own entry; the id may have been re-registered
            # by another request in the meantime.
            if self._pending.get(message_id) is future:
                del self._pending[message_id]

    def _fail_pending(self) -> None:
        """Resolve all still-pending request futures with None."""
        for future in list(self._pending.values()):
            if not future.done():
                future.set_result(None)

    async def _receive_loop(self) -> None:
        """Receive messages and resolve the matching pending futures."""
        while self._connected:
            try:
                message = await self._read_message()
                if message is None:
                    break

                # Answer PING
                if message.class_name == "TRXIPING":
                    await self._send_pong()
                    continue

                # Ignore PONG
                if message.class_name == "TRXIPONG":
                    continue

                # Microphone test audio data (unsolicited)
                if message.class_name == "ConfigMicTestAudioData":
                    if self._mic_test_callback:
                        data = message.data
                        audio = b""
                        if hasattr(data, "audio_data"):
                            audio = data.audio_data
                        elif isinstance(data, dict):
                            audio = data.get("audio_data", b"")
                        if audio:
                            try:
                                self._mic_test_callback(audio)
                            except Exception:
                                # A failing UI callback must not tear down
                                # the receive loop.
                                pass
                    continue

                # Extract the message id for correlation
                msg_id = ""
                data = message.data
                if hasattr(data, "message_id"):
                    msg_id = data.message_id
                elif isinstance(data, dict):
                    msg_id = data.get("message_id", "")

                # Resolve the pending future
                if msg_id and msg_id in self._pending:
                    future = self._pending.pop(msg_id)
                    if not future.done():
                        future.set_result(message)
                elif self._pending:
                    # Fallback: resolve the oldest pending request.
                    # NOTE(review): this also triggers when the reply has an
                    # id that matches no pending request (e.g. a late reply
                    # after a timeout) -- confirm against the server's
                    # message-id semantics before tightening.
                    first_key = next(iter(self._pending))
                    future = self._pending.pop(first_key)
                    if not future.done():
                        future.set_result(message)

            except asyncio.CancelledError:
                break
            except Exception as e:
                if self._connected:
                    perror(f"Config-Empfangsfehler: {e}")
                break

        self._connected = False
        # No further replies can arrive; release waiting callers right away
        # instead of letting them run into their timeout.
        self._fail_pending()

    async def _read_message(self) -> ProtocolMessage | None:
        """
        Read one complete protocol message from the stream.

        Returns:
            The deserialized message, or None if there is no reader, the
            connection ended, the magic bytes are invalid, or reading failed.
        """
        if not self._reader:
            return None

        try:
            # Check for hard-coded commands (8 bytes)
            peek_data = await self._reader.readexactly(8)

            if self._protocol.is_hard_command(peek_data):
                return self._protocol.deserialize(peek_data)

            # Regular header: read the part beyond the 8 bytes already read
            header_size = MAGIC_LENGTH + 2 + 8 + 4 + 16 + 2
            remaining_header = await self._reader.readexactly(header_size - 8)
            header = peek_data + remaining_header

            if header[:MAGIC_LENGTH] != MAGIC:
                perror(f"Ungueltige Magic: {header[:MAGIC_LENGTH]}")
                return None

            # Big-endian 16-bit class-name length at header bytes 34..35
            class_name_length = struct.unpack(">H", header[34:36])[0]
            class_name = await self._reader.readexactly(class_name_length)

            # Big-endian 32-bit payload length, followed by the payload
            data_length_bytes = await self._reader.readexactly(4)
            data_length = struct.unpack(">I", data_length_bytes)[0]

            data = (
                await self._reader.readexactly(data_length)
                if data_length > 0
                else b""
            )

            full_message = header + class_name + data_length_bytes + data
            return self._protocol.deserialize(full_message)

        except (ConnectionResetError, asyncio.IncompleteReadError):
            return None
        except Exception as e:
            perror(f"Fehler beim Lesen: {e}")
            return None

    async def _send_message(
        self, message: object, flags: ProtocolFlags = ProtocolFlags.PICKLE
    ) -> bool:
        """
        Serialize and send a message.

        Writes are serialized through an asyncio lock so that concurrent
        coroutines do not write/drain at the same time.

        Returns:
            True on success, False if there is no writer or sending failed.
        """
        if not self._writer:
            return False

        try:
            data = self._protocol.serialize(message, flags)
            async with self._send_lock:
                self._writer.write(data)
                await self._writer.drain()
            return True
        except Exception as e:
            perror(f"Sendefehler: {e}")
            return False

    async def _send_pong(self) -> None:
        """Send a PONG (best effort), using the same lock as other writes."""
        writer = self._writer
        if not writer:
            return
        try:
            async with self._send_lock:
                writer.write(COMMAND_PONG)
                await writer.drain()
        except Exception:
            # Best effort: a broken connection surfaces in the receive loop.
            pass

    async def _close(self) -> None:
        """Close the stream and drop the reader/writer references."""
        if self._writer:
            try:
                self._writer.close()
                await self._writer.wait_closed()
            except Exception:
                # Errors while closing are irrelevant; the references are
                # dropped either way.
                pass
            self._writer = None
            self._reader = None


class ConfigApplication:
    """
    Application for the config tool.

    Connects to a running Trixy instance and starts the Textual TUI for
    remote administration.

    Note: Does NOT inherit from IApplication because the main loop is driven
    by Textual (not by asyncio.Event.wait).
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 2105,
        encryption_key_path: str = "certs/encryption.key",
        config_path: str = "config/config_tool_config.json",
    ) -> None:
        """
        Store the settings; the connection is created in run().

        Args:
            host: Host name or address of the instance.
            port: TCP port of the ConfigListener.
            encryption_key_path: Path of the encryption key file.
            config_path: Path of the config tool's own config file; consulted
                first when looking up the refresh interval.
        """
        self._host = host
        self._port = port
        self._encryption_key_path = encryption_key_path
        self._config_path = config_path
        self._connection: ConfigConnection | None = None

    def _build_views(self, conn: ConfigConnection) -> list[Any]:
        """Create the TUI views matching the connected instance type."""
        from trixy_core.tui.views.health_view import HealthView
        from trixy_core.tui.views.config_view import ConfigView
        from trixy_core.tui.views.log_view import LogView
        from trixy_core.tui.views.sat_info_view import SatInfoView
        from trixy_core.tui.views.sat_connection_view import SatConnectionView
        from trixy_core.tui.views.sat_conversation_view import SatConversationView
        from trixy_core.tui.views.sat_config_view import SatConfigView
        from trixy_core.tui.views.sat_updates_view import SatUpdatesView
        from trixy_core.tui.views.sat_audio_view import SatAudioView
        from trixy_core.tui.views.plugins_view import PluginsView
        from trixy_core.tui.views.plugin_info_view import PluginInfoView
        from trixy_core.tui.views.plugin_config_view import PluginConfigView
        from trixy_core.tui.views.schedule_view import ScheduleView
        from trixy_core.tui.views.schedule_info_view import ScheduleInfoView
        from trixy_core.tui.views.schedule_trigger_view import ScheduleTriggerView
        from trixy_core.tui.views.schedule_condition_view import ScheduleConditionView
        from trixy_core.tui.views.schedule_action_view import ScheduleActionView
        from trixy_core.tui.views.trainer_view import TrainerView
        from trixy_core.tui.views.trainer_info_view import TrainerInfoView
        from trixy_core.tui.views.trainer_settings_view import TrainerSettingsView
        from trixy_core.tui.views.trainer_dataset_view import TrainerDatasetView
        from trixy_core.tui.views.trainer_optional_view import TrainerOptionalView
        from trixy_core.tui.views.trainer_validate_view import TrainerValidateView
        from trixy_core.tui.views.trainer_training_view import TrainerTrainingView

        views = [
            HealthView(connection=conn),
            ConfigView(connection=conn),
            LogView(connection=conn),
        ]

        # Satellite sub-views (needed for both modes)
        sat_subviews = [
            SatInfoView(connection=conn),
            SatConnectionView(connection=conn),
            SatConversationView(connection=conn),
            SatConfigView(connection=conn),
            SatUpdatesView(connection=conn),
            SatAudioView(connection=conn),
        ]

        # Plugin sub-views
        plugin_subviews = [
            PluginInfoView(connection=conn),
            PluginConfigView(connection=conn),
        ]

        # Schedule sub-views
        schedule_subviews = [
            ScheduleInfoView(connection=conn),
            ScheduleTriggerView(connection=conn),
            ScheduleConditionView(connection=conn),
            ScheduleActionView(connection=conn),
        ]

        # Trainer sub-views
        trainer_subviews = [
            TrainerInfoView(connection=conn),
            TrainerSettingsView(connection=conn),
            TrainerDatasetView(connection=conn),
            TrainerOptionalView(connection=conn),
            TrainerValidateView(connection=conn),
            TrainerTrainingView(connection=conn),
        ]

        # F3: satellite views, F4: plugin views -- depending on instance type.
        # The main views are inserted before LogView so it stays last among
        # the main views.
        instance_type = conn.instance_type.lower()
        if instance_type == "server":
            from trixy_core.tui.views.satellites_view import SatellitesView

            views.insert(2, SatellitesView(connection=conn))
            views.insert(3, PluginsView(connection=conn))
            views.insert(4, ScheduleView(connection=conn))
            views.insert(5, TrainerView(connection=conn))
            views.extend(sat_subviews)
            views.extend(plugin_subviews)
            views.extend(schedule_subviews)
            views.extend(trainer_subviews)
        elif instance_type in ("client", "satellite"):
            # Client mode: sub-views directly as main navigation
            views.extend(sat_subviews)
        elif instance_type == "standalone":
            # Standalone: plugins and scheduler available, no satellites
            views.insert(2, PluginsView(connection=conn))
            views.insert(3, ScheduleView(connection=conn))
            views.insert(4, TrainerView(connection=conn))
            views.extend(plugin_subviews)
            views.extend(schedule_subviews)
            views.extend(trainer_subviews)

        return views

    async def run(self) -> None:
        """
        Start the config tool application.

        Raises:
            ConnectionError: If the connection to the instance cannot be
                established (the error is logged and re-raised).
        """
        # Establish the connection
        self._connection = ConfigConnection(
            host=self._host,
            port=self._port,
            encryption_key_path=self._encryption_key_path,
        )

        try:
            await self._connection.connect()
        except ConnectionError as e:
            perror(str(e))
            raise

        # From here on the connection is open; make sure it is closed again
        # even if building the views or the TUI fails.
        try:
            views = self._build_views(self._connection)

            # Create the TUI and inject the connection
            from trixy_core.tui.app import TrixyTUI

            # Read the refresh interval from the config file or environment
            refresh_interval = _load_refresh_interval(self._config_path)

            tui = TrixyTUI(
                views=views,
                connection_info=self._connection.connection_info,
                refresh_interval=refresh_interval,
            )
            tui.set_connection(self._connection)

            await tui.run_async()
        finally:
            await self._connection.disconnect()
            # NOTE(review): logged on every exit path, including errors --
            # confirm this matches the original intent.
            pinfo("Config-Tool beendet")