# -*- coding: utf-8 -*- """ Service-Container für die Verwaltung aller Trixy-Services. Verwaltet den Lebenszyklus, Abhängigkeiten und bietet Zugriff auf registrierte Services. """ import asyncio from typing import TYPE_CHECKING, TypeVar, overload from trixy_core.service.enums import ServiceState from trixy_core.service.iservice import IService from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn if TYPE_CHECKING: from trixy_core.application import IApplication T = TypeVar("T", bound=IService) class ServiceContainer: """ Container für die Verwaltung aller Services. Bietet: - Prioritätsbasierte Startreihenfolge - Abhängigkeitsauflösung und -injektion - Gesundheitsprüfungen mit Auto-Restart - Registry mit __getitem__ Zugriff """ def __init__(self, application: "IApplication") -> None: """ Initialisiert den Service-Container. Args: application: Referenz zur Hauptanwendung """ self._application = application self._services: dict[str, IService] = {} self._start_order: list[str] = [] self._health_check_interval: float = 30.0 self._health_check_task: asyncio.Task | None = None self._running = False self._ready = False # True sobald start_all() inkl. Plugins komplett ist @property def services(self) -> dict[str, IService]: """Gibt alle registrierten Services zurück.""" return self._services.copy() @property def is_ready(self) -> bool: """True wenn der Server vollstaendig gestartet und einsatzbereit ist. Gesetzt durch die Anwendung nach erfolgreichem start_all() und abgeschlossenem Plugin-Laden. Wird beim Shutdown wieder auf False gesetzt. """ return self._ready def set_ready(self, ready: bool = True) -> None: """Markiert den Server als bereit (bzw. nicht bereit bei Shutdown). Von der Anwendung aufzurufen nachdem alle Services UND Plugins erfolgreich gestartet wurden. """ self._ready = ready def register(self, service_class: type[T]) -> T: """ Registriert eine Service-Klasse. Args: service_class: Die zu registrierende Service-Klasse Returns: Die erstellte Service-Instanz Raises: ValueError: Wenn der Service bereits registriert ist """ name = service_class.NAME or service_class.__name__ if name in self._services: raise ValueError(f"Service '{name}' ist bereits registriert") service = service_class(self._application) self._services[name] = service pdebug(f"Service registriert: {name}") return service def register_instance(self, service: IService) -> None: """ Registriert eine bereits erstellte Service-Instanz. Args: service: Die zu registrierende Service-Instanz Raises: ValueError: Wenn der Service bereits registriert ist """ name = service.name if name in self._services: raise ValueError(f"Service '{name}' ist bereits registriert") self._services[name] = service pdebug(f"Service-Instanz registriert: {name}") def get_service(self, name: str) -> IService | None: """ Gibt einen Service nach Namen zurück. Args: name: Name des Services Returns: Service-Instanz oder None wenn nicht gefunden """ return self._services.get(name) def get_service_typed(self, name: str, service_type: type[T]) -> T | None: """ Gibt einen Service mit Typ-Casting zurück. Args: name: Name des Services service_type: Erwarteter Service-Typ Returns: Getypte Service-Instanz oder None """ service = self._services.get(name) if service is not None and isinstance(service, service_type): return service return None @overload def __getitem__(self, key: str) -> IService: ... @overload def __getitem__(self, key: type[T]) -> T: ... def __getitem__(self, key: str | type[T]) -> IService | T: """ Gibt einen Service per Index-Zugriff zurück. Args: key: Service-Name oder Service-Klasse Returns: Service-Instanz Raises: KeyError: Wenn der Service nicht gefunden wird """ if isinstance(key, str): if key not in self._services: raise KeyError(f"Service '{key}' nicht gefunden") return self._services[key] else: name = key.NAME or key.__name__ if name not in self._services: raise KeyError(f"Service '{name}' nicht gefunden") return self._services[name] def __contains__(self, key: str | type[IService]) -> bool: """Prüft, ob ein Service registriert ist.""" if isinstance(key, str): return key in self._services name = key.NAME or key.__name__ return name in self._services def _resolve_start_order(self) -> list[str]: """ Löst die Startreihenfolge basierend auf Priorität und Abhängigkeiten auf. Returns: Liste der Service-Namen in Startreihenfolge Raises: ValueError: Bei zirkulären Abhängigkeiten oder fehlenden Services """ # Sortiere nach Priorität sorted_services = sorted( self._services.items(), key=lambda x: x[1].PRIORITY ) resolved: list[str] = [] unresolved: set[str] = set(self._services.keys()) def resolve(name: str, chain: set[str]) -> None: """Rekursive Abhängigkeitsauflösung.""" if name in resolved: return if name in chain: raise ValueError( f"Zirkuläre Abhängigkeit erkannt: {' -> '.join(chain)} -> {name}" ) if name not in self._services: raise ValueError(f"Abhängiger Service '{name}' nicht gefunden") service = self._services[name] chain = chain | {name} for dep in service.DEPENDENCIES: if dep not in self._services: raise ValueError( f"Service '{name}' benötigt '{dep}', aber dieser ist nicht registriert" ) resolve(dep, chain) resolved.append(name) unresolved.discard(name) for name, _ in sorted_services: if name in unresolved: resolve(name, set()) return resolved async def start_all(self) -> None: """ Startet alle registrierten Services in der richtigen Reihenfolge. Raises: Exception: Bei Fehlern während des Starts """ pinfo("Starte alle Services...") self._start_order = self._resolve_start_order() pdebug(f"Startreihenfolge: {self._start_order}") for name in self._start_order: await self._start_service(name) self._running = True self._health_check_task = asyncio.create_task(self._health_check_loop()) pinfo("Alle Services gestartet") async def _start_service(self, name: str) -> None: """ Startet einen einzelnen Service. Args: name: Name des zu startenden Services """ service = self._services[name] if service.state == ServiceState.RUNNING: return pdebug(f"Starte Service: {name}") service._set_state(ServiceState.STARTING) try: await service.on_pre_start() await service.start() service._set_state(ServiceState.RUNNING) await service.on_post_start() # Benachrichtige abhängige Services for other_name, other_service in self._services.items(): if name in other_service.DEPENDENCIES: await other_service.on_dependency_ready(name) pinfo(f"Service gestartet: {name}") # Event ausloesen await self._emit("service_started", { "service_name": name, "priority": service.PRIORITY.name if hasattr(service.PRIORITY, "name") else str(service.PRIORITY), "group": service.GROUP.name if hasattr(service.GROUP, "name") else str(service.GROUP), }) except Exception as e: service._set_state(ServiceState.FAILED) perror(f"Fehler beim Starten von Service '{name}': {e}") await self._emit("service_error", { "service_name": name, "error": str(e), "traceback": __import__("traceback").format_exc(), }) raise async def stop_all(self) -> None: """Stoppt alle Services in umgekehrter Startreihenfolge.""" pinfo("Stoppe alle Services...") self._running = False self._ready = False if self._health_check_task: self._health_check_task.cancel() try: await self._health_check_task except asyncio.CancelledError: pass self._health_check_task = None # Stoppe in umgekehrter Reihenfolge for name in reversed(self._start_order): await self._stop_service(name) pinfo("Alle Services gestoppt") async def _emit(self, event_name: str, data: dict) -> None: """Hilfsmethode zum Ausloesen von Events (falls EventManager verfuegbar).""" events = getattr(self._application, "events", None) if events: try: await events.emit(event_name, data) except Exception: pass async def _stop_service(self, name: str) -> None: """ Stoppt einen einzelnen Service. Args: name: Name des zu stoppenden Services """ service = self._services[name] if service.state == ServiceState.STOPPED: return pdebug(f"Stoppe Service: {name}") service._set_state(ServiceState.STOPPING) try: await service.on_pre_stop() await service.stop() service._set_state(ServiceState.STOPPED) await service.on_post_stop() pinfo(f"Service gestoppt: {name}") await self._emit("service_stopped", { "service_name": name, "reason": "shutdown", }) except Exception as e: perror(f"Fehler beim Stoppen von Service '{name}': {e}") service._set_state(ServiceState.STOPPED) await self._emit("service_error", { "service_name": name, "error": str(e), "traceback": __import__("traceback").format_exc(), }) async def restart_service(self, name: str) -> None: """ Startet einen Service neu. Args: name: Name des neu zu startenden Services Raises: KeyError: Wenn der Service nicht existiert """ if name not in self._services: raise KeyError(f"Service '{name}' nicht gefunden") pwarn(f"Starte Service neu: {name}") await self._stop_service(name) await self._start_service(name) async def _health_check_loop(self) -> None: """Führt periodische Gesundheitsprüfungen durch.""" while self._running: await asyncio.sleep(self._health_check_interval) if not self._running: break for name, service in self._services.items(): if service.state != ServiceState.RUNNING: continue try: healthy = await service.health_check() if not healthy: pwarn(f"Service '{name}' Gesundheitsprüfung fehlgeschlagen") await self.restart_service(name) except Exception as e: perror(f"Fehler bei Gesundheitsprüfung von '{name}': {e}") def set_health_check_interval(self, seconds: float) -> None: """ Setzt das Intervall für Gesundheitsprüfungen. Args: seconds: Intervall in Sekunden """ self._health_check_interval = seconds