# -*- coding: utf-8 -*- """ Tests für Streaming Audio Player. Testet: - BlockingStreamPlayer Chunk-Handling - AudioOutputManager TTS/Musik-Verwaltung - Prebuffer und State-Management """ import asyncio import pytest import threading import time from unittest.mock import MagicMock, patch, AsyncMock from trixy_core.audio.player import ( BlockingStreamPlayer, AudioOutputManager, PlayerConfig, PlayerState, SimpleStreamingPlayer, RobustStreamingPlayer, ) # ============================================================================= # Fixtures # ============================================================================= @pytest.fixture def player_config(): """Standard Player-Konfiguration für Tests.""" return PlayerConfig( sample_rate=22050, channels=1, sample_width=2, prebuffer_ms=100, # Kurzer Prebuffer für schnelle Tests ) @pytest.fixture def player(player_config): """Erstellt einen BlockingStreamPlayer.""" return BlockingStreamPlayer(player_config) @pytest.fixture def audio_manager(): """Erstellt einen AudioOutputManager.""" return AudioOutputManager( tts_sample_rate=22050, music_sample_rate=44100, ) # ============================================================================= # Tests für PlayerConfig # ============================================================================= class TestPlayerConfig: """Tests für PlayerConfig Datenklasse.""" def test_default_values(self): """Testet Standard-Werte.""" config = PlayerConfig() assert config.sample_rate == 44100 assert config.channels == 2 assert config.sample_width == 2 assert config.prebuffer_ms == 1000 def test_custom_values(self): """Testet benutzerdefinierte Werte.""" config = PlayerConfig( sample_rate=16000, channels=1, sample_width=2, prebuffer_ms=500, ) assert config.sample_rate == 16000 assert config.channels == 1 assert config.sample_width == 2 assert config.prebuffer_ms == 500 def test_bytes_per_second(self): """Testet bytes_per_second Berechnung.""" config = PlayerConfig( sample_rate=16000, channels=1, sample_width=2, ) # 16000 * 1 * 2 = 32000 bytes/sec assert config.bytes_per_second == 32000 def test_bytes_per_second_stereo(self): """Testet bytes_per_second bei Stereo.""" config = PlayerConfig( sample_rate=44100, channels=2, sample_width=2, ) # 44100 * 2 * 2 = 176400 bytes/sec assert config.bytes_per_second == 176400 def test_prebuffer_bytes(self): """Testet prebuffer_bytes Berechnung.""" config = PlayerConfig( sample_rate=16000, channels=1, sample_width=2, prebuffer_ms=500, ) # 32000 bytes/sec * 500ms / 1000 = 16000 bytes assert config.prebuffer_bytes == 16000 def test_frame_size(self): """Testet frame_size Berechnung.""" config = PlayerConfig( sample_rate=44100, channels=2, sample_width=2, ) # 2 channels * 2 bytes = 4 bytes per frame assert config.frame_size == 4 def test_frame_size_mono(self): """Testet frame_size bei Mono.""" config = PlayerConfig( sample_rate=22050, channels=1, sample_width=2, ) # 1 channel * 2 bytes = 2 bytes per frame assert config.frame_size == 2 # ============================================================================= # Tests für PlayerState # ============================================================================= class TestPlayerState: """Tests für PlayerState Enum.""" def test_states_exist(self): """Testet, dass alle Zustände existieren.""" assert PlayerState.STOPPED assert PlayerState.BUFFERING assert PlayerState.PLAYING def test_states_are_unique(self): """Testet, dass alle Zustände unterschiedlich sind.""" states = [ PlayerState.STOPPED, PlayerState.BUFFERING, PlayerState.PLAYING, ] assert len(states) == len(set(states)) # ============================================================================= # Tests für BlockingStreamPlayer - Basis # ============================================================================= class TestBlockingStreamPlayerBasic: """Basis-Tests für BlockingStreamPlayer.""" def test_initial_state(self, player): """Testet den Initialzustand.""" assert player.state == PlayerState.STOPPED assert player.is_playing is False def test_default_config(self): """Testet Player mit Standard-Konfiguration.""" player = BlockingStreamPlayer() assert player.state == PlayerState.STOPPED assert player._config.sample_rate == 44100 def test_custom_config(self, player_config): """Testet Player mit benutzerdefinierter Konfiguration.""" player = BlockingStreamPlayer(player_config) assert player._config.sample_rate == 22050 assert player._config.channels == 1 def test_stats_initial(self, player): """Testet initiale Statistiken.""" stats = player.stats assert stats["state"] == "STOPPED" assert stats["bytes_played"] == 0 assert stats["queue_size"] == 0 def test_buffer_ms_empty(self, player): """Testet buffer_ms bei leerem Buffer.""" assert player.buffer_ms == 0 # ============================================================================= # Tests für BlockingStreamPlayer - Aliases # ============================================================================= class TestPlayerAliases: """Tests für Klassen-Aliases.""" def test_simple_streaming_player_alias(self): """Testet SimpleStreamingPlayer Alias.""" assert SimpleStreamingPlayer is BlockingStreamPlayer def test_robust_streaming_player_alias(self): """Testet RobustStreamingPlayer Alias.""" assert RobustStreamingPlayer is BlockingStreamPlayer # ============================================================================= # Tests für BlockingStreamPlayer - Chunk-Handling # ============================================================================= class TestBlockingStreamPlayerChunks: """Tests für Chunk-Handling.""" def test_add_chunk_starts_buffering(self, player): """Testet, dass add_chunk den Zustand auf BUFFERING setzt.""" audio_data = b'\x00' * 1024 result = player.add_chunk(audio_data) assert result is True assert player.state == PlayerState.BUFFERING def test_add_chunk_returns_true(self, player): """Testet, dass add_chunk True zurückgibt.""" result = player.add_chunk(b'\x00' * 1024) assert result is True def test_add_empty_chunk_returns_false(self, player): """Testet, dass leere Chunks abgelehnt werden.""" result = player.add_chunk(b'') assert result is False def test_add_none_chunk_returns_false(self, player): """Testet, dass None-Chunks abgelehnt werden.""" result = player.add_chunk(None) assert result is False def test_prebuffer_fills_before_playing(self, player_config): """Testet, dass Prebuffer gefüllt wird bevor Wiedergabe startet.""" # Prebuffer = 100ms bei 22050Hz mono = ~4410 bytes player = BlockingStreamPlayer(player_config) # Kleiner Chunk - sollte in Prebuffer bleiben player.add_chunk(b'\x00' * 1000) assert player.state == PlayerState.BUFFERING def test_buffer_ms_increases_with_chunks(self, player): """Testet, dass buffer_ms mit Chunks steigt.""" initial_ms = player.buffer_ms # Chunks hinzufügen for _ in range(5): player.add_chunk(b'\x00' * 1024) # buffer_ms sollte größer sein (Prebuffer zählt) # Note: buffer_ms berechnet aus queue und prebuffer assert player.buffer_ms >= 0 # ============================================================================= # Tests für BlockingStreamPlayer - State Transitions # ============================================================================= class TestBlockingStreamPlayerStateTransitions: """Tests für Zustandsübergänge.""" def test_stop_from_stopped(self, player): """Testet stop() im STOPPED-Zustand.""" assert player.state == PlayerState.STOPPED player.stop() assert player.state == PlayerState.STOPPED def test_stop_clears_queue(self, player): """Testet, dass stop() die Queue leert.""" player.add_chunk(b'\x00' * 1024) player.stop() assert player._queue.empty() def test_stop_clears_prebuffer(self, player): """Testet, dass stop() den Prebuffer leert.""" player.add_chunk(b'\x00' * 1024) assert player._prebuffer_size > 0 player.stop() assert player._prebuffer_size == 0 assert len(player._prebuffer) == 0 def test_finish_flushes_prebuffer(self, player): """Testet, dass finish() den Prebuffer in die Queue schiebt.""" player.add_chunk(b'\x00' * 1024) assert player.state == PlayerState.BUFFERING assert player._prebuffer_size > 0 player.finish() # Prebuffer sollte in Queue verschoben sein assert player._prebuffer_size == 0 # Queue sollte Daten + None (Ende-Marker) haben assert not player._queue.empty() def test_finish_adds_none_marker(self, player): """Testet, dass finish() einen None-Marker hinzufügt.""" player.finish() # Queue sollte None enthalten item = player._queue.get_nowait() assert item is None def test_is_playing_false_when_stopped(self, player): """Testet is_playing bei STOPPED-Zustand.""" assert player.state == PlayerState.STOPPED assert player.is_playing is False def test_is_playing_false_when_buffering(self, player): """Testet is_playing bei BUFFERING-Zustand.""" player.add_chunk(b'\x00' * 100) assert player.state == PlayerState.BUFFERING assert player.is_playing is False # ============================================================================= # Tests für BlockingStreamPlayer - Pause # ============================================================================= class TestBlockingStreamPlayerPause: """Tests für Pause-Funktionalität.""" def test_pause_from_stopped(self, player): """Testet, dass pause() im STOPPED-Zustand nichts tut.""" assert player.state == PlayerState.STOPPED player.pause() # State bleibt STOPPED (kann nicht von STOPPED pausieren) assert player.state == PlayerState.STOPPED def test_resume_from_stopped(self, player): """Testet, dass resume() im STOPPED-Zustand nichts tut.""" assert player.state == PlayerState.STOPPED player.resume() assert player.state == PlayerState.STOPPED def test_toggle_pause_from_stopped(self, player): """Testet toggle_pause() im STOPPED-Zustand.""" assert player.state == PlayerState.STOPPED result = player.toggle_pause() assert result is False assert player.state == PlayerState.STOPPED def test_is_paused_false_initially(self, player): """Testet is_paused Property initial.""" assert player.is_paused is False def test_is_paused_false_when_stopped(self, player): """Testet is_paused bei STOPPED-Zustand.""" assert player.state == PlayerState.STOPPED assert player.is_paused is False def test_pause_sets_state_paused(self, player): """Testet, dass pause() den State auf PAUSED setzt.""" # Manuell auf PLAYING setzen (ohne echten Thread) player._state = PlayerState.PLAYING player.pause() assert player.state == PlayerState.PAUSED assert player.is_paused is True def test_resume_sets_state_playing(self, player): """Testet, dass resume() den State auf PLAYING setzt.""" # Manuell auf PAUSED setzen player._state = PlayerState.PAUSED player._pause_event.set() player.resume() assert player.state == PlayerState.PLAYING assert player.is_paused is False def test_toggle_pause_from_playing(self, player): """Testet toggle_pause() von PLAYING nach PAUSED.""" player._state = PlayerState.PLAYING result = player.toggle_pause() assert result is True assert player.state == PlayerState.PAUSED assert player._pause_event.is_set() def test_toggle_pause_from_paused(self, player): """Testet toggle_pause() von PAUSED nach PLAYING.""" player._state = PlayerState.PAUSED player._pause_event.set() result = player.toggle_pause() assert result is False assert player.state == PlayerState.PLAYING assert not player._pause_event.is_set() def test_stop_clears_pause_event(self, player): """Testet, dass stop() das Pause-Event löscht.""" player._state = PlayerState.PAUSED player._pause_event.set() player.stop() assert not player._pause_event.is_set() assert player.state == PlayerState.STOPPED # ============================================================================= # Tests für BlockingStreamPlayer - Callbacks # ============================================================================= class TestBlockingStreamPlayerCallbacks: """Tests für Callback-Registrierung.""" def test_on_finished_callback_registration(self, player): """Testet on_finished Callback-Registrierung.""" callback = MagicMock() player.on_finished(callback) assert player._on_finished is callback def test_on_finished_callback_none(self, player): """Testet, dass on_finished None akzeptiert.""" player.on_finished(None) assert player._on_finished is None # ============================================================================= # Tests für BlockingStreamPlayer - Thread-Safety # ============================================================================= class TestBlockingStreamPlayerThreadSafety: """Tests für Thread-Sicherheit.""" def test_concurrent_chunk_adding(self, player): """Testet gleichzeitiges Hinzufügen von Chunks aus mehreren Threads.""" chunks_added = [] lock = threading.Lock() def add_chunks(thread_id): for i in range(10): chunk_data = bytes([thread_id] * 100) result = player.add_chunk(chunk_data) if result: with lock: chunks_added.append((thread_id, i)) threads = [ threading.Thread(target=add_chunks, args=(i,)) for i in range(5) ] for t in threads: t.start() for t in threads: t.join() # Alle Chunks sollten hinzugefügt worden sein assert len(chunks_added) == 50 def test_state_access_thread_safe(self, player): """Testet Thread-sicheren Zugriff auf state.""" states_read = [] def read_state(count): for _ in range(count): states_read.append(player.state) time.sleep(0.001) threads = [ threading.Thread(target=read_state, args=(20,)) for _ in range(5) ] for t in threads: t.start() for t in threads: t.join() # Alle Zustandsabfragen sollten erfolgreich sein assert len(states_read) == 100 # ============================================================================= # Tests für AudioOutputManager - Basis # ============================================================================= class TestAudioOutputManagerBasic: """Basis-Tests für AudioOutputManager.""" def test_initialization(self, audio_manager): """Testet die Initialisierung.""" assert audio_manager._tts_player is not None assert audio_manager._music_player is not None assert audio_manager._initialized is False def test_custom_sample_rates(self): """Testet benutzerdefinierte Sample-Rates.""" manager = AudioOutputManager( tts_sample_rate=16000, music_sample_rate=48000, ) assert manager._tts_player._config.sample_rate == 16000 assert manager._music_player._config.sample_rate == 48000 def test_tts_player_config(self, audio_manager): """Testet TTS-Player-Konfiguration.""" config = audio_manager._tts_player._config assert config.sample_rate == 22050 assert config.channels == 1 assert config.prebuffer_ms == 500 def test_music_player_config(self, audio_manager): """Testet Musik-Player-Konfiguration.""" config = audio_manager._music_player._config assert config.sample_rate == 44100 assert config.channels == 2 assert config.prebuffer_ms == 2000 # ============================================================================= # Tests für AudioOutputManager - Initialize/Shutdown # ============================================================================= class TestAudioOutputManagerLifecycle: """Tests für Initialisierung und Shutdown.""" @pytest.mark.asyncio async def test_initialize_returns_true(self, audio_manager): """Testet, dass initialize() True zurückgibt.""" result = await audio_manager.initialize() assert result is True assert audio_manager._initialized is True @pytest.mark.asyncio async def test_shutdown(self, audio_manager): """Testet Shutdown.""" await audio_manager.initialize() await audio_manager.shutdown() assert audio_manager._initialized is False # ============================================================================= # Tests für AudioOutputManager - TTS # ============================================================================= class TestAudioOutputManagerTTS: """Tests für TTS-Funktionen.""" def test_play_tts_chunk(self, audio_manager): """Testet TTS-Chunk-Wiedergabe.""" result = audio_manager.play_tts_chunk(b'\x00' * 1024) assert result is True def test_play_tts_chunk_empty(self, audio_manager): """Testet, dass leere TTS-Chunks abgelehnt werden.""" result = audio_manager.play_tts_chunk(b'') assert result is False def test_finish_tts(self, audio_manager): """Testet TTS-Stream beenden.""" audio_manager.play_tts_chunk(b'\x00' * 1024) audio_manager.finish_tts() # Sollte None-Marker in Queue haben assert not audio_manager._tts_player._queue.empty() def test_stop_tts(self, audio_manager): """Testet TTS-Wiedergabe stoppen.""" audio_manager.play_tts_chunk(b'\x00' * 1024) audio_manager.stop_tts() assert audio_manager._tts_player.state == PlayerState.STOPPED def test_tts_playing_property(self, audio_manager): """Testet tts_playing Property.""" assert audio_manager.tts_playing is False # ============================================================================= # Tests für AudioOutputManager - Musik # ============================================================================= class TestAudioOutputManagerMusic: """Tests für Musik-Funktionen.""" def test_play_music_chunk(self, audio_manager): """Testet Musik-Chunk-Wiedergabe.""" result = audio_manager.play_music_chunk(b'\x00' * 1024) assert result is True def test_play_music_chunk_empty(self, audio_manager): """Testet, dass leere Musik-Chunks abgelehnt werden.""" result = audio_manager.play_music_chunk(b'') assert result is False def test_finish_music(self, audio_manager): """Testet Musik-Stream beenden.""" audio_manager.play_music_chunk(b'\x00' * 1024) audio_manager.finish_music() # Sollte None-Marker in Queue haben assert not audio_manager._music_player._queue.empty() def test_stop_music(self, audio_manager): """Testet Musik-Wiedergabe stoppen.""" audio_manager.play_music_chunk(b'\x00' * 1024) audio_manager.stop_music() assert audio_manager._music_player.state == PlayerState.STOPPED def test_music_playing_property(self, audio_manager): """Testet music_playing Property.""" assert audio_manager.music_playing is False # ============================================================================= # Tests für AudioOutputManager - Kombiniert # ============================================================================= class TestAudioOutputManagerCombined: """Tests für kombinierte Funktionen.""" def test_stop_all(self, audio_manager): """Testet alle Wiedergaben stoppen.""" audio_manager.play_tts_chunk(b'\x00' * 1024) audio_manager.play_music_chunk(b'\x00' * 1024) audio_manager.stop_all() assert audio_manager._tts_player.state == PlayerState.STOPPED assert audio_manager._music_player.state == PlayerState.STOPPED def test_stats(self, audio_manager): """Testet kombinierte Statistiken.""" stats = audio_manager.stats assert "tts" in stats assert "music" in stats assert "state" in stats["tts"] assert "state" in stats["music"] def test_simultaneous_tts_and_music(self, audio_manager): """Testet gleichzeitige TTS- und Musik-Pufferung.""" # TTS-Chunks for _ in range(3): audio_manager.play_tts_chunk(b'\x00' * 1024) # Musik-Chunks for _ in range(3): audio_manager.play_music_chunk(b'\x00' * 2048) # Beide sollten Daten haben assert audio_manager._tts_player._prebuffer_size > 0 assert audio_manager._music_player._prebuffer_size > 0 # Nur TTS stoppen audio_manager.stop_tts() assert audio_manager._tts_player.state == PlayerState.STOPPED assert audio_manager._music_player.state == PlayerState.BUFFERING # ============================================================================= # Tests für AudioOutputManager - Musik-Pause # ============================================================================= class TestAudioOutputManagerMusicPause: """Tests für Musik-Pause-Funktionalität.""" def test_music_paused_initial(self, audio_manager): """Testet music_paused Property initial.""" assert audio_manager.music_paused is False def test_pause_music(self, audio_manager): """Testet pause_music() Methode.""" # Musik-Player auf PLAYING setzen audio_manager._music_player._state = PlayerState.PLAYING audio_manager.pause_music() assert audio_manager.music_paused is True assert audio_manager._music_player.state == PlayerState.PAUSED def test_resume_music(self, audio_manager): """Testet resume_music() Methode.""" # Musik-Player auf PAUSED setzen audio_manager._music_player._state = PlayerState.PAUSED audio_manager._music_player._pause_event.set() audio_manager.resume_music() assert audio_manager.music_paused is False assert audio_manager._music_player.state == PlayerState.PLAYING def test_toggle_music_pause_from_playing(self, audio_manager): """Testet toggle_music_pause() von PLAYING nach PAUSED.""" audio_manager._music_player._state = PlayerState.PLAYING result = audio_manager.toggle_music_pause() assert result is True assert audio_manager.music_paused is True def test_toggle_music_pause_from_paused(self, audio_manager): """Testet toggle_music_pause() von PAUSED nach PLAYING.""" audio_manager._music_player._state = PlayerState.PAUSED audio_manager._music_player._pause_event.set() result = audio_manager.toggle_music_pause() assert result is False assert audio_manager.music_paused is False def test_stop_all_clears_pause(self, audio_manager): """Testet, dass stop_all() auch Pause aufhebt.""" audio_manager._music_player._state = PlayerState.PAUSED audio_manager._music_player._pause_event.set() audio_manager.stop_all() assert not audio_manager._music_player._pause_event.is_set() assert audio_manager._music_player.state == PlayerState.STOPPED # ============================================================================= # Tests für AudioOutputManager - Buffer-Status # ============================================================================= class TestAudioOutputManagerBufferStatus: """Tests für Buffer-Status-Properties.""" def test_music_buffer_full_initial(self, audio_manager): """Testet music_buffer_full bei leerem Buffer.""" assert audio_manager.music_buffer_full is False def test_music_buffer_low_initial(self, audio_manager): """Testet music_buffer_low bei leerem Buffer.""" # Leerer Buffer = low assert audio_manager.music_buffer_low is True def test_music_buffer_ms_initial(self, audio_manager): """Testet music_buffer_ms bei leerem Buffer.""" assert audio_manager.music_buffer_ms == 0 def test_tts_buffer_full_initial(self, audio_manager): """Testet tts_buffer_full bei leerem Buffer.""" assert audio_manager.tts_buffer_full is False def test_can_accept_music_chunk_empty(self, audio_manager): """Testet can_accept_music_chunk bei leerer Queue.""" assert audio_manager.can_accept_music_chunk() is True def test_can_accept_tts_chunk_empty(self, audio_manager): """Testet can_accept_tts_chunk bei leerer Queue.""" assert audio_manager.can_accept_tts_chunk() is True # ============================================================================= # Tests für Audio-Format-Konfiguration # ============================================================================= class TestAudioFormatConfiguration: """Tests für dynamische Audio-Format-Konfiguration.""" def test_tts_format_property(self, audio_manager): """Testet tts_format Property.""" format_info = audio_manager.tts_format assert format_info["sample_rate"] == 22050 assert format_info["channels"] == 1 assert format_info["sample_width"] == 2 def test_music_format_property(self, audio_manager): """Testet music_format Property.""" format_info = audio_manager.music_format assert format_info["sample_rate"] == 44100 assert format_info["channels"] == 2 assert format_info["sample_width"] == 2 @pytest.mark.asyncio async def test_update_tts_format(self, audio_manager): """Testet TTS-Format-Aktualisierung.""" assert audio_manager.tts_format["sample_rate"] == 22050 await audio_manager.update_tts_format( sample_rate=16000, channels=1, sample_width=2 ) assert audio_manager.tts_format["sample_rate"] == 16000 assert audio_manager.tts_format["channels"] == 1 @pytest.mark.asyncio async def test_update_music_format(self, audio_manager): """Testet Musik-Format-Aktualisierung.""" assert audio_manager.music_format["sample_rate"] == 44100 await audio_manager.update_music_format( sample_rate=48000, channels=2, sample_width=2 ) assert audio_manager.music_format["sample_rate"] == 48000 assert audio_manager.music_format["channels"] == 2 @pytest.mark.asyncio async def test_update_format_stops_current_playback(self, audio_manager): """Testet, dass Format-Aktualisierung die Wiedergabe stoppt.""" # Chunks hinzufügen for _ in range(5): audio_manager.play_tts_chunk(b'\x00' * 1024) # Format aktualisieren await audio_manager.update_tts_format(sample_rate=16000) # Neuer Player sollte leer sein assert audio_manager._tts_player._prebuffer_size == 0 assert audio_manager._tts_player.state == PlayerState.STOPPED @pytest.mark.asyncio async def test_update_format_preserves_initialized_state(self, audio_manager): """Testet, dass _initialized nach Format-Update erhalten bleibt.""" await audio_manager.initialize() assert audio_manager._initialized is True await audio_manager.update_tts_format(sample_rate=16000) # _initialized sollte noch True sein assert audio_manager._initialized is True # ============================================================================= # Integration Tests # ============================================================================= class TestAudioPlayerIntegration: """Integrationstests für Audio-Player.""" def test_full_tts_flow(self, audio_manager): """Testet den vollständigen TTS-Flow ohne echte Wiedergabe.""" # Chunks hinzufügen for _ in range(5): result = audio_manager.play_tts_chunk(b'\x00' * 1024) assert result is True # Stream beenden audio_manager.finish_tts() # Aufräumen audio_manager.stop_tts() assert audio_manager._tts_player.state == PlayerState.STOPPED def test_full_music_flow(self, audio_manager): """Testet den vollständigen Musik-Flow ohne echte Wiedergabe.""" # Chunks hinzufügen for _ in range(5): result = audio_manager.play_music_chunk(b'\x00' * 2048) assert result is True # Stream beenden audio_manager.finish_music() # Aufräumen audio_manager.stop_music() assert audio_manager._music_player.state == PlayerState.STOPPED def test_interleaved_tts_and_music(self, audio_manager): """Testet abwechselnde TTS- und Musik-Chunks.""" for i in range(10): if i % 2 == 0: audio_manager.play_tts_chunk(b'\x00' * 512) else: audio_manager.play_music_chunk(b'\x00' * 1024) # Beide sollten Daten gepuffert haben assert audio_manager._tts_player._prebuffer_size > 0 assert audio_manager._music_player._prebuffer_size > 0 @pytest.mark.asyncio async def test_full_lifecycle(self, audio_manager): """Testet den vollständigen Lebenszyklus.""" # Initialisieren result = await audio_manager.initialize() assert result is True # TTS abspielen audio_manager.play_tts_chunk(b'\x00' * 1024) # Musik abspielen audio_manager.play_music_chunk(b'\x00' * 2048) # Shutdown await audio_manager.shutdown() assert audio_manager._initialized is False assert audio_manager._tts_player.state == PlayerState.STOPPED assert audio_manager._music_player.state == PlayerState.STOPPED # ============================================================================= # Edge Cases # ============================================================================= class TestEdgeCases: """Tests für Grenzfälle.""" def test_multiple_stops(self, player): """Testet mehrfaches Aufrufen von stop().""" player.add_chunk(b'\x00' * 1024) player.stop() player.stop() player.stop() assert player.state == PlayerState.STOPPED def test_finish_without_chunks(self, player): """Testet finish() ohne vorherige Chunks.""" player.finish() # Sollte nur None-Marker haben item = player._queue.get_nowait() assert item is None def test_add_chunk_after_stop(self, player): """Testet add_chunk nach stop().""" player.add_chunk(b'\x00' * 1024) player.stop() # Neuer Chunk sollte wieder funktionieren result = player.add_chunk(b'\x00' * 1024) assert result is True assert player.state == PlayerState.BUFFERING def test_very_small_chunks(self, player): """Testet sehr kleine Chunks.""" for _ in range(100): result = player.add_chunk(b'\x00' * 10) assert result is True def test_large_chunk(self, player): """Testet großen Chunk.""" # 1 MB Chunk large_chunk = b'\x00' * (1024 * 1024) result = player.add_chunk(large_chunk) assert result is True def test_stats_after_operations(self, audio_manager): """Testet Stats nach verschiedenen Operationen.""" # Initial stats1 = audio_manager.stats assert stats1["tts"]["bytes_played"] == 0 # Nach Chunks audio_manager.play_tts_chunk(b'\x00' * 1024) stats2 = audio_manager.stats assert stats2["tts"]["state"] == "BUFFERING" # Nach Stop audio_manager.stop_tts() stats3 = audio_manager.stats assert stats3["tts"]["state"] == "STOPPED"