Skip to content

FastMCP Integration

FastMCP integration enables dependency injection for MCP (Model Context Protocol) servers, providing automatic service resolution with proper request scoping and lifecycle management.

๐ŸŽฏ Getting Started

Basic Setup

from fastmcp import FastMCP
from injectq import InjectQ
from injectq.integrations.fastmcp import setup_fastmcp_integration, InjectQDependency

# 1. Create container and bind services
container = InjectQ()
container.bind(IDocumentService, DocumentService())
container.bind(IUserService, UserService())
container.bind(IAuthService, AuthService())

# 2. Create FastMCP server
mcp = FastMCP("My MCP Server")

# 3. Set up integration
setup_fastmcp_integration(mcp, container)

# 4. Use dependency injection in tools
@mcp.tool()
async def search_documents(
    query: str,
    limit: int = 10,
    doc_service: IDocumentService = InjectQDependency(IDocumentService),
    user_service: IUserService = InjectQDependency(IUserService)
):
    """Search documents with user context."""
    # Get current user from context
    user = user_service.get_current_user()

    # Search documents
    results = await doc_service.search_documents(
        query=query,
        user_id=user.id,
        limit=limit
    )

    return results

@mcp.tool()
async def create_document(
    title: str,
    content: str,
    doc_service: IDocumentService = InjectQDependency(IDocumentService),
    auth_service: IAuthService = InjectQDependency(IAuthService)
):
    """Create a new document."""
    # Verify permissions
    if not auth_service.has_permission("create_document"):
        raise ValueError("Insufficient permissions")

    # Create document
    document = await doc_service.create_document(
        title=title,
        content=content
    )

    return document

# 5. Start the server
if __name__ == "__main__":
    mcp.run()

Service Definitions

from typing import Protocol, List, Optional
from datetime import datetime

# Define service interfaces
class IDocumentService(Protocol):
    async def search_documents(self, query: str, user_id: int, limit: int) -> List[Document]: ...
    async def create_document(self, title: str, content: str) -> Document: ...
    async def get_document(self, doc_id: int) -> Optional[Document]: ...
    async def update_document(self, doc_id: int, title: str, content: str) -> Document: ...
    async def delete_document(self, doc_id: int) -> bool: ...

class IUserService(Protocol):
    def get_current_user(self) -> User: ...
    def get_user(self, user_id: int) -> Optional[User]: ...
    def get_user_permissions(self, user_id: int) -> List[str]: ...

class IAuthService(Protocol):
    def has_permission(self, permission: str) -> bool: ...
    def authenticate_user(self, token: str) -> Optional[User]: ...
    def authorize_action(self, user: User, action: str, resource: str) -> bool: ...

# Data models
class User:
    def __init__(self, id: int, name: str, email: str):
        self.id = id
        self.name = name
        self.email = email

class Document:
    def __init__(self, id: int, title: str, content: str, user_id: int, created_at: datetime):
        self.id = id
        self.title = title
        self.content = content
        self.user_id = user_id
        self.created_at = created_at

# Implement services
class DocumentService:
    def __init__(self, db: IDatabaseConnection):
        self.db = db

    async def search_documents(self, query: str, user_id: int, limit: int) -> List[Document]:
        # Search documents in database
        return await self.db.query(Document).filter(
            Document.user_id == user_id,
            Document.title.contains(query) | Document.content.contains(query)
        ).limit(limit).all()

    async def create_document(self, title: str, content: str) -> Document:
        # Get current user from context
        user_service = self.db.get(IUserService)  # Injected via container
        user = user_service.get_current_user()

        document = Document(
            id=self.db.next_id(),
            title=title,
            content=content,
            user_id=user.id,
            created_at=datetime.now()
        )

        await self.db.save(document)
        return document

    async def get_document(self, doc_id: int) -> Optional[Document]:
        return await self.db.query(Document).filter(Document.id == doc_id).first()

    async def update_document(self, doc_id: int, title: str, content: str) -> Document:
        document = await self.get_document(doc_id)
        if not document:
            raise ValueError(f"Document {doc_id} not found")

        document.title = title
        document.content = content
        await self.db.save(document)
        return document

    async def delete_document(self, doc_id: int) -> bool:
        document = await self.get_document(doc_id)
        if not document:
            return False

        await self.db.delete(document)
        return True

class UserService:
    def __init__(self, db: IDatabaseConnection):
        self.db = db
        self._current_user = None

    def get_current_user(self) -> User:
        if self._current_user is None:
            # Get from request context (set by middleware)
            self._current_user = self._get_user_from_context()
        return self._current_user

    def _get_user_from_context(self) -> User:
        # Implementation depends on your auth system
        # This would typically get user from request context
        pass

    def get_user(self, user_id: int) -> Optional[User]:
        return self.db.query(User).filter(User.id == user_id).first()

    def get_user_permissions(self, user_id: int) -> List[str]:
        # Get user permissions from database
        return self.db.query(Permission).filter(Permission.user_id == user_id).all()

class AuthService:
    def __init__(self, user_service: IUserService):
        self.user_service = user_service

    def has_permission(self, permission: str) -> bool:
        user = self.user_service.get_current_user()
        permissions = self.user_service.get_user_permissions(user.id)
        return permission in permissions

    def authenticate_user(self, token: str) -> Optional[User]:
        # Verify token and return user
        pass

    def authorize_action(self, user: User, action: str, resource: str) -> bool:
        # Check if user can perform action on resource
        pass

๐Ÿ”ง Advanced Configuration

Request-Scoped Services

from injectq import scoped

@scoped
class RequestContext:
    def __init__(self):
        self.request_id = str(uuid.uuid4())
        self.start_time = time.time()
        self.user = None
        self.metadata = {}

    def set_user(self, user: User):
        self.user = user

    def set_metadata(self, key: str, value: Any):
        self.metadata[key] = value

    def get_duration(self) -> float:
        return time.time() - self.start_time

@scoped
class RequestMetrics:
    def __init__(self):
        self.operations = []
        self.errors = []

    def record_operation(self, operation: str, duration: float):
        self.operations.append({
            "operation": operation,
            "duration": duration,
            "timestamp": time.time()
        })

    def record_error(self, error: str):
        self.errors.append({
            "error": error,
            "timestamp": time.time()
        })

# Use in MCP tools
@mcp.tool()
async def complex_document_operation(
    doc_id: int,
    operation: str,
    ctx: RequestContext = InjectQDependency(RequestContext),
    metrics: RequestMetrics = InjectQDependency(RequestMetrics),
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    ctx.set_metadata("operation", operation)
    ctx.set_metadata("doc_id", doc_id)

    try:
        # Perform operation with metrics
        start_time = time.time()

        if operation == "get":
            result = await doc_service.get_document(doc_id)
        elif operation == "update":
            result = await doc_service.update_document(doc_id, "New Title", "New Content")
        else:
            raise ValueError(f"Unknown operation: {operation}")

        duration = time.time() - start_time
        metrics.record_operation(operation, duration)

        return result

    except Exception as e:
        metrics.record_error(str(e))
        raise

Module-Based Setup

from injectq import Module

class DocumentModule(Module):
    def configure(self, binder):
        # Document services
        binder.bind(IDocumentService, DocumentService())
        binder.bind(IUserService, UserService())
        binder.bind(IAuthService, AuthService())

        # Request context services
        binder.bind(RequestContext, RequestContext())
        binder.bind(RequestMetrics, RequestMetrics())

class InfrastructureModule(Module):
    def configure(self, binder):
        # Database and external services
        binder.bind(IDatabaseConnection, PostgresConnection())
        binder.bind(ICacheService, RedisCache())

class AuthModule(Module):
    def configure(self, binder):
        # Authentication services
        binder.bind(ITokenService, JWTTokenService())
        binder.bind(ISessionService, SessionService())

def create_mcp_server() -> FastMCP:
    # Create container with modules
    container = InjectQ()
    container.install(InfrastructureModule())
    container.install(AuthModule())
    container.install(DocumentModule())

    # Create MCP server
    mcp = FastMCP("Document Management Server")

    # Set up integration
    setup_fastmcp_integration(mcp, container)

    return mcp

# Usage
mcp = create_mcp_server()

๐ŸŽจ MCP Tool Patterns

Document Management Tools

@mcp.tool()
async def list_user_documents(
    limit: int = 20,
    offset: int = 0,
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    """List current user's documents."""
    documents = await doc_service.list_user_documents(limit, offset)
    return {
        "documents": [
            {
                "id": doc.id,
                "title": doc.title,
                "created_at": doc.created_at.isoformat()
            }
            for doc in documents
        ],
        "total": len(documents)
    }

@mcp.tool()
async def search_documents(
    query: str,
    category: Optional[str] = None,
    limit: int = 10,
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    """Search documents with advanced filters."""
    results = await doc_service.search_documents(
        query=query,
        category=category,
        limit=limit
    )

    return {
        "query": query,
        "results": [
            {
                "id": result.id,
                "title": result.title,
                "snippet": result.content[:200] + "..." if len(result.content) > 200 else result.content,
                "score": result.score
            }
            for result in results
        ]
    }

@mcp.tool()
async def create_document_from_template(
    template_id: int,
    title: str,
    variables: dict,
    doc_service: IDocumentService = InjectQDependency(IDocumentService),
    template_service: ITemplateService = InjectQDependency(ITemplateService)
):
    """Create document from template with variable substitution."""
    # Get template
    template = await template_service.get_template(template_id)

    # Substitute variables
    content = template.content
    for key, value in variables.items():
        content = content.replace(f"{{{{ {key} }}}}", str(value))

    # Create document
    document = await doc_service.create_document(title, content)

    return {
        "document_id": document.id,
        "title": document.title,
        "created_at": document.created_at.isoformat()
    }

User Management Tools

@mcp.tool()
async def get_user_profile(
    user_service: IUserService = InjectQDependency(IUserService)
):
    """Get current user's profile."""
    user = user_service.get_current_user()
    permissions = user_service.get_user_permissions(user.id)

    return {
        "id": user.id,
        "name": user.name,
        "email": user.email,
        "permissions": permissions
    }

@mcp.tool()
async def update_user_preferences(
    preferences: dict,
    user_service: IUserService = InjectQDependency(IUserService),
    preference_service: IPreferenceService = InjectQDependency(IPreferenceService)
):
    """Update user preferences."""
    user = user_service.get_current_user()

    await preference_service.update_preferences(user.id, preferences)

    return {"message": "Preferences updated successfully"}

@mcp.tool()
async def share_document(
    document_id: int,
    target_user_id: int,
    permissions: List[str],
    doc_service: IDocumentService = InjectQDependency(IDocumentService),
    auth_service: IAuthService = InjectQDependency(IAuthService)
):
    """Share document with another user."""
    # Check if current user owns the document
    document = await doc_service.get_document(document_id)
    current_user = doc_service.user_service.get_current_user()

    if document.user_id != current_user.id:
        raise ValueError("You can only share documents you own")

    # Check permissions
    if not auth_service.authorize_action(current_user, "share", "document"):
        raise ValueError("Insufficient permissions to share documents")

    # Share document
    await doc_service.share_document(document_id, target_user_id, permissions)

    return {"message": f"Document shared with user {target_user_id}"}

Analytics and Reporting Tools

@mcp.tool()
async def get_document_stats(
    document_id: int,
    analytics_service: IAnalyticsService = InjectQDependency(IAnalyticsService),
    auth_service: IAuthService = InjectQDependency(IAuthService)
):
    """Get analytics for a document."""
    # Check permissions
    if not auth_service.has_permission("view_analytics"):
        raise ValueError("Insufficient permissions")

    stats = await analytics_service.get_document_stats(document_id)

    return {
        "document_id": document_id,
        "views": stats.views,
        "unique_viewers": stats.unique_viewers,
        "last_viewed": stats.last_viewed.isoformat() if stats.last_viewed else None,
        "average_session_duration": stats.average_session_duration
    }

@mcp.tool()
async def generate_user_report(
    user_id: Optional[int] = None,
    date_from: Optional[str] = None,
    date_to: Optional[str] = None,
    report_service: IReportService = InjectQDependency(IReportService),
    auth_service: IAuthService = InjectQDependency(IAuthService)
):
    """Generate user activity report."""
    # Check permissions
    if not auth_service.has_permission("generate_reports"):
        raise ValueError("Insufficient permissions")

    # Default to current user if not specified
    if user_id is None:
        user_service = report_service.container.get(IUserService)
        user_id = user_service.get_current_user().id

    # Parse dates
    from_date = datetime.fromisoformat(date_from) if date_from else None
    to_date = datetime.fromisoformat(date_to) if date_to else None

    # Generate report
    report = await report_service.generate_user_report(
        user_id=user_id,
        from_date=from_date,
        to_date=to_date
    )

    return {
        "user_id": user_id,
        "period": {
            "from": from_date.isoformat() if from_date else None,
            "to": to_date.isoformat() if to_date else None
        },
        "documents_created": report.documents_created,
        "documents_viewed": report.documents_viewed,
        "total_activity_time": report.total_activity_time,
        "most_active_day": report.most_active_day.isoformat() if report.most_active_day else None
    }

๐Ÿงช Testing FastMCP Integration

Unit Testing Tools

import pytest
from injectq.integrations.fastmcp import setup_fastmcp_integration

@pytest.fixture
def test_mcp():
    # Create test container
    container = InjectQ()
    container.bind(IDocumentService, MockDocumentService())
    container.bind(IUserService, MockUserService())

    # Create test MCP server
    mcp = FastMCP("Test Server")
    setup_fastmcp_integration(mcp, container)

    return mcp

def test_search_documents_tool(test_mcp):
    # Define test tool
    @test_mcp.tool()
    async def search_documents(
        query: str,
        doc_service: IDocumentService = InjectQDependency(IDocumentService)
    ):
        results = await doc_service.search_documents(query, user_id=1, limit=5)
        return {"results": results, "count": len(results)}

    # Mock the request context
    with test_mcp.test_context():
        # Execute tool
        result = await test_mcp.call_tool("search_documents", {"query": "test"})

        # Verify result
        assert "results" in result
        assert "count" in result
        assert result["count"] > 0

def test_request_scoping(test_mcp):
    # Define tool with scoped service
    @test_mcp.tool()
    async def scoped_tool(
        data: str,
        ctx: RequestContext = InjectQDependency(RequestContext)
    ):
        ctx.set_metadata("input", data)
        return ctx.metadata

    # Execute multiple requests
    with test_mcp.test_context():
        result1 = await test_mcp.call_tool("scoped_tool", {"data": "test1"})
        result2 = await test_mcp.call_tool("scoped_tool", {"data": "test2"})

    # Each request should have its own context
    assert result1["input"] == "test1"
    assert result2["input"] == "test2"

Integration Testing

@pytest.fixture
def integration_mcp():
    # Real container with test database
    container = InjectQ()
    container.install(TestDatabaseModule())
    container.install(DocumentModule())
    container.install(AuthModule())

    mcp = FastMCP("Integration Test Server")
    setup_fastmcp_integration(mcp, container)

    return mcp

def test_document_creation_integration(integration_mcp):
    # Define integration tool
    @integration_mcp.tool()
    async def create_test_document(
        title: str,
        content: str,
        doc_service: IDocumentService = InjectQDependency(IDocumentService)
    ):
        document = await doc_service.create_document(title, content)
        return {
            "id": document.id,
            "title": document.title,
            "content_length": len(document.content)
        }

    # Execute tool
    with integration_mcp.test_context():
        result = await integration_mcp.call_tool("create_test_document", {
            "title": "Test Document",
            "content": "This is a test document content."
        })

    # Verify result
    assert result["title"] == "Test Document"
    assert result["content_length"] == len("This is a test document content.")
    assert "id" in result

def test_error_handling_integration(integration_mcp):
    # Define tool that might fail
    @integration_mcp.tool()
    async def risky_tool(
        doc_id: int,
        doc_service: IDocumentService = InjectQDependency(IDocumentService)
    ):
        document = await doc_service.get_document(doc_id)
        if not document:
            raise ValueError("Document not found")
        return {"title": document.title}

    # Test successful case
    with integration_mcp.test_context():
        # First create a document
        create_result = await integration_mcp.call_tool("create_test_document", {
            "title": "Test",
            "content": "Content"
        })

        # Then retrieve it
        result = await integration_mcp.call_tool("risky_tool", {
            "doc_id": create_result["id"]
        })

        assert result["title"] == "Test"

    # Test error case
    with integration_mcp.test_context():
        with pytest.raises(ValueError, match="Document not found"):
            await integration_mcp.call_tool("risky_tool", {"doc_id": 99999})

Mock Testing

class MockDocumentService:
    def __init__(self):
        self.documents = {}
        self.next_id = 1

    async def search_documents(self, query: str, user_id: int, limit: int):
        # Simple mock search
        results = [
            doc for doc in self.documents.values()
            if query.lower() in doc.title.lower() or query.lower() in doc.content.lower()
        ]
        return results[:limit]

    async def create_document(self, title: str, content: str):
        doc_id = self.next_id
        self.next_id += 1

        document = Document(
            id=doc_id,
            title=title,
            content=content,
            user_id=1,  # Mock user
            created_at=datetime.now()
        )

        self.documents[doc_id] = document
        return document

    async def get_document(self, doc_id: int):
        return self.documents.get(doc_id)

class MockUserService:
    def __init__(self):
        self.current_user = User(id=1, name="Test User", email="test@example.com")

    def get_current_user(self):
        return self.current_user

    def get_user(self, user_id: int):
        if user_id == 1:
            return self.current_user
        return None

def test_with_mocks():
    container = InjectQ()
    mock_doc = MockDocumentService()
    mock_user = MockUserService()

    container.bind(IDocumentService, mock_doc)
    container.bind(IUserService, mock_user)

    mcp = FastMCP("Mock Test Server")
    setup_fastmcp_integration(mcp, container)

    @mcp.tool()
    async def test_tool(
        title: str,
        doc_service: IDocumentService = InjectQDependency(IDocumentService)
    ):
        doc = await doc_service.create_document(title, "Test content")
        return {"created_id": doc.id, "documents_count": len(mock_doc.documents)}

    # Execute tool
    with mcp.test_context():
        result = await mcp.call_tool("test_tool", {"title": "Mock Test"})

    # Verify mock interactions
    assert result["created_id"] == 1
    assert result["documents_count"] == 1
    assert len(mock_doc.documents) == 1

๐Ÿšจ Common Patterns and Pitfalls

โœ… Good Patterns

1. Proper Request Scoping

# โœ… Good: Use scoped for request-specific data
@scoped
class RequestContext:
    def __init__(self):
        self.request_id = str(uuid.uuid4())
        self.user = None
        self.metadata = {}

# โœ… Good: Use singleton for shared resources
@singleton
class DatabasePool:
    def __init__(self):
        self.pool = create_database_pool()

# โœ… Good: Use transient for stateless operations
@transient
class DataValidator:
    def validate(self, data: dict) -> bool:
        return validate_schema(data)

2. Error Handling

# โœ… Good: Handle tool errors gracefully
@mcp.tool()
async def safe_tool_operation(
    data: dict,
    service: IService = InjectQDependency(IService),
    logger: ILogger = InjectQDependency(ILogger)
):
    try:
        result = await service.process_data(data)
        return result
    except ValidationError as e:
        logger.error(f"Validation failed: {e}")
        return {"error": "Invalid data", "details": str(e)}
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return {"error": "Internal server error"}

3. Permission Checking

# โœ… Good: Check permissions before operations
@mcp.tool()
async def secure_document_operation(
    doc_id: int,
    operation: str,
    auth_service: IAuthService = InjectQDependency(IAuthService),
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    # Check permissions first
    if not auth_service.has_permission(f"document.{operation}"):
        raise ValueError(f"Insufficient permissions for {operation}")

    # Perform operation
    if operation == "read":
        return await doc_service.get_document(doc_id)
    elif operation == "update":
        return await doc_service.update_document(doc_id, "New Title", "New Content")
    else:
        raise ValueError(f"Unknown operation: {operation}")

โŒ Bad Patterns

1. Manual Container Access

# โŒ Bad: Manual container access in tools
container = InjectQ()  # Global container

@mcp.tool()
async def manual_tool(user_id: int):
    user_service = container.get(IUserService)  # Manual resolution
    return user_service.get_user(user_id)

# โœ… Good: Use dependency injection
@mcp.tool()
async def injected_tool(
    user_id: int,
    user_service: IUserService = InjectQDependency(IUserService)
):
    return user_service.get_user(user_id)

2. Singleton Abuse

# โŒ Bad: Singleton for request-specific state
@singleton
class RequestState:
    def __init__(self):
        self.current_request_data = None  # Shared across requests!

    def set_request_data(self, data):
        self.current_request_data = data  # Overwrites other requests!

# โŒ Bad: Singleton for mutable request data
@singleton
class RequestMetrics:
    def __init__(self):
        self.request_count = 0  # Accumulates across all requests

    def increment_request_count(self):
        self.request_count += 1  # Not request-specific

# โœ… Good: Scoped for request-specific data
@scoped
class RequestState:
    def __init__(self):
        self.request_data = None

@scoped
class RequestMetrics:
    def __init__(self):
        self.operations = []

3. Heavy Operations in Tools

# โŒ Bad: Heavy initialization per request
@mcp.tool()
async def heavy_tool(data: dict):
    # Load model on every request
    model = await load_ml_model()  # 2GB model!
    result = model.predict(data)
    return result

# โœ… Good: Pre-load heavy resources
@singleton
class MLModelService:
    def __init__(self):
        self.model = None

    async def initialize(self):
        if self.model is None:
            self.model = await load_ml_model()

    async def predict(self, data: dict):
        await self.initialize()
        return self.model.predict(data)

@mcp.tool()
async def light_tool(
    data: dict,
    ml_service: MLModelService = InjectQDependency(MLModelService)
):
    return await ml_service.predict(data)

โšก Advanced Features

Custom MCP Middleware

from injectq.integrations.fastmcp import FastMCPMiddleware

class MetricsMiddleware(FastMCPMiddleware):
    def __init__(self, metrics_service: IMetricsService):
        self.metrics = metrics_service

    async def before_tool_call(self, tool_name, args):
        # Record tool call start
        self.metrics.increment("tool_calls_started")
        self.metrics.increment(f"tool_{tool_name}_calls")

    async def after_tool_call(self, tool_name, args, result, duration):
        # Record tool call completion
        self.metrics.histogram("tool_call_duration", duration)
        self.metrics.increment("tool_calls_completed")

    async def on_tool_error(self, tool_name, args, error):
        # Record tool call failure
        self.metrics.increment("tool_calls_failed")
        self.metrics.increment(f"tool_error_{type(error).__name__}")

# Use custom middleware
setup_fastmcp_integration(
    mcp,
    container,
    middlewares=[MetricsMiddleware(metrics_service)]
)

Tool Result Caching

@scoped
class ToolCache:
    def __init__(self):
        self.cache = {}

    def get(self, key: str):
        return self.cache.get(key)

    def set(self, key: str, value: Any, ttl: int = 300):
        self.cache[key] = {
            "value": value,
            "expires_at": time.time() + ttl
        }

    def is_expired(self, key: str) -> bool:
        if key not in self.cache:
            return True
        return time.time() > self.cache[key]["expires_at"]

@mcp.tool()
async def cached_search_documents(
    query: str,
    cache: ToolCache = InjectQDependency(ToolCache),
    doc_service: IDocumentService = InjectQDependency(IDocumentService)
):
    """Search documents with caching."""
    cache_key = f"search:{query}"

    # Check cache first
    if not cache.is_expired(cache_key):
        return cache.get(cache_key)["value"]

    # Perform search
    results = await doc_service.search_documents(query, user_id=1, limit=10)

    # Cache results
    cache.set(cache_key, results)

    return results

Tool Composition

@mcp.tool()
async def complex_workflow(
    input_data: dict,
    validator: IDataValidator = InjectQDependency(IDataValidator),
    processor: IDataProcessor = InjectQDependency(IDataProcessor),
    formatter: IDataFormatter = InjectQDependency(IDataFormatter)
):
    """Complex workflow combining multiple services."""
    # Step 1: Validate input
    validation_result = validator.validate(input_data)
    if not validation_result.valid:
        return {"error": "Validation failed", "details": validation_result.errors}

    # Step 2: Process data
    processed_data = await processor.process_data(input_data)

    # Step 3: Format output
    formatted_result = formatter.format_data(processed_data)

    return {
        "success": True,
        "original_input": input_data,
        "processed_data": processed_data,
        "formatted_result": formatted_result
    }

@mcp.tool()
async def batch_operation(
    items: List[dict],
    batch_processor: IBatchProcessor = InjectQDependency(IBatchProcessor)
):
    """Process multiple items in batch."""
    # Process items in parallel
    results = await batch_processor.process_batch(items)

    # Group results
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]

    return {
        "total_items": len(items),
        "successful": len(successful),
        "failed": len(failed),
        "results": results
    }

๐ŸŽฏ Summary

FastMCP integration provides:

  • Automatic dependency injection - No manual container management in tools
  • Request-scoped services - Proper isolation per MCP request
  • Type-driven injection - Just add type hints to tool parameters
  • Framework lifecycle integration - Automatic cleanup and resource management
  • Testing support - Easy mocking and test isolation

Key features: - Seamless integration with FastMCP's tool system - Support for all InjectQ scopes (singleton, scoped, transient) - Request-scoped container access - Custom middleware support - Tool result caching - Tool composition patterns

Best practices: - Use scoped services for request-specific data - Use singleton for shared resources and heavy objects - Use transient for stateless operations - Handle errors gracefully in tools - Check permissions before operations - Test thoroughly with mocked dependencies - Avoid manual container access in tools

Congratulations! You've completed the framework integrations section. Ready to explore testing utilities?