Resolver API¶
::: injectq.core.resolver ::: injectq.core.thread_safe_resolver
Overview¶
The Resolver is responsible for creating service instances and resolving their dependencies. It works with the Registry to locate service bindings and with Scopes to manage instance lifecycle.
Core Resolution Process¶
Dependency Resolution¶
The resolver follows a systematic approach to create service instances:
- Binding Lookup: Find the service binding in the registry
- Dependency Analysis: Analyze constructor dependencies
- Recursive Resolution: Resolve all dependencies
- Instance Creation: Create the service instance
- Scope Management: Store instance in appropriate scope
# Basic resolution example (internal API)
from injectq.core.resolver import DependencyResolver
resolver = DependencyResolver(registry, scope_manager)
# Resolve a service instance
instance = resolver.resolve(UserService, current_scope)
Resolution Strategies¶
Constructor Resolution¶
The resolver analyzes constructor signatures to determine dependencies.
class UserService:
def __init__(self, user_repo: UserRepository, logger: Logger):
self.user_repo = user_repo
self.logger = logger
# Resolver automatically detects UserRepository and Logger dependencies
# and resolves them before creating UserService
Property Resolution¶
For services that use property injection:
class EmailService:
def __init__(self):
self.smtp_client = None # Will be injected
self.config = None # Will be injected
@property
def smtp_client(self) -> SMTPClient:
return self._smtp_client
@smtp_client.setter
def smtp_client(self, value: SMTPClient):
self._smtp_client = value
# Resolver can inject properties after instance creation
Method Resolution¶
For services that need method-level injection:
class ProcessingService:
@inject
def process(self, data: str, processor: DataProcessor) -> str:
return processor.process(data)
# Resolver provides dependencies when method is called
Advanced Resolution Features¶
Circular Dependency Detection¶
class CircularDependencyDetector:
"""Detects and prevents circular dependencies during resolution."""
def __init__(self):
self.resolution_stack = []
self.visited = set()
def check_circular_dependency(self, service_type: type):
"""Check if resolving this service would create a circular dependency."""
if service_type in self.resolution_stack:
cycle = self.resolution_stack[self.resolution_stack.index(service_type):]
cycle_str = " -> ".join(t.__name__ for t in cycle + [service_type])
raise CircularDependencyError(f"Circular dependency detected: {cycle_str}")
def enter_resolution(self, service_type: type):
"""Mark service as being resolved."""
self.check_circular_dependency(service_type)
self.resolution_stack.append(service_type)
def exit_resolution(self, service_type: type):
"""Mark service resolution as complete."""
if self.resolution_stack and self.resolution_stack[-1] == service_type:
self.resolution_stack.pop()
Lazy Resolution¶
class LazyResolver:
"""Resolver that defers dependency resolution until first access."""
def __init__(self, base_resolver: DependencyResolver):
self.base_resolver = base_resolver
self.lazy_proxies = {}
def resolve_lazy(self, service_type: type, scope):
"""Create a lazy proxy for the service."""
if service_type not in self.lazy_proxies:
proxy = LazyProxy(lambda: self.base_resolver.resolve(service_type, scope))
self.lazy_proxies[service_type] = proxy
return self.lazy_proxies[service_type]
class LazyProxy:
"""Proxy that resolves the actual service on first access."""
def __init__(self, factory):
self._factory = factory
self._instance = None
self._resolved = False
def __getattr__(self, name):
if not self._resolved:
self._instance = self._factory()
self._resolved = True
return getattr(self._instance, name)
Generic Type Resolution¶
from typing import Generic, TypeVar, get_origin, get_args
T = TypeVar('T')
class Repository(Generic[T]):
"""Generic repository interface."""
pass
class UserRepository(Repository[User]):
"""Concrete user repository."""
pass
class GenericResolver:
"""Resolver that handles generic types."""
def resolve_generic(self, service_type: type, scope):
"""Resolve generic service types."""
origin = get_origin(service_type)
args = get_args(service_type)
if origin and args:
# Handle generic types like Repository[User]
concrete_type = self.find_concrete_implementation(origin, args)
return self.resolve(concrete_type, scope)
return self.resolve(service_type, scope)
def find_concrete_implementation(self, generic_type, type_args):
"""Find concrete implementation for generic type."""
# Look for registered implementations that match the generic pattern
for registered_type, binding in self.registry.get_all_services().items():
if (hasattr(registered_type, '__origin__') and
registered_type.__origin__ == generic_type and
registered_type.__args__ == type_args):
return binding.implementation
raise ServiceNotFoundError(f"No implementation found for {generic_type}[{type_args}]")
Thread-Safe Resolution¶
Thread-Safe Resolver¶
import threading
from concurrent.futures import ThreadPoolExecutor
class ThreadSafeResolver(DependencyResolver):
"""Thread-safe dependency resolver."""
def __init__(self, registry, scope_manager):
super().__init__(registry, scope_manager)
self._lock = threading.RLock()
self._thread_local = threading.local()
def resolve(self, service_type: type, scope):
"""Thread-safe service resolution."""
with self._lock:
# Use thread-local resolution context
if not hasattr(self._thread_local, 'resolution_context'):
self._thread_local.resolution_context = ResolutionContext()
context = self._thread_local.resolution_context
try:
context.enter_resolution(service_type)
return super().resolve(service_type, scope)
finally:
context.exit_resolution(service_type)
class ResolutionContext:
"""Thread-local context for dependency resolution."""
def __init__(self):
self.resolution_stack = []
self.resolved_instances = {}
def enter_resolution(self, service_type: type):
"""Enter resolution for a service type."""
if service_type in self.resolution_stack:
raise CircularDependencyError(f"Circular dependency: {service_type}")
self.resolution_stack.append(service_type)
def exit_resolution(self, service_type: type):
"""Exit resolution for a service type."""
if self.resolution_stack and self.resolution_stack[-1] == service_type:
self.resolution_stack.pop()
Concurrent Resolution¶
class ConcurrentResolver:
"""Resolver optimized for concurrent access."""
def __init__(self, registry, scope_manager, max_workers: int = 4):
self.registry = registry
self.scope_manager = scope_manager
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.resolution_cache = {}
self.cache_lock = threading.RLock()
async def resolve_async(self, service_type: type, scope):
"""Asynchronously resolve service dependencies."""
loop = asyncio.get_event_loop()
# Submit resolution to thread pool
future = self.executor.submit(self.resolve, service_type, scope)
# Wait for result asynchronously
return await loop.run_in_executor(None, future.result)
def resolve_batch(self, service_types: List[type], scope):
"""Resolve multiple services concurrently."""
futures = []
for service_type in service_types:
future = self.executor.submit(self.resolve, service_type, scope)
futures.append((service_type, future))
results = {}
for service_type, future in futures:
try:
results[service_type] = future.result()
except Exception as e:
results[service_type] = e
return results
Custom Resolvers¶
Factory-Based Resolver¶
class FactoryResolver(DependencyResolver):
"""Resolver that prioritizes factory functions."""
def resolve(self, service_type: type, scope):
"""Resolve using custom factory logic."""
binding = self.registry.get_binding(service_type)
if binding and binding.is_factory:
# Use factory function
factory = binding.implementation
# Resolve factory dependencies
factory_dependencies = self.analyze_dependencies(factory)
dependency_instances = {}
for dep_name, dep_type in factory_dependencies.items():
dependency_instances[dep_name] = self.resolve(dep_type, scope)
# Call factory with resolved dependencies
return factory(**dependency_instances)
return super().resolve(service_type, scope)
Decorator-Aware Resolver¶
class DecoratorResolver(DependencyResolver):
"""Resolver that handles service decorators."""
def resolve(self, service_type: type, scope):
"""Resolve with decorator support."""
instance = super().resolve(service_type, scope)
# Apply decorators if present
decorators = self.get_service_decorators(service_type)
for decorator in decorators:
instance = decorator(instance)
return instance
def get_service_decorators(self, service_type: type):
"""Get decorators to apply to service."""
decorators = []
# Check for logging decorator
if hasattr(service_type, '__enable_logging__'):
decorators.append(LoggingDecorator())
# Check for caching decorator
if hasattr(service_type, '__enable_caching__'):
decorators.append(CachingDecorator())
# Check for retry decorator
if hasattr(service_type, '__enable_retry__'):
decorators.append(RetryDecorator())
return decorators
class LoggingDecorator:
"""Decorator that adds logging to service methods."""
def __call__(self, instance):
original_methods = {}
for attr_name in dir(instance):
attr = getattr(instance, attr_name)
if callable(attr) and not attr_name.startswith('_'):
original_methods[attr_name] = attr
def logged_method(original=attr, name=attr_name):
def wrapper(*args, **kwargs):
print(f"Calling {name} on {instance.__class__.__name__}")
return original(*args, **kwargs)
return wrapper
setattr(instance, attr_name, logged_method())
return instance
Performance Optimization¶
Caching Resolver¶
class CachingResolver(DependencyResolver):
"""Resolver with instance caching for better performance."""
def __init__(self, registry, scope_manager, cache_size: int = 1000):
super().__init__(registry, scope_manager)
self.instance_cache = {}
self.cache_size = cache_size
self.cache_hits = 0
self.cache_misses = 0
def resolve(self, service_type: type, scope):
"""Resolve with caching."""
cache_key = (service_type, id(scope))
# Check cache first
if cache_key in self.instance_cache:
self.cache_hits += 1
return self.instance_cache[cache_key]
# Resolve normally
instance = super().resolve(service_type, scope)
# Cache if space available
if len(self.instance_cache) < self.cache_size:
self.instance_cache[cache_key] = instance
self.cache_misses += 1
return instance
def get_cache_stats(self):
"""Get cache performance statistics."""
total = self.cache_hits + self.cache_misses
hit_rate = self.cache_hits / total if total > 0 else 0
return {
'hits': self.cache_hits,
'misses': self.cache_misses,
'hit_rate': hit_rate,
'cache_size': len(self.instance_cache)
}
Profiling Resolver¶
import time
from collections import defaultdict
class ProfilingResolver(DependencyResolver):
"""Resolver that collects performance metrics."""
def __init__(self, registry, scope_manager):
super().__init__(registry, scope_manager)
self.resolution_times = defaultdict(list)
self.resolution_counts = defaultdict(int)
def resolve(self, service_type: type, scope):
"""Resolve with performance profiling."""
start_time = time.perf_counter()
try:
instance = super().resolve(service_type, scope)
return instance
finally:
resolution_time = time.perf_counter() - start_time
self.resolution_times[service_type].append(resolution_time)
self.resolution_counts[service_type] += 1
def get_performance_report(self):
"""Get detailed performance report."""
report = {}
for service_type, times in self.resolution_times.items():
report[service_type.__name__] = {
'count': self.resolution_counts[service_type],
'total_time': sum(times),
'average_time': sum(times) / len(times),
'min_time': min(times),
'max_time': max(times)
}
return report
Error Handling¶
Resilient Resolver¶
class ResilientResolver(DependencyResolver):
"""Resolver with error recovery capabilities."""
def __init__(self, registry, scope_manager, max_retries: int = 3):
super().__init__(registry, scope_manager)
self.max_retries = max_retries
self.fallback_factories = {}
def resolve(self, service_type: type, scope):
"""Resolve with error recovery."""
last_error = None
for attempt in range(self.max_retries):
try:
return super().resolve(service_type, scope)
except Exception as e:
last_error = e
if attempt < self.max_retries - 1:
print(f"Resolution attempt {attempt + 1} failed for {service_type.__name__}: {e}")
time.sleep(0.1 * (attempt + 1)) # Exponential backoff
# Try fallback if available
if service_type in self.fallback_factories:
try:
return self.fallback_factories[service_type]()
except Exception as fallback_error:
print(f"Fallback also failed for {service_type.__name__}: {fallback_error}")
raise last_error
def register_fallback(self, service_type: type, factory):
"""Register fallback factory for service type."""
self.fallback_factories[service_type] = factory