| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- # -*- coding: utf-8 -*-
- """
- NLP Classifier Plugin — Intent-Erkennung via trainiertem ONNX-Modell.
- 3-stufige Pipeline:
- 1. KeywordMatcher (< 5ms) — Pattern-basiert, hoechste Praezision
- 2. ONNX-Classifier (< 100ms) — Trainiertes Sentence-Embedding-Modell
- 3. LLM-Fallback — Nur wenn nlp_llm Plugin aktiv und Confidence zu niedrig
- Hoert auf speech_recognized mit HIGHEST Prioritaet.
- Bei ausreichender Confidence wird das Event gecancelt,
- damit das nlp_llm Plugin nicht zusaetzlich aufgerufen wird.
- Event-Flow:
- speech_recognized (HIGHEST Prioritaet)
- ↓
- [Classifier Plugin] KeywordMatcher → ONNX → Fallback
- ↓
- intent_received (mit confidence, slots, tones)
- ↓
- [Intent-Handler Plugin] verarbeitet Intent
- """
- from __future__ import annotations
- import time
- from pathlib import Path
- from typing import Any
- from trixy_core.events.decorators import TrixyEvent
- from trixy_core.events.enums import EventPriority
- from trixy_core.events.event_data.basic import IntentReceived, SpeechRecognized
- from trixy_core.nlp.intent_classifier import IntentClassifier
- from trixy_core.nlp.tone import get_analyzer, analyze_tone, emit_tone_events
- from trixy_core.plugins.trixy_plugin import TrixyPlugin
- from trixy_core.utils.debug import pdebug, perror, pinfo, pwarn
class NLPClassifierPlugin(TrixyPlugin):
    """
    NLP backend that resolves intents with a trained intent classifier.

    Pipeline executed for every ``speech_recognized`` event:

    1. Keyword matcher (pattern based, fastest path).
    2. ONNX intent classifier (only if stage 1 produced no intent).
    3. LLM fallback: the event is left un-cancelled so that another
       plugin (``nlp_llm``) can handle it.

    When stage 1 or 2 yields an intent with sufficient confidence, an
    ``intent_received`` event is triggered and the original event is
    cancelled.
    """

    NAME = "nlp_classifier"
    VERSION = "1.0.0"
    DESCRIPTION = "Intent-Erkennung via trainiertem ONNX-Classifier"
    AUTHOR = "Trixy"

    def __init__(self, application, plugin_path, config=None) -> None:
        """Forward the arguments to the base plugin and reset the NLP components."""
        super().__init__(application, plugin_path, config)
        # All three components are created in on_load() and dropped in on_unload().
        self._classifier: IntentClassifier | None = None
        self._keyword_matcher = None
        self._entity_registry = None

    async def on_load(self) -> None:
        """Initialise the classifier, the keyword matcher and the tone analyzer."""
        pinfo(f"[Classifier] Lade Plugin v{self.VERSION}...")

        # 1. Load the intent classifier. A failed load is not fatal: the
        #    plugin then runs with the keyword matcher only.
        model_dir = self.get_config_value("model_dir", "models/intent")
        self._classifier = IntentClassifier()
        ok = await self._classifier.load(model_dir)
        if ok:
            pinfo(f"[Classifier] ONNX-Modell geladen: {self._classifier.intent_count} Intents")
        else:
            pwarn("[Classifier] ONNX-Modell nicht verfuegbar — nur KeywordMatcher aktiv")

        # 2. Initialise the keyword matcher (pattern based, fastest path).
        # NOTE(review): if a step below fails after the matcher was assigned,
        # the matcher stays in place partially initialised — confirm that
        # this is the intended best-effort behaviour.
        try:
            from trixy_core.nlp.keyword_matcher import KeywordIntentMatcher
            from trixy_core.nlp.entities import EntityRegistry

            matcher_config = {"enabled": True}
            self._keyword_matcher = KeywordIntentMatcher(matcher_config)

            # Entity definitions are read from "<three levels up>/trixy_core/nlp".
            nlp_dir = Path(__file__).resolve().parent.parent.parent / "trixy_core" / "nlp"
            self._entity_registry = EntityRegistry()
            self._entity_registry.load(nlp_dir, "de")
            self._keyword_matcher.set_entity_registry(self._entity_registry)

            # Compile registry intents together with the system intents.
            all_intents = self._collect_intents()
            self._keyword_matcher.compile_intents(all_intents)
            pinfo(f"[Classifier] KeywordMatcher: {len(all_intents)} Intents kompiliert")
        except Exception as e:
            pwarn(f"[Classifier] KeywordMatcher nicht verfuegbar: {e}")

        # 3. Load tone keywords if the configured file exists.
        tone_file = self.get_config_value("tone_keywords_file", "config/tone_keywords.json")
        if Path(tone_file).is_file():
            get_analyzer().load_keywords(tone_file)
            pdebug("[Classifier] ToneAnalyzer geladen")

        pinfo("[Classifier] Plugin geladen — Pipeline: KeywordMatcher → ONNX → LLM-Fallback")

    async def on_unload(self) -> None:
        """Drop the references to all NLP components."""
        self._classifier = None
        self._keyword_matcher = None
        self._entity_registry = None
        pdebug("[Classifier] Plugin entladen")

    # === Keyword matcher maintenance ===

    @staticmethod
    def _collect_intents():
        """
        Return the intents the keyword matcher should compile.

        The result is the content of the intent registry, preceded by the
        system intents when the optional ``system_intents`` module can be
        imported. Used by both ``on_load`` and ``on_plugin_change`` so that
        a refresh compiles the same set as the initial load.
        """
        from trixy_core.nlp.intent_registry import IntentRegistry

        all_intents = IntentRegistry.get_instance().get_all_as_dict()
        try:
            from trixy_core.nlp.system_intents import get_system_intents_for_context

            # Both collections are joined with "+", so they are presumably
            # lists — TODO confirm against the registry implementation.
            all_intents = get_system_intents_for_context() + all_intents
        except ImportError:
            # System intents are optional; continue with the registry only.
            pass
        return all_intents

    @TrixyEvent(["plugin_loaded", "plugin_unloaded"])
    async def on_plugin_change(self, event_name: str, event_data) -> None:
        """
        Recompile the keyword matcher after another plugin was (un)loaded.

        Events that refer to this plugin itself are ignored, as are events
        that arrive while no keyword matcher is available.
        """
        plugin_name = getattr(event_data, "plugin_name", "")
        if plugin_name == self.NAME or not self._keyword_matcher:
            return

        try:
            # Use the same intent set as on_load(); compiling the registry
            # alone would silently drop the system intents.
            all_intents = self._collect_intents()
            self._keyword_matcher.compile_intents(all_intents)
            pdebug(f"[Classifier] KeywordMatcher aktualisiert: {len(all_intents)} Intents")
        except Exception as e:
            # Best effort: keep the plugin running, but make the failure visible.
            pwarn(f"[Classifier] KeywordMatcher-Aktualisierung fehlgeschlagen: {e}")

    # === Intent recognition ===

    @TrixyEvent(["speech_recognized"], priority=EventPriority.HIGHEST)
    async def on_speech_recognized(self, event_name: str, event_data: SpeechRecognized) -> None:
        """
        Resolve an intent from recognised speech and emit ``intent_received``.

        Registered with HIGHEST priority. If an intent is emitted, the
        incoming event is cancelled (when it offers ``cancel()``). If nothing
        is recognised and ``fallback_to_llm`` is enabled, the event is left
        untouched so that a later handler can process it.
        """
        text = getattr(event_data, "text", "")
        if not text or not text.strip():
            return

        satellite_id = getattr(event_data, "satellite_id", "")
        min_confidence = self.get_config_value("min_confidence", 0.7)
        log_predictions = self.get_config_value("log_predictions", True)
        t_start = time.monotonic()

        intent = ""
        confidence = 0.0
        slots: dict[str, Any] = {}
        matched_by = ""

        # --- Stage 1: keyword matcher ---
        # The match is kept even when it is below the threshold, because
        # stage 2 reuses its slots.
        kw_match = self._keyword_matcher.match(text) if self._keyword_matcher else None
        if kw_match and kw_match.confidence >= min_confidence:
            intent = kw_match.intent
            confidence = kw_match.confidence
            slots = kw_match.slots
            matched_by = "keyword"
            if log_predictions:
                t_kw = (time.monotonic() - t_start) * 1000
                pinfo(
                    f"[Classifier] KeywordMatch: '{intent}' "
                    f"(conf={confidence:.2f}, {t_kw:.1f}ms)"
                )

        # --- Stage 2: ONNX classifier ---
        if not intent and self._classifier and self._classifier.is_loaded:
            prediction = await self._classifier.classify(text, min_confidence=min_confidence)
            if prediction.confidence >= min_confidence:
                intent = prediction.intent
                confidence = prediction.confidence
                matched_by = "classifier"
                # The classifier supplies the intent; slots found by the
                # keyword matcher take precedence over the classifier's own.
                # The stage-1 result is reused instead of matching the same
                # text a second time.
                slots = prediction.slots or {}
                if kw_match and kw_match.slots:
                    slots = kw_match.slots
                    pdebug(f"[Classifier] Slots via KeywordMatcher: {slots}")
                if log_predictions:
                    pinfo(
                        f"[Classifier] ONNX: '{intent}' "
                        f"(conf={confidence:.2f}, {prediction.inference_ms:.1f}ms, "
                        f"slots={slots})"
                    )

        # --- Stage 3: LLM fallback ---
        if not intent:
            fallback = self.get_config_value("fallback_to_llm", True)
            if fallback:
                t_elapsed = (time.monotonic() - t_start) * 1000
                if log_predictions:
                    pdebug(
                        f"[Classifier] Keine Erkennung ({t_elapsed:.1f}ms) "
                        f"— Fallback an LLM"
                    )
                # Do NOT cancel the event, so the LLM plugin can take over.
                return
            # No fallback configured: emit an explicit "unknown" intent.
            intent = "unknown"
            confidence = 0.0
            matched_by = "none"

        # --- Tone analysis ---
        tone_results = analyze_tone(text)
        tones = [r.tag for r in tone_results]
        events = getattr(self._application, "events", None)
        if events and tone_results:
            await emit_tone_events(tone_results, text, events)

        # --- Emit intent_received ---
        t_total = (time.monotonic() - t_start) * 1000
        session_info = self._get_session_info(satellite_id)
        intent_event = IntentReceived(
            satellite_id=satellite_id,
            intent=intent,
            confidence=confidence,
            slots=slots,
            original_text=text,
            session_id=session_info.get("session_id", ""),
            room_id=session_info.get("room_id", ""),
            wakeword_type=session_info.get("wakeword_type", "custom"),
            is_authenticated=self._is_authenticated(satellite_id),
            language=getattr(event_data, "language", "de") or "de",
            metadata={
                "matched_by": matched_by,
                "tones": [{"tag": r.tag, "category": r.category} for r in tone_results],
                "inference_ms": t_total,
            },
        )
        pinfo(
            f"[Classifier] Intent: '{intent}' (conf={confidence:.2f}, "
            f"by={matched_by}, {t_total:.1f}ms, "
            f"tones={tones[:3]})"
        )
        await self._application.events.trigger("intent_received", intent_event)

        # Cancel the event so that nlp_llm does not process it again.
        if hasattr(event_data, "cancel"):
            event_data.cancel()

    # === Helpers ===

    def _get_session_info(self, satellite_id: str) -> dict[str, Any]:
        """
        Collect session details for the given satellite.

        Returns a dict with ``session_id``, ``room_id`` and ``wakeword_type``
        when the satellite has a conversation, only ``room_id`` when it has
        none, and an empty dict when the satellite cannot be resolved or the
        lookup fails.
        """
        try:
            if hasattr(self._application, "satellites"):
                satellite = self._application.satellites.get(satellite_id)
                if satellite:
                    room = getattr(satellite, "room", "")
                    conv = getattr(satellite, "conversation", None)
                    if not conv:
                        return {"room_id": room}
                    return {
                        "session_id": getattr(conv, "session_id", ""),
                        "room_id": room,
                        "wakeword_type": getattr(conv, "wakeword_type", "custom"),
                    }
        except Exception as e:
            # Best effort: missing session data must not block intent
            # dispatch, but the cause should be visible in the debug log.
            pdebug(f"[Classifier] Session-Info nicht verfuegbar: {e}")
        return {}

    def _is_authenticated(self, satellite_id: str) -> bool:
        """Report whether the satellite is authenticated; currently always ``False``."""
        # TODO: admin session management (as in nlp_llm)
        return False
|