""" Main PluginManager Class for Trixy Application This module provides the central PluginManager class that orchestrates: - Plugin discovery and lifecycle management - Thread-safe plugin operations - Plugin health monitoring and error isolation - Hot-reload capability for development - Integration with application container and event system - Plugin enable/disable functionality - Plugin dependency management - Configuration management - Plugin statistics and reporting The PluginManager serves as the main interface for the plugin system, providing high-level operations while delegating to specialized components: - PluginLoader: Dynamic loading and unloading - PluginConfigHandler: Configuration management - TrixyPlugin: Base plugin class and lifecycle Features: - Thread-safe plugin operations - Automatic plugin discovery - Lifecycle management (load, initialize, enable, disable, unload) - Health monitoring with error isolation - Hot-reload for development - Event system integration - Configuration management - Dependency resolution - Plugin statistics and reporting """ import os import threading import time import weakref from typing import Dict, List, Optional, Any, Set, Callable, Union from pathlib import Path from dataclasses import dataclass, field from enum import Enum import json # Import plugin system components from .trixy_plugin import TrixyPlugin, PluginState, PluginError, PluginMetadata from .plugin_loader import PluginLoader, PluginLoadResult, LoadResult from .config_handler import PluginConfigHandler # Import event system for integration try: from ..events import TrixyEvent except ImportError: TrixyEvent = None def pprint(message: str) -> None: """Plugin manager logging function.""" print(f"[PLUGIN_MGR] {message}") class PluginHealthStatus(Enum): """Plugin health status levels.""" HEALTHY = "healthy" WARNING = "warning" ERROR = "error" CRITICAL = "critical" UNKNOWN = "unknown" @dataclass class PluginInfo: """Comprehensive plugin information.""" name: str state: PluginState enabled: bool health_status: PluginHealthStatus load_time: Optional[float] = None init_time: Optional[float] = None last_error: Optional[str] = None error_count: int = 0 metadata: Optional[PluginMetadata] = None dependencies: List[str] = field(default_factory=list) path: Optional[Path] = None config_keys: Set[str] = field(default_factory=set) last_health_check: float = field(default_factory=time.time) @property def is_healthy(self) -> bool: """Check if plugin is healthy.""" return self.health_status == PluginHealthStatus.HEALTHY @property def is_active(self) -> bool: """Check if plugin is active (enabled and running).""" return self.enabled and self.state == PluginState.ENABLED class PluginManagerError(Exception): """Base exception for plugin manager errors.""" pass class PluginManager: """ Central plugin management system for Trixy Application. This class provides the main interface for plugin operations: - Loading and unloading plugins - Enabling and disabling plugins - Health monitoring and error handling - Configuration management - Hot-reload for development - Plugin discovery and lifecycle management - Thread-safe operations - Integration with event system Usage: # Create plugin manager plugin_manager = PluginManager( application=app_container, plugins_directory=Path("./plugins") ) # Load all plugins plugin_manager.load_all_plugins() # Enable/disable specific plugins plugin_manager.enable_plugin("my_plugin") plugin_manager.disable_plugin("my_plugin") # Get plugin information info = plugin_manager.get_plugin_info("my_plugin") # Health monitoring health_report = plugin_manager.get_health_report() # Hot reload for development plugin_manager.reload_plugin("my_plugin") """ def __init__( self, application: Any, plugins_directory: Optional[Path] = None, auto_load: bool = True, enable_hot_reload: bool = False, health_check_interval: float = 60.0, max_error_count: int = 5 ): """ Initialize the plugin manager. Args: application: Application container instance plugins_directory: Directory containing plugins (defaults to ./plugins) auto_load: Automatically load plugins on initialization enable_hot_reload: Enable hot-reload for development health_check_interval: Interval between health checks in seconds max_error_count: Maximum errors before marking plugin as critical """ self.application = application self.application_ref = weakref.ref(application) # Plugin directory setup if plugins_directory is None: plugins_directory = Path.cwd() / "plugins" self.plugins_directory = Path(plugins_directory) # Configuration self.auto_load = auto_load self.enable_hot_reload = enable_hot_reload self.health_check_interval = health_check_interval self.max_error_count = max_error_count # Plugin tracking self._plugins: Dict[str, TrixyPlugin] = {} self._plugin_info: Dict[str, PluginInfo] = {} self._plugin_configs: Dict[str, PluginConfigHandler] = {} self._plugins_lock = threading.RLock() # Plugin loader self._loader = PluginLoader( plugins_directory=self.plugins_directory, application=application, auto_reload=enable_hot_reload ) # Health monitoring self._health_monitor_thread: Optional[threading.Thread] = None self._health_monitor_shutdown = threading.Event() self._last_health_check = 0.0 # Event callbacks self._event_callbacks: List[Callable[[str, str, Any], None]] = [] # Statistics self._stats = { 'total_plugins': 0, 'loaded_plugins': 0, 'enabled_plugins': 0, 'healthy_plugins': 0, 'error_plugins': 0, 'total_loads': 0, 'total_enables': 0, 'total_disables': 0, 'total_reloads': 0, 'uptime': time.time() } pprint(f"PluginManager initialized with directory: {self.plugins_directory}") # Start health monitoring if self.health_check_interval > 0: self._start_health_monitor() # Auto-load plugins if requested if self.auto_load: self.load_all_plugins() # Core Plugin Management Methods def load_all_plugins(self) -> Dict[str, PluginLoadResult]: """ Load all plugins from the plugins directory. Returns: Dict mapping plugin names to their load results """ pprint("Loading all plugins...") results = self._loader.load_all_plugins() with self._plugins_lock: # Process load results for plugin_name, result in results.items(): if result.success and result.plugin_instance: self._register_plugin(plugin_name, result.plugin_instance) # Initialize plugin if not in error state try: if result.plugin_instance.state == PluginState.LOADED: result.plugin_instance.initialize_plugin() # Enable plugin if configured to be enabled if result.plugin_instance.enabled: result.plugin_instance.enable_plugin() self._stats['total_enables'] += 1 except Exception as e: pprint(f"Failed to initialize plugin {plugin_name}: {e}") self._update_plugin_error(plugin_name, str(e)) # Update statistics self._update_stats() successful = len([r for r in results.values() if r.success]) pprint(f"Plugin loading complete: {successful}/{len(results)} plugins loaded") return results def load_plugin(self, plugin_name: str) -> PluginLoadResult: """ Load a specific plugin. Args: plugin_name: Name of the plugin to load Returns: PluginLoadResult with load status """ pprint(f"Loading plugin: {plugin_name}") result = self._loader.load_plugin(plugin_name) self._stats['total_loads'] += 1 if result.success and result.plugin_instance: with self._plugins_lock: self._register_plugin(plugin_name, result.plugin_instance) # Initialize plugin try: if result.plugin_instance.state == PluginState.LOADED: result.plugin_instance.initialize_plugin() # Enable if configured if result.plugin_instance.enabled: result.plugin_instance.enable_plugin() self._stats['total_enables'] += 1 except Exception as e: pprint(f"Failed to initialize plugin {plugin_name}: {e}") self._update_plugin_error(plugin_name, str(e)) self._update_stats() return result def unload_plugin(self, plugin_name: str) -> bool: """ Unload a specific plugin. Args: plugin_name: Name of plugin to unload Returns: bool: True if successfully unloaded """ pprint(f"Unloading plugin: {plugin_name}") with self._plugins_lock: if plugin_name in self._plugins: # Disable first if enabled if self._plugins[plugin_name].state == PluginState.ENABLED: self.disable_plugin(plugin_name) # Unregister plugin self._unregister_plugin(plugin_name) # Unload from loader success = self._loader.unload_plugin(plugin_name) if success: pprint(f"Plugin {plugin_name} unloaded successfully") self._update_stats() return success def enable_plugin(self, plugin_name: str) -> bool: """ Enable a plugin. Args: plugin_name: Name of plugin to enable Returns: bool: True if successfully enabled """ with self._plugins_lock: if plugin_name not in self._plugins: pprint(f"Plugin {plugin_name} not loaded") return False plugin = self._plugins[plugin_name] try: if plugin.state in [PluginState.INITIALIZED, PluginState.DISABLED]: plugin.enable_plugin() self._update_plugin_info(plugin_name) self._stats['total_enables'] += 1 self._notify_event("plugin_enabled", plugin_name, {"state": plugin.state.value}) pprint(f"Plugin {plugin_name} enabled") return True else: pprint(f"Plugin {plugin_name} cannot be enabled from state {plugin.state.value}") return False except Exception as e: error_msg = f"Failed to enable plugin {plugin_name}: {e}" pprint(error_msg) self._update_plugin_error(plugin_name, error_msg) return False def disable_plugin(self, plugin_name: str) -> bool: """ Disable a plugin. Args: plugin_name: Name of plugin to disable Returns: bool: True if successfully disabled """ with self._plugins_lock: if plugin_name not in self._plugins: pprint(f"Plugin {plugin_name} not loaded") return False plugin = self._plugins[plugin_name] try: if plugin.state in [PluginState.ENABLED, PluginState.INITIALIZED]: plugin.disable_plugin() self._update_plugin_info(plugin_name) self._stats['total_disables'] += 1 self._notify_event("plugin_disabled", plugin_name, {"state": plugin.state.value}) pprint(f"Plugin {plugin_name} disabled") return True else: pprint(f"Plugin {plugin_name} cannot be disabled from state {plugin.state.value}") return False except Exception as e: error_msg = f"Failed to disable plugin {plugin_name}: {e}" pprint(error_msg) self._update_plugin_error(plugin_name, error_msg) return False def reload_plugin(self, plugin_name: str) -> PluginLoadResult: """ Reload a plugin (hot-reload). Args: plugin_name: Name of plugin to reload Returns: PluginLoadResult with reload status """ pprint(f"Reloading plugin: {plugin_name}") # Unload first self.unload_plugin(plugin_name) # Load again result = self.load_plugin(plugin_name) self._stats['total_reloads'] += 1 if result.success: self._notify_event("plugin_reloaded", plugin_name, {"load_time": result.load_time}) return result # Plugin Registration and Tracking def _register_plugin(self, plugin_name: str, plugin_instance: TrixyPlugin) -> None: """Register a plugin instance.""" self._plugins[plugin_name] = plugin_instance # Create plugin info info = PluginInfo( name=plugin_name, state=plugin_instance.state, enabled=plugin_instance.enabled, health_status=PluginHealthStatus.HEALTHY, load_time=plugin_instance.load_time, init_time=plugin_instance.init_time, metadata=plugin_instance.metadata, path=plugin_instance.plugin_path ) self._plugin_info[plugin_name] = info # Create config handler config_handler = PluginConfigHandler( config_path=plugin_instance.plugin_path / "config.json", plugin_name=plugin_name ) self._plugin_configs[plugin_name] = config_handler pprint(f"Plugin {plugin_name} registered") def _unregister_plugin(self, plugin_name: str) -> None: """Unregister a plugin.""" if plugin_name in self._plugins: del self._plugins[plugin_name] if plugin_name in self._plugin_info: del self._plugin_info[plugin_name] if plugin_name in self._plugin_configs: del self._plugin_configs[plugin_name] pprint(f"Plugin {plugin_name} unregistered") def _update_plugin_info(self, plugin_name: str) -> None: """Update plugin information.""" if plugin_name not in self._plugins or plugin_name not in self._plugin_info: return plugin = self._plugins[plugin_name] info = self._plugin_info[plugin_name] # Update basic info info.state = plugin.state info.enabled = plugin.enabled info.last_error = plugin.last_error info.error_count = plugin.error_count info.last_health_check = time.time() # Update health status if plugin.last_error: if plugin.error_count >= self.max_error_count: info.health_status = PluginHealthStatus.CRITICAL else: info.health_status = PluginHealthStatus.ERROR elif plugin.state == PluginState.ERROR: info.health_status = PluginHealthStatus.ERROR elif plugin.state in [PluginState.ENABLED, PluginState.INITIALIZED]: info.health_status = PluginHealthStatus.HEALTHY else: info.health_status = PluginHealthStatus.WARNING def _update_plugin_error(self, plugin_name: str, error_message: str) -> None: """Update plugin error information.""" if plugin_name in self._plugin_info: info = self._plugin_info[plugin_name] info.last_error = error_message info.error_count += 1 info.health_status = PluginHealthStatus.ERROR info.last_health_check = time.time() if info.error_count >= self.max_error_count: info.health_status = PluginHealthStatus.CRITICAL pprint(f"Plugin {plugin_name} marked as critical due to {info.error_count} errors") # Health Monitoring def _start_health_monitor(self) -> None: """Start the health monitoring thread.""" if self._health_monitor_thread is None or not self._health_monitor_thread.is_alive(): self._health_monitor_thread = threading.Thread( target=self._health_monitor_loop, name="PluginHealthMonitor", daemon=True ) self._health_monitor_thread.start() pprint("Health monitoring started") def _health_monitor_loop(self) -> None: """Health monitoring loop.""" while not self._health_monitor_shutdown.wait(self.health_check_interval): try: self._perform_health_check() except Exception as e: pprint(f"Error in health monitor: {e}") def _perform_health_check(self) -> None: """Perform health check on all plugins.""" with self._plugins_lock: for plugin_name in list(self._plugins.keys()): try: plugin = self._plugins[plugin_name] health_data = plugin.check_health() # Update plugin info based on health check self._update_plugin_info(plugin_name) except Exception as e: pprint(f"Health check failed for plugin {plugin_name}: {e}") self._update_plugin_error(plugin_name, f"Health check failed: {e}") self._last_health_check = time.time() self._update_stats() # Statistics and Reporting def _update_stats(self) -> None: """Update plugin statistics.""" with self._plugins_lock: self._stats.update({ 'total_plugins': len(self._plugin_info), 'loaded_plugins': len(self._plugins), 'enabled_plugins': len([p for p in self._plugins.values() if p.enabled]), 'healthy_plugins': len([i for i in self._plugin_info.values() if i.is_healthy]), 'error_plugins': len([i for i in self._plugin_info.values() if i.health_status in [PluginHealthStatus.ERROR, PluginHealthStatus.CRITICAL]]), }) def get_stats(self) -> Dict[str, Any]: """Get plugin manager statistics.""" self._update_stats() stats = self._stats.copy() stats['uptime'] = time.time() - stats['uptime'] return stats def get_health_report(self) -> Dict[str, Any]: """ Get comprehensive health report for all plugins. Returns: Dict containing health information for all plugins """ with self._plugins_lock: plugins_health = {} for plugin_name, info in self._plugin_info.items(): plugins_health[plugin_name] = { 'name': info.name, 'state': info.state.value, 'enabled': info.enabled, 'healthy': info.is_healthy, 'active': info.is_active, 'health_status': info.health_status.value, 'last_error': info.last_error, 'error_count': info.error_count, 'load_time': info.load_time, 'init_time': info.init_time, 'last_health_check': info.last_health_check, 'metadata': info.metadata.__dict__ if info.metadata else None } return { 'timestamp': time.time(), 'last_health_check': self._last_health_check, 'plugins': plugins_health, 'summary': self.get_stats() } # Query Methods def get_plugin_names(self) -> List[str]: """Get list of all plugin names.""" with self._plugins_lock: return list(self._plugins.keys()) def get_enabled_plugins(self) -> List[str]: """Get list of enabled plugin names.""" with self._plugins_lock: return [name for name, plugin in self._plugins.items() if plugin.enabled] def get_disabled_plugins(self) -> List[str]: """Get list of disabled plugin names.""" with self._plugins_lock: return [name for name, plugin in self._plugins.items() if not plugin.enabled] def get_plugin_info(self, plugin_name: str) -> Optional[PluginInfo]: """ Get information about a specific plugin. Args: plugin_name: Name of the plugin Returns: PluginInfo object or None if plugin not found """ with self._plugins_lock: if plugin_name in self._plugin_info: # Update info before returning self._update_plugin_info(plugin_name) return self._plugin_info[plugin_name] return None def get_plugin_instance(self, plugin_name: str) -> Optional[TrixyPlugin]: """ Get plugin instance. Args: plugin_name: Name of the plugin Returns: Plugin instance or None if not found """ with self._plugins_lock: return self._plugins.get(plugin_name) def is_plugin_loaded(self, plugin_name: str) -> bool: """Check if a plugin is loaded.""" return plugin_name in self._plugins def is_plugin_enabled(self, plugin_name: str) -> bool: """Check if a plugin is enabled.""" with self._plugins_lock: if plugin_name in self._plugins: return self._plugins[plugin_name].enabled return False def is_plugin_healthy(self, plugin_name: str) -> bool: """Check if a plugin is healthy.""" info = self.get_plugin_info(plugin_name) return info.is_healthy if info else False # Event System Integration def add_event_callback(self, callback: Callable[[str, str, Any], None]) -> None: """Add callback for plugin events.""" self._event_callbacks.append(callback) def remove_event_callback(self, callback: Callable[[str, str, Any], None]) -> None: """Remove event callback.""" if callback in self._event_callbacks: self._event_callbacks.remove(callback) def _notify_event(self, event_type: str, plugin_name: str, data: Any) -> None: """Notify event callbacks.""" for callback in self._event_callbacks: try: callback(event_type, plugin_name, data) except Exception as e: pprint(f"Error in event callback: {e}") # Also trigger system events if event handler is available try: app = self.application_ref() if app and hasattr(app, 'get_event_handler'): event_handler = app.get_event_handler() if event_handler: event_data = { 'plugin_name': plugin_name, 'event_type': event_type, 'data': data, 'timestamp': time.time() } event_handler.trigger_event(f"plugin_{event_type}", event_data) except Exception as e: pprint(f"Error triggering system event: {e}") # Configuration Management def get_plugin_config(self, plugin_name: str) -> Optional[Dict[str, Any]]: """Get plugin configuration.""" if plugin_name in self._plugin_configs: return self._plugin_configs[plugin_name].get_config_copy() return None def set_plugin_config_value(self, plugin_name: str, key: str, value: Any) -> bool: """Set a plugin configuration value.""" if plugin_name in self._plugin_configs: try: self._plugin_configs[plugin_name].set_value(key, value) return True except Exception as e: pprint(f"Failed to set config value for {plugin_name}: {e}") return False # Lifecycle Management def shutdown(self) -> None: """Shutdown the plugin manager and all plugins.""" pprint("Shutting down plugin manager...") # Stop health monitor if self._health_monitor_thread: self._health_monitor_shutdown.set() self._health_monitor_thread.join(timeout=10.0) # Disable all plugins first with self._plugins_lock: for plugin_name in list(self._plugins.keys()): try: if self._plugins[plugin_name].state == PluginState.ENABLED: self.disable_plugin(plugin_name) except Exception as e: pprint(f"Error disabling plugin {plugin_name}: {e}") # Unload all plugins for plugin_name in list(self._plugins.keys()): try: self.unload_plugin(plugin_name) except Exception as e: pprint(f"Error unloading plugin {plugin_name}: {e}") # Shutdown loader self._loader.shutdown() pprint("Plugin manager shutdown complete") def __repr__(self) -> str: """String representation of the plugin manager.""" with self._plugins_lock: enabled_count = len([p for p in self._plugins.values() if p.enabled]) return f""