| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
# -*- coding: utf-8 -*-
"""
DeepSpeech STT Plugin - Speech-to-Text mit Mozilla DeepSpeech.

Hinweis: Mozilla DeepSpeech wird nicht mehr aktiv entwickelt.
Dieses Plugin unterstützt vorhandene DeepSpeech-Modelle.

Unterstützt:
- Offline-Erkennung
- Vortrainierte Modelle
- Streaming
- Auto-Download von Modellen
"""
- import asyncio
- import time
- from pathlib import Path
- from typing import Any, AsyncIterator
- from trixy_core.plugins import TrixyPlugin
- from trixy_core.audio.stt import STTProvider, STTConfig, STTResult, STTState, WordTiming
# Catalogue of downloadable DeepSpeech models, keyed by the short model name
# that is passed to the provider as ``model_name``.
#
# Each entry holds:
#   name         - release name of the model bundle
#   model_url    - download URL of the acoustic model (.pbmm)
#   scorer_url   - download URL of the external scorer
#   model_file   - local file name used for the acoustic model
#   scorer_file  - local file name used for the scorer
#   size         - human-readable size, only used in log output
#   description  - human-readable description
DEEPSPEECH_MODELS: dict[str, dict[str, str]] = {
    # German community model.
    "de": {
        "name": "deepspeech-0.9.3-models-de",
        "model_url": "https://github.com/AASHISHAG/deepspeech-german/releases/download/v0.9.0/output_graph.pbmm",
        "scorer_url": "https://github.com/AASHISHAG/deepspeech-german/releases/download/v0.9.0/kenlm.scorer",
        "model_file": "deepspeech-de.pbmm",
        "scorer_file": "deepspeech-de.scorer",
        "size": "~1.8 GB",
        "description": "Deutsches Modell (Community)",
    },
    # Official English model.
    "en": {
        "name": "deepspeech-0.9.3-models",
        "model_url": "https://github.com/mozilla/DeepSpeech/releases/download/v0.9.3/deepspeech-0.9.3-models.pbmm",
        "scorer_url": "https://github.com/mozilla/DeepSpeech/releases/download/v0.9.3/deepspeech-0.9.3-models.scorer",
        "model_file": "deepspeech-0.9.3-models.pbmm",
        "scorer_file": "deepspeech-0.9.3-models.scorer",
        "size": "~1.0 GB",
        "description": "Offizielles englisches Modell",
    },
}
class DeepSpeechSTTProvider(STTProvider):
    """Speech-to-text provider backed by a Mozilla DeepSpeech model.

    The model is looked up in ``DEEPSPEECH_MODELS`` by ``model_name`` and is
    loaded from ``models_dir``. If the model file is missing it is downloaded
    first, unless ``auto_download`` is disabled.
    """

    # Seconds added to the start time of the last token to estimate the end
    # of the final word; the final word has no following space token whose
    # start time could serve as its end.
    _LAST_WORD_PADDING_S = 0.1

    def __init__(
        self,
        config: STTConfig | None = None,
        models_dir: Path | None = None,
        model_name: str = "de",
        auto_download: bool = True,
        event_manager: Any = None,
    ):
        """Store the configuration; the model itself is loaded in ``initialize``.

        Args:
            config: STT configuration forwarded to the base class.
            models_dir: Directory holding (or receiving) the model files.
                Required by the time ``initialize`` is called.
            model_name: Key into ``DEEPSPEECH_MODELS``.
            auto_download: Download the model when it is not found locally.
            event_manager: Passed through to the download helper.
        """
        super().__init__(config)
        # Accept plain strings too, so the ``/`` path joins below always work.
        self._models_dir = Path(models_dir) if models_dir is not None else None
        self._model_name = model_name
        self._auto_download = auto_download
        self._event_manager = event_manager
        self._model = None
        self._model_path: Path | None = None
        self._scorer_path: Path | None = None

    @property
    def name(self) -> str:
        """Provider identifier."""
        return "deepspeech"

    @property
    def supported_languages(self) -> list[str]:
        """Language tags this provider reports as supported."""
        return ["de-DE", "en-US", "en-GB"]

    @property
    def supports_streaming(self) -> bool:
        """Whether ``transcribe_stream`` is available."""
        return True

    @property
    def supports_word_timings(self) -> bool:
        """Whether results carry per-word timings."""
        return True

    async def initialize(self) -> None:
        """Load the DeepSpeech model, downloading it first if necessary.

        Raises:
            RuntimeError: If the ``deepspeech`` package is missing, the model
                name is unknown, no models directory is configured, the model
                file is absent while auto-download is disabled, or loading
                fails for any other reason.
        """
        # Only a failed import of the deepspeech package itself may be
        # reported as "not installed"; other import errors are handled below.
        try:
            from deepspeech import Model
        except ImportError as exc:
            self._state = STTState.ERROR
            raise RuntimeError(
                "DeepSpeech nicht installiert. "
                "Installieren mit: pip install deepspeech"
            ) from exc

        try:
            from trixy_core.utils.debug import pinfo, pwarn

            pwarn("DeepSpeech wird nicht mehr aktiv entwickelt. "
                  "Erwäge Whisper oder Vosk als Alternative.")
            self._state = STTState.PROCESSING

            if self._model_name not in DEEPSPEECH_MODELS:
                raise RuntimeError(
                    f"Unbekanntes Modell: {self._model_name}. "
                    f"Verfügbar: {', '.join(DEEPSPEECH_MODELS.keys())}"
                )
            if self._models_dir is None:
                # Without this guard the path join below fails with an
                # opaque TypeError.
                raise RuntimeError(
                    "Kein Modell-Verzeichnis (models_dir) konfiguriert."
                )

            model_info = DEEPSPEECH_MODELS[self._model_name]
            model_path = self._models_dir / model_info["model_file"]
            scorer_path = self._models_dir / model_info["scorer_file"]
            self._model_path = model_path
            self._scorer_path = scorer_path

            # Download the model when it is not available locally.
            if not model_path.exists():
                if not self._auto_download:
                    raise RuntimeError(
                        f"DeepSpeech-Modell nicht gefunden: {self._model_path}. "
                        "Auto-Download ist deaktiviert."
                    )
                await self._download_model()

            def load_model():
                model = Model(str(model_path))
                # The scorer is optional: enable it only when the file exists.
                if scorer_path.exists():
                    model.enableExternalScorer(str(scorer_path))
                return model

            pinfo(f"Lade DeepSpeech-Modell: {self._model_name}...")
            # Load in a worker thread so the event loop is not blocked.
            loop = asyncio.get_running_loop()
            self._model = await loop.run_in_executor(None, load_model)
            self._model_loaded = True
            self._state = STTState.READY
            pinfo(f"DeepSpeech-Modell geladen: {self._model_name}")
        except Exception as exc:
            self._state = STTState.ERROR
            raise RuntimeError(
                f"Fehler beim Laden des DeepSpeech-Modells: {exc}"
            ) from exc

    async def _download_model(self) -> None:
        """Download the model file and, best effort, the scorer file.

        Expects ``initialize`` to have set the model and scorer paths.

        Raises:
            RuntimeError: If the model download reports failure. A failed
                scorer download is only logged, because the scorer is optional.
        """
        from trixy_core.utils.debug import pinfo, pwarn
        from trixy_core.utils.download import download_file

        model_info = DEEPSPEECH_MODELS[self._model_name]
        self._models_dir.mkdir(parents=True, exist_ok=True)

        pinfo(f"Lade DeepSpeech-Modell: {self._model_name} ({model_info['size']})...")
        model_ok = await download_file(
            url=model_info["model_url"],
            dest_path=self._model_path,
            event_manager=self._event_manager,
            download_id=f"deepspeech-{self._model_name}-model",
        )
        if not model_ok:
            raise RuntimeError("Download des Modells fehlgeschlagen")

        pinfo("Lade Scorer...")
        scorer_ok = await download_file(
            url=model_info["scorer_url"],
            dest_path=self._scorer_path,
            event_manager=self._event_manager,
            download_id=f"deepspeech-{self._model_name}-scorer",
        )
        if not scorer_ok:
            # The model is loaded without a scorer when the file is missing.
            pwarn("Scorer-Download fehlgeschlagen; fahre ohne Scorer fort.")

        pinfo(f"DeepSpeech-Modell heruntergeladen: {self._model_name}")

    async def shutdown(self) -> None:
        """Drop the model reference and reset the provider state."""
        self._model = None
        self._model_loaded = False
        self._state = STTState.UNINITIALIZED

    @classmethod
    def _words_from_tokens(cls, tokens: Any) -> list[WordTiming]:
        """Group tokens into words, splitting on single-space tokens.

        A word starts at the start time of its first token and ends at the
        start time of the space token that follows it. The last word ends a
        fixed padding after the start of the last token seen. Per-word
        confidence is a fixed placeholder of 1.0.
        """
        words: list[WordTiming] = []
        current_word = ""
        word_start = 0.0
        last_token_start = 0.0

        for token in tokens:
            last_token_start = token.start_time
            if token.text == " ":
                if current_word:
                    words.append(WordTiming(
                        word=current_word,
                        start_time=word_start,
                        end_time=token.start_time,
                        confidence=1.0,
                    ))
                    current_word = ""
                continue
            if not current_word:
                word_start = token.start_time
            current_word += token.text

        if current_word:
            words.append(WordTiming(
                word=current_word,
                start_time=word_start,
                end_time=last_token_start + cls._LAST_WORD_PADDING_S,
                confidence=1.0,
            ))
        return words

    @classmethod
    def _parse_metadata(cls, metadata: Any) -> tuple[str, float, list[WordTiming]]:
        """Return ``(text, confidence, word_timings)`` for the first transcript.

        Yields an empty text, a confidence of 0.0 and no timings when the
        metadata contains no transcript.
        """
        if not metadata.transcripts:
            return "", 0.0, []
        transcript = metadata.transcripts[0]
        word_timings = cls._words_from_tokens(transcript.tokens)
        text = " ".join(w.word for w in word_timings).strip()
        return text, transcript.confidence, word_timings

    async def transcribe(
        self,
        audio_data: bytes,
        language: str | None = None,
    ) -> STTResult:
        """Transcribe a complete audio buffer.

        Args:
            audio_data: Raw audio, interpreted as 16-bit signed integer
                samples. A dangling trailing byte is ignored.
            language: Language reported in the result; defaults to the
                configured language.

        Returns:
            The transcription result including word timings.

        Raises:
            RuntimeError: If no model is loaded or the transcription fails.
        """
        model = self._model
        if not model:
            raise RuntimeError("DeepSpeech-Modell nicht geladen")

        self._state = STTState.PROCESSING
        started = time.perf_counter()
        try:
            import numpy as np

            # np.frombuffer rejects buffers that are not a whole number of
            # int16 samples, so an odd trailing byte is dropped.
            pcm = audio_data[:-1] if len(audio_data) % 2 else audio_data
            samples = np.frombuffer(pcm, dtype=np.int16)

            # Run inference in a worker thread to keep the event loop free.
            loop = asyncio.get_running_loop()
            metadata = await loop.run_in_executor(
                None, model.sttWithMetadata, samples, 1
            )
            processing_time = (time.perf_counter() - started) * 1000

            text, confidence, word_timings = self._parse_metadata(metadata)

            self._state = STTState.READY
            return STTResult(
                text=text,
                confidence=confidence,
                language=language or self._config.language,
                duration_seconds=self.get_audio_duration(audio_data),
                processing_time_ms=processing_time,
                word_timings=word_timings,
                provider="deepspeech",
                model=f"deepspeech-{self._model_name}",
                raw_result=metadata,
            )
        except Exception as exc:
            self._state = STTState.ERROR
            raise RuntimeError(
                f"DeepSpeech-Transkription fehlgeschlagen: {exc}"
            ) from exc

    async def transcribe_stream(
        self,
        audio_stream: AsyncIterator[bytes],
        language: str | None = None,
    ) -> AsyncIterator[STTResult]:
        """Transcribe an audio stream chunk by chunk.

        Yields a non-final result after each chunk for which the intermediate
        decode returns text, then one final result with word timings once the
        stream is exhausted.

        Args:
            audio_stream: Asynchronous iterator of raw audio chunks,
                interpreted as 16-bit signed integer samples. Chunks may
                split a sample; the dangling byte is carried over.
            language: Language reported in the results; defaults to the
                configured language.

        Raises:
            RuntimeError: If no model is loaded.
        """
        model = self._model
        if not model:
            raise RuntimeError("DeepSpeech-Modell nicht geladen")

        import numpy as np

        loop = asyncio.get_running_loop()
        stream = model.createStream()
        started = time.perf_counter()
        received = bytearray()  # all audio seen, used only for the duration
        pending = b""           # dangling byte of a sample split across chunks

        def feed_and_decode(samples):
            stream.feedAudioContent(samples)
            return stream.intermediateDecode()

        async for chunk in audio_stream:
            received += chunk

            data = pending + chunk
            usable = len(data) - (len(data) % 2)
            pending = data[usable:]
            if not usable:
                continue

            samples = np.frombuffer(data[:usable], dtype=np.int16)
            # Feed and decode in a worker thread, consistent with transcribe().
            text = await loop.run_in_executor(None, feed_and_decode, samples)
            if text:
                yield STTResult(
                    text=text.strip(),
                    confidence=0.5,  # placeholder: intermediate results carry no confidence
                    language=language or self._config.language,
                    is_final=False,
                    provider="deepspeech",
                )

        metadata = await loop.run_in_executor(
            None, stream.finishStreamWithMetadata, 1
        )
        processing_time = (time.perf_counter() - started) * 1000

        text, confidence, word_timings = self._parse_metadata(metadata)

        yield STTResult(
            text=text,
            confidence=confidence,
            language=language or self._config.language,
            duration_seconds=self.get_audio_duration(bytes(received)),
            processing_time_ms=processing_time,
            word_timings=word_timings,
            is_final=True,
            provider="deepspeech",
            model=f"deepspeech-{self._model_name}",
            raw_result=metadata,
        )
class DeepSpeechSTTPlugin(TrixyPlugin):
    """Trixy plugin that provides speech-to-text through DeepSpeech.

    On load it creates and initializes a ``DeepSpeechSTTProvider``, registers
    it at the ``conversation.stt`` extension point when one is available, and
    subscribes to ``raw_audio_received`` events to transcribe incoming audio.
    """

    def __init__(self, application, plugin_path, config: dict | None = None):
        """Forward the arguments to the base class; the provider is created in ``on_load``."""
        super().__init__(application, plugin_path, config)
        self._provider: DeepSpeechSTTProvider | None = None

    async def on_load(self) -> None:
        """Create the provider, load its model and hook it into the application.

        Reads the config keys ``model`` (default ``"de"``), ``language``
        (default ``"de-DE"``) and ``auto_download`` (default ``True``).
        Errors raised while initializing the provider propagate to the caller.
        """
        from trixy_core.utils.debug import pinfo

        pinfo("DeepSpeech STT Plugin: Lade...")

        model_name = self.config.get("model", "de")
        language = self.config.get("language", "de-DE")
        auto_download = self.config.get("auto_download", True)

        # Models are stored in a "models" directory inside the plugin folder.
        models_dir = self.plugin_path / "models"
        models_dir.mkdir(parents=True, exist_ok=True)

        stt_config = STTConfig(
            language=language,
        )

        # The event manager is handed over so downloads can report through it.
        self._provider = DeepSpeechSTTProvider(
            stt_config,
            models_dir=models_dir,
            model_name=model_name,
            auto_download=auto_download,
            event_manager=self.application.events,
        )

        # Loads the model and downloads it first if necessary.
        await self._provider.initialize()

        # Register the provider as an extension when the application has
        # extension points.
        if hasattr(self.application, "extension_points"):
            ext_point = self.application.extension_points.get("conversation.stt")
            if ext_point:
                ext_point.register(self._provider)
                pinfo("DeepSpeech STT Plugin: Extension registriert")

        self._register_event_handlers()
        pinfo(f"DeepSpeech STT Plugin: Geladen (Modell: {model_name})")

    def _register_event_handlers(self) -> None:
        """Subscribe to ``raw_audio_received`` and transcribe incoming audio."""
        em = self.application.events

        @em.on("raw_audio_received")
        async def on_audio_received(event_name: str, data: dict) -> None:
            """Transcribe the received audio and publish the outcome.

            Emits ``stt_completed`` on success, followed by a
            ``speech_recognized`` event when the text is non-empty. Emits
            ``stt_error`` when decoding or transcription fails.
            """
            if not self._provider or not self._provider.is_ready:
                return

            audio_data = data.get("audio_data")
            if not audio_data:
                return
            satellite_id = data.get("satellite_id")
            session_id = data.get("session_id")

            from trixy_core.utils.debug import pinfo

            try:
                # String payloads are decoded as hex. Decoding happens inside
                # the try block so malformed hex is reported as stt_error
                # instead of escaping the handler.
                if isinstance(audio_data, str):
                    audio_data = bytes.fromhex(audio_data)

                pinfo(f"DeepSpeech STT: Verarbeite Audio ({len(audio_data)} bytes)")

                result = await self._provider.transcribe(audio_data)
                await em.emit("stt_completed", {
                    "text": result.text,
                    "confidence": result.confidence,
                    "language": result.language,
                    "provider": "deepspeech",
                    "satellite_id": satellite_id,
                    "session_id": session_id,
                    "duration_seconds": result.duration_seconds,
                    "processing_time_ms": result.processing_time_ms,
                    "word_timings": [w.__dict__ for w in result.word_timings],
                })
                pinfo(f"DeepSpeech STT: '{result.text}' ({result.processing_time_ms:.0f}ms)")

                # Event bridge: forward non-empty text as speech_recognized
                # for the NLP pipeline.
                if result.text.strip():
                    from trixy_core.events.event_data.basic import SpeechRecognized

                    speech_event = SpeechRecognized(
                        satellite_id=satellite_id or "",
                        text=result.text,
                        confidence=result.confidence,
                        language=result.language or "de",
                        is_final=True,
                        source="stt",
                    )
                    speech_event.metadata["session_id"] = session_id or ""
                    await em.trigger("speech_recognized", speech_event)
            except Exception as e:
                from trixy_core.utils.debug import perror

                perror(f"DeepSpeech STT Fehler: {e}")
                await em.emit("stt_error", {
                    "error": str(e),
                    "provider": "deepspeech",
                    "satellite_id": satellite_id,
                    "session_id": session_id,
                })

    async def on_unload(self) -> None:
        """Shut down the provider, if any, and drop the reference to it."""
        if self._provider:
            await self._provider.shutdown()
            self._provider = None

        from trixy_core.utils.debug import pinfo

        pinfo("DeepSpeech STT Plugin: Entladen")

    @property
    def provider(self) -> DeepSpeechSTTProvider | None:
        """The STT provider, or ``None`` when none is currently set."""
        return self._provider
# Plugin export: module-level alias for the plugin class.
Plugin = DeepSpeechSTTPlugin
|