job.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. # -*- coding: utf-8 -*-
  2. """
  3. Scheduled Job - Definition und Konfiguration von Jobs.
  4. """
  5. from dataclasses import dataclass, field
  6. from datetime import datetime
  7. from enum import Enum
  8. from typing import TYPE_CHECKING, Any
  9. import uuid
  10. if TYPE_CHECKING:
  11. from trixy_core.scheduler.trigger import Trigger
  12. from trixy_core.scheduler.condition import Condition
  13. from trixy_core.scheduler.action import Action
  14. class JobState(Enum):
  15. """Zustand eines Jobs."""
  16. PENDING = "pending" # Wartet auf Ausführung
  17. RUNNING = "running" # Wird ausgeführt
  18. COMPLETED = "completed" # Erfolgreich ausgeführt
  19. FAILED = "failed" # Fehlgeschlagen
  20. DISABLED = "disabled" # Deaktiviert
  21. EXPIRED = "expired" # Abgelaufen (max_runs erreicht)
  22. @dataclass
  23. class JobResult:
  24. """Ergebnis einer Job-Ausführung."""
  25. job_id: str
  26. success: bool
  27. start_time: datetime
  28. end_time: datetime
  29. result: Any = None
  30. error: str | None = None
  31. condition_results: dict[str, bool] = field(default_factory=dict)
  32. @property
  33. def duration_ms(self) -> float:
  34. """Ausführungsdauer in Millisekunden."""
  35. return (self.end_time - self.start_time).total_seconds() * 1000
  36. def to_dict(self) -> dict:
  37. """Konvertiert zu Dictionary."""
  38. return {
  39. "job_id": self.job_id,
  40. "success": self.success,
  41. "start_time": self.start_time.isoformat(),
  42. "end_time": self.end_time.isoformat(),
  43. "duration_ms": self.duration_ms,
  44. "error": self.error,
  45. "condition_results": self.condition_results,
  46. }
  47. @dataclass
  48. class JobConfig:
  49. """Konfiguration für einen Job."""
  50. # Identifikation
  51. name: str = ""
  52. description: str = ""
  53. tags: list[str] = field(default_factory=list)
  54. # Ausführungslimits
  55. max_runs: int = 0 # 0 = unbegrenzt
  56. max_failures: int = 3 # Nach X Fehlern deaktivieren
  57. timeout_seconds: float = 60.0
  58. # Verhalten
  59. enabled: bool = True
  60. run_on_startup: bool = False
  61. catch_up: bool = False # Verpasste Ausführungen nachholen
  62. coalesce: bool = True # Mehrere Trigger zusammenfassen
  63. # Retry
  64. retry_on_failure: bool = False
  65. retry_delay_seconds: float = 60.0
  66. max_retries: int = 3
  67. # Priorität
  68. priority: int = 0 # Höher = wichtiger
  69. # Metadaten
  70. metadata: dict[str, Any] = field(default_factory=dict)
  71. class ScheduledJob:
  72. """
  73. Ein geplanter Job.
  74. Kombiniert:
  75. - Trigger: Wann der Job ausgelöst wird
  76. - Conditions: Zusätzliche Bedingungen für Ausführung
  77. - Actions: Was ausgeführt wird
  78. """
  79. def __init__(
  80. self,
  81. job_id: str | None = None,
  82. trigger: "Trigger | None" = None,
  83. conditions: list["Condition"] | None = None,
  84. actions: list["Action"] | None = None,
  85. config: JobConfig | None = None,
  86. ):
  87. """
  88. Initialisiert den Job.
  89. Args:
  90. job_id: Job-ID (wird generiert wenn None)
  91. trigger: Trigger für den Job
  92. conditions: Liste von Conditions
  93. actions: Liste von Actions
  94. config: Job-Konfiguration
  95. """
  96. self._job_id = job_id or f"job-{uuid.uuid4().hex[:8]}"
  97. self._trigger = trigger
  98. self._conditions = conditions or []
  99. self._actions = actions or []
  100. self._config = config or JobConfig()
  101. # State
  102. self._state = JobState.PENDING
  103. self._created_at = datetime.now()
  104. self._last_run: datetime | None = None
  105. self._next_run: datetime | None = None
  106. self._run_count = 0
  107. self._failure_count = 0
  108. self._consecutive_failures = 0
  109. # History
  110. self._history: list[JobResult] = []
  111. self._max_history = 100
  112. # === Properties ===
  113. @property
  114. def job_id(self) -> str:
  115. """Job-ID."""
  116. return self._job_id
  117. @property
  118. def name(self) -> str:
  119. """Job-Name."""
  120. return self._config.name or self._job_id
  121. @property
  122. def trigger(self) -> "Trigger | None":
  123. """Trigger."""
  124. return self._trigger
  125. @trigger.setter
  126. def trigger(self, value: "Trigger") -> None:
  127. """Setzt Trigger."""
  128. self._trigger = value
  129. self._update_next_run()
  130. @property
  131. def conditions(self) -> list["Condition"]:
  132. """Conditions."""
  133. return list(self._conditions)
  134. @property
  135. def actions(self) -> list["Action"]:
  136. """Actions."""
  137. return list(self._actions)
  138. @property
  139. def config(self) -> JobConfig:
  140. """Konfiguration."""
  141. return self._config
  142. @property
  143. def state(self) -> JobState:
  144. """Aktueller Zustand."""
  145. return self._state
  146. @property
  147. def is_enabled(self) -> bool:
  148. """Prüft ob Job aktiviert ist."""
  149. return self._config.enabled and self._state not in (
  150. JobState.DISABLED, JobState.EXPIRED
  151. )
  152. @property
  153. def is_runnable(self) -> bool:
  154. """Prüft ob Job ausführbar ist."""
  155. return self.is_enabled and self._state != JobState.RUNNING
  156. @property
  157. def created_at(self) -> datetime:
  158. """Erstellungszeit."""
  159. return self._created_at
  160. @property
  161. def last_run(self) -> datetime | None:
  162. """Letzte Ausführung."""
  163. return self._last_run
  164. @property
  165. def next_run(self) -> datetime | None:
  166. """Nächste geplante Ausführung."""
  167. return self._next_run
  168. @property
  169. def run_count(self) -> int:
  170. """Anzahl Ausführungen."""
  171. return self._run_count
  172. @property
  173. def failure_count(self) -> int:
  174. """Anzahl Fehler."""
  175. return self._failure_count
  176. @property
  177. def history(self) -> list[JobResult]:
  178. """Ausführungs-History."""
  179. return list(self._history)
  180. @property
  181. def last_result(self) -> JobResult | None:
  182. """Letztes Ergebnis."""
  183. return self._history[-1] if self._history else None
  184. # === Conditions ===
  185. def add_condition(self, condition: "Condition") -> None:
  186. """
  187. Fügt Condition hinzu.
  188. Args:
  189. condition: Zu prüfende Condition
  190. """
  191. self._conditions.append(condition)
  192. def remove_condition(self, condition: "Condition") -> bool:
  193. """
  194. Entfernt Condition.
  195. Args:
  196. condition: Zu entfernende Condition
  197. Returns:
  198. True wenn entfernt
  199. """
  200. try:
  201. self._conditions.remove(condition)
  202. return True
  203. except ValueError:
  204. return False
  205. def clear_conditions(self) -> None:
  206. """Entfernt alle Conditions."""
  207. self._conditions.clear()
  208. # === Actions ===
  209. def add_action(self, action: "Action") -> None:
  210. """
  211. Fügt Action hinzu.
  212. Args:
  213. action: Auszuführende Action
  214. """
  215. self._actions.append(action)
  216. def remove_action(self, action: "Action") -> bool:
  217. """
  218. Entfernt Action.
  219. Args:
  220. action: Zu entfernende Action
  221. Returns:
  222. True wenn entfernt
  223. """
  224. try:
  225. self._actions.remove(action)
  226. return True
  227. except ValueError:
  228. return False
  229. def clear_actions(self) -> None:
  230. """Entfernt alle Actions."""
  231. self._actions.clear()
  232. # === Lifecycle ===
  233. def enable(self) -> None:
  234. """Aktiviert den Job."""
  235. self._config.enabled = True
  236. if self._state == JobState.DISABLED:
  237. self._state = JobState.PENDING
  238. self._update_next_run()
  239. def disable(self) -> None:
  240. """Deaktiviert den Job."""
  241. self._config.enabled = False
  242. self._state = JobState.DISABLED
  243. def reset(self) -> None:
  244. """Setzt Job-Statistiken zurück."""
  245. self._run_count = 0
  246. self._failure_count = 0
  247. self._consecutive_failures = 0
  248. self._history.clear()
  249. self._state = JobState.PENDING
  250. self._update_next_run()
  251. # === Execution Tracking ===
  252. def should_run_now(self, context: dict | None = None) -> bool:
  253. """
  254. Prüft ob Job jetzt ausgeführt werden soll.
  255. Args:
  256. context: Ausführungs-Kontext
  257. Returns:
  258. True wenn alle Conditions erfüllt
  259. """
  260. if not self.is_runnable:
  261. return False
  262. # Prüfe alle Conditions
  263. for condition in self._conditions:
  264. if not condition.evaluate(context or {}):
  265. return False
  266. return True
  267. def check_conditions(self, context: dict | None = None) -> dict[str, bool]:
  268. """
  269. Prüft alle Conditions und gibt Ergebnisse zurück.
  270. Args:
  271. context: Ausführungs-Kontext
  272. Returns:
  273. Dictionary mit Condition-Name -> Ergebnis
  274. """
  275. results = {}
  276. for condition in self._conditions:
  277. results[condition.name] = condition.evaluate(context or {})
  278. return results
  279. def record_run(self, result: JobResult) -> None:
  280. """
  281. Zeichnet Ausführung auf.
  282. Args:
  283. result: Ausführungs-Ergebnis
  284. """
  285. self._run_count += 1
  286. self._last_run = result.end_time
  287. if result.success:
  288. self._consecutive_failures = 0
  289. else:
  290. self._failure_count += 1
  291. self._consecutive_failures += 1
  292. # History begrenzen
  293. self._history.append(result)
  294. while len(self._history) > self._max_history:
  295. self._history.pop(0)
  296. # Prüfe max_runs
  297. if self._config.max_runs > 0 and self._run_count >= self._config.max_runs:
  298. self._state = JobState.EXPIRED
  299. # Prüfe max_failures
  300. elif self._consecutive_failures >= self._config.max_failures:
  301. self._state = JobState.DISABLED
  302. else:
  303. self._state = JobState.PENDING
  304. self._update_next_run()
  305. def _update_next_run(self) -> None:
  306. """Aktualisiert next_run basierend auf Trigger."""
  307. if not self.is_enabled or not self._trigger:
  308. self._next_run = None
  309. return
  310. self._next_run = self._trigger.get_next_fire_time(self._last_run)
  311. # === Serialization ===
  312. def to_dict(self) -> dict:
  313. """Konvertiert zu Dictionary."""
  314. return {
  315. "job_id": self._job_id,
  316. "name": self.name,
  317. "description": self._config.description,
  318. "state": self._state.value,
  319. "enabled": self._config.enabled,
  320. "created_at": self._created_at.isoformat(),
  321. "last_run": self._last_run.isoformat() if self._last_run else None,
  322. "next_run": self._next_run.isoformat() if self._next_run else None,
  323. "run_count": self._run_count,
  324. "failure_count": self._failure_count,
  325. "trigger": self._trigger.to_dict() if self._trigger else None,
  326. "conditions": [c.to_dict() for c in self._conditions],
  327. "actions": [a.to_dict() for a in self._actions],
  328. "config": {
  329. "max_runs": self._config.max_runs,
  330. "max_failures": self._config.max_failures,
  331. "timeout_seconds": self._config.timeout_seconds,
  332. "priority": self._config.priority,
  333. "tags": self._config.tags,
  334. },
  335. }
  336. def __repr__(self) -> str:
  337. return f"ScheduledJob(id={self._job_id}, name={self.name}, state={self._state.value})"