Skip to content

Plugins

InjectQ's plugin system provides a powerful way to extend functionality, integrate with third-party libraries, and create reusable components that can be easily shared across projects.

Plugin Architecture

Plugin Interface

from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Type
from dataclasses import dataclass
import importlib
import sys

@dataclass
class PluginMetadata:
    """Metadata for a plugin."""
    name: str
    version: str
    description: str
    author: str
    dependencies: List[str] = None
    tags: List[str] = None

    def __post_init__(self):
        if self.dependencies is None:
            self.dependencies = []
        if self.tags is None:
            self.tags = []

class InjectQPlugin(ABC):
    """Base class for InjectQ plugins."""

    @property
    @abstractmethod
    def metadata(self) -> PluginMetadata:
        """Get plugin metadata."""
        pass

    @abstractmethod
    def initialize(self, container) -> None:
        """Initialize plugin with container."""
        pass

    def configure(self, config: Dict[str, Any]) -> None:
        """Configure plugin with settings."""
        pass

    def dispose(self) -> None:
        """Cleanup plugin resources."""
        pass

    def get_services(self) -> Dict[Type, Any]:
        """Get services provided by this plugin."""
        return {}

    def get_middleware(self) -> List[Any]:
        """Get middleware provided by this plugin."""
        return []

# Plugin registry
class PluginRegistry:
    """Registry for managing plugins."""

    def __init__(self):
        self._plugins: Dict[str, InjectQPlugin] = {}
        self._plugin_configs: Dict[str, Dict[str, Any]] = {}
        self._loaded_plugins: Dict[str, bool] = {}

    def register_plugin(self, plugin: InjectQPlugin, config: Dict[str, Any] = None):
        """Register a plugin."""
        metadata = plugin.metadata

        if metadata.name in self._plugins:
            raise ValueError(f"Plugin '{metadata.name}' is already registered")

        # Check dependencies
        self._check_dependencies(metadata)

        # Register plugin
        self._plugins[metadata.name] = plugin
        self._plugin_configs[metadata.name] = config or {}
        self._loaded_plugins[metadata.name] = False

        print(f"Registered plugin: {metadata.name} v{metadata.version}")

    def load_plugin(self, name: str, container) -> None:
        """Load and initialize a plugin."""
        if name not in self._plugins:
            raise ValueError(f"Plugin '{name}' is not registered")

        if self._loaded_plugins[name]:
            return  # Already loaded

        plugin = self._plugins[name]
        config = self._plugin_configs[name]

        # Configure plugin
        plugin.configure(config)

        # Initialize plugin
        plugin.initialize(container)

        # Register plugin services
        services = plugin.get_services()
        for service_type, implementation in services.items():
            container.register(service_type, implementation)

        # Add plugin middleware
        middleware_list = plugin.get_middleware()
        for middleware in middleware_list:
            container.add_middleware(middleware)

        self._loaded_plugins[name] = True
        print(f"Loaded plugin: {name}")

    def unload_plugin(self, name: str) -> None:
        """Unload a plugin."""
        if name not in self._plugins:
            return

        if not self._loaded_plugins[name]:
            return  # Not loaded

        plugin = self._plugins[name]
        plugin.dispose()

        self._loaded_plugins[name] = False
        print(f"Unloaded plugin: {name}")

    def get_plugin(self, name: str) -> Optional[InjectQPlugin]:
        """Get a plugin by name."""
        return self._plugins.get(name)

    def list_plugins(self) -> List[PluginMetadata]:
        """List all registered plugins."""
        return [plugin.metadata for plugin in self._plugins.values()]

    def _check_dependencies(self, metadata: PluginMetadata):
        """Check if plugin dependencies are satisfied."""
        for dependency in metadata.dependencies:
            if dependency not in self._plugins:
                raise ValueError(
                    f"Plugin '{metadata.name}' requires dependency '{dependency}' "
                    f"which is not registered"
                )

# Global plugin registry
plugin_registry = PluginRegistry()

Built-in Plugins

Database Plugin

class DatabasePlugin(InjectQPlugin):
    """Plugin for database integration."""

    @property
    def metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="database",
            version="1.0.0",
            description="Database integration plugin with connection pooling",
            author="InjectQ Team",
            tags=["database", "sql", "orm"]
        )

    def __init__(self):
        self._connection_pool = None
        self._config = {}

    def configure(self, config: Dict[str, Any]) -> None:
        """Configure database plugin."""
        self._config = config

        # Validate required config
        required_keys = ['connection_string', 'pool_size']
        for key in required_keys:
            if key not in config:
                raise ValueError(f"Database plugin requires '{key}' configuration")

    def initialize(self, container) -> None:
        """Initialize database plugin."""
        # Create connection pool
        self._connection_pool = self._create_connection_pool()

        print(f"Database plugin initialized with pool size: {self._config['pool_size']}")

    def _create_connection_pool(self):
        """Create database connection pool."""
        # This would create actual connection pool
        class MockConnectionPool:
            def __init__(self, connection_string: str, pool_size: int):
                self.connection_string = connection_string
                self.pool_size = pool_size

            def get_connection(self):
                return MockConnection(self.connection_string)

            def close(self):
                print("Connection pool closed")

        return MockConnectionPool(
            self._config['connection_string'],
            self._config['pool_size']
        )

    def get_services(self) -> Dict[Type, Any]:
        """Get database services."""
        return {
            ConnectionPool: lambda: self._connection_pool,
            DatabaseConnection: lambda: self._connection_pool.get_connection(),
            UserRepository: lambda conn=self._connection_pool.get_connection(): DatabaseUserRepository(conn),
            OrderRepository: lambda conn=self._connection_pool.get_connection(): DatabaseOrderRepository(conn)
        }

    def dispose(self) -> None:
        """Cleanup database resources."""
        if self._connection_pool:
            self._connection_pool.close()

# Mock classes for example
class ConnectionPool:
    pass

class DatabaseConnection:
    pass

class MockConnection:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string

class DatabaseUserRepository:
    def __init__(self, connection):
        self.connection = connection

class DatabaseOrderRepository:
    def __init__(self, connection):
        self.connection = connection

# Usage
database_config = {
    'connection_string': 'postgresql://localhost/mydb',
    'pool_size': 10
}

plugin_registry.register_plugin(DatabasePlugin(), database_config)
plugin_registry.load_plugin('database', container)

Caching Plugin

import time
from typing import Union

class CachingPlugin(InjectQPlugin):
    """Plugin for caching functionality."""

    @property
    def metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="caching",
            version="1.0.0",
            description="Caching plugin with multiple backends",
            author="InjectQ Team",
            tags=["cache", "performance", "memory"]
        )

    def __init__(self):
        self._cache_backend = None
        self._config = {}

    def configure(self, config: Dict[str, Any]) -> None:
        """Configure caching plugin."""
        self._config = config

        # Default configuration
        self._config.setdefault('backend', 'memory')
        self._config.setdefault('ttl', 300)  # 5 minutes
        self._config.setdefault('max_size', 1000)

    def initialize(self, container) -> None:
        """Initialize caching plugin."""
        backend_type = self._config['backend']

        if backend_type == 'memory':
            self._cache_backend = MemoryCache(
                ttl=self._config['ttl'],
                max_size=self._config['max_size']
            )
        elif backend_type == 'redis':
            self._cache_backend = RedisCache(
                redis_url=self._config.get('redis_url', 'redis://localhost'),
                ttl=self._config['ttl']
            )
        else:
            raise ValueError(f"Unsupported cache backend: {backend_type}")

        print(f"Caching plugin initialized with {backend_type} backend")

    def get_services(self) -> Dict[Type, Any]:
        """Get caching services."""
        return {
            CacheService: lambda: self._cache_backend,
            CacheManager: lambda: CacheManager(self._cache_backend)
        }

    def get_middleware(self) -> List[Any]:
        """Get caching middleware."""
        return [CacheMiddleware(self._cache_backend)]

# Cache implementations
class CacheService(ABC):
    """Abstract cache service."""

    @abstractmethod
    def get(self, key: str) -> Any:
        pass

    @abstractmethod
    def set(self, key: str, value: Any, ttl: int = None) -> None:
        pass

    @abstractmethod
    def delete(self, key: str) -> None:
        pass

class MemoryCache(CacheService):
    """In-memory cache implementation."""

    def __init__(self, ttl: int = 300, max_size: int = 1000):
        self.ttl = ttl
        self.max_size = max_size
        self._cache: Dict[str, tuple] = {}  # key -> (value, expiry_time)

    def get(self, key: str) -> Any:
        """Get value from cache."""
        if key in self._cache:
            value, expiry_time = self._cache[key]

            if time.time() < expiry_time:
                return value
            else:
                del self._cache[key]

        return None

    def set(self, key: str, value: Any, ttl: int = None) -> None:
        """Set value in cache."""
        ttl = ttl or self.ttl
        expiry_time = time.time() + ttl

        # Evict old entries if at max size
        if len(self._cache) >= self.max_size:
            self._evict_old_entries()

        self._cache[key] = (value, expiry_time)

    def delete(self, key: str) -> None:
        """Delete value from cache."""
        self._cache.pop(key, None)

    def _evict_old_entries(self):
        """Evict expired or oldest entries."""
        current_time = time.time()

        # Remove expired entries
        expired_keys = [
            key for key, (_, expiry_time) in self._cache.items()
            if current_time >= expiry_time
        ]

        for key in expired_keys:
            del self._cache[key]

        # If still at max size, remove oldest
        if len(self._cache) >= self.max_size:
            oldest_key = min(self._cache.keys(), key=lambda k: self._cache[k][1])
            del self._cache[oldest_key]

class RedisCache(CacheService):
    """Redis cache implementation."""

    def __init__(self, redis_url: str, ttl: int = 300):
        self.redis_url = redis_url
        self.ttl = ttl
        self._redis_client = None
        self._connect()

    def _connect(self):
        """Connect to Redis."""
        # This would use actual Redis client
        print(f"Connected to Redis at {self.redis_url}")
        self._redis_client = MockRedisClient()

    def get(self, key: str) -> Any:
        """Get value from Redis cache."""
        return self._redis_client.get(key)

    def set(self, key: str, value: Any, ttl: int = None) -> None:
        """Set value in Redis cache."""
        ttl = ttl or self.ttl
        self._redis_client.setex(key, ttl, value)

    def delete(self, key: str) -> None:
        """Delete value from Redis cache."""
        self._redis_client.delete(key)

class MockRedisClient:
    """Mock Redis client for example."""

    def __init__(self):
        self._data = {}

    def get(self, key: str):
        return self._data.get(key)

    def setex(self, key: str, ttl: int, value: Any):
        self._data[key] = value

    def delete(self, key: str):
        self._data.pop(key, None)

class CacheManager:
    """High-level cache manager."""

    def __init__(self, cache_service: CacheService):
        self.cache_service = cache_service

    def cached(self, key: str, ttl: int = None):
        """Decorator for caching function results."""
        def decorator(func):
            def wrapper(*args, **kwargs):
                cache_key = f"{key}:{hash((args, tuple(kwargs.items())))}"

                # Try to get from cache
                result = self.cache_service.get(cache_key)
                if result is not None:
                    return result

                # Execute function and cache result
                result = func(*args, **kwargs)
                self.cache_service.set(cache_key, result, ttl)

                return result

            return wrapper
        return decorator

class CacheMiddleware:
    """Middleware for caching service instances."""

    def __init__(self, cache_service: CacheService):
        self.cache_service = cache_service

    async def process_resolution(self, service_type: Type, next_resolver):
        """Cache service resolution."""
        cache_key = f"service:{service_type.__name__}"

        # Try cache first
        cached_instance = self.cache_service.get(cache_key)
        if cached_instance is not None:
            return cached_instance

        # Resolve and cache
        instance = await next_resolver(service_type)
        self.cache_service.set(cache_key, instance, ttl=60)  # Cache for 1 minute

        return instance

# Usage
caching_config = {
    'backend': 'memory',
    'ttl': 600,
    'max_size': 5000
}

plugin_registry.register_plugin(CachingPlugin(), caching_config)
plugin_registry.load_plugin('caching', container)

Logging Plugin

import logging
import sys
from datetime import datetime

class LoggingPlugin(InjectQPlugin):
    """Plugin for enhanced logging functionality."""

    @property
    def metadata(self) -> PluginMetadata:
        return PluginMetadata(
            name="logging",
            version="1.0.0",
            description="Enhanced logging plugin with structured logging",
            author="InjectQ Team",
            tags=["logging", "monitoring", "debugging"]
        )

    def __init__(self):
        self._logger = None
        self._config = {}

    def configure(self, config: Dict[str, Any]) -> None:
        """Configure logging plugin."""
        self._config = config

        # Default configuration
        self._config.setdefault('level', 'INFO')
        self._config.setdefault('format', 'json')
        self._config.setdefault('output', 'console')

    def initialize(self, container) -> None:
        """Initialize logging plugin."""
        self._setup_logger()
        print(f"Logging plugin initialized with {self._config['format']} format")

    def _setup_logger(self):
        """Setup logger configuration."""
        logger_name = 'injectq'
        self._logger = logging.getLogger(logger_name)

        # Set level
        level = getattr(logging, self._config['level'].upper())
        self._logger.setLevel(level)

        # Create formatter
        if self._config['format'] == 'json':
            formatter = JsonFormatter()
        else:
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )

        # Create handler
        if self._config['output'] == 'console':
            handler = logging.StreamHandler(sys.stdout)
        elif self._config['output'] == 'file':
            handler = logging.FileHandler(self._config.get('filename', 'injectq.log'))
        else:
            handler = logging.StreamHandler(sys.stdout)

        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

    def get_services(self) -> Dict[Type, Any]:
        """Get logging services."""
        return {
            StructuredLogger: lambda: StructuredLogger(self._logger),
            LoggingService: lambda: LoggingService(self._logger)
        }

    def get_middleware(self) -> List[Any]:
        """Get logging middleware."""
        return [DetailedLoggingMiddleware(self._logger)]

class JsonFormatter(logging.Formatter):
    """JSON formatter for structured logging."""

    def format(self, record):
        """Format log record as JSON."""
        import json

        log_data = {
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }

        # Add exception info if present
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)

        # Add extra fields
        for key, value in record.__dict__.items():
            if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname',
                          'filename', 'module', 'lineno', 'funcName', 'created',
                          'msecs', 'relativeCreated', 'thread', 'threadName',
                          'processName', 'process', 'message', 'exc_info', 'exc_text',
                          'stack_info']:
                log_data[key] = value

        return json.dumps(log_data)

class StructuredLogger:
    """Structured logger with context support."""

    def __init__(self, logger: logging.Logger):
        self._logger = logger
        self._context = {}

    def with_context(self, **context):
        """Add context to logger."""
        new_logger = StructuredLogger(self._logger)
        new_logger._context = {**self._context, **context}
        return new_logger

    def info(self, message: str, **extra):
        """Log info message with context."""
        self._log(logging.INFO, message, extra)

    def error(self, message: str, **extra):
        """Log error message with context."""
        self._log(logging.ERROR, message, extra)

    def warning(self, message: str, **extra):
        """Log warning message with context."""
        self._log(logging.WARNING, message, extra)

    def debug(self, message: str, **extra):
        """Log debug message with context."""
        self._log(logging.DEBUG, message, extra)

    def _log(self, level: int, message: str, extra: Dict[str, Any]):
        """Log message with context."""
        log_extra = {**self._context, **extra}
        self._logger.log(level, message, extra=log_extra)

class LoggingService:
    """High-level logging service."""

    def __init__(self, logger: logging.Logger):
        self._logger = logger

    def log_service_resolution(self, service_type: Type, duration_ms: float, success: bool):
        """Log service resolution."""
        status = "SUCCESS" if success else "FAILED"
        self._logger.info(
            f"Service resolution {status}",
            extra={
                'service_type': service_type.__name__,
                'duration_ms': duration_ms,
                'success': success,
                'event_type': 'service_resolution'
            }
        )

    def log_service_registration(self, service_type: Type, implementation: Any):
        """Log service registration."""
        self._logger.info(
            "Service registered",
            extra={
                'service_type': service_type.__name__,
                'implementation': implementation.__name__ if hasattr(implementation, '__name__') else str(implementation),
                'event_type': 'service_registration'
            }
        )

class DetailedLoggingMiddleware:
    """Detailed logging middleware."""

    def __init__(self, logger: logging.Logger):
        self._logger = logger

    async def process_resolution(self, service_type: Type, next_resolver):
        """Log detailed service resolution."""
        start_time = time.time()

        self._logger.debug(
            "Starting service resolution",
            extra={
                'service_type': service_type.__name__,
                'event_type': 'resolution_start'
            }
        )

        try:
            result = await next_resolver(service_type)
            duration_ms = (time.time() - start_time) * 1000

            self._logger.info(
                "Service resolution completed",
                extra={
                    'service_type': service_type.__name__,
                    'duration_ms': duration_ms,
                    'event_type': 'resolution_success'
                }
            )

            return result

        except Exception as e:
            duration_ms = (time.time() - start_time) * 1000

            self._logger.error(
                "Service resolution failed",
                extra={
                    'service_type': service_type.__name__,
                    'duration_ms': duration_ms,
                    'error': str(e),
                    'error_type': type(e).__name__,
                    'event_type': 'resolution_error'
                },
                exc_info=True
            )

            raise

# Usage
logging_config = {
    'level': 'DEBUG',
    'format': 'json',
    'output': 'console'
}

plugin_registry.register_plugin(LoggingPlugin(), logging_config)
plugin_registry.load_plugin('logging', container)

Plugin Discovery

Automatic Plugin Discovery

import pkgutil
import importlib.util
from pathlib import Path

class PluginDiscovery:
    """Automatic plugin discovery system."""

    def __init__(self):
        self._discovered_plugins: Dict[str, InjectQPlugin] = {}

    def discover_plugins(self, paths: List[str] = None) -> List[InjectQPlugin]:
        """Discover plugins from specified paths."""
        paths = paths or [
            'injectq_plugins',  # Standard plugin namespace
            'plugins',          # Local plugins directory
        ]

        discovered = []

        for path in paths:
            plugins = self._discover_from_path(path)
            discovered.extend(plugins)

        return discovered

    def _discover_from_path(self, path: str) -> List[InjectQPlugin]:
        """Discover plugins from a specific path."""
        plugins = []

        try:
            # Try to import as module
            if '.' not in path:
                # Package path
                plugins.extend(self._discover_from_package(path))
            else:
                # File path
                plugins.extend(self._discover_from_file(path))
        except ImportError:
            # Try as directory path
            plugins.extend(self._discover_from_directory(path))

        return plugins

    def _discover_from_package(self, package_name: str) -> List[InjectQPlugin]:
        """Discover plugins from Python package."""
        plugins = []

        try:
            package = importlib.import_module(package_name)

            # Look for plugin modules
            if hasattr(package, '__path__'):
                for importer, modname, ispkg in pkgutil.iter_modules(package.__path__):
                    full_name = f"{package_name}.{modname}"

                    try:
                        module = importlib.import_module(full_name)
                        plugins.extend(self._extract_plugins_from_module(module))
                    except ImportError as e:
                        print(f"Failed to import plugin module {full_name}: {e}")

        except ImportError:
            pass  # Package not found

        return plugins

    def _discover_from_directory(self, directory_path: str) -> List[InjectQPlugin]:
        """Discover plugins from directory."""
        plugins = []
        directory = Path(directory_path)

        if not directory.exists():
            return plugins

        # Look for Python files
        for file_path in directory.glob("*.py"):
            if file_path.name.startswith('__'):
                continue

            plugins.extend(self._discover_from_file(str(file_path)))

        return plugins

    def _discover_from_file(self, file_path: str) -> List[InjectQPlugin]:
        """Discover plugins from Python file."""
        plugins = []

        try:
            spec = importlib.util.spec_from_file_location("plugin_module", file_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            plugins.extend(self._extract_plugins_from_module(module))

        except Exception as e:
            print(f"Failed to load plugin from {file_path}: {e}")

        return plugins

    def _extract_plugins_from_module(self, module) -> List[InjectQPlugin]:
        """Extract plugin classes from module."""
        plugins = []

        for name in dir(module):
            obj = getattr(module, name)

            # Check if it's a plugin class
            if (isinstance(obj, type) and 
                issubclass(obj, InjectQPlugin) and 
                obj is not InjectQPlugin):

                try:
                    # Instantiate plugin
                    plugin = obj()
                    plugins.append(plugin)
                except Exception as e:
                    print(f"Failed to instantiate plugin {name}: {e}")

        return plugins

    def auto_register_plugins(self, paths: List[str] = None) -> int:
        """Automatically discover and register plugins."""
        plugins = self.discover_plugins(paths)

        registered_count = 0
        for plugin in plugins:
            try:
                plugin_registry.register_plugin(plugin)
                registered_count += 1
            except Exception as e:
                print(f"Failed to register plugin {plugin.metadata.name}: {e}")

        return registered_count

# Usage
discovery = PluginDiscovery()
count = discovery.auto_register_plugins(['./plugins', 'injectq_plugins'])
print(f"Discovered and registered {count} plugins")

Plugin Configuration

import yaml
import json
from pathlib import Path

class PluginConfiguration:
    """Manages plugin configuration from files."""

    def __init__(self, config_file: str = "plugins.yaml"):
        self.config_file = config_file
        self._config = {}
        self.load_config()

    def load_config(self):
        """Load plugin configuration from file."""
        config_path = Path(self.config_file)

        if not config_path.exists():
            return

        try:
            with open(config_path, 'r') as f:
                if config_path.suffix in ['.yaml', '.yml']:
                    self._config = yaml.safe_load(f)
                elif config_path.suffix == '.json':
                    self._config = json.load(f)
                else:
                    print(f"Unsupported config file format: {config_path.suffix}")

        except Exception as e:
            print(f"Failed to load plugin config: {e}")

    def get_plugin_config(self, plugin_name: str) -> Dict[str, Any]:
        """Get configuration for a specific plugin."""
        return self._config.get('plugins', {}).get(plugin_name, {})

    def is_plugin_enabled(self, plugin_name: str) -> bool:
        """Check if plugin is enabled."""
        plugin_config = self.get_plugin_config(plugin_name)
        return plugin_config.get('enabled', True)

    def get_enabled_plugins(self) -> List[str]:
        """Get list of enabled plugin names."""
        enabled = []

        for plugin_name, config in self._config.get('plugins', {}).items():
            if config.get('enabled', True):
                enabled.append(plugin_name)

        return enabled

    def apply_configuration(self):
        """Apply configuration to registered plugins."""
        for plugin_name in self.get_enabled_plugins():
            plugin = plugin_registry.get_plugin(plugin_name)

            if plugin:
                config = self.get_plugin_config(plugin_name)
                plugin.configure(config)

# Example configuration file (plugins.yaml):
"""
plugins:
  database:
    enabled: true
    connection_string: "postgresql://localhost/myapp"
    pool_size: 20
    timeout: 30

  caching:
    enabled: true
    backend: "redis"
    redis_url: "redis://localhost:6379"
    ttl: 3600

  logging:
    enabled: true
    level: "INFO"
    format: "json"
    output: "file"
    filename: "app.log"
"""

# Usage
config = PluginConfiguration("plugins.yaml")
config.apply_configuration()

Plugin Management

Plugin Manager

class PluginManager:
    """Comprehensive plugin management system."""

    def __init__(self, container):
        self.container = container
        self.registry = plugin_registry
        self.discovery = PluginDiscovery()
        self.config = PluginConfiguration()
        self._load_order: List[str] = []

    def setup_plugins(self, auto_discover: bool = True, config_file: str = None):
        """Setup all plugins with configuration."""
        if config_file:
            self.config = PluginConfiguration(config_file)

        # Auto-discover plugins
        if auto_discover:
            self.discovery.auto_register_plugins()

        # Apply configuration
        self.config.apply_configuration()

        # Load enabled plugins in dependency order
        self._load_plugins_in_order()

    def _load_plugins_in_order(self):
        """Load plugins in dependency order."""
        enabled_plugins = self.config.get_enabled_plugins()
        loaded = set()

        def load_plugin_recursive(plugin_name: str):
            if plugin_name in loaded:
                return

            plugin = self.registry.get_plugin(plugin_name)
            if not plugin:
                print(f"Plugin '{plugin_name}' not found")
                return

            # Load dependencies first
            for dependency in plugin.metadata.dependencies:
                if dependency in enabled_plugins:
                    load_plugin_recursive(dependency)

            # Load plugin
            try:
                self.registry.load_plugin(plugin_name, self.container)
                loaded.add(plugin_name)
                self._load_order.append(plugin_name)
            except Exception as e:
                print(f"Failed to load plugin '{plugin_name}': {e}")

        # Load all enabled plugins
        for plugin_name in enabled_plugins:
            load_plugin_recursive(plugin_name)

    def reload_plugin(self, plugin_name: str):
        """Reload a specific plugin."""
        # Unload plugin
        self.registry.unload_plugin(plugin_name)

        # Reload configuration
        self.config.load_config()

        # Apply configuration
        plugin = self.registry.get_plugin(plugin_name)
        if plugin:
            config = self.config.get_plugin_config(plugin_name)
            plugin.configure(config)

            # Load plugin
            self.registry.load_plugin(plugin_name, self.container)

    def get_plugin_status(self) -> Dict[str, Dict[str, Any]]:
        """Get status of all plugins."""
        status = {}

        for metadata in self.registry.list_plugins():
            plugin_name = metadata.name

            status[plugin_name] = {
                'metadata': metadata,
                'enabled': self.config.is_plugin_enabled(plugin_name),
                'loaded': self.registry._loaded_plugins.get(plugin_name, False),
                'config': self.config.get_plugin_config(plugin_name)
            }

        return status

    def print_plugin_status(self):
        """Print plugin status report."""
        status = self.get_plugin_status()

        print("Plugin Status Report")
        print("=" * 50)

        for plugin_name, info in status.items():
            metadata = info['metadata']
            enabled = "✅" if info['enabled'] else "❌"
            loaded = "🟢" if info['loaded'] else "🔴"

            print(f"{enabled} {loaded} {plugin_name} v{metadata.version}")
            print(f"    {metadata.description}")

            if metadata.dependencies:
                deps = ", ".join(metadata.dependencies)
                print(f"    Dependencies: {deps}")

            print()

# Usage
def setup_application_with_plugins():
    """Setup application with plugin system."""
    # Create container
    container = MiddlewareContainer()

    # Create plugin manager
    plugin_manager = PluginManager(container)

    # Setup plugins
    plugin_manager.setup_plugins(auto_discover=True, config_file="plugins.yaml")

    # Print status
    plugin_manager.print_plugin_status()

    return container, plugin_manager

# Initialize application
container, plugin_manager = setup_application_with_plugins()

This comprehensive plugin system documentation shows how to extend InjectQ through a flexible and powerful plugin architecture, enabling modular functionality and easy integration of third-party components.