Security Best Practices¶
This guide covers essential security considerations when using InjectQ in production applications, including dependency validation, secure configuration management, and protection against common vulnerabilities.
🔒 Dependency Validation and Security¶
Secure Dependency Registration¶
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type
from injectq import InjectQ, Module, inject
import hashlib
import secrets
from dataclasses import dataclass
class ISecurityValidator(ABC):
"""Interface for dependency security validation."""
@abstractmethod
def validate_dependency(self, dependency_type: Type, instance: Any) -> bool:
pass
@abstractmethod
def validate_configuration(self, config: Dict[str, Any]) -> bool:
pass
class DependencySecurityValidator(ISecurityValidator):
"""Validates dependencies for security compliance."""
def __init__(self):
self.allowed_types: set = set()
self.forbidden_patterns: List[str] = [
"eval", "exec", "compile", "__import__"
]
def validate_dependency(self, dependency_type: Type, instance: Any) -> bool:
"""Validate that a dependency is safe to use."""
# Check if type is explicitly allowed
if dependency_type in self.allowed_types:
return True
# Check for dangerous patterns in type name
type_name = dependency_type.__name__.lower()
module_name = getattr(dependency_type, "__module__", "").lower()
for pattern in self.forbidden_patterns:
if pattern in type_name or pattern in module_name:
return False
# Validate instance attributes
if hasattr(instance, "__dict__"):
for attr_name in instance.__dict__:
if any(pattern in attr_name.lower() for pattern in self.forbidden_patterns):
return False
return True
def validate_configuration(self, config: Dict[str, Any]) -> bool:
"""Validate configuration for security issues."""
dangerous_keys = ["password", "secret", "key", "token"]
for key, value in config.items():
# Check for credentials in plain text
if any(dangerous in key.lower() for dangerous in dangerous_keys):
if isinstance(value, str) and len(value) > 0:
# Should not be plain text
if not self._is_encrypted_or_env_var(value):
return False
return True
def _is_encrypted_or_env_var(self, value: str) -> bool:
"""Check if value is encrypted or environment variable reference."""
return (
value.startswith("${") and value.endswith("}") or # Environment variable
value.startswith("enc:") or # Encrypted value
len(value) > 32 and all(c in "0123456789abcdef" for c in value.lower()) # Hash
)
class SecureModule(Module):
"""Base module with security validation."""
def __init__(self):
super().__init__()
self.security_validator = DependencySecurityValidator()
def configure(self):
"""Configure module with security checks."""
self._configure_dependencies()
self._validate_security()
def _configure_dependencies(self):
"""Override in subclasses."""
pass
def _validate_security(self):
"""Validate all configured dependencies."""
# This would be called after dependency configuration
pass
def bind(self, interface: Type, implementation: Type, **kwargs):
"""Secure binding with validation."""
# Validate the implementation type
if hasattr(implementation, "__init__"):
temp_instance = object.__new__(implementation)
if not self.security_validator.validate_dependency(implementation, temp_instance):
raise SecurityError(f"Dependency {implementation} failed security validation")
return super().bind(interface, implementation, **kwargs)
class SecurityError(Exception):
"""Raised when security validation fails."""
pass
Input Validation and Sanitization¶
import re
from typing import Any, Dict, Union
from html import escape
import bleach
class InputValidator:
"""Validates and sanitizes user inputs."""
def __init__(self):
self.sql_injection_patterns = [
r"(\'|(\')+|(;)+|(\-\-)+|(\s)+(\-\-)+)",
r"((\%27)|(\'))((\%6F)|o|(\%4F))((\%72)|r|(\%52))",
r"((\%27)|(\'))union",
r"exec(\s|\+)+(s|x)p\w+",
r"select.*from",
r"insert.*into",
r"update.*set",
r"delete.*from",
r"drop.*table",
r"create.*table"
]
self.xss_patterns = [
r"<script\b[^<]*(?:(?!<\/script>)<[^<]*)*<\/script>",
r"javascript:",
r"vbscript:",
r"onload=",
r"onerror=",
r"onclick="
]
def validate_sql_injection(self, value: str) -> bool:
"""Check for SQL injection patterns."""
if not isinstance(value, str):
return True
value_lower = value.lower()
for pattern in self.sql_injection_patterns:
if re.search(pattern, value_lower, re.IGNORECASE):
return False
return True
def validate_xss(self, value: str) -> bool:
"""Check for XSS patterns."""
if not isinstance(value, str):
return True
for pattern in self.xss_patterns:
if re.search(pattern, value, re.IGNORECASE):
return False
return True
def sanitize_html(self, value: str) -> str:
"""Sanitize HTML content."""
if not isinstance(value, str):
return str(value)
# Allow only safe HTML tags
allowed_tags = ['p', 'br', 'strong', 'em', 'u', 'ol', 'ul', 'li']
allowed_attributes = {}
return bleach.clean(value, tags=allowed_tags, attributes=allowed_attributes)
def sanitize_input(self, value: Any) -> Any:
"""General input sanitization."""
if isinstance(value, str):
# Basic sanitization
value = escape(value) # HTML escape
value = value.strip() # Remove whitespace
# Validate for common attacks
if not self.validate_sql_injection(value):
raise SecurityError("Potential SQL injection detected")
if not self.validate_xss(value):
raise SecurityError("Potential XSS attack detected")
return value
elif isinstance(value, dict):
return {k: self.sanitize_input(v) for k, v in value.items()}
elif isinstance(value, list):
return [self.sanitize_input(item) for item in value]
return value
class SecureDataService:
"""Service that handles data securely."""
@inject
def __init__(
self,
database: Database,
input_validator: InputValidator,
audit_logger: AuditLogger
):
self.database = database
self.validator = input_validator
self.audit_logger = audit_logger
async def create_user(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create user with input validation."""
# Log the operation
self.audit_logger.log_operation("create_user", user_data.get("email", "unknown"))
try:
# Validate and sanitize inputs
sanitized_data = self.validator.sanitize_input(user_data)
# Additional business validation
if not self._validate_user_data(sanitized_data):
raise ValueError("Invalid user data")
# Create user in database
user = await self.database.create_user(sanitized_data)
# Remove sensitive data from response
safe_user = self._remove_sensitive_fields(user)
return {"success": True, "user": safe_user}
except Exception as e:
self.audit_logger.log_error("create_user_failed", str(e))
raise
def _validate_user_data(self, data: Dict[str, Any]) -> bool:
"""Validate user data according to business rules."""
required_fields = ["email", "username"]
for field in required_fields:
if field not in data or not data[field]:
return False
# Email validation
email = data["email"]
email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
if not re.match(email_pattern, email):
return False
return True
def _remove_sensitive_fields(self, user: Dict[str, Any]) -> Dict[str, Any]:
"""Remove sensitive fields from user data."""
sensitive_fields = ["password", "password_hash", "salt", "api_key"]
return {k: v for k, v in user.items() if k not in sensitive_fields}
🔐 Configuration Security¶
Secure Configuration Management¶
import os
import base64
from cryptography.fernet import Fernet
from typing import Dict, Any, Optional
class SecureConfigManager:
"""Manages application configuration securely."""
def __init__(self, encryption_key: Optional[bytes] = None):
if encryption_key is None:
# Get from environment or generate
key_str = os.getenv("CONFIG_ENCRYPTION_KEY")
if key_str:
self.encryption_key = base64.urlsafe_b64decode(key_str.encode())
else:
self.encryption_key = Fernet.generate_key()
else:
self.encryption_key = encryption_key
self.cipher = Fernet(self.encryption_key)
self._config_cache: Dict[str, Any] = {}
def encrypt_value(self, value: str) -> str:
"""Encrypt a configuration value."""
encrypted = self.cipher.encrypt(value.encode())
return base64.urlsafe_b64encode(encrypted).decode()
def decrypt_value(self, encrypted_value: str) -> str:
"""Decrypt a configuration value."""
try:
encrypted_bytes = base64.urlsafe_b64decode(encrypted_value.encode())
decrypted = self.cipher.decrypt(encrypted_bytes)
return decrypted.decode()
except Exception:
raise SecurityError("Failed to decrypt configuration value")
def get_config_value(self, key: str, default: Any = None) -> Any:
"""Get configuration value with decryption if needed."""
# Check cache first
if key in self._config_cache:
return self._config_cache[key]
# Get from environment
env_value = os.getenv(key, default)
if isinstance(env_value, str):
# Check if value is encrypted
if env_value.startswith("enc:"):
encrypted_value = env_value[4:] # Remove "enc:" prefix
decrypted_value = self.decrypt_value(encrypted_value)
self._config_cache[key] = decrypted_value
return decrypted_value
else:
self._config_cache[key] = env_value
return env_value
return env_value
def validate_configuration(self) -> bool:
"""Validate that all required secure configurations are present."""
required_configs = [
"DATABASE_PASSWORD",
"JWT_SECRET_KEY",
"API_SECRET_KEY"
]
for config in required_configs:
value = self.get_config_value(config)
if not value:
return False
# Check if sensitive values are encrypted or strong enough
if not self._is_secure_value(value):
return False
return True
def _is_secure_value(self, value: str) -> bool:
"""Check if a value meets security requirements."""
if len(value) < 12: # Minimum length
return False
# Should have complexity (letters, numbers, symbols)
has_lower = any(c.islower() for c in value)
has_upper = any(c.isupper() for c in value)
has_digit = any(c.isdigit() for c in value)
has_symbol = any(not c.isalnum() for c in value)
return sum([has_lower, has_upper, has_digit, has_symbol]) >= 3
@dataclass
class DatabaseConfig:
host: str
port: int
database: str
username: str
password: str
ssl_mode: str = "require"
def __post_init__(self):
# Validate that password is not empty
if not self.password:
raise SecurityError("Database password cannot be empty")
@dataclass
class JWTConfig:
secret_key: str
algorithm: str = "HS256"
expiration_hours: int = 24
def __post_init__(self):
if len(self.secret_key) < 32:
raise SecurityError("JWT secret key must be at least 32 characters")
class SecureConfigModule(Module):
"""Module that provides secure configuration."""
def configure(self):
config_manager = SecureConfigManager()
# Validate configuration
if not config_manager.validate_configuration():
raise SecurityError("Configuration validation failed")
# Database configuration
db_config = DatabaseConfig(
host=config_manager.get_config_value("DATABASE_HOST", "localhost"),
port=int(config_manager.get_config_value("DATABASE_PORT", "5432")),
database=config_manager.get_config_value("DATABASE_NAME", "app"),
username=config_manager.get_config_value("DATABASE_USER", "app"),
password=config_manager.get_config_value("DATABASE_PASSWORD"),
ssl_mode=config_manager.get_config_value("DATABASE_SSL_MODE", "require")
)
# JWT configuration
jwt_config = JWTConfig(
secret_key=config_manager.get_config_value("JWT_SECRET_KEY"),
algorithm=config_manager.get_config_value("JWT_ALGORITHM", "HS256"),
expiration_hours=int(config_manager.get_config_value("JWT_EXPIRATION_HOURS", "24"))
)
# Bind configurations
self.bind(SecureConfigManager, config_manager).singleton()
self.bind(DatabaseConfig, db_config).singleton()
self.bind(JWTConfig, jwt_config).singleton()
# Input validation
self.bind(InputValidator, InputValidator).singleton()
🛡️ Authentication and Authorization¶
Secure Authentication Service¶
import jwt
import bcrypt
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
class AuthenticationService:
"""Handles user authentication securely."""
@inject
def __init__(
self,
jwt_config: JWTConfig,
user_repository: IUserRepository,
audit_logger: AuditLogger
):
self.jwt_config = jwt_config
self.user_repository = user_repository
self.audit_logger = audit_logger
self.failed_attempts: Dict[str, int] = {}
self.lockout_threshold = 5
self.lockout_duration = timedelta(minutes=15)
async def authenticate_user(
self,
username: str,
password: str,
client_ip: str
) -> Optional[Dict[str, Any]]:
"""Authenticate user with rate limiting and audit logging."""
# Check for account lockout
if self._is_account_locked(username):
self.audit_logger.log_security_event(
"authentication_blocked_lockout",
username,
client_ip
)
raise SecurityError("Account temporarily locked due to multiple failed attempts")
try:
# Find user
user = await self.user_repository.find_by_username(username)
if not user:
self._record_failed_attempt(username)
self.audit_logger.log_security_event(
"authentication_failed_user_not_found",
username,
client_ip
)
return None
# Verify password
if not self._verify_password(password, user.password_hash):
self._record_failed_attempt(username)
self.audit_logger.log_security_event(
"authentication_failed_invalid_password",
username,
client_ip
)
return None
# Check if account is active
if not user.is_active:
self.audit_logger.log_security_event(
"authentication_failed_inactive_account",
username,
client_ip
)
return None
# Reset failed attempts on successful login
self._reset_failed_attempts(username)
# Generate JWT token
token = self._generate_jwt_token(user)
# Log successful authentication
self.audit_logger.log_security_event(
"authentication_successful",
username,
client_ip
)
return {
"token": token,
"user_id": user.id,
"username": user.username,
"expires_at": datetime.utcnow() + timedelta(hours=self.jwt_config.expiration_hours)
}
except Exception as e:
self.audit_logger.log_error("authentication_error", str(e))
raise
def _verify_password(self, password: str, password_hash: str) -> bool:
"""Verify password using secure hashing."""
try:
return bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('utf-8'))
except Exception:
return False
def _generate_jwt_token(self, user: Any) -> str:
"""Generate JWT token for authenticated user."""
payload = {
"user_id": user.id,
"username": user.username,
"email": user.email,
"roles": user.roles,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(hours=self.jwt_config.expiration_hours)
}
return jwt.encode(
payload,
self.jwt_config.secret_key,
algorithm=self.jwt_config.algorithm
)
def verify_jwt_token(self, token: str) -> Optional[Dict[str, Any]]:
"""Verify JWT token and return payload."""
try:
payload = jwt.decode(
token,
self.jwt_config.secret_key,
algorithms=[self.jwt_config.algorithm]
)
return payload
except jwt.ExpiredSignatureError:
raise SecurityError("Token has expired")
except jwt.InvalidTokenError:
raise SecurityError("Invalid token")
def _is_account_locked(self, username: str) -> bool:
"""Check if account is locked due to failed attempts."""
if username not in self.failed_attempts:
return False
attempts, last_attempt = self.failed_attempts[username]
if attempts >= self.lockout_threshold:
if datetime.now() - last_attempt < self.lockout_duration:
return True
else:
# Lockout period expired, reset attempts
self._reset_failed_attempts(username)
return False
def _record_failed_attempt(self, username: str):
"""Record a failed authentication attempt."""
if username not in self.failed_attempts:
self.failed_attempts[username] = [1, datetime.now()]
else:
attempts, _ = self.failed_attempts[username]
self.failed_attempts[username] = [attempts + 1, datetime.now()]
def _reset_failed_attempts(self, username: str):
"""Reset failed attempts for a user."""
self.failed_attempts.pop(username, None)
class AuthorizationService:
"""Handles user authorization and permissions."""
@inject
def __init__(self, auth_service: AuthenticationService):
self.auth_service = auth_service
def check_permission(self, token: str, required_permission: str) -> bool:
"""Check if user has required permission."""
try:
payload = self.auth_service.verify_jwt_token(token)
user_roles = payload.get("roles", [])
# Check if user has required permission
return self._has_permission(user_roles, required_permission)
except SecurityError:
return False
def require_permission(self, required_permission: str):
"""Decorator to require specific permission."""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
# Extract token from request context (implementation depends on framework)
token = self._extract_token_from_context()
if not self.check_permission(token, required_permission):
raise SecurityError(f"Permission denied: {required_permission} required")
return await func(*args, **kwargs)
return wrapper
return decorator
def _has_permission(self, user_roles: List[str], required_permission: str) -> bool:
"""Check if user roles include required permission."""
# Define role permissions (in production, this would be in a database)
role_permissions = {
"admin": ["*"], # Admin has all permissions
"user": ["read_profile", "update_profile"],
"moderator": ["read_profile", "update_profile", "moderate_content"],
"editor": ["read_profile", "update_profile", "create_content", "edit_content"]
}
for role in user_roles:
permissions = role_permissions.get(role, [])
if "*" in permissions or required_permission in permissions:
return True
return False
def _extract_token_from_context(self) -> Optional[str]:
"""Extract JWT token from request context."""
# Implementation depends on web framework
# This is a placeholder
return None
🚨 Audit Logging and Monitoring¶
Security Audit System¶
import json
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
class SecurityEventType(Enum):
AUTHENTICATION_SUCCESS = "authentication_success"
AUTHENTICATION_FAILURE = "authentication_failure"
AUTHORIZATION_FAILURE = "authorization_failure"
SUSPICIOUS_ACTIVITY = "suspicious_activity"
DATA_ACCESS = "data_access"
CONFIGURATION_CHANGE = "configuration_change"
SECURITY_VIOLATION = "security_violation"
@dataclass
class SecurityEvent:
event_id: str
event_type: SecurityEventType
timestamp: datetime
user_id: Optional[str]
username: Optional[str]
client_ip: str
user_agent: Optional[str]
resource: Optional[str]
action: str
result: str
details: Dict[str, Any]
risk_score: int # 1-10, where 10 is highest risk
class AuditLogger:
"""Comprehensive security audit logging."""
@inject
def __init__(self, log_storage: ILogStorage, alert_service: IAlertService):
self.log_storage = log_storage
self.alert_service = alert_service
self.high_risk_threshold = 7
def log_security_event(
self,
event_type: str,
username: Optional[str] = None,
client_ip: str = "unknown",
user_agent: Optional[str] = None,
resource: Optional[str] = None,
action: str = "",
result: str = "",
details: Optional[Dict[str, Any]] = None,
risk_score: int = 5
):
"""Log a security event."""
event = SecurityEvent(
event_id=str(uuid.uuid4()),
event_type=SecurityEventType(event_type),
timestamp=datetime.utcnow(),
user_id=None, # Would be extracted from context
username=username,
client_ip=client_ip,
user_agent=user_agent,
resource=resource,
action=action,
result=result,
details=details or {},
risk_score=risk_score
)
# Store the event
self._store_event(event)
# Check if high-risk event requires immediate alerting
if risk_score >= self.high_risk_threshold:
self._send_security_alert(event)
# Check for suspicious patterns
self._analyze_suspicious_patterns(event)
def log_operation(self, operation: str, target: str, **kwargs):
"""Log a general operation for audit purposes."""
self.log_security_event(
event_type="data_access",
action=operation,
resource=target,
details=kwargs,
risk_score=3
)
def log_error(self, operation: str, error_message: str, **kwargs):
"""Log an error for security analysis."""
self.log_security_event(
event_type="security_violation",
action=operation,
result="error",
details={"error": error_message, **kwargs},
risk_score=6
)
def _store_event(self, event: SecurityEvent):
"""Store security event in persistent storage."""
event_data = {
"event_id": event.event_id,
"event_type": event.event_type.value,
"timestamp": event.timestamp.isoformat(),
"user_id": event.user_id,
"username": event.username,
"client_ip": event.client_ip,
"user_agent": event.user_agent,
"resource": event.resource,
"action": event.action,
"result": event.result,
"details": event.details,
"risk_score": event.risk_score
}
self.log_storage.store_event(event_data)
def _send_security_alert(self, event: SecurityEvent):
"""Send immediate alert for high-risk events."""
alert_message = {
"alert_type": "security_event",
"severity": "high",
"event_id": event.event_id,
"event_type": event.event_type.value,
"timestamp": event.timestamp.isoformat(),
"username": event.username,
"client_ip": event.client_ip,
"risk_score": event.risk_score,
"details": event.details
}
self.alert_service.send_alert(alert_message)
def _analyze_suspicious_patterns(self, event: SecurityEvent):
"""Analyze event for suspicious patterns."""
# Check for brute force attacks
if event.event_type == SecurityEventType.AUTHENTICATION_FAILURE:
recent_failures = self._get_recent_failures(event.client_ip, minutes=10)
if len(recent_failures) >= 5:
self._send_security_alert(SecurityEvent(
event_id=str(uuid.uuid4()),
event_type=SecurityEventType.SUSPICIOUS_ACTIVITY,
timestamp=datetime.utcnow(),
user_id=None,
username=event.username,
client_ip=event.client_ip,
user_agent=event.user_agent,
resource=None,
action="brute_force_detected",
result="blocked",
details={"failure_count": len(recent_failures)},
risk_score=9
))
# Check for unusual access patterns
if event.event_type == SecurityEventType.DATA_ACCESS:
if self._is_unusual_access_pattern(event):
event.risk_score = min(event.risk_score + 3, 10)
def _get_recent_failures(self, client_ip: str, minutes: int) -> List[Dict]:
"""Get recent authentication failures from this IP."""
# This would query the log storage
return []
def _is_unusual_access_pattern(self, event: SecurityEvent) -> bool:
"""Check if access pattern is unusual."""
# Implement pattern analysis logic
return False
class ILogStorage(ABC):
@abstractmethod
def store_event(self, event_data: Dict[str, Any]):
pass
class IAlertService(ABC):
@abstractmethod
def send_alert(self, alert_data: Dict[str, Any]):
pass
class DatabaseLogStorage(ILogStorage):
@inject
def __init__(self, database: Database):
self.database = database
def store_event(self, event_data: Dict[str, Any]):
"""Store security event in database."""
# Insert into security_events table
pass
class EmailAlertService(IAlertService):
@inject
def __init__(self, email_service: EmailService, config: AlertConfig):
self.email_service = email_service
self.config = config
def send_alert(self, alert_data: Dict[str, Any]):
"""Send security alert via email."""
subject = f"Security Alert: {alert_data['event_type']}"
body = json.dumps(alert_data, indent=2)
self.email_service.send_email(
to=self.config.security_team_email,
subject=subject,
body=body
)
🛠️ Security Testing¶
Security Test Suite¶
import pytest
from unittest.mock import Mock, patch
from injectq import InjectQ
class SecurityTestSuite:
"""Comprehensive security test suite."""
def __init__(self, container: InjectQ):
self.container = container
def test_input_validation(self):
"""Test input validation against common attacks."""
validator = self.container.get(InputValidator)
# SQL injection tests
sql_payloads = [
"'; DROP TABLE users; --",
"1' OR '1'='1",
"admin'--",
"1; DELETE FROM users WHERE 1=1; --"
]
for payload in sql_payloads:
assert not validator.validate_sql_injection(payload), f"SQL injection not detected: {payload}"
# XSS tests
xss_payloads = [
"<script>alert('xss')</script>",
"javascript:alert('xss')",
"<img src=x onerror=alert('xss')>",
"<svg onload=alert('xss')>"
]
for payload in xss_payloads:
assert not validator.validate_xss(payload), f"XSS not detected: {payload}"
def test_authentication_security(self):
"""Test authentication security measures."""
auth_service = self.container.get(AuthenticationService)
# Test rate limiting
for i in range(6): # Exceed lockout threshold
try:
auth_service.authenticate_user("testuser", "wrongpassword", "127.0.0.1")
except SecurityError:
pass
# Next attempt should be blocked
with pytest.raises(SecurityError, match="Account temporarily locked"):
auth_service.authenticate_user("testuser", "wrongpassword", "127.0.0.1")
def test_configuration_security(self):
"""Test configuration security."""
config_manager = self.container.get(SecureConfigManager)
# Test that sensitive values are properly encrypted
assert config_manager.validate_configuration()
# Test encryption/decryption
test_value = "sensitive_password_123"
encrypted = config_manager.encrypt_value(test_value)
decrypted = config_manager.decrypt_value(encrypted)
assert test_value == decrypted
assert encrypted != test_value
def test_audit_logging(self):
"""Test audit logging functionality."""
audit_logger = self.container.get(AuditLogger)
with patch.object(audit_logger.log_storage, 'store_event') as mock_store:
audit_logger.log_security_event(
"authentication_failure",
username="testuser",
client_ip="127.0.0.1",
risk_score=8
)
# Verify event was stored
mock_store.assert_called_once()
# Verify high-risk alert was sent
with patch.object(audit_logger.alert_service, 'send_alert') as mock_alert:
audit_logger.log_security_event(
"suspicious_activity",
risk_score=9
)
mock_alert.assert_called_once()
# Security test fixtures
@pytest.fixture
def secure_container():
"""Create container with security modules."""
container = InjectQ()
container.install(SecureConfigModule())
container.install(SecurityModule())
return container
class SecurityModule(Module):
def configure(self):
# Security services
self.bind(InputValidator, InputValidator).singleton()
self.bind(AuthenticationService, AuthenticationService).singleton()
self.bind(AuthorizationService, AuthorizationService).singleton()
self.bind(AuditLogger, AuditLogger).singleton()
# Mock implementations for testing
self.bind(ILogStorage, Mock()).singleton()
self.bind(IAlertService, Mock()).singleton()
self.bind(IUserRepository, Mock()).singleton()
# Usage
def test_security_suite(secure_container):
"""Run complete security test suite."""
test_suite = SecurityTestSuite(secure_container)
test_suite.test_input_validation()
test_suite.test_authentication_security()
test_suite.test_configuration_security()
test_suite.test_audit_logging()
🔒 Security Checklist¶
Pre-Production Security Review¶
- Input Validation
- All user inputs are validated and sanitized
- SQL injection protection is implemented
- XSS protection is in place
-
File upload validation is secure
-
Authentication & Authorization
- Strong password policies are enforced
- Account lockout mechanisms are implemented
- JWT tokens use secure algorithms and keys
- Session management is secure
-
Role-based access control is properly implemented
-
Configuration Security
- Sensitive configuration is encrypted
- No credentials are hardcoded
- Environment variables are used for secrets
-
Configuration validation is implemented
-
Data Protection
- Personal data is encrypted at rest
- Data transmission uses TLS/SSL
- Database connections are encrypted
-
Sensitive data is not logged
-
Audit & Monitoring
- Security events are logged
- Failed authentication attempts are tracked
- Suspicious activities trigger alerts
-
Audit logs are tamper-proof
-
Dependency Security
- Dependencies are validated for security
- Dependency injection is secure
- Third-party libraries are up to date
-
Vulnerable dependencies are avoided
-
Infrastructure Security
- Network security is properly configured
- Database access is restricted
- API endpoints are secured
- Error messages don't leak sensitive information
Following these security best practices ensures your InjectQ applications are protected against common vulnerabilities and maintain high security standards in production environments.