| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736 |
- # -*- coding: utf-8 -*-
- """
- Scheduler - Hauptklasse für Job-Scheduling.
- """
- import asyncio
- import json
- import os
- from dataclasses import dataclass, field
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Any, Callable, TYPE_CHECKING
- from trixy_core.scheduler.job import ScheduledJob, JobConfig, JobState, JobResult
- from trixy_core.scheduler.trigger.base import Trigger, TriggerContext
- from trixy_core.scheduler.trigger.cron import CronTrigger
- from trixy_core.scheduler.trigger.interval import IntervalTrigger
- from trixy_core.scheduler.trigger.datetime_trigger import DateTimeTrigger
- from trixy_core.scheduler.trigger.event import EventTrigger
- from trixy_core.scheduler.trigger.composite import AndTrigger, OrTrigger
- from trixy_core.scheduler.condition.base import Condition
- from trixy_core.scheduler.condition.time import (
- TimeRangeCondition, DayOfWeekCondition, DateRangeCondition,
- )
- from trixy_core.scheduler.condition.activity import (
- RecentActivityCondition, ActivityCountCondition,
- )
- from trixy_core.scheduler.condition.sensor import (
- ComparisonOperator, SensorThresholdCondition, SensorChangeCondition,
- )
- from trixy_core.scheduler.condition.state import (
- StateCondition, BooleanCondition, ExistsCondition,
- )
- from trixy_core.scheduler.condition.composite import (
- AndCondition, OrCondition, NotCondition, XorCondition,
- )
- from trixy_core.scheduler.condition.script import (
- ScriptCondition, AsyncScriptCondition,
- )
- from trixy_core.scheduler.condition.file_value import (
- JsonValueCondition, IniValueCondition, YamlValueCondition,
- TomlValueCondition, EnvFileValueCondition,
- )
- from trixy_core.scheduler.condition.expression import (
- ExpressionCondition, LambdaCondition,
- )
- from trixy_core.scheduler.action.base import Action, ActionResult
- from trixy_core.scheduler.action.event import EmitEventAction, MultiEventAction
- from trixy_core.scheduler.action.callback import CallbackAction, MethodAction
- from trixy_core.scheduler.action.command import CommandAction, ScriptAction
- from trixy_core.scheduler.action.tts import TTSSpeakAction
- if TYPE_CHECKING:
- from trixy_core.events import EventManager
- @dataclass
- class SchedulerConfig:
- """Konfiguration für den Scheduler."""
- # Intervalle
- tick_interval_seconds: float = 1.0 # Prüfintervall
- cleanup_interval_seconds: float = 3600 # History-Cleanup
- # Job-Persistierung
- jobs_directory: str = "./cron" # Verzeichnis für Job-Dateien
- auto_load: bool = True # Jobs beim Start laden
- auto_save: bool = True # Jobs bei Änderungen speichern
- # Limits
- max_concurrent_jobs: int = 10 # Maximale parallele Jobs
- max_history_per_job: int = 100 # History-Einträge pro Job
- # Defaults für Jobs
- default_timeout_seconds: float = 60.0
- default_max_retries: int = 3
- # Event-Integration
- emit_job_events: bool = True # Events bei Job-Ausführung emittieren
- class Scheduler:
- """
- Scheduler für Job-Ausführung.
- Lädt und speichert Jobs als JSON-Dateien in ./cron/*.json.
- """
- # Trigger-Registry für Deserialisierung
- TRIGGER_TYPES: dict[str, type] = {
- "CronTrigger": CronTrigger,
- "IntervalTrigger": IntervalTrigger,
- "DateTimeTrigger": DateTimeTrigger,
- "EventTrigger": EventTrigger,
- "AndTrigger": AndTrigger,
- "OrTrigger": OrTrigger,
- }
- # Condition-Registry für Deserialisierung
- CONDITION_TYPES: dict[str, type] = {
- "TimeRangeCondition": TimeRangeCondition,
- "DayOfWeekCondition": DayOfWeekCondition,
- "DateRangeCondition": DateRangeCondition,
- "RecentActivityCondition": RecentActivityCondition,
- "ActivityCountCondition": ActivityCountCondition,
- "SensorThresholdCondition": SensorThresholdCondition,
- "SensorChangeCondition": SensorChangeCondition,
- "StateCondition": StateCondition,
- "BooleanCondition": BooleanCondition,
- "ExistsCondition": ExistsCondition,
- "AndCondition": AndCondition,
- "OrCondition": OrCondition,
- "NotCondition": NotCondition,
- "XorCondition": XorCondition,
- # Script
- "ScriptCondition": ScriptCondition,
- "AsyncScriptCondition": AsyncScriptCondition,
- # File Value
- "JsonValueCondition": JsonValueCondition,
- "IniValueCondition": IniValueCondition,
- "YamlValueCondition": YamlValueCondition,
- "TomlValueCondition": TomlValueCondition,
- "EnvFileValueCondition": EnvFileValueCondition,
- # Expression
- "ExpressionCondition": ExpressionCondition,
- # LambdaCondition kann nicht aus JSON geladen werden
- }
- # Action-Registry für Deserialisierung
- ACTION_TYPES: dict[str, type] = {
- "EmitEventAction": EmitEventAction,
- "MultiEventAction": MultiEventAction,
- "CommandAction": CommandAction,
- "ScriptAction": ScriptAction,
- "TTSSpeakAction": TTSSpeakAction,
- # CallbackAction und MethodAction können nicht aus JSON geladen werden
- }
- def __init__(
- self,
- config: SchedulerConfig | None = None,
- event_manager: "EventManager | None" = None,
- ):
- """
- Initialisiert den Scheduler.
- Args:
- config: Scheduler-Konfiguration
- event_manager: EventManager für Events
- """
- self._config = config or SchedulerConfig()
- self._event_manager = event_manager
- # Jobs
- self._jobs: dict[str, ScheduledJob] = {}
- self._running_jobs: set[str] = set()
- # Zustand
- self._running = False
- self._task: asyncio.Task | None = None
- self._context: dict[str, Any] = {}
- # Callbacks
- self._job_callbacks: list[Callable[[ScheduledJob, JobResult], None]] = []
- # Verzeichnis erstellen
- self._ensure_directory()
- def _ensure_directory(self) -> None:
- """Stellt sicher, dass das Jobs-Verzeichnis existiert."""
- jobs_dir = Path(self._config.jobs_directory)
- jobs_dir.mkdir(parents=True, exist_ok=True)
- @property
- def config(self) -> SchedulerConfig:
- """Scheduler-Konfiguration."""
- return self._config
- @property
- def is_running(self) -> bool:
- """Prüft ob Scheduler läuft."""
- return self._running
- @property
- def jobs(self) -> dict[str, ScheduledJob]:
- """Alle Jobs."""
- return dict(self._jobs)
- @property
- def job_count(self) -> int:
- """Anzahl Jobs."""
- return len(self._jobs)
- @property
- def running_jobs(self) -> set[str]:
- """Aktuell laufende Jobs."""
- return self._running_jobs.copy()
- @property
- def context(self) -> dict[str, Any]:
- """Aktueller Kontext."""
- return dict(self._context)
- # === Context Management ===
- def set_context(self, key: str, value: Any) -> None:
- """Setzt Kontext-Variable."""
- self._context[key] = value
- def update_context(self, data: dict[str, Any]) -> None:
- """Aktualisiert Kontext."""
- self._context.update(data)
- def clear_context(self) -> None:
- """Löscht Kontext."""
- self._context.clear()
- # === Job Management ===
- def add_job(
- self,
- job: ScheduledJob,
- save: bool | None = None,
- ) -> str:
- """
- Fügt Job hinzu.
- Args:
- job: ScheduledJob
- save: Als JSON speichern (default: auto_save)
- Returns:
- Job-ID
- """
- self._jobs[job.job_id] = job
- # Speichern
- if save is None:
- save = self._config.auto_save
- if save:
- self._save_job(job)
- return job.job_id
- def create_job(
- self,
- trigger: Trigger,
- actions: list[Action] | Action,
- conditions: list[Condition] | None = None,
- config: JobConfig | None = None,
- job_id: str | None = None,
- save: bool | None = None,
- ) -> ScheduledJob:
- """
- Erstellt und fügt Job hinzu.
- Args:
- trigger: Trigger für den Job
- actions: Action(s) für den Job
- conditions: Optionale Conditions
- config: Job-Konfiguration
- job_id: Job-ID (wird generiert wenn None)
- save: Als JSON speichern
- Returns:
- ScheduledJob
- """
- if isinstance(actions, Action):
- actions = [actions]
- job = ScheduledJob(
- job_id=job_id,
- trigger=trigger,
- conditions=conditions,
- actions=actions,
- config=config,
- )
- self.add_job(job, save=save)
- return job
- def get_job(self, job_id: str) -> ScheduledJob | None:
- """Holt Job nach ID."""
- return self._jobs.get(job_id)
- def remove_job(
- self,
- job_id: str,
- delete_file: bool = True,
- ) -> bool:
- """
- Entfernt Job.
- Args:
- job_id: Job-ID
- delete_file: JSON-Datei löschen
- Returns:
- True wenn entfernt
- """
- if job_id not in self._jobs:
- return False
- del self._jobs[job_id]
- if delete_file:
- self._delete_job_file(job_id)
- return True
- def enable_job(self, job_id: str) -> bool:
- """Aktiviert Job."""
- job = self._jobs.get(job_id)
- if job:
- job.enable()
- if self._config.auto_save:
- self._save_job(job)
- return True
- return False
- def disable_job(self, job_id: str) -> bool:
- """Deaktiviert Job."""
- job = self._jobs.get(job_id)
- if job:
- job.disable()
- if self._config.auto_save:
- self._save_job(job)
- return True
- return False
- # === Callbacks ===
- def on_job_executed(
- self,
- callback: Callable[[ScheduledJob, JobResult], None],
- ) -> None:
- """Registriert Callback für Job-Ausführung."""
- self._job_callbacks.append(callback)
- # === Persistence ===
- def _get_job_path(self, job_id: str) -> Path:
- """Gibt Pfad zur Job-Datei zurück."""
- # Dateiname: job_id.json (Sonderzeichen ersetzen)
- safe_id = "".join(c if c.isalnum() or c in "-_" else "_" for c in job_id)
- return Path(self._config.jobs_directory) / f"{safe_id}.json"
- def _save_job(self, job: ScheduledJob) -> bool:
- """
- Speichert Job als JSON-Datei.
- Args:
- job: Zu speichernder Job
- Returns:
- True wenn erfolgreich
- """
- try:
- path = self._get_job_path(job.job_id)
- data = job.to_dict()
- with open(path, "w", encoding="utf-8") as f:
- json.dump(data, f, indent=2, ensure_ascii=False)
- return True
- except Exception as e:
- from trixy_core.utils.debug import perror
- perror(f"Fehler beim Speichern von Job {job.job_id}: {e}")
- return False
- def _delete_job_file(self, job_id: str) -> bool:
- """Löscht Job-Datei."""
- try:
- path = self._get_job_path(job_id)
- if path.exists():
- path.unlink()
- return True
- except Exception:
- return False
- def _load_job_from_dict(self, data: dict) -> ScheduledJob | None:
- """
- Lädt Job aus Dictionary.
- Args:
- data: Job-Daten
- Returns:
- ScheduledJob oder None bei Fehler
- """
- try:
- # Trigger laden
- trigger = None
- if data.get("trigger"):
- trigger = self._load_trigger(data["trigger"])
- # Conditions laden
- conditions = []
- for cond_data in data.get("conditions", []):
- condition = self._load_condition(cond_data)
- if condition:
- conditions.append(condition)
- # Actions laden
- actions = []
- for action_data in data.get("actions", []):
- action = self._load_action(action_data)
- if action:
- actions.append(action)
- # Config laden
- config_data = data.get("config", {})
- config = JobConfig(
- name=data.get("name", ""),
- description=data.get("description", ""),
- max_runs=config_data.get("max_runs", 0),
- max_failures=config_data.get("max_failures", 3),
- timeout_seconds=config_data.get("timeout_seconds", 60.0),
- priority=config_data.get("priority", 0),
- tags=config_data.get("tags", []),
- enabled=data.get("enabled", True),
- )
- # Job erstellen
- job = ScheduledJob(
- job_id=data.get("job_id"),
- trigger=trigger,
- conditions=conditions,
- actions=actions,
- config=config,
- )
- return job
- except Exception as e:
- from trixy_core.utils.debug import perror
- perror(f"Fehler beim Laden von Job: {e}")
- return None
- def _load_trigger(self, data: dict) -> Trigger | None:
- """Lädt Trigger aus Dictionary."""
- trigger_type = data.get("type")
- trigger_class = self.TRIGGER_TYPES.get(trigger_type)
- if not trigger_class:
- return None
- if hasattr(trigger_class, "from_dict"):
- return trigger_class.from_dict(data)
- return None
- def _load_condition(self, data: dict) -> Condition | None:
- """Lädt Condition aus Dictionary."""
- cond_type = data.get("type")
- cond_class = self.CONDITION_TYPES.get(cond_type)
- if not cond_class:
- return None
- if hasattr(cond_class, "from_dict"):
- return cond_class.from_dict(data)
- return None
- def _load_action(self, data: dict) -> Action | None:
- """Lädt Action aus Dictionary."""
- action_type = data.get("type")
- action_class = self.ACTION_TYPES.get(action_type)
- if not action_class:
- return None
- if hasattr(action_class, "from_dict"):
- return action_class.from_dict(data)
- return None
- def load_jobs(self) -> int:
- """
- Lädt alle Jobs aus dem Jobs-Verzeichnis.
- Returns:
- Anzahl geladener Jobs
- """
- jobs_dir = Path(self._config.jobs_directory)
- loaded = 0
- if not jobs_dir.exists():
- return 0
- for file_path in jobs_dir.glob("*.json"):
- try:
- with open(file_path, "r", encoding="utf-8") as f:
- data = json.load(f)
- job = self._load_job_from_dict(data)
- if job:
- self._jobs[job.job_id] = job
- loaded += 1
- except Exception as e:
- from trixy_core.utils.debug import perror
- perror(f"Fehler beim Laden von {file_path}: {e}")
- return loaded
- def save_all_jobs(self) -> int:
- """
- Speichert alle Jobs.
- Returns:
- Anzahl gespeicherter Jobs
- """
- saved = 0
- for job in self._jobs.values():
- if self._save_job(job):
- saved += 1
- return saved
- # === Execution ===
- async def start(self) -> None:
- """Startet den Scheduler."""
- if self._running:
- return
- # Jobs laden
- if self._config.auto_load:
- loaded = self.load_jobs()
- from trixy_core.utils.debug import pinfo
- pinfo(f"Scheduler: {loaded} Jobs geladen")
- self._running = True
- self._task = asyncio.create_task(self._run_loop())
- async def stop(self) -> None:
- """Stoppt den Scheduler."""
- self._running = False
- if self._task:
- self._task.cancel()
- try:
- await self._task
- except asyncio.CancelledError:
- pass
- self._task = None
- # Laufende Jobs abwarten
- while self._running_jobs:
- await asyncio.sleep(0.1)
- async def _run_loop(self) -> None:
- """Hauptschleife des Schedulers."""
- while self._running:
- try:
- await self._tick()
- except Exception as e:
- from trixy_core.utils.debug import perror
- perror(f"Scheduler-Fehler: {e}")
- await asyncio.sleep(self._config.tick_interval_seconds)
- async def _tick(self) -> None:
- """Ein Tick des Schedulers."""
- now = datetime.now()
- # Jobs prüfen
- for job in list(self._jobs.values()):
- if not job.is_runnable:
- continue
- if job.job_id in self._running_jobs:
- continue
- # Max concurrent prüfen
- if len(self._running_jobs) >= self._config.max_concurrent_jobs:
- break
- # Trigger prüfen — last_fire_time aus dem Trigger selbst uebergeben
- trigger = job.trigger
- if not trigger:
- continue
- context = TriggerContext(
- current_time=now,
- last_fire_time=trigger.last_fire_time,
- )
- if trigger.should_fire(context):
- # Conditions prüfen
- if job.should_run_now(self._context):
- asyncio.create_task(self._execute_job(job))
- async def _execute_job(self, job: ScheduledJob) -> None:
- """Führt einen Job aus."""
- self._running_jobs.add(job.job_id)
- start_time = datetime.now()
- try:
- # Job-Status setzen
- job._state = JobState.RUNNING
- # Event emittieren
- if self._config.emit_job_events and self._event_manager:
- await self._event_manager.emit("scheduler_job_started", {
- "job_id": job.job_id,
- "job_name": job.name,
- })
- # Kontext für Actions
- action_context = {
- **self._context,
- "job_id": job.job_id,
- "job_name": job.name,
- "event_manager": self._event_manager,
- "current_time": start_time,
- }
- # Actions ausführen
- results = []
- error = None
- for action in job.actions:
- if not action.is_enabled:
- continue
- try:
- result = await action.execute(action_context)
- results.append(result)
- if not result.success:
- error = result.error
- break
- except Exception as e:
- error = str(e)
- break
- # Trigger markieren
- if job.trigger:
- job.trigger.fire()
- # Job-Ergebnis
- end_time = datetime.now()
- job_result = JobResult(
- job_id=job.job_id,
- success=error is None,
- start_time=start_time,
- end_time=end_time,
- result=results,
- error=error,
- condition_results=job.check_conditions(self._context),
- )
- # Job aktualisieren
- job.record_run(job_result)
- # Speichern
- if self._config.auto_save:
- self._save_job(job)
- # Callbacks
- for callback in self._job_callbacks:
- try:
- callback(job, job_result)
- except Exception:
- pass
- # Event emittieren
- if self._config.emit_job_events and self._event_manager:
- await self._event_manager.emit("scheduler_job_completed", {
- "job_id": job.job_id,
- "job_name": job.name,
- "success": job_result.success,
- "duration_ms": job_result.duration_ms,
- "error": error,
- })
- except Exception as e:
- from trixy_core.utils.debug import perror
- perror(f"Fehler bei Job {job.job_id}: {e}")
- job._state = JobState.FAILED
- finally:
- self._running_jobs.discard(job.job_id)
- # === Event Handling ===
- async def on_event(self, event_name: str, event_data: dict | None = None) -> None:
- """
- Wird aufgerufen wenn ein Event eintrifft.
- Für Jobs mit EventTrigger.
- """
- context = TriggerContext(
- current_time=datetime.now(),
- event_name=event_name,
- event_data=event_data,
- )
- for job in list(self._jobs.values()):
- if not job.is_runnable:
- continue
- if job.job_id in self._running_jobs:
- continue
- # Nur EventTrigger oder Composite mit EventTrigger
- trigger = job.trigger
- if not trigger:
- continue
- if isinstance(trigger, EventTrigger):
- if trigger.on_event(event_name, event_data):
- if job.should_run_now(self._context):
- asyncio.create_task(self._execute_job(job))
- # === Statistics ===
- def get_statistics(self) -> dict:
- """Gibt Scheduler-Statistiken zurück."""
- enabled = sum(1 for j in self._jobs.values() if j.is_enabled)
- total_runs = sum(j.run_count for j in self._jobs.values())
- total_failures = sum(j.failure_count for j in self._jobs.values())
- return {
- "total_jobs": len(self._jobs),
- "enabled_jobs": enabled,
- "running_jobs": len(self._running_jobs),
- "total_runs": total_runs,
- "total_failures": total_failures,
- "is_running": self._running,
- }
|