| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947 |
- # -*- coding: utf-8 -*-
- """
- Tests für Streaming Audio Player.
- Testet:
- - BlockingStreamPlayer Chunk-Handling
- - AudioOutputManager TTS/Musik-Verwaltung
- - Prebuffer und State-Management
- """
- import asyncio
- import pytest
- import threading
- import time
- from unittest.mock import MagicMock, patch, AsyncMock
- from trixy_core.audio.player import (
- BlockingStreamPlayer,
- AudioOutputManager,
- PlayerConfig,
- PlayerState,
- SimpleStreamingPlayer,
- RobustStreamingPlayer,
- )
- # =============================================================================
- # Fixtures
- # =============================================================================
@pytest.fixture
def player_config():
    """Provide the player configuration shared by the player tests."""
    settings = {
        "sample_rate": 22050,
        "channels": 1,
        "sample_width": 2,
        # A short prebuffer keeps the tests fast.
        "prebuffer_ms": 100,
    }
    return PlayerConfig(**settings)
@pytest.fixture
def player(player_config):
    """Yield a BlockingStreamPlayer and stop it once the test has finished.

    Several tests feed more data than the 100 ms prebuffer holds or force
    the state to PLAYING/PAUSED by hand. Stopping in teardown keeps that
    state from outliving the test (presumably this also ends any playback
    worker the player started -- TODO confirm against the implementation).
    """
    stream_player = BlockingStreamPlayer(player_config)
    yield stream_player
    # stop() is exercised in this suite from STOPPED, BUFFERING and PAUSED
    # and with repeated calls, so an unconditional call here is safe.
    stream_player.stop()
@pytest.fixture
def audio_manager():
    """Yield an AudioOutputManager and stop all playback on teardown.

    Tests buffer TTS/music chunks and force player states by hand; calling
    stop_all() afterwards resets both players so nothing is left buffering
    or paused once the test is over.
    """
    manager = AudioOutputManager(
        tts_sample_rate=22050,
        music_sample_rate=44100,
    )
    yield manager
    # stop_all() acts on whatever players the manager currently holds, so it
    # also covers tests that replaced a player via a format update.
    manager.stop_all()
- # =============================================================================
- # Tests für PlayerConfig
- # =============================================================================
class TestPlayerConfig:
    """Tests for the PlayerConfig data class."""

    def test_default_values(self):
        """Check the values of a config created without arguments."""
        cfg = PlayerConfig()
        observed = (
            cfg.sample_rate,
            cfg.channels,
            cfg.sample_width,
            cfg.prebuffer_ms,
        )
        assert observed == (44100, 2, 2, 1000)

    def test_custom_values(self):
        """Check that explicitly passed values can be read back."""
        cfg = PlayerConfig(
            sample_rate=16000,
            channels=1,
            sample_width=2,
            prebuffer_ms=500,
        )
        observed = (
            cfg.sample_rate,
            cfg.channels,
            cfg.sample_width,
            cfg.prebuffer_ms,
        )
        assert observed == (16000, 1, 2, 500)

    def test_bytes_per_second(self):
        """Check the bytes_per_second value for 16 kHz mono, 2-byte samples."""
        mono = PlayerConfig(sample_rate=16000, channels=1, sample_width=2)
        # 16000 samples/s * 1 channel * 2 bytes
        expected_rate = 32000
        assert mono.bytes_per_second == expected_rate

    def test_bytes_per_second_stereo(self):
        """Check the bytes_per_second value for 44.1 kHz stereo."""
        stereo = PlayerConfig(sample_rate=44100, channels=2, sample_width=2)
        # 44100 samples/s * 2 channels * 2 bytes
        expected_rate = 176400
        assert stereo.bytes_per_second == expected_rate

    def test_prebuffer_bytes(self):
        """Check the prebuffer_bytes value for a 500 ms prebuffer."""
        cfg = PlayerConfig(
            sample_rate=16000,
            channels=1,
            sample_width=2,
            prebuffer_ms=500,
        )
        # 32000 bytes/s * 500 ms / 1000
        expected_bytes = 16000
        assert cfg.prebuffer_bytes == expected_bytes

    def test_frame_size(self):
        """Check the frame_size value for stereo."""
        stereo = PlayerConfig(sample_rate=44100, channels=2, sample_width=2)
        # 2 channels * 2 bytes per sample
        assert stereo.frame_size == 4

    def test_frame_size_mono(self):
        """Check the frame_size value for mono."""
        mono = PlayerConfig(sample_rate=22050, channels=1, sample_width=2)
        # 1 channel * 2 bytes per sample
        assert mono.frame_size == 2
- # =============================================================================
- # Tests für PlayerState
- # =============================================================================
class TestPlayerState:
    """Tests for the PlayerState enum."""

    def test_states_exist(self):
        """Check that every state used by this suite is a PlayerState member.

        PAUSED is included because the pause tests rely on it. isinstance()
        is used rather than truthiness so a member with a falsy value (for
        example 0 on an int-based enum) cannot fail the test.
        """
        members = (
            PlayerState.STOPPED,
            PlayerState.BUFFERING,
            PlayerState.PLAYING,
            PlayerState.PAUSED,
        )
        for member in members:
            assert isinstance(member, PlayerState)

    def test_states_are_unique(self):
        """Check that all states, including PAUSED, are distinct."""
        states = [
            PlayerState.STOPPED,
            PlayerState.BUFFERING,
            PlayerState.PLAYING,
            PlayerState.PAUSED,
        ]
        # Aliased or equal members would collapse in the set.
        assert len(states) == len(set(states))
- # =============================================================================
- # Tests für BlockingStreamPlayer - Basis
- # =============================================================================
class TestBlockingStreamPlayerBasic:
    """Basic tests for BlockingStreamPlayer."""

    def test_initial_state(self, player):
        """A new player is STOPPED and reports that it is not playing."""
        assert player.state == PlayerState.STOPPED
        assert player.is_playing is False

    def test_default_config(self):
        """A player built without a config is STOPPED at 44100 Hz."""
        default_player = BlockingStreamPlayer()
        assert default_player.state == PlayerState.STOPPED
        assert default_player._config.sample_rate == 44100

    def test_custom_config(self, player_config):
        """A player built with a config exposes that config's values."""
        configured = BlockingStreamPlayer(player_config)
        active_config = configured._config
        assert active_config.sample_rate == 22050
        assert active_config.channels == 1

    def test_stats_initial(self, player):
        """The stats of a new player show STOPPED and zeroed counters."""
        snapshot = player.stats
        assert snapshot["state"] == "STOPPED"
        assert snapshot["bytes_played"] == 0
        assert snapshot["queue_size"] == 0

    def test_buffer_ms_empty(self, player):
        """buffer_ms is 0 while nothing has been buffered."""
        buffered_ms = player.buffer_ms
        assert buffered_ms == 0
- # =============================================================================
- # Tests für BlockingStreamPlayer - Aliases
- # =============================================================================
class TestPlayerAliases:
    """Tests for the class aliases of BlockingStreamPlayer."""

    def test_simple_streaming_player_alias(self):
        """SimpleStreamingPlayer is the very same class object."""
        alias = SimpleStreamingPlayer
        assert alias is BlockingStreamPlayer

    def test_robust_streaming_player_alias(self):
        """RobustStreamingPlayer is the very same class object."""
        alias = RobustStreamingPlayer
        assert alias is BlockingStreamPlayer
- # =============================================================================
- # Tests für BlockingStreamPlayer - Chunk-Handling
- # =============================================================================
class TestBlockingStreamPlayerChunks:
    """Tests for chunk handling."""

    def test_add_chunk_starts_buffering(self, player):
        """add_chunk() on a stopped player switches it to BUFFERING."""
        audio_data = b'\x00' * 1024
        result = player.add_chunk(audio_data)
        assert result is True
        assert player.state == PlayerState.BUFFERING

    def test_add_chunk_returns_true(self, player):
        """add_chunk() returns True for a non-empty chunk."""
        result = player.add_chunk(b'\x00' * 1024)
        assert result is True

    def test_add_empty_chunk_returns_false(self, player):
        """An empty chunk is rejected."""
        result = player.add_chunk(b'')
        assert result is False

    def test_add_none_chunk_returns_false(self, player):
        """A None chunk is rejected."""
        result = player.add_chunk(None)
        assert result is False

    def test_prebuffer_fills_before_playing(self, player_config):
        """A chunk smaller than the prebuffer leaves the player BUFFERING."""
        # Prebuffer = 100 ms at 22050 Hz mono = ~4410 bytes.
        player = BlockingStreamPlayer(player_config)
        # Small chunk: should stay in the prebuffer.
        player.add_chunk(b'\x00' * 1000)
        assert player.state == PlayerState.BUFFERING

    def test_buffer_ms_increases_with_chunks(self, player):
        """buffer_ms never drops below its starting value after adding chunks."""
        initial_ms = player.buffer_ms
        for _ in range(5):
            player.add_chunk(b'\x00' * 1024)
        # Compare against the recorded baseline instead of a constant.
        # Deliberately non-strict: the five chunks (5120 bytes) exceed the
        # ~4410 byte prebuffer, so data may presumably already be consumed
        # by the time buffer_ms is read (TODO confirm against the player).
        assert player.buffer_ms >= initial_ms
- # =============================================================================
- # Tests für BlockingStreamPlayer - State Transitions
- # =============================================================================
class TestBlockingStreamPlayerStateTransitions:
    """Tests for state transitions."""

    def test_stop_from_stopped(self, player):
        """stop() on an already stopped player keeps it STOPPED."""
        assert player.state == PlayerState.STOPPED
        player.stop()
        assert player.state == PlayerState.STOPPED

    def test_stop_clears_queue(self, player):
        """stop() leaves the queue empty."""
        silence = b'\x00' * 1024
        player.add_chunk(silence)
        player.stop()
        assert player._queue.empty()

    def test_stop_clears_prebuffer(self, player):
        """stop() resets the prebuffer and its size counter."""
        silence = b'\x00' * 1024
        player.add_chunk(silence)
        assert player._prebuffer_size > 0
        player.stop()
        assert player._prebuffer_size == 0
        assert len(player._prebuffer) == 0

    def test_finish_flushes_prebuffer(self, player):
        """finish() empties the prebuffer and leaves items in the queue."""
        silence = b'\x00' * 1024
        player.add_chunk(silence)
        assert player.state == PlayerState.BUFFERING
        assert player._prebuffer_size > 0
        player.finish()
        # The prebuffer should have been moved over to the queue.
        assert player._prebuffer_size == 0
        # The queue should hold the data plus the None end marker.
        assert not player._queue.empty()

    def test_finish_adds_none_marker(self, player):
        """finish() without data puts a None marker on the queue."""
        player.finish()
        first_item = player._queue.get_nowait()
        assert first_item is None

    def test_is_playing_false_when_stopped(self, player):
        """is_playing is False in the STOPPED state."""
        assert player.state == PlayerState.STOPPED
        assert player.is_playing is False

    def test_is_playing_false_when_buffering(self, player):
        """is_playing is False in the BUFFERING state."""
        tiny_chunk = b'\x00' * 100
        player.add_chunk(tiny_chunk)
        assert player.state == PlayerState.BUFFERING
        assert player.is_playing is False
- # =============================================================================
- # Tests für BlockingStreamPlayer - Pause
- # =============================================================================
class TestBlockingStreamPlayerPause:
    """Tests for the pause functionality."""

    def test_pause_from_stopped(self, player):
        """pause() on a stopped player leaves the state unchanged."""
        assert player.state == PlayerState.STOPPED
        player.pause()
        # Pausing is not possible from STOPPED, so the state stays as is.
        assert player.state == PlayerState.STOPPED

    def test_resume_from_stopped(self, player):
        """resume() on a stopped player leaves the state unchanged."""
        assert player.state == PlayerState.STOPPED
        player.resume()
        assert player.state == PlayerState.STOPPED

    def test_toggle_pause_from_stopped(self, player):
        """toggle_pause() on a stopped player returns False and stays STOPPED."""
        assert player.state == PlayerState.STOPPED
        now_paused = player.toggle_pause()
        assert now_paused is False
        assert player.state == PlayerState.STOPPED

    def test_is_paused_false_initially(self, player):
        """is_paused is False on a new player."""
        paused = player.is_paused
        assert paused is False

    def test_is_paused_false_when_stopped(self, player):
        """is_paused is False in the STOPPED state."""
        assert player.state == PlayerState.STOPPED
        assert player.is_paused is False

    def test_pause_sets_state_paused(self, player):
        """pause() moves a PLAYING player to PAUSED."""
        # Force PLAYING by hand so that no real playback thread is needed.
        player._state = PlayerState.PLAYING
        player.pause()
        assert player.state == PlayerState.PAUSED
        assert player.is_paused is True

    def test_resume_sets_state_playing(self, player):
        """resume() moves a PAUSED player back to PLAYING."""
        # Force PAUSED by hand, including the pause event.
        player._state = PlayerState.PAUSED
        player._pause_event.set()
        player.resume()
        assert player.state == PlayerState.PLAYING
        assert player.is_paused is False

    def test_toggle_pause_from_playing(self, player):
        """toggle_pause() goes from PLAYING to PAUSED and returns True."""
        player._state = PlayerState.PLAYING
        now_paused = player.toggle_pause()
        assert now_paused is True
        assert player.state == PlayerState.PAUSED
        assert player._pause_event.is_set()

    def test_toggle_pause_from_paused(self, player):
        """toggle_pause() goes from PAUSED to PLAYING and returns False."""
        player._state = PlayerState.PAUSED
        player._pause_event.set()
        now_paused = player.toggle_pause()
        assert now_paused is False
        assert player.state == PlayerState.PLAYING
        assert not player._pause_event.is_set()

    def test_stop_clears_pause_event(self, player):
        """stop() clears the pause event and ends in STOPPED."""
        player._state = PlayerState.PAUSED
        player._pause_event.set()
        player.stop()
        assert not player._pause_event.is_set()
        assert player.state == PlayerState.STOPPED
- # =============================================================================
- # Tests für BlockingStreamPlayer - Callbacks
- # =============================================================================
class TestBlockingStreamPlayerCallbacks:
    """Tests for callback registration."""

    def test_on_finished_callback_registration(self, player):
        """on_finished() stores the given callable on the player."""
        finished_hook = MagicMock()
        player.on_finished(finished_hook)
        assert player._on_finished is finished_hook

    def test_on_finished_callback_none(self, player):
        """on_finished() accepts None and stores it."""
        player.on_finished(None)
        stored = player._on_finished
        assert stored is None
- # =============================================================================
- # Tests für BlockingStreamPlayer - Thread-Safety
- # =============================================================================
class TestBlockingStreamPlayerThreadSafety:
    """Tests for thread safety."""

    def test_concurrent_chunk_adding(self, player):
        """Add chunks from five threads at once; all fifty must be accepted."""
        accepted = []
        accepted_guard = threading.Lock()

        def producer(worker_id):
            for seq in range(10):
                payload = bytes([worker_id] * 100)
                if player.add_chunk(payload):
                    with accepted_guard:
                        accepted.append((worker_id, seq))

        workers = []
        for worker_id in range(5):
            workers.append(
                threading.Thread(target=producer, args=(worker_id,))
            )
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # Every chunk should have been accepted.
        assert len(accepted) == 50

    def test_state_access_thread_safe(self, player):
        """Read the state from five threads at once; all reads must succeed."""
        observed = []

        def reader(repeats):
            for _ in range(repeats):
                observed.append(player.state)
                time.sleep(0.001)

        workers = []
        for _ in range(5):
            workers.append(threading.Thread(target=reader, args=(20,)))
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        # Every state query should have succeeded.
        assert len(observed) == 100
- # =============================================================================
- # Tests für AudioOutputManager - Basis
- # =============================================================================
class TestAudioOutputManagerBasic:
    """Basic tests for AudioOutputManager."""

    def test_initialization(self, audio_manager):
        """A new manager has both players and is not initialized yet."""
        assert audio_manager._tts_player is not None
        assert audio_manager._music_player is not None
        assert audio_manager._initialized is False

    def test_custom_sample_rates(self):
        """Custom sample rates end up in the respective player configs."""
        custom = AudioOutputManager(
            tts_sample_rate=16000,
            music_sample_rate=48000,
        )
        tts_rate = custom._tts_player._config.sample_rate
        assert tts_rate == 16000
        music_rate = custom._music_player._config.sample_rate
        assert music_rate == 48000

    def test_tts_player_config(self, audio_manager):
        """The TTS player is 22050 Hz mono with a 500 ms prebuffer."""
        tts_cfg = audio_manager._tts_player._config
        observed = (tts_cfg.sample_rate, tts_cfg.channels, tts_cfg.prebuffer_ms)
        assert observed == (22050, 1, 500)

    def test_music_player_config(self, audio_manager):
        """The music player is 44100 Hz stereo with a 2000 ms prebuffer."""
        music_cfg = audio_manager._music_player._config
        observed = (
            music_cfg.sample_rate,
            music_cfg.channels,
            music_cfg.prebuffer_ms,
        )
        assert observed == (44100, 2, 2000)
- # =============================================================================
- # Tests für AudioOutputManager - Initialize/Shutdown
- # =============================================================================
class TestAudioOutputManagerLifecycle:
    """Tests for initialization and shutdown."""

    @pytest.mark.asyncio
    async def test_initialize_returns_true(self, audio_manager):
        """initialize() returns True and sets the initialized flag."""
        succeeded = await audio_manager.initialize()
        assert succeeded is True
        assert audio_manager._initialized is True

    @pytest.mark.asyncio
    async def test_shutdown(self, audio_manager):
        """shutdown() after initialize() clears the initialized flag."""
        await audio_manager.initialize()
        await audio_manager.shutdown()
        still_initialized = audio_manager._initialized
        assert still_initialized is False
- # =============================================================================
- # Tests für AudioOutputManager - TTS
- # =============================================================================
class TestAudioOutputManagerTTS:
    """Tests for the TTS functions."""

    def test_play_tts_chunk(self, audio_manager):
        """A non-empty TTS chunk is accepted."""
        silence = b'\x00' * 1024
        assert audio_manager.play_tts_chunk(silence) is True

    def test_play_tts_chunk_empty(self, audio_manager):
        """An empty TTS chunk is rejected."""
        assert audio_manager.play_tts_chunk(b'') is False

    def test_finish_tts(self, audio_manager):
        """finish_tts() leaves items in the TTS queue."""
        silence = b'\x00' * 1024
        audio_manager.play_tts_chunk(silence)
        audio_manager.finish_tts()
        # The queue should hold the None end marker.
        tts_queue = audio_manager._tts_player._queue
        assert not tts_queue.empty()

    def test_stop_tts(self, audio_manager):
        """stop_tts() puts the TTS player into STOPPED."""
        silence = b'\x00' * 1024
        audio_manager.play_tts_chunk(silence)
        audio_manager.stop_tts()
        assert audio_manager._tts_player.state == PlayerState.STOPPED

    def test_tts_playing_property(self, audio_manager):
        """tts_playing is False on a new manager."""
        playing = audio_manager.tts_playing
        assert playing is False
- # =============================================================================
- # Tests für AudioOutputManager - Musik
- # =============================================================================
class TestAudioOutputManagerMusic:
    """Tests for the music functions."""

    def test_play_music_chunk(self, audio_manager):
        """A non-empty music chunk is accepted."""
        silence = b'\x00' * 1024
        assert audio_manager.play_music_chunk(silence) is True

    def test_play_music_chunk_empty(self, audio_manager):
        """An empty music chunk is rejected."""
        assert audio_manager.play_music_chunk(b'') is False

    def test_finish_music(self, audio_manager):
        """finish_music() leaves items in the music queue."""
        silence = b'\x00' * 1024
        audio_manager.play_music_chunk(silence)
        audio_manager.finish_music()
        # The queue should hold the None end marker.
        music_queue = audio_manager._music_player._queue
        assert not music_queue.empty()

    def test_stop_music(self, audio_manager):
        """stop_music() puts the music player into STOPPED."""
        silence = b'\x00' * 1024
        audio_manager.play_music_chunk(silence)
        audio_manager.stop_music()
        assert audio_manager._music_player.state == PlayerState.STOPPED

    def test_music_playing_property(self, audio_manager):
        """music_playing is False on a new manager."""
        playing = audio_manager.music_playing
        assert playing is False
- # =============================================================================
- # Tests für AudioOutputManager - Kombiniert
- # =============================================================================
class TestAudioOutputManagerCombined:
    """Tests for functions that involve both players."""

    def test_stop_all(self, audio_manager):
        """stop_all() puts both players into STOPPED."""
        silence = b'\x00' * 1024
        audio_manager.play_tts_chunk(silence)
        audio_manager.play_music_chunk(silence)
        audio_manager.stop_all()
        assert audio_manager._tts_player.state == PlayerState.STOPPED
        assert audio_manager._music_player.state == PlayerState.STOPPED

    def test_stats(self, audio_manager):
        """The combined stats contain a state entry for each player."""
        combined = audio_manager.stats
        for section in ("tts", "music"):
            assert section in combined
        for section in ("tts", "music"):
            assert "state" in combined[section]

    def test_simultaneous_tts_and_music(self, audio_manager):
        """Both players buffer independently; stopping TTS leaves music alone."""
        tts_chunk = b'\x00' * 1024
        music_chunk = b'\x00' * 2048
        for _ in range(3):
            audio_manager.play_tts_chunk(tts_chunk)
        for _ in range(3):
            audio_manager.play_music_chunk(music_chunk)

        # Both players should hold data now.
        assert audio_manager._tts_player._prebuffer_size > 0
        assert audio_manager._music_player._prebuffer_size > 0

        # Stop TTS only.
        audio_manager.stop_tts()
        assert audio_manager._tts_player.state == PlayerState.STOPPED
        assert audio_manager._music_player.state == PlayerState.BUFFERING
- # =============================================================================
- # Tests für AudioOutputManager - Musik-Pause
- # =============================================================================
class TestAudioOutputManagerMusicPause:
    """Tests for pausing music through the manager."""

    def test_music_paused_initial(self, audio_manager):
        """music_paused is False on a new manager."""
        paused = audio_manager.music_paused
        assert paused is False

    def test_pause_music(self, audio_manager):
        """pause_music() moves a PLAYING music player to PAUSED."""
        # Force the music player into PLAYING by hand.
        audio_manager._music_player._state = PlayerState.PLAYING
        audio_manager.pause_music()
        assert audio_manager.music_paused is True
        assert audio_manager._music_player.state == PlayerState.PAUSED

    def test_resume_music(self, audio_manager):
        """resume_music() moves a PAUSED music player back to PLAYING."""
        # Force the music player into PAUSED by hand.
        audio_manager._music_player._state = PlayerState.PAUSED
        audio_manager._music_player._pause_event.set()
        audio_manager.resume_music()
        assert audio_manager.music_paused is False
        assert audio_manager._music_player.state == PlayerState.PLAYING

    def test_toggle_music_pause_from_playing(self, audio_manager):
        """toggle_music_pause() pauses a PLAYING player and returns True."""
        audio_manager._music_player._state = PlayerState.PLAYING
        now_paused = audio_manager.toggle_music_pause()
        assert now_paused is True
        assert audio_manager.music_paused is True

    def test_toggle_music_pause_from_paused(self, audio_manager):
        """toggle_music_pause() resumes a PAUSED player and returns False."""
        audio_manager._music_player._state = PlayerState.PAUSED
        audio_manager._music_player._pause_event.set()
        now_paused = audio_manager.toggle_music_pause()
        assert now_paused is False
        assert audio_manager.music_paused is False

    def test_stop_all_clears_pause(self, audio_manager):
        """stop_all() clears the pause event and stops the music player."""
        audio_manager._music_player._state = PlayerState.PAUSED
        audio_manager._music_player._pause_event.set()
        audio_manager.stop_all()
        assert not audio_manager._music_player._pause_event.is_set()
        assert audio_manager._music_player.state == PlayerState.STOPPED
- # =============================================================================
- # Tests für AudioOutputManager - Buffer-Status
- # =============================================================================
class TestAudioOutputManagerBufferStatus:
    """Tests for the buffer status properties."""

    def test_music_buffer_full_initial(self, audio_manager):
        """music_buffer_full is False while the buffer is empty."""
        is_full = audio_manager.music_buffer_full
        assert is_full is False

    def test_music_buffer_low_initial(self, audio_manager):
        """music_buffer_low is True while the buffer is empty."""
        # An empty buffer counts as low.
        is_low = audio_manager.music_buffer_low
        assert is_low is True

    def test_music_buffer_ms_initial(self, audio_manager):
        """music_buffer_ms is 0 while the buffer is empty."""
        buffered_ms = audio_manager.music_buffer_ms
        assert buffered_ms == 0

    def test_tts_buffer_full_initial(self, audio_manager):
        """tts_buffer_full is False while the buffer is empty."""
        is_full = audio_manager.tts_buffer_full
        assert is_full is False

    def test_can_accept_music_chunk_empty(self, audio_manager):
        """can_accept_music_chunk() is True with an empty queue."""
        can_accept = audio_manager.can_accept_music_chunk()
        assert can_accept is True

    def test_can_accept_tts_chunk_empty(self, audio_manager):
        """can_accept_tts_chunk() is True with an empty queue."""
        can_accept = audio_manager.can_accept_tts_chunk()
        assert can_accept is True
- # =============================================================================
- # Tests für Audio-Format-Konfiguration
- # =============================================================================
class TestAudioFormatConfiguration:
    """Tests for changing the audio format at runtime."""

    def test_tts_format_property(self, audio_manager):
        """tts_format reports 22050 Hz, mono, 2-byte samples."""
        tts_info = audio_manager.tts_format
        assert tts_info["sample_rate"] == 22050
        assert tts_info["channels"] == 1
        assert tts_info["sample_width"] == 2

    def test_music_format_property(self, audio_manager):
        """music_format reports 44100 Hz, stereo, 2-byte samples."""
        music_info = audio_manager.music_format
        assert music_info["sample_rate"] == 44100
        assert music_info["channels"] == 2
        assert music_info["sample_width"] == 2

    @pytest.mark.asyncio
    async def test_update_tts_format(self, audio_manager):
        """update_tts_format() changes what tts_format reports."""
        assert audio_manager.tts_format["sample_rate"] == 22050

        new_format = {"sample_rate": 16000, "channels": 1, "sample_width": 2}
        await audio_manager.update_tts_format(**new_format)

        assert audio_manager.tts_format["sample_rate"] == 16000
        assert audio_manager.tts_format["channels"] == 1

    @pytest.mark.asyncio
    async def test_update_music_format(self, audio_manager):
        """update_music_format() changes what music_format reports."""
        assert audio_manager.music_format["sample_rate"] == 44100

        new_format = {"sample_rate": 48000, "channels": 2, "sample_width": 2}
        await audio_manager.update_music_format(**new_format)

        assert audio_manager.music_format["sample_rate"] == 48000
        assert audio_manager.music_format["channels"] == 2

    @pytest.mark.asyncio
    async def test_update_format_stops_current_playback(self, audio_manager):
        """After a format update the TTS player is empty and STOPPED."""
        # Buffer some chunks first.
        silence = b'\x00' * 1024
        for _ in range(5):
            audio_manager.play_tts_chunk(silence)

        await audio_manager.update_tts_format(sample_rate=16000)

        # Read the player only now: the update may have replaced it.
        tts_player = audio_manager._tts_player
        assert tts_player._prebuffer_size == 0
        assert tts_player.state == PlayerState.STOPPED

    @pytest.mark.asyncio
    async def test_update_format_preserves_initialized_state(self, audio_manager):
        """The initialized flag survives a format update."""
        await audio_manager.initialize()
        assert audio_manager._initialized is True

        await audio_manager.update_tts_format(sample_rate=16000)

        # The flag should still be set.
        assert audio_manager._initialized is True
- # =============================================================================
- # Integration Tests
- # =============================================================================
class TestAudioPlayerIntegration:
    """Integration tests for the audio players."""

    def test_full_tts_flow(self, audio_manager):
        """Run the whole TTS flow: add chunks, finish, stop."""
        tts_chunk = b'\x00' * 1024
        for _ in range(5):
            accepted = audio_manager.play_tts_chunk(tts_chunk)
            assert accepted is True

        # End the stream, then clean up.
        audio_manager.finish_tts()
        audio_manager.stop_tts()
        assert audio_manager._tts_player.state == PlayerState.STOPPED

    def test_full_music_flow(self, audio_manager):
        """Run the whole music flow: add chunks, finish, stop."""
        music_chunk = b'\x00' * 2048
        for _ in range(5):
            accepted = audio_manager.play_music_chunk(music_chunk)
            assert accepted is True

        # End the stream, then clean up.
        audio_manager.finish_music()
        audio_manager.stop_music()
        assert audio_manager._music_player.state == PlayerState.STOPPED

    def test_interleaved_tts_and_music(self, audio_manager):
        """Alternate TTS and music chunks; both players must buffer data."""
        # Five TTS/music pairs, starting with TTS.
        for _ in range(5):
            audio_manager.play_tts_chunk(b'\x00' * 512)
            audio_manager.play_music_chunk(b'\x00' * 1024)

        # Both players should have buffered data.
        assert audio_manager._tts_player._prebuffer_size > 0
        assert audio_manager._music_player._prebuffer_size > 0

    @pytest.mark.asyncio
    async def test_full_lifecycle(self, audio_manager):
        """Run initialize, buffer on both players, then shut down."""
        started = await audio_manager.initialize()
        assert started is True

        # Buffer one TTS chunk and one music chunk.
        audio_manager.play_tts_chunk(b'\x00' * 1024)
        audio_manager.play_music_chunk(b'\x00' * 2048)

        await audio_manager.shutdown()

        assert audio_manager._initialized is False
        assert audio_manager._tts_player.state == PlayerState.STOPPED
        assert audio_manager._music_player.state == PlayerState.STOPPED
- # =============================================================================
- # Edge Cases
- # =============================================================================
class TestEdgeCases:
    """Tests for edge cases."""

    def test_multiple_stops(self, player):
        """Calling stop() three times in a row ends in STOPPED."""
        player.add_chunk(b'\x00' * 1024)
        for _ in range(3):
            player.stop()
        assert player.state == PlayerState.STOPPED

    def test_finish_without_chunks(self, player):
        """finish() without earlier chunks queues a None marker first."""
        player.finish()
        # Only the None marker should be there.
        first_item = player._queue.get_nowait()
        assert first_item is None

    def test_add_chunk_after_stop(self, player):
        """add_chunk() works again after stop() and re-enters BUFFERING."""
        silence = b'\x00' * 1024
        player.add_chunk(silence)
        player.stop()
        # A new chunk should be accepted again.
        accepted = player.add_chunk(b'\x00' * 1024)
        assert accepted is True
        assert player.state == PlayerState.BUFFERING

    def test_very_small_chunks(self, player):
        """One hundred 10-byte chunks are all accepted."""
        for _ in range(100):
            accepted = player.add_chunk(b'\x00' * 10)
            assert accepted is True

    def test_large_chunk(self, player):
        """A single 1 MB chunk is accepted."""
        one_megabyte = 1024 * 1024
        accepted = player.add_chunk(b'\x00' * one_megabyte)
        assert accepted is True

    def test_stats_after_operations(self, audio_manager):
        """The TTS stats follow the player through buffering and stop."""
        # Before anything happened.
        before = audio_manager.stats
        assert before["tts"]["bytes_played"] == 0

        # After adding a chunk.
        audio_manager.play_tts_chunk(b'\x00' * 1024)
        while_buffering = audio_manager.stats
        assert while_buffering["tts"]["state"] == "BUFFERING"

        # After stopping.
        audio_manager.stop_tts()
        after_stop = audio_manager.stats
        assert after_stop["tts"]["state"] == "STOPPED"
|