# -*- coding: utf-8 -*- """ Tests für Live-Audio-Streaming zwischen Satellites. Testet: - Streaming EventData-Klassen - Satellite Streaming Properties - start_stream / stop_stream Methoden - start_record_stream / stop_record_stream Methoden - Private Handler (_on_stream_audio_received, etc.) - Mutual Exclusion (Recording vs. Streaming) - Disconnect-Guard - Integration mit echtem EventManager """ import asyncio import os import wave import pytest from unittest.mock import AsyncMock, MagicMock, patch from trixy_core.events.event_data.basic import ( SatelliteStreamStarted, SatelliteStreamStopped, SatelliteStreamRecordStarted, SatelliteStreamRecordStopped, ) from trixy_core.satellite.satellite import Satellite, ConnectionState # ============================================================================= # Fixtures # ============================================================================= @pytest.fixture def mock_event_manager(): """Erstellt einen Mock-EventManager.""" manager = MagicMock() manager.trigger = AsyncMock() manager.register = MagicMock() manager.unregister = MagicMock(return_value=True) return manager @pytest.fixture def mock_application(mock_event_manager): """Erstellt eine Mock-Application.""" app = MagicMock() app.events = mock_event_manager return app @pytest.fixture def source_satellite(mock_application): """Erstellt einen verbundenen Quell-Satellite (Mikrofon).""" sat = Satellite( satellite_id="sat-source-001", room_id="wohnzimmer", mac_address="AA:BB:CC:DD:EE:01", alias="Wohnzimmer", application=mock_application, ) sat.set_connected() return sat @pytest.fixture def target_satellite(mock_application): """Erstellt einen verbundenen Ziel-Satellite (Lautsprecher).""" sat = Satellite( satellite_id="sat-target-002", room_id="buero", mac_address="AA:BB:CC:DD:EE:02", alias="Büro", application=mock_application, ) sat.set_connected() # Mock audio_out Socket mock_writer = MagicMock() mock_writer.write = MagicMock() mock_writer.drain = AsyncMock() sat.sockets.audio_out = mock_writer return sat # ============================================================================= # Tests für Streaming EventData-Klassen # ============================================================================= class TestStreamingEventDataClasses: """Tests für Streaming Event-Datenklassen.""" def test_stream_started_creation(self): """Testet SatelliteStreamStarted Erstellung.""" event = SatelliteStreamStarted( satellite_id="sat-001", target_satellite_id="sat-002", stream_id="stream-abc123", ) assert event.satellite_id == "sat-001" assert event.target_satellite_id == "sat-002" assert event.stream_id == "stream-abc123" def test_stream_stopped_creation(self): """Testet SatelliteStreamStopped Erstellung.""" event = SatelliteStreamStopped( satellite_id="sat-001", target_satellite_id="sat-002", stream_id="stream-abc123", reason="manual", duration_seconds=45.3, bytes_forwarded=1024000, ) assert event.reason == "manual" assert event.duration_seconds == 45.3 assert event.bytes_forwarded == 1024000 def test_stream_record_started_creation(self): """Testet SatelliteStreamRecordStarted Erstellung.""" event = SatelliteStreamRecordStarted( satellite_id="sat-001", stream_id="stream-abc123", filename="/tmp/live.wav", ) assert event.filename == "/tmp/live.wav" def test_stream_record_stopped_creation(self): """Testet SatelliteStreamRecordStopped Erstellung.""" event = SatelliteStreamRecordStopped( satellite_id="sat-001", stream_id="stream-abc123", filename="/tmp/live.wav", reason="manual", duration_seconds=10.5, file_size_bytes=336000, ) assert event.reason == "manual" assert event.file_size_bytes == 336000 def test_stream_started_defaults(self): """Testet Standardwerte von SatelliteStreamStarted.""" event = SatelliteStreamStarted() assert event.satellite_id == "" assert event.target_satellite_id == "" assert event.stream_id == "" def test_stream_stopped_defaults(self): """Testet Standardwerte von SatelliteStreamStopped.""" event = SatelliteStreamStopped() assert event.duration_seconds == 0.0 assert event.bytes_forwarded == 0 # ============================================================================= # Tests für Streaming Properties # ============================================================================= class TestStreamingProperties: """Tests für Satellite Streaming-Properties.""" def test_is_streaming_initial(self, source_satellite): """Testet initialen Streaming-Status.""" assert source_satellite.is_streaming is False def test_stream_id_initial(self, source_satellite): """Testet initiale Stream-ID.""" assert source_satellite.stream_id == "" def test_stream_target_initial(self, source_satellite): """Testet initiales Stream-Ziel.""" assert source_satellite.stream_target is None def test_is_stream_recording_initial(self, source_satellite): """Testet initialen Stream-Recording-Status.""" assert source_satellite.is_stream_recording is False # ============================================================================= # Tests für start_stream # ============================================================================= class TestStartStream: """Tests für start_stream Methode.""" @pytest.mark.asyncio async def test_start_stream_success(self, source_satellite, target_satellite, mock_event_manager): """Testet erfolgreichen Stream-Start.""" # Mock send_command source_satellite.send_command = AsyncMock(return_value=True) result = await source_satellite.start_stream(target_satellite) assert result is True assert source_satellite.is_streaming is True assert source_satellite.stream_target is target_satellite assert source_satellite.stream_id.startswith("stream-") @pytest.mark.asyncio async def test_start_stream_triggers_event(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass start_stream das satellite_stream_started Event auslöst.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) mock_event_manager.trigger.assert_called_once() call_args = mock_event_manager.trigger.call_args assert call_args[0][0] == "satellite_stream_started" event_data = call_args[0][1] assert isinstance(event_data, SatelliteStreamStarted) assert event_data.satellite_id == "sat-source-001" assert event_data.target_satellite_id == "sat-target-002" @pytest.mark.asyncio async def test_start_stream_sends_record_start_command(self, source_satellite, target_satellite): """Testet, dass SatelliteRecordStart Command gesendet wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) source_satellite.send_command.assert_called_once() cmd = source_satellite.send_command.call_args[0][0] from trixy_core.network.cmd import SatelliteRecordStart assert isinstance(cmd, SatelliteRecordStart) @pytest.mark.asyncio async def test_start_stream_registers_listeners(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass Event-Listener registriert werden.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) # Zwei Listener: audio_input_received und satellite_record_stopped assert mock_event_manager.register.call_count == 2 event_names = [call[0][0] for call in mock_event_manager.register.call_args_list] assert "audio_input_received" in event_names assert "satellite_record_stopped" in event_names @pytest.mark.asyncio async def test_start_stream_no_application(self): """Testet Fehler ohne Application-Referenz.""" sat = Satellite(satellite_id="no-app") target = Satellite(satellite_id="target") result = await sat.start_stream(target) assert result is False @pytest.mark.asyncio async def test_start_stream_not_connected(self, mock_application, target_satellite): """Testet Fehler wenn Quell-Satellite nicht verbunden.""" sat = Satellite(satellite_id="disconnected", application=mock_application) result = await sat.start_stream(target_satellite) assert result is False @pytest.mark.asyncio async def test_start_stream_already_streaming(self, source_satellite, target_satellite): """Testet Fehler wenn bereits gestreamt wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) result = await source_satellite.start_stream(target_satellite) assert result is False @pytest.mark.asyncio async def test_start_stream_active_conversation(self, source_satellite, target_satellite): """Testet Fehler bei aktiver Konversation.""" source_satellite.conversation_id = "conv-123" result = await source_satellite.start_stream(target_satellite) assert result is False @pytest.mark.asyncio async def test_start_stream_self_target(self, source_satellite): """Testet Fehler bei Stream zu sich selbst.""" result = await source_satellite.start_stream(source_satellite) assert result is False @pytest.mark.asyncio async def test_start_stream_target_not_connected(self, source_satellite, mock_application): """Testet Fehler wenn Ziel-Satellite nicht verbunden.""" target = Satellite( satellite_id="target-disconnected", application=mock_application, ) # Nicht set_connected() aufrufen result = await source_satellite.start_stream(target) assert result is False @pytest.mark.asyncio async def test_start_stream_target_no_audio_out(self, source_satellite, mock_application): """Testet Fehler wenn Ziel keinen audio_out Socket hat.""" target = Satellite( satellite_id="target-no-audio", application=mock_application, ) target.set_connected() # audio_out bleibt None result = await source_satellite.start_stream(target) assert result is False @pytest.mark.asyncio async def test_start_stream_stops_active_recording(self, source_satellite, target_satellite, mock_event_manager): """Testet Mutual Exclusion: Stream stoppt laufende Aufnahme.""" source_satellite.send_command = AsyncMock(return_value=True) # Recording simulieren source_satellite._recording_active = True source_satellite._recording_id = "rec-test" source_satellite._recording_listener_registered = True result = await source_satellite.start_stream(target_satellite) assert result is True assert source_satellite.is_streaming is True # Recording sollte gestoppt worden sein assert source_satellite._recording_active is False @pytest.mark.asyncio async def test_start_stream_with_timeout(self, source_satellite, target_satellite): """Testet Stream-Start mit Timeout.""" source_satellite.send_command = AsyncMock(return_value=True) result = await source_satellite.start_stream(target_satellite, timeout=30.0) assert result is True assert source_satellite._stream_timeout_handle is not None @pytest.mark.asyncio async def test_start_stream_without_timeout(self, source_satellite, target_satellite): """Testet Stream-Start ohne Timeout (unbegrenzt).""" source_satellite.send_command = AsyncMock(return_value=True) result = await source_satellite.start_stream(target_satellite, timeout=0.0) assert result is True assert source_satellite._stream_timeout_handle is None # ============================================================================= # Tests für stop_stream # ============================================================================= class TestStopStream: """Tests für stop_stream Methode.""" @pytest.mark.asyncio async def test_stop_stream_success(self, source_satellite, target_satellite, mock_event_manager): """Testet erfolgreiches Stream-Stoppen.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) mock_event_manager.trigger.reset_mock() result = await source_satellite.stop_stream() assert result is True assert source_satellite.is_streaming is False assert source_satellite.stream_target is None assert source_satellite.stream_id == "" @pytest.mark.asyncio async def test_stop_stream_triggers_event(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass stop_stream das satellite_stream_stopped Event auslöst.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) mock_event_manager.trigger.reset_mock() await source_satellite.stop_stream(reason="manual") mock_event_manager.trigger.assert_called_once() call_args = mock_event_manager.trigger.call_args assert call_args[0][0] == "satellite_stream_stopped" event_data = call_args[0][1] assert isinstance(event_data, SatelliteStreamStopped) assert event_data.reason == "manual" assert event_data.satellite_id == "sat-source-001" assert event_data.target_satellite_id == "sat-target-002" @pytest.mark.asyncio async def test_stop_stream_sends_record_stop_command(self, source_satellite, target_satellite): """Testet, dass SatelliteRecordStop Command gesendet wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) source_satellite.send_command.reset_mock() await source_satellite.stop_stream() source_satellite.send_command.assert_called_once() cmd = source_satellite.send_command.call_args[0][0] from trixy_core.network.cmd import SatelliteRecordStop assert isinstance(cmd, SatelliteRecordStop) @pytest.mark.asyncio async def test_stop_stream_unregisters_listeners(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass Event-Listener entfernt werden.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) mock_event_manager.unregister.reset_mock() await source_satellite.stop_stream() assert mock_event_manager.unregister.call_count == 2 event_names = [call[0][0] for call in mock_event_manager.unregister.call_args_list] assert "audio_input_received" in event_names assert "satellite_record_stopped" in event_names @pytest.mark.asyncio async def test_stop_stream_cancels_timeout(self, source_satellite, target_satellite): """Testet, dass der Timeout-Timer abgebrochen wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite, timeout=60.0) # TimerHandle durch Mock ersetzen um cancel-Aufruf zu prüfen mock_handle = MagicMock() source_satellite._stream_timeout_handle = mock_handle await source_satellite.stop_stream() assert source_satellite._stream_timeout_handle is None mock_handle.cancel.assert_called_once() @pytest.mark.asyncio async def test_stop_stream_not_streaming(self, source_satellite): """Testet Fehler wenn kein Stream aktiv.""" result = await source_satellite.stop_stream() assert result is False @pytest.mark.asyncio async def test_stop_stream_with_reason(self, source_satellite, target_satellite, mock_event_manager): """Testet verschiedene Stop-Gründe.""" source_satellite.send_command = AsyncMock(return_value=True) for reason in ("manual", "timeout", "wakeword", "disconnect", "target_disconnect"): await source_satellite.start_stream(target_satellite) mock_event_manager.trigger.reset_mock() await source_satellite.stop_stream(reason=reason) event_data = mock_event_manager.trigger.call_args[0][1] assert event_data.reason == reason @pytest.mark.asyncio async def test_stop_stream_also_stops_recording(self, source_satellite, target_satellite, temp_dir, mock_event_manager): """Testet, dass stop_stream auch Stream-Aufnahme stoppt.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) wav_path = os.path.join(str(temp_dir), "test_stream.wav") await source_satellite.start_record_stream(wav_path) assert source_satellite.is_stream_recording is True await source_satellite.stop_stream() assert source_satellite.is_stream_recording is False assert source_satellite.is_streaming is False # ============================================================================= # Tests für start_record_stream / stop_record_stream # ============================================================================= class TestStreamRecording: """Tests für Stream-Aufnahme auf Disk.""" @pytest.mark.asyncio async def test_start_record_stream_success(self, source_satellite, target_satellite, temp_dir, mock_event_manager): """Testet erfolgreichen Start der Stream-Aufnahme.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) mock_event_manager.trigger.reset_mock() wav_path = os.path.join(str(temp_dir), "test_record.wav") result = await source_satellite.start_record_stream(wav_path) assert result is True assert source_satellite.is_stream_recording is True @pytest.mark.asyncio async def test_start_record_stream_triggers_event(self, source_satellite, target_satellite, temp_dir, mock_event_manager): """Testet, dass start_record_stream Event auslöst.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) mock_event_manager.trigger.reset_mock() wav_path = os.path.join(str(temp_dir), "test_record.wav") await source_satellite.start_record_stream(wav_path) mock_event_manager.trigger.assert_called_once() call_args = mock_event_manager.trigger.call_args assert call_args[0][0] == "satellite_stream_record_started" event_data = call_args[0][1] assert isinstance(event_data, SatelliteStreamRecordStarted) assert event_data.filename == wav_path @pytest.mark.asyncio async def test_start_record_stream_no_active_stream(self, source_satellite, temp_dir): """Testet Fehler wenn kein Stream aktiv.""" wav_path = os.path.join(str(temp_dir), "test.wav") result = await source_satellite.start_record_stream(wav_path) assert result is False @pytest.mark.asyncio async def test_start_record_stream_already_recording(self, source_satellite, target_satellite, temp_dir): """Testet Fehler wenn bereits aufgenommen wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) wav_path = os.path.join(str(temp_dir), "test.wav") await source_satellite.start_record_stream(wav_path) result = await source_satellite.start_record_stream(wav_path) assert result is False @pytest.mark.asyncio async def test_start_record_stream_invalid_path(self, source_satellite, target_satellite): """Testet Fehler bei ungültigem Dateipfad.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) result = await source_satellite.start_record_stream("/nonexistent/dir/test.wav") assert result is False @pytest.mark.asyncio async def test_stop_record_stream_success(self, source_satellite, target_satellite, temp_dir, mock_event_manager): """Testet erfolgreiches Stoppen der Stream-Aufnahme.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) wav_path = os.path.join(str(temp_dir), "test_stop.wav") await source_satellite.start_record_stream(wav_path) mock_event_manager.trigger.reset_mock() result = await source_satellite.stop_record_stream() assert result is True assert source_satellite.is_stream_recording is False @pytest.mark.asyncio async def test_stop_record_stream_triggers_event(self, source_satellite, target_satellite, temp_dir, mock_event_manager): """Testet, dass stop_record_stream Event auslöst.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) wav_path = os.path.join(str(temp_dir), "test_stop_event.wav") await source_satellite.start_record_stream(wav_path) mock_event_manager.trigger.reset_mock() await source_satellite.stop_record_stream(reason="test_stop") mock_event_manager.trigger.assert_called_once() call_args = mock_event_manager.trigger.call_args assert call_args[0][0] == "satellite_stream_record_stopped" event_data = call_args[0][1] assert isinstance(event_data, SatelliteStreamRecordStopped) assert event_data.reason == "test_stop" assert event_data.filename == wav_path @pytest.mark.asyncio async def test_stop_record_stream_not_recording(self, source_satellite): """Testet Fehler wenn keine Stream-Aufnahme aktiv.""" result = await source_satellite.stop_record_stream() assert result is False @pytest.mark.asyncio async def test_stop_record_stream_creates_valid_wav(self, source_satellite, target_satellite, temp_dir, mock_event_manager): """Testet, dass eine gültige WAV-Datei erstellt wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) wav_path = os.path.join(str(temp_dir), "valid_wav.wav") await source_satellite.start_record_stream(wav_path) # Audio-Daten simulieren (1s Stille bei 16KHz 16-bit mono) audio_chunk = b"\x00\x00" * 4096 source_satellite._stream_recording_wav.writeframes(audio_chunk) source_satellite._stream_recording_bytes += len(audio_chunk) await source_satellite.stop_record_stream() # WAV-Datei prüfen assert os.path.exists(wav_path) with wave.open(wav_path, "rb") as wf: assert wf.getnchannels() == 1 assert wf.getsampwidth() == 2 assert wf.getframerate() == 16000 @pytest.mark.asyncio async def test_stream_continues_after_record_stop(self, source_satellite, target_satellite, temp_dir): """Testet, dass der Stream nach Recording-Stop weiterläuft.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) wav_path = os.path.join(str(temp_dir), "continue.wav") await source_satellite.start_record_stream(wav_path) await source_satellite.stop_record_stream() # Stream sollte noch aktiv sein assert source_satellite.is_streaming is True assert source_satellite.is_stream_recording is False # ============================================================================= # Tests für Audio-Handler # ============================================================================= class TestStreamAudioHandler: """Tests für _on_stream_audio_received Handler.""" @pytest.mark.asyncio async def test_audio_forwarded_to_target(self, source_satellite, target_satellite): """Testet, dass Audio zum Ziel-Satellite weitergeleitet wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) # Audio-Event simulieren audio_chunk = b"\x01\x02" * 2048 event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.audio_data = audio_chunk await source_satellite._on_stream_audio_received("audio_input_received", event_data) # Prüfe, dass Audio zum Ziel geschrieben wurde target_satellite.sockets.audio_out.write.assert_called_once_with(audio_chunk) target_satellite.sockets.audio_out.drain.assert_called_once() assert source_satellite._stream_bytes_forwarded == len(audio_chunk) @pytest.mark.asyncio async def test_audio_filtered_by_satellite_id(self, source_satellite, target_satellite): """Testet, dass Audio von anderen Satellites ignoriert wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) # Event von anderem Satellite event_data = MagicMock() event_data.satellite_id = "sat-other-999" event_data.audio_data = b"\x00" * 4096 await source_satellite._on_stream_audio_received("audio_input_received", event_data) # Kein Write zum Ziel target_satellite.sockets.audio_out.write.assert_not_called() @pytest.mark.asyncio async def test_audio_ignored_when_not_streaming(self, source_satellite, target_satellite): """Testet, dass Audio ignoriert wird wenn kein Stream aktiv.""" event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.audio_data = b"\x00" * 4096 # Kein aktiver Stream await source_satellite._on_stream_audio_received("audio_input_received", event_data) # Kein Crash, kein Write assert source_satellite._stream_bytes_forwarded == 0 @pytest.mark.asyncio async def test_audio_empty_data_ignored(self, source_satellite, target_satellite): """Testet, dass leere Audio-Daten ignoriert werden.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.audio_data = b"" await source_satellite._on_stream_audio_received("audio_input_received", event_data) target_satellite.sockets.audio_out.write.assert_not_called() @pytest.mark.asyncio async def test_target_disconnect_stops_stream(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass Stream stoppt wenn Ziel-Socket weg ist.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) # Simuliere Ziel-Disconnect target_satellite.sockets.audio_out = None event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.audio_data = b"\x00" * 4096 await source_satellite._on_stream_audio_received("audio_input_received", event_data) assert source_satellite.is_streaming is False @pytest.mark.asyncio async def test_target_write_error_stops_stream(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass Stream stoppt bei Socket-Write-Fehler.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) # Simuliere Write-Fehler target_satellite.sockets.audio_out.write.side_effect = ConnectionError("Verbindung verloren") event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.audio_data = b"\x00" * 4096 await source_satellite._on_stream_audio_received("audio_input_received", event_data) assert source_satellite.is_streaming is False @pytest.mark.asyncio async def test_audio_also_written_to_recording(self, source_satellite, target_satellite, temp_dir): """Testet paralleles Schreiben auf Disk während Streaming.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) wav_path = os.path.join(str(temp_dir), "parallel.wav") await source_satellite.start_record_stream(wav_path) # Mock writeframes um Aufruf zu prüfen mock_wav = MagicMock() source_satellite._stream_recording_wav = mock_wav audio_chunk = b"\x01\x02" * 2048 event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.audio_data = audio_chunk await source_satellite._on_stream_audio_received("audio_input_received", event_data) # Audio wurde zum Ziel UND zur WAV-Datei geschrieben target_satellite.sockets.audio_out.write.assert_called_once_with(audio_chunk) mock_wav.writeframes.assert_called_once_with(audio_chunk) assert source_satellite._stream_recording_bytes == len(audio_chunk) @pytest.mark.asyncio async def test_dict_event_data_supported(self, source_satellite, target_satellite): """Testet, dass dict-basierte Event-Daten unterstützt werden.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) audio_chunk = b"\x03\x04" * 1024 event_data = { "satellite_id": "sat-source-001", "audio_data": audio_chunk, } await source_satellite._on_stream_audio_received("audio_input_received", event_data) target_satellite.sockets.audio_out.write.assert_called_once_with(audio_chunk) @pytest.mark.asyncio async def test_bytes_forwarded_accumulates(self, source_satellite, target_satellite): """Testet, dass bytes_forwarded korrekt aufsummiert wird.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) chunk1 = b"\x00" * 4096 chunk2 = b"\x01" * 2048 for chunk in (chunk1, chunk2): event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.audio_data = chunk await source_satellite._on_stream_audio_received("audio_input_received", event_data) assert source_satellite._stream_bytes_forwarded == 4096 + 2048 # ============================================================================= # Tests für Wakeword-Stop Handler # ============================================================================= class TestStreamRecordStoppedHandler: """Tests für _on_stream_record_stopped_event Handler.""" @pytest.mark.asyncio async def test_wakeword_stops_stream(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass Wakeword-Stop den Stream beendet.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.reason = "wakeword" await source_satellite._on_stream_record_stopped_event("satellite_record_stopped", event_data) assert source_satellite.is_streaming is False @pytest.mark.asyncio async def test_handler_ignores_other_satellites(self, source_satellite, target_satellite): """Testet, dass Handler Events anderer Satellites ignoriert.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) event_data = MagicMock() event_data.satellite_id = "sat-other-999" event_data.reason = "wakeword" await source_satellite._on_stream_record_stopped_event("satellite_record_stopped", event_data) # Stream sollte noch laufen assert source_satellite.is_streaming is True @pytest.mark.asyncio async def test_handler_ignores_when_not_streaming(self, source_satellite): """Testet, dass Handler ignoriert wenn kein Stream aktiv.""" event_data = MagicMock() event_data.satellite_id = "sat-source-001" event_data.reason = "wakeword" # Kein Crash await source_satellite._on_stream_record_stopped_event("satellite_record_stopped", event_data) @pytest.mark.asyncio async def test_handler_with_dict_event(self, source_satellite, target_satellite, mock_event_manager): """Testet Handler mit dict-basiertem Event.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) event_data = { "satellite_id": "sat-source-001", "reason": "wakeword", } await source_satellite._on_stream_record_stopped_event("satellite_record_stopped", event_data) assert source_satellite.is_streaming is False # ============================================================================= # Tests für Timeout-Handler # ============================================================================= class TestStreamTimeout: """Tests für _on_stream_timeout Handler.""" @pytest.mark.asyncio async def test_timeout_stops_stream(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass Timeout den Stream beendet.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) await source_satellite._on_stream_timeout() assert source_satellite.is_streaming is False @pytest.mark.asyncio async def test_timeout_reason_in_event(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass der Timeout-Grund im Event steht.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) mock_event_manager.trigger.reset_mock() await source_satellite._on_stream_timeout() event_data = mock_event_manager.trigger.call_args[0][1] assert event_data.reason == "timeout" @pytest.mark.asyncio async def test_timeout_ignored_when_not_streaming(self, source_satellite): """Testet, dass Timeout ignoriert wird wenn kein Stream aktiv.""" # Kein Crash await source_satellite._on_stream_timeout() assert source_satellite.is_streaming is False # ============================================================================= # Tests für Disconnect-Guard # ============================================================================= class TestDisconnectGuard: """Tests für Streaming-Bereinigung bei Disconnect.""" @pytest.mark.asyncio async def test_disconnect_stops_stream(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass disconnect() aktiven Stream stoppt.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) await source_satellite.disconnect("test") assert source_satellite.is_streaming is False assert source_satellite.state == ConnectionState.DISCONNECTED # ============================================================================= # Tests für Mutual Exclusion (Recording blockiert bei Stream) # ============================================================================= class TestMutualExclusion: """Tests für gegenseitigen Ausschluss von Recording und Streaming.""" @pytest.mark.asyncio async def test_recording_blocked_during_stream(self, source_satellite, target_satellite): """Testet, dass start_recording bei aktivem Stream fehlschlägt.""" source_satellite.send_command = AsyncMock(return_value=True) await source_satellite.start_stream(target_satellite) result = await source_satellite.start_recording(timeout=60.0) assert result is False assert source_satellite.is_streaming is True @pytest.mark.asyncio async def test_stream_stops_recording(self, source_satellite, target_satellite, mock_event_manager): """Testet, dass start_stream laufende Aufnahme stoppt.""" source_satellite.send_command = AsyncMock(return_value=True) # Recording starten await source_satellite.start_recording(timeout=60.0) assert source_satellite.is_recording is True # Stream starten — Recording sollte gestoppt werden result = await source_satellite.start_stream(target_satellite) assert result is True assert source_satellite.is_streaming is True assert source_satellite.is_recording is False # ============================================================================= # Integration Tests mit echtem EventManager # ============================================================================= class TestStreamingIntegration: """Integrationstests mit echtem EventManager.""" @pytest.mark.asyncio async def test_full_stream_flow(self): """Testet den vollständigen Stream-Flow mit echtem EventManager.""" from trixy_core.events.eventmanager import EventManager app = MagicMock() event_manager = EventManager(app) app.events = event_manager # Zwei Satellites erstellen source = Satellite( satellite_id="int-source", alias="Quelle", application=app, ) source.set_connected() source.send_command = AsyncMock(return_value=True) target = Satellite( satellite_id="int-target", alias="Ziel", application=app, ) target.set_connected() mock_writer = MagicMock() mock_writer.write = MagicMock() mock_writer.drain = AsyncMock() target.sockets.audio_out = mock_writer # Events sammeln stream_events = [] async def on_stream_started(event_name, data): stream_events.append(("started", data)) async def on_stream_stopped(event_name, data): stream_events.append(("stopped", data)) event_manager.register("satellite_stream_started", on_stream_started) event_manager.register("satellite_stream_stopped", on_stream_stopped) # Stream starten result = await source.start_stream(target) assert result is True assert len(stream_events) == 1 assert stream_events[0][0] == "started" # Stream stoppen await source.stop_stream(reason="test_complete") assert len(stream_events) == 2 assert stream_events[1][0] == "stopped" assert stream_events[1][1].reason == "test_complete" @pytest.mark.asyncio async def test_full_stream_with_recording_flow(self, temp_dir): """Testet Stream + Recording Flow mit echtem EventManager.""" from trixy_core.events.eventmanager import EventManager app = MagicMock() event_manager = EventManager(app) app.events = event_manager source = Satellite( satellite_id="rec-source", alias="Quelle", application=app, ) source.set_connected() source.send_command = AsyncMock(return_value=True) target = Satellite( satellite_id="rec-target", alias="Ziel", application=app, ) target.set_connected() mock_writer = MagicMock() mock_writer.write = MagicMock() mock_writer.drain = AsyncMock() target.sockets.audio_out = mock_writer # Events sammeln record_events = [] async def on_record_started(event_name, data): record_events.append(("started", data)) async def on_record_stopped(event_name, data): record_events.append(("stopped", data)) event_manager.register("satellite_stream_record_started", on_record_started) event_manager.register("satellite_stream_record_stopped", on_record_stopped) # Stream starten await source.start_stream(target) # Recording starten wav_path = os.path.join(str(temp_dir), "integration.wav") await source.start_record_stream(wav_path) assert len(record_events) == 1 assert record_events[0][0] == "started" # Recording stoppen (Stream läuft weiter) await source.stop_record_stream() assert len(record_events) == 2 assert record_events[1][0] == "stopped" assert source.is_streaming is True # Stream stoppen await source.stop_stream() assert source.is_streaming is False @pytest.mark.asyncio async def test_audio_forwarding_integration(self): """Testet die Audio-Weiterleitung mit echtem EventManager.""" from trixy_core.events.eventmanager import EventManager app = MagicMock() event_manager = EventManager(app) app.events = event_manager source = Satellite( satellite_id="fwd-source", alias="Quelle", application=app, ) source.set_connected() source.send_command = AsyncMock(return_value=True) target = Satellite( satellite_id="fwd-target", alias="Ziel", application=app, ) target.set_connected() mock_writer = MagicMock() mock_writer.write = MagicMock() mock_writer.drain = AsyncMock() target.sockets.audio_out = mock_writer # Stream starten await source.start_stream(target) # Audio-Event simulieren via EventManager (mit EventData-Objekt) from trixy_core.events.event_data import RawAudioInputReceived audio_chunk = b"\xAB\xCD" * 2048 await event_manager.trigger("audio_input_received", RawAudioInputReceived( satellite_id="fwd-source", audio_data=audio_chunk, )) # Audio sollte zum Ziel weitergeleitet worden sein mock_writer.write.assert_called_once_with(audio_chunk) await source.stop_stream()