| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421 |
- # -*- coding: utf-8 -*-
- """
- Scheduled Job - Definition und Konfiguration von Jobs.
- """
- from dataclasses import dataclass, field
- from datetime import datetime
- from enum import Enum
- from typing import TYPE_CHECKING, Any
- import uuid
- if TYPE_CHECKING:
- from trixy_core.scheduler.trigger import Trigger
- from trixy_core.scheduler.condition import Condition
- from trixy_core.scheduler.action import Action
- class JobState(Enum):
- """Zustand eines Jobs."""
- PENDING = "pending" # Wartet auf Ausführung
- RUNNING = "running" # Wird ausgeführt
- COMPLETED = "completed" # Erfolgreich ausgeführt
- FAILED = "failed" # Fehlgeschlagen
- DISABLED = "disabled" # Deaktiviert
- EXPIRED = "expired" # Abgelaufen (max_runs erreicht)
- @dataclass
- class JobResult:
- """Ergebnis einer Job-Ausführung."""
- job_id: str
- success: bool
- start_time: datetime
- end_time: datetime
- result: Any = None
- error: str | None = None
- condition_results: dict[str, bool] = field(default_factory=dict)
- @property
- def duration_ms(self) -> float:
- """Ausführungsdauer in Millisekunden."""
- return (self.end_time - self.start_time).total_seconds() * 1000
- def to_dict(self) -> dict:
- """Konvertiert zu Dictionary."""
- return {
- "job_id": self.job_id,
- "success": self.success,
- "start_time": self.start_time.isoformat(),
- "end_time": self.end_time.isoformat(),
- "duration_ms": self.duration_ms,
- "error": self.error,
- "condition_results": self.condition_results,
- }
- @dataclass
- class JobConfig:
- """Konfiguration für einen Job."""
- # Identifikation
- name: str = ""
- description: str = ""
- tags: list[str] = field(default_factory=list)
- # Ausführungslimits
- max_runs: int = 0 # 0 = unbegrenzt
- max_failures: int = 3 # Nach X Fehlern deaktivieren
- timeout_seconds: float = 60.0
- # Verhalten
- enabled: bool = True
- run_on_startup: bool = False
- catch_up: bool = False # Verpasste Ausführungen nachholen
- coalesce: bool = True # Mehrere Trigger zusammenfassen
- # Retry
- retry_on_failure: bool = False
- retry_delay_seconds: float = 60.0
- max_retries: int = 3
- # Priorität
- priority: int = 0 # Höher = wichtiger
- # Metadaten
- metadata: dict[str, Any] = field(default_factory=dict)
- class ScheduledJob:
- """
- Ein geplanter Job.
- Kombiniert:
- - Trigger: Wann der Job ausgelöst wird
- - Conditions: Zusätzliche Bedingungen für Ausführung
- - Actions: Was ausgeführt wird
- """
- def __init__(
- self,
- job_id: str | None = None,
- trigger: "Trigger | None" = None,
- conditions: list["Condition"] | None = None,
- actions: list["Action"] | None = None,
- config: JobConfig | None = None,
- ):
- """
- Initialisiert den Job.
- Args:
- job_id: Job-ID (wird generiert wenn None)
- trigger: Trigger für den Job
- conditions: Liste von Conditions
- actions: Liste von Actions
- config: Job-Konfiguration
- """
- self._job_id = job_id or f"job-{uuid.uuid4().hex[:8]}"
- self._trigger = trigger
- self._conditions = conditions or []
- self._actions = actions or []
- self._config = config or JobConfig()
- # State
- self._state = JobState.PENDING
- self._created_at = datetime.now()
- self._last_run: datetime | None = None
- self._next_run: datetime | None = None
- self._run_count = 0
- self._failure_count = 0
- self._consecutive_failures = 0
- # History
- self._history: list[JobResult] = []
- self._max_history = 100
- # === Properties ===
- @property
- def job_id(self) -> str:
- """Job-ID."""
- return self._job_id
- @property
- def name(self) -> str:
- """Job-Name."""
- return self._config.name or self._job_id
- @property
- def trigger(self) -> "Trigger | None":
- """Trigger."""
- return self._trigger
- @trigger.setter
- def trigger(self, value: "Trigger") -> None:
- """Setzt Trigger."""
- self._trigger = value
- self._update_next_run()
- @property
- def conditions(self) -> list["Condition"]:
- """Conditions."""
- return list(self._conditions)
- @property
- def actions(self) -> list["Action"]:
- """Actions."""
- return list(self._actions)
- @property
- def config(self) -> JobConfig:
- """Konfiguration."""
- return self._config
- @property
- def state(self) -> JobState:
- """Aktueller Zustand."""
- return self._state
- @property
- def is_enabled(self) -> bool:
- """Prüft ob Job aktiviert ist."""
- return self._config.enabled and self._state not in (
- JobState.DISABLED, JobState.EXPIRED
- )
- @property
- def is_runnable(self) -> bool:
- """Prüft ob Job ausführbar ist."""
- return self.is_enabled and self._state != JobState.RUNNING
- @property
- def created_at(self) -> datetime:
- """Erstellungszeit."""
- return self._created_at
- @property
- def last_run(self) -> datetime | None:
- """Letzte Ausführung."""
- return self._last_run
- @property
- def next_run(self) -> datetime | None:
- """Nächste geplante Ausführung."""
- return self._next_run
- @property
- def run_count(self) -> int:
- """Anzahl Ausführungen."""
- return self._run_count
- @property
- def failure_count(self) -> int:
- """Anzahl Fehler."""
- return self._failure_count
- @property
- def history(self) -> list[JobResult]:
- """Ausführungs-History."""
- return list(self._history)
- @property
- def last_result(self) -> JobResult | None:
- """Letztes Ergebnis."""
- return self._history[-1] if self._history else None
- # === Conditions ===
- def add_condition(self, condition: "Condition") -> None:
- """
- Fügt Condition hinzu.
- Args:
- condition: Zu prüfende Condition
- """
- self._conditions.append(condition)
- def remove_condition(self, condition: "Condition") -> bool:
- """
- Entfernt Condition.
- Args:
- condition: Zu entfernende Condition
- Returns:
- True wenn entfernt
- """
- try:
- self._conditions.remove(condition)
- return True
- except ValueError:
- return False
- def clear_conditions(self) -> None:
- """Entfernt alle Conditions."""
- self._conditions.clear()
- # === Actions ===
- def add_action(self, action: "Action") -> None:
- """
- Fügt Action hinzu.
- Args:
- action: Auszuführende Action
- """
- self._actions.append(action)
- def remove_action(self, action: "Action") -> bool:
- """
- Entfernt Action.
- Args:
- action: Zu entfernende Action
- Returns:
- True wenn entfernt
- """
- try:
- self._actions.remove(action)
- return True
- except ValueError:
- return False
- def clear_actions(self) -> None:
- """Entfernt alle Actions."""
- self._actions.clear()
- # === Lifecycle ===
- def enable(self) -> None:
- """Aktiviert den Job."""
- self._config.enabled = True
- if self._state == JobState.DISABLED:
- self._state = JobState.PENDING
- self._update_next_run()
- def disable(self) -> None:
- """Deaktiviert den Job."""
- self._config.enabled = False
- self._state = JobState.DISABLED
- def reset(self) -> None:
- """Setzt Job-Statistiken zurück."""
- self._run_count = 0
- self._failure_count = 0
- self._consecutive_failures = 0
- self._history.clear()
- self._state = JobState.PENDING
- self._update_next_run()
- # === Execution Tracking ===
- def should_run_now(self, context: dict | None = None) -> bool:
- """
- Prüft ob Job jetzt ausgeführt werden soll.
- Args:
- context: Ausführungs-Kontext
- Returns:
- True wenn alle Conditions erfüllt
- """
- if not self.is_runnable:
- return False
- # Prüfe alle Conditions
- for condition in self._conditions:
- if not condition.evaluate(context or {}):
- return False
- return True
- def check_conditions(self, context: dict | None = None) -> dict[str, bool]:
- """
- Prüft alle Conditions und gibt Ergebnisse zurück.
- Args:
- context: Ausführungs-Kontext
- Returns:
- Dictionary mit Condition-Name -> Ergebnis
- """
- results = {}
- for condition in self._conditions:
- results[condition.name] = condition.evaluate(context or {})
- return results
- def record_run(self, result: JobResult) -> None:
- """
- Zeichnet Ausführung auf.
- Args:
- result: Ausführungs-Ergebnis
- """
- self._run_count += 1
- self._last_run = result.end_time
- if result.success:
- self._consecutive_failures = 0
- else:
- self._failure_count += 1
- self._consecutive_failures += 1
- # History begrenzen
- self._history.append(result)
- while len(self._history) > self._max_history:
- self._history.pop(0)
- # Prüfe max_runs
- if self._config.max_runs > 0 and self._run_count >= self._config.max_runs:
- self._state = JobState.EXPIRED
- # Prüfe max_failures
- elif self._consecutive_failures >= self._config.max_failures:
- self._state = JobState.DISABLED
- else:
- self._state = JobState.PENDING
- self._update_next_run()
- def _update_next_run(self) -> None:
- """Aktualisiert next_run basierend auf Trigger."""
- if not self.is_enabled or not self._trigger:
- self._next_run = None
- return
- self._next_run = self._trigger.get_next_fire_time(self._last_run)
- # === Serialization ===
- def to_dict(self) -> dict:
- """Konvertiert zu Dictionary."""
- return {
- "job_id": self._job_id,
- "name": self.name,
- "description": self._config.description,
- "state": self._state.value,
- "enabled": self._config.enabled,
- "created_at": self._created_at.isoformat(),
- "last_run": self._last_run.isoformat() if self._last_run else None,
- "next_run": self._next_run.isoformat() if self._next_run else None,
- "run_count": self._run_count,
- "failure_count": self._failure_count,
- "trigger": self._trigger.to_dict() if self._trigger else None,
- "conditions": [c.to_dict() for c in self._conditions],
- "actions": [a.to_dict() for a in self._actions],
- "config": {
- "max_runs": self._config.max_runs,
- "max_failures": self._config.max_failures,
- "timeout_seconds": self._config.timeout_seconds,
- "priority": self._config.priority,
- "tags": self._config.tags,
- },
- }
- def __repr__(self) -> str:
- return f"ScheduledJob(id={self._job_id}, name={self.name}, state={self._state.value})"
|