scheduler.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736
  1. # -*- coding: utf-8 -*-
  2. """
  3. Scheduler - Hauptklasse für Job-Scheduling.
  4. """
  5. import asyncio
  6. import json
  7. import os
  8. from dataclasses import dataclass, field
  9. from datetime import datetime, timedelta
  10. from pathlib import Path
  11. from typing import Any, Callable, TYPE_CHECKING
  12. from trixy_core.scheduler.job import ScheduledJob, JobConfig, JobState, JobResult
  13. from trixy_core.scheduler.trigger.base import Trigger, TriggerContext
  14. from trixy_core.scheduler.trigger.cron import CronTrigger
  15. from trixy_core.scheduler.trigger.interval import IntervalTrigger
  16. from trixy_core.scheduler.trigger.datetime_trigger import DateTimeTrigger
  17. from trixy_core.scheduler.trigger.event import EventTrigger
  18. from trixy_core.scheduler.trigger.composite import AndTrigger, OrTrigger
  19. from trixy_core.scheduler.condition.base import Condition
  20. from trixy_core.scheduler.condition.time import (
  21. TimeRangeCondition, DayOfWeekCondition, DateRangeCondition,
  22. )
  23. from trixy_core.scheduler.condition.activity import (
  24. RecentActivityCondition, ActivityCountCondition,
  25. )
  26. from trixy_core.scheduler.condition.sensor import (
  27. ComparisonOperator, SensorThresholdCondition, SensorChangeCondition,
  28. )
  29. from trixy_core.scheduler.condition.state import (
  30. StateCondition, BooleanCondition, ExistsCondition,
  31. )
  32. from trixy_core.scheduler.condition.composite import (
  33. AndCondition, OrCondition, NotCondition, XorCondition,
  34. )
  35. from trixy_core.scheduler.condition.script import (
  36. ScriptCondition, AsyncScriptCondition,
  37. )
  38. from trixy_core.scheduler.condition.file_value import (
  39. JsonValueCondition, IniValueCondition, YamlValueCondition,
  40. TomlValueCondition, EnvFileValueCondition,
  41. )
  42. from trixy_core.scheduler.condition.expression import (
  43. ExpressionCondition, LambdaCondition,
  44. )
  45. from trixy_core.scheduler.action.base import Action, ActionResult
  46. from trixy_core.scheduler.action.event import EmitEventAction, MultiEventAction
  47. from trixy_core.scheduler.action.callback import CallbackAction, MethodAction
  48. from trixy_core.scheduler.action.command import CommandAction, ScriptAction
  49. from trixy_core.scheduler.action.tts import TTSSpeakAction
  50. if TYPE_CHECKING:
  51. from trixy_core.events import EventManager
  52. @dataclass
  53. class SchedulerConfig:
  54. """Konfiguration für den Scheduler."""
  55. # Intervalle
  56. tick_interval_seconds: float = 1.0 # Prüfintervall
  57. cleanup_interval_seconds: float = 3600 # History-Cleanup
  58. # Job-Persistierung
  59. jobs_directory: str = "./cron" # Verzeichnis für Job-Dateien
  60. auto_load: bool = True # Jobs beim Start laden
  61. auto_save: bool = True # Jobs bei Änderungen speichern
  62. # Limits
  63. max_concurrent_jobs: int = 10 # Maximale parallele Jobs
  64. max_history_per_job: int = 100 # History-Einträge pro Job
  65. # Defaults für Jobs
  66. default_timeout_seconds: float = 60.0
  67. default_max_retries: int = 3
  68. # Event-Integration
  69. emit_job_events: bool = True # Events bei Job-Ausführung emittieren
  70. class Scheduler:
  71. """
  72. Scheduler für Job-Ausführung.
  73. Lädt und speichert Jobs als JSON-Dateien in ./cron/*.json.
  74. """
  75. # Trigger-Registry für Deserialisierung
  76. TRIGGER_TYPES: dict[str, type] = {
  77. "CronTrigger": CronTrigger,
  78. "IntervalTrigger": IntervalTrigger,
  79. "DateTimeTrigger": DateTimeTrigger,
  80. "EventTrigger": EventTrigger,
  81. "AndTrigger": AndTrigger,
  82. "OrTrigger": OrTrigger,
  83. }
  84. # Condition-Registry für Deserialisierung
  85. CONDITION_TYPES: dict[str, type] = {
  86. "TimeRangeCondition": TimeRangeCondition,
  87. "DayOfWeekCondition": DayOfWeekCondition,
  88. "DateRangeCondition": DateRangeCondition,
  89. "RecentActivityCondition": RecentActivityCondition,
  90. "ActivityCountCondition": ActivityCountCondition,
  91. "SensorThresholdCondition": SensorThresholdCondition,
  92. "SensorChangeCondition": SensorChangeCondition,
  93. "StateCondition": StateCondition,
  94. "BooleanCondition": BooleanCondition,
  95. "ExistsCondition": ExistsCondition,
  96. "AndCondition": AndCondition,
  97. "OrCondition": OrCondition,
  98. "NotCondition": NotCondition,
  99. "XorCondition": XorCondition,
  100. # Script
  101. "ScriptCondition": ScriptCondition,
  102. "AsyncScriptCondition": AsyncScriptCondition,
  103. # File Value
  104. "JsonValueCondition": JsonValueCondition,
  105. "IniValueCondition": IniValueCondition,
  106. "YamlValueCondition": YamlValueCondition,
  107. "TomlValueCondition": TomlValueCondition,
  108. "EnvFileValueCondition": EnvFileValueCondition,
  109. # Expression
  110. "ExpressionCondition": ExpressionCondition,
  111. # LambdaCondition kann nicht aus JSON geladen werden
  112. }
  113. # Action-Registry für Deserialisierung
  114. ACTION_TYPES: dict[str, type] = {
  115. "EmitEventAction": EmitEventAction,
  116. "MultiEventAction": MultiEventAction,
  117. "CommandAction": CommandAction,
  118. "ScriptAction": ScriptAction,
  119. "TTSSpeakAction": TTSSpeakAction,
  120. # CallbackAction und MethodAction können nicht aus JSON geladen werden
  121. }
  122. def __init__(
  123. self,
  124. config: SchedulerConfig | None = None,
  125. event_manager: "EventManager | None" = None,
  126. ):
  127. """
  128. Initialisiert den Scheduler.
  129. Args:
  130. config: Scheduler-Konfiguration
  131. event_manager: EventManager für Events
  132. """
  133. self._config = config or SchedulerConfig()
  134. self._event_manager = event_manager
  135. # Jobs
  136. self._jobs: dict[str, ScheduledJob] = {}
  137. self._running_jobs: set[str] = set()
  138. # Zustand
  139. self._running = False
  140. self._task: asyncio.Task | None = None
  141. self._context: dict[str, Any] = {}
  142. # Callbacks
  143. self._job_callbacks: list[Callable[[ScheduledJob, JobResult], None]] = []
  144. # Verzeichnis erstellen
  145. self._ensure_directory()
  146. def _ensure_directory(self) -> None:
  147. """Stellt sicher, dass das Jobs-Verzeichnis existiert."""
  148. jobs_dir = Path(self._config.jobs_directory)
  149. jobs_dir.mkdir(parents=True, exist_ok=True)
  150. @property
  151. def config(self) -> SchedulerConfig:
  152. """Scheduler-Konfiguration."""
  153. return self._config
  154. @property
  155. def is_running(self) -> bool:
  156. """Prüft ob Scheduler läuft."""
  157. return self._running
  158. @property
  159. def jobs(self) -> dict[str, ScheduledJob]:
  160. """Alle Jobs."""
  161. return dict(self._jobs)
  162. @property
  163. def job_count(self) -> int:
  164. """Anzahl Jobs."""
  165. return len(self._jobs)
  166. @property
  167. def running_jobs(self) -> set[str]:
  168. """Aktuell laufende Jobs."""
  169. return self._running_jobs.copy()
  170. @property
  171. def context(self) -> dict[str, Any]:
  172. """Aktueller Kontext."""
  173. return dict(self._context)
  174. # === Context Management ===
  175. def set_context(self, key: str, value: Any) -> None:
  176. """Setzt Kontext-Variable."""
  177. self._context[key] = value
  178. def update_context(self, data: dict[str, Any]) -> None:
  179. """Aktualisiert Kontext."""
  180. self._context.update(data)
  181. def clear_context(self) -> None:
  182. """Löscht Kontext."""
  183. self._context.clear()
  184. # === Job Management ===
  185. def add_job(
  186. self,
  187. job: ScheduledJob,
  188. save: bool | None = None,
  189. ) -> str:
  190. """
  191. Fügt Job hinzu.
  192. Args:
  193. job: ScheduledJob
  194. save: Als JSON speichern (default: auto_save)
  195. Returns:
  196. Job-ID
  197. """
  198. self._jobs[job.job_id] = job
  199. # Speichern
  200. if save is None:
  201. save = self._config.auto_save
  202. if save:
  203. self._save_job(job)
  204. return job.job_id
  205. def create_job(
  206. self,
  207. trigger: Trigger,
  208. actions: list[Action] | Action,
  209. conditions: list[Condition] | None = None,
  210. config: JobConfig | None = None,
  211. job_id: str | None = None,
  212. save: bool | None = None,
  213. ) -> ScheduledJob:
  214. """
  215. Erstellt und fügt Job hinzu.
  216. Args:
  217. trigger: Trigger für den Job
  218. actions: Action(s) für den Job
  219. conditions: Optionale Conditions
  220. config: Job-Konfiguration
  221. job_id: Job-ID (wird generiert wenn None)
  222. save: Als JSON speichern
  223. Returns:
  224. ScheduledJob
  225. """
  226. if isinstance(actions, Action):
  227. actions = [actions]
  228. job = ScheduledJob(
  229. job_id=job_id,
  230. trigger=trigger,
  231. conditions=conditions,
  232. actions=actions,
  233. config=config,
  234. )
  235. self.add_job(job, save=save)
  236. return job
  237. def get_job(self, job_id: str) -> ScheduledJob | None:
  238. """Holt Job nach ID."""
  239. return self._jobs.get(job_id)
  240. def remove_job(
  241. self,
  242. job_id: str,
  243. delete_file: bool = True,
  244. ) -> bool:
  245. """
  246. Entfernt Job.
  247. Args:
  248. job_id: Job-ID
  249. delete_file: JSON-Datei löschen
  250. Returns:
  251. True wenn entfernt
  252. """
  253. if job_id not in self._jobs:
  254. return False
  255. del self._jobs[job_id]
  256. if delete_file:
  257. self._delete_job_file(job_id)
  258. return True
  259. def enable_job(self, job_id: str) -> bool:
  260. """Aktiviert Job."""
  261. job = self._jobs.get(job_id)
  262. if job:
  263. job.enable()
  264. if self._config.auto_save:
  265. self._save_job(job)
  266. return True
  267. return False
  268. def disable_job(self, job_id: str) -> bool:
  269. """Deaktiviert Job."""
  270. job = self._jobs.get(job_id)
  271. if job:
  272. job.disable()
  273. if self._config.auto_save:
  274. self._save_job(job)
  275. return True
  276. return False
  277. # === Callbacks ===
  278. def on_job_executed(
  279. self,
  280. callback: Callable[[ScheduledJob, JobResult], None],
  281. ) -> None:
  282. """Registriert Callback für Job-Ausführung."""
  283. self._job_callbacks.append(callback)
  284. # === Persistence ===
  285. def _get_job_path(self, job_id: str) -> Path:
  286. """Gibt Pfad zur Job-Datei zurück."""
  287. # Dateiname: job_id.json (Sonderzeichen ersetzen)
  288. safe_id = "".join(c if c.isalnum() or c in "-_" else "_" for c in job_id)
  289. return Path(self._config.jobs_directory) / f"{safe_id}.json"
  290. def _save_job(self, job: ScheduledJob) -> bool:
  291. """
  292. Speichert Job als JSON-Datei.
  293. Args:
  294. job: Zu speichernder Job
  295. Returns:
  296. True wenn erfolgreich
  297. """
  298. try:
  299. path = self._get_job_path(job.job_id)
  300. data = job.to_dict()
  301. with open(path, "w", encoding="utf-8") as f:
  302. json.dump(data, f, indent=2, ensure_ascii=False)
  303. return True
  304. except Exception as e:
  305. from trixy_core.utils.debug import perror
  306. perror(f"Fehler beim Speichern von Job {job.job_id}: {e}")
  307. return False
  308. def _delete_job_file(self, job_id: str) -> bool:
  309. """Löscht Job-Datei."""
  310. try:
  311. path = self._get_job_path(job_id)
  312. if path.exists():
  313. path.unlink()
  314. return True
  315. except Exception:
  316. return False
  317. def _load_job_from_dict(self, data: dict) -> ScheduledJob | None:
  318. """
  319. Lädt Job aus Dictionary.
  320. Args:
  321. data: Job-Daten
  322. Returns:
  323. ScheduledJob oder None bei Fehler
  324. """
  325. try:
  326. # Trigger laden
  327. trigger = None
  328. if data.get("trigger"):
  329. trigger = self._load_trigger(data["trigger"])
  330. # Conditions laden
  331. conditions = []
  332. for cond_data in data.get("conditions", []):
  333. condition = self._load_condition(cond_data)
  334. if condition:
  335. conditions.append(condition)
  336. # Actions laden
  337. actions = []
  338. for action_data in data.get("actions", []):
  339. action = self._load_action(action_data)
  340. if action:
  341. actions.append(action)
  342. # Config laden
  343. config_data = data.get("config", {})
  344. config = JobConfig(
  345. name=data.get("name", ""),
  346. description=data.get("description", ""),
  347. max_runs=config_data.get("max_runs", 0),
  348. max_failures=config_data.get("max_failures", 3),
  349. timeout_seconds=config_data.get("timeout_seconds", 60.0),
  350. priority=config_data.get("priority", 0),
  351. tags=config_data.get("tags", []),
  352. enabled=data.get("enabled", True),
  353. )
  354. # Job erstellen
  355. job = ScheduledJob(
  356. job_id=data.get("job_id"),
  357. trigger=trigger,
  358. conditions=conditions,
  359. actions=actions,
  360. config=config,
  361. )
  362. return job
  363. except Exception as e:
  364. from trixy_core.utils.debug import perror
  365. perror(f"Fehler beim Laden von Job: {e}")
  366. return None
  367. def _load_trigger(self, data: dict) -> Trigger | None:
  368. """Lädt Trigger aus Dictionary."""
  369. trigger_type = data.get("type")
  370. trigger_class = self.TRIGGER_TYPES.get(trigger_type)
  371. if not trigger_class:
  372. return None
  373. if hasattr(trigger_class, "from_dict"):
  374. return trigger_class.from_dict(data)
  375. return None
  376. def _load_condition(self, data: dict) -> Condition | None:
  377. """Lädt Condition aus Dictionary."""
  378. cond_type = data.get("type")
  379. cond_class = self.CONDITION_TYPES.get(cond_type)
  380. if not cond_class:
  381. return None
  382. if hasattr(cond_class, "from_dict"):
  383. return cond_class.from_dict(data)
  384. return None
  385. def _load_action(self, data: dict) -> Action | None:
  386. """Lädt Action aus Dictionary."""
  387. action_type = data.get("type")
  388. action_class = self.ACTION_TYPES.get(action_type)
  389. if not action_class:
  390. return None
  391. if hasattr(action_class, "from_dict"):
  392. return action_class.from_dict(data)
  393. return None
  394. def load_jobs(self) -> int:
  395. """
  396. Lädt alle Jobs aus dem Jobs-Verzeichnis.
  397. Returns:
  398. Anzahl geladener Jobs
  399. """
  400. jobs_dir = Path(self._config.jobs_directory)
  401. loaded = 0
  402. if not jobs_dir.exists():
  403. return 0
  404. for file_path in jobs_dir.glob("*.json"):
  405. try:
  406. with open(file_path, "r", encoding="utf-8") as f:
  407. data = json.load(f)
  408. job = self._load_job_from_dict(data)
  409. if job:
  410. self._jobs[job.job_id] = job
  411. loaded += 1
  412. except Exception as e:
  413. from trixy_core.utils.debug import perror
  414. perror(f"Fehler beim Laden von {file_path}: {e}")
  415. return loaded
  416. def save_all_jobs(self) -> int:
  417. """
  418. Speichert alle Jobs.
  419. Returns:
  420. Anzahl gespeicherter Jobs
  421. """
  422. saved = 0
  423. for job in self._jobs.values():
  424. if self._save_job(job):
  425. saved += 1
  426. return saved
  427. # === Execution ===
  428. async def start(self) -> None:
  429. """Startet den Scheduler."""
  430. if self._running:
  431. return
  432. # Jobs laden
  433. if self._config.auto_load:
  434. loaded = self.load_jobs()
  435. from trixy_core.utils.debug import pinfo
  436. pinfo(f"Scheduler: {loaded} Jobs geladen")
  437. self._running = True
  438. self._task = asyncio.create_task(self._run_loop())
  439. async def stop(self) -> None:
  440. """Stoppt den Scheduler."""
  441. self._running = False
  442. if self._task:
  443. self._task.cancel()
  444. try:
  445. await self._task
  446. except asyncio.CancelledError:
  447. pass
  448. self._task = None
  449. # Laufende Jobs abwarten
  450. while self._running_jobs:
  451. await asyncio.sleep(0.1)
  452. async def _run_loop(self) -> None:
  453. """Hauptschleife des Schedulers."""
  454. while self._running:
  455. try:
  456. await self._tick()
  457. except Exception as e:
  458. from trixy_core.utils.debug import perror
  459. perror(f"Scheduler-Fehler: {e}")
  460. await asyncio.sleep(self._config.tick_interval_seconds)
  461. async def _tick(self) -> None:
  462. """Ein Tick des Schedulers."""
  463. now = datetime.now()
  464. # Jobs prüfen
  465. for job in list(self._jobs.values()):
  466. if not job.is_runnable:
  467. continue
  468. if job.job_id in self._running_jobs:
  469. continue
  470. # Max concurrent prüfen
  471. if len(self._running_jobs) >= self._config.max_concurrent_jobs:
  472. break
  473. # Trigger prüfen — last_fire_time aus dem Trigger selbst uebergeben
  474. trigger = job.trigger
  475. if not trigger:
  476. continue
  477. context = TriggerContext(
  478. current_time=now,
  479. last_fire_time=trigger.last_fire_time,
  480. )
  481. if trigger.should_fire(context):
  482. # Conditions prüfen
  483. if job.should_run_now(self._context):
  484. asyncio.create_task(self._execute_job(job))
  485. async def _execute_job(self, job: ScheduledJob) -> None:
  486. """Führt einen Job aus."""
  487. self._running_jobs.add(job.job_id)
  488. start_time = datetime.now()
  489. try:
  490. # Job-Status setzen
  491. job._state = JobState.RUNNING
  492. # Event emittieren
  493. if self._config.emit_job_events and self._event_manager:
  494. await self._event_manager.emit("scheduler_job_started", {
  495. "job_id": job.job_id,
  496. "job_name": job.name,
  497. })
  498. # Kontext für Actions
  499. action_context = {
  500. **self._context,
  501. "job_id": job.job_id,
  502. "job_name": job.name,
  503. "event_manager": self._event_manager,
  504. "current_time": start_time,
  505. }
  506. # Actions ausführen
  507. results = []
  508. error = None
  509. for action in job.actions:
  510. if not action.is_enabled:
  511. continue
  512. try:
  513. result = await action.execute(action_context)
  514. results.append(result)
  515. if not result.success:
  516. error = result.error
  517. break
  518. except Exception as e:
  519. error = str(e)
  520. break
  521. # Trigger markieren
  522. if job.trigger:
  523. job.trigger.fire()
  524. # Job-Ergebnis
  525. end_time = datetime.now()
  526. job_result = JobResult(
  527. job_id=job.job_id,
  528. success=error is None,
  529. start_time=start_time,
  530. end_time=end_time,
  531. result=results,
  532. error=error,
  533. condition_results=job.check_conditions(self._context),
  534. )
  535. # Job aktualisieren
  536. job.record_run(job_result)
  537. # Speichern
  538. if self._config.auto_save:
  539. self._save_job(job)
  540. # Callbacks
  541. for callback in self._job_callbacks:
  542. try:
  543. callback(job, job_result)
  544. except Exception:
  545. pass
  546. # Event emittieren
  547. if self._config.emit_job_events and self._event_manager:
  548. await self._event_manager.emit("scheduler_job_completed", {
  549. "job_id": job.job_id,
  550. "job_name": job.name,
  551. "success": job_result.success,
  552. "duration_ms": job_result.duration_ms,
  553. "error": error,
  554. })
  555. except Exception as e:
  556. from trixy_core.utils.debug import perror
  557. perror(f"Fehler bei Job {job.job_id}: {e}")
  558. job._state = JobState.FAILED
  559. finally:
  560. self._running_jobs.discard(job.job_id)
  561. # === Event Handling ===
  562. async def on_event(self, event_name: str, event_data: dict | None = None) -> None:
  563. """
  564. Wird aufgerufen wenn ein Event eintrifft.
  565. Für Jobs mit EventTrigger.
  566. """
  567. context = TriggerContext(
  568. current_time=datetime.now(),
  569. event_name=event_name,
  570. event_data=event_data,
  571. )
  572. for job in list(self._jobs.values()):
  573. if not job.is_runnable:
  574. continue
  575. if job.job_id in self._running_jobs:
  576. continue
  577. # Nur EventTrigger oder Composite mit EventTrigger
  578. trigger = job.trigger
  579. if not trigger:
  580. continue
  581. if isinstance(trigger, EventTrigger):
  582. if trigger.on_event(event_name, event_data):
  583. if job.should_run_now(self._context):
  584. asyncio.create_task(self._execute_job(job))
  585. # === Statistics ===
  586. def get_statistics(self) -> dict:
  587. """Gibt Scheduler-Statistiken zurück."""
  588. enabled = sum(1 for j in self._jobs.values() if j.is_enabled)
  589. total_runs = sum(j.run_count for j in self._jobs.values())
  590. total_failures = sum(j.failure_count for j in self._jobs.values())
  591. return {
  592. "total_jobs": len(self._jobs),
  593. "enabled_jobs": enabled,
  594. "running_jobs": len(self._running_jobs),
  595. "total_runs": total_runs,
  596. "total_failures": total_failures,
  597. "is_running": self._running,
  598. }