service_container.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. # -*- coding: utf-8 -*-
  2. """
  3. Service-Container für die Verwaltung aller Trixy-Services.
  4. Verwaltet den Lebenszyklus, Abhängigkeiten und bietet
  5. Zugriff auf registrierte Services.
  6. """
  7. import asyncio
  8. from typing import TYPE_CHECKING, TypeVar, overload
  9. from trixy_core.service.enums import ServiceState
  10. from trixy_core.service.iservice import IService
  11. from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn
  12. if TYPE_CHECKING:
  13. from trixy_core.application import IApplication
  14. T = TypeVar("T", bound=IService)
  15. class ServiceContainer:
  16. """
  17. Container für die Verwaltung aller Services.
  18. Bietet:
  19. - Prioritätsbasierte Startreihenfolge
  20. - Abhängigkeitsauflösung und -injektion
  21. - Gesundheitsprüfungen mit Auto-Restart
  22. - Registry mit __getitem__ Zugriff
  23. """
  24. def __init__(self, application: "IApplication") -> None:
  25. """
  26. Initialisiert den Service-Container.
  27. Args:
  28. application: Referenz zur Hauptanwendung
  29. """
  30. self._application = application
  31. self._services: dict[str, IService] = {}
  32. self._start_order: list[str] = []
  33. self._health_check_interval: float = 30.0
  34. self._health_check_task: asyncio.Task | None = None
  35. self._running = False
  36. self._ready = False # True sobald start_all() inkl. Plugins komplett ist
  37. @property
  38. def services(self) -> dict[str, IService]:
  39. """Gibt alle registrierten Services zurück."""
  40. return self._services.copy()
  41. @property
  42. def is_ready(self) -> bool:
  43. """True wenn der Server vollstaendig gestartet und einsatzbereit ist.
  44. Gesetzt durch die Anwendung nach erfolgreichem start_all() und
  45. abgeschlossenem Plugin-Laden. Wird beim Shutdown wieder auf False gesetzt.
  46. """
  47. return self._ready
  48. def set_ready(self, ready: bool = True) -> None:
  49. """Markiert den Server als bereit (bzw. nicht bereit bei Shutdown).
  50. Von der Anwendung aufzurufen nachdem alle Services UND Plugins
  51. erfolgreich gestartet wurden.
  52. """
  53. self._ready = ready
  54. def register(self, service_class: type[T]) -> T:
  55. """
  56. Registriert eine Service-Klasse.
  57. Args:
  58. service_class: Die zu registrierende Service-Klasse
  59. Returns:
  60. Die erstellte Service-Instanz
  61. Raises:
  62. ValueError: Wenn der Service bereits registriert ist
  63. """
  64. name = service_class.NAME or service_class.__name__
  65. if name in self._services:
  66. raise ValueError(f"Service '{name}' ist bereits registriert")
  67. service = service_class(self._application)
  68. self._services[name] = service
  69. pdebug(f"Service registriert: {name}")
  70. return service
  71. def register_instance(self, service: IService) -> None:
  72. """
  73. Registriert eine bereits erstellte Service-Instanz.
  74. Args:
  75. service: Die zu registrierende Service-Instanz
  76. Raises:
  77. ValueError: Wenn der Service bereits registriert ist
  78. """
  79. name = service.name
  80. if name in self._services:
  81. raise ValueError(f"Service '{name}' ist bereits registriert")
  82. self._services[name] = service
  83. pdebug(f"Service-Instanz registriert: {name}")
  84. def get_service(self, name: str) -> IService | None:
  85. """
  86. Gibt einen Service nach Namen zurück.
  87. Args:
  88. name: Name des Services
  89. Returns:
  90. Service-Instanz oder None wenn nicht gefunden
  91. """
  92. return self._services.get(name)
  93. def get_service_typed(self, name: str, service_type: type[T]) -> T | None:
  94. """
  95. Gibt einen Service mit Typ-Casting zurück.
  96. Args:
  97. name: Name des Services
  98. service_type: Erwarteter Service-Typ
  99. Returns:
  100. Getypte Service-Instanz oder None
  101. """
  102. service = self._services.get(name)
  103. if service is not None and isinstance(service, service_type):
  104. return service
  105. return None
  106. @overload
  107. def __getitem__(self, key: str) -> IService: ...
  108. @overload
  109. def __getitem__(self, key: type[T]) -> T: ...
  110. def __getitem__(self, key: str | type[T]) -> IService | T:
  111. """
  112. Gibt einen Service per Index-Zugriff zurück.
  113. Args:
  114. key: Service-Name oder Service-Klasse
  115. Returns:
  116. Service-Instanz
  117. Raises:
  118. KeyError: Wenn der Service nicht gefunden wird
  119. """
  120. if isinstance(key, str):
  121. if key not in self._services:
  122. raise KeyError(f"Service '{key}' nicht gefunden")
  123. return self._services[key]
  124. else:
  125. name = key.NAME or key.__name__
  126. if name not in self._services:
  127. raise KeyError(f"Service '{name}' nicht gefunden")
  128. return self._services[name]
  129. def __contains__(self, key: str | type[IService]) -> bool:
  130. """Prüft, ob ein Service registriert ist."""
  131. if isinstance(key, str):
  132. return key in self._services
  133. name = key.NAME or key.__name__
  134. return name in self._services
  135. def _resolve_start_order(self) -> list[str]:
  136. """
  137. Löst die Startreihenfolge basierend auf Priorität und Abhängigkeiten auf.
  138. Returns:
  139. Liste der Service-Namen in Startreihenfolge
  140. Raises:
  141. ValueError: Bei zirkulären Abhängigkeiten oder fehlenden Services
  142. """
  143. # Sortiere nach Priorität
  144. sorted_services = sorted(
  145. self._services.items(),
  146. key=lambda x: x[1].PRIORITY
  147. )
  148. resolved: list[str] = []
  149. unresolved: set[str] = set(self._services.keys())
  150. def resolve(name: str, chain: set[str]) -> None:
  151. """Rekursive Abhängigkeitsauflösung."""
  152. if name in resolved:
  153. return
  154. if name in chain:
  155. raise ValueError(
  156. f"Zirkuläre Abhängigkeit erkannt: {' -> '.join(chain)} -> {name}"
  157. )
  158. if name not in self._services:
  159. raise ValueError(f"Abhängiger Service '{name}' nicht gefunden")
  160. service = self._services[name]
  161. chain = chain | {name}
  162. for dep in service.DEPENDENCIES:
  163. if dep not in self._services:
  164. raise ValueError(
  165. f"Service '{name}' benötigt '{dep}', aber dieser ist nicht registriert"
  166. )
  167. resolve(dep, chain)
  168. resolved.append(name)
  169. unresolved.discard(name)
  170. for name, _ in sorted_services:
  171. if name in unresolved:
  172. resolve(name, set())
  173. return resolved
  174. async def start_all(self) -> None:
  175. """
  176. Startet alle registrierten Services in der richtigen Reihenfolge.
  177. Raises:
  178. Exception: Bei Fehlern während des Starts
  179. """
  180. pinfo("Starte alle Services...")
  181. self._start_order = self._resolve_start_order()
  182. pdebug(f"Startreihenfolge: {self._start_order}")
  183. for name in self._start_order:
  184. await self._start_service(name)
  185. self._running = True
  186. self._health_check_task = asyncio.create_task(self._health_check_loop())
  187. pinfo("Alle Services gestartet")
  188. async def _start_service(self, name: str) -> None:
  189. """
  190. Startet einen einzelnen Service.
  191. Args:
  192. name: Name des zu startenden Services
  193. """
  194. service = self._services[name]
  195. if service.state == ServiceState.RUNNING:
  196. return
  197. pdebug(f"Starte Service: {name}")
  198. service._set_state(ServiceState.STARTING)
  199. try:
  200. await service.on_pre_start()
  201. await service.start()
  202. service._set_state(ServiceState.RUNNING)
  203. await service.on_post_start()
  204. # Benachrichtige abhängige Services
  205. for other_name, other_service in self._services.items():
  206. if name in other_service.DEPENDENCIES:
  207. await other_service.on_dependency_ready(name)
  208. pinfo(f"Service gestartet: {name}")
  209. # Event ausloesen
  210. await self._emit("service_started", {
  211. "service_name": name,
  212. "priority": service.PRIORITY.name if hasattr(service.PRIORITY, "name") else str(service.PRIORITY),
  213. "group": service.GROUP.name if hasattr(service.GROUP, "name") else str(service.GROUP),
  214. })
  215. except Exception as e:
  216. service._set_state(ServiceState.FAILED)
  217. perror(f"Fehler beim Starten von Service '{name}': {e}")
  218. await self._emit("service_error", {
  219. "service_name": name,
  220. "error": str(e),
  221. "traceback": __import__("traceback").format_exc(),
  222. })
  223. raise
  224. async def stop_all(self) -> None:
  225. """Stoppt alle Services in umgekehrter Startreihenfolge."""
  226. pinfo("Stoppe alle Services...")
  227. self._running = False
  228. self._ready = False
  229. if self._health_check_task:
  230. self._health_check_task.cancel()
  231. try:
  232. await self._health_check_task
  233. except asyncio.CancelledError:
  234. pass
  235. self._health_check_task = None
  236. # Stoppe in umgekehrter Reihenfolge
  237. for name in reversed(self._start_order):
  238. await self._stop_service(name)
  239. pinfo("Alle Services gestoppt")
  240. async def _emit(self, event_name: str, data: dict) -> None:
  241. """Hilfsmethode zum Ausloesen von Events (falls EventManager verfuegbar)."""
  242. events = getattr(self._application, "events", None)
  243. if events:
  244. try:
  245. await events.emit(event_name, data)
  246. except Exception:
  247. pass
  248. async def _stop_service(self, name: str) -> None:
  249. """
  250. Stoppt einen einzelnen Service.
  251. Args:
  252. name: Name des zu stoppenden Services
  253. """
  254. service = self._services[name]
  255. if service.state == ServiceState.STOPPED:
  256. return
  257. pdebug(f"Stoppe Service: {name}")
  258. service._set_state(ServiceState.STOPPING)
  259. try:
  260. await service.on_pre_stop()
  261. await service.stop()
  262. service._set_state(ServiceState.STOPPED)
  263. await service.on_post_stop()
  264. pinfo(f"Service gestoppt: {name}")
  265. await self._emit("service_stopped", {
  266. "service_name": name,
  267. "reason": "shutdown",
  268. })
  269. except Exception as e:
  270. perror(f"Fehler beim Stoppen von Service '{name}': {e}")
  271. service._set_state(ServiceState.STOPPED)
  272. await self._emit("service_error", {
  273. "service_name": name,
  274. "error": str(e),
  275. "traceback": __import__("traceback").format_exc(),
  276. })
  277. async def restart_service(self, name: str) -> None:
  278. """
  279. Startet einen Service neu.
  280. Args:
  281. name: Name des neu zu startenden Services
  282. Raises:
  283. KeyError: Wenn der Service nicht existiert
  284. """
  285. if name not in self._services:
  286. raise KeyError(f"Service '{name}' nicht gefunden")
  287. pwarn(f"Starte Service neu: {name}")
  288. await self._stop_service(name)
  289. await self._start_service(name)
  290. async def _health_check_loop(self) -> None:
  291. """Führt periodische Gesundheitsprüfungen durch."""
  292. while self._running:
  293. await asyncio.sleep(self._health_check_interval)
  294. if not self._running:
  295. break
  296. for name, service in self._services.items():
  297. if service.state != ServiceState.RUNNING:
  298. continue
  299. try:
  300. healthy = await service.health_check()
  301. if not healthy:
  302. pwarn(f"Service '{name}' Gesundheitsprüfung fehlgeschlagen")
  303. await self.restart_service(name)
  304. except Exception as e:
  305. perror(f"Fehler bei Gesundheitsprüfung von '{name}': {e}")
  306. def set_health_check_interval(self, seconds: float) -> None:
  307. """
  308. Setzt das Intervall für Gesundheitsprüfungen.
  309. Args:
  310. seconds: Intervall in Sekunden
  311. """
  312. self._health_check_interval = seconds