| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- # -*- coding: utf-8 -*-
- """
- PulseAudio / PipeWire Multi-Mic-Mix Wrapper.
- Spiegelt `PulseCombinedSink` fuer die Input-Seite: mehrere
- Mikrofone werden zu einer einzigen virtuellen Quelle gemischt,
- sodass die Wakeword-Detection und das Server-Streaming
- unveraendert mit EINER Quelle arbeiten koennen.
- Technik:
- 1. `module-null-sink sink_name=trixy_mics_combined`
- → erzeugt einen virtuellen Sink mit angehaengtem
- `.monitor` Source.
- 2. Pro Mikrofon ein `module-loopback source=<mic> sink=trixy_mics_combined`
- → leitet das Mic-Signal in den Null-Sink.
- 3. Default-Source wird auf `trixy_mics_combined.monitor`
- gesetzt, sodass alle Aufnehmer den Mix sehen.
- 4. Vorheriger Default-Source wird beim Teardown wiederhergestellt.
- Warum kein `module-combine-source`? Den gibt es in Pulse nicht
- (nur `module-combine-sink` fuer Output). Der null-sink+loopback
- Trick ist der etablierte Workaround.
- Einschraenkungen: wie PulseCombinedSink. Nur verfuegbar wo
- `pactl` antwortet (Linux mit Pulse/PipeWire, WSL2 mit WSLg).
- """
- from __future__ import annotations
- import asyncio
- import os
- import shutil
- from typing import NamedTuple
- from trixy_core.utils.debug import pinfo, pdebug, perror
- class PulseSource(NamedTuple):
- """Ein einzelner PulseAudio/PipeWire Input-Source (Mikrofon)."""
- index: int
- name: str
- description: str
- state: str
- monitor_of: str # falls es ein Monitor-Source ist, sonst ""
- class PulseMicrophoneMix:
- """Verwaltet den Multi-Mic-Mix ueber pactl."""
- DEFAULT_SINK_NAME = "trixy_mics_combined"
- def __init__(self, sink_name: str = DEFAULT_SINK_NAME) -> None:
- self._sink_name = sink_name
- self._null_sink_module: int | None = None
- self._loopback_modules: list[int] = []
- self._sources: list[str] = []
- self._available: bool | None = None
- self._previous_default: str | None = None
- @property
- def sink_name(self) -> str:
- return self._sink_name
- @property
- def monitor_source_name(self) -> str:
- """Name des resultierenden virtuellen Source-Namens."""
- return f"{self._sink_name}.monitor"
- @property
- def sources(self) -> list[str]:
- return list(self._sources)
- @property
- def active(self) -> bool:
- return self._null_sink_module is not None
- async def available(self) -> bool:
- """True wenn pactl + Pulse/PipeWire-Server erreichbar sind."""
- if self._available is not None:
- return self._available
- if not shutil.which("pactl"):
- pdebug("pactl nicht gefunden — Mic-Mix deaktiviert")
- self._available = False
- return False
- rc, _ = await self._run("info")
- self._available = rc == 0
- return self._available
- # === Source-Listing ===
- async def list_sources(self, include_monitors: bool = False) -> list[PulseSource]:
- """Listet alle realen Input-Sources (ohne unseren eigenen Monitor).
- Args:
- include_monitors: Wenn False (Default), werden .monitor-Sources
- anderer Sinks herausgefiltert — wir wollen nur
- echte Mikrofone.
- """
- if not await self.available():
- return []
- rc, out = await self._run("list", "sources")
- if rc != 0:
- return []
- all_sources = self._parse_sources(out)
- result: list[PulseSource] = []
- for s in all_sources:
- # Eigenen Monitor ausblenden
- if s.name == self.monitor_source_name:
- continue
- # Monitor-Sources optional ausblenden
- if not include_monitors and s.monitor_of:
- continue
- result.append(s)
- return result
- @staticmethod
- def _parse_sources(text: str) -> list[PulseSource]:
- """Parst die Ausgabe von `pactl list sources`."""
- sources: list[PulseSource] = []
- current: dict = {}
- def flush() -> None:
- if not current:
- return
- try:
- sources.append(PulseSource(
- index=int(current.get("index", -1)),
- name=current.get("name", ""),
- description=current.get("description", current.get("name", "")),
- state=current.get("state", ""),
- monitor_of=current.get("monitor_of", ""),
- ))
- except Exception:
- pass
- current.clear()
- for raw in text.splitlines():
- line = raw.rstrip()
- if line.startswith("Source #"):
- flush()
- try:
- current["index"] = int(line.split("#", 1)[1])
- except Exception:
- pass
- continue
- stripped = line.lstrip()
- if stripped.startswith("Name:"):
- current["name"] = stripped.split(":", 1)[1].strip()
- elif stripped.startswith("Description:"):
- current["description"] = stripped.split(":", 1)[1].strip()
- elif stripped.startswith("State:"):
- current["state"] = stripped.split(":", 1)[1].strip()
- elif stripped.startswith("Monitor of Sink:"):
- value = stripped.split(":", 1)[1].strip()
- # Pulse/PipeWire setzt "n/a" fuer Nicht-Monitor-Sources
- current["monitor_of"] = "" if value.lower() in ("n/a", "") else value
- flush()
- return sources
- # === Mix-Lifecycle ===
- async def find_existing_null_sink(self) -> int | None:
- """Sucht ein bereits geladenes module-null-sink mit unserem Namen."""
- rc, out = await self._run("list", "modules", "short")
- if rc != 0:
- return None
- for line in out.splitlines():
- parts = line.split("\t")
- if len(parts) < 2:
- continue
- try:
- idx = int(parts[0])
- except ValueError:
- continue
- module = parts[1]
- args = parts[2] if len(parts) >= 3 else ""
- if module == "module-null-sink" and f"sink_name={self._sink_name}" in args:
- return idx
- return None
- async def _find_our_loopbacks(self) -> list[int]:
- """Sucht alle loopback-Module die auf unseren Null-Sink zeigen."""
- rc, out = await self._run("list", "modules", "short")
- if rc != 0:
- return []
- result: list[int] = []
- for line in out.splitlines():
- parts = line.split("\t")
- if len(parts) < 2:
- continue
- if parts[1] != "module-loopback":
- continue
- try:
- idx = int(parts[0])
- except ValueError:
- continue
- args = parts[2] if len(parts) >= 3 else ""
- if f"sink={self._sink_name}" in args:
- result.append(idx)
- return result
- async def setup(self, sources: list[str]) -> bool:
- """Erstellt den Mic-Mix mit den gewaehlten realen Sources.
- Args:
- sources: Liste der Source-Namen die gemischt werden sollen.
- Returns:
- True bei Erfolg, False sonst.
- """
- if not await self.available():
- return False
- if not sources:
- await self.teardown()
- return False
- if len(sources) == 1:
- # Ein einzelner Mic braucht keinen Mix — Caller soll
- # den Mic direkt als Default nutzen.
- await self.teardown()
- return False
- # Aufraeumen falls etwas altes noch haengt
- await self.teardown()
- # 1. Null-Sink erstellen
- rc, out = await self._run(
- "load-module",
- "module-null-sink",
- f"sink_name={self._sink_name}",
- "sink_properties=device.description=Trixy_Multi_Mic_Mix",
- )
- if rc != 0:
- perror(f"Null-Sink konnte nicht erstellt werden: {out.strip()}")
- return False
- try:
- self._null_sink_module = int(out.strip().splitlines()[-1])
- except Exception:
- self._null_sink_module = await self.find_existing_null_sink()
- # 2. Pro Mic ein Loopback in den Null-Sink
- self._loopback_modules = []
- self._sources = []
- for source in sources:
- rc, out = await self._run(
- "load-module",
- "module-loopback",
- f"source={source}",
- f"sink={self._sink_name}",
- "latency_msec=30",
- "source_dont_move=true",
- "sink_dont_move=true",
- )
- if rc != 0:
- pdebug(f"Loopback fuer '{source}' fehlgeschlagen: {out.strip()}")
- continue
- try:
- idx = int(out.strip().splitlines()[-1])
- self._loopback_modules.append(idx)
- self._sources.append(source)
- except Exception:
- pass
- if not self._loopback_modules:
- # Kein einziger Loopback erfolgreich — Null-Sink wieder abbauen
- await self.teardown()
- return False
- pinfo(
- f"Mic-Mix aktiv: {len(self._loopback_modules)} Mikrofone → "
- f"'{self.monitor_source_name}' (Null-Sink #{self._null_sink_module})"
- )
- return True
- async def teardown(self) -> None:
- """Baut den Mix wieder ab und stellt den alten Default-Source wieder her."""
- if not await self.available():
- return
- # Vorherigen Default zuerst wiederherstellen, sonst ist er
- # kurz undefiniert wenn wir unseren .monitor-Source entladen.
- await self.restore_default_source()
- # Alle unsere Loopbacks entladen
- loopbacks = list(self._loopback_modules)
- if not loopbacks:
- # Keiner im Cache? Alle loopbacks suchen die auf uns zeigen.
- loopbacks = await self._find_our_loopbacks()
- for idx in loopbacks:
- await self._run("unload-module", str(idx))
- self._loopback_modules = []
- # Null-Sink entladen
- sink_idx = self._null_sink_module
- if sink_idx is None:
- sink_idx = await self.find_existing_null_sink()
- if sink_idx is not None:
- rc, _ = await self._run("unload-module", str(sink_idx))
- if rc == 0:
- pdebug(f"Mic-Mix Null-Sink #{sink_idx} entladen")
- self._null_sink_module = None
- self._sources = []
- # === Default-Source-Verwaltung ===
- async def get_default_source(self) -> str:
- """Liest den aktuellen System-Default-Source-Namen."""
- if not await self.available():
- return ""
- rc, out = await self._run("get-default-source")
- if rc != 0:
- return ""
- return out.strip().splitlines()[0] if out.strip() else ""
- async def set_default_source(self, source_name: str) -> bool:
- """Setzt den System-Default-Source."""
- if not await self.available():
- return False
- rc, _ = await self._run("set-default-source", source_name)
- return rc == 0
- async def make_default(self) -> bool:
- """Macht unseren Monitor zum System-Default-Source und merkt sich den vorherigen."""
- if self._null_sink_module is None:
- return False
- current = await self.get_default_source()
- if current and current != self.monitor_source_name:
- self._previous_default = current
- ok = await self.set_default_source(self.monitor_source_name)
- if ok:
- pinfo(
- f"System-Default-Source → '{self.monitor_source_name}' "
- f"(vorher: '{self._previous_default or '?'}')"
- )
- return ok
- async def restore_default_source(self) -> None:
- """Stellt den vorherigen Default-Source wieder her."""
- if not self._previous_default:
- return
- if not await self.available():
- return
- current = await self.get_default_source()
- if current == self.monitor_source_name:
- ok = await self.set_default_source(self._previous_default)
- if ok:
- pdebug(f"System-Default-Source wiederhergestellt → '{self._previous_default}'")
- self._previous_default = None
- # === Hilfsmethoden ===
- async def _run(self, *args: str) -> tuple[int, str]:
- """pactl mit LC_ALL=C damit die Ausgabe sprachunabhaengig ist."""
- env = os.environ.copy()
- env["LC_ALL"] = "C"
- env["LANG"] = "C"
- try:
- proc = await asyncio.create_subprocess_exec(
- "pactl", *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.STDOUT,
- env=env,
- )
- stdout, _ = await proc.communicate()
- return proc.returncode or 0, stdout.decode(errors="replace")
- except FileNotFoundError:
- return 127, ""
- except Exception as e:
- perror(f"pactl Fehler: {e}")
- return 1, str(e)
|