imap_client.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. # -*- coding: utf-8 -*-
  2. """
  3. IMAP-Client fuer das Email-Plugin.
  4. Stellt eine einfache Schnittstelle fuer IMAP-Operationen bereit:
  5. - Ungelesene E-Mails zaehlen
  6. - Neueste E-Mails abrufen (Header + Body)
  7. """
  8. from __future__ import annotations
  9. import asyncio
  10. import email
  11. import email.header
  12. import email.utils
  13. import imaplib
  14. import re
  15. from dataclasses import dataclass, field
  16. from datetime import datetime
  17. from html.parser import HTMLParser
  18. from trixy_core.utils.debug import pdebug, perror
  19. @dataclass
  20. class EmailMessage:
  21. """Einzelne E-Mail-Nachricht."""
  22. uid: str = ""
  23. subject: str = ""
  24. sender_name: str = ""
  25. sender_address: str = ""
  26. date: datetime | None = None
  27. date_str: str = ""
  28. body_text: str = ""
  29. is_unread: bool = False
  30. class _HTMLStripper(HTMLParser):
  31. """Entfernt HTML-Tags und gibt reinen Text zurueck."""
  32. def __init__(self) -> None:
  33. super().__init__()
  34. self._parts: list[str] = []
  35. self._skip = False
  36. def handle_starttag(self, tag: str, attrs) -> None:
  37. if tag in ("script", "style"):
  38. self._skip = True
  39. def handle_endtag(self, tag: str) -> None:
  40. if tag in ("script", "style"):
  41. self._skip = False
  42. if tag in ("br", "p", "div", "li", "tr"):
  43. self._parts.append("\n")
  44. def handle_data(self, data: str) -> None:
  45. if not self._skip:
  46. self._parts.append(data)
  47. def get_text(self) -> str:
  48. return "".join(self._parts).strip()
  49. def _strip_html(html_text: str) -> str:
  50. """Konvertiert HTML in Klartext."""
  51. stripper = _HTMLStripper()
  52. try:
  53. stripper.feed(html_text)
  54. return stripper.get_text()
  55. except Exception:
  56. # Fallback: Regex
  57. return re.sub(r"<[^>]+>", "", html_text).strip()
  58. def _decode_header(raw: str) -> str:
  59. """Dekodiert einen MIME-Header (Subject, From, etc.)."""
  60. if not raw:
  61. return ""
  62. parts = email.header.decode_header(raw)
  63. decoded: list[str] = []
  64. for data, charset in parts:
  65. if isinstance(data, bytes):
  66. decoded.append(data.decode(charset or "utf-8", errors="replace"))
  67. else:
  68. decoded.append(data)
  69. return " ".join(decoded)
  70. def _extract_body(msg: email.message.Message, max_length: int) -> str:
  71. """Extrahiert den Nachrichtentext (Klartext bevorzugt, HTML als Fallback)."""
  72. text_part = ""
  73. html_part = ""
  74. if msg.is_multipart():
  75. for part in msg.walk():
  76. content_type = part.get_content_type()
  77. disposition = str(part.get("Content-Disposition", ""))
  78. if "attachment" in disposition:
  79. continue
  80. if content_type == "text/plain" and not text_part:
  81. payload = part.get_payload(decode=True)
  82. if payload:
  83. charset = part.get_content_charset() or "utf-8"
  84. text_part = payload.decode(charset, errors="replace")
  85. elif content_type == "text/html" and not html_part:
  86. payload = part.get_payload(decode=True)
  87. if payload:
  88. charset = part.get_content_charset() or "utf-8"
  89. html_part = payload.decode(charset, errors="replace")
  90. else:
  91. payload = msg.get_payload(decode=True)
  92. if payload:
  93. charset = msg.get_content_charset() or "utf-8"
  94. content_type = msg.get_content_type()
  95. raw = payload.decode(charset, errors="replace")
  96. if content_type == "text/html":
  97. html_part = raw
  98. else:
  99. text_part = raw
  100. body = text_part if text_part else _strip_html(html_part)
  101. # Mehrfache Leerzeilen reduzieren
  102. body = re.sub(r"\n{3,}", "\n\n", body).strip()
  103. if len(body) > max_length:
  104. body = body[:max_length] + "..."
  105. return body
  106. class ImapClient:
  107. """
  108. IMAP-Client fuer ein einzelnes E-Mail-Konto.
  109. Alle Operationen laufen in einem Thread-Executor,
  110. damit der asyncio Event-Loop nicht blockiert wird.
  111. """
  112. def __init__(self, account_config: dict) -> None:
  113. self._host = account_config.get("host", "")
  114. self._port = account_config.get("port", 993)
  115. self._ssl = account_config.get("ssl", True)
  116. self._username = account_config.get("username", "")
  117. self._password = account_config.get("password", "")
  118. self._folder = account_config.get("folder", "INBOX")
  119. def _connect(self) -> imaplib.IMAP4 | imaplib.IMAP4_SSL:
  120. """Stellt eine IMAP-Verbindung her (synchron)."""
  121. if self._ssl:
  122. conn = imaplib.IMAP4_SSL(self._host, self._port)
  123. else:
  124. conn = imaplib.IMAP4(self._host, self._port)
  125. conn.login(self._username, self._password)
  126. conn.select(self._folder, readonly=True)
  127. return conn
  128. def _count_unseen_sync(self) -> int:
  129. """Zaehlt ungelesene E-Mails (synchron)."""
  130. conn = self._connect()
  131. try:
  132. _, data = conn.search(None, "UNSEEN")
  133. uids = data[0].split()
  134. return len(uids)
  135. finally:
  136. try:
  137. conn.close()
  138. conn.logout()
  139. except Exception:
  140. pass
  141. def _fetch_recent_sync(self, count: int, max_body_length: int) -> list[EmailMessage]:
  142. """Holt die neuesten E-Mails (synchron)."""
  143. conn = self._connect()
  144. try:
  145. # Alle UIDs holen, die letzten N nehmen
  146. _, data = conn.search(None, "ALL")
  147. all_uids = data[0].split()
  148. if not all_uids:
  149. return []
  150. # Ungelesene UIDs ermitteln
  151. _, unseen_data = conn.search(None, "UNSEEN")
  152. unseen_uids = set(unseen_data[0].split())
  153. recent_uids = all_uids[-count:]
  154. recent_uids.reverse() # Neuste zuerst
  155. messages: list[EmailMessage] = []
  156. for uid in recent_uids:
  157. _, msg_data = conn.fetch(uid, "(RFC822)")
  158. if not msg_data or not msg_data[0]:
  159. continue
  160. raw_email = msg_data[0][1]
  161. if not isinstance(raw_email, bytes):
  162. continue
  163. msg = email.message_from_bytes(raw_email)
  164. # Absender parsen
  165. from_raw = _decode_header(msg.get("From", ""))
  166. sender_name, sender_addr = email.utils.parseaddr(from_raw)
  167. if not sender_name:
  168. sender_name = sender_addr.split("@")[0] if sender_addr else "Unbekannt"
  169. # Datum parsen
  170. date_str = msg.get("Date", "")
  171. date_obj = None
  172. if date_str:
  173. try:
  174. date_tuple = email.utils.parsedate_to_datetime(date_str)
  175. date_obj = date_tuple
  176. date_str = date_obj.strftime("%d.%m.%Y %H:%M")
  177. except Exception:
  178. pass
  179. # Body extrahieren
  180. body = _extract_body(msg, max_body_length)
  181. messages.append(EmailMessage(
  182. uid=uid.decode() if isinstance(uid, bytes) else str(uid),
  183. subject=_decode_header(msg.get("Subject", "")),
  184. sender_name=_decode_header(sender_name),
  185. sender_address=sender_addr,
  186. date=date_obj,
  187. date_str=date_str,
  188. body_text=body,
  189. is_unread=uid in unseen_uids,
  190. ))
  191. return messages
  192. finally:
  193. try:
  194. conn.close()
  195. conn.logout()
  196. except Exception:
  197. pass
  198. async def count_unseen(self) -> int:
  199. """Zaehlt ungelesene E-Mails (async)."""
  200. loop = asyncio.get_running_loop()
  201. return await loop.run_in_executor(None, self._count_unseen_sync)
  202. async def fetch_recent(
  203. self, count: int = 5, max_body_length: int = 500
  204. ) -> list[EmailMessage]:
  205. """Holt die neuesten E-Mails (async)."""
  206. loop = asyncio.get_running_loop()
  207. return await loop.run_in_executor(
  208. None, self._fetch_recent_sync, count, max_body_length
  209. )