# -*- coding: utf-8 -*-
"""
Scheduler - main class for job scheduling.
"""

import asyncio
import json
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable, TYPE_CHECKING

from trixy_core.scheduler.job import ScheduledJob, JobConfig, JobState, JobResult
from trixy_core.scheduler.trigger.base import Trigger, TriggerContext
from trixy_core.scheduler.trigger.cron import CronTrigger
from trixy_core.scheduler.trigger.interval import IntervalTrigger
from trixy_core.scheduler.trigger.datetime_trigger import DateTimeTrigger
from trixy_core.scheduler.trigger.event import EventTrigger
from trixy_core.scheduler.trigger.composite import AndTrigger, OrTrigger
from trixy_core.scheduler.condition.base import Condition
from trixy_core.scheduler.condition.time import (
    TimeRangeCondition,
    DayOfWeekCondition,
    DateRangeCondition,
)
from trixy_core.scheduler.condition.activity import (
    RecentActivityCondition,
    ActivityCountCondition,
)
from trixy_core.scheduler.condition.sensor import (
    ComparisonOperator,
    SensorThresholdCondition,
    SensorChangeCondition,
)
from trixy_core.scheduler.condition.state import (
    StateCondition,
    BooleanCondition,
    ExistsCondition,
)
from trixy_core.scheduler.condition.composite import (
    AndCondition,
    OrCondition,
    NotCondition,
    XorCondition,
)
from trixy_core.scheduler.condition.script import (
    ScriptCondition,
    AsyncScriptCondition,
)
from trixy_core.scheduler.condition.file_value import (
    JsonValueCondition,
    IniValueCondition,
    YamlValueCondition,
    TomlValueCondition,
    EnvFileValueCondition,
)
from trixy_core.scheduler.condition.expression import (
    ExpressionCondition,
    LambdaCondition,
)
from trixy_core.scheduler.action.base import Action, ActionResult
from trixy_core.scheduler.action.event import EmitEventAction, MultiEventAction
from trixy_core.scheduler.action.callback import CallbackAction, MethodAction
from trixy_core.scheduler.action.command import CommandAction, ScriptAction
from trixy_core.scheduler.action.tts import TTSSpeakAction

if TYPE_CHECKING:
    from trixy_core.events import EventManager


@dataclass
class SchedulerConfig:
    """Configuration for the scheduler."""

    # Intervals
    tick_interval_seconds: float = 1.0  # Polling interval of the main loop
    cleanup_interval_seconds: float = 3600  # History cleanup

    # Job persistence
    jobs_directory: str = "./cron"  # Directory for job files
    auto_load: bool = True  # Load jobs on start
    auto_save: bool = True  # Save jobs on changes

    # Limits
    max_concurrent_jobs: int = 10  # Maximum number of parallel jobs
    max_history_per_job: int = 100  # History entries per job

    # Defaults for jobs
    # NOTE(review): these two defaults are not read anywhere in this module;
    # confirm whether they are consumed elsewhere.
    default_timeout_seconds: float = 60.0
    default_max_retries: int = 3

    # Event integration
    emit_job_events: bool = True  # Emit events on job execution


class Scheduler:
    """
    Scheduler for job execution.

    Loads and stores jobs as JSON files in the configured jobs directory
    (``./cron/*.json`` by default).
    """

    # Trigger registry for deserialization
    TRIGGER_TYPES: dict[str, type] = {
        "CronTrigger": CronTrigger,
        "IntervalTrigger": IntervalTrigger,
        "DateTimeTrigger": DateTimeTrigger,
        "EventTrigger": EventTrigger,
        "AndTrigger": AndTrigger,
        "OrTrigger": OrTrigger,
    }

    # Condition registry for deserialization
    CONDITION_TYPES: dict[str, type] = {
        "TimeRangeCondition": TimeRangeCondition,
        "DayOfWeekCondition": DayOfWeekCondition,
        "DateRangeCondition": DateRangeCondition,
        "RecentActivityCondition": RecentActivityCondition,
        "ActivityCountCondition": ActivityCountCondition,
        "SensorThresholdCondition": SensorThresholdCondition,
        "SensorChangeCondition": SensorChangeCondition,
        "StateCondition": StateCondition,
        "BooleanCondition": BooleanCondition,
        "ExistsCondition": ExistsCondition,
        "AndCondition": AndCondition,
        "OrCondition": OrCondition,
        "NotCondition": NotCondition,
        "XorCondition": XorCondition,
        # Script
        "ScriptCondition": ScriptCondition,
        "AsyncScriptCondition": AsyncScriptCondition,
        # File Value
        "JsonValueCondition": JsonValueCondition,
        "IniValueCondition": IniValueCondition,
        "YamlValueCondition": YamlValueCondition,
        "TomlValueCondition": TomlValueCondition,
        "EnvFileValueCondition": EnvFileValueCondition,
        # Expression
        "ExpressionCondition": ExpressionCondition,
        # LambdaCondition cannot be loaded from JSON
    }

    # Action registry for deserialization
    ACTION_TYPES: dict[str, type] = {
        "EmitEventAction": EmitEventAction,
        "MultiEventAction": MultiEventAction,
        "CommandAction": CommandAction,
        "ScriptAction": ScriptAction,
        "TTSSpeakAction": TTSSpeakAction,
        # CallbackAction and MethodAction cannot be loaded from JSON
    }

    def __init__(
        self,
        config: SchedulerConfig | None = None,
        event_manager: "EventManager | None" = None,
    ):
        """
        Initialize the scheduler.

        Creates the jobs directory if it does not exist yet.

        Args:
            config: Scheduler configuration (defaults to ``SchedulerConfig()``)
            event_manager: EventManager used to emit job events
        """
        self._config = config or SchedulerConfig()
        self._event_manager = event_manager

        # Jobs
        self._jobs: dict[str, ScheduledJob] = {}
        self._running_jobs: set[str] = set()
        # Strong references to in-flight job tasks; the event loop itself only
        # keeps weak references, so an unreferenced task may be collected.
        self._job_tasks: set[asyncio.Task] = set()

        # State
        self._running = False
        self._task: asyncio.Task | None = None
        self._context: dict[str, Any] = {}

        # Callbacks
        self._job_callbacks: list[Callable[[ScheduledJob, JobResult], None]] = []

        # Create directory
        self._ensure_directory()

    def _ensure_directory(self) -> None:
        """Ensure that the jobs directory exists."""
        jobs_dir = Path(self._config.jobs_directory)
        jobs_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _log_error(message: str) -> None:
        """Report an error message through the project's ``perror`` helper."""
        # Imported lazily, exactly like the original call sites did
        # (presumably to avoid an import cycle - TODO confirm).
        from trixy_core.utils.debug import perror

        perror(message)

    @property
    def config(self) -> SchedulerConfig:
        """Scheduler configuration."""
        return self._config

    @property
    def is_running(self) -> bool:
        """Whether the scheduler is running."""
        return self._running

    @property
    def jobs(self) -> dict[str, ScheduledJob]:
        """All jobs (shallow copy of the internal mapping)."""
        return dict(self._jobs)

    @property
    def job_count(self) -> int:
        """Number of jobs."""
        return len(self._jobs)

    @property
    def running_jobs(self) -> set[str]:
        """IDs of the currently running jobs (copy)."""
        return self._running_jobs.copy()

    @property
    def context(self) -> dict[str, Any]:
        """Current context (shallow copy)."""
        return dict(self._context)

    # === Context Management ===

    def set_context(self, key: str, value: Any) -> None:
        """Set a single context variable."""
        self._context[key] = value

    def update_context(self, data: dict[str, Any]) -> None:
        """Merge ``data`` into the context."""
        self._context.update(data)

    def clear_context(self) -> None:
        """Remove all context variables."""
        self._context.clear()

    # === Job Management ===

    def add_job(
        self,
        job: ScheduledJob,
        save: bool | None = None,
    ) -> str:
        """
        Add a job; an existing job with the same ID is replaced.

        Args:
            job: ScheduledJob
            save: Persist as JSON (default: ``config.auto_save``)

        Returns:
            Job ID
        """
        self._jobs[job.job_id] = job

        # Persist
        if save is None:
            save = self._config.auto_save
        if save:
            self._save_job(job)

        return job.job_id

    def create_job(
        self,
        trigger: Trigger,
        actions: list[Action] | Action,
        conditions: list[Condition] | None = None,
        config: JobConfig | None = None,
        job_id: str | None = None,
        save: bool | None = None,
    ) -> ScheduledJob:
        """
        Create a job and add it to the scheduler.

        Args:
            trigger: Trigger for the job
            actions: Action or list of actions for the job
            conditions: Optional conditions
            config: Job configuration
            job_id: Job ID, passed through to ``ScheduledJob``
            save: Persist as JSON (default: ``config.auto_save``)

        Returns:
            ScheduledJob
        """
        if isinstance(actions, Action):
            actions = [actions]

        job = ScheduledJob(
            job_id=job_id,
            trigger=trigger,
            conditions=conditions,
            actions=actions,
            config=config,
        )
        self.add_job(job, save=save)
        return job

    def get_job(self, job_id: str) -> ScheduledJob | None:
        """Return the job with the given ID, or None."""
        return self._jobs.get(job_id)

    def remove_job(
        self,
        job_id: str,
        delete_file: bool = True,
    ) -> bool:
        """
        Remove a job.

        Args:
            job_id: Job ID
            delete_file: Also delete the JSON file

        Returns:
            True if the job existed and was removed
        """
        if job_id not in self._jobs:
            return False

        del self._jobs[job_id]

        if delete_file:
            self._delete_job_file(job_id)

        return True

    def enable_job(self, job_id: str) -> bool:
        """Enable a job; returns False if the ID is unknown."""
        job = self._jobs.get(job_id)
        if job:
            job.enable()
            if self._config.auto_save:
                self._save_job(job)
            return True
        return False

    def disable_job(self, job_id: str) -> bool:
        """Disable a job; returns False if the ID is unknown."""
        job = self._jobs.get(job_id)
        if job:
            job.disable()
            if self._config.auto_save:
                self._save_job(job)
            return True
        return False

    # === Callbacks ===

    def on_job_executed(
        self,
        callback: Callable[[ScheduledJob, JobResult], None],
    ) -> None:
        """Register a callback that is invoked after each job execution."""
        self._job_callbacks.append(callback)

    # === Persistence ===

    def _get_job_path(self, job_id: str) -> Path:
        """Return the path of the JSON file for ``job_id``."""
        # File name: job_id.json, every character that is not alphanumeric,
        # "-" or "_" is replaced by "_".
        safe_id = "".join(c if c.isalnum() or c in "-_" else "_" for c in job_id)
        return Path(self._config.jobs_directory) / f"{safe_id}.json"

    def _save_job(self, job: ScheduledJob) -> bool:
        """
        Save a job as a JSON file.

        The data is written to a temporary sibling file first and then moved
        over the target with ``os.replace``, so a failure while serializing
        does not truncate an existing job file.

        Args:
            job: Job to save

        Returns:
            True on success, False if any error occurred (the error is logged)
        """
        tmp_path: Path | None = None
        try:
            path = self._get_job_path(job.job_id)
            data = job.to_dict()

            # The ".tmp" suffix keeps the file out of the "*.json" glob used
            # by load_jobs().
            tmp_path = path.with_name(f"{path.name}.tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, path)

            return True
        except Exception as e:
            self._log_error(f"Fehler beim Speichern von Job {job.job_id}: {e}")
            if tmp_path is not None:
                try:
                    tmp_path.unlink(missing_ok=True)
                except OSError:
                    # Best-effort cleanup; the save failure is already logged.
                    pass
            return False

    def _delete_job_file(self, job_id: str) -> bool:
        """
        Delete the JSON file of a job.

        Returns:
            True if the file is gone afterwards (including the case that it
            never existed), False if deleting failed (the error is logged)
        """
        try:
            self._get_job_path(job_id).unlink(missing_ok=True)
            return True
        except Exception as e:
            self._log_error(f"Fehler beim Löschen der Datei von Job {job_id}: {e}")
            return False

    def _load_job_from_dict(self, data: dict) -> ScheduledJob | None:
        """
        Build a job from a dictionary.

        Components (trigger, conditions, actions) that cannot be deserialized
        are left out of the job, as before; each such omission is now logged
        because a job that silently loses a condition runs unguarded.

        Args:
            data: Job data

        Returns:
            ScheduledJob, or None on error
        """
        try:
            job_id = data.get("job_id")

            # Load trigger
            trigger = None
            if data.get("trigger"):
                trigger = self._load_trigger(data["trigger"])
                if trigger is None:
                    self._log_error(
                        f"Job {job_id}: Trigger vom Typ "
                        f"{data['trigger'].get('type')!r} konnte nicht geladen werden"
                    )

            # Load conditions
            conditions = []
            for cond_data in data.get("conditions", []):
                condition = self._load_condition(cond_data)
                if condition:
                    conditions.append(condition)
                elif condition is None:
                    self._log_error(
                        f"Job {job_id}: Condition vom Typ "
                        f"{cond_data.get('type')!r} konnte nicht geladen werden "
                        f"und wird ignoriert"
                    )

            # Load actions
            actions = []
            for action_data in data.get("actions", []):
                action = self._load_action(action_data)
                if action:
                    actions.append(action)
                elif action is None:
                    self._log_error(
                        f"Job {job_id}: Action vom Typ "
                        f"{action_data.get('type')!r} konnte nicht geladen werden "
                        f"und wird ignoriert"
                    )

            # Load config; name, description and enabled are read from the
            # top level of the job data, the rest from the "config" section.
            config_data = data.get("config", {})
            config = JobConfig(
                name=data.get("name", ""),
                description=data.get("description", ""),
                max_runs=config_data.get("max_runs", 0),
                max_failures=config_data.get("max_failures", 3),
                timeout_seconds=config_data.get("timeout_seconds", 60.0),
                priority=config_data.get("priority", 0),
                tags=config_data.get("tags", []),
                enabled=data.get("enabled", True),
            )

            # Create job
            job = ScheduledJob(
                job_id=job_id,
                trigger=trigger,
                conditions=conditions,
                actions=actions,
                config=config,
            )

            return job
        except Exception as e:
            self._log_error(f"Fehler beim Laden von Job: {e}")
            return None

    def _load_trigger(self, data: dict) -> Trigger | None:
        """Build a trigger from a dictionary; None if the type is unknown."""
        trigger_type = data.get("type")
        trigger_class = self.TRIGGER_TYPES.get(trigger_type)

        if not trigger_class:
            return None

        if hasattr(trigger_class, "from_dict"):
            return trigger_class.from_dict(data)

        return None

    def _load_condition(self, data: dict) -> Condition | None:
        """Build a condition from a dictionary; None if the type is unknown."""
        cond_type = data.get("type")
        cond_class = self.CONDITION_TYPES.get(cond_type)

        if not cond_class:
            return None

        if hasattr(cond_class, "from_dict"):
            return cond_class.from_dict(data)

        return None

    def _load_action(self, data: dict) -> Action | None:
        """Build an action from a dictionary; None if the type is unknown."""
        action_type = data.get("type")
        action_class = self.ACTION_TYPES.get(action_type)

        if not action_class:
            return None

        if hasattr(action_class, "from_dict"):
            return action_class.from_dict(data)

        return None

    def load_jobs(self) -> int:
        """
        Load all jobs from the jobs directory.

        Files that cannot be read or parsed are logged and skipped.

        Returns:
            Number of loaded jobs
        """
        jobs_dir = Path(self._config.jobs_directory)
        loaded = 0

        if not jobs_dir.exists():
            return 0

        for file_path in jobs_dir.glob("*.json"):
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    data = json.load(f)

                job = self._load_job_from_dict(data)
                if job:
                    self._jobs[job.job_id] = job
                    loaded += 1
            except Exception as e:
                self._log_error(f"Fehler beim Laden von {file_path}: {e}")

        return loaded

    def save_all_jobs(self) -> int:
        """
        Save all jobs.

        Returns:
            Number of successfully saved jobs
        """
        saved = 0
        for job in self._jobs.values():
            if self._save_job(job):
                saved += 1
        return saved

    # === Execution ===

    async def start(self) -> None:
        """Start the scheduler; does nothing if it is already running."""
        if self._running:
            return

        # Load jobs
        if self._config.auto_load:
            loaded = self.load_jobs()
            from trixy_core.utils.debug import pinfo

            pinfo(f"Scheduler: {loaded} Jobs geladen")

        self._running = True
        self._task = asyncio.create_task(self._run_loop())

    async def stop(self) -> None:
        """Stop the scheduler loop and wait for running jobs to finish."""
        self._running = False

        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

        # Wait for running jobs.
        # NOTE(review): there is no timeout here; a job that never finishes
        # blocks stop() indefinitely.
        while self._running_jobs:
            await asyncio.sleep(0.1)

    async def _run_loop(self) -> None:
        """Main loop: run one tick, then sleep for the tick interval."""
        while self._running:
            try:
                await self._tick()
            except Exception as e:
                # Keep the loop alive; a failing tick must not stop scheduling.
                self._log_error(f"Scheduler-Fehler: {e}")

            await asyncio.sleep(self._config.tick_interval_seconds)

    def _spawn_job(self, job: ScheduledJob) -> asyncio.Task:
        """
        Start ``_execute_job`` for ``job`` as a background task.

        The job ID is added to the running set immediately (not only once the
        task body starts), so concurrency checks made later in the same tick
        or by ``on_event`` already see it. A reference to the task is kept
        until it is done.
        """
        job_id = job.job_id
        self._running_jobs.add(job_id)

        task = asyncio.create_task(self._execute_job(job))
        self._job_tasks.add(task)

        def _release(done: asyncio.Task) -> None:
            # Also covers a task that was cancelled before its first step, in
            # which case the ``finally`` in _execute_job never runs.
            self._job_tasks.discard(done)
            self._running_jobs.discard(job_id)

        task.add_done_callback(_release)
        return task

    async def _tick(self) -> None:
        """Run one scheduler tick: start every job whose trigger fires."""
        now = datetime.now()

        # Check jobs
        for job in list(self._jobs.values()):
            if not job.is_runnable:
                continue

            if job.job_id in self._running_jobs:
                continue

            # Check max concurrent
            if len(self._running_jobs) >= self._config.max_concurrent_jobs:
                break

            # Check trigger - last_fire_time is taken from the trigger itself
            trigger = job.trigger
            if not trigger:
                continue

            context = TriggerContext(
                current_time=now,
                last_fire_time=trigger.last_fire_time,
            )

            if trigger.should_fire(context):
                # Check conditions
                if job.should_run_now(self._context):
                    self._spawn_job(job)

    async def _execute_job(self, job: ScheduledJob) -> None:
        """
        Execute a job.

        Runs the enabled actions in order and stops at the first action that
        fails or raises. Afterwards the trigger is marked as fired, the result
        is recorded on the job, the job is saved (if ``auto_save``), callbacks
        are invoked and a completion event is emitted.
        """
        # Idempotent when the job was started through _spawn_job.
        self._running_jobs.add(job.job_id)
        start_time = datetime.now()

        try:
            # Set job state
            job._state = JobState.RUNNING

            # Emit event
            if self._config.emit_job_events and self._event_manager:
                await self._event_manager.emit("scheduler_job_started", {
                    "job_id": job.job_id,
                    "job_name": job.name,
                })

            # Context for actions
            action_context = {
                **self._context,
                "job_id": job.job_id,
                "job_name": job.name,
                "event_manager": self._event_manager,
                "current_time": start_time,
            }

            # Execute actions
            results = []
            error = None

            for action in job.actions:
                if not action.is_enabled:
                    continue

                try:
                    result = await action.execute(action_context)
                    results.append(result)

                    if not result.success:
                        error = result.error
                        break
                except Exception as e:
                    error = str(e)
                    break

            # Mark trigger as fired
            if job.trigger:
                job.trigger.fire()

            # Job result
            end_time = datetime.now()
            job_result = JobResult(
                job_id=job.job_id,
                success=error is None,
                start_time=start_time,
                end_time=end_time,
                result=results,
                error=error,
                condition_results=job.check_conditions(self._context),
            )

            # Update job
            job.record_run(job_result)

            # Persist
            if self._config.auto_save:
                self._save_job(job)

            # Callbacks: one failing callback must not affect the others,
            # but the failure is reported instead of being dropped silently.
            for callback in self._job_callbacks:
                try:
                    callback(job, job_result)
                except Exception as e:
                    self._log_error(
                        f"Fehler im Callback für Job {job.job_id}: {e}"
                    )

            # Emit event
            if self._config.emit_job_events and self._event_manager:
                await self._event_manager.emit("scheduler_job_completed", {
                    "job_id": job.job_id,
                    "job_name": job.name,
                    "success": job_result.success,
                    "duration_ms": job_result.duration_ms,
                    "error": error,
                })

        except Exception as e:
            self._log_error(f"Fehler bei Job {job.job_id}: {e}")
            job._state = JobState.FAILED

        finally:
            self._running_jobs.discard(job.job_id)

    # === Event Handling ===

    async def on_event(self, event_name: str, event_data: dict | None = None) -> None:
        """
        Called when an event arrives.

        Starts runnable jobs whose trigger is an ``EventTrigger`` that accepts
        the event.

        NOTE(review): only direct ``EventTrigger`` instances are handled here;
        composite triggers containing an EventTrigger are not. The
        ``max_concurrent_jobs`` limit is also not checked on this path.
        """
        # NOTE(review): this context is built but not passed to anything in
        # this method; kept to preserve the original construction.
        context = TriggerContext(
            current_time=datetime.now(),
            event_name=event_name,
            event_data=event_data,
        )

        for job in list(self._jobs.values()):
            if not job.is_runnable:
                continue

            if job.job_id in self._running_jobs:
                continue

            trigger = job.trigger
            if not trigger:
                continue

            if not isinstance(trigger, EventTrigger):
                continue

            if trigger.on_event(event_name, event_data):
                if job.should_run_now(self._context):
                    self._spawn_job(job)

    # === Statistics ===

    def get_statistics(self) -> dict:
        """Return scheduler statistics (job counts, run and failure totals)."""
        enabled = sum(1 for j in self._jobs.values() if j.is_enabled)
        total_runs = sum(j.run_count for j in self._jobs.values())
        total_failures = sum(j.failure_count for j in self._jobs.values())

        return {
            "total_jobs": len(self._jobs),
            "enabled_jobs": enabled,
            "running_jobs": len(self._running_jobs),
            "total_runs": total_runs,
            "total_failures": total_failures,
            "is_running": self._running,
        }