# -*- coding: utf-8 -*-
"""
Trixy Email-Service.
Core-Service fuer SMTP-Versand mit optionaler PGP-Signierung/Verschluesselung.
Kann von Plugins verwendet werden um E-Mails zu versenden.
Verwendung durch Plugins:
email_service = self.application.services.get_service("EmailService")
if email_service:
await email_service.send(to="admin@example.com", subject="Fehler", body="...")
"""
from __future__ import annotations
import asyncio
import html as html_module
import re
import smtplib
import socket
import ssl
from email import encoders
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import formatdate, make_msgid, formataddr
from typing import TYPE_CHECKING, ClassVar
from trixy_core.service.iservice import IService
from trixy_core.service.enums import ServicePriority, ServiceGroup
from trixy_core.utils.debug import pinfo, pdebug, perror, pwarn
if TYPE_CHECKING:
from trixy_core.application import IApplication
# =========================================================================
# Text-Konvertierung
# =========================================================================
def _plain_to_html(text: str) -> str:
    """Convert plain text into a minimal, self-contained HTML document.

    The text is HTML-escaped first, so it can never inject markup.
    Blocks separated by one or more blank lines become ``<p>`` paragraphs;
    single newlines inside a block become ``<br>`` line breaks.

    Args:
        text: The plain-text message body (may be empty).

    Returns:
        A complete HTML document (doctype, head with UTF-8 charset, body).
    """
    # Normalise Windows/old-Mac line endings so the paragraph split below
    # also works for "\r\n\r\n".
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    escaped = html_module.escape(normalized)
    paragraphs = re.split(r"\n{2,}", escaped)
    body_html = "\n".join(
        "<p>{}</p>".format(paragraph.replace("\n", "<br>\n"))
        for paragraph in paragraphs
    )
    return (
        '<!DOCTYPE html>\n'
        '<html>\n'
        '<head><meta charset="utf-8"></head>\n'
        '<body>\n'
        f'{body_html}\n'
        '</body></html>\n'
    )
def _html_to_plain(html_text: str) -> str:
    """Convert HTML into plain text (simple, regex-based).

    This is a best-effort conversion for generating the text/plain
    alternative of a mail, not a full HTML parser.

    Args:
        html_text: HTML markup.

    Returns:
        The text content with line breaks for ``<br>`` and closing block
        tags, entities decoded, runs of blank lines collapsed and
        surrounding whitespace stripped.
    """
    # Drop elements whose content is never meant to be displayed.
    text = re.sub(
        r"<(script|style|head)\b[^>]*>.*?</\1\s*>",
        "",
        html_text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    # Explicit line breaks.
    text = re.sub(r"<br\s*/?>", "\n", text, flags=re.IGNORECASE)
    # Paragraph ends become a blank line, other block ends a single newline.
    text = re.sub(r"</p\s*>", "\n\n", text, flags=re.IGNORECASE)
    text = re.sub(r"</div\s*>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"</(?:li|tr|h[1-6])\s*>", "\n", text, flags=re.IGNORECASE)
    # Remove every remaining tag, then decode entities such as &amp;.
    text = re.sub(r"<[^>]+>", "", text)
    text = html_module.unescape(text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
# =========================================================================
# PGP-Hilfsklasse
# =========================================================================
class _PGPHelper:
    """Wrap the PGP operations needed for outgoing mail (via python-gnupg).

    Supports:
    - PGP/MIME signing (RFC 3156, multipart/signed)
    - PGP/MIME encryption (RFC 3156, multipart/encrypted)
    - Combined: sign + encrypt

    Signing and encryption are best-effort: when an operation cannot be
    performed, the unmodified input message object is returned. Callers that
    need to know whether the operation was applied must compare identity
    (``result is original``).
    """

    def __init__(
        self,
        gpg_home: str,
        sign_key: str,
        passphrase: str,
        always_sign: bool,
        always_encrypt: bool,
        encrypt_keys: dict[str, str],
    ) -> None:
        """Store the configuration; GnuPG itself is set up in ``initialize``.

        Args:
            gpg_home: GnuPG home directory; a leading ``~`` is expanded.
            sign_key: Key ID or fingerprint used for signing ("" = none).
            passphrase: Passphrase for the signing key.
            always_sign: Sign by default when signing is possible.
            always_encrypt: Encrypt by default when all keys are known.
            encrypt_keys: Mapping of e-mail address to PGP key ID/fingerprint.
        """
        import os  # local import keeps this class self-contained

        self._gpg = None
        # The directory is handed to GnuPG as-is, so "~" has to be expanded
        # here; otherwise the common "~/.gnupg" setting would not resolve.
        self._gpg_home = os.path.expanduser(gpg_home) if gpg_home else gpg_home
        self._sign_key = sign_key
        self._passphrase = passphrase
        self._always_sign = always_sign
        self._always_encrypt = always_encrypt
        # Mapping: e-mail address (lower-cased) -> PGP key ID/fingerprint
        self._encrypt_keys = {k.lower(): v for k, v in (encrypt_keys or {}).items()}

    def initialize(self) -> bool:
        """Initialise GnuPG.

        Returns:
            True when the GPG wrapper was created, False when python-gnupg
            is not installed or the set-up raised. A missing signing key only
            produces a warning.
        """
        try:
            import gnupg
        except ImportError:
            pwarn("PGP: python-gnupg nicht installiert — PGP deaktiviert")
            return False
        try:
            self._gpg = gnupg.GPG(gnupghome=self._gpg_home)
            # Check that the signing key is present among the private keys.
            if self._sign_key:
                wanted = self._sign_key.lower()
                secret_keys = self._gpg.list_keys(True)
                found = any(
                    wanted in k.get("fingerprint", "").lower()
                    or wanted in k.get("keyid", "").lower()
                    for k in secret_keys
                )
                if not found:
                    pwarn(f"PGP: Signaturschluessel '{self._sign_key}' nicht im Keyring gefunden")
            return True
        except Exception as e:
            # Do not leave a half-initialised handle behind.
            self._gpg = None
            perror(f"PGP: Initialisierung fehlgeschlagen: {e}")
            return False

    @property
    def can_sign(self) -> bool:
        """Whether signing is possible (GnuPG ready and a signing key set)."""
        return self._gpg is not None and bool(self._sign_key)

    @property
    def should_sign(self) -> bool:
        """Whether messages are signed by default."""
        return self._always_sign and self.can_sign

    def get_encrypt_key(self, recipient: str) -> str | None:
        """Return the configured PGP key for *recipient*, or None."""
        return self._encrypt_keys.get(recipient.lower())

    def should_encrypt(self, recipients: list[str]) -> bool:
        """Whether to encrypt by default: enabled and a key for every recipient."""
        if not self._always_encrypt or not self._gpg:
            return False
        return all(self.get_encrypt_key(r) for r in recipients)

    @staticmethod
    def _copy_headers(source, target) -> None:
        """Copy the message-level headers of *source* onto the wrapper *target*.

        Headers the wrapper already carries (Content-Type and MIME-Version,
        both set by the MIME constructor) and all ``Content-*`` headers are
        skipped: they describe the inner entity, and copying MIME-Version
        would duplicate it.
        """
        already_set = {name.lower() for name in target.keys()}
        for name, value in source.items():
            lowered = name.lower()
            if lowered in already_set or lowered.startswith("content-"):
                continue
            target[name] = value

    def sign_message(self, mime_msg: MIMEMultipart) -> MIMEMultipart:
        """Sign a MIME message (PGP/MIME, RFC 3156).

        Builds a multipart/signed message with:
        - the original message as first part
        - the detached PGP signature as second part
          (application/pgp-signature)

        Returns:
            The signed wrapper, or *mime_msg* itself when signing is not
            possible or failed.
        """
        if not self._gpg or not self._sign_key:
            return mime_msg
        # Sign the serialised message with CRLF line endings.
        payload = mime_msg.as_string().replace("\n", "\r\n")
        sig = self._gpg.sign(
            payload,
            keyid=self._sign_key,
            passphrase=self._passphrase,
            detach=True,
            clearsign=False,
        )
        if not sig or not sig.data:
            perror("PGP: Signierung fehlgeschlagen")
            return mime_msg
        # NOTE(review): micalg is fixed to SHA-256; this assumes GnuPG uses
        # SHA-256 for the signing key — confirm against the key preferences.
        signed_msg = MIMEMultipart(
            "signed",
            micalg="pgp-sha256",
            protocol="application/pgp-signature",
        )
        self._copy_headers(mime_msg, signed_msg)
        # First part: the original message.
        signed_msg.attach(mime_msg)
        # Second part: the signature.
        sig_part = MIMEBase("application", "pgp-signature", name="signature.asc")
        sig_part.set_payload(str(sig))
        sig_part.add_header("Content-Description", "OpenPGP digital signature")
        sig_part.add_header("Content-Disposition", "attachment", filename="signature.asc")
        signed_msg.attach(sig_part)
        return signed_msg

    def encrypt_message(
        self, mime_msg: MIMEMultipart, recipients: list[str], sign: bool = False,
    ) -> MIMEMultipart:
        """Encrypt a MIME message (PGP/MIME, RFC 3156).

        Builds a multipart/encrypted message with:
        - the PGP version identification as first part
        - the encrypted data as second part (application/octet-stream)

        Args:
            mime_msg: The message to encrypt.
            recipients: Addresses whose configured keys are used.
            sign: Also sign inside the encrypted data when a signing key
                is configured.

        Returns:
            The encrypted wrapper, or *mime_msg* itself (unencrypted!) when
            GnuPG is unavailable, a recipient has no key, or encryption
            failed.
        """
        if not self._gpg:
            return mime_msg
        # Collect the recipient keys; one missing key aborts the encryption.
        key_ids = []
        for r in recipients:
            key_id = self.get_encrypt_key(r)
            if not key_id:
                pwarn(f"PGP: Kein Schluessel fuer '{r}' — Verschluesselung uebersprungen")
                return mime_msg
            key_ids.append(key_id)
        payload = mime_msg.as_string().replace("\n", "\r\n")
        encrypt_kwargs: dict = {
            "recipients": key_ids,
            "armor": True,
            # Keys from the configured mapping are used without a
            # web-of-trust check.
            "always_trust": True,
        }
        # Optionally sign in the same step.
        if sign and self._sign_key:
            encrypt_kwargs["sign"] = self._sign_key
            encrypt_kwargs["passphrase"] = self._passphrase
        encrypted = self._gpg.encrypt(payload, **encrypt_kwargs)
        if not encrypted.ok:
            perror(f"PGP: Verschluesselung fehlgeschlagen: {encrypted.status}")
            return mime_msg
        enc_msg = MIMEMultipart(
            "encrypted",
            protocol="application/pgp-encrypted",
        )
        self._copy_headers(mime_msg, enc_msg)
        # Part 1: PGP version identification.
        version_part = MIMEBase("application", "pgp-encrypted")
        version_part.set_payload("Version: 1\n")
        version_part.add_header("Content-Description", "PGP/MIME version identification")
        enc_msg.attach(version_part)
        # Part 2: the encrypted data.
        data_part = MIMEBase("application", "octet-stream", name="encrypted.asc")
        data_part.set_payload(str(encrypted))
        data_part.add_header("Content-Description", "OpenPGP encrypted message")
        data_part.add_header("Content-Disposition", "inline", filename="encrypted.asc")
        enc_msg.attach(data_part)
        return enc_msg
# =========================================================================
# EmailService
# =========================================================================
class EmailService(IService):
    """Core e-mail service for SMTP delivery with optional PGP support.

    Plugins can send e-mails through this service, e.g. error
    notifications, orders or reports.

    Every e-mail is sent as multipart/alternative with a plain-text AND an
    HTML version.

    PGP support (optional, needs python-gnupg + GnuPG):
    - Signing: outgoing mail is signed with the configured key
      (PGP/MIME, RFC 3156).
    - Encryption: mail to recipients with a configured PGP key is encrypted
      (PGP/MIME, RFC 3156).
    - Both can be controlled per call via the ``sign``/``encrypt`` arguments.

    A mail for which encryption was requested is never sent in cleartext:
    if encryption cannot be applied, sending fails.
    """

    NAME: ClassVar[str] = "EmailService"
    PRIORITY: ClassVar[ServicePriority] = ServicePriority.OPTIONAL
    GROUP: ClassVar[ServiceGroup] = ServiceGroup.UTILITY
    DEPENDENCIES: ClassVar[list[str]] = []

    def __init__(self, application: "IApplication", config: dict) -> None:
        """Read the SMTP and PGP settings from *config*.

        Recognised keys: ``smtp_host``, ``smtp_port``, ``smtp_user``,
        ``smtp_password``, ``smtp_ssl``, ``smtp_starttls``, ``smtp_timeout``
        (seconds, default 30), ``from_address``, ``from_name``, ``reply_to``,
        ``organization`` and the nested ``pgp`` dict (``enabled``,
        ``gpg_home``, ``sign_key``, ``passphrase``, ``always_sign``,
        ``always_encrypt``, ``recipient_keys``).
        """
        super().__init__(application)
        self._smtp_host = config.get("smtp_host", "")
        self._smtp_port = config.get("smtp_port", 587)
        self._smtp_user = config.get("smtp_user", "")
        self._smtp_password = config.get("smtp_password", "")
        self._smtp_ssl = config.get("smtp_ssl", True)
        self._smtp_starttls = config.get("smtp_starttls", True)
        # Without a timeout an unresponsive server would block the executor
        # thread indefinitely.
        self._smtp_timeout = config.get("smtp_timeout", 30)
        self._from_address = config.get("from_address", "") or self._smtp_user
        self._from_name = config.get("from_name", "Trixy")
        self._reply_to = config.get("reply_to", "")
        self._organization = config.get("organization", "")
        # PGP configuration
        pgp_cfg = config.get("pgp", {}) or {}
        self._pgp_enabled = pgp_cfg.get("enabled", False)
        self._pgp: _PGPHelper | None = None
        if self._pgp_enabled:
            self._pgp = _PGPHelper(
                gpg_home=pgp_cfg.get("gpg_home", "~/.gnupg"),
                sign_key=pgp_cfg.get("sign_key", ""),
                passphrase=pgp_cfg.get("passphrase", ""),
                always_sign=pgp_cfg.get("always_sign", False),
                always_encrypt=pgp_cfg.get("always_encrypt", False),
                encrypt_keys=pgp_cfg.get("recipient_keys", {}),
            )

    async def start(self) -> None:
        """Start the service and initialise PGP when it is enabled."""
        if self._smtp_host:
            pinfo(f"EmailService gestartet: {self._smtp_host}:{self._smtp_port}")
        else:
            pdebug("EmailService gestartet (kein SMTP-Host konfiguriert)")
        # Initialise PGP in the executor because it is a blocking call.
        if self._pgp:
            loop = asyncio.get_running_loop()
            ok = await loop.run_in_executor(None, self._pgp.initialize)
            if ok:
                features = []
                if self._pgp.can_sign:
                    features.append("Signierung")
                if self._pgp._always_encrypt:
                    features.append("Verschluesselung")
                pinfo(f"PGP aktiviert: {', '.join(features) if features else 'bereit'}")
            else:
                # PGP is unusable; behave as if it had not been enabled.
                self._pgp = None

    async def stop(self) -> None:
        """Stop the service (nothing to release)."""
        pdebug("EmailService gestoppt")

    @property
    def is_configured(self) -> bool:
        """Whether SMTP host, user and password are all set."""
        return bool(self._smtp_host and self._smtp_user and self._smtp_password)

    @property
    def pgp_available(self) -> bool:
        """Whether PGP is initialised and able to sign."""
        return self._pgp is not None and self._pgp.can_sign

    @staticmethod
    def _as_list(value: str | list[str] | None) -> list[str]:
        """Normalise a single address, a list of addresses or None to a list."""
        if not value:
            return []
        return [value] if isinstance(value, str) else list(value)

    async def send(
        self,
        to: str | list[str],
        subject: str,
        body: str,
        html_body: str | None = None,
        cc: str | list[str] | None = None,
        bcc: str | list[str] | None = None,
        reply_to: str | None = None,
        headers: dict[str, str] | None = None,
        sign: bool | None = None,
        encrypt: bool | None = None,
    ) -> bool:
        """Send an e-mail via SMTP.

        The mail is always sent as multipart/alternative with a plain-text
        and an HTML version. If only ``body`` is given, the HTML version is
        generated from it; if only ``html_body`` is given, the plain text is
        extracted from it. If both are given, both are used as-is.

        Args:
            to: Recipient address(es).
            subject: Subject line.
            body: Plain-text message body.
            html_body: HTML message body (optional).
            cc: CC recipients (optional).
            bcc: BCC recipients (optional).
            reply_to: Reply-To address (optional, overrides the configured
                default).
            headers: Additional headers (optional); they replace standard
                headers of the same name.
            sign: PGP signing (None = configured default, True = request,
                False = disable).
            encrypt: PGP encryption (None = configured default, True =
                enforce, False = disable). When encryption is in effect but
                cannot be applied, the mail is NOT sent.

        Returns:
            True on success, False on any failure (failures are logged,
            never raised).
        """
        if not self.is_configured:
            perror("EmailService: SMTP nicht konfiguriert — E-Mail nicht gesendet")
            return False
        to_list = self._as_list(to)
        cc_list = self._as_list(cc)
        bcc_list = self._as_list(bcc)
        all_recipients = to_list + cc_list + bcc_list
        if not all_recipients:
            perror("EmailService: Keine Empfaenger angegeben — E-Mail nicht gesendet")
            return False
        # Make sure both a plain-text and an HTML version exist.
        if body and not html_body:
            html_body = _plain_to_html(body)
        elif html_body and not body:
            body = _html_to_plain(html_body)
        elif not body and not html_body:
            body = ""
            html_body = _plain_to_html("")
        effective_reply_to = reply_to or self._reply_to
        # Resolve the PGP flags: an explicit argument wins over the default.
        if sign is not None:
            do_sign = sign
        else:
            do_sign = self._pgp.should_sign if self._pgp else False
        if encrypt is not None:
            do_encrypt = encrypt
        else:
            do_encrypt = self._pgp.should_encrypt(all_recipients) if self._pgp else False
        loop = asyncio.get_running_loop()
        try:
            return await loop.run_in_executor(
                None,
                self._send_sync,
                to_list, cc_list, bcc_list, subject,
                body, html_body, effective_reply_to, headers,
                do_sign, do_encrypt,
            )
        except Exception as e:
            perror(f"EmailService: Versand fehlgeschlagen: {e}")
            return False

    def _build_message(
        self,
        to_list: list[str],
        cc_list: list[str],
        subject: str,
        body: str,
        html_body: str,
        reply_to: str,
        extra_headers: dict[str, str] | None,
    ) -> MIMEMultipart:
        """Build the multipart/alternative message including all headers."""
        content = MIMEMultipart("alternative")
        content.attach(MIMEText(body, "plain", "utf-8"))
        content.attach(MIMEText(html_body, "html", "utf-8"))
        # Standard headers. MIME-Version is not set here: the MIME
        # constructor already added it, and assigning it again would append
        # a duplicate header.
        if "@" in self._from_address:
            domain = self._from_address.split("@")[-1]
        else:
            domain = socket.getfqdn()
        content["From"] = formataddr((self._from_name, self._from_address))
        content["To"] = ", ".join(to_list)
        content["Subject"] = subject
        content["Date"] = formatdate(localtime=True)
        content["Message-ID"] = make_msgid(domain=domain)
        if cc_list:
            content["Cc"] = ", ".join(cc_list)
        if reply_to:
            content["Reply-To"] = reply_to
        if self._organization:
            content["Organization"] = self._organization
        # Mark the mail as automatically generated.
        content["X-Mailer"] = "Trixy Email-Service"
        content["X-Auto-Response-Suppress"] = "OOF, AutoReply"
        content["Auto-Submitted"] = "auto-generated"
        # Caller-supplied headers replace standard headers of the same name.
        if extra_headers:
            for key, value in extra_headers.items():
                if key in content:
                    del content[key]
                content[key] = value
        return content

    def _deliver(self, recipients: list[str], payload: str) -> None:
        """Open the SMTP connection, log in and hand over the message."""
        # Implicit TLS is only used on port 465; every other port starts in
        # cleartext and is upgraded via STARTTLS when enabled.
        implicit_tls = self._smtp_ssl and self._smtp_port == 465
        if implicit_tls:
            connection = smtplib.SMTP_SSL(
                self._smtp_host,
                self._smtp_port,
                timeout=self._smtp_timeout,
                context=ssl.create_default_context(),
            )
        else:
            connection = smtplib.SMTP(
                self._smtp_host, self._smtp_port, timeout=self._smtp_timeout,
            )
        with connection as server:
            if not implicit_tls:
                server.ehlo()
                if self._smtp_starttls:
                    server.starttls(context=ssl.create_default_context())
                    server.ehlo()
            server.login(self._smtp_user, self._smtp_password)
            server.sendmail(self._from_address, recipients, payload)

    def _send_sync(
        self,
        to_list: list[str],
        cc_list: list[str],
        bcc_list: list[str],
        subject: str,
        body: str,
        html_body: str,
        reply_to: str,
        extra_headers: dict[str, str] | None,
        do_sign: bool,
        do_encrypt: bool,
    ) -> bool:
        """Build, optionally sign/encrypt and send a mail synchronously.

        Intended to run inside ``run_in_executor``.

        Returns:
            True when the message was handed to the SMTP server, False when
            requested encryption could not be applied or the SMTP exchange
            failed.
        """
        content = self._build_message(
            to_list, cc_list, subject, body, html_body, reply_to, extra_headers,
        )
        all_recipients = to_list + cc_list + bcc_list
        # PGP: the helper returns the very same object when it could not
        # apply an operation, so identity tells us what really happened.
        msg = content
        signed = False
        encrypted = False
        if do_encrypt:
            if self._pgp:
                msg = self._pgp.encrypt_message(content, all_recipients, sign=do_sign)
            if msg is content:
                # Fail closed: never leak a mail that was meant to be
                # encrypted as cleartext.
                perror(
                    "EmailService: Verschluesselung angefordert, aber nicht "
                    "moeglich — E-Mail nicht gesendet"
                )
                return False
            encrypted = True
            signed = bool(do_sign and self._pgp.can_sign)
        elif do_sign:
            if self._pgp:
                msg = self._pgp.sign_message(content)
            signed = msg is not content
            if not signed:
                pwarn("EmailService: Signierung nicht moeglich — E-Mail wird unsigniert gesendet")
        try:
            self._deliver(all_recipients, msg.as_string())
        except smtplib.SMTPAuthenticationError as e:
            perror(f"EmailService: Authentifizierung fehlgeschlagen: {e}")
            return False
        except smtplib.SMTPException as e:
            perror(f"EmailService: SMTP-Fehler: {e}")
            return False
        except Exception as e:
            perror(f"EmailService: Unerwarteter Fehler: {e}")
            return False
        # Report only what was actually applied.
        extras = []
        if signed:
            extras.append("signiert")
        if encrypted:
            extras.append("verschluesselt")
        suffix = f" ({', '.join(extras)})" if extras else ""
        pdebug(f"EmailService: E-Mail gesendet an {', '.join(to_list)}{suffix}")
        return True