accumulator.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. # -*- coding: utf-8 -*-
  2. """
  3. Audio-Akkumulator für server-seitige Sammlung gestreamter Audio-Frames.
  4. Sammelt Audio-Frames pro aktiver Conversation und emittiert
  5. bei Recording-Ende das raw_audio_received Event für STT.
  6. """
  7. from __future__ import annotations
  8. from typing import TYPE_CHECKING, Any
  9. from trixy_core.utils.debug import pinfo, pdebug, perror
  10. if TYPE_CHECKING:
  11. from trixy_core.application import IApplication
  class AudioAccumulatorService:
      """
      Collect streamed audio frames per active conversation.

      Registered event handlers:
      - conversation_started: create an empty buffer for the satellite
      - audio_input_received: append audio to that satellite's buffer
      - recording_complete: take the buffer and either emit
        raw_audio_received (speech detected) or send ConversationEnd
        (no speech)

      Buffers are keyed by satellite_id, so each satellite holds at most one
      buffer; a new conversation_started replaces any buffer still held.
      """

      def __init__(
          self,
          application: "IApplication",
          wakeword_trim_seconds: float = 1.0,
          sample_rate: int = 16000,
          sample_width: int = 2,
      ) -> None:
          """Store configuration; handlers are only registered by start().

          Args:
              application: Application object. This class uses its ``events``
                  (register / unregister / emit) and, if present,
                  ``satellites``.
              wakeword_trim_seconds: Seconds cut from the start of each
                  recording (pre-buffer + wakeword tail). Values that amount
                  to zero or fewer samples disable trimming.
              sample_rate: Sample rate of the buffered audio in Hz
                  (default 16 kHz, as assumed by the original code).
              sample_width: Bytes per sample frame (default 2 = 16-bit mono).
          """
          self._app = application
          self._wakeword_trim_seconds = wakeword_trim_seconds
          self._sample_rate = sample_rate
          self._sample_width = sample_width
          # satellite_id -> {"conversation_id": str, "buffer": bytearray}
          self._recordings: dict[str, dict] = {}

      async def start(self) -> None:
          """Register the event handlers."""
          self._app.events.register("conversation_started", self._on_conversation_started)
          self._app.events.register("audio_input_received", self._on_audio_received)
          self._app.events.register("recording_complete", self._on_recording_complete)
          pdebug("AudioAccumulatorService gestartet")

      async def stop(self) -> None:
          """Unregister the event handlers and drop all pending buffers."""
          self._app.events.unregister("conversation_started", self._on_conversation_started)
          self._app.events.unregister("audio_input_received", self._on_audio_received)
          self._app.events.unregister("recording_complete", self._on_recording_complete)
          self._recordings.clear()
          pdebug("AudioAccumulatorService gestoppt")

      async def _on_conversation_started(self, event_name: str, event_data: Any) -> None:
          """Create an empty buffer for a newly started conversation.

          Events without a satellite_id are ignored.
          """
          satellite_id = _get_field(event_data, "satellite_id", "")
          conversation_id = _get_field(event_data, "conversation_id", "")
          if not satellite_id:
              return
          self._recordings[satellite_id] = {
              "conversation_id": conversation_id,
              "buffer": bytearray(),
          }
          pdebug(f"Audio-Buffer angelegt für {satellite_id} (conv={conversation_id})")

      async def _on_audio_received(self, event_name: str, event_data: Any) -> None:
          """Append incoming audio to the buffer (active conversations only).

          Frames for satellites without a buffer, and empty frames, are dropped.
          """
          satellite_id = _get_field(event_data, "satellite_id", "")
          recording = self._recordings.get(satellite_id)
          if recording is None:
              return
          audio_data = _get_field(event_data, "audio_data", b"")
          if audio_data:
              recording["buffer"].extend(audio_data)

      async def _on_recording_complete(self, event_name: str, event_data: Any) -> None:
          """Handle end of recording: pass audio to STT or abort the conversation.

          With speech detected, the leading wakeword audio is trimmed and
          ``raw_audio_received`` is emitted. Without speech, the recording is
          treated as a false trigger and ConversationEnd("no_speech") is sent.
          The buffer is removed in both cases.
          """
          satellite_id = _get_field(event_data, "satellite_id", "")
          speech_detected = _get_field(event_data, "speech_detected", True)
          session_id = _get_field(event_data, "session_id", "")
          ended_by = _get_field(event_data, "ended_by", "")
          duration_seconds = _get_field(event_data, "duration_seconds", 0.0)

          recording = self._recordings.pop(satellite_id, None)
          if recording is None:
              pdebug(f"Kein Audio-Buffer für {satellite_id} — ignoriere recording_complete")
              return

          conversation_id = recording["conversation_id"]

          if not speech_detected:
              # No speech -> false trigger, abort the conversation.
              pinfo(f"Keine Sprache erkannt für {satellite_id} — Conversation abbrechen")
              await self._send_conversation_end(satellite_id, conversation_id, "no_speech")
              return

          audio_buffer = bytes(recording["buffer"])

          # Trim the wakeword audio at the start (pre-buffer + wakeword tail).
          # Only trim when something remains afterwards.
          trim_bytes = self._wakeword_trim_bytes()
          if trim_bytes > 0 and len(audio_buffer) > trim_bytes:
              audio_buffer = audio_buffer[trim_bytes:]
              pinfo(f"Wakeword getrimmt: {trim_bytes} Bytes ({self._wakeword_trim_seconds}s)")

          # Speech detected -> emit raw_audio_received for STT.
          # The duration is formatted defensively so a non-numeric value cannot
          # abort the handler before the audio is emitted.
          pinfo(
              f"Audio gesammelt: {len(audio_buffer)} Bytes, "
              f"{self._format_duration(duration_seconds)} (ended_by={ended_by})"
          )
          await self._app.events.emit("raw_audio_received", {
              "session_id": session_id or conversation_id,
              "conversation_id": conversation_id,
              "satellite_id": satellite_id,
              "audio_data": audio_buffer,
              "duration_seconds": duration_seconds,
              "ended_by": ended_by,
          })
          # ConversationEnd is deliberately NOT sent here. The IntentDispatcher
          # decides based on the handler result:
          # - follow-up active -> FollowUpExpected (satellite keeps listening)
          # - no follow-up     -> ConversationEnd (satellite -> LISTENING)
          # See intent_dispatcher.py: on_tts_completed() and
          # _send_conversation_end_if_no_followup()

      def _wakeword_trim_bytes(self) -> int:
          """Return how many leading bytes to cut for the wakeword.

          The count is computed in whole samples and then converted to bytes.
          It is therefore always a multiple of ``sample_width``. Cutting an odd
          byte count from 16-bit audio would misalign every later sample.
          A result <= 0 means "do not trim".
          """
          trim_samples = int(self._wakeword_trim_seconds * self._sample_rate)
          return trim_samples * self._sample_width

      @staticmethod
      def _format_duration(value: Any) -> str:
          """Format a duration for logging as ``"<x.x>s"``.

          Non-numeric values are rendered with ``str()`` instead of raising.
          """
          try:
              return f"{float(value):.1f}s"
          except (TypeError, ValueError):
              return f"{value}s"

      async def _send_conversation_end(
          self, satellite_id: str, conversation_id: str, reason: str
      ) -> None:
          """Send ConversationEnd to the satellite (best effort).

          Does nothing if the application has no ``satellites`` registry, or if
          the satellite is unknown or disconnected. Send errors are logged,
          not raised.
          """
          satellites = getattr(self._app, "satellites", None)
          if satellites is None:
              return
          satellite = satellites.get(satellite_id)
          if satellite is None or not satellite.is_connected:
              pdebug(f"Satellite {satellite_id} nicht verfügbar für ConversationEnd")
              return
          try:
              # Imported lazily, as in the original code.
              from trixy_core.network.cmd.wakeword import ConversationEnd
              cmd = ConversationEnd(
                  session_id=conversation_id,
                  reason=reason,
              )
              await satellite.send_command(cmd)
              pdebug(f"ConversationEnd gesendet an {satellite_id} (reason={reason})")
          except Exception as e:
              perror(f"Fehler beim Senden von ConversationEnd: {e}")
  121. def _get_field(data: Any, field: str, default: Any) -> Any:
  122. """Holt ein Feld aus dict, EventData oder Objekt."""
  123. if isinstance(data, dict):
  124. return data.get(field, default)
  125. # EventData speichert Dict-Daten in metadata
  126. if hasattr(data, "metadata") and isinstance(data.metadata, dict):
  127. if field in data.metadata:
  128. return data.metadata[field]
  129. # Fallback: direktes Attribut
  130. return getattr(data, field, default)