# -*- coding: utf-8 -*- """ PulseAudio / PipeWire Multi-Mic-Mix Wrapper. Spiegelt `PulseCombinedSink` fuer die Input-Seite: mehrere Mikrofone werden zu einer einzigen virtuellen Quelle gemischt, sodass die Wakeword-Detection und das Server-Streaming unveraendert mit EINER Quelle arbeiten koennen. Technik: 1. `module-null-sink sink_name=trixy_mics_combined` → erzeugt einen virtuellen Sink mit angehaengtem `.monitor` Source. 2. Pro Mikrofon ein `module-loopback source= sink=trixy_mics_combined` → leitet das Mic-Signal in den Null-Sink. 3. Default-Source wird auf `trixy_mics_combined.monitor` gesetzt, sodass alle Aufnehmer den Mix sehen. 4. Vorheriger Default-Source wird beim Teardown wiederhergestellt. Warum kein `module-combine-source`? Den gibt es in Pulse nicht (nur `module-combine-sink` fuer Output). Der null-sink+loopback Trick ist der etablierte Workaround. Einschraenkungen: wie PulseCombinedSink. Nur verfuegbar wo `pactl` antwortet (Linux mit Pulse/PipeWire, WSL2 mit WSLg). """ from __future__ import annotations import asyncio import os import shutil from typing import NamedTuple from trixy_core.utils.debug import pinfo, pdebug, perror class PulseSource(NamedTuple): """Ein einzelner PulseAudio/PipeWire Input-Source (Mikrofon).""" index: int name: str description: str state: str monitor_of: str # falls es ein Monitor-Source ist, sonst "" class PulseMicrophoneMix: """Verwaltet den Multi-Mic-Mix ueber pactl.""" DEFAULT_SINK_NAME = "trixy_mics_combined" def __init__(self, sink_name: str = DEFAULT_SINK_NAME) -> None: self._sink_name = sink_name self._null_sink_module: int | None = None self._loopback_modules: list[int] = [] self._sources: list[str] = [] self._available: bool | None = None self._previous_default: str | None = None @property def sink_name(self) -> str: return self._sink_name @property def monitor_source_name(self) -> str: """Name des resultierenden virtuellen Source-Namens.""" return f"{self._sink_name}.monitor" @property def sources(self) -> list[str]: return list(self._sources) @property def active(self) -> bool: return self._null_sink_module is not None async def available(self) -> bool: """True wenn pactl + Pulse/PipeWire-Server erreichbar sind.""" if self._available is not None: return self._available if not shutil.which("pactl"): pdebug("pactl nicht gefunden — Mic-Mix deaktiviert") self._available = False return False rc, _ = await self._run("info") self._available = rc == 0 return self._available # === Source-Listing === async def list_sources(self, include_monitors: bool = False) -> list[PulseSource]: """Listet alle realen Input-Sources (ohne unseren eigenen Monitor). Args: include_monitors: Wenn False (Default), werden .monitor-Sources anderer Sinks herausgefiltert — wir wollen nur echte Mikrofone. """ if not await self.available(): return [] rc, out = await self._run("list", "sources") if rc != 0: return [] all_sources = self._parse_sources(out) result: list[PulseSource] = [] for s in all_sources: # Eigenen Monitor ausblenden if s.name == self.monitor_source_name: continue # Monitor-Sources optional ausblenden if not include_monitors and s.monitor_of: continue result.append(s) return result @staticmethod def _parse_sources(text: str) -> list[PulseSource]: """Parst die Ausgabe von `pactl list sources`.""" sources: list[PulseSource] = [] current: dict = {} def flush() -> None: if not current: return try: sources.append(PulseSource( index=int(current.get("index", -1)), name=current.get("name", ""), description=current.get("description", current.get("name", "")), state=current.get("state", ""), monitor_of=current.get("monitor_of", ""), )) except Exception: pass current.clear() for raw in text.splitlines(): line = raw.rstrip() if line.startswith("Source #"): flush() try: current["index"] = int(line.split("#", 1)[1]) except Exception: pass continue stripped = line.lstrip() if stripped.startswith("Name:"): current["name"] = stripped.split(":", 1)[1].strip() elif stripped.startswith("Description:"): current["description"] = stripped.split(":", 1)[1].strip() elif stripped.startswith("State:"): current["state"] = stripped.split(":", 1)[1].strip() elif stripped.startswith("Monitor of Sink:"): value = stripped.split(":", 1)[1].strip() # Pulse/PipeWire setzt "n/a" fuer Nicht-Monitor-Sources current["monitor_of"] = "" if value.lower() in ("n/a", "") else value flush() return sources # === Mix-Lifecycle === async def find_existing_null_sink(self) -> int | None: """Sucht ein bereits geladenes module-null-sink mit unserem Namen.""" rc, out = await self._run("list", "modules", "short") if rc != 0: return None for line in out.splitlines(): parts = line.split("\t") if len(parts) < 2: continue try: idx = int(parts[0]) except ValueError: continue module = parts[1] args = parts[2] if len(parts) >= 3 else "" if module == "module-null-sink" and f"sink_name={self._sink_name}" in args: return idx return None async def _find_our_loopbacks(self) -> list[int]: """Sucht alle loopback-Module die auf unseren Null-Sink zeigen.""" rc, out = await self._run("list", "modules", "short") if rc != 0: return [] result: list[int] = [] for line in out.splitlines(): parts = line.split("\t") if len(parts) < 2: continue if parts[1] != "module-loopback": continue try: idx = int(parts[0]) except ValueError: continue args = parts[2] if len(parts) >= 3 else "" if f"sink={self._sink_name}" in args: result.append(idx) return result async def setup(self, sources: list[str]) -> bool: """Erstellt den Mic-Mix mit den gewaehlten realen Sources. Args: sources: Liste der Source-Namen die gemischt werden sollen. Returns: True bei Erfolg, False sonst. """ if not await self.available(): return False if not sources: await self.teardown() return False if len(sources) == 1: # Ein einzelner Mic braucht keinen Mix — Caller soll # den Mic direkt als Default nutzen. await self.teardown() return False # Aufraeumen falls etwas altes noch haengt await self.teardown() # 1. Null-Sink erstellen rc, out = await self._run( "load-module", "module-null-sink", f"sink_name={self._sink_name}", "sink_properties=device.description=Trixy_Multi_Mic_Mix", ) if rc != 0: perror(f"Null-Sink konnte nicht erstellt werden: {out.strip()}") return False try: self._null_sink_module = int(out.strip().splitlines()[-1]) except Exception: self._null_sink_module = await self.find_existing_null_sink() # 2. Pro Mic ein Loopback in den Null-Sink self._loopback_modules = [] self._sources = [] for source in sources: rc, out = await self._run( "load-module", "module-loopback", f"source={source}", f"sink={self._sink_name}", "latency_msec=30", "source_dont_move=true", "sink_dont_move=true", ) if rc != 0: pdebug(f"Loopback fuer '{source}' fehlgeschlagen: {out.strip()}") continue try: idx = int(out.strip().splitlines()[-1]) self._loopback_modules.append(idx) self._sources.append(source) except Exception: pass if not self._loopback_modules: # Kein einziger Loopback erfolgreich — Null-Sink wieder abbauen await self.teardown() return False pinfo( f"Mic-Mix aktiv: {len(self._loopback_modules)} Mikrofone → " f"'{self.monitor_source_name}' (Null-Sink #{self._null_sink_module})" ) return True async def teardown(self) -> None: """Baut den Mix wieder ab und stellt den alten Default-Source wieder her.""" if not await self.available(): return # Vorherigen Default zuerst wiederherstellen, sonst ist er # kurz undefiniert wenn wir unseren .monitor-Source entladen. await self.restore_default_source() # Alle unsere Loopbacks entladen loopbacks = list(self._loopback_modules) if not loopbacks: # Keiner im Cache? Alle loopbacks suchen die auf uns zeigen. loopbacks = await self._find_our_loopbacks() for idx in loopbacks: await self._run("unload-module", str(idx)) self._loopback_modules = [] # Null-Sink entladen sink_idx = self._null_sink_module if sink_idx is None: sink_idx = await self.find_existing_null_sink() if sink_idx is not None: rc, _ = await self._run("unload-module", str(sink_idx)) if rc == 0: pdebug(f"Mic-Mix Null-Sink #{sink_idx} entladen") self._null_sink_module = None self._sources = [] # === Default-Source-Verwaltung === async def get_default_source(self) -> str: """Liest den aktuellen System-Default-Source-Namen.""" if not await self.available(): return "" rc, out = await self._run("get-default-source") if rc != 0: return "" return out.strip().splitlines()[0] if out.strip() else "" async def set_default_source(self, source_name: str) -> bool: """Setzt den System-Default-Source.""" if not await self.available(): return False rc, _ = await self._run("set-default-source", source_name) return rc == 0 async def make_default(self) -> bool: """Macht unseren Monitor zum System-Default-Source und merkt sich den vorherigen.""" if self._null_sink_module is None: return False current = await self.get_default_source() if current and current != self.monitor_source_name: self._previous_default = current ok = await self.set_default_source(self.monitor_source_name) if ok: pinfo( f"System-Default-Source → '{self.monitor_source_name}' " f"(vorher: '{self._previous_default or '?'}')" ) return ok async def restore_default_source(self) -> None: """Stellt den vorherigen Default-Source wieder her.""" if not self._previous_default: return if not await self.available(): return current = await self.get_default_source() if current == self.monitor_source_name: ok = await self.set_default_source(self._previous_default) if ok: pdebug(f"System-Default-Source wiederhergestellt → '{self._previous_default}'") self._previous_default = None # === Hilfsmethoden === async def _run(self, *args: str) -> tuple[int, str]: """pactl mit LC_ALL=C damit die Ausgabe sprachunabhaengig ist.""" env = os.environ.copy() env["LC_ALL"] = "C" env["LANG"] = "C" try: proc = await asyncio.create_subprocess_exec( "pactl", *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, env=env, ) stdout, _ = await proc.communicate() return proc.returncode or 0, stdout.decode(errors="replace") except FileNotFoundError: return 127, "" except Exception as e: perror(f"pactl Fehler: {e}") return 1, str(e)