| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- # -*- coding: utf-8 -*-
- """
- Audio Buffer für Aufnahmen im Arbeitsspeicher.
- """
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import Iterator
- import threading
- import io
@dataclass
class BufferConfig:
    """Settings describing the PCM format and the size limits of an audio buffer.

    The plain fields hold the user-facing configuration; the byte sizes
    derived from them are exposed as read-only properties.
    """

    # Upper bound for a single recording, in seconds.
    max_duration_seconds: float = 60.0

    # Audio format.
    sample_rate: int = 16000
    channels: int = 1
    bit_depth: int = 16  # bits per sample (16-bit PCM by default)

    # Buffer management.
    chunk_size: int = 1280  # samples per chunk (80 ms at 16 kHz)
    pre_buffer_seconds: float = 0.5  # audio to keep from before the trigger

    def _seconds_to_bytes(self, seconds: float) -> int:
        """Convert a duration in seconds to a byte count, truncated to an int."""
        return int(seconds * self.bytes_per_second)

    @property
    def bytes_per_sample(self) -> int:
        """Width of one sample in whole bytes (bit depth floor-divided by 8)."""
        whole_bytes, _remainder = divmod(self.bit_depth, 8)
        return whole_bytes

    @property
    def bytes_per_second(self) -> int:
        """Number of bytes produced by one second of audio in this format."""
        samples_per_second = self.sample_rate * self.channels
        return samples_per_second * self.bytes_per_sample

    @property
    def max_buffer_size(self) -> int:
        """Largest allowed recording size in bytes."""
        return self._seconds_to_bytes(self.max_duration_seconds)

    @property
    def pre_buffer_size(self) -> int:
        """Size of the pre-trigger buffer in bytes."""
        return self._seconds_to_bytes(self.pre_buffer_seconds)
@dataclass
class AudioChunk:
    """A block of raw audio bytes together with its bookkeeping metadata."""

    data: bytes  # raw audio payload
    timestamp: datetime  # point in time associated with the chunk
    sequence_number: int  # position of the chunk in its producer's sequence
    duration_ms: float  # length of the payload in milliseconds

    def __len__(self) -> int:
        """Return the payload size in bytes."""
        payload = self.data
        return len(payload)
class AudioBuffer:
    """Thread-safe in-memory buffer for audio recordings.

    Audio is collected in two stages:

    * a bounded *pre-buffer* that keeps the most recent audio received
      while no recording is active (for example audio heard just before
      a wake word), and
    * the *main buffer* that holds the recording itself, limited to
      ``config.max_buffer_size`` bytes.

    All public methods take an internal re-entrant lock. The lock is
    re-entrant because public methods call each other while holding it
    (``add_chunk`` -> ``add_to_pre_buffer``, ``get_stats`` ->
    ``duration_seconds``).
    """

    def __init__(self, config: BufferConfig | None = None):
        """Create an empty, idle buffer.

        Args:
            config: Buffer configuration. A default ``BufferConfig`` is
                used when omitted.
        """
        self._config = config or BufferConfig()
        self._lock = threading.RLock()

        # Main recording: raw bytes plus the per-chunk metadata.
        self._buffer = io.BytesIO()
        self._chunks: list[AudioChunk] = []

        # Pre-buffer: most recent chunks received before the trigger.
        self._pre_buffer: list[AudioChunk] = []
        self._pre_buffer_bytes = 0

        # Recording state.
        self._is_recording = False
        self._start_time: datetime | None = None
        self._sequence_counter = 0
        self._total_bytes = 0

    @property
    def config(self) -> BufferConfig:
        """The configuration this buffer was created with."""
        return self._config

    @property
    def is_recording(self) -> bool:
        """Whether a recording is currently active."""
        with self._lock:
            return self._is_recording

    @property
    def duration_seconds(self) -> float:
        """Length of the recorded audio in seconds, derived from its byte size."""
        with self._lock:
            return self._total_bytes / self._config.bytes_per_second

    @property
    def size_bytes(self) -> int:
        """Number of bytes currently held in the main buffer."""
        with self._lock:
            return self._total_bytes

    @property
    def chunk_count(self) -> int:
        """Number of chunks currently held in the main buffer."""
        with self._lock:
            return len(self._chunks)

    @property
    def start_time(self) -> datetime | None:
        """Time the current/last recording was started, or ``None``."""
        with self._lock:
            return self._start_time

    def _make_chunk(self, audio_data: bytes) -> AudioChunk:
        """Wrap ``audio_data`` in an ``AudioChunk`` and advance the sequence counter.

        The caller must already hold ``self._lock``.
        """
        chunk = AudioChunk(
            data=audio_data,
            timestamp=datetime.now(),
            sequence_number=self._sequence_counter,
            duration_ms=len(audio_data) / self._config.bytes_per_second * 1000,
        )
        self._sequence_counter += 1
        return chunk

    def add_to_pre_buffer(self, audio_data: bytes) -> None:
        """Add audio to the pre-buffer.

        Used to keep the audio that arrived shortly before the trigger.
        While a recording is active the data is appended to the main
        buffer instead; whether that append succeeded is not reported
        by this method.

        Args:
            audio_data: Raw audio bytes (16-bit PCM).
        """
        with self._lock:
            if self._is_recording:
                # A recording is running: the audio belongs to it.
                self._add_chunk(audio_data)
                return

            self._pre_buffer.append(self._make_chunk(audio_data))
            self._pre_buffer_bytes += len(audio_data)

            # Drop the oldest chunks until the pre-buffer fits its limit.
            while self._pre_buffer_bytes > self._config.pre_buffer_size:
                removed = self._pre_buffer.pop(0)
                self._pre_buffer_bytes -= len(removed.data)

    def start_recording(self, include_pre_buffer: bool = True) -> None:
        """Start a new recording.

        Does nothing when a recording is already active. Otherwise the
        main buffer is reset and, if requested, seeded with the current
        pre-buffer contents.

        Args:
            include_pre_buffer: Copy the pre-buffer into the recording.
        """
        with self._lock:
            if self._is_recording:
                return

            self._is_recording = True
            self._start_time = datetime.now()
            self._buffer = io.BytesIO()
            self._chunks.clear()
            self._total_bytes = 0

            if include_pre_buffer and self._pre_buffer:
                for chunk in self._pre_buffer:
                    self._chunks.append(chunk)
                    self._buffer.write(chunk.data)
                    self._total_bytes += len(chunk.data)

            # Always emptied, so pre-trigger audio of this recording
            # cannot show up again in a later one.
            self._pre_buffer.clear()
            self._pre_buffer_bytes = 0

    def add_chunk(self, audio_data: bytes) -> bool:
        """Add an audio chunk.

        When no recording is active the chunk goes to the pre-buffer and
        ``True`` is returned.

        Args:
            audio_data: Raw audio bytes (16-bit PCM).

        Returns:
            ``True`` if the chunk was stored, ``False`` if storing it
            would exceed the maximum buffer size.
        """
        with self._lock:
            if not self._is_recording:
                self.add_to_pre_buffer(audio_data)
                return True
            return self._add_chunk(audio_data)

    def _add_chunk(self, audio_data: bytes) -> bool:
        """Append a chunk to the main buffer; the caller must hold the lock.

        Returns:
            ``False`` (and stores nothing) if the chunk would push the
            recording past ``config.max_buffer_size``, else ``True``.
        """
        if self._total_bytes + len(audio_data) > self._config.max_buffer_size:
            return False

        self._chunks.append(self._make_chunk(audio_data))
        self._buffer.write(audio_data)
        self._total_bytes += len(audio_data)
        return True

    def stop_recording(self) -> bytes:
        """Stop the recording and return everything recorded so far.

        The recorded data stays in the buffer until ``clear`` or the
        next ``start_recording``.

        Returns:
            All recorded audio bytes.
        """
        with self._lock:
            self._is_recording = False
            return self._buffer.getvalue()

    def get_data(self) -> bytes:
        """Return everything recorded so far without stopping the recording.

        Returns:
            All recorded audio bytes.
        """
        with self._lock:
            return self._buffer.getvalue()

    def get_chunks(self) -> list[AudioChunk]:
        """Return a copy of the list of recorded chunks.

        Returns:
            A new list containing all recorded chunks.
        """
        with self._lock:
            return list(self._chunks)

    def iterate_chunks(self) -> Iterator[AudioChunk]:
        """Iterate over the recorded chunks.

        The chunk list is copied under the lock when iteration begins
        and the chunks are yielded from that copy, so the lock is never
        held while the consumer processes a chunk. Chunks added after
        iteration has started are not included.

        Yields:
            The recorded chunks in insertion order.
        """
        with self._lock:
            snapshot = list(self._chunks)
        # Yield outside the lock: a slow or abandoned consumer must not
        # block producers or other readers.
        yield from snapshot

    def clear(self) -> None:
        """Discard all audio (main and pre-buffer) and return to the idle state."""
        with self._lock:
            self._buffer = io.BytesIO()
            self._chunks.clear()
            self._pre_buffer.clear()
            self._pre_buffer_bytes = 0
            self._is_recording = False
            self._start_time = None
            self._total_bytes = 0

    def get_stats(self) -> dict:
        """Return a snapshot of the buffer state.

        Returns:
            Dictionary with recording flag, duration, sizes, chunk
            counts and the ISO-formatted start time (or ``None``).
        """
        with self._lock:
            return {
                "is_recording": self._is_recording,
                "duration_seconds": self.duration_seconds,
                "size_bytes": self._total_bytes,
                "chunk_count": len(self._chunks),
                "pre_buffer_chunks": len(self._pre_buffer),
                "pre_buffer_bytes": self._pre_buffer_bytes,
                "start_time": self._start_time.isoformat() if self._start_time else None,
            }
class AudioBufferPool:
    """Pool of reusable audio buffers for several simultaneous recordings.

    Buffers are handed out under a caller-chosen ID and returned to the
    pool on release. All operations are serialized by an internal lock.
    """

    def __init__(self, pool_size: int = 5, config: BufferConfig | None = None):
        """Create the pool and pre-allocate its buffers.

        Args:
            pool_size: Number of buffers to create up front.
            config: Configuration shared by all buffers of the pool; a
                default ``BufferConfig`` is used when omitted.
        """
        self._config = config or BufferConfig()
        self._lock = threading.Lock()

        # Idle buffers ready to be handed out.
        self._available: list[AudioBuffer] = []
        for _ in range(pool_size):
            self._available.append(AudioBuffer(self._config))

        # Buffers currently handed out, keyed by their ID.
        self._in_use: dict[str, AudioBuffer] = {}

    def acquire(self, buffer_id: str) -> AudioBuffer:
        """Hand out a buffer for ``buffer_id``.

        If the ID already owns a buffer, that same buffer is returned.
        When no idle buffer is left, a new one is created, so this
        method does not fail on an exhausted pool.

        Args:
            buffer_id: Unique ID under which the buffer is tracked.

        Returns:
            The buffer associated with ``buffer_id``.
        """
        with self._lock:
            existing = self._in_use.get(buffer_id)
            if existing is not None:
                return existing

            if self._available:
                # Reuse an idle buffer, making sure it starts out empty.
                buffer = self._available.pop()
                buffer.clear()
            else:
                buffer = AudioBuffer(self._config)

            self._in_use[buffer_id] = buffer
            return buffer

    def release(self, buffer_id: str) -> bytes | None:
        """Return the buffer of ``buffer_id`` to the pool.

        A recording that is still active is stopped. The buffer is
        cleared before it becomes available again.

        Args:
            buffer_id: ID the buffer was acquired under.

        Returns:
            The audio recorded in the buffer, or ``None`` if the ID is
            not in use.
        """
        with self._lock:
            try:
                buffer = self._in_use.pop(buffer_id)
            except KeyError:
                return None

            # Fetch the data first; clearing below would discard it.
            if buffer.is_recording:
                data = buffer.stop_recording()
            else:
                data = buffer.get_data()

            buffer.clear()
            self._available.append(buffer)
            return data

    def get(self, buffer_id: str) -> AudioBuffer | None:
        """Look up the buffer of ``buffer_id`` without releasing it.

        Args:
            buffer_id: ID the buffer was acquired under.

        Returns:
            The buffer, or ``None`` if the ID is not in use.
        """
        with self._lock:
            return self._in_use.get(buffer_id)

    @property
    def active_count(self) -> int:
        """Number of buffers currently handed out."""
        with self._lock:
            return len(self._in_use)

    @property
    def available_count(self) -> int:
        """Number of idle buffers waiting in the pool."""
        with self._lock:
            return len(self._available)
|