| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- # -*- coding: utf-8 -*-
- """
- Scheduler Service - IService Wrapper für Scheduler.
- """
- from typing import TYPE_CHECKING
- from trixy_core.service import IService, ServicePriority, ServiceGroup
- from trixy_core.scheduler.scheduler import Scheduler, SchedulerConfig
- from trixy_core.scheduler.job import ScheduledJob, JobConfig, JobResult
- from trixy_core.scheduler.trigger.base import Trigger
- from trixy_core.scheduler.condition.base import Condition
- from trixy_core.scheduler.action.base import Action
- if TYPE_CHECKING:
- from trixy_core.application import IApplication
- from trixy_core.events import EventManager
class SchedulerService(IService):
    """
    Expose the scheduler as an ``IService`` for use in a ServiceContainer.

    The wrapped ``Scheduler`` is created in ``on_pre_start``. Until then
    ``scheduler`` is ``None``: ``add_job`` and ``create_job`` raise
    ``RuntimeError`` in that state, while the remaining proxies return a
    neutral value (``None``, ``False``, ``{}``, ``0``) or do nothing.

    Example:
        ```python
        scheduler_service = SchedulerService(app, config)
        container.register_instance(scheduler_service)
        ```
    """

    PRIORITY = ServicePriority.MANAGER
    GROUP = ServiceGroup.SCHEDULER
    DEPENDENCIES: list[str] = []

    def __init__(
        self,
        application: "IApplication",
        config: SchedulerConfig | None = None,
    ):
        """
        Initialise the scheduler service.

        Args:
            application: Reference to the main application.
            config: Scheduler configuration; a default ``SchedulerConfig()``
                is used when ``None`` is passed.
        """
        # NOTE(review): the lifecycle hooks below read ``self._application``;
        # presumably ``IService.__init__`` stores it - confirm in the base class.
        super().__init__(application)
        # Identity check so a caller-supplied config is never discarded just
        # because it happens to evaluate as falsy.
        self._config = config if config is not None else SchedulerConfig()
        self._scheduler: Scheduler | None = None
        self._event_manager: "EventManager | None" = None

    @property
    def scheduler(self) -> Scheduler | None:
        """The wrapped scheduler instance, or ``None`` before ``on_pre_start``."""
        return self._scheduler

    # === IService lifecycle ===

    async def start(self) -> None:
        """Start the service (no-op; the work happens in the lifecycle hooks)."""

    async def stop(self) -> None:
        """Stop the service (no-op; the work happens in the lifecycle hooks)."""

    async def on_pre_start(self) -> None:
        """Prepare for start: fetch the event manager and build the scheduler."""
        self._event_manager = self._application.events
        self._scheduler = Scheduler(
            config=self._config,
            event_manager=self._event_manager,
        )

    async def on_post_start(self) -> None:
        """
        Inject context into the scheduler, start it and forward events to it.

        The application, the event manager and - when the application has a
        ``satellites`` attribute that is not ``None`` - the satellite manager
        are placed into the scheduler context before the scheduler is started.
        Afterwards a wildcard (``"*"``) handler is registered on the event
        manager that passes every event on to ``Scheduler.on_event``.
        """
        # ``is not None`` instead of truthiness: an initialised scheduler must
        # be started even if the object itself evaluates as falsy (e.g. empty).
        if self._scheduler is not None:
            # Inject application and managers into the job context.
            self._scheduler.set_context("application", self._application)
            self._scheduler.set_context("event_manager", self._event_manager)

            satellites = getattr(self._application, "satellites", None)
            if satellites is not None:
                self._scheduler.set_context("satellite_manager", satellites)

            await self._scheduler.start()

        # Subscribe to all events so event-based triggers can react.
        # NOTE(review): this handler is not unsubscribed anywhere in this
        # class - confirm whether the event manager drops handlers on
        # shutdown, otherwise a restart would register it a second time.
        if self._event_manager is not None:

            @self._event_manager.on("*")
            async def on_any_event(event_name: str, event_data: dict) -> None:
                # Look the scheduler up at dispatch time rather than capturing it.
                if self._scheduler is not None:
                    await self._scheduler.on_event(event_name, event_data)

    async def on_pre_stop(self) -> None:
        """Stop the scheduler before the service itself is stopped."""
        if self._scheduler is not None:
            await self._scheduler.stop()

    # === Job management (proxies) ===

    def add_job(
        self,
        job: ScheduledJob,
        save: bool | None = None,
    ) -> str:
        """
        Add a job to the scheduler.

        Args:
            job: The job to register.
            save: Passed through unchanged to ``Scheduler.add_job``.

        Returns:
            The value returned by ``Scheduler.add_job``.

        Raises:
            RuntimeError: If the scheduler has not been created yet.
        """
        if self._scheduler is None:
            raise RuntimeError("Scheduler nicht initialisiert")
        return self._scheduler.add_job(job, save)

    def create_job(
        self,
        trigger: Trigger,
        actions: list[Action] | Action,
        conditions: list[Condition] | None = None,
        config: JobConfig | None = None,
        job_id: str | None = None,
        save: bool | None = None,
    ) -> ScheduledJob:
        """
        Create a job from its parts and add it to the scheduler.

        All arguments are forwarded by keyword to ``Scheduler.create_job``.

        Returns:
            The job returned by ``Scheduler.create_job``.

        Raises:
            RuntimeError: If the scheduler has not been created yet.
        """
        if self._scheduler is None:
            raise RuntimeError("Scheduler nicht initialisiert")
        return self._scheduler.create_job(
            trigger=trigger,
            actions=actions,
            conditions=conditions,
            config=config,
            job_id=job_id,
            save=save,
        )

    def get_job(self, job_id: str) -> ScheduledJob | None:
        """Return the job with the given ID, or ``None`` without a scheduler."""
        if self._scheduler is None:
            return None
        return self._scheduler.get_job(job_id)

    def remove_job(self, job_id: str, delete_file: bool = True) -> bool:
        """Remove a job; returns ``False`` when no scheduler exists."""
        if self._scheduler is None:
            return False
        return self._scheduler.remove_job(job_id, delete_file)

    def enable_job(self, job_id: str) -> bool:
        """Enable a job; returns ``False`` when no scheduler exists."""
        if self._scheduler is None:
            return False
        return self._scheduler.enable_job(job_id)

    def disable_job(self, job_id: str) -> bool:
        """Disable a job; returns ``False`` when no scheduler exists."""
        if self._scheduler is None:
            return False
        return self._scheduler.disable_job(job_id)

    @property
    def jobs(self) -> dict[str, ScheduledJob]:
        """All jobs of the scheduler; a new empty dict when no scheduler exists."""
        if self._scheduler is None:
            return {}
        return self._scheduler.jobs

    @property
    def job_count(self) -> int:
        """Number of jobs; ``0`` when no scheduler exists."""
        if self._scheduler is None:
            return 0
        return self._scheduler.job_count

    # === Context ===

    def set_context(self, key: str, value) -> None:
        """Set a single context variable; ignored when no scheduler exists."""
        if self._scheduler is not None:
            self._scheduler.set_context(key, value)

    def update_context(self, data: dict) -> None:
        """Forward ``data`` to ``Scheduler.update_context``; ignored without a scheduler."""
        if self._scheduler is not None:
            self._scheduler.update_context(data)

    # === Statistics ===

    def get_statistics(self) -> dict:
        """Return the scheduler statistics; an empty dict when no scheduler exists."""
        if self._scheduler is None:
            return {}
        return self._scheduler.get_statistics()
|