| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- # -*- coding: utf-8 -*-
- """
- TTL-basierter In-Memory-Cache fuer Wetterdaten.
- """
- from datetime import datetime, timedelta
- from plugins.weather.models import WeatherResult
- from trixy_core.utils.debug import pdebug
- class WeatherCache:
- """
- Einfacher TTL-basierter Cache fuer WeatherResult-Objekte.
- Attributes:
- _cache: Zuordnung location_key -> WeatherResult
- _ttl_hours: Gueltigkeitsdauer in Stunden
- """
- def __init__(self, ttl_hours: float = 5.0) -> None:
- self._cache: dict[str, WeatherResult] = {}
- self._ttl_hours = ttl_hours
- def get(self, location_key: str) -> WeatherResult | None:
- """
- Gibt gecachte Wetterdaten zurueck, falls noch gueltig.
- Args:
- location_key: Normalisierter Stadtname (lowercase)
- Returns:
- WeatherResult oder None wenn abgelaufen/nicht vorhanden
- """
- result = self._cache.get(location_key)
- if result is None:
- return None
- if not self._is_fresh(result):
- pdebug(f"WeatherCache: Cache abgelaufen fuer '{location_key}'")
- del self._cache[location_key]
- return None
- return result
- def set(self, location_key: str, result: WeatherResult) -> None:
- """
- Speichert ein WeatherResult im Cache.
- Args:
- location_key: Normalisierter Stadtname (lowercase)
- result: Wetterergebnis
- """
- self._cache[location_key] = result
- pdebug(f"WeatherCache: Cache aktualisiert fuer '{location_key}'")
- def is_valid(self, location_key: str) -> bool:
- """
- Prueft ob ein gueltiger Cache-Eintrag existiert.
- Args:
- location_key: Normalisierter Stadtname (lowercase)
- Returns:
- True wenn gueltig und nicht abgelaufen
- """
- result = self._cache.get(location_key)
- if result is None:
- return False
- return self._is_fresh(result)
- def clear(self) -> None:
- """Leert den gesamten Cache."""
- self._cache.clear()
- pdebug("WeatherCache: Cache geleert")
- def _is_fresh(self, result: WeatherResult) -> bool:
- """Prueft ob ein Ergebnis noch innerhalb der TTL liegt."""
- age = datetime.now() - result.fetched_at
- return age < timedelta(hours=self._ttl_hours)
|