shutdown.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. # -*- coding: utf-8 -*-
  2. """
  3. Graceful Shutdown Manager für geordnetes Herunterfahren.
  4. Ermöglicht das kontrollierte Stoppen von Services mit Timeouts
  5. und Abhängigkeitsberücksichtigung.
  6. """
  7. from __future__ import annotations
  8. import asyncio
  9. import signal
  10. import sys
  11. from dataclasses import dataclass, field
  12. from datetime import datetime
  13. from enum import Enum, auto
  14. from typing import TYPE_CHECKING, Any, Callable, Coroutine
  15. from trixy_core.service.enums import ServicePriority, ServiceState
  16. if TYPE_CHECKING:
  17. from trixy_core.service.iservice import IService
  18. class ShutdownPhase(Enum):
  19. """
  20. Phasen des Shutdown-Prozesses.
  21. """
  22. NOT_STARTED = auto()
  23. """Shutdown noch nicht gestartet."""
  24. PRE_SHUTDOWN = auto()
  25. """Vorbereitungsphase - Hooks werden ausgeführt."""
  26. STOPPING_SERVICES = auto()
  27. """Services werden gestoppt."""
  28. POST_SHUTDOWN = auto()
  29. """Aufräumphase - Finale Hooks."""
  30. COMPLETED = auto()
  31. """Shutdown abgeschlossen."""
  32. FAILED = auto()
  33. """Shutdown mit Fehlern beendet."""
  34. class ShutdownReason(Enum):
  35. """
  36. Grund für den Shutdown.
  37. """
  38. NORMAL = auto()
  39. """Normaler Shutdown durch Benutzeranforderung."""
  40. SIGNAL = auto()
  41. """Shutdown durch Signal (SIGTERM, SIGINT)."""
  42. ERROR = auto()
  43. """Shutdown durch kritischen Fehler."""
  44. TIMEOUT = auto()
  45. """Shutdown durch Timeout."""
  46. MANUAL = auto()
  47. """Manueller Shutdown durch API."""
  48. @dataclass
  49. class ShutdownResult:
  50. """
  51. Ergebnis eines Service-Shutdowns.
  52. """
  53. service_name: str
  54. """Name des Services."""
  55. success: bool
  56. """Ob der Shutdown erfolgreich war."""
  57. duration: float
  58. """Dauer in Sekunden."""
  59. error: Exception | None = None
  60. """Fehler, falls aufgetreten."""
  61. timed_out: bool = False
  62. """Ob ein Timeout aufgetreten ist."""
  63. def __str__(self) -> str:
  64. """String-Repräsentation."""
  65. status = "OK" if self.success else "FAILED"
  66. if self.timed_out:
  67. status = "TIMEOUT"
  68. return f"{self.service_name}: {status} ({self.duration:.3f}s)"
  69. @dataclass
  70. class ShutdownReport:
  71. """
  72. Gesamtbericht des Shutdown-Prozesses.
  73. """
  74. reason: ShutdownReason
  75. """Grund für den Shutdown."""
  76. phase: ShutdownPhase
  77. """Endphase des Shutdowns."""
  78. started_at: datetime
  79. """Startzeitpunkt."""
  80. completed_at: datetime | None = None
  81. """Endzeitpunkt."""
  82. results: list[ShutdownResult] = field(default_factory=list)
  83. """Ergebnisse pro Service."""
  84. errors: list[str] = field(default_factory=list)
  85. """Allgemeine Fehler."""
  86. @property
  87. def duration(self) -> float:
  88. """Gesamtdauer in Sekunden."""
  89. if self.completed_at is None:
  90. return 0.0
  91. return (self.completed_at - self.started_at).total_seconds()
  92. @property
  93. def success(self) -> bool:
  94. """Ob der gesamte Shutdown erfolgreich war."""
  95. return (
  96. self.phase == ShutdownPhase.COMPLETED
  97. and all(r.success for r in self.results)
  98. )
  99. @property
  100. def failed_services(self) -> list[str]:
  101. """Liste der fehlgeschlagenen Services."""
  102. return [r.service_name for r in self.results if not r.success]
  103. def to_dict(self) -> dict[str, Any]:
  104. """Konvertiert in ein Dictionary."""
  105. return {
  106. "reason": self.reason.name,
  107. "phase": self.phase.name,
  108. "success": self.success,
  109. "duration": self.duration,
  110. "started_at": self.started_at.isoformat(),
  111. "completed_at": self.completed_at.isoformat() if self.completed_at else None,
  112. "services": [
  113. {
  114. "name": r.service_name,
  115. "success": r.success,
  116. "duration": r.duration,
  117. "timed_out": r.timed_out,
  118. "error": str(r.error) if r.error else None,
  119. }
  120. for r in self.results
  121. ],
  122. "failed_services": self.failed_services,
  123. "errors": self.errors,
  124. }
  125. @dataclass
  126. class ShutdownConfig:
  127. """
  128. Konfiguration für den Shutdown-Manager.
  129. """
  130. default_timeout: float = 30.0
  131. """Standard-Timeout pro Service in Sekunden."""
  132. total_timeout: float = 120.0
  133. """Maximale Gesamtzeit für Shutdown."""
  134. force_after_timeout: bool = True
  135. """Ob nach Timeout forciert wird."""
  136. parallel_shutdown: bool = False
  137. """Ob Services parallel gestoppt werden (wenn möglich)."""
  138. respect_dependencies: bool = True
  139. """Ob Abhängigkeiten beim Shutdown berücksichtigt werden."""
  140. service_timeouts: dict[str, float] = field(default_factory=dict)
  141. """Individuelle Timeouts pro Service."""
  142. register_signals: bool = True
  143. """Ob Signal-Handler registriert werden."""
  144. class GracefulShutdownManager:
  145. """
    Manager for the orderly shutdown of services.

    By default a service is stopped before the services it depends on;
    among services that can be stopped at the same step, the one with the
    higher PRIORITY goes first. Every service stop is bounded by a timeout.

    Example:
        manager = GracefulShutdownManager(services)

        # Regular shutdown
        report = await manager.shutdown()

        # With hooks
        manager.add_pre_shutdown_hook(save_state)
        manager.add_post_shutdown_hook(cleanup)

        # Signal handlers are installed by the constructor when
        # config.register_signals is true and an event loop is running.
        # NOTE(review): this example used to call
        # manager.register_signal_handlers(), which is not defined in the
        # code shown here - confirm before relying on it.
  158. """
  159. def __init__(
  160. self,
  161. services: dict[str, "IService"] | None = None,
  162. config: ShutdownConfig | None = None,
  163. ) -> None:
  164. """
  165. Initialisiert den Shutdown-Manager.
  166. Args:
  167. services: Dictionary der Services (Name → Service).
  168. config: Shutdown-Konfiguration.
  169. """
  170. self._services = services or {}
  171. self._config = config or ShutdownConfig()
  172. self._phase = ShutdownPhase.NOT_STARTED
  173. self._reason: ShutdownReason | None = None
  174. self._pre_hooks: list[Callable[[], Coroutine[Any, Any, None]]] = []
  175. self._post_hooks: list[Callable[[ShutdownReport], Coroutine[Any, Any, None]]] = []
  176. self._shutdown_lock = asyncio.Lock()
  177. self._shutdown_event = asyncio.Event()
  178. if self._config.register_signals:
  179. self._register_default_signals()
  180. @property
  181. def phase(self) -> ShutdownPhase:
  182. """Aktuelle Shutdown-Phase."""
  183. return self._phase
  184. @property
  185. def is_shutting_down(self) -> bool:
  186. """Prüft, ob ein Shutdown läuft."""
  187. return self._phase not in (
  188. ShutdownPhase.NOT_STARTED,
  189. ShutdownPhase.COMPLETED,
  190. ShutdownPhase.FAILED,
  191. )
  192. def set_services(self, services: dict[str, "IService"]) -> None:
  193. """
  194. Setzt die zu verwaltenden Services.
  195. Args:
  196. services: Dictionary der Services.
  197. """
  198. self._services = services
  199. def add_pre_shutdown_hook(
  200. self,
  201. hook: Callable[[], Coroutine[Any, Any, None]],
  202. ) -> None:
  203. """
  204. Fügt einen Pre-Shutdown-Hook hinzu.
  205. Args:
  206. hook: Async-Funktion die vor dem Shutdown ausgeführt wird.
  207. """
  208. self._pre_hooks.append(hook)
  209. def add_post_shutdown_hook(
  210. self,
  211. hook: Callable[[ShutdownReport], Coroutine[Any, Any, None]],
  212. ) -> None:
  213. """
  214. Fügt einen Post-Shutdown-Hook hinzu.
  215. Args:
  216. hook: Async-Funktion die nach dem Shutdown ausgeführt wird.
  217. """
  218. self._post_hooks.append(hook)
  219. def get_timeout_for_service(self, service_name: str) -> float:
  220. """
  221. Gibt den Timeout für einen Service zurück.
  222. Args:
  223. service_name: Name des Services.
  224. Returns:
  225. Timeout in Sekunden.
  226. """
  227. return self._config.service_timeouts.get(
  228. service_name,
  229. self._config.default_timeout,
  230. )
  231. def _register_default_signals(self) -> None:
  232. """Registriert Standard-Signal-Handler."""
  233. try:
  234. loop = asyncio.get_running_loop()
  235. except RuntimeError:
  236. # Kein Event-Loop läuft - Handler später registrieren
  237. return
  238. if sys.platform == "win32":
  239. # Windows: add_signal_handler nicht verfügbar
  240. for sig in (signal.SIGINT, signal.SIGTERM):
  241. signal.signal(sig, lambda s, f, _loop=loop: _loop.call_soon_threadsafe(
  242. lambda _s=s: asyncio.create_task(self._signal_handler(_s))
  243. ))
  244. else:
  245. # Unix-Signale
  246. for sig in (signal.SIGTERM, signal.SIGINT):
  247. loop.add_signal_handler(
  248. sig,
  249. lambda s=sig: asyncio.create_task(
  250. self._signal_handler(s)
  251. ),
  252. )
  253. async def _signal_handler(self, sig: signal.Signals) -> None:
  254. """
  255. Handler für System-Signale.
  256. Args:
  257. sig: Empfangenes Signal.
  258. """
  259. await self.shutdown(ShutdownReason.SIGNAL)
  260. def _get_shutdown_order(self) -> list[str]:
  261. """
  262. Berechnet die Shutdown-Reihenfolge.
  263. Returns:
  264. Liste von Service-Namen in Shutdown-Reihenfolge.
  265. """
  266. if not self._config.respect_dependencies:
  267. # Einfach nach Priorität sortieren (umgekehrt)
  268. return sorted(
  269. self._services.keys(),
  270. key=lambda n: (
  271. -self._services[n].PRIORITY, # Höhere Priorität zuerst stoppen
  272. n,
  273. ),
  274. )
  275. # Topologische Sortierung mit Abhängigkeiten
  276. # Services ohne Abhängige zuerst stoppen
  277. dependents: dict[str, set[str]] = {name: set() for name in self._services}
  278. for name, service in self._services.items():
  279. for dep in service.DEPENDENCIES:
  280. if dep in dependents:
  281. dependents[dep].add(name)
  282. result: list[str] = []
  283. remaining = set(self._services.keys())
  284. while remaining:
  285. # Finde Services ohne laufende Abhängige
  286. can_stop = [
  287. name
  288. for name in remaining
  289. if not (dependents[name] & remaining)
  290. ]
  291. if not can_stop:
  292. # Zyklische Abhängigkeit - Rest nach Priorität
  293. can_stop = sorted(
  294. remaining,
  295. key=lambda n: (-self._services[n].PRIORITY, n),
  296. )
  297. # Nach Priorität sortieren (höhere zuerst)
  298. can_stop.sort(key=lambda n: (-self._services[n].PRIORITY, n))
  299. for name in can_stop:
  300. result.append(name)
  301. remaining.discard(name)
  302. return result
  303. async def _stop_service(
  304. self,
  305. service: "IService",
  306. timeout: float,
  307. ) -> ShutdownResult:
  308. """
  309. Stoppt einen einzelnen Service.
  310. Args:
  311. service: Der zu stoppende Service.
  312. timeout: Timeout in Sekunden.
  313. Returns:
  314. Ergebnis des Shutdowns.
  315. """
  316. start_time = datetime.now()
  317. name = service.name
  318. try:
  319. # Pre-Stop Hook
  320. await asyncio.wait_for(
  321. service.on_pre_stop(),
  322. timeout=timeout / 3,
  323. )
  324. # Stop
  325. await asyncio.wait_for(
  326. service.stop(),
  327. timeout=timeout / 3,
  328. )
  329. # Post-Stop Hook
  330. await asyncio.wait_for(
  331. service.on_post_stop(),
  332. timeout=timeout / 3,
  333. )
  334. duration = (datetime.now() - start_time).total_seconds()
  335. return ShutdownResult(
  336. service_name=name,
  337. success=True,
  338. duration=duration,
  339. )
  340. except asyncio.TimeoutError:
  341. duration = (datetime.now() - start_time).total_seconds()
  342. return ShutdownResult(
  343. service_name=name,
  344. success=False,
  345. duration=duration,
  346. timed_out=True,
  347. )
  348. except Exception as e:
  349. duration = (datetime.now() - start_time).total_seconds()
  350. return ShutdownResult(
  351. service_name=name,
  352. success=False,
  353. duration=duration,
  354. error=e,
  355. )
  356. async def _run_pre_hooks(self) -> list[str]:
  357. """
  358. Führt Pre-Shutdown-Hooks aus.
  359. Returns:
  360. Liste von Fehlermeldungen.
  361. """
  362. errors: list[str] = []
  363. for hook in self._pre_hooks:
  364. try:
  365. await asyncio.wait_for(
  366. hook(),
  367. timeout=self._config.default_timeout,
  368. )
  369. except asyncio.TimeoutError:
  370. errors.append(f"Pre-Hook Timeout: {hook.__name__}")
  371. except Exception as e:
  372. errors.append(f"Pre-Hook Fehler ({hook.__name__}): {e}")
  373. return errors
  374. async def _run_post_hooks(self, report: ShutdownReport) -> list[str]:
  375. """
  376. Führt Post-Shutdown-Hooks aus.
  377. Args:
  378. report: Der Shutdown-Report.
  379. Returns:
  380. Liste von Fehlermeldungen.
  381. """
  382. errors: list[str] = []
  383. for hook in self._post_hooks:
  384. try:
  385. await asyncio.wait_for(
  386. hook(report),
  387. timeout=self._config.default_timeout,
  388. )
  389. except asyncio.TimeoutError:
  390. errors.append(f"Post-Hook Timeout: {hook.__name__}")
  391. except Exception as e:
  392. errors.append(f"Post-Hook Fehler ({hook.__name__}): {e}")
  393. return errors
  394. async def shutdown(
  395. self,
  396. reason: ShutdownReason = ShutdownReason.NORMAL,
  397. ) -> ShutdownReport:
  398. """
  399. Führt einen geordneten Shutdown durch.
  400. Args:
  401. reason: Grund für den Shutdown.
  402. Returns:
  403. Bericht über den Shutdown.
  404. """
  405. async with self._shutdown_lock:
  406. if self.is_shutting_down:
  407. # Bereits im Shutdown
  408. await self._shutdown_event.wait()
  409. return ShutdownReport(
  410. reason=reason,
  411. phase=self._phase,
  412. started_at=datetime.now(),
  413. completed_at=datetime.now(),
  414. errors=["Shutdown bereits in Bearbeitung"],
  415. )
  416. self._reason = reason
  417. self._shutdown_event.clear()
  418. report = ShutdownReport(
  419. reason=reason,
  420. phase=ShutdownPhase.PRE_SHUTDOWN,
  421. started_at=datetime.now(),
  422. )
  423. try:
  424. # Phase 1: Pre-Hooks
  425. self._phase = ShutdownPhase.PRE_SHUTDOWN
  426. hook_errors = await self._run_pre_hooks()
  427. report.errors.extend(hook_errors)
  428. # Phase 2: Services stoppen
  429. self._phase = ShutdownPhase.STOPPING_SERVICES
  430. shutdown_order = self._get_shutdown_order()
  431. if self._config.parallel_shutdown:
  432. # Paralleles Stoppen (nur wenn keine Abhängigkeiten)
  433. tasks = []
  434. for name in shutdown_order:
  435. service = self._services.get(name)
  436. if service and service.state == ServiceState.RUNNING:
  437. timeout = self.get_timeout_for_service(name)
  438. tasks.append(self._stop_service(service, timeout))
  439. if tasks:
  440. results = await asyncio.gather(*tasks, return_exceptions=True)
  441. for result in results:
  442. if isinstance(result, ShutdownResult):
  443. report.results.append(result)
  444. elif isinstance(result, Exception):
  445. report.errors.append(str(result))
  446. else:
  447. # Sequentielles Stoppen
  448. for name in shutdown_order:
  449. service = self._services.get(name)
  450. if service and service.state == ServiceState.RUNNING:
  451. timeout = self.get_timeout_for_service(name)
  452. result = await self._stop_service(service, timeout)
  453. report.results.append(result)
  454. # Phase 3: Post-Hooks
  455. self._phase = ShutdownPhase.POST_SHUTDOWN
  456. post_errors = await self._run_post_hooks(report)
  457. report.errors.extend(post_errors)
  458. # Abschluss
  459. self._phase = ShutdownPhase.COMPLETED
  460. report.phase = ShutdownPhase.COMPLETED
  461. except Exception as e:
  462. self._phase = ShutdownPhase.FAILED
  463. report.phase = ShutdownPhase.FAILED
  464. report.errors.append(f"Kritischer Fehler: {e}")
  465. finally:
  466. report.completed_at = datetime.now()
  467. self._shutdown_event.set()
  468. return report
  469. async def wait_for_shutdown(self) -> None:
  470. """Wartet auf das Ende des Shutdowns."""
  471. await self._shutdown_event.wait()
  472. def request_shutdown(
  473. self,
  474. reason: ShutdownReason = ShutdownReason.MANUAL,
  475. ) -> None:
  476. """
  477. Fordert einen Shutdown an (nicht-blockierend).
  478. Args:
  479. reason: Grund für den Shutdown.
  480. """
  481. asyncio.create_task(self.shutdown(reason))