| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- # -*- coding: utf-8 -*-
- """
- IMAP-Client fuer das Email-Plugin.
- Stellt eine einfache Schnittstelle fuer IMAP-Operationen bereit:
- - Ungelesene E-Mails zaehlen
- - Neueste E-Mails abrufen (Header + Body)
- """
- from __future__ import annotations
- import asyncio
- import email
- import email.header
- import email.utils
- import imaplib
- import re
- from dataclasses import dataclass, field
- from datetime import datetime
- from html.parser import HTMLParser
- from trixy_core.utils.debug import pdebug, perror
@dataclass
class EmailMessage:
    """Plain value object describing one e-mail message.

    Every field has a default, so an instance can be created empty or with
    any subset of keyword arguments.
    """

    # Identifier of the message, stored as text.
    uid: str = ""

    # Subject line of the message.
    subject: str = ""

    # Sender, split into display name and bare address.
    sender_name: str = ""
    sender_address: str = ""

    # Sent date as a datetime (None when not available) and as display text.
    date: datetime | None = None
    date_str: str = ""

    # Message text as plain text.
    body_text: str = ""

    # True when the message has not been read yet.
    is_unread: bool = False
class _HTMLStripper(HTMLParser):
    """Collect the visible text of an HTML document.

    Feed markup with ``feed()`` and read the result with ``get_text()``.
    Content of ``<script>`` and ``<style>`` elements is dropped, and line
    breaks are inserted for ``<br>`` and after the common block elements.
    """

    # Elements whose content is code or styling, never visible text.
    _IGNORED_TAGS = ("script", "style")
    # Elements whose closing tag ends a visual line.
    _BLOCK_TAGS = ("p", "div", "li", "tr")

    def __init__(self) -> None:
        super().__init__()
        self._parts: list[str] = []
        self._skip = False

    def handle_starttag(self, tag: str, attrs) -> None:
        """Start skipping on script/style; emit a newline for ``<br>``."""
        if tag in self._IGNORED_TAGS:
            self._skip = True
        elif tag == "br":
            # <br> is a void element and normally has no end tag, so the
            # line break must be emitted here. HTMLParser also routes the
            # self-closing form <br/> through handle_starttag.
            self._parts.append("\n")

    def handle_endtag(self, tag: str) -> None:
        """Stop skipping after script/style; end the line after block tags."""
        if tag in self._IGNORED_TAGS:
            self._skip = False
        if tag in self._BLOCK_TAGS:
            self._parts.append("\n")

    def handle_data(self, data: str) -> None:
        """Record text unless it belongs to a skipped element."""
        if not self._skip:
            self._parts.append(data)

    def get_text(self) -> str:
        """Return all text collected so far, stripped of outer whitespace."""
        return "".join(self._parts).strip()
def _strip_html(html_text: str) -> str:
    """Convert HTML markup to plain text.

    The markup is parsed with ``_HTMLStripper``. If the parser raises, a
    simple regular expression that removes everything between ``<`` and
    ``>`` is used instead.

    Args:
        html_text: HTML source to convert.

    Returns:
        The text content, stripped of leading and trailing whitespace.
    """
    stripper = _HTMLStripper()
    try:
        stripper.feed(html_text)
        # feed() may hold back trailing text (for example text ending in an
        # unterminated "&") while waiting for more input; close() forces
        # the parser to process everything that is still buffered.
        stripper.close()
        return stripper.get_text()
    except Exception:
        # Best effort: never let malformed markup break body extraction.
        return re.sub(r"<[^>]+>", "", html_text).strip()
def _decode_header(raw: str) -> str:
    """Decode a MIME header value (Subject, From, ...) to readable text.

    RFC 2047 encoded words are decoded with their declared charset; byte
    sequences that are invalid in that charset are replaced rather than
    raising.

    Args:
        raw: Header value as found in the message; may be empty.

    Returns:
        The decoded text. An empty string for an empty header, and the
        undecoded value when the header cannot be parsed at all.
    """
    if not raw:
        return ""

    from email.errors import HeaderParseError

    try:
        parts = email.header.decode_header(raw)
    except HeaderParseError:
        # Raised e.g. for broken base64 data; show the header as received.
        return str(raw)

    decoded: list[str] = []
    for data, charset in parts:
        if not isinstance(data, bytes):
            decoded.append(data)
            continue
        try:
            decoded.append(data.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # Unknown or invalid charset name: fall back to UTF-8.
            decoded.append(data.decode("utf-8", errors="replace"))

    # decode_header() keeps the whitespace of unencoded fragments and drops
    # it between adjacent encoded words, so the fragments are concatenated
    # directly; joining with a space would insert spurious blanks.
    return "".join(decoded)
def _part_text(part: email.message.Message) -> str:
    """Return the decoded payload of *part* as text ("" when it has none)."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Unknown or invalid charset name: fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def _extract_body(msg: email.message.Message, max_length: int) -> str:
    """Extract the message text, preferring plain text over HTML.

    For multipart messages the first ``text/plain`` and the first
    ``text/html`` part that are not attachments are considered. The HTML
    part is converted to plain text and only used when there is no
    non-blank plain-text part.

    Args:
        msg: Parsed e-mail message.
        max_length: Maximum number of characters to keep; longer bodies are
            cut and suffixed with "...".

    Returns:
        The body text with normalised line endings, runs of blank lines
        reduced to one, and outer whitespace removed.
    """
    text_part = ""
    html_part = ""

    if msg.is_multipart():
        for part in msg.walk():
            # Header values are case-insensitive ("Attachment" is valid).
            disposition = str(part.get("Content-Disposition", "")).lower()
            if "attachment" in disposition:
                continue
            content_type = part.get_content_type()
            if content_type == "text/plain" and not text_part:
                text_part = _part_text(part)
            elif content_type == "text/html" and not html_part:
                html_part = _part_text(part)
    else:
        raw = _part_text(msg)
        if msg.get_content_type() == "text/html":
            html_part = raw
        elif msg.get_content_maintype() == "text":
            # A single-part non-text message (e.g. a bare PDF) has no
            # readable body; do not present its bytes as text.
            text_part = raw

    # A blank plain-text part must not hide a usable HTML alternative.
    body = text_part if text_part.strip() else _strip_html(html_part)

    # Mail bodies usually arrive with CRLF line endings; normalise them so
    # the blank-line reduction below can match.
    body = body.replace("\r\n", "\n").replace("\r", "\n")
    body = re.sub(r"\n{3,}", "\n\n", body).strip()

    if len(body) > max_length:
        body = body[:max_length] + "..."
    return body
class ImapClient:
    """IMAP client for a single e-mail account.

    The public coroutines run the blocking ``imaplib`` calls in the event
    loop's default executor so the asyncio event loop is not blocked. Each
    operation opens its own connection, selects the configured folder
    read-only and disconnects again when it is done.
    """

    def __init__(self, account_config: dict) -> None:
        """Read the connection settings from *account_config*.

        Recognised keys and their defaults: ``host`` (""), ``port`` (993),
        ``ssl`` (True), ``username`` (""), ``password`` (""),
        ``folder`` ("INBOX").
        """
        self._host = account_config.get("host", "")
        self._port = account_config.get("port", 993)
        self._ssl = account_config.get("ssl", True)
        self._username = account_config.get("username", "")
        self._password = account_config.get("password", "")
        self._folder = account_config.get("folder", "INBOX")

    def _connect(self) -> imaplib.IMAP4 | imaplib.IMAP4_SSL:
        """Open a connection, log in and select the folder read-only.

        Raises:
            imaplib.IMAP4.error: If the login is rejected or the folder
                cannot be selected.
            OSError: If the server cannot be reached.
        """
        factory = imaplib.IMAP4_SSL if self._ssl else imaplib.IMAP4
        conn = factory(self._host, self._port)
        try:
            conn.login(self._username, self._password)
            status, detail = conn.select(self._folder, readonly=True)
            if status != "OK":
                # select() reports a missing folder via the status instead
                # of raising; fail here with a clear message.
                raise imaplib.IMAP4.error(
                    f"cannot select folder {self._folder!r}: {detail!r}"
                )
        except Exception:
            # Do not leak the open socket when the setup fails half-way.
            self._disconnect(conn)
            raise
        return conn

    @staticmethod
    def _disconnect(conn) -> None:
        """Close the mailbox and log out, ignoring errors (best effort)."""
        # The two steps are attempted independently: close() fails when no
        # folder is selected, and that must not prevent the logout.
        for step in (conn.close, conn.logout):
            try:
                step()
            except Exception:
                pass

    @staticmethod
    def _search_ids(conn, criterion: str) -> list[bytes]:
        """Return the message numbers matching *criterion* (oldest first).

        Raises:
            imaplib.IMAP4.error: If the server does not answer with OK.
        """
        status, data = conn.search(None, criterion)
        if status != "OK":
            raise imaplib.IMAP4.error(f"SEARCH {criterion} failed: {data!r}")
        # imaplib yields [None] when the server sent no SEARCH response.
        if not data or not data[0]:
            return []
        return data[0].split()

    @staticmethod
    def _parse_message(
        msg_id: bytes, raw_email: bytes, is_unread: bool, max_body_length: int
    ) -> EmailMessage:
        """Build an ``EmailMessage`` from the raw RFC 822 bytes."""
        msg = email.message_from_bytes(raw_email)

        # Sender: fall back to the local part of the address, then to a
        # fixed placeholder, when no display name is present.
        from_raw = _decode_header(msg.get("From", ""))
        sender_name, sender_addr = email.utils.parseaddr(from_raw)
        if not sender_name:
            sender_name = sender_addr.split("@")[0] if sender_addr else "Unbekannt"

        # Date: keep the raw header text when it cannot be parsed.
        date_str = str(msg.get("Date", ""))
        date_obj = None
        if date_str:
            try:
                date_obj = email.utils.parsedate_to_datetime(date_str)
                date_str = date_obj.strftime("%d.%m.%Y %H:%M")
            except Exception:
                # Untrusted header: any parse failure is non-fatal.
                pass

        return EmailMessage(
            uid=msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id),
            subject=_decode_header(msg.get("Subject", "")),
            sender_name=_decode_header(sender_name),
            sender_address=sender_addr,
            date=date_obj,
            date_str=date_str,
            body_text=_extract_body(msg, max_body_length),
            is_unread=is_unread,
        )

    def _count_unseen_sync(self) -> int:
        """Count the unread messages in the folder (blocking)."""
        conn = self._connect()
        try:
            return len(self._search_ids(conn, "UNSEEN"))
        finally:
            self._disconnect(conn)

    def _fetch_recent_sync(self, count: int, max_body_length: int) -> list[EmailMessage]:
        """Fetch the newest messages, newest first (blocking).

        Args:
            count: Maximum number of messages to return; values <= 0 yield
                an empty list without contacting the server.
            max_body_length: Maximum body length per message.

        Returns:
            Up to *count* messages. The ``uid`` field holds the number the
            server returned from SEARCH (a message sequence number, as the
            plain SEARCH command is used).
        """
        # Without this guard a count of 0 would slice [-0:], i.e. select
        # and download every message in the folder.
        if count <= 0:
            return []

        conn = self._connect()
        try:
            all_ids = self._search_ids(conn, "ALL")
            if not all_ids:
                return []
            unseen_ids = set(self._search_ids(conn, "UNSEEN"))

            messages: list[EmailMessage] = []
            for msg_id in reversed(all_ids[-count:]):
                status, msg_data = conn.fetch(msg_id, "(RFC822)")
                if status != "OK" or not msg_data:
                    continue
                # The literal arrives as an (envelope, bytes) tuple; other
                # items of the response are plain bytes and are skipped.
                raw_email = next(
                    (
                        item[1]
                        for item in msg_data
                        if isinstance(item, tuple)
                        and len(item) > 1
                        and isinstance(item[1], bytes)
                    ),
                    None,
                )
                if raw_email is None:
                    continue
                messages.append(
                    self._parse_message(
                        msg_id, raw_email, msg_id in unseen_ids, max_body_length
                    )
                )
            return messages
        finally:
            self._disconnect(conn)

    async def count_unseen(self) -> int:
        """Count the unread messages without blocking the event loop."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._count_unseen_sync)

    async def fetch_recent(
        self, count: int = 5, max_body_length: int = 500
    ) -> list[EmailMessage]:
        """Fetch the newest messages without blocking the event loop."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._fetch_recent_sync, count, max_body_length
        )
|