# -*- coding: utf-8 -*-
"""
Intent Dispatcher Service.

Central component that processes intent events and coordinates the handler
flow.

Event flow:
    intent_received
        ↓
    [Intent Dispatcher]
        ├── find and call the handler
        ├── handle system intents directly
        ↓
    intent_handled
        ↓
    [if there is no response_text and suppress_response is not set]
        ↓
    create_output_text
"""

import hmac
import logging
import time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, TYPE_CHECKING

from trixy_core.service.iservice import IService
from trixy_core.service.enums import ServicePriority, ServiceGroup
from trixy_core.events.decorators import TrixyEvent
from trixy_core.events.event_data.basic import (
    IntentReceived,
    IntentHandled,
    CreateOutputText,
    OutputTextCreated,
    FollowUpExpected,
)
from trixy_core.nlp.intent_registry import IntentRegistry
from trixy_core.nlp.system_intents import is_system_intent, get_system_intent_info, is_admin_intent
from trixy_core.nlp.handler import IntentReceivedData
from trixy_core.nlp.decorators import INTENT_METADATA_ATTR
from trixy_core.utils.debug import pinfo, pdebug, perror

if TYPE_CHECKING:
    from trixy_core.application import IApplication


# Seconds after which a stored follow-up context is considered stale.
_FOLLOWUP_CONTEXT_TTL_SECONDS = 60
# Minimum classifier confidence at which a different intent overrides an
# active follow-up.
_FOLLOWUP_OVERRIDE_MIN_CONFIDENCE = 0.8
# Lifetime (seconds) written into an admin session entry on login.
_ADMIN_SESSION_TTL_SECONDS = 1800

# Separate logger for the admin audit log.
_admin_logger: logging.Logger | None = None


def _get_admin_logger() -> logging.Logger:
    """Create (on first use) and return the admin audit logger.

    The logger writes to ``logs/admin_audit.log`` and does not propagate to
    the root logger. The module-level cache is only filled after the file
    handler was set up successfully, so a failed setup is retried on the
    next call instead of leaving a handler-less logger cached.
    """
    global _admin_logger
    if _admin_logger is None:
        logger = logging.getLogger("trixy.admin_audit")
        logger.setLevel(logging.INFO)
        logger.propagate = False
        # logging.getLogger returns a process-wide singleton; do not attach a
        # second file handler if one is already present (e.g. module reload).
        if not logger.handlers:
            log_dir = Path("logs")
            log_dir.mkdir(parents=True, exist_ok=True)
            handler = logging.FileHandler(log_dir / "admin_audit.log", encoding="utf-8")
            handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
            logger.addHandler(handler)
        _admin_logger = logger
    return _admin_logger


class IntentDispatcherService(IService):
    """
    Service that processes intent events.

    Responsible for:
    1. Calling the handler for intent_received
    2. Emitting intent_handled
    3. Emitting create_output_text (if needed)
    4. Handling system intents
    5. Issuing the TTS request for output_text_created
    """

    PRIORITY = ServicePriority.MANAGER
    GROUP = ServiceGroup.CONVERSATION
    DEPENDENCIES: list[str] = []
    NAME = "IntentDispatcher"

    def __init__(self, application: "IApplication") -> None:
        """Initialise the per-satellite and per-request tracking tables."""
        super().__init__(application)
        # Admin sessions: satellite_id → session info
        self._admin_sessions: dict[str, dict[str, Any]] = {}
        # Pending follow-ups: TTS request_id → session info
        self._pending_followups: dict[str, dict[str, Any]] = {}
        # Active follow-up contexts: satellite_id → follow-up info
        self._active_followups: dict[str, dict[str, Any]] = {}

    async def start(self) -> None:
        """Start the service."""
        pinfo("IntentDispatcher gestartet")

    async def stop(self) -> None:
        """Stop the service."""
        pdebug("IntentDispatcher gestoppt")

    # =========================================================================
    # Event: intent_received → intent_handled
    # =========================================================================

    @TrixyEvent(["intent_received"])
    async def on_intent_received(self, event_name: str, event_data: IntentReceived) -> None:
        """
        Process a recognised intent.

        1. Finds the matching handler
        2. Calls the handler
        3. Emits intent_handled
        4. Emits output_text_created or create_output_text where applicable

        Args:
            event_name: Name of the triggering event.
            event_data: The recognised intent; ``intent`` and ``slots`` may be
                rewritten in place when a follow-up answer is validated or an
                intent name is resolved by the registry.
        """
        pinfo(
            f"[DISPATCHER] intent_received: '{event_data.intent}' "
            f"(confidence={event_data.confidence:.2f}, text='{event_data.original_text[:50]}')"
        )

        # Handle system intents directly
        if is_system_intent(event_data.intent):
            pinfo(f"[DISPATCHER] System-Intent erkannt: '{event_data.intent}'")
            # Write admin system intents to the audit log
            if is_admin_intent(event_data.intent):
                _get_admin_logger().info(
                    "SYSTEM intent='%s' satellite='%s' room='%s' "
                    "wakeword='%s' text='%s'",
                    event_data.intent, event_data.satellite_id,
                    event_data.room_id, event_data.wakeword_type,
                    event_data.original_text,
                )
            await self._handle_system_intent(event_data)
            return

        # Follow-up validation: is a clarifying question active for this satellite?
        followup_ctx = self._active_followups.get(event_data.satellite_id)
        if followup_ctx:
            # Timeout: let the follow-up expire after the TTL
            created_at = followup_ctx.get("created_at", 0)
            if created_at and (time.time() - created_at) > _FOLLOWUP_CONTEXT_TTL_SECONDS:
                pdebug("[DISPATCHER] Follow-Up abgelaufen (>60s)")
                self._active_followups.pop(event_data.satellite_id, None)
                followup_ctx = None
            # If the classifier recognised a clearly different intent (not
            # "unknown" and high confidence), ignore the follow-up
            elif (event_data.intent != "unknown"
                    and event_data.confidence >= _FOLLOWUP_OVERRIDE_MIN_CONFIDENCE
                    and event_data.intent != followup_ctx.get("follow_up_intent")
                    and event_data.intent != followup_ctx.get("original_intent")):
                pinfo(
                    f"[DISPATCHER] Follow-Up uebersprungen — "
                    f"neuer Intent '{event_data.intent}' (conf={event_data.confidence:.2f}) "
                    f"hat Vorrang vor Follow-Up '{followup_ctx.get('follow_up_intent')}'"
                )
                self._active_followups.pop(event_data.satellite_id, None)
                followup_ctx = None

        if followup_ctx:
            validated = self._validate_followup_response(
                event_data, followup_ctx,
            )
            if validated is not None:
                # Answer validated — redirect intent and merge slots
                event_data.intent = followup_ctx["follow_up_intent"]
                event_data.slots = {**followup_ctx.get("data", {}), **validated}
                # Drop the follow-up context
                self._active_followups.pop(event_data.satellite_id, None)
                pdebug(f"[DISPATCHER] Follow-Up validiert: {event_data.intent}")
            elif followup_ctx.get("valid_responses"):
                # Invalid answer — ask again if a retry text was provided
                retry_text = followup_ctx.get("retry_text", "")
                if retry_text:
                    pinfo("[DISPATCHER] Follow-Up: Ungueltige Antwort, Retry")
                    output_event = OutputTextCreated(
                        satellite_id=event_data.satellite_id,
                        session_id=event_data.session_id,
                        room_id=event_data.room_id,
                        text=retry_text,
                        intent=followup_ctx.get("original_intent", ""),
                        is_followup=True,
                        expects_response=True,
                    )
                    await self._application.events.trigger("output_text_created", output_event)
                    # NOTE(review): the return is placed inside `if retry_text`
                    # so that, without a retry text, processing falls through
                    # to the regular handler lookup — confirm against the
                    # original layout.
                    return

        # Look up the plugin handler (with suffix fallback for names shortened by the LLM)
        registry = IntentRegistry.get_instance()
        resolved = registry.resolve_intent(event_data.intent)
        handler = resolved.handler if resolved else None

        # On a suffix match, continue with the resolved name
        if resolved and resolved.name != event_data.intent:
            pinfo(f"[DISPATCHER] Intent aufgeloest: '{event_data.intent}' → '{resolved.name}'")
            event_data.intent = resolved.name

        pinfo(f"[DISPATCHER] Handler-Suche für '{event_data.intent}': {'gefunden' if handler else 'NICHT gefunden'}")

        response_text = ""
        handler_data: dict[str, Any] = {}
        success = True
        error = ""
        needs_followup = False
        followup_prompt = ""
        suppress_response = False

        if handler is not None:
            # Admin-only check: allow the intent only for an admin wakeword
            handler_meta = getattr(handler, INTENT_METADATA_ATTR, {})
            is_admin = handler_meta.get("admin_only", False)
            if is_admin:
                ww_type = getattr(event_data, "wakeword_type", "")
                ww_model = getattr(event_data, "wakeword_model", ww_type)
                admin_wakewords = self._get_admin_wakewords()
                audit = _get_admin_logger()
                if ww_model not in admin_wakewords and ww_type not in admin_wakewords:
                    audit.warning(
                        "ABGELEHNT intent='%s' satellite='%s' room='%s' "
                        "wakeword='%s' text='%s'",
                        event_data.intent, event_data.satellite_id,
                        event_data.room_id, ww_type, event_data.original_text,
                    )
                    pinfo(f"[DISPATCHER] Admin-Intent '{event_data.intent}' abgelehnt "
                          f"(Wakeword: '{ww_type}', erfordert: 'system_command')")
                    response_text = "Dieser Befehl erfordert das System-Wakeword."
                    success = False
                    handled_event = IntentHandled(
                        satellite_id=event_data.satellite_id,
                        session_id=event_data.session_id,
                        room_id=event_data.room_id,
                        intent=event_data.intent,
                        original_text=event_data.original_text,
                        slots=event_data.slots,
                        success=False,
                        response_text=response_text,
                    )
                    await self._application.events.trigger("intent_handled", handled_event)
                    output_event = OutputTextCreated(
                        satellite_id=event_data.satellite_id,
                        session_id=event_data.session_id,
                        room_id=event_data.room_id,
                        text=response_text,
                        intent=event_data.intent,
                    )
                    await self._application.events.trigger("output_text_created", output_event)
                    return
                else:
                    audit.info(
                        "ERLAUBT intent='%s' satellite='%s' room='%s' "
                        "wakeword='%s' text='%s'",
                        event_data.intent, event_data.satellite_id,
                        event_data.room_id, ww_type, event_data.original_text,
                    )

            # Build the handler input
            handler_input = IntentReceivedData(
                intent=event_data.intent,
                confidence=event_data.confidence,
                slots=event_data.slots,
                original_text=event_data.original_text,
                satellite_id=event_data.satellite_id,
                room_id=event_data.room_id,
                session_id=event_data.session_id,
                wakeword_type=event_data.wakeword_type,
                is_authenticated=event_data.is_authenticated,
            )

            try:
                result = await handler(handler_input)
                if result:
                    success = result.success
                    error = result.error
                    handler_data = result.data or {}
                    suppress_response = result.suppress_tts
                    if result.has_response():
                        response_text = result.response_text
                    if result.needs_follow_up():
                        needs_followup = True
                        followup_prompt = result.follow_up_intent
                        # Store the follow-up context together with its validation rules
                        self._active_followups[event_data.satellite_id] = {
                            "follow_up_intent": result.follow_up_intent,
                            "valid_responses": result.follow_up_valid_responses,
                            "retry_text": result.follow_up_retry_text,
                            "data": result.data or {},
                            "original_intent": event_data.intent,
                            "created_at": time.time(),
                        }
            except Exception as e:
                # Boundary around plugin code: report the failure via
                # intent_handled instead of crashing the dispatcher.
                perror(f"Handler-Fehler für '{event_data.intent}': {e}")
                success = False
                error = str(e)
        else:
            pdebug(f"Kein Handler für Intent: {event_data.intent}")
            handler_data = {"note": "Kein Handler registriert"}

        # Emit intent_handled
        handled_event = IntentHandled(
            satellite_id=event_data.satellite_id,
            session_id=event_data.session_id,
            room_id=event_data.room_id,
            intent=event_data.intent,
            original_text=event_data.original_text,
            slots=event_data.slots,
            success=success,
            response_text=response_text,
            data=handler_data,
            error=error,
            needs_followup=needs_followup,
            followup_prompt=followup_prompt,
            suppress_response=suppress_response,
        )
        await self._application.events.trigger("intent_handled", handled_event)

        # Handler supplied a response → emit output_text_created directly
        if response_text:
            pinfo(f"[DISPATCHER] Handler hat Antwort: '{response_text[:80]}' → output_text_created")
            output_event = OutputTextCreated(
                satellite_id=event_data.satellite_id,
                session_id=event_data.session_id,
                room_id=event_data.room_id,
                text=response_text,
                intent=event_data.intent,
                is_followup=needs_followup,
                expects_response=needs_followup,
            )
            await self._application.events.trigger("output_text_created", output_event)
        # No response and not suppressed → create_output_text (LLM generates the response)
        elif not suppress_response:
            pinfo("[DISPATCHER] Keine Handler-Antwort → create_output_text (LLM generiert)")
            create_event = CreateOutputText(
                satellite_id=event_data.satellite_id,
                session_id=event_data.session_id,
                room_id=event_data.room_id,
                intent=event_data.intent,
                original_text=event_data.original_text,
                slots=event_data.slots,
                handler_data=handler_data,
                handler_success=success,
                handler_error=error,
                language=event_data.language,
            )
            await self._application.events.trigger("create_output_text", create_event)

    # =========================================================================
    # Event: output_text_created → tts_request
    # =========================================================================

    @TrixyEvent(["output_text_created"])
    async def on_output_text_created(self, event_name: str, event_data: OutputTextCreated) -> None:
        """
        Forward response text to TTS.

        Emits: tts_request
        For follow-ups: remembers the request so followup_expected can be
        emitted once TTS has completed.
        """
        if not event_data.text:
            return

        from trixy_core.utils.template_formatter import format_template

        # Resolve template placeholders (plugins may use {date.*} etc.)
        resolved_text = format_template(event_data.text, application=self._application)

        request_id = str(uuid.uuid4())
        tts_request = {
            "request_id": request_id,
            "satellite_id": event_data.satellite_id,
            "text": resolved_text,
        }

        # For follow-ups: remember the request ID for the later followup_expected
        if event_data.expects_response:
            self._pending_followups[request_id] = {
                "satellite_id": event_data.satellite_id,
                "session_id": event_data.session_id,
                "room_id": event_data.room_id,
                "intent": event_data.intent,
            }
            pdebug(f"Follow-up registriert für TTS-Request: {request_id}")

        pdebug(f"TTS-Request: {resolved_text[:50]}...")
        await self._application.events.emit("tts_request", tts_request)

    # =========================================================================
    # Event: tts_completed → followup_expected (for clarifying questions)
    # =========================================================================

    @TrixyEvent(["tts_completed"])
    async def on_tts_completed(self, event_name: str, event_data) -> None:
        """
        Process the TTS result:

        1. Send the audio to the satellite
        2. With a follow-up: emit followup_expected
        3. Without a follow-up: send ConversationEnd
        """
        # tts_completed arrives as generic event data (via emit() with a dict)
        satellite_id = event_data.get("satellite_id", "")
        audio_data_hex = event_data.get("audio_data", "")
        request_id = event_data.get("request_id", "")
        session_id = event_data.get("session_id", "")
        duration_seconds = event_data.get("duration_seconds", 0)

        # Send audio to the satellite (if a satellite_id is present)
        if satellite_id and audio_data_hex:
            await self._send_tts_to_satellite(satellite_id, audio_data_hex)

        # Check for a pending follow-up
        followup_info = self._pending_followups.pop(request_id, None) if request_id else None

        if followup_info:
            success = event_data.get("success", True)
            if not success:
                perror(f"TTS fehlgeschlagen für Follow-up Request: {request_id}")
                # Fall back to the IDs stored with the follow-up so the
                # satellite is released even if the event omits them.
                await self._send_conversation_end(
                    satellite_id or followup_info["satellite_id"],
                    session_id or followup_info["session_id"],
                )
                return

            # Emit followup_expected — the client switches to listening mode
            followup_event = FollowUpExpected(
                satellite_id=followup_info["satellite_id"],
                session_id=followup_info["session_id"],
                room_id=followup_info["room_id"],
                timeout_seconds=30.0,
                followup_context={
                    "previous_intent": followup_info["intent"],
                },
            )
            pinfo(f"Follow-up erwartet für Satellite: {followup_info['satellite_id']}")
            await self._application.events.trigger("followup_expected", followup_event)

            # Send FollowUpRequest to the satellite (server→client mode)
            await self._send_follow_up_request(
                followup_info["satellite_id"],
                followup_info["session_id"],
                audio_duration=duration_seconds,
            )
        else:
            # No follow-up → send ConversationEnd
            if satellite_id:
                await self._send_conversation_end(satellite_id, session_id)

    async def _send_conversation_end(self, satellite_id: str, session_id: str = "") -> None:
        """Send ConversationEnd to the satellite, if it is known and connected."""
        satellites = getattr(self._application, "satellites", None)
        if satellites is None:
            return
        satellite = satellites.get(satellite_id)
        if satellite is None or not satellite.is_connected:
            return
        try:
            from trixy_core.network.cmd.wakeword import ConversationEnd
            cmd = ConversationEnd(
                session_id=session_id,
                reason="completed",
            )
            await satellite.send_command(cmd)
            pdebug(f"ConversationEnd gesendet an {satellite_id} (reason=completed)")
        except Exception as e:
            # Best effort: a failed send is logged, not raised
            perror(f"Fehler beim Senden von ConversationEnd: {e}")

    async def _send_follow_up_request(
        self, satellite_id: str, session_id: str = "", audio_duration: float = 0,
    ) -> None:
        """Send FollowUpRequest to the satellite → client switches to listening mode."""
        satellites = getattr(self._application, "satellites", None)
        if satellites is None:
            return
        satellite = satellites.get(satellite_id)
        if satellite is None or not satellite.is_connected:
            pdebug(f"Satellite {satellite_id} nicht verfuegbar fuer FollowUpRequest")
            return
        try:
            from trixy_core.network.cmd.wakeword import FollowUpRequest
            cmd = FollowUpRequest(
                session_id=session_id,
                question="",  # No explicit question — the conversation simply continues
                # NOTE(review): 60.0 here vs. 30.0 in FollowUpExpected — confirm
                # whether the two timeouts are meant to differ.
                timeout_seconds=60.0,
                audio_duration=audio_duration,
            )
            await satellite.send_command(cmd)
            pdebug(f"FollowUpRequest gesendet an {satellite_id}")
        except Exception as e:
            # Best effort: a failed send is logged, not raised
            perror(f"Fehler beim Senden von FollowUpRequest: {e}")

    async def _send_tts_to_satellite(self, satellite_id: str, audio_data_hex: str) -> None:
        """Decode hex-encoded TTS audio and send it to the satellite."""
        try:
            audio_bytes = bytes.fromhex(audio_data_hex)
        except (ValueError, AttributeError) as e:
            perror(f"TTS-Audio Dekodierung fehlgeschlagen: {e}")
            return

        satellites = getattr(self._application, "satellites", None)
        if not satellites:
            return

        satellite = satellites.get(satellite_id)
        if not satellite or not satellite.is_connected:
            pdebug(f"Satellite nicht verfügbar für TTS: {satellite_id}")
            return

        pinfo(f"[DISPATCHER] Sende TTS-Audio ({len(audio_bytes)} bytes) an {satellite.alias}")
        success = await satellite.say(audio_bytes)

        if success:
            # Send TTSStop so the client knows the stream has ended
            from trixy_core.network.cmd import TTSStop
            network = self._application.services.get_service("NetworkService")
            if network and hasattr(network, "send_to_satellite"):
                await network.send_to_satellite(satellite_id, TTSStop())
                pdebug(f"TTSStop gesendet an {satellite.alias}")
        else:
            perror(f"TTS-Audio senden fehlgeschlagen an {satellite_id}")

    # =========================================================================
    # System intent handling
    # =========================================================================

    async def _handle_system_intent(self, event_data: IntentReceived) -> None:
        """Handle a system intent directly and emit the resulting events.

        Emits intent_handled and, when a response text was produced,
        output_text_created.
        """
        intent = event_data.intent
        slots = event_data.slots
        satellite_id = event_data.satellite_id

        response_text = ""
        handler_data: dict[str, Any] = {}
        success = True
        needs_followup = False

        # Time queries
        if intent == "get_time":
            now = datetime.now()
            response_text = f"Es ist {now.strftime('%H:%M')} Uhr."

        elif intent == "get_date":
            weekdays = ["Montag", "Dienstag", "Mittwoch", "Donnerstag",
                        "Freitag", "Samstag", "Sonntag"]
            now = datetime.now()
            response_text = f"Heute ist {weekdays[now.weekday()]}, der {now.strftime('%d.%m.%Y')}."

        elif intent == "health_check":
            response_text = await self._generate_health_response()

        elif intent == "help":
            response_text = "Ich kann Geräte steuern, Fragen beantworten und vieles mehr. Sag einfach was du brauchst."

        elif intent == "cancel":
            response_text = "Alles klar, abgebrochen."

        # Media control
        elif intent == "stop":
            await self._application.events.emit("media_stop_all", {
                "satellite_id": satellite_id,
            })
            response_text = "Gestoppt."

        elif intent == "pause":
            await self._application.events.emit("music_paused", {
                "satellite_id": satellite_id,
            })
            response_text = "Pausiert."

        elif intent == "resume":
            await self._application.events.emit("music_resumed", {
                "satellite_id": satellite_id,
            })
            response_text = "Wird fortgesetzt."

        # Volume
        elif intent == "volume_up":
            await self._application.events.emit("music_volume_change", {
                "direction": "up",
                "amount": slots.get("amount", 10),
            })
            response_text = "Lauter."

        elif intent == "volume_down":
            await self._application.events.emit("music_volume_change", {
                "direction": "down",
                "amount": slots.get("amount", 10),
            })
            response_text = "Leiser."

        elif intent == "volume_set":
            level = slots.get("level", 50)
            await self._application.events.emit("music_volume_change", {
                "direction": "set",
                "level": level,
            })
            response_text = f"Lautstärke auf {level} Prozent."

        elif intent == "mute":
            await self._application.events.emit("music_volume_change", {
                "direction": "mute",
            })
            response_text = "Stumm."

        elif intent == "unmute":
            await self._application.events.emit("music_volume_change", {
                "direction": "unmute",
            })
            response_text = "Ton an."

        # Admin authentication
        elif intent == "system_login":
            password = slots.get("password", "")
            if self._authenticate(satellite_id, password):
                response_text = "Administrator-Anmeldung erfolgreich."
            else:
                response_text = "Falsches Passwort."
                success = False

        elif intent == "system_logout":
            self._logout(satellite_id)
            response_text = "Administrator-Sitzung beendet."

        # Admin commands
        elif intent == "system_shutdown":
            response_text = "System wird heruntergefahren."
            from trixy_core.events.event_data.basic import SystemShutdown
            await self._application.events.trigger(
                "system_shutdown", SystemShutdown(reason="Admin-Befehl")
            )

        elif intent == "system_reboot":
            # NOTE(review): only the response text is set here; no reboot
            # event is emitted in this block — confirm this is intended.
            response_text = "System wird neu gestartet."

        elif intent == "system_status":
            response_text = await self._generate_status_response()

        # Unknown system intent
        else:
            response_text = f"System-Befehl '{intent}' ist nicht implementiert."
            success = False

        # Emit intent_handled
        handled_event = IntentHandled(
            satellite_id=satellite_id,
            session_id=event_data.session_id,
            room_id=event_data.room_id,
            intent=intent,
            original_text=event_data.original_text,
            slots=slots,
            success=success,
            response_text=response_text,
            data=handler_data,
            needs_followup=needs_followup,
        )
        await self._application.events.trigger("intent_handled", handled_event)

        # Emit output_text_created
        if response_text:
            output_event = OutputTextCreated(
                satellite_id=satellite_id,
                session_id=event_data.session_id,
                room_id=event_data.room_id,
                text=response_text,
                intent=intent,
                expects_response=needs_followup,
            )
            await self._application.events.trigger("output_text_created", output_event)

    async def _generate_health_response(self) -> str:
        """Build the health-check response; falls back to a generic text without psutil."""
        try:
            import psutil
            cpu = psutil.cpu_percent()
            mem = psutil.virtual_memory().percent
            return f"Mir geht es gut! CPU: {cpu}%, Speicher: {mem}%."
        except ImportError:
            return "Mir geht es gut! Alle Systeme laufen normal."

    async def _generate_status_response(self) -> str:
        """Build the detailed status response; falls back to a generic text without psutil."""
        try:
            import psutil
            cpu = psutil.cpu_percent()
            mem = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            return (f"Systemstatus: CPU {cpu}%, "
                    f"RAM {mem.percent}% ({mem.used // (1024**3)}GB), "
                    f"Disk {disk.percent}%.")
        except ImportError:
            return "Detaillierter Status nicht verfügbar."

    def _get_admin_wakewords(self) -> set[str]:
        """Read the admin wakewords from the server config.

        Returns:
            The configured ``wakeword.admin_wakewords`` as a set, or
            ``{"system_command"}`` when no config manager or setting exists.
        """
        config_manager = getattr(self._application, "config_manager", None)
        if config_manager:
            ww_cfg = config_manager.get("wakeword", {})
            if isinstance(ww_cfg, dict):
                return set(ww_cfg.get("admin_wakewords", ["system_command"]))
            elif hasattr(ww_cfg, "admin_wakewords"):
                return set(ww_cfg.admin_wakewords)
        return {"system_command"}

    def _authenticate(self, satellite_id: str, password: str) -> bool:
        """Check the admin password and open an admin session on success.

        Args:
            satellite_id: Satellite the session is bound to.
            password: Password supplied by the user.

        Returns:
            True if the password matched and a session entry was stored.
        """
        # TODO: read the password from the config
        admin_password = "admin"  # Placeholder
        if not isinstance(password, str):
            return False
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if hmac.compare_digest(password.encode("utf-8"), admin_password.encode("utf-8")):
            self._admin_sessions[satellite_id] = {
                "authenticated": True,
                "expires": time.time() + _ADMIN_SESSION_TTL_SECONDS,
            }
            return True
        return False

    def _logout(self, satellite_id: str) -> None:
        """Close the admin session of the satellite, if any."""
        self._admin_sessions.pop(satellite_id, None)

    def _validate_followup_response(
        self,
        event_data: IntentReceived,
        followup_ctx: dict[str, Any],
    ) -> dict[str, Any] | None:
        """
        Validate an answer given in a follow-up context.

        Checks whether the answer matches one of the allowed answers
        (case-insensitive substring match). With open validation (no
        valid_responses) the spoken text is taken over as slot value.

        Args:
            event_data: The incoming intent event.
            followup_ctx: The stored follow-up context.

        Returns:
            Dict with the extracted slot values, or None for an invalid
            answer:
            - no validation defined: ``{"response_text": <original text>}``
            - one match: ``{"response_text": v, "matched_value": v}``
            - several matches (e.g. "Salami und Pilze"): ``response_text`` is
              the comma-joined list, ``matched_value`` the first match and
              ``matched_values`` the full list.
        """
        valid_responses = followup_ctx.get("valid_responses", [])
        text = event_data.original_text.strip().lower()

        # No validation defined — accept everything
        if not valid_responses:
            return {"response_text": event_data.original_text}

        # An exact match wins over any substring matches
        for valid in valid_responses:
            if valid.lower() == text:
                return {"response_text": valid, "matched_value": valid}

        # Collect every allowed value that occurs in the sentence
        found_values = [valid for valid in valid_responses if valid.lower() in text]
        if len(found_values) == 1:
            only = found_values[0]
            return {"response_text": only, "matched_value": only}
        if found_values:
            return {
                "response_text": ", ".join(found_values),
                # Kept so consumers that only read a single value keep working
                "matched_value": found_values[0],
                "matched_values": found_values,
            }

        # No match — invalid answer
        pdebug(
            f"[DISPATCHER] Follow-Up: '{text}' nicht in "
            f"{[v[:15] for v in valid_responses[:5]]}..."
        )
        return None