| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379 |
- # -*- coding: utf-8 -*-
- """
- Wakeword Service - Haupt-Service für Wakeword-Erkennung.
- Läuft auf Satellite und Standalone.
- Audio-Verarbeitung läuft in einem dedizierten Thread (nicht im asyncio-Loop),
- analog zu den OpenWakeWord-Beispielen. Nur Netzwerk-I/O und Event-Emission
- werden auf den asyncio-Loop dispatcht.
- """
- from dataclasses import dataclass, field
- from datetime import datetime
- from enum import Enum
- from typing import TYPE_CHECKING, Callable, Any, Awaitable
- import asyncio
- import queue
- import time
- import threading
- from trixy_core.service import IService, ServicePriority, ServiceGroup
- from trixy_core.wakeword.detector import (
- WakewordDetector,
- WakewordDetection,
- DetectorConfig,
- WakewordType,
- )
- from trixy_core.wakeword.audio_buffer import AudioBuffer, BufferConfig
- from trixy_core.wakeword.vad import VoiceActivityDetector, VADConfig, VADState
- from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn
- if TYPE_CHECKING:
- from trixy_core.application import BaseApplication
- from trixy_core.audio.microphone import MicrophoneCapture
- from trixy_core.network.client_connection import ClientConnection
class ServiceState(Enum):
    """Lifecycle states of the wakeword service."""
    STOPPED = "stopped"  # service not running
    LISTENING = "listening"  # wakeword detection active
    PAUSED = "paused"  # suspended (e.g. during arbitration)
    RECORDING = "recording"  # voice-command recording in progress
    WAITING_RESPONSE = "waiting"  # waiting for the server's reply
    PROCESSING = "processing"  # post-processing the recording
    FOLLOW_UP = "follow_up"  # waiting for a follow-up answer
@dataclass
class WakewordServiceConfig:
    """Configuration for the wakeword service."""
    # Detector configuration
    detector: DetectorConfig = field(default_factory=DetectorConfig)
    # Buffer configuration
    buffer: BufferConfig = field(default_factory=BufferConfig)
    # VAD configuration
    vad: VADConfig = field(default_factory=VADConfig)
    # Recording limits
    max_recording_seconds: float = 60.0
    silence_timeout_seconds: float = 3.0
    follow_up_timeout_seconds: float = 60.0
    # How long to wait for the server's arbitration decision
    arbitration_timeout_seconds: float = 2.0
    # Cooldown after a detection (suppresses duplicate triggers)
    wakeword_cooldown_seconds: float = 3.0
    # Standalone mode (no arbitration with a server)
    standalone_mode: bool = False
    # Audio input source; None selects the default microphone
    audio_source: str | None = None
@dataclass
class RecordingSession:
    """One recording session, opened after a wakeword detection."""
    session_id: str
    wakeword_detection: WakewordDetection
    start_time: datetime
    end_time: datetime | None = None
    audio_data: bytes = b""
    state: ServiceState = ServiceState.RECORDING
    is_selected: bool = False  # True once arbitration picked this satellite
    # Follow-up (clarification question) bookkeeping
    follow_up_count: int = 0
    last_activity: datetime = field(default_factory=datetime.now)
class WakewordService(IService):
    """
    Wakeword detection service.

    Responsible for:
    - real-time wakeword detection
    - audio recording after a wakeword
    - VAD-based end-of-speech detection
    - server communication (arbitration)

    Audio processing runs on a dedicated thread so the asyncio event loop
    is never blocked (as in the OpenWakeWord examples).
    """
    PRIORITY = ServicePriority.CORE
    GROUP = ServiceGroup.AUDIO
    DEPENDENCIES = []
    def __init__(
        self,
        application: "BaseApplication",
        config: WakewordServiceConfig | None = None,
    ):
        """
        Initialize the wakeword service.

        Args:
            application: Owning application instance
            config: Service configuration (defaults used when None)
        """
        super().__init__(application)
        self._config = config or WakewordServiceConfig()
        # Processing components
        self._detector = WakewordDetector(self._config.detector)
        self._audio_buffer = AudioBuffer(self._config.buffer)
        self._vad = VoiceActivityDetector(self._config.vad)
        # State machine; RLock because state is touched from both the audio
        # thread and the asyncio loop
        self._state = ServiceState.STOPPED
        self._state_lock = threading.RLock()
        self._current_session: RecordingSession | None = None
        # Dedicated audio thread + bounded frame queue (realtime processing)
        self._audio_thread: threading.Thread | None = None
        self._audio_queue: queue.Queue[bytes] = queue.Queue(maxsize=50)
        self._stop_event = threading.Event()
        # asyncio loop reference for thread -> loop dispatch
        self._loop: asyncio.AbstractEventLoop | None = None
        # User callbacks
        self._on_wakeword: list[Callable[[WakewordDetection], None]] = []
        self._on_recording_complete: list[Callable[[RecordingSession], None]] = []
        self._on_state_change: list[Callable[[ServiceState], None]] = []
        # Server communication (injected by the client)
        self._send_to_server: Callable[[str, dict], asyncio.Future] | None = None
        self._session_id_generator: Callable[[], str] | None = None
        # Microphone (client mode)
        self._microphone: MicrophoneCapture | None = None
        # Audio streaming to the server (connection.send_audio)
        self._audio_streamer: Callable[[bytes], Awaitable[bool]] | None = None
        # Pending future for the arbitration answer
        self._arbitration_future: asyncio.Future | None = None
        # Cooldown bookkeeping (suppresses duplicate detections)
        self._last_wakeword_time: float = 0.0
        # Audio ducking: lower ALSA volume on wakeword (client side)
        self._volume_handler: object | None = None  # VolumeHandler instance
        self._ducking_volume_pct: int = 10  # target volume while ducked
        self._pre_duck_volume: int | None = None  # saved original volume
        self._is_ducked: bool = False
        # External pause (server/plugin controlled)
        self._externally_paused: bool = False
        # Server-controlled recording
        self._server_recording_active: bool = False
        self._server_recording_id: str = ""
        self._server_recording_start_time: float = 0.0
        self._on_server_recording_stopped: Callable[[str, float], Awaitable[None]] | None = None
    @property
    def config(self) -> WakewordServiceConfig:
        """The active service configuration."""
        return self._config
    @property
    def state(self) -> ServiceState:
        """Current service state (read under the state lock)."""
        with self._state_lock:
            return self._state
    @property
    def externally_paused(self) -> bool:
        """Whether detection is paused by an external party (server/plugin)."""
        return self._externally_paused
- @property
- def is_listening(self) -> bool:
- """Prüft ob Service auf Wakeword hört."""
- return self._state == ServiceState.LISTENING
- @property
- def is_recording(self) -> bool:
- """Prüft ob Aufnahme läuft."""
- return self._state in (ServiceState.RECORDING, ServiceState.FOLLOW_UP)
    @property
    def current_session(self) -> RecordingSession | None:
        """The recording session currently in progress, if any."""
        return self._current_session
    def _set_state(self, new_state: ServiceState) -> None:
        """Switch state, then fire callbacks and emit lifecycle events.

        Called from both the audio thread and the asyncio loop: the
        check-and-assign happens under the state lock, while callbacks and
        event emission run outside it so user code never executes while
        the lock is held.
        """
        with self._state_lock:
            if self._state == new_state:
                return
            old_state = self._state
            self._state = new_state
        # Callbacks outside the lock
        for callback in self._on_state_change:
            try:
                callback(new_state)
            except Exception:
                pass
        pdebug(f"WakewordService State: {old_state.value} -> {new_state.value}")
        # Events for the important transitions (dispatched async onto the loop)
        event_map = {
            ServiceState.LISTENING: "wakeword_listening",
            ServiceState.PAUSED: "wakeword_paused",
            ServiceState.RECORDING: "recording_started",
            ServiceState.PROCESSING: "recording_stopped",
        }
        event_name = event_map.get(new_state)
        if event_name:
            event_data = {"old_state": old_state.value, "new_state": new_state.value}
            if self._current_session:
                event_data["session_id"] = self._current_session.session_id
            if event_name == "recording_stopped" and self._current_session:
                elapsed = (datetime.now() - self._current_session.start_time).total_seconds()
                event_data["duration_seconds"] = elapsed
            self._emit_event(event_name, event_data)
- # === Audio-Ducking (ALSA, laeuft im Audio-Thread) ===
- def set_volume_handler(self, handler: object, ducking_pct: int = 10) -> None:
- """
- Setzt den VolumeHandler fuer Audio-Ducking.
- Wird vom Client bei der Initialisierung aufgerufen.
- Args:
- handler: VolumeHandler Instanz (hat get_volume/set_volume)
- ducking_pct: Ziel-Lautstaerke beim Ducken (0-100)
- """
- self._volume_handler = handler
- self._ducking_volume_pct = ducking_pct
- pdebug(f"WakewordService: Audio-Ducking aktiviert ({ducking_pct}%)")
- def _duck_audio(self) -> None:
- """
- Reduziert die ALSA-Lautstaerke sofort (synchron, im Audio-Thread).
- Nutzt amixer direkt statt den async VolumeHandler,
- weil wir im Audio-Thread sind und nicht warten koennen.
- """
- if self._is_ducked:
- return
- import subprocess
- import shutil
- if not shutil.which("amixer"):
- return
- try:
- # Aktuelle Lautstaerke lesen
- result = subprocess.run(
- ["amixer", "get", "Master"],
- capture_output=True, text=True, timeout=2,
- )
- if result.returncode == 0:
- for line in result.stdout.splitlines():
- if "%" in line:
- start = line.index("[") + 1
- end = line.index("%")
- self._pre_duck_volume = int(line[start:end])
- break
- # Lautstaerke reduzieren
- subprocess.run(
- ["amixer", "set", "Master", f"{self._ducking_volume_pct}%"],
- capture_output=True, timeout=2,
- )
- self._is_ducked = True
- pdebug(f"Audio-Duck: {self._pre_duck_volume}% → {self._ducking_volume_pct}%")
- except Exception as e:
- pdebug(f"Audio-Duck Fehler: {e}")
- def _unduck_audio(self) -> None:
- """Stellt die ALSA-Lautstaerke wieder her (synchron)."""
- if not self._is_ducked or self._pre_duck_volume is None:
- return
- import subprocess
- import shutil
- if not shutil.which("amixer"):
- return
- try:
- subprocess.run(
- ["amixer", "set", "Master", f"{self._pre_duck_volume}%"],
- capture_output=True, timeout=2,
- )
- pdebug(f"Audio-Unduck: {self._ducking_volume_pct}% → {self._pre_duck_volume}%")
- except Exception as e:
- pdebug(f"Audio-Unduck Fehler: {e}")
- self._pre_duck_volume = None
- self._is_ducked = False
- def _emit_event(self, event_name: str, data: dict) -> None:
- """Emittiert ein Event thread-safe auf dem asyncio-Loop."""
- events = getattr(self._application, "events", None)
- if events and self._loop and self._loop.is_running():
- self._schedule_async(events.emit(event_name, data))
- def _schedule_async(self, coro: Any) -> None:
- """
- Plant eine Coroutine thread-safe auf dem asyncio-Loop ein.
- Wird aus dem Audio-Thread aufgerufen für Netzwerk-I/O und Events.
- """
- if self._loop and self._loop.is_running():
- asyncio.run_coroutine_threadsafe(coro, self._loop)
    async def start(self) -> None:
        """Start the service (IService entry point)."""
        await self.on_start()
    async def stop(self) -> None:
        """Stop the service (IService entry point)."""
        await self.on_stop()
    async def on_start(self) -> None:
        """Service startup: load models, register event handlers and callbacks."""
        pinfo("Lade Wakeword-Modelle...")
        try:
            self._detector.load_models()
            pinfo(f"Wakeword-Modelle geladen: {self._detector.loaded_models}")
        except Exception as e:
            perror(f"Fehler beim Laden der Wakeword-Modelle: {e}")
            raise
        # Remember the original recording settings (reset target for
        # recording_config_override)
        self._original_silence_timeout = self._config.silence_timeout_seconds
        self._original_max_duration_ms = self._config.vad.max_duration_ms
        # Event handlers for temporary configuration overrides
        event_manager = getattr(self._application, "events", None)
        if event_manager:
            event_manager.register("recording_config_override", self._on_config_override)
            event_manager.register("wakeword_manual_trigger", self._on_manual_trigger)
        # Detector callback
        self._detector.on_detection(self._on_wakeword_detected)
        # VAD callbacks
        self._vad.on_silence(self._on_silence_detected)
        self._vad.on_timeout(self._on_recording_timeout)
        self._vad.on_no_speech(self._on_no_speech_detected)
    async def on_stop(self) -> None:
        """Service shutdown: stop listening, then unload the detector models."""
        await self.stop_listening()
        self._detector.unload_models()
    async def start_listening(self) -> None:
        """Start wakeword detection (spawns the dedicated audio thread)."""
        if self._state != ServiceState.STOPPED:
            return
        # Remember the running loop for thread -> loop dispatch
        self._loop = asyncio.get_running_loop()
        # Drop stale frames left over from a previous run
        self._drain_audio_queue()
        self._set_state(ServiceState.LISTENING)
        self._stop_event.clear()
        # Spawn the dedicated audio thread
        self._audio_thread = threading.Thread(
            target=self._audio_processing_loop,
            name="wakeword-audio",
            daemon=True,
        )
        self._audio_thread.start()
        # Start the microphone and feed it straight into the queue
        if self._microphone:
            self._microphone.set_callback(self._mic_callback)
            self._microphone.start()
        pinfo("Wakeword-Erkennung gestartet")
    async def stop_listening(self) -> None:
        """Stop wakeword detection and join the audio thread."""
        if self._microphone:
            self._microphone.stop()
        self._stop_event.set()
        if self._audio_thread and self._audio_thread.is_alive():
            # Bounded join so shutdown can never hang on the audio thread
            self._audio_thread.join(timeout=2.0)
        self._audio_thread = None
        self._set_state(ServiceState.STOPPED)
        pinfo("Wakeword-Erkennung gestoppt")
- def _mic_callback(self, data: bytes) -> None:
- """
- Mikrofon-Callback (läuft in PortAudio-Thread).
- Ringbuffer-Semantik: Bei voller Queue wird der älteste Frame verworfen.
- Kein call_soon_threadsafe nötig — queue.Queue ist thread-safe.
- """
- try:
- self._audio_queue.put_nowait(data)
- except queue.Full:
- # Queue voll: ältesten Frame verwerfen, neuen einfügen
- try:
- self._audio_queue.get_nowait()
- except queue.Empty:
- pass
- try:
- self._audio_queue.put_nowait(data)
- except queue.Full:
- pass
    def pause(self) -> None:
        """Pause wakeword detection (internal use, e.g. during arbitration)."""
        if self._state == ServiceState.LISTENING:
            self._set_state(ServiceState.PAUSED)
- def _drain_audio_queue(self) -> None:
- """Leert die Audio-Queue (verwirft aufgestaute Frames)."""
- drained = 0
- while True:
- try:
- self._audio_queue.get_nowait()
- drained += 1
- except queue.Empty:
- break
- if drained > 0:
- pdebug(f"Audio-Queue geleert: {drained} Frames verworfen")
    def resume(self) -> None:
        """Resume wakeword detection after internal processing finished."""
        if self._state == ServiceState.STOPPED:
            return
        # Audio ducking: restore the volume
        self._unduck_audio()
        # Drop every frame queued up during the recording phase
        self._drain_audio_queue()
        self._detector.reset()
        if self._externally_paused:
            # An external pause outranks the internal resume
            self._set_state(ServiceState.PAUSED)
            pdebug("Intern fortgesetzt, aber extern pausiert — bleibe in PAUSED")
        else:
            self._set_state(ServiceState.LISTENING)
    def external_pause(self) -> None:
        """Pause wakeword detection externally (server/plugin controlled)."""
        self._externally_paused = True
        if self._state == ServiceState.LISTENING:
            self._set_state(ServiceState.PAUSED)
        pinfo("Wakeword-Erkennung extern pausiert")
    def external_resume(self) -> None:
        """Resume externally paused wakeword detection."""
        self._externally_paused = False
        if self._state == ServiceState.PAUSED:
            self._detector.reset()
            self._set_state(ServiceState.LISTENING)
        pinfo("Wakeword-Erkennung extern fortgesetzt")
    async def external_stop(self) -> None:
        """Stop wakeword detection completely (externally requested).

        NOTE(review): the original docstring claimed models are unloaded
        here, but only ``stop_listening`` is called — unloading happens in
        ``on_stop``.  Confirm which behavior is intended.
        """
        self._externally_paused = False
        await self.stop_listening()
        pinfo("Wakeword-Erkennung extern gestoppt")
    async def external_start(self) -> None:
        """Start wakeword detection (load models, then start listening)."""
        if self._state != ServiceState.STOPPED:
            pdebug("Wakeword-Service läuft bereits")
            return
        try:
            self._detector.load_models()
            pinfo(f"Wakeword-Modelle geladen: {self._detector.loaded_models}")
        except Exception as e:
            perror(f"Fehler beim Laden der Wakeword-Modelle: {e}")
            return
        self._detector.on_detection(self._on_wakeword_detected)
        await self.start_listening()
        # Honor an external pause that was requested while stopped
        if self._externally_paused:
            self._set_state(ServiceState.PAUSED)
        pinfo("Wakeword-Erkennung extern gestartet")
- async def _on_manual_trigger(self, event_name: str, event_data: dict) -> None:
- """
- Event-Handler fuer manuellen Wakeword-Trigger (z.B. HID Hook-Taste).
- Ueberspringt die Wakeword-Erkennung und geht direkt in die Aufnahme.
- Arbitration wird forciert (sofortige Auswahl, kein Zeitfenster).
- """
- await self.trigger_manual(
- device=event_data.get("device", "unknown"),
- forced=event_data.get("forced", True),
- )
    async def trigger_manual(self, device: str = "button", forced: bool = True) -> None:
        """
        Manual wakeword trigger (e.g. via a button press).

        Skips the wakeword detection and starts recording directly.  With
        forced=True the arbitration is decided immediately (no waiting for
        other satellites).

        Args:
            device: name of the triggering device
            forced: True = decide arbitration immediately (no window)
        """
        if self._state != ServiceState.LISTENING:
            pdebug(f"Manueller Trigger ignoriert — State: {self._state.value}")
            return
        # Respect the detection cooldown
        now = time.monotonic()
        if now - self._last_wakeword_time < self._config.wakeword_cooldown_seconds:
            pdebug("Manueller Trigger ignoriert — Cooldown")
            return
        self._last_wakeword_time = now
        pinfo(f"Manueller Wakeword-Trigger ({device}, forced={forced})")
        # Duck the output volume immediately
        self._duck_audio()
        # Synthesize a detection (no real wakeword involved)
        detection = WakewordDetection(
            wakeword_type=WakewordType.CUSTOM,
            model_name=f"manual:{device}",
            confidence=1.0,  # maximum confidence (manually triggered)
            audio_level=1.0,  # maximum audio level (to win arbitration)
            timestamp=datetime.now(),
            audio_chunks=[],  # no wakeword audio for a manual trigger
        )
        # Pause detection
        self._set_state(ServiceState.PAUSED)
        # Create the session
        session_id = self._generate_session_id()
        self._current_session = RecordingSession(
            session_id=session_id,
            wakeword_detection=detection,
            start_time=datetime.now(),
        )
        if self._config.standalone_mode:
            # Standalone: record directly
            self._current_session.is_selected = True
            await self._start_recording()
        else:
            # Client: arbitration with the force flag
            await self._request_arbitration(detection, forced=forced)
- async def feed_audio(self, audio_data: bytes) -> None:
- """
- Füttert Audio-Daten in den Service (async-API für externen Gebrauch).
- Bei voller Queue wird der älteste Frame verworfen (Ringbuffer-Semantik).
- Args:
- audio_data: 16-bit PCM Audio (16kHz, Mono)
- """
- try:
- self._audio_queue.put_nowait(audio_data)
- except queue.Full:
- try:
- self._audio_queue.get_nowait()
- except queue.Empty:
- pass
- try:
- self._audio_queue.put_nowait(audio_data)
- except queue.Full:
- pass
- # === Audio-Thread (dedizierter Thread, kein asyncio) ===
- def _audio_processing_loop(self) -> None:
- """
- Haupt-Audio-Verarbeitungsschleife — läuft in eigenem Thread.
- Synchron wie in den OpenWakeWord-Beispielen:
- queue.get() blockiert bis Frame da → process_frame() → nächster get().
- Der Thread ist unabhängig vom asyncio-Loop und kann daher nie
- durch Event-Loop-Last verzögert werden.
- """
- while not self._stop_event.is_set():
- try:
- # Blockierend auf nächsten Frame warten (mit Timeout)
- try:
- audio_data = self._audio_queue.get(timeout=0.1)
- except queue.Empty:
- continue
- state = self._state
- if state in (ServiceState.LISTENING, ServiceState.PAUSED):
- # LISTENING: Nur den LETZTEN Frame verarbeiten.
- # Ältere Frames verwerfen — OpenWakeWord puffert intern
- # und braucht keine lückenlose Frame-Folge.
- latest = audio_data
- skipped = 0
- while True:
- try:
- latest = self._audio_queue.get_nowait()
- skipped += 1
- except queue.Empty:
- break
- self._process_audio_frame_sync(latest)
- elif state in (ServiceState.RECORDING, ServiceState.WAITING_RESPONSE,
- ServiceState.FOLLOW_UP):
- # RECORDING: ALLE Frames verarbeiten (VAD + Audio-Streaming).
- self._process_audio_frame_sync(audio_data)
- while True:
- try:
- frame = self._audio_queue.get_nowait()
- except queue.Empty:
- break
- self._process_audio_frame_sync(frame)
- # State könnte sich geändert haben (z.B. → PROCESSING)
- if self._state not in (ServiceState.RECORDING,
- ServiceState.WAITING_RESPONSE,
- ServiceState.FOLLOW_UP):
- break
- # STOPPED/PROCESSING: Frame verwerfen (implizit)
- except Exception as e:
- perror(f"Fehler in Audio-Thread: {e}")
    def _process_audio_frame_sync(self, audio_data: bytes) -> None:
        """
        Process a single audio frame synchronously on the audio thread.

        CPU-heavy work (detector, VAD) happens right here; network I/O is
        dispatched onto the asyncio loop instead.
        """
        state = self._state
        if state == ServiceState.LISTENING:
            # Wakeword detection (synchronous — the detector invokes
            # _on_wakeword_detected)
            self._audio_buffer.add_to_pre_buffer(audio_data)
            self._detector.process_frame(audio_data)
            # Server-controlled recording: stream via the event loop
            if self._server_recording_active and self._audio_streamer:
                self._schedule_async(self._audio_streamer(audio_data))
        elif state == ServiceState.WAITING_RESPONSE:
            # Buffer audio while waiting for the arbitration decision
            self._audio_buffer.add_chunk(audio_data)
        elif state == ServiceState.RECORDING:
            # Recording: buffer + VAD (synchronous), streaming via the loop
            self._audio_buffer.add_chunk(audio_data)
            vad_state = self._vad.process_frame(audio_data)
            if self._audio_streamer:
                self._schedule_async(self._audio_streamer(audio_data))
            # Decide whether the recording should end.
            # IMPORTANT: VADState.SILENCE only means "currently quiet", NOT
            # "silence threshold reached" — hence the explicit
            # silence_duration_ms check.
            if vad_state == VADState.TIMEOUT:
                self._set_state(ServiceState.PROCESSING)
                self._schedule_async(self._finish_recording())
            elif vad_state == VADState.NO_SPEECH:
                self._set_state(ServiceState.PROCESSING)
                self._schedule_async(self._finish_recording_no_speech())
            elif (vad_state == VADState.SILENCE
                  and self._vad.has_speech
                  and self._vad.silence_duration_ms >= self._config.silence_timeout_seconds * 1000):
                self._set_state(ServiceState.PROCESSING)
                self._schedule_async(self._finish_recording())
        elif state == ServiceState.FOLLOW_UP:
            # Follow-up recording: buffer + VAD + streaming (like RECORDING)
            self._audio_buffer.add_chunk(audio_data)
            vad_state = self._vad.process_frame(audio_data)
            # Stream audio to the server (same as in RECORDING)
            if self._audio_streamer:
                self._schedule_async(self._audio_streamer(audio_data))
            if vad_state == VADState.TIMEOUT:
                self._set_state(ServiceState.PROCESSING)
                self._schedule_async(self._finish_follow_up())
            elif (vad_state == VADState.SILENCE
                  and self._vad.has_speech
                  and self._vad.silence_duration_ms >= self._config.silence_timeout_seconds * 1000):
                self._set_state(ServiceState.PROCESSING)
                self._schedule_async(self._finish_follow_up())
        elif state == ServiceState.PAUSED:
            # Keep filling the pre-buffer
            self._audio_buffer.add_to_pre_buffer(audio_data)
            # Server-controlled recording: stream even while PAUSED
            if self._server_recording_active and self._audio_streamer:
                self._schedule_async(self._audio_streamer(audio_data))
- # === Wakeword-Erkennung und Session-Handling ===
    def _on_wakeword_detected(self, detection: WakewordDetection) -> None:
        """
        Callback invoked when a wakeword was detected.

        Runs on the audio thread (called by the detector).

        Args:
            detection: detection details
        """
        if self._state != ServiceState.LISTENING:
            return
        # Cooldown check (suppresses repeated detection of the same wakeword)
        now = time.monotonic()
        if now - self._last_wakeword_time < self._config.wakeword_cooldown_seconds:
            pdebug(f"Wakeword ignoriert — Cooldown ({now - self._last_wakeword_time:.1f}s < {self._config.wakeword_cooldown_seconds}s)")
            return
        self._last_wakeword_time = now
        pinfo(
            f"Wakeword erkannt: {detection.model_name} "
            f"(Confidence: {detection.confidence:.2f}, Level: {detection.audio_level:.3f})"
        )
        # Auto-stop a server-controlled recording on wakeword
        if self._server_recording_active:
            recording_id = self._server_recording_id
            duration = self.stop_server_recording()
            if self._on_server_recording_stopped:
                self._schedule_async(self._on_server_recording_stopped(recording_id, duration))
        # Audio ducking: lower the volume IMMEDIATELY (on the audio thread)
        self._duck_audio()
        # Pause detection (immediately, on the audio thread)
        self._set_state(ServiceState.PAUSED)
        # Dispatch the async handling onto the event loop
        self._schedule_async(self._handle_wakeword_detection(detection))
        # User callbacks (synchronous)
        for callback in self._on_wakeword:
            try:
                callback(detection)
            except Exception:
                pass
- async def _handle_wakeword_detection(self, detection: WakewordDetection) -> None:
- """
- Behandelt Wakeword-Erkennung.
- Läuft auf dem asyncio-Loop (dispatcht vom Audio-Thread).
- Args:
- detection: Erkennungs-Details
- """
- # Generiere Session-ID
- session_id = self._generate_session_id()
- # Erstelle Session
- self._current_session = RecordingSession(
- session_id=session_id,
- wakeword_detection=detection,
- start_time=datetime.now(),
- )
- if self._config.standalone_mode:
- # Standalone: Immer ausgewählt
- self._current_session.is_selected = True
- await self._start_recording()
- else:
- # Client: Arbitration beim Server
- await self._request_arbitration(detection)
- async def _request_arbitration(
- self, detection: WakewordDetection, forced: bool = False,
- ) -> None:
- """
- Sendet Arbitration-Request an Server.
- Args:
- detection: Wakeword-Erkennung
- forced: True = Arbitration sofort entscheiden (kein Fenster warten)
- """
- if not self._send_to_server:
- perror("Keine Server-Verbindung konfiguriert")
- self.resume()
- return
- self._set_state(ServiceState.WAITING_RESPONSE)
- # Starte Aufnahme im Hintergrund (für flüssigen Dialog)
- self._audio_buffer.start_recording(include_pre_buffer=True)
- try:
- # Sende Request
- response = await asyncio.wait_for(
- self._send_to_server("wakeword_detected", {
- "session_id": self._current_session.session_id,
- "wakeword_type": detection.wakeword_type.value,
- "model_name": detection.model_name,
- "confidence": detection.confidence,
- "audio_level": detection.audio_level,
- "audio_chunks": [
- chunk.hex() for chunk in detection.audio_chunks
- ],
- "timestamp": detection.timestamp.isoformat(),
- "forced": forced,
- }),
- timeout=self._config.arbitration_timeout_seconds,
- )
- if response.get("selected"):
- # Wir wurden ausgewählt
- if not self._current_session:
- pdebug("Arbitration: Session bereits beendet (Race Condition)")
- self.resume()
- return
- self._current_session.is_selected = True
- self._current_session.session_id = response.get(
- "session_id",
- self._current_session.session_id,
- )
- await self._start_recording()
- else:
- # Nicht ausgewählt
- pdebug("Arbitration: Nicht ausgewählt")
- self._audio_buffer.clear()
- self._current_session = None
- self.resume()
- except asyncio.TimeoutError:
- pwarn("Arbitration-Timeout")
- self._audio_buffer.clear()
- self._current_session = None
- self.resume()
- except Exception as e:
- perror(f"Arbitration-Fehler: {e}")
- self._audio_buffer.clear()
- self._current_session = None
- self.resume()
    async def _start_recording(self) -> None:
        """Begin the audio recording for the current session."""
        self._set_state(ServiceState.RECORDING)
        # Start the VAD
        self._vad.start()
        # Start the buffer (unless arbitration already did)
        if not self._audio_buffer.is_recording:
            self._audio_buffer.start_recording(include_pre_buffer=True)
        # Send already-buffered audio (pre-buffer + arbitration wait) upstream
        if self._audio_streamer and not self._config.standalone_mode:
            buffered = self._audio_buffer.get_data()
            if buffered:
                await self._audio_streamer(buffered)
        if self._current_session:
            pinfo(f"Aufnahme gestartet (Session: {self._current_session.session_id})")
    async def _finish_recording(self) -> None:
        """Finish the recording and hand the captured audio off."""
        if not self._current_session:
            return
        # The state was already set to PROCESSING on the audio thread
        # Stop the recording
        audio_data = self._audio_buffer.stop_recording()
        self._vad.stop()
        self._current_session.end_time = datetime.now()
        self._current_session.audio_data = audio_data
        pinfo(
            f"Aufnahme beendet: {len(audio_data)} Bytes, "
            f"{self._audio_buffer.duration_seconds:.1f}s"
        )
        # Send to the server (or process locally)
        await self._send_recording()
async def _finish_recording_no_speech(self) -> None:
    """Abort the recording when no speech was detected (likely false trigger).

    Stops the buffer and VAD, reports the aborted recording to the server
    in client mode (RecordingComplete with ``speech_detected=False``) and
    closes the session.
    """
    if not self._current_session:
        return
    # State was already set to PROCESSING by the audio thread.
    self._audio_buffer.stop_recording()
    self._vad.stop()
    self._current_session.end_time = datetime.now()
    pinfo("Aufnahme abgebrochen — keine Sprache erkannt")
    if not self._config.standalone_mode and self._send_to_server:
        # Client mode: send RecordingComplete with speech_detected=False.
        try:
            await self._send_to_server("recording_complete", {
                "session_id": self._current_session.session_id,
                "duration_seconds": self._audio_buffer.duration_seconds,
                "ended_by": "no_speech",
                "speech_detected": False,
                # _peak_level is a private VAD attribute; guard against it
                # being absent (same defensive access as _finish_follow_up).
                "peak_level": getattr(self._vad, "_peak_level", 0.0),
                "vad_stats": self._vad.get_stats(),
            })
        except Exception as e:
            perror(f"Fehler beim Senden: {e}")
    # Close the session and return to listening.
    self._complete_session()
async def _send_recording(self) -> None:
    """Forward the finished recording to the server, or process it locally.

    Standalone mode: emits ``raw_audio_received``, waits for the local
    processing result, then completes the session. Client mode: sends a
    RecordingComplete command (the audio itself was already streamed),
    restores the audio volume and stays in PROCESSING until the server
    sends ConversationEnd (with a timeout as safety net).
    """
    session = self._current_session
    if not session:
        self.resume()
        return
    if self._config.standalone_mode:
        # Local path: hand the audio to the application via event.
        await self._application.events.emit("raw_audio_received", {
            "session_id": session.session_id,
            "audio_data": session.audio_data,
            "wakeword_type": session.wakeword_detection.wakeword_type.value,
            "duration_seconds": self._audio_buffer.duration_seconds,
        })
        # Wait for the local processing result.
        await self._wait_for_processing_result()
        # Standalone: complete the session immediately.
        self._complete_session()
    else:
        # Client path: send RecordingComplete (audio was already streamed).
        if self._send_to_server:
            # Derive ended_by from the VAD state.
            vad_state = self._vad.state
            if vad_state == VADState.SILENCE:
                ended_by = "silence"
            elif vad_state == VADState.TIMEOUT:
                ended_by = "timeout"
            else:
                ended_by = "unknown"
            try:
                await self._send_to_server("recording_complete", {
                    "session_id": session.session_id,
                    "duration_seconds": self._audio_buffer.duration_seconds,
                    "ended_by": ended_by,
                    "speech_detected": self._vad.has_speech,
                    # Defensive access to the VAD's private peak level,
                    # consistent with _finish_follow_up's hasattr guard.
                    "peak_level": getattr(self._vad, "_peak_level", 0.0),
                    "vad_stats": self._vad.get_stats(),
                })
            except Exception as e:
                perror(f"Fehler beim Senden: {e}")
        # Audio ducking: restore the volume IMMEDIATELY, before the TTS
        # reply from the server is played back — otherwise the TTS output
        # would be barely audible.
        self._unduck_audio()
        # Client mode: do NOT return to LISTENING right away. Stay in
        # PROCESSING and wait for ConversationEnd from the server; the
        # timeout is a safety net in case the server never answers.
        pinfo("RecordingComplete gesendet — warte auf ConversationEnd vom Server")
        self._schedule_conversation_end_timeout()
async def _wait_for_processing_result(self) -> None:
    """Wait for the processing result of the current session (standalone mode).

    Registers a temporary ``processing_result`` handler, waits up to the
    follow-up timeout for a result matching the current session and then
    removes the handler again.
    """
    if not self._current_session:
        return
    session_id = self._current_session.session_id
    result_future: asyncio.Future[dict] = asyncio.get_running_loop().create_future()

    async def on_processing_result(event_name: str, data) -> None:
        """Resolve the future on the first matching result."""
        # Guard against duplicate events: a second set_result() on an
        # already-resolved future would raise InvalidStateError.
        if result_future.done():
            return
        if getattr(data, "session_id", None) == session_id:
            result_future.set_result({
                "success": getattr(data, "success", True),
                "text": getattr(data, "text", ""),
                "intent": getattr(data, "intent", ""),
                "response_text": getattr(data, "response_text", ""),
                "error": getattr(data, "error", ""),
            })

    # Fetch the event manager (optional dependency of the application).
    event_manager = getattr(self._application, "events", None)
    if not event_manager:
        pwarn("EventManager nicht verfügbar")
        return
    # Register the temporary handler.
    event_manager.register("processing_result", on_processing_result)
    # Bind the timeout outside the try so the except message can use it
    # even if config access fails.
    timeout = self._config.follow_up_timeout_seconds
    try:
        # Wait for the result (bounded by the follow-up timeout).
        result = await asyncio.wait_for(result_future, timeout=timeout)
        if result.get("error"):
            perror(f"Verarbeitungsfehler: {result['error']}")
        else:
            pinfo(f"Verarbeitung abgeschlossen: {result.get('intent', 'unbekannt')}")
    except asyncio.TimeoutError:
        pwarn(f"Timeout beim Warten auf Verarbeitungsergebnis ({timeout}s)")
    finally:
        # Always remove the temporary handler again.
        event_manager.unregister("processing_result", on_processing_result)
async def _handle_follow_up(self, response: dict) -> None:
    """Handle a follow-up question from the server.

    Args:
        response: Server reply carrying the follow-up request.
    """
    pinfo(f"_handle_follow_up aufgerufen (state={self._state}, session={'ja' if self._current_session else 'nein'})")
    session = self._current_session
    if session is None:
        pwarn("Follow-Up abgebrochen: keine aktive Session")
        return
    session.follow_up_count += 1
    session.last_activity = datetime.now()
    pinfo(f"Rückfrage #{self._current_session.follow_up_count}")
    # Open a fresh recording to capture the user's answer.
    self._set_state(ServiceState.FOLLOW_UP)
    self._audio_buffer.clear()
    self._audio_buffer.start_recording(include_pre_buffer=True)
    self._vad.start()
async def _finish_follow_up(self) -> None:
    """Finish the follow-up recording and send RecordingComplete.

    In client mode (``self._connection`` set) the audio has already been
    streamed, so only the RecordingComplete command is sent and the
    service then waits for ConversationEnd from the server. In standalone
    mode the audio payload is sent inline and the server may request a
    further follow-up.
    """
    if not self._current_session:
        return
    audio_data = self._audio_buffer.stop_recording()
    self._vad.stop()
    # Raw PCM: 16 kHz sample rate, 16-bit (2 bytes) per sample.
    duration = len(audio_data) / (16000 * 2)  # 16kHz, 16-bit
    pinfo(f"Follow-Up Aufnahme beendet: {len(audio_data)} Bytes, {duration:.1f}s")
    # Audio ducking: restore the volume before TTS playback.
    self._unduck_audio()
    # Client mode: send RecordingComplete (audio was already streamed).
    if self._connection:
        try:
            from trixy_core.network.cmd.wakeword import RecordingComplete
            cmd = RecordingComplete(
                session_id=self._current_session.session_id,
                speech_detected=self._vad.has_speech,
                duration_seconds=duration,
                # _peak_level is a private VAD attribute; fall back to 0.0
                # if the implementation does not expose it.
                audio_level=self._vad._peak_level if hasattr(self._vad, "_peak_level") else 0.0,
            )
            await self._connection.send_message(cmd)
            pinfo("Follow-Up RecordingComplete gesendet — warte auf Server")
            self._schedule_conversation_end_timeout()
        except Exception as e:
            perror(f"Follow-Up RecordingComplete Fehler: {e}")
            self._complete_session()
        return
    # Standalone mode: send the audio payload directly, as before.
    if self._send_to_server:
        try:
            response = await self._send_to_server("follow_up_response", {
                "session_id": self._current_session.session_id,
                "audio_data": audio_data.hex(),
                "follow_up_number": self._current_session.follow_up_count,
            })
            if response.get("follow_up"):
                # Server asked another question — recurse into a new round.
                await self._handle_follow_up(response)
                return
        except Exception as e:
            perror(f"Follow-Up-Fehler: {e}")
    self._complete_session()
def _schedule_conversation_end_timeout(self, timeout_seconds: float = 15.0) -> None:
    """Arm a safety-net timeout for the server's ConversationEnd.

    If the server never sends ConversationEnd the session would stay in
    PROCESSING forever; after ``timeout_seconds`` the session is closed
    forcibly.

    Args:
        timeout_seconds: Delay before force-closing the session. The
            default (15 s) matches the previous hard-coded value.
    """
    async def _timeout_check():
        await asyncio.sleep(timeout_seconds)
        # Only force-close if we are still waiting on a session.
        if self._state == ServiceState.PROCESSING and self._current_session:
            pwarn("ConversationEnd-Timeout — erzwinge Session-Ende")
            self._complete_session()

    self._schedule_async(_timeout_check())
def _complete_session(self) -> None:
    """Close the current session and return to listening.

    Fires the recording-complete callbacks, resets temporary recording
    config overrides, renews the wakeword cooldown, drains stale audio
    frames and resumes listening.
    """
    session = self._current_session
    if session:
        # Notify listeners; a faulty callback must not block the teardown.
        for callback in self._on_recording_complete:
            try:
                callback(session)
            except Exception:
                pass
    self._current_session = None
    self._audio_buffer.clear()
    # Reset temporary recording-config overrides and DROP the saved
    # originals, so a stale saved value can never clobber a later,
    # intentional config change.
    if hasattr(self, "_original_silence_timeout"):
        self._config.silence_timeout_seconds = self._original_silence_timeout
        del self._original_silence_timeout
    if hasattr(self, "_original_max_duration_ms"):
        self._config.vad.max_duration_ms = self._original_max_duration_ms
        del self._original_max_duration_ms
    # Renew the cooldown — prevents wakeword re-detection from residual
    # audio in the OpenWakeWord sliding window after resume.
    self._last_wakeword_time = time.monotonic()
    # Drain the audio queue (avoids a backlog of stale frames).
    self._drain_audio_queue()
    # Back to listening.
    self.resume()
async def _on_config_override(self, event_name: str, data) -> None:
    """Apply a temporary configuration override for the next recording.

    Emitted by plugins to e.g. allow longer recordings (notes) or shorter
    silence timeouts. The original values are saved on first override and
    restored by ``_complete_session()`` — without saving them here the
    restore could never fire and the override would become permanent.

    Args:
        event_name: Name of the triggering event (unused).
        data: Dict or object carrying ``silence_timeout_seconds`` and/or
            ``max_recording_seconds``.
    """
    if isinstance(data, dict):
        silence = data.get("silence_timeout_seconds")
        max_recording = data.get("max_recording_seconds")
    else:
        silence = getattr(data, "silence_timeout_seconds", None)
        max_recording = getattr(data, "max_recording_seconds", None)
    if silence is not None:
        # Save the original only once, so repeated overrides within one
        # session do not overwrite it with an already-overridden value.
        if not hasattr(self, "_original_silence_timeout"):
            self._original_silence_timeout = self._config.silence_timeout_seconds
        self._config.silence_timeout_seconds = float(silence)
        pdebug(f"Recording-Config Override: silence_timeout={silence}s")
    if max_recording is not None:
        if not hasattr(self, "_original_max_duration_ms"):
            self._original_max_duration_ms = self._config.vad.max_duration_ms
        self._config.vad.max_duration_ms = int(float(max_recording) * 1000)
        pdebug(f"Recording-Config Override: max_recording={max_recording}s")
def _on_silence_detected(self) -> None:
    """Callback invoked when the VAD has detected silence."""
    pdebug("Stille erkannt")
def _on_recording_timeout(self) -> None:
    """Callback invoked when the recording timeout is reached."""
    pinfo("Aufnahme-Timeout erreicht")
def _on_no_speech_detected(self) -> None:
    """Callback invoked when no speech was detected (possible false trigger)."""
    pinfo("Keine Sprache erkannt — mögliche Fehlauslösung")
def _generate_session_id(self) -> str:
    """Produce a session identifier.

    Uses the injected generator when one is configured, otherwise falls
    back to a random ``ww-``-prefixed UUID fragment.
    """
    generator = self._session_id_generator
    if generator:
        return generator()
    import uuid
    return f"ww-{uuid.uuid4().hex[:12]}"
- # === Public Callbacks ===
def on_wakeword(self, callback: Callable[[WakewordDetection], None]) -> None:
    """Register a callback invoked on each wakeword detection."""
    self._on_wakeword.append(callback)
def on_recording_complete(
    self,
    callback: Callable[[RecordingSession], None],
) -> None:
    """Register a callback invoked when a recording session completes."""
    self._on_recording_complete.append(callback)
def on_state_change(self, callback: Callable[[ServiceState], None]) -> None:
    """Register a callback invoked on service state changes."""
    self._on_state_change.append(callback)
def set_server_connection(
    self,
    send_func: Callable[[str, dict], asyncio.Future],
) -> None:
    """Set the server communication function.

    Args:
        send_func: Async function used to send events to the server.
    """
    self._send_to_server = send_func
def set_session_id_generator(self, generator: Callable[[], str]) -> None:
    """Set a custom session-ID generator (used by _generate_session_id)."""
    self._session_id_generator = generator
def set_audio_streamer(self, streamer: Callable[[bytes], Awaitable[bool]]) -> None:
    """Set the audio streaming function (connection.send_audio).

    Args:
        streamer: Async function that sends raw audio bytes to the server.
    """
    self._audio_streamer = streamer
def setup_for_client(self, connection: "ClientConnection", microphone: "MicrophoneCapture") -> None:
    """Configure the service for networked client operation.

    Wires the microphone, routes raw audio through ``connection.send_audio``
    and installs a send function that maps the generic event protocol onto
    the typed wakeword network commands.

    Args:
        connection: ClientConnection to the server.
        microphone: MicrophoneCapture instance.
    """
    self._microphone = microphone
    self.set_audio_streamer(connection.send_audio)

    # Future-based _send_to_server function.
    async def send_to_server(event_type: str, data: dict) -> dict:
        """Translate (event_type, data) into network commands."""
        if event_type == "wakeword_detected":
            from trixy_core.network.cmd.wakeword import WakewordDetected
            cmd = WakewordDetected(
                satellite_id=connection.satellite_id,
                wakeword_type=data.get("wakeword_type", ""),
                wakeword_model=data.get("model_name", ""),
                confidence=data.get("confidence", 0.0),
                audio_level=data.get("audio_level", 0.0),
                audio_chunks=data.get("audio_chunks", []),
                timestamp=data.get("timestamp", ""),
                client_session_id=data.get("session_id", ""),
            )
            # Create the arbitration future BEFORE sending the command:
            # a fast server reply could otherwise arrive while no future
            # exists, and the arbitration would hang until the timeout.
            # get_running_loop() is the correct call inside a coroutine;
            # get_event_loop() here is deprecated since Python 3.10.
            self._arbitration_future = asyncio.get_running_loop().create_future()
            await connection.send_command(cmd)
            # Resolved by handle_wakeword_selected / handle_wakeword_abort.
            return await self._arbitration_future
        elif event_type == "recording_complete":
            from trixy_core.network.cmd.wakeword import RecordingComplete
            cmd = RecordingComplete(
                session_id=data.get("session_id", ""),
                duration_seconds=data.get("duration_seconds", 0.0),
                ended_by=data.get("ended_by", ""),
                speech_detected=data.get("speech_detected", True),
                peak_level=data.get("peak_level", 0.0),
                vad_stats=data.get("vad_stats", {}),
            )
            await connection.send_command(cmd)
            return {"success": True}
        return {}

    self.set_server_connection(send_to_server)
async def handle_wakeword_selected(self, data) -> None:
    """Resolve the arbitration future when the server selects us.

    Args:
        data: WakewordSelected payload (object or dict).
    """
    future = self._arbitration_future
    if not future or future.done():
        return
    if isinstance(data, dict):
        session_id = data.get("session_id", "")
        conversation_id = data.get("conversation_id", "")
    else:
        session_id = getattr(data, "session_id", "") or ""
        conversation_id = getattr(data, "conversation_id", "") or ""
    future.set_result({
        "selected": True,
        "session_id": session_id or conversation_id,
    })
async def handle_wakeword_abort(self, data) -> None:
    """Resolve the arbitration future when the server aborts us.

    Args:
        data: WakewordAbort payload (unused).
    """
    future = self._arbitration_future
    if future and not future.done():
        future.set_result({"selected": False})
    # Refresh the cooldown timestamp so the abort is not followed by an
    # immediate re-detection of the same utterance.
    self._last_wakeword_time = time.monotonic()
async def handle_conversation_end(self) -> None:
    """ConversationEnd from the server: stop capturing, go back to LISTENING."""
    capturing = (ServiceState.RECORDING, ServiceState.FOLLOW_UP)
    if self._state in capturing:
        self._vad.stop()
        self._audio_buffer.stop_recording()
    if self._state in capturing + (ServiceState.PROCESSING, ServiceState.WAITING_RESPONSE):
        pinfo("ConversationEnd empfangen — zurück zu LISTENING")
        self._complete_session()
- # === Server-gesteuerte Aufnahme ===
def start_server_recording(self, recording_id: str) -> None:
    """Enable audio streaming in the LISTENING state (server-driven recording)."""
    self._server_recording_active = True
    self._server_recording_id = recording_id
    self._server_recording_start_time = time.monotonic()
    pdebug(f"Server-Recording gestartet: {recording_id}")
def stop_server_recording(self) -> float:
    """Disable audio streaming. Returns the elapsed duration in seconds."""
    if self._server_recording_active:
        duration = time.monotonic() - self._server_recording_start_time
    else:
        duration = 0.0
    self._server_recording_active = False
    self._server_recording_id = ""
    self._server_recording_start_time = 0.0
    pdebug(f"Server-Recording gestoppt: {duration:.1f}s")
    return duration
def set_server_recording_stopped_callback(
    self, cb: Callable[[str, float], Awaitable[None]]
) -> None:
    """Set the callback fired when a wakeword triggers a recording stop."""
    self._on_server_recording_stopped = cb
def get_stats(self) -> dict:
    """Return a snapshot of service statistics."""
    session = self._current_session
    session_stats = {
        "id": session.session_id if session else None,
        "follow_up_count": session.follow_up_count if session else 0,
    }
    vad_stats = self._vad.get_stats() if self._vad.is_active else {}
    return {
        "state": self._state.value,
        "detector": self._detector.stats,
        "buffer": self._audio_buffer.get_stats(),
        "vad": vad_stats,
        "session": session_stats,
    }
|