Conformité OWASP Top 10¶
Mesures de protection contre les vulnérabilités OWASP.
A01 - Broken Access Control¶
Protections Implémentées¶
-- Row-Level Security (RLS): rows of "users" are only visible when their
-- tenant_id equals the session setting app.current_tenant_id (cast to uuid).
-- NOTE(review): the policy only takes effect if row-level security is
-- enabled on the table — confirm this is done in the migrations.
CREATE POLICY tenant_isolation ON users
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
# RBAC in FastAPI
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: UUID,
    current_user: User = Depends(require_role(["admin"]))
):
    """Handle DELETE /users/{user_id} for callers accepted by require_role(["admin"]).

    Raises:
        HTTPException: 403 when the target user's tenant differs from the
            caller's tenant.
    """
    # Check that the admin manages this tenant
    # NOTE(review): target_user is not assigned in this excerpt — presumably
    # it is loaded from user_id before this check; confirm.
    if target_user.tenant_id != current_user.tenant_id:
        raise HTTPException(403, "Accès interdit")
Tests¶
def test_cannot_access_other_tenant_data():
    """User A must not be able to read a conversation that belongs to User B."""
    foreign_conversation_url = f"/conversations/{user_b_conversation_id}"
    resp = client.get(foreign_conversation_url, headers=user_a_headers)
    # The other tenant's conversation is expected to look nonexistent.
    assert resp.status_code == 404
A02 - Cryptographic Failures¶
Protections¶
| Élément | Protection |
|---|---|
| Passwords | bcrypt (cost=12) |
| JWT | HS256 avec clé 256-bit |
| TLS | TLS 1.3 minimum |
| Données au repos | AES-256 |
# Hashing sécurisé
from passlib.context import CryptContext
# Password hashing context restricted to the bcrypt scheme.
pwd_context = CryptContext(
    deprecated="auto",
    schemes=["bcrypt"],
)
# Never put secrets in the code: the key is read from the environment.
# The value is None when the SECRET_KEY variable is not set.
SECRET_KEY = os.getenv("SECRET_KEY")
A03 - Injection¶
SQL Injection Prevention¶
# ✅ SQLAlchemy ORM (safe): the filter is expressed through the ORM, the
# email value is never formatted into the statement text by hand
result = await session.execute(
select(User).where(User.email == email)
)
# ❌ NEVER - raw SQL built with an f-string: email is interpolated directly
# into the query text
query = f"SELECT * FROM users WHERE email = '{email}'"
Command Injection Prevention¶
# ✅ Safe - subprocess with an argument list: directory is passed as a
# single argument and no shell parses it
import subprocess
subprocess.run(["ls", "-la", directory], check=True)
# ❌ NEVER - shell=True with user input: directory is interpolated into a
# command line that a shell will interpret
subprocess.run(f"ls -la {directory}", shell=True)
A04 - Insecure Design¶
Security by Design¶
# Strict input validation with Pydantic
class UserCreate(BaseModel):
    """Payload for creating a user: an e-mail address and a password.

    The password must be 8 to 128 characters long and contain at least one
    ASCII uppercase letter.
    """

    email: EmailStr
    password: str = Field(..., min_length=8, max_length=128)

    @validator("password")
    def validate_password(cls, v):
        """Return the password unchanged, or reject it if it has no uppercase letter."""
        if re.search(r"[A-Z]", v):
            return v
        raise ValueError("Doit contenir une majuscule")
A05 - Security Misconfiguration¶
Headers de Sécurité¶
# Security middleware
@app.middleware("http")
async def security_headers(request: Request, call_next):
    """Add defensive HTTP headers to every response.

    Args:
        request: The incoming request, forwarded unchanged to ``call_next``.
        call_next: Callable that produces the downstream response.

    Returns:
        The downstream response with the security headers set.
    """
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    # The legacy browser XSS filter is deprecated and "1; mode=block" can
    # itself be abused, so it is explicitly switched off; the
    # Content-Security-Policy below is the XSS mitigation.
    response.headers["X-XSS-Protection"] = "0"
    response.headers["Strict-Transport-Security"] = "max-age=31536000"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    return response
CORS Restrictif¶
# Register CORS handling: origins come from settings.ALLOWED_ORIGINS, and
# the permitted methods and request headers are enumerated explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS, # Explicit list
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)
A06 - Vulnerable and Outdated Components¶
CVE Scanning¶
#!/bin/bash
# Mandatory pre-push hook: audit the pinned requirements and abort the push
# (exit status 1) when pip-audit reports a failure.
# The shebang must be the very first line of the hook file to take effect.
if ! pip-audit --require-hashes -r requirements.txt; then
    echo "❌ Vulnérabilités détectées. Corrigez avant de push."
    exit 1
fi
Dépendances Auditées¶
A07 - Identification and Authentication Failures¶
Protections¶
# Rate limiting on login
# Decorators apply bottom-up: the limiter must wrap the handler first so that
# the function the router registers is the rate-limited one. With the route
# decorator underneath, the router would register the bare handler and the
# limit would never be enforced on this path.
# NOTE(review): if `limiter` is slowapi's Limiter, the endpoint must also
# accept a `request: Request` argument — confirm against the real handler.
@router.post("/login")
@limiter.limit("5/minute")
async def login(credentials: LoginRequest):
    ...
# Lockout after repeated failures
MAX_FAILED_ATTEMPTS = 5


async def check_lockout(email: str):
    """Refuse further attempts once the failure counter for ``email`` is too high.

    The counter is read from Redis under the key ``failed:<email>``. A
    missing or empty counter means there is nothing to enforce.

    Raises:
        HTTPException: 429 when the stored count has reached
            MAX_FAILED_ATTEMPTS.
    """
    failed_count = await redis.get(f"failed:{email}")
    if not failed_count:
        return
    if int(failed_count) >= MAX_FAILED_ATTEMPTS:
        raise HTTPException(429, "Compte verrouillé temporairement")
A08 - Software and Data Integrity Failures¶
Audit Logging¶
async def log_audit(
    user_id: UUID,
    action: str,
    resource_type: str,
    resource_id: UUID,
    status: str,
    metadata: dict = None
):
    """Persist one audit-trail entry describing an action on a resource.

    The client IP address and user agent are taken from the ambient
    ``request`` object. The entry is added to ``session`` and committed
    right away. A missing or empty ``metadata`` is stored as ``{}``.
    """
    client_ip = request.client.host
    agent = request.headers.get("user-agent")
    extra = metadata if metadata else {}

    entry = AuditLog(
        user_id=user_id,
        action=action,
        resource_type=resource_type,
        resource_id=resource_id,
        ip_address=client_ip,
        user_agent=agent,
        status=status,
        metadata=extra,
    )
    session.add(entry)
    await session.commit()
A09 - Security Logging and Monitoring Failures¶
Structured Logging¶
import structlog
# Logger used for the structured security events below.
logger = structlog.get_logger()
# Log security events: a failed login, with the attempted e-mail, the client
# IP address and the failure reason as structured fields
logger.warning(
"failed_login_attempt",
email=email,
ip_address=request.client.host,
reason="invalid_password"
)
# Log access to sensitive data: who (user_id) accessed which document
logger.info(
"sensitive_data_access",
user_id=str(user.id),
resource_type="document",
resource_id=str(doc_id)
)
A10 - Server-Side Request Forgery (SSRF)¶
URL Validation¶
from urllib.parse import urlparse
import ipaddress
BLOCKED_HOSTS = ["localhost", "127.0.0.1", "0.0.0.0"]
BLOCKED_SCHEMES = ["file", "ftp", "gopher"]


def validate_url(url: str) -> bool:
    """Reject URLs that point at a blocked scheme, a blocked host or an internal IP.

    Hostnames are not resolved here: a DNS name that resolves to an internal
    address passes this check and must be handled where the request is made.

    Args:
        url: The URL supplied by the caller.

    Returns:
        True when the URL passed every check.

    Raises:
        ValueError: If the scheme is in BLOCKED_SCHEMES, the host is in
            BLOCKED_HOSTS, or the host is a private or loopback IP literal.
    """
    parsed = urlparse(url)

    # Block dangerous schemes
    if parsed.scheme in BLOCKED_SCHEMES:
        raise ValueError("Schéma non autorisé")

    # Block localhost and private IPs
    host = parsed.hostname
    if host in BLOCKED_HOSTS:
        raise ValueError("Hôte non autorisé")

    # Only the parsing step is guarded: the rejection below must not be
    # raised inside this try, or the except clause would swallow it.
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return True  # a hostname (or no host at all), not an IP literal

    if ip.is_private or ip.is_loopback:
        raise ValueError("IP privée non autorisée")
    return True
Checklist Sécurité¶
- RLS activé sur toutes les tables user-data
- JWT avec expiration courte (15 min)
- Rate limiting sur endpoints sensibles
- Headers de sécurité configurés
- CORS restrictif
- CVE scanning dans CI/CD
- Audit logging activé
- Secrets dans variables d'environnement
- TLS 1.3 en production
- Tests de sécurité automatisés