| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775 |
- # -*- coding: utf-8 -*-
- """
- Intent Dispatcher Service.
- Zentrale Komponente die Intent-Events verarbeitet und
- den Handler-Flow koordiniert.
- Event-Flow:
- intent_received
- ↓
- [Intent Dispatcher]
- ├── Handler finden und aufrufen
- ├── System-Intents direkt behandeln
- ↓
- intent_handled
- ↓
- [Falls keine response_text und nicht suppress_response]
- ↓
- create_output_text
- """
- import time
- from typing import Any, TYPE_CHECKING
- from trixy_core.service.iservice import IService
- from trixy_core.service.enums import ServicePriority, ServiceGroup
- from trixy_core.events.decorators import TrixyEvent
- from trixy_core.events.event_data.basic import (
- IntentReceived,
- IntentHandled,
- CreateOutputText,
- OutputTextCreated,
- FollowUpExpected,
- )
- from trixy_core.nlp.intent_registry import IntentRegistry
- from trixy_core.nlp.system_intents import is_system_intent, get_system_intent_info, is_admin_intent
- from trixy_core.nlp.handler import IntentReceivedData
- from trixy_core.nlp.decorators import INTENT_METADATA_ATTR
- from trixy_core.utils.debug import pinfo, pdebug, perror
- import logging
- from pathlib import Path
- # Separater Logger fuer Admin-Audit-Log
- _admin_logger: logging.Logger | None = None
- def _get_admin_logger() -> logging.Logger:
- """Erstellt/gibt den Admin-Audit-Logger zurueck."""
- global _admin_logger
- if _admin_logger is None:
- _admin_logger = logging.getLogger("trixy.admin_audit")
- _admin_logger.setLevel(logging.INFO)
- _admin_logger.propagate = False
- log_dir = Path("logs")
- log_dir.mkdir(exist_ok=True)
- handler = logging.FileHandler(log_dir / "admin_audit.log", encoding="utf-8")
- handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
- _admin_logger.addHandler(handler)
- return _admin_logger
- if TYPE_CHECKING:
- from trixy_core.application import IApplication
- class IntentDispatcherService(IService):
- """
- Service that processes intent events.
- Responsible for:
- 1. Calling the handler for intent_received
- 2. Emitting intent_handled
- 3. Emitting create_output_text (if needed)
- 4. Handling system intents
- 5. Issuing the TTS request for output_text_created
- """
- PRIORITY = ServicePriority.MANAGER
- GROUP = ServiceGroup.CONVERSATION
- DEPENDENCIES: list[str] = []
- NAME = "IntentDispatcher"
def __init__(self, application: "IApplication") -> None:
    """Initialise the dispatcher with empty session and follow-up tables.

    Args:
        application: The application object handed on to the base service.
    """
    super().__init__(application)
    # Follow-up contexts currently in force, keyed by satellite_id.
    self._active_followups: dict[str, dict[str, Any]] = dict()
    # Follow-ups waiting for their TTS output to finish, keyed by request_id.
    self._pending_followups: dict[str, dict[str, Any]] = dict()
    # Admin login state, keyed by satellite_id.
    self._admin_sessions: dict[str, dict[str, Any]] = dict()
async def start(self) -> None:
    """Start the service; the only action is an info-level log entry."""
    message = "IntentDispatcher gestartet"
    pinfo(message)
async def stop(self) -> None:
    """Stop the service; the only action is a debug-level log entry."""
    message = "IntentDispatcher gestoppt"
    pdebug(message)
- # =========================================================================
- # Event: intent_received → intent_handled
- # =========================================================================
- @TrixyEvent(["intent_received"])
- async def on_intent_received(self, event_name: str, event_data: IntentReceived) -> None:
- """
- Process a recognised intent.
- 1. Delegates system intents to _handle_system_intent
- 2. Applies an active follow-up context for the satellite, if any
- 3. Finds the matching handler and calls it
- 4. Emits intent_handled
- 5. Emits output_text_created (handler supplied a text) or
-    create_output_text (no text and the response is not suppressed)
- """
- pinfo(f"[DISPATCHER] intent_received: '{event_data.intent}' (confidence={event_data.confidence:.2f}, text='{event_data.original_text[:50]}')")
- # System intents are handled directly and never reach the handler registry
- if is_system_intent(event_data.intent):
- pinfo(f"[DISPATCHER] System-Intent erkannt: '{event_data.intent}'")
- # Record admin system intents in the audit log
- if is_admin_intent(event_data.intent):
- _get_admin_logger().info(
- "SYSTEM intent='%s' satellite='%s' room='%s' "
- "wakeword='%s' text='%s'",
- event_data.intent, event_data.satellite_id,
- event_data.room_id, event_data.wakeword_type,
- event_data.original_text,
- )
- await self._handle_system_intent(event_data)
- return
- # Follow-up validation: is a follow-up question active for this satellite?
- followup_ctx = self._active_followups.get(event_data.satellite_id)
- if followup_ctx:
- import time as _time
- # Timeout: let the follow-up expire after 60s
- created_at = followup_ctx.get("created_at", 0)
- if created_at and (_time.time() - created_at) > 60:
- pdebug(f"[DISPATCHER] Follow-Up abgelaufen (>60s)")
- del self._active_followups[event_data.satellite_id]
- followup_ctx = None
- # If the classifier recognised a clear, different intent
- # (not "unknown" and high confidence), ignore the follow-up
- elif (event_data.intent != "unknown"
- and event_data.confidence >= 0.8
- and event_data.intent != followup_ctx.get("follow_up_intent")
- and event_data.intent != followup_ctx.get("original_intent")):
- pinfo(
- f"[DISPATCHER] Follow-Up uebersprungen — "
- f"neuer Intent '{event_data.intent}' (conf={event_data.confidence:.2f}) "
- f"hat Vorrang vor Follow-Up '{followup_ctx.get('follow_up_intent')}'"
- )
- del self._active_followups[event_data.satellite_id]
- followup_ctx = None
- if followup_ctx:
- validated = self._validate_followup_response(
- event_data, followup_ctx,
- )
- if validated is not None:
- # Answer validated: rewrite intent and slots; validated values
- # override the data stored with the follow-up context
- event_data.intent = followup_ctx["follow_up_intent"]
- event_data.slots = {**followup_ctx.get("data", {}), **validated}
- # Remove the follow-up context
- del self._active_followups[event_data.satellite_id]
- pdebug(f"[DISPATCHER] Follow-Up validiert: {event_data.intent}")
- elif validated is None and followup_ctx.get("valid_responses"):
- # Invalid answer: repeat the question using the stored retry text
- retry_text = followup_ctx.get("retry_text", "")
- if retry_text:
- pinfo(f"[DISPATCHER] Follow-Up: Ungueltige Antwort, Retry")
- output_event = OutputTextCreated(
- satellite_id=event_data.satellite_id,
- session_id=event_data.session_id,
- room_id=event_data.room_id,
- text=retry_text,
- intent=followup_ctx.get("original_intent", ""),
- is_followup=True,
- expects_response=True,
- )
- await self._application.events.trigger("output_text_created", output_event)
- return
- # Look up the plugin handler (with suffix fallback for names shortened by the LLM)
- registry = IntentRegistry.get_instance()
- resolved = registry.resolve_intent(event_data.intent)
- handler = resolved.handler if resolved else None
- # On a suffix match, continue with the resolved name
- if resolved and resolved.name != event_data.intent:
- pinfo(f"[DISPATCHER] Intent aufgeloest: '{event_data.intent}' → '{resolved.name}'")
- event_data.intent = resolved.name
- pinfo(f"[DISPATCHER] Handler-Suche für '{event_data.intent}': {'gefunden' if handler else 'NICHT gefunden'}")
- # Defaults reported in intent_handled when no handler runs or it returns nothing
- response_text = ""
- handler_data: dict[str, Any] = {}
- success = True
- error = ""
- needs_followup = False
- followup_prompt = ""
- suppress_response = False
- if handler is not None:
- # Admin-only check: allow the intent only for a wakeword listed in the admin wakewords
- handler_meta = getattr(handler, INTENT_METADATA_ATTR, {})
- is_admin = handler_meta.get("admin_only", False)
- if is_admin:
- ww_type = getattr(event_data, "wakeword_type", "")
- ww_model = getattr(event_data, "wakeword_model", ww_type)
- admin_wakewords = self._get_admin_wakewords()
- audit = _get_admin_logger()
- if ww_model not in admin_wakewords and ww_type not in admin_wakewords:
- audit.warning(
- "ABGELEHNT intent='%s' satellite='%s' room='%s' "
- "wakeword='%s' text='%s'",
- event_data.intent, event_data.satellite_id,
- event_data.room_id, ww_type, event_data.original_text,
- )
- pinfo(f"[DISPATCHER] Admin-Intent '{event_data.intent}' abgelehnt "
- f"(Wakeword: '{ww_type}', erfordert: 'system_command')")
- response_text = "Dieser Befehl erfordert das System-Wakeword."
- success = False
- handled_event = IntentHandled(
- satellite_id=event_data.satellite_id,
- session_id=event_data.session_id,
- room_id=event_data.room_id,
- intent=event_data.intent,
- original_text=event_data.original_text,
- slots=event_data.slots,
- success=False,
- response_text=response_text,
- )
- await self._application.events.trigger("intent_handled", handled_event)
- output_event = OutputTextCreated(
- satellite_id=event_data.satellite_id,
- session_id=event_data.session_id,
- room_id=event_data.room_id,
- text=response_text,
- intent=event_data.intent,
- )
- await self._application.events.trigger("output_text_created", output_event)
- return
- else:
- audit.info(
- "ERLAUBT intent='%s' satellite='%s' room='%s' "
- "wakeword='%s' text='%s'",
- event_data.intent, event_data.satellite_id,
- event_data.room_id, ww_type, event_data.original_text,
- )
- # Build the handler input
- handler_input = IntentReceivedData(
- intent=event_data.intent,
- confidence=event_data.confidence,
- slots=event_data.slots,
- original_text=event_data.original_text,
- satellite_id=event_data.satellite_id,
- room_id=event_data.room_id,
- session_id=event_data.session_id,
- wakeword_type=event_data.wakeword_type,
- is_authenticated=event_data.is_authenticated,
- )
- try:
- result = await handler(handler_input)
- if result:
- success = result.success
- error = result.error
- handler_data = result.data or {}
- suppress_response = result.suppress_tts
- if result.has_response():
- response_text = result.response_text
- if result.needs_follow_up():
- needs_followup = True
- followup_prompt = result.follow_up_intent
- # Store the follow-up context, including its validation rules
- import time as _time
- self._active_followups[event_data.satellite_id] = {
- "follow_up_intent": result.follow_up_intent,
- "valid_responses": result.follow_up_valid_responses,
- "retry_text": result.follow_up_retry_text,
- "data": result.data or {},
- "original_intent": event_data.intent,
- "created_at": _time.time(),
- }
- except Exception as e:
- # A failing handler is reported through intent_handled instead of propagating
- perror(f"Handler-Fehler für '{event_data.intent}': {e}")
- success = False
- error = str(e)
- else:
- pdebug(f"Kein Handler für Intent: {event_data.intent}")
- handler_data = {"note": "Kein Handler registriert"}
- # Emit intent_handled
- handled_event = IntentHandled(
- satellite_id=event_data.satellite_id,
- session_id=event_data.session_id,
- room_id=event_data.room_id,
- intent=event_data.intent,
- original_text=event_data.original_text,
- slots=event_data.slots,
- success=success,
- response_text=response_text,
- data=handler_data,
- error=error,
- needs_followup=needs_followup,
- followup_prompt=followup_prompt,
- suppress_response=suppress_response,
- )
- await self._application.events.trigger("intent_handled", handled_event)
- # Handler supplied a response: emit output_text_created directly
- if response_text:
- pinfo(f"[DISPATCHER] Handler hat Antwort: '{response_text[:80]}' → output_text_created")
- output_event = OutputTextCreated(
- satellite_id=event_data.satellite_id,
- session_id=event_data.session_id,
- room_id=event_data.room_id,
- text=response_text,
- intent=event_data.intent,
- is_followup=needs_followup,
- expects_response=needs_followup,
- )
- await self._application.events.trigger("output_text_created", output_event)
- # No response and not suppressed: emit create_output_text (the LLM generates the response)
- elif not suppress_response:
- pinfo(f"[DISPATCHER] Keine Handler-Antwort → create_output_text (LLM generiert)")
- create_event = CreateOutputText(
- satellite_id=event_data.satellite_id,
- session_id=event_data.session_id,
- room_id=event_data.room_id,
- intent=event_data.intent,
- original_text=event_data.original_text,
- slots=event_data.slots,
- handler_data=handler_data,
- handler_success=success,
- handler_error=error,
- language=event_data.language,
- )
- await self._application.events.trigger("create_output_text", create_event)
- # =========================================================================
- # Event: output_text_created → tts_request
- # =========================================================================
@TrixyEvent(["output_text_created"])
async def on_output_text_created(self, event_name: str, event_data: OutputTextCreated) -> None:
    """Turn a finished response text into a ``tts_request`` event.

    Events without text are ignored. Template placeholders in the text are
    resolved first. When the event expects a response, the request id is
    recorded in ``_pending_followups`` so the follow-up can be started once
    the matching ``tts_completed`` arrives.

    Args:
        event_name: Name of the triggering event (unused).
        event_data: The text to speak together with its routing ids.
    """
    if not event_data.text:
        return

    import uuid
    from trixy_core.utils.template_formatter import format_template

    # Plugins may use placeholders such as {date.*}; resolve them before TTS.
    resolved_text = format_template(event_data.text, application=self._application)
    request_id = str(uuid.uuid4())
    payload = dict(
        request_id=request_id,
        satellite_id=event_data.satellite_id,
        text=resolved_text,
    )

    if event_data.expects_response:
        # Remember who asked, so followup_expected can be emitted after TTS.
        remembered = ("satellite_id", "session_id", "room_id", "intent")
        self._pending_followups[request_id] = {
            field: getattr(event_data, field) for field in remembered
        }
        pdebug(f"Follow-up registriert für TTS-Request: {request_id}")

    pdebug(f"TTS-Request: {resolved_text[:50]}...")
    await self._application.events.emit("tts_request", payload)
- # =========================================================================
- # Event: tts_completed → followup_expected (bei Rückfragen)
- # =========================================================================
@TrixyEvent(["tts_completed"])
async def on_tts_completed(self, event_name: str, event_data) -> None:
    """Process a TTS result.

    1. Sends the audio to the satellite (if satellite id and audio exist).
    2. With a pending follow-up: emits ``followup_expected`` and sends a
       FollowUpRequest to the satellite.
    3. Without a follow-up, or when TTS failed: sends ConversationEnd.

    Args:
        event_name: Name of the triggering event (unused).
        event_data: Generic event payload read via ``.get``; the keys used
            are ``satellite_id``, ``audio_data`` (hex string),
            ``request_id``, ``session_id``, ``duration_seconds`` and
            ``success``.
    """
    # tts_completed arrives as generic event data (emitted from a dict).
    satellite_id = event_data.get("satellite_id", "")
    audio_data_hex = event_data.get("audio_data", "")
    request_id = event_data.get("request_id", "")
    session_id = event_data.get("session_id", "")
    duration_seconds = event_data.get("duration_seconds", 0)

    if satellite_id and audio_data_hex:
        await self._send_tts_to_satellite(satellite_id, audio_data_hex)

    followup_info = self._pending_followups.pop(request_id, None) if request_id else None
    if not followup_info:
        # No follow-up: the conversation ends here.
        if satellite_id:
            await self._send_conversation_end(satellite_id, session_id)
        return

    if not event_data.get("success", True):
        perror(f"TTS fehlgeschlagen für Follow-up Request: {request_id}")
        # The question was never spoken, so the stored follow-up context must
        # not be applied to the satellite's next utterance.
        self._active_followups.pop(followup_info["satellite_id"], None)
        # The event may lack the routing ids; the pending record has them.
        await self._send_conversation_end(
            satellite_id or followup_info["satellite_id"],
            session_id or followup_info["session_id"],
        )
        return

    # Emit followup_expected so listeners know an answer is awaited.
    # NOTE(review): this event announces 30 s while the FollowUpRequest sent
    # by _send_follow_up_request uses 60 s - confirm which one is intended.
    followup_event = FollowUpExpected(
        satellite_id=followup_info["satellite_id"],
        session_id=followup_info["session_id"],
        room_id=followup_info["room_id"],
        timeout_seconds=30.0,
        followup_context={
            "previous_intent": followup_info["intent"],
        },
    )
    pinfo(f"Follow-up erwartet für Satellite: {followup_info['satellite_id']}")
    await self._application.events.trigger("followup_expected", followup_event)

    # Send the FollowUpRequest to the satellite (server -> client).
    await self._send_follow_up_request(
        followup_info["satellite_id"],
        followup_info["session_id"],
        audio_duration=duration_seconds,
    )
- async def _send_conversation_end(self, satellite_id: str, session_id: str = "") -> None:
- """Sendet ConversationEnd an den Satellite."""
- satellites = getattr(self._application, "satellites", None)
- if satellites is None:
- return
- satellite = satellites.get(satellite_id)
- if satellite is None or not satellite.is_connected:
- return
- try:
- from trixy_core.network.cmd.wakeword import ConversationEnd
- cmd = ConversationEnd(
- session_id=session_id,
- reason="completed",
- )
- await satellite.send_command(cmd)
- pdebug(f"ConversationEnd gesendet an {satellite_id} (reason=completed)")
- except Exception as e:
- perror(f"Fehler beim Senden von ConversationEnd: {e}")
- async def _send_follow_up_request(
- self, satellite_id: str, session_id: str = "", audio_duration: float = 0,
- ) -> None:
- """Sendet FollowUpRequest an den Satellite → Client wechselt in Hoer-Modus."""
- satellites = getattr(self._application, "satellites", None)
- if satellites is None:
- return
- satellite = satellites.get(satellite_id)
- if satellite is None or not satellite.is_connected:
- pdebug(f"Satellite {satellite_id} nicht verfuegbar fuer FollowUpRequest")
- return
- try:
- from trixy_core.network.cmd.wakeword import FollowUpRequest
- cmd = FollowUpRequest(
- session_id=session_id,
- question="", # Keine explizite Rueckfrage — Konversation geht einfach weiter
- timeout_seconds=60.0,
- audio_duration=audio_duration,
- )
- await satellite.send_command(cmd)
- pdebug(f"FollowUpRequest gesendet an {satellite_id}")
- except Exception as e:
- perror(f"Fehler beim Senden von FollowUpRequest: {e}")
- async def _send_tts_to_satellite(self, satellite_id: str, audio_data_hex: str) -> None:
- """Sendet TTS-Audio an den Satellite."""
- try:
- audio_bytes = bytes.fromhex(audio_data_hex)
- except (ValueError, AttributeError) as e:
- perror(f"TTS-Audio Dekodierung fehlgeschlagen: {e}")
- return
- satellites = getattr(self._application, "satellites", None)
- if not satellites:
- return
- satellite = satellites.get(satellite_id)
- if not satellite or not satellite.is_connected:
- pdebug(f"Satellite nicht verfügbar für TTS: {satellite_id}")
- return
- pinfo(f"[DISPATCHER] Sende TTS-Audio ({len(audio_bytes)} bytes) an {satellite.alias}")
- success = await satellite.say(audio_bytes)
- if success:
- # TTSStop senden damit Client weiß dass Stream beendet ist
- from trixy_core.network.cmd import TTSStop
- network = self._application.services.get_service("NetworkService")
- if network and hasattr(network, "send_to_satellite"):
- await network.send_to_satellite(satellite_id, TTSStop())
- pdebug(f"TTSStop gesendet an {satellite.alias}")
- else:
- perror(f"TTS-Audio senden fehlgeschlagen an {satellite_id}")
- # =========================================================================
- # System-Intent Behandlung
- # =========================================================================
async def _handle_system_intent(self, event_data: IntentReceived) -> None:
    """Handle a built-in system intent without going through the registry.

    Produces the spoken reply for the intent, emits the matching media,
    volume or shutdown event where one exists, then triggers
    ``intent_handled`` and, if there is a reply, ``output_text_created``.

    Args:
        event_data: The received intent; ``intent``, ``slots``,
            ``satellite_id``, ``session_id``, ``room_id`` and
            ``original_text`` are read.
    """
    intent = event_data.intent
    slots = event_data.slots
    satellite_id = event_data.satellite_id

    reply = ""
    extra: dict[str, Any] = {}
    ok = True
    wants_followup = False

    # Intents that only produce a fixed sentence.
    fixed_replies = {
        "help": "Ich kann Geräte steuern, Fragen beantworten und vieles mehr. Sag einfach was du brauchst.",
        "cancel": "Alles klar, abgebrochen.",
        "system_reboot": "System wird neu gestartet.",
    }
    # Media control: intent -> (event to emit, reply).
    media_actions = {
        "stop": ("media_stop_all", "Gestoppt."),
        "pause": ("music_paused", "Pausiert."),
        "resume": ("music_resumed", "Wird fortgesetzt."),
    }
    # Relative volume change: intent -> (direction, reply).
    volume_steps = {
        "volume_up": ("up", "Lauter."),
        "volume_down": ("down", "Leiser."),
    }
    # Mute toggles: intent -> (direction, reply).
    mute_actions = {
        "mute": ("mute", "Stumm."),
        "unmute": ("unmute", "Ton an."),
    }

    if intent in ("get_time", "get_date"):
        from datetime import datetime

        now = datetime.now()
        if intent == "get_time":
            reply = f"Es ist {now.strftime('%H:%M')} Uhr."
        else:
            weekdays = ["Montag", "Dienstag", "Mittwoch", "Donnerstag",
                        "Freitag", "Samstag", "Sonntag"]
            reply = f"Heute ist {weekdays[now.weekday()]}, der {now.strftime('%d.%m.%Y')}."
    elif intent == "health_check":
        reply = await self._generate_health_response()
    elif intent in fixed_replies:
        reply = fixed_replies[intent]
    elif intent in media_actions:
        media_event, media_reply = media_actions[intent]
        await self._application.events.emit(media_event, {
            "satellite_id": satellite_id,
        })
        reply = media_reply
    elif intent in volume_steps:
        direction, step_reply = volume_steps[intent]
        await self._application.events.emit("music_volume_change", {
            "direction": direction, "amount": slots.get("amount", 10),
        })
        reply = step_reply
    elif intent == "volume_set":
        level = slots.get("level", 50)
        await self._application.events.emit("music_volume_change", {
            "direction": "set", "level": level,
        })
        reply = f"Lautstärke auf {level} Prozent."
    elif intent in mute_actions:
        direction, mute_reply = mute_actions[intent]
        await self._application.events.emit("music_volume_change", {
            "direction": direction,
        })
        reply = mute_reply
    elif intent == "system_login":
        # Admin authentication
        if self._authenticate(satellite_id, slots.get("password", "")):
            reply = "Administrator-Anmeldung erfolgreich."
        else:
            reply = "Falsches Passwort."
            ok = False
    elif intent == "system_logout":
        self._logout(satellite_id)
        reply = "Administrator-Sitzung beendet."
    elif intent == "system_shutdown":
        reply = "System wird heruntergefahren."
        from trixy_core.events.event_data.basic import SystemShutdown

        await self._application.events.trigger(
            "system_shutdown", SystemShutdown(reason="Admin-Befehl")
        )
    elif intent == "system_status":
        reply = await self._generate_status_response()
    else:
        # Recognised as a system intent but without an implementation here.
        reply = f"System-Befehl '{intent}' ist nicht implementiert."
        ok = False

    # Emit intent_handled
    await self._application.events.trigger(
        "intent_handled",
        IntentHandled(
            satellite_id=satellite_id,
            session_id=event_data.session_id,
            room_id=event_data.room_id,
            intent=intent,
            original_text=event_data.original_text,
            slots=slots,
            success=ok,
            response_text=reply,
            data=extra,
            needs_followup=wants_followup,
        ),
    )

    # Emit output_text_created only when there is something to say
    if not reply:
        return
    await self._application.events.trigger(
        "output_text_created",
        OutputTextCreated(
            satellite_id=satellite_id,
            session_id=event_data.session_id,
            room_id=event_data.room_id,
            text=reply,
            intent=intent,
            expects_response=wants_followup,
        ),
    )
- async def _generate_health_response(self) -> str:
- """Generiert Health-Check Antwort."""
- try:
- import psutil
- cpu = psutil.cpu_percent()
- mem = psutil.virtual_memory().percent
- return f"Mir geht es gut! CPU: {cpu}%, Speicher: {mem}%."
- except ImportError:
- return "Mir geht es gut! Alle Systeme laufen normal."
- async def _generate_status_response(self) -> str:
- """Generiert detaillierten Status."""
- try:
- import psutil
- from datetime import datetime
- cpu = psutil.cpu_percent()
- mem = psutil.virtual_memory()
- disk = psutil.disk_usage('/')
- return (f"Systemstatus: CPU {cpu}%, "
- f"RAM {mem.percent}% ({mem.used // (1024**3)}GB), "
- f"Disk {disk.percent}%.")
- except ImportError:
- return "Detaillierter Status nicht verfügbar."
- def _get_admin_wakewords(self) -> set[str]:
- """Liest die Admin-Wakewords aus der Server-Config."""
- config_manager = getattr(self._application, "config_manager", None)
- if config_manager:
- ww_cfg = config_manager.get("wakeword", {})
- if isinstance(ww_cfg, dict):
- return set(ww_cfg.get("admin_wakewords", ["system_command"]))
- elif hasattr(ww_cfg, "admin_wakewords"):
- return set(ww_cfg.admin_wakewords)
- return {"system_command"}
- def _authenticate(self, satellite_id: str, password: str) -> bool:
- """Admin-Authentifizierung."""
- # TODO: Passwort aus Config holen
- admin_password = "admin" # Placeholder
- if password == admin_password:
- self._admin_sessions[satellite_id] = {
- "authenticated": True,
- "expires": time.time() + 1800,
- }
- return True
- return False
- def _logout(self, satellite_id: str) -> None:
- """Admin-Abmeldung."""
- self._admin_sessions.pop(satellite_id, None)
- def _validate_followup_response(
- self,
- event_data: IntentReceived,
- followup_ctx: dict[str, Any],
- ) -> dict[str, Any] | None:
- """
- Validiert eine Antwort im Follow-Up-Kontext.
- Prueft ob die Antwort zu den erlaubten Antworten passt.
- Bei offener Validierung (keine valid_responses) wird
- der Text als Slot-Wert uebernommen.
- Args:
- event_data: Das eingehende Intent-Event
- followup_ctx: Der gespeicherte Follow-Up-Kontext
- Returns:
- Dict mit extrahierten Slot-Werten oder None bei ungueltiger Antwort.
- Leeres Dict wenn keine Validierung definiert (alles akzeptiert).
- """
- valid_responses = followup_ctx.get("valid_responses", [])
- text = event_data.original_text.strip().lower()
- # Keine Validierung definiert — alles akzeptieren
- if not valid_responses:
- return {"response_text": event_data.original_text}
- # Gegen gueltige Antworten pruefen (fuzzy, case-insensitive)
- for valid in valid_responses:
- valid_lower = valid.lower()
- # Exakter Match
- if valid_lower == text or valid_lower in text:
- return {"response_text": valid, "matched_value": valid}
- # Mehrere Werte in einem Satz suchen (z.B. "Salami und Pilze")
- found_values = []
- for valid in valid_responses:
- if valid.lower() in text:
- found_values.append(valid)
- if found_values:
- return {"response_text": ", ".join(found_values), "matched_values": found_values}
- # Kein Match — ungueltige Antwort
- pdebug(
- f"[DISPATCHER] Follow-Up: '{text}' nicht in "
- f"{[v[:15] for v in valid_responses[:5]]}..."
- )
- return None
|