Practical Examples¶
This section provides comprehensive, real-world examples of using InjectQ in various application scenarios and design patterns.
🚀 Web Application Example¶
FastAPI E-commerce Application¶
# main.py - Complete e-commerce application
from fastapi import FastAPI, Depends, HTTPException
from injectq import InjectQ, inject, Module
from typing import List, Optional
import asyncio
# Domain Models
class User:
def __init__(self, id: str, name: str, email: str):
self.id = id
self.name = name
self.email = email
class Product:
def __init__(self, id: str, name: str, price: float, inventory: int):
self.id = id
self.name = name
self.price = price
self.inventory = inventory
class Order:
def __init__(self, id: str, user_id: str, items: List[dict], total: float):
self.id = id
self.user_id = user_id
self.items = items
self.total = total
# Repository Layer
from abc import ABC, abstractmethod
class IUserRepository(ABC):
@abstractmethod
async def find_by_id(self, user_id: str) -> Optional[User]:
pass
@abstractmethod
async def create(self, user: User) -> User:
pass
class IProductRepository(ABC):
@abstractmethod
async def find_by_id(self, product_id: str) -> Optional[Product]:
pass
@abstractmethod
async def update_inventory(self, product_id: str, quantity: int) -> bool:
pass
class IOrderRepository(ABC):
@abstractmethod
async def create(self, order: Order) -> Order:
pass
@abstractmethod
async def find_by_user(self, user_id: str) -> List[Order]:
pass
# Concrete Implementations
class DatabaseUserRepository(IUserRepository):
@inject
async def __init__(self, db_connection: DatabaseConnection):
self.db = db_connection
async def find_by_id(self, user_id: str) -> Optional[User]:
# Simulate database query
await asyncio.sleep(0.01)
return User(user_id, f"User {user_id}", f"user{user_id}@example.com")
async def create(self, user: User) -> User:
# Simulate database insert
await asyncio.sleep(0.01)
return user
class DatabaseProductRepository(IProductRepository):
@inject
async def __init__(self, db_connection: DatabaseConnection, cache: CacheService):
self.db = db_connection
self.cache = cache
async def find_by_id(self, product_id: str) -> Optional[Product]:
# Check cache first
cached = await self.cache.get(f"product:{product_id}")
if cached:
return cached
# Simulate database query
await asyncio.sleep(0.01)
product = Product(product_id, f"Product {product_id}", 99.99, 10)
# Cache the result
await self.cache.set(f"product:{product_id}", product, ttl=300)
return product
async def update_inventory(self, product_id: str, quantity: int) -> bool:
# Simulate inventory update
await asyncio.sleep(0.01)
await self.cache.delete(f"product:{product_id}") # Invalidate cache
return True
class DatabaseOrderRepository(IOrderRepository):
@inject
async def __init__(self, db_connection: DatabaseConnection):
self.db = db_connection
async def create(self, order: Order) -> Order:
# Simulate database insert
await asyncio.sleep(0.01)
return order
async def find_by_user(self, user_id: str) -> List[Order]:
# Simulate database query
await asyncio.sleep(0.01)
return []
# Service Layer
class UserService:
@inject
def __init__(self, user_repo: IUserRepository, email_service: EmailService):
self.user_repo = user_repo
self.email_service = email_service
async def get_user(self, user_id: str) -> Optional[User]:
return await self.user_repo.find_by_id(user_id)
async def create_user(self, user_data: dict) -> User:
user = User(
id=user_data["id"],
name=user_data["name"],
email=user_data["email"]
)
created_user = await self.user_repo.create(user)
# Send welcome email
await self.email_service.send_welcome_email(created_user)
return created_user
class ProductService:
@inject
def __init__(self, product_repo: IProductRepository):
self.product_repo = product_repo
async def get_product(self, product_id: str) -> Optional[Product]:
return await self.product_repo.find_by_id(product_id)
async def check_availability(self, product_id: str, quantity: int) -> bool:
product = await self.product_repo.find_by_id(product_id)
return product and product.inventory >= quantity
class OrderService:
@inject
def __init__(
self,
order_repo: IOrderRepository,
product_service: ProductService,
payment_service: PaymentService,
notification_service: NotificationService
):
self.order_repo = order_repo
self.product_service = product_service
self.payment_service = payment_service
self.notification_service = notification_service
async def create_order(self, user_id: str, items: List[dict]) -> Order:
# Validate inventory
for item in items:
available = await self.product_service.check_availability(
item["product_id"], item["quantity"]
)
if not available:
raise HTTPException(
status_code=400,
detail=f"Product {item['product_id']} not available"
)
# Calculate total
total = 0.0
for item in items:
product = await self.product_service.get_product(item["product_id"])
total += product.price * item["quantity"]
# Process payment
payment_result = await self.payment_service.process_payment(user_id, total)
if not payment_result.success:
raise HTTPException(status_code=400, detail="Payment failed")
# Create order
order = Order(
id=f"order_{user_id}_{len(items)}",
user_id=user_id,
items=items,
total=total
)
created_order = await self.order_repo.create(order)
# Send notifications
await self.notification_service.send_order_confirmation(created_order)
return created_order
# Infrastructure Services
class DatabaseConnection:
@inject
async def __init__(self, config: DatabaseConfig):
self.config = config
await self.connect()
async def connect(self):
# Simulate database connection
await asyncio.sleep(0.1)
class CacheService:
def __init__(self):
self._cache = {}
async def get(self, key: str):
return self._cache.get(key)
async def set(self, key: str, value, ttl: int = 300):
self._cache[key] = value
async def delete(self, key: str):
self._cache.pop(key, None)
class EmailService:
@inject
def __init__(self, email_config: EmailConfig):
self.config = email_config
async def send_welcome_email(self, user: User):
print(f"Sending welcome email to {user.email}")
class PaymentService:
@inject
def __init__(self, payment_config: PaymentConfig):
self.config = payment_config
async def process_payment(self, user_id: str, amount: float):
# Simulate payment processing
await asyncio.sleep(0.1)
return PaymentResult(success=True, transaction_id=f"txn_{user_id}")
class NotificationService:
@inject
def __init__(self, notification_config: NotificationConfig):
self.config = notification_config
async def send_order_confirmation(self, order: Order):
print(f"Order confirmation sent for order {order.id}")
# Configuration Classes
class DatabaseConfig:
def __init__(self, host: str, port: int, database: str):
self.host = host
self.port = port
self.database = database
class EmailConfig:
def __init__(self, smtp_host: str, smtp_port: int):
self.smtp_host = smtp_host
self.smtp_port = smtp_port
class PaymentConfig:
def __init__(self, api_key: str, endpoint: str):
self.api_key = api_key
self.endpoint = endpoint
class NotificationConfig:
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
class PaymentResult:
def __init__(self, success: bool, transaction_id: str):
self.success = success
self.transaction_id = transaction_id
# Application Module
class ECommerceModule(Module):
def configure(self):
# Configuration
self.bind(DatabaseConfig, DatabaseConfig(
host="localhost",
port=5432,
database="ecommerce"
)).singleton()
self.bind(EmailConfig, EmailConfig(
smtp_host="smtp.example.com",
smtp_port=587
)).singleton()
self.bind(PaymentConfig, PaymentConfig(
api_key="test_key",
endpoint="https://api.payment.com"
)).singleton()
self.bind(NotificationConfig, NotificationConfig(
webhook_url="https://notifications.example.com"
)).singleton()
# Infrastructure
self.bind(DatabaseConnection, DatabaseConnection).singleton()
self.bind(CacheService, CacheService).singleton()
self.bind(EmailService, EmailService).singleton()
self.bind(PaymentService, PaymentService).singleton()
self.bind(NotificationService, NotificationService).singleton()
# Repositories
self.bind(IUserRepository, DatabaseUserRepository).scoped()
self.bind(IProductRepository, DatabaseProductRepository).scoped()
self.bind(IOrderRepository, DatabaseOrderRepository).scoped()
# Services
self.bind(UserService, UserService).scoped()
self.bind(ProductService, ProductService).scoped()
self.bind(OrderService, OrderService).scoped()
# FastAPI Application
app = FastAPI(title="E-commerce API")
# Setup DI container
container = InjectQ()
container.install(ECommerceModule())
# Dependency provider for FastAPI
def get_container():
return container
# API Endpoints
@app.post("/users")
async def create_user(
user_data: dict,
container: InjectQ = Depends(get_container)
):
user_service = container.get(UserService)
user = await user_service.create_user(user_data)
return {"id": user.id, "name": user.name, "email": user.email}
@app.get("/users/{user_id}")
async def get_user(
user_id: str,
container: InjectQ = Depends(get_container)
):
user_service = container.get(UserService)
user = await user_service.get_user(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return {"id": user.id, "name": user.name, "email": user.email}
@app.get("/products/{product_id}")
async def get_product(
product_id: str,
container: InjectQ = Depends(get_container)
):
product_service = container.get(ProductService)
product = await product_service.get_product(product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return {
"id": product.id,
"name": product.name,
"price": product.price,
"inventory": product.inventory
}
@app.post("/orders")
async def create_order(
order_data: dict,
container: InjectQ = Depends(get_container)
):
order_service = container.get(OrderService)
order = await order_service.create_order(
order_data["user_id"],
order_data["items"]
)
return {
"id": order.id,
"user_id": order.user_id,
"items": order.items,
"total": order.total
}
# Application startup
@app.on_event("startup")
async def startup_event():
print("E-commerce API started successfully!")
# Run with: uvicorn main:app --reload
🧮 Scientific Computing Example¶
Data Processing Pipeline¶
# data_pipeline.py - Scientific data processing pipeline
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from injectq import InjectQ, inject, Module
import asyncio
from typing import List, Dict, Any
# Data Models
class Dataset:
def __init__(self, name: str, data: pd.DataFrame):
self.name = name
self.data = data
self.metadata = {}
class ProcessingResult:
def __init__(self, dataset_name: str, results: Dict[str, Any]):
self.dataset_name = dataset_name
self.results = results
self.timestamp = pd.Timestamp.now()
# Data Source Interfaces
class IDataSource(ABC):
@abstractmethod
async def load_data(self, source_id: str) -> Dataset:
pass
class IDataSink(ABC):
@abstractmethod
async def save_results(self, results: ProcessingResult) -> bool:
pass
# Data Processing Interfaces
class IDataProcessor(ABC):
@abstractmethod
async def process(self, dataset: Dataset) -> ProcessingResult:
pass
class IDataValidator(ABC):
@abstractmethod
async def validate(self, dataset: Dataset) -> bool:
pass
# Concrete Implementations
class CSVDataSource(IDataSource):
@inject
def __init__(self, config: DataSourceConfig):
self.config = config
async def load_data(self, source_id: str) -> Dataset:
# Simulate loading CSV data
await asyncio.sleep(0.1)
# Generate sample data
np.random.seed(42)
data = pd.DataFrame({
'timestamp': pd.date_range('2023-01-01', periods=1000, freq='H'),
'temperature': np.random.normal(20, 5, 1000),
'humidity': np.random.normal(60, 10, 1000),
'pressure': np.random.normal(1013, 20, 1000)
})
return Dataset(name=f"dataset_{source_id}", data=data)
class DatabaseDataSink(IDataSink):
@inject
def __init__(self, db_connection: DatabaseConnection):
self.db = db_connection
async def save_results(self, results: ProcessingResult) -> bool:
# Simulate saving to database
await asyncio.sleep(0.05)
print(f"Saved results for {results.dataset_name} to database")
return True
class StatisticalProcessor(IDataProcessor):
@inject
def __init__(self, stats_config: StatisticsConfig):
self.config = stats_config
async def process(self, dataset: Dataset) -> ProcessingResult:
# Perform statistical analysis
data = dataset.data
results = {
'mean_temperature': data['temperature'].mean(),
'std_temperature': data['temperature'].std(),
'mean_humidity': data['humidity'].mean(),
'std_humidity': data['humidity'].std(),
'correlation_temp_humidity': data['temperature'].corr(data['humidity']),
'total_records': len(data)
}
return ProcessingResult(dataset.name, results)
class AnomalyDetectionProcessor(IDataProcessor):
@inject
def __init__(self, anomaly_config: AnomalyConfig):
self.config = anomaly_config
async def process(self, dataset: Dataset) -> ProcessingResult:
# Detect anomalies using Z-score
data = dataset.data
z_scores = np.abs((data['temperature'] - data['temperature'].mean()) / data['temperature'].std())
anomalies = data[z_scores > self.config.threshold]
results = {
'anomaly_count': len(anomalies),
'anomaly_percentage': (len(anomalies) / len(data)) * 100,
'max_z_score': z_scores.max(),
'anomaly_timestamps': anomalies['timestamp'].tolist()
}
return ProcessingResult(dataset.name, results)
class DataQualityValidator(IDataValidator):
@inject
def __init__(self, quality_config: QualityConfig):
self.config = quality_config
async def validate(self, dataset: Dataset) -> bool:
data = dataset.data
# Check for missing values
missing_percentage = (data.isnull().sum().sum() / (len(data) * len(data.columns))) * 100
if missing_percentage > self.config.max_missing_percentage:
return False
# Check for outliers
for column in ['temperature', 'humidity', 'pressure']:
z_scores = np.abs((data[column] - data[column].mean()) / data[column].std())
outlier_percentage = (len(data[z_scores > 3]) / len(data)) * 100
if outlier_percentage > self.config.max_outlier_percentage:
return False
return True
# Data Pipeline Orchestrator
class DataPipeline:
@inject
def __init__(
self,
data_source: IDataSource,
data_sink: IDataSink,
processors: List[IDataProcessor],
validator: IDataValidator,
logger: LoggingService
):
self.data_source = data_source
self.data_sink = data_sink
self.processors = processors
self.validator = validator
self.logger = logger
async def process_dataset(self, source_id: str) -> List[ProcessingResult]:
"""Process a single dataset through the entire pipeline."""
try:
# Load data
self.logger.info(f"Loading dataset {source_id}")
dataset = await self.data_source.load_data(source_id)
# Validate data quality
self.logger.info(f"Validating dataset {dataset.name}")
is_valid = await self.validator.validate(dataset)
if not is_valid:
self.logger.error(f"Dataset {dataset.name} failed validation")
return []
# Process data with all processors
results = []
for processor in self.processors:
self.logger.info(f"Processing {dataset.name} with {processor.__class__.__name__}")
result = await processor.process(dataset)
results.append(result)
# Save results
await self.data_sink.save_results(result)
self.logger.info(f"Pipeline completed for dataset {dataset.name}")
return results
except Exception as e:
self.logger.error(f"Pipeline failed for dataset {source_id}: {str(e)}")
raise
async def process_multiple_datasets(self, source_ids: List[str]) -> Dict[str, List[ProcessingResult]]:
"""Process multiple datasets concurrently."""
tasks = [self.process_dataset(source_id) for source_id in source_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {
source_id: result if not isinstance(result, Exception) else []
for source_id, result in zip(source_ids, results)
}
# Configuration Classes
class DataSourceConfig:
def __init__(self, base_path: str, file_format: str = "csv"):
self.base_path = base_path
self.file_format = file_format
class StatisticsConfig:
def __init__(self, confidence_level: float = 0.95):
self.confidence_level = confidence_level
class AnomalyConfig:
def __init__(self, threshold: float = 3.0):
self.threshold = threshold
class QualityConfig:
def __init__(self, max_missing_percentage: float = 5.0, max_outlier_percentage: float = 1.0):
self.max_missing_percentage = max_missing_percentage
self.max_outlier_percentage = max_outlier_percentage
# Logging Service
class LoggingService:
def __init__(self):
import logging
self.logger = logging.getLogger("DataPipeline")
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def info(self, message: str):
self.logger.info(message)
def error(self, message: str):
self.logger.error(message)
def warning(self, message: str):
self.logger.warning(message)
# Data Processing Module
class DataProcessingModule(Module):
def configure(self):
# Configuration
self.bind(DataSourceConfig, DataSourceConfig(
base_path="/data/input",
file_format="csv"
)).singleton()
self.bind(StatisticsConfig, StatisticsConfig(
confidence_level=0.95
)).singleton()
self.bind(AnomalyConfig, AnomalyConfig(
threshold=3.0
)).singleton()
self.bind(QualityConfig, QualityConfig(
max_missing_percentage=5.0,
max_outlier_percentage=1.0
)).singleton()
# Infrastructure
self.bind(DatabaseConnection, DatabaseConnection).singleton()
self.bind(LoggingService, LoggingService).singleton()
# Data Sources and Sinks
self.bind(IDataSource, CSVDataSource).singleton()
self.bind(IDataSink, DatabaseDataSink).singleton()
# Validators
self.bind(IDataValidator, DataQualityValidator).singleton()
# Processors (multiple implementations)
self.bind(IDataProcessor, StatisticalProcessor, name="stats").singleton()
self.bind(IDataProcessor, AnomalyDetectionProcessor, name="anomaly").singleton()
# Pipeline
self.bind(List[IDataProcessor], lambda: [
self.container.get(IDataProcessor, name="stats"),
self.container.get(IDataProcessor, name="anomaly")
]).singleton()
self.bind(DataPipeline, DataPipeline).singleton()
# Example Usage
async def main():
# Setup container
container = InjectQ()
container.install(DataProcessingModule())
# Get pipeline
pipeline = container.get(DataPipeline)
# Process multiple datasets
source_ids = ["weather_station_1", "weather_station_2", "weather_station_3"]
results = await pipeline.process_multiple_datasets(source_ids)
# Display results
for source_id, dataset_results in results.items():
print(f"\nResults for {source_id}:")
for result in dataset_results:
print(f" Processor: {result.dataset_name}")
for key, value in result.results.items():
print(f" {key}: {value}")
if __name__ == "__main__":
asyncio.run(main())
🤖 Microservices Example¶
Event-Driven Microservice¶
# microservice.py - Event-driven microservice with InjectQ
from injectq import InjectQ, inject, Module
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Callable
import asyncio
import json
from dataclasses import dataclass, asdict
from datetime import datetime
# Event System
@dataclass
class Event:
event_type: str
event_id: str
timestamp: datetime
data: Dict
source: str
class IEventBus(ABC):
@abstractmethod
async def publish(self, event: Event) -> bool:
pass
@abstractmethod
async def subscribe(self, event_type: str, handler: Callable) -> bool:
pass
class IEventStore(ABC):
@abstractmethod
async def store_event(self, event: Event) -> bool:
pass
@abstractmethod
async def get_events(self, event_type: str, limit: int = 100) -> List[Event]:
pass
# Command and Query Separation
@dataclass
class Command:
command_type: str
command_id: str
data: Dict
timestamp: datetime
@dataclass
class Query:
query_type: str
query_id: str
parameters: Dict
timestamp: datetime
class ICommandHandler(ABC):
@abstractmethod
async def handle(self, command: Command) -> Dict:
pass
class IQueryHandler(ABC):
@abstractmethod
async def handle(self, query: Query) -> Dict:
pass
# Domain Models
@dataclass
class Order:
id: str
customer_id: str
items: List[Dict]
total_amount: float
status: str
created_at: datetime
@dataclass
class Customer:
id: str
name: str
email: str
created_at: datetime
# Repositories
class IOrderRepository(ABC):
@abstractmethod
async def save(self, order: Order) -> Order:
pass
@abstractmethod
async def find_by_id(self, order_id: str) -> Optional[Order]:
pass
@abstractmethod
async def find_by_customer(self, customer_id: str) -> List[Order]:
pass
class ICustomerRepository(ABC):
@abstractmethod
async def save(self, customer: Customer) -> Customer:
pass
@abstractmethod
async def find_by_id(self, customer_id: str) -> Optional[Customer]:
pass
# Event Bus Implementation
class InMemoryEventBus(IEventBus):
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = {}
async def publish(self, event: Event) -> bool:
handlers = self.subscribers.get(event.event_type, [])
for handler in handlers:
try:
await handler(event)
except Exception as e:
print(f"Error handling event {event.event_id}: {e}")
return True
async def subscribe(self, event_type: str, handler: Callable) -> bool:
if event_type not in self.subscribers:
self.subscribers[event_type] = []
self.subscribers[event_type].append(handler)
return True
class DatabaseEventStore(IEventStore):
@inject
def __init__(self, db_connection: DatabaseConnection):
self.db = db_connection
self.events = [] # In-memory for demo
async def store_event(self, event: Event) -> bool:
# Simulate database storage
self.events.append(event)
return True
async def get_events(self, event_type: str, limit: int = 100) -> List[Event]:
# Filter events by type
filtered_events = [e for e in self.events if e.event_type == event_type]
return filtered_events[:limit]
# Repository Implementations
class DatabaseOrderRepository(IOrderRepository):
@inject
def __init__(self, db_connection: DatabaseConnection, event_bus: IEventBus):
self.db = db_connection
self.event_bus = event_bus
self.orders = {} # In-memory for demo
async def save(self, order: Order) -> Order:
self.orders[order.id] = order
# Publish event
event = Event(
event_type="order.created" if order.id not in self.orders else "order.updated",
event_id=f"event_{order.id}_{datetime.now().timestamp()}",
timestamp=datetime.now(),
data=asdict(order),
source="order_service"
)
await self.event_bus.publish(event)
return order
async def find_by_id(self, order_id: str) -> Optional[Order]:
return self.orders.get(order_id)
async def find_by_customer(self, customer_id: str) -> List[Order]:
return [order for order in self.orders.values() if order.customer_id == customer_id]
class DatabaseCustomerRepository(ICustomerRepository):
@inject
def __init__(self, db_connection: DatabaseConnection, event_bus: IEventBus):
self.db = db_connection
self.event_bus = event_bus
self.customers = {} # In-memory for demo
async def save(self, customer: Customer) -> Customer:
self.customers[customer.id] = customer
# Publish event
event = Event(
event_type="customer.created" if customer.id not in self.customers else "customer.updated",
event_id=f"event_{customer.id}_{datetime.now().timestamp()}",
timestamp=datetime.now(),
data=asdict(customer),
source="customer_service"
)
await self.event_bus.publish(event)
return customer
async def find_by_id(self, customer_id: str) -> Optional[Customer]:
return self.customers.get(customer_id)
# Command Handlers
class CreateOrderCommandHandler(ICommandHandler):
@inject
def __init__(
self,
order_repo: IOrderRepository,
customer_repo: ICustomerRepository,
event_store: IEventStore
):
self.order_repo = order_repo
self.customer_repo = customer_repo
self.event_store = event_store
async def handle(self, command: Command) -> Dict:
if command.command_type != "create_order":
raise ValueError(f"Unsupported command type: {command.command_type}")
data = command.data
# Validate customer exists
customer = await self.customer_repo.find_by_id(data["customer_id"])
if not customer:
return {"success": False, "error": "Customer not found"}
# Create order
order = Order(
id=data["order_id"],
customer_id=data["customer_id"],
items=data["items"],
total_amount=data["total_amount"],
status="created",
created_at=datetime.now()
)
# Save order
saved_order = await self.order_repo.save(order)
# Store command as event
command_event = Event(
event_type="command.executed",
event_id=command.command_id,
timestamp=command.timestamp,
data={"command": asdict(command), "result": asdict(saved_order)},
source="order_service"
)
await self.event_store.store_event(command_event)
return {"success": True, "order_id": saved_order.id}
class CreateCustomerCommandHandler(ICommandHandler):
@inject
def __init__(self, customer_repo: ICustomerRepository, event_store: IEventStore):
self.customer_repo = customer_repo
self.event_store = event_store
async def handle(self, command: Command) -> Dict:
if command.command_type != "create_customer":
raise ValueError(f"Unsupported command type: {command.command_type}")
data = command.data
# Create customer
customer = Customer(
id=data["customer_id"],
name=data["name"],
email=data["email"],
created_at=datetime.now()
)
# Save customer
saved_customer = await self.customer_repo.save(customer)
# Store command as event
command_event = Event(
event_type="command.executed",
event_id=command.command_id,
timestamp=command.timestamp,
data={"command": asdict(command), "result": asdict(saved_customer)},
source="customer_service"
)
await self.event_store.store_event(command_event)
return {"success": True, "customer_id": saved_customer.id}
# Query Handlers
class GetOrderQueryHandler(IQueryHandler):
@inject
def __init__(self, order_repo: IOrderRepository):
self.order_repo = order_repo
async def handle(self, query: Query) -> Dict:
if query.query_type != "get_order":
raise ValueError(f"Unsupported query type: {query.query_type}")
order_id = query.parameters["order_id"]
order = await self.order_repo.find_by_id(order_id)
if order:
return {"success": True, "order": asdict(order)}
else:
return {"success": False, "error": "Order not found"}
class GetCustomerOrdersQueryHandler(IQueryHandler):
@inject
def __init__(self, order_repo: IOrderRepository):
self.order_repo = order_repo
async def handle(self, query: Query) -> Dict:
if query.query_type != "get_customer_orders":
raise ValueError(f"Unsupported query type: {query.query_type}")
customer_id = query.parameters["customer_id"]
orders = await self.order_repo.find_by_customer(customer_id)
return {
"success": True,
"orders": [asdict(order) for order in orders]
}
# Event Handlers
class OrderEventHandler:
@inject
def __init__(self, notification_service: NotificationService):
self.notification_service = notification_service
async def handle_order_created(self, event: Event):
order_data = event.data
print(f"Order created: {order_data['id']}")
# Send notification
await self.notification_service.send_order_confirmation(order_data)
async def handle_order_updated(self, event: Event):
order_data = event.data
print(f"Order updated: {order_data['id']}")
class CustomerEventHandler:
@inject
def __init__(self, email_service: EmailService):
self.email_service = email_service
async def handle_customer_created(self, event: Event):
customer_data = event.data
print(f"Customer created: {customer_data['id']}")
# Send welcome email
await self.email_service.send_welcome_email(customer_data)
# Application Services
class CommandDispatcher:
@inject
def __init__(self):
self.handlers: Dict[str, ICommandHandler] = {}
def register_handler(self, command_type: str, handler: ICommandHandler):
self.handlers[command_type] = handler
async def dispatch(self, command: Command) -> Dict:
handler = self.handlers.get(command.command_type)
if not handler:
raise ValueError(f"No handler for command type: {command.command_type}")
return await handler.handle(command)
class QueryDispatcher:
@inject
def __init__(self):
self.handlers: Dict[str, IQueryHandler] = {}
def register_handler(self, query_type: str, handler: IQueryHandler):
self.handlers[query_type] = handler
async def dispatch(self, query: Query) -> Dict:
handler = self.handlers.get(query.query_type)
if not handler:
raise ValueError(f"No handler for query type: {query.query_type}")
return await handler.handle(query)
# Infrastructure Services
class NotificationService:
def __init__(self):
pass
async def send_order_confirmation(self, order_data: Dict):
print(f"Sending order confirmation for order {order_data['id']}")
class EmailService:
def __init__(self):
pass
async def send_welcome_email(self, customer_data: Dict):
print(f"Sending welcome email to {customer_data['email']}")
# Microservice Application
class MicroserviceApplication:
@inject
def __init__(
self,
event_bus: IEventBus,
command_dispatcher: CommandDispatcher,
query_dispatcher: QueryDispatcher,
order_event_handler: OrderEventHandler,
customer_event_handler: CustomerEventHandler
):
self.event_bus = event_bus
self.command_dispatcher = command_dispatcher
self.query_dispatcher = query_dispatcher
self.order_event_handler = order_event_handler
self.customer_event_handler = customer_event_handler
async def initialize(self):
"""Initialize the microservice."""
# Register event handlers
await self.event_bus.subscribe("order.created", self.order_event_handler.handle_order_created)
await self.event_bus.subscribe("order.updated", self.order_event_handler.handle_order_updated)
await self.event_bus.subscribe("customer.created", self.customer_event_handler.handle_customer_created)
print("Microservice initialized successfully")
async def process_command(self, command_data: Dict) -> Dict:
"""Process a command."""
command = Command(
command_type=command_data["command_type"],
command_id=command_data["command_id"],
data=command_data["data"],
timestamp=datetime.now()
)
return await self.command_dispatcher.dispatch(command)
async def process_query(self, query_data: Dict) -> Dict:
"""Process a query."""
query = Query(
query_type=query_data["query_type"],
query_id=query_data["query_id"],
parameters=query_data["parameters"],
timestamp=datetime.now()
)
return await self.query_dispatcher.dispatch(query)
# Module Configuration
class MicroserviceModule(Module):
def configure(self):
# Infrastructure
self.bind(DatabaseConnection, DatabaseConnection).singleton()
self.bind(IEventBus, InMemoryEventBus).singleton()
self.bind(IEventStore, DatabaseEventStore).singleton()
# Repositories
self.bind(IOrderRepository, DatabaseOrderRepository).singleton()
self.bind(ICustomerRepository, DatabaseCustomerRepository).singleton()
# Command Handlers
self.bind(ICommandHandler, CreateOrderCommandHandler, name="create_order").singleton()
self.bind(ICommandHandler, CreateCustomerCommandHandler, name="create_customer").singleton()
# Query Handlers
self.bind(IQueryHandler, GetOrderQueryHandler, name="get_order").singleton()
self.bind(IQueryHandler, GetCustomerOrdersQueryHandler, name="get_customer_orders").singleton()
# Dispatchers
self.bind(CommandDispatcher, self.create_command_dispatcher).singleton()
self.bind(QueryDispatcher, self.create_query_dispatcher).singleton()
# Event Handlers
self.bind(OrderEventHandler, OrderEventHandler).singleton()
self.bind(CustomerEventHandler, CustomerEventHandler).singleton()
# Services
self.bind(NotificationService, NotificationService).singleton()
self.bind(EmailService, EmailService).singleton()
# Application
self.bind(MicroserviceApplication, MicroserviceApplication).singleton()
def create_command_dispatcher(self) -> CommandDispatcher:
dispatcher = CommandDispatcher()
dispatcher.register_handler("create_order", self.container.get(ICommandHandler, name="create_order"))
dispatcher.register_handler("create_customer", self.container.get(ICommandHandler, name="create_customer"))
return dispatcher
def create_query_dispatcher(self) -> QueryDispatcher:
dispatcher = QueryDispatcher()
dispatcher.register_handler("get_order", self.container.get(IQueryHandler, name="get_order"))
dispatcher.register_handler("get_customer_orders", self.container.get(IQueryHandler, name="get_customer_orders"))
return dispatcher
# Example Usage
async def main():
# Setup container
container = InjectQ()
container.install(MicroserviceModule())
# Get application
app = container.get(MicroserviceApplication)
await app.initialize()
# Create customer
customer_command = {
"command_type": "create_customer",
"command_id": "cmd_001",
"data": {
"customer_id": "cust_001",
"name": "John Doe",
"email": "john@example.com"
}
}
result = await app.process_command(customer_command)
print(f"Customer creation result: {result}")
# Create order
order_command = {
"command_type": "create_order",
"command_id": "cmd_002",
"data": {
"order_id": "order_001",
"customer_id": "cust_001",
"items": [{"product_id": "prod_001", "quantity": 2, "price": 50.0}],
"total_amount": 100.0
}
}
result = await app.process_command(order_command)
print(f"Order creation result: {result}")
# Query order
order_query = {
"query_type": "get_order",
"query_id": "query_001",
"parameters": {"order_id": "order_001"}
}
result = await app.process_query(order_query)
print(f"Order query result: {result}")
# Query customer orders
customer_orders_query = {
"query_type": "get_customer_orders",
"query_id": "query_002",
"parameters": {"customer_id": "cust_001"}
}
result = await app.process_query(customer_orders_query)
print(f"Customer orders query result: {result}")
if __name__ == "__main__":
asyncio.run(main())
This examples section provides comprehensive, real-world examples that demonstrate:
- Web Application Example: Complete FastAPI e-commerce application showing layered architecture with repositories, services, and proper dependency injection
- Scientific Computing Example: Data processing pipeline with multiple processors, validators, and async processing capabilities
- Microservices Example: Event-driven microservice with CQRS pattern, event sourcing, and proper separation of concerns
Each example shows: - Proper interface usage and abstraction - Module configuration and dependency binding - Async/await patterns - Error handling and validation - Testing considerations - Real-world architectural patterns
Ready to continue with more examples or move to the next section?