Performance Best Practices

This guide provides comprehensive recommendations for optimizing performance when using InjectQ in production applications.

🚀 Container Configuration Optimization

Use Appropriate Lifecycles

Choosing the right lifecycle for your dependencies is crucial for performance:

from injectq import InjectQ, Module, inject

class PerformanceOptimizedModule(Module):
    def configure(self):
        # Singleton for expensive-to-create objects
        self.bind(DatabaseConnection, DatabaseConnection).singleton()
        self.bind(CacheService, RedisCache).singleton()
        self.bind(LoggingService, LoggingService).singleton()

        # Scoped for stateful services within request boundaries
        self.bind(UserService, UserService).scoped()
        self.bind(OrderService, OrderService).scoped()

        # Transient only when necessary (stateless, lightweight)
        self.bind(ValidationService, ValidationService).transient()
        self.bind(CalculationEngine, CalculationEngine).transient()

# ❌ Bad: Everything as transient
class InefficientModule(Module):
    def configure(self):
        self.bind(DatabaseConnection, DatabaseConnection).transient()  # Expensive!
        self.bind(CacheService, RedisCache).transient()  # Unnecessary overhead!
        self.bind(UserService, UserService).transient()

# ✅ Good: Appropriate lifecycles
class OptimizedModule(Module):
    def configure(self):
        # Expensive resources as singletons
        self.bind(DatabaseConnection, DatabaseConnection).singleton()
        self.bind(CacheService, RedisCache).singleton()

        # Request-scoped services
        self.bind(UserService, UserService).scoped()

Minimize Container Lookups

Cache frequently accessed dependencies instead of repeated container lookups:

# ❌ Bad: Repeated container lookups
class InefficientService:
    @inject
    def __init__(self, container: InjectQ):
        self.container = container

    def process_data(self, data):
        # Expensive lookup every time
        validator = self.container.get(DataValidator)
        processor = self.container.get(DataProcessor)
        logger = self.container.get(Logger)

        # Process data...

# ✅ Good: Inject dependencies directly
class EfficientService:
    @inject
    def __init__(
        self,
        validator: DataValidator,
        processor: DataProcessor,
        logger: Logger
    ):
        self.validator = validator
        self.processor = processor
        self.logger = logger

    def process_data(self, data):
        # Direct access, no container lookups
        self.validator.validate(data)
        result = self.processor.process(data)
        self.logger.log(f"Processed {len(data)} items")
        return result

# ✅ Good: Cache for dynamic lookups
class CachedDynamicService:
    @inject
    def __init__(self, container: InjectQ):
        self.container = container
        self._handler_cache = {}

    def get_handler(self, handler_type: str):
        if handler_type not in self._handler_cache:
            self._handler_cache[handler_type] = self.container.get(
                IHandler, name=handler_type
            )
        return self._handler_cache[handler_type]

Optimize Module Registration

# ✅ Efficient module registration
class ProductionModule(Module):
    def configure(self):
        # Group related bindings
        self._configure_data_layer()
        self._configure_services()
        self._configure_external_apis()

    def _configure_data_layer(self):
        """Configure data access layer efficiently."""
        # Connection pool as singleton
        self.bind(ConnectionPool, self._create_connection_pool()).singleton()

        # Repository base class with shared dependencies
        self.bind(IRepository, BaseRepository, name="base").singleton()

        # Specific repositories inheriting base
        self.bind(UserRepository, UserRepository).scoped()
        self.bind(OrderRepository, OrderRepository).scoped()

    def _create_connection_pool(self) -> ConnectionPool:
        """Factory method for expensive resource creation."""
        return ConnectionPool(
            host="db.example.com",
            port=5432,
            min_connections=5,
            max_connections=20
        )

🏃‍♂️ Runtime Performance Optimization

Async/Await Best Practices

import asyncio
from injectq import InjectQ, inject
from typing import List

class AsyncOptimizedService:
    @inject
    def __init__(
        self,
        database: AsyncDatabase,
        cache: AsyncCache,
        api_client: AsyncAPIClient
    ):
        self.database = database
        self.cache = cache
        self.api_client = api_client

    async def process_user_data(self, user_ids: List[str]) -> List[dict]:
        """Optimized async processing with concurrency."""

        # ✅ Good: Concurrent cache lookups
        cache_tasks = [
            self.cache.get(f"user:{user_id}")
            for user_id in user_ids
        ]
        cache_results = await asyncio.gather(*cache_tasks, return_exceptions=True)

        # Identify cache misses
        missing_users = []
        cached_users = {}

        for user_id, result in zip(user_ids, cache_results):
            if isinstance(result, Exception) or result is None:
                missing_users.append(user_id)
            else:
                cached_users[user_id] = result

        # ✅ Good: Batch database query for missing users
        if missing_users:
            db_users = await self.database.get_users_batch(missing_users)

            # ✅ Good: Concurrent cache updates
            cache_updates = [
                self.cache.set(f"user:{user['id']}", user, ttl=300)
                for user in db_users
            ]
            await asyncio.gather(*cache_updates, return_exceptions=True)

            # Update cached users
            for user in db_users:
                cached_users[user['id']] = user

        return [cached_users[user_id] for user_id in user_ids if user_id in cached_users]

# ❌ Bad: Sequential processing
class SequentialService:
    @inject
    def __init__(self, database: AsyncDatabase, cache: AsyncCache):
        self.database = database
        self.cache = cache

    async def process_user_data(self, user_ids: List[str]) -> List[dict]:
        users = []
        for user_id in user_ids:  # Sequential - slow!
            user = await self.cache.get(f"user:{user_id}")
            if not user:
                user = await self.database.get_user(user_id)  # N+1 queries!
                await self.cache.set(f"user:{user_id}", user)
            users.append(user)
        return users

Memory Management

import weakref
from typing import Dict, Any
from injectq import inject, Module

class MemoryEfficientService:
    """Service that manages memory efficiently."""

    @inject
    def __init__(self, cache_service: CacheService):
        self.cache = cache_service
        self._weak_refs: Dict[str, weakref.ref] = {}
        self._memory_threshold = 100 * 1024 * 1024  # 100MB

    def get_large_object(self, key: str) -> Any:
        """Get large object with memory management."""
        # Check weak reference first
        if key in self._weak_refs:
            obj = self._weak_refs[key]()
            if obj is not None:
                return obj
            else:
                # Object was garbage collected
                del self._weak_refs[key]

        # Load from cache or create
        obj = self.cache.get(key)
        if obj is None:
            obj = self._create_large_object(key)
            self.cache.set(key, obj)

        # Store a weak reference so the object can be reclaimed under
        # memory pressure (note: built-in dicts and lists don't support
        # weak references, so this assumes custom class instances)
        self._weak_refs[key] = weakref.ref(obj)

        return obj

    def cleanup_memory(self):
        """Periodic memory cleanup."""
        import gc
        import psutil
        import os

        # Check memory usage
        process = psutil.Process(os.getpid())
        memory_usage = process.memory_info().rss

        if memory_usage > self._memory_threshold:
            # Force garbage collection
            gc.collect()

            # Clear weak references to dead objects
            dead_refs = [
                key for key, ref in self._weak_refs.items()
                if ref() is None
            ]
            for key in dead_refs:
                del self._weak_refs[key]

# Module with memory optimization
class MemoryOptimizedModule(Module):
    def configure(self):
        # Use scoped lifecycle for memory-intensive services
        self.bind(DataProcessingService, DataProcessingService).scoped()

        # Singleton for lightweight services
        self.bind(MemoryEfficientService, MemoryEfficientService).singleton()

📊 Monitoring and Profiling

Performance Monitoring

import asyncio
import functools
import time
from dataclasses import dataclass
from typing import Dict, List
from injectq import inject

@dataclass
class PerformanceMetric:
    method_name: str
    execution_time: float
    memory_usage: int
    call_count: int

class PerformanceMonitor:
    """Monitor service performance."""

    def __init__(self):
        self.metrics: Dict[str, List[float]] = {}
        self.call_counts: Dict[str, int] = {}

    def record_execution(self, method_name: str, execution_time: float):
        """Record method execution time."""
        if method_name not in self.metrics:
            self.metrics[method_name] = []
            self.call_counts[method_name] = 0

        self.metrics[method_name].append(execution_time)
        self.call_counts[method_name] += 1

    def get_performance_report(self) -> List[PerformanceMetric]:
        """Generate performance report."""
        report = []

        for method_name, times in self.metrics.items():
            avg_time = sum(times) / len(times)
            call_count = self.call_counts[method_name]

            report.append(PerformanceMetric(
                method_name=method_name,
                execution_time=avg_time,
                memory_usage=0,  # Would need psutil for actual memory tracking
                call_count=call_count
            ))

        return sorted(report, key=lambda x: x.execution_time, reverse=True)

def monitor_performance(func):
    """Decorator that records execution time on the service's injected
    PerformanceMonitor (expects the decorated method's instance to
    expose it as `self.monitor`)."""

    @functools.wraps(func)
    async def async_wrapper(self, *args, **kwargs):
        start_time = time.perf_counter()
        try:
            return await func(self, *args, **kwargs)
        finally:
            execution_time = time.perf_counter() - start_time
            self.monitor.record_execution(
                f"{func.__module__}.{func.__qualname__}",
                execution_time
            )

    @functools.wraps(func)
    def sync_wrapper(self, *args, **kwargs):
        start_time = time.perf_counter()
        try:
            return func(self, *args, **kwargs)
        finally:
            execution_time = time.perf_counter() - start_time
            self.monitor.record_execution(
                f"{func.__module__}.{func.__qualname__}",
                execution_time
            )

    return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

class MonitoredService:
    @inject
    def __init__(self, database: Database, monitor: PerformanceMonitor):
        self.database = database
        self.monitor = monitor

    @monitor_performance
    async def process_data(self, data: List[dict]) -> List[dict]:
        """Monitored data processing method."""
        results = []
        for item in data:
            processed = await self._process_item(item)
            results.append(processed)
        return results

    async def _process_item(self, item: dict) -> dict:
        # Processing logic here
        await asyncio.sleep(0.01)  # Simulate work
        return {"processed": True, **item}

Database Connection Optimization

import asyncpg
from contextlib import asynccontextmanager
from typing import Any, Dict, List
from injectq import inject, Module

class OptimizedDatabaseService:
    """Database service with connection pooling and optimization."""

    @inject
    def __init__(self, connection_pool: asyncpg.Pool):
        self.pool = connection_pool

    @asynccontextmanager
    async def get_connection(self):
        """Context manager for database connections."""
        async with self.pool.acquire() as connection:
            yield connection

    async def execute_batch_query(self, queries: List[str]) -> List[Any]:
        """Execute multiple queries efficiently."""
        async with self.get_connection() as conn:
            async with conn.transaction():
                results = []
                for query in queries:
                    result = await conn.fetch(query)
                    results.append(result)
                return results

    async def bulk_insert(self, table: str, data: List[Dict]) -> int:
        """Optimized bulk insert operation."""
        if not data:
            return 0

        # Prepare column names and placeholders
        columns = list(data[0].keys())
        column_names = ", ".join(columns)
        placeholders = ", ".join(f"${i+1}" for i in range(len(columns)))

        query = f"INSERT INTO {table} ({column_names}) VALUES ({placeholders})"

        async with self.get_connection() as conn:
            # Use executemany for bulk operations
            values = [[row[col] for col in columns] for row in data]
            await conn.executemany(query, values)
            return len(data)

class DatabaseModule(Module):
    """Bind a pre-created pool: asyncpg pool creation is async, so it
    must happen during application startup, not inside configure()."""

    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    def configure(self):
        # Bind the already-created pool instance as a singleton
        self.bind(asyncpg.Pool, self.pool).singleton()
        self.bind(OptimizedDatabaseService, OptimizedDatabaseService).singleton()

async def create_database_pool() -> asyncpg.Pool:
    """Create an optimized connection pool at startup."""
    return await asyncpg.create_pool(
        "postgresql://user:pass@localhost/db",
        min_size=5,
        max_size=20,
        command_timeout=60,
        server_settings={
            'application_name': 'injectq_app',
            'jit': 'off'  # Disable JIT for small, frequent queries
        }
    )
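
Wiring this together at application startup might look like the following sketch (the `install` call mirrors the production example at the end of this guide):

async def init_database(container: InjectQ) -> None:
    # Create the pool first (requires a running event loop),
    # then install the module that binds it
    pool = await create_database_pool()
    container.install(DatabaseModule(pool))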

🎯 Caching Strategies

Multi-Level Caching

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
import asyncio
import hashlib
import json
from injectq import inject

class ICacheLayer(ABC):
    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        pass

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: int = 300) -> bool:
        pass

    @abstractmethod
    async def delete(self, key: str) -> bool:
        pass

class MemoryCache(ICacheLayer):
    """In-memory cache layer (L1)."""

    def __init__(self, max_size: int = 1000):
        self._cache: Dict[str, Any] = {}
        self._max_size = max_size

    async def get(self, key: str) -> Optional[Any]:
        return self._cache.get(key)

    async def set(self, key: str, value: Any, ttl: int = 300) -> bool:
        # Note: ttl is accepted for interface compatibility but not
        # enforced by this simple in-memory layer
        if len(self._cache) >= self._max_size:
            # Simple FIFO eviction (drops the oldest inserted key;
            # a true LRU would also reorder entries on access)
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]

        self._cache[key] = value
        return True

    async def delete(self, key: str) -> bool:
        self._cache.pop(key, None)
        return True

class RedisCache(ICacheLayer):
    """Redis cache layer (L2)."""

    @inject
    def __init__(self, redis_client: RedisClient):
        self.redis = redis_client

    async def get(self, key: str) -> Optional[Any]:
        data = await self.redis.get(key)
        if data:
            return json.loads(data)
        return None

    async def set(self, key: str, value: Any, ttl: int = 300) -> bool:
        data = json.dumps(value, default=str)
        return await self.redis.setex(key, ttl, data)

    async def delete(self, key: str) -> bool:
        return await self.redis.delete(key) > 0

class MultiLevelCache:
    """Multi-level caching system."""

    @inject
    def __init__(self, l1_cache: MemoryCache, l2_cache: RedisCache):
        self.l1 = l1_cache
        self.l2 = l2_cache

    async def get(self, key: str) -> Optional[Any]:
        # Try L1 cache first
        value = await self.l1.get(key)
        if value is not None:
            return value

        # Try L2 cache
        value = await self.l2.get(key)
        if value is not None:
            # Populate L1 cache
            await self.l1.set(key, value, ttl=60)  # Shorter TTL for L1
            return value

        return None

    async def set(self, key: str, value: Any, ttl: int = 300) -> bool:
        # Set in both layers
        l1_result = await self.l1.set(key, value, min(ttl, 60))
        l2_result = await self.l2.set(key, value, ttl)
        return l1_result and l2_result

    async def delete(self, key: str) -> bool:
        # Delete from both layers
        l1_result = await self.l1.delete(key)
        l2_result = await self.l2.delete(key)
        return l1_result or l2_result

class CachedDataService:
    """Service with intelligent caching."""

    @inject
    def __init__(
        self,
        database: DatabaseService,
        cache: MultiLevelCache
    ):
        self.database = database
        self.cache = cache

    async def get_user_profile(self, user_id: str) -> Optional[dict]:
        """Get user profile with caching."""
        cache_key = f"user_profile:{user_id}"

        # Try cache first
        cached_profile = await self.cache.get(cache_key)
        if cached_profile:
            return cached_profile

        # Load from database
        profile = await self.database.get_user_profile(user_id)
        if profile:
            # Cache with appropriate TTL
            await self.cache.set(cache_key, profile, ttl=1800)  # 30 minutes

        return profile

    async def update_user_profile(self, user_id: str, updates: dict) -> bool:
        """Update user profile and invalidate cache."""
        success = await self.database.update_user_profile(user_id, updates)

        if success:
            # Invalidate cache
            cache_key = f"user_profile:{user_id}"
            await self.cache.delete(cache_key)

        return success

    def _generate_cache_key(self, prefix: str, **kwargs) -> str:
        """Generate consistent cache key."""
        key_data = f"{prefix}:" + ":".join(f"{k}={v}" for k, v in sorted(kwargs.items()))
        return hashlib.md5(key_data.encode()).hexdigest()
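
The key helper pays off when a cache key depends on several parameters. A hypothetical method using it (the `get_user_orders` database call is assumed for illustration):

    async def get_user_orders(self, user_id: str, status: str) -> list:
        """Hypothetical example: cache a filtered query under a hashed key."""
        cache_key = self._generate_cache_key("user_orders", user_id=user_id, status=status)
        cached = await self.cache.get(cache_key)
        if cached is not None:
            return cached

        orders = await self.database.get_user_orders(user_id, status=status)
        await self.cache.set(cache_key, orders, ttl=300)
        return orders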

🔥 Production Performance Checklist

Container Setup

  • Use appropriate lifecycles: Singletons for expensive resources, scoped for request-bound services
  • Minimize container lookups: Inject dependencies directly instead of container access
  • Cache dynamic lookups: Store frequently accessed named dependencies
  • Avoid circular dependencies: Design clean dependency graphs
  • Use lazy loading: Only create dependencies when needed (see the sketch after this list)
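
Lazy loading can be approximated with a tiny wrapper when a binding is expensive to construct but rarely used. A generic sketch, not an InjectQ API:

from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")

class Lazy(Generic[T]):
    """Defer creation of an expensive dependency until first access."""

    def __init__(self, factory: Callable[[], T]):
        self._factory = factory
        self._instance: Optional[T] = None

    @property
    def value(self) -> T:
        if self._instance is None:
            self._instance = self._factory()
        return self._instance

# Usage: nothing is constructed until .value is first read
# reports = Lazy(lambda: ReportEngine(preload_templates=True))
# reports.value.render(...)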

Async Performance

  • Use concurrent operations: Batch operations with asyncio.gather()
  • Implement connection pooling: Reuse database and HTTP connections
  • Optimize database queries: Use bulk operations and prepared statements
  • Add circuit breakers: Prevent cascade failures in external service calls
  • Implement timeouts: Set appropriate timeouts for all async operations (a combined sketch of both follows this list)
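
The timeout and circuit-breaker bullets can be combined in a small wrapper. A minimal sketch with illustrative thresholds, not an InjectQ feature:

import asyncio
import time

class CircuitBreaker:
    """Trip after repeated failures; reject calls until a cooldown passes."""

    def __init__(self, max_failures: int = 5, reset_after: float = 30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self._failures = 0
        self._opened_at = 0.0

    async def call(self, coro, timeout: float = 5.0):
        if self._failures >= self.max_failures:
            if time.monotonic() - self._opened_at < self.reset_after:
                raise RuntimeError("circuit open; skipping external call")
            self._failures = 0  # half-open: allow one trial call
        try:
            result = await asyncio.wait_for(coro, timeout=timeout)
            self._failures = 0
            return result
        except (asyncio.TimeoutError, ConnectionError):
            self._failures += 1
            self._opened_at = time.monotonic()
            raise

# Usage (illustrative): await breaker.call(api_client.fetch(url), timeout=2.0)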

Memory Management

  • Monitor memory usage: Track memory consumption in production
  • Use weak references: For large objects that can be garbage collected
  • Implement cleanup routines: Periodic memory cleanup for long-running services
  • Profile memory leaks: Regular memory profiling to identify leaks
  • Configure garbage collection: Tune Python GC settings for your workload (see the sketch after this list)
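
For GC tuning, the standard-library gc module exposes the collection thresholds directly. The numbers below are illustrative starting points, not recommendations:

import gc

# Raise the generation-0 threshold so collections run less often in
# allocation-heavy services (CPython defaults are (700, 10, 10);
# measure against your own workload before committing to values)
gc.set_threshold(50_000, 20, 20)

# Optionally freeze objects allocated during startup so they are
# excluded from future collections (Python 3.7+)
gc.freeze()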

Caching Strategy

  • Multi-level caching: Implement L1 (memory) and L2 (Redis) caching
  • Cache invalidation: Clear cache when data changes
  • TTL optimization: Set appropriate cache expiration times
  • Cache warming: Pre-populate cache for frequently accessed data
  • Monitor cache hit rates: Track and optimize cache effectiveness (a sketch covering warming and hit-rate tracking follows this list)
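
Cache warming and hit-rate tracking can be sketched on top of the MultiLevelCache shown earlier (`get_users_batch` reuses the async example above; treat the names as illustrative):

class CacheStats:
    """Wrap a cache layer to count hits and misses."""

    def __init__(self, cache: MultiLevelCache):
        self.cache = cache
        self.hits = 0
        self.misses = 0

    async def get(self, key: str):
        value = await self.cache.get(key)
        if value is None:
            self.misses += 1
        else:
            self.hits += 1
        return value

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

async def warm_user_cache(cache: MultiLevelCache, database, hot_user_ids):
    # Pre-populate profiles for the most frequently accessed users
    for profile in await database.get_users_batch(hot_user_ids):
        await cache.set(f"user_profile:{profile['id']}", profile, ttl=1800)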

Monitoring and Observability

  • Performance metrics: Track response times and throughput
  • Error monitoring: Log and alert on errors and exceptions
  • Resource utilization: Monitor CPU, memory, and I/O usage
  • Dependency health: Monitor external service health
  • Custom metrics: Track business-specific performance indicators (a minimal collector sketch follows this list)
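
The MetricsCollector bound in the production module below is not defined elsewhere in this guide; a minimal version might look like this sketch (a real deployment would export to Prometheus, StatsD, or similar):

from collections import defaultdict

class MetricsCollector:
    """Minimal counter/gauge collector for business metrics."""

    def __init__(self):
        self._counters = defaultdict(int)
        self._gauges = {}

    def increment(self, name: str, value: int = 1) -> None:
        self._counters[name] += value

    def set_gauge(self, name: str, value: float) -> None:
        self._gauges[name] = value

    def snapshot(self) -> dict:
        return {"counters": dict(self._counters), "gauges": dict(self._gauges)}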

Example Production Configuration

# production_config.py
from injectq import InjectQ, Module
import asyncio
import logging

class ProductionModule(Module):
    def configure(self):
        # Performance optimized bindings
        self._configure_database()
        self._configure_caching()
        self._configure_monitoring()

    def _configure_database(self):
        """Configure optimized database connections."""
        self.bind(DatabaseConfig, DatabaseConfig(
            max_connections=20,
            min_connections=5,
            connection_timeout=30,
            command_timeout=60
        )).singleton()

        # Reuses the _create_connection_pool factory shown in the
        # "Optimize Module Registration" section above
        self.bind(ConnectionPool, self._create_connection_pool()).singleton()
        self.bind(DatabaseService, OptimizedDatabaseService).singleton()

    def _configure_caching(self):
        """Configure multi-level caching."""
        self.bind(MemoryCache, MemoryCache(max_size=1000)).singleton()
        self.bind(RedisCache, RedisCache).singleton()
        self.bind(MultiLevelCache, MultiLevelCache).singleton()

    def _configure_monitoring(self):
        """Configure performance monitoring."""
        self.bind(PerformanceMonitor, PerformanceMonitor).singleton()
        self.bind(MetricsCollector, MetricsCollector).singleton()

# Usage in production
async def main():
    # Setup container with production optimizations
    container = InjectQ()
    container.install(ProductionModule())

    # Pre-warm critical services
    database = container.get(DatabaseService)
    cache = container.get(MultiLevelCache)

    # Start performance monitoring
    monitor = container.get(PerformanceMonitor)

    # Application startup
    app = container.get(Application)
    await app.start()

if __name__ == "__main__":
    # Configure logging for production
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    asyncio.run(main())

Following these performance best practices will help your InjectQ applications run efficiently in production, with good resource utilization and response times.