| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603 |
- # -*- coding: utf-8 -*-
- """
- Graceful Shutdown Manager für geordnetes Herunterfahren.
- Ermöglicht das kontrollierte Stoppen von Services mit Timeouts
- und Abhängigkeitsberücksichtigung.
- """
- from __future__ import annotations
- import asyncio
- import signal
- import sys
- from dataclasses import dataclass, field
- from datetime import datetime
- from enum import Enum, auto
- from typing import TYPE_CHECKING, Any, Callable, Coroutine
- from trixy_core.service.enums import ServicePriority, ServiceState
- if TYPE_CHECKING:
- from trixy_core.service.iservice import IService
class ShutdownPhase(Enum):
    """
    Stages a shutdown run passes through, in declaration order.
    """

    # Shutdown has not been started yet.
    NOT_STARTED = auto()
    # Preparation stage: pre-shutdown hooks are executed.
    PRE_SHUTDOWN = auto()
    # Services are being stopped.
    STOPPING_SERVICES = auto()
    # Clean-up stage: post-shutdown hooks are executed.
    POST_SHUTDOWN = auto()
    # Shutdown finished.
    COMPLETED = auto()
    # Shutdown ended with a critical error.
    FAILED = auto()
class ShutdownReason(Enum):
    """
    What triggered a shutdown.
    """

    # Regular shutdown requested by the user.
    NORMAL = auto()
    # Shutdown triggered by an OS signal (SIGTERM, SIGINT).
    SIGNAL = auto()
    # Shutdown triggered by a critical error.
    ERROR = auto()
    # Shutdown triggered by a timeout.
    TIMEOUT = auto()
    # Manual shutdown requested through the API.
    MANUAL = auto()
- @dataclass
- class ShutdownResult:
- """
- Ergebnis eines Service-Shutdowns.
- """
- service_name: str
- """Name des Services."""
- success: bool
- """Ob der Shutdown erfolgreich war."""
- duration: float
- """Dauer in Sekunden."""
- error: Exception | None = None
- """Fehler, falls aufgetreten."""
- timed_out: bool = False
- """Ob ein Timeout aufgetreten ist."""
- def __str__(self) -> str:
- """String-Repräsentation."""
- status = "OK" if self.success else "FAILED"
- if self.timed_out:
- status = "TIMEOUT"
- return f"{self.service_name}: {status} ({self.duration:.3f}s)"
@dataclass
class ShutdownReport:
    """
    Aggregate report of one shutdown run.

    Holds the per-service results together with general (non-service)
    error messages such as hook failures.
    """

    # Why the shutdown was triggered.
    reason: ShutdownReason
    # Phase the shutdown ended in.
    phase: ShutdownPhase
    # When the shutdown started.
    started_at: datetime
    # When the shutdown finished; None while still running.
    completed_at: datetime | None = None
    # One result per stopped service.
    results: list[ShutdownResult] = field(default_factory=list)
    # General error messages (e.g. from hooks).
    errors: list[str] = field(default_factory=list)

    @property
    def duration(self) -> float:
        """Total duration in seconds, or 0.0 while not yet completed."""
        finished = self.completed_at
        if finished is None:
            return 0.0
        elapsed = finished - self.started_at
        return elapsed.total_seconds()

    @property
    def success(self) -> bool:
        """True if the run reached COMPLETED and every service result succeeded."""
        if self.phase != ShutdownPhase.COMPLETED:
            return False
        return all(res.success for res in self.results)

    @property
    def failed_services(self) -> list[str]:
        """Names of services whose result is not successful, in result order."""
        return [res.service_name for res in self.results if not res.success]

    def to_dict(self) -> dict[str, Any]:
        """Convert the report into a plain dictionary (e.g. for JSON output)."""
        finished = self.completed_at
        services: list[dict[str, Any]] = []
        for res in self.results:
            services.append(
                {
                    "name": res.service_name,
                    "success": res.success,
                    "duration": res.duration,
                    "timed_out": res.timed_out,
                    "error": str(res.error) if res.error else None,
                }
            )
        return {
            "reason": self.reason.name,
            "phase": self.phase.name,
            "success": self.success,
            "duration": self.duration,
            "started_at": self.started_at.isoformat(),
            "completed_at": finished.isoformat() if finished else None,
            "services": services,
            "failed_services": self.failed_services,
            "errors": self.errors,
        }
@dataclass
class ShutdownConfig:
    """
    Settings for the graceful shutdown manager.
    """

    # Default per-service timeout, in seconds.
    default_timeout: float = 30.0
    # Intended maximum total time for the shutdown, in seconds.
    total_timeout: float = 120.0
    # Whether to force after a timeout.
    force_after_timeout: bool = True
    # Whether services are stopped in parallel (where possible).
    parallel_shutdown: bool = False
    # Whether service dependencies determine the shutdown order.
    respect_dependencies: bool = True
    # Per-service timeout overrides, keyed by service name.
    service_timeouts: dict[str, float] = field(default_factory=dict)
    # Whether SIGINT/SIGTERM handlers are registered.
    register_signals: bool = True
class GracefulShutdownManager:
    """
    Manager for shutting services down in an orderly fashion.

    When ``config.respect_dependencies`` is set, a service is only stopped
    after every service that lists it in its ``DEPENDENCIES``. Otherwise,
    and within each dependency level, services with a higher ``PRIORITY``
    are stopped first (ties broken by name). Every lifecycle step of a
    service is bounded by a timeout.

    Example:
        manager = GracefulShutdownManager(service_container)
        # Normal shutdown
        report = await manager.shutdown()
        # With hooks
        manager.add_pre_shutdown_hook(save_state)
        manager.add_post_shutdown_hook(cleanup)
        # Install SIGINT/SIGTERM handlers (needs a running event loop)
        manager.register_signal_handlers()

    NOTE(review): ``ShutdownConfig.total_timeout`` and
    ``ShutdownConfig.force_after_timeout`` are not read anywhere in this
    class; confirm whether a caller enforces them.
    """

    def __init__(
        self,
        services: dict[str, "IService"] | None = None,
        config: ShutdownConfig | None = None,
    ) -> None:
        """
        Initialize the shutdown manager.

        Args:
            services: Mapping of service name to service instance.
            config: Shutdown configuration; ``ShutdownConfig()`` if omitted.
        """
        self._services = services or {}
        self._config = config or ShutdownConfig()
        self._phase = ShutdownPhase.NOT_STARTED
        self._reason: ShutdownReason | None = None
        self._pre_hooks: list[Callable[[], Coroutine[Any, Any, None]]] = []
        self._post_hooks: list[Callable[[ShutdownReport], Coroutine[Any, Any, None]]] = []
        self._shutdown_lock = asyncio.Lock()
        self._shutdown_event = asyncio.Event()
        # Strong references to fire-and-forget tasks: the event loop keeps only
        # weak references, so an unreferenced task may vanish mid-execution.
        self._background_tasks: set[asyncio.Task[Any]] = set()
        if self._config.register_signals:
            # No-op when no event loop is running yet; call
            # register_signal_handlers() from inside the loop in that case.
            self._register_default_signals()

    @property
    def phase(self) -> ShutdownPhase:
        """Current shutdown phase."""
        return self._phase

    @property
    def is_shutting_down(self) -> bool:
        """True while a shutdown is in progress (not idle and not finished)."""
        return self._phase not in (
            ShutdownPhase.NOT_STARTED,
            ShutdownPhase.COMPLETED,
            ShutdownPhase.FAILED,
        )

    def set_services(self, services: dict[str, "IService"]) -> None:
        """
        Replace the managed services.

        Args:
            services: Mapping of service name to service instance.
        """
        self._services = services

    def add_pre_shutdown_hook(
        self,
        hook: Callable[[], Coroutine[Any, Any, None]],
    ) -> None:
        """
        Register a hook that runs before any service is stopped.

        Args:
            hook: Zero-argument async callable.
        """
        self._pre_hooks.append(hook)

    def add_post_shutdown_hook(
        self,
        hook: Callable[[ShutdownReport], Coroutine[Any, Any, None]],
    ) -> None:
        """
        Register a hook that runs after all services were processed.

        Args:
            hook: Async callable receiving the (not yet finalized) report.
        """
        self._post_hooks.append(hook)

    def get_timeout_for_service(self, service_name: str) -> float:
        """
        Return the timeout for a service.

        Args:
            service_name: Name of the service.

        Returns:
            The per-service override from ``config.service_timeouts`` or,
            if absent, ``config.default_timeout`` (seconds).
        """
        return self._config.service_timeouts.get(
            service_name,
            self._config.default_timeout,
        )

    def register_signal_handlers(self) -> bool:
        """
        Install SIGINT/SIGTERM handlers that trigger ``shutdown(SIGNAL)``.

        Must be called while an event loop is running in the calling thread.

        Returns:
            True if the handlers were installed; False if no event loop is
            running or the handlers could not be installed (e.g. not in the
            main thread).
        """
        return self._register_default_signals()

    def _register_default_signals(self) -> bool:
        """Install the signal handlers on the running loop; see register_signal_handlers()."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop - nothing to attach the handlers to yet.
            return False
        try:
            if sys.platform == "win32":
                # Windows: loop.add_signal_handler is unavailable, so install a
                # plain signal handler and hop back onto the loop thread-safely.
                for sig in (signal.SIGINT, signal.SIGTERM):
                    signal.signal(
                        sig,
                        lambda signum, _frame: loop.call_soon_threadsafe(
                            self._on_signal, signum
                        ),
                    )
            else:
                for sig in (signal.SIGTERM, signal.SIGINT):
                    loop.add_signal_handler(sig, self._on_signal, sig)
        except (ValueError, RuntimeError):
            # Both APIs refuse to install handlers outside the main thread.
            return False
        return True

    def _on_signal(self, sig: signal.Signals | int) -> None:
        """Synchronous signal callback: schedule the async signal handler."""
        self._spawn(self._signal_handler(sig))

    def _spawn(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
        """Start *coro* as a task and keep a reference to it until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _signal_handler(self, sig: signal.Signals | int) -> None:
        """
        Handle a received OS signal by running a SIGNAL shutdown.

        Args:
            sig: The received signal (not used beyond selecting the reason).
        """
        await self.shutdown(ShutdownReason.SIGNAL)

    @staticmethod
    def _hook_name(hook: Callable[..., Any]) -> str:
        """Readable hook name; partials and callable objects have no __name__."""
        return getattr(hook, "__name__", None) or repr(hook)

    def _priority_key(self, name: str) -> tuple[Any, str]:
        """Sort key: higher PRIORITY first, then name for a deterministic order."""
        return (-self._services[name].PRIORITY, name)

    def _get_shutdown_waves(self) -> list[list[str]]:
        """
        Group service names into waves that may be stopped together.

        With ``respect_dependencies`` every service in a wave has no
        dependents left in any later wave. If a dependency cycle blocks
        progress, all remaining services form one final wave. Without it,
        all services form a single wave. Each wave is sorted by
        ``_priority_key``.

        Returns:
            List of waves in shutdown order (empty if there are no services).
        """
        if not self._config.respect_dependencies:
            ordered = sorted(self._services, key=self._priority_key)
            return [ordered] if ordered else []

        # dependents[x] = services that declare x in their DEPENDENCIES.
        dependents: dict[str, set[str]] = {name: set() for name in self._services}
        for name, service in self._services.items():
            for dep in service.DEPENDENCIES:
                if dep in dependents:
                    dependents[dep].add(name)

        waves: list[list[str]] = []
        remaining = set(self._services)
        while remaining:
            # Services none of whose dependents are still running.
            wave = [name for name in remaining if not (dependents[name] & remaining)]
            if not wave:
                # Dependency cycle: stop everything left, ordered by priority.
                wave = list(remaining)
            wave.sort(key=self._priority_key)
            waves.append(wave)
            remaining.difference_update(wave)
        return waves

    def _get_shutdown_order(self) -> list[str]:
        """
        Compute the shutdown order.

        Returns:
            Service names in the order they are to be stopped (the waves
            from ``_get_shutdown_waves`` concatenated).
        """
        return [name for wave in self._get_shutdown_waves() for name in wave]

    async def _stop_service(
        self,
        service: "IService",
        timeout: float,
    ) -> ShutdownResult:
        """
        Stop a single service via on_pre_stop, stop and on_post_stop.

        Each of the three steps is bounded by ``timeout / 3``. Exceptions
        (other than cancellation) are captured in the result.

        Args:
            service: The service to stop.
            timeout: Total per-service budget in seconds.

        Returns:
            Result including the elapsed time (monotonic clock).
        """
        loop = asyncio.get_running_loop()
        started = loop.time()
        name = service.name
        step_timeout = timeout / 3
        try:
            await asyncio.wait_for(service.on_pre_stop(), timeout=step_timeout)
            await asyncio.wait_for(service.stop(), timeout=step_timeout)
            await asyncio.wait_for(service.on_post_stop(), timeout=step_timeout)
        except asyncio.TimeoutError:
            return ShutdownResult(
                service_name=name,
                success=False,
                duration=loop.time() - started,
                timed_out=True,
            )
        except Exception as e:
            return ShutdownResult(
                service_name=name,
                success=False,
                duration=loop.time() - started,
                error=e,
            )
        return ShutdownResult(
            service_name=name,
            success=True,
            duration=loop.time() - started,
        )

    async def _run_pre_hooks(self) -> list[str]:
        """
        Run the pre-shutdown hooks in registration order.

        Each hook is bounded by ``config.default_timeout``; failures are
        collected instead of aborting the shutdown.

        Returns:
            Error messages.
        """
        errors: list[str] = []
        for hook in self._pre_hooks:
            try:
                await asyncio.wait_for(
                    hook(),
                    timeout=self._config.default_timeout,
                )
            except asyncio.TimeoutError:
                errors.append(f"Pre-Hook Timeout: {self._hook_name(hook)}")
            except Exception as e:
                errors.append(f"Pre-Hook Fehler ({self._hook_name(hook)}): {e}")
        return errors

    async def _run_post_hooks(self, report: ShutdownReport) -> list[str]:
        """
        Run the post-shutdown hooks in registration order.

        Args:
            report: The shutdown report passed to every hook.

        Returns:
            Error messages.
        """
        errors: list[str] = []
        for hook in self._post_hooks:
            try:
                await asyncio.wait_for(
                    hook(report),
                    timeout=self._config.default_timeout,
                )
            except asyncio.TimeoutError:
                errors.append(f"Post-Hook Timeout: {self._hook_name(hook)}")
            except Exception as e:
                errors.append(f"Post-Hook Fehler ({self._hook_name(hook)}): {e}")
        return errors

    async def _stop_services_sequential(self, report: ShutdownReport) -> None:
        """Stop running services one after another, appending results to *report*."""
        for name in self._get_shutdown_order():
            service = self._services.get(name)
            if service and service.state == ServiceState.RUNNING:
                timeout = self.get_timeout_for_service(name)
                report.results.append(await self._stop_service(service, timeout))

    async def _stop_services_parallel(self, report: ShutdownReport) -> None:
        """
        Stop running services concurrently, one wave at a time.

        Services within a wave run in parallel. The next wave starts only
        after the previous one finished, so dependency order is honoured.
        """
        for wave in self._get_shutdown_waves():
            pending = []
            for name in wave:
                service = self._services.get(name)
                if service and service.state == ServiceState.RUNNING:
                    timeout = self.get_timeout_for_service(name)
                    pending.append(self._stop_service(service, timeout))
            if not pending:
                continue
            outcomes = await asyncio.gather(*pending, return_exceptions=True)
            for outcome in outcomes:
                if isinstance(outcome, ShutdownResult):
                    report.results.append(outcome)
                elif isinstance(outcome, BaseException):
                    # _stop_service converts Exceptions itself; this covers e.g.
                    # a cancelled child (CancelledError is a BaseException).
                    report.errors.append(str(outcome) or repr(outcome))

    async def shutdown(
        self,
        reason: ShutdownReason = ShutdownReason.NORMAL,
    ) -> ShutdownReport:
        """
        Perform an orderly shutdown.

        If a shutdown is already in progress, this waits for it to finish and
        returns a report flagged "Shutdown bereits in Bearbeitung" instead of
        running hooks and service stops a second time.

        Args:
            reason: Why the shutdown is happening.

        Returns:
            Report describing the shutdown.
        """
        if self.is_shutting_down:
            # Checked before taking the lock: the lock is held for the whole
            # run, so a check inside it could never observe a running shutdown.
            await self._shutdown_event.wait()
            now = datetime.now()
            return ShutdownReport(
                reason=reason,
                phase=self._phase,
                started_at=now,
                completed_at=now,
                errors=["Shutdown bereits in Bearbeitung"],
            )

        async with self._shutdown_lock:
            self._reason = reason
            # Mark the run as in progress before the first await.
            self._phase = ShutdownPhase.PRE_SHUTDOWN
            self._shutdown_event.clear()
            report = ShutdownReport(
                reason=reason,
                phase=ShutdownPhase.PRE_SHUTDOWN,
                started_at=datetime.now(),
            )
            try:
                # Phase 1: pre-hooks
                report.errors.extend(await self._run_pre_hooks())

                # Phase 2: stop services
                self._phase = ShutdownPhase.STOPPING_SERVICES
                if self._config.parallel_shutdown:
                    await self._stop_services_parallel(report)
                else:
                    await self._stop_services_sequential(report)

                # Phase 3: post-hooks
                self._phase = ShutdownPhase.POST_SHUTDOWN
                report.errors.extend(await self._run_post_hooks(report))

                self._phase = ShutdownPhase.COMPLETED
                report.phase = ShutdownPhase.COMPLETED
            except Exception as e:
                self._phase = ShutdownPhase.FAILED
                report.phase = ShutdownPhase.FAILED
                report.errors.append(f"Kritischer Fehler: {e}")
            finally:
                if self._phase not in (ShutdownPhase.COMPLETED, ShutdownPhase.FAILED):
                    # Only reached on cancellation/BaseException: leave a terminal
                    # phase so later calls are not treated as "in progress".
                    self._phase = ShutdownPhase.FAILED
                    report.phase = ShutdownPhase.FAILED
                report.completed_at = datetime.now()
                self._shutdown_event.set()
            return report

    async def wait_for_shutdown(self) -> None:
        """Wait until the current (or next) shutdown run has finished."""
        await self._shutdown_event.wait()

    def request_shutdown(
        self,
        reason: ShutdownReason = ShutdownReason.MANUAL,
    ) -> None:
        """
        Request a shutdown without blocking the caller.

        Requires a running event loop; the task is kept referenced until done.

        Args:
            reason: Why the shutdown is happening.
        """
        self._spawn(self.shutdown(reason))
|