| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- # -*- coding: utf-8 -*-
- """
- Service-Container für die Verwaltung aller Trixy-Services.
- Verwaltet den Lebenszyklus, Abhängigkeiten und bietet
- Zugriff auf registrierte Services.
- """
- import asyncio
- from typing import TYPE_CHECKING, TypeVar, overload
- from trixy_core.service.enums import ServiceState
- from trixy_core.service.iservice import IService
- from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn
- if TYPE_CHECKING:
- from trixy_core.application import IApplication
- T = TypeVar("T", bound=IService)
- class ServiceContainer:
- """
- Container für die Verwaltung aller Services.
- Bietet:
- - Prioritätsbasierte Startreihenfolge
- - Abhängigkeitsauflösung und -injektion
- - Gesundheitsprüfungen mit Auto-Restart
- - Registry mit __getitem__ Zugriff
- """
- def __init__(self, application: "IApplication") -> None:
- """
- Initialisiert den Service-Container.
- Args:
- application: Referenz zur Hauptanwendung
- """
- self._application = application
- self._services: dict[str, IService] = {}
- self._start_order: list[str] = []
- self._health_check_interval: float = 30.0
- self._health_check_task: asyncio.Task | None = None
- self._running = False
- self._ready = False # True sobald start_all() inkl. Plugins komplett ist
- @property
- def services(self) -> dict[str, IService]:
- """Gibt alle registrierten Services zurück."""
- return self._services.copy()
- @property
- def is_ready(self) -> bool:
- """True wenn der Server vollstaendig gestartet und einsatzbereit ist.
- Gesetzt durch die Anwendung nach erfolgreichem start_all() und
- abgeschlossenem Plugin-Laden. Wird beim Shutdown wieder auf False gesetzt.
- """
- return self._ready
- def set_ready(self, ready: bool = True) -> None:
- """Markiert den Server als bereit (bzw. nicht bereit bei Shutdown).
- Von der Anwendung aufzurufen nachdem alle Services UND Plugins
- erfolgreich gestartet wurden.
- """
- self._ready = ready
- def register(self, service_class: type[T]) -> T:
- """
- Registriert eine Service-Klasse.
- Args:
- service_class: Die zu registrierende Service-Klasse
- Returns:
- Die erstellte Service-Instanz
- Raises:
- ValueError: Wenn der Service bereits registriert ist
- """
- name = service_class.NAME or service_class.__name__
- if name in self._services:
- raise ValueError(f"Service '{name}' ist bereits registriert")
- service = service_class(self._application)
- self._services[name] = service
- pdebug(f"Service registriert: {name}")
- return service
- def register_instance(self, service: IService) -> None:
- """
- Registriert eine bereits erstellte Service-Instanz.
- Args:
- service: Die zu registrierende Service-Instanz
- Raises:
- ValueError: Wenn der Service bereits registriert ist
- """
- name = service.name
- if name in self._services:
- raise ValueError(f"Service '{name}' ist bereits registriert")
- self._services[name] = service
- pdebug(f"Service-Instanz registriert: {name}")
- def get_service(self, name: str) -> IService | None:
- """
- Gibt einen Service nach Namen zurück.
- Args:
- name: Name des Services
- Returns:
- Service-Instanz oder None wenn nicht gefunden
- """
- return self._services.get(name)
- def get_service_typed(self, name: str, service_type: type[T]) -> T | None:
- """
- Gibt einen Service mit Typ-Casting zurück.
- Args:
- name: Name des Services
- service_type: Erwarteter Service-Typ
- Returns:
- Getypte Service-Instanz oder None
- """
- service = self._services.get(name)
- if service is not None and isinstance(service, service_type):
- return service
- return None
- @overload
- def __getitem__(self, key: str) -> IService: ...
- @overload
- def __getitem__(self, key: type[T]) -> T: ...
- def __getitem__(self, key: str | type[T]) -> IService | T:
- """
- Gibt einen Service per Index-Zugriff zurück.
- Args:
- key: Service-Name oder Service-Klasse
- Returns:
- Service-Instanz
- Raises:
- KeyError: Wenn der Service nicht gefunden wird
- """
- if isinstance(key, str):
- if key not in self._services:
- raise KeyError(f"Service '{key}' nicht gefunden")
- return self._services[key]
- else:
- name = key.NAME or key.__name__
- if name not in self._services:
- raise KeyError(f"Service '{name}' nicht gefunden")
- return self._services[name]
- def __contains__(self, key: str | type[IService]) -> bool:
- """Prüft, ob ein Service registriert ist."""
- if isinstance(key, str):
- return key in self._services
- name = key.NAME or key.__name__
- return name in self._services
- def _resolve_start_order(self) -> list[str]:
- """
- Löst die Startreihenfolge basierend auf Priorität und Abhängigkeiten auf.
- Returns:
- Liste der Service-Namen in Startreihenfolge
- Raises:
- ValueError: Bei zirkulären Abhängigkeiten oder fehlenden Services
- """
- # Sortiere nach Priorität
- sorted_services = sorted(
- self._services.items(),
- key=lambda x: x[1].PRIORITY
- )
- resolved: list[str] = []
- unresolved: set[str] = set(self._services.keys())
- def resolve(name: str, chain: set[str]) -> None:
- """Rekursive Abhängigkeitsauflösung."""
- if name in resolved:
- return
- if name in chain:
- raise ValueError(
- f"Zirkuläre Abhängigkeit erkannt: {' -> '.join(chain)} -> {name}"
- )
- if name not in self._services:
- raise ValueError(f"Abhängiger Service '{name}' nicht gefunden")
- service = self._services[name]
- chain = chain | {name}
- for dep in service.DEPENDENCIES:
- if dep not in self._services:
- raise ValueError(
- f"Service '{name}' benötigt '{dep}', aber dieser ist nicht registriert"
- )
- resolve(dep, chain)
- resolved.append(name)
- unresolved.discard(name)
- for name, _ in sorted_services:
- if name in unresolved:
- resolve(name, set())
- return resolved
- async def start_all(self) -> None:
- """
- Startet alle registrierten Services in der richtigen Reihenfolge.
- Raises:
- Exception: Bei Fehlern während des Starts
- """
- pinfo("Starte alle Services...")
- self._start_order = self._resolve_start_order()
- pdebug(f"Startreihenfolge: {self._start_order}")
- for name in self._start_order:
- await self._start_service(name)
- self._running = True
- self._health_check_task = asyncio.create_task(self._health_check_loop())
- pinfo("Alle Services gestartet")
- async def _start_service(self, name: str) -> None:
- """
- Startet einen einzelnen Service.
- Args:
- name: Name des zu startenden Services
- """
- service = self._services[name]
- if service.state == ServiceState.RUNNING:
- return
- pdebug(f"Starte Service: {name}")
- service._set_state(ServiceState.STARTING)
- try:
- await service.on_pre_start()
- await service.start()
- service._set_state(ServiceState.RUNNING)
- await service.on_post_start()
- # Benachrichtige abhängige Services
- for other_name, other_service in self._services.items():
- if name in other_service.DEPENDENCIES:
- await other_service.on_dependency_ready(name)
- pinfo(f"Service gestartet: {name}")
- # Event ausloesen
- await self._emit("service_started", {
- "service_name": name,
- "priority": service.PRIORITY.name if hasattr(service.PRIORITY, "name") else str(service.PRIORITY),
- "group": service.GROUP.name if hasattr(service.GROUP, "name") else str(service.GROUP),
- })
- except Exception as e:
- service._set_state(ServiceState.FAILED)
- perror(f"Fehler beim Starten von Service '{name}': {e}")
- await self._emit("service_error", {
- "service_name": name,
- "error": str(e),
- "traceback": __import__("traceback").format_exc(),
- })
- raise
- async def stop_all(self) -> None:
- """Stoppt alle Services in umgekehrter Startreihenfolge."""
- pinfo("Stoppe alle Services...")
- self._running = False
- self._ready = False
- if self._health_check_task:
- self._health_check_task.cancel()
- try:
- await self._health_check_task
- except asyncio.CancelledError:
- pass
- self._health_check_task = None
- # Stoppe in umgekehrter Reihenfolge
- for name in reversed(self._start_order):
- await self._stop_service(name)
- pinfo("Alle Services gestoppt")
- async def _emit(self, event_name: str, data: dict) -> None:
- """Hilfsmethode zum Ausloesen von Events (falls EventManager verfuegbar)."""
- events = getattr(self._application, "events", None)
- if events:
- try:
- await events.emit(event_name, data)
- except Exception:
- pass
- async def _stop_service(self, name: str) -> None:
- """
- Stoppt einen einzelnen Service.
- Args:
- name: Name des zu stoppenden Services
- """
- service = self._services[name]
- if service.state == ServiceState.STOPPED:
- return
- pdebug(f"Stoppe Service: {name}")
- service._set_state(ServiceState.STOPPING)
- try:
- await service.on_pre_stop()
- await service.stop()
- service._set_state(ServiceState.STOPPED)
- await service.on_post_stop()
- pinfo(f"Service gestoppt: {name}")
- await self._emit("service_stopped", {
- "service_name": name,
- "reason": "shutdown",
- })
- except Exception as e:
- perror(f"Fehler beim Stoppen von Service '{name}': {e}")
- service._set_state(ServiceState.STOPPED)
- await self._emit("service_error", {
- "service_name": name,
- "error": str(e),
- "traceback": __import__("traceback").format_exc(),
- })
- async def restart_service(self, name: str) -> None:
- """
- Startet einen Service neu.
- Args:
- name: Name des neu zu startenden Services
- Raises:
- KeyError: Wenn der Service nicht existiert
- """
- if name not in self._services:
- raise KeyError(f"Service '{name}' nicht gefunden")
- pwarn(f"Starte Service neu: {name}")
- await self._stop_service(name)
- await self._start_service(name)
- async def _health_check_loop(self) -> None:
- """Führt periodische Gesundheitsprüfungen durch."""
- while self._running:
- await asyncio.sleep(self._health_check_interval)
- if not self._running:
- break
- for name, service in self._services.items():
- if service.state != ServiceState.RUNNING:
- continue
- try:
- healthy = await service.health_check()
- if not healthy:
- pwarn(f"Service '{name}' Gesundheitsprüfung fehlgeschlagen")
- await self.restart_service(name)
- except Exception as e:
- perror(f"Fehler bei Gesundheitsprüfung von '{name}': {e}")
- def set_health_check_interval(self, seconds: float) -> None:
- """
- Setzt das Intervall für Gesundheitsprüfungen.
- Args:
- seconds: Intervall in Sekunden
- """
- self._health_check_interval = seconds
|