store.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. # -*- coding: utf-8 -*-
  2. """
  3. SyncStore — JSON-per-Entity Persistenz fuer synchronisierbare Daten.
  4. Speicherlayout: {base_directory}/{data_type}/{item_id}.json
  5. Folgt dem Scheduler-Pattern (./cron/*.json).
  6. """
  7. import json
  8. import threading
  9. from pathlib import Path
  10. from typing import Any
  11. from trixy_core.sync.models import SyncItem, _now_iso
  12. from trixy_core.utils.debug import pdebug, perror
  13. class SyncStore:
  14. """
  15. Persistenter Speicher fuer SyncItems.
  16. Speichert jedes Item als einzelne JSON-Datei.
  17. Thread-safe durch Lock.
  18. """
  19. def __init__(self, base_directory: str = "./sync_data") -> None:
  20. """
  21. Initialisiert den SyncStore.
  22. Args:
  23. base_directory: Basis-Verzeichnis fuer Sync-Daten
  24. """
  25. self._base_dir = Path(base_directory)
  26. self._base_dir.mkdir(parents=True, exist_ok=True)
  27. self._lock = threading.Lock()
  28. @property
  29. def base_directory(self) -> Path:
  30. """Basis-Verzeichnis."""
  31. return self._base_dir
  32. # =========================================================================
  33. # CRUD
  34. # =========================================================================
  35. def save(self, item: SyncItem) -> None:
  36. """
  37. Speichert ein SyncItem als JSON-Datei.
  38. Args:
  39. item: Das zu speichernde Item
  40. """
  41. with self._lock:
  42. item_dir = self._base_dir / item.data_type
  43. item_dir.mkdir(parents=True, exist_ok=True)
  44. file_path = item_dir / f"{item.item_id}.json"
  45. try:
  46. file_path.write_text(
  47. json.dumps(item.to_dict(), ensure_ascii=False, indent=2),
  48. encoding="utf-8",
  49. )
  50. except Exception as e:
  51. perror(f"SyncStore: Fehler beim Speichern von {item.item_id}: {e}")
  52. def get(self, data_type: str, item_id: str) -> SyncItem | None:
  53. """
  54. Laedt ein einzelnes SyncItem.
  55. Args:
  56. data_type: Datentyp (z.B. "notes.note")
  57. item_id: Eindeutige ID
  58. Returns:
  59. SyncItem oder None wenn nicht gefunden
  60. """
  61. file_path = self._base_dir / data_type / f"{item_id}.json"
  62. if not file_path.exists():
  63. return None
  64. try:
  65. data = json.loads(file_path.read_text(encoding="utf-8"))
  66. return SyncItem.from_dict(data)
  67. except Exception as e:
  68. perror(f"SyncStore: Fehler beim Laden von {item_id}: {e}")
  69. return None
  70. def get_all(self, data_type: str) -> list[SyncItem]:
  71. """
  72. Laedt alle SyncItems eines Datentyps (ohne Soft-Deleted).
  73. Args:
  74. data_type: Datentyp
  75. Returns:
  76. Liste der aktiven Items
  77. """
  78. type_dir = self._base_dir / data_type
  79. if not type_dir.exists():
  80. return []
  81. items: list[SyncItem] = []
  82. for file_path in type_dir.glob("*.json"):
  83. try:
  84. data = json.loads(file_path.read_text(encoding="utf-8"))
  85. item = SyncItem.from_dict(data)
  86. if not item.deleted:
  87. items.append(item)
  88. except Exception as e:
  89. perror(f"SyncStore: Fehler beim Laden von {file_path.name}: {e}")
  90. return items
  91. def delete(self, data_type: str, item_id: str) -> None:
  92. """
  93. Soft-Delete eines Items (setzt deleted=True, aktualisiert updated_at).
  94. Args:
  95. data_type: Datentyp
  96. item_id: Item-ID
  97. """
  98. item = self.get(data_type, item_id)
  99. if item is None:
  100. return
  101. item.deleted = True
  102. item.updated_at = _now_iso()
  103. item.version += 1
  104. self.save(item)
  105. # =========================================================================
  106. # Sync-spezifisch
  107. # =========================================================================
  108. def get_changes_since(self, since: str) -> list[SyncItem]:
  109. """
  110. Gibt alle Items zurueck, die seit 'since' geaendert wurden.
  111. Args:
  112. since: ISO 8601 Zeitpunkt
  113. Returns:
  114. Liste geaenderter Items (inkl. Soft-Deleted)
  115. """
  116. results: list[SyncItem] = []
  117. if not self._base_dir.exists():
  118. return results
  119. for type_dir in self._base_dir.iterdir():
  120. if not type_dir.is_dir() or type_dir.name.startswith("."):
  121. continue
  122. for file_path in type_dir.glob("*.json"):
  123. try:
  124. data = json.loads(file_path.read_text(encoding="utf-8"))
  125. item = SyncItem.from_dict(data)
  126. if item.updated_at > since:
  127. results.append(item)
  128. except Exception as e:
  129. perror(f"SyncStore: Fehler beim Lesen von {file_path}: {e}")
  130. return results
  131. def merge_incoming(self, item: SyncItem) -> bool:
  132. """
  133. Merged ein eingehendes Item (Last-Write-Wins).
  134. - Lokal nicht vorhanden → speichern
  135. - incoming.updated_at > local.updated_at → ueberschreiben
  136. - Sonst → ignorieren
  137. Args:
  138. item: Das eingehende Item
  139. Returns:
  140. True wenn lokal aktualisiert wurde
  141. """
  142. local = self.get(item.data_type, item.item_id)
  143. if local is None:
  144. # Lokal nicht vorhanden -> speichern
  145. self.save(item)
  146. pdebug(f"SyncStore: Neues Item gemerged: {item.item_id}")
  147. return True
  148. if item.updated_at > local.updated_at:
  149. # Eingehendes Item ist neuer -> ueberschreiben
  150. self.save(item)
  151. pdebug(f"SyncStore: Item aktualisiert: {item.item_id}")
  152. return True
  153. # Lokale Version ist neuer oder gleich -> ignorieren
  154. return False
  155. # =========================================================================
  156. # Sync-Zeitpunkt
  157. # =========================================================================
  158. def get_last_sync_time(self) -> str:
  159. """
  160. Liest den letzten Sync-Zeitpunkt aus .last_sync Datei.
  161. Returns:
  162. ISO 8601 Zeitpunkt oder leerer String
  163. """
  164. sync_file = self._base_dir / ".last_sync"
  165. if not sync_file.exists():
  166. return ""
  167. try:
  168. return sync_file.read_text(encoding="utf-8").strip()
  169. except Exception:
  170. return ""
  171. def set_last_sync_time(self, ts: str) -> None:
  172. """
  173. Speichert den letzten Sync-Zeitpunkt.
  174. Args:
  175. ts: ISO 8601 Zeitpunkt
  176. """
  177. sync_file = self._base_dir / ".last_sync"
  178. try:
  179. sync_file.write_text(ts, encoding="utf-8")
  180. except Exception as e:
  181. perror(f"SyncStore: Fehler beim Schreiben von .last_sync: {e}")