| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
# -*- coding: utf-8 -*-
"""
Notification manager for the calendar plugin.

Periodically checks the cached events and notifies satellites
about reminders and event starts.
"""
import asyncio
import math
from datetime import datetime, timedelta, time

from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn
from plugins.calendar.models import CalendarEvent, NotificationRecord
- class NotificationManager:
- """
- Verwaltet Erinnerungen und Start-Benachrichtigungen fuer Kalender-Events.
- - Eigener asyncio-Task prueft alle 60 Sekunden
- - Duplikat-Vermeidung ueber (event_id, notification_type)
- - Quiet Hours optional konfigurierbar
- """
- def __init__(
- self,
- application: object,
- sync_manager: object,
- mapper: object,
- config: dict,
- ) -> None:
- """
- Args:
- application: Application-Instanz
- sync_manager: CalendarSyncManager
- mapper: SatelliteCalendarMapper
- config: notifications-Abschnitt der Plugin-Config
- """
- self._application = application
- self._sync_manager = sync_manager
- self._mapper = mapper
- self._config = config
- self._sent_notifications: dict[tuple[str, str], NotificationRecord] = {}
- self._check_task: asyncio.Task | None = None
- self._reminder_enabled = config.get("reminder_enabled", True)
- self._start_enabled = config.get("start_enabled", True)
- self._default_reminder_minutes = config.get("default_reminder_minutes", 15)
- # Sounds (werden vom Plugin geladen)
- self._reminder_pcm: bytes | None = None
- self._start_pcm: bytes | None = None
- # Quiet Hours
- quiet_config = config.get("quiet_hours", {})
- self._quiet_enabled = quiet_config.get("enabled", False)
- self._quiet_start = self._parse_time(quiet_config.get("start", "22:00"))
- self._quiet_end = self._parse_time(quiet_config.get("end", "07:00"))
- def set_sounds(
- self,
- reminder_pcm: bytes | None,
- start_pcm: bytes | None,
- ) -> None:
- """Setzt die PCM-Audiodaten fuer Benachrichtigungs-Sounds."""
- self._reminder_pcm = reminder_pcm
- self._start_pcm = start_pcm
- async def start_checking(self) -> None:
- """Startet den periodischen Benachrichtigungs-Check."""
- if self._check_task and not self._check_task.done():
- return
- self._check_task = asyncio.create_task(self._check_loop())
- pdebug("NotificationManager: Check-Task gestartet")
- async def stop_checking(self) -> None:
- """Stoppt den Benachrichtigungs-Check."""
- if self._check_task and not self._check_task.done():
- self._check_task.cancel()
- try:
- await self._check_task
- except asyncio.CancelledError:
- pass
- self._check_task = None
- pdebug("NotificationManager: Check-Task gestoppt")
- async def check_notifications(self) -> None:
- """
- Prueft alle gecachten Events auf faellige Benachrichtigungen.
- Wird alle 60 Sekunden vom Check-Task aufgerufen.
- """
- now = datetime.now()
- # Quiet Hours pruefen
- if self._is_quiet_time(now):
- return
- # Alte Records aufraeumen (aelter als 24h)
- self._cleanup_old_records(now)
- events = self._sync_manager.get_all_cached_events()
- for event in events:
- # Erinnerung pruefen
- if self._reminder_enabled:
- await self._check_reminder(event, now)
- # Terminbeginn pruefen
- if self._start_enabled:
- await self._check_start(event, now)
- # =========================================================================
- # Interne Pruefungen
- # =========================================================================
- async def _check_reminder(self, event: CalendarEvent, now: datetime) -> None:
- """Prueft ob eine Erinnerung fuer ein Event faellig ist."""
- key = (event.event_id, "reminder")
- if key in self._sent_notifications:
- return
- reminder_minutes = event.reminder_minutes or self._default_reminder_minutes
- reminder_time = event.start - timedelta(minutes=reminder_minutes)
- if reminder_time <= now < event.start:
- minutes_left = int((event.start - now).total_seconds() / 60) + 1
- text = f"In {minutes_left} Minuten: {event.title}"
- await self._send_notification(event, "reminder", text)
- async def _check_start(self, event: CalendarEvent, now: datetime) -> None:
- """Prueft ob ein Terminbeginn gemeldet werden soll."""
- key = (event.event_id, "start")
- if key in self._sent_notifications:
- return
- if event.start <= now < event.start + timedelta(minutes=1):
- text = f"Termin jetzt: {event.title}"
- await self._send_notification(event, "start", text)
- async def _send_notification(
- self,
- event: CalendarEvent,
- notification_type: str,
- text: str,
- ) -> None:
- """
- Sendet eine Benachrichtigung an die betroffenen Satellites.
- Args:
- event: Das Event
- notification_type: "reminder" oder "start"
- text: TTS-Text
- """
- # Betroffene Satellites ermitteln
- calendar_key = self._sync_manager.get_calendar_key_for_event(event)
- satellite_ids = self._mapper.get_satellites_for_calendar(calendar_key)
- if not satellite_ids:
- pdebug(f"NotificationManager: Keine Satellites fuer '{calendar_key}'")
- # Als gesendet markieren (damit nicht wiederholt versucht wird)
- self._mark_sent(event.event_id, notification_type, [])
- return
- # Sound und TTS an jeden Satellite senden
- sent_to: list[str] = []
- for sat_id in satellite_ids:
- success = await self._notify_satellite(sat_id, notification_type, text)
- if success:
- sent_to.append(sat_id)
- self._mark_sent(event.event_id, notification_type, sent_to)
- pinfo(f"NotificationManager: {notification_type} fuer '{event.title}' an {len(sent_to)} Satellites gesendet")
- async def _notify_satellite(
- self,
- satellite_id: str,
- notification_type: str,
- text: str,
- ) -> bool:
- """
- Sendet Sound + TTS an einen einzelnen Satellite.
- Returns:
- True bei Erfolg
- """
- satellites = getattr(self._application, "satellites", None)
- if not satellites:
- # Standalone-Modus: TTS via Event
- await self._application.events.emit("tts_request", {
- "text": text,
- "source": "calendar",
- })
- return True
- satellite = satellites.get(satellite_id)
- if not satellite or not satellite.is_connected:
- return False
- try:
- # Sound abspielen
- pcm = self._reminder_pcm if notification_type == "reminder" else self._start_pcm
- if pcm:
- await satellite.say(pcm)
- # TTS-Meldung
- await satellite.speak(text)
- return True
- except Exception as e:
- perror(f"NotificationManager: Fehler bei Satellite '{satellite_id}': {e}")
- return False
- # =========================================================================
- # Hilfsmethoden
- # =========================================================================
- async def _check_loop(self) -> None:
- """Endlos-Schleife fuer periodische Benachrichtigungs-Pruefung."""
- try:
- while True:
- try:
- await self.check_notifications()
- except Exception as e:
- perror(f"NotificationManager: Fehler im Check-Loop: {e}")
- await asyncio.sleep(60)
- except asyncio.CancelledError:
- pass
- def _mark_sent(
- self,
- event_id: str,
- notification_type: str,
- satellite_ids: list[str],
- ) -> None:
- """Markiert eine Benachrichtigung als gesendet."""
- key = (event_id, notification_type)
- self._sent_notifications[key] = NotificationRecord(
- event_id=event_id,
- notification_type=notification_type,
- sent_at=datetime.now(),
- satellite_ids=satellite_ids,
- )
- def _cleanup_old_records(self, now: datetime) -> None:
- """Entfernt Notification-Records die aelter als 24 Stunden sind."""
- cutoff = now - timedelta(hours=24)
- expired = [
- key for key, record in self._sent_notifications.items()
- if record.sent_at < cutoff
- ]
- for key in expired:
- del self._sent_notifications[key]
- def _is_quiet_time(self, now: datetime) -> bool:
- """Prueft ob gerade Quiet Hours aktiv sind."""
- if not self._quiet_enabled:
- return False
- current_time = now.time()
- if self._quiet_start <= self._quiet_end:
- # z.B. 22:00 - 23:00
- return self._quiet_start <= current_time <= self._quiet_end
- else:
- # z.B. 22:00 - 07:00 (ueber Mitternacht)
- return current_time >= self._quiet_start or current_time <= self._quiet_end
- @staticmethod
- def _parse_time(time_str: str) -> time:
- """Parst einen Zeit-String (HH:MM) in ein time-Objekt."""
- try:
- parts = time_str.split(":")
- return time(int(parts[0]), int(parts[1]))
- except (ValueError, IndexError):
- return time(0, 0)
|