| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549 |
# -*- coding: utf-8 -*-
"""
Trixy email service.

Core service for sending e-mail via SMTP with optional PGP signing and
encryption. Plugins can use it to send e-mails.

Usage from a plugin:

    email_service = self.application.services.get_service("EmailService")
    if email_service:
        await email_service.send(to="admin@example.com", subject="Fehler", body="...")
"""
- from __future__ import annotations
- import asyncio
- import html as html_module
- import re
- import smtplib
- import socket
- import ssl
- from email import encoders
- from email.mime.base import MIMEBase
- from email.mime.text import MIMEText
- from email.mime.multipart import MIMEMultipart
- from email.utils import formatdate, make_msgid, formataddr
- from typing import TYPE_CHECKING, ClassVar
- from trixy_core.service.iservice import IService
- from trixy_core.service.enums import ServicePriority, ServiceGroup
- from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn
- if TYPE_CHECKING:
- from trixy_core.application import IApplication
- # =========================================================================
- # Text-Konvertierung
- # =========================================================================
def _plain_to_html(text: str) -> str:
    """Convert plain text into a minimal, self-contained HTML document.

    The text is HTML-escaped first, so markup characters in the input are
    rendered literally. Runs of two or more line breaks separate paragraphs
    (``<p>``); a single line break inside a paragraph becomes ``<br>``.

    Args:
        text: Plain-text message body. CRLF and bare CR line endings are
            treated like LF.

    Returns:
        A complete HTML document (doctype, head and styled body).
    """
    # Normalise line endings first: with CRLF input the paragraph split below
    # would never match and stray carriage returns would end up in the markup.
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    escaped = html_module.escape(normalized)
    paragraphs = re.split(r"\n{2,}", escaped)
    body_html = "\n".join(
        "<p>{}</p>".format(paragraph.replace("\n", "<br>\n"))
        for paragraph in paragraphs
    )
    return (
        '<!DOCTYPE html>\n'
        '<html lang="de">\n'
        '<head><meta charset="utf-8"></head>\n'
        '<body style="font-family:sans-serif;font-size:14px;color:#333;">\n'
        f'{body_html}\n'
        '</body>\n</html>'
    )
def _html_to_plain(html_text: str) -> str:
    """Convert HTML into plain text using simple regular expressions.

    This is a best-effort conversion, not a full HTML parser:

    - HTML comments and the complete ``<head>``, ``<script>`` and ``<style>``
      elements (including their content) are dropped, so CSS, JavaScript and
      the document title do not leak into the text.
    - ``<br>`` and closing ``</div>`` / ``</li>`` tags become one line break,
      closing ``</p>`` tags become a blank line.
    - All remaining tags are removed and character references are decoded.
    - Three or more consecutive line breaks are collapsed to a blank line.

    Args:
        html_text: HTML markup of the message body.

    Returns:
        The extracted text with leading/trailing whitespace stripped.
    """
    # Comments may legally contain ">", which the generic tag pattern further
    # down would only remove partially.
    text = re.sub(r"<!--.*?-->", "", html_text, flags=re.DOTALL)
    # Elements whose content is never displayed to the reader.
    text = re.sub(
        r"<(head|script|style)\b[^>]*>.*?</\1\s*>",
        "",
        text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    text = re.sub(r"<br\s*/?>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"</p>", "\n\n", text, flags=re.IGNORECASE)
    text = re.sub(r"</div>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"</li>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"<[^>]+>", "", text)
    text = html_module.unescape(text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
- # =========================================================================
- # PGP-Hilfsklasse
- # =========================================================================
class _PGPHelper:
    """Encapsulate PGP operations performed through python-gnupg.

    Supports:
    - PGP/MIME signing (RFC 3156, multipart/signed)
    - PGP/MIME encryption (RFC 3156, multipart/encrypted)
    - Combined: sign + encrypt

    All operations are best-effort: when signing or encryption cannot be
    performed, the original message object is returned unchanged (and the
    problem is logged), so callers can detect a fallback via identity.
    """

    def __init__(
        self,
        gpg_home: str,
        sign_key: str,
        passphrase: str,
        always_sign: bool,
        always_encrypt: bool,
        encrypt_keys: dict[str, str],
    ) -> None:
        """Store the PGP configuration; GnuPG itself is set up in ``initialize``.

        Args:
            gpg_home: GnuPG home directory; a leading ``~`` is expanded.
            sign_key: Key ID or fingerprint of the signing key ("" = no signing).
            passphrase: Passphrase for the signing key.
            always_sign: Sign by default when signing is possible.
            always_encrypt: Encrypt by default when every recipient has a key.
            encrypt_keys: Mapping of e-mail address to PGP key ID/fingerprint.
        """
        import os  # function-scope import: only needed for the path expansion

        self._gpg = None
        # Expand "~" ourselves; the path is otherwise handed to gnupg.GPG
        # verbatim and a literal "~/.gnupg" is not an existing directory.
        self._gpg_home = os.path.expanduser(gpg_home) if gpg_home else gpg_home
        self._sign_key = sign_key
        self._passphrase = passphrase
        self._always_sign = always_sign
        self._always_encrypt = always_encrypt
        # Mapping: e-mail address -> PGP key ID/fingerprint (case-insensitive)
        self._encrypt_keys = {
            address.strip().lower(): key for address, key in encrypt_keys.items()
        }

    def initialize(self) -> bool:
        """Initialise GnuPG.

        Returns:
            True when python-gnupg could be imported and the GPG object was
            created, False otherwise. A missing signing key only produces a
            warning and does not make initialisation fail.
        """
        try:
            import gnupg
        except ImportError:
            pwarn("PGP: python-gnupg nicht installiert — PGP deaktiviert")
            return False

        try:
            self._gpg = gnupg.GPG(gnupghome=self._gpg_home)
            # Check whether the signing key exists among the private keys.
            if self._sign_key:
                wanted = self._sign_key.lower()
                found = any(
                    wanted in key.get("fingerprint", "").lower()
                    or wanted in key.get("keyid", "").lower()
                    for key in self._gpg.list_keys(True)
                )
                if not found:
                    pwarn(f"PGP: Signaturschluessel '{self._sign_key}' nicht im Keyring gefunden")
            return True
        except Exception as e:
            # Do not leave a half-initialised GPG object behind.
            self._gpg = None
            perror(f"PGP: Initialisierung fehlgeschlagen: {e}")
            return False

    @property
    def can_sign(self) -> bool:
        """Whether signing is possible (GnuPG initialised and a key configured)."""
        return self._gpg is not None and bool(self._sign_key)

    @property
    def should_sign(self) -> bool:
        """Whether messages should be signed by default."""
        return self._always_sign and self.can_sign

    def get_encrypt_key(self, recipient: str) -> str | None:
        """Return the configured PGP key for ``recipient`` or None."""
        return self._encrypt_keys.get(recipient.strip().lower())

    def should_encrypt(self, recipients: list[str]) -> bool:
        """Whether a message to ``recipients`` should be encrypted by default.

        True only if default encryption is enabled, GnuPG is initialised, the
        recipient list is non-empty and every recipient has a configured key.
        """
        if not self._always_encrypt or not self._gpg:
            return False
        # all() of an empty list is True; never report "encrypt" for nobody.
        return bool(recipients) and all(self.get_encrypt_key(r) for r in recipients)

    @staticmethod
    def _copy_headers(source: MIMEMultipart, target: MIMEMultipart) -> None:
        """Copy headers from ``source`` that ``target`` does not have yet.

        The wrapper created by ``MIMEMultipart`` already carries its own
        ``Content-Type`` and ``MIME-Version``; those are kept and not
        duplicated. For repeated header names only the first value is copied.
        """
        for name, value in source.items():
            if name not in target:
                target[name] = value

    def sign_message(self, mime_msg: MIMEMultipart) -> MIMEMultipart:
        """Sign a MIME message (PGP/MIME, RFC 3156).

        Produces a multipart/signed message consisting of:
        - the original message as first part
        - the detached PGP signature as second part (application/pgp-signature)

        Returns:
            The signed wrapper, or ``mime_msg`` itself when signing is not
            possible or failed.
        """
        if not self._gpg or not self._sign_key:
            return mime_msg

        # Prepare the text to be signed with CRLF line endings.
        payload = mime_msg.as_string().replace("\n", "\r\n")
        sig = self._gpg.sign(
            payload,
            keyid=self._sign_key,
            passphrase=self._passphrase,
            detach=True,
            clearsign=False,
        )
        if not sig or not sig.data:
            perror("PGP: Signierung fehlgeschlagen")
            return mime_msg

        signed_msg = MIMEMultipart(
            "signed",
            micalg="pgp-sha256",
            protocol="application/pgp-signature",
        )
        self._copy_headers(mime_msg, signed_msg)

        # First part: the original message; second part: the signature.
        signed_msg.attach(mime_msg)
        sig_part = MIMEBase("application", "pgp-signature", name="signature.asc")
        sig_part.set_payload(str(sig))
        sig_part.add_header("Content-Description", "OpenPGP digital signature")
        sig_part.add_header("Content-Disposition", "attachment", filename="signature.asc")
        signed_msg.attach(sig_part)
        return signed_msg

    def encrypt_message(
        self, mime_msg: MIMEMultipart, recipients: list[str], sign: bool = False,
    ) -> MIMEMultipart:
        """Encrypt a MIME message (PGP/MIME, RFC 3156).

        Produces a multipart/encrypted message consisting of:
        - the PGP version identification as first part
        - the encrypted data as second part (application/octet-stream)

        Args:
            mime_msg: Message to encrypt.
            recipients: Recipient addresses; each needs a configured key.
            sign: Additionally sign inside the encrypted data when a signing
                key is configured.

        Returns:
            The encrypted wrapper, or ``mime_msg`` itself when GnuPG is not
            initialised, a recipient has no key, or encryption failed.
        """
        if not self._gpg:
            return mime_msg

        # Collect the recipient keys; a single missing key aborts encryption.
        key_ids = []
        for recipient in recipients:
            key_id = self.get_encrypt_key(recipient)
            if not key_id:
                pwarn(f"PGP: Kein Schluessel fuer '{recipient}' — Verschluesselung uebersprungen")
                return mime_msg
            key_ids.append(key_id)

        payload = mime_msg.as_string().replace("\n", "\r\n")
        encrypt_kwargs: dict = {
            "recipients": key_ids,
            "armor": True,
            # NOTE(review): keys are used without a trust check; they come
            # from the service configuration.
            "always_trust": True,
        }
        # Optionally sign in the same operation.
        if sign and self._sign_key:
            encrypt_kwargs["sign"] = self._sign_key
            encrypt_kwargs["passphrase"] = self._passphrase

        encrypted = self._gpg.encrypt(payload, **encrypt_kwargs)
        if not encrypted.ok:
            perror(f"PGP: Verschluesselung fehlgeschlagen: {encrypted.status}")
            return mime_msg

        enc_msg = MIMEMultipart(
            "encrypted",
            protocol="application/pgp-encrypted",
        )
        self._copy_headers(mime_msg, enc_msg)

        # Part 1: PGP version identification
        version_part = MIMEBase("application", "pgp-encrypted")
        version_part.set_payload("Version: 1\n")
        version_part.add_header("Content-Description", "PGP/MIME version identification")
        enc_msg.attach(version_part)

        # Part 2: encrypted data
        data_part = MIMEBase("application", "octet-stream", name="encrypted.asc")
        data_part.set_payload(str(encrypted))
        data_part.add_header("Content-Description", "OpenPGP encrypted message")
        data_part.add_header("Content-Disposition", "inline", filename="encrypted.asc")
        enc_msg.attach(data_part)
        return enc_msg
- # =========================================================================
- # EmailService
- # =========================================================================
class EmailService(IService):
    """Core e-mail service for SMTP delivery with optional PGP support.

    Plugins can send e-mails through this service, e.g. error notifications,
    orders or reports.

    Every e-mail is sent as multipart/alternative with a plain-text AND an
    HTML version.

    PGP support (optional, needs python-gnupg + GnuPG):
    - Signing: outgoing e-mails are signed with the configured key
      (PGP/MIME, RFC 3156)
    - Encryption: e-mails to recipients with a configured PGP key are
      encrypted (PGP/MIME, RFC 3156)
    - Controllable per call via the ``sign`` / ``encrypt`` parameters

    PGP is best-effort: when it was requested but cannot be applied, the
    e-mail is still sent and the fallback is logged.
    """

    NAME: ClassVar[str] = "EmailService"
    PRIORITY: ClassVar[ServicePriority] = ServicePriority.OPTIONAL
    GROUP: ClassVar[ServiceGroup] = ServiceGroup.UTILITY
    DEPENDENCIES: ClassVar[list[str]] = []

    def __init__(self, application: "IApplication", config: dict) -> None:
        """Read SMTP and PGP settings from ``config``.

        Recognised keys: ``smtp_host``, ``smtp_port`` (587), ``smtp_user``,
        ``smtp_password``, ``smtp_ssl`` (True), ``smtp_starttls`` (True),
        ``smtp_timeout`` (30 seconds), ``from_address`` (falls back to
        ``smtp_user``), ``from_name`` ("Trixy"), ``reply_to``,
        ``organization`` and the nested ``pgp`` dict (``enabled``,
        ``gpg_home``, ``sign_key``, ``passphrase``, ``always_sign``,
        ``always_encrypt``, ``recipient_keys``).
        """
        super().__init__(application)
        self._smtp_host = config.get("smtp_host", "")
        self._smtp_port = config.get("smtp_port", 587)
        self._smtp_user = config.get("smtp_user", "")
        self._smtp_password = config.get("smtp_password", "")
        self._smtp_ssl = config.get("smtp_ssl", True)
        self._smtp_starttls = config.get("smtp_starttls", True)
        # Socket timeout so an unresponsive server cannot block an executor
        # thread forever.
        self._smtp_timeout = config.get("smtp_timeout", 30)
        self._from_address = config.get("from_address", "") or self._smtp_user
        self._from_name = config.get("from_name", "Trixy")
        self._reply_to = config.get("reply_to", "")
        self._organization = config.get("organization", "")

        # PGP configuration
        pgp_cfg = config.get("pgp", {})
        self._pgp_enabled = pgp_cfg.get("enabled", False)
        self._pgp: _PGPHelper | None = None
        if self._pgp_enabled:
            self._pgp = _PGPHelper(
                gpg_home=pgp_cfg.get("gpg_home", "~/.gnupg"),
                sign_key=pgp_cfg.get("sign_key", ""),
                passphrase=pgp_cfg.get("passphrase", ""),
                always_sign=pgp_cfg.get("always_sign", False),
                always_encrypt=pgp_cfg.get("always_encrypt", False),
                encrypt_keys=pgp_cfg.get("recipient_keys", {}),
            )

    async def start(self) -> None:
        """Start the service and initialise PGP if it is enabled."""
        if self._smtp_host:
            pinfo(f"EmailService gestartet: {self._smtp_host}:{self._smtp_port}")
        else:
            pdebug("EmailService gestartet (kein SMTP-Host konfiguriert)")

        if not self._pgp:
            return

        # Initialise PGP in the executor so the event loop is not blocked.
        loop = asyncio.get_running_loop()
        ok = await loop.run_in_executor(None, self._pgp.initialize)
        if not ok:
            # PGP is unusable; drop the helper so later sends skip it.
            self._pgp = None
            return

        features = []
        if self._pgp.can_sign:
            features.append("Signierung")
        if self._pgp._always_encrypt:
            features.append("Verschluesselung")
        pinfo(f"PGP aktiviert: {', '.join(features) if features else 'bereit'}")

    async def stop(self) -> None:
        """Stop the service (nothing to release; only logs)."""
        pdebug("EmailService gestoppt")

    @property
    def is_configured(self) -> bool:
        """Whether SMTP host, user and password are all set."""
        return bool(self._smtp_host and self._smtp_user and self._smtp_password)

    @property
    def pgp_available(self) -> bool:
        """Whether PGP is initialised and able to sign."""
        return self._pgp is not None and self._pgp.can_sign

    @staticmethod
    def _normalize_addresses(value: str | list[str] | None) -> list[str]:
        """Return ``value`` as a list of non-empty address strings."""
        if not value:
            return []
        items = [value] if isinstance(value, str) else list(value)
        return [item for item in items if item]

    async def send(
        self,
        to: str | list[str],
        subject: str,
        body: str,
        html_body: str | None = None,
        cc: str | list[str] | None = None,
        bcc: str | list[str] | None = None,
        reply_to: str | None = None,
        headers: dict[str, str] | None = None,
        sign: bool | None = None,
        encrypt: bool | None = None,
    ) -> bool:
        """Send an e-mail via SMTP.

        The e-mail is always sent as multipart/alternative with a plain-text
        and an HTML version. If only ``body`` is given, the HTML version is
        generated from it; if only ``html_body`` is given, the plain text is
        extracted from it; if both are given, both are used as they are.

        Args:
            to: Recipient address(es)
            subject: Subject line
            body: Plain-text message body
            html_body: HTML message body (optional, otherwise derived from body)
            cc: CC recipients (optional)
            bcc: BCC recipients (optional)
            reply_to: Reply-To address (optional, overrides the config default)
            headers: Additional headers as dict (optional); replace existing
                headers of the same name
            sign: PGP signing (None=config default, True=force, False=disable)
            encrypt: PGP encryption (None=config default, True=force, False=disable)

        Returns:
            True on success, False on any error (errors are logged, not raised)
        """
        if not self.is_configured:
            perror("EmailService: SMTP nicht konfiguriert — E-Mail nicht gesendet")
            return False

        to_list = self._normalize_addresses(to)
        cc_list = self._normalize_addresses(cc)
        bcc_list = self._normalize_addresses(bcc)
        all_recipients = to_list + cc_list + bcc_list
        if not all_recipients:
            # Fail before opening an SMTP connection that cannot succeed.
            perror("EmailService: Keine Empfaenger angegeben — E-Mail nicht gesendet")
            return False

        # Make sure both a plain-text and an HTML version exist.
        if body and not html_body:
            html_body = _plain_to_html(body)
        elif html_body and not body:
            body = _html_to_plain(html_body)
        elif not body and not html_body:
            body = ""
            html_body = _plain_to_html("")

        effective_reply_to = reply_to or self._reply_to

        # Explicit per-call flags win; otherwise use the PGP config defaults.
        pgp = self._pgp
        if sign is not None:
            do_sign = sign
        else:
            do_sign = pgp.should_sign if pgp else False
        if encrypt is not None:
            do_encrypt = encrypt
        else:
            do_encrypt = pgp.should_encrypt(all_recipients) if pgp else False

        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(
                None,
                self._send_sync,
                to_list, cc_list, bcc_list, subject,
                body, html_body, effective_reply_to, headers,
                do_sign, do_encrypt,
            )
        except Exception as e:
            perror(f"EmailService: Versand fehlgeschlagen: {e}")
            return False

    def _build_message(
        self,
        to_list: list[str],
        cc_list: list[str],
        subject: str,
        body: str,
        html_body: str,
        reply_to: str,
        extra_headers: dict[str, str] | None,
    ) -> MIMEMultipart:
        """Build the multipart/alternative message including all headers."""
        content = MIMEMultipart("alternative")
        content.attach(MIMEText(body, "plain", "utf-8"))
        content.attach(MIMEText(html_body, "html", "utf-8"))

        # Standard headers. MIME-Version is already set by MIMEMultipart;
        # assigning it again would append a duplicate header.
        if "@" in self._from_address:
            domain = self._from_address.split("@")[-1]
        else:
            domain = socket.getfqdn()
        content["From"] = formataddr((self._from_name, self._from_address))
        content["To"] = ", ".join(to_list)
        content["Subject"] = subject
        content["Date"] = formatdate(localtime=True)
        content["Message-ID"] = make_msgid(domain=domain)
        if cc_list:
            content["Cc"] = ", ".join(cc_list)
        if reply_to:
            content["Reply-To"] = reply_to
        if self._organization:
            content["Organization"] = self._organization

        # Mark the mail as automatically generated.
        content["X-Mailer"] = "Trixy Email-Service"
        content["X-Auto-Response-Suppress"] = "OOF, AutoReply"
        content["Auto-Submitted"] = "auto-generated"

        # Caller-supplied headers replace existing ones of the same name.
        if extra_headers:
            for key, value in extra_headers.items():
                if key in content:
                    del content[key]
                content[key] = value
        return content

    def _apply_pgp(
        self,
        content: MIMEMultipart,
        recipients: list[str],
        do_sign: bool,
        do_encrypt: bool,
    ) -> tuple[MIMEMultipart, bool, bool]:
        """Sign and/or encrypt ``content``.

        Returns:
            ``(message, signed, encrypted)`` where the flags state what was
            actually applied. The helper returns its input object unchanged
            when an operation was skipped or failed, which is how a fallback
            is detected here.
        """
        if not (do_sign or do_encrypt):
            return content, False, False

        pgp = self._pgp
        if pgp is None:
            pwarn("EmailService: PGP angefordert, aber nicht verfuegbar — E-Mail wird ohne PGP gesendet")
            return content, False, False

        if do_encrypt:
            # Encrypt (optionally signing in the same operation).
            protected = pgp.encrypt_message(content, recipients, sign=do_sign)
            if protected is not content:
                return protected, bool(do_sign and pgp.can_sign), True
            # Encryption was skipped (already logged by the helper); still
            # honour a sign request below instead of dropping it silently.

        if do_sign:
            protected = pgp.sign_message(content)
            if protected is not content:
                return protected, True, False

        return content, False, False

    def _deliver(self, recipients: list[str], payload: str) -> None:
        """Open the SMTP connection, log in and send ``payload``.

        Implicit TLS (SMTP_SSL) is used only when ``smtp_ssl`` is set and the
        port is 465; otherwise a plain connection is opened and upgraded via
        STARTTLS when ``smtp_starttls`` is set.
        """
        implicit_tls = bool(self._smtp_ssl and self._smtp_port == 465)
        if implicit_tls:
            connection = smtplib.SMTP_SSL(
                self._smtp_host,
                self._smtp_port,
                timeout=self._smtp_timeout,
                context=ssl.create_default_context(),
            )
        else:
            connection = smtplib.SMTP(
                self._smtp_host, self._smtp_port, timeout=self._smtp_timeout,
            )

        with connection as server:
            if not implicit_tls:
                server.ehlo()
                if self._smtp_starttls:
                    server.starttls(context=ssl.create_default_context())
                    # Re-identify over the now encrypted channel.
                    server.ehlo()
            server.login(self._smtp_user, self._smtp_password)
            server.sendmail(self._from_address, recipients, payload)

    def _send_sync(
        self,
        to_list: list[str],
        cc_list: list[str],
        bcc_list: list[str],
        subject: str,
        body: str,
        html_body: str,
        reply_to: str,
        extra_headers: dict[str, str] | None,
        do_sign: bool,
        do_encrypt: bool,
    ) -> bool:
        """Send an e-mail synchronously (meant for ``run_in_executor``).

        Returns:
            True when the message was handed to the SMTP server, False on
            SMTP or connection errors (which are logged).
        """
        content = self._build_message(
            to_list, cc_list, subject, body, html_body, reply_to, extra_headers,
        )
        # BCC recipients only appear in the SMTP envelope, never in a header.
        all_recipients = to_list + cc_list + bcc_list
        msg, signed, encrypted = self._apply_pgp(
            content, all_recipients, do_sign, do_encrypt,
        )

        try:
            self._deliver(all_recipients, msg.as_string())
        except smtplib.SMTPAuthenticationError as e:
            perror(f"EmailService: Authentifizierung fehlgeschlagen: {e}")
            return False
        except smtplib.SMTPException as e:
            perror(f"EmailService: SMTP-Fehler: {e}")
            return False
        except Exception as e:
            perror(f"EmailService: Unerwarteter Fehler: {e}")
            return False

        # Report what was actually applied, not merely what was requested.
        extras = []
        if signed:
            extras.append("signiert")
        if encrypted:
            extras.append("verschluesselt")
        suffix = f" ({', '.join(extras)})" if extras else ""
        pdebug(f"EmailService: E-Mail gesendet an {', '.join(to_list)}{suffix}")
        return True
|