# -*- coding: utf-8 -*- """ Scheduler Service - IService Wrapper für Scheduler. """ from typing import TYPE_CHECKING from trixy_core.service import IService, ServicePriority, ServiceGroup from trixy_core.scheduler.scheduler import Scheduler, SchedulerConfig from trixy_core.scheduler.job import ScheduledJob, JobConfig, JobResult from trixy_core.scheduler.trigger.base import Trigger from trixy_core.scheduler.condition.base import Condition from trixy_core.scheduler.action.base import Action if TYPE_CHECKING: from trixy_core.application import IApplication from trixy_core.events import EventManager class SchedulerService(IService): """ Scheduler als IService für Integration in ServiceContainer. Beispiel: ```python scheduler_service = SchedulerService(app, config) container.register_instance(scheduler_service) ``` """ PRIORITY = ServicePriority.MANAGER GROUP = ServiceGroup.SCHEDULER DEPENDENCIES: list[str] = [] def __init__( self, application: "IApplication", config: SchedulerConfig | None = None, ): """ Initialisiert den Scheduler-Service. Args: application: Referenz zur Hauptanwendung config: Scheduler-Konfiguration """ super().__init__(application) self._config = config or SchedulerConfig() self._scheduler: Scheduler | None = None self._event_manager: "EventManager | None" = None @property def scheduler(self) -> Scheduler | None: """Scheduler-Instanz.""" return self._scheduler # === IService Lifecycle === async def start(self) -> None: """Startet den Service.""" pass async def stop(self) -> None: """Stoppt den Service.""" pass async def on_pre_start(self) -> None: """Vorbereitung vor Start.""" self._event_manager = self._application.events self._scheduler = Scheduler( config=self._config, event_manager=self._event_manager, ) async def on_post_start(self) -> None: """Nach Start - Scheduler starten.""" if self._scheduler: # Applikation und Manager in Kontext injizieren self._scheduler.set_context("application", self._application) self._scheduler.set_context("event_manager", self._event_manager) satellites = getattr(self._application, "satellites", None) if satellites is not None: self._scheduler.set_context("satellite_manager", satellites) await self._scheduler.start() # Events abonnieren für EventTrigger if self._event_manager: @self._event_manager.on("*") async def on_any_event(event_name: str, event_data: dict) -> None: if self._scheduler: await self._scheduler.on_event(event_name, event_data) async def on_pre_stop(self) -> None: """Vor Stop - Scheduler stoppen.""" if self._scheduler: await self._scheduler.stop() # === Job-Verwaltung (Proxies) === def add_job( self, job: ScheduledJob, save: bool | None = None, ) -> str: """Fügt Job hinzu.""" if not self._scheduler: raise RuntimeError("Scheduler nicht initialisiert") return self._scheduler.add_job(job, save) def create_job( self, trigger: Trigger, actions: list[Action] | Action, conditions: list[Condition] | None = None, config: JobConfig | None = None, job_id: str | None = None, save: bool | None = None, ) -> ScheduledJob: """Erstellt und fügt Job hinzu.""" if not self._scheduler: raise RuntimeError("Scheduler nicht initialisiert") return self._scheduler.create_job( trigger=trigger, actions=actions, conditions=conditions, config=config, job_id=job_id, save=save, ) def get_job(self, job_id: str) -> ScheduledJob | None: """Holt Job nach ID.""" if self._scheduler: return self._scheduler.get_job(job_id) return None def remove_job(self, job_id: str, delete_file: bool = True) -> bool: """Entfernt Job.""" if self._scheduler: return self._scheduler.remove_job(job_id, delete_file) return False def enable_job(self, job_id: str) -> bool: """Aktiviert Job.""" if self._scheduler: return self._scheduler.enable_job(job_id) return False def disable_job(self, job_id: str) -> bool: """Deaktiviert Job.""" if self._scheduler: return self._scheduler.disable_job(job_id) return False @property def jobs(self) -> dict[str, ScheduledJob]: """Alle Jobs.""" if self._scheduler: return self._scheduler.jobs return {} @property def job_count(self) -> int: """Anzahl Jobs.""" if self._scheduler: return self._scheduler.job_count return 0 # === Context === def set_context(self, key: str, value) -> None: """Setzt Kontext-Variable.""" if self._scheduler: self._scheduler.set_context(key, value) def update_context(self, data: dict) -> None: """Aktualisiert Kontext.""" if self._scheduler: self._scheduler.update_context(data) # === Statistics === def get_statistics(self) -> dict: """Gibt Scheduler-Statistiken zurück.""" if self._scheduler: return self._scheduler.get_statistics() return {}