Skip to content

Resource Management

Resource management provides automatic lifecycle management for dependencies that require initialization and cleanup, such as database connections, file handles, and network sockets.

🎯 Resource Lifecycle

Basic Resource Pattern

from injectq import resource, InjectQ
from typing import AsyncContextManager

@resource
class DatabaseConnection:
    """Database connection with automatic lifecycle management."""

    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.connection = None
        self.is_initialized = False

    async def initialize(self):
        """Initialize the resource."""
        if not self.is_initialized:
            self.connection = await create_database_connection(self.config)
            self.is_initialized = True
            print(f"Database connection initialized: {id(self.connection)}")

    async def dispose(self):
        """Clean up the resource."""
        if self.connection and self.is_initialized:
            await self.connection.close()
            self.is_initialized = False
            print(f"Database connection disposed: {id(self.connection)}")

    async def execute_query(self, query: str):
        """Use the resource."""
        if not self.is_initialized:
            await self.initialize()
        return await self.connection.execute(query)

# Usage
async def main():
    container = InjectQ()
    container.bind(DatabaseConnection, DatabaseConnection(DatabaseConfig()))

    async with container.resource_scope():
        db = container.get(DatabaseConnection)

        # Resource automatically initialized
        result = await db.execute_query("SELECT * FROM users")
        print(f"Query result: {result}")

    # Resource automatically disposed

Resource Scope Management

# Resource scope ensures proper cleanup
async def handle_request(container):
    async with container.resource_scope():
        # Resources initialized when first accessed
        db = container.get(DatabaseConnection)
        cache = container.get(CacheConnection)

        # Use resources
        user_data = await db.execute_query("SELECT * FROM users WHERE id = ?", request.user_id)
        await cache.set(f"user:{request.user_id}", user_data)

        return {"user": user_data}

    # All resources automatically disposed here

# Nested scopes
async def complex_operation(container):
    async with container.resource_scope() as outer_scope:
        # Outer scope resources
        db = container.get(DatabaseConnection)

        async with container.resource_scope() as inner_scope:
            # Inner scope can access outer scope resources
            cache = container.get(CacheConnection)
            # Inner scope resources disposed here

        # Outer scope resources still available
        await db.execute_query("COMMIT")

    # All resources disposed here

🔧 Resource Types

Connection Pool Resources

@resource
class DatabasePool:
    """Database connection pool with resource management."""

    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.pool = None
        self.active_connections = 0
        self.max_connections = config.max_connections

    async def initialize(self):
        """Initialize the connection pool."""
        self.pool = await create_connection_pool(
            host=self.config.host,
            port=self.config.port,
            database=self.config.database,
            min_size=self.config.min_connections,
            max_size=self.config.max_connections
        )
        print(f"Database pool initialized with {self.max_connections} max connections")

    async def dispose(self):
        """Close all connections in the pool."""
        if self.pool:
            await self.pool.close()
            print("Database pool disposed")

    async def acquire_connection(self):
        """Acquire a connection from the pool."""
        if not self.pool:
            await self.initialize()

        if self.active_connections >= self.max_connections:
            raise ResourceExhaustedError("No available connections")

        connection = await self.pool.acquire()
        self.active_connections += 1

        # Return connection with automatic release
        return ConnectionWrapper(connection, self)

    def release_connection(self, connection):
        """Release a connection back to the pool."""
        if hasattr(connection, '_raw_connection'):
            self.pool.release(connection._raw_connection)
        self.active_connections -= 1

class ConnectionWrapper:
    """Wrapper that automatically releases connection."""
    def __init__(self, connection, pool: DatabasePool):
        self._connection = connection
        self._pool = pool

    async def __aenter__(self):
        return self._connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._pool.release_connection(self)

# Usage
async def query_with_pool(container):
    pool = container.get(DatabasePool)

    async with container.resource_scope():
        async with pool.acquire_connection() as conn:
            result = await conn.execute("SELECT * FROM users")
            return result

File Handle Resources

@resource
class FileManager:
    """File handle manager with automatic cleanup."""

    def __init__(self, base_path: str):
        self.base_path = Path(base_path)
        self.open_files = {}
        self.lock = asyncio.Lock()

    async def initialize(self):
        """Ensure base directory exists."""
        self.base_path.mkdir(parents=True, exist_ok=True)
        print(f"File manager initialized at {self.base_path}")

    async def dispose(self):
        """Close all open files."""
        async with self.lock:
            for file_path, file_handle in self.open_files.items():
                await file_handle.close()
                print(f"Closed file: {file_path}")
            self.open_files.clear()

    async def open_file(self, filename: str, mode: str = 'r'):
        """Open a file with automatic management."""
        file_path = self.base_path / filename

        async with self.lock:
            if str(file_path) in self.open_files:
                return self.open_files[str(file_path)]

            file_handle = await aiofiles.open(file_path, mode)
            self.open_files[str(file_path)] = file_handle

            return FileWrapper(file_handle, str(file_path), self)

    def close_file(self, file_path: str):
        """Close a specific file."""
        if file_path in self.open_files:
            # Note: In real implementation, this would be async
            # For simplicity, we'll mark for cleanup
            pass

class FileWrapper:
    """Wrapper for file handles with automatic cleanup."""
    def __init__(self, file_handle, file_path: str, manager: FileManager):
        self._file_handle = file_handle
        self._file_path = file_path
        self._manager = manager

    async def __aenter__(self):
        return self._file_handle

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._file_handle.close()
        self._manager.close_file(self._file_path)

# Usage
async def process_file(container):
    file_manager = container.get(FileManager)

    async with container.resource_scope():
        async with file_manager.open_file("data.txt", "r") as f:
            content = await f.read()
            return content

Network Connection Resources

@resource
class HTTPClientPool:
    """HTTP client pool with connection management."""

    def __init__(self, config: HTTPConfig):
        self.config = config
        self.session = None
        self.connector = None

    async def initialize(self):
        """Initialize HTTP client with connection pool."""
        self.connector = aiohttp.TCPConnector(
            limit=self.config.max_connections,
            limit_per_host=self.config.max_connections_per_host,
            ttl_dns_cache=self.config.dns_cache_ttl
        )

        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=aiohttp.ClientTimeout(total=self.config.timeout)
        )

        print(f"HTTP client pool initialized with {self.config.max_connections} connections")

    async def dispose(self):
        """Close HTTP client and connections."""
        if self.session:
            await self.session.close()
            print("HTTP client pool disposed")

    async def request(self, method: str, url: str, **kwargs):
        """Make HTTP request using pooled connection."""
        if not self.session:
            await self.initialize()

        async with self.session.request(method, url, **kwargs) as response:
            return await response.json()

# Usage
async def fetch_data(container):
    http_client = container.get(HTTPClientPool)

    async with container.resource_scope():
        # Connection automatically managed
        data = await http_client.request('GET', 'https://api.example.com/data')
        return data

🎨 Resource Patterns

Resource Factory Pattern

class ResourceFactory:
    """Factory for creating different types of resources."""

    @staticmethod
    def create_database_pool(config: DatabaseConfig):
        @resource
        class DatabasePoolResource:
            def __init__(self):
                self.pool = None

            async def initialize(self):
                self.pool = await create_pool(config)

            async def dispose(self):
                if self.pool:
                    await self.pool.close()

            def get_pool(self):
                return self.pool

        return DatabasePoolResource()

    @staticmethod
    def create_cache_client(config: CacheConfig):
        @resource
        class CacheResource:
            def __init__(self):
                self.client = None

            async def initialize(self):
                self.client = await create_cache_client(config)

            async def dispose(self):
                if self.client:
                    await self.client.close()

            def get_client(self):
                return self.client

        return CacheResource()

# Usage
def setup_resources(container, db_config, cache_config):
    # Create and bind resources
    container.bind(DatabasePool, ResourceFactory.create_database_pool(db_config))
    container.bind(CacheClient, ResourceFactory.create_cache_client(cache_config))

Resource Decorator Pattern

def managed_resource(initialize_func=None, dispose_func=None):
    """Decorator to create managed resources."""
    def decorator(cls):
        original_init = cls.__init__

        async def __init__(self, *args, **kwargs):
            await original_init(self, *args, **kwargs)
            if initialize_func:
                await initialize_func(self)

        async def dispose(self):
            if dispose_func:
                await dispose_func(self)

        cls.__init__ = __init__
        cls.dispose = dispose

        # Mark as resource
        cls._is_injectq_resource = True

        return cls

    return decorator

# Usage
@managed_resource(
    initialize_func=lambda self: self.connect(),
    dispose_func=lambda self: self.disconnect()
)
class RedisClient:
    def __init__(self, config: RedisConfig):
        self.config = config
        self.connection = None

    async def connect(self):
        self.connection = await redis.create_connection(self.config.url)

    async def disconnect(self):
        if self.connection:
            self.connection.close()

    async def get(self, key: str):
        return await self.connection.get(key)

    async def set(self, key: str, value: str):
        return await self.connection.set(key, value)

# Automatic resource management
async def use_redis(container):
    redis_client = container.get(RedisClient)

    async with container.resource_scope():
        await redis_client.set("key", "value")
        result = await redis_client.get("key")
        return result

Resource Pool Pattern

@resource
class GenericResourcePool:
    """Generic resource pool for any type of resource."""

    def __init__(self, factory, max_size: int = 10):
        self.factory = factory
        self.max_size = max_size
        self.available = asyncio.Queue(maxsize=max_size)
        self.in_use = set()
        self.lock = asyncio.Lock()

    async def initialize(self):
        """Pre-populate the pool."""
        for _ in range(self.max_size):
            resource = await self.factory()
            await self.available.put(resource)
        print(f"Resource pool initialized with {self.max_size} resources")

    async def dispose(self):
        """Clean up all resources."""
        async with self.lock:
            # Close available resources
            while not self.available.empty():
                resource = await self.available.get()
                await self.cleanup_resource(resource)

            # Close in-use resources
            for resource in self.in_use:
                await self.cleanup_resource(resource)

            self.in_use.clear()

    async def acquire(self):
        """Acquire a resource from the pool."""
        resource = await self.available.get()

        async with self.lock:
            self.in_use.add(resource)

        return PooledResource(resource, self)

    def release(self, resource):
        """Release a resource back to the pool."""
        async with self.lock:
            if resource in self.in_use:
                self.in_use.remove(resource)
                self.available.put_nowait(resource)

    async def cleanup_resource(self, resource):
        """Clean up a single resource."""
        if hasattr(resource, 'close'):
            await resource.close()
        elif hasattr(resource, 'dispose'):
            await resource.dispose()

class PooledResource:
    """Wrapper for pooled resources."""
    def __init__(self, resource, pool: GenericResourcePool):
        self.resource = resource
        self.pool = pool

    async def __aenter__(self):
        return self.resource

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.pool.release(self.resource)

# Usage
async def create_database_connection():
    # Factory function for database connections
    return await create_connection()

async def use_pooled_resources(container):
    # Create pool of database connections
    pool = GenericResourcePool(create_database_connection, max_size=5)
    container.bind(DatabasePool, pool)

    async with container.resource_scope():
        async with pool.acquire() as conn:
            result = await conn.execute("SELECT * FROM users")
            return result

🚨 Resource Management Best Practices

✅ Good Patterns

1. Proper Resource Cleanup

# ✅ Good: Use resource scopes
async def handle_request(container):
    async with container.resource_scope():
        db = container.get(DatabaseConnection)
        cache = container.get(CacheClient)

        # Resources automatically cleaned up
        result = await process_request(db, cache)
        return result

# ✅ Good: Explicit cleanup in synchronous code
def process_sync(container):
    with container.resource_scope():
        service = container.get(Service)
        result = service.process()
        return result

2. Resource Error Handling

# ✅ Good: Handle resource initialization errors
@resource
class UnreliableResource:
    async def initialize(self):
        try:
            self.resource = await create_unreliable_resource()
        except Exception as e:
            print(f"Failed to initialize resource: {e}")
            raise ResourceInitializationError(f"Resource init failed: {e}")

    async def dispose(self):
        try:
            if self.resource:
                await self.resource.cleanup()
        except Exception as e:
            print(f"Error during resource cleanup: {e}")
            # Don't re-raise in dispose

# ✅ Good: Graceful degradation
async def use_resource_with_fallback(container):
    try:
        async with container.resource_scope():
            resource = container.get(PrimaryResource)
            return await resource.process()
    except ResourceInitializationError:
        # Fallback to secondary resource
        async with container.resource_scope():
            fallback = container.get(FallbackResource)
            return await fallback.process()

3. Resource Monitoring

# ✅ Good: Monitor resource usage
@resource
class MonitoredDatabasePool:
    def __init__(self):
        self.active_connections = 0
        self.total_connections_created = 0
        self.connection_times = []

    async def initialize(self):
        # Initialize monitoring
        self.start_time = time.time()

    async def acquire_connection(self):
        start_time = time.time()
        connection = await self._acquire()
        connection_time = time.time() - start_time

        self.connection_times.append(connection_time)
        self.active_connections += 1

        # Log slow connections
        if connection_time > 1.0:  # 1 second
            print(f"Slow connection acquisition: {connection_time}s")

        return connection

    def get_metrics(self):
        return {
            "active_connections": self.active_connections,
            "total_created": self.total_connections_created,
            "avg_connection_time": sum(self.connection_times) / len(self.connection_times) if self.connection_times else 0,
            "max_connection_time": max(self.connection_times) if self.connection_times else 0
        }

❌ Bad Patterns

1. Manual Resource Management

# ❌ Bad: Manual resource management
async def bad_resource_handling(container):
    db = container.get(DatabaseConnection)

    # Manual initialization - error prone
    await db.initialize()

    try:
        result = await db.query("SELECT * FROM users")
        return result
    finally:
        # Manual cleanup - easy to forget
        await db.dispose()

# ❌ Bad: Resource leaks
def leaky_function(container):
    resource = container.get(SomeResource)
    # No cleanup - resource leak!
    return resource.do_something()

2. Resource Exhaustion

# ❌ Bad: No limits on resource usage
@resource
class UnlimitedPool:
    def __init__(self):
        self.connections = []

    async def acquire_connection(self):
        # Create unlimited connections - can exhaust system
        conn = await create_connection()
        self.connections.append(conn)
        return conn

# ❌ Bad: Long-running resources
@resource
class LongRunningResource:
    async def initialize(self):
        # Very slow initialization
        await asyncio.sleep(30)  # 30 seconds!

    async def dispose(self):
        # Slow cleanup
        await asyncio.sleep(10)

3. Improper Error Handling

# ❌ Bad: Exceptions in dispose
@resource
class BadDisposeResource:
    async def dispose(self):
        # Don't raise exceptions in dispose
        if self.resource:
            await self.resource.close()
        raise Exception("Dispose failed!")  # Bad!

# ❌ Bad: Ignoring dispose errors
@resource
class IgnoringErrorsResource:
    async def dispose(self):
        try:
            await self.resource.close()
        except Exception:
            pass  # Silently ignore - can hide issues

📊 Resource Monitoring

Resource Usage Metrics

class ResourceMonitor:
    """Monitor resource usage across the application."""

    def __init__(self, container: InjectQ):
        self.container = container
        self.metrics = {
            "resources_created": 0,
            "resources_disposed": 0,
            "active_resources": 0,
            "resource_errors": 0,
            "avg_lifetime": 0
        }
        self.resource_lifetimes = []

    def track_resource_creation(self, resource):
        self.metrics["resources_created"] += 1
        self.metrics["active_resources"] += 1
        resource._creation_time = time.time()

    def track_resource_disposal(self, resource):
        self.metrics["resources_disposed"] += 1
        self.metrics["active_resources"] -= 1

        if hasattr(resource, '_creation_time'):
            lifetime = time.time() - resource._creation_time
            self.resource_lifetimes.append(lifetime)
            self.metrics["avg_lifetime"] = sum(self.resource_lifetimes) / len(self.resource_lifetimes)

    def track_resource_error(self, error):
        self.metrics["resource_errors"] += 1

    def get_report(self):
        return {
            **self.metrics,
            "total_lifetimes_tracked": len(self.resource_lifetimes),
            "median_lifetime": sorted(self.resource_lifetimes)[len(self.resource_lifetimes)//2] if self.resource_lifetimes else 0
        }

# Usage
monitor = ResourceMonitor(container)

# Integrate with resource lifecycle
@resource
class MonitoredResource:
    def __init__(self, monitor: ResourceMonitor):
        self.monitor = monitor
        self.monitor.track_resource_creation(self)

    async def dispose(self):
        try:
            await self._cleanup()
        except Exception as e:
            self.monitor.track_resource_error(e)
        finally:
            self.monitor.track_resource_disposal(self)

Health Checks

class ResourceHealthChecker:
    """Check health of resources."""

    def __init__(self, container: InjectQ):
        self.container = container

    async def check_all_resources(self):
        """Check health of all resources."""
        results = {}

        # Get all resources (this would require container introspection)
        resources = self.container.get_all_resources()

        for resource_type, resource in resources.items():
            try:
                health = await self.check_resource_health(resource)
                results[resource_type.__name__] = health
            except Exception as e:
                results[resource_type.__name__] = {
                    "status": "error",
                    "error": str(e)
                }

        return results

    async def check_resource_health(self, resource):
        """Check health of a single resource."""
        if hasattr(resource, 'health_check'):
            # Resource has built-in health check
            return await resource.health_check()
        elif hasattr(resource, 'ping'):
            # Simple ping check
            await resource.ping()
            return {"status": "healthy"}
        else:
            # Basic check - try to use the resource
            try:
                # This is resource-type specific
                if hasattr(resource, 'execute'):
                    await resource.execute("SELECT 1")
                elif hasattr(resource, 'get'):
                    await resource.get("health_check_key")
                else:
                    # Unknown resource type
                    return {"status": "unknown", "message": "No health check available"}

                return {"status": "healthy"}
            except Exception as e:
                return {"status": "unhealthy", "error": str(e)}

# Usage
health_checker = ResourceHealthChecker(container)
health_status = await health_checker.check_all_resources()

for resource_name, status in health_status.items():
    if status["status"] != "healthy":
        print(f"Resource {resource_name} is unhealthy: {status}")

🎯 Summary

Resource management provides automatic lifecycle management:

  • Resource lifecycle - Automatic initialization and cleanup
  • Resource scopes - Context managers for proper resource handling
  • Resource types - Connection pools, file handles, network connections
  • Resource patterns - Factories, decorators, and pools
  • Resource monitoring - Usage metrics and health checks

Key features: - Automatic resource initialization when first accessed - Automatic cleanup when exiting resource scopes - Support for async and sync resource management - Resource pooling for efficient resource usage - Monitoring and health checking capabilities

Best practices: - Use resource scopes for automatic cleanup - Handle errors gracefully in resource methods - Monitor resource usage and performance - Implement proper health checks - Avoid manual resource management - Set appropriate resource limits

Common patterns: - Database connection pools - HTTP client pools - File handle management - Resource factories and decorators - Resource monitoring and health checks

Ready to explore diagnostics?