| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746 |
- """
- Main PluginManager Class for Trixy Application
- This module provides the central PluginManager class that orchestrates:
- - Plugin discovery and lifecycle management
- - Thread-safe plugin operations
- - Plugin health monitoring and error isolation
- - Hot-reload capability for development
- - Integration with application container and event system
- - Plugin enable/disable functionality
- - Plugin dependency management
- - Configuration management
- - Plugin statistics and reporting
- The PluginManager serves as the main interface for the plugin system,
- providing high-level operations while delegating to specialized components:
- - PluginLoader: Dynamic loading and unloading
- - PluginConfigHandler: Configuration management
- - TrixyPlugin: Base plugin class and lifecycle
- Features:
- - Thread-safe plugin operations
- - Automatic plugin discovery
- - Lifecycle management (load, initialize, enable, disable, unload)
- - Health monitoring with error isolation
- - Hot-reload for development
- - Event system integration
- - Configuration management
- - Dependency resolution
- - Plugin statistics and reporting
- """
- import os
- import threading
- import time
- import weakref
- from typing import Dict, List, Optional, Any, Set, Callable, Union
- from pathlib import Path
- from dataclasses import dataclass, field
- from enum import Enum
- import json
- # Import plugin system components
- from .trixy_plugin import TrixyPlugin, PluginState, PluginError, PluginMetadata
- from .plugin_loader import PluginLoader, PluginLoadResult, LoadResult
- from .config_handler import PluginConfigHandler
- # Import event system for integration
- try:
- from ..events import TrixyEvent
- except ImportError:
- TrixyEvent = None
- def pprint(message: str) -> None:
- """Plugin manager logging function."""
- print(f"[PLUGIN_MGR] {message}")
- class PluginHealthStatus(Enum):
- """Plugin health status levels."""
- HEALTHY = "healthy"
- WARNING = "warning"
- ERROR = "error"
- CRITICAL = "critical"
- UNKNOWN = "unknown"
- @dataclass
- class PluginInfo:
- """Comprehensive plugin information."""
- name: str
- state: PluginState
- enabled: bool
- health_status: PluginHealthStatus
- load_time: Optional[float] = None
- init_time: Optional[float] = None
- last_error: Optional[str] = None
- error_count: int = 0
- metadata: Optional[PluginMetadata] = None
- dependencies: List[str] = field(default_factory=list)
- path: Optional[Path] = None
- config_keys: Set[str] = field(default_factory=set)
- last_health_check: float = field(default_factory=time.time)
-
- @property
- def is_healthy(self) -> bool:
- """Check if plugin is healthy."""
- return self.health_status == PluginHealthStatus.HEALTHY
-
- @property
- def is_active(self) -> bool:
- """Check if plugin is active (enabled and running)."""
- return self.enabled and self.state == PluginState.ENABLED
- class PluginManagerError(Exception):
- """Base exception for plugin manager errors."""
- pass
- class PluginManager:
- """
- Central plugin management system for Trixy Application.
-
- This class provides the main interface for plugin operations:
- - Loading and unloading plugins
- - Enabling and disabling plugins
- - Health monitoring and error handling
- - Configuration management
- - Hot-reload for development
- - Plugin discovery and lifecycle management
- - Thread-safe operations
- - Integration with event system
-
- Usage:
- # Create plugin manager
- plugin_manager = PluginManager(
- application=app_container,
- plugins_directory=Path("./plugins")
- )
-
- # Load all plugins
- plugin_manager.load_all_plugins()
-
- # Enable/disable specific plugins
- plugin_manager.enable_plugin("my_plugin")
- plugin_manager.disable_plugin("my_plugin")
-
- # Get plugin information
- info = plugin_manager.get_plugin_info("my_plugin")
-
- # Health monitoring
- health_report = plugin_manager.get_health_report()
-
- # Hot reload for development
- plugin_manager.reload_plugin("my_plugin")
- """
-
- def __init__(
- self,
- application: Any,
- plugins_directory: Optional[Path] = None,
- auto_load: bool = True,
- enable_hot_reload: bool = False,
- health_check_interval: float = 60.0,
- max_error_count: int = 5
- ):
- """
- Initialize the plugin manager.
-
- Args:
- application: Application container instance
- plugins_directory: Directory containing plugins (defaults to ./plugins)
- auto_load: Automatically load plugins on initialization
- enable_hot_reload: Enable hot-reload for development
- health_check_interval: Interval between health checks in seconds
- max_error_count: Maximum errors before marking plugin as critical
- """
- self.application = application
- self.application_ref = weakref.ref(application)
-
- # Plugin directory setup
- if plugins_directory is None:
- plugins_directory = Path.cwd() / "plugins"
- self.plugins_directory = Path(plugins_directory)
-
- # Configuration
- self.auto_load = auto_load
- self.enable_hot_reload = enable_hot_reload
- self.health_check_interval = health_check_interval
- self.max_error_count = max_error_count
-
- # Plugin tracking
- self._plugins: Dict[str, TrixyPlugin] = {}
- self._plugin_info: Dict[str, PluginInfo] = {}
- self._plugin_configs: Dict[str, PluginConfigHandler] = {}
- self._plugins_lock = threading.RLock()
-
- # Plugin loader
- self._loader = PluginLoader(
- plugins_directory=self.plugins_directory,
- application=application,
- auto_reload=enable_hot_reload
- )
-
- # Health monitoring
- self._health_monitor_thread: Optional[threading.Thread] = None
- self._health_monitor_shutdown = threading.Event()
- self._last_health_check = 0.0
-
- # Event callbacks
- self._event_callbacks: List[Callable[[str, str, Any], None]] = []
-
- # Statistics
- self._stats = {
- 'total_plugins': 0,
- 'loaded_plugins': 0,
- 'enabled_plugins': 0,
- 'healthy_plugins': 0,
- 'error_plugins': 0,
- 'total_loads': 0,
- 'total_enables': 0,
- 'total_disables': 0,
- 'total_reloads': 0,
- 'uptime': time.time()
- }
-
- pprint(f"PluginManager initialized with directory: {self.plugins_directory}")
-
- # Start health monitoring
- if self.health_check_interval > 0:
- self._start_health_monitor()
-
- # Auto-load plugins if requested
- if self.auto_load:
- self.load_all_plugins()
-
- # Core Plugin Management Methods
-
- def load_all_plugins(self) -> Dict[str, PluginLoadResult]:
- """
- Load all plugins from the plugins directory.
-
- Returns:
- Dict mapping plugin names to their load results
- """
- pprint("Loading all plugins...")
-
- results = self._loader.load_all_plugins()
-
- with self._plugins_lock:
- # Process load results
- for plugin_name, result in results.items():
- if result.success and result.plugin_instance:
- self._register_plugin(plugin_name, result.plugin_instance)
-
- # Initialize plugin if not in error state
- try:
- if result.plugin_instance.state == PluginState.LOADED:
- result.plugin_instance.initialize_plugin()
-
- # Enable plugin if configured to be enabled
- if result.plugin_instance.enabled:
- result.plugin_instance.enable_plugin()
- self._stats['total_enables'] += 1
-
- except Exception as e:
- pprint(f"Failed to initialize plugin {plugin_name}: {e}")
- self._update_plugin_error(plugin_name, str(e))
-
- # Update statistics
- self._update_stats()
-
- successful = len([r for r in results.values() if r.success])
- pprint(f"Plugin loading complete: {successful}/{len(results)} plugins loaded")
-
- return results
-
- def load_plugin(self, plugin_name: str) -> PluginLoadResult:
- """
- Load a specific plugin.
-
- Args:
- plugin_name: Name of the plugin to load
-
- Returns:
- PluginLoadResult with load status
- """
- pprint(f"Loading plugin: {plugin_name}")
-
- result = self._loader.load_plugin(plugin_name)
- self._stats['total_loads'] += 1
-
- if result.success and result.plugin_instance:
- with self._plugins_lock:
- self._register_plugin(plugin_name, result.plugin_instance)
-
- # Initialize plugin
- try:
- if result.plugin_instance.state == PluginState.LOADED:
- result.plugin_instance.initialize_plugin()
-
- # Enable if configured
- if result.plugin_instance.enabled:
- result.plugin_instance.enable_plugin()
- self._stats['total_enables'] += 1
-
- except Exception as e:
- pprint(f"Failed to initialize plugin {plugin_name}: {e}")
- self._update_plugin_error(plugin_name, str(e))
-
- self._update_stats()
- return result
-
- def unload_plugin(self, plugin_name: str) -> bool:
- """
- Unload a specific plugin.
-
- Args:
- plugin_name: Name of plugin to unload
-
- Returns:
- bool: True if successfully unloaded
- """
- pprint(f"Unloading plugin: {plugin_name}")
-
- with self._plugins_lock:
- if plugin_name in self._plugins:
- # Disable first if enabled
- if self._plugins[plugin_name].state == PluginState.ENABLED:
- self.disable_plugin(plugin_name)
-
- # Unregister plugin
- self._unregister_plugin(plugin_name)
-
- # Unload from loader
- success = self._loader.unload_plugin(plugin_name)
-
- if success:
- pprint(f"Plugin {plugin_name} unloaded successfully")
-
- self._update_stats()
- return success
-
- def enable_plugin(self, plugin_name: str) -> bool:
- """
- Enable a plugin.
-
- Args:
- plugin_name: Name of plugin to enable
-
- Returns:
- bool: True if successfully enabled
- """
- with self._plugins_lock:
- if plugin_name not in self._plugins:
- pprint(f"Plugin {plugin_name} not loaded")
- return False
-
- plugin = self._plugins[plugin_name]
-
- try:
- if plugin.state in [PluginState.INITIALIZED, PluginState.DISABLED]:
- plugin.enable_plugin()
- self._update_plugin_info(plugin_name)
- self._stats['total_enables'] += 1
- self._notify_event("plugin_enabled", plugin_name, {"state": plugin.state.value})
- pprint(f"Plugin {plugin_name} enabled")
- return True
- else:
- pprint(f"Plugin {plugin_name} cannot be enabled from state {plugin.state.value}")
- return False
-
- except Exception as e:
- error_msg = f"Failed to enable plugin {plugin_name}: {e}"
- pprint(error_msg)
- self._update_plugin_error(plugin_name, error_msg)
- return False
-
- def disable_plugin(self, plugin_name: str) -> bool:
- """
- Disable a plugin.
-
- Args:
- plugin_name: Name of plugin to disable
-
- Returns:
- bool: True if successfully disabled
- """
- with self._plugins_lock:
- if plugin_name not in self._plugins:
- pprint(f"Plugin {plugin_name} not loaded")
- return False
-
- plugin = self._plugins[plugin_name]
-
- try:
- if plugin.state in [PluginState.ENABLED, PluginState.INITIALIZED]:
- plugin.disable_plugin()
- self._update_plugin_info(plugin_name)
- self._stats['total_disables'] += 1
- self._notify_event("plugin_disabled", plugin_name, {"state": plugin.state.value})
- pprint(f"Plugin {plugin_name} disabled")
- return True
- else:
- pprint(f"Plugin {plugin_name} cannot be disabled from state {plugin.state.value}")
- return False
-
- except Exception as e:
- error_msg = f"Failed to disable plugin {plugin_name}: {e}"
- pprint(error_msg)
- self._update_plugin_error(plugin_name, error_msg)
- return False
-
- def reload_plugin(self, plugin_name: str) -> PluginLoadResult:
- """
- Reload a plugin (hot-reload).
-
- Args:
- plugin_name: Name of plugin to reload
-
- Returns:
- PluginLoadResult with reload status
- """
- pprint(f"Reloading plugin: {plugin_name}")
-
- # Unload first
- self.unload_plugin(plugin_name)
-
- # Load again
- result = self.load_plugin(plugin_name)
- self._stats['total_reloads'] += 1
-
- if result.success:
- self._notify_event("plugin_reloaded", plugin_name, {"load_time": result.load_time})
-
- return result
-
- # Plugin Registration and Tracking
-
- def _register_plugin(self, plugin_name: str, plugin_instance: TrixyPlugin) -> None:
- """Register a plugin instance."""
- self._plugins[plugin_name] = plugin_instance
-
- # Create plugin info
- info = PluginInfo(
- name=plugin_name,
- state=plugin_instance.state,
- enabled=plugin_instance.enabled,
- health_status=PluginHealthStatus.HEALTHY,
- load_time=plugin_instance.load_time,
- init_time=plugin_instance.init_time,
- metadata=plugin_instance.metadata,
- path=plugin_instance.plugin_path
- )
- self._plugin_info[plugin_name] = info
-
- # Create config handler
- config_handler = PluginConfigHandler(
- config_path=plugin_instance.plugin_path / "config.json",
- plugin_name=plugin_name
- )
- self._plugin_configs[plugin_name] = config_handler
-
- pprint(f"Plugin {plugin_name} registered")
-
- def _unregister_plugin(self, plugin_name: str) -> None:
- """Unregister a plugin."""
- if plugin_name in self._plugins:
- del self._plugins[plugin_name]
-
- if plugin_name in self._plugin_info:
- del self._plugin_info[plugin_name]
-
- if plugin_name in self._plugin_configs:
- del self._plugin_configs[plugin_name]
-
- pprint(f"Plugin {plugin_name} unregistered")
-
- def _update_plugin_info(self, plugin_name: str) -> None:
- """Update plugin information."""
- if plugin_name not in self._plugins or plugin_name not in self._plugin_info:
- return
-
- plugin = self._plugins[plugin_name]
- info = self._plugin_info[plugin_name]
-
- # Update basic info
- info.state = plugin.state
- info.enabled = plugin.enabled
- info.last_error = plugin.last_error
- info.error_count = plugin.error_count
- info.last_health_check = time.time()
-
- # Update health status
- if plugin.last_error:
- if plugin.error_count >= self.max_error_count:
- info.health_status = PluginHealthStatus.CRITICAL
- else:
- info.health_status = PluginHealthStatus.ERROR
- elif plugin.state == PluginState.ERROR:
- info.health_status = PluginHealthStatus.ERROR
- elif plugin.state in [PluginState.ENABLED, PluginState.INITIALIZED]:
- info.health_status = PluginHealthStatus.HEALTHY
- else:
- info.health_status = PluginHealthStatus.WARNING
-
- def _update_plugin_error(self, plugin_name: str, error_message: str) -> None:
- """Update plugin error information."""
- if plugin_name in self._plugin_info:
- info = self._plugin_info[plugin_name]
- info.last_error = error_message
- info.error_count += 1
- info.health_status = PluginHealthStatus.ERROR
- info.last_health_check = time.time()
-
- if info.error_count >= self.max_error_count:
- info.health_status = PluginHealthStatus.CRITICAL
- pprint(f"Plugin {plugin_name} marked as critical due to {info.error_count} errors")
-
- # Health Monitoring
-
- def _start_health_monitor(self) -> None:
- """Start the health monitoring thread."""
- if self._health_monitor_thread is None or not self._health_monitor_thread.is_alive():
- self._health_monitor_thread = threading.Thread(
- target=self._health_monitor_loop,
- name="PluginHealthMonitor",
- daemon=True
- )
- self._health_monitor_thread.start()
- pprint("Health monitoring started")
-
- def _health_monitor_loop(self) -> None:
- """Health monitoring loop."""
- while not self._health_monitor_shutdown.wait(self.health_check_interval):
- try:
- self._perform_health_check()
- except Exception as e:
- pprint(f"Error in health monitor: {e}")
-
- def _perform_health_check(self) -> None:
- """Perform health check on all plugins."""
- with self._plugins_lock:
- for plugin_name in list(self._plugins.keys()):
- try:
- plugin = self._plugins[plugin_name]
- health_data = plugin.check_health()
-
- # Update plugin info based on health check
- self._update_plugin_info(plugin_name)
-
- except Exception as e:
- pprint(f"Health check failed for plugin {plugin_name}: {e}")
- self._update_plugin_error(plugin_name, f"Health check failed: {e}")
-
- self._last_health_check = time.time()
- self._update_stats()
-
- # Statistics and Reporting
-
- def _update_stats(self) -> None:
- """Update plugin statistics."""
- with self._plugins_lock:
- self._stats.update({
- 'total_plugins': len(self._plugin_info),
- 'loaded_plugins': len(self._plugins),
- 'enabled_plugins': len([p for p in self._plugins.values() if p.enabled]),
- 'healthy_plugins': len([i for i in self._plugin_info.values() if i.is_healthy]),
- 'error_plugins': len([i for i in self._plugin_info.values() if i.health_status in [PluginHealthStatus.ERROR, PluginHealthStatus.CRITICAL]]),
- })
-
- def get_stats(self) -> Dict[str, Any]:
- """Get plugin manager statistics."""
- self._update_stats()
- stats = self._stats.copy()
- stats['uptime'] = time.time() - stats['uptime']
- return stats
-
- def get_health_report(self) -> Dict[str, Any]:
- """
- Get comprehensive health report for all plugins.
-
- Returns:
- Dict containing health information for all plugins
- """
- with self._plugins_lock:
- plugins_health = {}
-
- for plugin_name, info in self._plugin_info.items():
- plugins_health[plugin_name] = {
- 'name': info.name,
- 'state': info.state.value,
- 'enabled': info.enabled,
- 'healthy': info.is_healthy,
- 'active': info.is_active,
- 'health_status': info.health_status.value,
- 'last_error': info.last_error,
- 'error_count': info.error_count,
- 'load_time': info.load_time,
- 'init_time': info.init_time,
- 'last_health_check': info.last_health_check,
- 'metadata': info.metadata.__dict__ if info.metadata else None
- }
-
- return {
- 'timestamp': time.time(),
- 'last_health_check': self._last_health_check,
- 'plugins': plugins_health,
- 'summary': self.get_stats()
- }
-
- # Query Methods
-
- def get_plugin_names(self) -> List[str]:
- """Get list of all plugin names."""
- with self._plugins_lock:
- return list(self._plugins.keys())
-
- def get_enabled_plugins(self) -> List[str]:
- """Get list of enabled plugin names."""
- with self._plugins_lock:
- return [name for name, plugin in self._plugins.items() if plugin.enabled]
-
- def get_disabled_plugins(self) -> List[str]:
- """Get list of disabled plugin names."""
- with self._plugins_lock:
- return [name for name, plugin in self._plugins.items() if not plugin.enabled]
-
- def get_plugin_info(self, plugin_name: str) -> Optional[PluginInfo]:
- """
- Get information about a specific plugin.
-
- Args:
- plugin_name: Name of the plugin
-
- Returns:
- PluginInfo object or None if plugin not found
- """
- with self._plugins_lock:
- if plugin_name in self._plugin_info:
- # Update info before returning
- self._update_plugin_info(plugin_name)
- return self._plugin_info[plugin_name]
- return None
-
- def get_plugin_instance(self, plugin_name: str) -> Optional[TrixyPlugin]:
- """
- Get plugin instance.
-
- Args:
- plugin_name: Name of the plugin
-
- Returns:
- Plugin instance or None if not found
- """
- with self._plugins_lock:
- return self._plugins.get(plugin_name)
-
- def is_plugin_loaded(self, plugin_name: str) -> bool:
- """Check if a plugin is loaded."""
- return plugin_name in self._plugins
-
- def is_plugin_enabled(self, plugin_name: str) -> bool:
- """Check if a plugin is enabled."""
- with self._plugins_lock:
- if plugin_name in self._plugins:
- return self._plugins[plugin_name].enabled
- return False
-
- def is_plugin_healthy(self, plugin_name: str) -> bool:
- """Check if a plugin is healthy."""
- info = self.get_plugin_info(plugin_name)
- return info.is_healthy if info else False
-
- # Event System Integration
-
- def add_event_callback(self, callback: Callable[[str, str, Any], None]) -> None:
- """Add callback for plugin events."""
- self._event_callbacks.append(callback)
-
- def remove_event_callback(self, callback: Callable[[str, str, Any], None]) -> None:
- """Remove event callback."""
- if callback in self._event_callbacks:
- self._event_callbacks.remove(callback)
-
- def _notify_event(self, event_type: str, plugin_name: str, data: Any) -> None:
- """Notify event callbacks."""
- for callback in self._event_callbacks:
- try:
- callback(event_type, plugin_name, data)
- except Exception as e:
- pprint(f"Error in event callback: {e}")
-
- # Also trigger system events if event handler is available
- try:
- app = self.application_ref()
- if app and hasattr(app, 'get_event_handler'):
- event_handler = app.get_event_handler()
- if event_handler:
- event_data = {
- 'plugin_name': plugin_name,
- 'event_type': event_type,
- 'data': data,
- 'timestamp': time.time()
- }
- event_handler.trigger_event(f"plugin_{event_type}", event_data)
- except Exception as e:
- pprint(f"Error triggering system event: {e}")
-
- # Configuration Management
-
- def get_plugin_config(self, plugin_name: str) -> Optional[Dict[str, Any]]:
- """Get plugin configuration."""
- if plugin_name in self._plugin_configs:
- return self._plugin_configs[plugin_name].get_config_copy()
- return None
-
- def set_plugin_config_value(self, plugin_name: str, key: str, value: Any) -> bool:
- """Set a plugin configuration value."""
- if plugin_name in self._plugin_configs:
- try:
- self._plugin_configs[plugin_name].set_value(key, value)
- return True
- except Exception as e:
- pprint(f"Failed to set config value for {plugin_name}: {e}")
- return False
-
- # Lifecycle Management
-
- def shutdown(self) -> None:
- """Shutdown the plugin manager and all plugins."""
- pprint("Shutting down plugin manager...")
-
- # Stop health monitor
- if self._health_monitor_thread:
- self._health_monitor_shutdown.set()
- self._health_monitor_thread.join(timeout=10.0)
-
- # Disable all plugins first
- with self._plugins_lock:
- for plugin_name in list(self._plugins.keys()):
- try:
- if self._plugins[plugin_name].state == PluginState.ENABLED:
- self.disable_plugin(plugin_name)
- except Exception as e:
- pprint(f"Error disabling plugin {plugin_name}: {e}")
-
- # Unload all plugins
- for plugin_name in list(self._plugins.keys()):
- try:
- self.unload_plugin(plugin_name)
- except Exception as e:
- pprint(f"Error unloading plugin {plugin_name}: {e}")
-
- # Shutdown loader
- self._loader.shutdown()
-
- pprint("Plugin manager shutdown complete")
-
- def __repr__(self) -> str:
- """String representation of the plugin manager."""
- with self._plugins_lock:
- enabled_count = len([p for p in self._plugins.values() if p.enabled])
- return f"<PluginManager loaded={len(self._plugins)} enabled={enabled_count}>"
|