Skip to content

Taskiq Integration

Taskiq integration enables dependency injection for background tasks and workers, providing automatic service resolution with proper task scoping and lifecycle management.

๐ŸŽฏ Getting Started

Basic Setup

from taskiq import TaskiqScheduler
from injectq import InjectQ
from injectq.integrations.taskiq import setup_taskiq_integration, InjectQDependency

# 1. Create container and bind services
container = InjectQ()
container.bind(IEmailService, EmailService())
container.bind(IUserService, UserService())
container.bind(INotificationService, NotificationService())

# 2. Create Taskiq scheduler
scheduler = TaskiqScheduler()

# 3. Set up integration
setup_taskiq_integration(scheduler, container)

# 4. Use dependency injection in tasks
@scheduler.task
async def send_welcome_email(
    user_id: int,
    email_service: IEmailService = InjectQDependency(IEmailService),
    user_service: IUserService = InjectQDependency(IUserService)
):
    user = user_service.get_user(user_id)
    await email_service.send_welcome_email(user.email)

@scheduler.task
async def process_order(
    order_id: int,
    notification_svc: INotificationService = InjectQDependency(INotificationService)
):
    # Process order logic
    await notification_svc.send_order_confirmation(order_id)

# 5. Schedule tasks
await scheduler.schedule_task(send_welcome_email, user_id=123)
await scheduler.schedule_task(process_order, order_id=456)

Service Definitions

from typing import Protocol

# Define service interfaces
class IEmailService(Protocol):
    async def send_welcome_email(self, email: str) -> None: ...
    async def send_order_confirmation(self, email: str, order_id: int) -> None: ...

class IUserService(Protocol):
    def get_user(self, user_id: int) -> User: ...
    def update_user_status(self, user_id: int, status: str) -> None: ...

class INotificationService(Protocol):
    async def send_order_confirmation(self, order_id: int) -> None: ...
    async def send_payment_failed(self, user_id: int) -> None: ...

# Implement services
class EmailService:
    def __init__(self, smtp_config: SMTPConfig):
        self.smtp_config = smtp_config

    async def send_welcome_email(self, email: str) -> None:
        # Send welcome email logic
        print(f"Sending welcome email to {email}")

    async def send_order_confirmation(self, email: str, order_id: int) -> None:
        # Send order confirmation logic
        print(f"Sending order confirmation to {email} for order {order_id}")

class UserService:
    def __init__(self, db: IDatabaseConnection):
        self.db = db

    def get_user(self, user_id: int) -> User:
        return self.db.query(User).filter(id=user_id).first()

    def update_user_status(self, user_id: int, status: str) -> None:
        user = self.get_user(user_id)
        user.status = status
        self.db.commit()

class NotificationService:
    def __init__(self, email_svc: IEmailService, user_svc: IUserService):
        self.email_svc = email_svc
        self.user_svc = user_svc

    async def send_order_confirmation(self, order_id: int) -> None:
        # Get order and user
        order = self.db.get_order(order_id)
        user = self.user_svc.get_user(order.user_id)

        # Send notification
        await self.email_svc.send_order_confirmation(user.email, order_id)

    async def send_payment_failed(self, user_id: int) -> None:
        user = self.user_svc.get_user(user_id)
        await self.email_svc.send_payment_failed(user.email)

๐Ÿ”ง Advanced Configuration

Task-Scoped Services

from injectq import scoped

@scoped
class TaskContext:
    def __init__(self):
        self.task_id = str(uuid.uuid4())
        self.start_time = time.time()
        self.metadata = {}

    def set_metadata(self, key: str, value: Any):
        self.metadata[key] = value

    def get_duration(self) -> float:
        return time.time() - self.start_time

@scoped
class TaskMetrics:
    def __init__(self):
        self.operations = []
        self.errors = []

    def record_operation(self, operation: str, duration: float):
        self.operations.append({
            "operation": operation,
            "duration": duration,
            "timestamp": time.time()
        })

    def record_error(self, error: str):
        self.errors.append({
            "error": error,
            "timestamp": time.time()
        })

# Use in tasks
@scheduler.task
async def complex_task(
    data: dict,
    ctx: TaskContext = InjectQDependency(TaskContext),
    metrics: TaskMetrics = InjectQDependency(TaskMetrics),
    processor: IDataProcessor = InjectQDependency(IDataProcessor)
):
    ctx.set_metadata("input_size", len(data))

    try:
        # Process data with metrics
        start_time = time.time()
        result = await processor.process_data(data)
        duration = time.time() - start_time

        metrics.record_operation("process_data", duration)

        return result

    except Exception as e:
        metrics.record_error(str(e))
        raise

Module-Based Setup

from injectq import Module

class TaskModule(Module):
    def configure(self, binder):
        # Task-specific services
        binder.bind(IEmailService, EmailService())
        binder.bind(IUserService, UserService())
        binder.bind(INotificationService, NotificationService())

        # Task context services
        binder.bind(TaskContext, TaskContext())
        binder.bind(TaskMetrics, TaskMetrics())

        # Data processors
        binder.bind(IDataProcessor, DataProcessor())

class InfrastructureModule(Module):
    def configure(self, binder):
        # Database and external services
        binder.bind(IDatabaseConnection, PostgresConnection())
        binder.bind(SMTPConfig, SMTPConfig.from_env())

def create_taskiq_scheduler() -> TaskiqScheduler:
    # Create container with modules
    container = InjectQ()
    container.install(InfrastructureModule())
    container.install(TaskModule())

    # Create scheduler
    scheduler = TaskiqScheduler()

    # Set up integration
    setup_taskiq_integration(scheduler, container)

    return scheduler

# Usage
scheduler = create_taskiq_scheduler()

๐ŸŽจ Task Patterns

Background Email Tasks

@scheduler.task
async def send_bulk_emails(
    user_ids: List[int],
    template: str,
    email_service: IEmailService = InjectQDependency(IEmailService),
    user_service: IUserService = InjectQDependency(IUserService)
):
    """Send emails to multiple users."""
    for user_id in user_ids:
        user = user_service.get_user(user_id)
        await email_service.send_template_email(
            user.email,
            template,
            {"name": user.name}
        )

@scheduler.task
async def send_reminder_emails(
    reminder_type: str,
    email_service: IEmailService = InjectQDependency(IEmailService),
    user_service: IUserService = InjectQDependency(IUserService)
):
    """Send reminder emails based on type."""
    users = user_service.get_users_due_for_reminder(reminder_type)

    for user in users:
        await email_service.send_reminder_email(
            user.email,
            reminder_type
        )

# Schedule recurring tasks
await scheduler.schedule_task(
    send_reminder_emails,
    reminder_type="payment_due",
    cron="0 9 * * *"  # Daily at 9 AM
)

Data Processing Tasks

@scheduler.task
async def process_user_data(
    user_id: int,
    data_type: str,
    processor: IDataProcessor = InjectQDependency(IDataProcessor),
    storage: IDataStorage = InjectQDependency(IDataStorage),
    metrics: TaskMetrics = InjectQDependency(TaskMetrics)
):
    """Process user data in background."""
    try:
        # Get user data
        raw_data = await storage.get_user_data(user_id, data_type)

        # Process data
        start_time = time.time()
        processed_data = await processor.process_user_data(raw_data)
        processing_time = time.time() - start_time

        metrics.record_operation("process_user_data", processing_time)

        # Store processed data
        await storage.store_processed_data(user_id, data_type, processed_data)

    except Exception as e:
        metrics.record_error(f"Failed to process user data: {e}")
        raise

@scheduler.task
async def cleanup_old_data(
    days_old: int = 30,
    storage: IDataStorage = InjectQDependency(IDataStorage)
):
    """Clean up old processed data."""
    cutoff_date = datetime.now() - timedelta(days=days_old)
    deleted_count = await storage.cleanup_old_data(cutoff_date)

    print(f"Cleaned up {deleted_count} old data records")

Notification Tasks

@scheduler.task
async def send_order_notifications(
    order_id: int,
    notification_svc: INotificationService = InjectQDependency(INotificationService),
    user_svc: IUserService = InjectQDependency(IUserService)
):
    """Send notifications for order events."""
    order = user_svc.get_order(order_id)

    # Send to customer
    await notification_svc.send_order_confirmation(order_id)

    # Send to admin if high value
    if order.total > 1000:
        await notification_svc.send_high_value_order_alert(order_id)

@scheduler.task
async def send_payment_reminders(
    user_id: int,
    amount: float,
    due_date: str,
    notification_svc: INotificationService = InjectQDependency(INotificationService)
):
    """Send payment reminder notifications."""
    await notification_svc.send_payment_reminder(user_id, amount, due_date)

# Chain tasks together
@scheduler.task
async def process_payment_and_notify(
    payment_data: dict,
    payment_svc: IPaymentService = InjectQDependency(IPaymentService),
    notification_svc: INotificationService = InjectQDependency(INotificationService)
):
    """Process payment and send notifications."""
    # Process payment
    result = await payment_svc.process_payment(payment_data)

    if result.success:
        # Send success notification
        await notification_svc.send_payment_success(
            result.user_id,
            result.amount
        )
    else:
        # Send failure notification
        await notification_svc.send_payment_failed(result.user_id)

    return result

๐Ÿงช Testing Taskiq Integration

Unit Testing Tasks

import pytest
from injectq.integrations.taskiq import setup_taskiq_integration

@pytest.fixture
def test_scheduler():
    # Create test container
    container = InjectQ()
    container.bind(IEmailService, MockEmailService())
    container.bind(IUserService, MockUserService())

    # Create test scheduler
    scheduler = TaskiqScheduler()
    setup_taskiq_integration(scheduler, container)

    return scheduler

def test_send_welcome_email_task(test_scheduler):
    # Define test task
    @test_scheduler.task
    async def send_welcome_email(
        user_id: int,
        email_service: IEmailService = InjectQDependency(IEmailService),
        user_service: IUserService = InjectQDependency(IUserService)
    ):
        user = user_service.get_user(user_id)
        await email_service.send_welcome_email(user.email)
        return {"email": user.email}

    # Execute task
    result = await test_scheduler.execute_task(
        send_welcome_email,
        user_id=123
    )

    # Verify result
    assert result["email"] == "user123@example.com"

    # Verify mocks were called
    email_service = test_scheduler.container.get(IEmailService)
    user_service = test_scheduler.container.get(IUserService)

    assert email_service.send_welcome_email_called
    assert user_service.get_user_called

def test_task_scoping(test_scheduler):
    # Define task with scoped service
    @test_scheduler.task
    async def scoped_task(
        data: str,
        ctx: TaskContext = InjectQDependency(TaskContext)
    ):
        ctx.set_metadata("input", data)
        return ctx.metadata

    # Execute multiple tasks
    result1 = await test_scheduler.execute_task(scoped_task, data="test1")
    result2 = await test_scheduler.execute_task(scoped_task, data="test2")

    # Each task should have its own context
    assert result1["input"] == "test1"
    assert result2["input"] == "test2"

Integration Testing

@pytest.fixture
def integration_scheduler():
    # Real container with test database
    container = InjectQ()
    container.install(TestDatabaseModule())
    container.install(EmailModule())
    container.install(TaskModule())

    scheduler = TaskiqScheduler()
    setup_taskiq_integration(scheduler, container)

    return scheduler

def test_email_task_integration(integration_scheduler):
    # Define integration task
    @integration_scheduler.task
    async def send_user_notification(
        user_id: int,
        message: str,
        email_service: IEmailService = InjectQDependency(IEmailService),
        user_service: IUserService = InjectQDependency(IUserService)
    ):
        user = user_service.get_user(user_id)
        await email_service.send_notification(user.email, message)
        return {"sent_to": user.email}

    # Execute task
    result = await integration_scheduler.execute_task(
        send_user_notification,
        user_id=123,
        message="Welcome to our platform!"
    )

    # Verify result
    assert "sent_to" in result
    assert result["sent_to"].endswith("@example.com")

def test_task_error_handling(integration_scheduler):
    # Define task that might fail
    @integration_scheduler.task
    async def risky_task(
        user_id: int,
        user_service: IUserService = InjectQDependency(IUserService)
    ):
        user = user_service.get_user(user_id)
        if user.status == "inactive":
            raise ValueError("Cannot process inactive user")
        return {"processed": user.id}

    # Test successful case
    result = await integration_scheduler.execute_task(risky_task, user_id=123)
    assert result["processed"] == 123

    # Test error case
    with pytest.raises(ValueError, match="Cannot process inactive user"):
        await integration_scheduler.execute_task(risky_task, user_id=456)

Mock Testing

class MockEmailService:
    def __init__(self):
        self.sent_emails = []

    async def send_welcome_email(self, email: str):
        self.sent_emails.append({
            "type": "welcome",
            "email": email,
            "timestamp": time.time()
        })

    async def send_notification(self, email: str, message: str):
        self.sent_emails.append({
            "type": "notification",
            "email": email,
            "message": message,
            "timestamp": time.time()
        })

class MockUserService:
    def __init__(self):
        self.users = {
            123: User(id=123, email="user123@example.com", status="active"),
            456: User(id=456, email="user456@example.com", status="inactive")
        }

    def get_user(self, user_id: int) -> User:
        return self.users.get(user_id)

def test_with_mocks():
    container = InjectQ()
    mock_email = MockEmailService()
    mock_user = MockUserService()

    container.bind(IEmailService, mock_email)
    container.bind(IUserService, mock_user)

    scheduler = TaskiqScheduler()
    setup_taskiq_integration(scheduler, container)

    @scheduler.task
    async def test_task(
        user_id: int,
        email_service: IEmailService = InjectQDependency(IEmailService),
        user_service: IUserService = InjectQDependency(IUserService)
    ):
        user = user_service.get_user(user_id)
        await email_service.send_welcome_email(user.email)
        return len(mock_email.sent_emails)

    # Execute task
    result = await scheduler.execute_task(test_task, user_id=123)

    # Verify mock interactions
    assert result == 1
    assert len(mock_email.sent_emails) == 1
    assert mock_email.sent_emails[0]["email"] == "user123@example.com"

๐Ÿšจ Common Patterns and Pitfalls

โœ… Good Patterns

1. Proper Task Scoping

# โœ… Good: Use scoped for task-specific data
@scoped
class TaskProgress:
    def __init__(self):
        self.steps = []
        self.current_step = 0

    def record_step(self, step_name: str):
        self.steps.append({
            "name": step_name,
            "timestamp": time.time()
        })
        self.current_step += 1

# โœ… Good: Use singleton for shared resources
@singleton
class DatabasePool:
    def __init__(self):
        self.pool = create_database_pool()

# โœ… Good: Use transient for stateless operations
@transient
class DataValidator:
    def validate(self, data: dict) -> bool:
        return validate_schema(data)

2. Error Handling

# โœ… Good: Handle task errors gracefully
@scheduler.task
async def process_with_error_handling(
    data: dict,
    processor: IDataProcessor = InjectQDependency(IDataProcessor),
    logger: ILogger = InjectQDependency(ILogger)
):
    try:
        result = await processor.process_data(data)
        return result
    except ValidationError as e:
        logger.error(f"Validation failed: {e}")
        # Retry logic or dead letter queue
        await handle_validation_error(data, e)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        # Alert system or manual intervention
        await alert_system(f"Task failed: {e}")
        raise

3. Task Dependencies

# โœ… Good: Chain related tasks
@scheduler.task
async def process_order(
    order_id: int,
    order_svc: IOrderService = InjectQDependency(IOrderService)
):
    order = await order_svc.process_order(order_id)
    return order

@scheduler.task
async def notify_order_processed(
    order_id: int,
    notification_svc: INotificationService = InjectQDependency(INotificationService)
):
    await notification_svc.send_order_processed_notification(order_id)

# Chain tasks
order_result = await scheduler.execute_task(process_order, order_id=123)
await scheduler.execute_task(notify_order_processed, order_id=123)

โŒ Bad Patterns

1. Manual Container Access

# โŒ Bad: Manual container access in tasks
container = InjectQ()  # Global container

@scheduler.task
async def manual_task(user_id: int):
    user_service = container.get(IUserService)  # Manual resolution
    return user_service.get_user(user_id)

# โœ… Good: Use dependency injection
@scheduler.task
async def injected_task(
    user_id: int,
    user_service: IUserService = InjectQDependency(IUserService)
):
    return user_service.get_user(user_id)

2. Singleton Abuse

# โŒ Bad: Singleton for task-specific state
@singleton
class TaskState:
    def __init__(self):
        self.current_task_data = None  # Shared across tasks!

    def set_task_data(self, data):
        self.current_task_data = data  # Overwrites other tasks!

# โŒ Bad: Singleton for mutable task data
@singleton
class TaskMetrics:
    def __init__(self):
        self.task_count = 0  # Accumulates across all tasks

    def increment_task_count(self):
        self.task_count += 1  # Not task-specific

# โœ… Good: Scoped for task-specific data
@scoped
class TaskState:
    def __init__(self):
        self.task_data = None

@scoped
class TaskMetrics:
    def __init__(self):
        self.operations = []

3. Heavy Operations in Tasks

# โŒ Bad: Heavy initialization per task
@scheduler.task
async def heavy_task(data: dict):
    # Load model on every task execution
    model = await load_ml_model()  # 2GB model!
    result = model.predict(data)
    return result

# โœ… Good: Pre-load heavy resources
@singleton
class MLModelService:
    def __init__(self):
        self.model = None

    async def initialize(self):
        if self.model is None:
            self.model = await load_ml_model()

    async def predict(self, data: dict):
        await self.initialize()
        return self.model.predict(data)

@scheduler.task
async def light_task(
    data: dict,
    ml_service: MLModelService = InjectQDependency(MLModelService)
):
    return await ml_service.predict(data)

โšก Advanced Features

Custom Task Middleware

from injectq.integrations.taskiq import TaskiqMiddleware

class MetricsMiddleware(TaskiqMiddleware):
    def __init__(self, metrics_service: IMetricsService):
        self.metrics = metrics_service

    async def before_task(self, task_info):
        # Record task start
        self.metrics.increment("tasks_started")
        task_info.start_time = time.time()

    async def after_task(self, task_info, result):
        # Record task completion
        duration = time.time() - task_info.start_time
        self.metrics.histogram("task_duration", duration)
        self.metrics.increment("tasks_completed")

    async def on_task_error(self, task_info, error):
        # Record task failure
        self.metrics.increment("tasks_failed")
        self.metrics.increment(f"task_error_{type(error).__name__}")

# Use custom middleware
setup_taskiq_integration(
    scheduler,
    container,
    middlewares=[MetricsMiddleware(metrics_service)]
)

Task Result Handling

@scheduler.task
async def process_with_result_handling(
    data: dict,
    processor: IDataProcessor = InjectQDependency(IDataProcessor)
):
    result = await processor.process_data(data)

    # Return structured result
    return {
        "task_id": str(uuid.uuid4()),
        "processed_at": time.time(),
        "input_size": len(data),
        "output_size": len(result),
        "result": result
    }

# Handle task results
async def handle_task_result(task_result):
    if task_result.success:
        # Process successful result
        data = task_result.result
        print(f"Task completed: {data['task_id']}")

        # Store result or trigger next task
        await store_task_result(data)
    else:
        # Handle task failure
        print(f"Task failed: {task_result.error}")

        # Retry logic or error handling
        if task_result.retry_count < 3:
            await scheduler.retry_task(task_result.task_id)
        else:
            await handle_permanent_failure(task_result)

Cron Tasks

@scheduler.task
async def cleanup_expired_sessions(
    session_svc: ISessionService = InjectQDependency(ISessionService)
):
    """Clean up expired user sessions."""
    expired_count = await session_svc.cleanup_expired_sessions()
    print(f"Cleaned up {expired_count} expired sessions")

@scheduler.task
async def generate_daily_reports(
    report_svc: IReportService = InjectQDependency(IReportService)
):
    """Generate daily business reports."""
    await report_svc.generate_daily_report()
    print("Daily report generated")

@scheduler.task
async def send_reminders(
    reminder_svc: IReminderService = InjectQDependency(IReminderService)
):
    """Send scheduled reminders."""
    sent_count = await reminder_svc.send_pending_reminders()
    print(f"Sent {sent_count} reminders")

# Schedule cron tasks
await scheduler.schedule_cron(
    cleanup_expired_sessions,
    cron="0 */6 * * *"  # Every 6 hours
)

await scheduler.schedule_cron(
    generate_daily_reports,
    cron="0 2 * * *"  # Daily at 2 AM
)

await scheduler.schedule_cron(
    send_reminders,
    cron="0 */2 * * *"  # Every 2 hours
)

๐ŸŽฏ Summary

Taskiq integration provides:

  • Automatic dependency injection - No manual container management in tasks
  • Task-scoped services - Proper isolation per background task
  • Type-driven injection - Just add type hints to task parameters
  • Framework lifecycle integration - Automatic cleanup and resource management
  • Testing support - Easy mocking and test isolation

Key features: - Seamless integration with Taskiq's task system - Support for all InjectQ scopes (singleton, scoped, transient) - Task-scoped container access - Custom middleware support - Cron task scheduling - Result handling and error recovery

Best practices: - Use scoped services for task-specific data - Use singleton for shared resources and heavy objects - Use transient for stateless operations - Handle errors gracefully in tasks - Test thoroughly with mocked dependencies - Avoid manual container access in tasks

Ready to explore FastMCP integration?