# -*- coding: utf-8 -*- """ FTP/SFTP Client mit persistenter Verbindung. Bietet Connect/Login/Disconnect, Upload/Download, Verzeichnis-Operationen und Datei-Listing. FTP: Verwendet ftplib (Python-Standard) SFTP: Benoetigt asyncssh (pip install asyncssh) """ from __future__ import annotations import asyncio import ftplib from dataclasses import dataclass from enum import Enum from pathlib import Path from typing import Any from trixy_core.utils.debug import pinfo, pdebug, perror class FTPProtocol(Enum): """Unterstuetzte Protokolle.""" FTP = "ftp" FTPS = "ftps" # FTP ueber TLS SFTP = "sftp" # SSH File Transfer @dataclass class FTPConfig: """FTP/SFTP Verbindungskonfiguration.""" host: str port: int = 0 # 0 = Standard (FTP: 21, SFTP: 22) username: str = "" password: str = "" protocol: FTPProtocol = FTPProtocol.FTP # SFTP-spezifisch key_file: str = "" # Pfad zum SSH Private Key known_hosts: str = "" # Pfad zu known_hosts (leer = nicht pruefen) # Timeouts connect_timeout: float = 30.0 transfer_timeout: float = 300.0 @property def effective_port(self) -> int: """Gibt den effektiven Port zurueck.""" if self.port > 0: return self.port if self.protocol == FTPProtocol.SFTP: return 22 return 21 class FTPClient: """ FTP/FTPS/SFTP Client mit persistenter Verbindung. Beispiel FTP: ```python ftp = FTPClient(FTPConfig( host="ftp.example.com", username="user", password="pass", )) await ftp.connect() files = await ftp.list_dir("/models/") await ftp.download("/models/vosk.zip", "/tmp/vosk.zip") await ftp.upload("/tmp/results.json", "/uploads/results.json") await ftp.disconnect() ``` Beispiel SFTP: ```python sftp = FTPClient(FTPConfig( host="ssh.example.com", username="pi", key_file="~/.ssh/id_rsa", protocol=FTPProtocol.SFTP, )) await sftp.connect() await sftp.download("/data/model.onnx", "./models/model.onnx") await sftp.disconnect() ``` Context-Manager: ```python async with FTPClient(config) as ftp: await ftp.download(remote, local) ``` """ def __init__(self, config: FTPConfig) -> None: self._config = config self._ftp: ftplib.FTP | None = None # FTP/FTPS self._sftp_conn = None # asyncssh Connection self._sftp_client = None # asyncssh SFTPClient self._connected = False @property def is_connected(self) -> bool: """Prueft ob eine Verbindung besteht.""" return self._connected @property def protocol(self) -> FTPProtocol: """Verwendetes Protokoll.""" return self._config.protocol # === Context Manager === async def __aenter__(self) -> "FTPClient": await self.connect() return self async def __aexit__(self, *exc) -> None: await self.disconnect() # === Verbindung === async def connect(self) -> bool: """ Stellt die Verbindung her. Returns: True bei Erfolg """ cfg = self._config if self._connected: pdebug("FTP: Bereits verbunden") return True try: if cfg.protocol == FTPProtocol.SFTP: return await self._connect_sftp() else: return await self._connect_ftp() except Exception as e: perror(f"FTP Verbindungsfehler: {cfg.host}:{cfg.effective_port} — {e}") return False async def _connect_ftp(self) -> bool: """Verbindet per FTP/FTPS.""" cfg = self._config # FTP oder FTPS if cfg.protocol == FTPProtocol.FTPS: self._ftp = ftplib.FTP_TLS() else: self._ftp = ftplib.FTP() self._ftp.connect( cfg.host, cfg.effective_port, timeout=int(cfg.connect_timeout), ) if cfg.username: self._ftp.login(cfg.username, cfg.password) else: self._ftp.login() # Anonymous # TLS-Verschluesselung aktivieren (FTPS) if cfg.protocol == FTPProtocol.FTPS and isinstance(self._ftp, ftplib.FTP_TLS): self._ftp.prot_p() self._connected = True pinfo(f"FTP verbunden: {cfg.host}:{cfg.effective_port} ({cfg.protocol.value})") return True async def _connect_sftp(self) -> bool: """Verbindet per SFTP (SSH).""" try: import asyncssh except ImportError: perror("asyncssh nicht installiert. Installieren mit: pip install asyncssh") return False cfg = self._config connect_kwargs: dict[str, Any] = { "host": cfg.host, "port": cfg.effective_port, "username": cfg.username or None, } # Authentifizierung if cfg.key_file: connect_kwargs["client_keys"] = [Path(cfg.key_file).expanduser()] elif cfg.password: connect_kwargs["password"] = cfg.password # Known Hosts if cfg.known_hosts: connect_kwargs["known_hosts"] = cfg.known_hosts else: connect_kwargs["known_hosts"] = None # Nicht pruefen self._sftp_conn = await asyncssh.connect(**connect_kwargs) self._sftp_client = await self._sftp_conn.start_sftp_client() self._connected = True pinfo(f"SFTP verbunden: {cfg.host}:{cfg.effective_port}") return True async def disconnect(self) -> None: """Trennt die Verbindung.""" if not self._connected: return try: if self._config.protocol == FTPProtocol.SFTP: if self._sftp_client: self._sftp_client.exit() self._sftp_client = None if self._sftp_conn: self._sftp_conn.close() self._sftp_conn = None else: if self._ftp: try: self._ftp.quit() except Exception: self._ftp.close() self._ftp = None except Exception: pass self._connected = False pdebug(f"FTP getrennt: {self._config.host}") # === Datei-Operationen === async def download( self, remote_path: str, local_path: str, progress_callback: "callable | None" = None, ) -> bool: """ Laedt eine Datei herunter. Args: remote_path: Pfad auf dem Server local_path: Lokaler Zielpfad progress_callback: callback(downloaded_bytes, total_bytes) Returns: True bei Erfolg """ if not self._connected: perror("FTP: Nicht verbunden") return False Path(local_path).parent.mkdir(parents=True, exist_ok=True) try: if self._config.protocol == FTPProtocol.SFTP: await self._sftp_client.get(remote_path, local_path) else: downloaded = 0 total = self._ftp.size(remote_path) or 0 with open(local_path, "wb") as f: def _write_chunk(data: bytes) -> None: nonlocal downloaded f.write(data) downloaded += len(data) if progress_callback: progress_callback(downloaded, total) self._ftp.retrbinary(f"RETR {remote_path}", _write_chunk) pdebug(f"FTP Download: {remote_path} -> {local_path}") return True except Exception as e: perror(f"FTP Download-Fehler: {remote_path} — {e}") return False async def upload( self, local_path: str, remote_path: str, progress_callback: "callable | None" = None, ) -> bool: """ Laedt eine Datei hoch. Args: local_path: Lokaler Quellpfad remote_path: Pfad auf dem Server progress_callback: callback(uploaded_bytes, total_bytes) Returns: True bei Erfolg """ if not self._connected: perror("FTP: Nicht verbunden") return False if not Path(local_path).is_file(): perror(f"FTP Upload: Datei nicht gefunden: {local_path}") return False try: if self._config.protocol == FTPProtocol.SFTP: await self._sftp_client.put(local_path, remote_path) else: total = Path(local_path).stat().st_size uploaded = 0 with open(local_path, "rb") as f: def _read_callback(data: bytes) -> None: nonlocal uploaded uploaded += len(data) if progress_callback: progress_callback(uploaded, total) self._ftp.storbinary( f"STOR {remote_path}", f, callback=_read_callback, ) pdebug(f"FTP Upload: {local_path} -> {remote_path}") return True except Exception as e: perror(f"FTP Upload-Fehler: {remote_path} — {e}") return False # === Verzeichnis-Operationen === async def list_dir(self, remote_path: str = ".") -> list[str]: """ Listet den Inhalt eines Verzeichnisses. Returns: Liste der Datei-/Verzeichnisnamen """ if not self._connected: return [] try: if self._config.protocol == FTPProtocol.SFTP: entries = await self._sftp_client.listdir(remote_path) return sorted(entries) else: return sorted(self._ftp.nlst(remote_path)) except Exception as e: perror(f"FTP Listing-Fehler: {remote_path} — {e}") return [] async def file_exists(self, remote_path: str) -> bool: """Prueft ob eine Datei auf dem Server existiert.""" if not self._connected: return False try: if self._config.protocol == FTPProtocol.SFTP: stat = await self._sftp_client.stat(remote_path) return stat is not None else: self._ftp.size(remote_path) return True except Exception: return False async def file_size(self, remote_path: str) -> int: """Gibt die Groesse einer Datei auf dem Server zurueck (Bytes).""" if not self._connected: return 0 try: if self._config.protocol == FTPProtocol.SFTP: stat = await self._sftp_client.stat(remote_path) return stat.size if stat else 0 else: return self._ftp.size(remote_path) or 0 except Exception: return 0 async def mkdir(self, remote_path: str) -> bool: """Erstellt ein Verzeichnis auf dem Server.""" if not self._connected: return False try: if self._config.protocol == FTPProtocol.SFTP: await self._sftp_client.mkdir(remote_path) else: self._ftp.mkd(remote_path) return True except Exception as e: perror(f"FTP mkdir-Fehler: {remote_path} — {e}") return False async def remove(self, remote_path: str) -> bool: """Loescht eine Datei auf dem Server.""" if not self._connected: return False try: if self._config.protocol == FTPProtocol.SFTP: await self._sftp_client.remove(remote_path) else: self._ftp.delete(remote_path) return True except Exception as e: perror(f"FTP Loeschen-Fehler: {remote_path} — {e}") return False