# -*- coding: utf-8 -*- """ Trixy Email-Service. Core-Service fuer SMTP-Versand mit optionaler PGP-Signierung/Verschluesselung. Kann von Plugins verwendet werden um E-Mails zu versenden. Verwendung durch Plugins: email_service = self.application.services.get_service("EmailService") if email_service: await email_service.send(to="admin@example.com", subject="Fehler", body="...") """ from __future__ import annotations import asyncio import html as html_module import re import smtplib import socket import ssl from email import encoders from email.mime.base import MIMEBase from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.utils import formatdate, make_msgid, formataddr from typing import TYPE_CHECKING, ClassVar from trixy_core.service.iservice import IService from trixy_core.service.enums import ServicePriority, ServiceGroup from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn if TYPE_CHECKING: from trixy_core.application import IApplication # ========================================================================= # Text-Konvertierung # ========================================================================= def _plain_to_html(text: str) -> str: """Konvertiert Plain-Text in minimales HTML.""" escaped = html_module.escape(text) paragraphs = re.split(r"\n{2,}", escaped) body_parts = [] for p in paragraphs: lines = p.replace("\n", "
\n") body_parts.append(f"

{lines}

") body_html = "\n".join(body_parts) return ( '\n' '\n' '\n' '\n' f'{body_html}\n' '\n' ) def _html_to_plain(html_text: str) -> str: """Konvertiert HTML in Plain-Text (einfach).""" text = re.sub(r"", "\n", html_text, flags=re.IGNORECASE) text = re.sub(r"

", "\n\n", text, flags=re.IGNORECASE) text = re.sub(r"", "\n", text, flags=re.IGNORECASE) text = re.sub(r"", "\n", text, flags=re.IGNORECASE) text = re.sub(r"<[^>]+>", "", text) text = html_module.unescape(text) text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() # ========================================================================= # PGP-Hilfsklasse # ========================================================================= class _PGPHelper: """ Kapselt PGP-Operationen via python-gnupg. Unterstuetzt: - PGP/MIME Signierung (RFC 3156, multipart/signed) - PGP/MIME Verschluesselung (RFC 3156, multipart/encrypted) - Kombiniert: Signieren + Verschluesseln """ def __init__( self, gpg_home: str, sign_key: str, passphrase: str, always_sign: bool, always_encrypt: bool, encrypt_keys: dict[str, str], ) -> None: self._gpg = None self._gpg_home = gpg_home self._sign_key = sign_key self._passphrase = passphrase self._always_sign = always_sign self._always_encrypt = always_encrypt # Mapping: E-Mail-Adresse → PGP Key-ID/Fingerprint self._encrypt_keys = {k.lower(): v for k, v in encrypt_keys.items()} def initialize(self) -> bool: """Initialisiert GnuPG. 
Gibt False zurueck wenn nicht verfuegbar.""" try: import gnupg self._gpg = gnupg.GPG(gnupghome=self._gpg_home) # Pruefen ob der Signaturschluessel vorhanden ist if self._sign_key: keys = self._gpg.list_keys(True) # Private Keys found = any( self._sign_key.lower() in k.get("fingerprint", "").lower() or self._sign_key.lower() in k.get("keyid", "").lower() for k in keys ) if not found: pwarn(f"PGP: Signaturschluessel '{self._sign_key}' nicht im Keyring gefunden") return True except ImportError: pwarn("PGP: python-gnupg nicht installiert — PGP deaktiviert") return False except Exception as e: perror(f"PGP: Initialisierung fehlgeschlagen: {e}") return False @property def can_sign(self) -> bool: """Prueft ob PGP-Signierung moeglich ist.""" return self._gpg is not None and bool(self._sign_key) @property def should_sign(self) -> bool: """Prueft ob standardmaessig signiert werden soll.""" return self._always_sign and self.can_sign def get_encrypt_key(self, recipient: str) -> str | None: """Gibt den PGP-Key fuer einen Empfaenger zurueck (oder None).""" return self._encrypt_keys.get(recipient.lower()) def should_encrypt(self, recipients: list[str]) -> bool: """Prueft ob fuer alle Empfaenger Schluessel vorhanden sind.""" if not self._always_encrypt or not self._gpg: return False return all(self.get_encrypt_key(r) for r in recipients) def sign_message(self, mime_msg: MIMEMultipart) -> MIMEMultipart: """ Signiert eine MIME-Nachricht (PGP/MIME, RFC 3156). 
Erzeugt multipart/signed mit: - Original-Nachricht als erster Teil - PGP-Signatur als zweiter Teil (application/pgp-signature) """ if not self._gpg or not self._sign_key: return mime_msg # Nachricht fuer Signierung vorbereiten (kanonische Form) payload = mime_msg.as_string().replace("\n", "\r\n") sig = self._gpg.sign( payload, keyid=self._sign_key, passphrase=self._passphrase, detach=True, clearsign=False, ) if not sig or not sig.data: perror("PGP: Signierung fehlgeschlagen") return mime_msg # multipart/signed Wrapper erstellen signed_msg = MIMEMultipart( "signed", micalg="pgp-sha256", protocol="application/pgp-signature", ) # Header vom Original uebernehmen for key in ("From", "To", "Cc", "Subject", "Date", "Message-ID", "MIME-Version", "Reply-To", "Organization", "X-Mailer", "X-Auto-Response-Suppress", "Auto-Submitted"): value = mime_msg.get(key) if value: signed_msg[key] = value # Individuelle Header uebernehmen (X-Header und andere) for key, value in mime_msg.items(): if key not in signed_msg: signed_msg[key] = value # Original-Nachricht als erster Teil signed_msg.attach(mime_msg) # Signatur als zweiter Teil sig_part = MIMEBase("application", "pgp-signature", name="signature.asc") sig_part.set_payload(str(sig)) sig_part.add_header("Content-Description", "OpenPGP digital signature") sig_part.add_header("Content-Disposition", "attachment", filename="signature.asc") signed_msg.attach(sig_part) return signed_msg def encrypt_message( self, mime_msg: MIMEMultipart, recipients: list[str], sign: bool = False, ) -> MIMEMultipart: """ Verschluesselt eine MIME-Nachricht (PGP/MIME, RFC 3156). 
Erzeugt multipart/encrypted mit: - PGP Version-Header als erster Teil - Verschluesselte Daten als zweiter Teil (application/octet-stream) """ if not self._gpg: return mime_msg # Empfaenger-Keys sammeln key_ids = [] for r in recipients: key_id = self.get_encrypt_key(r) if not key_id: pwarn(f"PGP: Kein Schluessel fuer '{r}' — Verschluesselung uebersprungen") return mime_msg key_ids.append(key_id) payload = mime_msg.as_string().replace("\n", "\r\n") encrypt_kwargs: dict = { "recipients": key_ids, "armor": True, "always_trust": True, } # Optional gleichzeitig signieren if sign and self._sign_key: encrypt_kwargs["sign"] = self._sign_key encrypt_kwargs["passphrase"] = self._passphrase encrypted = self._gpg.encrypt(payload, **encrypt_kwargs) if not encrypted.ok: perror(f"PGP: Verschluesselung fehlgeschlagen: {encrypted.status}") return mime_msg # multipart/encrypted Wrapper erstellen enc_msg = MIMEMultipart( "encrypted", protocol="application/pgp-encrypted", ) # Header vom Original uebernehmen for key in ("From", "To", "Cc", "Subject", "Date", "Message-ID", "MIME-Version", "Reply-To", "Organization", "X-Mailer", "X-Auto-Response-Suppress", "Auto-Submitted"): value = mime_msg.get(key) if value: enc_msg[key] = value for key, value in mime_msg.items(): if key not in enc_msg: enc_msg[key] = value # Teil 1: PGP Version-Identifikation version_part = MIMEBase("application", "pgp-encrypted") version_part.set_payload("Version: 1\n") version_part.add_header("Content-Description", "PGP/MIME version identification") enc_msg.attach(version_part) # Teil 2: Verschluesselte Daten data_part = MIMEBase("application", "octet-stream", name="encrypted.asc") data_part.set_payload(str(encrypted)) data_part.add_header("Content-Description", "OpenPGP encrypted message") data_part.add_header("Content-Disposition", "inline", filename="encrypted.asc") enc_msg.attach(data_part) return enc_msg # ========================================================================= # EmailService # 
# =========================================================================

class EmailService(IService):
    """
    Core e-mail service for SMTP delivery with optional PGP support.

    Plugins can send e-mails through this service, e.g. error notifications,
    orders or reports.

    Every e-mail is sent as multipart/alternative with a plain-text AND an
    HTML version, to pass spam filters and maximise client compatibility.

    PGP support (optional, needs python-gnupg + GnuPG):
    - Signing: outgoing e-mails are signed with the configured key
      (PGP/MIME, RFC 3156)
    - Encryption: e-mails to recipients with a configured PGP key are
      encrypted (PGP/MIME, RFC 3156)
    - Controllable per call via the sign/encrypt parameters
    """

    NAME: ClassVar[str] = "EmailService"
    PRIORITY: ClassVar[ServicePriority] = ServicePriority.OPTIONAL
    GROUP: ClassVar[ServiceGroup] = ServiceGroup.UTILITY
    DEPENDENCIES: ClassVar[list[str]] = []

    def __init__(self, application: "IApplication", config: dict) -> None:
        """Read the SMTP and PGP settings from *config*.

        Args:
            application: The owning application, passed on to IService.
            config: Service configuration. Keys read here: smtp_host,
                smtp_port, smtp_user, smtp_password, smtp_ssl, smtp_starttls,
                smtp_timeout, from_address, from_name, reply_to, organization
                and the nested "pgp" section.
        """
        super().__init__(application)
        self._smtp_host = config.get("smtp_host", "")
        self._smtp_port = config.get("smtp_port", 587)
        self._smtp_user = config.get("smtp_user", "")
        self._smtp_password = config.get("smtp_password", "")
        self._smtp_ssl = config.get("smtp_ssl", True)
        self._smtp_starttls = config.get("smtp_starttls", True)
        # Seconds before a stalled SMTP connection is given up; without a
        # timeout a hanging server would block an executor thread forever.
        self._smtp_timeout = config.get("smtp_timeout", 30)
        self._from_address = config.get("from_address", "") or self._smtp_user
        self._from_name = config.get("from_name", "Trixy")
        self._reply_to = config.get("reply_to", "")
        self._organization = config.get("organization", "")

        # PGP configuration ("pgp: null" in the config counts as absent)
        pgp_cfg = config.get("pgp") or {}
        self._pgp_enabled = pgp_cfg.get("enabled", False)
        self._pgp: _PGPHelper | None = None

        if self._pgp_enabled:
            self._pgp = _PGPHelper(
                gpg_home=pgp_cfg.get("gpg_home", "~/.gnupg"),
                sign_key=pgp_cfg.get("sign_key", ""),
                passphrase=pgp_cfg.get("passphrase", ""),
                always_sign=pgp_cfg.get("always_sign", False),
                always_encrypt=pgp_cfg.get("always_encrypt", False),
                encrypt_keys=pgp_cfg.get("recipient_keys") or {},
            )

    async def start(self) -> None:
        """Start the service and initialise PGP if it is enabled."""
        if self._smtp_host:
            pinfo(f"EmailService gestartet: {self._smtp_host}:{self._smtp_port}")
        else:
            pdebug("EmailService gestartet (kein SMTP-Host konfiguriert)")

        # Initialise PGP in the executor because the GnuPG keyring is loaded.
        if self._pgp:
            loop = asyncio.get_running_loop()
            ok = await loop.run_in_executor(None, self._pgp.initialize)
            if ok:
                features = []
                if self._pgp.can_sign:
                    features.append("Signierung")
                if self._pgp._always_encrypt:
                    features.append("Verschluesselung")
                pinfo(f"PGP aktiviert: {', '.join(features) if features else 'bereit'}")
            else:
                # PGP stays disabled for the lifetime of the service.
                self._pgp = None

    async def stop(self) -> None:
        """Stop the service."""
        pdebug("EmailService gestoppt")

    @property
    def is_configured(self) -> bool:
        """Whether SMTP delivery is configured (host, user and password set)."""
        return bool(self._smtp_host and self._smtp_user and self._smtp_password)

    @property
    def pgp_available(self) -> bool:
        """Whether PGP is initialised and able to sign."""
        return self._pgp is not None and self._pgp.can_sign

    @staticmethod
    def _as_list(value: str | list[str] | None) -> list[str]:
        """Normalise a single address, a sequence of addresses or None to a list."""
        if not value:
            return []
        if isinstance(value, str):
            return [value]
        return list(value)

    async def send(
        self,
        to: str | list[str],
        subject: str,
        body: str,
        html_body: str | None = None,
        cc: str | list[str] | None = None,
        bcc: str | list[str] | None = None,
        reply_to: str | None = None,
        headers: dict[str, str] | None = None,
        sign: bool | None = None,
        encrypt: bool | None = None,
    ) -> bool:
        """
        Send an e-mail via SMTP.

        The e-mail is always sent as multipart/alternative with a plain-text
        and an HTML version. If only ``body`` is given, the HTML version is
        generated from it. If only ``html_body`` is given, the plain text is
        extracted from it. If both are given, both are used as they are.

        Args:
            to: Recipient address(es)
            subject: Subject line
            body: Plain-text message body
            html_body: HTML message body (optional, otherwise derived from body)
            cc: CC recipients (optional)
            bcc: BCC recipients (optional)
            reply_to: Reply-To address (optional, overrides the config default)
            headers: Additional headers as a dict (optional)
            sign: PGP signing (None=config default, True=force, False=disable)
            encrypt: PGP encryption (None=config default, True=force, False=disable)

        Returns:
            True on success, False on failure
        """
        if not self.is_configured:
            perror("EmailService: SMTP nicht konfiguriert — E-Mail nicht gesendet")
            return False

        # Normalise the recipients
        to_list = self._as_list(to)
        cc_list = self._as_list(cc)
        bcc_list = self._as_list(bcc)
        all_recipients = to_list + cc_list + bcc_list
        if not all_recipients:
            # The SMTP server would refuse this anyway; fail before connecting.
            perror("EmailService: Keine Empfaenger angegeben — E-Mail nicht gesendet")
            return False

        # Make sure both a plain-text and an HTML version exist
        if body and not html_body:
            html_body = _plain_to_html(body)
        elif html_body and not body:
            body = _html_to_plain(html_body)
        elif not body and not html_body:
            body = ""
            html_body = _plain_to_html("")

        effective_reply_to = reply_to or self._reply_to

        # Determine the PGP flags
        if self._pgp:
            do_sign = sign if sign is not None else self._pgp.should_sign
            do_encrypt = encrypt if encrypt is not None else self._pgp.should_encrypt(all_recipients)
        else:
            if sign or encrypt:
                # An explicit request must not be dropped silently.
                pwarn("EmailService: PGP angefordert, aber nicht verfuegbar — E-Mail wird ohne PGP gesendet")
            do_sign = False
            do_encrypt = False

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(
                None,
                self._send_sync,
                to_list, cc_list, bcc_list,
                subject, body, html_body,
                effective_reply_to, headers,
                do_sign, do_encrypt,
            )
        except Exception as e:
            perror(f"EmailService: Versand fehlgeschlagen: {e}")
            return False

    def _build_message(
        self,
        to_list: list[str],
        cc_list: list[str],
        subject: str,
        body: str,
        html_body: str,
        reply_to: str,
        extra_headers: dict[str, str] | None,
    ) -> MIMEMultipart:
        """Build the multipart/alternative message including all headers."""
        # multipart/alternative: plain text + HTML
        content = MIMEMultipart("alternative")
        content.attach(MIMEText(body, "plain", "utf-8"))
        content.attach(MIMEText(html_body, "html", "utf-8"))

        # Standard headers (RFC 5322). MIME-Version is already set by the
        # MIMEMultipart constructor; setting it again would duplicate it.
        if "@" in self._from_address:
            domain = self._from_address.split("@")[-1]
        else:
            domain = socket.getfqdn()

        content["From"] = formataddr((self._from_name, self._from_address))
        content["To"] = ", ".join(to_list)
        content["Subject"] = subject
        content["Date"] = formatdate(localtime=True)
        content["Message-ID"] = make_msgid(domain=domain)

        if cc_list:
            content["Cc"] = ", ".join(cc_list)
        if reply_to:
            content["Reply-To"] = reply_to
        if self._organization:
            content["Organization"] = self._organization

        # Spam reduction: mark the mail as automatically generated
        content["X-Mailer"] = "Trixy Email-Service"
        content["X-Auto-Response-Suppress"] = "OOF, AutoReply"
        content["Auto-Submitted"] = "auto-generated"

        # Custom headers replace any standard header of the same name
        if extra_headers:
            for key, value in extra_headers.items():
                if key in content:
                    del content[key]
                content[key] = value

        return content

    def _deliver(self, recipients: list[str], payload: str) -> None:
        """Open the SMTP connection, log in and hand over the message.

        Implicit TLS is used when smtp_ssl is set and the port is 465;
        otherwise a plain connection is opened and upgraded with STARTTLS
        when smtp_starttls is set.

        Raises:
            smtplib.SMTPException (and OSError/ssl errors) on failure.
        """
        # NOTE(review): with smtp_ssl=True on a port other than 465 and
        # smtp_starttls=False the login happens without TLS; behaviour kept.
        if self._smtp_ssl and self._smtp_port == 465:
            server_cm = smtplib.SMTP_SSL(
                self._smtp_host, self._smtp_port,
                timeout=self._smtp_timeout,
                context=ssl.create_default_context(),
            )
            use_starttls = False
        else:
            server_cm = smtplib.SMTP(
                self._smtp_host, self._smtp_port, timeout=self._smtp_timeout,
            )
            use_starttls = bool(self._smtp_starttls)

        with server_cm as server:
            if not isinstance(server, smtplib.SMTP_SSL):
                server.ehlo()
                if use_starttls:
                    server.starttls(context=ssl.create_default_context())
                    server.ehlo()
            server.login(self._smtp_user, self._smtp_password)
            refused = server.sendmail(self._from_address, recipients, payload)

        # sendmail only raises when ALL recipients are refused; partial
        # refusals are returned as a dict and would otherwise go unnoticed.
        if refused:
            pwarn(f"EmailService: Empfaenger abgelehnt: {', '.join(refused)}")

    def _send_sync(
        self,
        to_list: list[str],
        cc_list: list[str],
        bcc_list: list[str],
        subject: str,
        body: str,
        html_body: str,
        reply_to: str,
        extra_headers: dict[str, str] | None,
        do_sign: bool,
        do_encrypt: bool,
    ) -> bool:
        """Send an e-mail synchronously (meant for run_in_executor).

        Returns:
            True when the message was handed to the SMTP server, False when
            the SMTP exchange failed. Errors while building or PGP-processing
            the message propagate to the caller.
        """
        content = self._build_message(
            to_list, cc_list, subject, body, html_body, reply_to, extra_headers,
        )

        # PGP: sign and/or encrypt
        msg = content
        all_recipients = to_list + cc_list + bcc_list

        if self._pgp and do_encrypt:
            # Encrypt (optionally signing in the same operation)
            msg = self._pgp.encrypt_message(content, all_recipients, sign=do_sign)
        elif self._pgp and do_sign:
            # Sign only
            msg = self._pgp.sign_message(content)

        # The PGP helper falls back to the unmodified message on failure, so
        # report what was actually produced rather than what was requested.
        subtype = msg.get_content_subtype()
        was_encrypted = subtype == "encrypted"
        was_signed = subtype == "signed" or bool(
            was_encrypted and do_sign and self._pgp and self._pgp.can_sign
        )

        # SMTP delivery
        try:
            self._deliver(all_recipients, msg.as_string())
        except smtplib.SMTPAuthenticationError as e:
            perror(f"EmailService: Authentifizierung fehlgeschlagen: {e}")
            return False
        except smtplib.SMTPException as e:
            perror(f"EmailService: SMTP-Fehler: {e}")
            return False
        except Exception as e:
            perror(f"EmailService: Unerwarteter Fehler: {e}")
            return False

        extras = []
        if was_signed:
            extras.append("signiert")
        if was_encrypted:
            extras.append("verschluesselt")
        suffix = f" ({', '.join(extras)})" if extras else ""
        pdebug(f"EmailService: E-Mail gesendet an {', '.join(to_list)}{suffix}")
        return True