Skip to content

Async Support

InjectQ provides comprehensive support for asynchronous programming patterns, enabling dependency injection in async/await applications.

Async Service Resolution

Basic Async Resolution

import asyncio
from injectq import Container, inject

# Async service
class AsyncUserService:
    def __init__(self, repository: UserRepository):
        self.repository = repository

    async def get_user(self, user_id: int) -> User:
        # Simulate async operation
        await asyncio.sleep(0.1)
        return await self.repository.get_user_async(user_id)

    async def create_user(self, email: str) -> User:
        await asyncio.sleep(0.1)
        return await self.repository.create_user_async(email)

# Register services
container = Container()
container.register(UserRepository, AsyncUserRepository)
container.register(AsyncUserService, AsyncUserService)

# Async injection
@inject
async def handle_request(user_service: AsyncUserService) -> dict:
    user = await user_service.get_user(1)
    return {"user": user.email}

# Usage
async def main():
    result = await container.resolve(handle_request)
    print(result)

asyncio.run(main())

Async Context Managers

from contextlib import asynccontextmanager
from typing import AsyncGenerator

class AsyncDatabaseConnection:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None

    async def connect(self):
        """Establish database connection."""
        # Simulate async connection
        await asyncio.sleep(0.1)
        self.connection = f"Connected to {self.connection_string}"
        print(f"Database connected: {self.connection}")

    async def disconnect(self):
        """Close database connection."""
        if self.connection:
            await asyncio.sleep(0.05)
            print(f"Database disconnected: {self.connection}")
            self.connection = None

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.disconnect()

# Async scope with context manager
@asynccontextmanager
async def async_request_scope(container: Container) -> AsyncGenerator[Container, None]:
    """Create async request scope."""
    scope = container.create_scope()
    try:
        # Setup async resources
        db_connection = scope.resolve(AsyncDatabaseConnection)
        await db_connection.connect()

        yield scope
    finally:
        # Cleanup async resources
        if hasattr(scope, '_instances'):
            for instance in scope._instances.values():
                if hasattr(instance, '__aexit__'):
                    await instance.__aexit__(None, None, None)
                elif hasattr(instance, 'disconnect'):
                    await instance.disconnect()

        scope.dispose()

# Usage
async def process_request():
    async with async_request_scope(container) as scope:
        user_service = scope.resolve(AsyncUserService)
        result = await user_service.get_user(1)
        return result

Async Factory Functions

Async Service Factories

from typing import Awaitable, Callable

class AsyncServiceFactory:
    """Factory for creating async services."""

    def __init__(self, factory_func: Callable[..., Awaitable[Any]]):
        self.factory_func = factory_func
        self._cached_instance = None

    async def create(self, *args, **kwargs) -> Any:
        """Create service instance asynchronously."""
        return await self.factory_func(*args, **kwargs)

    async def create_singleton(self, *args, **kwargs) -> Any:
        """Create singleton instance asynchronously."""
        if self._cached_instance is None:
            self._cached_instance = await self.factory_func(*args, **kwargs)
        return self._cached_instance

# Async factory function
async def create_email_service(config: EmailConfig) -> EmailService:
    """Async factory for email service."""
    service = EmailService(config)

    # Async initialization
    await service.initialize()
    await service.test_connection()

    return service

# Register async factory
async_factory = AsyncServiceFactory(create_email_service)
container.register(EmailService, async_factory.create_singleton)

# Alternative: Direct async factory registration
container.register_async_factory(EmailService, create_email_service)

Async Lazy Services

class AsyncLazyService:
    """Lazy-loaded async service."""

    def __init__(self, factory: Callable[[], Awaitable[Any]]):
        self._factory = factory
        self._instance = None
        self._initialized = False
        self._lock = asyncio.Lock()

    async def get_instance(self) -> Any:
        """Get or create service instance."""
        if not self._initialized:
            async with self._lock:
                if not self._initialized:
                    self._instance = await self._factory()
                    self._initialized = True

        return self._instance

    async def __call__(self, *args, **kwargs):
        """Make service callable."""
        instance = await self.get_instance()
        if asyncio.iscoroutinefunction(instance):
            return await instance(*args, **kwargs)
        else:
            return instance(*args, **kwargs)

# Register lazy async service
async def create_heavy_service() -> HeavyService:
    """Create expensive service asynchronously."""
    service = HeavyService()
    await service.load_heavy_data()
    return service

lazy_service = AsyncLazyService(create_heavy_service)
container.register(HeavyService, lazy_service.get_instance)

Async Scopes

Request-Scoped Async Services

import asyncio
from contextvars import ContextVar
from typing import Dict, Any

# Context variable for request scope
_request_scope: ContextVar[Dict[str, Any]] = ContextVar('request_scope', default={})

class AsyncRequestScope:
    """Async request scope implementation."""

    def __init__(self, container: Container):
        self.container = container
        self._instances: Dict[type, Any] = {}
        self._disposal_tasks: List[Callable[[], Awaitable[None]]] = []

    async def resolve(self, service_type: type) -> Any:
        """Resolve service in async request scope."""
        if service_type not in self._instances:
            # Check if service is async
            if self._is_async_service(service_type):
                instance = await self._create_async_instance(service_type)
            else:
                instance = self.container.resolve(service_type)

            self._instances[service_type] = instance

            # Register disposal if needed
            if hasattr(instance, '__aexit__'):
                self._disposal_tasks.append(
                    lambda: instance.__aexit__(None, None, None)
                )

        return self._instances[service_type]

    async def _create_async_instance(self, service_type: type) -> Any:
        """Create async service instance."""
        # Get constructor dependencies
        dependencies = self._get_dependencies(service_type)
        resolved_deps = {}

        for name, dep_type in dependencies.items():
            resolved_deps[name] = await self.resolve(dep_type)

        # Create instance
        instance = service_type(**resolved_deps)

        # Initialize if async
        if hasattr(instance, '__aenter__'):
            await instance.__aenter__()

        return instance

    def _is_async_service(self, service_type: type) -> bool:
        """Check if service requires async initialization."""
        return (
            hasattr(service_type, '__aenter__') or
            hasattr(service_type, 'async_init') or
            any(asyncio.iscoroutinefunction(getattr(service_type, method, None))
                for method in ['__init__', 'initialize'])
        )

    def _get_dependencies(self, service_type: type) -> Dict[str, type]:
        """Get service dependencies."""
        import inspect
        dependencies = {}

        if hasattr(service_type, '__init__'):
            sig = inspect.signature(service_type.__init__)
            for param_name, param in sig.parameters.items():
                if param_name != 'self' and param.annotation != inspect.Parameter.empty:
                    dependencies[param_name] = param.annotation

        return dependencies

    async def dispose(self):
        """Dispose all async resources."""
        # Run disposal tasks
        if self._disposal_tasks:
            await asyncio.gather(*[task() for task in self._disposal_tasks])

        # Dispose instances
        for instance in self._instances.values():
            if hasattr(instance, 'dispose') and asyncio.iscoroutinefunction(instance.dispose):
                await instance.dispose()

        self._instances.clear()
        self._disposal_tasks.clear()

# Async scope decorator
def async_request_scoped(func):
    """Decorator for async request-scoped functions."""
    async def wrapper(*args, **kwargs):
        # Get or create request scope
        scope_data = _request_scope.get()

        if 'async_scope' not in scope_data:
            container = kwargs.get('container') or get_current_container()
            scope_data['async_scope'] = AsyncRequestScope(container)
            _request_scope.set(scope_data)

        scope = scope_data['async_scope']

        try:
            # Inject dependencies
            resolved_func = await container.resolve_async(func)
            return await resolved_func(*args, **kwargs)
        finally:
            # Cleanup in outermost scope
            if len(asyncio.current_task().get_stack()) == 1:
                await scope.dispose()
                scope_data.pop('async_scope', None)

    return wrapper

Async Middleware

Async Dependency Middleware

class AsyncDependencyMiddleware:
    """Middleware for async dependency injection."""

    def __init__(self, container: Container):
        self.container = container

    async def __call__(self, request, response, next_middleware):
        """Process request with async dependency injection."""
        # Create async scope for request
        async with AsyncRequestScope(self.container) as scope:
            # Store scope in request context
            request.scope = scope

            # Process request
            return await next_middleware(request, response)

# FastAPI async middleware
from fastapi import FastAPI, Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware

class FastAPIAsyncDIMiddleware(BaseHTTPMiddleware):
    """FastAPI async dependency injection middleware."""

    def __init__(self, app: FastAPI, container: Container):
        super().__init__(app)
        self.container = container

    async def dispatch(self, request: Request, call_next):
        """Process request with async DI."""
        async with AsyncRequestScope(self.container) as scope:
            # Store scope in request state
            request.state.di_scope = scope

            # Process request
            response = await call_next(request)
            return response

# Register middleware
app = FastAPI()
app.add_middleware(FastAPIAsyncDIMiddleware, container=container)

# Async endpoint with DI
@app.get("/users/{user_id}")
async def get_user(user_id: int, request: Request):
    # Get service from async scope
    scope = request.state.di_scope
    user_service = await scope.resolve(AsyncUserService)

    user = await user_service.get_user(user_id)
    return {"user": user.email}

Async Patterns

Producer-Consumer Pattern

import asyncio
from asyncio import Queue
from typing import List

class AsyncEventProducer:
    """Async event producer service."""

    def __init__(self, event_queue: Queue):
        self.event_queue = event_queue

    async def produce_event(self, event_data: dict):
        """Produce an event asynchronously."""
        await self.event_queue.put(event_data)
        print(f"Produced event: {event_data}")

class AsyncEventConsumer:
    """Async event consumer service."""

    def __init__(self, event_queue: Queue, processor: EventProcessor):
        self.event_queue = event_queue
        self.processor = processor
        self._running = False

    async def start_consuming(self):
        """Start consuming events."""
        self._running = True
        while self._running:
            try:
                # Wait for event with timeout
                event = await asyncio.wait_for(
                    self.event_queue.get(),
                    timeout=1.0
                )

                # Process event
                await self.processor.process_event(event)
                self.event_queue.task_done()

            except asyncio.TimeoutError:
                continue  # Continue waiting for events

    def stop_consuming(self):
        """Stop consuming events."""
        self._running = False

# Async service registration
async def create_event_queue() -> Queue:
    """Create async event queue."""
    return Queue(maxsize=100)

container.register_async_factory(Queue, create_event_queue)
container.register(AsyncEventProducer, AsyncEventProducer)
container.register(AsyncEventConsumer, AsyncEventConsumer)

# Usage
@inject
async def run_event_system(producer: AsyncEventProducer, consumer: AsyncEventConsumer):
    """Run async event system."""
    # Start consumer
    consumer_task = asyncio.create_task(consumer.start_consuming())

    # Produce events
    for i in range(10):
        await producer.produce_event({"id": i, "data": f"event_{i}"})
        await asyncio.sleep(0.1)

    # Wait for processing to complete
    await asyncio.sleep(1)
    consumer.stop_consuming()

    # Cancel consumer task
    consumer_task.cancel()
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass

# Run the system
asyncio.run(container.resolve(run_event_system))

Async Background Tasks

class AsyncBackgroundTaskManager:
    """Manages async background tasks."""

    def __init__(self):
        self._tasks: List[asyncio.Task] = []
        self._running = True

    async def start_task(self, coro):
        """Start a background task."""
        task = asyncio.create_task(coro)
        self._tasks.append(task)
        return task

    async def stop_all_tasks(self):
        """Stop all background tasks."""
        self._running = False

        # Cancel all tasks
        for task in self._tasks:
            task.cancel()

        # Wait for cancellation
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)

        self._tasks.clear()

class AsyncEmailService:
    """Async email service with background processing."""

    def __init__(self, task_manager: AsyncBackgroundTaskManager):
        self.task_manager = task_manager
        self._email_queue = Queue()

    async def send_email_async(self, to: str, subject: str, body: str):
        """Send email asynchronously in background."""
        email_data = {
            "to": to,
            "subject": subject,
            "body": body
        }

        await self._email_queue.put(email_data)

    async def _process_emails(self):
        """Background email processing."""
        while True:
            try:
                email_data = await self._email_queue.get()

                # Simulate email sending
                await asyncio.sleep(0.5)
                print(f"Email sent to {email_data['to']}: {email_data['subject']}")

                self._email_queue.task_done()

            except asyncio.CancelledError:
                break

    async def start_background_processing(self):
        """Start background email processing."""
        await self.task_manager.start_task(self._process_emails())

# Register async services
container.register(AsyncBackgroundTaskManager, AsyncBackgroundTaskManager)
container.register(AsyncEmailService, AsyncEmailService)

# Auto-start background processing
@inject
async def start_email_service(email_service: AsyncEmailService):
    """Start email service with background processing."""
    await email_service.start_background_processing()
    return email_service

container.register_factory(AsyncEmailService, start_email_service)

Error Handling in Async Context

Async Exception Handling

class AsyncExceptionHandler:
    """Handles exceptions in async dependency injection."""

    def __init__(self):
        self._handlers = {}

    def register_handler(self, exception_type: type, handler):
        """Register async exception handler."""
        self._handlers[exception_type] = handler

    async def handle_exception(self, exception: Exception) -> bool:
        """Handle exception asynchronously."""
        for exc_type, handler in self._handlers.items():
            if isinstance(exception, exc_type):
                if asyncio.iscoroutinefunction(handler):
                    await handler(exception)
                else:
                    handler(exception)
                return True

        return False

# Async error recovery
async def async_service_with_fallback(container: Container, service_type: type, fallback_factory):
    """Resolve service with async fallback."""
    try:
        return await container.resolve_async(service_type)
    except Exception as e:
        print(f"Service resolution failed: {e}")

        # Try fallback
        if asyncio.iscoroutinefunction(fallback_factory):
            return await fallback_factory()
        else:
            return fallback_factory()

# Usage
async def fallback_user_service():
    """Fallback user service factory."""
    return MockUserService()

@inject
async def get_user_with_fallback(user_id: int) -> dict:
    """Get user with fallback service."""
    user_service = await async_service_with_fallback(
        container,
        UserService,
        fallback_user_service
    )

    user = await user_service.get_user(user_id)
    return {"user": user.email}

Best Practices for Async DI

Performance Optimization

class AsyncServicePool:
    """Pool of async service instances."""

    def __init__(self, factory_func, pool_size: int = 10):
        self.factory_func = factory_func
        self.pool_size = pool_size
        self._pool = Queue(maxsize=pool_size)
        self._initialized = False

    async def initialize(self):
        """Initialize service pool."""
        if not self._initialized:
            for _ in range(self.pool_size):
                instance = await self.factory_func()
                await self._pool.put(instance)
            self._initialized = True

    async def get_service(self):
        """Get service from pool."""
        if not self._initialized:
            await self.initialize()

        return await self._pool.get()

    async def return_service(self, service):
        """Return service to pool."""
        await self._pool.put(service)

# Connection pooling
async def create_db_connection():
    """Create database connection."""
    connection = DatabaseConnection()
    await connection.connect()
    return connection

db_pool = AsyncServicePool(create_db_connection, pool_size=5)

# Use pooled services
@inject
async def process_data(data: dict):
    """Process data with pooled database connection."""
    db_connection = await db_pool.get_service()

    try:
        # Use connection
        result = await db_connection.execute_query(data['query'])
        return result
    finally:
        # Return to pool
        await db_pool.return_service(db_connection)

Resource Management

class AsyncResourceManager:
    """Manages async resources with proper cleanup."""

    def __init__(self):
        self._resources: List[Any] = []
        self._cleanup_tasks: List[Callable] = []

    def register_resource(self, resource, cleanup_func=None):
        """Register resource for cleanup."""
        self._resources.append(resource)

        if cleanup_func:
            self._cleanup_tasks.append(cleanup_func)
        elif hasattr(resource, '__aexit__'):
            self._cleanup_tasks.append(
                lambda: resource.__aexit__(None, None, None)
            )
        elif hasattr(resource, 'close') and asyncio.iscoroutinefunction(resource.close):
            self._cleanup_tasks.append(resource.close)

    async def cleanup_all(self):
        """Cleanup all registered resources."""
        cleanup_results = await asyncio.gather(
            *[task() for task in self._cleanup_tasks],
            return_exceptions=True
        )

        # Log any cleanup errors
        for i, result in enumerate(cleanup_results):
            if isinstance(result, Exception):
                print(f"Cleanup error for resource {i}: {result}")

        self._resources.clear()
        self._cleanup_tasks.clear()

# Register resource manager
container.register(AsyncResourceManager, AsyncResourceManager, scope="singleton")

# Auto-cleanup on container disposal
@inject
async def setup_container_cleanup(resource_manager: AsyncResourceManager):
    """Setup automatic resource cleanup."""

    async def cleanup_handler():
        await resource_manager.cleanup_all()

    # Register cleanup with container
    container.register_disposal_handler(cleanup_handler)

This comprehensive async support documentation covers all aspects of using InjectQ with asynchronous Python applications, from basic async service resolution to advanced patterns like connection pooling and resource management.