# -*- coding: utf-8 -*-
"""
Audio accumulator for server-side collection of streamed audio frames.

Collects audio frames per active conversation and, when the recording
ends, emits the raw_audio_received event for STT.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from trixy_core.utils.debug import pinfo, pdebug, perror

if TYPE_CHECKING:
    from trixy_core.application import IApplication


class AudioAccumulatorService:
    """
    Collects streamed audio frames per active conversation.

    Registers handlers for:
    - conversation_started: create the buffer
    - audio_input_received: append audio to the buffer
    - recording_complete: read out the buffer and emit raw_audio_received
    """

    def __init__(self, application: "IApplication", wakeword_trim_seconds: float = 1.0) -> None:
        self._app = application
        self._wakeword_trim_seconds = wakeword_trim_seconds
        # satellite_id → {"conversation_id": str, "buffer": bytearray}
        self._recordings: dict[str, dict] = {}

    async def start(self) -> None:
        """Register the event handlers."""
        self._app.events.register("conversation_started", self._on_conversation_started)
        self._app.events.register("audio_input_received", self._on_audio_received)
        self._app.events.register("recording_complete", self._on_recording_complete)
        pdebug("AudioAccumulatorService started")

    async def stop(self) -> None:
        """Unregister the event handlers and clean up."""
        self._app.events.unregister("conversation_started", self._on_conversation_started)
        self._app.events.unregister("audio_input_received", self._on_audio_received)
        self._app.events.unregister("recording_complete", self._on_recording_complete)
        self._recordings.clear()
        pdebug("AudioAccumulatorService stopped")

    async def _on_conversation_started(self, event_name: str, event_data: Any) -> None:
        """Create a buffer for a new conversation."""
        satellite_id = _get_field(event_data, "satellite_id", "")
        conversation_id = _get_field(event_data, "conversation_id", "")
        if not satellite_id:
            return
        self._recordings[satellite_id] = {
            "conversation_id": conversation_id,
            "buffer": bytearray(),
        }
        pdebug(f"Audio buffer created for {satellite_id} (conv={conversation_id})")

    async def _on_audio_received(self, event_name: str, event_data: Any) -> None:
        """Append audio to the buffer (active conversations only)."""
        satellite_id = _get_field(event_data, "satellite_id", "")
        recording = self._recordings.get(satellite_id)
        if recording is None:
            return
        audio_data = _get_field(event_data, "audio_data", b"")
        if audio_data:
            recording["buffer"].extend(audio_data)

    async def _on_recording_complete(self, event_name: str, event_data: Any) -> None:
        """Handle the end of a recording: pass audio to STT or abort the conversation."""
        satellite_id = _get_field(event_data, "satellite_id", "")
        speech_detected = _get_field(event_data, "speech_detected", True)
        session_id = _get_field(event_data, "session_id", "")
        ended_by = _get_field(event_data, "ended_by", "")
        duration_seconds = _get_field(event_data, "duration_seconds", 0.0)
        recording = self._recordings.pop(satellite_id, None)
        if recording is None:
            pdebug(f"No audio buffer for {satellite_id}, ignoring recording_complete")
            return
        conversation_id = recording["conversation_id"]
        audio_buffer = bytes(recording["buffer"])
        if speech_detected:
            # Trim the wakeword audio at the start (pre-buffer + wakeword tail)
            trim_bytes = int(self._wakeword_trim_seconds * 16000 * 2)  # 16 kHz, 16-bit mono
            if trim_bytes > 0 and len(audio_buffer) > trim_bytes:
                audio_buffer = audio_buffer[trim_bytes:]
                pinfo(f"Wakeword trimmed: {trim_bytes} bytes ({self._wakeword_trim_seconds}s)")
            # Speech detected → emit raw_audio_received for STT
            pinfo(
                f"Audio collected: {len(audio_buffer)} bytes, "
                f"{duration_seconds:.1f}s (ended_by={ended_by})"
            )
            await self._app.events.emit("raw_audio_received", {
                "session_id": session_id or conversation_id,
                "conversation_id": conversation_id,
                "satellite_id": satellite_id,
                "audio_data": audio_buffer,
                "duration_seconds": duration_seconds,
                "ended_by": ended_by,
            })
            # ConversationEnd is NOT sent here!
            # The IntentDispatcher decides based on the handler result:
            # - Follow-up active → FollowUpExpected (satellite keeps listening)
            # - No follow-up → ConversationEnd (satellite → LISTENING)
            # See intent_dispatcher.py: on_tts_completed() and
            # _send_conversation_end_if_no_followup()
        else:
            # No speech → false trigger, abort the conversation
            pinfo(f"No speech detected for {satellite_id}, aborting conversation")
            await self._send_conversation_end(satellite_id, conversation_id, "no_speech")

    async def _send_conversation_end(
        self, satellite_id: str, conversation_id: str, reason: str
    ) -> None:
        """Send ConversationEnd to the satellite."""
        satellites = getattr(self._app, "satellites", None)
        if satellites is None:
            return
        satellite = satellites.get(satellite_id)
        if satellite is None or not satellite.is_connected:
            pdebug(f"Satellite {satellite_id} not available for ConversationEnd")
            return
        try:
            from trixy_core.network.cmd.wakeword import ConversationEnd
            cmd = ConversationEnd(
                session_id=conversation_id,
                reason=reason,
            )
            await satellite.send_command(cmd)
            pdebug(f"ConversationEnd sent to {satellite_id} (reason={reason})")
        except Exception as e:
            perror(f"Error sending ConversationEnd: {e}")


def _get_field(data: Any, field: str, default: Any) -> Any:
    """Fetch a field from a dict, an EventData, or a plain object."""
    if isinstance(data, dict):
        return data.get(field, default)
    # EventData stores dict data in metadata
    if hasattr(data, "metadata") and isinstance(data.metadata, dict):
        if field in data.metadata:
            return data.metadata[field]
    # Fallback: direct attribute
    return getattr(data, field, default)
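
The lookup order of `_get_field` (dict key, then `metadata` entry, then plain attribute) can be illustrated with a small standalone sketch. The helper is copied here so the block runs on its own. `SimpleNamespace` is only a stand-in for the real `EventData` class, whose actual shape is not shown in this file, and all payload values are made up for illustration.

```python
from types import SimpleNamespace
from typing import Any


def _get_field(data: Any, field: str, default: Any) -> Any:
    """Same lookup order as the helper above: dict, then metadata, then attribute."""
    if isinstance(data, dict):
        return data.get(field, default)
    if hasattr(data, "metadata") and isinstance(data.metadata, dict):
        if field in data.metadata:
            return data.metadata[field]
    return getattr(data, field, default)


# Hypothetical payloads; SimpleNamespace stands in for the real EventData class.
as_dict = {"satellite_id": "kitchen"}
as_event_data = SimpleNamespace(metadata={"satellite_id": "hall"}, source="net")
as_object = SimpleNamespace(satellite_id="attic")

from_dict = _get_field(as_dict, "satellite_id", "")
from_metadata = _get_field(as_event_data, "satellite_id", "")
from_attribute = _get_field(as_object, "satellite_id", "")
# A key that is missing from metadata falls through to the attribute lookup.
fallthrough = _get_field(as_event_data, "source", "")
# A key that is missing everywhere yields the caller's default.
missing = _get_field(as_dict, "speech_detected", True)
```

Note that a dict payload never reaches the attribute fallback: a missing key returns the default immediately.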
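
The wakeword trim in `_on_recording_complete` is plain byte arithmetic: seconds × sample rate × bytes per sample. Below is a minimal sketch of that step as a standalone function. The name `trim_wakeword` and its `sample_rate` / `sample_width` parameters are hypothetical, with defaults mirroring the 16 kHz, 16-bit mono assumption hard-coded in the service. Unlike the service, this sketch truncates to whole samples before multiplying by the sample width, so a fractional trim length cannot cut through the middle of a 16-bit sample.

```python
def trim_wakeword(audio: bytes, trim_seconds: float,
                  sample_rate: int = 16000, sample_width: int = 2) -> bytes:
    """Drop the first trim_seconds of mono PCM audio if the buffer is longer than that."""
    # Work in whole samples so the cut never lands in the middle of a sample.
    trim_bytes = int(trim_seconds * sample_rate) * sample_width
    if trim_bytes > 0 and len(audio) > trim_bytes:
        return audio[trim_bytes:]
    # Buffers no longer than the trim window are returned unchanged.
    return audio


one_and_a_half_seconds = bytes(48000)  # 1.5 s of silence at 16 kHz, 16-bit mono
trimmed = trim_wakeword(one_and_a_half_seconds, 1.0)
too_short = trim_wakeword(bytes(1000), 1.0)
fractional = trim_wakeword(one_and_a_half_seconds, 0.5001)
```

With the default of 1.0 s, 32000 bytes are removed. A buffer shorter than the trim window is passed through as-is, matching the `len(audio_buffer) > trim_bytes` guard in the service.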