# -*- coding: utf-8 -*- """ Wakeword Service - Haupt-Service für Wakeword-Erkennung. Läuft auf Satellite und Standalone. Audio-Verarbeitung läuft in einem dedizierten Thread (nicht im asyncio-Loop), analog zu den OpenWakeWord-Beispielen. Nur Netzwerk-I/O und Event-Emission werden auf den asyncio-Loop dispatcht. """ from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import TYPE_CHECKING, Callable, Any, Awaitable import asyncio import queue import time import threading from trixy_core.service import IService, ServicePriority, ServiceGroup from trixy_core.wakeword.detector import ( WakewordDetector, WakewordDetection, DetectorConfig, WakewordType, ) from trixy_core.wakeword.audio_buffer import AudioBuffer, BufferConfig from trixy_core.wakeword.vad import VoiceActivityDetector, VADConfig, VADState from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn if TYPE_CHECKING: from trixy_core.application import BaseApplication from trixy_core.audio.microphone import MicrophoneCapture from trixy_core.network.client_connection import ClientConnection class ServiceState(Enum): """Zustand des Wakeword-Service.""" STOPPED = "stopped" # Service gestoppt LISTENING = "listening" # Wakeword-Erkennung aktiv PAUSED = "paused" # Pausiert (z.B. während Arbitration) RECORDING = "recording" # Aufnahme läuft WAITING_RESPONSE = "waiting" # Warte auf Antwort vom Server PROCESSING = "processing" # Verarbeitung läuft FOLLOW_UP = "follow_up" # Warte auf Rückfrage-Antwort @dataclass class WakewordServiceConfig: """Konfiguration für den Wakeword-Service.""" # Detector-Konfiguration detector: DetectorConfig = field(default_factory=DetectorConfig) # Buffer-Konfiguration buffer: BufferConfig = field(default_factory=BufferConfig) # VAD-Konfiguration vad: VADConfig = field(default_factory=VADConfig) # Aufnahme-Konfiguration max_recording_seconds: float = 60.0 silence_timeout_seconds: float = 3.0 follow_up_timeout_seconds: float = 60.0 # Arbitration arbitration_timeout_seconds: float = 2.0 # Cooldown nach Wakeword-Erkennung (verhindert Mehrfach-Erkennung) wakeword_cooldown_seconds: float = 3.0 # Standalone-Modus (keine Arbitration) standalone_mode: bool = False # Audio-Quelle audio_source: str | None = None # None = Standard-Mikrofon @dataclass class RecordingSession: """Eine Aufnahme-Session.""" session_id: str wakeword_detection: WakewordDetection start_time: datetime end_time: datetime | None = None audio_data: bytes = b"" state: ServiceState = ServiceState.RECORDING is_selected: bool = False # Für Rückfragen follow_up_count: int = 0 last_activity: datetime = field(default_factory=datetime.now) class WakewordService(IService): """ Wakeword Detection Service. Verantwortlich für: - Wakeword-Erkennung in Echtzeit - Audio-Aufnahme nach Wakeword - VAD für Sprach-Ende-Erkennung - Kommunikation mit Server (Arbitration) Audio-Verarbeitung läuft in einem dedizierten Thread, um den asyncio-Event-Loop nicht zu blockieren (wie in den OpenWakeWord-Beispielen). """ PRIORITY = ServicePriority.CORE GROUP = ServiceGroup.AUDIO DEPENDENCIES = [] def __init__( self, application: "BaseApplication", config: WakewordServiceConfig | None = None, ): """ Initialisiert den Wakeword-Service. Args: application: Anwendungs-Instanz config: Service-Konfiguration """ super().__init__(application) self._config = config or WakewordServiceConfig() # Komponenten self._detector = WakewordDetector(self._config.detector) self._audio_buffer = AudioBuffer(self._config.buffer) self._vad = VoiceActivityDetector(self._config.vad) # State self._state = ServiceState.STOPPED self._state_lock = threading.RLock() self._current_session: RecordingSession | None = None # Audio-Thread (dedizierter Thread für Echtzeit-Verarbeitung) self._audio_thread: threading.Thread | None = None self._audio_queue: queue.Queue[bytes] = queue.Queue(maxsize=50) self._stop_event = threading.Event() # Referenz auf den asyncio-Event-Loop (für Thread → Loop Kommunikation) self._loop: asyncio.AbstractEventLoop | None = None # Callbacks self._on_wakeword: list[Callable[[WakewordDetection], None]] = [] self._on_recording_complete: list[Callable[[RecordingSession], None]] = [] self._on_state_change: list[Callable[[ServiceState], None]] = [] # Server-Kommunikation (wird von Client gesetzt) self._send_to_server: Callable[[str, dict], asyncio.Future] | None = None self._session_id_generator: Callable[[], str] | None = None # Mikrofon (Client-Modus) self._microphone: MicrophoneCapture | None = None # Audio-Streaming an Server (connection.send_audio) self._audio_streamer: Callable[[bytes], Awaitable[bool]] | None = None # Pending Future für Arbitration-Antwort self._arbitration_future: asyncio.Future | None = None # Cooldown für Wakeword-Erkennung (verhindert Mehrfach-Erkennung) self._last_wakeword_time: float = 0.0 # Audio-Ducking: ALSA-Lautstaerke bei Wakeword reduzieren (Client-seitig) self._volume_handler: object | None = None # VolumeHandler Instanz self._ducking_volume_pct: int = 10 # Ziel-Lautstaerke beim Ducken self._pre_duck_volume: int | None = None # Gespeicherte Original-Lautstaerke self._is_ducked: bool = False # Externe Pause (Server/Plugin-gesteuert) self._externally_paused: bool = False # Server-gesteuerte Aufnahme self._server_recording_active: bool = False self._server_recording_id: str = "" self._server_recording_start_time: float = 0.0 self._on_server_recording_stopped: Callable[[str, float], Awaitable[None]] | None = None @property def config(self) -> WakewordServiceConfig: """Gibt Konfiguration zurück.""" return self._config @property def state(self) -> ServiceState: """Aktueller Zustand.""" with self._state_lock: return self._state @property def externally_paused(self) -> bool: """Prüft ob die Erkennung extern pausiert ist.""" return self._externally_paused @property def is_listening(self) -> bool: """Prüft ob Service auf Wakeword hört.""" return self._state == ServiceState.LISTENING @property def is_recording(self) -> bool: """Prüft ob Aufnahme läuft.""" return self._state in (ServiceState.RECORDING, ServiceState.FOLLOW_UP) @property def current_session(self) -> RecordingSession | None: """Aktuelle Aufnahme-Session.""" return self._current_session def _set_state(self, new_state: ServiceState) -> None: """Setzt neuen Zustand und triggert Callbacks + Events.""" with self._state_lock: if self._state == new_state: return old_state = self._state self._state = new_state # Callbacks außerhalb des Locks for callback in self._on_state_change: try: callback(new_state) except Exception: pass pdebug(f"WakewordService State: {old_state.value} -> {new_state.value}") # Events fuer wichtige State-Wechsel (async auf Event-Loop dispatchen) event_map = { ServiceState.LISTENING: "wakeword_listening", ServiceState.PAUSED: "wakeword_paused", ServiceState.RECORDING: "recording_started", ServiceState.PROCESSING: "recording_stopped", } event_name = event_map.get(new_state) if event_name: event_data = {"old_state": old_state.value, "new_state": new_state.value} if self._current_session: event_data["session_id"] = self._current_session.session_id if event_name == "recording_stopped" and self._current_session: elapsed = (datetime.now() - self._current_session.start_time).total_seconds() event_data["duration_seconds"] = elapsed self._emit_event(event_name, event_data) # === Audio-Ducking (ALSA, laeuft im Audio-Thread) === def set_volume_handler(self, handler: object, ducking_pct: int = 10) -> None: """ Setzt den VolumeHandler fuer Audio-Ducking. Wird vom Client bei der Initialisierung aufgerufen. Args: handler: VolumeHandler Instanz (hat get_volume/set_volume) ducking_pct: Ziel-Lautstaerke beim Ducken (0-100) """ self._volume_handler = handler self._ducking_volume_pct = ducking_pct pdebug(f"WakewordService: Audio-Ducking aktiviert ({ducking_pct}%)") def _duck_audio(self) -> None: """ Reduziert die ALSA-Lautstaerke sofort (synchron, im Audio-Thread). Nutzt amixer direkt statt den async VolumeHandler, weil wir im Audio-Thread sind und nicht warten koennen. """ if self._is_ducked: return import subprocess import shutil if not shutil.which("amixer"): return try: # Aktuelle Lautstaerke lesen result = subprocess.run( ["amixer", "get", "Master"], capture_output=True, text=True, timeout=2, ) if result.returncode == 0: for line in result.stdout.splitlines(): if "%" in line: start = line.index("[") + 1 end = line.index("%") self._pre_duck_volume = int(line[start:end]) break # Lautstaerke reduzieren subprocess.run( ["amixer", "set", "Master", f"{self._ducking_volume_pct}%"], capture_output=True, timeout=2, ) self._is_ducked = True pdebug(f"Audio-Duck: {self._pre_duck_volume}% → {self._ducking_volume_pct}%") except Exception as e: pdebug(f"Audio-Duck Fehler: {e}") def _unduck_audio(self) -> None: """Stellt die ALSA-Lautstaerke wieder her (synchron).""" if not self._is_ducked or self._pre_duck_volume is None: return import subprocess import shutil if not shutil.which("amixer"): return try: subprocess.run( ["amixer", "set", "Master", f"{self._pre_duck_volume}%"], capture_output=True, timeout=2, ) pdebug(f"Audio-Unduck: {self._ducking_volume_pct}% → {self._pre_duck_volume}%") except Exception as e: pdebug(f"Audio-Unduck Fehler: {e}") self._pre_duck_volume = None self._is_ducked = False def _emit_event(self, event_name: str, data: dict) -> None: """Emittiert ein Event thread-safe auf dem asyncio-Loop.""" events = getattr(self._application, "events", None) if events and self._loop and self._loop.is_running(): self._schedule_async(events.emit(event_name, data)) def _schedule_async(self, coro: Any) -> None: """ Plant eine Coroutine thread-safe auf dem asyncio-Loop ein. Wird aus dem Audio-Thread aufgerufen für Netzwerk-I/O und Events. """ if self._loop and self._loop.is_running(): asyncio.run_coroutine_threadsafe(coro, self._loop) async def start(self) -> None: """Startet den Service (IService).""" await self.on_start() async def stop(self) -> None: """Stoppt den Service (IService).""" await self.on_stop() async def on_start(self) -> None: """Service-Start.""" pinfo("Lade Wakeword-Modelle...") try: self._detector.load_models() pinfo(f"Wakeword-Modelle geladen: {self._detector.loaded_models}") except Exception as e: perror(f"Fehler beim Laden der Wakeword-Modelle: {e}") raise # Originale Aufnahme-Konfiguration merken (fuer recording_config_override Reset) self._original_silence_timeout = self._config.silence_timeout_seconds self._original_max_duration_ms = self._config.vad.max_duration_ms # Event-Handler fuer temporaere Konfigurationsaenderungen registrieren event_manager = getattr(self._application, "events", None) if event_manager: event_manager.register("recording_config_override", self._on_config_override) event_manager.register("wakeword_manual_trigger", self._on_manual_trigger) # Registriere Detector-Callback self._detector.on_detection(self._on_wakeword_detected) # VAD-Callbacks self._vad.on_silence(self._on_silence_detected) self._vad.on_timeout(self._on_recording_timeout) self._vad.on_no_speech(self._on_no_speech_detected) async def on_stop(self) -> None: """Service-Stop.""" await self.stop_listening() self._detector.unload_models() async def start_listening(self) -> None: """Startet die Wakeword-Erkennung.""" if self._state != ServiceState.STOPPED: return # Event-Loop-Referenz speichern self._loop = asyncio.get_running_loop() # Queue leeren (alte Frames von vorherigem Lauf verwerfen) self._drain_audio_queue() self._set_state(ServiceState.LISTENING) self._stop_event.clear() # Starte dedizierten Audio-Thread self._audio_thread = threading.Thread( target=self._audio_processing_loop, name="wakeword-audio", daemon=True, ) self._audio_thread.start() # Mikrofon starten und direkt mit Queue verbinden if self._microphone: self._microphone.set_callback(self._mic_callback) self._microphone.start() pinfo("Wakeword-Erkennung gestartet") async def stop_listening(self) -> None: """Stoppt die Wakeword-Erkennung.""" if self._microphone: self._microphone.stop() self._stop_event.set() if self._audio_thread and self._audio_thread.is_alive(): self._audio_thread.join(timeout=2.0) self._audio_thread = None self._set_state(ServiceState.STOPPED) pinfo("Wakeword-Erkennung gestoppt") def _mic_callback(self, data: bytes) -> None: """ Mikrofon-Callback (läuft in PortAudio-Thread). Ringbuffer-Semantik: Bei voller Queue wird der älteste Frame verworfen. Kein call_soon_threadsafe nötig — queue.Queue ist thread-safe. """ try: self._audio_queue.put_nowait(data) except queue.Full: # Queue voll: ältesten Frame verwerfen, neuen einfügen try: self._audio_queue.get_nowait() except queue.Empty: pass try: self._audio_queue.put_nowait(data) except queue.Full: pass def pause(self) -> None: """Pausiert die Wakeword-Erkennung.""" if self._state == ServiceState.LISTENING: self._set_state(ServiceState.PAUSED) def _drain_audio_queue(self) -> None: """Leert die Audio-Queue (verwirft aufgestaute Frames).""" drained = 0 while True: try: self._audio_queue.get_nowait() drained += 1 except queue.Empty: break if drained > 0: pdebug(f"Audio-Queue geleert: {drained} Frames verworfen") def resume(self) -> None: """Setzt die Wakeword-Erkennung nach interner Verarbeitung fort.""" if self._state == ServiceState.STOPPED: return # Audio-Ducking: Lautstaerke wiederherstellen self._unduck_audio() # Queue leeren (alle Frames aus der Recording-Phase verwerfen) self._drain_audio_queue() self._detector.reset() if self._externally_paused: self._set_state(ServiceState.PAUSED) pdebug("Intern fortgesetzt, aber extern pausiert — bleibe in PAUSED") else: self._set_state(ServiceState.LISTENING) def external_pause(self) -> None: """Pausiert Wakeword-Erkennung extern (Server/Plugin).""" self._externally_paused = True if self._state == ServiceState.LISTENING: self._set_state(ServiceState.PAUSED) pinfo("Wakeword-Erkennung extern pausiert") def external_resume(self) -> None: """Setzt extern pausierte Wakeword-Erkennung fort.""" self._externally_paused = False if self._state == ServiceState.PAUSED: self._detector.reset() self._set_state(ServiceState.LISTENING) pinfo("Wakeword-Erkennung extern fortgesetzt") async def external_stop(self) -> None: """Stoppt Wakeword-Erkennung komplett (Modelle entladen).""" self._externally_paused = False await self.stop_listening() pinfo("Wakeword-Erkennung extern gestoppt") async def external_start(self) -> None: """Startet Wakeword-Erkennung (Modelle laden + Listening).""" if self._state != ServiceState.STOPPED: pdebug("Wakeword-Service läuft bereits") return try: self._detector.load_models() pinfo(f"Wakeword-Modelle geladen: {self._detector.loaded_models}") except Exception as e: perror(f"Fehler beim Laden der Wakeword-Modelle: {e}") return self._detector.on_detection(self._on_wakeword_detected) await self.start_listening() if self._externally_paused: self._set_state(ServiceState.PAUSED) pinfo("Wakeword-Erkennung extern gestartet") async def _on_manual_trigger(self, event_name: str, event_data: dict) -> None: """ Event-Handler fuer manuellen Wakeword-Trigger (z.B. HID Hook-Taste). Ueberspringt die Wakeword-Erkennung und geht direkt in die Aufnahme. Arbitration wird forciert (sofortige Auswahl, kein Zeitfenster). """ await self.trigger_manual( device=event_data.get("device", "unknown"), forced=event_data.get("forced", True), ) async def trigger_manual(self, device: str = "button", forced: bool = True) -> None: """ Manueller Wakeword-Trigger (z.B. durch Tastendruck). Ueberspringt die Wakeword-Detection und startet direkt die Aufnahme. Mit forced=True wird die Arbitration sofort entschieden (kein Warten auf andere Satellites). Args: device: Name des ausloesenden Geraets forced: True = Arbitration sofort entscheiden (kein Fenster) """ if self._state != ServiceState.LISTENING: pdebug(f"Manueller Trigger ignoriert — State: {self._state.value}") return # Cooldown pruefen now = time.monotonic() if now - self._last_wakeword_time < self._config.wakeword_cooldown_seconds: pdebug("Manueller Trigger ignoriert — Cooldown") return self._last_wakeword_time = now pinfo(f"Manueller Wakeword-Trigger ({device}, forced={forced})") # Audio-Ducking sofort self._duck_audio() # Synthetische Detection erstellen (kein echtes Wakeword noetig) detection = WakewordDetection( wakeword_type=WakewordType.CUSTOM, model_name=f"manual:{device}", confidence=1.0, # Maximale Confidence (manuell ausgeloest) audio_level=1.0, # Maximaler Audio-Level (fuer Arbitration-Gewinn) timestamp=datetime.now(), audio_chunks=[], # Kein Wakeword-Audio bei manuellem Trigger ) # Pausiere Erkennung self._set_state(ServiceState.PAUSED) # Session erstellen session_id = self._generate_session_id() self._current_session = RecordingSession( session_id=session_id, wakeword_detection=detection, start_time=datetime.now(), ) if self._config.standalone_mode: # Standalone: Direkt aufnehmen self._current_session.is_selected = True await self._start_recording() else: # Client: Arbitration mit Force-Flag await self._request_arbitration(detection, forced=forced) async def feed_audio(self, audio_data: bytes) -> None: """ Füttert Audio-Daten in den Service (async-API für externen Gebrauch). Bei voller Queue wird der älteste Frame verworfen (Ringbuffer-Semantik). Args: audio_data: 16-bit PCM Audio (16kHz, Mono) """ try: self._audio_queue.put_nowait(audio_data) except queue.Full: try: self._audio_queue.get_nowait() except queue.Empty: pass try: self._audio_queue.put_nowait(audio_data) except queue.Full: pass # === Audio-Thread (dedizierter Thread, kein asyncio) === def _audio_processing_loop(self) -> None: """ Haupt-Audio-Verarbeitungsschleife — läuft in eigenem Thread. Synchron wie in den OpenWakeWord-Beispielen: queue.get() blockiert bis Frame da → process_frame() → nächster get(). Der Thread ist unabhängig vom asyncio-Loop und kann daher nie durch Event-Loop-Last verzögert werden. """ while not self._stop_event.is_set(): try: # Blockierend auf nächsten Frame warten (mit Timeout) try: audio_data = self._audio_queue.get(timeout=0.1) except queue.Empty: continue state = self._state if state in (ServiceState.LISTENING, ServiceState.PAUSED): # LISTENING: Nur den LETZTEN Frame verarbeiten. # Ältere Frames verwerfen — OpenWakeWord puffert intern # und braucht keine lückenlose Frame-Folge. latest = audio_data skipped = 0 while True: try: latest = self._audio_queue.get_nowait() skipped += 1 except queue.Empty: break self._process_audio_frame_sync(latest) elif state in (ServiceState.RECORDING, ServiceState.WAITING_RESPONSE, ServiceState.FOLLOW_UP): # RECORDING: ALLE Frames verarbeiten (VAD + Audio-Streaming). self._process_audio_frame_sync(audio_data) while True: try: frame = self._audio_queue.get_nowait() except queue.Empty: break self._process_audio_frame_sync(frame) # State könnte sich geändert haben (z.B. → PROCESSING) if self._state not in (ServiceState.RECORDING, ServiceState.WAITING_RESPONSE, ServiceState.FOLLOW_UP): break # STOPPED/PROCESSING: Frame verwerfen (implizit) except Exception as e: perror(f"Fehler in Audio-Thread: {e}") def _process_audio_frame_sync(self, audio_data: bytes) -> None: """ Verarbeitet einen Audio-Frame synchron im Audio-Thread. CPU-intensive Arbeit (Detector, VAD) passiert hier direkt. Netzwerk-I/O wird auf den asyncio-Loop dispatcht. """ state = self._state if state == ServiceState.LISTENING: # Wakeword-Erkennung (synchron — Detector ruft _on_wakeword_detected) self._audio_buffer.add_to_pre_buffer(audio_data) self._detector.process_frame(audio_data) # Server-gesteuerte Aufnahme: streamen via Event-Loop if self._server_recording_active and self._audio_streamer: self._schedule_async(self._audio_streamer(audio_data)) elif state == ServiceState.WAITING_RESPONSE: # Audio buffern während auf Arbitration gewartet wird self._audio_buffer.add_chunk(audio_data) elif state == ServiceState.RECORDING: # Aufnahme: Buffer + VAD (synchron), Streaming via Event-Loop self._audio_buffer.add_chunk(audio_data) vad_state = self._vad.process_frame(audio_data) if self._audio_streamer: self._schedule_async(self._audio_streamer(audio_data)) # Prüfe ob Aufnahme beendet werden soll. # WICHTIG: VADState.SILENCE bedeutet nur "gerade leise", NICHT # "Stille-Schwelle erreicht". Daher explizit silence_duration_ms prüfen. if vad_state == VADState.TIMEOUT: self._set_state(ServiceState.PROCESSING) self._schedule_async(self._finish_recording()) elif vad_state == VADState.NO_SPEECH: self._set_state(ServiceState.PROCESSING) self._schedule_async(self._finish_recording_no_speech()) elif (vad_state == VADState.SILENCE and self._vad.has_speech and self._vad.silence_duration_ms >= self._config.silence_timeout_seconds * 1000): self._set_state(ServiceState.PROCESSING) self._schedule_async(self._finish_recording()) elif state == ServiceState.FOLLOW_UP: # Rückfrage-Aufnahme: Buffer + VAD + Streaming (wie RECORDING) self._audio_buffer.add_chunk(audio_data) vad_state = self._vad.process_frame(audio_data) # Audio an Server streamen (gleich wie bei RECORDING) if self._audio_streamer: self._schedule_async(self._audio_streamer(audio_data)) if vad_state == VADState.TIMEOUT: self._set_state(ServiceState.PROCESSING) self._schedule_async(self._finish_follow_up()) elif (vad_state == VADState.SILENCE and self._vad.has_speech and self._vad.silence_duration_ms >= self._config.silence_timeout_seconds * 1000): self._set_state(ServiceState.PROCESSING) self._schedule_async(self._finish_follow_up()) elif state == ServiceState.PAUSED: # Pre-Buffer weiter füllen self._audio_buffer.add_to_pre_buffer(audio_data) # Server-gesteuerte Aufnahme: auch im PAUSED-Zustand streamen if self._server_recording_active and self._audio_streamer: self._schedule_async(self._audio_streamer(audio_data)) # === Wakeword-Erkennung und Session-Handling === def _on_wakeword_detected(self, detection: WakewordDetection) -> None: """ Callback wenn Wakeword erkannt wurde. Wird aus dem Audio-Thread (via Detector) aufgerufen. Args: detection: Erkennungs-Details """ if self._state != ServiceState.LISTENING: return # Cooldown prüfen (verhindert Mehrfach-Erkennung desselben Wakewords) now = time.monotonic() if now - self._last_wakeword_time < self._config.wakeword_cooldown_seconds: pdebug(f"Wakeword ignoriert — Cooldown ({now - self._last_wakeword_time:.1f}s < {self._config.wakeword_cooldown_seconds}s)") return self._last_wakeword_time = now pinfo( f"Wakeword erkannt: {detection.model_name} " f"(Confidence: {detection.confidence:.2f}, Level: {detection.audio_level:.3f})" ) # Server-gesteuerte Aufnahme auto-stoppen bei Wakeword if self._server_recording_active: recording_id = self._server_recording_id duration = self.stop_server_recording() if self._on_server_recording_stopped: self._schedule_async(self._on_server_recording_stopped(recording_id, duration)) # Audio-Ducking: Lautstaerke SOFORT reduzieren (im Audio-Thread) self._duck_audio() # Pausiere Erkennung (sofort im Audio-Thread) self._set_state(ServiceState.PAUSED) # Async-Handling auf Event-Loop dispatchen self._schedule_async(self._handle_wakeword_detection(detection)) # User-Callbacks (synchron) for callback in self._on_wakeword: try: callback(detection) except Exception: pass async def _handle_wakeword_detection(self, detection: WakewordDetection) -> None: """ Behandelt Wakeword-Erkennung. Läuft auf dem asyncio-Loop (dispatcht vom Audio-Thread). Args: detection: Erkennungs-Details """ # Generiere Session-ID session_id = self._generate_session_id() # Erstelle Session self._current_session = RecordingSession( session_id=session_id, wakeword_detection=detection, start_time=datetime.now(), ) if self._config.standalone_mode: # Standalone: Immer ausgewählt self._current_session.is_selected = True await self._start_recording() else: # Client: Arbitration beim Server await self._request_arbitration(detection) async def _request_arbitration( self, detection: WakewordDetection, forced: bool = False, ) -> None: """ Sendet Arbitration-Request an Server. Args: detection: Wakeword-Erkennung forced: True = Arbitration sofort entscheiden (kein Fenster warten) """ if not self._send_to_server: perror("Keine Server-Verbindung konfiguriert") self.resume() return self._set_state(ServiceState.WAITING_RESPONSE) # Starte Aufnahme im Hintergrund (für flüssigen Dialog) self._audio_buffer.start_recording(include_pre_buffer=True) try: # Sende Request response = await asyncio.wait_for( self._send_to_server("wakeword_detected", { "session_id": self._current_session.session_id, "wakeword_type": detection.wakeword_type.value, "model_name": detection.model_name, "confidence": detection.confidence, "audio_level": detection.audio_level, "audio_chunks": [ chunk.hex() for chunk in detection.audio_chunks ], "timestamp": detection.timestamp.isoformat(), "forced": forced, }), timeout=self._config.arbitration_timeout_seconds, ) if response.get("selected"): # Wir wurden ausgewählt if not self._current_session: pdebug("Arbitration: Session bereits beendet (Race Condition)") self.resume() return self._current_session.is_selected = True self._current_session.session_id = response.get( "session_id", self._current_session.session_id, ) await self._start_recording() else: # Nicht ausgewählt pdebug("Arbitration: Nicht ausgewählt") self._audio_buffer.clear() self._current_session = None self.resume() except asyncio.TimeoutError: pwarn("Arbitration-Timeout") self._audio_buffer.clear() self._current_session = None self.resume() except Exception as e: perror(f"Arbitration-Fehler: {e}") self._audio_buffer.clear() self._current_session = None self.resume() async def _start_recording(self) -> None: """Startet die Audio-Aufnahme.""" self._set_state(ServiceState.RECORDING) # VAD starten self._vad.start() # Buffer starten (falls nicht schon geschehen) if not self._audio_buffer.is_recording: self._audio_buffer.start_recording(include_pre_buffer=True) # Gepuffertes Audio (Pre-Buffer + Wartezeit) an Server senden if self._audio_streamer and not self._config.standalone_mode: buffered = self._audio_buffer.get_data() if buffered: await self._audio_streamer(buffered) if self._current_session: pinfo(f"Aufnahme gestartet (Session: {self._current_session.session_id})") async def _finish_recording(self) -> None: """Beendet die Aufnahme und sendet Daten.""" if not self._current_session: return # State wurde bereits im Audio-Thread auf PROCESSING gesetzt # Stoppe Aufnahme audio_data = self._audio_buffer.stop_recording() self._vad.stop() self._current_session.end_time = datetime.now() self._current_session.audio_data = audio_data pinfo( f"Aufnahme beendet: {len(audio_data)} Bytes, " f"{self._audio_buffer.duration_seconds:.1f}s" ) # Sende an Server (oder verarbeite lokal) await self._send_recording() async def _finish_recording_no_speech(self) -> None: """Beendet die Aufnahme bei No-Speech (mögliche Fehlauslösung).""" if not self._current_session: return # State wurde bereits im Audio-Thread auf PROCESSING gesetzt # Stoppe Aufnahme self._audio_buffer.stop_recording() self._vad.stop() self._current_session.end_time = datetime.now() pinfo("Aufnahme abgebrochen — keine Sprache erkannt") if not self._config.standalone_mode and self._send_to_server: # Client-Modus: RecordingComplete mit speech_detected=False senden try: await self._send_to_server("recording_complete", { "session_id": self._current_session.session_id, "duration_seconds": self._audio_buffer.duration_seconds, "ended_by": "no_speech", "speech_detected": False, "peak_level": self._vad._peak_level, "vad_stats": self._vad.get_stats(), }) except Exception as e: perror(f"Fehler beim Senden: {e}") # Session abschließen self._complete_session() async def _send_recording(self) -> None: """Sendet Aufnahme an Server oder verarbeitet lokal.""" session = self._current_session if not session: self.resume() return if self._config.standalone_mode: # Lokal: Event emittieren await self._application.events.emit("raw_audio_received", { "session_id": session.session_id, "audio_data": session.audio_data, "wakeword_type": session.wakeword_detection.wakeword_type.value, "duration_seconds": self._audio_buffer.duration_seconds, }) # Warte auf Verarbeitungs-Ergebnis await self._wait_for_processing_result() # Standalone: Session sofort abschließen self._complete_session() else: # Client: RecordingComplete Command senden (Audio wurde bereits gestreamt) if self._send_to_server: # ended_by aus VAD-State ableiten vad_state = self._vad.state if vad_state == VADState.SILENCE: ended_by = "silence" elif vad_state == VADState.TIMEOUT: ended_by = "timeout" else: ended_by = "unknown" try: await self._send_to_server("recording_complete", { "session_id": session.session_id, "duration_seconds": self._audio_buffer.duration_seconds, "ended_by": ended_by, "speech_detected": self._vad.has_speech, "peak_level": self._vad._peak_level, "vad_stats": self._vad.get_stats(), }) except Exception as e: perror(f"Fehler beim Senden: {e}") # Audio-Ducking: Lautstaerke SOFORT wiederherstellen # BEVOR die TTS-Antwort vom Server abgespielt wird. # Sonst ist die TTS-Ausgabe kaum hoerbar. self._unduck_audio() # Client-Modus: NICHT sofort zurück zu LISTENING! # Bleibe in PROCESSING und warte auf ConversationEnd vom Server. # Timeout als Sicherheitsnetz, falls Server nie antwortet. pinfo("RecordingComplete gesendet — warte auf ConversationEnd vom Server") self._schedule_conversation_end_timeout() async def _wait_for_processing_result(self) -> None: """Wartet auf Verarbeitungs-Ergebnis (Standalone-Modus).""" if not self._current_session: return session_id = self._current_session.session_id result_future: asyncio.Future[dict] = asyncio.Future() async def on_processing_result(event_name: str, data) -> None: """Handler für processing_result Event.""" if getattr(data, "session_id", None) == session_id: result_future.set_result({ "success": getattr(data, "success", True), "text": getattr(data, "text", ""), "intent": getattr(data, "intent", ""), "response_text": getattr(data, "response_text", ""), "error": getattr(data, "error", ""), }) # Event-Manager holen event_manager = getattr(self._application, "events", None) if not event_manager: pwarn("EventManager nicht verfügbar") return # Temporären Handler registrieren event_manager.register("processing_result", on_processing_result) try: # Auf Ergebnis warten (mit Timeout) timeout = self._config.follow_up_timeout_seconds result = await asyncio.wait_for(result_future, timeout=timeout) if result.get("error"): perror(f"Verarbeitungsfehler: {result['error']}") else: pinfo(f"Verarbeitung abgeschlossen: {result.get('intent', 'unbekannt')}") except asyncio.TimeoutError: pwarn(f"Timeout beim Warten auf Verarbeitungsergebnis ({timeout}s)") finally: # Handler wieder entfernen event_manager.unregister("processing_result", on_processing_result) async def _handle_follow_up(self, response: dict) -> None: """ Behandelt Rückfrage vom Server. Args: response: Server-Antwort mit Rückfrage """ pinfo(f"_handle_follow_up aufgerufen (state={self._state}, session={'ja' if self._current_session else 'nein'})") if not self._current_session: pwarn("Follow-Up abgebrochen: keine aktive Session") return self._current_session.follow_up_count += 1 self._current_session.last_activity = datetime.now() pinfo(f"Rückfrage #{self._current_session.follow_up_count}") # Starte neue Aufnahme für Antwort self._set_state(ServiceState.FOLLOW_UP) self._audio_buffer.clear() self._audio_buffer.start_recording(include_pre_buffer=True) self._vad.start() async def _finish_follow_up(self) -> None: """Beendet Follow-Up-Aufnahme und sendet RecordingComplete.""" if not self._current_session: return audio_data = self._audio_buffer.stop_recording() self._vad.stop() duration = len(audio_data) / (16000 * 2) # 16kHz, 16-bit pinfo(f"Follow-Up Aufnahme beendet: {len(audio_data)} Bytes, {duration:.1f}s") # Audio-Ducking: Lautstaerke wiederherstellen vor TTS self._unduck_audio() # Im Client-Modus: RecordingComplete senden (Audio wurde bereits gestreamt) if self._connection: try: from trixy_core.network.cmd.wakeword import RecordingComplete cmd = RecordingComplete( session_id=self._current_session.session_id, speech_detected=self._vad.has_speech, duration_seconds=duration, audio_level=self._vad._peak_level if hasattr(self._vad, "_peak_level") else 0.0, ) await self._connection.send_message(cmd) pinfo("Follow-Up RecordingComplete gesendet — warte auf Server") self._schedule_conversation_end_timeout() except Exception as e: perror(f"Follow-Up RecordingComplete Fehler: {e}") self._complete_session() return # Standalone-Modus: wie bisher if self._send_to_server: try: response = await self._send_to_server("follow_up_response", { "session_id": self._current_session.session_id, "audio_data": audio_data.hex(), "follow_up_number": self._current_session.follow_up_count, }) if response.get("follow_up"): await self._handle_follow_up(response) return except Exception as e: perror(f"Follow-Up-Fehler: {e}") self._complete_session() def _schedule_conversation_end_timeout(self) -> None: """Startet Timeout für ConversationEnd vom Server.""" async def _timeout_check(): await asyncio.sleep(15.0) # 15 Sekunden Timeout if self._state == ServiceState.PROCESSING and self._current_session: pwarn("ConversationEnd-Timeout — erzwinge Session-Ende") self._complete_session() self._schedule_async(_timeout_check()) def _complete_session(self) -> None: """Schließt aktuelle Session ab.""" session = self._current_session if session: # Callbacks for callback in self._on_recording_complete: try: callback(session) except Exception: pass self._current_session = None self._audio_buffer.clear() # Recording-Config-Override zuruecksetzen (falls temporaer geaendert) if hasattr(self, "_original_silence_timeout"): self._config.silence_timeout_seconds = self._original_silence_timeout if hasattr(self, "_original_max_duration_ms"): self._config.vad.max_duration_ms = self._original_max_duration_ms # Cooldown erneuern — verhindert Wakeword Re-Detection durch residuales # Audio im OpenWakeWord Sliding Window nach dem Resume self._last_wakeword_time = time.monotonic() # Audio-Queue leeren (verhindert Stau von alten Frames) self._drain_audio_queue() # Zurück zum Listening self.resume() async def _on_config_override(self, event_name: str, data) -> None: """ Temporaere Konfigurationsaenderung fuer die naechste Aufnahme. Wird von Plugins emittiert um z.B. laengere Aufnahmen (Notizen) oder kuerzere Silence-Timeouts zu ermoeglichen. Die Werte werden in _complete_session() zurueckgesetzt. """ silence = data.get("silence_timeout_seconds") if isinstance(data, dict) else getattr(data, "silence_timeout_seconds", None) max_recording = data.get("max_recording_seconds") if isinstance(data, dict) else getattr(data, "max_recording_seconds", None) if silence is not None: self._config.silence_timeout_seconds = float(silence) pdebug(f"Recording-Config Override: silence_timeout={silence}s") if max_recording is not None: self._config.vad.max_duration_ms = int(float(max_recording) * 1000) pdebug(f"Recording-Config Override: max_recording={max_recording}s") def _on_silence_detected(self) -> None: """Callback wenn Stille erkannt wurde.""" pdebug("Stille erkannt") def _on_recording_timeout(self) -> None: """Callback bei Aufnahme-Timeout.""" pinfo("Aufnahme-Timeout erreicht") def _on_no_speech_detected(self) -> None: """Callback wenn keine Sprache erkannt wurde.""" pinfo("Keine Sprache erkannt — mögliche Fehlauslösung") def _generate_session_id(self) -> str: """Generiert eine Session-ID.""" if self._session_id_generator: return self._session_id_generator() import uuid return f"ww-{uuid.uuid4().hex[:12]}" # === Public Callbacks === def on_wakeword(self, callback: Callable[[WakewordDetection], None]) -> None: """Registriert Callback für Wakeword-Erkennung.""" self._on_wakeword.append(callback) def on_recording_complete( self, callback: Callable[[RecordingSession], None], ) -> None: """Registriert Callback für abgeschlossene Aufnahme.""" self._on_recording_complete.append(callback) def on_state_change(self, callback: Callable[[ServiceState], None]) -> None: """Registriert Callback für State-Änderungen.""" self._on_state_change.append(callback) def set_server_connection( self, send_func: Callable[[str, dict], asyncio.Future], ) -> None: """ Setzt Server-Kommunikationsfunktion. Args: send_func: Async-Funktion zum Senden an Server """ self._send_to_server = send_func def set_session_id_generator(self, generator: Callable[[], str]) -> None: """Setzt Session-ID-Generator.""" self._session_id_generator = generator def set_audio_streamer(self, streamer: Callable[[bytes], Awaitable[bool]]) -> None: """ Setzt Audio-Streaming-Funktion (connection.send_audio). Args: streamer: Async-Funktion die Audio-Bytes an Server sendet """ self._audio_streamer = streamer def setup_for_client(self, connection: "ClientConnection", microphone: "MicrophoneCapture") -> None: """ Konfiguriert Service für Client-Betrieb mit Netzwerk. Args: connection: ClientConnection zum Server microphone: MicrophoneCapture-Instanz """ self._microphone = microphone self.set_audio_streamer(connection.send_audio) # Future-basierte _send_to_server Funktion async def send_to_server(event_type: str, data: dict) -> dict: if event_type == "wakeword_detected": from trixy_core.network.cmd.wakeword import WakewordDetected cmd = WakewordDetected( satellite_id=connection.satellite_id, wakeword_type=data.get("wakeword_type", ""), wakeword_model=data.get("model_name", ""), confidence=data.get("confidence", 0.0), audio_level=data.get("audio_level", 0.0), audio_chunks=data.get("audio_chunks", []), timestamp=data.get("timestamp", ""), client_session_id=data.get("session_id", ""), ) await connection.send_command(cmd) # Future für Arbitration-Antwort self._arbitration_future = asyncio.get_event_loop().create_future() return await self._arbitration_future elif event_type == "recording_complete": from trixy_core.network.cmd.wakeword import RecordingComplete cmd = RecordingComplete( session_id=data.get("session_id", ""), duration_seconds=data.get("duration_seconds", 0.0), ended_by=data.get("ended_by", ""), speech_detected=data.get("speech_detected", True), peak_level=data.get("peak_level", 0.0), vad_stats=data.get("vad_stats", {}), ) await connection.send_command(cmd) return {"success": True} return {} self.set_server_connection(send_to_server) async def handle_wakeword_selected(self, data) -> None: """ WakewordSelected vom Server → Future auflösen. Args: data: WakewordSelected-Daten """ if self._arbitration_future and not self._arbitration_future.done(): session_id = getattr(data, "session_id", "") or "" conversation_id = getattr(data, "conversation_id", "") or "" if isinstance(data, dict): session_id = data.get("session_id", "") conversation_id = data.get("conversation_id", "") self._arbitration_future.set_result({ "selected": True, "session_id": session_id or conversation_id, }) async def handle_wakeword_abort(self, data) -> None: """ WakewordAbort vom Server → Future auflösen. Args: data: WakewordAbort-Daten """ if self._arbitration_future and not self._arbitration_future.done(): self._arbitration_future.set_result({"selected": False}) # Cooldown-Timestamp aktualisieren damit nach Abort keine sofortige Re-Detection self._last_wakeword_time = time.monotonic() async def handle_conversation_end(self) -> None: """ConversationEnd → Aufnahme beenden, zurück zu LISTENING.""" if self._state in (ServiceState.RECORDING, ServiceState.FOLLOW_UP): self._vad.stop() self._audio_buffer.stop_recording() if self._state in (ServiceState.RECORDING, ServiceState.FOLLOW_UP, ServiceState.PROCESSING, ServiceState.WAITING_RESPONSE): pinfo("ConversationEnd empfangen — zurück zu LISTENING") self._complete_session() # === Server-gesteuerte Aufnahme === def start_server_recording(self, recording_id: str) -> None: """Aktiviert Audio-Streaming im LISTENING-Zustand.""" self._server_recording_active = True self._server_recording_id = recording_id self._server_recording_start_time = time.monotonic() pdebug(f"Server-Recording gestartet: {recording_id}") def stop_server_recording(self) -> float: """Beendet Audio-Streaming. Gibt Dauer zurück.""" duration = time.monotonic() - self._server_recording_start_time if self._server_recording_active else 0.0 self._server_recording_active = False self._server_recording_id = "" self._server_recording_start_time = 0.0 pdebug(f"Server-Recording gestoppt: {duration:.1f}s") return duration def set_server_recording_stopped_callback( self, cb: Callable[[str, float], Awaitable[None]] ) -> None: """Setzt Callback für Wakeword-getriggertes Stop.""" self._on_server_recording_stopped = cb def get_stats(self) -> dict: """Gibt Statistiken zurück.""" return { "state": self._state.value, "detector": self._detector.stats, "buffer": self._audio_buffer.get_stats(), "vad": self._vad.get_stats() if self._vad.is_active else {}, "session": { "id": self._current_session.session_id if self._current_session else None, "follow_up_count": self._current_session.follow_up_count if self._current_session else 0, }, }